Skip to content

pydantic_graph.beta

The next version of the pydantic-graph framework with enhanced graph execution capabilities.

This module provides a parallel control flow graph execution framework with support for:

  • ‘Step’ nodes for task execution
  • ‘Decision’ nodes for conditional branching
  • ‘Fork’ nodes for parallel execution coordination
  • ‘Join’ nodes and ‘Reducer’s for re-joining parallel executions
  • Mermaid diagram generation for graph visualization

TypeExpression

Bases: Generic[T]

A workaround for type checker limitations when using complex type expressions.

This class serves as a wrapper for types that cannot normally be used in positions requiring type[T], such as Any, Union[...], or Literal[...]. It provides a way to pass these complex type expressions to functions expecting concrete types.

StepContext

Bases: Generic[StateT, DepsT, InputT]

Context information passed to step functions during graph execution.

The step context provides access to the current graph state, dependencies, and input data for a step.

Attributes

inputs

The input data for this step.

This must be a property to ensure correct variance behavior

Type: InputT

StartNode

Bases: Generic[OutputT]

Entry point node for graph execution.

The StartNode represents the beginning of a graph execution flow.

Attributes

id

Fixed identifier for the start node.

Default: NodeID('__start__')

EndNode

Bases: Generic[InputT]

Terminal node representing the completion of graph execution.

The EndNode marks the successful completion of a graph execution flow and can collect the final output data.

Attributes

id

Fixed identifier for the end node.

Default: NodeID('__end__')

GraphBuilder

Bases: Generic[StateT, DepsT, GraphInputT, GraphOutputT]

A builder for constructing executable graph definitions.

GraphBuilder provides a fluent interface for defining nodes, edges, and routing in a graph workflow. It supports typed state, dependencies, and input/output validation.

Attributes

name

Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.

Type: str | None Default: name

state_type

The type of the graph state.

Type: TypeOrTypeExpression[StateT] Default: state_type

deps_type

The type of the dependencies.

Type: TypeOrTypeExpression[DepsT] Default: deps_type

input_type

The type of the graph input data.

Type: TypeOrTypeExpression[GraphInputT] Default: input_type

output_type

The type of the graph output data.

Type: TypeOrTypeExpression[GraphOutputT] Default: output_type

auto_instrument

Whether to automatically create instrumentation spans.

Type: bool Default: auto_instrument

start_node

Get the start node for the graph.

Type: StartNode[GraphInputT]

end_node

Get the end node for the graph.

Type: EndNode[GraphOutputT]

Methods

__init__
def __init__(
    name: str | None = None,
    state_type: TypeOrTypeExpression[StateT] = NoneType,
    deps_type: TypeOrTypeExpression[DepsT] = NoneType,
    input_type: TypeOrTypeExpression[GraphInputT] = NoneType,
    output_type: TypeOrTypeExpression[GraphOutputT] = NoneType,
    auto_instrument: bool = True,
)

Initialize a graph builder.

Parameters

name : str | None Default: None

Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.

state_type : TypeOrTypeExpression[StateT] Default: NoneType

The type of the graph state

deps_type : TypeOrTypeExpression[DepsT] Default: NoneType

The type of the dependencies

input_type : TypeOrTypeExpression[GraphInputT] Default: NoneType

The type of the graph input data

output_type : TypeOrTypeExpression[GraphOutputT] Default: NoneType

The type of the graph output data

auto_instrument : bool Default: True

Whether to automatically create instrumentation spans

step
def step(
    node_id: str | None = None,
    label: str | None = None,
) -> Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]]
def step(
    call: StepFunction[StateT, DepsT, InputT, OutputT],
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, OutputT]

Create a step from a step function.

This method can be used as a decorator or called directly to create a step node from an async function.

Returns

Step[StateT, DepsT, InputT, OutputT] | Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]] — Either a Step instance or a decorator function

Parameters

call : StepFunction[StateT, DepsT, InputT, OutputT] | None Default: None

The step function to wrap

node_id : str | None Default: None

Optional ID for the node

label : str | None Default: None

Optional human-readable label

stream
def stream(
    node_id: str | None = None,
    label: str | None = None,
) -> Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
def stream(
    call: StreamFunction[StateT, DepsT, InputT, OutputT],
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]
def stream(
    call: StreamFunction[StateT, DepsT, InputT, OutputT] | None = None,
    node_id: str | None = None,
    label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]

Create a step from an async iterator (which functions like a “stream”).

This method can be used as a decorator or called directly to create a step node from an async function.

Returns

Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]] — Either a Step instance or a decorator function

Parameters

call : StreamFunction[StateT, DepsT, InputT, OutputT] | None Default: None

The step function to wrap

node_id : str | None Default: None

Optional ID for the node

label : str | None Default: None

Optional human-readable label

add
def add(edges: EdgePath[StateT, DepsT] = ()) -> None

Add one or more edge paths to the graph.

This method processes edge paths and automatically creates any necessary fork nodes for broadcasts and maps.

Returns

None

Parameters

*edges : EdgePath[StateT, DepsT] Default: ()

The edge paths to add to the graph

add_edge
def add_edge(
    source: Source[T],
    destination: Destination[T],
    label: str | None = None,
) -> None

Add a simple edge between two nodes.

Returns

None

Parameters

source : Source[T]

The source node

destination : Destination[T]

The destination node

label : str | None Default: None

Optional label for the edge

add_mapping_edge
def add_mapping_edge(
    source: Source[Iterable[T]],
    map_to: Destination[T],
    pre_map_label: str | None = None,
    post_map_label: str | None = None,
    fork_id: ForkID | None = None,
    downstream_join_id: JoinID | None = None,
) -> None

Add an edge that maps iterable data across parallel paths.

Returns

None

Parameters

source : Source[Iterable[T]]

The source node that produces iterable data

map_to : Destination[T]

The destination node that receives individual items

pre_map_label : str | None Default: None

Optional label before the map operation

post_map_label : str | None Default: None

Optional label after the map operation

fork_id : ForkID | None Default: None

Optional ID for the fork node produced for this map operation

downstream_join_id : JoinID | None Default: None

Optional ID of a join node that will always be downstream of this map. Specifying this ensures correct handling if you try to map an empty iterable.

edge_from
def edge_from(
    sources: Source[SourceOutputT] = (),
) -> EdgePathBuilder[StateT, DepsT, SourceOutputT]

Create an edge path builder starting from the given source nodes.

Returns

EdgePathBuilder[StateT, DepsT, SourceOutputT] — An EdgePathBuilder for constructing the complete edge path

Parameters

*sources : Source[SourceOutputT] Default: ()

The source nodes to start the edge path from

decision
def decision(
    note: str | None = None,
    node_id: str | None = None,
) -> Decision[StateT, DepsT, Never]

Create a new decision node.

Returns

Decision[StateT, DepsT, Never] — A new Decision node with no branches

Parameters

note : str | None Default: None

Optional note to describe the decision logic

node_id : str | None Default: None

Optional ID for the node produced for this decision logic

match
def match(
    source: TypeOrTypeExpression[SourceT],
    matches: Callable[[Any], bool] | None = None,
) -> DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never]

Create a decision branch matcher.

Returns

DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never] — A DecisionBranchBuilder for constructing the branch

Parameters

source : TypeOrTypeExpression[SourceT]

The type or type expression to match against

matches : Callable[[Any], bool] | None Default: None

Optional custom matching function

match_node
def match_node(
    source: type[SourceNodeT],
    matches: Callable[[Any], bool] | None = None,
) -> DecisionBranch[SourceNodeT]

Create a decision branch for BaseNode subclasses.

This is similar to match() but specifically designed for matching against BaseNode types from the v1 system.

Returns

DecisionBranch[SourceNodeT] — A DecisionBranch for the BaseNode type

Parameters

source : type[SourceNodeT]

The BaseNode subclass to match against

matches : Callable[[Any], bool] | None Default: None

Optional custom matching function

node
def node(
    node_type: type[BaseNode[StateT, DepsT, GraphOutputT]],
) -> EdgePath[StateT, DepsT]

Create an edge path from a BaseNode class.

This method integrates v1-style BaseNode classes into the v2 graph system by analyzing their type hints and creating appropriate edges.

Returns

EdgePath[StateT, DepsT] — An EdgePath representing the node and its connections

Parameters

node_type : type[BaseNode[StateT, DepsT, GraphOutputT]]

The BaseNode subclass to integrate

Raises
  • GraphSetupError — If the node type is missing required type hints
build
def build(
    validate_graph_structure: bool = True,
) -> Graph[StateT, DepsT, GraphInputT, GraphOutputT]

Build the final executable graph from the accumulated nodes and edges.

This method performs validation, normalization, and analysis of the graph structure to create a complete, executable graph instance.

Returns

Graph[StateT, DepsT, GraphInputT, GraphOutputT] — A complete Graph instance ready for execution

Parameters

validate_graph_structure : bool Default: True

whether to perform validation of the graph structure See the docstring of _validate_graph_structure below for more details.

Raises
  • ValueError — If the graph structure is invalid (e.g., join without parent fork)

Graph

Bases: Generic[StateT, DepsT, InputT, OutputT]

A complete graph definition ready for execution.

The Graph class represents a complete workflow graph with typed inputs, outputs, state, and dependencies. It contains all nodes, edges, and metadata needed for execution.

Attributes

name

Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.

Type: str | None

state_type

The type of the graph state.

Type: type[StateT]

deps_type

The type of the dependencies.

Type: type[DepsT]

input_type

The type of the input data.

Type: type[InputT]

output_type

The type of the output data.

Type: type[OutputT]

auto_instrument

Whether to automatically create instrumentation spans.

Type: bool

nodes

All nodes in the graph indexed by their ID.

Type: dict[NodeID, AnyNode]

edges_by_source

Outgoing paths from each source node.

Type: dict[NodeID, list[Path]]

parent_forks

Parent fork information for each join node.

Type: dict[JoinID, ParentFork[NodeID]]

intermediate_join_nodes

For each join, the set of other joins that appear between it and its parent fork.

Used to determine which joins are “final” (have no other joins as intermediates) and which joins should preserve fork stacks when proceeding downstream.

Type: dict[JoinID, set[JoinID]]

Methods

get_parent_fork
def get_parent_fork(join_id: JoinID) -> ParentFork[NodeID]

Get the parent fork information for a join node.

Returns

ParentFork[NodeID] — The parent fork information for the join

Parameters

join_id : JoinID

The ID of the join node

Raises
  • RuntimeError — If the join ID is not found or has no parent fork
is_final_join
def is_final_join(join_id: JoinID) -> bool

Check if a join is ‘final’ (has no downstream joins with the same parent fork).

A join is non-final if it appears as an intermediate node for another join with the same parent fork.

Returns

bool — True if the join is final, False if it’s non-final

Parameters

join_id : JoinID

The ID of the join node

run

@async

def run(
    state: StateT = None,
    deps: DepsT = None,
    inputs: InputT = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> OutputT

Execute the graph and return the final output.

This is the main entry point for graph execution. It runs the graph to completion and returns the final output value.

Returns

OutputT — The final output from the graph execution

Parameters

state : StateT Default: None

The graph state instance

deps : DepsT Default: None

The dependencies instance

inputs : InputT Default: None

The input data for the graph

span : AbstractContextManager[AbstractSpan] | None Default: None

Optional span for tracing/instrumentation

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

iter

@async

def iter(
    state: StateT = None,
    deps: DepsT = None,
    inputs: InputT = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, OutputT]]

Create an iterator for step-by-step graph execution.

This method allows for more fine-grained control over graph execution, enabling inspection of intermediate states and results.

Returns

AsyncIterator[GraphRun[StateT, DepsT, OutputT]]

Parameters

state : StateT Default: None

The graph state instance

deps : DepsT Default: None

The dependencies instance

inputs : InputT Default: None

The input data for the graph

span : AbstractContextManager[AbstractSpan] | None Default: None

Optional span for tracing/instrumentation

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

render
def render(
    title: str | None = None,
    direction: StateDiagramDirection | None = None,
) -> str

Render the graph as a Mermaid diagram string.

Returns

str — A string containing the Mermaid diagram representation

Parameters

title : str | None Default: None

Optional title for the diagram

direction : StateDiagramDirection | None Default: None

Optional direction for the diagram layout

__str__
def __str__() -> str

Return a Mermaid diagram representation of the graph.

Returns

str — A string containing the Mermaid diagram of the graph

StepNode

Bases: BaseNode[StateT, DepsT, Any]

A base node that represents a step with bound inputs.

StepNode bridges between the v1 and v2 graph execution systems by wrapping a Step with bound inputs in a BaseNode interface. It is not meant to be run directly but rather used to indicate transitions to v2-style steps.

Attributes

step

The step to execute.

Type: Step[StateT, DepsT, Any, Any]

inputs

The inputs bound to this step.

Type: Any

Methods

run

@async

def run(ctx: GraphRunContext[StateT, DepsT]) -> BaseNode[StateT, DepsT, Any] | End[Any]

Attempt to run the step node.

Returns

BaseNode[StateT, DepsT, Any] | End[Any] — The result of step execution

Parameters

ctx : GraphRunContext[StateT, DepsT]

The graph execution context

Raises
  • NotImplementedError — Always raised as StepNode is not meant to be run directly