LangGraph / Intermediate Track Module 3 / 10
LangGraph Intermediate ⏱ 28 min

State & Persistence: Intermediate

Checkpoints & long-running agents

How to Use This Lesson

  • Start with the user problem, then map the pattern to architecture and failure modes.
  • If a code or design example is included, change one assumption and reason through the impact.
  • Use role callouts, checklists, and Q&A sections as implementation or interview prep notes.

This lesson focuses on State & Persistence at the intermediate level. Use it to move from definition to implementation-ready explanation.

Concept

LangGraph state uses explicit, reducer-driven schemas. Annotated types attach a reducer to a key to control how node updates merge into it; keys without a reducer are simply overwritten (last write wins). A checkpoint is saved at every super-step, and within a super-step each task's writes are also stored individually as pending writes. This enables recovery: if nodes A and B run in the same super-step and B fails, A's successful write is already durable, so A does not re-run on resume. Stores provide cross-thread memory; use InMemoryStore only for local development, and use a durable store such as AsyncPostgresStore in production.
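To make the merge rule concrete, here is a minimal plain-Python model of reducer-driven updates. It is a simplification, not LangGraph's internal code: `apply_update` and the `REDUCERS` table are hypothetical names, and every key is assumed to start with a default value.

```python
import operator
from typing import Any, Callable, Dict

# Hypothetical reducer table; keys without an entry use last-write-wins.
REDUCERS: Dict[str, Callable[[Any, Any], Any]] = {
    "messages": operator.add,         # list + list: concatenate
    "tool_calls_made": operator.add,  # int + int: sum
}

def apply_update(state: Dict[str, Any], update: Dict[str, Any],
                 reducers: Dict[str, Callable[[Any, Any], Any]] = REDUCERS) -> Dict[str, Any]:
    """Merge one node's partial update into state (toy model)."""
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # reducer(old, new)
        else:
            merged[key] = value                        # overwrite
    return merged

state = {"messages": [], "tool_calls_made": 0, "final_answer": ""}
state = apply_update(state, {"messages": ["hi"], "tool_calls_made": 1})
state = apply_update(state, {"messages": ["tool result"], "tool_calls_made": 1,
                             "final_answer": "draft"})
state = apply_update(state, {"final_answer": "done"})
print(state)
# {'messages': ['hi', 'tool result'], 'tool_calls_made': 2, 'final_answer': 'done'}
```

Nodes return only the keys they change; untouched keys keep their previous values.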

Key Facts

  • Reducer: function(old_value, new_value) returning merged_value
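  • operator.add: concatenates lists and sums numbers, so every update must match the field's type; use an explicit integer reducer for counters and add_messages for chat history
  • Pending writes: per-task durability means tasks that succeeded in a failed super-step are not re-run on resume, so their side effects are not duplicated
  • AsyncPostgresStore / AsyncPostgresSaver: durable production store and checkpointer
  • Postgres checkpointer tables include checkpoints, checkpoint_writes, and checkpoint_blobs
  • app.update_state(config, values): write state into a thread from outside a run (called on the compiled graph); values pass through the same reducers as a node's return value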

Reference Implementation

from typing import Annotated, TypedDict
from langgraph.store.memory import InMemoryStore

def keep_last_10(old: list, new: list) -> list:
    # Reducer: merge the update, then keep only the 10 newest items.
    return (old + new)[-10:]

def add_int(old: int, new: int) -> int:
    # Reducer: sum integer updates (a node returns e.g. {"tool_calls_made": 1}).
    return old + new

class AgentState(TypedDict):
    messages: Annotated[list, keep_last_10]       # rolling window
    tool_calls_made: Annotated[int, add_int]       # nodes return integers, not lists
    final_answer: str                              # no reducer: last write wins

# Local development Store: cross-thread memory, lost when the process exits.
store = InMemoryStore()
store.put(("users", "praveen"), "prefs",
    {"lang": "Python", "level": "advanced"})
prefs = store.get(("users", "praveen"), "prefs")   # returns an Item, or None if missing
print(prefs.value)  # {'lang': 'Python', 'level': 'advanced'}

# Compile with both layers: the checkpointer holds per-thread state,
# the store holds cross-thread memory.
# app = graph.compile(checkpointer=checkpointer, store=store)

Production Persistence Shape

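from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.store.postgres import AsyncPostgresStore

async def run_turn(graph, db_uri: str, inputs: dict) -> dict:
    # Parenthesized multi-item `async with` requires Python 3.10+.
    async with (
        AsyncPostgresSaver.from_conn_string(db_uri) as checkpointer,
        AsyncPostgresStore.from_conn_string(db_uri) as store,
    ):
        # Run setup/migrations in deployment, not per request.
        # await checkpointer.setup()
        # await store.setup()
        app = graph.compile(checkpointer=checkpointer, store=store)

        config = {
            "configurable": {
                # Separate assistants, tenants, users, and chats via the thread ID.
                "thread_id": "support-agent:tenant-a:user-42:chat-7",
            }
        }
        # Invoke inside the block: the connections close when it exits.
        return await app.ainvoke(inputs, config)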

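Postgres checkpointers persist checkpoint rows plus per-task writes in checkpoint_writes, which is why successful sibling nodes do not need to rerun after one parallel branch fails. checkpoint_ns is the namespace LangGraph uses to keep subgraph checkpoints apart from the parent graph's (the root graph uses an empty namespace), so it is not a general-purpose partition key. To separate graph versions or assistants that would otherwise share a conversation, give each its own thread_id, for example by prefixing the thread ID with the assistant name.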
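The recovery behaviour can be sketched as a toy model in plain Python. Everything here is hypothetical: `pending_writes` stands in for the checkpoint_writes table, `run_superstep` runs tasks sequentially (LangGraph runs a super-step's tasks concurrently), and node B is rigged to fail once.

```python
from typing import Callable, Dict, List

# Stand-in for the checkpoint_writes table: task name -> that task's saved writes.
pending_writes: Dict[str, dict] = {}
run_log: List[str] = []          # which tasks actually executed, in order
attempts = {"B": 0}

def run_superstep(tasks: Dict[str, Callable[[], dict]]) -> Dict[str, dict]:
    """Run each task with no saved write yet; save every success immediately."""
    failed = []
    for name, fn in tasks.items():
        if name in pending_writes:
            continue  # already durable from an earlier attempt, so skip it
        run_log.append(name)
        try:
            pending_writes[name] = fn()
        except Exception:
            failed.append(name)
    if failed:
        raise RuntimeError(f"super-step failed in {failed}")
    return dict(pending_writes)

def node_a() -> dict:
    return {"email_sent": True}   # side effect that must not repeat

def node_b() -> dict:
    attempts["B"] += 1
    if attempts["B"] == 1:
        raise TimeoutError("flaky API")  # fail on the first attempt only
    return {"lookup": "ok"}

tasks = {"A": node_a, "B": node_b}
try:
    run_superstep(tasks)          # first attempt: A is saved, B fails
except RuntimeError:
    pass
writes = run_superstep(tasks)     # resume: only B runs again
print(run_log)                    # ['A', 'B', 'B']
```

A executes once across both attempts; only the failed task is re-executed.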

Interview Q&A

Q1. What is the difference between a checkpointer and a Store?

A checkpointer saves graph state per thread_id - conversation memory within a session. A Store is a key-value store for cross-thread persistent memory - data that survives across multiple conversations. Use Store for user profiles, long-term preferences, or accumulated knowledge. Compile with both: graph.compile(checkpointer=…, store=…).
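A rough model of the two layers, using hypothetical `ToyCheckpointer` and `ToyStore` classes rather than LangGraph's API: thread state is looked up by thread_id, while store items are looked up by a namespace tuple plus key that any thread can read.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

class ToyCheckpointer:
    """Per-thread state history (toy stand-in for a checkpointer)."""
    def __init__(self) -> None:
        self._history: Dict[str, List[dict]] = defaultdict(list)

    def save(self, thread_id: str, state: dict) -> None:
        self._history[thread_id].append(dict(state))

    def latest(self, thread_id: str) -> dict:
        history = self._history.get(thread_id)  # .get does not create a new entry
        return history[-1] if history else {}

class ToyStore:
    """Cross-thread key-value memory (toy stand-in for a Store)."""
    def __init__(self) -> None:
        self._items: Dict[Tuple[Tuple[str, ...], str], Any] = {}

    def put(self, namespace: Tuple[str, ...], key: str, value: Any) -> None:
        self._items[(namespace, key)] = value

    def get(self, namespace: Tuple[str, ...], key: str) -> Any:
        return self._items.get((namespace, key))

checkpointer, store = ToyCheckpointer(), ToyStore()

# Conversation 1 learns a preference and records it in both layers.
checkpointer.save("chat-1", {"messages": ["I prefer Python"]})
store.put(("users", "user-42"), "prefs", {"lang": "Python"})

# Conversation 2 (a new thread) starts empty but can still read the store.
print(checkpointer.latest("chat-2"))             # {}
print(store.get(("users", "user-42"), "prefs"))  # {'lang': 'Python'}
```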

Q2. How does pending writes recovery work?

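Within a super-step, LangGraph saves each task's output as a pending write (with the Postgres checkpointer, in the checkpoint_writes table). If nodes A and B run in the same super-step and B fails, A's writes are already durable. On resume, A does not re-run; only B is retried. This prevents nodes that already succeeded from repeating side effects such as sending an email twice. It does not protect B itself: if B performed a side effect before failing, that side effect must be safe to repeat.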
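Because pending writes do not cover the task that failed, a node with external side effects should be safe to retry. A minimal sketch, assuming a hypothetical in-memory idempotency ledger (`sent_keys`) and a stand-in `outbox` list in place of a real email API; in production the ledger would need to be durable.

```python
from typing import List, Set, Tuple

sent_keys: Set[str] = set()               # hypothetical idempotency ledger
outbox: List[Tuple[str, str]] = []        # stand-in for a real email API

def send_email_once(idempotency_key: str, to: str, body: str) -> bool:
    """Send only if this key has not been used; return True if an email went out."""
    if idempotency_key in sent_keys:
        return False
    outbox.append((to, body))             # the "real" send
    sent_keys.add(idempotency_key)
    return True

def node_b(state: dict) -> dict:
    # Derive the key from stable identifiers so a retry produces the same key.
    key = f"{state['thread_id']}:welcome-email"
    send_email_once(key, state["email"], "Welcome!")
    return {"email_status": "sent"}

state = {"thread_id": "tenant-a:user-42:chat-7", "email": "user@example.com"}
node_b(state)
node_b(state)       # retried after a later failure: no second email
print(len(outbox))  # 1
```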

Q3. How do you implement a rolling message window to control context length?

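Define a custom reducer, for example def keep_last_20(old, new): return (old + new)[-20:], and use Annotated[list, keep_last_20] in your TypedDict. The reducer trims the list every time an update is merged, so the next node sees at most 20 items. Note that trimming in the reducer permanently drops older messages from the checkpointed state; if you need the full history, trim only the model input inside the node instead. For production, also consider token-based trimming with LangChain's trim_messages() utility to stay within model context limits.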
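Hard-coding the window size inside the reducer makes it easy for the number and the name to drift apart. A small factory avoids that; `make_keep_last` is a hypothetical helper, sketched in plain Python:

```python
from typing import Callable

def make_keep_last(n: int) -> Callable[[list, list], list]:
    """Build a reducer that keeps only the newest n items after each merge."""
    if n <= 0:
        raise ValueError("n must be positive")  # [-0:] would keep everything
    def keep_last(old: list, new: list) -> list:
        return (old + new)[-n:]
    return keep_last

keep_last_20 = make_keep_last(20)
# Schema usage (sketch): messages: Annotated[list, keep_last_20]

history: list = []
for i in range(25):
    history = keep_last_20(history, [f"msg-{i}"])
print(len(history), history[0])  # 20 msg-5
```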

Q4. Why can operator.add break counters?

operator.add works only if old and new have compatible types. A counter annotated as int must receive integer updates, such as a node returning {"tool_calls_made": 1}. Returning a list for that counter raises a TypeError when the reducer tries to add an int and a list. A named add_int reducer makes that contract obvious.
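The type contract can be checked with the standard library alone. The `add_int` below is a variant of the lesson's reducer with an added explicit type check (an assumption, not something LangGraph requires), so a bad update fails with a clearer message:

```python
import operator

def add_int(old: int, new: int) -> int:
    """Counter reducer with an explicit type check (variant of the lesson's add_int)."""
    if not isinstance(new, int):
        raise TypeError(f"counter update must be int, got {type(new).__name__}")
    return old + new

print(operator.add(3, 1))          # 4: int + int sums
print(operator.add(["a"], ["b"]))  # ['a', 'b']: list + list concatenates

caught = None
try:
    add_int(3, [1])                # a node mistakenly returned a list
except TypeError as exc:
    caught = str(exc)
print(caught)  # counter update must be int, got list
```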

Q5. What do checkpoint_ns and checkpoint_writes solve?

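checkpoint_ns separates checkpoint histories inside the same thread; LangGraph uses it to keep each subgraph's checkpoints apart from the parent graph's (the root graph's namespace is empty). To separate graph versions or assistants, use distinct thread_id values instead. checkpoint_writes records each task's writes inside a super-step, so a failed parallel branch can resume without rerunning successful sibling branches and duplicating their side effects.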
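As a rough illustration of how a namespace keeps histories apart within one thread, here is a toy checkpoint table keyed by (thread_id, checkpoint_ns). The `save` helper and the subgraph namespace string `summarize:task-1` are hypothetical; LangGraph generates real subgraph namespaces itself.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Toy checkpoint table: (thread_id, checkpoint_ns) -> ordered list of snapshots.
checkpoints: Dict[Tuple[str, str], List[dict]] = defaultdict(list)

def save(thread_id: str, ns: str, state: dict) -> None:
    checkpoints[(thread_id, ns)].append(dict(state))

thread = "support-agent:tenant-a:user-42:chat-7"
save(thread, "", {"step": 1})                  # root graph: empty namespace
save(thread, "summarize:task-1", {"step": 1})  # hypothetical subgraph namespace
save(thread, "", {"step": 2})

print(len(checkpoints[(thread, "")]))                  # 2
print(len(checkpoints[(thread, "summarize:task-1")]))  # 1
```

Both histories share one thread_id, yet neither overwrites the other.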

Practice Task

Explain when this LangGraph pattern is safer than a linear chain, then name one production failure it prevents.