Skip to content

pydantic_graph.beta.graph

Core graph execution engine for the next version of the pydantic-graph library.

This module provides the main Graph class and GraphRun execution engine that handles the orchestration of nodes, edges, and parallel execution paths in the graph-based workflow system.

EndMarker

Bases: Generic[OutputT]

A marker indicating the end of graph execution with a final value.

EndMarker is used internally to signal that the graph has completed execution and carries the final output value.

JoinItem

An item representing data flowing into a join operation.

JoinItem carries input data from a parallel execution path to a join node, along with metadata about which execution ‘fork’ it originated from.

Attributes

join_id

The ID of the join node this item is targeting.

Type: JoinID

inputs

The input data for the join operation.

Type: Any

fork_stack

The stack of ForkStackItems that led to producing this join item.

Type: ForkStack

Graph

Bases: Generic[StateT, DepsT, InputT, OutputT]

A complete graph definition ready for execution.

The Graph class represents a complete workflow graph with typed inputs, outputs, state, and dependencies. It contains all nodes, edges, and metadata needed for execution.

Attributes

name

Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method.

Type: str | None

state_type

The type of the graph state.

Type: type[StateT]

deps_type

The type of the dependencies.

Type: type[DepsT]

input_type

The type of the input data.

Type: type[InputT]

output_type

The type of the output data.

Type: type[OutputT]

auto_instrument

Whether to automatically create instrumentation spans.

Type: bool

nodes

All nodes in the graph indexed by their ID.

Type: dict[NodeID, AnyNode]

edges_by_source

Outgoing paths from each source node.

Type: dict[NodeID, list[Path]]

parent_forks

Parent fork information for each join node.

Type: dict[JoinID, ParentFork[NodeID]]

intermediate_join_nodes

For each join, the set of other joins that appear between it and its parent fork.

Used to determine which joins are “final” (have no other joins as intermediates) and which joins should preserve fork stacks when proceeding downstream.

Type: dict[JoinID, set[JoinID]]

Methods

get_parent_fork
def get_parent_fork(join_id: JoinID) -> ParentFork[NodeID]

Get the parent fork information for a join node.

Returns

ParentFork[NodeID] — The parent fork information for the join

Parameters

join_id : JoinID

The ID of the join node

Raises
  • RuntimeError — If the join ID is not found or has no parent fork
is_final_join
def is_final_join(join_id: JoinID) -> bool

Check if a join is ‘final’ (has no downstream joins with the same parent fork).

A join is non-final if it appears as an intermediate node for another join with the same parent fork.

Returns

bool — True if the join is final, False if it’s non-final

Parameters

join_id : JoinID

The ID of the join node

run

@async

def run(
    state: StateT = None,
    deps: DepsT = None,
    inputs: InputT = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> OutputT

Execute the graph and return the final output.

This is the main entry point for graph execution. It runs the graph to completion and returns the final output value.

Returns

OutputT — The final output from the graph execution

Parameters

state : StateT Default: None

The graph state instance

deps : DepsT Default: None

The dependencies instance

inputs : InputT Default: None

The input data for the graph

span : AbstractContextManager[AbstractSpan] | None Default: None

Optional span for tracing/instrumentation

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

iter

@async

def iter(
    state: StateT = None,
    deps: DepsT = None,
    inputs: InputT = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, OutputT]]

Create an iterator for step-by-step graph execution.

This method allows for more fine-grained control over graph execution, enabling inspection of intermediate states and results.

Returns

AsyncIterator[GraphRun[StateT, DepsT, OutputT]]

Parameters

state : StateT Default: None

The graph state instance

deps : DepsT Default: None

The dependencies instance

inputs : InputT Default: None

The input data for the graph

span : AbstractContextManager[AbstractSpan] | None Default: None

Optional span for tracing/instrumentation

infer_name : bool Default: True

Whether to infer the graph name from the calling frame.

render
def render(
    title: str | None = None,
    direction: StateDiagramDirection | None = None,
) -> str

Render the graph as a Mermaid diagram string.

Returns

str — A string containing the Mermaid diagram representation

Parameters

title : str | None Default: None

Optional title for the diagram

direction : StateDiagramDirection | None Default: None

Optional direction for the diagram layout

__str__
def __str__() -> str

Return a Mermaid diagram representation of the graph.

Returns

str — A string containing the Mermaid diagram of the graph

GraphTaskRequest

A request to run a task representing the execution of a node in the graph.

GraphTaskRequest encapsulates all the information needed to execute a specific node, including its inputs and the fork context it’s executing within.

Attributes

node_id

The ID of the node to execute.

Type: NodeID

inputs

The input data for the node.

Type: Any

fork_stack

Stack of forks that have been entered.

Used by the GraphRun to decide when to proceed through joins.

Type: ForkStack Default: field(repr=False)

GraphTask

Bases: GraphTaskRequest

A task representing the execution of a node in the graph.

GraphTask encapsulates all the information needed to execute a specific node, including its inputs and the fork context it’s executing within, and has a unique ID to identify the task within the graph run.

Attributes

task_id

Unique identifier for this task.

Type: TaskID Default: field(repr=False)

GraphRun

Bases: Generic[StateT, DepsT, OutputT]

A single execution instance of a graph.

GraphRun manages the execution state for a single run of a graph, including task scheduling, fork/join coordination, and result tracking.

Attributes

graph

The graph being executed.

Default: graph

state

The graph state instance.

Default: state

deps

The dependencies instance.

Default: deps

inputs

The initial input data.

Default: inputs

next_task

Get the next task(s) to be executed.

Type: EndMarker[OutputT] | Sequence[GraphTask]

output

Get the final output if the graph has completed.

Type: OutputT | None

Methods

__init__
def __init__(
    graph: Graph[StateT, DepsT, InputT, OutputT],
    state: StateT,
    deps: DepsT,
    inputs: InputT,
    traceparent: str | None,
)

Initialize a graph run.

Parameters

graph : Graph[StateT, DepsT, InputT, OutputT]

The graph to execute

state : StateT

The graph state instance

deps : DepsT

The dependencies instance

inputs : InputT

The input data for the graph

traceparent : str | None

Optional trace parent for instrumentation

__aiter__
def __aiter__() -> AsyncIterator[EndMarker[OutputT] | Sequence[GraphTask]]

Return self as an async iterator.

Returns

AsyncIterator[EndMarker[OutputT] | Sequence[GraphTask]] — Self for async iteration

__anext__

@async

def __anext__() -> EndMarker[OutputT] | Sequence[GraphTask]

Get the next item in the async iteration.

Returns

EndMarker[OutputT] | Sequence[GraphTask] — The next execution result from the graph

next

@async

def next(
    value: EndMarker[OutputT] | Sequence[GraphTaskRequest] | None = None,
) -> EndMarker[OutputT] | Sequence[GraphTask]

Advance the graph execution by one step.

This method allows for sending a value to the iterator, which is useful for resuming iteration or overriding intermediate results.

Returns

EndMarker[OutputT] | Sequence[GraphTask] — The next execution result: either an EndMarker, or sequence of GraphTasks

Parameters

value : EndMarker[OutputT] | Sequence[GraphTaskRequest] | None Default: None

Optional value to send to the iterator

StateT

Type variable for graph state.

Default: TypeVar('StateT', infer_variance=True)

DepsT

Type variable for graph dependencies.

Default: TypeVar('DepsT', infer_variance=True)

InputT

Type variable for graph inputs.

Default: TypeVar('InputT', infer_variance=True)

OutputT

Type variable for graph outputs.

Default: TypeVar('OutputT', infer_variance=True)