```python
# -*- coding: UTF-8 -*-
import logging
from datetime import datetime
import cx_Oracle
import numpy as np
import requests
from pymilvus import connections, Collection, utility, CollectionSchema, FieldSchema, DataType, MilvusException
from apscheduler.schedulers.background import BackgroundScheduler
import time
import re
import sys
import os
from pathlib import Path
import json

# Resolve the parent directory of this script (i.e. the project root)
current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(str(current_dir))  # add the project root to sys.path

from config.config1 import LOGGING_CONFIG, ORACLE_CONFIG, MODEL_CONFIG, MILVUS_CONFIG

# Initialize logging
log_file_path = Path(LOGGING_CONFIG["log_file"])
log_file_path.parent.mkdir(exist_ok=True)

logging.basicConfig(
    level=LOGGING_CONFIG["level"],
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(log_file_path),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("MaterialSync")


class OracleClient:
    """Oracle database client."""

    def __init__(self):
        self.conn = None
        self.connect()

    def connect(self):
        try:
            self.conn = cx_Oracle.connect(
                'ecology/[email protected]/oadb',
                encoding='UTF-8', nencoding='UTF-8'  # force UTF-8 on both charsets
            )
            logger.info("Connected to Oracle database")
        except Exception as e:
            logger.error(f"Oracle connection failed: {str(e)}")
            raise

    def fetch_all_data(self):
        """Fetch all material records from Oracle."""
        cursor = None
        try:
            cursor = self.conn.cursor()
            query = """
                SELECT TO_CHAR(matnr) AS matnr,
                       TO_CHAR(matkl) AS matkl,
                       TO_CHAR(maktx) AS maktx,
                       TO_CHAR(classfication) AS classfication
                FROM Material_MASTER@ERPLINK
                WHERE mandt = '688'
                  AND (matnr LIKE 'DJ%' OR matnr LIKE 'DY%')
                ORDER BY matnr
            """
            cursor.execute(query)
            columns = [col[0].lower() for col in cursor.description]
            return [dict(zip(columns, row)) for row in cursor]
        except Exception as e:
            logger.error(f"Oracle query failed: {str(e)}")
            return []
        finally:
            if cursor is not None:
                cursor.close()


class VectorServiceClient:
    """Calls the model service over HTTP to encode vectors."""

    def __init__(self):
        self.service_url = MODEL_CONFIG["model_service_url"]
        self.timeout = 120  # request timeout in seconds
        logger.info(f"Using vector service: {self.service_url}")

    def batch_encode_dense(self, texts):
        """Batch-encode dense vectors."""
        return self._call_vector_service(texts, "dense")

    def batch_encode_sparse(self, texts):
        """Batch-encode sparse vectors."""
        return self._call_vector_service(texts, "sparse")

    def _call_vector_service(self, texts, vector_type):
        """Shared helper for calling the vector service."""
        try:
            if not texts:
                return []

            # Build the request payload
            payload = {
                "texts": texts,
                "type": vector_type  # extra field selecting the vector type
            }
            headers = {
                "Content-Type": "application/json; charset=utf-8",
                "Accept": "application/json"
            }

            # Log the request in detail
            logger.debug("Request payload details:")
            logger.debug(f"  Vector type: {vector_type}")
            logger.debug(f"  Text count: {len(texts)}")
            # Log the first three texts
            for i, text in enumerate(texts[:3]):
                logger.debug(f"  Text #{i + 1} (length={len(text)}): "
                             f"{text[:100]}{'...' if len(text) > 100 else ''}")

            # Log the full request body (truncated)
            payload_json = json.dumps(payload, ensure_ascii=False)
            if len(payload_json) > 1000:
                logger.debug(f"  Full request body (truncated): {payload_json[:1000]}...")
            else:
                logger.debug(f"  Full request body: {payload_json}")

            # Send the request to the model service
            response = requests.post(
                self.service_url,
                json=payload,
                headers=headers,
                timeout=self.timeout
            )
            response.raise_for_status()

            # Parse the response
            result = response.json()
            if "error" in result:
                logger.error(f"Vector service error ({vector_type}): {result['error']}")
                raise ValueError(result["error"])
            if "vectors" not in result:
                logger.error(f"Invalid response from {vector_type} service: vectors not found")
                logger.error(f"Response: {json.dumps(result, ensure_ascii=False)[:500]}")
                raise ValueError(f"Invalid response from {vector_type} service")

            logger.info(f"Successfully encoded {len(texts)} texts for {vector_type} vectors")

            # Dense vectors: convert to numpy and validate the dimension
            if vector_type == "dense":
                vectors = np.array(result["vectors"])
                expected_dim = MILVUS_CONFIG["vector_dim"]
                if vectors.shape[1] != expected_dim:
                    logger.error(f"Vector dimension mismatch: expected {expected_dim}, got {vectors.shape[1]}")
                    raise ValueError("Vector dimension mismatch")
                return vectors
            # Sparse vectors: return the list of dicts as-is
            return result["vectors"]
        except requests.exceptions.RequestException as e:
            logger.error(f"Request to {vector_type} service failed: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Encoding via {vector_type} service failed: {str(e)}")
            raise


class MilvusHandler:
    """Milvus database handler."""

    def __init__(self):
        self.collection = None
        self.vector_service = VectorServiceClient()
        self.connect()
        self.prepare_collection()

    def connect(self):
        try:
            connections.connect(
                host=MILVUS_CONFIG["host"],
                port=MILVUS_CONFIG["port"]
            )
            logger.info(f"Connected to Milvus: {MILVUS_CONFIG['host']}")
        except Exception as e:
            logger.error(f"Milvus connection failed: {str(e)}")
            raise

    def prepare_collection(self):
        """Prepare the collection (creating it if necessary)."""
        collection_name = MILVUS_CONFIG["collection_name"]
        if not utility.has_collection(collection_name):
            fields = [
                FieldSchema(name="matnr", dtype=DataType.VARCHAR, is_primary=True, max_length=100),
                FieldSchema(name="matkl", dtype=DataType.VARCHAR, max_length=50),
                FieldSchema(name="maktx", dtype=DataType.VARCHAR, max_length=1024),
                FieldSchema(name="classfication", dtype=DataType.VARCHAR, max_length=1024),
                FieldSchema(name="maktx_vector", dtype=DataType.FLOAT_VECTOR, dim=MILVUS_CONFIG["vector_dim"]),
                FieldSchema(name="classfication_vector", dtype=DataType.SPARSE_FLOAT_VECTOR)
            ]
            schema = CollectionSchema(fields, "Material vector storage")
            self.collection = Collection(collection_name, schema)
            # Sparse-vector index
            self.collection.create_index(
                "classfication_vector",
                {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}
            )
            # Dense-vector index
            self.collection.create_index(
                "maktx_vector",
                {"index_type": "IVF_FLAT", "metric_type": "IP", "params": {"nlist": 1024}}
            )
            logger.info(f"Created collection with both vector types: {collection_name}")
        else:
            self.collection = Collection(collection_name)
            logger.info(f"Loaded collection schema: {collection_name}")
        # Make sure the collection is loaded
        self.ensure_collection_loaded()

    def ensure_collection_loaded(self):
        """Make sure the collection is loaded into memory."""
        try:
            load_state = utility.load_state(self.collection.name)
            if load_state != "Loaded":
                logger.info(f"Collection state is {load_state}, loading now...")
                self.collection.load()
                logger.info("Collection loaded successfully")
            else:
                logger.info(f"Collection is already loaded (state: {load_state})")
        except MilvusException as e:
            logger.error(f"Failed to load collection: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error checking collection state: {str(e)}")
            # If the state cannot be checked, try loading directly
            try:
                self.collection.load()
                logger.info("Collection loaded successfully (using fallback)")
            except Exception as e2:
                logger.error(f"Fallback loading failed: {str(e2)}")
                raise

    def batch_upsert(self, data, batch_size=500):
        """Insert or update data in batches."""
        total_records = len(data)
        processed_count = 0

        for i in range(0, total_records, batch_size):
            batch_data = data[i:i + batch_size]
            self.ensure_collection_loaded()

            # Data cleaning
            valid_batch_data = []
            for item in batch_data:
                try:
                    cleaned_item = {
                        "matnr": self.clean_utf8(item["matnr"], 'matnr', item['matnr']),
                        "matkl": self.clean_utf8(item["matkl"], 'matkl', item['matnr']),
                        "maktx": self.clean_utf8(item["maktx"], 'maktx', item['matnr']),
                        "classfication": self.clean_utf8(item.get("classfication", ""), 'classfication', item['matnr'])
                    }
                    # Validate UTF-8
                    if all(self.validate_utf8_string(v) for v in cleaned_item.values()):
                        valid_batch_data.append(cleaned_item)
                    else:
                        logger.warning(f"Invalid UTF-8 data skipped: {cleaned_item}")
                except Exception as e:
                    logger.error(f"Error cleaning item: {str(e)}")

            if not valid_batch_data:
                logger.info(f"No valid data in batch {i // batch_size + 1}")
                continue
            logger.info(f"Processing batch {i // batch_size + 1} with {len(valid_batch_data)} items")

            # Query the material codes in this batch that already exist
            matnr_list = [item['matnr'] for item in valid_batch_data]
            existing_data = []
            try:
                # Build a safe query expression
                safe_matnrs = [f"'{matnr}'" for matnr in matnr_list]
                expr = f"matnr in [{','.join(safe_matnrs)}]"
                logger.debug(f"Querying Milvus with expression: {expr}")
                existing_data = self.collection.query(
                    expr=expr,
                    output_fields=["matnr", "maktx", "classfication", "maktx_vector", "classfication_vector"]
                )
                logger.debug(f"Found {len(existing_data)} existing records")
            except MilvusException as e:
                logger.error(f"Milvus query failed: {str(e)}")
                # Fallback: query one record at a time
                logger.warning("Falling back to individual queries")
                for matnr in matnr_list:
                    try:
                        expr = f"matnr == '{matnr}'"
                        item_data = self.collection.query(
                            expr,
                            output_fields=["matnr", "maktx", "classfication",
                                           "maktx_vector", "classfication_vector"]
                        )
                        if item_data:
                            existing_data.extend(item_data)
                    except Exception as e:
                        logger.error(f"Failed to query matnr {matnr}: {str(e)}")

            existing_dict = {item["matnr"]: item for item in existing_data}

            # Collect records whose vectors must be (re)generated
            maktx_to_encode = []  # material descriptions needing dense vectors
            class_to_encode = []  # classification values needing sparse vectors
            maktx_indices = []    # indices whose dense vectors must be updated
            class_indices = []    # indices whose sparse vectors must be updated

            # Prepare the upsert data
            upsert_data = []
            for idx, item in enumerate(valid_batch_data):
                matnr = item["matnr"]
                existing = existing_dict.get(matnr, {})

                # Did the material description change?
                if matnr in existing_dict:
                    if item["maktx"] == existing.get("maktx", ""):
                        # Same description: reuse the existing vector
                        item["maktx_vector"] = existing.get("maktx_vector")
                    else:
                        # Description changed: re-encode
                        maktx_to_encode.append(item["maktx"])
                        maktx_indices.append(idx)
                else:
                    # New record: encode
                    maktx_to_encode.append(item["maktx"])
                    maktx_indices.append(idx)

                # Handle the classification vector
                class_value = item["classfication"]
                if not class_value or class_value.isspace():
                    # Empty classification
                    item["classfication_vector"] = None
                elif matnr in existing_dict and class_value == existing.get("classfication", ""):
                    # Same classification: reuse the existing vector
                    item["classfication_vector"] = existing.get("classfication_vector")
                else:
                    # New or changed classification: re-encode
                    class_to_encode.append(class_value)
                    class_indices.append(idx)

                upsert_data.append(item)

            # Batch-encode material descriptions (dense)
            if maktx_to_encode:
                try:
                    logger.info(f"Encoding {len(maktx_to_encode)} dense vectors for maktx...")
                    dense_vectors = self.vector_service.batch_encode_dense(maktx_to_encode)
                    # Assign the vectors back to their records
                    for vec_idx, data_idx in enumerate(maktx_indices):
                        upsert_data[data_idx]["maktx_vector"] = dense_vectors[vec_idx]
                except Exception as e:
                    logger.error(f"Failed to encode dense vectors: {str(e)}")
                    continue  # skip this batch

            # Batch-encode classification values (sparse)
            if class_to_encode:
                try:
                    logger.info(f"Encoding {len(class_to_encode)} sparse vectors for classfication...")
                    sparse_vectors = self.vector_service.batch_encode_sparse(class_to_encode)
                    # Assign the vectors back to their records
                    for vec_idx, data_idx in enumerate(class_indices):
                        if vec_idx < len(sparse_vectors):  # stay in range
                            upsert_data[data_idx]["classfication_vector"] = sparse_vectors[vec_idx]
                except Exception as e:
                    logger.error(f"Failed to encode sparse vectors: {str(e)}")
                    continue  # skip this batch

            # Build the Milvus entity columns
            entities = [
                [item["matnr"] for item in upsert_data],
                [item["matkl"] for item in upsert_data],
                [item["maktx"] for item in upsert_data],
                [item["classfication"] for item in upsert_data],
                # Accept both numpy arrays (freshly encoded) and plain lists
                # (vectors reused from the Milvus query) instead of dropping
                # reused list vectors down to []
                [(item.get("maktx_vector").tolist()
                  if hasattr(item.get("maktx_vector"), "tolist")
                  else list(item.get("maktx_vector") or []))
                 for item in upsert_data],
                [self.format_sparse_vector(item.get("classfication_vector")) for item in upsert_data]
            ]

            # Perform the upsert
            if upsert_data:
                try:
                    logger.info(f"Upserting {len(upsert_data)} records to Milvus...")
                    self.collection.upsert(entities)
                    self.collection.flush()
                    # Count empty classification values
                    empty_class_count = sum(
                        1 for item in upsert_data
                        if not item["classfication"] or item["classfication"].isspace()
                    )
                    logger.info(f"Upserted batch {i // batch_size + 1}: "
                                f"{len(upsert_data)} records ({empty_class_count} empty classfication)")
                    processed_count += len(upsert_data)
                except MilvusException as e:
                    logger.error(f"Milvus upsert failed: {str(e)}")
                    # Log the first three failed records
                    for j in range(min(3, len(upsert_data))):
                        sample = upsert_data[j]
                        logger.error(f"Failed sample {j + 1}: matnr={sample['matnr']}, "
                                     f"maktx_len={len(sample['maktx'])}, "
                                     f"class_len={len(sample['classfication']) if sample['classfication'] else 0}")
        return processed_count

    def format_sparse_vector(self, vec):
        """Format a sparse vector into a Milvus-compatible dict."""
        if vec is None:
            return {}
        # FlagEmbedding returns a {token: weight} mapping
        if isinstance(vec, dict):
            # Convert to {index: weight}; Milvus cannot handle string token
            # keys, so enumerate() supplies integer indices instead
            formatted = {}
            for idx, (token, weight) in enumerate(vec.items()):
                if float(weight) > 0:  # keep positive weights only
                    formatted[idx] = float(weight)
            return formatted
        # Lists/arrays: convert to a sparse dict, keeping positive values only
        try:
            if isinstance(vec, (list, tuple, np.ndarray)):
                return {i: float(val) for i, val in enumerate(vec) if float(val) > 0}
            return {}
        except Exception as e:
            logger.error(f"Failed to format sparse vector: {str(e)}")
            return {}

    @staticmethod
    def validate_utf8_string(s):
        try:
            s.encode('utf-8').decode('utf-8')
            return True
        except (UnicodeEncodeError, UnicodeDecodeError):
            return False

    @staticmethod
    def clean_utf8(value, field_name, item_id):
        """Hardened UTF-8 cleaning."""
        if value is None:
            return ''
        try:
            value_str = str(value)
            # Strip literal \uXXXX escape sequences left in the raw text
            cleaned = re.sub(r'\\u[0-9a-fA-F]{4}', '', value_str)
            cleaned = cleaned.replace('\xa0', ' ')
            cleaned = cleaned.encode('utf-8', errors='replace').decode('utf-8')
            return cleaned
        except Exception as e:
            logger.warning(f"Failed to clean UTF-8 for [{field_name}] ({item_id}): {str(e)}")
            return ''


class SyncScheduler:
    """Sync scheduler."""

    def __init__(self):
        self.oracle = OracleClient()
        self.milvus = MilvusHandler()

    def execute_sync(self):
        """Run one sync job."""
        logger.info("Starting sync job...")
        start_time = time.time()
        try:
            # Fetch everything from Oracle
            logger.info("Fetching data from Oracle...")
            all_data = self.oracle.fetch_all_data()
            if not all_data:
                logger.info("No data found in Oracle")
                return
            logger.info(f"Retrieved {len(all_data)} records from Oracle")

            # Validate and clean the data
            cleaned_data = []
            invalid_count = 0
            empty_class_count = 0
            for item in all_data:
                try:
                    # 'classfication' is the (consistently misspelled) column
                    # name used throughout the schema
                    class_value = item.get('classfication', '')
                    cleaned_item = {
                        "matnr": self.clean_utf8(item['matnr'], 'matnr', item['matnr']),
                        "matkl": self.clean_utf8(item['matkl'], 'matkl', item['matnr']),
                        "maktx": self.clean_utf8(item['maktx'], 'maktx', item['matnr']),
                        "classfication": self.clean_utf8(class_value, 'classfication', item['matnr'])
                    }
                    # Count empty classification values
                    if not cleaned_item["classfication"] or cleaned_item["classfication"].isspace():
                        empty_class_count += 1
                    # Validate UTF-8
                    if all(self.is_valid_utf8(v) for v in cleaned_item.values()):
                        cleaned_data.append(cleaned_item)
                    else:
                        invalid_count += 1
                        logger.warning(f"Invalid UTF-8 data skipped: matnr={item['matnr']}")
                except Exception as e:
                    invalid_count += 1
                    logger.error(f"Error processing item: {item}, error: {str(e)}")

            if invalid_count > 0:
                logger.warning(f"Skipped {invalid_count} invalid records")

            if cleaned_data:
                processed_count = self.milvus.batch_upsert(cleaned_data)
                logger.info(f"Successfully processed {processed_count}/{len(cleaned_data)} records")
            else:
                logger.warning("No valid data to sync")

            duration = time.time() - start_time
            logger.info(f"Sync job completed in {duration:.2f} seconds")
        except Exception as e:
            logger.error(f"Sync failed: {str(e)}")
            duration = time.time() - start_time
            logger.error(f"Sync job failed after {duration:.2f} seconds")
            # Try to reconnect to Milvus
            try:
                logger.info("Attempting to reconnect to Milvus...")
                self.milvus = MilvusHandler()
                logger.info("Milvus reconnected successfully")
            except Exception as reconnect_error:
                logger.error(f"Reconnection failed: {str(reconnect_error)}")

    @staticmethod
    def clean_utf8(value, field_name, item_id):
        """Hardened UTF-8 cleaning (duplicated from MilvusHandler)."""
        if value is None:
            return ''
        try:
            value_str = str(value)
            cleaned = re.sub(r'\\u[0-9a-fA-F]{4}', '', value_str)
            cleaned = cleaned.replace('\xa0', ' ')
            cleaned = cleaned.encode('utf-8', errors='replace').decode('utf-8')
            return cleaned
        except Exception as e:
            logger.warning(f"Failed to clean UTF-8 for [{field_name}] ({item_id}): {str(e)}")
            return ''

    @staticmethod
    def is_valid_utf8(s):
        try:
            s.encode('utf-8').decode('utf-8')
            return True
        except UnicodeError:
            return False


if __name__ == "__main__":
    scheduler = BackgroundScheduler()
    sync = SyncScheduler()

    # Run one sync immediately
    logger.info("Executing initial sync...")
    sync.execute_sync()

    # Run daily at 10:58 (cron schedule)
    scheduler.add_job(sync.execute_sync, "cron", hour=10, minute=58)

    try:
        logger.info("Scheduler started with HTTP vector services")
        scheduler.start()
        # Keep the main thread alive
        while True:
            time.sleep(60)
    except (KeyboardInterrupt, SystemExit):
        logger.info("Scheduler stopped")
        scheduler.shutdown()
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
```

This code fails with the following error:

```log
2025-07-05 10:58:16,307 - ERROR - Request to dense service failed: 400 Client Error: Bad Request for url: http://10.162.244.27:8088/encode
2025-07-05 10:58:16,307 - ERROR - Failed to encode dense vectors: 400 Client Error: Bad Request for url: http://10.162.244.27:8088/encode
```
For the HTTP 400 Bad Request error raised when calling the vector service endpoint `http://10.162.244.27:8088/encode`, here is a systematic troubleshooting approach:
### I. Root-cause analysis
An HTTP 400 means **the client's request is malformed or otherwise unacceptable**, so the server refuses to process it. Common causes:
1. **Malformed request body**: invalid JSON, or required fields missing
2. **Encoding problems**: non-UTF-8 characters, or binary data that was not encoded properly
3. **Parameter out of range**: e.g. an integer value overflowing the expected type
4. **Missing request headers**: no `Content-Type`, or a missing `Authorization`
5. **Wrong URL**: a typo in the endpoint path
### II. Key resolution steps
#### 1. Validate the request format
Make sure the request body is well-formed JSON. A correct example:
```json
{
    "texts": ["text one", "text two"],
    "batch_size": 32
}
```
(`batch_size` here is illustrative; check the service's API documentation for the exact field set.) Use a JSON validator such as [jsonlint.com](https://jsonlint.com) to check the syntax.
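Beyond syntax, compare the fields you send with the fields the service accepts. The sync script above sends an extra `"type"` field alongside `texts`; if the `/encode` endpoint validates input strictly and does not recognize that field, this alone can trigger a 400 — an assumption worth verifying against the service's API contract. A minimal client-side pre-check, mirroring the script's payload shape:

```python
import json

def validate_payload(texts):
    """Fail fast before the request leaves the process.
    The accepted field set ('texts', optional 'type') is an assumption --
    confirm it against the /encode API documentation."""
    if not isinstance(texts, list) or not texts:
        raise ValueError("texts must be a non-empty list")
    for i, t in enumerate(texts):
        if not isinstance(t, str) or not t.strip():
            raise ValueError(f"texts[{i}] is empty or not a string")
    payload = {"texts": texts, "type": "dense"}  # 'type' may be unknown to the server
    json.dumps(payload, ensure_ascii=False)      # raises if anything is not JSON-serializable
    return payload
```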
#### 2. Check value ranges
Verify that numeric parameters are within their valid ranges (a small guard sketch follows):
- For 32-bit integer parameters (such as `batch_size`), the value must lie between $-2^{31}$ and $2^{31}-1$
- Floating-point values must not be `NaN` or `Infinity` (neither is valid JSON)
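A sketch of such a guard (the int32 bound is the common case; some services enforce other limits):

```python
import math

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

def check_numeric_params(params: dict) -> None:
    """Reject values a JSON API commonly refuses out of hand."""
    for key, value in params.items():
        if isinstance(value, bool):
            continue  # bool is a subclass of int; skip it explicitly
        if isinstance(value, int) and not INT32_MIN <= value <= INT32_MAX:
            raise ValueError(f"{key}={value} is outside the int32 range")
        if isinstance(value, float) and not math.isfinite(value):
            raise ValueError(f"{key}={value} is NaN/Infinity, which is not valid JSON")

check_numeric_params({"batch_size": 32})  # passes silently
```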
#### 3. Confirm the request headers
The request must include:
```http
Content-Type: application/json
Authorization: Bearer <token>   // only if the service requires authentication
```
#### 4. Verify with a debugging tool
Test the request with `curl`:
```bash
curl -v -X POST http://10.162.244.27:8088/encode \
  -H "Content-Type: application/json" \
  -d '{"texts":["测试文本"]}'
```
The `-v` output shows the full request and response, including the error details in the response body.
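The same probe can be run from Python. Note that `raise_for_status()` in the sync script discards the response body, which is usually where the service names the offending field; a sketch that logs the body before raising (URL taken from the error log):

```python
import requests

SERVICE_URL = "http://10.162.244.27:8088/encode"

def debug_encode(texts):
    payload = {"texts": texts, "type": "dense"}  # same shape the sync script sends
    resp = requests.post(SERVICE_URL, json=payload,
                         headers={"Content-Type": "application/json"},
                         timeout=30)
    if resp.status_code >= 400:
        # A 400 body usually explains exactly what the server rejected
        print(f"HTTP {resp.status_code}: {resp.text[:1000]}")
    resp.raise_for_status()
    return resp.json().get("vectors")

# debug_encode(["测试文本"])
```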
#### 5. Check the server-side logs
If possible, get the error logs from the service maintainer; they usually name the exact field that was rejected, e.g.:
```log
ERROR: Invalid parameter 'dim'=10240, max=768
```
### III. Advanced troubleshooting
If the basic steps do not resolve it:
1. **Shrink the request**: send a single minimal record and grow from there (see the sketch after this list)
2. **Capture the traffic**: inspect the raw request with Wireshark
3. **Check version compatibility**: confirm the client SDK matches the service version
4. **Rule out your code**: rebuild the request in Postman to isolate client-side logic
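For step 1, a hedged sketch of a shrink-the-request probe: start from the smallest conceivable payload and re-add one difference at a time (non-ASCII text, the extra field, batch size) until the 400 reappears. The field names mirror the sync script; which variants the server accepts is exactly what this probe determines:

```python
import requests

SERVICE_URL = "http://10.162.244.27:8088/encode"

probes = [
    {"texts": ["test"]},                   # smallest possible request
    {"texts": ["测试文本"]},                # add non-ASCII content
    {"texts": ["test"], "type": "dense"},  # add the extra field the script sends
    {"texts": ["test"] * 64},              # grow the batch
]
for payload in probes:
    resp = requests.post(SERVICE_URL, json=payload, timeout=30)
    print(sorted(payload), len(payload["texts"]), "->", resp.status_code)
```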
> **Note**: if the service wraps a model that consumes binary media (Wav2Lip-style audio/video services, for instance), pay particular attention to the Base64-encoding conventions for binary payloads.
### IV. Preventive measures
1. Define the request interface in a strongly typed language (e.g. TypeScript)
2. Add pre-validation before sending:
```python
if not isinstance(texts, list) or len(texts) == 0:
    raise ValueError("Invalid texts parameter")
```
3. Implement a retry mechanism for transient network errors (a sketch follows)
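For item 3, a retry sketch: retry only transient failures (timeouts, connection errors, 5xx); a 400 is deterministic and will fail again, so it should surface immediately rather than being retried:

```python
import time
import requests

def post_with_retry(url, payload, attempts=3, backoff=2.0):
    for attempt in range(1, attempts + 1):
        try:
            resp = requests.post(url, json=payload, timeout=120)
            if resp.status_code >= 500 and attempt < attempts:
                time.sleep(backoff ** attempt)  # transient server error: back off and retry
                continue
            resp.raise_for_status()  # a 4xx (including 400) raises immediately
            return resp.json()
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError):
            if attempt == attempts:
                raise
            time.sleep(backoff ** attempt)
```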