LangGraph — Building Stateful, Multi-Agent LLM Applications
Prerequisites: Python 3.11+, familiarity with async/await, basic understanding of LLMs and chat models (e.g. OpenAI API), some exposure to LangChain concepts (messages, tools) is helpful but not required
What LangGraph Is and Why It Exists
LangGraph is a library from LangChain Inc. for building stateful, multi-actor applications powered by LLMs. Instead of thinking about agents as a single prompt-response loop, LangGraph models your application as a graph — nodes perform work, edges define transitions, and state flows through the entire system. The result is an architecture where you control exactly how your agent reasons, acts, and recovers from failure.
But why build yet another framework? The answer lies in the limitations of what came before it.
The Problem: AgentExecutor Was a Black Box
LangChain's original AgentExecutor gave developers a convenient way to spin up a ReAct-style agent in a few lines of code. Under the hood, it ran a rigid loop: call the LLM, parse the output for a tool invocation, execute the tool, feed the result back, repeat. This worked well for demos but crumbled under real-world requirements.
Need to add a human approval step before executing a dangerous tool? You'd have to monkeypatch the executor. Want the agent to branch into parallel sub-tasks? Not supported. Need to persist the conversation state across server restarts? You were on your own. The fundamental issue was that AgentExecutor hid the control flow, making it nearly impossible to customize without rewriting the internals.
LangGraph is not a replacement for LangChain — it's a complement. LangChain provides integrations (chat models, retrievers, tools), while LangGraph provides the orchestration layer that wires them together. You can also use LangGraph without adopting the rest of the LangChain ecosystem: nodes are plain Python functions, so any SDK works inside them.
The Solution: Graphs as the Control Plane
LangGraph's core insight is that most agent architectures — ReAct loops, planning-then-executing pipelines, multi-agent handoffs — can be represented as directed graphs with cycles. By making the graph explicit, you get full visibility and control over every transition your agent can make.
Here's what a minimal LangGraph agent definition looks like — nodes for the LLM call and tool execution, with a conditional edge that decides whether to loop or finish:
from langgraph.graph import StateGraph, START, END
# Define the graph
builder = StateGraph(AgentState)
# Add nodes — each is just a Python function
builder.add_node("call_llm", call_llm)
builder.add_node("use_tools", execute_tools)
# Define edges — including a conditional cycle
builder.add_edge(START, "call_llm")
builder.add_conditional_edges("call_llm", should_continue, {
    "tools": "use_tools",
    "done": END,
})
builder.add_edge("use_tools", "call_llm") # <-- the loop
graph = builder.compile()
The graph above encodes the classic ReAct pattern: call the LLM, check if it wants to use a tool, execute the tool if so, then loop back. The difference from AgentExecutor is that every edge and node is visible, replaceable, and extensible. Adding a human-in-the-loop approval step is as straightforward as inserting a new node between "call_llm" and "use_tools".
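The snippet above leans on names it doesn't define (AgentState, call_llm, execute_tools, should_continue). As a rough sketch, here is what the routing half might look like in plain Python, with state modeled as a dict of messages; the names and message shape are illustrative assumptions, not LangGraph's API:

```python
# Hypothetical sketch of the routing function behind the conditional edge.
# State is modeled as a plain dict; real LangGraph state would use a typed
# schema, and messages would be objects with a .tool_calls attribute.

def should_continue(state: dict) -> str:
    """Return 'tools' if the last LLM message requested a tool, else 'done'."""
    last_message = state["messages"][-1]
    if last_message.get("tool_calls"):
        return "tools"  # mapped to the "use_tools" node in the path map
    return "done"       # mapped to END in the path map

# Two states the router might see:
wants_tool = {"messages": [{"role": "assistant", "tool_calls": [{"name": "search"}]}]}
finished = {"messages": [{"role": "assistant", "content": "All done."}]}

print(should_continue(wants_tool))  # tools
print(should_continue(finished))   # done
```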
Key Capabilities
LangGraph doesn't just give you graphs — it provides a production-grade runtime with features you'd otherwise have to build yourself.
mindmap
  root((LangGraph))
    Stateful Graphs
      Typed state schemas
      Reducer functions
      Scoped sub-states
    Multi-Agent
      Agent handoffs
      Supervisor patterns
      Parallel execution
    Streaming
      Token-by-token output
      Node-level events
      Custom stream channels
    Persistence & Checkpointing
      Automatic snapshots
      Replay from any step
      Fault tolerance
    Human-in-the-Loop
      Interrupt before/after nodes
      Approval gates
      State editing
    Tool Integration
      Any Python callable
      Async support
      Error handling
    Deployment
      LangGraph Platform
      Self-hosted option
      REST API serving
Cycles — the Agent Loop
Unlike DAG-only orchestrators (like most workflow engines), LangGraph supports cycles natively. This is what makes agent loops possible: the LLM calls a tool, gets a result, reasons again, calls another tool, and so on until it decides to stop. You define the loop condition as a function on a conditional edge — it's just Python, not a DSL.
State Management
Every graph execution carries a typed state object that flows through nodes. Nodes read from the state, do their work, and return updates. LangGraph merges those updates using reducer functions — for example, appending new messages to a list instead of overwriting it. This model prevents common bugs like lost context or race conditions in parallel branches.
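The merge model described above can be mimicked in a few lines of plain Python. This is a toy illustration of the idea, not LangGraph's internals:

```python
import operator

# Toy state merge: each key may have a reducer; without one, updates overwrite.
reducers = {"messages": operator.add}  # append message lists instead of replacing

def merge(state: dict, update: dict) -> dict:
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        merged[key] = reducer(state[key], value) if reducer and key in state else value
    return merged

state = {"messages": [{"role": "user", "content": "hi"}], "step": 1}
state = merge(state, {"messages": [{"role": "assistant", "content": "hello"}], "step": 2})

print(len(state["messages"]))  # 2: messages accumulated, not overwritten
print(state["step"])           # 2: plain keys are overwritten
```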
First-Class Streaming
LangGraph provides multiple streaming modes out of the box. You can stream individual LLM tokens as they're generated, stream state updates after each node completes, or stream custom events from inside a node. In production, this means your users see results incrementally instead of staring at a spinner.
Checkpointing and Persistence
Every step in a graph execution can be automatically checkpointed. This gives you time-travel debugging (replay from any previous step), fault tolerance (resume after a crash), and the ability to pause execution — which is exactly how human-in-the-loop works. Checkpointers are pluggable: use SQLite for development, Postgres for production.
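A toy checkpointer makes the mechanics concrete: snapshot the state after every step, then restore from any snapshot. Purely illustrative; LangGraph's real checkpointers (SqliteSaver, PostgresSaver) persist richer metadata:

```python
import copy

class ToyCheckpointer:
    """Store a deep-copied snapshot of state after every step."""
    def __init__(self):
        self.snapshots: list[dict] = []

    def save(self, state: dict) -> None:
        self.snapshots.append(copy.deepcopy(state))

    def restore(self, step: int) -> dict:
        # Time-travel: replay execution from any earlier snapshot
        return copy.deepcopy(self.snapshots[step])

cp = ToyCheckpointer()
state = {"messages": []}
for turn in ["plan", "search", "answer"]:
    state["messages"].append(turn)
    cp.save(state)

# Crash after step 1? Resume from the last good snapshot:
resumed = cp.restore(1)
print(resumed["messages"])  # ['plan', 'search']
```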
Each node in a LangGraph graph is just a Python function that receives state and returns updates. You can call OpenAI's SDK directly, use httpx to hit an API, run a pandas transformation, or invoke a LangChain chain — LangGraph doesn't care. The graph is the orchestration layer; the nodes are your code.
When LangGraph Makes Sense
LangGraph shines when your LLM application goes beyond a single prompt-response pair. If you need an agent that loops until a task is done, multiple agents that collaborate, persistent conversations, or human oversight at critical steps — that's LangGraph territory. For a straightforward "question in, answer out" app, a simple LangChain chain or a direct API call is likely all you need.
LangGraph vs LangChain vs Plain Python — When to Use What
Not every LLM application needs a framework. The right tool depends on the shape of your problem — specifically, how complex your control flow is and how much infrastructure you want to manage yourself. This section compares three approaches head-to-head so you can make an informed decision.
The Three Approaches at a Glance
Plain Python means calling the OpenAI (or Anthropic, etc.) SDK directly. You write functions, if statements, and loops. You get full control, but you also get full responsibility: persistence, streaming, error handling, and state management are all on you.
LangChain (LCEL) gives you composable abstractions — chains, retrievers, output parsers — connected with the pipe (|) operator. It excels at linear pipelines like RAG, prompt-template-to-model-to-parser flows, and simple tool calling. However, LCEL was not designed for cycles, conditional branching, or long-running stateful workflows.
LangGraph models your application as a graph of nodes and edges. Nodes are functions; edges define control flow, including conditional routing and cycles. Persistence, streaming, and human-in-the-loop are built in. The trade-off is a steeper learning curve and more upfront structure.
Decision Flowchart
Use this flowchart to quickly identify which approach fits your use case:
graph TD
    A["What does your LLM workflow look like?"] --> B{"Is it a single prompt → response?"}
    B -->|Yes| C["✅ Plain Python"]
    B -->|No| D{"Is it a linear pipeline (no loops)?"}
    D -->|Yes| E{"Need built-in RAG, tool integrations?"}
    E -->|Yes| F["✅ LangChain / LCEL"]
    E -->|No| C
    D -->|No| G{"Need cycles, loops, or conditional routing?"}
    G -->|Yes| H["✅ LangGraph"]
    G -->|No| I{"Simple branching (2-3 paths)?"}
    I -->|Yes| F
    I -->|No| H

    style C fill:#4a9375,color:#fff
    style F fill:#4a7f93,color:#fff
    style H fill:#935f4a,color:#fff
Comparison Matrix
The table below compares the three approaches across the dimensions that matter most when building LLM applications in production.
| Dimension | Plain Python | LangChain (LCEL) | LangGraph |
|---|---|---|---|
| Control flow | Unlimited — it's just code | Linear pipelines; limited branching via RunnableBranch | Arbitrary graphs with cycles, conditional edges, parallel branches |
| State management | Manual (dicts, classes, databases) | Implicit — data flows through the chain | Explicit TypedDict or Pydantic state; reducers for merging |
| Persistence | Build it yourself (DB, Redis, files) | Not built-in; add via custom callbacks | Built-in checkpointing with SqliteSaver, PostgresSaver, etc. |
| Streaming | Manual with SDK streaming APIs | .stream() and .astream() built in | Token-level, node-level, and custom event streaming built in |
| Human-in-the-loop | Build it yourself | Not natively supported | First-class: interrupt(), approval gates, state editing |
| Multi-agent | Custom orchestration code | Possible but awkward | Native support via subgraphs and supervisor patterns |
| Learning curve | Low (just Python + SDK docs) | Medium (LCEL syntax, Runnable protocol) | Higher (graph concepts, state reducers, checkpointers) |
| Debugging | Standard Python debugging | LangSmith tracing; chain internals can be opaque | LangSmith tracing + step-by-step graph replay |
| Best for | Scripts, prototypes, simple single-call apps | RAG pipelines, prompt chains, tool-calling agents with linear flow | Complex agents, multi-step workflows, chatbots with memory, multi-agent systems |
What Each Approach Looks Like in Code
Seeing the same task in all three styles makes the trade-offs concrete. Here's a simple example: calling an LLM, checking if the response needs a tool call, and either returning the result or executing the tool.
Plain Python
Full control, no abstractions. You handle the loop and the routing yourself.
from openai import OpenAI
client = OpenAI()
def run_agent(messages, tools):
    while True:
        response = client.chat.completions.create(
            model="gpt-4o", messages=messages, tools=tools
        )
        msg = response.choices[0].message
        if msg.tool_calls:
            messages.append(msg)  # Append the assistant turn once, before its tool results
            for tc in msg.tool_calls:
                result = execute_tool(tc.function.name, tc.function.arguments)
                messages.append({"role": "tool", "tool_call_id": tc.id, "content": result})
        else:
            return msg.content  # Done — no more tool calls
This works great for prototypes. But the moment you need persistence across requests, streaming partial results, or retry logic, you're writing all of that from scratch.
LangChain (LCEL)
LangChain shines when your flow is a straight pipeline. The pipe operator chains together prompt, model, and parser in a readable way.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
])
chain = prompt | ChatOpenAI(model="gpt-4o") | StrOutputParser()
# Clean, linear, easy to reason about
result = chain.invoke({"input": "Explain LangGraph in one sentence."})
Notice there's no loop here. If you need to iterate (call a tool, feed the result back, decide again), LCEL starts fighting you. That's not a bug — it's a design boundary.
LangGraph
LangGraph makes the control flow explicit. Each step is a node, and edges (including conditional ones) define how the graph executes.
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END, add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

def call_model(state: AgentState):
    response = ChatOpenAI(model="gpt-4o").invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: AgentState):
    last = state["messages"][-1]
    return "tools" if last.tool_calls else END
graph = StateGraph(AgentState)
graph.add_node("agent", call_model)
graph.add_node("tools", tool_executor)
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", should_continue)
graph.add_edge("tools", "agent") # Loop back — this is the cycle
app = graph.compile(checkpointer=memory)  # Persistence for free (memory: a checkpointer, e.g. MemorySaver())
More code upfront, yes. But you now have a cycle (agent → tools → agent), built-in persistence via the checkpointer, and the ability to stream, pause, and resume this workflow at any point.
LangGraph is built on top of langchain-core. You still use LangChain's chat models, prompts, and tools inside your graph nodes. The two are complementary, not competing — LangGraph replaces LCEL chains for complex orchestration, not the entire LangChain ecosystem.
When to Choose Each
Choose Plain Python when…
- You're building a one-shot script or quick prototype
- Your app makes a single LLM call (or a fixed sequence of calls) with no branching
- You want zero dependencies beyond the model provider's SDK
- You're learning how LLM APIs work and don't want framework magic in the way
Choose LangChain (LCEL) when…
- Your workflow is a linear pipeline: prompt → model → parser → output
- You need RAG with retrievers, text splitters, and vector stores
- You want streaming and async for a chain without writing it yourself
- Branching is simple — two or three paths with RunnableBranch
Choose LangGraph when…
- Your agent needs to loop — call tools, evaluate results, and decide the next step dynamically
- You need persistence so users can resume conversations or workflows across sessions
- You're building multi-agent systems where specialized agents hand off to each other
- You need human-in-the-loop approval steps in the middle of a workflow
- Reliability matters — you want built-in checkpointing so a crash doesn't lose progress
LangGraph adds real complexity — state schemas, graph definitions, checkpointer setup. If your workflow is a straightforward chain or a single API call, that overhead isn't justified. Start with the simplest approach that works and migrate to LangGraph when you actually hit its sweet spot: cycles, persistence, or multi-agent coordination.
Mental Model: Graphs as State Machines
At its core, a LangGraph application is a directed graph where each node is a Python function that reads and writes to a shared state object, and each edge defines which node runs next. If you've ever seen a state machine diagram — boxes connected by arrows — you already have the right intuition. The boxes are processing steps, the arrows are transitions, and the data flowing through is your application state.
Think of it like an assembly line in a factory. Each worker (node) has a specific job — one calls an LLM, another executes a tool, a third formats the output. The product (state) moves from station to station along conveyor belts (edges). But unlike a simple assembly line, sometimes the product gets sent back to a previous station for rework. That looping capability is what makes LangGraph powerful enough to build autonomous agents.
The Three Building Blocks
Every LangGraph application is built from three primitives: nodes, edges, and state. Nodes are plain functions that receive the current state, do some work, and return updates. Edges connect nodes and determine the order of execution. State is a shared data structure — typically a TypedDict or Pydantic model — that accumulates results as the graph runs.
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END, add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    next_action: str

def agent_node(state: AgentState) -> dict:
    """Node: call the LLM and decide what to do next."""
    response = llm.invoke(state["messages"])
    return {"messages": [response], "next_action": parse_action(response)}

def tool_node(state: AgentState) -> dict:
    """Node: execute the tool the agent requested."""
    result = execute_tool(state["messages"][-1])
    return {"messages": [result]}
Notice that each node function takes the full state and returns only the keys it wants to update — not the entire state object. LangGraph merges these partial updates back into the shared state automatically. For list fields like messages, you can use a reducer function (like add_messages) that appends rather than replaces.
START, END, and the Graph Lifecycle
Every graph has two special sentinel nodes: START and END. START is the entry point — you connect it to whichever node should run first. END signals that execution is complete and the final state should be returned to the caller. No function is associated with these nodes; they exist purely to mark boundaries.
graph = StateGraph(AgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
graph.add_edge(START, "agent") # Entry: start at the agent
graph.add_edge("tools", "agent") # After tools, always go back to agent
graph.add_conditional_edges("agent", route_decision) # Agent decides next step
app = graph.compile()
Normal Edges vs. Conditional Edges
LangGraph offers two types of edges, and the difference is crucial for understanding control flow.
Normal edges are unconditional — they always route to the same destination. When the tools node finishes, the graph always transitions back to agent. There's no decision-making involved.
Conditional edges call a routing function that inspects the current state and returns the name of the next node. This is how agents make decisions: the LLM output determines whether to call a tool, ask for human input, or finish entirely.
def route_decision(state: AgentState) -> str:
    """Routing function: inspect state and pick the next node."""
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"  # Agent wants to use a tool → go to tool_node
    return END  # No tool calls → we're done
The routing function does not perform work — it only reads state and returns a node name. Keep routing logic lightweight and side-effect-free. All actual computation belongs inside nodes.
Cycles: Why Graphs Beat DAGs for Agents
Most workflow frameworks — Airflow, Prefect, traditional LangChain chains — use directed acyclic graphs (DAGs). Data flows forward and never revisits a previous step. That's fine for pipelines, but agents are fundamentally iterative. An agent reasons, acts, observes, then reasons again. It needs to loop.
LangGraph explicitly supports cycles. The agent → tools → agent pattern shown in the diagram below is the canonical example: the agent calls a tool, observes the result, and decides whether to call another tool or stop. This loop can repeat as many times as needed — it's not hardcoded to a fixed number of steps.
stateDiagram-v2
[*] --> agent_node: START
agent_node --> tool_node: has tool calls
agent_node --> [*]: no tool calls (END)
tool_node --> agent_node: return result
Back to the assembly line analogy: imagine an inspector (the agent) examines the product and decides it needs another coat of paint. It goes back to the painting station (the tool node), then returns to the inspector. This loop continues until the inspector approves — and only then does the product leave the factory (END).
Supersteps: How Execution Actually Works
LangGraph doesn't just run nodes one at a time in a simple loop. It uses a concept called supersteps. In each superstep, the graph evaluates which nodes are ready to run (i.e., all their incoming edges have been satisfied) and executes them. If multiple nodes are ready simultaneously, they run in the same superstep — potentially in parallel.
After all nodes in a superstep complete, their state updates are merged, and the graph evaluates edges again to determine the next superstep. This continues until execution reaches END or no more nodes are runnable.
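The superstep loop can be sketched in a few lines: find every node whose prerequisites are satisfied, run them all, merge their updates, repeat. This is an illustrative toy with a static dependency map, not LangGraph's edge-and-channel machinery:

```python
def run_supersteps(nodes, deps, state):
    """nodes: name -> fn(state) -> partial update; deps: name -> prerequisite names."""
    done: set[str] = set()
    while len(done) < len(nodes):
        # All nodes whose prerequisites are satisfied run in the same superstep
        ready = [n for n in nodes if n not in done and deps[n] <= done]
        if not ready:
            break  # nothing left is runnable
        updates = [nodes[n](state) for n in ready]  # conceptually parallel
        for update in updates:  # merge only after the whole superstep completes
            state.update(update)
        done.update(ready)
    return state

nodes = {
    "fetch_a": lambda s: {"a": 1},
    "fetch_b": lambda s: {"b": 2},  # runs in the same superstep as fetch_a
    "combine": lambda s: {"total": s["a"] + s["b"]},
}
deps = {"fetch_a": set(), "fetch_b": set(), "combine": {"fetch_a", "fetch_b"}}

print(run_supersteps(nodes, deps, {}))  # {'a': 1, 'b': 2, 'total': 3}
```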
| Concept | State Machine Analogy | Assembly Line Analogy |
|---|---|---|
| Node | A state (processing step) | A worker at a station |
| Edge | A transition between states | A conveyor belt between stations |
| State | The current data context | The product being built |
| Conditional edge | A guarded transition | An inspector routing the product |
| Cycle | A loop back to a previous state | Sending the product back for rework |
| Superstep | One tick of the machine | All parallel stations working at once |
When you're designing a LangGraph application, sketch the state diagram first. Identify your nodes (what work needs to happen), your edges (what always follows what), and your conditional edges (where decisions are made). The code will map almost 1:1 to that diagram.
Core API: StateGraph, Nodes, and Edges
Every LangGraph application is built from three primitives: a StateGraph (the builder), nodes (the units of work), and edges (the connections between them). Once you understand how these three pieces snap together, you can construct anything from a simple chain to a complex multi-agent system.
StateGraph — The Builder Class
A StateGraph is the top-level container. You instantiate it with a state schema — a TypedDict that declares every key your graph can read and write. This schema is the single source of truth for what data flows between nodes.
from typing import TypedDict
from langgraph.graph import StateGraph
class MyState(TypedDict):
    question: str
    answer: str
graph_builder = StateGraph(MyState)
After instantiation, you add nodes and edges to the builder, then call .compile() to produce a runnable graph. The compiled graph is immutable — you can invoke it repeatedly without worrying about mutation.
When a node returns {"answer": "42"}, LangGraph merges that partial dict into the current state. Keys you don't return remain untouched. This is why nodes return partial updates, not full state copies.
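In plain-dict terms, the merge is roughly this (a sketch of the semantics, not the implementation):

```python
state = {"question": "What is the answer?", "answer": ""}
node_update = {"answer": "42"}    # a node returns only the keys it changes

state = {**state, **node_update}  # keys absent from the update stay untouched

print(state)  # {'question': 'What is the answer?', 'answer': '42'}
```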
Nodes — Where Work Happens
A node is a plain Python function (sync or async) that receives the current state dict and returns a partial state update. You register it with graph.add_node("name", fn). The string name is how you reference the node when wiring edges.
def think(state: MyState) -> dict:
    question = state["question"]
    # ... call an LLM, run logic, etc.
    return {"answer": f"Thinking about: {question}"}

async def refine(state: MyState) -> dict:
    raw = state["answer"]
    # ... async LLM call to polish the answer
    return {"answer": f"Refined: {raw}"}
graph_builder.add_node("think", think)
graph_builder.add_node("refine", refine)
Nodes can do anything — call LLMs, query databases, invoke tools, run computations. The only contract is: take state in, return a partial state dict out. If a node has nothing to update, it can return an empty dict {}.
Edges — Controlling Flow
Edges tell LangGraph which node runs next. There are two types: unconditional and conditional.
Unconditional Edges
add_edge(a, b) means "after node a finishes, always go to node b." Simple and deterministic.
graph_builder.add_edge("think", "refine")
Conditional Edges
add_conditional_edges(source, routing_fn, path_map) lets you branch at runtime. The routing_fn inspects the current state and returns a string key. The path_map maps those keys to target node names.
from langgraph.graph import END
def should_refine(state: MyState) -> str:
    if len(state["answer"]) < 50:
        return "needs_work"
    return "done"

graph_builder.add_conditional_edges(
    "think",
    should_refine,
    {
        "needs_work": "refine",
        "done": END,
    },
)
The routing function is pure logic — it reads state and returns a string. It should not have side effects or modify state. Think of it as a lightweight traffic cop.
START and END Sentinels
LangGraph provides two special sentinel values to mark the entry and exit points of your graph:
| Sentinel | Import | Purpose |
|---|---|---|
| START | from langgraph.graph import START | Virtual node representing the graph's entry point. Connect it to your first real node. |
| END | from langgraph.graph import END | Virtual node representing termination. Connect your last node to it to signal the graph is done. |
from langgraph.graph import START, END
graph_builder.add_edge(START, "think")
graph_builder.add_edge("refine", END)
Every graph must have at least one edge from START and at least one path that reaches END. Without these, the graph won't know where to begin or when to stop.
Minimal Complete Example
Here's every piece assembled into a runnable script. Two nodes, unconditional flow, compile, invoke.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
# 1. Define your state schema
class AgentState(TypedDict):
    question: str
    answer: str

# 2. Define node functions
def generate(state: AgentState) -> dict:
    return {"answer": f"Draft answer to: {state['question']}"}

def polish(state: AgentState) -> dict:
    return {"answer": state["answer"].upper()}
# 3. Build the graph
builder = StateGraph(AgentState)
builder.add_node("generate", generate)
builder.add_node("polish", polish)
builder.add_edge(START, "generate")
builder.add_edge("generate", "polish")
builder.add_edge("polish", END)
# 4. Compile and invoke
graph = builder.compile()
result = graph.invoke({"question": "What is LangGraph?"})
print(result)
# {'question': 'What is LangGraph?', 'answer': 'DRAFT ANSWER TO: WHAT IS LANGGRAPH?'}
Notice that invoke takes a dict matching your state schema (or a subset of it). The returned value is the full final state after all nodes have executed.
Shortcut: add_sequence for Linear Flows
When your nodes run in a straight line with no branching, wiring individual edges is tedious. The add_sequence method shortens this: it registers each node and connects consecutive nodes in order. You still connect START to the first node yourself.
# These two blocks are equivalent:
# --- Verbose way ---
builder = StateGraph(AgentState)
builder.add_node("generate", generate)
builder.add_node("polish", polish)
builder.add_edge(START, "generate")
builder.add_edge("generate", "polish")
builder.add_edge("polish", END)
# --- Shortcut way ---
builder = StateGraph(AgentState)
builder.add_sequence([generate, polish])
builder.add_edge(START, "generate")
When you pass bare functions to add_sequence, LangGraph uses the function name as the node name automatically. You can still reference these nodes by name (e.g., "generate") if you need to add conditional edges later.
Putting It All Together: The Pattern
Every LangGraph application follows the same four-step recipe, regardless of complexity:
1. Define a state schema. Create a TypedDict (or a Pydantic model) declaring every key your graph needs. This is the shared memory that all nodes read from and write to.
2. Write node functions. Each function takes the current state and returns a partial dict of updates. Keep nodes focused — one responsibility per node makes graphs easier to debug and test.
3. Wire edges. Use add_edge for fixed paths and add_conditional_edges for runtime branching. Always connect START to your entry node and ensure every path reaches END.
4. Compile and invoke. Call builder.compile() to get a runnable graph, then graph.invoke(initial_state) to execute it. The compiled graph is reusable and thread-safe.
Designing Your State Schema
State is the single most important design decision in a LangGraph application. Every node in your graph reads from state and writes back to it — making the schema the contract that binds your entire workflow together. A well-designed state schema keeps your graph modular, debuggable, and easy to extend.
Each key you define in your state schema becomes a channel. When a node returns {"messages": [new_msg]}, LangGraph routes that value to the messages channel. Understanding this channel-based architecture is the key to designing state that works with the framework instead of against it.
Three Ways to Define State
LangGraph supports three approaches for defining your state schema, each offering a different level of strictness and validation. Choose based on how much runtime safety you need.
| Approach | Validation | Best For | Overhead |
|---|---|---|---|
| TypedDict | Static only (mypy/pyright) | Most applications — simple, fast | Lowest |
| dataclass | Static + default values | State with sensible defaults | Low |
| Pydantic BaseModel | Full runtime validation | Untrusted inputs, strict contracts | Higher |
TypedDict — The Default Choice
Most LangGraph examples and applications use TypedDict. It gives you type hints for editor autocompletion and static analysis without any runtime overhead. This is the approach you should start with unless you have a specific reason to need more.
from typing import TypedDict, Annotated
from langgraph.graph import add_messages
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    tool_results: list[dict]
    final_answer: str
    iteration_count: int
Dataclass — When You Need Defaults
TypedDict doesn't support default values natively. If you want state keys to initialize automatically (e.g., an empty list or a counter starting at zero), a dataclass is a clean option.
from dataclasses import dataclass, field
from typing import Annotated
from langgraph.graph import add_messages
@dataclass
class AgentState:
    messages: Annotated[list, add_messages] = field(default_factory=list)
    tool_results: list[dict] = field(default_factory=list)
    final_answer: str = ""
    iteration_count: int = 0
Pydantic BaseModel — Runtime Validation
When your graph processes untrusted input or you need strict guarantees about data shape, Pydantic gives you runtime validation on every state update. If a node returns a value that doesn't match the schema, Pydantic raises a ValidationError immediately rather than letting bad data propagate through the graph.
from pydantic import BaseModel, field_validator
from typing import Annotated
from langgraph.graph import add_messages
class AgentState(BaseModel):
    messages: Annotated[list, add_messages] = []
    final_answer: str = ""
    confidence: float = 0.0

    @field_validator("confidence")
    @classmethod
    def check_confidence(cls, v: float) -> float:
        if not 0.0 <= v <= 1.0:
            raise ValueError("confidence must be between 0.0 and 1.0")
        return v
With this schema, any node that sets confidence to 1.5 will trigger an immediate validation error — catching the bug at the source rather than downstream.
Structuring State for Real Applications
A good state schema separates concerns into distinct channels. Think of your state keys as falling into three categories: conversation (the message history), intermediate data (working memory used between nodes), and output (the final results your graph produces).
class ResearchAgentState(TypedDict):
    # Conversation — the full message history
    messages: Annotated[list, add_messages]
    # Intermediate data — working memory between nodes
    search_queries: list[str]
    retrieved_documents: list[dict]
    _current_step: str  # private: internal routing only
    # Output — what the caller cares about
    final_answer: str
    sources: list[str]
This separation makes each node's role clear. A search node writes to search_queries and retrieved_documents. A synthesize node reads those and writes final_answer and sources. No node needs to understand the entire state — only its own inputs and outputs.
LangGraph serializes state for checkpointing and persistence. Stick to JSON-friendly types: str, int, float, bool, list, dict, and None. Avoid storing raw objects like database connections, open file handles, or lambda functions in state.
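A quick smoke test for checkpoint-friendliness is to round-trip your state through json. This is a rough heuristic, not LangGraph's actual serializer, which handles additional types such as message objects:

```python
import json

good_state = {"messages": [{"role": "user", "content": "hi"}], "iteration": 1}
bad_state = {"callback": lambda x: x}  # functions are not JSON-serializable

restored = json.loads(json.dumps(good_state))
print(restored == good_state)  # True: safe to checkpoint

try:
    json.dumps(bad_state)
except TypeError as err:
    print(f"not checkpoint-friendly: {err}")
```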
Reducers with Annotated Types
By default, when a node returns a value for a state key, LangGraph overwrites the existing value. This works fine for keys like final_answer, but it's a disaster for messages — you'd lose the entire conversation history every time a node runs.
Reducers solve this. By wrapping a type with Annotated, you attach a function that controls how updates are merged into existing state. The built-in add_messages reducer appends new messages and handles deduplication by ID.
from typing import Annotated
from operator import add
from langgraph.graph import add_messages
class AgentState(TypedDict):
    # add_messages: appends new messages, deduplicates by ID
    messages: Annotated[list, add_messages]
    # operator.add: concatenates lists (simple append)
    all_tool_calls: Annotated[list[str], add]
    # Custom reducer: keep only the last 3 items
    recent_queries: Annotated[list[str], lambda old, new: (old + new)[-3:]]
    # No annotation: each update fully replaces the value
    status: str
The reducer function signature is always (existing_value, new_value) -> merged_value. You can use any callable — a lambda, a named function, or a built-in like operator.add. Custom reducers are powerful for implementing sliding windows, counters, or deduplication logic.
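As an example of custom deduplication logic, here is a sketch of an order-preserving `add_unique` reducer (not a built-in):

```python
from typing import Annotated, TypedDict

def add_unique(current: list, new: list) -> list:
    """Reducer: append only items not already present, preserving order."""
    seen = set(current)
    merged = list(current)
    for item in new:
        if item not in seen:
            merged.append(item)
            seen.add(item)
    return merged

class State(TypedDict):
    visited_urls: Annotated[list[str], add_unique]

# Simulating two successive node writes:
urls = add_unique([], ["a.com", "b.com"])
urls = add_unique(urls, ["b.com", "c.com"])
# urls == ["a.com", "b.com", "c.com"] — the duplicate "b.com" was dropped
```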
Private State Keys
Not every piece of state should be visible to the caller. Internal routing flags, intermediate computation results, or temporary scratchpad data are implementation details. LangGraph uses an underscore-prefix convention — keys starting with _ signal "this is internal to the graph."
class PlannerState(TypedDict):
# Public — callers provide and consume these
messages: Annotated[list, add_messages]
final_plan: str
# Private — internal graph mechanics
_retry_count: int
_selected_model: str
_reasoning_trace: list[str]
You can formalize this further by using separate schemas for input, output, and internal state. LangGraph lets you pass distinct input/output schemas to StateGraph, so callers only see the keys that matter to them while the graph internally operates on a richer schema.
class InputState(TypedDict):
messages: Annotated[list, add_messages]
class OutputState(TypedDict):
messages: Annotated[list, add_messages]
final_answer: str
class FullState(InputState):
"""Internal state — extends input with private working keys."""
final_answer: str
_search_results: list[dict]
_iteration: int
# Only InputState keys are accepted as input;
# Only OutputState keys are returned to the caller
graph = StateGraph(FullState, input=InputState, output=OutputState)
Begin with just messages and one or two output keys. Add intermediate state keys only when a node genuinely needs to pass structured data to a downstream node. Over-engineering your state schema upfront leads to unused channels and confusion about which keys matter.
Reducers: How State Updates Are Merged
When two or more nodes in your graph write to the same state key, LangGraph needs a rule for combining those writes. That rule is called a reducer. Without one, you get simple overwrite semantics — the last node to run wins, and previous values are silently discarded.
You attach a reducer to a state key using Python's Annotated type hint: Annotated[type, reducer_fn]. The reducer function receives (current_value, new_value) and returns the merged result. This one mechanism unlocks append-only logs, message deduplication, running aggregations, and more.
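To make the mechanism concrete, here is a toy model (not LangGraph's actual internals) that pulls the reducer out of the `Annotated` metadata and applies it while merging a node's update:

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class State(TypedDict):
    results: Annotated[list, operator.add]  # reducer: concatenate
    status: str                             # no reducer: overwrite

def apply_update(schema, state: dict, update: dict) -> dict:
    """Merge a node's partial update into state, honoring declared reducers."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, new_value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        reducer = metadata[0] if metadata else None
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], new_value)  # e.g. old + new
        else:
            merged[key] = new_value                        # last write wins
    return merged

state = {"results": ["doc_1"], "status": "searching"}
state = apply_update(State, state, {"results": ["doc_2"], "status": "done"})
# state == {"results": ["doc_1", "doc_2"], "status": "done"}
```

`results` accumulates through `operator.add` while `status` is simply replaced, which is exactly the split the rest of this section explores.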
Default Behavior: Last Write Wins
If you declare a state key with a plain type annotation, there's no reducer. Every write simply replaces the previous value.
from typing import TypedDict
class State(TypedDict):
query: str # No reducer — last write wins
answer: str # Same here
# Node A writes: {"query": "What is LangGraph?"}
# Node B writes: {"query": "Overwritten!"}
# Final state → query == "Overwritten!"
This is fine for keys that only one node ever touches, or where you genuinely want replacement semantics. But for keys like message histories or collected results, you need accumulation — and that's where reducers come in.
Built-in Reducer: operator.add
The simplest built-in reducer is operator.add. For lists, it concatenates the existing value with the new value. Each node returns a list, and those lists get appended together over the course of the graph execution.
import operator
from typing import Annotated, TypedDict
class State(TypedDict):
results: Annotated[list, operator.add] # Append, don't overwrite
# Node A returns: {"results": ["doc_1", "doc_2"]}
# Node B returns: {"results": ["doc_3"]}
# Final state → results == ["doc_1", "doc_2", "doc_3"]
Lists vs Tuples with operator.add
Both list and tuple support operator.add, but they behave differently in a subtle way. With Annotated[list, operator.add], your nodes must return lists. With Annotated[tuple, operator.add], nodes must return tuples. Mixing the two raises a TypeError at runtime because Python doesn't allow list + tuple.
class State(TypedDict):
# Use list — nodes must return lists
items_list: Annotated[list, operator.add]
# Use tuple — nodes must return tuples (immutable)
items_tuple: Annotated[tuple, operator.add]
# ✅ Works: {"items_list": ["a", "b"]}
# ✅ Works: {"items_tuple": ("a", "b")}
# ❌ Fails: {"items_list": ("a", "b")} → TypeError
Prefer list unless you have a specific reason for immutability. It's the more common pattern in LangGraph codebases.
Built-in Reducer: add_messages
For chat-based agents, add_messages from langgraph.graph is the most important reducer. It does more than simple appending — it handles message deduplication by ID. If you return a message with the same id as an existing message, it replaces that message in place instead of duplicating it. This is critical for tool-call loops where messages get re-processed.
from typing import Annotated, TypedDict
from langgraph.graph import add_messages
from langchain_core.messages import AIMessage, HumanMessage, RemoveMessage
class State(TypedDict):
messages: Annotated[list, add_messages]
# Appending: new messages are added to the list
# {"messages": [HumanMessage(content="Hello")]}
# Updating: same ID replaces the existing message
# {"messages": [AIMessage(content="Updated answer", id="msg-123")]}
# Deleting: RemoveMessage removes by ID
# {"messages": [RemoveMessage(id="msg-123")]}
How add_messages decides: a new message without an existing ID → append. A new message with a matching ID → replace in place. A RemoveMessage → delete the message with that ID. This tri-modal behavior makes it the right default for any conversational state.
Writing Custom Reducers
A custom reducer is any function with the signature (current_value, new_value) → merged_value. You pass it as the second argument inside Annotated. Here are three practical patterns.
Keep the Last N Items
Useful for bounded memory — collect results but never let the list grow beyond a fixed window.
def keep_last_n(n: int):
"""Returns a reducer that keeps only the last n items."""
def reducer(current: list, new: list) -> list:
combined = current + new
return combined[-n:]
return reducer
class State(TypedDict):
recent_queries: Annotated[list, keep_last_n(5)]
# After 7 writes of 1 item each → only the last 5 remain
Merge Dictionaries (Shallow Merge)
When your state key is a dict and you want each node to contribute partial updates without clobbering the entire dict.
def merge_dicts(current: dict, new: dict) -> dict:
"""Shallow merge — new keys override existing ones."""
return {**current, **new}
class State(TypedDict):
metadata: Annotated[dict, merge_dicts]
# Node A: {"metadata": {"source": "web", "score": 0.9}}
# Node B: {"metadata": {"score": 0.95, "verified": True}}
# Final → {"source": "web", "score": 0.95, "verified": True}
Track a Running Maximum
Sometimes you want to accumulate a scalar — for example, keeping the highest confidence score seen across all nodes.
def running_max(current: float, new: float) -> float:
return max(current, new)
class State(TypedDict):
best_score: Annotated[float, running_max]
# Node A: {"best_score": 0.72}
# Node B: {"best_score": 0.89}
# Node C: {"best_score": 0.65}
# Final → best_score == 0.89
The Default Value Gotcha
When you attach a reducer to a state key, the reducer function receives the current value as its first argument on every update — including the very first one. If the key has no initial value, current is undefined and LangGraph raises an error. You must always provide a default.
# ❌ BROKEN — no default value for a key with a reducer
class BadState(TypedDict):
items: Annotated[list, operator.add]
# First node writes {"items": ["a"]}
# Reducer called: operator.add(???, ["a"]) → Error!
# ✅ FIXED — provide a default via a dataclass or Pydantic model
from dataclasses import dataclass, field
@dataclass
class GoodState:
items: Annotated[list, operator.add] = field(default_factory=list)
best_score: Annotated[float, running_max] = 0.0
metadata: Annotated[dict, merge_dicts] = field(default_factory=dict)
Plain TypedDict has no mechanism for default values. If you use a reducer with TypedDict, you must pass the initial values when invoking the graph (e.g., graph.invoke({"items": [], "best_score": 0.0})). Alternatively, switch to a dataclass or Pydantic BaseModel where defaults are first-class.
Quick Reference
| Reducer | Use Case | Behavior |
|---|---|---|
| None (no reducer) | Single-writer keys | Last write wins (overwrite) |
| operator.add | Append-only lists/tuples | Concatenates sequences |
| add_messages | Chat message histories | Append, deduplicate by ID, supports deletion |
| Custom fn(cur, new) | Any merge logic you need | Full control — return the merged value |
MessagesState and Chat-Based Patterns
Almost every LLM application manages a list of messages — user inputs, assistant responses, system prompts, tool calls. Rather than making you define this boilerplate yourself, LangGraph ships MessagesState: a pre-built TypedDict with a single messages key that uses the add_messages reducer under the hood.
This section covers how MessagesState works, the full behavior of add_messages, and the common patterns you'll use to build chat-based agents.
Using MessagesState Directly
MessagesState is a TypedDict with one key: messages, annotated with the add_messages reducer. You can use it as-is for simple chatbot graphs without defining any custom state.
from langgraph.graph import StateGraph, MessagesState
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
def chatbot(state: MessagesState):
response = llm.invoke(state["messages"])
return {"messages": [response]}
graph = StateGraph(MessagesState)
graph.add_node("chatbot", chatbot)
graph.set_entry_point("chatbot")
graph.set_finish_point("chatbot")
app = graph.compile()
When you invoke this graph, you pass messages in and get messages out. The add_messages reducer automatically appends each new message to the existing list rather than replacing it — so conversation history accumulates across turns.
from langchain_core.messages import HumanMessage
result = app.invoke({
"messages": [HumanMessage(content="What is LangGraph?")]
})
# result["messages"] contains:
# [HumanMessage("What is LangGraph?"), AIMessage("LangGraph is...")]
How add_messages Works
The add_messages reducer is the engine behind MessagesState. It does more than simple appending — it supports three distinct operations depending on the messages you return from a node.
1. Appending New Messages
The default behavior: any message without a matching ID in the existing list gets appended.
from langgraph.graph import add_messages
from langchain_core.messages import AIMessage, HumanMessage
existing = [HumanMessage(content="Hi", id="1")]
new = [AIMessage(content="Hello! How can I help?", id="2")]
result = add_messages(existing, new)
# [HumanMessage("Hi", id="1"), AIMessage("Hello! How can I help?", id="2")]
2. Updating Messages by ID
If a new message has the same id as an existing message, it replaces the existing one in place. This is useful for correcting or enriching messages after tool execution.
existing = [AIMessage(content="Draft response", id="msg-42")]
new = [AIMessage(content="Polished final response", id="msg-42")]
result = add_messages(existing, new)
# [AIMessage("Polished final response", id="msg-42")] — replaced, not appended
3. Removing Messages with RemoveMessage
You can delete specific messages from the state by returning RemoveMessage objects that target messages by their id. This is essential for pruning conversation history.
from langchain_core.messages import RemoveMessage
def prune_old_messages(state: MessagesState):
# Remove all but the last 5 messages
messages_to_remove = state["messages"][:-5]
return {
"messages": [RemoveMessage(id=m.id) for m in messages_to_remove]
}
How add_messages resolves operations: all three behaviors — append, update, remove — happen through the same add_messages reducer. It checks each incoming message: if it's a RemoveMessage, the target is deleted. If its id matches an existing message, the existing one is replaced. Otherwise, it's appended. No configuration needed.
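The dispatch logic can be sketched in plain Python. This is a toy model using dicts in place of message objects, not the real implementation:

```python
def toy_add_messages(existing: list, new: list) -> list:
    """Toy model: each message is a dict with 'id', 'content',
    and an optional 'remove' flag standing in for RemoveMessage."""
    result = list(existing)
    for msg in new:
        idx = next((i for i, m in enumerate(result) if m["id"] == msg["id"]), None)
        if msg.get("remove"):
            if idx is not None:
                del result[idx]          # RemoveMessage: delete by ID
        elif idx is not None:
            result[idx] = msg            # matching ID: replace in place
        else:
            result.append(msg)           # new ID: append
    return result

history = [{"id": "1", "content": "Hi"}, {"id": "2", "content": "Draft"}]
history = toy_add_messages(history, [{"id": "2", "content": "Final"}])  # update
history = toy_add_messages(history, [{"id": "1", "remove": True}])      # delete
# history == [{"id": "2", "content": "Final"}]
```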
Adding System Prompts
A common pattern is to prepend a system message at the start of every LLM call. You don't store it in state — instead, you inject it inside the node function before calling the model. This keeps the system prompt consistent and out of the mutable message history.
from langchain_core.messages import SystemMessage
SYSTEM_PROMPT = SystemMessage(
content="You are a helpful coding assistant. Be concise and use code examples."
)
def chatbot(state: MessagesState):
# Prepend system prompt to every LLM call
messages = [SYSTEM_PROMPT] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
Trimming Messages to Fit Context Windows
Long conversations eventually exceed your model's context window. LangChain provides trim_messages — a utility that trims the message list to fit within a token budget. You call it inside your node, right before the LLM call, so the stored state keeps the full history while the model only sees what fits.
from langchain_core.messages import trim_messages, SystemMessage
def chatbot(state: MessagesState):
trimmed = trim_messages(
state["messages"],
max_tokens=4000,
strategy="last", # keep the most recent messages
token_counter=llm, # use the model's tokenizer
start_on="human", # ensure the first kept message is from the user
include_system=True, # always keep the system message if present
)
messages = [SYSTEM_PROMPT] + trimmed
response = llm.invoke(messages)
return {"messages": [response]}
The strategy="last" option keeps the most recent messages that fit within your token budget. Setting start_on="human" ensures you don't start mid-conversation with an orphaned AI reply, and include_system=True preserves any system message already in the list.
Trim messages inside your node before calling the LLM, rather than removing messages from state with RemoveMessage. This way the full conversation history is preserved in state (useful for debugging and checkpointing), while the model only sees what fits in its context window.
Extending MessagesState with Custom Keys
For anything beyond a simple chatbot, you'll need more than just messages in your state. You can extend MessagesState by subclassing it and adding your own keys. The messages key and its add_messages reducer carry over automatically.
from langgraph.graph import MessagesState
from typing import Annotated
import operator
class AgentState(MessagesState):
# Accumulated tool outputs from the current run
tool_outputs: list[dict]
# Track iterations to prevent infinite loops
iteration_count: Annotated[int, operator.add]
# Final structured answer
final_answer: str | None
Notice that iteration_count uses the operator.add reducer — returning {"iteration_count": 1} from a node increments the counter rather than overwriting it. Meanwhile, tool_outputs has no reducer annotation, so it uses the default last-write-wins behavior.
Using Extended State in an Agent Loop
Here's a practical example: a tool-calling agent that extends MessagesState to track loop iterations and bail out after a maximum number of steps.
from langgraph.graph import StateGraph, END
MAX_ITERATIONS = 5
class AgentState(MessagesState):
iteration_count: Annotated[int, operator.add]
def call_model(state: AgentState):
response = llm.invoke(state["messages"])
return {"messages": [response], "iteration_count": 1}
def should_continue(state: AgentState):
last_message = state["messages"][-1]
if state["iteration_count"] >= MAX_ITERATIONS:
return "end"
if last_message.tool_calls:
return "tools"
return "end"
graph = StateGraph(AgentState)
graph.add_node("agent", call_model)
graph.add_node("tools", tool_node)  # tool_node: e.g. ToolNode(tools) from langgraph.prebuilt
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", should_continue, {
"tools": "tools",
"end": END,
})
graph.add_edge("tools", "agent")
app = graph.compile()
Tool-calling agents can enter infinite loops if the model keeps generating tool calls. Always add an iteration_count or set LangGraph's built-in recursion_limit, which is passed in the config at invocation time (e.g. graph.invoke(inputs, config={"recursion_limit": 10})), to prevent runaway execution and unexpected API costs.
Quick Reference: add_messages Operations
| Operation | What you return from a node | What happens |
|---|---|---|
| Append | Message with a new id (or no id) | Added to the end of the list |
| Update | Message with an existing id | Replaces the matching message in place |
| Remove | RemoveMessage(id=target_id) | Deletes the message with that id |
Input, Output, and Context Schemas
A production graph often carries internal state that callers should never see — intermediate scratchpads, retry counters, partial results. Without schema boundaries, every invoker of your graph is coupled to every implementation detail. LangGraph v1.0 solves this with three distinct schema layers: input, output, and context_schema.
The Full State vs. What Callers See
Your graph's internal state can be as rich as it needs to be. But when another service invokes your graph, it should only need to know about the keys it provides and the keys it reads from the result. The input and output parameters on StateGraph let you draw that boundary explicitly.
Consider a research assistant graph. Internally it tracks a scratchpad for intermediate notes and a step_count for loop control. Callers only send a query and receive an answer.
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from operator import add
# --- Full internal state ---
class ResearchState(TypedDict):
query: str # caller provides this
answer: str # caller reads this
scratchpad: Annotated[list[str], add] # internal only
step_count: int # internal only
# --- Public-facing schemas ---
class InputSchema(TypedDict):
query: str
class OutputSchema(TypedDict):
answer: str
The three TypedDict classes serve different roles. ResearchState is the complete internal state that nodes read and write. InputSchema and OutputSchema are the public contract — they define what goes in and what comes out.
Wiring Input and Output Schemas to the Graph
Pass the schemas as the input and output parameters when constructing the StateGraph. LangGraph validates incoming data against InputSchema and strips the result down to OutputSchema before returning.
def research_node(state: ResearchState) -> dict:
# Node has full access to internal state
notes = state.get("scratchpad", [])
step = state.get("step_count", 0)
# ... perform research using state["query"] ...
return {
"scratchpad": [f"Step {step}: searched for '{state['query']}'"],
"step_count": step + 1,
}
def summarize_node(state: ResearchState) -> dict:
notes = "\n".join(state.get("scratchpad", []))
# ... generate final answer from scratchpad ...
return {"answer": f"Summary based on {len(state['scratchpad'])} research steps."}
# Build the graph with schema boundaries
graph = StateGraph(ResearchState, input=InputSchema, output=OutputSchema)
graph.add_node("research", research_node)
graph.add_node("summarize", summarize_node)
graph.add_edge(START, "research")
graph.add_edge("research", "summarize")
graph.add_edge("summarize", END)
app = graph.compile()
Now when you invoke the graph, the schema boundaries are enforced automatically:
# Caller only provides keys in InputSchema
result = app.invoke({"query": "What are the benefits of RAG?"})
print(result)
# {"answer": "Summary based on 1 research steps."}
# ^^^ Only OutputSchema keys — no scratchpad, no step_count
input and output schemas must be subsets of the full state schema. LangGraph doesn't transform data — it filters keys. If a caller passes a key not in InputSchema, it is silently ignored. If internal state contains keys not in OutputSchema, they are stripped from the return value.
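The filtering behavior is easy to model. This sketch illustrates the semantics only; it is not LangGraph's code:

```python
def filter_to_schema(data: dict, schema_keys: frozenset) -> dict:
    """Keep only the keys declared in the schema; drop everything else silently."""
    return {k: v for k, v in data.items() if k in schema_keys}

INPUT_KEYS = frozenset({"query"})
OUTPUT_KEYS = frozenset({"answer"})

# On the way in: extra keys from the caller are silently ignored
incoming = {"query": "What is RAG?", "scratchpad": ["injected junk"]}
accepted = filter_to_schema(incoming, INPUT_KEYS)
# accepted == {"query": "What is RAG?"}

# On the way out: internal keys are stripped before returning
final_state = {"query": "What is RAG?", "answer": "RAG combines retrieval with generation.",
               "scratchpad": ["notes"], "step_count": 3}
returned = filter_to_schema(final_state, OUTPUT_KEYS)
# returned == {"answer": "RAG combines retrieval with generation."}
```

Note that no values are transformed — keys are only kept or dropped.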
context_schema: Per-Invocation Context via Runtime
Some values need to travel with an invocation but don't belong in the graph state at all — a user ID for audit logging, a language preference for i18n, or an API key for a downstream service. Putting these in state means every node signature changes when you add a new config field, and checkpointed state gets polluted with metadata.
The context_schema parameter (replacing the now-deprecated config_schema) defines a typed context object that nodes access through a Runtime parameter declared in the node signature. Context values are passed at invocation time but live outside the state entirely.
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
class MyState(TypedDict):
    query: str
    answer: str
class ContextSchema(TypedDict):
    user_id: str
    language: str
Define nodes that read from context. Declare a runtime: Runtime[ContextSchema] parameter on a node, and LangGraph injects the context values for the current invocation:
def answer_node(state: MyState, runtime: Runtime[ContextSchema]) -> dict:
    user_id = runtime.context["user_id"]
    language = runtime.context["language"]
    # Use context for behavior, not state
    print(f"Processing request for user {user_id} in {language}")
    if language == "es":
        return {"answer": f"Respuesta para: {state['query']}"}
    return {"answer": f"Answer for: {state['query']}"}
# Wire it up with context_schema
graph = StateGraph(MyState, context_schema=ContextSchema)
graph.add_node("answer", answer_node)
graph.add_edge(START, "answer")
graph.add_edge("answer", END)
app = graph.compile()
When invoking, pass context values via the context keyword argument:
result = app.invoke(
{"query": "Explain LangGraph schemas"},
context={"user_id": "usr_8xk2m", "language": "en"},
)
print(result)
# {"answer": "Answer for: Explain LangGraph schemas"}
Combining All Three Schemas
In practice, you use all three schema layers together. Here's the complete pattern — a graph with a rich internal state, a clean public interface, and typed per-invocation context:
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
# Full internal state
class AgentState(TypedDict):
    query: str
    answer: str
    scratchpad: Annotated[list[str], add]
    step_count: int
# Public contract
class Input(TypedDict):
    query: str
class Output(TypedDict):
    answer: str
# Per-invocation context (not in state)
class Context(TypedDict):
    user_id: str
    language: str
def process(state: AgentState, runtime: Runtime[Context]) -> dict:
    ctx = runtime.context
    step = state.get("step_count", 0)
    return {
        "scratchpad": [f"[{ctx['user_id']}] Processed in {ctx['language']}"],
        "step_count": step + 1,
        "answer": f"Result for '{state['query']}' (lang={ctx['language']})",
    }
graph = StateGraph(AgentState, input=Input, output=Output, context_schema=Context)
graph.add_node("process", process)
graph.add_edge(START, "process")
graph.add_edge("process", END)
app = graph.compile()
# Clean invocation — only Input keys in, only Output keys out
result = app.invoke(
{"query": "How do schemas work?"},
context={"user_id": "usr_8xk2m", "language": "en"},
)
print(result) # {"answer": "Result for 'How do schemas work?' (lang=en)"}
Use context_schema instead of stuffing user IDs and config flags into your graph state. Context values are not checkpointed, which keeps your state store clean and avoids leaking sensitive metadata (like API keys) into persisted snapshots.
Quick Reference
| Schema | Purpose | Passed via | Accessible in nodes? |
|---|---|---|---|
| input | Filter keys accepted on invocation | app.invoke({...}) | Yes — merged into state |
| output | Filter keys returned to caller | Automatic on return | N/A (output filtering) |
| context_schema | Typed per-invocation metadata | context={...} in invoke | Yes — via the Runtime parameter |
If you previously used config_schema and accessed values via config["configurable"], switch to context_schema and the Runtime parameter. The old pattern still works during the deprecation window, but config_schema will be removed in a future release.
Tutorial: Building a ReAct Agent from Scratch
The ReAct (Reason + Act) pattern is one of the most powerful agent architectures: the LLM reasons about what to do, acts by calling a tool, observes the result, and then reasons again. This loop continues until the LLM decides it has enough information to answer. In this tutorial, you'll build a complete ReAct agent using LangGraph's StateGraph.
How the ReAct Loop Works
Before writing code, visualize the data flow. The agent node calls the LLM. If the LLM response contains tool calls, execution routes to the tool executor node, which runs those tools and feeds results back to the agent. If the LLM returns a plain text response (no tool calls), the graph terminates.
graph TD
    START([__start__]) --> agent["agent (LLM call with tools)"]
    agent --> check{has tool_calls?}
    check -->|Yes| tool_executor["tool_executor (run tool calls)"]
    tool_executor --> agent
    check -->|No| END([__end__])
This cycle — agent → tools → agent — is the core of every ReAct agent. LangGraph makes it explicit as a graph structure rather than hiding it inside a while loop. That gives you full visibility into each step's state and the ability to interrupt, checkpoint, or branch at any point.
Step-by-Step Build
1. Install dependencies

You need langgraph, langchain-openai, and langchain-core. Make sure your OPENAI_API_KEY environment variable is set.

pip install langgraph langchain-openai langchain-core

2. Define the state schema

LangGraph graphs operate on a shared state object that flows between nodes. MessagesState is a built-in schema that manages a list of LangChain messages with automatic deduplication. You'll extend it with a step_count field to track how many agent-tool loops have executed.

from langgraph.graph import MessagesState

class AgentState(MessagesState):
    """Extended state that tracks the number of reasoning steps."""
    step_count: int

MessagesState already provides a messages key with a built-in reducer that appends new messages to the list. Your custom step_count key uses simple replacement — whatever value a node returns overwrites the previous one.

3. Create tools for the agent

You need at least one tool for the agent to call. Here you define two simple ones: a calculator for math expressions and a weather lookup. The @tool decorator turns a plain Python function into a LangChain tool with automatic schema generation from the type hints and docstring.

from langchain_core.tools import tool

@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression. Example: '2 + 2' or '15 * 3.5'."""
    try:
        result = eval(expression, {"__builtins__": {}})
        return str(result)
    except Exception as e:
        return f"Error: {e}"

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    # Simulated responses for demonstration
    weather_data = {
        "london": "London: 15°C, cloudy with light rain",
        "tokyo": "Tokyo: 28°C, sunny and humid",
        "new york": "New York: 22°C, partly cloudy",
    }
    return weather_data.get(city.lower(), f"{city}: data not available")

tools = [calculator, get_weather]

4. Set up the LLM with tools bound

Bind your tools to the ChatOpenAI model. This tells the model what tools are available and their schemas — the model can then choose to call them by returning structured tool_calls in its response.

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)

5. Define the agent node

The agent node is the "brain" of the graph. It takes the current state, calls the LLM with the full message history, and returns the LLM's response as a new message plus an incremented step count. Every node in LangGraph receives the state and returns a partial state update — LangGraph merges it back automatically.

def agent_node(state: AgentState) -> dict:
    """Call the LLM with the current message history."""
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "step_count": state.get("step_count", 0) + 1,
    }

6. Define the tool executor node

The tool executor takes the last AI message, extracts its tool_calls, runs each one, and returns the results as ToolMessage objects. LangGraph provides ToolNode as a ready-made implementation, but building it manually helps you understand what happens under the hood.

from langchain_core.messages import ToolMessage

# Build a lookup dict: tool name -> tool function
tool_map = {t.name: t for t in tools}

def tool_executor_node(state: AgentState) -> dict:
    """Execute tool calls from the last AI message."""
    last_message = state["messages"][-1]
    results = []
    for call in last_message.tool_calls:
        tool_fn = tool_map[call["name"]]
        output = tool_fn.invoke(call["args"])
        results.append(
            ToolMessage(content=str(output), tool_call_id=call["id"])
        )
    return {"messages": results}

7. Define the conditional routing function

This is the decision point. After the agent node runs, you check the LLM's response: if it contains tool_calls, route to the tool executor; otherwise, the agent is done and you route to END.

from langgraph.graph import END

def should_continue(state: AgentState) -> str:
    """Route based on whether the last message has tool calls."""
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tool_executor"
    return END

8. Assemble and compile the graph

Now you wire everything together with StateGraph. Add nodes, connect them with edges and conditional edges, set the entry point, and compile. Attaching a MemorySaver checkpointer enables conversation persistence across invocations.

from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver

# 1. Create graph with state schema
graph_builder = StateGraph(AgentState)

# 2. Add nodes
graph_builder.add_node("agent", agent_node)
graph_builder.add_node("tool_executor", tool_executor_node)

# 3. Add edges
graph_builder.add_edge(START, "agent")                         # Entry: start at agent
graph_builder.add_conditional_edges("agent", should_continue)  # Agent decides next step
graph_builder.add_edge("tool_executor", "agent")               # After tools, loop back

# 4. Compile with checkpointer
checkpointer = MemorySaver()
graph = graph_builder.compile(checkpointer=checkpointer)

9. Invoke the agent

Run the agent with a user question. The config dict with a thread_id is required when using a checkpointer — it identifies the conversation session.

from langchain_core.messages import HumanMessage

result = graph.invoke(
    {
        "messages": [HumanMessage(content="What's the weather in Tokyo and what's 24 * 15?")],
        "step_count": 0,
    },
    config={"configurable": {"thread_id": "session-1"}},
)

print(result["messages"][-1].content)
print(f"Total agent steps: {result['step_count']}")
Execution Walkthrough
To see what happens inside the graph at each step, use stream instead of invoke. This yields the state updates after each node executes, giving you full observability into the ReAct loop.
for step in graph.stream(
{
"messages": [HumanMessage(content="What's the weather in Tokyo and what's 24 * 15?")],
"step_count": 0,
},
config={"configurable": {"thread_id": "session-2"}},
):
for node_name, state_update in step.items():
print(f"\n{'='*50}")
print(f"Node: {node_name}")
print(f"Step count: {state_update.get('step_count', '-')}")
for msg in state_update.get("messages", []):
print(f" [{msg.type}] {msg.content[:100] if msg.content else '[tool_calls]'}")
Here's what the output reveals. Each numbered step below corresponds to one node execution in the graph:
| Step | Node | What happens | State after |
|---|---|---|---|
| 1 | agent | LLM sees the user question, decides it needs both tools. Returns an AIMessage with two tool_calls: get_weather("Tokyo") and calculator("24 * 15"). | step_count: 1, messages: [Human, AI(tool_calls)] |
| 2 | tool_executor | Executes both tool calls. Returns two ToolMessage results: "Tokyo: 28°C, sunny and humid" and "360". | step_count: 1, messages: [Human, AI, Tool, Tool] |
| 3 | agent | LLM sees the tool results, synthesizes a final answer. Returns an AIMessage with no tool calls. | step_count: 2, messages: [Human, AI, Tool, Tool, AI] |
| 4 | (end) | should_continue sees no tool calls → routes to END. Graph terminates. | Final state returned to caller |
Models that support parallel tool calling (GPT-4o among them) can return multiple tool calls in a single response. The tool executor node handles all of them in one pass, which is why both the weather lookup and the calculation happen in the same step. A single ReAct iteration can therefore execute several tools before control returns to the LLM.
The Complete Code
Here is the entire agent in one copy-paste-ready block:
```python
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver

# --- State ---
class AgentState(MessagesState):
    step_count: int

# --- Tools ---
@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression. Example: '2 + 2' or '15 * 3.5'."""
    try:
        # NOTE: eval is fine for a demo; use a real math parser in production
        result = eval(expression, {"__builtins__": {}})
        return str(result)
    except Exception as e:
        return f"Error: {e}"

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    weather_data = {
        "london": "London: 15°C, cloudy with light rain",
        "tokyo": "Tokyo: 28°C, sunny and humid",
        "new york": "New York: 22°C, partly cloudy",
    }
    return weather_data.get(city.lower(), f"{city}: data not available")

tools = [calculator, get_weather]

# --- LLM ---
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)

# --- Nodes ---
def agent_node(state: AgentState) -> dict:
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "step_count": state.get("step_count", 0) + 1,
    }

tool_map = {t.name: t for t in tools}

def tool_executor_node(state: AgentState) -> dict:
    last_message = state["messages"][-1]
    results = []
    for call in last_message.tool_calls:
        tool_fn = tool_map[call["name"]]
        output = tool_fn.invoke(call["args"])
        results.append(
            ToolMessage(content=str(output), tool_call_id=call["id"])
        )
    return {"messages": results}

# --- Routing ---
def should_continue(state: AgentState) -> str:
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tool_executor"
    return END

# --- Graph Assembly ---
graph_builder = StateGraph(AgentState)
graph_builder.add_node("agent", agent_node)
graph_builder.add_node("tool_executor", tool_executor_node)
graph_builder.add_edge(START, "agent")
graph_builder.add_conditional_edges("agent", should_continue)
graph_builder.add_edge("tool_executor", "agent")

checkpointer = MemorySaver()
graph = graph_builder.compile(checkpointer=checkpointer)

# --- Run ---
result = graph.invoke(
    {
        "messages": [HumanMessage(content="What's the weather in Tokyo and what's 24 * 15?")],
        "step_count": 0,
    },
    config={"configurable": {"thread_id": "session-1"}},
)
print(result["messages"][-1].content)
print(f"Total agent steps: {result['step_count']}")
```
Add a safety check in should_continue that routes to END if step_count exceeds a threshold (e.g., 5). This prevents runaway loops where the LLM keeps calling tools indefinitely — always set a max-step guard on production agentic graphs.
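A minimal sketch of that guard, in plain Python so it runs standalone: the `MAX_STEPS` constant, the `"__end__"` stand-in for LangGraph's `END`, and the `SimpleNamespace` message are assumptions for illustration.

```python
from types import SimpleNamespace

MAX_STEPS = 5      # assumed step budget for this sketch
END = "__end__"    # stand-in for langgraph.graph.END

def should_continue(state: dict) -> str:
    # Hard stop: terminate once the agent has taken too many steps,
    # even if the last message still requests tools
    if state.get("step_count", 0) >= MAX_STEPS:
        return END
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tool_executor"
    return END

# A message that still wants a tool, but the step budget is spent
msg = SimpleNamespace(tool_calls=[{"name": "calculator"}])
print(should_continue({"messages": [msg], "step_count": 5}))  # __end__
print(should_continue({"messages": [msg], "step_count": 1}))  # tool_executor
```

The guard runs before the tool-call check on purpose: a runaway agent keeps requesting tools, so checking tool_calls first would never terminate.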
Conditional Edges and Dynamic Routing
Static edges connect nodes in a fixed order, but real-world agent workflows are rarely linear. A query classifier might route to different specialist nodes. A quality check might loop back for a retry. A planner might dispatch work to multiple nodes simultaneously. Conditional edges are how you express all of this in LangGraph.
The core idea: a path function inspects the current state and returns a string (or list of strings) indicating which node(s) should execute next. LangGraph evaluates this function at runtime, making the graph's topology dynamic.
The add_conditional_edges Method
The primary API for branching is add_conditional_edges(source, path_fn, path_map). The source is the node that just finished executing. The path_fn receives the current state and returns a string identifying the next node. The optional path_map translates those return values into actual node names.
```python
from langgraph.graph import StateGraph, END

def classify_query(state: AgentState) -> str:
    """Route based on the query category in state."""
    category = state["query_category"]
    if category == "billing":
        return "billing"
    elif category == "technical":
        return "technical"
    else:
        return "general"

builder = StateGraph(AgentState)
builder.add_node("classifier", classifier_node)
builder.add_node("billing_agent", billing_node)
builder.add_node("tech_agent", tech_node)
builder.add_node("general_agent", general_node)

# path_fn returns a string; path_map maps it to a node name
builder.add_conditional_edges(
    "classifier",
    classify_query,
    {
        "billing": "billing_agent",
        "technical": "tech_agent",
        "general": "general_agent",
    },
)
```
Without path_map, the return value of path_fn must exactly match a registered node name (or the special END constant). The map is convenient when you want your routing logic decoupled from your node naming, or when you need to route to END — you can return a plain string like "finish" and map it to END.
When you provide a path_map, LangGraph uses it at compile time to know all possible destinations. If you omit path_map, LangGraph infers destinations from type hints or you must explicitly list them. Always prefer providing a path_map for clarity and to help visualization tools render your graph correctly.
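To make the "plain string mapped to END" idea concrete, here is a small standalone sketch. The `route_after_check` function, the `"checker"`/`"processor"` node names, and the `"__end__"` stand-in constant are all hypothetical illustrations, not LangGraph API.

```python
END = "__end__"  # stand-in for langgraph.graph.END

def route_after_check(state: dict) -> str:
    # The path function returns plain labels; it knows nothing about node names
    return "finish" if state.get("done") else "retry"

# The path_map decouples those labels from the graph topology
path_map = {"retry": "processor", "finish": END}

# In a real graph you would register this as:
# builder.add_conditional_edges("checker", route_after_check, path_map)
print(path_map[route_after_check({"done": True})])   # __end__
print(path_map[route_after_check({"done": False})])  # processor
```

Renaming the `processor` node later only requires touching the map, not the routing logic.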
Post-Processing with then
Sometimes every branch should converge on the same node after completing. Instead of adding a normal edge from each destination to the convergence node, use the then parameter. It specifies a node that runs after whichever branch was selected.
```python
# After any specialist finishes, always run the "formatter" node
builder.add_conditional_edges(
    "classifier",
    classify_query,
    {
        "billing": "billing_agent",
        "technical": "tech_agent",
        "general": "general_agent",
    },
    then="formatter",  # convergence node
)
```
This is equivalent to adding builder.add_edge("billing_agent", "formatter"), builder.add_edge("tech_agent", "formatter"), and builder.add_edge("general_agent", "formatter") — but expressed in a single line.
Routing Diagram
Here's how a typical routing pattern looks. The classifier inspects the incoming query, routes to a specialist, and includes a retry loop for the tech agent when a quality check fails.
```mermaid
graph TD
    A["🔍 classifier"] -->|billing| B["💰 billing_agent"]
    A -->|technical| C["🔧 tech_agent"]
    A -->|general| D["📋 general_agent"]
    C --> E{"quality_check"}
    E -->|pass| F["✅ formatter"]
    E -->|fail & retries < 3| C
    E -->|fail & retries ≥ 3| F
    B --> F
    D --> F
    F --> G(["END"])
```
The destinations Parameter on add_node
LangGraph offers an alternative syntax for conditional routing: specifying destinations directly on add_node. Instead of a separate add_conditional_edges call, you declare the routing function and its possible destinations as part of the node definition itself. The node function must return a Command object (covered below) for this to work.
```python
from langgraph.graph import END

# Declare possible destinations so the graph knows the topology
builder.add_node(
    "classifier",
    classifier_node,
    destinations=("billing_agent", "tech_agent", "general_agent", END),
)
```
This keeps node definition and routing co-located. The destinations tuple tells LangGraph at compile time which nodes this node can route to, which is essential for graph validation and visualization.
Dynamic Fan-Out with Send Objects
Returning a single string routes to one node. But what if you need to send work to multiple nodes simultaneously, each with different inputs? This is where Send objects come in. Your path function returns a list of Send objects, each specifying a target node and the state to send to it.
```python
from langgraph.constants import Send

def fan_out_to_reviewers(state: AgentState) -> list[Send]:
    """Send each document chunk to a separate reviewer node."""
    return [
        Send("reviewer", {"chunk": chunk, "chunk_index": i})
        for i, chunk in enumerate(state["document_chunks"])
    ]

builder.add_conditional_edges("splitter", fan_out_to_reviewers)
```
Each Send("reviewer", {...}) creates a parallel execution of the "reviewer" node with its own input state. LangGraph runs all of them concurrently and collects the results. When combined with a reducer on your state (like operator.add on a list field), the fan-in happens automatically — all reviewer outputs get merged back into the parent state.
```python
import operator
from typing import Annotated, TypedDict

class AgentState(TypedDict):
    document_chunks: list[str]
    # Reducer: append all review results from parallel runs
    reviews: Annotated[list[dict], operator.add]
```
The Command Object — Routing from Inside a Node
With add_conditional_edges, routing logic lives outside the node in a separate path function. The Command object flips this: it lets a node combine a state update and a routing decision in a single return value. This is especially powerful when the routing decision depends on computation that already happened inside the node.
```python
from langgraph.types import Command

def classifier_node(state: AgentState) -> Command:
    """Classify the query and route in one step."""
    result = llm.invoke(
        f"Classify this query: {state['query']}\n"
        "Return one of: billing, technical, general"
    )
    category = result.content.strip().lower()
    # Update state AND specify the next node
    return Command(
        update={"query_category": category},
        goto=category + "_agent",  # e.g. "billing_agent"
    )
```
The Command object accepts two key arguments: update (a dict of state changes, applied exactly like a normal node return) and goto (a string or list of strings naming the next node(s)). When using Command, you must declare the destinations parameter on add_node so LangGraph knows the possible transitions at compile time.
```python
# Register the node with its possible destinations
builder.add_node(
    "classifier",
    classifier_node,
    destinations=("billing_agent", "tech_agent", "general_agent"),
)
# No add_conditional_edges needed — routing is inside the node
```
Use Command when the routing decision is a natural byproduct of the node's computation — it avoids duplicating logic in a separate path function. Use add_conditional_edges when you want routing logic decoupled from node logic, or when routing depends only on state that's already computed. Both approaches are equally valid; pick whichever makes your code easier to follow.
Common Patterns
Pattern 1: Routing Based on LLM Output
The most common pattern in agent workflows. An LLM classifies the input, and the graph routes based on that classification. Use structured output or constrained generation to ensure the LLM returns a valid route.
```python
from typing import Literal
from pydantic import BaseModel

class RouteDecision(BaseModel):
    route: Literal["search", "calculator", "direct_answer"]

def route_by_llm(state: AgentState) -> str:
    decision = llm.with_structured_output(RouteDecision).invoke(
        f"How should I handle: {state['query']}"
    )
    return decision.route

builder.add_conditional_edges("router", route_by_llm)
```
Pattern 2: Routing Based on State Values
Simple branching based on flags, counts, or computed fields already in the state. No LLM call needed — this is pure logic.
```python
def should_continue(state: AgentState) -> str:
    if state.get("error") and state["retry_count"] < 3:
        return "retry"
    elif state.get("needs_human_review"):
        return "human_review"
    else:
        return "finalize"

builder.add_conditional_edges(
    "processor",
    should_continue,
    {"retry": "processor", "human_review": "reviewer", "finalize": END},
)
```
Pattern 3: Retry Loops
A quality check node evaluates the output and routes back to the same node if it doesn't meet criteria. The key is tracking the retry count in state to prevent infinite loops.
```python
def quality_gate(state: AgentState) -> Command:
    score = evaluate_output(state["draft_response"])
    if score >= 0.8:
        return Command(update={"final_response": state["draft_response"]}, goto="formatter")
    elif state["retry_count"] >= 3:
        # Accept best effort after max retries
        return Command(update={"final_response": state["draft_response"]}, goto="formatter")
    else:
        return Command(
            update={
                "retry_count": state["retry_count"] + 1,
                "feedback": f"Score {score:.1f} — improve clarity and specificity.",
            },
            goto="drafter",  # loop back
        )
```
Pattern 4: Fan-Out / Fan-In
Dispatch work to multiple nodes in parallel, then collect results. Use Send for different inputs per branch, or return a list of node names from the path function for same-input fan-out.
```python
# Same-input fan-out: return a list of node names
def parallel_analysis(state: AgentState) -> list[str]:
    """Run sentiment, entity, and topic analysis in parallel."""
    return ["sentiment_analyzer", "entity_extractor", "topic_classifier"]

builder.add_conditional_edges("preprocessor", parallel_analysis)

# Different-input fan-out: return Send objects
def map_to_workers(state: AgentState) -> list[Send]:
    return [
        Send("worker", {"task": task, "context": state["context"]})
        for task in state["task_list"]
    ]

builder.add_conditional_edges("planner", map_to_workers)
```
When multiple branches write to the same state field, you must define a reducer (e.g., Annotated[list, operator.add]) for that field. Without a reducer, the last branch to complete silently overwrites all earlier results. This is one of the most common bugs in LangGraph fan-out patterns.
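The overwrite-vs-append behavior is easy to see if you emulate the fan-in merge step by hand. The `merge` helper below is a hypothetical toy, not LangGraph's engine; it only illustrates what a reducer changes.

```python
import operator

def merge(state: dict, updates: list[dict], reducers: dict) -> dict:
    # Toy fan-in: apply each branch's partial update via the field's reducer.
    # A field without a reducer is simply overwritten — the "last branch wins" bug.
    for update in updates:
        for key, value in update.items():
            reducer = reducers.get(key)
            state[key] = reducer(state[key], value) if reducer else value
    return state

branch_outputs = [{"reviews": [{"chunk": 0}]}, {"reviews": [{"chunk": 1}]}]

# Without a reducer, only the last branch's result survives
print(merge({"reviews": []}, branch_outputs, {}))
# {'reviews': [{'chunk': 1}]}

# With operator.add, parallel results are concatenated
print(merge({"reviews": []}, branch_outputs, {"reviews": operator.add}))
# {'reviews': [{'chunk': 0}, {'chunk': 1}]}
```

In a real graph the reducer is declared once in the state schema (`Annotated[list[dict], operator.add]`) and applied automatically at every superstep.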
Quick Reference
| Mechanism | Where routing lives | Fan-out support | Best for |
|---|---|---|---|
| `add_conditional_edges` | Separate path function | Return list of strings or `Send` objects | Decoupled routing logic |
| `Command` object | Inside the node | `goto=["node_a", "node_b"]` | Co-located state update + routing |
| `destinations` param | Declared on `add_node` | Via `Command` | Compile-time topology hints |
| `Send` objects | Path function or `Command` | Each `Send` gets custom input | Map-reduce / parallel processing |
Graph Compilation and the Execution Engine
Defining nodes and edges gives you a blueprint — a StateGraph object. But this blueprint isn't executable yet. You need to compile it into a runnable object, and that compilation step does far more than just "freeze" the graph. It validates structure, resolves routing logic, and wires up persistence and interrupts.
What .compile() Actually Does
When you call .compile(), LangGraph performs several validation and preparation steps before returning a CompiledGraph — an immutable, executable object that implements the standard LangChain Runnable interface.
Compilation validates that every edge target references an existing node, that no orphan nodes exist (every node is reachable from the entry point), and that the graph has a valid START entry point and at least one path to END. If any of these checks fail, you get a clear error at compile time — not a mysterious failure at runtime.
```python
from langgraph.graph import StateGraph, START, END

builder = StateGraph(AgentState)
builder.add_node("llm_call", call_llm)
builder.add_node("tool_exec", execute_tool)

builder.add_edge(START, "llm_call")
builder.add_conditional_edges("llm_call", should_use_tool, {
    "yes": "tool_exec",
    "no": END,
})
builder.add_edge("tool_exec", "llm_call")

# Compilation: validates structure, resolves conditionals, returns executable
app = builder.compile()
```
After this call, app is a CompiledGraph. You cannot add or remove nodes from it. If you need a different graph topology, you compile a new one. This immutability is deliberate — it guarantees that the execution semantics are fixed and deterministic once the graph is compiled.
Compile-Time Options
The .compile() method accepts several keyword arguments that configure the execution engine's behavior. These options are baked into the compiled graph and apply to every invocation.
| Option | Type | Purpose |
|---|---|---|
| `checkpointer` | `BaseCheckpointSaver` | Enables state persistence across invocations. Required for memory, resumption, and human-in-the-loop workflows. |
| `interrupt_before` | `list[str]` | Pauses execution before the listed nodes run. Used for human approval gates. |
| `interrupt_after` | `list[str]` | Pauses execution after the listed nodes complete. Useful for reviewing a node's output before continuing. |
| `debug` | `bool` | Enables verbose logging of every superstep, node execution, and state mutation. Defaults to `False`. |
```python
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
app = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["tool_exec"],  # pause for human approval
    debug=True,                      # verbose execution logs
)
```
Without a checkpointer, interrupt_before and interrupt_after have no effect — there's nowhere to save the paused state. If you're building human-in-the-loop workflows, persistence with a checkpointer is mandatory, not optional.
The Execution Model: Supersteps
Understanding how a compiled graph runs is the key to reasoning about LangGraph behavior. The execution model is based on supersteps — discrete, atomic rounds of computation borrowed from the Pregel model of distributed graph processing.
When you call invoke(), the engine doesn't just run nodes one at a time in a serial chain. Instead, it follows a precise loop:
1. **Identify eligible nodes.** The engine examines which nodes have all their incoming edges satisfied. On the first superstep, this is whichever node `START` points to. On subsequent supersteps, it's any node whose predecessor(s) just completed.
2. **Execute all eligible nodes in parallel.** All eligible nodes in a superstep run concurrently. Each node receives the current state as input and returns a partial state update (a dict of the fields it wants to modify).
3. **Merge updates via reducers.** The engine collects every node's returned state update and applies them to the graph state using the reducer functions defined in your state schema. If two nodes both update the same field, the reducer determines how those updates combine (e.g., append for lists, overwrite for scalars).
4. **Evaluate edges and repeat.** The engine evaluates outgoing edges — including conditional edges — from the nodes that just ran. This determines which nodes are eligible for the next superstep. The loop repeats until a node routes to `END` or an interrupt is hit.
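Stripped of persistence, parallelism, and error handling, the superstep loop can be emulated in a few lines of plain Python. This is a toy sketch to build intuition, not the engine's real code: nodes are plain functions returning partial updates, edges are functions returning the next node name, and `dict.update` stands in for reducer-based merging.

```python
def run_graph(nodes: dict, edges: dict, state: dict, entry: str, end: str = "__end__") -> dict:
    # Toy superstep loop: run eligible nodes, merge their updates, follow edges
    frontier = [entry]
    while frontier:
        next_frontier = []
        for name in frontier:              # the real engine runs these concurrently
            update = nodes[name](state)    # node returns a partial state update
            state.update(update)           # stand-in for reducer-based merge
            target = edges[name](state)    # static or conditional edge
            if target != end:
                next_frontier.append(target)
        frontier = next_frontier           # next superstep's eligible nodes
    return state

nodes = {"double": lambda s: {"x": s["x"] * 2}, "inc": lambda s: {"x": s["x"] + 1}}
edges = {"double": lambda s: "inc", "inc": lambda s: "__end__"}
print(run_graph(nodes, edges, {"x": 3}, "double"))  # {'x': 7}
```

Two supersteps run here: `double` (x: 3 → 6), then `inc` (x: 6 → 7), after which the edge routes to the end sentinel and the loop drains.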
```mermaid
sequenceDiagram
    participant C as Client
    participant E as Execution Engine
    participant S1 as Superstep 1
    participant R as Reducer
    participant S2 as Superstep 2
    C->>E: invoke(initial_state)
    activate E
    E->>S1: Execute eligible nodes (e.g., llm_call)
    activate S1
    S1-->>E: Return partial state updates
    deactivate S1
    E->>R: Merge updates into graph state
    R-->>E: Updated state
    E->>E: Evaluate edges (conditional routing)
    E->>S2: Execute next eligible nodes (e.g., tool_exec)
    activate S2
    S2-->>E: Return partial state updates
    deactivate S2
    E->>R: Merge updates into graph state
    R-->>E: Updated state
    E->>E: Edge routes to END
    E-->>C: Return final state
    deactivate E
```
Invoking the Graph: Sync and Async
The compiled graph exposes the standard LangChain Runnable interface, which means you get both synchronous and asynchronous execution out of the box.
```python
# Synchronous — blocks until END is reached or interrupt is hit
final_state = app.invoke(
    {"messages": [HumanMessage(content="What's the weather in Paris?")]},
    config={"configurable": {"thread_id": "session-42"}},
)

# Asynchronous — same semantics, but non-blocking
final_state = await app.ainvoke(
    {"messages": [HumanMessage(content="What's the weather in Paris?")]},
    config={"configurable": {"thread_id": "session-42"}},
)
```
Both invoke() and ainvoke() run the full superstep loop and return the final graph state. The config dict with a thread_id is required when using a checkpointer — it tells the persistence layer which conversation thread to load and save state for. Without a checkpointer, you can omit it.
Recursion Limit: Your Safety Net
Cyclic graphs are one of LangGraph's most powerful features — they let agents loop, retry, and self-correct. But cycles also mean the possibility of infinite loops: a buggy conditional edge that never routes to END, or an LLM that keeps deciding to call tools forever.
LangGraph prevents this with a recursion_limit that defaults to 25 supersteps. If the graph hasn't reached END within that limit, it raises a GraphRecursionError. You can override this per invocation via the config:
```python
# Allow up to 50 supersteps for complex multi-tool workflows
final_state = app.invoke(
    {"messages": [HumanMessage(content="Research and summarize AI news")]},
    config={"recursion_limit": 50},
)
```
Each superstep can involve LLM calls that cost time and money. Setting recursion_limit=1000 on a graph with a runaway loop means hundreds of API calls before the error is raised. Start with the default of 25 and increase only when you understand why your graph needs more iterations.
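In production you typically wrap the call so a blown limit degrades gracefully instead of crashing the request. A sketch, assuming `GraphRecursionError` from `langgraph.errors`; the `invoke_with_budget` helper and `LoopyApp` stand-in are hypothetical.

```python
try:
    from langgraph.errors import GraphRecursionError
except ImportError:
    # Stand-in so this sketch runs even without langgraph installed
    class GraphRecursionError(Exception):
        pass

def invoke_with_budget(app, inputs, limit: int = 25) -> dict:
    # Treat a blown recursion limit as a recoverable condition, not a crash
    try:
        return app.invoke(inputs, config={"recursion_limit": limit})
    except GraphRecursionError:
        return {"error": f"agent exceeded {limit} supersteps; aborting"}

class LoopyApp:
    """Hypothetical app whose graph never routes to END."""
    def invoke(self, inputs, config):
        raise GraphRecursionError("recursion limit hit")

print(invoke_with_budget(LoopyApp(), {})["error"])
# agent exceeded 25 supersteps; aborting
```

The caller can then surface a partial answer or a retry prompt instead of a stack trace.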
Graph Introspection and Visualization
Once compiled, you can inspect the graph's structure programmatically. The .get_graph() method returns a representation of nodes and edges that you can query or visualize. This is invaluable for debugging and documentation.
```python
# Get the graph structure object
graph = app.get_graph()

# Print all nodes
print(graph.nodes)
# {'__start__': ..., 'llm_call': ..., 'tool_exec': ..., '__end__': ...}

# Generate a Mermaid diagram string for visualization
mermaid_syntax = graph.draw_mermaid()
print(mermaid_syntax)

# Or render directly to a PNG image (requires additional dependencies)
from IPython.display import Image
Image(graph.draw_mermaid_png())
```
The draw_mermaid() method outputs a Mermaid-formatted string that you can paste into any Mermaid renderer — including GitHub markdown, Notion, or the Mermaid Live Editor. The draw_mermaid_png() variant renders it to an image directly, which is perfect for Jupyter notebooks during development.
Call app.get_graph().draw_mermaid() right after compiling — before you ever invoke the graph. This catches structural mistakes (wrong edges, missing connections) visually, which is much faster than debugging unexpected runtime behavior.
Streaming: Tokens, Events, Updates, and Custom Data
Real-time streaming is what separates a polished AI application from a demo. When an LLM takes 5 seconds to generate a full response, your users shouldn't be staring at a spinner — they should see tokens appear as they're generated. LangGraph treats streaming as a first-class primitive, offering seven distinct stream modes that let you pick exactly the granularity you need.
The Seven Stream Modes at a Glance
| Mode | What You Get | Best For |
|---|---|---|
| `values` | Full state snapshot after each node executes | Debugging, state inspection |
| `updates` | Only the delta (changed keys) from each node | Efficient UIs, progress tracking |
| `messages` | LLM tokens one-by-one as they're generated | Chatbots, real-time text display |
| `custom` | User-defined events via `StreamWriter` | Progress bars, status updates |
| `events` | LangChain callback events (`on_chat_model_start`, etc.) | Detailed observability, logging |
| `debug` | Internal execution details (task starts, state checkpoints) | Debugging graph execution flow |
| `tasks` | Task-level progress (queued, running, completed) | Monitoring parallel node execution |
You don't have to pick just one — modes can be combined. Let's walk through the three you'll use most, then cover combining modes, granular event streaming, and custom data emission.
Mode 1: values — Full State After Each Step
The values mode emits the entire state object after each node finishes executing. This is the simplest mode to reason about: you always see the complete picture. The tradeoff is bandwidth — if your state is large, you're sending redundant data with every step.
```python
inputs = {"messages": [HumanMessage(content="Explain quantum computing")]}

for chunk in graph.stream(inputs, stream_mode="values"):
    # chunk is the FULL state dict after each node
    messages = chunk["messages"]
    print(f"State has {len(messages)} message(s)")
    print(f"Latest: {messages[-1].content[:80]}...")
```
Each chunk is a complete copy of your graph's state. If your graph runs three nodes, you'll get three chunks — each one containing the full messages list accumulated up to that point.
Mode 2: updates — Only the Delta
The updates mode sends only what changed. Each chunk is a dictionary keyed by the node name, with the value being the state update that node returned. This is more efficient for frontends that maintain their own state and just need to apply patches.
```python
for chunk in graph.stream(inputs, stream_mode="updates"):
    # chunk is {node_name: state_update_dict}
    for node_name, update in chunk.items():
        print(f"Node '{node_name}' produced:")
        if "messages" in update:
            print(f"  {update['messages'][-1].content[:80]}...")
```
If a chatbot node returns {"messages": [AIMessage(...)]}, the chunk will be {"chatbot": {"messages": [AIMessage(...)]}}. Notice the node name wrapping the update — this tells you which node produced the change.
Mode 3: messages — Token-by-Token Streaming
This is the mode you want for chatbot UIs. The messages mode hooks into the LLM's streaming interface and yields individual tokens as they're generated. Each emission is a tuple of (message_chunk, metadata).
```python
for msg_chunk, metadata in graph.stream(inputs, stream_mode="messages"):
    # msg_chunk.content is a single token (or small piece of text)
    # metadata tells you which node and model produced it
    if msg_chunk.content:
        print(msg_chunk.content, end="", flush=True)

# Output appears token-by-token:
# Quantum► comput►ing► is► a► field► that► lever►ages►...
```
The messages mode requires your nodes to use LangChain chat models (e.g., ChatOpenAI). If a node does plain Python computation without calling an LLM, it won't emit message chunks — use updates or custom to stream data from those nodes.
The metadata dictionary includes the langgraph_node key, so you can filter tokens by which node emitted them — useful in multi-agent setups where multiple LLMs are running.
```python
# Filter tokens to only show output from the "writer" node
for msg_chunk, metadata in graph.stream(inputs, stream_mode="messages"):
    if metadata["langgraph_node"] == "writer" and msg_chunk.content:
        print(msg_chunk.content, end="", flush=True)
```
Combining Multiple Stream Modes
You often want token-level streaming and node-level progress updates at the same time. Pass a list of modes to stream_mode and LangGraph multiplexes them into a single stream. When you combine modes, each emission becomes a tuple of (mode_name, chunk).
```python
for mode, chunk in graph.stream(inputs, stream_mode=["messages", "updates"]):
    if mode == "messages":
        msg_chunk, metadata = chunk
        if msg_chunk.content:
            print(msg_chunk.content, end="", flush=True)
    elif mode == "updates":
        for node_name in chunk:
            print(f"\n--- Node '{node_name}' completed ---")
```
This pattern is perfect for chat UIs that need to stream tokens to the user while also updating a sidebar with "Agent is thinking…" or "Tool call completed" status messages.
Granular Control with astream_events
For the most fine-grained observability, the async astream_events method gives you access to every LangChain callback event — model starts, tool invocations, retriever calls, and more. Each event has a name, event type, and data payload.
```python
async for event in graph.astream_events(inputs, version="v2"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # Individual LLM token
        token = event["data"]["chunk"].content
        if token:
            print(token, end="", flush=True)
    elif kind == "on_tool_start":
        print(f"\n🔧 Calling tool: {event['name']}")
    elif kind == "on_tool_end":
        print(f"✅ Tool result: {str(event['data']['output'])[:100]}")
```
Prefer stream_mode="messages" for simple token streaming. Reach for astream_events only when you need to intercept specific callback types like tool invocations or retriever queries — it produces a lot of events and requires careful filtering.
Emitting Custom Data with StreamWriter
What if you need to stream progress from a node that doesn't call an LLM — say, a data processing step or a multi-stage retrieval pipeline? The StreamWriter lets you push arbitrary data into the stream from inside any node function. You can grab it by calling get_stream_writer() inside the node, or by declaring a writer parameter (typed StreamWriter) in your node signature.
```python
from langgraph.config import get_stream_writer

def research_node(state: AgentState):
    writer = get_stream_writer()
    writer({"status": "searching", "query": state["query"]})
    results = search_api(state["query"])
    writer({"status": "ranking", "num_results": len(results)})
    ranked = rank_results(results)
    writer({"status": "complete", "top_result": ranked[0]["title"]})
    return {"search_results": ranked}
```
On the consumer side, these custom events show up when you include "custom" in your stream modes:
```python
for mode, chunk in graph.stream(inputs, stream_mode=["custom", "updates"]):
    if mode == "custom":
        # chunk is whatever dict you passed to writer()
        print(f"[{chunk['status']}] {chunk}")
    elif mode == "updates":
        for node_name in chunk:
            print(f"Node '{node_name}' finished")

# Output:
# [searching] {'status': 'searching', 'query': 'LangGraph streaming'}
# [ranking] {'status': 'ranking', 'num_results': 15}
# [complete] {'status': 'complete', 'top_result': 'Streaming Guide'}
# Node 'research_node' finished
```
Custom stream data is ephemeral — it is not persisted in checkpoints. If you replay from a checkpoint, custom events won't be re-emitted. Use state updates for any data that must survive graph interrupts or replays.
Async Streaming
Every synchronous streaming method has an async counterpart. In async Python contexts (FastAPI, Jupyter notebooks, etc.), use astream instead of stream. The API is identical — just add async for.
```python
# In an async context (FastAPI route, Jupyter, etc.)
async for msg_chunk, metadata in graph.astream(
    inputs, stream_mode="messages"
):
    if msg_chunk.content:
        yield msg_chunk.content  # e.g., SSE to a frontend
```
Tool Calling and ToolNode Integration
Tools are what turn an LLM from a text generator into an agent that can take action — searching databases, calling APIs, running calculations. LangGraph provides a clean two-step pattern: the LLM decides which tool to call and with what arguments, then a dedicated node executes that tool and feeds results back.
This section covers both the prebuilt ToolNode for quick setups and the custom node approach for when you need full control.
Defining Tools
You define tools using the @tool decorator or by subclassing BaseTool. The decorator approach is the most common — the function's docstring becomes the tool description the LLM sees, so write it carefully.
```python
from langchain_core.tools import tool

@tool
def search_orders(query: str, limit: int = 5) -> list[dict]:
    """Search customer orders by keyword. Returns matching orders
    with order_id, status, and total amount."""
    # Your actual DB/API logic here
    results = db.orders.search(query, limit=limit)
    return [{"order_id": r.id, "status": r.status, "total": r.total} for r in results]

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a city. Returns temperature and conditions."""
    response = weather_api.get(city=city)
    return f"{response.temp}°F, {response.conditions}"
```
For tools that need complex input validation or async execution, subclass BaseTool instead. This gives you separate _run and _arun methods plus Pydantic schema control.
```python
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field

class OrderSearchInput(BaseModel):
    query: str = Field(description="Search keyword for orders")
    limit: int = Field(default=5, ge=1, le=50)

class SearchOrdersTool(BaseTool):
    name: str = "search_orders"
    description: str = "Search customer orders by keyword."
    args_schema: type[BaseModel] = OrderSearchInput

    def _run(self, query: str, limit: int = 5) -> list[dict]:
        return db.orders.search(query, limit=limit)

    async def _arun(self, query: str, limit: int = 5) -> list[dict]:
        return await db.orders.async_search(query, limit=limit)
```
Binding Tools to the LLM
Before the LLM can call tools, you bind them using llm.bind_tools(). This converts each tool's name, description, and parameter schema into the format the model expects (e.g., OpenAI function calling schema). The bound model doesn't execute tools itself — it returns tool_calls in its response that a downstream node executes.
from langchain_openai import ChatOpenAI
tools = [search_orders, get_weather]
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)
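When the bound model decides to call a tool, the request surfaces on the response's tool_calls attribute as a list of plain dicts in LangChain's standard shape. A sketch of one entry, with illustrative values (the id comes from the provider):

```python
# Illustrative entry from response.tool_calls after the model decides
# to call a tool — the values here are made up for the example
tool_call = {
    "name": "search_orders",                   # which tool the model picked
    "args": {"query": "refund", "limit": 5},   # parsed arguments
    "id": "call_abc123",                       # provider-assigned call id
    "type": "tool_call",
}
```

A downstream node dispatches on the name and echoes the id back in the resulting ToolMessage so the model can match each result to its call.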
The Prebuilt ToolNode
The fastest way to wire up tool execution is with ToolNode. It inspects the last AI message in state, finds any tool_calls, executes the corresponding tools, and returns the results as ToolMessage objects appended to the message list.
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, MessagesState, START, END
tools = [search_orders, get_weather]
tool_node = ToolNode(tools)
def call_model(state: MessagesState):
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}
graph = StateGraph(MessagesState)
graph.add_node("agent", call_model)
graph.add_node("tools", tool_node)
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", tools_condition)
graph.add_edge("tools", "agent")
app = graph.compile()
tools_condition is a prebuilt routing function. It checks the last AI message: if it contains tool_calls, it routes to the "tools" node. Otherwise, it routes to END. This creates the classic agent loop — call model → execute tools → call model again — until the LLM responds without requesting any tools.
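If you ever need custom routing, say, capping the number of tool rounds, the logic behind tools_condition is small enough to hand-roll. A minimal sketch (route_tools is my own name; langgraph.graph.END resolves to the "__end__" sentinel string used here so the sketch stays dependency-free):

```python
END = "__end__"  # the sentinel string langgraph.graph.END points at

def route_tools(state) -> str:
    """Route to the tools node if the last message requested tool calls."""
    last = state["messages"][-1]
    if getattr(last, "tool_calls", None):
        return "tools"
    return END
```

Wire it in with graph.add_conditional_edges("agent", route_tools) in place of tools_condition; since it returns node names directly, no mapping dict is required.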
Handling Tool Errors Gracefully
Tools can fail — APIs time out, inputs are invalid, databases go down. With handle_tool_errors=True (the default in current releases), ToolNode catches the exception and returns the error text as a ToolMessage, giving the LLM a chance to recover (retry with different arguments, apologize, or try a different tool). Set it to False if you'd rather have exceptions propagate and halt the graph.
# Boolean: catches all exceptions, returns str(error) as ToolMessage
tool_node = ToolNode(tools, handle_tool_errors=True)
# String: returns a fixed message for any error
tool_node = ToolNode(tools, handle_tool_errors="Tool failed. Please try again with different inputs.")
# Callable: custom error handler for full control
# (ToolNode invokes the handler with the raised exception)
def custom_error_handler(error: Exception) -> str:
    if isinstance(error, RateLimitError):  # e.g. imported from your API client library
        return "Rate limit hit. Please wait before retrying this tool."
    return f"Tool error: {error}"
tool_node = ToolNode(tools, handle_tool_errors=custom_error_handler)
Writing a Custom Tool Execution Node
When you need more than what ToolNode offers — custom logging, result transformation, selective parallel execution, or tool-specific retry logic — write your own tool node. The pattern is straightforward: extract tool_calls from the last AI message, dispatch to the right function, and return ToolMessage objects.
import json
import asyncio
from langchain_core.messages import ToolMessage

tools_by_name = {t.name: t for t in tools}

async def custom_tool_node(state: MessagesState):
    last_message = state["messages"][-1]

    async def run_one(tool_call):
        tool = tools_by_name[tool_call["name"]]
        try:
            result = await tool.ainvoke(tool_call["args"])
            # Transform result before returning to the LLM
            return ToolMessage(
                content=result if isinstance(result, str) else json.dumps(result),
                tool_call_id=tool_call["id"],
                name=tool_call["name"],
            )
        except Exception as e:
            return ToolMessage(
                content=f"Error: {e}",
                tool_call_id=tool_call["id"],
                name=tool_call["name"],
            )

    # Execute all tool calls in parallel
    results = await asyncio.gather(*[run_one(tc) for tc in last_message.tool_calls])
    return {"messages": list(results)}
Use this custom node in your graph exactly where you'd use ToolNode:
graph = StateGraph(MessagesState)
graph.add_node("agent", call_model)
graph.add_node("tools", custom_tool_node) # drop-in replacement
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", tools_condition)
graph.add_edge("tools", "agent")
app = graph.compile()
InjectedToolArg — Passing Runtime Context to Tools
Some tools need context that the LLM shouldn't control or even see — the current user's ID, a database session, an auth token. InjectedToolArg marks parameters as runtime-injected: they're excluded from the tool schema sent to the LLM, and your application code supplies their values at execution time.
from typing import Annotated
from langchain_core.tools import tool, InjectedToolArg

@tool
def get_user_orders(
    status: str,
    user_id: Annotated[str, InjectedToolArg],  # Hidden from LLM
) -> list[dict]:
    """Get orders for the current user filtered by status."""
    return db.orders.filter(user_id=user_id, status=status)

@tool
def update_profile(
    name: str,
    user_id: Annotated[str, InjectedToolArg],
    db_session: Annotated[Session, InjectedToolArg],
) -> str:
    """Update the current user's display name."""
    user = db_session.query(User).get(user_id)
    user.name = name
    db_session.commit()
    return f"Updated name to {name}"
The injected values still have to come from somewhere: your application code supplies them when the tool is executed. The prebuilt ToolNode auto-populates the related InjectedState and InjectedStore annotations for you; plain InjectedToolArg parameters you merge into the arguments yourself before invoking the tool — for example, inside a custom tool node:
user_id = config["configurable"]["user_id"]  # from the invoke() config
args = {**tool_call["args"], "user_id": user_id}
result = get_user_orders.invoke(args)
Never let the LLM supply values like user_id or api_key as regular tool arguments. A prompt injection attack could trick the model into passing a different user's ID. Always use InjectedToolArg for security-sensitive parameters so they come from your application code, not the model.
Tool Output Validation Pattern
Sometimes you don't want to blindly forward tool results back to the agent. A validation node sits between tool execution and the next model call, checking whether results make sense before the agent acts on them. This is especially useful for tools that return data the LLM will use in downstream decisions.
import json
from langchain_core.messages import ToolMessage

def validate_tool_output(state: MessagesState):
    """Check tool results before passing them back to the agent."""
    messages = state["messages"]

    # Collect all ToolMessages from the most recent tool execution
    last_tool_messages = []
    for msg in reversed(messages):
        if isinstance(msg, ToolMessage):
            last_tool_messages.append(msg)
        else:
            break

    for tool_msg in last_tool_messages:
        # Example: redact sensitive data before the LLM sees it
        if "ssn" in tool_msg.content.lower():
            tool_msg.content = redact_pii(tool_msg.content)
        # Example: truncate suspiciously large results
        try:
            data = json.loads(tool_msg.content)
            if isinstance(data, list) and len(data) > 100:
                tool_msg.content = json.dumps(data[:20])
        except (json.JSONDecodeError, TypeError):
            pass

    # Return the edited messages so the update flows through the reducer:
    # add_messages replaces messages with matching IDs instead of appending
    return {"messages": last_tool_messages}
A cleaner approach is to route on the validation outcome — letting the agent recover from ordinary tool errors while halting outright on results it should never act on:
def check_result(state: MessagesState):
    """Route based on whether the tool output is usable."""
    last_msg = state["messages"][-1]
    if isinstance(last_msg, ToolMessage) and "fatal" in last_msg.content.lower():
        return "halt"      # Unrecoverable — stop the graph
    return "continue"      # Ordinary errors flow back to the agent to handle

graph.add_node("agent", call_model)
graph.add_node("tools", tool_node)
graph.add_node("validate", validate_tool_output)
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", tools_condition)
graph.add_edge("tools", "validate")
graph.add_conditional_edges("validate", check_result, {
    "halt": END,
    "continue": "agent",
})
Prebuilt ToolNode vs Custom Node — When to Use Which
| Criteria | Prebuilt ToolNode | Custom Tool Node |
|---|---|---|
| Setup complexity | One line: ToolNode(tools) | 10–30 lines of custom logic |
| Error handling | handle_tool_errors param | Full try/except with custom recovery |
| Parallel execution | Built-in | You control concurrency limits |
| Result transformation | Not supported | Transform, filter, or redact before returning |
| Logging/metrics | Use callbacks | Inline instrumentation |
| Tool-specific retry logic | Not supported | Retry individual tools with backoff |
| Best for | Prototypes, standard agents | Production systems, sensitive data flows |
Begin every project with the prebuilt ToolNode. It handles parallel execution, error formatting, and InjectedState/InjectedStore injection out of the box. Switch to a custom node only when you hit a concrete limitation — not preemptively. The custom node is a drop-in replacement, so the migration is painless.
Checkpointing: Pause, Resume, and Time-Travel
Every time your LangGraph graph completes a superstep, the entire state is serialized and saved as a checkpoint. This is LangGraph's "short-term memory" — it means your graph can survive process restarts, resume conversations across HTTP requests, and even rewind to any earlier point in the execution. If you've ever wished you could undo an LLM's bad answer and retry from three turns ago, checkpointing makes that trivial.
Checkpointing unlocks three powerful capabilities: resuming a conversation by reloading the latest state, time-traveling to any previous checkpoint, and branching from a historical state to explore a different path. All of this comes from a single abstraction: the checkpointer.
sequenceDiagram
participant User
participant Graph
participant Checkpointer as Checkpointer (DB)
User->>Graph: invoke(input, config={thread_id: "t1"})
Graph->>Graph: Superstep 1 executes
Graph->>Checkpointer: Save checkpoint (cp-1)
Graph->>Graph: Superstep 2 executes
Graph->>Checkpointer: Save checkpoint (cp-2)
Graph-->>User: Return final output
Note over User,Checkpointer: Later — new request, same thread_id
User->>Graph: invoke(input, config={thread_id: "t1"})
Graph->>Checkpointer: Load latest checkpoint (cp-2)
Checkpointer-->>Graph: Restored state
Graph->>Graph: Superstep 3 continues
Graph->>Checkpointer: Save checkpoint (cp-3)
Graph-->>User: Return output
Choosing a Checkpointer
LangGraph ships with multiple checkpointer backends. You pick the one that matches your environment — the graph code stays exactly the same regardless of which backend you use.
| Checkpointer | Backend | Use Case | Persistence |
|---|---|---|---|
| MemorySaver | In-memory dict | Development, testing, notebooks | Lost on process restart |
| SqliteSaver | SQLite | Single-server production, prototyping | Persisted to disk |
| PostgresSaver | PostgreSQL | Multi-server production | Shared across instances |
All checkpointers implement the same BaseCheckpointSaver interface. Swapping from MemorySaver to PostgresSaver only changes how you instantiate the checkpointer — the graph code itself is untouched.
Wiring Up a Checkpointer
To enable checkpointing, you pass a checkpointer instance to compile(). Then, every time you invoke the graph, you include a thread_id in the config. The thread ID is how LangGraph groups related checkpoints into a single conversation thread.
from langgraph.checkpoint.memory import MemorySaver
# 1. Create a checkpointer
checkpointer = MemorySaver()
# 2. Compile the graph with the checkpointer
app = graph.compile(checkpointer=checkpointer)
# 3. Invoke with a thread_id — this IS the conversation identifier
config = {"configurable": {"thread_id": "user-123-session-1"}}
result = app.invoke({"messages": [("user", "What is LangGraph?")]}, config)
# 4. Later, invoke again with the SAME thread_id to continue
result = app.invoke({"messages": [("user", "Tell me more")]}, config)
On the second invocation, LangGraph loads the latest checkpoint for thread_id: "user-123-session-1", restores the full state (including all previous messages), and continues from where it left off. The graph doesn't know or care that there was a gap between requests.
Inspecting State and Checkpoint History
Checkpointing isn't just a black box that saves and restores — you can reach in and inspect exactly what's stored. Two methods give you full visibility into your graph's state over time.
Get Current State
graph.get_state(config) returns the latest checkpoint for a thread. The returned StateSnapshot contains the current values of every state channel, the checkpoint config, and metadata about which node ran last.
state = app.get_state(config)
print(state.values) # The full state dict (e.g. messages, counters)
print(state.next) # Tuple of nodes scheduled to run next
print(state.config) # Config with thread_id and checkpoint_id
print(state.parent_config) # Config pointing to the previous checkpoint
Get Full Checkpoint History
graph.get_state_history(config) returns an iterator over every checkpoint for that thread, from newest to oldest. This is your audit trail — you can see exactly what the state looked like after each superstep.
for snapshot in app.get_state_history(config):
    print(f"Checkpoint: {snapshot.config['configurable']['checkpoint_id']}")
    print(f"  Num messages: {len(snapshot.values.get('messages', []))}")
    print(f"  Next node(s): {snapshot.next}")
    print()
Checkpoint Structure
Every checkpoint is identified by three pieces of information that together form a linked list of states:
- thread_id — Groups checkpoints into a conversation. All invocations with the same thread ID share a timeline.
- checkpoint_id — A unique identifier (typically a UUID) for a specific point in time within the thread.
- parent_config — A reference to the previous checkpoint's config. This creates a chain you can walk backward through, making time-travel possible.
state = app.get_state(config)
# The config that uniquely identifies THIS checkpoint
state.config
# {'configurable': {'thread_id': 'user-123-session-1',
# 'checkpoint_id': '1ef6a...'}}
# The config pointing to the PREVIOUS checkpoint
state.parent_config
# {'configurable': {'thread_id': 'user-123-session-1',
# 'checkpoint_id': '1ef6b...'}}
Time-Travel: Resuming from a Previous Checkpoint
Time-travel means re-invoking your graph from a checkpoint that isn't the latest one. Maybe the LLM hallucinated on turn 5 and you want to retry from turn 4. You find the checkpoint you want, grab its config, and invoke with that config. LangGraph forks from that point — the original timeline is untouched.
# Step 1: Find the checkpoint you want to rewind to
history = list(app.get_state_history(config))
target_checkpoint = history[2] # e.g., two steps back
# Step 2: Re-invoke using that checkpoint's config
# This forks from the old state — the original history is preserved
result = app.invoke(
    {"messages": [("user", "Let's try a different approach")]},
    target_checkpoint.config
)
Manually Modifying State
Sometimes you don't want to re-invoke from a past checkpoint — you want to edit the current state directly. graph.update_state() lets you patch specific values in the state, creating a new checkpoint with your modifications. This is especially useful for human-in-the-loop corrections, like fixing a tool call result or removing a bad message.
from langchain_core.messages import AIMessage
# Inject a corrected AI response into the state
app.update_state(
    config,
    {"messages": [AIMessage(content="The correct answer is 42.")]},
    as_node="chatbot"  # attribute this update to a specific node
)
# The next invocation continues from this corrected state
result = app.invoke(None, config)
update_state applies your values through the same reducers defined in your state schema. If your messages channel uses an append-style reducer, the new message is appended — it doesn't replace the list. Design your reducers with manual updates in mind.
Production Setup: Switching to PostgresSaver
Moving from development to production means swapping MemorySaver for a durable backend. Here's the change to use PostgreSQL — everything else in your graph code remains identical.
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:pass@localhost:5432/myapp"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # Creates tables on first run
    app = graph.compile(checkpointer=checkpointer)

    # Everything else is the same
    config = {"configurable": {"thread_id": "user-456"}}
    result = app.invoke({"messages": [("user", "Hello")]}, config)
Use MemorySaver in your tests and notebooks for speed, then set the checkpointer via an environment variable or dependency injection in production. This way, your graph logic is never coupled to a specific storage backend.
Human-in-the-Loop: Approvals, Edits, and Interrupts
Autonomous agents are powerful, but sometimes you need a human to approve a dangerous action, correct a hallucinated output, or choose between competing strategies. LangGraph provides first-class primitives for pausing execution mid-graph, surfacing information to a human, and resuming with their input.
There are three core patterns for inserting human decision points. Each trades off flexibility against simplicity — choose the one that matches your UX.
sequenceDiagram
participant Client
participant Graph
participant Node
Client->>Graph: invoke(input, config)
Graph->>Node: execute node
Node->>Graph: interrupt("approve?")
Graph-->>Client: paused — returns interrupt value
Note over Client: Human reviews and decides
Client->>Graph: Command(resume="approved")
Graph->>Node: resumes with "approved"
Node->>Graph: returns result
Graph-->>Client: final output
Pattern 1 — Interrupt and Resume
The most flexible pattern. You call interrupt(value) inside any node function to pause the graph and send an arbitrary value to the client. The client inspects that value, gets human input, and resumes with Command(resume=response). The interrupt() call then returns the human's response, and the node continues executing from that point.
This is ideal when the node itself knows best when to pause — for example, after generating a plan but before executing it.
from langgraph.graph import StateGraph, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict

class State(TypedDict):
    query: str
    plan: str
    result: str

def planning_node(state: State):
    plan = llm.invoke(f"Create a plan for: {state['query']}")
    # Pause here — surface the plan for human approval
    human_response = interrupt({
        "plan": plan.content,
        "question": "Do you approve this plan? (yes/edit/no)"
    })
    if human_response == "no":
        # Skip the executor entirely on rejection
        return Command(goto=END, update={"result": "Cancelled by user."})
    # If human edited, use their version; otherwise use original
    approved_plan = human_response if human_response != "yes" else plan.content
    return {"plan": approved_plan}

def execution_node(state: State) -> dict:
    result = llm.invoke(f"Execute this plan: {state['plan']}")
    return {"result": result.content}

# Build the graph
builder = StateGraph(State)
builder.add_node("planner", planning_node)
builder.add_node("executor", execution_node)
builder.add_edge("__start__", "planner")
builder.add_edge("planner", "executor")

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
Running this graph pauses at the interrupt() call. Resume it by invoking with a Command:
config = {"configurable": {"thread_id": "session-42"}}
# First invocation — graph pauses at interrupt()
result = graph.invoke({"query": "Summarize Q3 revenue"}, config)
# result contains the interrupt value:
# {"plan": "1. Pull Q3 data ...", "question": "Do you approve..."}
# Human approves — resume the graph
final = graph.invoke(Command(resume="yes"), config)
print(final["result"])
All human-in-the-loop patterns require a checkpointer. Without one, the graph has no way to persist its paused state between invocations. Use MemorySaver for development and a database-backed checkpointer (Postgres, SQLite) for production.
Pattern 2 — Approve/Reject with interrupt_before
If you don't want to modify node code, you can pause the graph before a specific node runs by passing interrupt_before at compile time. This is perfect for gating dangerous operations — the node itself stays clean, and the approval logic lives in the orchestration layer.
After the graph pauses, you inspect the current state to see what's about to happen. Then you either resume (let the node run) or modify the state first.
# Assumes State also declares draft, recipient, and result fields
def generate_email(state: State) -> dict:
    draft = llm.invoke(f"Draft email for: {state['query']}")
    return {"draft": draft.content}

def send_email(state: State) -> dict:
    """Dangerous node — actually sends an email."""
    email_service.send(to=state["recipient"], body=state["draft"])
    return {"result": "Email sent successfully."}

builder = StateGraph(State)
builder.add_node("drafter", generate_email)
builder.add_node("sender", send_email)
builder.add_edge("__start__", "drafter")
builder.add_edge("drafter", "sender")

# Gate the dangerous node — graph pauses BEFORE sender runs
graph = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["sender"]
)
The client workflow: invoke, inspect, then approve or reject:
config = {"configurable": {"thread_id": "email-review-7"}}
# Step 1: Run — pauses before "sender"
graph.invoke({"query": "Send Q3 results to stakeholders"}, config)
# Step 2: Inspect state to see the draft
snapshot = graph.get_state(config)
print(snapshot.values["draft"]) # Review the LLM-generated email
print(snapshot.next) # ("sender",) — confirms what's next
# Step 3a: Approve — resume with None to let sender run
graph.invoke(None, config)
# Step 3b: OR Reject — update state to skip sending
# graph.update_state(config, {"result": "Email rejected by reviewer."})
Pattern 3 — Edit State Before Resuming
The most powerful pattern for human correction. After any interrupt (whether from interrupt() or interrupt_before), you can call update_state() to modify the graph's state before resuming. This lets a human fix hallucinated data, adjust parameters, or rewrite LLM outputs.
config = {"configurable": {"thread_id": "edit-session-3"}}
# Graph runs and pauses before "sender"
graph.invoke({"query": "Weekly update to the team"}, config)
# Human reviews the draft and wants to edit it
snapshot = graph.get_state(config)
original_draft = snapshot.values["draft"]
# Correct the LLM output before the email gets sent
edited_draft = original_draft.replace(
    "Revenue increased by 500%",  # LLM hallucination
    "Revenue increased by 12%"    # Actual figure
)
# Write the corrected state back
graph.update_state(config, {"draft": edited_draft})
# Now resume — sender uses the human-corrected draft
graph.invoke(None, config)
Redirecting Flow with Command(goto=...)
Sometimes the human's decision shouldn't just continue the current path — it should redirect the graph to a different node entirely. Command accepts a goto parameter that overrides the graph's normal edges. This is useful for "retry" flows or routing based on human judgment.
def review_node(state: State) -> Command:
    decision = interrupt({
        "draft": state["draft"],
        "options": ["approve", "revise", "escalate"]
    })
    if decision == "approve":
        return Command(goto="send_node")
    elif decision == "revise":
        return Command(goto="drafting_node")   # Loop back to rewrite
    else:
        return Command(goto="escalation_node") # Route to a manager
With goto, a single review node becomes a routing hub. The human controls where the graph goes next without needing complex conditional edges.
Choosing the Right Pattern
| Pattern | Best For | Who Controls the Pause? | Modifies State? |
|---|---|---|---|
| interrupt(value) | Dynamic pauses decided inside node logic | The node itself | Via resume value |
| interrupt_before | Gating dangerous nodes without changing their code | Graph compile config | Optional via update_state |
| update_state() | Human correction of LLM outputs | Either of the above | Yes — directly |
UX Patterns in Practice
Chatbot Confirmation
The most common pattern: an agent proposes an action and asks the user before executing. The interrupt() value becomes the confirmation message shown in the chat UI. The user's reply is the resume value.
def tool_call_node(state: State) -> dict:
    tool_name = state["proposed_tool"]
    tool_args = state["proposed_args"]
    # Ask user before running the tool
    approval = interrupt({
        "message": f"I'd like to call `{tool_name}` with {tool_args}. OK?",
        "type": "confirmation"
    })
    if approval["confirmed"]:
        result = execute_tool(tool_name, tool_args)
        return {"tool_result": result}
    else:
        # Human selected a different tool
        alt = approval.get("alternative_tool", tool_name)
        result = execute_tool(alt, tool_args)
        return {"tool_result": result}
Review Workflows
For document generation, code review, or content pipelines, use interrupt_before to pause before the "publish" step. A reviewer inspects the output via get_state(), edits with update_state() if needed, and then resumes. Multiple reviewers can be chained by having multiple interrupt points in sequence.
Human-Guided Tool Selection
When the agent is uncertain which tool to use, interrupt() can present the options and let the human choose. The Command(goto=...) pattern then routes to the appropriate tool node based on the human's selection, turning the agent into a semi-autonomous assistant that defers to human expertise on ambiguous decisions.
Design your interrupt() values as structured data (dicts with type, message, options fields) rather than plain strings. This makes it easy for frontend code to render the right UI component — a confirmation dialog, a multi-select, or a text editor — based on the interrupt type.
Memory: Short-Term (Checkpoints) vs Long-Term (Store)
LangGraph separates memory into two distinct systems: short-term memory powered by checkpointers, and long-term memory powered by the Store API. Understanding when to use each — and how they complement each other — is key to building agents that feel stateful and personalized.
Short-term memory is scoped to a single conversation thread. Long-term memory persists across every thread and conversation your application handles. Think of it like a person's working memory (what's happening right now) versus their actual memories (what they recall from the past).
erDiagram
THREAD ||--o{ CHECKPOINT : "has many"
CHECKPOINT {
string thread_id
string checkpoint_id
json channel_values
json metadata
}
USER ||--o{ MEMORY : "has many"
MEMORY {
string namespace
string key
json value
}
THREAD }o--|| GRAPH_EXECUTION : "runs in"
MEMORY }o--|| GRAPH_EXECUTION : "accessed during"
GRAPH_EXECUTION {
string graph_id
string thread_id
string run_id
}
Short-Term Memory: The Checkpointer
A checkpointer automatically saves the full state of your graph after every node execution. This state is scoped to a thread — a single conversation identified by a thread_id. When a user resumes a conversation, the checkpointer restores exactly where they left off, including all messages, intermediate values, and pending tasks.
You've likely already used this if you've passed a thread_id in config. The checkpointer handles resume, replay, and even time-travel debugging — rolling back to any previous step in the conversation.
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Each thread_id gets its own conversation history
config = {"configurable": {"thread_id": "user-123-session-1"}}
result = graph.invoke({"messages": [("user", "Hi there!")]}, config)
# Resume the same conversation later — full state is restored
result = graph.invoke({"messages": [("user", "What did I just say?")]}, config)
The critical limitation: checkpointer data belongs to a thread. Start a new thread, and the agent has no memory of previous conversations. That's where long-term memory comes in.
Long-Term Memory: The Store API
The Store (built on the BaseStore interface) is a namespaced key-value store that lives outside any single thread. It persists data across conversations, across threads, and across time. You use it to remember things about users, cache expensive results, or build a knowledge base that grows with every interaction.
The Store API has three core operations:
| Operation | Signature | Purpose |
|---|---|---|
| put | store.put(namespace, key, value) | Create or update an item |
| get | store.get(namespace, key) | Retrieve a specific item by key |
| search | store.search(namespace, query=...) | Find items — with optional semantic search |
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
# Store a user preference
store.put(("users", "user-123", "preferences"), "theme", {"value": "dark", "set_at": "2024-11-01"})
# Retrieve it later — even from a different thread
item = store.get(("users", "user-123", "preferences"), "theme")
print(item.value) # {"value": "dark", "set_at": "2024-11-01"}
# Search across a namespace
results = store.search(("users", "user-123", "preferences"))
for item in results:
    print(f"{item.key}: {item.value}")
Accessing the Store Inside Graph Nodes
LangGraph injects the store into your nodes at runtime — you just declare it as a parameter. This works the same way that config and state are injected. The store you pass to compile() becomes available in every node automatically.
from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore
def personalize_response(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    """Node that reads user preferences from the Store."""
    user_id = config["configurable"]["user_id"]
    # Fetch preferences saved across all previous conversations
    prefs = store.get(("users", user_id, "preferences"), "communication_style")
    style = prefs.value["style"] if prefs else "neutral"
    # Use `style` to customize the system prompt for the response
    response = llm.invoke(
        [("system", f"Respond in a {style} style."), *state["messages"]]
    )
    return {"messages": [response]}

def learn_preferences(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    """Node that writes user preferences to the Store."""
    user_id = config["configurable"]["user_id"]
    # Persist something learned during this conversation
    store.put(
        ("users", user_id, "preferences"),
        "communication_style",
        {"style": "concise", "learned_from": "thread-456"}
    )
    return {}  # no short-term state updates needed
Wire these nodes into your graph, then compile with both a checkpointer and a store:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore
checkpointer = MemorySaver() # short-term: per-thread conversation state
store = InMemoryStore() # long-term: cross-thread persistent memory
graph = builder.compile(checkpointer=checkpointer, store=store)
Namespace Design Patterns
Namespaces are tuples that create a hierarchical path to your data — much like a file system. A well-designed namespace scheme keeps your store organized as it scales. The convention is to move from general to specific: (entity_type, entity_id, data_category).
# User-scoped data
store.put(("users", "u-42", "preferences"), "theme", {"value": "dark"})
store.put(("users", "u-42", "preferences"), "language", {"value": "en"})
store.put(("users", "u-42", "facts"), "hometown", {"value": "Austin, TX"})
# Organization-scoped knowledge base
store.put(("orgs", "acme", "policies"), "refund-policy", {"text": "...", "updated": "2024-10"})
# Search everything under a namespace level
all_prefs = store.search(("users", "u-42", "preferences"))
all_user_data = store.search(("users", "u-42")) # broader search
Always include the user_id in your namespace when storing user-specific data. This ensures one user can never accidentally read another user's memories, and makes cleanup straightforward when a user requests data deletion.
Semantic Search in the Store
The store.search() method supports an optional query parameter for semantic (embedding-based) search. Instead of exact key lookups, you can find memories by meaning. This is powerful for use cases like recalling relevant facts about a user when the exact key isn't known.
from langchain_openai import OpenAIEmbeddings
store = InMemoryStore(
    index={"embed": OpenAIEmbeddings(model="text-embedding-3-small"), "dims": 1536}
)
# Store facts about the user (embeddings generated automatically)
store.put(("users", "u-42", "facts"), "diet", {"text": "User is vegetarian and avoids gluten"})
store.put(("users", "u-42", "facts"), "hobby", {"text": "User enjoys trail running on weekends"})
# Later, search by meaning — not exact keys
results = store.search(("users", "u-42", "facts"), query="food preferences", limit=3)
# Returns the "diet" fact even though "food" doesn't appear in the key
Development vs Production Stores
For local development and testing, InMemoryStore works perfectly — it's fast, requires zero setup, and keeps everything in process memory. For production, switch to PostgresStore (or another persistent backend) so that memories survive restarts and scale across multiple server instances.
from langgraph.store.memory import InMemoryStore
# Zero config — great for prototyping
store = InMemoryStore()
graph = builder.compile(checkpointer=MemorySaver(), store=store)
from langgraph.store.postgres import PostgresStore
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:pass@localhost:5432/myapp"
# Persistent — survives restarts, scales horizontally
with (
    PostgresStore.from_conn_string(DB_URI) as store,
    PostgresSaver.from_conn_string(DB_URI) as checkpointer,  # also a context manager
):
    store.setup()         # creates tables on first run
    checkpointer.setup()  # same for the checkpoint tables
    graph = builder.compile(checkpointer=checkpointer, store=store)
The Relationship: Conversation-Scoped vs Application-Scoped
The mental model is straightforward. The checkpointer owns everything about a single conversation thread — the messages, the state at each step, the ability to rewind or resume. The store owns everything that transcends any single conversation — user profiles, learned facts, cached computations, shared knowledge.
| Aspect | Checkpointer (Short-Term) | Store (Long-Term) |
|---|---|---|
| Scope | Single thread / conversation | Entire application / all threads |
| Keyed by | thread_id | Namespace tuple + key |
| Stores | Full graph state (messages, channels) | Arbitrary key-value data |
| Written by | Automatic after each node | Explicit store.put() calls |
| Use case | Resume, replay, time-travel | User prefs, facts, knowledge base |
| Dev implementation | MemorySaver | InMemoryStore |
| Prod implementation | PostgresSaver | PostgresStore |
The checkpointer writes automatically — you don't call any save method. The Store requires explicit put() calls, giving you full control over what gets persisted long-term and when. This is by design: not everything in a conversation is worth remembering forever.
Common Use Cases for Long-Term Store
- User preferences across conversations — Store language, tone, formatting preferences that the agent recalls in every new thread.
- Building a knowledge base from interactions — Extract facts from conversations (e.g., "user works at Acme Corp") and store them for future reference.
- Caching expensive computations — Save the results of costly API calls or complex reasoning chains so they can be reused without re-running.
- Cross-agent shared memory — In multi-agent systems, one agent can write findings to the store and another agent can read them, even in a different thread.
Subgraphs and Graph Composition
Real-world LLM applications rarely fit inside a single flat graph. You might have a research agent, a writing agent, and a review agent — each with its own internal logic. LangGraph handles this with subgraphs: a compiled graph added as a node in a parent graph. The parent orchestrates; the children execute.
There are three composition patterns you need to know: shared state, different state with mapping, and subgraph-as-a-tool. Each solves a different integration problem.
graph TB
subgraph Parent["Parent Graph"]
direction TB
Start((Start)) --> Router{Route}
subgraph SG1["research_agent (Subgraph)"]
direction TB
R1[search_web] --> R2[summarize]
R2 --> R3[extract_facts]
end
subgraph SG2["writer_agent (Subgraph)"]
direction TB
W1[draft] --> W2[review]
W2 --> W3[polish]
end
Router -->|research| SG1
Router -->|write| SG2
SG1 --> Merge[merge_results]
SG2 --> Merge
Merge --> End((End))
end
style Parent fill:#1a1a2e,stroke:#7c3aed,stroke-width:2px,color:#e2e8f0
style SG1 fill:#1e293b,stroke:#3b82f6,stroke-width:2px,color:#e2e8f0
style SG2 fill:#1e293b,stroke:#10b981,stroke-width:2px,color:#e2e8f0
Pattern 1: Subgraph with Shared State
The simplest pattern. When the parent graph and child graph share the same state schema, state flows directly — no transformation needed. The subgraph reads from and writes to the same state keys the parent uses.
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator
class SharedState(TypedDict):
messages: Annotated[list, operator.add]
facts: Annotated[list, operator.add]
# --- Child graph: research agent ---
def search_web(state: SharedState) -> dict:
return {"messages": ["Searched the web for relevant info."]}
def extract_facts(state: SharedState) -> dict:
return {"facts": ["LangGraph supports subgraphs."]}
child_builder = StateGraph(SharedState)
child_builder.add_node("search_web", search_web)
child_builder.add_node("extract_facts", extract_facts)
child_builder.add_edge(START, "search_web")
child_builder.add_edge("search_web", "extract_facts")
child_builder.add_edge("extract_facts", END)
research_agent = child_builder.compile()
# --- Parent graph ---
def synthesize(state: SharedState) -> dict:
return {"messages": [f"Synthesized {len(state['facts'])} facts."]}
parent_builder = StateGraph(SharedState)
parent_builder.add_node("research", research_agent) # compiled graph as a node
parent_builder.add_node("synthesize", synthesize)
parent_builder.add_edge(START, "research")
parent_builder.add_edge("research", "synthesize")
parent_builder.add_edge("synthesize", END)
parent_graph = parent_builder.compile()
The key line is add_node("research", research_agent). You pass the compiled graph directly — LangGraph treats it as any other node. Because both graphs use SharedState, the parent's state is passed into the subgraph and the subgraph's output merges back using the reducer functions (here, operator.add).
Pattern 2: Subgraph with Different State
Often, a subgraph has its own internal state that doesn't match the parent. A research subgraph might track search_queries and raw_results internally, while the parent only cares about messages and facts. You bridge the gap with an input/output transformation function.
class ParentState(TypedDict):
messages: Annotated[list, operator.add]
topic: str
class ResearchState(TypedDict):
search_queries: list[str]
raw_results: list[str]
summary: str
# Build and compile the research subgraph using ResearchState
research_builder = StateGraph(ResearchState)
# ... add nodes and edges ...
research_subgraph = research_builder.compile()
# Wrapper function: transforms parent state → subgraph state → parent state
def call_research(state: ParentState) -> dict:
# Map parent state to subgraph input
result = research_subgraph.invoke({
"search_queries": [state["topic"]],
"raw_results": [],
"summary": "",
})
# Map subgraph output back to parent state
return {"messages": [f"Research summary: {result['summary']}"]}
parent_builder = StateGraph(ParentState)
parent_builder.add_node("research", call_research) # wrapper, not raw subgraph
parent_builder.add_edge(START, "research")
parent_builder.add_edge("research", END)
parent_graph = parent_builder.compile()
Shared state is simpler but couples the subgraph to the parent's schema. Different state with a wrapper function gives you full isolation — the subgraph can evolve independently. Prefer different state when subgraphs are maintained by different teams or reused across multiple parent graphs.
Pattern 3: Subgraph as a Tool
Sometimes you want the LLM itself to decide when to invoke a subgraph. Wrap the compiled graph in a tool, and the LLM can call it like any other function. This is especially powerful in multi-agent architectures where a supervisor agent delegates tasks dynamically.
from langchain_core.tools import tool
# Assume research_subgraph is already compiled
@tool
def research_tool(topic: str) -> str:
"""Run a multi-step research workflow on the given topic."""
result = research_subgraph.invoke({
"search_queries": [topic],
"raw_results": [],
"summary": "",
})
return result["summary"]
# Now bind this tool to your LLM in a ReAct-style agent
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(
model=llm,
tools=[research_tool], # LLM decides when to call this
)
The LLM sees research_tool as a function with a description. When it determines it needs research, it invokes the tool — which internally runs the entire subgraph. This gives you LLM-driven orchestration instead of hardcoded routing.
When to Use Subgraphs
| Scenario | Pattern | Why |
|---|---|---|
| Complex logic that deserves its own graph | Shared or different state | Encapsulation — the parent graph stays clean and readable |
| Reusable component across multiple apps | Different state | Isolation — the subgraph doesn't depend on any parent's schema |
| Team-based development | Different state | Each team owns and tests its subgraph independently |
| LLM should decide when to invoke | Subgraph as tool | Dynamic orchestration — routing is not hardcoded |
| Independent checkpointing / streaming | Any pattern | Subgraphs get their own namespaced checkpoints |
Accessing Subgraph State and Checkpoints
LangGraph automatically namespaces subgraph checkpoints under the parent. This means the parent can inspect the internal state of any subgraph at any point — critical for debugging and observability. You do this with the subgraphs=True parameter.
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
parent_graph = parent_builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "session-42"}}
result = parent_graph.invoke({"messages": [], "topic": "LangGraph"}, config)
# Get full state snapshot including subgraph internals
state = parent_graph.get_state(config, subgraphs=True)
# The state object includes nested subgraph states
print(state.values) # parent state
for task in state.tasks:
# Each task may contain subgraph state snapshots
print(f"Task: {task.name}")
if hasattr(task, 'state') and task.state:
print(f" Subgraph state: {task.state.values}")
When you pass subgraphs=True, the returned StateSnapshot includes the full state tree. Each subgraph's checkpoint lives under its own checkpoint namespace (checkpoint_ns), derived from the parent node's name plus a task identifier. This lets you replay, inspect, or even modify subgraph state mid-execution when using human-in-the-loop patterns.
When you stream a parent graph with stream_mode="updates", you get events from subgraph nodes too. The event metadata includes the namespace path so you can tell which subgraph emitted it. Use subgraphs=True in graph.stream() to get a tuple of (namespace, event) for each update.
Putting It Together: A Complete Example
Here's a compact end-to-end example showing a parent graph that composes two subgraphs — one with shared state and one invoked through a wrapper.
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
import operator
# --- Shared state ---
class AppState(TypedDict):
messages: Annotated[list[str], operator.add]
next_step: str
# --- Subgraph A: validator (shared state) ---
def validate(state: AppState) -> dict:
is_valid = len(state["messages"]) > 0
return {"messages": [f"Validation: {'passed' if is_valid else 'failed'}"]}
validator_builder = StateGraph(AppState)
validator_builder.add_node("validate", validate)
validator_builder.add_edge(START, "validate")
validator_builder.add_edge("validate", END)
validator = validator_builder.compile()
# --- Subgraph B: formatter (different state, via wrapper) ---
class FormatterState(TypedDict):
raw_text: str
formatted_text: str
def format_text(state: FormatterState) -> dict:
return {"formatted_text": state["raw_text"].upper()}
formatter_builder = StateGraph(FormatterState)
formatter_builder.add_node("format_text", format_text)
formatter_builder.add_edge(START, "format_text")
formatter_builder.add_edge("format_text", END)
formatter_subgraph = formatter_builder.compile()
def call_formatter(state: AppState) -> dict:
last_msg = state["messages"][-1] if state["messages"] else ""
result = formatter_subgraph.invoke({"raw_text": last_msg, "formatted_text": ""})
return {"messages": [result["formatted_text"]]}
# --- Router ---
def router(state: AppState) -> Literal["validator", "formatter"]:
return state.get("next_step", "validator")
# --- Parent graph ---
parent = StateGraph(AppState)
parent.add_node("validator", validator) # shared state subgraph
parent.add_node("formatter", call_formatter) # different state via wrapper
parent.add_conditional_edges(START, router)
parent.add_edge("validator", END)
parent.add_edge("formatter", END)
app = parent.compile(checkpointer=MemorySaver())
# Run it
config = {"configurable": {"thread_id": "demo-1"}}
result = app.invoke(
{"messages": ["Hello world"], "next_step": "formatter"},
config,
)
print(result["messages"])
# ["Hello world", "HELLO WORLD"]
Multi-Agent Architectures: Supervisor, Swarm, and Hierarchical
When a single agent isn't enough — when your application needs research and coding and planning — you need multiple agents working together. LangGraph provides three primary patterns for orchestrating multi-agent systems, each with distinct trade-offs in control, flexibility, and scalability.
Before diving into architectures, every pattern shares one building block: the individual agent. LangGraph's create_react_agent prebuilt gives you a fully functional ReAct agent in a single call, which you can then compose into any multi-agent topology.
from langgraph.prebuilt import create_react_agent
# Each specialist is a self-contained ReAct agent with its own tools
researcher = create_react_agent(
model, tools=[search_tool, wiki_tool], name="researcher"
)
coder = create_react_agent(
model, tools=[python_repl, file_writer], name="coder"
)
reviewer = create_react_agent(
model, tools=[lint_tool, test_runner], name="reviewer"
)
Each of these agents is a full subgraph — it has its own state, its own tool-calling loop, and can be invoked independently. The architecture you choose determines how these agents get orchestrated.
graph LR
subgraph Supervisor["① Supervisor Pattern"]
direction TB
S[Supervisor LLM] -->|route| A1[Researcher]
S -->|route| A2[Coder]
S -->|route| A3[Reviewer]
A1 -->|result| S
A2 -->|result| S
A3 -->|result| S
end
subgraph Swarm["② Swarm Pattern"]
direction TB
B1[Researcher] <-->|handoff| B2[Coder]
B2 <-->|handoff| B3[Reviewer]
B1 <-->|handoff| B3
end
subgraph Hierarchical["③ Hierarchical Pattern"]
direction TB
T[Coordinator] --> TL1[Research Lead]
T --> TL2[Engineering Lead]
TL1 --> SP1[Web Searcher]
TL1 --> SP2[Analyst]
TL2 --> SP3[Coder]
TL2 --> SP4[Tester]
end
The Supervisor Pattern
The supervisor pattern is the most intuitive multi-agent architecture. A central LLM — the supervisor — receives the user's request and decides which specialist agent to invoke next. After the specialist returns its result, the supervisor evaluates progress and either delegates to another agent or returns the final answer. This loop continues until the task is complete.
The supervisor acts as a router and orchestrator. It never does the "real work" itself — it reasons about which agent is best suited for the current step and dispatches accordingly.
from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisor
# Build specialist agents
researcher = create_react_agent(model, tools=[search_tool], name="researcher")
coder = create_react_agent(model, tools=[python_repl], name="coder")
# Supervisor decides who to call and when to stop
supervisor = create_supervisor(
model=model,
agents=[researcher, coder],
prompt=(
"You are a project manager. Delegate research tasks to 'researcher' "
"and implementation tasks to 'coder'. Combine their outputs into a "
"final answer."
),
)
app = supervisor.compile()
# The supervisor loops: route → specialist → evaluate → route or finish
result = app.invoke({
"messages": [{"role": "user", "content": "Research FastAPI auth patterns and write an example"}]
})
The supervisor sees the full message history, so it knows what each specialist has already done. It keeps looping — research, then code, then maybe more research — until it decides the task is fully addressed.
The Swarm Pattern
The swarm pattern removes the central controller entirely. Instead, agents hand off directly to each other using Command(goto="agent_name"). Each agent decides on its own when it's done and who should take over next. There's no supervisor making routing decisions — the intelligence is distributed across all agents.
This is particularly powerful when workflows aren't linear. A researcher might hand off to a coder, who discovers a gap and hands back to the researcher, who then escalates to a reviewer — all without a central bottleneck.
from langgraph.prebuilt import create_react_agent
from langgraph_swarm import create_swarm, create_handoff_tool
# Define handoff tools — these let agents transfer control
researcher = create_react_agent(
model,
tools=[search_tool, create_handoff_tool(agent_name="coder")],
name="researcher",
prompt="Research the topic. When you have enough info, hand off to 'coder'.",
)
coder = create_react_agent(
model,
tools=[python_repl, create_handoff_tool(agent_name="researcher")],
name="coder",
prompt="Write code based on research. Hand back to 'researcher' if you need more info.",
)
# No central controller — agents self-organize
swarm = create_swarm(agents=[researcher, coder], default_active_agent="researcher")
app = swarm.compile()
result = app.invoke({
"messages": [{"role": "user", "content": "Build a Redis caching decorator"}]
})
Under the hood, create_handoff_tool generates a tool that, when called by the agent, returns a Command(goto="agent_name"). This transfers control — and optionally state — to the target agent. The swarm graph routes execution to whichever agent was most recently handed off to.
The Hierarchical Pattern
The hierarchical pattern extends the supervisor idea: supervisors can supervise other supervisors. A top-level coordinator delegates to team leads, who in turn manage their own specialist agents. This creates a tree structure that mirrors how real organizations break down complex problems.
You build this by composing supervisors. Each "team" is itself a supervisor graph, and the top-level supervisor treats those teams as its agents.
from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisor
# --- Layer 1: Specialist agents ---
web_searcher = create_react_agent(model, tools=[search_tool], name="web_searcher")
analyst = create_react_agent(model, tools=[calc_tool], name="analyst")
coder = create_react_agent(model, tools=[python_repl], name="coder")
tester = create_react_agent(model, tools=[test_runner], name="tester")
# --- Layer 2: Team leads (supervisors over specialists) ---
research_team = create_supervisor(
model=model,
agents=[web_searcher, analyst],
name="research_lead",
prompt="Coordinate research: use web_searcher for data, analyst for analysis.",
).compile()
engineering_team = create_supervisor(
model=model,
agents=[coder, tester],
name="engineering_lead",
prompt="Coordinate engineering: coder writes code, tester validates it.",
).compile()
# --- Layer 3: Top-level coordinator ---
coordinator = create_supervisor(
model=model,
agents=[research_team, engineering_team],
prompt="Break the task into research and engineering phases. Delegate accordingly.",
)
app = coordinator.compile()
Each layer only sees its direct reports. The coordinator doesn't know about web_searcher or tester — it only talks to research_lead and engineering_lead. This encapsulation keeps prompts focused and manageable as the system scales.
Shared State vs. Isolated State
How agents share information is a critical design decision. LangGraph gives you two models for inter-agent communication, and the right choice depends on your architecture.
Shared state means all agents read from and write to the same graph state — typically a shared messages list. The supervisor pattern uses this by default: the supervisor and all specialists append to one conversation thread. This makes coordination simple but means agents can see (and be confused by) each other's intermediate work.
Isolated state means each agent subgraph maintains its own internal state. Only the inputs and outputs cross the boundary. The swarm pattern supports this through optional state transfer in handoffs — you control exactly what context the next agent receives.
In shared-state systems, specialist agents receive the full message history — including other agents' tool calls and intermediate reasoning. This inflates token usage and can confuse agents that lack context for those messages. If your specialists are getting distracted by irrelevant history, switch to isolated state with explicit input/output boundaries.
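Making that boundary explicit is often just a filter over the message list before it crosses into the specialist. A minimal sketch using plain message dicts (the filtering rule here, keep user turns and final assistant answers, is one reasonable choice, not a library convention):

```python
def isolate_for_specialist(messages: list[dict]) -> list[dict]:
    """Drop other agents' tool calls and tool results before handing
    history to a specialist; keep user turns and plain assistant answers."""
    return [
        m for m in messages
        if m.get("role") == "user"
        or (m.get("role") == "assistant" and not m.get("tool_calls"))
    ]

history = [
    {"role": "user", "content": "Refactor the cache layer"},
    {"role": "assistant", "content": "", "tool_calls": [{"name": "search"}]},
    {"role": "tool", "content": "...search results..."},
    {"role": "assistant", "content": "Plan: extract a CachePolicy class."},
]
clean = isolate_for_specialist(history)  # keeps the request and the final answer
```

A wrapper node (as in the different-state subgraph pattern) would apply this filter on the way in and merge only the specialist's final output on the way out.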
Trade-off Comparison
| Dimension | Supervisor | Swarm | Hierarchical |
|---|---|---|---|
| Complexity | Low — one routing LLM | Medium — distributed logic | High — nested supervisors |
| Bottleneck risk | High — every step goes through supervisor | None — agents route directly | Medium — distributed across team leads |
| Debuggability | Easy — linear trace through supervisor | Hard — nondeterministic handoff chains | Moderate — tree-structured traces |
| Latency per step | 2 LLM calls (supervisor + agent) | 1 LLM call (agent decides and acts) | 3+ LLM calls (coordinator → lead → agent) |
| Scalability | Limited — supervisor prompt grows with agents | Good — add agents without changing others | Best — add entire teams independently |
| Best for | 3–5 agents, well-defined tasks | Peer-to-peer workflows, customer service | Large orgs, 10+ agents, complex pipelines |
The supervisor pattern is the right default for most projects. It's the easiest to reason about, test, and debug. Only move to swarm when you need dynamic peer-to-peer handoffs, or to hierarchical when your agent count exceeds what a single supervisor can manage in its context window (typically 5–7 agents).
Mixing Patterns
These patterns aren't mutually exclusive. A common production architecture uses hierarchical supervisors at the top level with swarm-style handoffs within a team. For example, an engineering team lead (supervisor) might manage a coder and tester that hand off to each other freely via Command(goto=...), while the top-level coordinator routes between the engineering team and a research team using standard supervisor routing.
The key insight is that each "agent" in LangGraph is just a subgraph. Whether that subgraph is a single create_react_agent, a supervisor managing three specialists, or a swarm of peer agents — it all composes the same way. This composability is what makes LangGraph's multi-agent story powerful: you pick the right pattern at each level of your system.
The Functional API: @entrypoint and @task
Not every workflow needs a graph. LangGraph's Functional API lets you write workflows as plain Python functions while still getting checkpointing, resumption, and streaming for free. Instead of defining nodes and edges, you decorate a function with @entrypoint and break it into checkpointed steps with @task.
The core idea: an @entrypoint is your workflow boundary, and each @task inside it is a cached unit of work. If the workflow is interrupted and resumed, any task that already completed is skipped — its cached result is replayed instead of re-executed.
Basic Structure
Here's the minimal pattern. The @entrypoint decorator takes a checkpointer argument to enable state persistence. Each @task function returns a value that gets checkpointed automatically.
import requests

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
@task
def fetch_weather(city: str) -> dict:
"""Each @task is a checkpointed step."""
response = requests.get(f"https://api.weather.com/{city}")
return response.json()
@task
def fetch_news(topic: str) -> list:
response = requests.get(f"https://api.news.com/search?q={topic}")
return response.json()["articles"]
@entrypoint(checkpointer=MemorySaver())
def morning_briefing(inputs: dict) -> str:
city = inputs["city"]
topic = inputs["topic"]
# Tasks return futures — call .result() to get the value
weather = fetch_weather(city).result()
articles = fetch_news(topic).result()
return f"Weather in {city}: {weather['summary']}\nTop story: {articles[0]['title']}"
Notice that @task functions return futures. You call .result() to resolve them. This is how LangGraph tracks which tasks have completed — if you resume after a crash, any task whose .result() was already resolved will return its cached value instantly.
A Complete Research Workflow
Let's build something more realistic: a research workflow that queries multiple sources, then synthesizes the results with an LLM. Each API call is its own @task, so a failure in the synthesis step won't re-fetch data you already have.
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
@task
def search_arxiv(query: str) -> list[str]:
"""Search arXiv for recent papers. Checkpointed — won't re-run on resume."""
import arxiv
search = arxiv.Search(query=query, max_results=5)
results = arxiv.Client().results(search)  # Search.results() is deprecated
return [f"{r.title}: {r.summary[:200]}" for r in results]
@task
def search_wikipedia(query: str) -> str:
"""Fetch Wikipedia summary. Checkpointed separately from arXiv."""
import wikipedia
return wikipedia.summary(query, sentences=5)
@task
def synthesize(question: str, papers: list[str], wiki_summary: str) -> str:
"""Use LLM to synthesize all sources into a research brief."""
context = f"Wikipedia:\n{wiki_summary}\n\nRecent Papers:\n"
context += "\n".join(f"- {p}" for p in papers)
response = llm.invoke(
f"Based on these sources, write a research brief about: {question}\n\n{context}"
)
return response.content
@entrypoint(checkpointer=MemorySaver())
def research(question: str) -> str:
# Both searches run as separate checkpointed tasks
papers = search_arxiv(question).result()
wiki = search_wikipedia(question).result()
# If the LLM call fails, arxiv + wikipedia results are still cached
brief = synthesize(question, papers, wiki).result()
return brief
# Invoke like any LangGraph graph
result = research.invoke(
"transformer architecture advances in 2024",
config={"configurable": {"thread_id": "research-001"}}
)
Checkpointed results are scoped to the thread_id. If you invoke the workflow with a new thread ID, all tasks run fresh. Same thread ID + same inputs = cached results on resume.
Returning Results with entrypoint.final
Sometimes you need to return a value to the caller and persist additional state for future resumptions. The entrypoint.final function lets you do both: the first argument is the return value, and the save parameter is the state written to the checkpoint.
@entrypoint(checkpointer=MemorySaver())
def research_with_memory(question: str, *, previous: dict | None = None) -> str:
    # `previous` is injected from the checkpoint (None on the first run)
    past_queries = (previous or {}).get("queries", [])
papers = search_arxiv(question).result()
wiki = search_wikipedia(question).result()
brief = synthesize(question, papers, wiki).result()
# Return the brief to the caller, but also save state for next invocation
updated_state = {
"queries": past_queries + [question],
"last_brief": brief,
}
return entrypoint.final(value=brief, save=updated_state)
On the next invocation with the same thread_id, the previous parameter receives the saved state. This is how you build multi-turn workflows — each run accumulates context from prior runs.
Retry Policies on Tasks
External API calls fail. The @task decorator accepts a retry_policy parameter that handles transient errors automatically. You configure the backoff strategy and which exceptions to retry.
import requests

from langgraph.types import RetryPolicy
# Retry up to 3 times with exponential backoff
@task(retry_policy=RetryPolicy(max_attempts=3, backoff_factor=2.0))
def call_flaky_api(query: str) -> dict:
response = requests.get(f"https://unreliable-api.com/search?q={query}")
response.raise_for_status() # Raises on 4xx/5xx — triggers retry
return response.json()
# Retry only on specific exceptions
@task(retry_policy=RetryPolicy(
max_attempts=5,
retry_on=(requests.ConnectionError, requests.Timeout),
))
def call_slow_api(query: str) -> dict:
response = requests.get(
f"https://slow-api.com/data?q={query}", timeout=10
)
return response.json()
Functional API vs. StateGraph
Both APIs produce LangGraph workflows with checkpointing and streaming. The difference is in what control flow patterns they support naturally. Here's how to choose:
| Criteria | Functional API | StateGraph |
|---|---|---|
| Workflow shape | Linear or branching (tree) | Any shape, including cycles |
| Control flow | Python if/for/while | Conditional edges, routing functions |
| State management | Regular variables + entrypoint.final | Typed state schema with reducers |
| Cycles / loops | Via ordinary Python while loops (no graph-level cycles) | First-class support |
| Human-in-the-loop | Supported via interrupt() | Supported via interrupt() |
| Learning curve | Low — just Python functions | Moderate — graph concepts required |
Use the Functional API when your workflow is a straight pipeline or a fan-out/fan-in tree — situations where standard Python control flow reads clearly. Switch to StateGraph when you need an agent loop (think → act → observe → think again) or complex conditional routing between many nodes.
Interoperability: Mixing Both APIs
The Functional API and StateGraph aren't mutually exclusive. A functional @entrypoint can be used as a node inside a StateGraph, and a compiled StateGraph can be called from within a @task. This lets you use the right abstraction at each level of your system.
import requests

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
# A functional entrypoint used as a node in a StateGraph
@task
def fetch_data(url: str) -> dict:
return requests.get(url).json()
@entrypoint()
def research_node(inputs: dict) -> dict:
    data = fetch_data(inputs["url"]).result()
    summary = llm.invoke(f"Summarize: {data}").content  # `llm` as defined earlier
    return {"research": summary}  # used as a node, so return a state update
# Wire it into a StateGraph alongside graph-native nodes
class AgentState(TypedDict):
url: str
research: str
decision: str
def decide_next_step(state: AgentState) -> dict:
    return {"decision": "done" if state["research"] else "retry"}

graph = StateGraph(AgentState)
graph.add_node("research", research_node)   # functional entrypoint as a node
graph.add_node("decide", decide_next_step)  # regular graph node
graph.add_edge(START, "research")
graph.add_edge("research", "decide")
graph.add_edge("decide", END)
app = graph.compile(checkpointer=MemorySaver())
A pragmatic approach: prototype your workflow with the Functional API for speed. If you later need cycles or complex routing, refactor just that portion into a StateGraph — and call it from your existing functional code, or vice versa.
Map-Reduce and Dynamic Fan-Out with Send
Graphs often need to run the same node multiple times in parallel with different inputs — research multiple topics, process a batch of documents, or query several APIs at once. LangGraph's Send primitive provides exactly this: dynamic fan-out from a conditional edge, where you spawn N parallel executions of a node at runtime based on the current state.
This is the map-reduce pattern in LangGraph. A planner decides what work to do, Send fans it out across parallel branches, and a reducer fans the results back in.
graph LR
P["🗂️ planner"] -->|conditional edge| S1["Send('research', topic1)"]
P -->|conditional edge| S2["Send('research', topic2)"]
P -->|conditional edge| S3["Send('research', topic3)"]
S1 --> R1["research (topic1)"]
S2 --> R2["research (topic2)"]
S3 --> R3["research (topic3)"]
R1 --> SYN["synthesize"]
R2 --> SYN
R3 --> SYN
style P fill:#4a9eff,color:#fff,stroke:#2d7cd6
style R1 fill:#f5a623,color:#fff,stroke:#d4891c
style R2 fill:#f5a623,color:#fff,stroke:#d4891c
style R3 fill:#f5a623,color:#fff,stroke:#d4891c
style SYN fill:#7ed321,color:#fff,stroke:#5ca018
style S1 fill:#eee,color:#333,stroke:#ccc
style S2 fill:#eee,color:#333,stroke:#ccc
style S3 fill:#eee,color:#333,stroke:#ccc
How Send Works
A Send object takes two arguments: the target node name and the input for that particular execution. You return a list of Send objects from a conditional edge function, and LangGraph spawns one execution per Send. The number of branches is determined at runtime — not at graph-compile time — which is what makes this "dynamic" fan-out.
Each Send creates its own isolated state for that branch. The target node receives only the input you pass through Send, not the full parent graph state. When all branches complete, their outputs are collected and merged back into the parent state via a reducer function.
Full Example: Plan → Research → Synthesize
Here's the complete map-reduce pattern. A planner generates research topics, each topic is researched in parallel, and a synthesizer combines all findings.
Step 1: Define the State with a Reducer
The findings field uses the operator.add reducer so that each parallel branch's output is appended rather than overwritten.
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
class ResearchState(TypedDict):
topic: str # main topic
subjects: list[str] # planned subtopics
findings: Annotated[list[str], operator.add] # reducer: append results
summary: str # final synthesis
Step 2: Define the Nodes
def planner(state: ResearchState) -> dict:
"""Generates subtopics to research in parallel."""
# In practice, call an LLM here to generate subtopics
subjects = [f"{state['topic']} - {area}" for area in ["history", "applications", "future"]]
return {"subjects": subjects}
def research(state: dict) -> dict:
    """Runs once per Send — researches a single subject."""
    # This state is the Send's input dict, not the parent ResearchState,
    # so it's annotated as a plain dict rather than ResearchState
    subject = state["subject"]
    # In practice, call an LLM or retriever here
    finding = f"Key findings about {subject}: [researched content]"
    return {"findings": [finding]}  # list because the reducer uses operator.add
def synthesize(state: ResearchState) -> dict:
"""Combines all parallel research findings into a summary."""
all_findings = "\n".join(state["findings"])
summary = f"Synthesis of {len(state['findings'])} topics:\n{all_findings}"
return {"summary": summary}
Step 3: Wire Up the Graph with Send
The routing function inspects the planner's output and returns a Send for each subject. This is where the fan-out happens.
def route_research(state: ResearchState) -> list[Send]:
"""Fan out: create one Send per subject."""
return [Send("research", {"subject": s}) for s in state["subjects"]]
# Build the graph
graph = StateGraph(ResearchState)
graph.add_node("planner", planner)
graph.add_node("research", research)
graph.add_node("synthesize", synthesize)
graph.add_edge(START, "planner")
graph.add_conditional_edges("planner", route_research, ["research"])
graph.add_edge("research", "synthesize")
graph.add_edge("synthesize", END)
app = graph.compile()
Step 4: Run It
result = app.invoke({"topic": "quantum computing"})
print(result["summary"])
# Synthesis of 3 topics:
# Key findings about quantum computing - history: [researched content]
# Key findings about quantum computing - applications: [researched content]
# Key findings about quantum computing - future: [researched content]
Each parallel research branch returns {"findings": [finding]}. Because findings uses operator.add as its reducer, LangGraph concatenates these lists together. Without a reducer, each branch would overwrite the previous one and you'd only see the last result. A reducer on the collecting field is essential for map-reduce to work.
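The merge itself is just plain Python list concatenation, which you can see directly:

```python
import operator

# Two parallel branches each return a one-element findings list
branch_a = {"findings": ["finding from branch A"]}
branch_b = {"findings": ["finding from branch B"]}

# LangGraph applies the reducer pairwise as branch outputs arrive
merged = operator.add(branch_a["findings"], branch_b["findings"])
print(merged)  # ['finding from branch A', 'finding from branch B']
```

With a reducer of `operator.add`, each branch's one-element list is appended; without it, the last write would win.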
Send's Input Creates Isolated Branch State
The second argument to Send(node_name, input) becomes the entire state for that branch's execution. The target node sees only what you pass in — it does not automatically inherit the parent graph's full state. This is a deliberate design: each branch operates independently.
# You control exactly what each branch receives
return [
Send("research", {"subject": "quantum history", "depth": "detailed"}),
Send("research", {"subject": "quantum applications", "depth": "overview"}),
Send("research", {"subject": "quantum future", "depth": "speculative"}),
]
# Each branch can receive different parameters — not just different topics
Limitations of Send
| Limitation | Details | Workaround |
|---|---|---|
| No inter-branch communication | Parallel branches cannot read each other's outputs during execution. Each branch is fully isolated. | Do cross-branch logic in the downstream node (e.g., synthesize) after fan-in. |
| Isolated state per branch | Each Send creates a fresh state object. The branch doesn't see the parent's full state unless you explicitly pass it. | Include any needed parent state data in the Send input dict. |
| All branches must target the same downstream edge | The conditional edge function returns Send objects, and all branches converge at the same next node. | Use a router inside the target node if you need divergent logic. |
Send with Subgraphs for Complex Parallel Workflows
When each parallel branch needs to run multiple steps (not just a single node), you can target a subgraph with Send. The subgraph encapsulates an entire multi-step workflow that executes independently per branch.
class BranchState(TypedDict):
subject: str
raw_data: str
analysis: str
# Define a multi-step subgraph for each branch
branch_graph = StateGraph(BranchState)
branch_graph.add_node("fetch", fetch_data) # step 1: gather raw data
branch_graph.add_node("analyze", analyze_data) # step 2: analyze it
branch_graph.add_edge(START, "fetch")
branch_graph.add_edge("fetch", "analyze")
branch_graph.add_edge("analyze", END)
branch_subgraph = branch_graph.compile()
# Use the subgraph as a node in the parent graph
parent_graph = StateGraph(ResearchState)
parent_graph.add_node("planner", planner)
parent_graph.add_node("research_branch", branch_subgraph) # subgraph as node
parent_graph.add_node("synthesize", synthesize)
parent_graph.add_edge(START, "planner")
parent_graph.add_conditional_edges("planner", route_research, ["research_branch"])
parent_graph.add_edge("research_branch", "synthesize")
parent_graph.add_edge("synthesize", END)
Each Send("research_branch", {...}) now triggers a full fetch → analyze pipeline in parallel. The subgraph's final state is merged back into the parent via the reducer, just like a single-node branch.
You can mix Send objects with regular string returns in a conditional edge function. Return Send objects for the branches you want to fan out, and a string node name for a single next-node transition. This is useful when the planner sometimes decides parallelism isn't needed.
Error Handling, Retries, and Fault Tolerance
Production LLM agents live in a hostile world — API rate limits, model timeouts, transient network failures, and malformed responses are daily realities. LangGraph provides first-class primitives for retries, checkpointing, and graceful degradation so your agents can survive these failures without losing progress.
stateDiagram-v2
[*] --> NodeExecution
NodeExecution --> Success: No error
NodeExecution --> Failure: Exception raised
Success --> NextNode
NextNode --> [*]
Failure --> Retry: attempts < max
Retry --> NodeExecution: backoff + jitter
Failure --> ErrorState: max retries exceeded
ErrorState --> FallbackNode: fallback configured
ErrorState --> RaiseException: no fallback
FallbackNode --> NextNode
RaiseException --> [*]: graph stops, checkpoint preserves last good state
RetryPolicy Configuration
LangGraph’s RetryPolicy lets you configure automatic retries on a per-node basis. When a node raises a matching exception, the framework retries it with exponential backoff — no manual loop needed. You attach the policy directly when registering a node on the graph.
from langgraph.pregel import RetryPolicy
retry = RetryPolicy(
initial_interval=0.5, # seconds before first retry
backoff_factor=2.0, # multiply interval each attempt
max_interval=10.0, # cap the backoff at 10 seconds
max_attempts=5, # give up after 5 total attempts
jitter=True, # add randomness to avoid thundering herd
retry_on=(TimeoutError, ConnectionError), # only retry these
)
graph.add_node("call_llm", call_llm_node, retry=retry)
The retry sequence for the config above looks like: 0.5s → 1s → 2s → 4s. That is four waits, because max_attempts=5 counts the initial call plus four retries; the 10-second cap never applies here, since the interval only reaches 4s. With jitter=True, each interval gets a random offset so parallel retries don’t collide. If all 5 attempts fail, the original exception propagates up.
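The schedule is easy to verify with a few lines of plain Python mirroring the policy's parameters (this helper is illustrative, not part of LangGraph):

```python
def backoff_schedule(initial: float, factor: float, cap: float, max_attempts: int) -> list[float]:
    """Wait times before each retry; max_attempts counts the initial call too."""
    waits = []
    interval = initial
    for _ in range(max_attempts - 1):  # retries = attempts after the first call
        waits.append(min(interval, cap))  # max_interval caps each wait
        interval *= factor
    return waits

print(backoff_schedule(0.5, 2.0, 10.0, 5))  # [0.5, 1.0, 2.0, 4.0]
```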
| Parameter | Default | Purpose |
|---|---|---|
initial_interval | 0.5 | Seconds to wait before the first retry |
backoff_factor | 2.0 | Multiplier applied to the interval after each attempt |
max_interval | 128.0 | Upper bound on wait time between retries (seconds) |
max_attempts | 3 | Total number of attempts (including the first call) |
jitter | True | Add random jitter to prevent thundering herd |
retry_on | All exceptions | Tuple of exception types that trigger a retry |
Each node can have its own RetryPolicy. A node that calls a flaky external API might get 5 retries with aggressive backoff, while a deterministic data-transform node needs none. Use retry_on to narrow which exceptions actually trigger retries — you don’t want to retry on ValueError caused by bad prompt logic.
Graceful Degradation with try/except
Retries handle transient failures, but sometimes you need logic within the node to degrade gracefully — return a partial result, use a cached value, or set a flag in state that downstream nodes can read. This is standard Python: wrap the risky call in try/except and update state accordingly.
async def research_node(state: AgentState) -> dict:
"""Fetch data from external API, fall back to cached results."""
try:
results = await fetch_research_api(state["query"], timeout=10)
return {"research": results, "research_source": "live"}
except (TimeoutError, ConnectionError) as e:
cached = get_cached_results(state["query"])
if cached:
return {"research": cached, "research_source": "cache"}
return {"research": [], "research_source": "unavailable", "errors": [str(e)]}
Downstream nodes can inspect state["research_source"] to decide how to proceed — for example, a summarizer node might add a disclaimer when working from cached data.
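For instance (the summarizer and its field names here are illustrative), a downstream node might branch on that flag:

```python
def summarize_node(state: dict) -> dict:
    """Downstream node that adapts its output to the research source."""
    body = f"Summary of {len(state['research'])} result(s)."
    if state.get("research_source") == "cache":
        # Cached data may be stale, so surface that to the user
        body += " (Note: based on cached research; results may be outdated.)"
    elif state.get("research_source") == "unavailable":
        body = "Research data was unavailable; answering from model knowledge only."
    return {"summary": body}
```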
Fallback Node Pattern
A common production pattern is routing to a fallback node when the primary path fails. For example, if your primary GPT-4o call fails after retries, you can fall back to a cheaper, more reliable model. You implement this using conditional edges that check error state.
def primary_llm(state: AgentState) -> dict:
try:
response = call_gpt4o(state["messages"])
return {"messages": [response], "llm_failed": False}
except Exception as e:
return {"messages": [], "llm_failed": True, "last_error": str(e)}
def fallback_llm(state: AgentState) -> dict:
response = call_gpt4o_mini(state["messages"]) # cheaper, more reliable
return {"messages": [response], "llm_failed": False}
def route_after_primary(state: AgentState) -> str:
return "fallback_llm" if state.get("llm_failed") else "process_response"
graph.add_node("primary_llm", primary_llm)
graph.add_node("fallback_llm", fallback_llm)
graph.add_node("process_response", process_response)
graph.add_conditional_edges("primary_llm", route_after_primary)
graph.add_edge("fallback_llm", "process_response")
Timeout Handling in Nodes
LLM API calls can hang indefinitely if you don’t set timeouts. Always configure timeouts at the client level and within your node logic. The asyncio.wait_for wrapper is the most reliable approach for async nodes.
import asyncio
from langchain_openai import ChatOpenAI
# Client-level timeout (applies to every call via this instance)
llm = ChatOpenAI(model="gpt-4o", request_timeout=30)
async def call_llm_node(state: AgentState) -> dict:
try:
# Node-level timeout as a safety net
response = await asyncio.wait_for(
llm.ainvoke(state["messages"]),
timeout=45.0 # slightly longer than client timeout
)
return {"messages": [response]}
except asyncio.TimeoutError:
return {"messages": [], "llm_failed": True, "last_error": "LLM timeout"}
Checkpoint-Based Recovery
LangGraph checkpoints state after each superstep (a round of node executions). If your process crashes — a server restart, an OOM termination, a deployment — you don’t lose everything. The graph can resume from the last checkpoint, replaying only the incomplete step.
This works automatically when you configure a checkpointer. On resume, LangGraph loads the last saved state and re-executes from the node that was in progress when the crash occurred. Nodes that already completed are not re-run.
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

# from_conn_string yields the saver via an async context manager that owns the connection
async with AsyncPostgresSaver.from_conn_string("postgresql://user:pass@localhost/db") as checkpointer:
    await checkpointer.setup()  # create the checkpoint tables on first use
    app = graph.compile(checkpointer=checkpointer)

    # First run — crashes midway through "analyze" node
    config = {"configurable": {"thread_id": "task-42"}}
    try:
        result = await app.ainvoke(initial_state, config)
    except Exception:
        pass  # process died here

    # After restart (rebuild the graph and checkpointer the same way) —
    # resumes from the last checkpoint, not from scratch
    result = await app.ainvoke(None, config)  # pass None to resume
If a node sends an email and then crashes before the checkpoint is written, resuming will re-run that node — and send the email again. For side effects like API calls, database writes, or notifications, design your nodes to be idempotent (e.g., use deduplication keys) or guard with external state checks.
Error Propagation and Unhandled Exceptions
When a node raises an exception that isn’t caught (and retries are exhausted or not configured), the graph execution stops immediately. The exception propagates up to the caller of invoke() or ainvoke(). Critically, the checkpoint from the previous superstep is preserved — so you have the last known good state.
This means you can inspect what went wrong, fix the issue (maybe update a prompt or config), and resume from that checkpoint. The failing node will re-execute with the same input state it had before.
config = {"configurable": {"thread_id": "task-99"}}
try:
result = await app.ainvoke({"query": "analyze market trends"}, config)
except Exception as e:
print(f"Graph failed: {e}")
# Inspect the last good checkpoint
snapshot = await app.aget_state(config)
print(f"Failed at node: {snapshot.next}") # which node was next
print(f"Last good state: {snapshot.values}") # state before failure
# After fixing the issue, resume
result = await app.ainvoke(None, config)
NodeInterrupt for Programmatic Interrupts
Sometimes you want to stop execution programmatically — to request human approval, collect additional input, or enforce a review gate. The NodeInterrupt exception is designed for exactly this. Unlike regular exceptions, it’s treated as an intentional pause, not a failure.
from langgraph.errors import NodeInterrupt
def approval_gate(state: AgentState) -> dict:
if state["total_cost"] > 100.0:
raise NodeInterrupt(
"Cost exceeds $100 limit. Approve to continue."
)
return state # under threshold, proceed automatically
# When interrupted, the graph pauses and checkpoints
# Resume after human approval:
await app.ainvoke(None, config) # continues from the interrupted node
NodeInterrupt gives you conditional interrupts — the node runs its logic and decides whether to pause based on the data. The compile-time interrupt_before and interrupt_after options always pause at a given node regardless of state. Use NodeInterrupt when the decision to pause depends on runtime values like cost, risk score, or content flags.
Debugging, Visualization, and LangSmith Integration
LangGraph graphs can become complex — multiple nodes, conditional edges, cycles. When something goes wrong (or even when things go right), you need clear ways to inspect what your graph looks like, what it's doing at each step, and where time and tokens are being spent. This section covers three layers of debugging: visual inspection, runtime debug output, and full production-grade tracing with LangSmith.
Visualizing Your Graph Structure
Every compiled LangGraph graph can render itself as a diagram. This is invaluable for verifying that your edges and conditional routing are wired correctly before you even run the graph. The simplest method generates a Mermaid-syntax string you can paste into any Mermaid renderer.
from langgraph.graph import StateGraph
# After building and compiling your graph
app = graph_builder.compile()
# Get the Mermaid diagram as a string
mermaid_str = app.get_graph().draw_mermaid()
print(mermaid_str)
For Jupyter notebooks, you can render a PNG image inline. This calls the Mermaid.ink API under the hood to produce a raster image directly in the notebook cell output.
from IPython.display import display, Image
# Render as PNG — great for Jupyter notebooks
png_bytes = app.get_graph().draw_mermaid_png()
display(Image(png_bytes))
# Or save to a file for documentation
with open("graph_diagram.png", "wb") as f:
f.write(png_bytes)
draw_mermaid_png() requires network access to the Mermaid.ink rendering service by default. For offline use, install pyppeteer or playwright and pass draw_method=MermaidDrawMethod.PYPPETEER to render locally.
Debug Mode and Stream Debugging
Visualization shows you the structure, but when you need to understand runtime behavior — which node executed, what the state looked like at each step, and why a conditional edge chose a particular path — you need debug-level output. LangGraph provides two complementary mechanisms.
Verbose Compile-Time Debugging
Pass debug=True when compiling the graph. This enables verbose logging that prints detailed information about every node invocation, state transition, and edge evaluation directly to the console.
# Enable verbose debug logging at compile time
app = graph_builder.compile(debug=True)
# Now every invocation prints detailed step info
result = app.invoke({"messages": [("user", "Hello!")]})
Debug Stream Mode
For more structured inspection, use stream_mode="debug". Instead of just printing logs, this yields detailed debug events as structured data you can programmatically inspect. Each event includes the node name, the input/output state, and timing information.
app = graph_builder.compile()
for event in app.stream(
{"messages": [("user", "What's the weather in NYC?")]},
stream_mode="debug",
):
# Each event is a dict with type, node, and payload
print(f"[{event['type']}] Node: {event.get('payload', {}).get('name', 'N/A')}")
if event["type"] == "task_result":
print(f" Result: {event['payload']['result']}")
Simple Console Debugging with a Print Callback
For quick-and-dirty debugging without changing stream modes, you can attach a simple callback that prints each step as it executes. This is the lightest-weight approach — useful during development when you just want to see the execution flow in your terminal.
def print_step(step: dict):
"""Simple callback to log each graph step."""
for node_name, output in step.items():
print(f"--- Node: {node_name} ---")
print(f" Output keys: {list(output.keys()) if isinstance(output, dict) else type(output)}")
# Use with stream to intercept each step
for step in app.stream({"messages": [("user", "Summarize this doc")]}):
print_step(step)
LangSmith Integration
Console debugging is fine during development, but in production you need persistent, searchable traces with full visibility into LLM calls, token usage, latencies, and tool invocations. LangSmith provides exactly this — and LangGraph integrates with it automatically. You don't instrument individual nodes; you set two environment variables and every graph execution is traced.
Enabling LangSmith Tracing
# Set these environment variables before running your application
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="lsv2_pt_your_api_key_here"
# Optional: specify a project name for organization
export LANGCHAIN_PROJECT="my-langgraph-agent"
Once these variables are set, every call to app.invoke() or app.stream() automatically sends trace data to LangSmith. In the LangSmith UI, you can see a hierarchical view of each run: the parent graph execution, each node that fired, every LLM call with its prompt and completion, every tool call with its arguments and return value, plus latency and token counts at each level.
Adding Custom Metadata and Tags
In production, you often need to filter traces by user, session, or feature flag. LangGraph lets you attach metadata and tags to any run via the config parameter. These appear as searchable fields in LangSmith, so you can quickly find all runs for a specific user or environment.
result = app.invoke(
{"messages": [("user", "Book a flight to London")]},
config={
"run_name": "travel-agent-booking",
"tags": ["production", "travel-agent", "v2.1"],
"metadata": {
"user_id": "usr_abc123",
"session_id": "sess_xyz789",
"environment": "production",
"feature_flags": ["new-routing-logic"],
},
},
)
In the LangSmith UI, you can then filter by tag (e.g., show all production runs) or search by metadata field (e.g., metadata.user_id = "usr_abc123"). This makes it straightforward to trace a specific user's session or compare runs across different versions of your graph.
LangGraph Studio — Interactive Visual Debugging
LangGraph Studio is a desktop application (available for macOS) that adds an interactive layer on top of log- and trace-based debugging. Instead of reading logs or scanning trace timelines, you get a visual environment where you can watch your graph execute node by node, inspect state at every step, and even modify state and replay from any point.
| Feature | What It Shows You |
|---|---|
| Graph Visualization | Live rendering of your graph structure with the currently active node highlighted |
| State Inspector | Full state object at each node — expand and drill into nested fields |
| Step-Through Execution | Pause at each node, inspect inputs/outputs, then continue |
| Modify & Replay | Edit state at any checkpoint and re-run the graph from that point |
| Thread History | Browse all conversation threads and their execution traces |
To use LangGraph Studio, you need a langgraph.json configuration file in your project root that tells the studio where to find your graph definition. The studio reads this file and launches a local server that serves your graph for interactive debugging.
{
"dependencies": ["."],
"graphs": {
"agent": "./agent.py:graph"
},
"env": ".env"
}
Open LangGraph Studio, point it at your project directory, and it auto-discovers the graph from this config. You can then send messages, watch nodes light up as they execute, click any node to inspect its input/output state, and use the thread panel to review previous executions. The modify-and-replay feature is particularly powerful for debugging conditional logic — you can manually set a state value and see which branch the graph takes.
Use these tools at the right stage: draw_mermaid() during graph construction to verify structure, debug=True during local development to trace execution, LangSmith in staging/production for persistent observability, and LangGraph Studio when you need to interactively diagnose a tricky bug.
Testing LangGraph Applications
Graph-based LLM applications have a unique testing challenge: non-deterministic output lives inside a deterministic execution structure. The good news is that LangGraph's architecture — where nodes are plain functions and the graph is a compiled, invokable object — lends itself naturally to testing at multiple levels of granularity.
A solid test suite for a LangGraph application covers five layers: unit tests for individual nodes, integration tests for the compiled graph, edge routing logic, checkpointer behavior, and strategies for taming LLM non-determinism.
Pytest Fixtures for Common Setup
Before diving into individual test types, set up reusable fixtures. These provide a deterministic LLM, a sample state, and a compiled graph you can share across your test suite.
import pytest
from langchain_core.language_models import FakeListChatModel
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
next_action: str
@pytest.fixture
def fake_llm():
"""A deterministic LLM that returns responses in order."""
return FakeListChatModel(
responses=[
"I'll look that up for you.",
"The answer is 42.",
]
)
@pytest.fixture
def sample_state():
return {
"messages": [HumanMessage(content="What is the meaning of life?")],
"next_action": "",
}
@pytest.fixture
def compiled_graph(fake_llm):
"""Build and compile a minimal agent graph for testing."""
def chatbot(state: AgentState):
response = fake_llm.invoke(state["messages"])
return {"messages": [response]}
def router(state: AgentState):
last = state["messages"][-1].content
if "look that up" in last:
return {"next_action": "search"}
return {"next_action": "done"}
builder = StateGraph(AgentState)
builder.add_node("chatbot", chatbot)
builder.add_node("router", router)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", "router")
builder.add_edge("router", END)
return builder.compile()
Unit Testing Nodes
Nodes in LangGraph are plain functions that accept state and return a partial state update. This makes them trivially testable — no graph compilation needed. Pass in a mock state dict, call the function, and assert on the returned update.
# Define the node under test (your application code)
def classify_intent(state: AgentState):
"""Classify user intent based on the last message."""
last_msg = state["messages"][-1].content.lower()
if "weather" in last_msg:
return {"next_action": "weather_tool"}
elif "calculate" in last_msg:
return {"next_action": "calculator_tool"}
return {"next_action": "general_chat"}
# Tests — no graph needed, just call the function directly
class TestClassifyIntent:
def test_weather_intent(self):
state = {
"messages": [HumanMessage(content="What's the weather in Paris?")],
"next_action": "",
}
result = classify_intent(state)
assert result == {"next_action": "weather_tool"}
def test_calculator_intent(self):
state = {
"messages": [HumanMessage(content="Calculate 5 + 3")],
"next_action": "",
}
result = classify_intent(state)
assert result == {"next_action": "calculator_tool"}
def test_fallback_intent(self):
state = {
"messages": [HumanMessage(content="Tell me a joke")],
"next_action": "",
}
result = classify_intent(state)
assert result == {"next_action": "general_chat"}
Notice that the test never touches a graph or an LLM. The node is just a function — you pass in state, you get state back. This is the fastest feedback loop you have, so put as much logic here as possible.
Integration Testing the Compiled Graph
Once individual nodes work, test that the graph wires them together correctly. Invoke the compiled graph with an input and assert on the final state. Using a FakeListChatModel keeps the test deterministic and fast.
def test_full_graph_execution(compiled_graph, sample_state):
"""Invoke the graph end-to-end and verify the final state."""
result = compiled_graph.invoke(sample_state)
# Graph should have added messages from chatbot
assert len(result["messages"]) == 2
assert isinstance(result["messages"][-1], AIMessage)
# Router should have classified the action
assert result["next_action"] == "search"
def test_graph_produces_expected_message(compiled_graph):
"""Verify the actual content the graph produces."""
state = {
"messages": [HumanMessage(content="Hello")],
"next_action": "",
}
result = compiled_graph.invoke(state)
# FakeListChatModel returns first response in list
assert result["messages"][-1].content == "I'll look that up for you."
Using Deterministic LLMs with FakeListChatModel
The FakeListChatModel from langchain_core is your primary tool for eliminating LLM non-determinism in tests. It cycles through a list of predetermined string responses, one per invocation. This means you can script exact conversations.
from langchain_core.language_models import FakeListChatModel
from langchain_core.messages import HumanMessage
def test_fake_llm_cycles_responses():
llm = FakeListChatModel(responses=["First", "Second", "Third"])
r1 = llm.invoke([HumanMessage(content="Hi")])
assert r1.content == "First"
r2 = llm.invoke([HumanMessage(content="Hi again")])
assert r2.content == "Second"
r3 = llm.invoke([HumanMessage(content="One more")])
assert r3.content == "Third"
def test_fake_llm_with_structured_json():
"""Simulate an LLM that returns JSON for tool calls."""
llm = FakeListChatModel(
responses=['{"action": "search", "query": "LangGraph docs"}']
)
result = llm.invoke([HumanMessage(content="Find LangGraph docs")])
import json
parsed = json.loads(result.content)
assert parsed["action"] == "search"
assert "LangGraph" in parsed["query"]
The cleanest pattern is to accept the LLM as a parameter when building your graph (dependency injection). In your application code, pass the real model. In tests, pass FakeListChatModel. Avoid patching globals when you can inject instead.
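A minimal sketch of that injection pattern in plain Python (the FakeLLM stand-in here is illustrative; in real tests you'd pass FakeListChatModel and in production a real chat model):

```python
class FakeLLM:
    """Minimal stand-in exposing the same .invoke interface as a chat model."""
    def __init__(self, responses: list[str]):
        self.responses = responses
        self.calls = 0

    def invoke(self, messages):
        response = self.responses[self.calls % len(self.responses)]
        self.calls += 1
        return response

def build_chatbot_node(llm):
    """Factory: the node closes over whichever model you inject."""
    def chatbot(state: dict) -> dict:
        return {"messages": [llm.invoke(state["messages"])]}
    return chatbot

# In tests: inject the fake; in production: pass the real model instance
node = build_chatbot_node(FakeLLM(["scripted reply"]))
print(node({"messages": ["hi"]}))  # {'messages': ['scripted reply']}
```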
Testing Conditional Edges
Conditional edges route execution based on a path function that reads the current state and returns a node name. Because the routing function is separate from the graph wiring, you can test it in isolation — just like a node.
# The routing function used with add_conditional_edges
def should_continue(state: AgentState) -> str:
last_msg = state["messages"][-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
return "tools"
return "end"
# Test the routing function directly — no graph needed
class TestConditionalEdges:
def test_routes_to_tools_when_tool_calls_present(self):
mock_msg = AIMessage(content="", tool_calls=[
{"name": "search", "args": {"q": "test"}, "id": "1"}
])
state = {"messages": [mock_msg], "next_action": ""}
assert should_continue(state) == "tools"
def test_routes_to_end_when_no_tool_calls(self):
mock_msg = AIMessage(content="Here is your answer.")
state = {"messages": [mock_msg], "next_action": ""}
assert should_continue(state) == "end"
def test_routes_to_end_with_empty_tool_calls(self):
mock_msg = AIMessage(content="Done.", tool_calls=[])
state = {"messages": [mock_msg], "next_action": ""}
assert should_continue(state) == "end"
Testing the path function directly is fast and exhaustive. You can cover every branch — including edge cases like empty tool call lists — without ever invoking the graph.
Testing with Checkpointers
Checkpointers enable the "resume from where you left off" pattern in LangGraph. Testing this requires a multi-step approach: invoke the graph, inspect the checkpoint, then re-invoke with the same thread ID and verify it resumes correctly.
from langgraph.checkpoint.memory import MemorySaver
@pytest.fixture
def graph_with_checkpointer(fake_llm):
"""Compile the graph with an in-memory checkpointer."""
def chatbot(state: AgentState):
response = fake_llm.invoke(state["messages"])
return {"messages": [response]}
builder = StateGraph(AgentState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
memory = MemorySaver()
return builder.compile(checkpointer=memory)
def test_checkpoint_preserves_conversation(graph_with_checkpointer):
config = {"configurable": {"thread_id": "test-thread-1"}}
# First invocation
result1 = graph_with_checkpointer.invoke(
{"messages": [HumanMessage(content="Hi")], "next_action": ""},
config=config,
)
assert len(result1["messages"]) == 2 # human + AI
# Second invocation on same thread — state accumulates
result2 = graph_with_checkpointer.invoke(
{"messages": [HumanMessage(content="Follow up")],"next_action": ""},
config=config,
)
# Should have: original human, first AI, new human, second AI
assert len(result2["messages"]) == 4
def test_separate_threads_are_isolated(graph_with_checkpointer):
config_a = {"configurable": {"thread_id": "thread-a"}}
config_b = {"configurable": {"thread_id": "thread-b"}}
graph_with_checkpointer.invoke(
{"messages": [HumanMessage(content="Thread A msg")], "next_action": ""},
config=config_a,
)
result_b = graph_with_checkpointer.invoke(
{"messages": [HumanMessage(content="Thread B msg")], "next_action": ""},
config=config_b,
)
# Thread B should only have its own messages
assert len(result_b["messages"]) == 2
Testing Async Graphs with pytest-asyncio
If your graph uses async nodes (common when calling APIs or async tool executors), you need pytest-asyncio to test them. The key difference is using ainvoke instead of invoke.
import pytest
import pytest_asyncio

# Define an async node
async def async_chatbot(state: AgentState):
    # In real code this might call an async API
    return {"messages": [AIMessage(content="Async response")]}

@pytest_asyncio.fixture
async def async_graph():
    builder = StateGraph(AgentState)
    builder.add_node("chatbot", async_chatbot)
    builder.add_edge(START, "chatbot")
    builder.add_edge("chatbot", END)
    return builder.compile()
@pytest.mark.asyncio
async def test_async_graph_invocation(async_graph):
    result = await async_graph.ainvoke({
        "messages": [HumanMessage(content="Hello async")],
        "next_action": "",
    })
    assert result["messages"][-1].content == "Async response"

@pytest.mark.asyncio
async def test_async_graph_streaming(async_graph):
    """Test streaming output from an async graph."""
    events = []
    async for event in async_graph.astream_events(
        {"messages": [HumanMessage(content="Stream test")], "next_action": ""},
        version="v2",
    ):
        events.append(event)
    # Verify we received events from the chatbot node
    node_events = [e for e in events if e.get("name") == "chatbot"]
    assert len(node_events) > 0
Add asyncio_mode = "auto" to your pyproject.toml under [tool.pytest.ini_options] so you don't need the @pytest.mark.asyncio decorator on every test.
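The corresponding pyproject.toml fragment (a minimal sketch using pytest's standard configuration table) looks like this:

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
```

With asyncio_mode set to "auto", pytest-asyncio collects every async def test automatically, so the decorator becomes redundant.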
Snapshot Testing
Snapshot testing records the trace of a graph execution and compares it against future runs. This is especially useful for catching unintended regressions in graph routing or output format. Use pytest-snapshot or a simple JSON comparison.
import json
from pathlib import Path

SNAPSHOT_DIR = Path(__file__).parent / "snapshots"

def serialize_trace(result: dict) -> dict:
    """Convert graph result to a JSON-serializable snapshot."""
    return {
        "message_count": len(result["messages"]),
        "message_types": [type(m).__name__ for m in result["messages"]],
        "message_contents": [m.content for m in result["messages"]],
        "next_action": result.get("next_action", ""),
    }

def test_graph_snapshot(compiled_graph, sample_state):
    result = compiled_graph.invoke(sample_state)
    trace = serialize_trace(result)
    snapshot_path = SNAPSHOT_DIR / "basic_invocation.json"

    if not snapshot_path.exists():
        # First run: create the snapshot
        snapshot_path.parent.mkdir(parents=True, exist_ok=True)
        snapshot_path.write_text(json.dumps(trace, indent=2))
        pytest.skip("Snapshot created — re-run to validate.")

    # Subsequent runs: compare against saved snapshot
    saved = json.loads(snapshot_path.read_text())
    assert trace == saved, (
        f"Snapshot mismatch!\nExpected: {saved}\nGot: {trace}"
    )
Run with --snapshot-update (or delete the snapshot file) when you intentionally change graph behavior. The key insight is to serialize only the structure you care about — message counts, types, and routing decisions — not raw LLM content that might vary.
Handling Non-Deterministic LLM Output
Even with FakeListChatModel covering most tests, you'll eventually want to test against a real LLM — for smoke tests, staging validation, or prompt regression testing. Real LLM output is inherently non-deterministic, so your assertions need to be flexible.
Strategy 1: Assert on Structure, Not Content
def test_real_llm_returns_valid_structure(real_graph):
    """Don't assert on exact text — assert on shape."""
    result = real_graph.invoke({
        "messages": [HumanMessage(content="Summarize quantum computing")],
        "next_action": "",
    })
    last_msg = result["messages"][-1]
    # Assert on structure, not content
    assert isinstance(last_msg, AIMessage)
    assert len(last_msg.content) > 50  # non-trivial response
    assert result["next_action"] in ("done", "search", "general_chat")
Strategy 2: Use Structured Output Constraints
from pydantic import BaseModel

class AnalysisResult(BaseModel):
    sentiment: str  # "positive", "negative", "neutral"
    confidence: float
    summary: str

def test_structured_output_is_valid(real_llm):
    """When using structured output, validate the schema."""
    structured_llm = real_llm.with_structured_output(AnalysisResult)
    result = structured_llm.invoke("Analyze: I love this product!")
    assert result.sentiment in ("positive", "negative", "neutral")
    assert 0.0 <= result.confidence <= 1.0
    assert len(result.summary) > 0
Strategy 3: Seed Parameters for Reproducibility
from langchain_openai import ChatOpenAI

@pytest.fixture
def seeded_llm():
    """Use the seed parameter for near-deterministic output (OpenAI)."""
    return ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        seed=42,  # first-class parameter in langchain-openai; no model_kwargs needed
    )

def test_seeded_output_is_consistent(seeded_llm):
    """Same seed + same input ~ same output (not guaranteed, but close)."""
    msg = [HumanMessage(content="What is 2+2? Reply with just the number.")]
    r1 = seeded_llm.invoke(msg)
    r2 = seeded_llm.invoke(msg)
    # With seed and temperature=0, these should usually match
    assert r1.content.strip() == r2.content.strip()
OpenAI's seed parameter makes output "mostly deterministic" — not fully. Model updates and infrastructure changes can still alter responses. Use seeds for reducing flakiness, not for exact-match assertions. Always prefer structural assertions for CI pipelines.
Testing Strategy Summary
| Test Level | What You Test | LLM Strategy | Speed |
|---|---|---|---|
| Unit (nodes) | Individual node logic | No LLM / mock state | Milliseconds |
| Unit (edges) | Routing path functions | No LLM / mock state | Milliseconds |
| Integration | Compiled graph end-to-end | FakeListChatModel | Fast (~100ms) |
| Checkpoint | State persistence & resume | FakeListChatModel | Fast (~100ms) |
| Snapshot | Regression in output shape | FakeListChatModel | Fast (~100ms) |
| Smoke (real LLM) | Prompt quality & structure | Real LLM + seed | Seconds |
Build your test pyramid with node and edge unit tests at the base (many, fast), integration tests with fake LLMs in the middle (moderate count), and a thin layer of real-LLM smoke tests at the top (few, slow, structural assertions only). This gives you confidence without slow, flaky CI runs.
Performance Optimization
A LangGraph application that works correctly can still be painfully slow or needlessly expensive. LLM calls dominate both latency and cost, and graph structure determines how much of that work happens sequentially versus in parallel. This section covers the concrete levers you have — caching, parallelism, async execution, smart model routing, state management, and profiling — to make your graphs fast and cost-effective.
Node-Level Caching with CachePolicy
Expensive operations like LLM calls and external API requests often produce the same output for the same input. LangGraph lets you attach a CachePolicy to individual nodes so that repeated invocations with identical input state are served from cache instead of re-executed. You configure a TTL (time-to-live) and optionally specify custom cache keys to control what constitutes a "cache hit."
from langgraph.graph import StateGraph
from langgraph.types import CachePolicy
from langgraph.cache.memory import InMemoryCache

builder = StateGraph(MyState)

# Cache the LLM research node for 5 minutes,
# keyed only on the "query" field of state
builder.add_node(
    "research",
    research_node,
    cache_policy=CachePolicy(
        ttl=300,  # seconds
        key_func=lambda state: state["query"],
    ),
)
# No caching for the routing node — it's cheap
builder.add_node("router", router_node)

# A cache backend must also be supplied at compile time,
# otherwise the per-node policies have nowhere to store entries
graph = builder.compile(cache=InMemoryCache())
The key_func parameter is critical. By default, the entire input state is hashed to produce a cache key. If your state contains timestamps or message IDs that change on every invocation, you'll get zero cache hits. Extract only the semantically meaningful fields — the query text, the document ID, or the user intent — to build a useful cache key.
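To make the idea concrete, here is a minimal, framework-free sketch of a key function that hashes only the stable fields of a state dict (the field names query and user_intent are illustrative, not part of any real schema):

```python
import hashlib
import json

def cache_key(state: dict) -> str:
    """Build a cache key from semantically meaningful fields only.

    Volatile fields (timestamps, message IDs) are deliberately excluded,
    so two states that mean the same thing map to the same key.
    """
    stable = {
        "query": state.get("query", ""),
        "user_intent": state.get("user_intent", ""),
    }
    # json.dumps with sort_keys gives a deterministic serialization
    payload = json.dumps(stable, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

# Same query, different timestamps -> same key (a cache hit)
a = cache_key({"query": "langgraph docs", "timestamp": 1})
b = cache_key({"query": "langgraph docs", "timestamp": 2})
assert a == b
```

The same function can be dropped in as a key_func, since it depends only on the incoming state.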
Caching shines for deterministic or near-deterministic calls: embeddings, structured extraction with temperature=0, and external API lookups. For creative generation with high temperature, caching is counterproductive — you want different outputs each time.
Parallel Node Execution
LangGraph uses the concept of supersteps. Within a single superstep, all nodes that have their dependencies satisfied run concurrently. This happens automatically — you don't call a "parallel" API. Your job is to design the graph so that independent work fans out rather than chains sequentially.
Consider a research agent that needs to search the web, query a database, and fetch documents. If these three operations don't depend on each other's output, wire them as siblings from the same parent node rather than in a chain:
builder = StateGraph(ResearchState)
builder.add_node("plan", plan_node)
builder.add_node("web_search", web_search_node)
builder.add_node("db_query", db_query_node)
builder.add_node("doc_fetch", doc_fetch_node)
builder.add_node("synthesize", synthesize_node)
builder.add_edge(START, "plan")
# Fan out: all three run in the same superstep
builder.add_edge("plan", "web_search")
builder.add_edge("plan", "db_query")
builder.add_edge("plan", "doc_fetch")
# Fan in: synthesize waits for all three to complete
builder.add_edge("web_search", "synthesize")
builder.add_edge("db_query", "synthesize")
builder.add_edge("doc_fetch", "synthesize")
With this fan-out/fan-in pattern, the three data-fetching nodes execute in the same superstep. Total latency for that step equals the slowest of the three, not the sum. If each takes ~2 seconds, you save ~4 seconds per invocation compared to a sequential chain.
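The latency claim is easy to verify with plain asyncio, independent of LangGraph. This sketch simulates three I/O-bound fetches and shows that running them concurrently costs roughly the slowest one:

```python
import asyncio
import time

async def fake_fetch(name: str, delay: float) -> str:
    # Stand-in for a web search / DB query / document fetch
    await asyncio.sleep(delay)
    return f"{name} done"

async def main() -> float:
    start = time.perf_counter()
    # Fan out: all three awaited concurrently, like one LangGraph superstep
    results = await asyncio.gather(
        fake_fetch("web_search", 0.2),
        fake_fetch("db_query", 0.1),
        fake_fetch("doc_fetch", 0.15),
    )
    elapsed = time.perf_counter() - start
    assert len(results) == 3
    return elapsed

elapsed = asyncio.run(main())
# Roughly max(delays) ~0.2s, not sum(delays) 0.45s
print(f"{elapsed:.2f}s")
```

Sequentially awaiting the same three coroutines would take the sum of the delays, which is exactly the difference between a chain and a fan-out in a real graph.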
Async Execution
LLM calls and API requests are I/O-bound — your application spends most of its time waiting for network responses. Python's asyncio lets you overlap that waiting time. LangGraph supports this natively: define your node functions with async def and invoke your graph with ainvoke or astream.
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

# Async node function
async def research_node(state: ResearchState) -> dict:
    response = await llm.ainvoke(state["messages"])
    return {"messages": [response]}

# Async invocation — use ainvoke / astream
result = await graph.ainvoke({"messages": [user_msg]})

# Streaming async — tokens arrive as they're generated
async for event in graph.astream_events(
    {"messages": [user_msg]}, version="v2"
):
    print(event)
When parallel nodes are also async, you get the best of both worlds: LangGraph runs the concurrent superstep nodes as async tasks on the event loop, so network I/O from all of them overlaps efficiently without threads.
Reducing LLM Calls
The single most impactful optimization is calling the LLM fewer times and using cheaper models where possible. Here are four concrete techniques:
| Technique | How It Works | Typical Savings |
|---|---|---|
| Early exit conditions | Check state before calling the LLM — if the answer is already available or the request is invalid, short-circuit | Eliminates wasted calls entirely |
| Model tiering | Use a fast, cheap model (gpt-4o-mini) for routing, classification, and validation; reserve expensive models (gpt-4o) for final generation | 50–80% cost reduction on routing nodes |
| Result caching | Cache LLM responses with CachePolicy or an external cache (Redis) keyed on the prompt hash | 100% savings on repeated queries |
| Batch consolidation | Combine multiple small LLM calls into one structured prompt that returns all results at once | Reduces per-call overhead and latency |
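Batch consolidation can be sketched without any LLM dependency: build one prompt that asks for all classifications at once as JSON, then parse the combined answer. (The prompt wording and the parse_batch_response helper are illustrative assumptions, not a library API.)

```python
import json

def build_batch_prompt(items: list[str]) -> str:
    """One prompt that replaces N single-item classification calls."""
    numbered = "\n".join(f"{i + 1}. {item}" for i, item in enumerate(items))
    return (
        "Classify each request as research, code, or general.\n"
        "Reply with a JSON array of labels, one per item, in order.\n\n"
        + numbered
    )

def parse_batch_response(text: str, expected: int) -> list[str]:
    """Parse the model's JSON array and sanity-check its length."""
    labels = json.loads(text)
    if len(labels) != expected:
        raise ValueError(f"expected {expected} labels, got {len(labels)}")
    return labels

items = ["Find papers on RAG", "Fix this stack trace", "Tell me a joke"]
prompt = build_batch_prompt(items)
# Simulated LLM reply; in real use this string comes from one llm.invoke(prompt)
labels = parse_batch_response('["research", "code", "general"]', len(items))
```

One call with a slightly longer prompt is almost always cheaper and faster than N short calls, because per-call overhead (network round trip, prompt preamble) is paid once.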
Model tiering is especially powerful in multi-agent graphs. A router node that decides which specialist agent to invoke doesn't need GPT-4o — a fast, cheap model handles that classification just as accurately, at a fraction of the cost and latency.
from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI

cheap_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
expensive_llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

async def router_node(state: AgentState) -> dict:
    """Cheap model decides which agent handles the task."""
    route = await cheap_llm.ainvoke(
        [SystemMessage(content="Classify this request as: research, code, or general.")]
        + state["messages"]
    )
    return {"next_agent": route.content.strip().lower()}

async def generation_node(state: AgentState) -> dict:
    """Expensive model produces the final answer."""
    response = await expensive_llm.ainvoke(state["messages"])
    return {"messages": [response]}
State Size Management
Every time a node executes, LangGraph reads and writes the current state. If your state contains large objects — full documents, images, raw API responses — this serialization overhead adds up fast. It also inflates checkpoint storage when you're using persistence.
The fix is straightforward: store references, not data. Keep a document ID or S3 URL in state, and fetch the actual content inside the node that needs it. For output, use output_schema to prevent large internal fields from leaking to the caller:
from operator import add
from typing import Annotated
from typing_extensions import TypedDict

from langgraph.graph import StateGraph

class InternalState(TypedDict):
    messages: Annotated[list, add]
    documents: list[str]  # full document texts (large!)
    doc_refs: list[str]  # document IDs (small)
    intermediate_results: dict  # scratch space for nodes

class OutputState(TypedDict):
    messages: Annotated[list, add]
    doc_refs: list[str]  # only references exposed

# output_schema prevents InternalState bloat from being returned
graph = StateGraph(InternalState, output_schema=OutputState)
Message Trimming
Conversation history grows with every turn. Left unchecked, it will exceed your model's context window and cause errors — or simply waste tokens on irrelevant ancient messages. LangChain's trim_messages utility lets you prune the message list before sending it to the LLM, keeping only what fits within a token budget.
from langchain_core.messages import trim_messages

async def chatbot_node(state: ChatState) -> dict:
    # Keep the system message + most recent messages within 4k tokens
    trimmed = trim_messages(
        state["messages"],
        max_tokens=4000,
        strategy="last",  # keep most recent messages
        token_counter=llm,  # use the model's tokenizer
        include_system=True,  # always preserve system prompt
        start_on="human",  # ensure trimmed list starts on a human turn
    )
    response = await llm.ainvoke(trimmed)
    return {"messages": [response]}
The strategy="last" setting drops the oldest messages first, which is the right default for most chatbots. The include_system=True flag ensures your system prompt is never trimmed away. Set start_on="human" to avoid starting mid-conversation on an assistant message, which can confuse the model.
For long-running agents, consider adding a summarization node that condenses older messages into a single summary message before trimming kicks in. This preserves context that pure truncation would lose. Store the summary as a SystemMessage so it persists through trimming.
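One way to combine the two ideas (summarize old turns, then keep only recent ones verbatim) can be sketched framework-free. The inner summarize helper is a placeholder for a cheap LLM call; the list surgery is the point:

```python
def condense_history(messages: list[dict], keep_last: int = 4) -> list[dict]:
    """Collapse older turns into one summary message, keep the rest verbatim.

    Messages are plain dicts here ({"role": ..., "content": ...}); in a real
    graph they would be LangChain message objects and `summarize` would be a
    cheap-model LLM call.
    """
    def summarize(old: list[dict]) -> str:  # placeholder for an LLM call
        return "Summary of earlier conversation: " + " | ".join(
            m["content"] for m in old
        )

    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    if len(rest) <= keep_last:
        return messages  # nothing old enough to condense

    old, recent = rest[:-keep_last], rest[-keep_last:]
    # Store the summary as a system message so trimming never drops it
    summary = {"role": "system", "content": summarize(old)}
    return system + [summary] + recent
```

In a LangGraph app this logic would live in its own node that runs before the chatbot node, writing the condensed list back into state.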
Profiling with LangSmith
You can't optimize what you can't measure. LangSmith gives you a trace view of every graph execution, breaking it down by node with latency, token usage, and cost for each step. Enable it by setting two environment variables — no code changes needed:
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=lsv2_pt_your_key_here
Once tracing is active, every invoke or ainvoke call produces a trace in LangSmith. The trace waterfall shows you exactly which nodes are your bottlenecks. Look for:
- Longest bars — these nodes dominate your end-to-end latency. Prioritize caching or model-tiering for them.
- Sequential runs that could be parallel — if two nodes always appear one after the other but don't depend on each other's output, refactor the graph edges to run them concurrently.
- Repeated identical calls — the same LLM prompt appearing multiple times in one trace is a clear sign you need caching.
- High token counts — nodes consuming disproportionate tokens likely need message trimming or a smaller context window.
Always profile before optimizing. Adding caching to a node that takes 50ms while another node takes 8 seconds is wasted effort. LangSmith traces give you the data to focus on what actually matters.
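As a back-of-the-envelope check, ranking nodes by their share of total latency tells you where optimization pays off. The per-node timings here are a made-up dict shape, not the LangSmith API:

```python
def latency_report(node_ms: dict[str, float]) -> list[tuple[str, float]]:
    """Rank nodes by their share of end-to-end latency (descending)."""
    total = sum(node_ms.values())
    ranked = sorted(node_ms.items(), key=lambda kv: kv[1], reverse=True)
    return [(name, ms / total) for name, ms in ranked]

report = latency_report({"router": 50, "research": 8000, "synthesize": 1200})
# The top entry dominates; optimizing anything else first is wasted effort
```

This is exactly the judgment the trace waterfall lets you make visually: a node holding 85%+ of the latency budget is the only one worth caching or re-tiering first.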
Deployment with LangGraph Platform and Cloud
Building a LangGraph agent locally is only half the story. To serve it to users, integrate it with frontends, or run it at scale, you need a deployment layer. LangGraph Platform provides exactly that — a standardized way to package, serve, and manage your graphs as production HTTP services.
The platform has three layers: the LangGraph Server (an HTTP API wrapping your graphs), the LangGraph CLI (for local dev and building images), and LangGraph Cloud (managed hosting by LangChain). Together, they take you from langgraph dev on your laptop to a horizontally-scaled deployment behind a load balancer.
graph LR
Dev["Developer"] -->|langgraph CLI| Server["LangGraph Server"]
subgraph Platform["LangGraph Platform"]
Server --- Graphs["Graph Definitions"]
Server --- CP["PostgreSQL Checkpointer"]
Server --- Store["Shared Store"]
end
Server -->|REST API| SDK["Client SDKs\n(Python / JS)"]
Server -.->|Traces| LS["LangSmith\nObservability"]
The langgraph.json Configuration File
Every LangGraph Platform project starts with a langgraph.json file at the project root. This file tells the server where to find your graphs, what dependencies to install, and which environment variables to inject. Think of it as the manifest that turns a Python project into a deployable LangGraph application.
{
  "dependencies": ["."],
  "graphs": {
    "chatbot": "./src/chatbot/graph.py:graph",
    "researcher": "./src/researcher/graph.py:graph"
  },
  "env": ".env",
  "python_version": "3.11",
  "pip_config_file": "pip.conf",
  "dockerfile_lines": []
}
The graphs field maps route names to Python module paths. The value "./src/chatbot/graph.py:graph" means "import the graph variable from src/chatbot/graph.py." Each entry becomes its own API endpoint on the server. The dependencies array lists pip-installable paths — ["."] installs your project's pyproject.toml or setup.py.
The env field points to a .env file loaded at server startup. Never commit this file to version control — it typically contains OPENAI_API_KEY, LANGSMITH_API_KEY, and database connection strings. For cloud deployments, you set these through the LangSmith UI or CLI instead.
LangGraph CLI: Local Development and Builds
The CLI is how you interact with the platform locally. It runs your graphs in a development server with hot-reload, and it builds Docker images for production deployment. Install it with pip install langgraph-cli (include the [inmem] extra for local dev to use an in-memory checkpointer instead of requiring PostgreSQL).
# Install the CLI with in-memory support for local dev
pip install "langgraph-cli[inmem]"
# Start a local dev server (hot-reload enabled)
langgraph dev
# Build a production Docker image
langgraph build -t my-agent:latest
langgraph dev starts a local LangGraph Server at http://localhost:2024 with all graphs defined in langgraph.json. It watches for file changes and reloads automatically. The langgraph build command packages everything into a Docker image that runs the LangGraph Server — identical to what you'd deploy in production.
The REST API
The LangGraph Server exposes a REST API that lets any client — a React frontend, a mobile app, a cron job — interact with your graphs. The API is organized around three core concepts: assistants (your graph definitions), threads (conversation sessions with persisted state), and runs (individual graph executions).
| Endpoint | Method | Purpose |
|---|---|---|
| /assistants | GET | List available graphs (assistants) |
| /threads | POST | Create a new conversation thread |
| /threads/{id}/runs | POST | Execute a graph run on a thread |
| /threads/{id}/runs/stream | POST | Stream graph execution via SSE |
| /threads/{id}/state | GET | Retrieve current thread state |
| /threads/{id}/state | POST | Update thread state (human-in-the-loop) |
| /threads/{id}/history | GET | Get full state checkpoint history |
Here's a typical interaction flow: create a thread, post a run with user input, and stream the response back. The streaming endpoint uses Server-Sent Events (SSE), so the client receives tokens as they're generated.
# Create a thread
curl -X POST http://localhost:2024/threads \
-H "Content-Type: application/json" \
-d '{}'
# Run a graph on that thread (streaming)
curl -X POST http://localhost:2024/threads/<thread_id>/runs/stream \
-H "Content-Type: application/json" \
-d '{
"assistant_id": "chatbot",
"input": {"messages": [{"role": "user", "content": "Explain LangGraph"}]},
"stream_mode": ["events"]
}'
Python and JavaScript SDKs
While the REST API works with any HTTP client, the official SDKs provide a much cleaner developer experience. They handle thread management, streaming deserialization, and authentication automatically. You use the same SDK whether you're talking to a local dev server or a cloud deployment — only the URL changes.
from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")

# Create a thread
thread = await client.threads.create()

# Stream a run
async for event in client.runs.stream(
    thread_id=thread["thread_id"],
    assistant_id="chatbot",
    input={"messages": [{"role": "user", "content": "What is LangGraph?"}]},
    stream_mode="events",
):
    print(event.data)

# Get the thread's current state
state = await client.threads.get_state(thread["thread_id"])
print(state["values"]["messages"][-1])
import { Client } from "@langchain/langgraph-sdk";

const client = new Client({ apiUrl: "http://localhost:2024" });

// Create a thread
const thread = await client.threads.create();

// Stream a run
const stream = client.runs.stream(
  thread.thread_id,
  "chatbot",
  {
    input: { messages: [{ role: "user", content: "What is LangGraph?" }] },
    streamMode: "events",
  }
);

for await (const event of stream) {
  console.log(event.data);
}
Deployment Options
LangGraph Platform supports three deployment models, each suited to different organizational needs. The graph code and configuration remain identical across all three — you choose your deployment target based on operational requirements, not code changes.
| Option | How It Works | Best For |
|---|---|---|
| Self-Hosted (Docker) | Run langgraph build to produce a Docker image. Deploy it to your own infrastructure (ECS, Kubernetes, GCP Cloud Run). You manage the PostgreSQL database, scaling, and networking. | Teams with strict data residency, existing infra, or cost optimization needs. |
| LangGraph Cloud | Push your code to a GitHub repo connected via the LangSmith UI. LangGraph Cloud builds, deploys, and scales automatically. Includes built-in PostgreSQL, monitoring, and auto-scaling. | Fast iteration, teams without dedicated DevOps, or prototyping. |
| Bring Your Own Cloud (BYOC) | LangGraph's managed control plane provisions infrastructure inside your AWS/GCP account. You keep data sovereignty while LangGraph handles orchestration. | Enterprises needing managed operations with data in their own VPC. |
Scaling with PostgreSQL
Horizontal scaling works because the LangGraph Server is stateless — all state lives in the PostgreSQL checkpointer. You can run multiple server instances behind a load balancer and they'll all read and write the same threads, checkpoints, and store data. This is the same pattern used by web frameworks like Django or Rails.
# docker-compose.yml — Multi-instance LangGraph with shared Postgres
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: langgraph
      POSTGRES_USER: langgraph
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - pgdata:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  langgraph-server:
    image: my-agent:latest
    depends_on:
      - postgres
    environment:
      DATABASE_URI: postgres://langgraph:${POSTGRES_PASSWORD}@postgres:5432/langgraph
    deploy:
      replicas: 3
    ports:
      # With replicas > 1, a fixed host port like "2024:2024" would conflict;
      # let Docker assign host ports, or front the service with a load balancer.
      - "2024"

volumes:
  pgdata:
Each replica connects to the same PostgreSQL instance. A request to create a thread on replica 1 produces state that's immediately visible on replica 2. For production, use a managed PostgreSQL service (RDS, Cloud SQL, Supabase) with connection pooling enabled.
Set the LANGSMITH_API_KEY environment variable on your deployed server to automatically send all traces to LangSmith. Every graph run, tool call, and LLM invocation will appear in the LangSmith dashboard — no code changes required.
Authentication and CORS
The LangGraph Server supports custom authentication middleware and CORS configuration for production deployments. You define an authentication handler in your project and reference it in langgraph.json. This handler runs before every request and can validate API keys, JWT tokens, or any custom scheme.
# src/auth.py
import os

from langgraph_sdk import Auth

auth = Auth()

# Compare against a secret from the environment
# (AGENT_API_KEY is a placeholder name; use your own scheme)
EXPECTED_KEY = os.environ.get("AGENT_API_KEY")

@auth.authenticate
async def authenticate(headers: dict) -> str:
    api_key = headers.get("x-api-key")
    if not api_key or api_key != EXPECTED_KEY:
        raise Auth.exceptions.HTTPException(status_code=401, detail="Invalid API key")
    return api_key  # returned value is available as `user` in handlers
{
  "dependencies": ["."],
  "graphs": {
    "chatbot": "./src/chatbot/graph.py:graph"
  },
  "auth": {
    "path": "./src/auth.py:auth"
  },
  "http": {
    "cors": {
      "allow_origins": ["https://myapp.com"],
      "allow_methods": ["GET", "POST"],
      "allow_headers": ["x-api-key"]
    }
  },
  "env": ".env"
}
Complete Deployment Workflow
Here's the end-to-end process from local development to production deployment. This workflow applies whether you're deploying to your own Kubernetes cluster or using LangGraph Cloud.
1. Develop locally with langgraph dev
   Write your graph code and iterate with the hot-reloading dev server. The server uses an in-memory checkpointer by default, so no database setup is needed.

   cd my-agent-project
   langgraph dev

2. Test your graphs via the SDK
   Write integration tests against the local server. This validates both your graph logic and the API contract your clients will depend on.

   async def test_chatbot():
       client = get_client(url="http://localhost:2024")
       thread = await client.threads.create()
       result = await client.runs.wait(
           thread["thread_id"],
           assistant_id="chatbot",
           input={"messages": [{"role": "user", "content": "Hello"}]},
       )
       assert len(result["messages"]) > 1

3. Build the Docker image
   The langgraph build command reads langgraph.json, installs dependencies, and packages everything into a production-ready Docker image.

   langgraph build -t my-agent:v1.0.0
   # Test the image locally with a real Postgres
   docker compose up

4. Push and deploy
   Push the image to your container registry and deploy to your platform of choice. Set the DATABASE_URI environment variable to point at your production PostgreSQL instance.

   # Push to a container registry
   docker tag my-agent:v1.0.0 registry.example.com/my-agent:v1.0.0
   docker push registry.example.com/my-agent:v1.0.0
   # Deploy (example: AWS ECS, GCP Cloud Run, Kubernetes, etc.)
   kubectl set image deployment/my-agent my-agent=registry.example.com/my-agent:v1.0.0
The self-hosted Docker image requires a LangGraph Platform license (included with LangSmith paid plans). The image validates this via the LANGGRAPH_API_KEY environment variable at startup. Without it, the server will refuse to start. If you need an open-source alternative, you can run your CompiledGraph directly behind FastAPI, but you'll lose the threads/runs API, built-in persistence management, and the SDK compatibility.
Migrating from LangChain AgentExecutor to LangGraph
If you've been building agents with LangChain's AgentExecutor, you've likely hit its ceiling — limited control over execution flow, no built-in persistence, and awkward workarounds for anything beyond simple ReAct loops. LangChain itself now recommends LangGraph as the successor for all non-trivial agent use cases. AgentExecutor remains available but is effectively in maintenance mode.
The good news: the migration is straightforward. Every concept in AgentExecutor maps directly to a LangGraph primitive, and you can start with the high-level create_react_agent helper before gradually adding custom control flow.
Concept Mapping: AgentExecutor → LangGraph
Understanding the 1:1 mapping between the two frameworks makes migration mechanical rather than creative. Here's how each piece translates:
| AgentExecutor Concept | LangGraph Equivalent | Notes |
|---|---|---|
| agent (prompt + LLM + tools) | Agent node (a function calling the LLM) | You define the node function; it calls model.bind_tools() |
| Tool execution loop | Tool node (ToolNode) | Prebuilt ToolNode handles tool dispatch automatically |
| Internal while-loop | Cycle via conditional edges | A conditional edge checks for tool calls and routes back or ends |
| max_iterations | recursion_limit in config | Passed at invocation: {"recursion_limit": 25} |
| early_stopping_method | Custom logic in conditional edge | You control exactly what happens when the limit is reached |
| return_intermediate_steps | Full state is always accessible | Every message (LLM + tool) lives in the state's message list |
| Custom output parser | Post-processing node or state transform | Add a final node that reshapes output before returning |
Side-by-Side: The Same Agent, Two Frameworks
Let's build the same tool-calling agent — one that can search the web — using both approaches. This makes the structural differences concrete.
The AgentExecutor Way (Legacy)
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.tools.tavily_search import TavilySearchResults

llm = ChatOpenAI(model="gpt-4o")
tools = [TavilySearchResults(max_results=1)]

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("placeholder", "{messages}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(
    agent=agent,
    tools=tools,
    max_iterations=10,
    verbose=True,
)

result = executor.invoke({"messages": [("human", "What is the weather in SF?")]})
This works, but everything happens inside the executor's opaque loop. You can't inject logic between the LLM call and tool execution, add approval steps, or persist state across sessions.
The LangGraph Way
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

llm = ChatOpenAI(model="gpt-4o")
tools = [TavilySearchResults(max_results=1)]
model_with_tools = llm.bind_tools(tools)

def agent_node(state: MessagesState):
    response = model_with_tools.invoke(state["messages"])
    return {"messages": [response]}

tool_node = ToolNode(tools=tools)

graph = StateGraph(MessagesState)
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", tools_condition)
graph.add_edge("tools", "agent")

app = graph.compile()
result = app.invoke(
    {"messages": [("human", "What is the weather in SF?")]},
    config={"recursion_limit": 25},
)
More lines, but every piece is explicit. The agent_node calls the LLM. The tool_node executes tools. The conditional edge (tools_condition) checks whether the LLM response contains tool calls — if yes, route to "tools"; if no, route to END. This is the same ReAct loop, but now you own every transition.
The Shortcut: create_react_agent
If you want LangGraph's benefits without manually wiring nodes and edges, the create_react_agent prebuilt helper gives you the same graph in a single call. This is the closest equivalent to AgentExecutor's simplicity.
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

llm = ChatOpenAI(model="gpt-4o")
tools = [TavilySearchResults(max_results=1)]

# One line — creates the same agent + tools + conditional edge graph
app = create_react_agent(llm, tools)

result = app.invoke(
    {"messages": [("human", "What is the weather in SF?")]},
    config={"recursion_limit": 25},
)
Use create_react_agent as your starting point. When you need custom routing, human-in-the-loop, or multi-agent composition, switch to the manual StateGraph approach — you already understand the underlying structure.
What You Gain After Migration
The migration isn't just a framework swap — it unlocks capabilities that were impossible or hacky with AgentExecutor:
- Human-in-the-loop: Use `interrupt_before` or `interrupt_after` on any node to pause execution and wait for human approval before continuing.
- Custom routing: Conditional edges let you route to different nodes based on tool call types, message content, or any state attribute — not just "has tool calls or not."
- Multi-agent composition: Compose multiple agents as subgraphs within a parent graph. Each agent is a node with its own internal loop.
- Streaming control: Stream tokens from the LLM node, tool execution events, or full state updates at each step — choose the granularity you need.
- State persistence: Add a checkpointer (e.g., `SqliteSaver`, `PostgresSaver`) and the graph automatically saves/restores state across sessions using thread IDs.
Migration Steps
1. Audit your current AgentExecutor setup

   Identify your LLM, tools, prompt template, and any custom configuration like `max_iterations`, `early_stopping_method`, `return_intermediate_steps`, or `handle_parsing_errors`. List any custom output parsers or callbacks you rely on.

2. Define your state schema

   For most agents, `MessagesState` (a prebuilt schema with a single `messages` key) is sufficient. If you tracked extra fields, extend it:

   ```python
   from langgraph.graph import MessagesState

   class AgentState(MessagesState):
       # Add any custom fields your agent tracked
       iteration_count: int
       final_answer: str
   ```

3. Create nodes for agent reasoning and tool execution

   Your agent node wraps the LLM call. The tool node can use the prebuilt `ToolNode` or a custom function if you need pre/post-processing around tool calls.

   ```python
   from langchain_core.messages import SystemMessage

   def agent_node(state: AgentState):
       sys_msg = SystemMessage(content="You are a helpful assistant.")
       response = model_with_tools.invoke([sys_msg] + state["messages"])
       return {"messages": [response]}

   # For custom output parsing, add a post-processing node:
   def format_output(state: AgentState):
       last_msg = state["messages"][-1]
       return {"final_answer": last_msg.content}
   ```

4. Wire edges, add conditional routing, and compile

   Connect the nodes into a graph. The conditional edge replaces AgentExecutor's internal loop logic. If you had `early_stopping_method`, implement that logic in a custom condition function.

   ```python
   from langgraph.graph import StateGraph, START, END
   from langgraph.prebuilt import ToolNode, tools_condition

   graph = StateGraph(AgentState)
   graph.add_node("agent", agent_node)
   graph.add_node("tools", ToolNode(tools=tools))
   graph.add_node("format", format_output)
   graph.add_edge(START, "agent")
   graph.add_conditional_edges("agent", tools_condition)
   graph.add_edge("tools", "agent")
   app = graph.compile()
   ```

5. Test that behavior matches your original agent

   Run the same prompts through both implementations and compare outputs. Pay special attention to edge cases: tool errors, multi-step reasoning, and hitting the iteration/recursion limit. Use `verbose=True` on the old agent and `app.stream()` on the new one to compare step-by-step execution.

   ```python
   # Stream events to inspect each step
   for event in app.stream(
       {"messages": [("human", "What is the weather in SF?")]},
       config={"recursion_limit": 25},
   ):
       for node_name, output in event.items():
           print(f"--- {node_name} ---")
           output["messages"][-1].pretty_print()
   ```
Handling max_iterations and Early Stopping
In AgentExecutor, max_iterations capped the loop. In LangGraph, you pass recursion_limit in the config. Note that the recursion limit counts every node invocation (agent + tool), so set it to roughly 2 × max_iterations + 1 to match equivalent behavior.
```python
# AgentExecutor: max_iterations=10
# LangGraph equivalent: each iteration = agent call + tool call = 2 steps
result = app.invoke(
    {"messages": [("human", "Complex multi-step question")]},
    config={"recursion_limit": 21},  # 2 * 10 + 1
)

# For custom early stopping (e.g., stop if agent says "I don't know"):
def custom_should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    # Custom early stopping logic replaces early_stopping_method
    if "I don't know" in last_message.content:
        return "early_exit"
    return END

graph.add_conditional_edges(
    "agent",
    custom_should_continue,
    {"tools": "tools", "early_exit": "format", END: END},
)
```
Unlike AgentExecutor, which silently stops at `max_iterations`, LangGraph raises a `GraphRecursionError` when the limit is hit. Wrap your `invoke()` call in a try/except if you want graceful degradation, or set the limit high enough that it only triggers on genuine infinite loops.
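That graceful-degradation path can be sketched as follows. `GraphRecursionError` here is a stand-in class (the real one lives in `langgraph.errors`) so the snippet stays self-contained, and `looping_invoke` is a fake graph that always blows the limit:

```python
# Stand-in for langgraph.errors.GraphRecursionError, so this sketch
# runs without LangGraph installed.
class GraphRecursionError(RuntimeError):
    pass

def invoke_with_fallback(invoke, inputs, recursion_limit=25):
    """Run the graph, degrading to a best-effort answer if the limit is hit."""
    try:
        return invoke(inputs, config={"recursion_limit": recursion_limit})
    except GraphRecursionError:
        # Fall back instead of surfacing a stack trace to the user.
        return {"messages": [("ai", "I couldn't finish within my step budget.")]}

# A fake `invoke` that always exceeds the limit, to exercise the fallback:
def looping_invoke(inputs, config):
    raise GraphRecursionError("Recursion limit reached")

result = invoke_with_fallback(looping_invoke, {"messages": []})
print(result["messages"][0][1])  # prints the fallback answer
```

In a real application you would pass `app.invoke` as the `invoke` argument and catch the exception imported from `langgraph.errors`.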
You don't need to migrate all agents at once. AgentExecutor and LangGraph agents use the same tool and LLM interfaces. You can run them side-by-side, migrate one agent at a time, and only invest in custom graph topology when a specific agent needs it.
Real-World Patterns, Pitfalls, and Best Practices
Once you understand LangGraph's primitives — nodes, edges, state, and conditional routing — the next question is: how do production teams actually compose them? This section distills four battle-tested graph patterns, the most common ways projects go wrong, and a set of practices that keep LangGraph codebases maintainable as they grow.
Pattern 1 — Plan-and-Execute
Plan-and-Execute separates thinking from doing. A planner node produces a numbered list of steps, an executor carries out one step at a time, and an observer inspects the result. After each step the re-planner can revise remaining steps, add new ones, or declare the task complete. This separation means the planner LLM can be a powerful reasoning model (e.g. GPT-4o) while the executor can be a cheaper, faster model or even a deterministic function.
```mermaid
graph LR
    S([START]) --> planner["🧠 Planner\n(creates step list)"]
    planner --> executor["⚙️ Executor\n(runs current step)"]
    executor --> observer["🔍 Observer\n(check results)"]
    observer --> replanner["📝 Re-planner\n(update plan)"]
    replanner -->|"steps remaining"| executor
    replanner -->|"plan complete"| E([END])
```
The state carries both the full plan and a pointer to the current step. After execution, the observer writes a status (success, partial, failed) that the re-planner uses to decide whether to revise, skip, or finish.
```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class PlanExecuteState(TypedDict):
    objective: str
    plan: list[str]  # remaining steps
    current_step: str
    step_result: str
    final_answer: str

def planner(state: PlanExecuteState) -> dict:
    # LLM call: break objective into numbered steps
    steps = plan_with_llm(state["objective"])
    return {"plan": steps, "current_step": steps[0]}

def executor(state: PlanExecuteState) -> dict:
    result = execute_step(state["current_step"])
    return {"step_result": result}

def replanner(state: PlanExecuteState) -> dict:
    # LLM call: revise remaining plan given the result
    revised = replan_with_llm(state["plan"], state["step_result"])
    if not revised:
        return {"final_answer": state["step_result"], "plan": []}
    return {"plan": revised, "current_step": revised[0]}

def should_continue(state: PlanExecuteState) -> str:
    return "executor" if state["plan"] else END

graph = StateGraph(PlanExecuteState)
graph.add_node("planner", planner)
graph.add_node("executor", executor)
graph.add_node("replanner", replanner)
graph.add_edge(START, "planner")
graph.add_edge("planner", "executor")
graph.add_edge("executor", "replanner")
graph.add_conditional_edges("replanner", should_continue)
app = graph.compile()
```
The key insight is that re-planning after every step makes the agent self-correcting. If step 2 returns unexpected data, the re-planner can insert a new step 2a to handle it — something a static chain can never do.
Pattern 2 — Reflection / Self-Critique
In the reflection pattern, the agent generates an initial output and then a separate critic node evaluates it against quality criteria (accuracy, completeness, tone). The critic either approves the output — sending it to END — or returns structured feedback that routes back to the generator for another pass. This loop usually converges in 1-3 iterations.
```python
class ReflectionState(TypedDict):
    request: str
    draft: str
    critique: str
    is_approved: bool
    iteration: int

def generator(state: ReflectionState) -> dict:
    draft = generate_with_llm(state["request"], state.get("critique", ""))
    return {"draft": draft, "iteration": state.get("iteration", 0) + 1}

def critic(state: ReflectionState) -> dict:
    evaluation = critique_with_llm(state["draft"], state["request"])
    return {"critique": evaluation["feedback"],
            "is_approved": evaluation["score"] >= 0.8}

def route_after_critique(state: ReflectionState) -> str:
    if state["is_approved"] or state["iteration"] >= 3:
        return END
    return "generator"
```
Notice the iteration guard — even if the critic never approves, the loop exits after 3 passes. Apply this guard to every cycle in your graph. Relying on `recursion_limit` as your only safety net is a code smell.
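To make the two exit conditions concrete, here is the generator/critic loop flattened into plain Python. `generate_with_llm` and `critique_with_llm` are deterministic stubs (a real graph would call models); the stub critic never approves, so the iteration guard is what ends the loop:

```python
def generate_with_llm(request, critique):
    # Stub: a real generator node would call an LLM here.
    return f"draft of {request!r} (feedback applied: {bool(critique)})"

def critique_with_llm(draft, request):
    # Stub critic that never approves, to exercise the iteration guard.
    return {"feedback": "needs more detail", "score": 0.5}

state = {"request": "write a summary", "draft": "", "critique": "",
         "is_approved": False, "iteration": 0}

# generator -> critic cycle with the same two exit conditions as the graph
while True:
    state["draft"] = generate_with_llm(state["request"], state["critique"])
    state["iteration"] += 1
    evaluation = critique_with_llm(state["draft"], state["request"])
    state["critique"] = evaluation["feedback"]
    state["is_approved"] = evaluation["score"] >= 0.8
    if state["is_approved"] or state["iteration"] >= 3:
        break

print(state["iteration"])  # the guard stops the loop after 3 passes
```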
Pattern 3 — Multi-Turn RAG
Basic RAG retrieves once and generates. Multi-turn RAG lets the agent decide whether the retrieved context is sufficient or whether it needs to refine the query and retrieve again. This is especially useful for ambiguous questions or large knowledge bases where the first retrieval rarely returns everything needed.
```python
from typing import TypedDict

# A minimal state schema for the snippet below (not shown in full here):
class RAGState(TypedDict):
    question: str
    current_query: str
    context: list
    retrieval_count: int

def retriever(state: RAGState) -> dict:
    docs = vector_store.similarity_search(state["current_query"], k=5)
    # Append to accumulated context — don't replace it
    return {"context": state.get("context", []) + docs,
            "retrieval_count": state.get("retrieval_count", 0) + 1}

def grader(state: RAGState) -> str:
    """Decide: answer with what we have, or refine and retrieve again."""
    if state["retrieval_count"] >= 3:
        return "generate"  # hard cap — answer with best-effort context
    has_enough = grade_context(state["context"], state["question"])
    return "generate" if has_enough else "rewrite_query"

def query_rewriter(state: RAGState) -> dict:
    refined = rewrite_with_llm(state["question"], state["context"])
    return {"current_query": refined}
```
The graph flows retriever → grader → (generate | rewrite_query → retriever). The accumulated context grows with each pass, and a hard cap on retrieval_count prevents runaway loops. This pattern dramatically improves answer quality for complex, multi-faceted questions.
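The same control flow, flattened into plain Python with stubbed retrieval and grading (`fake_search` and the always-unsatisfied `grade_context` are stand-ins), shows how the hard cap bounds the loop while context accumulates:

```python
def grade_context(context, question):
    # Stub grader: pretend context is never sufficient, to hit the cap.
    return False

def fake_search(query):
    # Stand-in for vector_store.similarity_search
    return [f"doc for {query!r}"]

state = {"question": "q", "current_query": "q",
         "context": [], "retrieval_count": 0}

# retriever -> grader -> rewrite_query cycle, capped at 3 retrievals
while True:
    state["context"] = state["context"] + fake_search(state["current_query"])
    state["retrieval_count"] += 1
    if state["retrieval_count"] >= 3 or grade_context(state["context"],
                                                      state["question"]):
        break
    state["current_query"] = state["current_query"] + " (refined)"

print(len(state["context"]))  # 3: one batch of docs per retrieval pass
```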
Pattern 4 — Parallel Tool Execution
When an LLM returns multiple tool calls in a single response, you can fan them out to run simultaneously using LangGraph's Send API. Each tool call spawns its own execution, and results are collected back into the parent state before the next LLM turn. This cuts latency proportionally to the number of concurrent calls.
```python
from langchain_core.messages import ToolMessage
from langgraph.constants import Send

def route_tools(state: AgentState) -> list[Send]:
    """Fan out: one Send per tool call in the last AI message."""
    last_msg = state["messages"][-1]
    return [
        Send("execute_tool", {"tool_call": tc})
        for tc in last_msg.tool_calls
    ]

def execute_tool(state: dict) -> dict:
    tc = state["tool_call"]
    result = tool_registry[tc["name"]].invoke(tc["args"])
    return {"messages": [ToolMessage(content=result, tool_call_id=tc["id"])]}

graph.add_conditional_edges("agent", route_tools)
```
If the agent asks for a weather lookup, a database query, and a calendar check all at once, all three run in parallel. The messages reducer (typically add_messages) merges the ToolMessage results back into the conversation.
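The fan-out step is easy to check in isolation. This sketch uses a `(node, payload)` tuple as a stand-in for LangGraph's `Send` and a `SimpleNamespace` as a fake AI message carrying three tool calls:

```python
from types import SimpleNamespace

def fan_out(last_msg):
    # One (node, payload) pair per tool call, standing in for Send objects.
    return [("execute_tool", {"tool_call": tc}) for tc in last_msg.tool_calls]

# Fake AI message requesting three tools at once
msg = SimpleNamespace(tool_calls=[
    {"name": "weather", "args": {"city": "SF"}, "id": "1"},
    {"name": "db_query", "args": {"sql": "SELECT 1"}, "id": "2"},
    {"name": "calendar", "args": {"day": "today"}, "id": "3"},
])

sends = fan_out(msg)
print(len(sends))  # 3: one branch per tool call, executed concurrently
```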
Common Pitfalls
Knowing the patterns is half the battle. The other half is knowing what goes wrong. These are the issues that show up repeatedly in production LangGraph projects.
| Pitfall | What Happens | How to Prevent It |
|---|---|---|
| Infinite loops | A conditional edge never resolves to END, so the graph spins until it hits the default recursion limit (25) and throws an error. | Always set an explicit recursion_limit in the run config. Add iteration counters inside state for loops you control. |
| State explosion | Stuffing full documents, entire chat histories, or intermediate reasoning chains into state. Checkpointing slows to a crawl; memory usage spikes. | Keep state lean. Store references (doc IDs) instead of full content. Trim message history with a sliding window or summarization node. |
| Over-engineering | Building a 12-node graph for what is really a linear prompt → retrieve → generate chain. | If your flow has no conditional branching or cycles, you don't need LangGraph. A simple chain or create_react_agent will do. |
| Ignoring streaming | Users stare at a blank screen for 10+ seconds while the graph runs end-to-end before returning anything. | Use astream_events or stream with stream_mode="messages" to push token-level updates as they happen. |
The default recursion_limit of 25 may be too high for your use case (burning tokens in a loop) or too low (complex multi-step plans). Always set it explicitly. Note that it belongs in the run config rather than compile(): `app.invoke(inputs, config={"recursion_limit": 15})`, or bake it in with `app.with_config(recursion_limit=15)`. Treat it like a timeout — pick a value that matches your worst-case expected depth.
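The state-explosion row above suggests a sliding-window trim. Here is a minimal sketch of the windowing logic, using `(role, content)` tuples as stand-ins for real message objects. Note that with the `add_messages` reducer a node cannot simply return a shorter list to shrink history; in a real graph you would reach for LangChain's `trim_messages` or `RemoveMessage` — this only illustrates the window itself:

```python
def trim_messages_node(state: dict, keep_last: int = 6) -> dict:
    """Sliding-window trim: keep the system message plus the newest turns."""
    messages = state["messages"]
    system = [m for m in messages if m[0] == "system"][:1]
    rest = [m for m in messages if m[0] != "system"]
    return {"messages": system + rest[-keep_last:]}

# 1 system message followed by 10 user turns
history = [("system", "You are helpful.")] + \
          [("human", f"q{i}") for i in range(10)]
trimmed = trim_messages_node({"messages": history})
print(len(trimmed["messages"]))  # 7: system message + last 6 turns
```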
Best Practices
Keep nodes small and focused
Each node should do exactly one thing: call an LLM, run a tool, transform state, or make a routing decision. When a node starts doing two of these, split it. Small nodes are easier to test, easier to retry on failure, and easier to reuse across graphs.
Use meaningful state keys
Name state fields for what they represent, not how they're used. retrieved_documents beats docs. plan_steps_remaining beats steps. When you come back to a graph in three months, clear names are the best documentation you have.
Handle errors at the node level
Wrap LLM calls and tool invocations in try/except blocks inside the node. Write error information into state (e.g., last_error) so downstream conditional edges can route to a fallback path instead of crashing the entire graph.
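A minimal sketch of that pattern, with a hypothetical `flaky_tool` standing in for a real tool invocation:

```python
def flaky_tool(args):
    # Hypothetical tool that fails on bad input.
    if "city" not in args:
        raise ValueError("missing 'city'")
    return f"sunny in {args['city']}"

def tool_node(state: dict) -> dict:
    """Run the tool, writing errors into state instead of raising."""
    try:
        result = flaky_tool(state["tool_args"])
        return {"tool_result": result, "last_error": ""}
    except Exception as exc:
        return {"tool_result": "", "last_error": str(exc)}

def route_after_tool(state: dict) -> str:
    # Downstream conditional edge: route to a fallback path on error.
    return "fallback" if state["last_error"] else "agent"

ok = tool_node({"tool_args": {"city": "SF"}})
bad = tool_node({"tool_args": {}})
print(route_after_tool(ok), route_after_tool(bad))  # agent fallback
```

Because the error lives in state, the graph keeps running and the routing function, not the exception, decides what happens next.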
Test routing logic separately
Conditional edge functions are pure functions of state — they take a state dict and return a string. Test them with plain unit tests and synthetic state dicts. Don't rely on end-to-end graph runs to verify routing.
```python
# Unit test for routing — no LLM needed
def test_should_continue_with_remaining_steps():
    state = {"plan": ["step2", "step3"], "final_answer": ""}
    assert should_continue(state) == "executor"

def test_should_continue_when_plan_empty():
    state = {"plan": [], "final_answer": "done"}
    assert should_continue(state) == "__end__"
```
Use subgraphs for team boundaries
When multiple teams contribute to the same application, give each team a subgraph with a well-defined input/output state schema. The parent graph calls into each subgraph as a single node. This keeps ownership clear and lets teams iterate independently without merge conflicts in one massive graph definition.
Prefer create_react_agent first
Before building a custom graph, ask: "Would a standard ReAct loop with tools solve this?" LangGraph ships create_react_agent, which wires up the agent → tool → agent loop for you. Reach for a custom StateGraph only when you need custom state, conditional routing, human-in-the-loop checkpoints, or multi-agent coordination.
About 80% of agent use cases work fine with create_react_agent plus a good set of tools. The remaining 20% — multi-agent orchestration, plan-and-execute, human approval gates — is where custom StateGraph definitions shine. Start simple and promote to a full graph only when you hit a limitation.