pydantic_graph.beta.graph
Core graph execution engine for the next version of the pydantic-graph library.
This module provides the main Graph class and GraphRun execution engine that
handles the orchestration of nodes, edges, and parallel execution paths in
the graph-based workflow system.
Bases: Generic[OutputT]
A marker indicating the end of graph execution with a final value.
EndMarker is used internally to signal that the graph has completed execution and carries the final output value.
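As a rough illustration of the idea, the sketch below models an end marker as a small generic wrapper around the final value. `EndMarkerSketch`, its `value` field, and `is_finished` are hypothetical stand-ins written for this example; they are not the library's own class or API.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

OutputT = TypeVar("OutputT")


@dataclass
class EndMarkerSketch(Generic[OutputT]):
    """Hypothetical stand-in: wraps the final output of a finished run."""

    value: OutputT


def is_finished(step: object) -> bool:
    # A step result that is an end marker means there is nothing left to run.
    return isinstance(step, EndMarkerSketch)


marker = EndMarkerSketch(value=42)
```

A caller stepping through a run can check each result with a test like `is_finished` and read the wrapped value once it returns true.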
An item representing data flowing into a join operation.
JoinItem carries input data from a parallel execution path to a join node, along with metadata about which execution ‘fork’ it originated from.
The ID of the join node this item is targeting.
Type: JoinID
The input data for the join operation.
Type: Any
The stack of ForkStackItems that led to producing this join item.
Type: ForkStack
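The three attributes above can be pictured with a small stand-in. In this sketch the field names (`join_id`, `inputs`, `fork_stack`), the shape of a fork stack entry, and the `collect_for_join` helper are all assumptions made for illustration; the real types may differ.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass(frozen=True)
class ForkStackItemSketch:
    """Hypothetical fork stack entry: which fork, and which branch of it."""

    fork_id: str
    thread_index: int


@dataclass
class JoinItemSketch:
    """Hypothetical stand-in for a join item."""

    join_id: str  # the join node this item targets
    inputs: Any  # the data produced by one parallel path
    fork_stack: Tuple[ForkStackItemSketch, ...]  # forks entered on the way here


def collect_for_join(items: List[JoinItemSketch], join_id: str) -> List[Any]:
    # Gather the inputs of every item addressed to the given join.
    return [item.inputs for item in items if item.join_id == join_id]


items = [
    JoinItemSketch("join_a", 1, (ForkStackItemSketch("fork_1", 0),)),
    JoinItemSketch("join_a", 2, (ForkStackItemSketch("fork_1", 1),)),
    JoinItemSketch("join_b", 3, (ForkStackItemSketch("fork_2", 0),)),
]
```

Here the two items addressed to `join_a` come from different branches of the same fork, which is the information the fork stack preserves.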
Bases: Generic[StateT, DepsT, InputT, OutputT]
A complete graph definition ready for execution.
The Graph class represents a complete workflow graph with typed inputs, outputs, state, and dependencies. It contains all nodes, edges, and metadata needed for execution.
Optional name for the graph. If not provided, the name is inferred from the calling frame on the first call to a graph method.
The type of the graph state.
Type: type[StateT]
The type of the dependencies.
Type: type[DepsT]
The type of the input data.
Type: type[InputT]
The type of the output data.
Type: type[OutputT]
Whether to automatically create instrumentation spans.
Type: bool
All nodes in the graph indexed by their ID.
Type: dict[NodeID, AnyNode]
Outgoing paths from each source node.
Type: dict[NodeID, list[Path]]
Parent fork information for each join node.
Type: dict[JoinID, ParentFork[NodeID]]
For each join, the set of other joins that appear between it and its parent fork.
Used to determine which joins are “final” (have no other joins as intermediates) and which joins should preserve fork stacks when proceeding downstream.
Type: dict[JoinID, set[JoinID]]
def get_parent_fork(join_id: JoinID) -> ParentFork[NodeID]
Get the parent fork information for a join node.
ParentFork[NodeID] — The parent fork information for the join
The ID of the join node
RuntimeError — If the join ID is not found or has no parent fork
def is_final_join(join_id: JoinID) -> bool
Check if a join is ‘final’ (has no downstream joins with the same parent fork).
A join is non-final if it appears as an intermediate node for another join with the same parent fork.
bool — True if the join is final, False if it’s non-final
The ID of the join node
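The rule stated for is_final_join can be sketched with plain dictionaries. The node IDs, the two mappings, and `is_final_join_sketch` below are hypothetical and only mirror the description above (a join is non-final if it appears as an intermediate node for another join with the same parent fork); the library's own implementation may differ.

```python
from typing import Dict, Set

# Hypothetical data: for each join, the joins between it and its parent fork.
intermediate_join_nodes: Dict[str, Set[str]] = {
    "join_inner": set(),
    "join_outer": {"join_inner"},
}

# Hypothetical data: the parent fork of each join.
parent_forks: Dict[str, str] = {
    "join_inner": "fork_1",
    "join_outer": "fork_1",
}


def is_final_join_sketch(join_id: str) -> bool:
    parent = parent_forks[join_id]
    for other_id, intermediates in intermediate_join_nodes.items():
        if other_id == join_id:
            continue
        # Non-final: this join sits between another join and the same fork.
        if join_id in intermediates and parent_forks[other_id] == parent:
            return False
    return True
```

With this data, `join_inner` is non-final because it lies between `join_outer` and their shared parent fork, while `join_outer` is final.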
async def run(
state: StateT = None,
deps: DepsT = None,
inputs: InputT = None,
span: AbstractContextManager[AbstractSpan] | None = None,
infer_name: bool = True,
) -> OutputT
Execute the graph and return the final output.
This is the main entry point for graph execution. It runs the graph to completion and returns the final output value.
OutputT — The final output from the graph execution
The graph state instance
The dependencies instance
The input data for the graph
span : AbstractContextManager[AbstractSpan] | None Default: None
Optional span for tracing/instrumentation
infer_name : bool Default: True
Whether to infer the graph name from the calling frame.
async def iter(
state: StateT = None,
deps: DepsT = None,
inputs: InputT = None,
span: AbstractContextManager[AbstractSpan] | None = None,
infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, OutputT]]
Create an iterator for step-by-step graph execution.
This method gives finer-grained control over graph execution, so intermediate states and results can be inspected.
AsyncIterator[GraphRun[StateT, DepsT, OutputT]]
The graph state instance
The dependencies instance
The input data for the graph
span : AbstractContextManager[AbstractSpan] | None Default: None
Optional span for tracing/instrumentation
infer_name : bool Default: True
Whether to infer the graph name from the calling frame.
def render(
title: str | None = None,
direction: StateDiagramDirection | None = None,
) -> str
Render the graph as a Mermaid diagram string.
str — A string containing the Mermaid diagram representation
Optional title for the diagram
direction : StateDiagramDirection | None Default: None
Optional direction for the diagram layout
def __str__() -> str
Return a Mermaid diagram representation of the graph.
str — A string containing the Mermaid diagram of the graph
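To give a sense of what a Mermaid diagram string looks like, the sketch below builds a Mermaid state diagram from a list of edges. `render_mermaid_sketch` and its edge-list input are hypothetical; the string returned by the real render method may be laid out differently.

```python
from typing import List, Optional, Tuple


def render_mermaid_sketch(
    edges: List[Tuple[str, str]],
    title: Optional[str] = None,
    direction: Optional[str] = None,
) -> str:
    lines: List[str] = []
    if title is not None:
        # Mermaid accepts a title in a front-matter block.
        lines += ["---", f"title: {title}", "---"]
    lines.append("stateDiagram-v2")
    if direction is not None:
        lines.append(f"  direction {direction}")
    for source, target in edges:
        lines.append(f"  {source} --> {target}")
    return "\n".join(lines)


diagram = render_mermaid_sketch(
    [("[*]", "fetch"), ("fetch", "parse"), ("parse", "[*]")],
    title="Pipeline",
    direction="LR",
)
```

In Mermaid state diagrams, `[*]` denotes the start or end state, so this example describes a two-node pipeline laid out left to right.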
A request to run a task representing the execution of a node in the graph.
GraphTaskRequest encapsulates all the information needed to execute a specific node, including its inputs and the fork context it’s executing within.
The ID of the node to execute.
Type: NodeID
The input data for the node.
Type: Any
Stack of forks that have been entered.
Used by the GraphRun to decide when to proceed through joins.
Type: ForkStack Default: field(repr=False)
Bases: GraphTaskRequest
A task representing the execution of a node in the graph.
GraphTask encapsulates all the information needed to execute a specific node, including its inputs and the fork context it’s executing within, and has a unique ID to identify the task within the graph run.
Unique identifier for this task.
Type: TaskID Default: field(repr=False)
Bases: Generic[StateT, DepsT, OutputT]
A single execution instance of a graph.
GraphRun manages the execution state for a single run of a graph, including task scheduling, fork/join coordination, and result tracking.
The graph being executed.
Default: graph
The graph state instance.
Default: state
The dependencies instance.
Default: deps
The initial input data.
Default: inputs
Get the next task(s) to be executed.
Type: EndMarker[OutputT] | Sequence[GraphTask]
Get the final output if the graph has completed.
Type: OutputT | None
def __init__(
graph: Graph[StateT, DepsT, InputT, OutputT],
state: StateT,
deps: DepsT,
inputs: InputT,
traceparent: str | None,
)
Initialize a graph run.
The graph to execute
The graph state instance
The dependencies instance
The input data for the graph
Optional trace parent for instrumentation
def __aiter__() -> AsyncIterator[EndMarker[OutputT] | Sequence[GraphTask]]
Return self as an async iterator.
AsyncIterator[EndMarker[OutputT] | Sequence[GraphTask]] — Self for async iteration
async def __anext__() -> EndMarker[OutputT] | Sequence[GraphTask]
Get the next item in the async iteration.
EndMarker[OutputT] | Sequence[GraphTask] — The next execution result from the graph
async def next(
value: EndMarker[OutputT] | Sequence[GraphTaskRequest] | None = None,
) -> EndMarker[OutputT] | Sequence[GraphTask]
Advance the graph execution by one step.
This method allows for sending a value to the iterator, which is useful for resuming iteration or overriding intermediate results.
EndMarker[OutputT] | Sequence[GraphTask] — The next execution result: either an EndMarker, or sequence of GraphTasks
Optional value to send to the iterator
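The stepping protocol described above (each step yields either a sequence of tasks or an end marker, and a value can be sent to override the result) can be illustrated with a toy run over a linear chain of functions. Everything in this sketch is a hypothetical stand-in: `GraphRunSketch`, `TaskSketch`, and `EndMarkerSketch` only imitate the shape of the interface and do not use the library.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Sequence, Union


@dataclass
class EndMarkerSketch:
    value: Any


@dataclass
class TaskSketch:
    node_id: str
    inputs: Any
    index: int


Step = Union[EndMarkerSketch, Sequence[TaskSketch]]


class GraphRunSketch:
    """Toy run over a linear chain of functions (no forks or joins)."""

    def __init__(self, steps: List[Callable[[Any], Any]], inputs: Any) -> None:
        self._steps = steps
        self._inputs = inputs
        self._finished = False
        self.output: Any = None
        # The task scheduled to run on the next call to next().
        self._pending: Optional[TaskSketch] = (
            TaskSketch(steps[0].__name__, inputs, 0) if steps else None
        )

    def __aiter__(self) -> "GraphRunSketch":
        return self

    async def __anext__(self) -> Step:
        if self._finished:
            raise StopAsyncIteration
        return await self.next()

    async def next(self, value: Optional[EndMarkerSketch] = None) -> Step:
        if isinstance(value, EndMarkerSketch):
            # The caller overrides the result and ends the run early.
            return self._finish(value)
        if self._pending is None:
            return self._finish(EndMarkerSketch(self._inputs))
        task = self._pending
        result = self._steps[task.index](task.inputs)
        following = task.index + 1
        if following < len(self._steps):
            # Schedule the next node and report it to the caller.
            name = self._steps[following].__name__
            self._pending = TaskSketch(name, result, following)
            return [self._pending]
        return self._finish(EndMarkerSketch(result))

    def _finish(self, marker: EndMarkerSketch) -> EndMarkerSketch:
        self._finished = True
        self.output = marker.value
        return marker


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


async def drive(run: GraphRunSketch) -> List[str]:
    seen: List[str] = []
    async for step in run:
        if isinstance(step, EndMarkerSketch):
            seen.append("end")
        else:
            seen.extend(task.node_id for task in step)
    return seen


run = GraphRunSketch([add_one, double], 3)
seen = asyncio.run(drive(run))
```

In this toy, each call runs the pending task and returns whatever is scheduled next, so the loop first sees the `double` task and then the end marker. Passing an `EndMarkerSketch` to `next` ends the run immediately with the supplied value.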
Type variable for graph state.
Default: TypeVar('StateT', infer_variance=True)
Type variable for graph dependencies.
Default: TypeVar('DepsT', infer_variance=True)
Type variable for graph inputs.
Default: TypeVar('InputT', infer_variance=True)
Type variable for graph outputs.
Default: TypeVar('OutputT', infer_variance=True)