LangChain 表达式语言 (LCEL) 食谱
LangChain 表达式语言 (LCEL) 食谱
本文档提供了LangChain表达式语言(LCEL)的实用示例和模式,帮助您解决常见问题和构建复杂应用。
基础模式
1. 简单问答链
最基础的LCEL链,将提示模板、模型和输出解析器连接起来:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("回答这个问题: {question}")
model = ChatOpenAI()
output_parser = StrOutputParser()
chain = prompt | model | output_parser
result = chain.invoke({"question": "什么是机器学习?"})
print(result)
2. 多重提示组合
将多个提示模板组合为一个,用于复杂指令:
system_prompt = "你是一位有{years}年经验的{profession}专家。"
question_prompt = "请回答关于{topic}的问题: {question}"
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", question_prompt)
])
chain = prompt | model | output_parser
result = chain.invoke({
"years": "10",
"profession": "人工智能",
"topic": "深度学习",
"question": "神经网络和传统机器学习有什么区别?"
})
3. 自定义函数集成
将Python函数集成到LCEL链中:
from langchain_core.runnables import RunnableLambda
def get_current_weather(location):
# 模拟天气API调用
weather_data = {"北京": "晴朗,25°C", "上海": "多云,28°C", "深圳": "雨,30°C"}
return weather_data.get(location, "未知")
weather_chain = RunnableLambda(get_current_weather)
# 在更大的链中使用
prompt = ChatPromptTemplate.from_template(
"地点: {location}\n当前天气: {weather}\n根据天气情况,推荐适合的活动。"
)
chain = {
"location": lambda x: x["location"],
"weather": lambda x: weather_chain.invoke(x["location"])
} | prompt | model | output_parser
result = chain.invoke({"location": "北京"})
中级模式
1. 检索增强生成 (RAG)
基础RAG模式,从向量数据库检索相关文档并生成回答:
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
# 假设已有文档集合和向量数据库
vectorstore = Chroma(embedding_function=OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
# 创建RAG提示模板
template = """根据以下上下文回答问题。如果上下文中没有答案,就说不知道。
上下文:
{context}
问题: {question}
回答:"""
prompt = ChatPromptTemplate.from_template(template)
# 定义检索格式化函数
def format_docs(docs):
return "\n\n".join([doc.page_content for doc in docs])
# 构建RAG链
rag_chain = {
"context": lambda x: format_docs(retriever.get_relevant_documents(x["question"])),
"question": lambda x: x["question"]
} | prompt | model | output_parser
result = rag_chain.invoke({"question": "什么是量子计算?"})
2. 对话记忆整合
为LCEL链添加对话历史管理:
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import MessagesPlaceholder
# 创建包含历史的提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一位友好的AI助手。"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
chain = prompt | model | output_parser
# 初始化对话历史
history = []
def chat(user_input):
global history
# 调用链并包含历史
response = chain.invoke({"history": history, "input": user_input})
# 更新历史
history.append(HumanMessage(content=user_input))
history.append(AIMessage(content=response))
return response
# 使用示例
print(chat("你好!"))
print(chat("我们刚才聊了什么?"))
3. 工具使用与函数调用
创建集成工具的LCEL链:
from langchain.tools import tool
from langchain.agents import AgentExecutor, create_openai_tools_agent
# 定义工具
@tool
def search(query: str) -> str:
"""搜索互联网以获取信息"""
# 模拟搜索结果
return f"关于{query}的搜索结果..."
@tool
def calculator(expression: str) -> float:
"""计算数学表达式"""
return eval(expression)
# 创建工具列表
tools = [search, calculator]
# 创建提示
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个有用的AI助手,可以使用工具来帮助回答问题。"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
# 创建代理
agent = create_openai_tools_agent(model, tools, prompt)
# 创建代理执行器
agent_executor = AgentExecutor(agent=agent, tools=tools)
# 在LCEL链中使用
chain = {
"input": lambda x: x["question"],
"history": lambda x: x.get("history", [])
} | agent_executor
result = chain.invoke({"question": "谁是中国的总理?另外,计算25 * 16是多少?"})
高级模式
1. 多模型路由
根据输入内容选择不同的模型:
from langchain_core.runnables import RunnableBranch
# 创建不同的模型
fast_model = ChatOpenAI(model="gpt-3.5-turbo")
smarter_model = ChatOpenAI(model="gpt-4")
# 定义选择逻辑
def is_complex_question(input_dict):
question = input_dict["question"]
complex_indicators = ["比较", "分析", "为什么", "如何", "评估"]
word_count = len(question)
return any(indicator in question for indicator in complex_indicators) or word_count > 50
# 创建分支路由
model_router = RunnableBranch(
(is_complex_question, smarter_model),
fast_model # 默认模型
)
# 集成到链中
chain = prompt | model_router | output_parser
result = chain.invoke({"question": "请深入分析人工智能对就业市场的长期影响。"})
2. 流水线处理与重试机制
创建包含多阶段处理和错误重试的复杂链:
import time
from langchain_core.runnables import RunnablePassthrough
# 自定义处理函数
def extract_keywords(text):
# 假设实现关键词提取
return ["关键词1", "关键词2", "关键词3"]
def search_documents(keywords):
# 假设实现基于关键词的文档搜索
time.sleep(1) # 模拟搜索延迟
return ["文档1", "文档2"]
# 添加重试逻辑的函数
retryable_search = RunnableLambda(search_documents).with_retry(
max_retries=3,
on_retry=lambda x: print(f"重试搜索: {x}"),
retry_sleep=2.0
)
# 构建多阶段处理流水线
pipeline = {
"original_query": RunnablePassthrough(),
"keywords": lambda x: extract_keywords(x["query"]),
"documents": lambda x: retryable_search(x["keywords"]),
"query": lambda x: x["query"]
} | prompt | model | output_parser
result = pipeline.invoke({"query": "量子计算的商业应用"})
3. 并行处理与结果聚合
并行调用多个模型并聚合结果:
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field
# 定义输出结构
class Analysis(BaseModel):
summary: str = Field(description="内容摘要")
sentiment: str = Field(description="情感分析")
key_points: list = Field(description="关键点列表")
# 创建不同的分析链
summary_chain = ChatPromptTemplate.from_template(
"请总结以下内容: {text}"
) | ChatOpenAI(temperature=0.1) | StrOutputParser()
sentiment_chain = ChatPromptTemplate.from_template(
"请分析以下内容的情感(积极、消极或中性): {text}"
) | ChatOpenAI(temperature=0.1) | StrOutputParser()
key_points_chain = ChatPromptTemplate.from_template(
"请从以下内容中提取5个关键点: {text}"
) | ChatOpenAI(temperature=0.1) | JsonOutputParser()
# 并行执行链
parallel_chain = {
"summary": summary_chain,
"sentiment": sentiment_chain,
"key_points": key_points_chain
}
# 构建完整的分析链
analysis_chain = lambda x: parallel_chain.invoke({"text": x["document"]})
# 使用示例
document = """人工智能的进步速度令人震惊。近年来,大型语言模型展现了前所未有的能力,
从编写代码到创作内容,再到辅助科学研究。这些进步带来了巨大的社会效益,
但同时也引发了关于就业、隐私和安全等方面的担忧。研究人员和政策制定者
正在努力制定框架,确保AI发展以负责任和公平的方式进行。"""
result = analysis_chain({"document": document})
print(result)
4. 流式处理与增量输出
实现流式响应的LCEL链:
from langchain_core.output_parsers import StrOutputParser
# 创建基本链
stream_chain = prompt | ChatOpenAI(streaming=True) | StrOutputParser()
# 流式处理示例
def process_streaming():
question = "请写一首关于人工智能的短诗。"
print("开始生成...")
for chunk in stream_chain.stream({"question": question}):
print(chunk, end="", flush=True)
print("\n完成!")
# 异步流式处理
async def process_async_streaming():
question = "请写一首关于人工智能的短诗。"
print("开始异步生成...")
async for chunk in stream_chain.astream({"question": question}):
print(chunk, end="", flush=True)
print("\n完成!")
实用技巧
1. 输入验证和预处理
from pydantic import BaseModel, Field, validator
# 定义输入模型
class QuestionInput(BaseModel):
question: str = Field(..., description="用户问题")
language: str = Field("中文", description="回答语言")
@validator("question")
def question_not_empty(cls, v):
if not v.strip():
raise ValueError("问题不能为空")
return v
# 输入验证函数
def validate_input(input_dict):
try:
validated = QuestionInput(**input_dict)
return validated.dict()
except Exception as e:
return {"error": str(e)}
# 在链中使用输入验证
validated_chain = RunnableLambda(validate_input) | prompt | model | output_parser
2. 缓存和记忆模式
from langchain_core.cache import InMemoryCache, SQLiteCache
# 内存缓存
memory_cache = InMemoryCache()
model_with_memory_cache = model.with_config({"cache": memory_cache})
# 持久化缓存
sqlite_cache = SQLiteCache(database_path="./cache.db")
model_with_persistent_cache = model.with_config({"cache": sqlite_cache})
# 在链中使用缓存
cached_chain = prompt | model_with_memory_cache | output_parser
3. 调试和跟踪
from langchain_core.tracers import ConsoleTracer
from langchain_core.callbacks import StdOutCallbackHandler
# 创建跟踪器
console_tracer = ConsoleTracer()
stdout_handler = StdOutCallbackHandler()
# 在调用时启用跟踪
result = chain.invoke(
{"question": "什么是深度学习?"},
config={"callbacks": [console_tracer, stdout_handler]}
)
# 持久化跟踪
from langchain.callbacks import FileCallbackHandler
with open("trace.log", "w") as f:
file_handler = FileCallbackHandler(f)
result = chain.invoke(
{"question": "什么是深度学习?"},
config={"callbacks": [file_handler]}
)
4. 错误处理模式
from langchain_core.runnables import RunnableConfig
# 创建自定义错误处理器
def handle_error(error, **kwargs):
print(f"发生错误: {type(error).__name__}: {str(error)}")
return "抱歉,处理您的请求时出现了错误。"
# 在配置中使用错误处理器
config = RunnableConfig(
callbacks=[],
error_handling={
"handler": handle_error
}
)
# 使用配置
result = chain.invoke({"question": "复杂问题"}, config=config)
实际应用示例
1. 智能客服机器人
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
# 初始化组件
prompt = ChatPromptTemplate.from_messages([
("system", "你是一位专业的客服代表,帮助解决产品问题。使用友好专业的语气。"),
MessagesPlaceholder(variable_name="history"),
("human", "{question}")
])
model = ChatOpenAI(temperature=0.3)
# 创建基本链
customer_service_chain = prompt | model | StrOutputParser()
# 模拟对话系统
class CustomerServiceBot:
def __init__(self):
self.history = []
def respond(self, user_question):
response = customer_service_chain.invoke({
"history": self.history,
"question": user_question
})
# 更新对话历史
self.history.append(HumanMessage(content=user_question))
self.history.append(AIMessage(content=response))
return response
def clear_history(self):
self.history = []
return "对话历史已清除"
2. 文档分析系统
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 文档加载和处理
def process_document(file_path):
# 加载文档
if file_path.endswith(".pdf"):
loader = PyPDFLoader(file_path)
else:
raise ValueError("不支持的文件类型")
documents = loader.load()
# 分割文档
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
splits = text_splitter.split_documents(documents)
return splits
# 向量存储和检索
def create_retrieval_chain(splits):
# 创建向量存储
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
# 创建RAG链
template = """根据以下上下文回答问题:
{context}
问题: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
def format_docs(docs):
return "\n\n".join([doc.page_content for doc in docs])
rag_chain = {
"context": lambda x: format_docs(retriever.get_relevant_documents(x["question"])),
"question": lambda x: x["question"]
} | prompt | model | StrOutputParser()
return rag_chain
结论
LCEL提供了强大而灵活的组件组合方式,使您能够构建从简单到复杂的各种AI应用。随着您对这些模式的掌握,可以创建更高效、可维护和功能丰富的应用程序。
本食谱中的模式可以混合搭配,根据您的具体需求进行调整。随着LangChain的不断发展,更多强大的模式将会出现,进一步扩展LCEL的能力。
如需了解更多关于LCEL的详细信息,请参考LCEL接口文档。
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 数据·开英
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果