pydantic_graph.graph_builder

Builder-based graph API: builder, graph runner, and mermaid rendering.

This module is the canonical home for the builder-based graph API: GraphBuilder for declaratively constructing executable graphs, Graph and GraphRun for executing them, and the mermaid rendering helpers used by Graph.render().

The same public symbols are re-exported from pydantic_graph directly. The deprecated pydantic_graph.beta.* namespace also forwards here.

The contents of three previously separate modules are bundled here because the module names they would naturally use (graph, mermaid) are currently held at the top level by the legacy BaseNode-based runner. In v2, after the legacy runner is removed, Graph and the mermaid helpers are expected to move out of this file to pydantic_graph.graph and pydantic_graph.mermaid respectively.

EndMarker

Bases: Generic[OutputT]

A marker indicating the end of graph execution with a final value.

EndMarker is used internally to signal that the graph has completed execution and carries the final output value.

ErrorMarker

A marker indicating that a graph node raised an exception.

Yielded by the graph iterator instead of raising immediately, allowing the caller to recover by sending new tasks via GraphRun.next() or GraphRun.override_next(). If the caller does not override, the error is re-raised on the next iteration.

Attributes

error

The exception raised by the node.

Type: BaseException
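
The yield-then-re-raise behaviour described above can be illustrated with a small stand-alone model. This is only a sketch using the standard library: ToyErrorMarker, ToyRun, and the task callables are hypothetical names invented for illustration, and nothing here reflects how GraphRun is actually implemented.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyErrorMarker:
    """Hypothetical stand-in for ErrorMarker: wraps the exception a node raised."""
    error: BaseException


class ToyRun:
    """Hypothetical runner, not the pydantic_graph implementation."""

    def __init__(self, tasks):
        self._pending = list(tasks)  # callables still to run
        self._unhandled = None       # error that was yielded but not recovered from

    def override_next(self, tasks):
        # Recovery: replace the pending work and clear the stored error.
        self._pending = list(tasks)
        self._unhandled = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._unhandled is not None:
            # The caller saw the marker and did not override: re-raise now.
            raise self._unhandled
        if not self._pending:
            raise StopAsyncIteration
        task = self._pending.pop(0)
        try:
            return task()
        except Exception as exc:
            self._unhandled = exc
            return ToyErrorMarker(exc)


def ok():
    return "ok"


def boom():
    raise ValueError("node failed")


async def drive(recover: bool):
    run = ToyRun([boom, ok])
    seen = []
    async for item in run:
        seen.append(item)
        if recover and isinstance(item, ToyErrorMarker):
            run.override_next([ok])
    return seen


recovered = asyncio.run(drive(recover=True))
```

With recover=True the loop sees the marker, overrides, and then receives "ok". With recover=False the stored ValueError is raised on the following iteration.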

JoinItem

An item representing data flowing into a join operation.

JoinItem carries input data from a parallel execution path to a join node, along with metadata about which execution ‘fork’ it originated from.

Attributes

join_id

The ID of the join node this item is targeting.

Type: JoinID

inputs

The input data for the join operation.

Type: Any

fork_stack

The stack of ForkStackItems that led to producing this join item.

Type: ForkStack

Graph

Bases: Generic[StateT, DepsT, InputT, OutputT]

A complete graph definition ready for execution.

The Graph class represents a complete workflow graph with typed inputs, outputs, state, and dependencies. It contains all nodes, edges, and metadata needed for execution.

Attributes

name

Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.

Type: str | None

state_type

The type of the graph state.

Type: type[StateT]

deps_type

The type of the dependencies.

Type: type[DepsT]

input_type

The type of the input data.

Type: type[InputT]

output_type

The type of the output data.

Type: type[OutputT]

auto_instrument

Whether to automatically create instrumentation spans.

Type: bool

nodes

All nodes in the graph indexed by their ID.

Type: dict[NodeID, AnyNode]

edges_by_source

Outgoing paths from each source node.

Type: dict[NodeID, list[Path]]

parent_forks

Parent fork information for each join node.

Type: dict[JoinID, ParentFork[NodeID]]

intermediate_join_nodes

For each join, the set of other joins that appear between it and its parent fork.

Used to determine which joins are “final” (have no other joins as intermediates) and which joins should preserve fork stacks when proceeding downstream.

Type: dict[JoinID, set[JoinID]]

Methods

get_parent_fork
def get_parent_fork(join_id: JoinID) -> ParentFork[NodeID]

Get the parent fork information for a join node.

Returns

ParentFork[NodeID] — The parent fork information for the join

Parameters

join_id : JoinID

The ID of the join node

Raises
  • RuntimeError — If the join ID is not found or has no parent fork
is_final_join
def is_final_join(join_id: JoinID) -> bool

Check if a join is ‘final’ (has no downstream joins with the same parent fork).

A join is non-final if it appears as an intermediate node for another join with the same parent fork.

Returns

bool — True if the join is final, False if it’s non-final

Parameters

join_id : JoinID

The ID of the join node
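
The rule stated above (a join is non-final if it appears as an intermediate node for another join with the same parent fork) can be sketched over plain dictionaries. This is a simplified model: here parent_forks maps each join ID to a bare fork ID string rather than a ParentFork object, and toy_is_final_join is a hypothetical helper, not the method itself.

```python
def toy_is_final_join(join_id, parent_forks, intermediate_join_nodes):
    """Return True unless join_id is an intermediate of another join on the same fork."""
    fork = parent_forks[join_id]
    for other, intermediates in intermediate_join_nodes.items():
        if other == join_id:
            continue
        if parent_forks[other] == fork and join_id in intermediates:
            return False
    return True


# join_a sits between fork_1 and join_b, so it is an intermediate of join_b.
parent_forks = {"join_a": "fork_1", "join_b": "fork_1", "join_c": "fork_2"}
intermediate_join_nodes = {
    "join_a": set(),
    "join_b": {"join_a"},
    "join_c": set(),
}

finality = {
    join_id: toy_is_final_join(join_id, parent_forks, intermediate_join_nodes)
    for join_id in parent_forks
}
```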

run

@async

def run(
    state: StateT = None,
    deps: DepsT = None,
    inputs: InputT = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> OutputT

Execute the graph and return the final output.

This is the main entry point for graph execution. It runs the graph to completion and returns the final output value.

Returns

OutputT — The final output from the graph execution

Parameters

state : StateT Default: None

The graph state instance

deps : DepsT Default: None

The dependencies instance

inputs : InputT Default: None

The input data for the graph

span : AbstractContextManager[AbstractSpan] | None Default: None

Optional span for tracing/instrumentation

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

iter

@async

def iter(
    state: StateT = None,
    deps: DepsT = None,
    inputs: InputT = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, OutputT]]

Create an iterator for step-by-step graph execution.

This method allows for more fine-grained control over graph execution, enabling inspection of intermediate states and results.

Returns

AsyncIterator[GraphRun[StateT, DepsT, OutputT]]

Parameters

state : StateT Default: None

The graph state instance

deps : DepsT Default: None

The dependencies instance

inputs : InputT Default: None

The input data for the graph

span : AbstractContextManager[AbstractSpan] | None Default: None

Optional span for tracing/instrumentation

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

render
def render(
    title: str | None = None,
    direction: StateDiagramDirection | None = None,
) -> str

Render the graph as a Mermaid diagram string.

Returns

str — A string containing the Mermaid diagram representation

Parameters

title : str | None Default: None

Optional title for the diagram

direction : StateDiagramDirection | None Default: None

Optional direction for the diagram layout
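
To give a rough idea of what a rendered string may look like, the sketch below builds a Mermaid state diagram from a list of edges. It assumes stateDiagram-v2 output with an optional front-matter title, which the StateDiagramDirection name suggests but this page does not confirm; toy_render and its edge tuples are hypothetical and are not the library's renderer.

```python
def toy_render(edges, title=None, direction=None):
    """Build a Mermaid stateDiagram-v2 string from (source, target) pairs."""
    lines = []
    if title is not None:
        # Mermaid reads an optional title from a front-matter block.
        lines += ["---", f"title: {title}", "---"]
    lines.append("stateDiagram-v2")
    if direction is not None:
        lines.append(f"  direction {direction}")
    for source, target in edges:
        lines.append(f"  {source} --> {target}")
    return "\n".join(lines)


# "[*]" is Mermaid's notation for the start and end pseudo-states.
diagram = toy_render(
    [("[*]", "double"), ("double", "[*]")],
    title="demo",
    direction="LR",
)
```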

__str__
def __str__() -> str

Return a Mermaid diagram representation of the graph.

Returns

str — A string containing the Mermaid diagram of the graph

GraphTaskRequest

A request to run a task representing the execution of a node in the graph.

GraphTaskRequest encapsulates all the information needed to execute a specific node, including its inputs and the fork context it’s executing within.

Attributes

node_id

The ID of the node to execute.

Type: NodeID

inputs

The input data for the node.

Type: Any

fork_stack

Stack of forks that have been entered.

Used by the GraphRun to decide when to proceed through joins.

Type: ForkStack Default: field(repr=False)

GraphTask

Bases: GraphTaskRequest

A task representing the execution of a node in the graph.

GraphTask encapsulates all the information needed to execute a specific node, including its inputs and the fork context it’s executing within, and has a unique ID to identify the task within the graph run.

Attributes

task_id

Unique identifier for this task.

Type: TaskID Default: field(repr=False)

GraphRun

Bases: Generic[StateT, DepsT, OutputT]

A single execution instance of a graph.

GraphRun manages the execution state for a single run of a graph, including task scheduling, fork/join coordination, and result tracking.

Attributes

graph

The graph being executed.

Default: graph

state

The graph state instance.

Default: state

deps

The dependencies instance.

Default: deps

inputs

The initial input data.

Default: inputs

next_task

Get the next task(s) to be executed.

Type: EndMarker[OutputT] | ErrorMarker | Sequence[GraphTask]

output

Get the final output if the graph has completed.

Type: OutputT | None

Methods

__init__
def __init__(
    graph: Graph[StateT, DepsT, InputT, OutputT],
    state: StateT,
    deps: DepsT,
    inputs: InputT,
    traceparent: str | None,
)

Initialize a graph run.

Parameters

graph : Graph[StateT, DepsT, InputT, OutputT]

The graph to execute

state : StateT

The graph state instance

deps : DepsT

The dependencies instance

inputs : InputT

The input data for the graph

traceparent : str | None

Optional trace parent for instrumentation

__aiter__
def __aiter__() -> AsyncIterator[EndMarker[OutputT] | Sequence[GraphTask]]

Return self as an async iterator.

Returns

AsyncIterator[EndMarker[OutputT] | Sequence[GraphTask]] — Self for async iteration

__anext__

@async

def __anext__() -> EndMarker[OutputT] | Sequence[GraphTask]

Get the next item in the async iteration.

Returns

EndMarker[OutputT] | Sequence[GraphTask] — The next execution result from the graph

Raises
  • Exception — If a node raised an error and the caller has not recovered via override_next().
next

@async

def next(
    value: EndMarker[OutputT] | Sequence[GraphTaskRequest] | None = None,
) -> EndMarker[OutputT] | Sequence[GraphTask]

Advance the graph execution by one step.

This method allows for sending a value to the iterator, which is useful for resuming iteration or overriding intermediate results.

Returns

EndMarker[OutputT] | Sequence[GraphTask] — The next execution result: either an EndMarker, or sequence of GraphTasks

Parameters

value : EndMarker[OutputT] | Sequence[GraphTaskRequest] | None Default: None

Optional value to send to the iterator
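
Sending a value into an iterator is a general Python pattern, and a plain async generator shows the idea. The sketch below is an analogy only: toy_steps is a hypothetical generator, and the choice to let a sent value replace the next planned batch is an assumption made for illustration, not documented GraphRun.next() behaviour.

```python
import asyncio


async def toy_steps(plan):
    """Yield batches of task names; a sent value replaces the next planned batch."""
    queue = list(plan)
    while queue:
        sent = yield queue.pop(0)
        if sent is not None:
            # Replace the next planned batch (or append if nothing is left).
            queue[:1] = [sent]


async def drive():
    steps = toy_steps([["a"], ["b"], ["c"]])
    first = await steps.asend(None)         # start the generator
    second = await steps.asend(["x", "y"])  # override what runs next
    third = await steps.asend(None)         # resume the remaining plan
    await steps.aclose()
    return first, second, third


batches = asyncio.run(drive())
```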

override_next
def override_next(value: Sequence[GraphTaskRequest] | EndMarker[OutputT]) -> None

Override the next pending step, allowing the graph to continue after an End or error.

This is used by hook systems (like after_node_run or on_node_run_error) to redirect the graph to a new node when the current step produced an End result or raised an error, or to signal early completion by passing an EndMarker.

Must only be called between iterations (not while an iteration is in flight).

Returns

None

Parameters

value : Sequence[GraphTaskRequest] | EndMarker[OutputT]

New task requests to execute next, or an EndMarker to signal completion.

GraphBuilder

Bases: Generic[StateT, DepsT, GraphInputT, GraphOutputT]

A builder for constructing executable graph definitions.

GraphBuilder provides a fluent interface for defining nodes, edges, and routing in a graph workflow. It supports typed state, dependencies, and input/output validation.

Attributes

name

Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.

Type: str | None Default: name

state_type

The type of the graph state.

Type: TypeOrTypeExpression[StateT] Default: state_type

deps_type

The type of the dependencies.

Type: TypeOrTypeExpression[DepsT] Default: deps_type

input_type

The type of the graph input data.

Type: TypeOrTypeExpression[GraphInputT] Default: input_type

output_type

The type of the graph output data.

Type: TypeOrTypeExpression[GraphOutputT] Default: output_type

auto_instrument

Whether to automatically create instrumentation spans.

Type: bool Default: auto_instrument

start_node

Get the start node for the graph.

Type: StartNode[GraphInputT]

end_node

Get the end node for the graph.

Type: EndNode[GraphOutputT]

Methods

__init__
def __init__(
    name: str | None = None,
    state_type: TypeOrTypeExpression[StateT] = NoneType,
    deps_type: TypeOrTypeExpression[DepsT] = NoneType,
    input_type: TypeOrTypeExpression[GraphInputT] = NoneType,
    output_type: TypeOrTypeExpression[GraphOutputT] = NoneType,
    auto_instrument: bool = True,
)

Initialize a graph builder.

Parameters

name : str | None Default: None

Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.

state_type : TypeOrTypeExpression[StateT] Default: NoneType

The type of the graph state

deps_type : TypeOrTypeExpression[DepsT] Default: NoneType

The type of the dependencies

input_type : TypeOrTypeExpression[GraphInputT] Default: NoneType

The type of the graph input data

output_type : TypeOrTypeExpression[GraphOutputT] Default: NoneType

The type of the graph output data

auto_instrument : bool Default: True

Whether to automatically create instrumentation spans

step
def step(
    node_id: str | None = None,
    label: str | None = None,
) -> Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]]
def step(
    call: StepFunction[StateT, DepsT, InputT, OutputT],
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, OutputT]

Create a step from a step function.

This method can be used as a decorator or called directly to create a step node from an async function.

Returns

Step[StateT, DepsT, InputT, OutputT] | Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]] — Either a Step instance or a decorator function

Parameters

call : StepFunction[StateT, DepsT, InputT, OutputT] | None Default: None

The step function to wrap

node_id : str | None Default: None

Optional ID for the node

label : str | None Default: None

Optional human-readable label
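
The "decorator or direct call" shape described above is a common Python pattern. The following is a minimal sketch of that pattern, not the GraphBuilder.step implementation: ToyStep and toy_step are hypothetical, the keyword-only arguments are a choice made here, and falling back to the function name for the node ID is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class ToyStep:
    """Hypothetical stand-in for a Step node."""
    id: str
    call: Callable[[Any], Awaitable[Any]]
    label: Optional[str] = None


def toy_step(call=None, *, node_id=None, label=None):
    def wrap(fn):
        # Assumption: the node ID falls back to the function name.
        return ToyStep(id=node_id or fn.__name__, call=fn, label=label)

    if call is None:
        return wrap    # used as @toy_step(...): return the decorator
    return wrap(call)  # used as @toy_step or toy_step(fn, ...)


@toy_step
async def double(x: int) -> int:
    return x * 2


@toy_step(node_id="inc", label="Add one")
async def add_one(x: int) -> int:
    return x + 1


async def negate(x: int) -> int:
    return -x


negate_step = toy_step(negate, node_id="neg")
doubled = asyncio.run(double.call(4))
```

All three call styles produce the same kind of object, which is why the return type above is a union of a Step and a decorator.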

stream
def stream(
    node_id: str | None = None,
    label: str | None = None,
) -> Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
def stream(
    call: StreamFunction[StateT, DepsT, InputT, OutputT],
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]
def stream(
    call: StreamFunction[StateT, DepsT, InputT, OutputT] | None = None,
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]

Create a step from an async iterator (which functions like a “stream”).

This method can be used as a decorator or called directly to create a step node from an async function.

Returns

Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]] — Either a Step instance or a decorator function

Parameters

call : StreamFunction[StateT, DepsT, InputT, OutputT] | None Default: None

The stream function to wrap

node_id : str | None Default: None

Optional ID for the node

label : str | None Default: None

Optional human-readable label
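
For readers unfamiliar with async iterators, the sketch below shows the kind of function a stream step would wrap: an async generator whose values are consumed with async for. It uses only the standard library; countdown and collect are hypothetical names and the example does not call GraphBuilder.stream.

```python
import asyncio
from typing import AsyncIterator, List


async def countdown(start: int) -> AsyncIterator[int]:
    """An async generator: each yielded value is one item of the stream."""
    for n in range(start, 0, -1):
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield n


async def collect(stream: AsyncIterator[int]) -> List[int]:
    # A downstream consumer receives the items one at a time.
    return [item async for item in stream]


collected = asyncio.run(collect(countdown(3)))
```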

add
def add(*edges: EdgePath[StateT, DepsT]) -> None

Add one or more edge paths to the graph.

This method processes edge paths and automatically creates any necessary fork nodes for broadcasts and maps.

Returns

None

Parameters

*edges : EdgePath[StateT, DepsT] Default: ()

The edge paths to add to the graph

add_edge
def add_edge(
    source: Source[T],
    destination: Destination[T],
    label: str | None = None,
) -> None

Add a simple edge between two nodes.

Returns

None

Parameters

source : Source[T]

The source node

destination : Destination[T]

The destination node

label : str | None Default: None

Optional label for the edge

add_mapping_edge
def add_mapping_edge(
    source: Source[Iterable[T]],
    map_to: Destination[T],
    pre_map_label: str | None = None,
    post_map_label: str | None = None,
    fork_id: ForkID | None = None,
    downstream_join_id: JoinID | None = None,
) -> None

Add an edge that maps iterable data across parallel paths.

Returns

None

Parameters

source : Source[Iterable[T]]

The source node that produces iterable data

map_to : Destination[T]

The destination node that receives individual items

pre_map_label : str | None Default: None

Optional label before the map operation

post_map_label : str | None Default: None

Optional label after the map operation

fork_id : ForkID | None Default: None

Optional ID for the fork node produced for this map operation

downstream_join_id : JoinID | None Default: None

Optional ID of a join node that will always be downstream of this map. Specifying this ensures correct handling if you try to map an empty iterable.
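
Conceptually, a mapping edge fans an iterable out to parallel workers and a downstream join gathers the results. The sketch below models that with asyncio.gather. It is an analogy under assumptions: map_then_join is a hypothetical helper, and the empty-iterable behaviour shown (the join still runs, with no inputs) is one reading of the downstream_join_id note above rather than confirmed library behaviour.

```python
import asyncio


async def square(x: int) -> int:
    return x * x


async def map_then_join(items, worker, join):
    # Fan out: one concurrent worker call per item.
    results = await asyncio.gather(*(worker(item) for item in items))
    # Fan in: the join runs even when `items` is empty,
    # so whatever follows the join still receives a value.
    return join(results)


total = asyncio.run(map_then_join([1, 2, 3], square, sum))
empty_total = asyncio.run(map_then_join([], square, sum))
```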

edge_from
def edge_from(
    *sources: Source[SourceOutputT],
) -> EdgePathBuilder[StateT, DepsT, SourceOutputT]

Create an edge path builder starting from the given source nodes.

Returns

EdgePathBuilder[StateT, DepsT, SourceOutputT] — An EdgePathBuilder for constructing the complete edge path

Parameters

*sources : Source[SourceOutputT] Default: ()

The source nodes to start the edge path from

decision
def decision(
    note: str | None = None,
    node_id: str | None = None,
) -> Decision[StateT, DepsT, Never]

Create a new decision node.

Returns

Decision[StateT, DepsT, Never] — A new Decision node with no branches

Parameters

note : str | None Default: None

Optional note to describe the decision logic

node_id : str | None Default: None

Optional ID for the node produced for this decision logic

match
def match(
    source: TypeOrTypeExpression[SourceT],
    matches: Callable[[Any], bool] | None = None,
) -> DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never]

Create a decision branch matcher.

Returns

DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never] — A DecisionBranchBuilder for constructing the branch

Parameters

source : TypeOrTypeExpression[SourceT]

The type or type expression to match against

matches : Callable[[Any], bool] | None Default: None

Optional custom matching function
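
Branch matching by type with an optional predicate can be sketched as follows. This is a toy router, not the Decision implementation: make_branch and route are hypothetical, and two behaviours are assumptions made here, namely that a custom predicate is used in place of the type check and that the first matching branch wins.

```python
def make_branch(source_type, matches=None):
    """Return a function that decides whether a value belongs to this branch."""
    def accepts(value):
        if matches is not None:
            # Assumption: a custom predicate replaces the type check.
            return bool(matches(value))
        return isinstance(value, source_type)
    return accepts


BRANCHES = [
    ("negative", make_branch(int, matches=lambda v: isinstance(v, int) and v < 0)),
    ("integer", make_branch(int)),
    ("text", make_branch(str)),
]


def route(value):
    # Assumption: branches are tried in order and the first match wins.
    for name, accepts in BRANCHES:
        if accepts(value):
            return name
    raise ValueError(f"no branch matches {value!r}")


routed = [route(-3), route(7), route("hi")]
```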

match_node
def match_node(
    source: type[SourceNodeT],
    matches: Callable[[Any], bool] | None = None,
) -> DecisionBranch[SourceNodeT]

Create a decision branch for BaseNode subclasses.

This is similar to match() but specifically designed for matching against BaseNode types from the v1 system.

Returns

DecisionBranch[SourceNodeT] — A DecisionBranch for the BaseNode type

Parameters

source : type[SourceNodeT]

The BaseNode subclass to match against

matches : Callable[[Any], bool] | None Default: None

Optional custom matching function

node
def node(
    node_type: type[BaseNode[StateT, DepsT, GraphOutputT]],
) -> EdgePath[StateT, DepsT]

Create an edge path from a BaseNode class.

This method integrates v1-style BaseNode classes into the v2 graph system by analyzing their type hints and creating appropriate edges.

Returns

EdgePath[StateT, DepsT] — An EdgePath representing the node and its connections

Parameters

node_type : type[BaseNode[StateT, DepsT, GraphOutputT]]

The BaseNode subclass to integrate

Raises
  • GraphSetupError — If the node type is missing required type hints
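
Deriving edges from type hints can be illustrated with the standard typing helpers. The sketch below reads the return annotation of a run method and lists the possible successor types. It is a simplified model: the End, Parse, Fetch, and Broken classes and the successors helper are hypothetical, and it raises TypeError where the method above is documented to raise GraphSetupError.

```python
from typing import Union, get_args, get_origin, get_type_hints


class End:
    """Toy terminal marker."""


class Parse:
    async def run(self) -> End:
        return End()


class Fetch:
    async def run(self) -> Union[Parse, End]:
        return Parse()


class Broken:
    async def run(self):  # no return annotation
        return End()


def successors(node_type):
    """List the type names a node's run method may return."""
    hints = get_type_hints(node_type.run)
    if "return" not in hints:
        raise TypeError(f"{node_type.__name__}.run is missing a return type hint")
    returned = hints["return"]
    # A Union return means several possible outgoing edges.
    options = get_args(returned) if get_origin(returned) is Union else (returned,)
    return [option.__name__ for option in options]


fetch_edges = successors(Fetch)
parse_edges = successors(Parse)
```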
build
def build(
    validate_graph_structure: bool = True,
) -> Graph[StateT, DepsT, GraphInputT, GraphOutputT]

Build the final executable graph from the accumulated nodes and edges.

This method performs validation, normalization, and analysis of the graph structure to create a complete, executable graph instance.

Returns

Graph[StateT, DepsT, GraphInputT, GraphOutputT] — A complete Graph instance ready for execution

Parameters

validate_graph_structure : bool Default: True

Whether to validate the graph structure. See the docstring of _validate_graph_structure for more details.

Raises
  • ValueError — If the graph structure is invalid (e.g., join without parent fork)
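
As an illustration of the kind of structural checks a build step might run, the sketch below validates a graph held in plain dictionaries. The two checks (the end node is reachable from the start, and every join has a parent fork) are assumptions chosen for the example. Only the second is hinted at by the error description above, and validate_structure is a hypothetical helper, not _validate_graph_structure.

```python
def validate_structure(edges_by_source, start, end, join_ids, parent_forks):
    """Toy checks; raises ValueError on the first problem found."""
    # Depth-first walk to collect everything reachable from the start node.
    seen, stack = {start}, [start]
    while stack:
        for target in edges_by_source.get(stack.pop(), []):
            if target not in seen:
                seen.add(target)
                stack.append(target)
    if end not in seen:
        raise ValueError("end node is not reachable from the start node")
    orphans = sorted(j for j in join_ids if j not in parent_forks)
    if orphans:
        raise ValueError(f"joins without a parent fork: {orphans}")


def problem(**kwargs):
    """Return the validation error message, or None if the graph passes."""
    try:
        validate_structure(**kwargs)
    except ValueError as exc:
        return str(exc)
    return None


good = dict(
    edges_by_source={
        "start": ["fork"],
        "fork": ["a", "b"],
        "a": ["join"],
        "b": ["join"],
        "join": ["end"],
    },
    start="start",
    end="end",
    join_ids=["join"],
    parent_forks={"join": "fork"},
)
```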

MermaidNode

A mermaid node.

MermaidEdge

A mermaid edge.

MermaidGraph

A mermaid graph.

build_mermaid_graph

def build_mermaid_graph(
    graph_nodes: dict[NodeID, AnyNode],
    graph_edges_by_source: dict[NodeID, list[Path]],
) -> MermaidGraph

Build a mermaid graph.

Returns

MermaidGraph

StateT

Type variable for graph state.

Default: TypeVar('StateT', infer_variance=True)

DepsT

Type variable for graph dependencies.

Default: TypeVar('DepsT', infer_variance=True)

InputT

Type variable for graph inputs.

Default: TypeVar('InputT', infer_variance=True)

OutputT

Type variable for graph outputs.

Default: TypeVar('OutputT', infer_variance=True)

DEFAULT_HIGHLIGHT_CSS

The default CSS to use for highlighting nodes.

Default: 'fill:#fdff32'

StateDiagramDirection

Used to specify the direction of the state diagram generated by mermaid.

  • 'TB': Top to bottom (the default for Mermaid charts)
  • 'LR': Left to right
  • 'RL': Right to left
  • 'BT': Bottom to top

Default: Literal['TB', 'LR', 'RL', 'BT']