pydantic_graph.beta
The next version of the pydantic-graph framework with enhanced graph execution capabilities.
This module provides a parallel control flow graph execution framework with support for:
- ‘Step’ nodes for task execution
- ‘Decision’ nodes for conditional branching
- ‘Fork’ nodes for parallel execution coordination
- ‘Join’ nodes and ‘Reducer’s for re-joining parallel executions
- Mermaid diagram generation for graph visualization
Bases: Generic[T]
A workaround for type checker limitations when using complex type expressions.
This class serves as a wrapper for types that cannot normally be used in positions
requiring type[T], such as Any, Union[...], or Literal[...]. It provides a
way to pass these complex type expressions to functions expecting concrete types.
Bases: Generic[StateT, DepsT, InputT]
Context information passed to step functions during graph execution.
The step context provides access to the current graph state, dependencies, and input data for a step.
The input data for this step.
This must be a property to ensure correct variance behavior
Type: InputT
Bases: Generic[OutputT]
Entry point node for graph execution.
The StartNode represents the beginning of a graph execution flow.
Fixed identifier for the start node.
Default: NodeID('__start__')
Bases: Generic[InputT]
Terminal node representing the completion of graph execution.
The EndNode marks the successful completion of a graph execution flow and can collect the final output data.
Fixed identifier for the end node.
Default: NodeID('__end__')
Bases: Generic[StateT, DepsT, GraphInputT, GraphOutputT]
A builder for constructing executable graph definitions.
GraphBuilder provides a fluent interface for defining nodes, edges, and routing in a graph workflow. It supports typed state, dependencies, and input/output validation.
Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.
Type: str | None Default: name
The type of the graph state.
Type: TypeOrTypeExpression[StateT] Default: state_type
The type of the dependencies.
Type: TypeOrTypeExpression[DepsT] Default: deps_type
The type of the graph input data.
Type: TypeOrTypeExpression[GraphInputT] Default: input_type
The type of the graph output data.
Type: TypeOrTypeExpression[GraphOutputT] Default: output_type
Whether to automatically create instrumentation spans.
Type: bool Default: auto_instrument
Get the start node for the graph.
Type: StartNode[GraphInputT]
Get the end node for the graph.
Type: EndNode[GraphOutputT]
def __init__(
name: str | None = None,
state_type: TypeOrTypeExpression[StateT] = NoneType,
deps_type: TypeOrTypeExpression[DepsT] = NoneType,
input_type: TypeOrTypeExpression[GraphInputT] = NoneType,
output_type: TypeOrTypeExpression[GraphOutputT] = NoneType,
auto_instrument: bool = True,
)
Initialize a graph builder.
Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.
The type of the graph state
The type of the dependencies
The type of the graph input data
The type of the graph output data
auto_instrument : bool Default: True
Whether to automatically create instrumentation spans
def step(
node_id: str | None = None,
label: str | None = None,
) -> Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]]
def step(
call: StepFunction[StateT, DepsT, InputT, OutputT],
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, OutputT]
Create a step from a step function.
This method can be used as a decorator or called directly to create a step node from an async function.
Step[StateT, DepsT, InputT, OutputT] | Callable[[StepFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, OutputT]] — Either a Step instance or a decorator function
call : StepFunction[StateT, DepsT, InputT, OutputT] | None Default: None
The step function to wrap
Optional ID for the node
Optional human-readable label
def stream(
node_id: str | None = None,
label: str | None = None,
) -> Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
def stream(
call: StreamFunction[StateT, DepsT, InputT, OutputT],
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]
def stream(
call: StreamFunction[StateT, DepsT, InputT, OutputT] | None = None,
node_id: str | None = None,
label: str | None = None,
) -> Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]]
Create a step from an async iterator (which functions like a “stream”).
This method can be used as a decorator or called directly to create a step node from an async function.
Step[StateT, DepsT, InputT, AsyncIterable[OutputT]] | Callable[[StreamFunction[StateT, DepsT, InputT, OutputT]], Step[StateT, DepsT, InputT, AsyncIterable[OutputT]]] — Either a Step instance or a decorator function
call : StreamFunction[StateT, DepsT, InputT, OutputT] | None Default: None
The step function to wrap
Optional ID for the node
Optional human-readable label
def add(edges: EdgePath[StateT, DepsT] = ()) -> None
Add one or more edge paths to the graph.
This method processes edge paths and automatically creates any necessary fork nodes for broadcasts and maps.
The edge paths to add to the graph
def add_edge(
source: Source[T],
destination: Destination[T],
label: str | None = None,
) -> None
Add a simple edge between two nodes.
The source node
The destination node
Optional label for the edge
def add_mapping_edge(
source: Source[Iterable[T]],
map_to: Destination[T],
pre_map_label: str | None = None,
post_map_label: str | None = None,
fork_id: ForkID | None = None,
downstream_join_id: JoinID | None = None,
) -> None
Add an edge that maps iterable data across parallel paths.
source : Source[Iterable[T]]
The source node that produces iterable data
The destination node that receives individual items
Optional label before the map operation
Optional label after the map operation
fork_id : ForkID | None Default: None
Optional ID for the fork node produced for this map operation
downstream_join_id : JoinID | None Default: None
Optional ID of a join node that will always be downstream of this map. Specifying this ensures correct handling if you try to map an empty iterable.
def edge_from(
sources: Source[SourceOutputT] = (),
) -> EdgePathBuilder[StateT, DepsT, SourceOutputT]
Create an edge path builder starting from the given source nodes.
EdgePathBuilder[StateT, DepsT, SourceOutputT] — An EdgePathBuilder for constructing the complete edge path
The source nodes to start the edge path from
def decision(
note: str | None = None,
node_id: str | None = None,
) -> Decision[StateT, DepsT, Never]
Create a new decision node.
Decision[StateT, DepsT, Never] — A new Decision node with no branches
Optional note to describe the decision logic
Optional ID for the node produced for this decision logic
def match(
source: TypeOrTypeExpression[SourceT],
matches: Callable[[Any], bool] | None = None,
) -> DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never]
Create a decision branch matcher.
DecisionBranchBuilder[StateT, DepsT, SourceT, SourceT, Never] — A DecisionBranchBuilder for constructing the branch
The type or type expression to match against
Optional custom matching function
def match_node(
source: type[SourceNodeT],
matches: Callable[[Any], bool] | None = None,
) -> DecisionBranch[SourceNodeT]
Create a decision branch for BaseNode subclasses.
This is similar to match() but specifically designed for matching against BaseNode types from the v1 system.
DecisionBranch[SourceNodeT] — A DecisionBranch for the BaseNode type
source : type[SourceNodeT]
The BaseNode subclass to match against
Optional custom matching function
def node(
node_type: type[BaseNode[StateT, DepsT, GraphOutputT]],
) -> EdgePath[StateT, DepsT]
Create an edge path from a BaseNode class.
This method integrates v1-style BaseNode classes into the v2 graph system by analyzing their type hints and creating appropriate edges.
EdgePath[StateT, DepsT] — An EdgePath representing the node and its connections
node_type : type[BaseNode[StateT, DepsT, GraphOutputT]]
The BaseNode subclass to integrate
GraphSetupError— If the node type is missing required type hints
def build(
validate_graph_structure: bool = True,
) -> Graph[StateT, DepsT, GraphInputT, GraphOutputT]
Build the final executable graph from the accumulated nodes and edges.
This method performs validation, normalization, and analysis of the graph structure to create a complete, executable graph instance.
Graph[StateT, DepsT, GraphInputT, GraphOutputT] — A complete Graph instance ready for execution
validate_graph_structure : bool Default: True
whether to perform validation of the graph structure See the docstring of _validate_graph_structure below for more details.
ValueError— If the graph structure is invalid (e.g., join without parent fork)
Bases: Generic[StateT, DepsT, InputT, OutputT]
A complete graph definition ready for execution.
The Graph class represents a complete workflow graph with typed inputs, outputs, state, and dependencies. It contains all nodes, edges, and metadata needed for execution.
Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.
The type of the graph state.
Type: type[StateT]
The type of the dependencies.
Type: type[DepsT]
The type of the input data.
Type: type[InputT]
The type of the output data.
Type: type[OutputT]
Whether to automatically create instrumentation spans.
Type: bool
All nodes in the graph indexed by their ID.
Type: dict[NodeID, AnyNode]
Outgoing paths from each source node.
Type: dict[NodeID, list[Path]]
Parent fork information for each join node.
Type: dict[JoinID, ParentFork[NodeID]]
For each join, the set of other joins that appear between it and its parent fork.
Used to determine which joins are “final” (have no other joins as intermediates) and which joins should preserve fork stacks when proceeding downstream.
Type: dict[JoinID, set[JoinID]]
def get_parent_fork(join_id: JoinID) -> ParentFork[NodeID]
Get the parent fork information for a join node.
ParentFork[NodeID] — The parent fork information for the join
The ID of the join node
RuntimeError— If the join ID is not found or has no parent fork
def is_final_join(join_id: JoinID) -> bool
Check if a join is ‘final’ (has no downstream joins with the same parent fork).
A join is non-final if it appears as an intermediate node for another join with the same parent fork.
bool — True if the join is final, False if it’s non-final
The ID of the join node
@async
def run(
state: StateT = None,
deps: DepsT = None,
inputs: InputT = None,
span: AbstractContextManager[AbstractSpan] | None = None,
infer_name: bool = True,
) -> OutputT
Execute the graph and return the final output.
This is the main entry point for graph execution. It runs the graph to completion and returns the final output value.
OutputT — The final output from the graph execution
The graph state instance
The dependencies instance
The input data for the graph
span : AbstractContextManager[AbstractSpan] | None Default: None
Optional span for tracing/instrumentation
infer_name : bool Default: True
Whether to infer the graph name from the calling frame.
@async
def iter(
state: StateT = None,
deps: DepsT = None,
inputs: InputT = None,
span: AbstractContextManager[AbstractSpan] | None = None,
infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, OutputT]]
Create an iterator for step-by-step graph execution.
This method allows for more fine-grained control over graph execution, enabling inspection of intermediate states and results.
AsyncIterator[GraphRun[StateT, DepsT, OutputT]]
The graph state instance
The dependencies instance
The input data for the graph
span : AbstractContextManager[AbstractSpan] | None Default: None
Optional span for tracing/instrumentation
infer_name : bool Default: True
Whether to infer the graph name from the calling frame.
def render(
title: str | None = None,
direction: StateDiagramDirection | None = None,
) -> str
Render the graph as a Mermaid diagram string.
str — A string containing the Mermaid diagram representation
Optional title for the diagram
direction : StateDiagramDirection | None Default: None
Optional direction for the diagram layout
def __str__() -> str
Return a Mermaid diagram representation of the graph.
str — A string containing the Mermaid diagram of the graph
Bases: BaseNode[StateT, DepsT, Any]
A base node that represents a step with bound inputs.
StepNode bridges between the v1 and v2 graph execution systems by wrapping
a Step with bound inputs in a BaseNode interface.
It is not meant to be run directly but rather used to indicate transitions
to v2-style steps.
The step to execute.
Type: Step[StateT, DepsT, Any, Any]
The inputs bound to this step.
Type: Any
@async
def run(ctx: GraphRunContext[StateT, DepsT]) -> BaseNode[StateT, DepsT, Any] | End[Any]
Attempt to run the step node.
BaseNode[StateT, DepsT, Any] | End[Any] — The result of step execution
The graph execution context
NotImplementedError— Always raised as StepNode is not meant to be run directly