chains = {"size": RunnablePassthrough(), "city": RunnablePassthrough()} | prompt_template | model | parser result = chains.invoke({"size": 100, "city": "北京市"})

时间: 2025-08-06 08:11:33 浏览: 2
<think>我们正在讨论的是一个与LangChain相关的代码段,但用户提供的引用是关于Python装饰器的示例,似乎与当前问题无关。因此,我们将忽略该引用,专注于用户上一次的问题。用户的问题:理解代码片段中chains字典(包含RunnablePassthrough、提示模板、模型、解析器)以及invoke方法的执行逻辑和各组件的作用。我们假设这是一个LangChain框架的问题,因为LangChain中有这些概念(如RunnablePassthrough,invoke方法等)。在LangChain中,我们通常使用链(Chain)来组合多个组件,以便按顺序处理输入和生成输出。一个链通常由多个可运行(Runnable)的组件组成,例如提示模板、模型和输出解析器。下面是一个典型的LangChain链的构建示例:```pythonfromlangchain_core.runnablesimportRunnablePassthroughfromlangchain_core.promptsimportChatPromptTemplatefromlangchain_core.output_parsersimportStrOutputParserfromlangchain_openaiimportChatOpenAI#假设我们有一个名为“prompt_template”的提示模板prompt_template=ChatPromptTemplate.from_template("根据以下上下文回答一个问题:\n\n上下文:{context}\n\n问题:{question}\n\n答案:")#初始化模型model=ChatOpenAI(model="gpt-3.5-turbo")#输出解析器output_parser=StrOutputParser()#构建一个链chain=({"context":RunnablePassthrough(),"question":RunnablePassthrough()}|prompt_template|model|output_parser)#然后我们可以调用这个链result=chain.invoke({"context":"天空是蓝色的。","question":"天空是什么颜色的?"})print(result)#输出:蓝色```下面我们将详细解释每个组件以及invoke方法的执行逻辑。###1.RunnablePassthrough作用:传递输入数据,它可以将输入数据原封不动地传递到下游,或者从输入中提取特定键来传递。在示例中,我们使用它来分别传递上下文(context)和问题(question)。在字典中,我们为两个键(context和question)都使用了RunnablePassthrough,这意味着在调用链时,我们需要传入一个包含这两个键的字典。RunnablePassthrough会将传入的对应值直接传递出去。###2.提示模板(prompt_template)作用:将输入数据格式化为一个提示(字符串或消息列表)。在示例中,提示模板定义了一个包含上下文和问题的字符串模板。它会接收一个包含"context"和"question"的字典,然后填充模板。###3.模型(model)作用:接收格式化后的提示,调用底层的大语言模型(如OpenAI的ChatGPT模型),并返回模型的输出。在LangChain中,模型通常返回一个`AIMessage`对象。###4.输出解析器(parser)作用:将模型的原始输出解析为更友好的格式。例如,`StrOutputParser`将`AIMessage`转换为字符串。如果我们希望输出为其他格式(如JSON),可以使用相应的输出解析器。###执行逻辑(invoke方法):当我们调用`chain.invoke(input)`时,链会按顺序执行每个组件:步骤1:输入(input)是一个字典,包含`context`和`question`键值对。步骤2:第一个组件是一个Runnable序列(使用|操作符连接),它实际上是一个由多个Runnable组成的链。这里我们用字典指定了两个RunnablePassthrough(分别对应context和question)。这个字典会被LangChain解释为一个组合的Runnable,它输出的也是一个字典,包含相同的两个键,值则是传递过来的原始值。步骤3:输出的字典作为输入传递给提示模板。提示模板使用这个字典中的`context`和`question`来填充模板,生成一个提示。步骤4:提示传递给模型,模型调用API并返回一个`AIMessage`(或类似的)对象。步骤5:输出解析器将`AIMessage`对象转换成字符串(或其他格式)。步骤6:最终结果返回给调用者。因此,整个链条的流程是:输入字典->RunnablePassthrough(传递)->提示模板(格式化)->模型(生成响应)->输出解析器(解析响应)->输出。注意:在LangChain中,链的组件必须按照顺序连接,并且每个组件的输出必须是下一个组件的输入。如果用户有一个具体的代码片段,我们需要根据实际代码来解释。但用户没有提供具体代码,所以我们只能根据典型情况说明。根据用户问题中提到的“chainsdictionary”,可能是指一个包含多个链的字典,每个链由不同的组件构成。例如:```pythonchains={"chain1":chain1,#链1"chain2":chain2#链2}```然后我们可以通过`chains['chain1'].invoke(input)`来调用特定的链。希望以上解释能够帮助用户理解。如果用户有具体的代码片段,请提供以便更详细地分析。</think> 在 LangChain 框架中,`chains` 字典与 `invoke` 方法的执行逻辑涉及多个核心组件的协同工作。以下是对各组件作用及执行流程的详细解析: --- ### **组件作用解析** 1. **`RunnablePassthrough`** - **作用**: 传递输入数据(不修改),用于保留原始输入中的键值对 - **典型用法**: `{"input_key": RunnablePassthrough()}` 确保特定输入直接进入下游 - **场景**: 当需要将原始问题(如 `user_question`)直接传递到提示模板时 2. **`prompt_template`** - **作用**: 将输入数据注入预定义的提示词模板 - **关键方法**: `format()` 动态生成最终提示 - **示例**: ```python template = "回答关于{context}的问题: {question}" prompt = ChatPromptTemplate.from_template(template) ``` 3. **`model`** - **作用**: 语言模型执行推理(如 GPT-4、Llama2) - **输入**: 格式化后的提示文本 - **输出**: 原始模型响应(未解析的字符串/对象) 4. **`parser`** - **作用**: 解析模型输出为结构化数据 - **常见类型**: - `StrOutputParser()`: 字符串输出 - `JsonOutputParser()`: JSON 格式 - `PydanticOutputParser()`: 强类型对象 5. **`chains` 字典结构** ```python chain = { "context": load_context_from_db, # 获取上下文 "question": RunnablePassthrough(), # 传递原始问题 } | prompt_template | model | parser ``` - **`|` 操作符**: 定义组件流水线(LangChain 的 LCEL 语法) --- ### **`invoke` 方法执行逻辑** 调用 `chain.invoke({"question": "用户问题"})` 时: 1. **输入传递** - `RunnablePassthrough` 将 `"question"` 直接传递到下一环节 - 其他键(如 `"context"`)通过独立组件生成 2. **提示模板组合** $$ \text{final_prompt} = \text{template.format(context=ctx, question=q)} $$ 3. **模型推理** $$ \text{raw_response} = \text{model.generate(final_prompt)} $$ 4. **输出解析** $$ \text{result} = \text{parser.parse(raw_response)} $$ 5. **返回结果** - 结构化数据(如字符串/JSON/Pydantic对象) --- ### **组件交互示例** ```python # 定义组件链 chain = ( {"context": load_context, "question": RunnablePassthrough()} | ChatPromptTemplate.from_template("基于{context}回答:{question}") | ChatOpenAI(model="gpt-3.5-turbo") | StrOutputParser() ) # 执行调用 response = chain.invoke({"question": "总结核心要点"}) ``` **流程说明**: 1. `load_context` 获取背景数据 2. `RunnablePassthrough` 传递问题文本 3. 提示模板合并两路输入 4. 语言模型生成回答 5. 解析器转为字符串输出 --- ### **常见问题场景** 1. **如何动态注入额外上下文?** → 在 `chains` 中增加预处理组件 (e.g., 数据库查询) ```python {"context": db_retriever, "question": RunnablePassthrough()} ``` 2. **输出解析失败时如何调试?** → 检查模型输出的 **原始格式** → 调整 `prompt_template` 的指令约束输出格式 3. **何时需要 `RunnablePassthrough`?** → 当输入键名与模板变量名 **直接匹配** 时可用,否则需手动映射 --- [^1]: LangChain 文档 - Chain Composition Concepts
阅读全文

相关推荐

from langchain_ollama.llms import OllamaLLM from langchain.prompts import PromptTemplate from langchain.chains import LLMChain, ConversationChain from langchain.chains.router.multi_prompt_prompt import MULTI_PROMPT_ROUTER_TEMPLATE from langchain.chains.router.llm_router import RouterOutputParser from langchain.chains.router import LLMRouterChain, MultiPromptChain # 专家模型(提示词),链 llm = OllamaLLM(model="qwen2:7b") # 数 math_template = """ 你是一位伟大的数学家,知道所有的数学问题,你超越了高斯和莱布尼茨,我向你提问,问题如下: {input} """ math_prompt = PromptTemplate.format_prompt(math_template) math_chain = LLMChain(llm=llm, prompt=math_prompt,output_key="text") # 理 physics_template = """ 你是一位伟大的物理学家,知道所有的物理问题,你得过诺贝尔物理学奖,我向你提问,问题如下: {input} """ physics_prompt = PromptTemplate.format_prompt(physics_template) physics_chain = LLMChain(llm=llm, prompt=physics_prompt,output_key="text") # 化 chemistry_template = """ 你是一位伟大的化学家,知道所有的化学问题,你得过诺贝尔化学奖,我向你提问,问题如下: {input} """ chemistry_prompt = PromptTemplate.format_prompt(chemistry_template) chemistry_chain = LLMChain(llm=llm, prompt=chemistry_prompt,output_key="text") # 输入问题分析清楚有哪些可能和目的,目的链 destination_chain = {"math":math_chain,"physics":physics_chain,"chemistry":chemistry_chain} # 默认链 default_chain = ConversationChain(llm=llm, output_key="text") default_template = """ math:擅长回答数学问题 physics:擅长回答物理问题 chemistry:擅长回答化学问题 """ # 路由构建 router_template = MULTI_PROMPT_ROUTER_TEMPLATE.format( destinations=default_template ) router_promt = PromptTemplate( template=router_template, input_variables=["input"], output_parser=RouterOutputParser() ) # 路由链 router_chain = LLMRouterChain.from_llm(llm=llm,prompt=router_promt) # 最终选择(链) multi_prompt_chain = MultiPromptChain( router_chain=router_chain, default_chain=default_chain, destination_chains=destination_chain, verbose=False ) res = multi_prompt_chain.invoke({"input":"介绍一下高斯"}) print(res) 上面是源代码 KeyError Traceback (most recent call last) ~\AppData\Local\Temp\ipykernel_16272

inputs: {'query': '你好'} /home/lk/projects/langchain/25-3-26-prompt/predict.py:34: LangChainDeprecationWarning: The method Chain.run was deprecated in langchain 0.1.0 and will be removed in 1.0. Use :meth:~invoke instead. generated_cypher = self.cypher_generation_chain.run( ⚠️ 发生错误: 'CustomGraphCypherQAChain' object has no attribute 'qa_llm' 这个报错 import json from langchain_community.graphs import Neo4jGraph from langchain.chains import GraphCypherQAChain from langchain.prompts import PromptTemplate from langchain_openai import ChatOpenAI from langchain_ollama import ChatOllama # 连接 Neo4j 数据库 graph = Neo4jGraph( url="neo4j+s://bf0befc9.databases.neo4j.io", # 替换为你的 URL username="neo4j", password="Hl7FXJhH1azmgC34YhaweYyigcKwQK3wiRfNVNeWru8", database="neo4j" ) # graph = Neo4jGraph( # url='neo4j+s://a36ad2ff.databases.neo4j.io', # username="neo4j", # password="X_K2K49tuQt9VU0athTxpoNUMTs51h76H1kXUrLgoWs", # database="neo4j" # ) # cypher_llm = ChatOpenAI( # base_url="https://api.chatanywhere.tech/v1", # api_key='sk-hZEb9uq0085t1EFFIm4RoXAvH4ktKshUFgWwDv3BneS0SIZp', # model="gpt-4o-mini" # ) # 生成 Cypher 语句的提示模板 class CustomGraphCypherQAChain(GraphCypherQAChain): def _call(self, inputs): try: # 原始查询逻辑 # schema = Schema() print(f"inputs: {inputs}") # 打印输入内容,调试用 generated_cypher = self.cypher_generation_chain.run( inputs["query"], callbacks=self.callbacks ) # 执行Cypher查询 context = self.graph.query(generated_cypher) # 空结果处理 if not context: # 构造兜底提示 empty_prompt = "当前知识库未找到相关数据,请根据你的通用知识回答:{question}" return self.qa_llm.invoke(empty_prompt.format(question=inputs["query"])) # 正常结果处理 return self.qa_chain.invoke( {"question": inputs["query"], "context": context}, callbacks=self.callbacks, ) except Exception as e: # 异常兜底 error_prompt = f"查询执行失败(错误:{str(e)}),请自行回答问题:{inputs['query']}" return self.qa_llm.invoke(error_prompt) qa_llm = ChatOllama( base_url="http://223.109.239.9:20006", model="deepseek-r1:70b", # system_prompt = cypher_prompt ) cypher_llm = ChatOpenAI( base_url="https://api.chatanywhere.tech/v1", api_key='sk-hZEb9uq0085t1EFFIm4RoXAvH4ktKshUFgWwDv3BneS0SIZp', model="gpt-4o-mini" ) # 构建 Cypher 生成和查询链 cypher_chain = CustomGraphCypherQAChain.from_llm( graph=graph, cypher_llm=qa_llm, qa_llm=qa_llm, validate_cypher=True, verbose=True, allow_dangerous_requests=True ) # 循环对话 def chat(): print("🔹 进入 Neo4j 智能助手,输入 'exit' 退出。") while True: user_query = input("\n💬 你问:") if user_query.lower() == "exit": print("👋 再见!") break try: clean_result = cypher_chain.invoke(user_query) # 只返回结果部分 # response = cypher_chain.invoke(user_query) # # 提取最终的回答,去除多余部分 # result = response.get('result', '没有找到相关答案') # clean_result = result.split('<think>')[0] # 去掉think部分 # clean_result = clean_result.split('Generated Cypher:')[0] # 去掉Cypher部分 print("\n🤖 机器人:", clean_result) # response = cypher_chain.invoke(user_query) # print("\n🤖 机器人:", response) except Exception as e: print("\n⚠️ 发生错误:", str(e)) # 启动对话 if __name__ == "__main__": chat()

from langchain.agents import initialize_agent, AgentType from langchain_community.llms import Ollama from langchain.tools import Tool from langchain.prompts import PromptTemplate from langchain.output_parsers import StructuredOutputParser, ResponseSchema # 定义加法函数 def add_numbers(a, b): return a + b # 创建加法工具 add_tool = Tool( name="AddNumbers", func=add_numbers, description="计算两个数的和" ) # 定义输出解析器 response_schemas = [ ResponseSchema(name="result", description="加法的结果。"), ] output_parser = StructuredOutputParser.from_response_schemas(response_schemas) # 定义Prompt模板 prompt_template = PromptTemplate( template="你是一个加法计算器,可以使用AddNumbers工具进行加法运算。 " "当被要求加两个数字时,请使用AddNumbers工具进行计算。 " "结果应以JSON格式返回,包含键 'result'。 " "问题是:{input}", input_variables=["input"], partial_variables={"format_instructions": output_parser.get_format_instructions()} ) # 初始化LLM llm = Ollama( model="deepseek-r1:1.5b", base_url="http://localhost:11434", temperature=0 ) # 初始化Agent agent = initialize_agent( tools=[add_tool], llm=llm, agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, verbose=True, max_iterations=15, agent_kwargs={"prompt": prompt_template} ) # 测试Agent # result = agent.invoke({"input":"5 加 3 等于几"}) # print(result) result = agent.stream({"input":"5 加 3 等于几"}) for chunk in result: print("aaaaaaaaa") print(chunk, end='', flush=True) print("bbbbbb") 这段代码最后没有流式输出,一个字一个字往出蹦的这种效果,如何修改代码,可以做到这种效果

我正在编辑【python】代码,遇到了 【Traceback (most recent call last): File "D:\python\python3.11.5\Lib\site-packages\langchain\_api\module_import.py", line 69, in import_by_name module = importlib.import_module(new_module) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\python\python3.11.5\Lib\importlib\__init__.py", line 126, in import_module return _bootstrap._gcd_import(name[level:], package, level) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<frozen importlib._bootstrap>", line 1204, in _gcd_import File "<frozen importlib._bootstrap>", line 1176, in _find_and_load File "<frozen importlib._bootstrap>", line 1126, in _find_and_load_unlocked File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed File "<frozen importlib._bootstrap>", line 1204, in _gcd_import File "<frozen importlib._bootstrap>", line 1176, in _find_and_load File "<frozen importlib._bootstrap>", line 1140, in _find_and_load_unlocked ModuleNotFoundError: No module named 'langchain_community' The above exception was the direct cause of the following exception: Traceback (most recent call last): File "D:\code\nepu_spider\rag\rag_sys.py", line 11, in <module> from langchain.embeddings import HuggingFaceEmbeddings File "D:\python\python3.11.5\Lib\site-packages\langchain\embeddings\__init__.py", line 167, in __getattr__ return _import_attribute(name) ^^^^^^^^^^^^^^^^^^^^^^^ File "D:\python\python3.11.5\Lib\site-packages\langchain\_api\module_import.py", line 72, in import_by_name raise ModuleNotFoundError( ModuleNotFoundError: Module langchain_community.embeddings not found. Please install langchain-community to access this module. You can install it using pip install -U langchain-community 】 ,请帮我检查并改正错误点。我的原始代码如下: 【import os import json import pickle from langchain_core.language_models import LLM import install_faiss import numpy as np import requests from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.embeddings import HuggingFaceEmbeddings from langchain.vectorstores import FAISS from langchain.chains import RetrievalQA from langchain.prompts import PromptTemplate from langchain.llms import OpenAI from langchain.docstore.document import Document from typing import List, Dict, Any, Tuple, Optional # 自定义智谱AI的LangChain包装器 class ZhipuAILLM(LLM): def __init__(self, api_key: str, model: str = "glm-4", temperature: float = 0.7, zhipuai=None): """初始化智谱AI模型""" self.api_key = api_key self.model = model self.temperature = temperature zhipuai.api_key = api_key # 设置API密钥 def _call(self, prompt: str, stop: Optional[List[str]] = None, zhipuai=None) -> str: """调用智谱AI API生成文本""" try: response = zhipuai.model_api.invoke( model=self.model, prompt=[{"role": "user", "content": prompt}], temperature=self.temperature, top_p=0.7, ) if response["code"] == 200: return response["data"]["choices"][0]["content"] else: raise ValueError(f"智谱AI API错误: {response['msg']}") except Exception as e: raise RuntimeError(f"调用智谱AI失败: {str(e)}") @property def _llm_type(self) -> str: return "zhipuai" class RAGSystem: def __init__(self, config: Dict[str, Any]): """初始化RAG系统""" self.config = config self.embedding_model = self._load_embedding_model() self.llm = self._load_llm() self.vector_db = None self.doc_id_map = {} # 自动加载或创建索引 self._initialize_index() def _load_embedding_model(self) -> HuggingFaceEmbeddings: """加载嵌入模型""" model_name = self.config.get("embedding_model", "sentence-transformers/all-mpnet-base-v2") return HuggingFaceEmbeddings( model_name=model_name, model_kwargs={"device": self.config.get("embedding_device", "cpu")} ) def _load_llm(self): """加载大语言模型 - 使用智谱AI""" llm_provider = self.config.get("llm_provider", "zhipuai") if llm_provider == "zhipuai": return ZhipuAILLM( api_key=self.config["zhipuai_api_key"], # 使用智谱AI密钥 model=self.config.get("llm_model", "glm-4"), temperature=self.config.get("temperature", 0.7) ) elif llm_provider == "openai": # 保留OpenAI支持 from langchain.llms import OpenAI return OpenAI( api_key=self.config["openai_api_key"], model_name=self.config.get("llm_model", "gpt-3.5-turbo"), temperature=self.config.get("temperature", 0.7) ) else: raise ValueError(f"不支持的LLM提供者: {llm_provider}") def _initialize_index(self): """初始化索引:加载现有或创建新索引""" index_path = self.config["index_path"] if os.path.exists(index_path): print(f"加载现有索引: {index_path}") self._load_vector_index() else: print(f"创建新索引: {index_path}") self._create_new_index() def _create_new_index(self): """创建新索引""" data_dir = self.config["data_dir"] if not os.path.exists(data_dir): print(f"数据目录不存在: {data_dir}") if self.config.get("auto_download", False): self._download_sample_data() else: raise FileNotFoundError(f"数据目录不存在: {data_dir}") documents = self._load_and_process_documents() self._create_vector_index(documents) def _download_sample_data(self): """下载示例数据""" print("下载示例数据...") data_dir = self.config["data_dir"] os.makedirs(data_dir, exist_ok=True) sample_urls = [ "https://raw.githubusercontent.com/langchain-ai/langchain/master/docs/docs_skeleton.json" ] for url in sample_urls: response = requests.get(url) filename = os.path.basename(url) file_path = os.path.join(data_dir, filename) with open(file_path, "wb") as f: f.write(response.content) print(f"下载完成: {filename}") def _load_and_process_documents(self) -> List[Document]: """加载并处理文档""" documents = [] data_dir = self.config["data_dir"] # 支持多种文件格式 for filename in os.listdir(data_dir): file_path = os.path.join(data_dir, filename) if filename.endswith(".json") or filename.endswith(".jsonl"): documents.extend(self._load_json_documents(file_path)) elif filename.endswith(".txt"): documents.extend(self._load_text_documents(file_path)) if not documents: raise ValueError(f"在 {data_dir} 中没有找到可处理的文档") # 文本分块 return self._split_documents(documents) def _load_json_documents(self, file_path: str) -> List[Document]: """加载JSON或JSONL文档""" documents = [] with open(file_path, "r") as f: if file_path.endswith(".jsonl"): # 处理JSONL文件 for line in f: try: data = json.loads(line) doc = self._create_document_from_data(data) documents.append(doc) except json.JSONDecodeError: print(f"跳过无效的JSON行: {line.strip()}") else: # 处理JSON文件 try: data = json.load(f) if isinstance(data, list): for item in data: doc = self._create_document_from_data(item) documents.append(doc) elif isinstance(data, dict): doc = self._create_document_from_data(data) documents.append(doc) except json.JSONDecodeError: print(f"无效的JSON文件: {file_path}") return documents def _load_text_documents(self, file_path: str) -> List[Document]: """加载纯文本文档""" with open(file_path, "r", encoding="utf-8") as f: content = f.read() return [Document( page_content=content, metadata={ "source": file_path, "title": os.path.basename(file_path), "category": "text" } )] def _create_document_from_data(self, data: Dict) -> Document: """从数据创建文档对象""" return Document( page_content=data.get("content", data.get("text", "")), metadata={ "source": data.get("url", data.get("source", "")), "title": data.get("title", ""), "category": data.get("category", "unknown"), "timestamp": data.get("timestamp", "") } ) def _split_documents(self, documents: List[Document]) -> List[Document]: """分割文档为块""" text_splitter = RecursiveCharacterTextSplitter( chunk_size=self.config.get("chunk_size", 1000), chunk_overlap=self.config.get("chunk_overlap", 200), length_function=len ) return text_splitter.split_documents(documents) def _create_vector_index(self, documents: List[Document]): """创建FAISS向量索引""" # 创建向量数据库 self.vector_db = FAISS.from_documents( documents=documents, embedding=self.embedding_model ) # 保存索引 os.makedirs(os.path.dirname(self.config["index_path"]), exist_ok=True) self.vector_db.save_local(self.config["index_path"]) # 创建文档ID映射 for idx, doc in enumerate(documents): self.doc_id_map[idx] = { "source": doc.metadata["source"], "title": doc.metadata["title"], "category": doc.metadata["category"] } # 保存映射表 map_path = os.path.join(os.path.dirname(self.config["index_path"]), "doc_id_map.pkl") with open(map_path, "wb") as f: pickle.dump(self.doc_id_map, f) print(f"✅ 向量索引已创建并保存至 {self.config['index_path']}") def _load_vector_index(self): """加载现有的FAISS向量索引""" index_path = self.config["index_path"] # 加载向量数据库 self.vector_db = FAISS.load_local( folder_path=index_path, embeddings=self.embedding_model ) # 加载文档映射表 map_path = os.path.join(os.path.dirname(index_path), "doc_id_map.pkl") if os.path.exists(map_path): with open(map_path, "rb") as f: self.doc_id_map = pickle.load(f) print(f"✅ 文档映射表已加载") else: print("⚠️ 文档映射表未找到,将使用空映射") def ask_question(self, question: str, history: Optional[List] = None) -> Dict: """提问并获取答案""" if self.vector_db is None: raise ValueError("向量数据库未初始化") # 创建检索器 retriever = self.vector_db.as_retriever( search_kwargs={ "k": self.config.get("retrieval_top_k", 5), "score_threshold": self.config.get("score_threshold", 0.4) } ) # 创建问答链 qa_chain = self._create_qa_chain(retriever) # 执行问答 result = qa_chain({"query": question}) # 提取源文档 source_docs = result["source_documents"] doc_ids = [doc.metadata.get("doc_id", idx) for idx, doc in enumerate(source_docs)] # 获取完整上下文 full_contexts = [self.doc_id_map.get(did, {"title": "未知", "source": ""}) for did in doc_ids] return { "question": question, "answer": result["result"], "source_documents": source_docs, "full_contexts": full_contexts } def _create_qa_chain(self, retriever) -> Any: """创建问答链""" # 自定义提示模板 prompt_template = """ 基于以下上下文信息,请以专业、准确的方式回答用户的问题。如果上下文信息不足以回答问题,请如实告知用户。 上下文信息: {context} 问题: {question} 请提供详细的回答: """ QA_PROMPT = PromptTemplate( template=prompt_template, input_variables=["context", "question"] ) # 创建问答链 return RetrievalQA.from_chain_type( llm=self.llm, chain_type="stuff", retriever=retriever, return_source_documents=True, chain_type_kwargs={"prompt": QA_PROMPT} ) def add_document(self, content: str, metadata: Dict): """添加单个文档到索引""" if self.vector_db is None: raise ValueError("向量数据库未初始化") # 创建文档对象 doc = Document(page_content=content, metadata=metadata) # 分割文档 split_docs = self._split_documents([doc]) # 添加到索引 self.vector_db.add_documents(split_docs) # 更新文档映射 start_idx = max(self.doc_id_map.keys()) + 1 if self.doc_id_map else 0 for idx, doc in enumerate(split_docs): self.doc_id_map[start_idx + idx] = { "source": doc.metadata["source"], "title": doc.metadata["title"], "category": doc.metadata["category"] } print(f"✅ 添加了 {len(split_docs)} 个文档块") def save_index(self): """保存索引到磁盘""" if self.vector_db is None: raise ValueError("向量数据库未初始化") # 保存索引 self.vector_db.save_local(self.config["index_path"]) # 保存映射表 map_path = os.path.join(os.path.dirname(self.config["index_path"]), "doc_id_map.pkl") with open(map_path, "wb") as f: pickle.dump(self.doc_id_map, f) print(f"✅ 索引已保存至 {self.config['index_path']}") def create_default_config() -> Dict: """创建默认配置 - 使用智谱AI""" return { "data_dir": "data", "index_path": "index/faiss_index", "embedding_model": "sentence-transformers/all-mpnet-base-v2", "embedding_device": "cpu", "llm_provider": "zhipuai", # 默认使用智谱AI "zhipuai_api_key": "1fc6d23e95224503aa94bfcca6a31903.FXoiEbfDgymrE9FA", # 您的智谱AI密钥 "llm_model": "glm-4", # 智谱AI的GLM-4模型 "temperature": 0.7, "chunk_size": 1000, "chunk_overlap": 200, "retrieval_top_k": 5, "score_threshold": 0.4, "auto_download": True } def interactive_cli(): """交互式命令行界面 - 适配智谱AI""" config = create_default_config() # 设置智谱AI API密钥 api_key = input("请输入智谱AI API密钥(或直接回车使用默认值): ") if api_key.strip(): config["zhipuai_api_key"] = api_key # 选择模型 model_choice = input("请选择模型 (1=GLM-4, 2=GLM-3-Turbo, 回车使用GLM-4): ") if model_choice == "2": config["llm_model"] = "glm-3-turbo" # 初始化RAG系统 rag = RAGSystem(config) # 交互问答 print("\nRAG系统已就绪(使用智谱AI),输入问题开始查询(输入'q'退出)") history = [] while True: question = input("\n>>> 问题: ") if question.lower() in ['q', 'quit', 'exit']: break try: result = rag.ask_question(question, history) # 显示结果 print(f"\n💡 答案: {result['answer']}") if result["full_contexts"]: print("\n📚 信息来源:") for i, ctx in enumerate(result["full_contexts"]): print(f" {i + 1}. {ctx['title']} ({ctx['category']})") print(f" 来源: {ctx['source']}") # 添加到历史 history.append({"question": question, "answer": result["answer"]}) except Exception as e: print(f"❌ 错误: {str(e)}") # 保存索引 rag.save_index() print("\n索引已保存,再见!") if __name__ == "__main__": interactive_cli() 】

from io import BytesIO import numpy as np import time from PIL import Image from selenium import webdriver from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.common.by import By from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support import expected_conditions as EC import random class HeiMa: def __init__(self): self.url='http://mis-toutiao-python.itheima.net/#/' self.driver=webdriver.Chrome() self.driver.maximize_window() def login(self): self.driver.get(self.url) self.driver.find_element_by_class_name('yzm_btn').click() WebDriverWait(self.driver,20).until(EC.element_to_be_clickable((By.CLASS_NAME,'geetest_radar_tip'))) self.driver.find_element_by_class_name('geetest_radar_tip').click() WebDriverWait(self.driver,20).until(EC.presence_of_element_located((By.CLASS_NAME,'geetest_canvas_img'))) self.driver.execute_script('document.querySelectorAll("canvas")[2].style="display:block"') time.sleep(random.uniform(1, 2)) image1 = self.get_image('verify_code1.png') self.driver.execute_script('document.querySelectorAll("canvas")[2].style="display:none"') time.sleep(random.uniform(1,2)) self.driver.execute_script("document.getElementsByClassName('geetest_canvas_slice geetest_absolute')[0].style='display:none;'") time.sleep(random.uniform(1,2)) image2 = self.get_image('verify_code2.png') self.driver.execute_script("document.getElementsByClassName('geetest_canvas_slice geetest_absolute')[0].style='display:block;'") def get_image(self,image_file_name): image = self.driver.find_element(By.XPATH, '//*[@id="app"]/div/form') location=image.location size=image.size x1,y1=location['x'],location['y'] x2,y2=size['width']+location['x'],size['height']+location['y'] print(f'{x1},{y1},{x2},{y2}') screen_shot=self.driver.get_screenshot_as_png() screen_shot=Image.open(BytesIO(screen_shot)) captcha=screen_shot.crop((int(x1),int(y1),int(x2),int(y2))) captcha.save(image_file_name) return captcha if __name__=='__main__': heima=HeiMa() heima.login()为什么截图的位置不对

# mcp_server.py from datetime import datetime from mcp.server.fastmcp import FastMCP import logging import os import asyncio import hashlib import json import threading import time import numpy as np import faiss from langchain_community.docstore.in_memory import InMemoryDocstore from langchain_community.vectorstores import FAISS from langchain_community.llms import OpenAIChat from langchain.chains import RetrievalQA from ollama_embeding import CustomEmbeding from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler from langchain_community.document_loaders import ( TextLoader, PyPDFLoader, Docx2txtLoader, UnstructuredPowerPointLoader, UnstructuredExcelLoader, CSVLoader, UnstructuredHTMLLoader, UnstructuredMarkdownLoader, UnstructuredEmailLoader, UnstructuredFileLoader ) # 配置日志记录器 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # 创建 FastMCP 实例 mcp = FastMCP("VectorService") class VectorService: def __init__(self): self.embedding_function = CustomEmbeding('shaw/dmeta-embedding-zh') self.docstore = InMemoryDocstore() self.index = faiss.IndexFlatL2(768) self.vector_store = None self.existing_index_path = "E:/llm_rag/faiss_index/index.faiss" self.existing_index_pkl_path = "E:/llm_rag/faiss_index/index.pkl" self.is_processing = False self.last_processed_count = 0 self.is_initialized = False # 添加初始化完成标志 self.load_or_init_vector_store() # 初始化向量存储 self.is_initialized = True # 初始化完成 def load_or_init_vector_store(self): if self.vector_store is not None: return self.vector_store # 已初始化 if os.path.exists(self.existing_index_path) and os.path.exists(self.existing_index_pkl_path): vector_store = FAISS.load_local( "E:/llm_rag/faiss_index", embeddings=self.embedding_function, allow_dangerous_deserialization=True ) logger.info("Loaded existing vector store.") self.vector_store = vector_store return vector_store else: vector_store = FAISS( embedding_function=self.embedding_function, index=self.index, docstore=self.docstore, index_to_docstore_id={} ) logger.info("Initialized new vector store.") self.vector_store = vector_store return vector_store def get_id(self, file_path): """Generate file id""" return hashlib.md5(file_path.encode()).hexdigest() def load_document(self, file_path: str): file_ext = file_path.split('.')[-1].lower() logger.info(f"Loading document from {file_path}") loader_map = { 'txt': TextLoader, 'pdf': PyPDFLoader, 'docx': Docx2txtLoader, 'pptx': UnstructuredPowerPointLoader, 'xlsx': UnstructuredExcelLoader, 'csv': CSVLoader, 'html': UnstructuredHTMLLoader, 'htm': UnstructuredHTMLLoader, 'md': UnstructuredMarkdownLoader, 'eml': UnstructuredEmailLoader, 'msg': UnstructuredEmailLoader } if file_ext not in loader_map: logger.warning(f"Unsupported file type: {file_ext}") return None loader_class = loader_map.get(file_ext, UnstructuredFileLoader) loader = loader_class(file_path) try: documents = loader.load() logger.info(f"Loaded {len(documents)} documents from {file_path}") return documents except Exception as e: logger.error(f"Error loading {file_path}: {str(e)}") return None def _add_vector_metadata(self, file_name, file_name_path): """ 添加文件元数据 :return: """ docs = [] metadatas = [] try: file_stats = os.stat(file_name_path) file_size = file_stats.st_size res = self.load_document(file_name_path) if res: # 生成文件唯一标识(使用文件路径的哈希值) id = self.get_id(file_name_path) for doc in res: # 合并用户提供的元数据和文档自身的元数据 doc_metadata = doc.metadata.copy() doc_metadata.update({ "source": file_name, "file_path": file_name_path, "id": id, "upload_time": datetime.now().isoformat() }) # docs.append(doc.page_content.strip()) # 将文件名融入内容(提高文件名的权重) enhanced_content = f"文件名: {file_name}\n内容: {doc.page_content.strip()}" docs.append(enhanced_content) metadatas.append(doc_metadata) logger.info(f"Processed {file_name} ({file_size / (1024 * 1024.0):.2f} MB)") except Exception as e: logger.error(f"Error processing {file_name_path}: {str(e)}") return docs, metadatas def process_documents(self, data_path: str): """把所有文件进行批量向量化,添加文件唯一标识""" try: self.is_processing = True all_docs = [] all_metadatas = [] for root, dirs, files in os.walk(data_path): for file_name in files: file_name_path = os.path.join(root, file_name) logger.info(f"Processing file: {file_name_path}") # 调用 _add_vector_metadata 处理文件 docs, metadatas = self._add_vector_metadata( file_name=file_name, file_name_path=file_name_path ) # 累积结果 all_docs.extend(docs) all_metadatas.extend(metadatas) # 保存所有文件的向量数据 self._save_data_vector(docs=all_docs, metadatas=all_metadatas) self.last_processed_count = len(all_docs) self.is_processing = False return { "status": "success", "message": "Documents processed successfully", "document_count": len(all_docs) } except Exception as e: logger.error(f"Error processing documents: {str(e)}") self.is_processing = False return {"status": "error", "message": str(e)} def _save_data_vector(self, docs, metadatas): """Save the data vector to faiss""" self.vector_store = self.load_or_init_vector_store() docs = [doc for doc in docs if doc] try: logger.info("Starting embedding process...") self.vector_store.add_texts(texts=docs, metadatas=metadatas) logger.info("Embedding process completed.") except Exception as e: logger.error(f"An error occurred during embedding: {str(e)}") try: logger.info("Saving updated vector store...") self.vector_store.save_local("E:/llm_rag/faiss_index") logger.info("Updated vector store saved to E:/llm_rag/faiss_index.") except Exception as e: logger.error(f"An error occurred during saving: {str(e)}") return docs def check_process_status(self): """检查处理状态""" if self.is_processing: return { "status": "processing", "message": "Documents are being processed" } else: if os.path.exists(self.existing_index_path) and os.path.exists(self.existing_index_pkl_path): if self.last_processed_count > 0: return { "status": "success", "message": "Vector data has been updated", "last_processed_count": self.last_processed_count } else: return { "status": "ready", "message": "Vector store exists but no new data processed" } else: return { "status": "empty", "message": "No vector store exists" } def add_vector(self, new_file_name_path: str, new_file_name: str): """添加单个文件的向量""" try: self.is_processing = True docs, metadatas = self._add_vector_metadata( file_name=new_file_name, file_name_path=new_file_name_path ) self._save_data_vector(docs=docs, metadatas=metadatas) self.last_processed_count = len(docs) self.is_processing = False return { "status": "success", "message": "Vector added successfully" } except Exception as e: logger.error(f"Error adding vector: {str(e)}") self.is_processing = False return { "status": "error", "message": str(e) } vector_service = VectorService() @mcp.tool() def process_documents(data_path: str): """处理指定路径下的所有文档并生成向量存储""" logger.info(f"Starting to process documents in {data_path}") return vector_service.process_documents(data_path) @mcp.tool() def check_process_status(): """检查处理状态""" logger.info("Checking process status") return vector_service.check_process_status() @mcp.tool() def add_vector(new_file_name_path: str, new_file_name: str): """添加单个文件的向量""" logger.info(f"Adding vector for file: {new_file_name_path}") return vector_service.add_vector(new_file_name_path, new_file_name) @mcp.tool(name="searchfile", description=f"根据关键词搜索文件并返回匹配的内容") def search_answer(query: str): """ 获取检索相关的文件 :param query: 用户问题 :return: 返回检索到的文档 """ if not vector_service.is_initialized: logger.info("Server is not initialized yet. Please wait.") return {"status": "error", "message": "Server is not initialized yet. Please wait."} logger.info(f"Searching for relevant documents: {query}") try: retriever = FAISS.load_local( "E:/llm_rag/faiss_index", CustomEmbeding('shaw/dmeta-embedding-zh'), allow_dangerous_deserialization=True ).as_retriever(search_kwargs={"k": 10}) docs = retriever.get_relevant_documents(query) logger.info(f"找到 {len(docs)} 个相关文档块") logger.info(f"docs:{docs}") # return docs results = [] for doc in docs: metadata = doc.metadata file_path = metadata.get("file_path", "") # 安全检查:确保文件在允许的目录内 allowed_dir = "E:\\llm_rag\\data\\" if file_path and file_path.startswith(allowed_dir): # 生成相对路径并构建下载URL download_url = os.path.relpath(file_path, allowed_dir) results.append({ "content": doc.page_content, # 文档内容 "download_url": download_url # 下载链接 }) return results except Exception as e: logger.error(f"搜索出错: {str(e)}") return {"status": "error", "message": str(e)} if __name__ == "__main__": mcp.settings.port = 8880 logger.info("Starting mcp server through MCP") mcp.run(transport="sse") # 使用标准输入输出通信 报了这个 + Exception Group Traceback (most recent call last): | File "E:\llm_rag\.venv\lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 403, in run_asgi | result = await app( # type: ignore[func-returns-value] | File "E:\llm_rag\.venv\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 60, in __call__ | return await self.app(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\applications.py", line 112, in __call__ | await self.middleware_stack(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\middleware\errors.py", line 187, in __call__ | raise exc | File "E:\llm_rag\.venv\lib\site-packages\starlette\middleware\errors.py", line 165, in __call__ | await self.app(scope, receive, _send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\middleware\exceptions.py", line 62, in __call__ | await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\_exception_handler.py", line 53, in wrapped_app | raise exc | File "E:\llm_rag\.venv\lib\site-packages\starlette\_exception_handler.py", line 42, in wrapped_app | await app(scope, receive, sender) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 714, in __call__ | await self.middleware_stack(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 734, in app | await route.handle(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 288, in handle | await self.app(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 76, in app | await wrap_app_handling_exceptions(app, request)(scope, receive, send) | File "E:\llm_rag\.venv\lib\site-packages\starlette\_exception_handler.py", line 53, in wrapped_app | raise exc | File "E:\llm_rag\.venv\lib\site-packages\starlette\_exception_handler.py", line 42, in wrapped_app | await app(scope, receive, sender) | File "E:\llm_rag\.venv\lib\site-packages\starlette\routing.py", line 73, in app | response = await f(request) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\fastmcp\server.py", line 747, in sse_endpoint | return await handle_sse(request.scope, request.receive, request._send) # type: ignore[reportPrivateUsage] | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\fastmcp\server.py", line 680, in handle_sse | async with sse.connect_sse( | File "C:\Users\raywe\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 217, in __aexit__ | await self.gen.athrow(typ, value, traceback) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\sse.py", line 146, in connect_sse | async with anyio.create_task_group() as tg: | File "E:\llm_rag\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 772, in __aexit__ | raise BaseExceptionGroup( | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception) +-+---------------- 1 ---------------- | Exception Group Traceback (most recent call last): | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\sse.py", line 165, in connect_sse | yield (read_stream, write_stream) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\fastmcp\server.py", line 685, in handle_sse | await self._mcp_server.run( | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\lowlevel\server.py", line 500, in run | async with AsyncExitStack() as stack: | File "C:\Users\raywe\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 714, in __aexit__ | raise exc_details[1] | File "C:\Users\raywe\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 217, in __aexit__ | await self.gen.athrow(typ, value, traceback) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\lowlevel\server.py", line 125, in lifespan | yield {} | File "C:\Users\raywe\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 697, in __aexit__ | cb_suppress = await cb(*exc_details) | File "E:\llm_rag\.venv\lib\site-packages\mcp\shared\session.py", line 223, in __aexit__ | return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) | File "E:\llm_rag\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 772, in __aexit__ | raise BaseExceptionGroup( | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception) +-+---------------- 1 ---------------- | Traceback (most recent call last): | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\session.py", line 147, in _receive_loop | await super()._receive_loop() | File "E:\llm_rag\.venv\lib\site-packages\mcp\shared\session.py", line 374, in _receive_loop | await self._received_request(responder) | File "E:\llm_rag\.venv\lib\site-packages\mcp\server\session.py", line 175, in _received_request | raise RuntimeError( | RuntimeError: Received request before initialization was complete 如何解决

from seleniumwire import webdriver import unittest from jiaoben.login.action.page_login import LoginPage from selenium.webdriver.chrome.options import Options class Login(unittest.TestCase): def setUp(self): options = Options() options.add_experimental_option("detach", True) # 关键配置 self.driver = webdriver.Chrome(options=options) self.driver.get("https://10.121.177.160") def test_login(self): print(f"开始登录") LoginPage(self.driver).input_key(user='admin', passwd='admin') # assert self.driver.title in "Chaoqiang G640 V7 10.121.176.137" print("登录成功") from selenium.webdriver.common.by import By from jiaoben.Method_Library.base_action import BaseAction class LoginPage(BaseAction): user = (By.ID,"userid") passwd = (By.ID,"password") title_list = (By.XPATH, '//*[@id="btn-login"]') def input_key(self, user, passwd): self.input(self.user, user) self.input(self.passwd, passwd) self.find_element(self.title_list).click() import time from pyexpat import features from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.common.alert import Alert class BaseAction: def __init__(self, driver): # 将传入的 driver 实例赋值给类的实例变量,供后续方法使用 self.driver = driver cookies = self.driver.get_cookies() #将cookies添加到浏览器当中,保持登录状态 for i in cookies: self.driver.add_cookie(i) #刷新页面,使添加的cookies生效 self.driver.refresh() def find_element(self, feature, timeout=10, poll=1): return WebDriverWait(self.driver, timeout, poll).until(lambda x: x.find_element(*feature)) def click(self, feature, timeout=10, poll=1): self.find_element(feature, timeout, poll).click() def input(self, feature, text, timeout=10, poll=1): self.find_element(feature, timeout, poll).send_keys(text) def clear_and_input(self, feature, text, timeout=10, poll=1): self.clear(feature, timeout, poll) self.find_element(feature, timeout, poll).send_keys(text) def clear(self, feature, timeout=10, poll=1): self.find_element(feature, timeout, poll).clear() def get_text(self, feature, timeout=10, poll=1): return self.find_element(feature, timeout, poll).text def get(self, feature, timeout=10, poll=1): return self.find_element(feature, timeout, poll) def switch_new_page(self): self.driver.switch_to.window(self.driver.window_handles[-1]) def switch_old_page(self): self.driver.switch_to.window(self.driver.window_handles[0]) def find_elements(self, feature, timeout=10, poll=1): return WebDriverWait(self.driver, timeout, poll).until(lambda x: x.find_elements(*feature)) def get_list(self, feature, timeout=10, poll=1): return self.find_elements(feature, timeout, poll) def move(self, feature, timeout=10, poll=1): webElement = self.get(feature, timeout, poll) self.driver.execute_script("arguments[0].scrollIntoView();",webElement) def move_to(self, feature, timeout=10, poll=1): webElement = self.find_element(feature, timeout, poll) ActionChains(self.driver).move_to_element(webElement).perform() def toset(self): alert = Alert(self.driver) alert.accept() # 点击确定/OK按钮 time.sleep(10) # 通用属性获取 def get_value(self, feature, attribute_name, timeout=10, poll=1): return self.find_element(feature, timeout, poll).get_attribute(attribute_name) 为什么会使得登录界面执行成功后一直停留在该页面

import pytensor pytensor.config.blas__ldflags = '-llapack -lblas -lcblas' import pymc as pm import numpy as np import arviz as az import matplotlib.pyplot as plt # 全局字体设置 plt.rcParams['font.family'] = 'Arial' plt.rcParams['font.size'] = 18 # 数据定义 N1 = 157 # 兼捕数量 N0 = 406 # 未捕获数量 if __name__ == '__main__': with pm.Model() as model: # 参数先验 theta = pm.Beta("theta", alpha=54.81, beta=200) phi = pm.Beta("phi", alpha=65.64, beta=93.64) # 确定性关系 y = pm.Deterministic("y", N1 * (1 - theta) + N0 * phi) # 采样配置 prior_trace = pm.sample( 20000, return_inferencedata=False, chains=1, tune=1000 ) # 计算统计量 y_samples = prior_trace["y"] y_mean = np.mean(y_samples) y_ci_low, y_ci_high = np.percentile(y_samples, [5, 95]) # 创建图形 plt.figure(figsize=(12, 8)) n, bins, patches = plt.hist( y_samples, bins=30, density=True, alpha=0.6, color="#3069B1", edgecolor="black" ) # 添加均值线 mean_line = plt.axvline( y_mean, color='#D96D71', linestyle='--', linewidth=2.5, label=f'Mean: {y_mean:.1f}' ) # 添加95%置信区间填充区域 ci_fill = plt.fill_between( [y_ci_low, y_ci_high], [0, 0], [n.max() * 1.05, n.max() * 1.05], # 略高于直方图 color='gray', alpha=0.3, label=f'95% CI: [{y_ci_low:.1f}, {y_ci_high:.1f}]' # 在标签中显示具体数值 ) # 在置信区间边界添加垂直线 ci_low_line = plt.axvline( y_ci_low, color='gray', linestyle=':', linewidth=2 ) ci_high_line = plt.axvline( y_ci_high, color='gray', linestyle=':', linewidth=2 ) # 在置信区间边界添加数值标注 plt.text( y_ci_low - 5, # x位置,稍向左偏移 n.max() * 0.05, # y位置,靠近底部 f'{y_ci_low:.1f}', fontsize=16, fontweight='bold', color='gray', ha='right' ) plt.text( y_ci_high + 5, # x位置,稍向右偏移 n.max() * 0.05, # y位置,靠近底部 f'{y_ci_high:.1f}', fontsize=16, fontweight='bold', color='gray', ha='left' ) # 添加图例和标签 plt.legend( loc='upper right', prop={'family': 'Arial', 'size': 16}, frameon=True, framealpha=0.9, edgecolor='black' ) plt.xlabel("Number of bycatch reports", labelpad=15) plt.ylabel("Density", labelpad=15) # 设置坐标轴范围 plt.ylim(0, n.max() * 1.15) # 为标签留出空间 plt.xlim(min(bins), max(bins)) # 确保显示所有数据 # 刻度标签设置 plt.xticks(fontsize=16) plt.yticks(fontsize=16) # 增强坐标轴 plt.gca().spines['top'].set_visible(False) plt.gca().spines['right'].set_visible(False) plt.gca().spines['left'].set_linewidth(1.5) plt.gca().spines['bottom'].set_linewidth(1.5) # 保存和显示 plt.tight_layout() plt.savefig('NUTS1.tiff', dpi=600, bbox_inches='tight', edgecolor='none') plt.show() 图例标注即可

import time import pandas as pd import openpyxl from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.select import Select from selenium.webdriver import ActionChains, Keys from selenium.webdriver.chrome.options import Options chrome_options = Options() chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) chrome_options.add_experimental_option('useAutomationExtension', False) chrome_options.add_argument("--disable-blink-features=AutomationControlled") service = Service("D:/Python313/chromedriver.exe") driver = webdriver.Chrome(service=service, options=chrome_options) driver.maximize_window() file_path = 'C:/Users/一羽风/Desktop/机器人python/1.小组件测试文件/Best_Match.xlsx' wb = openpyxl.load_workbook(file_path) sheet1 = wb.worksheets[0] list_text = [] account_list = [] for row in sheet1.iter_rows(min_row=1,values_only=True): account_list.append((row[0])) driver.get("https://www.bilibili.com/") search_box = driver.find_element(By.XPATH, '//*[@id="nav-searchform"]/div[1]/input') for account in account_list[:2]: search_box.send_keys(Keys.CONTROL + 'a') search_box.send_keys(Keys.DELETE) search_box.send_keys(account) search_box.send_keys(Keys.RETURN) time.sleep(2) original_window = driver.current_window_handle new_window = [w for w in driver.window_handles if w != original_window][0] driver.switch_to.window(new_window) for page in range(3): next_page = driver.find_element(By.CLASS_NAME, 'vui_button vui_pagenation--btn') elements = driver.find_elements(By.CLASS_NAME,'bili-video-card__info') for element in elements[:5]: value2 = element.get_attribute("innerText") # print(value2) time.sleep(5) list_text.append(value2) print(list_text) next_page.click() time.sleep(8) driver.close() driver.switch_to.window(original_window) b站搜索内容之后,想点击下一页获取数据,但总是点击不了下一页按钮,总是报错,出错代码为:next_page = driver.find_element(By.CLASS_NAME, 'vui_button vui_pagenation--btn')

能不能帮我优化这段代码,让streamlit界面更加美观好看合理,有一问一答,更像个只能聊天系统 import streamlit as st from langchain_community.graphs import Neo4jGraph from langchain.chains import GraphCypherQAChain from langchain.prompts import PromptTemplate from langchain_openai import ChatOpenAI from langchain_ollama import ChatOllama # 连接 Neo4j 数据库 graph = Neo4jGraph( url="neo4j+s://bf0befc9.databases.neo4j.io", # 替换为你的 URL username="neo4j", password="Hl7FXJhH1azmgC34YhaweYyigcKwQK3wiRfNVNeWru8", database="neo4j" ) # DeepSeek 负责生成 Cypher qa_llm = ChatOllama( base_url="http://223.109.239.9:20006", model="deepseek-r1:70b", # system_prompt=cypher_prompt ) cypher_llm = ChatOpenAI( base_url="https://api.chatanywhere.tech/v1", api_key='sk-hZEb9uq0085t1EFFIm4RoXAvH4ktKshUFgWwDv3BneS0SIZp', model="gpt-4o-mini" ) # 构建 Cypher 生成和查询链 cypher_chain = GraphCypherQAChain.from_llm( graph=graph, cypher_llm=cypher_llm, qa_llm=qa_llm, validate_cypher=True, verbose=True, allow_dangerous_requests=True ) # Streamlit 页面 st.title("Meta 智能聊天助手") # 用户输入查询 user_query = st.text_input("💬 请输入你的问题:") # 提交按钮 if st.button("提交查询"): if user_query: try: # response = cypher_chain.invoke(user_query) # 提取最终的回答,去除多余部分 response = cypher_chain.invoke(user_query) print("\n🤖 机器人:", response) st.write("🤖 机器人:", response) except Exception as e: st.write("⚠️ 发生错误:", str(e)) else: st.write("请输入有效的问题。")

大家在看

recommend-type

icoFormat-photoshop插件

将该文件解压,将.8bi格式文件放到ps安装目录Plug-ins\File Formats下重启ps,在保存文件时即可选择icon格式。 如果找不到,可以放在ps安装目录下的“增效工具”->"文件格式"中。 如果还找不到,那么就在ps安装目录下搜索.8bi格式的文件,然后打开该文件所在文件夹,然后将iconFormat.8bi放入该文件夹。 最后要重新打开ps,才可以保存icon文件。
recommend-type

蒙特卡罗剂量模拟和可视化工具包:一组旨在帮助临床医生和研究人员使用 GEANT4 或 TOPAS 的 Matlab 函数-matlab开发

这里有 3 组代码,旨在帮助临床医生和研究人员将 GEANT4 或 TOPAS (MC) 与 3D Slicer 结合使用进行剂量可视化和比较 第一段代码“STLfromDicomRN.m”采用 Varian Eclipse 生成的双散射质子计划的 Dicom 计划文件,并以“.STL”格式生成计划中的Kong径和补偿器模型。 此文件使用 zip 文件中包含的“stlwrite”和“surf2solid”函数。 这些文件可以导入到 MC 模拟几何中。 第二个是一组用于处理Dicom剂量文件和分析剂量的代码。 “NormalizeDicomDose.m”代码将 MC 剂量标准化为 Eclipse 剂量等中心处的剂量,并包含有关如何标准化为其他点或体积的说明。 “ProfilePlot.m”代码只是生成比较两点之间两个剂量文件的剂量的剂量曲线。 包含的是一个 matlab gui,它在您
recommend-type

ztecfg中兴配置加解密工具3.0版本.rar

中兴光猫配置文件加解密工具3.0 .\ztecfg.exe -d AESCBC -i .\(要解密的文件名)db_user_cfg.xml -o (解密后文件名)123.cfg
recommend-type

xilinx.com_user_IIC_AXI_1.0.zip

可以直接用在vivado 2017.4版本里。查看各个寄存器就知道用来干什么了,一号寄存器分频系数,二号的start、stop信号,三号寄存器8bit数据,四号寄存器只读,返回IIC状态和ACK信号,其中二号的一个bit可以用来不等待从机ACK,方便使用。
recommend-type

基于单片机的流量控制系统设计本科学位论文.doc

基于单片机的流量控制系统设计本科学位论文.doc

最新推荐

recommend-type

项目材料、分包、项目经理比选管理流程.docx

项目材料、分包、项目经理比选管理流程.docx
recommend-type

该项目是基于JavaWeb开发的网上商城购物系统,主要实现了购物车、结算和订单管理的功能,适合Java Web初学者学习使用 (该项目还处于完善阶段,今后会不定时进行更新)

资源下载链接为: https://pan.quark.cn/s/a3bd6adf8f4f 该项目是基于JavaWeb开发的网上商城购物系统,主要实现了购物车、结算和订单管理的功能,适合Java Web初学者学习使用。(该项目还处于完善阶段,今后会不定时进行更新)(最新、最全版本!打开链接下载即可用!)
recommend-type

实验室的极化码编码译码仿真程序,在BSC、BEC、AWGN信道条件下都有仿真

实验室的极化码编码译码仿真程序,在BSC、BEC、AWGN信道条件下都有。使用密度进化法和巴氏参数估计信道,仿真性能非常好。 在Matlab下应用非常方便,支持多组仿真。配有应用说明,非常好用。希望能给大家带来帮助。
recommend-type

解决无法获取网络图片问题,提供PNG素材下载

根据提供的文件信息,我们可以确定知识点主要集中在网络图片获取、素材下载以及特定格式PNG图片的使用和命名规则上。 首先,我们来探讨“无法获取网络图片”这一问题。在互联网环境中,获取网络图片的过程通常涉及几个关键技术点:HTTP/HTTPS协议、网络请求处理、图片资源的定位与下载、以及浏览器或者应用程序对图片的缓存和处理。在这一过程中可能会遇到的问题有网络连接问题、目标服务器配置错误、资源访问权限受限、图片资源不存在或已被移除、跨域访问限制(CORS)、以及客户端代码错误等。 对于“素材下载 PNG素材 网页素材”,我们需要了解PNG图片的特性以及素材下载的相关技术。PNG(Portable Network Graphics)是一种无损数据压缩的位图图形格式,它支持索引、灰度、RGB三种颜色模式以及alpha通道透明度。PNG格式广泛用于网络图片下载,因为它提供了优秀的压缩性能且没有版权限制。在网页设计中,PNG图片因其高保真的特性,可以作为网页背景、图标和按钮的素材。素材下载通常是设计师或者开发人员通过搜索引擎、专门的素材网站或者内容分发网络(CDN)来获取所需的图片、音频、视频等资源。 紧接着,“无法获取网络图片”这一标签指向了一个普遍的技术问题,即客户端在尝试从互联网上下载图片资源时遭遇的失败。这可能发生在使用Web浏览器、桌面应用程序、移动应用或者任何其它形式的客户端软件上。问题的原因可能包括客户端网络设置问题、防火墙限制、代理服务器配置、DNS解析错误等。 最后,观察压缩包子文件的文件名称列表,我们可以看到三个不同的文件名:“无法获取网络@3x.png”、“无法获取网络@2x.png”以及“无法获取网络.png”。这些名称暗示了这些图片可能被用作网页开发中的Retina显示技术,其中“@2x”和“@3x”分别指代在普通分辨率显示器和高分辨率显示器(比如Retina显示屏)上使用的图片资源。数字“2”和“3”通常表示图片的DPI(每英寸点数)倍数,意味着在屏幕上以更高的像素密度显示,以达到更好的视觉效果。 综合上述分析,可以总结以下知识点: 1. 网络图片获取的过程及其可能遇到的问题和技术要点。 2. PNG图片格式的技术特性和其在网页素材中的应用。 3. 网页素材下载的过程、途径和可能出现的问题。 4. Retina显示技术及其在图片资源命名中的应用。 5. 通过文件名称识别图片用途和显示需求。 以上就是从给定文件信息中提取的知识点,希望对解决网络图片获取、素材下载和素材管理等问题提供帮助。
recommend-type

【西门子PLC固件升级全攻略】:20分钟快速掌握固件更新流程

# 摘要 随着工业自动化的发展,可编程逻辑控制器(PLC)作为核心部件,其固件升级成为了提高系统性能、确保安全性和引入新功能的重要手段。本文详细介绍了PLC固件升级的整个过程,从准备工作、理论知识、实战操作到升级后的验证与维护,全面阐述了固件升级的关键步骤和注意事项。通过分析固件升级对系统性能的影响、检查硬件兼容性、准备合适的更新环境,本文旨在为工程师提供一套系统的固件升级指
recommend-type

Transformer做短期时序销量预测可行吗

<think>我们正在讨论Transformer模型在短期时间序列销量预测中的应用及效果评估。根据用户的问题,我们需要分析Transformer模型在短期销量预测中的可行性和效果。参考引用中提到了ARIMA模型和迭代方法(如DeepAR、DSSM、ConvTrans等),这些可以作为对比的基准。首先,Transformer模型最初是为自然语言处理设计的,但其自注意力机制能够捕捉序列中的长期依赖关系,因此也被应用于时间序列预测。在短期预测中,虽然传统方法(如ARIMA)可能因为简单而高效,但Transformer在处理非线性关系和多个相关时间序列方面可能更有优势。效果评估方面,我们可以参考引用[
recommend-type

华为SVN连接失败解决方案及SVNDrv驱动更新指南

标题中提到的是解决华为SVN连接不上问题的SVNDrv驱动文件压缩包,这里面涉及的知识点主要包括华为的SVN工具SecoClient、网络适配器配置、以及驱动文件的操作。下面将详细解释这些知识点: 1. SVN工具SecoClient: SecoClient是华为开发的一个客户端软件,用于连接和管理SVN服务器,SVN(Subversion)是一个开源的版本控制系统,广泛用于计算机软件的版本管理和代码控制。SecoClient作为客户端,一般需要安装在用户的电脑上,用来提交、更新、查看和管理源代码。 2. Win10上面连接不上的问题及返回码超时: 用户在使用SecoClient时遇到的连接不上问题,提示“接受返回码超时”,这通常是指客户端尝试与SVN服务器进行通信时,在设定的时间内没有得到有效的响应。返回码超时问题可能由多种原因导致,例如网络连接不稳定、防火墙设置、SVN服务器响应慢、或者是客户端与服务器之间的配置不正确。 3. 网络适配器配置: 网络适配器是电脑硬件中负责数据通信的部分。在本问题中,具体的操作为禁用网络适配器中的“SVN Adapter V1.0”,这一操作可能会影响到SecoClient的网络连接,特别是如果SVN Adapter是一个虚拟的网络适配器或者专门用于SecoClient连接的适配器时。 4. 驱动文件SVNDrv.sys的处理: 驱动文件(SVNDrv.sys)是操作系统用来控制硬件和软件资源的一个软件程序,对于SVN工具来说,这个驱动文件可能是用来协助SecoClient与网络适配器进行通信的。如果在连接SVN时遇到问题,解决方案中提到的删除旧的驱动文件并复制新的文件进去,可能是为了修复驱动文件损坏或更新驱动程序。 具体操作步骤为: - 打开“设备管理器”,找到网络适配器部分。 - 在列表中找到“SVN Adapter V1.0”,右键选择“禁用”。 - 导航到系统盘符下的“C:\Windows\System32\drivers”目录。 - 在该目录中找到并删除“SVNDrv.sys”文件。 - 将新下载的“SVNDrv.sys”文件复制到该目录下。 - 最后回到设备管理器,右键点击“SVN Adapter V1.0”,选择“启用”。 5. 标签中的知识点: - SecoClient:华为提供的用于连接和管理SVN服务器的客户端工具。 - SVNAdapter:可能指的是SecoClient在电脑上配置的一个虚拟网络适配器,用于连接到SVN服务器。 - 返回码超时:连接过程中出现的错误提示,表明在预定时间未能完成操作。 【压缩包子文件的文件名称列表】中的“解决华为SVN连接不上问题SVNDrv驱动文件.zip”则表明该压缩包内包含的是用于解决上述问题的相关文件,即新的SVNDrv.sys驱动文件。 综上所述,本知识点的详细解释不仅涵盖了与华为SecoClient工具相关的操作和配置,还包括了网络适配器和驱动文件的基本理解和处理方法。对于遇到类似问题的IT专业人士或普通用户来说,了解这些操作可以有效地解决SVN连接问题,确保软件开发和版本控制工作的顺利进行。
recommend-type

【西门子PLC新手必备手册】:一文掌握硬件安装与配置技巧

# 摘要 本文旨在全面介绍西门子PLC(可编程逻辑控制器)的相关知识与应用,包括硬件安装、配置实践、基础编程技巧、高级功能应用及系统维护与升级。首先,概述了PLC的基本概念与硬件组件,并详细讲解了硬件安装的理论与实践技巧。接着,深
recommend-type

idea 反编译 jar

<think>我们正在讨论如何使用IntelliJIDEA反编译JAR文件。根据用户的问题,我们需要提供清晰的步骤。注意:IntelliJIDEA内置了Java反编译器,可以方便地查看JAR文件中的类文件的源代码。步骤:1.在IntelliJIDEA中,确保已经创建了一个项目(可以是任意项目,甚至是一个空项目)。2.将JAR文件作为库添加到项目中,或者直接打开JAR文件。有两种主要方法:方法一:通过项目工具窗口添加JAR-打开项目后,在项目工具窗口中右键单击要添加JAR的模块或项目根目录。-选择“打开模块设置”(或按F4),或者在较新版本中,选择“项目结构”。-在“项目设置”下选择“库”,然后
recommend-type

思科Z1路由器刷机教程:LEDE固件升级指南

标题 "LEDE-Meraki-Z1.tar.rar" 中涉及的关键词包括 "LEDE" 和 "Meraki-Z1"。LEDE 项目是 OpenWrt 的一个分支,主要针对嵌入式设备的固件开发,提供高度可定制化的固件系统。"Meraki-Z1" 则指向了 Cisco Meraki 的一种特定硬件设备,即 Z1 系列路由器。Cisco Meraki 是思科公司的一个分支,专注于云管理网络解决方案。Z1 路由器被设计用于远程管理和监控,适合中小型企业使用。 描述中提到的“思科 z1 刷机固件 lede”,意指将 LEDE 固件刷入思科的 Z1 系列路由器中,这通常是为了替换原厂固件,实现对设备的更高控制度和扩展更多功能。这一步骤可能需要一定的技术知识,包括对网络设备固件更新的理解和路由器的物理接入权限。 标签“思科z1”进一步确认了设备的身份,即针对的是思科公司生产的 Z1 系列路由器。这一标签对识别适用固件和后续的问题排查提供帮助。 文件压缩包中的文件名称列表提供了两个关键的文件信息: 1. "led-ar71xx-nand-z1-initramfs-kernel.bin":这个文件很可能是固件中使用的初始 RAM 文件系统(initramfs)和内核镜像。在系统启动时,initramfs 负责挂载真正的根文件系统,并且内核在此阶段负责硬件检测和初始化。由于这个文件名中包含“ar71xx”,我们可以推断这个固件可能是为使用 AR71xx 芯片组的设备所设计的。 2. "led-ar71xx-nand-z1-squashfs-sysupgrade.tar":这个文件可能是实际包含 LEDE 固件的系统升级文件。SquashFS 是一种压缩的只读文件系统,适合用于嵌入式设备的固件,因为它可以减少对存储空间的需求。sysupgrade 是 OpenWrt 及其衍生项目如 LEDE 用来升级固件的机制,允许用户在不完全擦除存储的情况下进行固件的升级。 综合以上信息,可以得出以下几点重要知识点: - LEDE 项目是一个专注于嵌入式设备固件的项目,致力于提供可定制化、开源的固件。 - 思科的 Meraki-Z1 路由器系列适合中小企业使用,并提供云管理解决方案。 - “刷机”指的是将新的固件写入到设备中,这一步骤通常需要一定的技术背景。 - LEDE 固件被设计为可以运行在使用 AR71xx 芯片组的网络设备上。 - 固件升级通常通过 initramfs(初始RAM文件系统)和内核来完成硬件初始化和系统引导,而 SquashFS 和 sysupgrade 用于提供高效、便捷的固件升级机制。 在尝试刷入 LEDE 固件之前,用户需要确保他们已经清楚了解所涉及风险,包括但不限于失去保修、设备损坏或可能的数据丢失。同时,确保下载到的固件与设备硬件完全兼容,并且从可靠来源获取固件。刷入新固件前也建议备份当前的配置,以备不时之需。