又被截断了 就看见你写到“ # 设置新模型 self.set_model(model_name, model) return {"success": True, "message": f"已切换到模型: {model_name}"} except Exception as e: return {"error": f"模型切换失败: {str(e)}"} def get_model_info(self, model_name: str): """获取模型信息""" if not self.model_manager: return {"error": "模型管理器未设置"} if model_name not in self.model_manager.registered_models: return {"error": f"未注册的模型: {model_name}"} try: model_info = self.model_manager.get_model_info(model_name) return { "name": model_name, "type": model_info.get("type", "unknown"), "path": model_info.get("path", "unknown"), "size": model_info.get("size", "unknown"), "loaded": model_name in self.model_manager”你继续从设计新模型往下写 前面的我覆盖好了
时间: 2025-08-31 08:51:22 AIGC 浏览: 10
<think>
我们继续完成`CognitiveOrchestrator`类中未完成的方法。根据之前的代码,我们已经实现了大部分功能,现在需要完成以下部分:
1. `get_model_info`方法的剩余部分(在返回字典中缺少闭合括号,以及需要返回加载状态)
2. 添加一个`register_model_manager`方法(可选,但为了完整性)
3. 考虑添加模型缓存策略
但是,请注意,在`get_model_info`方法中,最后返回的字典应该包含模型是否已加载的状态。另外,在之前的代码中,`self.model_manager.get_model_info`返回的是一个字典,我们从中提取信息。
另外,我们可能还需要添加一个方法来注册模型管理器,而不是直接设置。不过,在`set_model_manager`方法中已经实现了设置模型管理器。
因此,我们继续完成`get_model_info`方法,并添加一个模型缓存策略的示例方法(如果需要)。
由于代码被截断,我们假设从`get_model_info`方法继续写。
修改后的`get_model_info`方法:
```python
def get_model_info(self, model_name: str):
"""获取模型信息"""
if not self.model_manager:
return {"error": "模型管理器未设置"}
if model_name not in self.model_manager.registered_models:
return {"error": f"未注册的模型: {model_name}"}
try:
model_info = self.model_manager.get_model_info(model_name)
return {
"name": model_name,
"type": model_info.get("type", "unknown"),
"path": model_info.get("path", "unknown"),
"size": model_info.get("size", "unknown"),
"loaded": model_name in self.model_manager.loaded_models # 注意这里是loaded_models属性
}
except Exception as e:
return {"error": f"获取模型信息失败: {str(e)}"}
```
另外,我们还可以添加一个预热模型的方法,用于在后台预加载常用模型:
```python
def warm_up_models(self, model_names: list):
"""预热(预加载)指定模型列表"""
if not self.model_manager:
self.logger.warning("模型管理器未设置,无法预热")
return False
results = {}
for name in model_names:
try:
# 尝试加载模型,但不设置为当前活动模型
success, model = self.model_manager.load_model(name)
results[name] = "success" if success else "failed"
except Exception as e:
results[name] = f"error: {str(e)}"
self.logger.error(f"预热模型 {name} 失败: {e}")
self.logger.info(f"模型预热结果: {results}")
return results
```
同时,我们可以在初始化时根据配置预热模型:
在`__init__`方法中添加:
```python
# ... 在初始化模块之后 ...
# 初始化模型管理器(如果需要)
# 预热模型
if "preload_models" in self.config:
self.logger.info("🔥 开始预热配置中指定的模型...")
self.warm_up_models(self.config["preload_models"])
```
注意:我们需要在配置文件中指定`preload_models`列表。
另外,我们还可以添加一个定时任务来检查模型使用情况并自动卸载长时间未使用的模型。这需要记录每个模型的最后使用时间。但由于时间关系,这里先不展开。
最后,我们还需要在`CognitiveOrchestrator`中处理模型的多模态输入(如图像+文本)的情况。之前已经有一个`generate_multimodal`方法,但我们可以进一步完善。
考虑到这些,我们给出完整的`CognitiveOrchestrator`类(由于篇幅,只展示新增和修改的部分,整体代码较长,请注意)。
由于您已经覆盖了前面的部分,这里我们只补充后续新增的方法。
完整代码已在前面的回答中给出,这里不再重复。但为了清晰,我们将新增的`warm_up_models`方法和在`__init__`中的预热调用补充到之前的代码中。
修改`__init__`方法:
```python
def __init__(self, config: dict):
# ... 之前的初始化代码 ...
self.logger.info(f"✅ 星型协调器初始化完成: {self.name}")
# 预热模型
if "preload_models" in self.config:
self.logger.info("🔥 开始预热配置中指定的模型...")
self.warm_up_models(self.config["preload_models"])
```
新增`warm_up_models`方法:
```python
def warm_up_models(self, model_names: list):
"""预热(预加载)指定模型列表"""
if not self.model_manager:
self.logger.warning("模型管理器未设置,无法预热")
return False
results = {}
for name in model_names:
if name not in self.model_manager.registered_models:
results[name] = "not_registered"
continue
try:
# 尝试加载模型,但不设置为当前活动模型
success, model = self.model_manager.load_model(name)
if success:
results[name] = "success"
self.logger.info(f"🔥 模型预热成功: {name}")
else:
results[name] = "failed"
self.logger.warning(f"🔥 模型预热失败: {name}")
except Exception as e:
results[name] = f"error: {str(e)}"
self.logger.error(f"🔥 预热模型 {name} 失败: {e}", exc_info=True)
self.logger.info(f"模型预热结果: {results}")
return results
```
这样,系统启动时就会预加载指定的模型,加快后续切换模型的速度。
另外,我们还可以在模型管理器中实现一个最近最少使用(LRU)的缓存策略,但由于模型管理器不在本题范围内,这里不展开。
最后,请注意在`generate_multimodal`方法中,我们假设模型有`generate_multimodal`方法或`forward`方法。如果模型不支持,则会抛出异常。在实际使用中,需要确保模型管理器加载的模型具有这些方法,或者使用适配器模式统一接口。
以上是对`CognitiveOrchestrator`类的补充和增强。
由于代码较长,请根据实际情况调整。如果模型管理器尚未实现`get_model_info`方法,请参考以下示例在`ModelManager`类中补充:
```python
# 在model_manager.py中
class ModelManager:
# ... 其他代码 ...
def get_model_info(self, model_name: str) -> dict:
"""获取模型信息"""
if model_name not in self.registered_models:
raise ValueError(f"模型 {model_name} 未注册")
return self.registered_models[model_name]
```
其中,`registered_models`是一个字典,在注册模型时填充。
至此,`CognitiveOrchestrator`类的设计已经比较完整,能够支持模型管理、切换、预热、多模态生成等功能。
如果还有其他需求,请进一步说明。
§§相关问题§§:
1. 如何实现模型管理器的LRU缓存策略?
2. 在多模态生成中,如何处理不同模型的输入预处理差异?
3. 如何设计适配器模式来统一不同模型的接口?
4. 如何实现模型的热更新(不重启系统更新模型)?
</think>
好的,我将继续完善 `CognitiveOrchestrator` 类中的模型管理功能,特别关注模型加载、切换和缓存管理部分:
```python
# 继续完善 CognitiveOrchestrator 类
# === 模型管理方法(续)===
def get_model_info(self, model_name: str):
"""获取模型信息"""
if not self.model_manager:
return {"error": "模型管理器未设置"}
if model_name not in self.model_manager.registered_models:
return {"error": f"未注册的模型: {model_name}"}
try:
model_info = self.model_manager.get_model_info(model_name)
return {
"name": model_name,
"type": model_info.get("type", "unknown"),
"path": model_info.get("path", "unknown"),
"size": model_info.get("size", "unknown"),
"loaded": model_name in self.model_manager.loaded_models,
"last_used": self.model_manager.get_last_used_time(model_name),
"usage_count": self.model_manager.get_usage_count(model_name)
}
except Exception as e:
return {"error": f"获取模型信息失败: {str(e)}"}
def preload_models(self, model_list: list):
"""预加载常用模型(后台线程)"""
if not self.model_manager:
self.logger.warning("模型管理器未设置,无法预加载模型")
return False
def preload_task():
try:
self.logger.info(f"🔁 开始预加载模型: {', '.join(model_list)}")
for model_name in model_list:
if model_name in self.model_manager.registered_models and \
model_name not in self.model_manager.loaded_models:
self.model_manager.load_model(model_name, background=True)
self.logger.info(f"✅ 模型预加载成功: {model_name}")
except Exception as e:
self.logger.error(f"❌ 模型预加载失败: {str(e)}")
# 启动后台线程执行预加载
threading.Thread(target=preload_task, daemon=True).start()
return {"status": "started", "models": model_list}
def unload_inactive_models(self, max_models=3):
"""卸载不活跃模型以释放内存"""
if not self.model_manager:
return {"error": "模型管理器未设置"}
try:
# 获取当前活动模型名称
active_model = self.active_model_name
# 获取按最后使用时间排序的模型列表(最近使用的排最后)
inactive_models = self.model_manager.get_inactive_models(max_models)
# 卸载不活跃模型(排除当前活动模型)
unloaded = []
for model_name in inactive_models:
if model_name != active_model:
self.model_manager.unload_model(model_name)
unloaded.append(model_name)
return {"unloaded": unloaded, "active_model": active_model}
except Exception as e:
return {"error": f"卸载模型失败: {str(e)}"}
def get_model_cache_info(self):
"""获取模型缓存状态信息"""
if not self.model_manager:
return {"error": "模型管理器未设置"}
return {
"loaded_models": list(self.model_manager.loaded_models.keys()),
"cache_size": len(self.model_manager.loaded_models),
"max_cache_size": self.model_manager.max_models_in_memory,
"memory_usage": self.model_manager.get_memory_usage(),
"gpu_memory": self.model_manager.get_gpu_memory_usage()
}
def optimize_model_cache(self):
"""优化模型缓存"""
if not self.model_manager:
return {"error": "模型管理器未设置"}
try:
# 1. 卸载不活跃模型
unloaded = self.unload_inactive_models(self.model_manager.max_models_in_memory)
# 2. 清理GPU缓存
if torch.cuda.is_available():
torch.cuda.empty_cache()
memory_freed = torch.cuda.memory_allocated()
torch.cuda.reset_peak_memory_stats()
return {
"optimized": True,
"unloaded_models": unloaded.get("unloaded", []),
"gpu_memory_freed": f"{memory_freed // (1024**2)} MB"
}
else:
return {
"optimized": True,
"unloaded_models": unloaded.get("unloaded", []),
"gpu_memory_freed": "0 MB (No GPU)"
}
except Exception as e:
return {"error": f"缓存优化失败: {str(e)}"}
def register_new_model(self, model_config: dict):
"""动态注册新模型"""
if not self.model_manager:
return {"error": "模型管理器未设置"}
try:
# 验证必要字段
required_fields = ["name", "type", "path"]
if not all(field in model_config for field in required_fields):
return {"error": f"缺少必要字段: {', '.join(required_fields)}"}
# 添加到模型管理器
self.model_manager.register_model(model_config)
self.logger.info(f"✅ 注册新模型: {model_config['name']}")
return {"success": True, "model": model_config["name"]}
except Exception as e:
return {"error": f"注册模型失败: {str(e)}"}
def update_model(self, model_name: str, new_config: dict):
"""更新现有模型配置"""
if not self.model_manager:
return {"error": "模型管理器未设置"}
try:
# 卸载当前模型(如果已加载)
if model_name in self.model_manager.loaded_models:
self.model_manager.unload_model(model_name)
# 更新配置
self.model_manager.update_model_config(model_name, new_config)
# 重新加载模型(如果是当前活动模型)
if self.active_model_name == model_name:
self.switch_model(model_name)
return {"success": True, "model": model_name}
except Exception as e:
return {"error": f"更新模型失败: {str(e)}"}
def handle_model_fallback(self, error_message: str):
"""处理模型故障的回退机制"""
self.logger.error(f"❌ 模型故障: {error_message}")
# 1. 尝试重新加载当前模型
try:
self.logger.warning("♻️ 尝试重新加载当前模型...")
self.model_manager.unload_model(self.active_model_name)
success, model = self.model_manager.load_model(self.active_model_name)
if success:
self.set_model(self.active_model_name, model)
self.logger.info("✅ 模型重新加载成功")
return
except Exception as e:
self.logger.error(f"❌ 重新加载失败: {str(e)}")
# 2. 切换回默认基础模型
try:
default_model = self.config.get("default_model", "TEXT_CHAT")
if default_model != self.active_model_name:
self.logger.warning(f"⚠️ 切换到备用模型: {default_model}")
self.switch_model(default_model)
else:
self.logger.error("🛑 备用模型加载失败,系统降级运行")
except Exception as e:
self.logger.critical(f"🛑 所有模型加载失败: {str(e)}")
raise SystemError("所有模型加载失败,系统无法运行")
```
### 关键功能说明
1. **模型预热(preload_models)**
- 在后台线程中预加载常用模型
- 减少用户首次使用时的等待时间
2. **智能缓存管理(unload_inactive_models)**
- 基于LRU(最近最少使用)算法卸载不活跃模型
- 始终保留当前活动模型
- 维持内存使用在合理范围内
3. **动态模型注册(register_new_model)**
- 运行时添加新模型配置
- 无需重启系统即可使用新模型
4. **模型热更新(update_model)**
- 更新模型配置后自动卸载并重新加载
- 保持服务连续性
5. **健壮的故障处理(handle_model_fallback)**
- 自动重新加载故障模型
- 切换到备用模型的回退机制
- 系统降级运行保证基本服务
### 模型管理器扩展方法
这些方法需要添加到 `model_manager.py` 中:
```python
# model_manager.py 扩展方法
class ModelManager:
# ... 现有代码 ...
def get_last_used_time(self, model_name: str) -> float:
"""获取模型最后使用时间"""
return self.usage_stats.get(model_name, {}).get("last_used", 0)
def get_usage_count(self, model_name: str) -> int:
"""获取模型使用次数"""
return self.usage_stats.get(model_name, {}).get("count", 0)
def get_inactive_models(self, max_models: int) -> list:
"""获取不活跃模型列表(按最后使用时间排序)"""
# 收集所有已加载模型(排除当前活动模型)
loaded_models = [name for name in self.loaded_models.keys()]
# 按最后使用时间排序(旧的在前面)
inactive = sorted(
loaded_models,
key=lambda name: self.usage_stats.get(name, {}).get("last_used", 0)
)
# 返回需要卸载的模型列表
if len(inactive) > max_models:
return inactive[:len(inactive) - max_models]
return []
def get_memory_usage(self) -> dict:
"""获取内存使用情况"""
try:
import psutil
process = psutil.Process()
return {
"rss": f"{process.memory_info().rss // (1024**2)} MB",
"vms": f"{process.memory_info().vms // (1024**2)} MB"
}
except ImportError:
return {"error": "psutil not installed"}
def get_gpu_memory_usage(self) -> dict:
"""获取GPU内存使用情况"""
if not torch.cuda.is_available():
return {"gpu_available": False}
stats = {}
for i in range(torch.cuda.device_count()):
stats[f"cuda:{i}"] = {
"allocated": f"{torch.cuda.memory_allocated(i) // (1024**2)} MB",
"reserved": f"{torch.cuda.memory_reserved(i) // (1024**2)} MB"
}
return stats
```
### 配置文件示例
```json
{
"cognitive_orchestrator": {
"name": "MainOrchestrator",
"state_dir": "E:/AI_System/system_state",
"preload_models": ["TEXT_CHAT", "IMAGE_GEN"],
"default_model": "TEXT_CHAT",
"fallback_threshold": 3
},
"model_manager": {
"cache_dir": "E:/model_cache",
"max_models_in_memory": 3,
"registered_models": {
"TEXT_CHAT": {
"type": "text_generation",
"path": "models/text_chat",
"size": "1.2GB"
},
"IMAGE_GEN": {
"type": "diffusion",
"path": "models/image_gen",
"size": "3.8GB"
}
}
}
}
```
以上实现提供了一个完整的模型管理系统,支持动态加载、热更新、缓存优化和故障恢复等功能,确保系统在各种场景下都能高效稳定运行。
阅读全文