Skip to content

pydantic_graph

Graph

Bases: Generic[StateT, DepsT, RunEndT]

Definition of a graph.

In pydantic-graph, a graph is a collection of nodes that can be run in sequence. The nodes define their outgoing edges — e.g. which nodes may be run next, and thereby the structure of the graph.

Here’s a very simple example of a graph which increments a number by 1, but makes sure the number is never 42 at the end.

never_42.py
from __future__ import annotations

from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext

@dataclass
class MyState:
    number: int

@dataclass
class Increment(BaseNode[MyState]):
    async def run(self, ctx: GraphRunContext) -> Check42:
        ctx.state.number += 1
        return Check42()

@dataclass
class Check42(BaseNode[MyState, None, int]):
    async def run(self, ctx: GraphRunContext) -> Increment | End[int]:
        if ctx.state.number == 42:
            return Increment()
        else:
            return End(ctx.state.number)

never_42_graph = Graph(nodes=(Increment, Check42))

(This example is complete, it can be run “as is”)

See run For an example of running graph, and mermaid_code for an example of generating a mermaid diagram from the graph.

Methods

__init__
def __init__(
    nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
    name: str | None = None,
    state_type: type[StateT] | _utils.Unset = _utils.UNSET,
    run_end_type: type[RunEndT] | _utils.Unset = _utils.UNSET,
    auto_instrument: bool = True,
)

Create a graph from a sequence of nodes.

Parameters

nodes : Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]

The nodes which make up the graph, nodes need to be unique and all be generic in the same state type.

name : str | None Default: None

Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.

state_type : type[StateT] | _utils.Unset Default: _utils.UNSET

The type of the state for the graph, this can generally be inferred from nodes.

run_end_type : type[RunEndT] | _utils.Unset Default: _utils.UNSET

The type of the result of running the graph, this can generally be inferred from nodes.

auto_instrument : bool Default: True

Whether to create a span for the graph run and the execution of each node’s run method.

run

@async

def run(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    infer_name: bool = True,
) -> GraphRunResult[StateT, RunEndT]

Run the graph from a starting node until it ends.

Here’s an example of running the graph from above:

run_never_42.py
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(1)
    await never_42_graph.run(Increment(), state=state)
    print(state)
    #> MyState(number=2)

    state = MyState(41)
    await never_42_graph.run(Increment(), state=state)
    print(state)
    #> MyState(number=43)
Returns

GraphRunResult[StateT, RunEndT] — A GraphRunResult containing information about the run, including its final result.

Parameters

start_node : BaseNode[StateT, DepsT, RunEndT]

the first node to run, since the graph definition doesn’t define the entry point in the graph, you need to provide the starting node.

state : StateT Default: None

The initial state of the graph.

deps : DepsT Default: None

The dependencies of the graph.

persistence : BaseStatePersistence[StateT, RunEndT] | None Default: None

State persistence interface, defaults to SimpleStatePersistence if None.

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

run_sync
def run_sync(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    infer_name: bool = True,
) -> GraphRunResult[StateT, RunEndT]

Synchronously run the graph.

This is a convenience method that wraps self.run with loop.run_until_complete(...). You therefore can’t use this method inside async code or if there’s an active event loop.

Returns

GraphRunResult[StateT, RunEndT] — The result type from ending the run and the history of the run.

Parameters

start_node : BaseNode[StateT, DepsT, RunEndT]

the first node to run, since the graph definition doesn’t define the entry point in the graph, you need to provide the starting node.

state : StateT Default: None

The initial state of the graph.

deps : DepsT Default: None

The dependencies of the graph.

persistence : BaseStatePersistence[StateT, RunEndT] | None Default: None

State persistence interface, defaults to SimpleStatePersistence if None.

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

iter

@async

def iter(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]

A contextmanager which can be used to iterate over the graph’s nodes as they are executed.

This method returns a GraphRun object which can be used to async-iterate over the nodes of this Graph as they are executed. This is the API to use if you want to record or interact with the nodes as the graph execution unfolds.

The GraphRun can also be used to manually drive the graph execution by calling GraphRun.next.

The GraphRun provides access to the full run history, state, deps, and the final result of the run once it has completed.

For more details, see the API documentation of GraphRun.

Returns: A GraphRun that can be async iterated over to drive the graph to completion.

Returns

AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]

Parameters

start_node : BaseNode[StateT, DepsT, RunEndT]

the first node to run. Since the graph definition doesn’t define the entry point in the graph, you need to provide the starting node.

state : StateT Default: None

The initial state of the graph.

deps : DepsT Default: None

The dependencies of the graph.

persistence : BaseStatePersistence[StateT, RunEndT] | None Default: None

State persistence interface, defaults to SimpleStatePersistence if None.

span : AbstractContextManager[AbstractSpan] | None Default: None

The span to use for the graph run. If not provided, a new span will be created.

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

iter_from_persistence

@async

def iter_from_persistence(
    persistence: BaseStatePersistence[StateT, RunEndT],
    deps: DepsT = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]

A contextmanager to iterate over the graph’s nodes as they are executed, created from a persistence object.

This method has similar functionality to iter, but instead of passing the node to run, it will restore the node and state from state persistence.

Returns: A GraphRun that can be async iterated over to drive the graph to completion.

Returns

AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]

Parameters

persistence : BaseStatePersistence[StateT, RunEndT]

The state persistence interface to use.

deps : DepsT Default: None

The dependencies of the graph.

span : AbstractContextManager[AbstractSpan] | None Default: None

The span to use for the graph run. If not provided, a new span will be created.

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

initialize

@async

def initialize(
    node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    state: StateT = None,
    infer_name: bool = True,
) -> None

Initialize a new graph run in persistence without running it.

This is useful if you want to set up a graph run to be run later, e.g. via iter_from_persistence.

Returns

None

Parameters

node : BaseNode[StateT, DepsT, RunEndT]

The node to run first.

persistence : BaseStatePersistence[StateT, RunEndT]

State persistence interface.

state : StateT Default: None

The start state of the graph.

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

mermaid_code
def mermaid_code(
    start_node: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
    title: str | None | typing_extensions.Literal[False] = None,
    edge_labels: bool = True,
    notes: bool = True,
    highlighted_nodes: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
    highlight_css: str = mermaid.DEFAULT_HIGHLIGHT_CSS,
    infer_name: bool = True,
    direction: mermaid.StateDiagramDirection | None = None,
) -> str

Generate a diagram representing the graph as mermaid diagram.

This method calls pydantic_graph.mermaid.generate_code.

Here’s an example of generating a diagram for the graph from above:

mermaid_never_42.py
from never_42 import Increment, never_42_graph

print(never_42_graph.mermaid_code(start_node=Increment))
'''
---
title: never_42_graph
---
stateDiagram-v2
  [*] --> Increment
  Increment --> Check42
  Check42 --> Increment
  Check42 --> [*]
'''

The rendered diagram will look like this:

---
title: never_42_graph
---
stateDiagram-v2
  [*] --> Increment
  Increment --> Check42
  Check42 --> Increment
  Check42 --> [*]
Returns

str — The mermaid code for the graph, which can then be rendered as a diagram.

Parameters

start_node : Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None Default: None

The node or nodes which can start the graph.

title : str | None | typing_extensions.Literal[False] Default: None

The title of the diagram, use False to not include a title.

edge_labels : bool Default: True

Whether to include edge labels.

notes : bool Default: True

Whether to include notes on each node.

highlighted_nodes : Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None Default: None

Optional node or nodes to highlight.

highlight_css : str Default: mermaid.DEFAULT_HIGHLIGHT_CSS

The CSS to use for highlighting nodes.

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

direction : mermaid.StateDiagramDirection | None Default: None

The direction of flow.

mermaid_image
def mermaid_image(
    infer_name: bool = True,
    kwargs: typing_extensions.Unpack[mermaid.MermaidConfig] = {},
) -> bytes

Generate a diagram representing the graph as an image.

The format and diagram can be customized using kwargs, see pydantic_graph.mermaid.MermaidConfig.

Returns

bytes — The image bytes.

Parameters

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

**kwargs : typing_extensions.Unpack[mermaid.MermaidConfig] Default: \{\}

Additional arguments to pass to mermaid.request_image.

mermaid_save
def mermaid_save(
    path: Path | str,
    infer_name: bool = True,
    kwargs: typing_extensions.Unpack[mermaid.MermaidConfig] = {},
) -> None

Generate a diagram representing the graph and save it as an image.

The format and diagram can be customized using kwargs, see pydantic_graph.mermaid.MermaidConfig.

Returns

None

Parameters

path : Path | str

The path to save the image to.

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

**kwargs : typing_extensions.Unpack[mermaid.MermaidConfig] Default: \{\}

Additional arguments to pass to mermaid.save_image.

get_nodes
def get_nodes() -> Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]

Get the nodes in the graph.

Returns

Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]

GraphRun

Bases: Generic[StateT, DepsT, RunEndT]

A stateful, async-iterable run of a Graph.

You typically get a GraphRun instance from calling async with [my_graph.iter(...)](/docs/ai/api/pydantic_graph/graph/#pydantic_graph.graph.Graph.iter) as graph_run:. That gives you the ability to iterate through nodes as they run, either by async for iteration or by repeatedly calling .next(...).

Here’s an example of iterating over the graph from above:

iter_never_42.py
from copy import deepcopy
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(1)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
        async for node in graph_run:
            node_states.append((node, deepcopy(graph_run.state)))
        print(node_states)
        '''
        [
            (Increment(), MyState(number=1)),
            (Increment(), MyState(number=1)),
            (Check42(), MyState(number=2)),
            (End(data=2), MyState(number=2)),
        ]
        '''

    state = MyState(41)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
        async for node in graph_run:
            node_states.append((node, deepcopy(graph_run.state)))
        print(node_states)
        '''
        [
            (Increment(), MyState(number=41)),
            (Increment(), MyState(number=41)),
            (Check42(), MyState(number=42)),
            (Increment(), MyState(number=42)),
            (Check42(), MyState(number=43)),
            (End(data=43), MyState(number=43)),
        ]
        '''

See the GraphRun.next documentation for an example of how to manually drive the graph run.

Attributes

next_node

The next node that will be run in the graph.

This is the next node that will be used during async iteration, or if a node is not passed to self.next(...).

Type: BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

result

The final result of the graph run if the run is completed, otherwise None.

Type: GraphRunResult[StateT, RunEndT] | None

Methods

__init__
def __init__(
    graph: Graph[StateT, DepsT, RunEndT],
    start_node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    state: StateT,
    deps: DepsT,
    traceparent: str | None,
    snapshot_id: str | None = None,
)

Create a new run for a given graph, starting at the specified node.

Typically, you’ll use Graph.iter rather than calling this directly.

Parameters

graph : Graph[StateT, DepsT, RunEndT]

The Graph to run.

start_node : BaseNode[StateT, DepsT, RunEndT]

The node where execution will begin.

persistence : BaseStatePersistence[StateT, RunEndT]

State persistence interface.

state : StateT

A shared state object or primitive (like a counter, dataclass, etc.) that is available to all nodes via ctx.state.

deps : DepsT

Optional dependencies that each node can access via ctx.deps, e.g. database connections, configuration, or logging clients.

traceparent : str | None

The traceparent for the span used for the graph run.

snapshot_id : str | None Default: None

The ID of the snapshot the node came from.

next

@async

def next(
    node: BaseNode[StateT, DepsT, RunEndT] | None = None,
) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

Manually drive the graph run by passing in the node you want to run next.

This lets you inspect or mutate the node before continuing execution, or skip certain nodes under dynamic conditions. The graph run should stop when you return an End node.

Here’s an example of using next to drive the graph from above:

next_never_42.py
from copy import deepcopy
from pydantic_graph import End
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(48)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        next_node = graph_run.next_node  # start with the first node
        node_states = [(next_node, deepcopy(graph_run.state))]

        while not isinstance(next_node, End):
            if graph_run.state.number == 50:
                graph_run.state.number = 42
            next_node = await graph_run.next(next_node)
            node_states.append((next_node, deepcopy(graph_run.state)))

        print(node_states)
        '''
        [
            (Increment(), MyState(number=48)),
            (Check42(), MyState(number=49)),
            (End(data=49), MyState(number=49)),
        ]
        '''
Returns

BaseNode[StateT, DepsT, RunEndT] | End[RunEndT] — The next node returned by the graph logic, or an End node if BaseNode[StateT, DepsT, RunEndT] | End[RunEndT] — the run has completed.

Parameters

node : BaseNode[StateT, DepsT, RunEndT] | None Default: None

The node to run next in the graph. If not specified, uses self.next_node, which is initialized to the start_node of the run and updated each time a new node is returned.

__anext__

@async

def __anext__() -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

Use the last returned node as the input to Graph.next.

Returns

BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

GraphRunResult

Bases: Generic[StateT, RunEndT]

The final result of running a graph.