pydantic_graph.beta.join
Join operations and reducers for graph execution.
This module provides the core components for joining parallel execution paths in a graph, including various reducer types that aggregate data from multiple sources into a single output.
The state of a join during graph execution associated to a particular fork run.
Bases: Generic[StateT, DepsT]
Context information passed to reducer functions during graph execution.
The reducer context provides access to the current graph state and dependencies.
The state of the graph run.
Type: StateT
The deps for the graph run.
Type: DepsT
def cancel_sibling_tasks()
Cancel all sibling tasks created from the same fork.
You can call this if you want your join to have early-stopping behavior.
Bases: Protocol
A protocol for a type that supports adding to itself.
Bases: Generic[T]
A reducer that returns the first value it encounters, and cancels all other tasks.
def __call__(ctx: ReducerContext[object, object], current: T, inputs: T) -> T
The reducer function.
T
Bases: Generic[StateT, DepsT, InputT, OutputT]
A join operation that synchronizes and aggregates parallel execution paths.
A join defines how to combine outputs from multiple parallel execution paths
using a ReducerFunction. It specifies which fork
it joins (if any) and manages the initialization of reducers.
def as_node(inputs: None = None) -> JoinNode[StateT, DepsT]
def as_node(inputs: InputT) -> JoinNode[StateT, DepsT]
Create a step node with bound inputs.
JoinNode[StateT, DepsT] — A StepNode with this step and the bound inputs
inputs : InputT | None Default: None
The input data to bind to this step, or None
Bases: BaseNode[StateT, DepsT, Any]
A base node that represents a join item with bound inputs.
JoinNode bridges between the v1 and v2 graph execution systems by wrapping
a Join with bound inputs in a BaseNode interface.
It is not meant to be run directly but rather used to indicate transitions
to v2-style steps.
The step to execute.
Type: Join[StateT, DepsT, Any, Any]
The inputs bound to this step.
Type: Any
@async
def run(ctx: GraphRunContext[StateT, DepsT]) -> BaseNode[StateT, DepsT, Any] | End[Any]
Attempt to run the join node.
BaseNode[StateT, DepsT, Any] | End[Any] — The result of step execution
The graph execution context
NotImplementedError— Always raised as StepNode is not meant to be run directly
def reduce_null(current: None, inputs: Any) -> None
A reducer that discards all input data and returns None.
def reduce_list_append(current: list[T], inputs: T) -> list[T]
A reducer that appends to a list.
list[T]
def reduce_list_extend(current: list[T], inputs: Iterable[T]) -> list[T]
A reducer that extends a list.
list[T]
def reduce_dict_update(current: dict[K, V], inputs: Mapping[K, V]) -> dict[K, V]
A reducer that updates a dict.
dict[K, V]
def reduce_sum(current: NumericT, inputs: NumericT) -> NumericT
A reducer that sums numbers.
NumericT
A function used for reducing inputs to a join node.
Default: TypeAliasType('ReducerFunction', ContextReducerFunction[StateT, DepsT, InputT, OutputT] | PlainReducerFunction[InputT, OutputT], type_params=(StateT, DepsT, InputT, OutputT))