This lesson focuses on State & Persistence at the intermediate level. Use it to move from definition to implementation-ready explanation.
Concept
LangGraph state is defined by an explicit schema in which each key can carry a reducer. Annotated types attach the reducer that controls how a node’s update merges with the existing value; keys without a reducer are overwritten. A checkpoint is saved at each super-step, and the writes of every task that completes within a super-step are saved as well. This enables pending-writes recovery: if node B fails, node A’s successful write is already durable, so A does not re-run on resume. Stores provide cross-thread memory; use InMemoryStore only for local development, and a durable store such as AsyncPostgresStore in production.
Key Facts
- Reducer: function(old_value, new_value) returning merged_value
- operator.add: concatenates lists; use a numeric reducer for counters and add_messages for chat messages
- Pending writes: the writes of tasks that succeeded are saved per task, so those tasks do not re-run and repeat side effects on resume
- AsyncPostgresStore/Saver: durable production store and checkpointer
- Checkpointer tables include checkpoints, checkpoint_writes, and checkpoint_blobs
- graph.update_state(config, updates): inject state from outside the running graph
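The reducer contract listed above can be illustrated in plain Python, without LangGraph. This is a minimal sketch: `REDUCERS` and `apply_update` are hypothetical names introduced here to mimic how a reducer-annotated key merges a partial update while a key without a reducer is overwritten. It is not LangGraph's internal implementation.

```python
import operator
from typing import Any, Callable, Dict

# Hypothetical reducer table: key -> function(old_value, new_value) -> merged_value.
REDUCERS: Dict[str, Callable[[Any, Any], Any]] = {
    "messages": operator.add,         # list + list concatenates
    "tool_calls_made": operator.add,  # int + int sums
}


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a partial update into state, using a reducer where one is registered."""
    merged = dict(state)
    for key, new_value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], new_value)
        else:
            merged[key] = new_value  # no reducer: last write wins
    return merged


state = {"messages": ["hi"], "tool_calls_made": 0, "final_answer": ""}
state = apply_update(state, {"messages": ["hello"], "tool_calls_made": 1})
state = apply_update(state, {"final_answer": "done", "tool_calls_made": 1})
print(state)
```

Each update touches only the keys it names; the other keys keep their previous values.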
Reference Implementation
from typing import Annotated, List, TypedDict

from langgraph.store.memory import InMemoryStore


def keep_last_10(old: List, new: List) -> List:
    return (old + new)[-10:]


def add_int(old: int, new: int) -> int:
    return old + new


class AgentState(TypedDict):
    messages: Annotated[List, keep_last_10]  # rolling window
    tool_calls_made: Annotated[int, add_int]  # nodes return integers, not lists
    final_answer: str  # last-write-wins
# Local development Store: cross-thread memory, lost when process exits.
store = InMemoryStore()
store.put(("users", "praveen"), "prefs",
          {"lang": "Python", "level": "advanced"})
prefs = store.get(("users", "praveen"), "prefs")
print(prefs.value)  # {'lang': 'Python', 'level': 'advanced'}
# Compile with both layers
# app = graph.compile(checkpointer=checkpointer, store=store)
Production Persistence Shape
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.store.postgres import AsyncPostgresStore

# Assumes DB_URI and an uncompiled `graph` are defined elsewhere.
# `async with` must run inside an async function.
async with (
    AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer,
    AsyncPostgresStore.from_conn_string(DB_URI) as store,
):
    # Run setup/migrations in deployment, not per request.
    # await checkpointer.setup()
    # await store.setup()
    app = graph.compile(checkpointer=checkpointer, store=store)

config = {
    "configurable": {
        "thread_id": "tenant-a:user-42:chat-7",
        "checkpoint_ns": "support-agent",
    }
}
Postgres checkpointers persist checkpoint rows plus per-task writes in checkpoint_writes, which is why successful sibling nodes do not need to re-run after one parallel branch fails. checkpoint_ns namespaces checkpoints within a thread: it defaults to an empty string for the root graph, and LangGraph sets it for subgraphs. It can also be used to separate graph versions or assistants that share a thread ID.
Interview Q&A
Q1. What is the difference between a checkpointer and a Store?
A checkpointer saves graph state per thread_id: short-term conversation memory scoped to one thread. A Store is a key-value store for cross-thread memory: data that persists across multiple conversations. Use a Store for user profiles, long-term preferences, or accumulated knowledge. Compile with both: graph.compile(checkpointer=…, store=…).
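The scoping difference can be shown with a toy model in plain Python. The sketch below uses two dictionaries as stand-ins; `thread_checkpoints`, `long_term_store`, and `run_turn` are hypothetical names for illustration and are not LangGraph classes or functions. History is keyed by thread ID, while preferences are keyed by a namespace and key that every thread can read.

```python
from typing import Any, Dict, List, Tuple

# Toy stand-ins for illustration only (hypothetical, not LangGraph classes).
thread_checkpoints: Dict[str, List[str]] = {}  # thread_id -> message history
long_term_store: Dict[Tuple[Tuple[str, ...], str], Dict[str, Any]] = {}  # (namespace, key) -> value


def run_turn(thread_id: str, user_id: str, text: str) -> Tuple[List[str], Dict[str, Any]]:
    """Append to this thread's history and read the user's cross-thread preferences."""
    history = thread_checkpoints.get(thread_id, []) + [text]
    thread_checkpoints[thread_id] = history
    prefs = long_term_store.get((("users", user_id), "prefs"), {})
    return history, prefs


long_term_store[(("users", "praveen"), "prefs")] = {"lang": "Python"}
run_turn("chat-1", "praveen", "hello")
history_1, prefs_1 = run_turn("chat-1", "praveen", "how are you?")
history_2, prefs_2 = run_turn("chat-2", "praveen", "new conversation")
print(history_1, history_2, prefs_2)
```

The second thread starts with an empty history but still sees the same preferences, which is the split between checkpointer and Store described in the answer.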
Q2. How does pending writes recovery work?
Within a super-step, LangGraph records each completed task’s writes (in the checkpoint_writes table when using the Postgres checkpointer). If node B fails, node A’s writes are already durable. On resume, A does not re-run; only B retries. This prevents successful nodes from repeating side effects, such as sending an email twice.
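The recovery behaviour can be simulated in a few lines of plain Python. This is a simplified sketch under stated assumptions: `run_super_step` and the `pending_writes` dict are hypothetical stand-ins for the checkpointer's per-task writes, and the tasks run sequentially here, whereas LangGraph runs the tasks of a super-step in parallel.

```python
from typing import Callable, Dict, List

sent_emails: List[str] = []
attempts = {"B": 0}


def node_a() -> Dict[str, str]:
    sent_emails.append("welcome")  # side effect that must not repeat
    return {"a_result": "email sent"}


def node_b() -> Dict[str, str]:
    attempts["B"] += 1
    if attempts["B"] == 1:
        raise RuntimeError("transient failure")  # fails on the first attempt only
    return {"b_result": "ok"}


def run_super_step(tasks: Dict[str, Callable[[], Dict[str, str]]],
                   pending_writes: Dict[str, Dict[str, str]]) -> bool:
    """Run tasks whose writes are not yet saved; return True once all have succeeded."""
    for name, fn in tasks.items():
        if name in pending_writes:
            continue  # writes already durable from an earlier attempt
        try:
            pending_writes[name] = fn()
        except RuntimeError:
            return False  # step fails, but earlier writes stay saved
    return True


tasks = {"A": node_a, "B": node_b}
pending_writes: Dict[str, Dict[str, str]] = {}  # stand-in for checkpoint_writes
first_ok = run_super_step(tasks, pending_writes)   # B fails, A's write is kept
second_ok = run_super_step(tasks, pending_writes)  # resume: only B runs
print(first_ok, second_ok, sent_emails, attempts)
```

On the second call node A is skipped because its write is already recorded, so the email is sent exactly once.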
Q3. How do you implement a rolling message window to control context length?
Define a custom reducer, for example def keep_last_n(old, new): return (old + new)[-20:], and use Annotated[List, keep_last_n] in your TypedDict. The reducer runs whenever a node’s update is merged, so the window is trimmed before the next node reads state. This drops messages by count, not by size. For production, also consider token-based trimming with LangChain’s trim_messages() utility to stay within model context limits.
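One way to avoid hard-coding the window size is a reducer factory. The sketch below assumes a hypothetical `make_keep_last` helper and a hypothetical `ChatState` schema; plain strings stand in for message objects.

```python
from typing import Annotated, Callable, List, TypedDict


def make_keep_last(n: int) -> Callable[[List, List], List]:
    """Build a reducer that keeps only the n most recent items."""
    if n <= 0:
        raise ValueError("n must be positive")

    def keep_last(old: List, new: List) -> List:
        return (old + new)[-n:]

    return keep_last


class ChatState(TypedDict):
    # Hypothetical schema: rolling window of the 20 most recent messages.
    messages: Annotated[List, make_keep_last(20)]


# Exercise the reducer directly with a window of 3.
keep_last_3 = make_keep_last(3)
window: List[str] = []
for batch in (["m1", "m2"], ["m3"], ["m4", "m5"]):
    window = keep_last_3(window, batch)
print(window)  # ['m3', 'm4', 'm5']
```

The factory returns an ordinary two-argument function, so it satisfies the same function(old_value, new_value) contract as a hand-written reducer.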
Q4. Why can operator.add break counters?
operator.add works only when old and new have compatible types. A counter annotated as int must receive integer updates such as {"tool_calls_made": 1}. Returning a list for that key makes the reducer evaluate int + list, which raises a TypeError. A named add_int reducer makes that contract obvious.
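The type mismatch is easy to reproduce outside LangGraph. The sketch below calls operator.add directly, then shows a stricter variant of the reducer; `add_int_strict` is a hypothetical name, and its validation is an addition for illustration, not part of the reference implementation.

```python
import operator


def add_int_strict(old: int, new: int) -> int:
    """Counter reducer that rejects non-integer updates with a clear message."""
    if isinstance(new, bool) or not isinstance(new, int):
        raise TypeError(f"counter update must be an int, got {type(new).__name__}")
    return old + new


# Compatible types merge as expected.
int_sum = operator.add(2, 1)
list_concat = operator.add(["a"], ["b"])

# An int counter that receives a list update fails inside the reducer.
try:
    operator.add(0, [1])
    mismatch_error = None
except TypeError as exc:
    mismatch_error = exc

# The strict reducer fails as well, but names the contract that was broken.
try:
    add_int_strict(0, [1])
    strict_error = None
except TypeError as exc:
    strict_error = exc

print(int_sum, list_concat, mismatch_error, strict_error)
```

Validating inside the reducer moves the failure message closer to the cause: the node that returned the wrong update type.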
Q5. What do checkpoint_ns and checkpoint_writes solve?
checkpoint_ns separates histories inside the same thread, often by graph version, assistant, or subgraph. checkpoint_writes records each task’s writes inside a super-step, so a failed parallel branch can resume without rerunning successful sibling branches and duplicating side effects.
Practice Task
Explain when this LangGraph pattern is safer than a linear chain, then name one production failure it prevents.