pydantic_evals.online
Online evaluation — attach evaluators to live functions for automatic background evaluation.
This module provides the infrastructure for running evaluators on production (or staging) traffic.
The same Evaluator instances used with Dataset.evaluate() work here; the difference is in how
they are wired up (decorator vs. dataset), not in what they are.
Example:

```python
from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x
```
Context available when deciding whether to sample an evaluator.
Contains the information available before the decorated function runs — the evaluator instance, function inputs, config metadata, and a per-call random seed. The function’s output and duration are not yet available at sampling time.
The evaluator being sampled.
Type: Evaluator
The inputs to the decorated function.
Type: Any
Metadata from the OnlineEvalConfig, if set.
A uniform random value in [0, 1) generated once per decorated function call.
Shared across all evaluators for the same call. In 'correlated' sampling mode this is
used automatically; in 'independent' mode it is available for custom sample_rate
callables that want to implement their own correlated logic.
Type: float
Identifies a span that evaluation results should be associated with.
Used by sinks to associate evaluation results with the original function execution span.
The trace ID of the span.
Type: str
The span ID of the span.
Type: str
Bases: Protocol
Protocol for evaluation result destinations.
Implementations receive evaluation results and can send them to any backend (Logfire annotations, custom callback, stdout, etc.).
@async
def submit(
results: Sequence[EvaluationResult],
failures: Sequence[EvaluatorFailure],
context: EvaluatorContext,
span_reference: SpanReference | None,
) -> None
Submit evaluation results to the sink.
results : Sequence[EvaluationResult]
Evaluation results from successful evaluator runs.
failures : Sequence[EvaluatorFailure]
Failures from evaluator runs that raised exceptions.
The full evaluator context for the function call.
span_reference : SpanReference | None
Reference to the OTel span for the function call, if available.
An EvaluationSink that delegates to a user-provided callable.
The callback receives the results, failures, and context. The span_reference is not
passed to the callback — use a custom EvaluationSink implementation if you need it.
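A callback matching the SinkCallback shape, (results, failures, context) -> None, can be as simple as appending to a list. The sketch below only counts results and failures, so it makes no assumptions about the result objects' attributes:

```python
from typing import Any, Sequence

collected: list[dict[str, Any]] = []


# A minimal callback with the (results, failures, context) shape described
# above. It records counts only, so it works for any result/failure types.
def record_sink(results: Sequence[Any], failures: Sequence[Any], context: Any) -> None:
    collected.append({"n_results": len(results), "n_failures": len(failures)})


record_sink(["ok"], [], context=None)
print(collected)  # [{'n_results': 1, 'n_failures': 0}]
```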
Wraps an Evaluator with per-evaluator online configuration.
Different evaluators often need different settings — a cheap heuristic should run on 100% of traffic while an expensive LLM judge might run on only 1%.
The evaluator to run.
Type: Evaluator
Probability of running this evaluator (0.0–1.0), or a callable returning a float or bool.
When a callable, it receives a SamplingContext
with the function inputs, config metadata, and evaluator name — but not the output or
duration (which aren’t available yet at sampling time).
Defaults to None, which uses the config’s default_sample_rate at each call.
Set explicitly to override.
Type: float | Callable[[SamplingContext], float | bool] | None Default: None
Maximum number of concurrent evaluations for this evaluator.
Type: int Default: 10
Override sink(s) for this evaluator. If None, the config’s default_sink is used.
Type: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None Default: None
Called when an evaluation is dropped because max_concurrency was reached.
Receives the EvaluatorContext that would have been evaluated. Can be sync or async.
If None (the default), dropped evaluations are silently ignored.
Type: OnMaxConcurrencyCallback | None Default: None
Called synchronously when a sample_rate callable raises an exception.
Receives the exception and the evaluator. Must be sync (not async), since sampling
runs before the decorated function. If set, the evaluator is skipped. If None,
uses the config’s on_sampling_error default. If neither is set, the exception
propagates to the caller.
Type: OnSamplingErrorCallback | None Default: None
Called when an exception occurs in a sink or on_max_concurrency callback.
Receives the exception, evaluator context, evaluator instance, and a location string
('sink' or 'on_max_concurrency'). Can be sync or async.
If None, uses the config’s on_error default. If neither is set, exceptions are
silently suppressed.
Type: OnErrorCallback | None Default: None
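Putting the settings above together, a configuration sketch (IsNonEmpty and LlmJudge stand for your own Evaluator subclasses, and OnlineEvaluator is assumed to be constructed with the attributes listed above) for mixing a cheap always-on check with a sparsely sampled LLM judge might look like:

```python
from pydantic_evals.online import OnlineEvaluator, evaluate


@evaluate(
    IsNonEmpty(),  # bare Evaluator: auto-wrapped, uses the config's default sample rate
    OnlineEvaluator(
        evaluator=LlmJudge(),
        sample_rate=0.01,   # run the expensive judge on ~1% of traffic
        max_concurrency=2,  # and cap concurrent judge runs
    ),
)
async def answer(question: str) -> str:
    ...
```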
Bases: Protocol
Protocol for retrieving stored evaluator contexts.
Implementations reconstruct EvaluatorContext
objects from stored traces (e.g., Logfire). The batch method allows fetching contexts
for multiple spans in a single call.
@async
def fetch(span: SpanReference) -> EvaluatorContext
Fetch an evaluator context for a single span.
EvaluatorContext — The evaluator context for the span.
Reference to the span to fetch context for.
@async
def fetch_many(spans: Sequence[SpanReference]) -> list[EvaluatorContext]
Fetch evaluator contexts for multiple spans in a single batch.
list[EvaluatorContext] — Evaluator contexts in the same order as the input spans.
spans : Sequence[SpanReference]
References to the spans to fetch context for.
Holds cross-evaluator defaults for online evaluation.
Create instances for different evaluation configurations, or use the global
DEFAULT_CONFIG via the module-level evaluate() and configure() functions.
Default sink(s) for evaluators that don’t specify their own.
Type: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None Default: None
Default sample rate for evaluators that don’t specify their own.
Type: float | Callable[[SamplingContext], float | bool] Default: 1.0
Controls how per-evaluator sample rates interact for a single call.
'independent' (default): each evaluator decides independently.
'correlated': a shared random seed is used so that lower-rate evaluators’ calls are a subset of higher-rate ones, minimising total overhead.
See SamplingMode for details.
Type: SamplingMode Default: 'independent'
Whether online evaluation is enabled for this config.
Type: bool Default: True
Optional metadata to include in evaluator contexts.
Type: dict[str, Any] | None Default: None
Default handler called when an evaluation is dropped because max_concurrency was reached.
Receives the EvaluatorContext that would have been evaluated. Can be sync or async.
If None (the default), dropped evaluations are silently ignored.
Per-evaluator OnlineEvaluator.on_max_concurrency overrides this default.
Type: OnMaxConcurrencyCallback | None Default: None
Default handler called synchronously when a sample_rate callable raises.
Receives the exception and the evaluator. Must be sync (not async).
If set, the evaluator is skipped. If None (the default), the exception
propagates to the caller.
Per-evaluator OnlineEvaluator.on_sampling_error overrides this default.
Type: OnSamplingErrorCallback | None Default: None
Default handler called when an exception occurs in a sink or on_max_concurrency callback.
Receives the exception, evaluator context, evaluator instance, and a location string
('sink' or 'on_max_concurrency'). Can be sync or async.
If None (the default), exceptions are silently suppressed.
Per-evaluator OnlineEvaluator.on_error overrides this default.
Type: OnErrorCallback | None Default: None
def evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]
Decorator to attach online evaluators to a function.
Bare Evaluator instances are auto-wrapped in OnlineEvaluator at decoration time
(so concurrency semaphores are shared across calls). Their sample_rate defaults to
None, which resolves to the config’s default_sample_rate at each call — so
changes to the config after decoration take effect.
Callable[[Callable[_P, _R]], Callable[_P, _R]] — A decorator that wraps the function with online evaluation.
Evaluators to attach. Can be Evaluator or OnlineEvaluator instances.
def disable_evaluation() -> Iterator[None]
Context manager to disable all online evaluation in the current context.
When active, decorated functions still execute normally but no evaluators are dispatched.
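As a usage sketch (my_function refers to a function decorated as in the module example; the surrounding async function is illustrative):

```python
from pydantic_evals.online import disable_evaluation


async def backfill(records: list[int]) -> None:
    with disable_evaluation():
        for x in records:
            # Decorated functions called here execute normally, but no
            # evaluators are dispatched for these calls.
            await my_function(x)
```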
@async
def run_evaluators(
evaluators: Sequence[Evaluator],
context: EvaluatorContext,
) -> tuple[list[EvaluationResult], list[EvaluatorFailure]]
Run evaluators on a context and return results.
Useful for re-running evaluators from stored data.
tuple[list[EvaluationResult], list[EvaluatorFailure]] — A tuple of (results, failures).
evaluators : Sequence[Evaluator]
The evaluators to run.
The evaluator context to evaluate against.
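Combined with an EvaluatorContextStore implementation, stored traffic can be re-scored. A sketch (store is any object satisfying the protocol above; it is not provided here):

```python
from pydantic_evals.evaluators import Evaluator
from pydantic_evals.online import SpanReference, run_evaluators


async def rescore(store, evaluators: list[Evaluator], spans: list[SpanReference]):
    # Reconstruct contexts from stored traces in one batch call, then
    # re-run the evaluators against each context.
    contexts = await store.fetch_many(spans)
    return [await run_evaluators(evaluators, ctx) for ctx in contexts]
```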
def evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]
Decorator to attach online evaluators to a function using the global default config.
Equivalent to DEFAULT_CONFIG.evaluate(...).
Example:

```python
from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x
```
Callable[[Callable[_P, _R]], Callable[_P, _R]] — A decorator that wraps the function with online evaluation.
Evaluators to attach. Can be Evaluator or OnlineEvaluator instances.
def configure(
default_sink: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None | Unset = UNSET,
default_sample_rate: float | Callable[[SamplingContext], float | bool] | Unset = UNSET,
sampling_mode: SamplingMode | Unset = UNSET,
enabled: bool | Unset = UNSET,
metadata: dict[str, Any] | None | Unset = UNSET,
on_max_concurrency: OnMaxConcurrencyCallback | None | Unset = UNSET,
on_sampling_error: OnSamplingErrorCallback | None | Unset = UNSET,
on_error: OnErrorCallback | None | Unset = UNSET,
) -> None
Configure the global default OnlineEvalConfig.
Only provided values are updated; unset arguments are ignored.
Pass None explicitly to clear default_sink, metadata, on_max_concurrency,
on_sampling_error, or on_error.
default_sink : EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None | Unset Default: UNSET
Default sink(s) for evaluators. Pass None to clear.
Default sample rate for evaluators.
Sampling mode ('independent' or 'correlated').
enabled : bool | Unset Default: UNSET
Whether online evaluation is enabled.
Metadata to include in evaluator contexts. Pass None to clear.
on_max_concurrency : OnMaxConcurrencyCallback | None | Unset Default: UNSET
Default handler for dropped evaluations. Pass None to clear.
on_sampling_error : OnSamplingErrorCallback | None | Unset Default: UNSET
Default handler for sample_rate exceptions. Pass None to clear.
on_error : OnErrorCallback | None | Unset Default: UNSET
Default handler for pipeline exceptions. Pass None to clear.
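For instance, a configuration sketch routing all results to a simple print-based callback (the lambda matches the SinkCallback shape) while sampling 10% of traffic with correlated decisions:

```python
from pydantic_evals.online import configure

configure(
    default_sink=lambda results, failures, context: print(len(results), "results"),
    default_sample_rate=0.1,
    sampling_mode="correlated",
)
```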
@async
def wait_for_evaluations(timeout: float = 30.0) -> None
Wait for all pending background evaluation tasks and threads to complete.
This is useful in tests to deterministically wait for background evaluators to finish instead of relying on timing-based sleeps.
For async decorated functions, evaluators run as tasks on the caller’s event loop and are awaited directly. For sync decorated functions, evaluators run in background threads which are joined with the given timeout.
timeout : float Default: 30.0
Maximum seconds to wait for each background thread. Defaults to 30.
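A test-usage sketch (my_function refers to a decorated function as in the module example; the async test runner is whatever your test framework provides):

```python
from pydantic_evals.online import wait_for_evaluations


async def test_my_function_is_evaluated():
    await my_function(1)
    # Wait deterministically for background evaluators instead of sleeping.
    await wait_for_evaluations(timeout=5.0)
    # ...then assert on whatever your sink recorded.
```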
The location within the online evaluation pipeline where an error occurred.
Default: Literal['sink', 'on_max_concurrency']
Controls how per-evaluator sample rates interact across evaluators for a single call.
'independent' (default): each evaluator flips its own coin. With N evaluators each at rate r, the probability of any evaluation overhead is 1 − (1 − r)^N.
'correlated': a single random seed is generated per call and shared across evaluators. An evaluator runs when call_seed < rate, so lower-rate evaluators’ calls are always a subset of higher-rate ones. The probability of any overhead equals max(rate_i).
Default: Literal['independent', 'correlated']
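The subset property and the overhead arithmetic can be checked with a small pure-Python simulation, independent of the library:

```python
import random

rates = [0.01, 0.1, 0.5]
rng = random.Random(0)
seeds = [rng.random() for _ in range(100_000)]

# 'correlated': one shared seed per call; an evaluator runs when seed < rate,
# so lower-rate evaluators' calls are a subset of higher-rate ones.
sampled = {r: {i for i, s in enumerate(seeds) if s < r} for r in rates}
assert sampled[0.01] <= sampled[0.1] <= sampled[0.5]

# Probability that *any* evaluator adds overhead on a given call:
independent_any = 1.0
for r in rates:
    independent_any *= 1 - r
independent_any = 1 - independent_any  # 1 - 0.99 * 0.9 * 0.5 = 0.5545
correlated_any = max(rates)            # 0.5: driven by the highest rate alone

print(round(independent_any, 4), correlated_any)  # 0.5545 0.5
```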
Callback invoked when an evaluation is dropped due to concurrency limits.
Receives the EvaluatorContext that would have been evaluated. Can be sync or async.
Default: Callable[[EvaluatorContext], None | Awaitable[None]]
Callback invoked when a sample_rate callable raises an exception.
Called synchronously before the decorated function runs. Receives the exception
and the evaluator whose sample_rate failed. Must be sync (not async).
If set, the evaluator is skipped. If not set, the exception propagates to the caller.
Default: Callable[[Exception, Evaluator], None]
Callback invoked when an exception occurs in the online evaluation pipeline.
Receives the exception, the evaluator context, the evaluator instance, and a location string indicating where the error occurred. Can be sync or async.
Default: Callable[[Exception, EvaluatorContext, Evaluator, OnErrorLocation], None | Awaitable[None]]
Type alias for bare callables accepted wherever an EvaluationSink is expected.
Auto-wrapped in CallbackSink when passed as a sink parameter.
Default: Callable[[Sequence[EvaluationResult], Sequence[EvaluatorFailure], EvaluatorContext], None | Awaitable[None]]
The global default OnlineEvalConfig instance.
Module-level functions like evaluate() and configure() delegate to this instance.
Default: OnlineEvalConfig()