
pydantic_graph.beta.graph_builder

Graph builder for constructing executable graph definitions.

This module provides the GraphBuilder class and related utilities for constructing typed, executable graph definitions with steps, joins, decisions, and edge routing.

GraphBuilder

Bases: Generic[StateT, DepsT, GraphInputT, GraphOutputT]

A builder for constructing executable graph definitions.

GraphBuilder provides a fluent interface for defining nodes, edges, and routing in a graph workflow. It supports typed state, dependencies, and input/output validation.

Attributes

name

Optional name for the graph. If not provided, the name will be inferred from the calling frame on the first call to a graph method.

Type: str | None Default: name
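Inferring a name from the calling frame can be pictured with the standard `inspect` module. The sketch below is only an illustration of the general technique, not the GraphBuilder implementation: `NamedBuilder`, `_infer_name`, and `demo` are hypothetical names, and the lookup order (caller locals, then caller globals) is an assumption.

```python
from __future__ import annotations

import inspect
from types import FrameType


class NamedBuilder:
    """Hypothetical stand-in used for illustration; not GraphBuilder itself."""

    def __init__(self, name: str | None = None) -> None:
        self.name = name

    def _infer_name(self, frame: FrameType | None) -> None:
        # Keep an explicit name, and give up quietly if no frame is available.
        if self.name is not None or frame is None:
            return
        # Look for a variable in the caller's scope that is bound to this object.
        for scope in (frame.f_locals, frame.f_globals):
            for var_name, value in scope.items():
                if value is self:
                    self.name = var_name
                    return

    def build(self) -> str | None:
        # f_back is the frame of whoever called build().
        current = inspect.currentframe()
        self._infer_name(current.f_back if current else None)
        return self.name


def demo() -> str | None:
    checkout_flow = NamedBuilder()  # no explicit name given
    return checkout_flow.build()    # name is taken from the variable
```

With this approach, the inferred name depends on how the caller bound the object to a variable, which is why passing `name` explicitly is the more predictable choice.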

state_type

The type of the graph state.

Type: TypeOrTypeExpression[StateT] Default: state_type

deps_type

The type of the dependencies.

Type: TypeOrTypeExpression[DepsT] Default: deps_type

input_type

The type of the graph input data.

Type: TypeOrTypeExpression[GraphInputT] Default: input_type

output_type

The type of the graph output data.

Type: TypeOrTypeExpression[GraphOutputT] Default: output_type

auto_instrument

Whether to automatically create instrumentation spans.

Type: bool Default: auto_instrument

start_node

Get the start node for the graph.

Type: StartNode[GraphInputT]

end_node

Get the end node for the graph.

Type: EndNode[GraphOutputT]

Methods

__init__
def __init__(
    name: str | None = None,
    state_type: TypeOrTypeExpression[StateT] = NoneType,
    deps_type: TypeOrTypeExpression[DepsT] = NoneType,
    input_type: TypeOrTypeExpression[GraphInputT] = NoneType,
    output_type: TypeOrTypeExpression[GraphOutputT] = NoneType,
    auto_instrument: bool = True,
)

Initialize a graph builder.

Parameters

name : str | None Default: None

Optional name for the graph. If not provided, the name will be inferred from the calling frame on the first call to a graph method.

state_type : TypeOrTypeExpression[StateT] Default: NoneType

The type of the graph state

deps_type : TypeOrTypeExpression[DepsT] Default: NoneType

The type of the dependencies

input_type : TypeOrTypeExpression[GraphInputT] Default: NoneType

The type of the graph input data

output_type : TypeOrTypeExpression[GraphOutputT] Default: NoneType

The type of the graph output data

auto_instrument : bool Default: True

Whether to automatically create instrumentation spans

step
def step(
    node_id: str | None = None,
    label: str | None = None,
) -> Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]]
def step(
    call: StepFunction[StateT, DepsT, InputT, OutputT],
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, OutputT]

Create a step from a step function.

This method can be used as a decorator or called directly to create a step node from an async function.

Returns

Step[StateT, DepsT, InputT, OutputT] | Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]] — Either a Step instance or a decorator function

Parameters

call : StepFunction[StateT, DepsT, InputT, OutputT] | None Default: None

The step function to wrap

node_id : str | None Default: None

Optional ID for the node

label : str | None Default: None

Optional human-readable label
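The "decorator or direct call" behaviour described for `step` follows a common Python pattern: when the function argument is omitted, the method returns a decorator that calls itself again with the function filled in. The sketch below shows that pattern in isolation. `SketchBuilder` and `SketchStep` are hypothetical stand-ins, the keyword-only parameters are an assumption, and none of this is the library's actual code.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class SketchStep:
    """Hypothetical node wrapper holding the function and its metadata."""

    id: str
    call: Callable[..., Awaitable[Any]]
    label: str | None = None


class SketchBuilder:
    """Hypothetical builder showing the dual-use decorator pattern."""

    def __init__(self) -> None:
        self.steps: dict[str, SketchStep] = {}

    def step(self, call=None, *, node_id=None, label=None):
        if call is None:
            # Used as @builder.step(...): return a decorator that finishes the job.
            def decorator(func):
                return self.step(func, node_id=node_id, label=label)

            return decorator
        # Used as @builder.step or builder.step(func): wrap immediately.
        # Falling back to the function name for the ID is an assumption.
        step = SketchStep(id=node_id or call.__name__, call=call, label=label)
        self.steps[step.id] = step
        return step


builder = SketchBuilder()


@builder.step
async def double(x: int) -> int:
    return x * 2


@builder.step(node_id="inc", label="Add one")
async def increment(x: int) -> int:
    return x + 1


async def triple(x: int) -> int:
    return x * 3


triple_step = builder.step(triple)  # direct call, no decorator syntax
doubled = asyncio.run(double.call(4))
```

All three forms produce the same kind of object, which matches the two return types listed under Returns.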

stream
def stream(
    node_id: str | None = None,
    label: str | None = None,
) -> Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
def stream(
    call: StreamFunction[StateT, DepsT, InputT, OutputT],
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]
def stream(
    call: StreamFunction[StateT, DepsT, InputT, OutputT] | None = None,
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]

Create a step from an async iterator (which functions like a “stream”).

This method can be used as a decorator or called directly to create a step node from a function that returns an async iterator (typically an async generator function).

Returns

Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]] — Either a Step instance or a decorator function

Parameters

call : StreamFunction[StateT, DepsT, InputT, OutputT] | None Default: None

The stream function to wrap

node_id : str | None Default: None

Optional ID for the node

label : str | None Default: None

Optional human-readable label
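Because a stream step's output type is `AsyncIterable[OutputT]`, whatever consumes it is expected to iterate with `async for` rather than await a single value. The plain-asyncio sketch below illustrates that producer and consumer relationship. It does not use the graph API, and `count_up` and `total` are hypothetical functions.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def count_up(limit: int) -> AsyncIterator[int]:
    # An async generator: values become available one at a time.
    for i in range(limit):
        await asyncio.sleep(0)  # yield control, as real I/O would
        yield i


async def total(items: AsyncIterable[int]) -> int:
    # The consumer receives the async iterable itself and drains it.
    acc = 0
    async for item in items:
        acc += item
    return acc


stream_total = asyncio.run(total(count_up(4)))
```

Here `stream_total` is the sum of 0, 1, 2, and 3.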

add
def add(*edges: EdgePath[StateT, DepsT]) -> None

Add one or more edge paths to the graph.

This method processes edge paths and automatically creates any necessary fork nodes for broadcasts and maps.

Returns

None

Parameters

*edges : EdgePath[StateT, DepsT]

The edge paths to add to the graph

add_edge
def add_edge(
    source: Source[T],
    destination: Destination[T],
    label: str | None = None,
) -> None

Add a simple edge between two nodes.

Returns

None

Parameters

source : Source[T]

The source node

destination : Destination[T]

The destination node

label : str | None Default: None

Optional label for the edge

add_mapping_edge
def add_mapping_edge(
    source: Source[Iterable[T]],
    map_to: Destination[T],
    pre_map_label: str | None = None,
    post_map_label: str | None = None,
    fork_id: ForkID | None = None,
    downstream_join_id: JoinID | None = None,
) -> None

Add an edge that maps iterable data across parallel paths.

Returns

None

Parameters

source : Source[Iterable[T]]

The source node that produces iterable data

map_to : Destination[T]

The destination node that receives individual items

pre_map_label : str | None Default: None

Optional label before the map operation

post_map_label : str | None Default: None

Optional label after the map operation

fork_id : ForkID | None Default: None

Optional ID for the fork node produced for this map operation

downstream_join_id : JoinID | None Default: None

Optional ID of a join node that will always be downstream of this map. Specifying this ensures correct handling if you try to map an empty iterable.
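The idea of mapping iterable data across parallel paths, and why the empty-iterable case needs care, can be sketched with plain asyncio. This is a conceptual illustration only: `map_then_join` and `square` are hypothetical, and the sketch says nothing about how the library schedules forks and joins internally.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable


async def square(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return x * x


async def map_then_join(
    items: Iterable[int],
    worker: Callable[[int], Awaitable[int]],
    join: Callable[[list[int]], int],
) -> int:
    # Fork: one concurrent task per item of the iterable.
    results = await asyncio.gather(*(worker(item) for item in items))
    # Join: combine the per-item results into a single value.
    return join(list(results))


mapped = asyncio.run(map_then_join([1, 2, 3], square, sum))
empty = asyncio.run(map_then_join([], square, sum))
```

With an empty input no worker runs at all, yet the join still has to produce a result. That is the situation the `downstream_join_id` parameter is documented as addressing.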

edge_from
def edge_from(
    *sources: Source[SourceOutputT],
) -> EdgePathBuilder[StateT, DepsT, SourceOutputT]

Create an edge path builder starting from the given source nodes.

Returns

EdgePathBuilder[StateT, DepsT, SourceOutputT] — An EdgePathBuilder for constructing the complete edge path

Parameters

*sources : Source[SourceOutputT]

The source nodes to start the edge path from

decision
def decision(
    note: str | None = None,
    node_id: str | None = None,
) -> Decision[StateT, DepsT, Never]

Create a new decision node.

Returns

Decision[StateT, DepsT, Never] — A new Decision node with no branches

Parameters

note : str | None Default: None

Optional note to describe the decision logic

node_id : str | None Default: None

Optional ID for the node produced for this decision logic

match
def match(
    source: TypeOrTypeExpression[SourceT],
    matches: Callable[[Any], bool] | None = None,
) -> DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never]

Create a decision branch matcher.

Returns

DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never] — A DecisionBranchBuilder for constructing the branch

Parameters

source : TypeOrTypeExpression[SourceT]

The type or type expression to match against

matches : Callable[[Any], bool] | None Default: None

Optional custom matching function
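One way to picture a decision with typed branches is as a list of (type, predicate, destination) entries checked in order. The sketch below is a rough model under stated assumptions, not the library's routing code: `Branch` and `route` are hypothetical, and it assumes that a custom `matches` function, when given, is used instead of the type check.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Branch:
    """Hypothetical branch: a source type, a destination, an optional predicate."""

    source: type
    handler: Callable[[Any], Any]
    matches: Callable[[Any], bool] | None = None

    def accepts(self, value: Any) -> bool:
        # Assumption: a custom predicate takes the place of the isinstance check.
        if self.matches is not None:
            return self.matches(value)
        return isinstance(value, self.source)


def route(value: Any, branches: list[Branch]) -> Any:
    # The first branch that accepts the value wins.
    for branch in branches:
        if branch.accepts(value):
            return branch.handler(value)
    raise ValueError(f"no branch matched {value!r}")


branches = [
    Branch(int, lambda v: f"negative:{v}", matches=lambda v: isinstance(v, int) and v < 0),
    Branch(int, lambda v: f"int:{v}"),
    Branch(str, lambda v: f"str:{v}"),
]
```

Branch order matters in this model: the predicate branch for negative integers has to come before the general `int` branch, or it would never be reached.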

match_node
def match_node(
    source: type[SourceNodeT],
    matches: Callable[[Any], bool] | None = None,
) -> DecisionBranch[SourceNodeT]

Create a decision branch for BaseNode subclasses.

This is similar to match() but specifically designed for matching against BaseNode types from the v1 system.

Returns

DecisionBranch[SourceNodeT] — A DecisionBranch for the BaseNode type

Parameters

source : type[SourceNodeT]

The BaseNode subclass to match against

matches : Callable[[Any], bool] | None Default: None

Optional custom matching function

node
def node(
    node_type: type[BaseNode[StateT, DepsT, GraphOutputT]],
) -> EdgePath[StateT, DepsT]

Create an edge path from a BaseNode class.

This method integrates v1-style BaseNode classes into the v2 graph system by analyzing their type hints and creating appropriate edges.

Returns

EdgePath[StateT, DepsT] — An EdgePath representing the node and its connections

Parameters

node_type : type[BaseNode[StateT, DepsT, GraphOutputT]]

The BaseNode subclass to integrate

Raises
  • GraphSetupError — If the node type is missing required type hints

build
def build(
    validate_graph_structure: bool = True,
) -> Graph[StateT, DepsT, GraphInputT, GraphOutputT]

Build the final executable graph from the accumulated nodes and edges.

This method performs validation, normalization, and analysis of the graph structure to create a complete, executable graph instance.

Returns

Graph[StateT, DepsT, GraphInputT, GraphOutputT] — A complete Graph instance ready for execution

Parameters

validate_graph_structure : bool Default: True

Whether to perform validation of the graph structure. See the docstring of _validate_graph_structure below for more details.

Raises
  • ValueError — If the graph structure is invalid (e.g., join without parent fork)