
milvus: error while loading shared libraries: libaio.so.1:

Posted: 2025-05-07 09:13:07 · Views: 48
### Fix for Milvus missing the `libaio.so.1` shared library at runtime

When you encounter the error `error while loading shared libraries: libaio.so.1: cannot open shared object file: No such file or directory`[^2], it means the shared library file `libaio.so.1` is missing from the system. This usually happens when running an application that depends on it (such as Milvus) on Linux.

The problem can be solved in the following ways.

#### Method 1: Install the `libaio` package

On most Debian- or Ubuntu-based distributions, install the required library with:

```bash
sudo apt-get update && sudo apt-get install -y libaio1
```

On Red Hat- or CentOS-based systems, use:

```bash
sudo yum install -y libaio
```

These commands download `libaio` and install its associated `.so` files into the system's standard library paths, which resolves the loading failure.

#### Method 2: Point the dynamic linker at the library manually

If `libaio` cannot be installed through the package manager (for example, because of missing privileges), you can instead set the `LD_LIBRARY_PATH` environment variable to a directory that already contains `libaio.so.1`:

```bash
export LD_LIBRARY_PATH=/path/to/lib:$LD_LIBRARY_PATH
```

where `/path/to/lib` is the directory that actually holds `libaio.so.1`. Restart Milvus afterwards to verify the result.

#### Method 3: Check that the configuration file exists

Although this error is about a missing shared library, other problems in the environment, such as a missing server configuration file, can also disrupt the normal startup flow. Check whether you also see a warning like "Config file not exist: /var/lib/milvus/conf/server_config.yaml"[^3]; if so, fix the basic environment configuration first, then continue troubleshooting the library issue.

---

### Sample code to verify that the required library can be loaded

The short C++ program below checks whether the current environment can find and load the target shared object (`libaio`):

```cpp
#include <cstdlib>   // for EXIT_SUCCESS / EXIT_FAILURE
#include <iostream>
#include <dlfcn.h>

int main() {
    void* handle = dlopen("libaio.so.1", RTLD_LAZY);
    if (!handle) {
        std::cerr << "Cannot load library: " << dlerror() << '\n';
        return EXIT_FAILURE;
    }
    std::cout << "Library loaded successfully." << std::endl;
    dlclose(handle);
    return EXIT_SUCCESS;
}
```

Compiling and running this program is a quick way to confirm whether the methods above have taken effect.

---
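As a concrete way to run this check, the sketch below first asks the dynamic linker cache whether `libaio.so.1` is already registered, then builds and runs the `dlopen` probe above. The file name `check_libaio.cpp` is only an assumed name for a file containing the C++ code listed above, and `-ldl` is required on older glibc toolchains (it is harmless on newer ones where `dlopen` lives in libc).

```bash
# Check whether libaio is already known to the dynamic linker cache
ldconfig -p | grep libaio

# Build and run the dlopen probe from the section above.
# check_libaio.cpp is an assumed file name holding that C++ test program;
# -ldl links the dlopen/dlclose symbols (needed on older glibc).
g++ -o check_libaio check_libaio.cpp -ldl
./check_libaio
```

If `ldconfig -p` already lists `libaio.so.1`, the probe should report success without any further changes.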
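For Method 2 above, you first need to locate a directory that already holds a copy of `libaio.so.1`. A minimal sketch, assuming the copy lives under one of the usual library prefixes; the `/opt/vendor/lib` directory in the `export` line is purely an illustrative placeholder and should be replaced with whatever path the `find` command actually reports:

```bash
# Search common library prefixes for an existing copy of libaio.so.1
find /usr/lib /usr/lib64 /usr/local/lib /opt -name 'libaio.so.1*' 2>/dev/null

# Illustrative placeholder path: if a copy turned up under /opt/vendor/lib,
# expose that directory to the dynamic linker for this shell session,
# then restart Milvus to verify
export LD_LIBRARY_PATH=/opt/vendor/lib:$LD_LIBRARY_PATH
```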

