redis缓存穿透和击穿x雪崩
时间: 2025-02-24 08:16:17 浏览: 32
### Redis缓存穿透、击穿和雪崩及其解决方案
#### 缓存穿透 (Cache Penetration)
当查询的数据在数据库中不存在,每次请求都会穿透到数据库层,造成大量无意义的查询压力。这种情况不仅浪费资源,还可能被恶意利用进行攻击。
为了防止这种现象的发生,可以采取如下措施:
- **布隆过滤器**:使用布隆过滤器来判断数据是否存在,如果布隆过滤器返回false,则可以直接断定该键一定不存在于缓存与数据库之中[^1]。
- **空对象缓存**:对于确实不存在的数据项,在缓存中存储一个特殊的标记(如`null`),并设置较短的有效期。这样下次再遇到相同的查询时就可以直接命中缓存而无需访问数据库。
```python
import redis
r = redis.Redis()
def get_data_with_null_object(key):
data = r.get(key)
if not data:
# 假设db_get是从数据库获取数据的方法
db_result = db_get(key)
if not db_result:
r.setex(key, 60, "NULL") # 设置过期时间为60秒
return db_result
elif data.decode('utf-8') == 'NULL':
return None
else:
return data
```
#### 缓存击穿 (Cache Breakdown)
某个热点key突然失效,此时大量的并发请求会瞬间打到数据库上,给数据库带来巨大压力。为了避免这一情况发生,可采用以下方法:
- **加锁机制**:通过分布式锁控制同一时间只有一个线程能够更新缓存中的特定条目;其他线程则等待直到新的值已经被加载回缓存为止。
- **设置随机有效期**:为热Key设定带有轻微波动范围的时间戳作为其TTL(Time To Live),从而减少因多个实例同时到期而导致的大规模刷新操作的可能性。
```python
from threading import Lock
lock_dict = {}
def set_value_with_lock(redis_client, key, value, ttl=None):
lock_key = f'lock:{key}'
with Lock() as lock:
acquired = False
try:
while True:
if not redis_client.exists(lock_key):
acquired = bool(redis_client.setnx(lock_key, 1))
if acquired:
break
time.sleep(0.1) # 尝试获得锁
redis_client.set(key, value, ex=ttl or random.randint(90, 120)) # TTL带有一些随机性
finally:
if acquired:
redis_client.delete(lock_key)
def get_or_set_cache(redis_client, key, fetch_func, ttl=None):
result = redis_client.get(key)
if result is None:
set_value_with_lock(redis_client, key, fetch_func(), ttl)
result = redis_client.get(key)
return result
```
#### 缓存雪崩 (Cache Avalanche)
由于某些原因导致大量缓存在几乎相同时间内集体失效,进而引发对后端服务的巨大冲击。针对此问题有几种常见处理方式:
- **分片策略**:将不同类型的业务逻辑按照一定的规则分配至不同的Redis节点保存,即使部分机器出现问题也不会影响整个系统的正常运作。
- **限流降级**:引入熔断器模式,在极端情况下自动拒绝超出服务能力之外的新请求,保护核心功能不受损害。
```python
class RateLimiter(object):
def __init__(self, rate_limit_per_minute):
self.rate_limit_per_minute = rate_limit_per_minute
self.requests_in_last_min = []
def allow_request(self):
current_time = int(time.time())
one_minute_ago = current_time - 60
filtered_requests = list(filter(lambda t: t >= one_minute_ago,
self.requests_in_last_min))
if len(filtered_requests) < self.rate_limit_per_minute:
self.requests_in_last_min.append(current_time)
return True
else:
return False
rate_limiter = RateLimiter(rate_limit_per_minute=100)
if rate_limiter.allow_request():
process_user_request()
else:
respond_with_error("Too many requests.")
```
阅读全文
相关推荐

















