-
Notifications
You must be signed in to change notification settings - Fork 240
/
Copy pathserver.py
executable file
·117 lines (93 loc) · 3.91 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python
"""Example LangChain server exposes a conversational retrieval agent.
Relevant LangChain documentation:
* Creating a custom agent: https://python.langchain.com/docs/modules/agents/how_to/custom_agent
* Streaming with agents: https://python.langchain.com/docs/modules/agents/how_to/streaming#custom-streaming-with-events
* General streaming documentation: https://python.langchain.com/docs/expression_language/streaming
**ATTENTION**
1. To support streaming individual tokens you will need to use the astream events
endpoint rather than the streaming endpoint.
2. This example does not truncate message history, so it will crash if you
send too many messages (exceed token length).
3. The playground at the moment does not render agent output well! If you want to
use the playground you need to customize it's output server side using astream
events by wrapping it within another runnable.
4. See the client notebook it has an example of how to use stream_events client side!
"""
from typing import Any
from fastapi import FastAPI
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad import format_to_openai_functions
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_core.utils.function_calling import format_tool_to_openai_function
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from pydantic import BaseModel
from langserve import add_routes
vectorstore = FAISS.from_texts(
["cats like fish", "dogs like sticks"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
@tool
def get_eugene_thoughts(query: str) -> list:
"""Returns Eugene's thoughts on a topic."""
return retriever.get_relevant_documents(query)
tools = [get_eugene_thoughts]
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful assistant."),
# Please note that the ordering of the user input vs.
# the agent_scratchpad is important.
# The agent_scratchpad is a working space for the agent to think,
# invoke tools, see tools outputs in order to respond to the given
# user input. It has to come AFTER the user input.
("user", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
]
)
# We need to set streaming=True on the LLM to support streaming individual tokens.
# Tokens will be available when using the stream_log / stream events endpoints,
# but not when using the stream endpoint since the stream implementation for agent
# streams action observation pairs not individual tokens.
# See the client notebook that shows how to use the stream events endpoint.
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)
llm_with_tools = llm.bind(functions=[format_tool_to_openai_function(t) for t in tools])
agent = (
{
"input": lambda x: x["input"],
"agent_scratchpad": lambda x: format_to_openai_functions(
x["intermediate_steps"]
),
}
| prompt
| llm_with_tools
| OpenAIFunctionsAgentOutputParser()
)
agent_executor = AgentExecutor(agent=agent, tools=tools)
app = FastAPI(
title="LangChain Server",
version="1.0",
description="Spin up a simple api server using LangChain's Runnable interfaces",
)
# We need to add these input/output schemas because the current AgentExecutor
# is lacking in schemas.
class Input(BaseModel):
input: str
class Output(BaseModel):
output: Any
# Adds routes to the app for using the chain under:
# /invoke
# /batch
# /stream
# /stream_events
add_routes(
app,
agent_executor.with_types(input_type=Input, output_type=Output).with_config(
{"run_name": "agent"}
),
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="localhost", port=8000)