pydantic_graph.beta.graph_builder
Graph builder for constructing executable graph definitions.
This module provides the GraphBuilder class and related utilities for constructing typed, executable graph definitions with steps, joins, decisions, and edge routing.
Bases: Generic[StateT, DepsT, GraphInputT, GraphOutputT]
A builder for constructing executable graph definitions.
GraphBuilder provides a fluent interface for defining nodes, edges, and routing in a graph workflow. It supports typed state, dependencies, and input/output validation.
Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.
Type: str | None Default: name
The type of the graph state.
Type: TypeOrTypeExpression[StateT] Default: state_type
The type of the dependencies.
Type: TypeOrTypeExpression[DepsT] Default: deps_type
The type of the graph input data.
Type: TypeOrTypeExpression[GraphInputT] Default: input_type
The type of the graph output data.
Type: TypeOrTypeExpression[GraphOutputT] Default: output_type
Whether to automatically create instrumentation spans.
Type: bool Default: auto_instrument
Get the start node for the graph.
Type: StartNode[GraphInputT]
Get the end node for the graph.
Type: EndNode[GraphOutputT]
def __init__(
name: str | None = None,
state_type: TypeOrTypeExpression[StateT] = NoneType,
deps_type: TypeOrTypeExpression[DepsT] = NoneType,
input_type: TypeOrTypeExpression[GraphInputT] = NoneType,
output_type: TypeOrTypeExpression[GraphOutputT] = NoneType,
auto_instrument: bool = True,
)
Initialize a graph builder.
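name : str | None Default: None
Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.
state_type : TypeOrTypeExpression[StateT] Default: NoneType
The type of the graph state
deps_type : TypeOrTypeExpression[DepsT] Default: NoneType
The type of the dependencies
input_type : TypeOrTypeExpression[GraphInputT] Default: NoneType
The type of the graph input data
output_type : TypeOrTypeExpression[GraphOutputT] Default: NoneType
The type of the graph output data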
auto_instrument : bool Default: True
Whether to automatically create instrumentation spans
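The name inference described above can be illustrated with a small standalone sketch. It is not the library's code: `NamedBuilder`, `_infer_name`, and `describe` are hypothetical names, and the lookup strategy (scanning the caller's variables for one bound to the object) is an assumption about how inference from a calling frame can work.

```python
import inspect
from types import FrameType
from typing import Optional


class NamedBuilder:
    """Hypothetical stand-in for an object that can infer its own name."""

    def __init__(self, name: Optional[str] = None) -> None:
        self.name = name

    def _infer_name(self, frame: Optional[FrameType]) -> None:
        # Only infer when no explicit name was supplied.
        if self.name is not None or frame is None:
            return
        # Search the caller's local and global variables for one bound to self.
        for scope in (frame.f_locals, frame.f_globals):
            for var_name, value in scope.items():
                if value is self:
                    self.name = var_name
                    return

    def describe(self) -> str:
        # f_back is the frame of whoever called describe().
        current = inspect.currentframe()
        self._infer_name(current.f_back if current else None)
        return self.name or "<unnamed>"


order_graph = NamedBuilder()
print(order_graph.describe())
```

Because the lookup happens lazily on the first method call, the variable assignment has already completed by the time the frame is inspected. Passing `name` explicitly avoids any dependence on how the object is bound in the calling code.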
def step(
node_id: str | None = None,
label: str | None = None,
) -> Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]]
def step(
call: StepFunction[StateT, DepsT, InputT, OutputT],
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, OutputT]
Create a step from a step function.
This method can be used as a decorator or called directly to create a step node from an async function.
Step[StateT, DepsT, InputT, OutputT] | Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]] — Either a Step instance or a decorator function
call : StepFunction[StateT, DepsT, InputT, OutputT] | None Default: None
The step function to wrap
node_id : str | None Default: None
Optional ID for the node
label : str | None Default: None
Optional human-readable label
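The decorator-or-direct-call behaviour of `step` can be sketched without the library. `MiniBuilder` and `MiniStep` below are hypothetical stand-ins, the step functions take a plain input rather than a step context, and the fallback of the node ID to the function name is an assumption made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Optional, Union

StepFn = Callable[[Any], Awaitable[Any]]


@dataclass
class MiniStep:
    id: str
    call: StepFn
    label: Optional[str] = None


class MiniBuilder:
    def __init__(self) -> None:
        self.steps: Dict[str, MiniStep] = {}

    def step(
        self,
        call: Optional[StepFn] = None,
        *,
        node_id: Optional[str] = None,
        label: Optional[str] = None,
    ) -> Union[MiniStep, Callable[[StepFn], MiniStep]]:
        if call is None:
            # Used as @g.step(...): return a decorator that finishes the job.
            def decorator(func: StepFn) -> MiniStep:
                return self.step(func, node_id=node_id, label=label)  # type: ignore[return-value]

            return decorator
        # Assumption: the ID falls back to the function name when not given.
        step = MiniStep(id=node_id or call.__name__, call=call, label=label)
        self.steps[step.id] = step
        return step


g = MiniBuilder()


@g.step  # bare decorator form
async def double(x: int) -> int:
    return x * 2


@g.step(node_id="inc", label="Add one")  # decorator form with arguments
async def increment(x: int) -> int:
    return x + 1


async def negate(x: int) -> int:
    return -x


negate_step = g.step(negate, node_id="neg")  # direct call form

print(asyncio.run(double.call(21)))  # prints 42
```

In all three forms the name ends up bound to a step object rather than the original coroutine function, which is what lets it be used later as an edge source or destination.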
def stream(
node_id: str | None = None,
label: str | None = None,
) -> Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
def stream(
call: StreamFunction[StateT, DepsT, InputT, OutputT],
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]
def stream(
call: StreamFunction[StateT, DepsT, InputT, OutputT] | None = None,
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
Create a step from an async iterator (which functions like a “stream”).
This method can be used as a decorator or called directly to create a step node from an async generator function (a function that returns an async iterator).
Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]] — Either a Step instance or a decorator function
call : StreamFunction[StateT, DepsT, InputT, OutputT] | None Default: None
The stream function to wrap
node_id : str | None Default: None
Optional ID for the node
label : str | None Default: None
Optional human-readable label
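The relationship between a stream function and the `AsyncIterable[OutputT]` output of the resulting step can be sketched as follows. `as_stream_step`, `countdown`, and `consume` are hypothetical names; the sketch only shows the shape of the data flow, not how the library wraps the function.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable, List

StreamFn = Callable[[int], AsyncIterator[int]]


def as_stream_step(fn: StreamFn) -> Callable[[int], Awaitable[AsyncIterable[int]]]:
    """Wrap an async generator function as a step whose output is the stream itself."""

    async def step(inputs: int) -> AsyncIterable[int]:
        # Calling an async generator function does not run its body;
        # it returns an async iterator that the consumer drives later.
        return fn(inputs)

    return step


async def countdown(start: int) -> AsyncIterator[int]:
    for value in range(start, 0, -1):
        await asyncio.sleep(0)  # yield control, as a real stream would while waiting
        yield value


countdown_step = as_stream_step(countdown)


async def consume(start: int) -> List[int]:
    stream = await countdown_step(start)
    # The downstream consumer pulls items one at a time.
    return [item async for item in stream]


print(asyncio.run(consume(3)))  # prints [3, 2, 1]
```

The step completes as soon as the iterator is handed over, so items are produced only as the downstream consumer asks for them.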
def add(*edges: EdgePath[StateT, DepsT]) -> None
Add one or more edge paths to the graph.
This method processes edge paths and automatically creates any necessary fork nodes for broadcasts and maps.
*edges : EdgePath[StateT, DepsT]
The edge paths to add to the graph
def add_edge(
source: Source[T],
destination: Destination[T],
label: str | None = None,
) -> None
Add a simple edge between two nodes.
source : Source[T]
The source node
destination : Destination[T]
The destination node
label : str | None Default: None
Optional label for the edge
def add_mapping_edge(
source: Source[Iterable[T]],
map_to: Destination[T],
pre_map_label: str | None = None,
post_map_label: str | None = None,
fork_id: ForkID | None = None,
downstream_join_id: JoinID | None = None,
) -> None
Add an edge that maps iterable data across parallel paths.
source : Source[Iterable[T]]
The source node that produces iterable data
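map_to : Destination[T]
The destination node that receives individual items
pre_map_label : str | None Default: None
Optional label before the map operation
post_map_label : str | None Default: None
Optional label after the map operation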
fork_id : ForkID | None Default: None
Optional ID for the fork node produced for this map operation
downstream_join_id : JoinID | None Default: None
Optional ID of a join node that will always be downstream of this map. Specifying this ensures correct handling if you try to map an empty iterable.
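The empty-iterable case mentioned for `downstream_join_id` can be illustrated with a standalone sketch of map-then-join semantics. `map_then_join`, `square`, and `add` are hypothetical names, and `asyncio.gather` stands in for the parallel branches. The point is only that, when no items exist, no branch ever runs, so the executor must know which join to proceed to.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")
A = TypeVar("A")


async def map_then_join(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    initial: A,
    reducer: Callable[[A, R], A],
) -> A:
    item_list = list(items)
    accumulated = initial
    if not item_list:
        # No branches are spawned for an empty input, so nothing would ever
        # reach the join. Knowing the join up front lets us go straight to it
        # and emit its initial value.
        return accumulated
    # One branch per item, run concurrently; gather preserves input order.
    for result in await asyncio.gather(*(worker(item) for item in item_list)):
        accumulated = reducer(accumulated, result)
    return accumulated


async def square(x: int) -> int:
    return x * x


def add(total: int, value: int) -> int:
    return total + value


print(asyncio.run(map_then_join([1, 2, 3], square, 0, add)))  # prints 14
print(asyncio.run(map_then_join([], square, 0, add)))  # prints 0
```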
def edge_from(
*sources: Source[SourceOutputT],
) -> EdgePathBuilder[StateT, DepsT, SourceOutputT]
Create an edge path builder starting from the given source nodes.
EdgePathBuilder[StateT, DepsT, SourceOutputT]: An EdgePathBuilder for constructing the complete edge path
*sources : Source[SourceOutputT]
The source nodes to start the edge path from
def decision(
note: str | None = None,
node_id: str | None = None,
) -> Decision[StateT, DepsT, Never]
Create a new decision node.
Decision[StateT, DepsT, Never] — A new Decision node with no branches
note : str | None Default: None
Optional note to describe the decision logic
node_id : str | None Default: None
Optional ID for the node produced for this decision logic
def match(
source: TypeOrTypeExpression[SourceT],
matches: Callable[[Any], bool] | None = None,
) -> DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never]
Create a decision branch matcher.
DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never] — A DecisionBranchBuilder for constructing the branch
source : TypeOrTypeExpression[SourceT]
The type or type expression to match against
matches : Callable[[Any], bool] | None Default: None
Optional custom matching function
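How a decision might choose among branches built with `match` can be sketched as follows. `Branch` and `route` are hypothetical, the sketch handles only plain classes rather than arbitrary type expressions, and it assumes that a custom `matches` function replaces the type check and that the first accepting branch wins.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class Branch:
    source: type
    target: str
    matches: Optional[Callable[[Any], bool]] = None

    def accepts(self, value: Any) -> bool:
        # Assumption: a custom predicate, when given, replaces the type check.
        if self.matches is not None:
            return self.matches(value)
        return isinstance(value, self.source)


def route(value: Any, branches: List[Branch]) -> str:
    """Return the target of the first branch that accepts the value."""
    for branch in branches:
        if branch.accepts(value):
            return branch.target
    raise ValueError(f"no branch matches {value!r}")


branches = [
    Branch(int, "handle_small", matches=lambda v: isinstance(v, int) and v < 10),
    Branch(int, "handle_int"),
    Branch(str, "handle_text"),
]

print(route(3, branches))     # prints handle_small
print(route(50, branches))    # prints handle_int
print(route("hi", branches))  # prints handle_text
```

Under the first-match assumption, branch order matters: the more specific predicate branch has to come before the general `int` branch to be reachable.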
def match_node(
source: type[SourceNodeT],
matches: Callable[[Any], bool] | None = None,
) -> DecisionBranch[SourceNodeT]
Create a decision branch for BaseNode subclasses.
This is similar to match() but specifically designed for matching against BaseNode types from the v1 system.
DecisionBranch[SourceNodeT] — A DecisionBranch for the BaseNode type
source : type[SourceNodeT]
The BaseNode subclass to match against
matches : Callable[[Any], bool] | None Default: None
Optional custom matching function
def node(
node_type: type[BaseNode[StateT, DepsT, GraphOutputT]],
) -> EdgePath[StateT, DepsT]
Create an edge path from a BaseNode class.
This method integrates v1-style BaseNode classes into the v2 graph system by analyzing their type hints and creating appropriate edges.
EdgePath[StateT, DepsT] — An EdgePath representing the node and its connections
node_type : type[BaseNode[StateT, DepsT, GraphOutputT]]
The BaseNode subclass to integrate
GraphSetupError: If the node type is missing required type hints
def build(
validate_graph_structure: bool = True,
) -> Graph[StateT, DepsT, GraphInputT, GraphOutputT]
Build the final executable graph from the accumulated nodes and edges.
This method performs validation, normalization, and analysis of the graph structure to create a complete, executable graph instance.
Graph[StateT, DepsT, GraphInputT, GraphOutputT] — A complete Graph instance ready for execution
validate_graph_structure : bool Default: True
Whether to validate the graph structure. See the docstring of _validate_graph_structure for more details.
ValueError: If the graph structure is invalid (e.g., join without parent fork)
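As a rough illustration of what structural validation at build time can involve, the sketch below checks a plain adjacency mapping for reachability and dead ends. It is not the library's `_validate_graph_structure`: `validate_structure`, the `"__start__"` and `"__end__"` sentinel names, and the specific checks are assumptions chosen for illustration, and the fork/join check mentioned above is not modelled.

```python
from collections import deque
from typing import Dict, List, Set


def reachable(edges: Dict[str, List[str]], start: str) -> Set[str]:
    """Breadth-first search: every node reachable from start, including start."""
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for successor in edges.get(node, []):
            if successor not in seen:
                seen.add(successor)
                queue.append(successor)
    return seen


def validate_structure(
    edges: Dict[str, List[str]],
    start: str = "__start__",  # hypothetical sentinel name
    end: str = "__end__",      # hypothetical sentinel name
) -> None:
    nodes = set(edges) | {dest for dests in edges.values() for dest in dests}
    forward = reachable(edges, start)
    if end not in forward:
        raise ValueError("end node is not reachable from the start node")
    unreachable = nodes - forward
    if unreachable:
        raise ValueError(f"nodes unreachable from start: {sorted(unreachable)}")
    dead_ends = {node for node in nodes if node != end and not edges.get(node)}
    if dead_ends:
        raise ValueError(f"nodes with no outgoing edges: {sorted(dead_ends)}")


valid = {"__start__": ["a"], "a": ["b", "c"], "b": ["__end__"], "c": ["__end__"]}
validate_structure(valid)  # passes silently
```

Running checks like these once at build time surfaces wiring mistakes before any step executes, which is the motivation for leaving `validate_graph_structure` at its default of `True`.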