#!/usr/bin/env python #-*- coding:utf-8 -*- import time import redis import logging conn = redis.Redis() LOGLEVEL = { logging.DEBUG: "debug", logging.INFO: "info", logging.WARNING: "warning", logging.ERROR: "error" } # 记录最新日志 def log_to_redis(message, level=logging.INFO): log_list = "log:recent:" + LOGLEVEL[level] # 请在下面完成要求的功能 #********* Begin *********# #********* End *********# # 记录常见日志 def log_to_redis_by_frequency(message, level=logging.INFO): log_key = "log:count:" + LOGLEVEL[level] start_key = log_key + ":start" # 请在下面完成要求的功能 #********* Begin *********# #********* End *********#在Begin-End区域编写log_to_redis(message, level=logging.INFO):函数,实现记录最新日志的功能,具体参数与要求如下: 方法参数message为日记内容; 定义一个 redis 列表,列表key为log:recent:info, 列表value为时间加-加message,要求每次新的日记内容需存在列表头部,并只保留最新的5000个元素。 注意:value中的时间取time.asctime() 编写log_to_redis_by_frequency(message, level=logging.INFO):函数,实现统计常见日志次数及写入最新日记、并对历史日志记录进行迁移,具体参数与要求如下: 在方法log_to_redis_by_frequency中,方法参数message为日记内容; 定义一个 redis 列表,列表key为log:count:info,列表value为message; 定义一个 redis 字符串,字符串key为log:count:info:start,字符串value为当前小时数; 对历史日志记录进行迁移功能实现:当value 小于当前时间小时数,则重命名redis列表,修改列表key为log:count:info:lasthour,修改redis 字符串值为当前时间小时数; 统计常见日志次数及写入最新日记功能实现:存储redis列表的成员为message,分数自增1,并调用log_to_redis方法存最新消息; 考虑到历史日记记录迁移的并发问题,使用swatch监控log:count:info,如果log:count:info已经修改,则捕获redis.exceptions.WatchError,从而退出。
时间: 2025-05-16 09:45:16 浏览: 16
### 使用 Python 和 Redis 实现 `log_to_redis` 和 `log_to_redis_by_frequency`
#### 1. 实现 `log_to_redis` 函数
该函数的主要功能是将最新的日志记录存储到 Redis 的列表中,并确保列表的大小不超过设定的最大长度(例如5000)。如果超出,则移除最早的日志项。
以下是实现代码:
```python
import redis
def log_to_redis(redis_client, list_key, max_size, message):
# 将消息推送到 Redis 列表的右侧 (新日志总是追加到最后)
redis_client.rpush(list_key, message)
# 如果列表大小超过了最大限制,删除最旧的日志项
if redis_client.llen(list_key) > max_size:
redis_client.lpop(list_key) # 移除左侧最早的一条日志
```
上述代码实现了以下逻辑:
- 调用 `rpush` 方法将新的日志消息加入 Redis 列表[^1]。
- 使用 `llen` 获取当前列表长度,当其超过预设的最大值时调用 `lpop` 删除最老的消息[^2]。
---
#### 2. 实现 `log_to_redis_by_frequency` 函数
此函数的目标是对特定事件的发生频率进行统计,并在达到一定阈值时采取行动(如迁移历史日志),同时解决可能存在的并发问题。为了完成这些目标,可以采用如下方法:
##### 主要设计思路
- **计数器**:使用 Redis 中的字符串类型作为简单的计数器来跟踪某个键对应的事件发生次数。
- **过期时间**:为每个计数器设置短暂的有效期限,以便自动清除陈旧的数据。
- **锁机制**:通过 Redis 提供的分布式锁特性防止多个客户端同时修改共享资源引发冲突。
下面是具体实现代码:
```python
import time
from redis.exceptions import LockError
def log_to_redis_by_frequency(redis_client, event_key_prefix, threshold, interval_seconds, migrate_callback=None):
timestamp = int(time.time()) # 当前时间戳
# 构造基于时间段的时间窗口键名
window_start_time = (timestamp // interval_seconds) * interval_seconds
key = f"{event_key_prefix}:{window_start_time}"
with redis_client.lock(f"lock:{key}", timeout=5): # 加锁避免竞争条件
try:
current_count = redis_client.get(key) or b'0'
count = int(current_count.decode('utf-8')) + 1
# 更新计数值并设置有效期
redis_client.setex(key, interval_seconds, count)
# 达到阈值则触发回调函数
if count >= threshold and callable(migrate_callback):
migrate_callback(event_key_prefix, window_start_time, count)
except LockError as e:
print(f"Lock acquisition failed due to {e}")
```
这段代码涵盖了以下几个方面:
- 计算当前时间所属的时间窗口起始时刻,并以此构建唯一的键名称[^3]。
- 增加对应键的计数器值并通过 `setex` 设置有限存活周期以减少内存占用[^4]。
- 若检测到某段时间内的访问频次达到了给定上限,则可选地执行自定义的历史数据迁移或其他业务逻辑操作[^5]。
- 运用了 Redis 锁服务保障多线程或多进程环境下的安全性[^6]。
---
### 注意事项
- 上述两部分均依赖于已初始化好的 Redis 客户端实例 (`redis.StrictRedis`) 来连接远程数据库服务器。
- 对于高负载场景下频繁写入的情况考虑优化批量提交方式或者引入流水线技术提高效率。
阅读全文
相关推荐

















