pydantic_graph.beta.join

Join operations and reducers for graph execution.

This module provides the core components for joining parallel execution paths in a graph, including various reducer types that aggregate data from multiple sources into a single output.

JoinState

The state of a join during graph execution, associated with a particular fork run.

ReducerContext

Bases: Generic[StateT, DepsT]

Context information passed to reducer functions during graph execution.

The reducer context provides access to the current graph state and dependencies.

Attributes

state

The state of the graph run.

Type: StateT

deps

The dependencies for the graph run.

Type: DepsT

Methods

cancel_sibling_tasks
def cancel_sibling_tasks()

Cancel all sibling tasks created from the same fork.

You can call this if you want your join to have early-stopping behavior.
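
A minimal sketch of early stopping in a context-aware reducer. `StubReducerContext` is a hypothetical stand-in defined here so the example runs without a graph; in a real run the library supplies the `ReducerContext`. The reducer name `collect_until_three` and the threshold of three are illustrative assumptions as well.

```python
from dataclasses import dataclass


@dataclass
class StubReducerContext:
    """Hypothetical stand-in exposing the attributes and method documented above."""

    state: object = None
    deps: object = None
    cancelled: bool = False

    def cancel_sibling_tasks(self) -> None:
        # The real method cancels sibling tasks created from the same fork;
        # this stub only records that cancellation was requested.
        self.cancelled = True


def collect_until_three(ctx: StubReducerContext, current: list, inputs: int) -> list:
    """Append each input and request cancellation once three results are in."""
    updated = [*current, inputs]
    if len(updated) >= 3:
        ctx.cancel_sibling_tasks()
    return updated


ctx = StubReducerContext()
results: list = []
for value in [10, 20, 30, 40]:
    if ctx.cancelled:
        break  # simulate the remaining sibling never delivering its output
    results = collect_until_three(ctx, results, value)

print(results, ctx.cancelled)
```

The loop only simulates inputs arriving one at a time; how and when cancelled siblings actually stop is up to the graph runtime.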

SupportsSum

Bases: Protocol

A protocol for a type that supports adding to itself.
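
As a rough illustration of what such a protocol can look like, the sketch below defines a structural type for anything that implements `__add__`. The names `SupportsAdd`, `Vector`, and `add_pair` are hypothetical and are not part of `pydantic_graph`; the library's own `SupportsSum` definition may differ in detail.

```python
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsAdd(Protocol):
    """Hypothetical protocol: any type whose instances can be added together."""

    def __add__(self, other: Any, /) -> Any: ...


@dataclass(frozen=True)
class Vector:
    x: float
    y: float

    def __add__(self, other: "Vector", /) -> "Vector":
        return Vector(self.x + other.x, self.y + other.y)


def add_pair(a: SupportsAdd, b: SupportsAdd) -> Any:
    """Works for any operands that satisfy the protocol structurally."""
    return a + b


total = add_pair(Vector(1.0, 2.0), Vector(3.0, 4.0))
print(total)
```

Because the check is structural, `Vector` satisfies the protocol without inheriting from it, and so do built-in numeric types.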

ReduceFirstValue

Bases: Generic[T]

A reducer that returns the first value it encounters, and cancels all other tasks.

Methods

__call__
def __call__(ctx: ReducerContext[object, object], current: T, inputs: T) -> T

The reducer function.

Returns

T
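
The behavior described above can be sketched as follows, assuming the first arriving input simply replaces the initial value and cancellation is requested on every call. `FirstValueSketch` and `StubReducerContext` are hypothetical names written for this example; this is not the library's implementation.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class StubReducerContext:
    """Hypothetical stand-in that counts cancellation requests."""

    cancel_calls: int = 0

    def cancel_sibling_tasks(self) -> None:
        self.cancel_calls += 1


class FirstValueSketch(Generic[T]):
    """Keep the first input seen and ask for the other branches to be cancelled."""

    def __call__(self, ctx: StubReducerContext, current: T, inputs: T) -> T:
        ctx.cancel_sibling_tasks()  # no further inputs are wanted
        return inputs


ctx = StubReducerContext()
reducer = FirstValueSketch[str]()
winner = reducer(ctx, "", "fastest branch")
print(winner, ctx.cancel_calls)
```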

Join

Bases: Generic[StateT, DepsT, InputT, OutputT]

A join operation that synchronizes and aggregates parallel execution paths.

A join defines how to combine outputs from multiple parallel execution paths using a ReducerFunction. It specifies which fork it joins (if any) and manages the initialization of reducers.

Methods

as_node
def as_node(inputs: None = None) -> JoinNode[StateT, DepsT]
def as_node(inputs: InputT) -> JoinNode[StateT, DepsT]

Create a join node with bound inputs.

Returns

JoinNode[StateT, DepsT] — A JoinNode with this join and the bound inputs

Parameters

inputs : InputT | None Default: None

The input data to bind to this join, or None

JoinNode

Bases: BaseNode[StateT, DepsT, Any]

A base node that represents a join item with bound inputs.

JoinNode bridges between the v1 and v2 graph execution systems by wrapping a Join with bound inputs in a BaseNode interface. It is not meant to be run directly but rather used to indicate transitions to v2-style steps.

Attributes

join

The join to execute.

Type: Join[StateT, DepsT, Any, Any]

inputs

The inputs bound to this join.

Type: Any

Methods

run

async def run(ctx: GraphRunContext[StateT, DepsT]) -> BaseNode[StateT, DepsT, Any] | End[Any]

Attempt to run the join node.

Returns

BaseNode[StateT, DepsT, Any] | End[Any] — The result of step execution

Parameters

ctx : GraphRunContext[StateT, DepsT]

The graph execution context

Raises
  • NotImplementedError — Always raised, as JoinNode is not meant to be run directly

reduce_null

def reduce_null(current: None, inputs: Any) -> None

A reducer that discards all input data and returns None.

Returns

None

reduce_list_append

def reduce_list_append(current: list[T], inputs: T) -> list[T]

A reducer that appends to a list.

Returns

list[T]

reduce_list_extend

def reduce_list_extend(current: list[T], inputs: Iterable[T]) -> list[T]

A reducer that extends a list.

Returns

list[T]
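
The difference between appending and extending matters when each branch already returns a list. The sketch below uses local stand-ins with the same signatures as `reduce_list_append` and `reduce_list_extend`; the function names and the in-place mutation are assumptions made for illustration, and `functools.reduce` stands in for the join feeding inputs one at a time.

```python
from functools import reduce
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def list_append_sketch(current: List[T], inputs: T) -> List[T]:
    """Add each input as a single element."""
    current.append(inputs)
    return current


def list_extend_sketch(current: List[T], inputs: Iterable[T]) -> List[T]:
    """Add every item of each input individually."""
    current.extend(inputs)
    return current


branch_outputs = [[1, 2], [3], [4, 5]]

appended = reduce(list_append_sketch, branch_outputs, [])
extended = reduce(list_extend_sketch, branch_outputs, [])

print(appended)  # nested: one element per branch
print(extended)  # flat: branch items concatenated
```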

reduce_dict_update

def reduce_dict_update(current: dict[K, V], inputs: Mapping[K, V]) -> dict[K, V]

A reducer that updates a dict.

Returns

dict[K, V]
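
A small sketch of dict-style reduction, again using a local stand-in with the same signature rather than the library function itself. It assumes standard `dict.update` semantics, so when two branches produce the same key, whichever input is reduced later wins.

```python
from functools import reduce
from typing import Dict, Mapping, TypeVar

K = TypeVar("K")
V = TypeVar("V")


def dict_update_sketch(current: Dict[K, V], inputs: Mapping[K, V]) -> Dict[K, V]:
    """Merge one branch's mapping into the accumulated dict."""
    current.update(inputs)
    return current


# Inputs are listed in the order this example reduces them.
branch_outputs = [{"a": 1}, {"b": 2}, {"a": 3}]
merged = reduce(dict_update_sketch, branch_outputs, {})

print(merged)
```

If branches can finish in any order, colliding keys make the result order-dependent, so distinct keys per branch are the safer design.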

reduce_sum

def reduce_sum(current: NumericT, inputs: NumericT) -> NumericT

A reducer that sums numbers.

Returns

NumericT
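
Summation as a reduction can be sketched in a few lines. `sum_sketch` is a hypothetical stand-in with the same two-argument shape as `reduce_sum`; the initial values (`0` and `Fraction(0)`) are assumptions chosen for this example.

```python
from fractions import Fraction
from functools import reduce


def sum_sketch(current, inputs):
    """Add the new input to the running total."""
    return current + inputs


total_int = reduce(sum_sketch, [3, 4, 5], 0)
total_frac = reduce(sum_sketch, [Fraction(1, 2), Fraction(1, 3)], Fraction(0))

print(total_int, total_frac)
```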

ReducerFunction

A function used for reducing inputs to a join node.

Default: TypeAliasType('ReducerFunction', ContextReducerFunction[StateT, DepsT, InputT, OutputT] | PlainReducerFunction[InputT, OutputT], type_params=(StateT, DepsT, InputT, OutputT))
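
The alias covers two call shapes: a context reducer that receives `(ctx, current, inputs)`, as `ReduceFirstValue` does, and a plain reducer that receives `(current, inputs)`, as `reduce_sum` does. The sketch below shows one way a caller could support both by inspecting the parameter count. `call_reducer` is a hypothetical helper, and this is not necessarily how `pydantic_graph` tells the two apart.

```python
import inspect
from typing import Any, Callable


def call_reducer(reducer: Callable[..., Any], ctx: Any, current: Any, inputs: Any) -> Any:
    """Hypothetical dispatcher: pass ctx only to reducers that declare three parameters."""
    n_params = len(inspect.signature(reducer).parameters)
    if n_params == 3:
        return reducer(ctx, current, inputs)
    return reducer(current, inputs)


def plain_max(current: int, inputs: int) -> int:
    """Plain reducer: no access to graph state or deps."""
    return max(current, inputs)


def context_max(ctx: dict, current: int, inputs: int) -> int:
    """Context reducer: here the stand-in context is just a dict used as a call counter."""
    ctx["calls"] = ctx.get("calls", 0) + 1
    return max(current, inputs)


ctx: dict = {}
a = call_reducer(plain_max, ctx, 1, 5)
b = call_reducer(context_max, ctx, 5, 2)

print(a, b, ctx)
```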