活动介绍
file-type

Java实现的cron_manager定时任务管理器

ZIP文件

下载需积分: 14 | 45KB | 更新于2024-11-01 | 76 浏览量 | 0 下载量 举报 收藏
download 立即下载
cron是一个用于Unix和类Unix系统的定时任务调度器,它允许用户在指定的时间和日期运行预定义的命令或脚本。在分布式应用环境中,对定时任务的管理变得尤为重要,因为它可以用来执行各种周期性任务,如数据备份、日志轮转、定时清理、定时发送邮件等。 在本设计中,我们需要实现一个集中管理器,其主要功能包括调度cron作业、检查cron作业状态、处理重试逻辑以及监控任务执行情况。这样的系统不仅需要可靠,而且要能够在分布式环境下良好运作。以下是针对标题和描述中提到的知识点的详细解释: 1. 定时任务调度:在Unix/Linux系统中,cron作业通常由cron守护进程负责调度。系统管理员可以使用crontab命令来添加、修改或删除定时任务。每个cron作业都由五个时间字段和一个要执行的命令组成,时间字段分别代表分钟、小时、日、月和星期几。 2. 分布式应用:在分布式应用环境中,多个计算节点需要协同工作。这就意味着定时任务管理器需要能够在多个节点之间同步和分发任务。这通常涉及到网络通信和分布式存储,以便在不同节点之间共享任务信息。 3. 可靠性:对于任何任务调度器而言,可靠性是至关重要的。这包括确保任务不会因为系统故障而丢失,以及能够在出现错误时进行恢复。为了提高可靠性,可以采用主从架构、消息队列、日志记录和事务处理等技术。 4. 任务状态检查:定时任务管理器需要能够实时监控任务的执行状态。这包括任务是否成功执行、失败原因以及下次执行的时间点。状态信息可以帮助运维人员及时发现并处理潜在的问题。 5. 重试逻辑:在任务执行失败时,系统需要能够根据预设的规则进行重试。这可能涉及重试间隔、重试次数限制等参数的配置。合理配置重试逻辑可以减少因暂时性错误而导致的任务失败。 6. 监控:定时任务管理器需要提供监控功能,以便能够跟踪任务的执行情况,包括执行时间、执行频率、以及历史的成功和失败记录。通过监控工具,可以实现对任务执行效率和效果的分析。 7. 使用Java开发:本系统需要使用Java语言进行开发。Java是一种广泛使用的编程语言,具有跨平台、面向对象、安全性高等特点。使用Java开发可以方便地部署在不同的系统环境中,并且能够利用丰富的开源库和框架来提高开发效率。 8. 压缩包文件的文件名称列表:"cron_manager-master":这表明实际的项目文件或代码库是在一个压缩包中,文件名"cron_manager-master"暗示这是一个主版本或源代码包,可能包含了与cron作业管理相关的代码文件、配置文件、文档和可能的构建脚本。 综上所述,本设计中的cron_manager将是一个健壮的系统,它通过提供集中管理、分布式支持、高可靠性、状态检查、重试机制和监控功能,以确保在复杂的分布式应用环境中有效和可靠地执行定时任务。

相关推荐

filetype

# NOTE(review): this chunk references names (BaseModel, BaseSettings, Field,
# Optional/Dict/..., aioredis, Aioredlock, LockError, AsyncSession, logger,
# Equipment, ICMPSocketPing, EquipmentStatusEnum, APISettings) whose imports
# are outside the visible chunk — assumed to be in scope at the top of the file.

# NOTE(review): hard-coded DSN with plaintext credentials — move to env/config.
URL = "mysql+aiomysql://root:[email protected]/prison"


class AppSettings(BaseModel):
    """Application-level settings: debug flag, RBAC paths, JWT parameters."""

    API_DEBUG: Optional[bool] = False
    APP_INFO: Optional[Dict[str, Any]]
    HTTP_TIMEOUT: Optional[float] = 4.0
    DATACENTER_ID: Optional[int] = 1
    RBAC_MODEL_PATH: Optional[str] = 'rbac_model.conf'
    RBAC_POLICY_PATH: Optional[str] = 'rbac_policy.csv'
    PUBKEY: Optional[str]
    SECRET_KEY: Optional[str]
    ALGORITHM: Optional[str] = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: Optional[int] = 30


class MilvusSettings(BaseModel):
    """Milvus vector-database connection settings."""

    MILVUS_DRIVER: str = "http"
    MILVUS_HOST: str = "192.168.1.159"
    MILVUS_PORT: int = 19530  # fix: default was the string "19530" on an int field
    MILVUS_USER: str = "root"
    MILVUS_PASSWORD: str = "Milvus"

    def get_milvus_url(self) -> Dict[str, str]:
        """Build the connection-kwargs dict for the Milvus client.

        fix: the original annotation was ``Union[str]`` although a dict is
        returned.
        """
        return {
            "uri": f"{self.MILVUS_DRIVER}://{self.MILVUS_HOST}:{self.MILVUS_PORT}",
            # "token": f"{self.MILVUS_USER}:{self.MILVUS_PASSWORD}"
        }


class DatabaseSettings(BaseModel):
    """MySQL connection settings."""

    DATABASE_DRIVER: str = "mysql+aiomysql"
    DATABASE_HOST: str
    DATABASE_PORT: int
    DATABASE_USER: str
    DATABASE_PASSWORD: str
    DATABASE_NAME: str
    DATABASE_URL: Optional[str] = None

    def get_dburl(self) -> str:
        """Return the database DSN.

        NOTE(review): ignores the per-field values above and returns the
        hard-coded module-level URL — confirm this is intentional.
        """
        return URL


class RedisSettings(BaseModel):
    """Redis connection settings."""

    REDIS_HOST: str
    REDIS_PORT: int
    REDIS_PASSWORD: str
    REDIS_DB: int
    REDIS_URL: Optional[RedisDsn] = None
    REDIS_MINSIZE: int = 1
    REDIS_MAXSIZE: int = 10
    REDIS_ENCODING: str = "utf-8"

    def get_redis_address(self) -> Tuple[str, Dict[str, Any]]:
        """Return (address, connect-options) for an aioredis pool."""
        opts = dict(
            db=self.REDIS_DB,
            minsize=self.REDIS_MINSIZE,
            maxsize=self.REDIS_MAXSIZE,
            encoding=self.REDIS_ENCODING,
        )
        if self.REDIS_PASSWORD:
            opts.update({"password": self.REDIS_PASSWORD})
        if self.REDIS_URL:
            return self.REDIS_URL, opts
        # fix: dropped the useless f-prefix (no placeholders).
        # NOTE(review): the fallback is hard-coded instead of being built from
        # REDIS_HOST/REDIS_PORT/REDIS_DB — confirm against deployment config.
        return "redis://10.8.0.2:6379/0", opts


class MongoDBSettings(BaseModel):
    """MongoDB connection settings."""

    MONGO_HOST: Optional[str]
    MONGO_PORT: Union[Optional[int], str]
    MONGO_USER: Optional[str]
    MONGO_PASSWORD: Optional[str]
    MONGO_URL: Optional[str] = None
    MONGO_DB: Optional[str]

    def connect_opts(self) -> Dict[str, Any]:
        """Return keyword args for the Mongo client; a full URL wins over parts."""
        if self.MONGO_URL:
            return {"host": self.MONGO_URL}
        return {
            "host": self.MONGO_HOST,
            "port": self.MONGO_PORT,
            "username": self.MONGO_USER,
            "password": self.MONGO_PASSWORD,
        }


class AMQPSettings(BaseModel):
    """AMQP (message queue) connection settings."""

    AMQP_HOST: Optional[str]
    AMQP_PORT: Union[Optional[int], str]
    AMQP_USER: Optional[str]
    AMQP_PASSWORD: Optional[str]
    AMQP_URL: Optional[str] = None
    AMQP_QUEUE: Optional[str]


class OSSSettings(BaseModel):
    """Aliyun OSS object-storage settings."""

    OSS_ENDPOINT: str
    OSS_ACCESS_KEY_ID: Optional[str]
    OSS_ACCESS_KEY_SECRET: Optional[str]
    OSS_BUCKET_NAME: str


class MINIIOSettings(BaseModel):
    """MinIO object-storage settings."""

    MINIIO_HOST: Optional[str]
    MINIIO_PORT: Optional[str] = "9000"
    MINIIO_ACCESS_KEY: Optional[str]
    MINIIO_SECRET_KEY: Optional[str]
    MINIIO_BUCKET_NAME: Optional[str]
    MINIIO_DOINGS_PATH: Optional[str]
    MINIIO_SHOPPING_PATH: Optional[str]


class SettingsMixin(
    BaseSettings,
    MilvusSettings,
    DatabaseSettings,
    RedisSettings,
    MongoDBSettings,
    AMQPSettings,
    OSSSettings,
    MINIIOSettings,
    AppSettings,
):
    """Settings Mixin"""


T = TypeVar("T")


def singleton(cls: Type[T]) -> Callable[..., T]:
    """Class decorator: every call returns the same cached instance of *cls*.

    fix: the original return annotation ``Callable[[Any, Any], T]`` claimed
    exactly two positional args; the wrapper accepts ``*args, **kwargs``.
    """
    instances: Dict[Type[T], T] = {}

    def get_instance(*args: Any, **kwargs: Any) -> T:
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class Settings(SettingsMixin):
    """Base configuration, loaded from the env file pointed to by CONFIG_PATH."""

    env: str = Field(default="dev", env="ENV")

    class Config:
        env_prefix = ""
        env_file: str = os.getenv("CONFIG_PATH", "conf/online.conf")
        env_file_encoding: str = "utf-8"

    @property
    def debug(self) -> bool:
        # Debug everywhere except production.
        if self.env == "prod":
            return False
        return True

    @property
    def milvus(self) -> MilvusSettings:
        return MilvusSettings(**self.dict())

    @property
    def db(self) -> DatabaseSettings:
        return DatabaseSettings(**self.dict())

    @property
    def redis(self) -> RedisSettings:
        return RedisSettings(**self.dict())

    @property
    def api(self) -> APISettings:
        # NOTE(review): APISettings is not defined in this chunk — assumed to
        # be imported at the top of the file.
        return APISettings(_env_file=self.Config.env_file)

    @property
    def app(self) -> AppSettings:
        return AppSettings(**self.dict())


# Strong references to background tasks so they are not garbage-collected
# while still running.
background_tasks = set()


async def async_print(msg) -> None:
    """Print *msg*; the trailing await merely yields control to the event loop."""
    print(msg)  # the print itself is still synchronous
    await asyncio.sleep(0)


async def register_async_cron() -> None:
    """Schedule the cron coroutine as a background task and return immediately."""
    task = asyncio.create_task(async_cron())
    # Keep a strong reference so the task is not garbage-collected.
    background_tasks.add(task)
    # To avoid holding the reference forever, let the task remove itself when
    # done (original left this commented out):
    # task.add_done_callback(background_tasks.discard)
    # await asyncio.gather(*[task])  # would block until the cron loop ends
    await asyncio.sleep(0)  # yield control to the event loop


async def register_redis() -> aioredis.Redis:
    """Create and return an aioredis connection pool.

    fix: the original annotation said ``-> None`` although the pool is returned.
    """
    address = "redis://10.8.0.2:6379/0"
    redis: aioredis.Redis = await aioredis.create_redis_pool(address)
    return redis


async def async_cron():
    """Every 5 s: take a distributed lock, ping each camera, sync its status.

    State per device is cached in Redis under ``ipadder_<ip>`` so the DB is
    only updated when the reachability actually changes.
    """

    async def get_database() -> AsyncGenerator[AsyncSession, None]:
        # NOTE(review): defined but never used below — kept for compatibility.
        db = AsyncSession(expire_on_commit=False)
        try:
            yield db
        except Exception:
            await db.rollback()
            raise
        finally:
            await db.close()

    seconds = 5
    db = AsyncSession(expire_on_commit=False)
    redis_client: aioredis.Redis = await register_redis()
    logger.info("开始执行定时任务")
    # Redis instances backing the distributed lock (Redlock algorithm).
    redis_instances = [
        'redis://:[email protected]:6379/0'
    ]
    lock_manager = Aioredlock(redis_instances)
    while True:
        await asyncio.sleep(seconds)
        try:
            # The lock is never explicitly released; mutual exclusion relies
            # entirely on the `lock_timeout` expiry.
            await lock_manager.lock("resource_name", lock_timeout=seconds)
            logger.info("成功获取锁,正在执行受保护的任务...")  # fix: dropped useless f-prefix
            data: Equipment = Equipment()
            list_content: Optional[List[Equipment]] = await data.list(db, 0, 200, preload=False)
            for item in list_content:
                static = "offline"
                equ = Equipment(**item.to_dict())
                tester = ICMPSocketPing(equ.ip_address, timeout=1, retry=1)
                redis_icmp_ping = await redis_client.get(f"ipadder_{equ.ip_address}")
                # No cached state yet -> treat as previously offline.
                # NOTE(review): reconstructed from a collapsed line — confirm
                # this default matches the original intent.
                if not redis_icmp_ping:
                    redis_icmp_ping = "offline"
                if tester.ping():  # "online"
                    static = "online"
                    if redis_icmp_ping != static:
                        await equ.update_part(session=db, detail=False,
                                              **{'id': equ.id, 'status': EquipmentStatusEnum.ONLINE})
                else:  # "offline"
                    static = "offline"
                    if redis_icmp_ping != static:
                        await equ.update_part(session=db, detail=False,
                                              **{'id': equ.id, 'status': EquipmentStatusEnum.OFFLINE})
                logger.info(f'摄像头 {equ.ip_address} 状态 {static}')
                # Cache the state so unchanged devices skip the DB write next round.
                await redis_client.setex(f"ipadder_{equ.ip_address}", seconds, static)
        except LockError:  # lock not acquired — another node holds it
            continue
        await asyncio.sleep(0)  # yield control to the event loop
    # NOTE(review): unreachable — the `while True` above never breaks.
    await lock_manager.destroy()


async def main():
    """Entry point: start the cron registration task and wait for it."""
    task1 = asyncio.create_task(register_async_cron())
    # NOTE(review): register_async_cron() returns almost immediately, so
    # asyncio.run() exits and cancels the still-running async_cron task —
    # the cron loop never actually runs to completion. Confirm intent.
    await task1


if __name__ == "__main__":
    asyncio.run(main())  # run the async entry point
    print("Hello World!")

filetype

import asyncio
import websockets
import json
import struct
import datetime
import logging
import traceback  # NOTE(review): imported but unused in this file
import mysql.connector
from mysql.connector import pooling
from mysql.connector import errorcode
from typing import Dict, List, Optional

# ======================
# Configuration
# ======================
CONFIG = {
    "AUTH_ENABLED": False,  # whether client authentication is required
    "APP_SECRET": "2d0c7a6f35a74721bdae2b0077735938",  # AppSecretKey
    "FACTORY": "ztzbStdTag",  # factory identifier
    "VALID_TOKEN": {  # the single accepted token
        "appSecret": "BG7DpcHE4oBjsD5x6lkI8KQpfRSWDF6M",
        "fullCode": "ztzbStdTagJsonPush"
    },
    "WEBSOCKET_HOST": "0.0.0.0",
    "WEBSOCKET_PORT": 8765,
    "MAX_CONNECTIONS": 100,
    "DB_WRITE_BATCH_SIZE": 50,   # rows buffered before a batch write
    "DB_WRITE_TIMEOUT": 5.0,     # seconds before a partial buffer is flushed
    # MySQL settings. NOTE(review): plaintext credentials — move to env/secrets,
    # especially for the NAS/Docker deployment mentioned by the author.
    "MYSQL": {
        "HOST": "192.168.191.11",
        "PORT": 3306,
        "USER": "root",
        "PASSWORD": "Adu@123.",
        "DATABASE": "CF_HIDB",
        "POOL_SIZE": 5,
        "POOL_NAME": "sensor_pool",
        "POOL_RESET_SESSION": True
    }
}


# ======================
# Minimal logging setup
# ======================
def setup_logging():
    """Configure and return the console-only application logger."""
    logger = logging.getLogger("SensorServer")
    logger.setLevel(logging.INFO)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'
    ))
    logger.addHandler(console_handler)
    return logger


# Global logger
logger = setup_logging()


# ======================
# MySQL database manager
# ======================
class MySQLDatabaseManager:
    """Buffers incoming sensor rows and batch-writes them to MySQL."""

    def __init__(self):
        self.data_buffer = []  # pending rows awaiting a batch write
        self.last_write_time = datetime.datetime.now()
        self.connection_pool = self._create_connection_pool()
        self._init_db()

    def _create_connection_pool(self) -> pooling.MySQLConnectionPool:
        """Create the MySQL connection pool from CONFIG; re-raise on failure."""
        try:
            return pooling.MySQLConnectionPool(
                pool_name=CONFIG["MYSQL"]["POOL_NAME"],
                pool_size=CONFIG["MYSQL"]["POOL_SIZE"],
                pool_reset_session=CONFIG["MYSQL"]["POOL_RESET_SESSION"],
                host=CONFIG["MYSQL"]["HOST"],
                port=CONFIG["MYSQL"]["PORT"],
                user=CONFIG["MYSQL"]["USER"],
                password=CONFIG["MYSQL"]["PASSWORD"],
                database=CONFIG["MYSQL"]["DATABASE"]
            )
        except mysql.connector.Error as err:
            logger.error(f"MySQL连接池创建失败: {err}")
            raise

    def _init_db(self) -> None:
        """Create the devices / sensor_data tables if they do not exist."""
        conn = None
        try:
            conn = self.connection_pool.get_connection()
            cursor = conn.cursor()
            # Device registry
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS devices (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    device_id VARCHAR(50) NOT NULL UNIQUE,
                    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                )
            ''')
            # Sensor readings, one JSON array of values per row
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sensor_data (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    device_id INT NOT NULL,
                    timestamp TIMESTAMP NOT NULL,
                    sensor_values JSON NOT NULL,
                    received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_device_id (device_id),
                    INDEX idx_timestamp (timestamp),
                    FOREIGN KEY (device_id) REFERENCES devices(id) ON DELETE CASCADE
                )
            ''')
            conn.commit()
        except mysql.connector.Error as err:
            logger.error(f"数据库初始化失败: {err}")
        finally:
            # fix: the original leaked the connection when CREATE TABLE failed
            if conn is not None and conn.is_connected():
                conn.close()

    def _get_or_create_device(self, conn, device_id: str) -> int:
        """Return the devices.id for *device_id*, inserting a new row if needed.

        Also refreshes last_seen for existing devices. Raises on MySQL errors
        (caller owns the transaction/commit).
        """
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT id FROM devices WHERE device_id = %s",
                (device_id,)
            )
            device = cursor.fetchone()
            if device:
                # fix: use a distinct name — the original rebound `device_id`
                # from str to int, shadowing the parameter.
                db_id = device[0]
                cursor.execute(
                    "UPDATE devices SET last_seen = CURRENT_TIMESTAMP WHERE id = %s",
                    (db_id,)
                )
                return db_id
            cursor.execute(
                "INSERT INTO devices (device_id) VALUES (%s)",
                (device_id,)
            )
            return cursor.lastrowid
        except mysql.connector.Error as err:
            logger.error(f"设备操作失败: {err}")
            raise
        finally:
            cursor.close()  # fix: cursor was never closed

    def insert_sensor_data(self, device_id: str, timestamp: str,
                           sensor_values: List[float]) -> None:
        """Append one reading to the buffer; flush when full or stale."""
        # Round to two decimal places before storage.
        rounded_values = [round(value, 2) for value in sensor_values]
        self.data_buffer.append({
            'device_id': device_id,
            'timestamp': timestamp,
            'sensor_values': rounded_values
        })
        # Flush either on batch size or on elapsed time.
        now = datetime.datetime.now()
        buffer_full = len(self.data_buffer) >= CONFIG["DB_WRITE_BATCH_SIZE"]
        timeout_reached = (now - self.last_write_time).total_seconds() >= CONFIG["DB_WRITE_TIMEOUT"]
        if buffer_full or timeout_reached:
            self.flush_buffer()

    def flush_buffer(self, _retry: bool = False) -> None:
        """Write all buffered rows to MySQL in one batch.

        *_retry* is internal: it guards the timestamp-repair path so the
        method retries at most once (the original recursed unboundedly when
        the repair did not resolve the error).
        """
        if not self.data_buffer:
            return
        conn = None
        try:
            conn = self.connection_pool.get_connection()
            cursor = conn.cursor()
            # Resolve (or create) the DB id for each buffered device.
            device_ids = []
            for data in self.data_buffer:
                device_ids.append(
                    self._get_or_create_device(conn, data['device_id'])
                )
            # Batch-insert the sensor rows.
            sensor_data = [
                (device_ids[i], data['timestamp'], json.dumps(data['sensor_values']))
                for i, data in enumerate(self.data_buffer)
            ]
            cursor.executemany(
                "INSERT INTO sensor_data (device_id, timestamp, sensor_values) "
                "VALUES (%s, %s, %s)",
                sensor_data
            )
            conn.commit()
            logger.info(f"写入 {len(self.data_buffer)} 条数据")
            self.data_buffer.clear()
            self.last_write_time = datetime.datetime.now()
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_TRUNCATED_WRONG_VALUE and not _retry:
                # Bad timestamp format: replace with 'now' and retry once.
                logger.warning("检测到日期时间格式错误,尝试修正...")
                now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                for data in self.data_buffer:
                    data['timestamp'] = now_str
                self.flush_buffer(_retry=True)
            else:
                logger.error(f"数据库写入失败: {err}")
        except Exception as e:
            logger.error(f"数据库操作异常: {e}")
        finally:
            # fix: replaced the `'conn' in locals()` idiom with an explicit
            # None sentinel.
            if conn is not None and conn.is_connected():
                conn.close()


# ======================
# Packet parsing
# ======================
def parse_timestamp(timestamp_bytes: bytes) -> str:
    """Decode an ASCII timestamp; fall back to 'now' on any decode error."""
    try:
        timestamp_str = timestamp_bytes.decode('ascii').strip()
        # Keep only the "YYYY-MM-DD HH:MM:SS" prefix.
        return timestamp_str[:19]
    except Exception:  # fix: bare except also swallowed KeyboardInterrupt/SystemExit
        return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def parse_sensor_values(data: bytes, start_index: int) -> List[float]:
    """Read consecutive 4-byte IEEE-754 floats from data[start_index:]."""
    values = []
    index = start_index
    max_count = (len(data) - index) // 4
    for _ in range(max_count):
        if index + 4 > len(data):
            break
        value_bytes = data[index:index + 4]
        index += 4
        try:
            # Little-endian first.
            values.append(struct.unpack('<f', value_bytes)[0])
        except struct.error:  # fix: narrowed from bare except
            # NOTE(review): '<f' on exactly 4 bytes never raises, so this
            # big-endian fallback is effectively dead code — kept as-is.
            try:
                values.append(struct.unpack('>f', value_bytes)[0])
            except struct.error:
                continue  # unparseable value — skip
    return values


def parse_binary_packet(raw_data: bytes) -> Optional[Dict]:
    """Parse one binary packet into {device_id, timestamp, sensor_values}.

    Layout: 6-byte header | 8-byte ASCII device id | 19-byte ASCII timestamp |
    N * 4-byte float sensor values. Returns None on any parse failure.
    """
    try:
        # 1. Validate the fixed header.
        HEADER = b'$\x00\x08\x00\x00\x00'
        if not raw_data.startswith(HEADER):
            return None
        # 2. Device id (8 ASCII bytes after the header).
        DEVICE_ID_START = len(HEADER)
        DEVICE_ID_END = DEVICE_ID_START + 8
        device_id_bytes = raw_data[DEVICE_ID_START:DEVICE_ID_END]
        # 3. Timestamp (19 ASCII bytes).
        TIMESTAMP_START = DEVICE_ID_END
        TIMESTAMP_END = TIMESTAMP_START + 19
        timestamp_bytes = raw_data[TIMESTAMP_START:TIMESTAMP_END]
        # 4. Remaining bytes are the float payload.
        SENSOR_DATA_START = TIMESTAMP_END
        sensor_values = parse_sensor_values(raw_data, SENSOR_DATA_START)
        # 5. Assemble the result.
        return {
            'device_id': device_id_bytes.decode('ascii', errors='ignore').strip(),
            'timestamp': parse_timestamp(timestamp_bytes),
            'sensor_values': sensor_values
        }
    except Exception:
        return None


# ======================
# WebSocket server (with optional authentication)
# ======================
class SensorWebSocketServer:
    """Accepts WebSocket clients, optionally authenticates them, and feeds
    parsed binary packets into the database manager."""

    def __init__(self, host: str, port: int, db_manager: MySQLDatabaseManager):
        self.host = host
        self.port = port
        self.db_manager = db_manager
        self.connections = set()
        self.server = None
        self.authenticated_clients = set()  # clients that passed token auth

    async def handler(self, websocket, path: str) -> None:
        """Per-connection handler: optional auth handshake, then data loop.

        NOTE(review): the (websocket, path) signature matches the legacy
        websockets API (<11); newer versions pass only the connection —
        confirm the pinned websockets version.
        """
        self.connections.add(websocket)
        client_ip = websocket.remote_address[0] if websocket.remote_address else "unknown"
        logger.info(f"客户端连接: {client_ip}")
        try:
            if CONFIG["AUTH_ENABLED"]:
                # First message must be the JSON auth token.
                auth_message = await websocket.recv()
                try:
                    auth_data = json.loads(auth_message)
                    if (auth_data.get("appSecret") == CONFIG["VALID_TOKEN"]["appSecret"] and
                            auth_data.get("fullCode") == CONFIG["VALID_TOKEN"]["fullCode"]):
                        self.authenticated_clients.add(websocket)
                        await websocket.send("AuthSuccess")
                        logger.info(f"客户端认证成功: {client_ip}")
                    else:
                        await websocket.send("AuthFailed: Invalid token")
                        logger.warning(f"客户端认证失败: {client_ip}")
                        return
                except Exception:  # fix: narrowed from bare except
                    await websocket.send("AuthFailed: Invalid format")
                    logger.warning(f"无效的认证消息: {client_ip}")
                    return
            # Data loop: only binary frames carry sensor packets.
            async for message in websocket:
                if not isinstance(message, bytes):
                    continue
                if CONFIG["AUTH_ENABLED"] and websocket not in self.authenticated_clients:
                    continue
                parsed_data = parse_binary_packet(message)
                if parsed_data:
                    self.db_manager.insert_sensor_data(
                        parsed_data['device_id'],
                        parsed_data['timestamp'],
                        parsed_data['sensor_values']
                    )
        except websockets.exceptions.ConnectionClosed:
            logger.info(f"客户端断开: {client_ip}")
        except Exception as e:
            logger.error(f"处理客户端时出错: {e}")
        finally:
            self.connections.discard(websocket)
            self.authenticated_clients.discard(websocket)

    async def start(self) -> None:
        """Start serving WebSocket connections."""
        self.server = await websockets.serve(
            self.handler,
            self.host,
            self.port,
            max_size=2 ** 20,  # 1 MB per message
            ping_interval=60,
            ping_timeout=30,
            close_timeout=10,
            # NOTE(review): max_queue bounds the per-connection incoming
            # message queue, not the number of connections — confirm intent.
            max_queue=CONFIG["MAX_CONNECTIONS"]
        )
        logger.info(f"服务器启动: ws://{self.host}:{self.port}")
        logger.info(f"认证状态: {'启用' if CONFIG['AUTH_ENABLED'] else '禁用'}")

    async def stop(self) -> None:
        """Stop the WebSocket server and wait for the socket to close."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            logger.info("服务器已停止")


# ======================
# Main program
# ======================
async def main():
    """Start the server and periodically flush the DB buffer until cancelled."""
    db_manager = MySQLDatabaseManager()
    server = SensorWebSocketServer(
        CONFIG["WEBSOCKET_HOST"],
        CONFIG["WEBSOCKET_PORT"],
        db_manager
    )
    await server.start()
    try:
        while True:
            await asyncio.sleep(5)
            # Time-based flush for partially filled buffers.
            db_manager.flush_buffer()
    except asyncio.CancelledError:
        pass
    except KeyboardInterrupt:
        # NOTE(review): inside asyncio.run() a Ctrl-C surfaces as
        # CancelledError, so this branch is unlikely to trigger.
        logger.info("收到停止信号")
    finally:
        # Persist whatever is still buffered before shutting down.
        db_manager.flush_buffer()
        await server.stop()


if __name__ == "__main__":
    logger.info("=" * 40)
    logger.info("传感器数据采集服务器启动")
    logger.info(f"时间: {datetime.datetime.now()}")
    logger.info(f"MySQL主机: {CONFIG['MYSQL']['HOST']}:{CONFIG['MYSQL']['PORT']}")
    logger.info(f"数据库: {CONFIG['MYSQL']['DATABASE']}")
    logger.info("=" * 40)
    try:
        asyncio.run(main())
    except Exception as e:
        logger.error(f"服务器异常停止: {e}")
    finally:
        logger.info("=" * 40)
        logger.info("服务器已停止运行")
        logger.info("=" * 40)

filetype
秦风明
  • 粉丝: 50
上传资源 快速赚钱