pydantic_graph.graph_builder
Builder-based graph API: builder, graph runner, and mermaid rendering.
This module is the canonical home for the builder-based graph API: GraphBuilder for declaratively constructing executable graphs, Graph and GraphRun for executing them, and the mermaid rendering helpers used by Graph.render().
The same public symbols are re-exported from pydantic_graph directly. The
deprecated pydantic_graph.beta.* namespace also forwards here.
The contents of three previously-separate modules are bundled here because the
module names they would otherwise use (graph, mermaid) are currently held at the
top level by the legacy BaseNode-based runner. In v2, after the legacy runner is
removed, Graph and the mermaid helpers are expected to move out of this file
to pydantic_graph.graph and pydantic_graph.mermaid respectively.
Bases: Generic[OutputT]
A marker indicating the end of graph execution with a final value.
EndMarker is used internally to signal that the graph has completed execution and carries the final output value.
A marker indicating that a graph node raised an exception.
Yielded by the graph iterator instead of raising immediately, allowing the caller
to recover by sending new tasks via GraphRun.next() or GraphRun.override_next().
If the caller does not override, the error is re-raised on the next iteration.
The exception raised by the node.
Type: BaseException
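The deferred-error behaviour described above can be illustrated with a small stand-in runner. This is only a sketch of the protocol, not pydantic_graph's implementation: ToyRun, ToyErrorMarker, ok and boom are hypothetical names, and the real GraphRun schedules tasks rather than plain callables.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyErrorMarker:
    """Hypothetical stand-in for ErrorMarker: wraps the exception a step raised."""
    error: BaseException


class ToyRun:
    """Hypothetical runner that yields errors as markers before re-raising them."""

    def __init__(self, steps):
        self._pending = list(steps)   # callables still to run, in order
        self._unhandled = None        # error that was yielded but not overridden

    def override_next(self, steps):
        # Recover: drop the pending error and schedule replacement work.
        self._unhandled = None
        self._pending = list(steps)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._unhandled is not None:
            # The caller saw the marker and did not override: re-raise now.
            error, self._unhandled = self._unhandled, None
            raise error
        if not self._pending:
            raise StopAsyncIteration
        step = self._pending.pop(0)
        try:
            return step()
        except Exception as exc:
            self._unhandled = exc
            return ToyErrorMarker(exc)


def ok():
    return "ok"


def boom():
    raise ValueError("node failed")


async def run_with_recovery():
    seen = []
    run = ToyRun([boom, ok])
    async for item in run:
        if isinstance(item, ToyErrorMarker):
            seen.append("error")
            run.override_next([ok])   # recover instead of letting it re-raise
        else:
            seen.append(item)
    return seen


async def run_without_recovery():
    run = ToyRun([boom, ok])
    try:
        async for _ in run:
            pass                      # ignore the marker
    except ValueError as exc:
        return str(exc)
    return None
```

In this sketch the caller that overrides sees the marker and then the replacement step's result, while the caller that ignores the marker receives the original ValueError on the following iteration.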
An item representing data flowing into a join operation.
JoinItem carries input data from a parallel execution path to a join node, along with metadata about which execution ‘fork’ it originated from.
The ID of the join node this item is targeting.
Type: JoinID
The input data for the join operation.
Type: Any
The stack of ForkStackItems that led to producing this join item.
Type: ForkStack
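To make the role of the fork stack more concrete, the sketch below groups join inputs by the fork execution that produced them. The class and field names (ToyForkStackItem, run_id, group_by_fork_run) are assumptions made for illustration; the actual fields of ForkStackItem are not described in this section.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ToyForkStackItem:
    """Hypothetical record of one fork entered on the way to a join."""
    fork_id: str
    run_id: int        # distinguishes separate executions of the same fork


@dataclass(frozen=True)
class ToyJoinItem:
    """Hypothetical stand-in for JoinItem."""
    join_id: str
    inputs: Any
    fork_stack: tuple[ToyForkStackItem, ...]


def group_by_fork_run(items, parent_fork_id):
    """Group join inputs by the execution of the parent fork that produced them."""
    groups = defaultdict(list)
    for item in items:
        # Find the innermost entry for the parent fork in this item's stack.
        origin = next(f for f in reversed(item.fork_stack) if f.fork_id == parent_fork_id)
        groups[(item.join_id, origin.run_id)].append(item.inputs)
    return dict(groups)


first = (ToyForkStackItem("fork_a", run_id=1),)
second = (ToyForkStackItem("fork_a", run_id=2),)
items = [
    ToyJoinItem("collect", 10, first),
    ToyJoinItem("collect", 20, second),
    ToyJoinItem("collect", 11, first),
]
grouped = group_by_fork_run(items, "fork_a")
```

Carrying the stack with each item is what lets a join keep inputs from different executions of the same fork apart.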
Bases: Generic[StateT, DepsT, InputT, OutputT]
A complete graph definition ready for execution.
The Graph class represents a complete workflow graph with typed inputs, outputs, state, and dependencies. It contains all nodes, edges, and metadata needed for execution.
Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.
The type of the graph state.
Type: type[StateT]
The type of the dependencies.
Type: type[DepsT]
The type of the input data.
Type: type[InputT]
The type of the output data.
Type: type[OutputT]
Whether to automatically create instrumentation spans.
Type: bool
All nodes in the graph indexed by their ID.
Type: dict[NodeID, AnyNode]
Outgoing paths from each source node.
Type: dict[NodeID, list[Path]]
Parent fork information for each join node.
Type: dict[JoinID, ParentFork[NodeID]]
For each join, the set of other joins that appear between it and its parent fork.
Used to determine which joins are “final” (have no other joins as intermediates) and which joins should preserve fork stacks when proceeding downstream.
Type: dict[JoinID, set[JoinID]]
def get_parent_fork(join_id: JoinID) -> ParentFork[NodeID]
Get the parent fork information for a join node.
ParentFork[NodeID] — The parent fork information for the join
The ID of the join node
Raises RuntimeError if the join ID is not found or has no parent fork.
def is_final_join(join_id: JoinID) -> bool
Check if a join is ‘final’ (has no downstream joins with the same parent fork).
A join is non-final if it appears as an intermediate node for another join with the same parent fork.
bool — True if the join is final, False if it’s non-final
The ID of the join node
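One possible reading of the rule in the is_final_join description is sketched below. It is an assumption-laden illustration rather than the library's code: parent forks are reduced to plain strings, and the two dictionaries only mimic the shape of the parent-fork and intermediate-join mappings described earlier.

```python
def is_final_join(join_id, parent_forks, intermediate_join_nodes):
    """Return True unless join_id sits between another join and their shared parent fork."""
    if join_id not in parent_forks:
        raise RuntimeError(f"Join {join_id!r} not found or has no parent fork")
    for other, intermediates in intermediate_join_nodes.items():
        if (
            other != join_id
            and join_id in intermediates
            and parent_forks.get(other) == parent_forks[join_id]
        ):
            return False
    return True


# Hypothetical layout: fork_1 -> ... -> inner_join -> ... -> outer_join
parent_forks = {"inner_join": "fork_1", "outer_join": "fork_1", "solo_join": "fork_2"}
intermediate_join_nodes = {
    "inner_join": set(),
    "outer_join": {"inner_join"},
    "solo_join": set(),
}
```

Under this reading, inner_join is non-final because it appears among the intermediates of outer_join, which shares its parent fork; outer_join and solo_join are final.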
async def run(
state: StateT = None,
deps: DepsT = None,
inputs: InputT = None,
span: AbstractContextManager[AbstractSpan] | None = None,
infer_name: bool = True,
) -> OutputT
Execute the graph and return the final output.
This is the main entry point for graph execution. It runs the graph to completion and returns the final output value.
OutputT — The final output from the graph execution
The graph state instance
The dependencies instance
The input data for the graph
span : AbstractContextManager[AbstractSpan] | None Default: None
Optional span for tracing/instrumentation
infer_name : bool Default: True
Whether to infer the graph name from the calling frame.
async def iter(
state: StateT = None,
deps: DepsT = None,
inputs: InputT = None,
span: AbstractContextManager[AbstractSpan] | None = None,
infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, OutputT]]
Create an iterator for step-by-step graph execution.
This method allows for more fine-grained control over graph execution, enabling inspection of intermediate states and results.
AsyncIterator[GraphRun[StateT, DepsT, OutputT]]
The graph state instance
The dependencies instance
The input data for the graph
span : AbstractContextManager[AbstractSpan] | None Default: None
Optional span for tracing/instrumentation
infer_name : bool Default: True
Whether to infer the graph name from the calling frame.
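The relationship between run() and iter() can be sketched with toy classes. This assumes iter() is used as an async context manager that yields an async-iterable run object, which is consistent with the AsyncIterator[GraphRun[...]] annotation but is not spelled out in this section. ToyGraph, ToyRun and ToyEnd are hypothetical stand-ins.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass


@dataclass
class ToyEnd:
    """Hypothetical stand-in for EndMarker."""
    value: int


class ToyRun:
    """Hypothetical run: doubles a number three times, one step per iteration."""

    def __init__(self, inputs):
        self.current = inputs
        self.steps_left = 3
        self.output = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.output is not None:
            raise StopAsyncIteration
        if self.steps_left == 0:
            self.output = self.current
            return ToyEnd(self.current)
        self.steps_left -= 1
        self.current *= 2
        return [f"double -> {self.current}"]   # stand-in for a batch of tasks


class ToyGraph:
    @asynccontextmanager
    async def iter(self, inputs):
        # Set-up (for example, opening a tracing span) would happen here.
        yield ToyRun(inputs)
        # Tear-down runs when the caller leaves the `async with` block.

    async def run(self, inputs):
        # In this sketch, run() is just iter() driven to completion.
        async with self.iter(inputs) as graph_run:
            async for _ in graph_run:
                pass
        return graph_run.output


async def inspect_steps(inputs):
    events = []
    async with ToyGraph().iter(inputs) as graph_run:
        async for event in graph_run:
            events.append(event)
    return events
```

Calling the toy run() returns only the final value, while inspect_steps() collects each intermediate batch followed by the end marker.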
def render(
title: str | None = None,
direction: StateDiagramDirection | None = None,
) -> str
Render the graph as a Mermaid diagram string.
str — A string containing the Mermaid diagram representation
Optional title for the diagram
direction : StateDiagramDirection | None Default: None
Optional direction for the diagram layout
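For orientation, the following sketch produces a Mermaid state diagram string from a list of edges, with an optional title and direction. It assumes the output is a stateDiagram-v2 block (suggested by the StateDiagramDirection name); render_state_diagram and its edge format are hypothetical and say nothing about how Graph.render() builds its output.

```python
from typing import Literal, Optional

Direction = Literal["TB", "LR", "RL", "BT"]


def render_state_diagram(edges, title: Optional[str] = None, direction: Optional[Direction] = None) -> str:
    """Build a Mermaid state diagram from (source, target) pairs."""
    lines = []
    if title:
        # Mermaid reads the diagram title from a front-matter block.
        lines += ["---", f"title: {title}", "---"]
    lines.append("stateDiagram-v2")
    if direction:
        lines.append(f"  direction {direction}")
    for source, target in edges:
        lines.append(f"  {source} --> {target}")
    return "\n".join(lines)


diagram = render_state_diagram(
    [("[*]", "fetch"), ("fetch", "summarise"), ("summarise", "[*]")],
    title="example",
    direction="LR",
)
print(diagram)
```

In Mermaid state diagrams, `[*]` denotes the start or end state depending on which side of the arrow it appears.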
def __str__() -> str
Return a Mermaid diagram representation of the graph.
str — A string containing the Mermaid diagram of the graph
A request to run a task representing the execution of a node in the graph.
GraphTaskRequest encapsulates all the information needed to execute a specific node, including its inputs and the fork context it’s executing within.
The ID of the node to execute.
Type: NodeID
The input data for the node.
Type: Any
Stack of forks that have been entered.
Used by the GraphRun to decide when to proceed through joins.
Type: ForkStack Default: field(repr=False)
Bases: GraphTaskRequest
A task representing the execution of a node in the graph.
GraphTask encapsulates all the information needed to execute a specific node, including its inputs and the fork context it’s executing within, and has a unique ID to identify the task within the graph run.
Unique identifier for this task.
Type: TaskID Default: field(repr=False)
Bases: Generic[StateT, DepsT, OutputT]
A single execution instance of a graph.
GraphRun manages the execution state for a single run of a graph, including task scheduling, fork/join coordination, and result tracking.
The graph being executed.
Default: graph
The graph state instance.
Default: state
The dependencies instance.
Default: deps
The initial input data.
Default: inputs
Get the next task(s) to be executed.
Type: EndMarker[OutputT] | ErrorMarker | Sequence[GraphTask]
Get the final output if the graph has completed.
Type: OutputT | None
def __init__(
graph: Graph[StateT, DepsT, InputT, OutputT],
state: StateT,
deps: DepsT,
inputs: InputT,
traceparent: str | None,
)
Initialize a graph run.
The graph to execute
The graph state instance
The dependencies instance
The input data for the graph
Optional trace parent for instrumentation
def __aiter__() -> AsyncIterator[EndMarker[OutputT] | Sequence[GraphTask]]
Return self as an async iterator.
AsyncIterator[EndMarker[OutputT] | Sequence[GraphTask]] — Self for async iteration
async def __anext__() -> EndMarker[OutputT] | Sequence[GraphTask]
Get the next item in the async iteration.
EndMarker[OutputT] | Sequence[GraphTask] — The next execution result from the graph
Raises Exception if a node raised an error and the caller has not recovered via override_next().
async def next(
value: EndMarker[OutputT] | Sequence[GraphTaskRequest] | None = None,
) -> EndMarker[OutputT] | Sequence[GraphTask]
Advance the graph execution by one step.
This method allows for sending a value to the iterator, which is useful for resuming iteration or overriding intermediate results.
EndMarker[OutputT] | Sequence[GraphTask] — The next execution result: either an EndMarker, or sequence of GraphTasks
Optional value to send to the iterator
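The idea of sending a value into an iterator to steer what happens next can be shown with a plain Python async generator and asend(). Whether GraphRun.next() is built this way is not stated here; toy_steps, NEXT_STEP and drive are hypothetical, and the step names are invented for the example.

```python
import asyncio


async def toy_steps(first):
    """Hypothetical step generator: yields the proposed step, accepts an override."""
    planned = first
    while planned is not None:
        # `sent` is whatever the caller passed to asend(); None means "no override".
        sent = yield planned
        current = sent if sent is not None else planned
        planned = NEXT_STEP.get(current)


# Hypothetical transition table: which step follows which.
NEXT_STEP = {"start": "validate", "validate": "save", "retry": "save", "save": None}


async def drive(overrides):
    """Advance the generator, substituting overrides[step] when one is given."""
    executed = []
    gen = toy_steps("start")
    try:
        proposed = await gen.asend(None)          # prime the generator
        while True:
            replacement = overrides.get(proposed)
            executed.append(replacement or proposed)
            proposed = await gen.asend(replacement)
    except StopAsyncIteration:
        return executed
```

With no overrides the toy visits start, validate and save; sending "retry" in place of "validate" makes the generator continue from the replacement instead.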
def override_next(value: Sequence[GraphTaskRequest] | EndMarker[OutputT]) -> None
Override the next pending step, allowing the graph to continue after an End or error.
This is used by hook systems (like after_node_run or on_node_run_error) to redirect
the graph to a new node when the current step produced an End result or raised an error,
or to signal early completion by passing an EndMarker.
Must only be called between iterations (not while an iteration is in flight).
value : Sequence[GraphTaskRequest] | EndMarker[OutputT]
New task requests to execute next, or an EndMarker to signal completion.
Bases: Generic[StateT, DepsT, GraphInputT, GraphOutputT]
A builder for constructing executable graph definitions.
GraphBuilder provides a fluent interface for defining nodes, edges, and routing in a graph workflow. It supports typed state, dependencies, and input/output validation.
Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.
Type: str | None Default: name
The type of the graph state.
Type: TypeOrTypeExpression[StateT] Default: state_type
The type of the dependencies.
Type: TypeOrTypeExpression[DepsT] Default: deps_type
The type of the graph input data.
Type: TypeOrTypeExpression[GraphInputT] Default: input_type
The type of the graph output data.
Type: TypeOrTypeExpression[GraphOutputT] Default: output_type
Whether to automatically create instrumentation spans.
Type: bool Default: auto_instrument
Get the start node for the graph.
Type: StartNode[GraphInputT]
Get the end node for the graph.
Type: EndNode[GraphOutputT]
def __init__(
name: str | None = None,
state_type: TypeOrTypeExpression[StateT] = NoneType,
deps_type: TypeOrTypeExpression[DepsT] = NoneType,
input_type: TypeOrTypeExpression[GraphInputT] = NoneType,
output_type: TypeOrTypeExpression[GraphOutputT] = NoneType,
auto_instrument: bool = True,
)
Initialize a graph builder.
Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.
The type of the graph state
The type of the dependencies
The type of the graph input data
The type of the graph output data
auto_instrument : bool Default: True
Whether to automatically create instrumentation spans
def step(
node_id: str | None = None,
label: str | None = None,
) -> Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]]
def step(
call: StepFunction[StateT, DepsT, InputT, OutputT],
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, OutputT]
Create a step from a step function.
This method can be used as a decorator or called directly to create a step node from an async function.
Step[StateT, DepsT, InputT, OutputT] | Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]] — Either a Step instance or a decorator function
call : StepFunction[StateT, DepsT, InputT, OutputT] | None Default: None
The step function to wrap
Optional ID for the node
Optional human-readable label
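The "decorator or direct call" behaviour is a common Python pattern, sketched here outside of pydantic_graph. ToyStep and this step() function are hypothetical; making node_id and label keyword-only and defaulting the ID to the function name are assumptions of the sketch, not documented behaviour.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class ToyStep:
    """Hypothetical stand-in for Step: a named wrapper around an async function."""
    id: str
    call: Callable[..., Awaitable[Any]]
    label: Optional[str] = None


def step(call=None, *, node_id=None, label=None):
    """Work as `@step`, `@step(node_id=...)`, or `step(fn, node_id=...)`."""
    if call is None:
        # Called with keyword arguments only: return a decorator awaiting the function.
        def decorator(func):
            return step(func, node_id=node_id, label=label)
        return decorator
    # Called with the function itself: build the step (ID defaults to the function name).
    return ToyStep(id=node_id or call.__name__, call=call, label=label)


@step
async def fetch(x: int) -> int:
    return x + 1


@step(node_id="double_it", label="Double")
async def double(x: int) -> int:
    return x * 2


async def triple(x: int) -> int:
    return x * 3


triple_step = step(triple, node_id="triple_it")
```

The overloads listed above describe the same split to a type checker: one signature for the keyword-only form that returns a decorator, and one for the form that receives the function and returns the step directly.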
def stream(
node_id: str | None = None,
label: str | None = None,
) -> Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
def stream(
call: StreamFunction[StateT, DepsT, InputT, OutputT],
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]
def stream(
call: StreamFunction[StateT, DepsT, InputT, OutputT] | None = None,
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
Create a step from an async iterator (which functions like a “stream”).
This method can be used as a decorator or called directly to create a step node from an async generator function.
Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]] — Either a Step instance or a decorator function
call : StreamFunction[StateT, DepsT, InputT, OutputT] | None Default: None
The stream function to wrap
Optional ID for the node
Optional human-readable label
def add(edges: EdgePath[StateT, DepsT] = ()) -> None
Add one or more edge paths to the graph.
This method processes edge paths and automatically creates any necessary fork nodes for broadcasts and maps.
The edge paths to add to the graph
def add_edge(
source: Source[T],
destination: Destination[T],
label: str | None = None,
) -> None
Add a simple edge between two nodes.
The source node
The destination node
Optional label for the edge
def add_mapping_edge(
source: Source[Iterable[T]],
map_to: Destination[T],
pre_map_label: str | None = None,
post_map_label: str | None = None,
fork_id: ForkID | None = None,
downstream_join_id: JoinID | None = None,
) -> None
Add an edge that maps iterable data across parallel paths.
source : Source[Iterable[T]]
The source node that produces iterable data
The destination node that receives individual items
Optional label before the map operation
Optional label after the map operation
fork_id : ForkID | None Default: None
Optional ID for the fork node produced for this map operation
downstream_join_id : JoinID | None Default: None
Optional ID of a join node that will always be downstream of this map. Specifying this ensures correct handling if you try to map an empty iterable.
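The map-then-join shape, including the empty-iterable case, can be sketched with asyncio.gather. This is a conceptual illustration only: map_then_join, square and append are hypothetical helpers, and the explanation of why a known downstream join helps with empty input is an inference from the parameter description rather than a statement about the library's internals.

```python
import asyncio


async def map_then_join(items, worker, reducer, initial):
    """Fan each item out to `worker` concurrently, then fold the results with `reducer`."""
    # One concurrent task per element; an empty iterable creates no tasks at all.
    results = await asyncio.gather(*(worker(item) for item in items))
    joined = initial
    for result in results:
        joined = reducer(joined, result)
    # Because the join is known in advance, it still produces `initial`
    # when nothing was mapped, so downstream work is not left waiting.
    return joined


async def square(n):
    await asyncio.sleep(0)   # yield control, as a real async step would
    return n * n


def append(acc, value):
    return acc + [value]
```

Running the sketch over [1, 2, 3] collects the squares in input order, and running it over an empty list returns the initial value unchanged.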
def edge_from(
sources: Source[SourceOutputT] = (),
) -> EdgePathBuilder[StateT, DepsT, SourceOutputT]
Create an edge path builder starting from the given source nodes.
EdgePathBuilder[StateT, DepsT, SourceOutputT] — An EdgePathBuilder for constructing the complete edge path
The source nodes to start the edge path from
def decision(
note: str | None = None,
node_id: str | None = None,
) -> Decision[StateT, DepsT, Never]
Create a new decision node.
Decision[StateT, DepsT, Never] — A new Decision node with no branches
Optional note to describe the decision logic
Optional ID for the node produced for this decision logic
def match(
source: TypeOrTypeExpression[SourceT],
matches: Callable[[Any], bool] | None = None,
) -> DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never]
Create a decision branch matcher.
DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never] — A DecisionBranchBuilder for constructing the branch
The type or type expression to match against
Optional custom matching function
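Branch selection by type with an optional predicate might look like the sketch below. ToyBranch and route are hypothetical, and two behaviours are assumed rather than documented: that a custom matching function replaces the type check, and that branches are tried in the order they were added.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ToyBranch:
    """Hypothetical branch: a type to test, an optional predicate, and a destination."""
    source: type
    destination: str
    matches: Optional[Callable[[Any], bool]] = None

    def accepts(self, value: Any) -> bool:
        if self.matches is not None:
            # Assumption: a custom predicate replaces the isinstance check.
            return self.matches(value)
        return isinstance(value, self.source)


def route(branches, value):
    """Return the destination of the first branch that accepts the value."""
    for branch in branches:
        if branch.accepts(value):
            return branch.destination
    raise ValueError(f"No branch matched {value!r}")


branches = [
    ToyBranch(int, "handle_big_int", matches=lambda v: isinstance(v, int) and v > 100),
    ToyBranch(int, "handle_int"),
    ToyBranch(str, "handle_str"),
]
```

Because the first accepting branch wins in this sketch, the more specific predicate branch is listed before the general int branch.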
def match_node(
source: type[SourceNodeT],
matches: Callable[[Any], bool] | None = None,
) -> DecisionBranch[SourceNodeT]
Create a decision branch for BaseNode subclasses.
This is similar to match() but specifically designed for matching against BaseNode types from the v1 system.
DecisionBranch[SourceNodeT] — A DecisionBranch for the BaseNode type
source : type[SourceNodeT]
The BaseNode subclass to match against
Optional custom matching function
def node(
node_type: type[BaseNode[StateT, DepsT, GraphOutputT]],
) -> EdgePath[StateT, DepsT]
Create an edge path from a BaseNode class.
This method integrates v1-style BaseNode classes into the v2 graph system by analyzing their type hints and creating appropriate edges.
EdgePath[StateT, DepsT] — An EdgePath representing the node and its connections
node_type : type[BaseNode[StateT, DepsT, GraphOutputT]]
The BaseNode subclass to integrate
Raises GraphSetupError if the node type is missing required type hints.
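Deriving edges from type hints can be illustrated with the standard typing helpers. The sketch assumes successors are read from the return annotation of a run() method; the classes below are hypothetical, and TypeError stands in for GraphSetupError so the example stays self-contained.

```python
from typing import Union, get_args, get_origin, get_type_hints


class ToyEnd:
    """Hypothetical terminal marker."""


class Save:
    def run(self) -> ToyEnd: ...


class Validate:
    def run(self) -> Union[Save, ToyEnd]: ...


class Untyped:
    def run(self): ...


def outgoing_edges(node_type):
    """List the node names reachable from node_type, read from run()'s return hint."""
    hints = get_type_hints(node_type.run)
    if "return" not in hints:
        raise TypeError(f"{node_type.__name__}.run is missing a return type hint")
    returned = hints["return"]
    # A Union return type means several possible successors.
    targets = get_args(returned) if get_origin(returned) is Union else (returned,)
    return [t.__name__ for t in targets]
```

A node whose run() has no return annotation gives the builder nothing to derive edges from, which is why the sketch raises in that case.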
def build(
validate_graph_structure: bool = True,
) -> Graph[StateT, DepsT, GraphInputT, GraphOutputT]
Build the final executable graph from the accumulated nodes and edges.
This method performs validation, normalization, and analysis of the graph structure to create a complete, executable graph instance.
Graph[StateT, DepsT, GraphInputT, GraphOutputT] — A complete Graph instance ready for execution
validate_graph_structure : bool Default: True
Whether to validate the graph structure. See the docstring of _validate_graph_structure for more details.
Raises ValueError if the graph structure is invalid (e.g., join without parent fork).
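As an illustration of what a structural check could involve, the sketch below verifies that every node is reachable from the start and can reach the end. Which checks build() actually performs is not listed in this section, so both checks, the function names and the "start"/"end" node IDs are assumptions.

```python
from collections import deque


def reachable(start, edges):
    """Return every node reachable from start by following edges (breadth-first)."""
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, ()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def validate_structure(nodes, edges, start="start", end="end"):
    """Raise ValueError if a node cannot be reached from start or cannot reach end."""
    unreachable = set(nodes) - reachable(start, edges)
    if unreachable:
        raise ValueError(f"Nodes unreachable from start: {sorted(unreachable)}")
    # Reverse the edges so one search from end finds every node that can reach it.
    reverse = {}
    for source, targets in edges.items():
        for target in targets:
            reverse.setdefault(target, []).append(source)
    dead_ends = set(nodes) - reachable(end, reverse)
    if dead_ends:
        raise ValueError(f"Nodes that cannot reach end: {sorted(dead_ends)}")


nodes = {"start", "a", "b", "end"}
good_edges = {"start": ["a"], "a": ["b"], "b": ["end"]}
bad_edges = {"start": ["a"], "a": ["end"]}   # "b" is never reached
validate_structure(nodes, good_edges)        # passes silently
```

Passing validate_graph_structure=False would correspond to skipping checks of this kind.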
A mermaid node.
A mermaid edge.
A mermaid graph.
def build_mermaid_graph(
graph_nodes: dict[NodeID, AnyNode],
graph_edges_by_source: dict[NodeID, list[Path]],
) -> MermaidGraph
Build a mermaid graph.
MermaidGraph
Type variable for graph state.
Default: TypeVar('StateT', infer_variance=True)
Type variable for graph dependencies.
Default: TypeVar('DepsT', infer_variance=True)
Type variable for graph inputs.
Default: TypeVar('InputT', infer_variance=True)
Type variable for graph outputs.
Default: TypeVar('OutputT', infer_variance=True)
The default CSS to use for highlighting nodes.
Default: 'fill:#fdff32'
Used to specify the direction of the state diagram generated by mermaid.
'TB': Top to bottom; this is the default for mermaid charts.
'LR': Left to right.
'RL': Right to left.
'BT': Bottom to top.
Default: Literal['TB', 'LR', 'RL', 'BT']