pydantic_graph.persistence
Bases: Generic[StateT, RunEndT]
History step describing the execution of a node in a graph.
The state of the graph before the node is run.
Type: StateT
The node to run next.
Type: Annotated[BaseNode[StateT, Any, RunEndT], _utils.CustomNodeSchema()]
The timestamp when the node started running, None until the run starts.
Type: datetime | None Default: None
The duration of the node run in seconds, if the node has been run.
Type: float | None Default: None
The status of the snapshot.
Type: SnapshotStatus Default: 'created'
The kind of history step, can be used as a discriminator when deserializing history.
Type: Literal[‘node’] Default: 'node'
Unique ID of the snapshot.
Type: str Default: UNSET_SNAPSHOT_ID
Bases: Generic[StateT, RunEndT]
History step describing the end of a graph run.
The state of the graph at the end of the run.
Type: StateT
The result of the graph run.
Type: End[RunEndT]
The timestamp when the graph run ended.
Type: datetime Default: field(default_factory=(_utils.now_utc))
The kind of history step, can be used as a discriminator when deserializing history.
Type: Literal[‘end’] Default: 'end'
Unique ID of the snapshot.
Type: str Default: UNSET_SNAPSHOT_ID
Shim to get the result.
Useful to allow [snapshot.node for snapshot in persistence.history].
Type: End[RunEndT]
Bases: ABC, Generic[StateT, RunEndT]
Abstract base class for storing the state of a graph run.
Each instance of a BaseStatePersistence subclass should be used for a single graph run.
@abstractmethod
@async
def snapshot_node(state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None
Snapshot the state of a graph, when the next step is to run a node.
This method should add a NodeSnapshot to persistence.
The state of the graph.
next_node : BaseNode[StateT, Any, RunEndT]
The next node to run.
@abstractmethod
@async
def snapshot_node_if_new(
snapshot_id: str,
state: StateT,
next_node: BaseNode[StateT, Any, RunEndT],
) -> None
Snapshot the state of a graph if the snapshot ID doesn’t already exist in persistence.
This method will generally call snapshot_node
but should do so in an atomic way.
snapshot_id : str
The ID of the snapshot to check.
The state of the graph.
next_node : BaseNode[StateT, Any, RunEndT]
The next node to run.
@abstractmethod
@async
def snapshot_end(state: StateT, end: End[RunEndT]) -> None
Snapshot the state of a graph when the graph has ended.
This method should add an EndSnapshot to persistence.
The state of the graph.
data from the end of the run.
@abstractmethod
def record_run(snapshot_id: str) -> AbstractAsyncContextManager[None]
Record the run of the node, or error if the node is already running.
In particular this should set:
NodeSnapshot.statusto'running'andNodeSnapshot.start_tswhen the run starts.NodeSnapshot.statusto'success'or'error'andNodeSnapshot.durationwhen the run finishes.
AbstractAsyncContextManager[None] — An async context manager that records the run of the node.
snapshot_id : str
The ID of the snapshot to record.
GraphNodeRunningError— if the node status it not'created'or'pending'.LookupError— if the snapshot ID is not found in persistence.
@abstractmethod
@async
def load_next() -> NodeSnapshot[StateT, RunEndT] | None
Retrieve a node snapshot with status 'created’ and set its status to 'pending'.
This is used by Graph.iter_from_persistence
to get the next node to run.
Returns: The snapshot, or None if no snapshot with status 'created’ exists.
NodeSnapshot[StateT, RunEndT] | None
@abstractmethod
@async
def load_all() -> list[Snapshot[StateT, RunEndT]]
Load the entire history of snapshots.
load_all is not used by pydantic-graph itself, instead it’s provided to make it convenient to
get all snapshots from persistence.
Returns: The list of snapshots.
list[Snapshot[StateT, RunEndT]]
def set_graph_types(graph: Graph[StateT, Any, RunEndT]) -> None
Set the types of the state and run end from a graph.
You generally won’t need to customise this method, instead implement
set_types and
should_set_types.
def should_set_types() -> bool
Whether types need to be set.
Implementations should override this method to return True when types have not been set if they are needed.
def set_types(state_type: type[StateT], run_end_type: type[RunEndT]) -> None
Set the types of the state and run end.
This can be used to create type adapters for serializing and deserializing snapshots,
e.g. with build_snapshot_list_type_adapter.
state_type : type[StateT]
The state type.
run_end_type : type[RunEndT]
The run end type.
def build_snapshot_list_type_adapter(
state_t: type[StateT],
run_end_t: type[RunEndT],
) -> pydantic.TypeAdapter[list[Snapshot[StateT, RunEndT]]]
Build a type adapter for a list of snapshots.
This method should be called from within
set_types
where context variables will be set such that Pydantic can create a schema for
NodeSnapshot.node.
pydantic.TypeAdapter[list[Snapshot[StateT, RunEndT]]]
The status of a snapshot.
'created': The snapshot has been created but not yet run.'pending': The snapshot has been retrieved withload_nextbut not yet run.'running': The snapshot is currently running.'success': The snapshot has been run successfully.'error': The snapshot has been run but an error occurred.
Default: Literal['created', 'pending', 'running', 'success', 'error']
A step in the history of a graph run.
Graph.run returns a list of these steps describing the execution of the graph,
together with the run return value.
Default: NodeSnapshot[StateT, RunEndT] | EndSnapshot[StateT, RunEndT]
In memory state persistence.
This module provides simple in memory state persistence for graphs.
Bases: BaseStatePersistence[StateT, RunEndT]
Simple in memory state persistence that just hold the latest snapshot.
If no state persistence implementation is provided when running a graph, this is used by default.
The last snapshot.
Type: Snapshot[StateT, RunEndT] | None Default: None
Bases: BaseStatePersistence[StateT, RunEndT]
In memory state persistence that hold a list of snapshots.
Whether to deep copy the state and nodes when storing them.
Defaults to True so even if nodes or state are modified after the snapshot is taken,
the persistence history will record the value at the time of the snapshot.
Type: bool Default: True
List of snapshots taken during the graph run.
Type: list[Snapshot[StateT, RunEndT]] Default: field(default_factory=(list[Snapshot[StateT, RunEndT]]))
def dump_json(indent: int | None = None) -> bytes
Dump the history to JSON bytes.
def load_json(json_data: str | bytes | bytearray) -> None
Load the history from JSON.
Bases: BaseStatePersistence[StateT, RunEndT]
File based state persistence that hold graph run state in a JSON file.
Path to the JSON file where the snapshots are stored.
You should use a different file for each graph run, but a single file should be reused for multiple steps of the same run.
For example if you have a run ID of the form run_123abc, you might create a FileStatePersistence thus:
from pathlib import Path
from pydantic_graph import FullStatePersistence
run_id = 'run_123abc'
persistence = FullStatePersistence(Path('runs') / f'{run_id}.json')
Type: Path
def should_set_types() -> bool
Whether types need to be set.