pydantic_graph.persistence

NodeSnapshot

Bases: Generic[StateT, RunEndT]

History step describing the execution of a node in a graph.

Attributes

state

The state of the graph before the node is run.

Type: StateT

node

The node to run next.

Type: Annotated[BaseNode[StateT, Any, RunEndT], _utils.CustomNodeSchema()]

start_ts

The timestamp when the node started running; None until the run starts.

Type: datetime | None Default: None

duration

The duration of the node run in seconds, if the node has been run.

Type: float | None Default: None

status

The status of the snapshot.

Type: SnapshotStatus Default: 'created'

kind

The kind of history step; can be used as a discriminator when deserializing history.

Type: Literal['node'] Default: 'node'

id

Unique ID of the snapshot.

Type: str Default: UNSET_SNAPSHOT_ID

EndSnapshot

Bases: Generic[StateT, RunEndT]

History step describing the end of a graph run.

Attributes

state

The state of the graph at the end of the run.

Type: StateT

result

The result of the graph run.

Type: End[RunEndT]

ts

The timestamp when the graph run ended.

Type: datetime Default: field(default_factory=(_utils.now_utc))

kind

The kind of history step; can be used as a discriminator when deserializing history.

Type: Literal['end'] Default: 'end'

id

Unique ID of the snapshot.

Type: str Default: UNSET_SNAPSHOT_ID

node

Shim to get the result.

Useful to allow [snapshot.node for snapshot in persistence.history].

Type: End[RunEndT]
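To illustrate how the kind field can act as a discriminator, here is a minimal standard-library sketch. The JSON records are hypothetical and trimmed to two fields; real serialized snapshots carry more data, and the library's own type adapters would normally do this dispatch.

```python
import json

# Hypothetical, heavily trimmed history: only `kind` and `id` are shown.
raw_history = '[{"kind": "node", "id": "n1"}, {"kind": "end", "id": "e1"}]'


def describe(step: dict) -> str:
    """Dispatch on the `kind` field to decide which step type a record is."""
    if step['kind'] == 'node':
        return f"node step {step['id']}"
    if step['kind'] == 'end':
        return f"end step {step['id']}"
    raise ValueError(f"unknown kind: {step['kind']!r}")


labels = [describe(step) for step in json.loads(raw_history)]
```

Because each record names its own kind, a mixed list of node and end steps can be decoded without guessing from the other fields.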

BaseStatePersistence

Bases: ABC, Generic[StateT, RunEndT]

Abstract base class for storing the state of a graph run.

Each instance of a BaseStatePersistence subclass should be used for a single graph run.

Methods

snapshot_node

@abstractmethod

@async

def snapshot_node(state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None

Snapshot the state of a graph when the next step is to run a node.

This method should add a NodeSnapshot to persistence.

Returns

None

Parameters

state : StateT

The state of the graph.

next_node : BaseNode[StateT, Any, RunEndT]

The next node to run.

snapshot_node_if_new

@abstractmethod

@async

def snapshot_node_if_new(
    snapshot_id: str,
    state: StateT,
    next_node: BaseNode[StateT, Any, RunEndT],
) -> None

Snapshot the state of a graph if the snapshot ID doesn’t already exist in persistence.

This method will generally call snapshot_node but should do so in an atomic way.

Returns

None

Parameters

snapshot_id : str

The ID of the snapshot to check.

state : StateT

The state of the graph.

next_node : BaseNode[StateT, Any, RunEndT]

The next node to run.
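The atomicity requirement can be sketched with the standard library alone. ToyStore below is a hypothetical in-memory stand-in, not a BaseStatePersistence subclass, and it stores a node name instead of a real snapshot; the point is only that the existence check and the write happen under one lock.

```python
import asyncio


class ToyStore:
    """Hypothetical in-memory store; not a pydantic-graph class."""

    def __init__(self) -> None:
        self.snapshots: dict[str, str] = {}  # snapshot ID -> node name
        self._lock = asyncio.Lock()

    async def snapshot_node_if_new(self, snapshot_id: str, node_name: str) -> None:
        # Holding the lock makes "check, then insert" a single atomic step,
        # so two concurrent callers cannot both decide the ID is new.
        async with self._lock:
            if snapshot_id not in self.snapshots:
                await asyncio.sleep(0)  # simulate an await point between check and write
                self.snapshots[snapshot_id] = node_name


async def demo() -> dict[str, str]:
    store = ToyStore()
    # Two callers race to record the same snapshot ID.
    await asyncio.gather(
        store.snapshot_node_if_new('id-1', 'first'),
        store.snapshot_node_if_new('id-1', 'second'),
    )
    return store.snapshots


result = asyncio.run(demo())
```

A persistence backed by a database or a file would need the equivalent guarantee from a transaction or a file lock rather than an in-process asyncio.Lock.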

snapshot_end

@abstractmethod

@async

def snapshot_end(state: StateT, end: End[RunEndT]) -> None

Snapshot the state of a graph when the graph has ended.

This method should add an EndSnapshot to persistence.

Returns

None

Parameters

state : StateT

The state of the graph.

end : End[RunEndT]

Data from the end of the run.

record_run

@abstractmethod

def record_run(snapshot_id: str) -> AbstractAsyncContextManager[None]

Record the run of the node, or error if the node is already running.

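In particular this should set:

  • NodeSnapshot.status to 'running' and NodeSnapshot.start_ts when the run starts.
  • NodeSnapshot.status to 'success' or 'error' and NodeSnapshot.duration when the run finishes.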

Returns

AbstractAsyncContextManager[None] — An async context manager that records the run of the node.

Parameters

snapshot_id : str

The ID of the snapshot to record.

Raises
  • GraphNodeRunningError — if the node status is not 'created' or 'pending'.
  • LookupError — if the snapshot ID is not found in persistence.
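
One way an implementation of record_run might behave is sketched below using only the standard library. ToySnapshot and ToyStore are hypothetical stand-ins for NodeSnapshot and a persistence class, and RuntimeError stands in for GraphNodeRunningError. The sketch assumes the context manager marks the snapshot as running on entry and records the duration and final status on exit.

```python
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import AsyncIterator, Optional


@dataclass
class ToySnapshot:
    """Hypothetical stand-in for NodeSnapshot; not the library class."""

    id: str
    status: str = 'created'
    start_ts: Optional[datetime] = None
    duration: Optional[float] = None


class ToyStore:
    """Hypothetical in-memory store keyed by snapshot ID."""

    def __init__(self) -> None:
        self.snapshots: dict[str, ToySnapshot] = {}

    @asynccontextmanager
    async def record_run(self, snapshot_id: str) -> AsyncIterator[None]:
        if snapshot_id not in self.snapshots:
            raise LookupError(f'no snapshot with id {snapshot_id!r}')
        snapshot = self.snapshots[snapshot_id]
        if snapshot.status not in ('created', 'pending'):
            # The library raises GraphNodeRunningError here; RuntimeError is a stand-in.
            raise RuntimeError(f'snapshot {snapshot_id!r} has status {snapshot.status!r}')
        snapshot.status = 'running'
        snapshot.start_ts = datetime.now(timezone.utc)
        start = time.perf_counter()
        try:
            yield
        except Exception:
            snapshot.duration = time.perf_counter() - start
            snapshot.status = 'error'
            raise
        else:
            snapshot.duration = time.perf_counter() - start
            snapshot.status = 'success'


async def demo() -> ToySnapshot:
    store = ToyStore()
    store.snapshots['a'] = ToySnapshot(id='a')
    async with store.record_run('a'):
        await asyncio.sleep(0)  # the node's work would happen here
    return store.snapshots['a']


snapshot = asyncio.run(demo())
```

Wrapping the node's execution in the context manager keeps the status bookkeeping in one place, whether the node finishes normally or raises.
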
load_next

@abstractmethod

@async

def load_next() -> NodeSnapshot[StateT, RunEndT] | None

Retrieve a node snapshot with status 'created' and set its status to 'pending'.

This is used by Graph.iter_from_persistence to get the next node to run.

Returns: The snapshot, or None if no snapshot with status 'created' exists.

Returns

NodeSnapshot[StateT, RunEndT] | None
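The claim-and-mark behaviour of load_next can be sketched as follows. ToySnapshot is a hypothetical stand-in for NodeSnapshot, and the function is synchronous and takes the history as an argument purely for brevity; the real method is async and belongs to a persistence instance.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToySnapshot:
    """Hypothetical stand-in for NodeSnapshot; not the library class."""

    id: str
    status: str = 'created'


def load_next(history: list[ToySnapshot]) -> Optional[ToySnapshot]:
    """Return the first 'created' snapshot after marking it 'pending'."""
    for snapshot in history:
        if snapshot.status == 'created':
            snapshot.status = 'pending'
            return snapshot
    return None


history = [ToySnapshot(id='a', status='success'), ToySnapshot(id='b')]
claimed = load_next(history)   # claims snapshot 'b'
nothing = load_next(history)   # no 'created' snapshot remains
```

Marking the snapshot 'pending' before returning it is what stops a second caller from picking up the same step.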

load_all

@abstractmethod

@async

def load_all() -> list[Snapshot[StateT, RunEndT]]

Load the entire history of snapshots.

load_all is not used by pydantic-graph itself; it is provided as a convenient way to get all snapshots from persistence.

Returns: The list of snapshots.

Returns

list[Snapshot[StateT, RunEndT]]

set_graph_types
def set_graph_types(graph: Graph[StateT, Any, RunEndT]) -> None

Set the types of the state and run end from a graph.

You generally won’t need to customise this method; instead, implement set_types and should_set_types.

Returns

None

should_set_types
def should_set_types() -> bool

Whether types need to be set.

Implementations should override this method to return True when types are needed but have not yet been set.

Returns

bool

set_types
def set_types(state_type: type[StateT], run_end_type: type[RunEndT]) -> None

Set the types of the state and run end.

This can be used to create type adapters for serializing and deserializing snapshots, e.g. with build_snapshot_list_type_adapter.

Returns

None

Parameters

state_type : type[StateT]

The state type.

run_end_type : type[RunEndT]

The run end type.

build_snapshot_list_type_adapter

def build_snapshot_list_type_adapter(
    state_t: type[StateT],
    run_end_t: type[RunEndT],
) -> pydantic.TypeAdapter[list[Snapshot[StateT, RunEndT]]]

Build a type adapter for a list of snapshots.

This method should be called from within set_types where context variables will be set such that Pydantic can create a schema for NodeSnapshot.node.

Returns

pydantic.TypeAdapter[list[Snapshot[StateT, RunEndT]]]

SnapshotStatus

The status of a snapshot.

  • 'created': The snapshot has been created but not yet run.
  • 'pending': The snapshot has been retrieved with load_next but not yet run.
  • 'running': The snapshot is currently running.
  • 'success': The snapshot has been run successfully.
  • 'error': The snapshot has been run but an error occurred.

Default: Literal['created', 'pending', 'running', 'success', 'error']

Snapshot

A step in the history of a graph run.

Graph.run returns a list of these steps describing the execution of the graph, together with the run return value.

Default: NodeSnapshot[StateT, RunEndT] | EndSnapshot[StateT, RunEndT]

In-memory state persistence.

This module provides simple in-memory state persistence for graphs.

SimpleStatePersistence

Bases: BaseStatePersistence[StateT, RunEndT]

Simple in-memory state persistence that just holds the latest snapshot.

If no state persistence implementation is provided when running a graph, this is used by default.

Attributes

last_snapshot

The last snapshot.

Type: Snapshot[StateT, RunEndT] | None Default: None

FullStatePersistence

Bases: BaseStatePersistence[StateT, RunEndT]

In-memory state persistence that holds a list of snapshots.

Attributes

deep_copy

Whether to deep copy the state and nodes when storing them.

Defaults to True so even if nodes or state are modified after the snapshot is taken, the persistence history will record the value at the time of the snapshot.

Type: bool Default: True
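The reason for deep copying can be shown with a small standard-library sketch. CounterState is a hypothetical state class, and the two lists stand in for a history kept by reference versus one kept by deep copy; this is not the library's implementation.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class CounterState:
    """Hypothetical mutable graph state."""

    values: list[int] = field(default_factory=list)


state = CounterState(values=[1])

history_by_reference = [state]                # stores the live object
history_by_deep_copy = [copy.deepcopy(state)]  # stores an independent copy

# A later node mutates the state in place.
state.values.append(2)

seen_by_reference = history_by_reference[0].values
seen_by_deep_copy = history_by_deep_copy[0].values
```

After the mutation, the by-reference history reflects the new value, while the deep-copied history still shows the state as it was when the snapshot was taken.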

history

List of snapshots taken during the graph run.

Type: list[Snapshot[StateT, RunEndT]] Default: field(default_factory=(list[Snapshot[StateT, RunEndT]]))

Methods

dump_json
def dump_json(indent: int | None = None) -> bytes

Dump the history to JSON bytes.

Returns

bytes

load_json
def load_json(json_data: str | bytes | bytearray) -> None

Load the history from JSON.

Returns

None

FileStatePersistence

Bases: BaseStatePersistence[StateT, RunEndT]

File-based state persistence that holds graph run state in a JSON file.

Attributes

json_file

Path to the JSON file where the snapshots are stored.

You should use a different file for each graph run, but a single file should be reused for multiple steps of the same run.

For example, if you have a run ID of the form run_123abc, you might create a FileStatePersistence like this:

from pathlib import Path

from pydantic_graph.persistence.file import FileStatePersistence

run_id = 'run_123abc'
# One JSON file per graph run, reused across the steps of that run.
persistence = FileStatePersistence(Path('runs') / f'{run_id}.json')

Type: Path

Methods

should_set_types
def should_set_types() -> bool

Whether types need to be set.

Returns

bool