In our previous tutorials, we built an AI agent capable of answering queries by searching the web. However, when building agents for long-running tasks, two important concepts come into play: persistence and streaming. Persistence lets you save the state of an agent at any point, so you can resume from that state in future interactions; this is essential for long-running applications. Streaming, on the other hand, lets you emit real-time signals about what the agent is doing at any moment, providing transparency and control over its actions. In this tutorial, we will enhance our agent by adding these powerful features.
Agent setup
Let’s start by rebuilding our agent. We will load the required environment variables, install and import the necessary libraries, set up the Tavily search tool, define the agent state, and finally build the agent.
pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1
import os
os.environ['TAVILY_API_KEY'] = "<TAVILY_API_KEY>"
os.environ['GROQ_API_KEY'] = "<GROQ_API_KEY>"
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults
tool = TavilySearchResults(max_results=2)
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

class Agent:
    def __init__(self, model, tools, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile()
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}
Adding persistence
To add persistence, we will use LangGraph’s checkpointer feature. A checkpointer saves the state of the agent after every node. For this tutorial, we will use SqliteSaver, a simple checkpointer backed by SQLite, a lightweight database. While we will use an in-memory database for simplicity, you can easily connect it to an external database, or use other checkpointers such as Redis or Postgres for more robust persistence.
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
sqlite_conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(sqlite_conn)
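To build intuition for what a checkpointer does, here is a toy sketch that saves a state dict to SQLite after each node runs, keyed by thread_id and step number. This is a hypothetical, simplified schema for illustration only, not LangGraph’s actual implementation:

```python
import json
import sqlite3

class ToyCheckpointer:
    """Toy checkpointer: persists agent state after every step (illustrative only)."""
    def __init__(self, conn):
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints "
            "(thread_id TEXT, step INTEGER, state TEXT, "
            "PRIMARY KEY (thread_id, step))"
        )

    def save(self, thread_id, step, state):
        # Serialize the state dict and upsert it for this thread/step.
        self.conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, step, json.dumps(state)),
        )

    def latest(self, thread_id):
        # Resume point: the most recent saved state for this thread.
        row = self.conn.execute(
            "SELECT state FROM checkpoints WHERE thread_id = ? "
            "ORDER BY step DESC LIMIT 1", (thread_id,)
        ).fetchone()
        return json.loads(row[0]) if row else None

conn = sqlite3.connect(":memory:")
cp = ToyCheckpointer(conn)
cp.save("1", 0, {"messages": ["What is the weather in Texas?"]})
cp.save("1", 1, {"messages": ["What is the weather in Texas?", "It is sunny."]})
print(cp.latest("1"))  # the state the agent would resume from
```

The real SqliteSaver handles serialization, versioning, and concurrency for you; the point here is only that state is written out after each node so a run can be resumed later.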
Next, we will modify our agent to accept a checkpointer:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        # Everything else remains the same as before
        self.graph = graph.compile(checkpointer=checkpointer)
        # Everything else after this remains the same
Now we can create our agent with persistence enabled:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow-up question, you are allowed to do that!
"""
model = ChatGroq(model="Llama-3.3-70b-Specdec")
bot = Agent(model, [tool], system=prompt, checkpointer=memory)
Adding streaming
Streaming is essential for real-time updates. There are two types of streaming we will focus on:
1. Streaming messages: emitting intermediate messages, such as AI decisions and tool results.
2. Streaming tokens: streaming individual tokens from the LLM’s response.
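The difference between the two can be pictured with plain Python generators. This is a schematic, not LangGraph’s API; the message contents are made up for illustration:

```python
def stream_messages():
    # Message streaming: yield whole intermediate messages as the agent works.
    yield {"type": "ai", "content": "I should call the search tool."}
    yield {"type": "tool", "content": "Search results: sunny, 19.4 C"}
    yield {"type": "ai", "content": "The weather in Texas is sunny."}

def stream_tokens(text):
    # Token streaming: yield the final answer one small piece at a time.
    for token in text.split():
        yield token

msgs = list(stream_messages())
toks = list(stream_tokens("The weather in Texas is sunny."))
print(len(msgs), toks[:3])
```

Message streaming gives you coarse-grained progress (which step the agent is on); token streaming gives you the fine-grained text of the answer as it is generated.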
Let’s start with streaming messages. We will create a human message and use the stream method to observe the agent’s actions in real time.
messages = [HumanMessage(content="What is the weather in Texas?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])
Final output: The current weather in Texas is sunny with a temperature of 19.4°C (66.9°F) and a wind speed of 4.3 mph (6.8 kph)…
When you run this, you will see a stream of results: first, an AI message instructing the agent to make a Tavily call; then a tool message with the search results; and finally, an AI message answering the question.
Understanding thread_id
The thread_id is an important part of the thread configuration. It allows the agent to maintain separate conversations with different users or contexts. By assigning a unique thread_id to each conversation, the agent can keep track of multiple interactions simultaneously without mixing them up.
For example, let’s continue the conversation by asking, “What about in LA?” using the same thread_id:
messages = [HumanMessage(content="What about in LA?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)
Final output: Los Angeles is sunny with a temperature of 17.2°C (63.0°F) and a wind speed of 2.2 mph (3.6 kph).
The agent understands that we are still asking about the weather, thanks to persistence. To verify, let’s ask, “Which one is warmer?”:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)
Final output: Texas is warmer than Los Angeles. The current temperature in Texas is 19.4°C (66.9°F), while the current temperature in Los Angeles is 17.2°C (63.0°F).
The agent correctly compares the weather in Texas and LA. Now let’s ask the same question with a different thread_id:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)
Output: I need more information to answer that question. Could you please provide more context or specify which two things you are comparing?
This time, the agent gets confused because it does not have access to the previous conversation’s history.
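This per-thread isolation can be pictured with a minimal stand-in: a store that keeps one independent message history per thread_id. This is a simplified illustration of the concept, not LangGraph internals:

```python
class ThreadStore:
    """Keeps one independent message history per thread_id (illustrative only)."""
    def __init__(self):
        self.histories = {}

    def append(self, thread_id, message):
        # Create the history for this thread on first use, then append.
        self.histories.setdefault(thread_id, []).append(message)

    def history(self, thread_id):
        # A thread only ever sees its own messages.
        return self.histories.get(thread_id, [])

store = ThreadStore()
store.append("1", "What is the weather in Texas?")
store.append("1", "What about in LA?")
store.append("2", "Which one is warmer?")  # a different conversation

# Thread "1" has full context; thread "2" starts fresh.
print(store.history("1"))
print(store.history("2"))
```

Because thread "2" has no record of the earlier weather questions, a model answering within it has nothing to compare, which is exactly the confusion observed above.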
Streaming tokens
To stream tokens, we will use the astream_events method, which is asynchronous. We will also switch to an async checkpointer.
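Because astream_events is an async generator, it must be consumed with `async for` inside a coroutine. Here is a self-contained sketch of that consumption pattern, using a fake event stream (hypothetical event payloads) in place of a real model call:

```python
import asyncio

async def fake_events():
    # Stand-in for graph.astream_events(...): yields one event per token.
    for token in ["The", " weather", " in", " SF", " is", " foggy."]:
        await asyncio.sleep(0)  # yield control back to the event loop
        yield {"event": "on_chat_model_stream", "data": {"chunk": token}}

async def main():
    out = []
    async for event in fake_events():
        # Filter for token-stream events, as in the real loop below.
        if event["event"] == "on_chat_model_stream":
            out.append(event["data"]["chunk"])
    return "".join(out)

result = asyncio.run(main())
print(result)
```

The real loop that follows has the same shape, except the chunks are message objects whose `.content` holds the token text.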
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    messages = [HumanMessage(content="What is the weather in SF?")]
    thread = {"configurable": {"thread_id": "4"}}
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of OpenAI means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                print(content, end="|")
This will stream tokens in real time, giving you a live view of the agent’s thought process.
Conclusion
By adding persistence and streaming, we have greatly enhanced the capabilities of our AI agent. Persistence allows the agent to maintain context across interactions, while streaming provides real-time insight into its actions. These features are essential for building production-grade applications, especially ones involving multiple users or human-in-the-loop interactions.
In the next tutorial, we will dive into human-in-the-loop interactions, where persistence plays an important role in enabling smooth collaboration between humans and AI agents. Stay tuned!
Vineet Kumar is a consulting intern at MarkTechPost. He is currently pursuing his BS at the Indian Institute of Technology (IIT), Kanpur. He is a Machine Learning enthusiast, passionate about research and the latest advancements in Deep Learning, Computer Vision, and related fields.