pydantic_evals.online

Online evaluation — attach evaluators to live functions for automatic background evaluation.

This module provides the infrastructure for running evaluators on production (or staging) traffic. The same Evaluator instances used with Dataset.evaluate() work here; the difference is in how they are wired up (decorator vs. dataset), not in what they are.

Example:

from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x

SamplingContext

Context available when deciding whether to sample an evaluator.

Contains the information available before the decorated function runs — the evaluator instance, function inputs, config metadata, and a per-call random seed. The function’s output and duration are not yet available at sampling time.

Attributes

evaluator

The evaluator being sampled.

Type: Evaluator

inputs

The inputs to the decorated function.

Type: Any

metadata

Metadata from the OnlineEvalConfig, if set.

Type: dict[str, Any] | None

call_seed

A uniform random value in [0, 1) generated once per decorated function call.

Shared across all evaluators for the same call. In 'correlated' sampling mode this is used automatically; in 'independent' mode it is available for custom sample_rate callables that want to implement their own correlated logic.

Type: float
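
For example, a sample_rate callable can combine these fields to make per-call decisions. A minimal sketch (the threshold and rates are illustrative, and SamplingContext is assumed to be importable from pydantic_evals.online):

from pydantic_evals.online import SamplingContext


def sample_rate(ctx: SamplingContext) -> bool:
    # Always evaluate long string inputs; otherwise fall back to the shared
    # per-call seed at a 10% rate, staying correlated with other evaluators.
    if isinstance(ctx.inputs, str) and len(ctx.inputs) > 1000:
        return True
    return ctx.call_seed < 0.1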

SpanReference

Identifies a span that evaluation results should be associated with.

Used by sinks to associate evaluation results with the original function execution span.

Attributes

trace_id

The trace ID of the span.

Type: str

span_id

The span ID of the span.

Type: str

EvaluationSink

Bases: Protocol

Protocol for evaluation result destinations.

Implementations receive evaluation results and can send them to any backend (Logfire annotations, custom callback, stdout, etc.).

Methods

submit

@async

def submit(
    results: Sequence[EvaluationResult],
    failures: Sequence[EvaluatorFailure],
    context: EvaluatorContext,
    span_reference: SpanReference | None,
) -> None

Submit evaluation results to the sink.

Returns

None

Parameters

results : Sequence[EvaluationResult]

Evaluation results from successful evaluator runs.

failures : Sequence[EvaluatorFailure]

Failures from evaluator runs that raised exceptions.

context : EvaluatorContext

The full evaluator context for the function call.

span_reference : SpanReference | None

Reference to the OTel span for the function call, if available.
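
Because EvaluationSink is a Protocol, any object with a matching async submit method qualifies; no base class is required. A minimal sketch that prints everything it receives:

from pydantic_evals.online import SpanReference


class PrintSink:
    # Structurally satisfies EvaluationSink; a Protocol needs no base class.
    async def submit(
        self,
        results,
        failures,
        context,
        span_reference: SpanReference | None,
    ) -> None:
        for result in results:
            print('result:', result)
        for failure in failures:
            print('failure:', failure)
        if span_reference is not None:
            print('span:', span_reference.trace_id, span_reference.span_id)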

CallbackSink

An EvaluationSink that delegates to a user-provided callable.

The callback receives the results, failures, and context. The span_reference is not passed to the callback — use a custom EvaluationSink implementation if you need it.
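
In most cases you don't construct CallbackSink directly; a bare callable matching the SinkCallback signature is auto-wrapped wherever a sink is accepted. A sketch:

from pydantic_evals.online import configure


def log_results(results, failures, context):
    # Matches the SinkCallback signature; auto-wrapped in a CallbackSink.
    print(f'{len(results)} results, {len(failures)} failures')


configure(default_sink=log_results)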

OnlineEvaluator

Wraps an Evaluator with per-evaluator online configuration.

Different evaluators often need different settings — a cheap heuristic should run on 100% of traffic while an expensive LLM judge might run on only 1%.
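
For example (a sketch, assuming OnlineEvaluator takes the wrapped evaluator as its first positional argument, and using LLMJudge from pydantic_evals.evaluators):

from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext, LLMJudge
from pydantic_evals.online import OnlineEvaluator, evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(
    OnlineEvaluator(IsNonEmpty(), sample_rate=1.0),  # cheap: every call
    OnlineEvaluator(
        LLMJudge(rubric='The answer addresses the question.'),
        sample_rate=0.01,  # expensive: 1% of calls
        max_concurrency=2,
    ),
)
async def answer(question: str) -> str:
    return f'An answer to {question}'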

Attributes

evaluator

The evaluator to run.

Type: Evaluator

sample_rate

Probability of running this evaluator (0.0–1.0), or a callable returning a float or bool.

When a callable, it receives a SamplingContext with the evaluator instance, function inputs, config metadata, and the per-call seed, but not the function's output or duration (which aren't available yet at sampling time).

Defaults to None, which uses the config’s default_sample_rate at each call. Set explicitly to override.

Type: float | Callable[[SamplingContext], float | bool] | None Default: None

max_concurrency

Maximum number of concurrent evaluations for this evaluator.

Type: int Default: 10

sink

Override sink(s) for this evaluator. If None, the config’s default_sink is used.

Type: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None Default: None

on_max_concurrency

Called when an evaluation is dropped because max_concurrency was reached.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async. If None (the default), dropped evaluations are silently ignored.

Type: OnMaxConcurrencyCallback | None Default: None

on_sampling_error

Called synchronously when a sample_rate callable raises an exception.

Receives the exception and the evaluator. Must be sync (not async), since sampling runs before the decorated function. If set, the evaluator is skipped. If None, uses the config’s on_sampling_error default. If neither is set, the exception propagates to the caller.

Type: OnSamplingErrorCallback | None Default: None

on_error

Called when an exception occurs in a sink or on_max_concurrency callback.

Receives the exception, evaluator context, evaluator instance, and a location string ('sink' or 'on_max_concurrency'). Can be sync or async. If None, uses the config’s on_error default. If neither is set, exceptions are silently suppressed.

Type: OnErrorCallback | None Default: None

EvaluatorContextSource

Bases: Protocol

Protocol for retrieving stored evaluator contexts.

Implementations reconstruct EvaluatorContext objects from stored traces (e.g., Logfire). The batch method allows fetching contexts for multiple spans in a single call.

Methods

fetch

@async

def fetch(span: SpanReference) -> EvaluatorContext

Fetch an evaluator context for a single span.

Returns

EvaluatorContext — The evaluator context for the span.

Parameters

span : SpanReference

Reference to the span to fetch context for.

fetch_many

@async

def fetch_many(spans: Sequence[SpanReference]) -> list[EvaluatorContext]

Fetch evaluator contexts for multiple spans in a single batch.

Returns

list[EvaluatorContext] — Evaluator contexts in the same order as the input spans.

Parameters

spans : Sequence[SpanReference]

References to the spans to fetch context for.

OnlineEvalConfig

Holds cross-evaluator defaults for online evaluation.

Create instances for different evaluation configurations, or use the global DEFAULT_CONFIG via the module-level evaluate() and configure() functions.
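
For example (a sketch, assuming the documented attributes are accepted as keyword arguments, and reusing the IsNonEmpty evaluator from the module example above):

from pydantic_evals.online import OnlineEvalConfig

config = OnlineEvalConfig(
    default_sample_rate=0.25,
    sampling_mode='correlated',
    metadata={'env': 'staging'},
)


@config.evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x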

Attributes

default_sink

Default sink(s) for evaluators that don’t specify their own.

Type: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None Default: None

default_sample_rate

Default sample rate for evaluators that don’t specify their own.

Type: float | Callable[[SamplingContext], float | bool] Default: 1.0

sampling_mode

Controls how per-evaluator sample rates interact for a single call.

  • 'independent' (default): each evaluator decides independently.
  • 'correlated': a shared random seed is used so that lower-rate evaluators’ calls are a subset of higher-rate ones, minimising total overhead.

See SamplingMode for details.

Type: SamplingMode Default: 'independent'

enabled

Whether online evaluation is enabled for this config.

Type: bool Default: True

metadata

Optional metadata to include in evaluator contexts.

Type: dict[str, Any] | None Default: None

on_max_concurrency

Default handler called when an evaluation is dropped because max_concurrency was reached.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async. If None (the default), dropped evaluations are silently ignored. Per-evaluator OnlineEvaluator.on_max_concurrency overrides this default.

Type: OnMaxConcurrencyCallback | None Default: None

on_sampling_error

Default handler called synchronously when a sample_rate callable raises.

Receives the exception and the evaluator. Must be sync (not async). If set, the evaluator is skipped. If None (the default), the exception propagates to the caller. Per-evaluator OnlineEvaluator.on_sampling_error overrides this default.

Type: OnSamplingErrorCallback | None Default: None

on_error

Default handler called when an exception occurs in a sink or on_max_concurrency callback.

Receives the exception, evaluator context, evaluator instance, and a location string ('sink' or 'on_max_concurrency'). Can be sync or async. If None (the default), exceptions are silently suppressed. Per-evaluator OnlineEvaluator.on_error overrides this default.

Type: OnErrorCallback | None Default: None

Methods

evaluate

def evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]

Decorator to attach online evaluators to a function.

Bare Evaluator instances are auto-wrapped in OnlineEvaluator at decoration time (so concurrency semaphores are shared across calls). Their sample_rate defaults to None, which resolves to the config’s default_sample_rate at each call — so changes to the config after decoration take effect.

Returns

Callable[[Callable[_P, _R]], Callable[_P, _R]] — A decorator that wraps the function with online evaluation.

Parameters

*evaluators : Evaluator | OnlineEvaluator Default: ()

Evaluators to attach. Can be Evaluator or OnlineEvaluator instances.

disable_evaluation

def disable_evaluation() -> Iterator[None]

Context manager to disable all online evaluation in the current context.

When active, decorated functions still execute normally but no evaluators are dispatched.

Returns

Iterator[None]
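
For example, in a test that should not trigger evaluators (my_function is the decorated function from the module example above):

import asyncio

from pydantic_evals.online import disable_evaluation


async def main():
    with disable_evaluation():
        # my_function runs normally; its evaluators are not dispatched.
        await my_function(42)


asyncio.run(main())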

run_evaluators

@async

def run_evaluators(
    evaluators: Sequence[Evaluator],
    context: EvaluatorContext,
) -> tuple[list[EvaluationResult], list[EvaluatorFailure]]

Run evaluators on a context and return results.

Useful for re-running evaluators from stored data.

Returns

tuple[list[EvaluationResult], list[EvaluatorFailure]] — A tuple of (results, failures).

Parameters

evaluators : Sequence[Evaluator]

The evaluators to run.

context : EvaluatorContext

The evaluator context to evaluate against.
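
For example, re-running an evaluator over stored traffic (the source argument below stands in for a hypothetical EvaluatorContextSource implementation, and IsNonEmpty is the evaluator from the module example above):

from pydantic_evals.online import SpanReference, run_evaluators


async def re_evaluate(source, spans: list[SpanReference]) -> None:
    # source is any EvaluatorContextSource implementation (hypothetical here).
    contexts = await source.fetch_many(spans)
    for context in contexts:
        results, failures = await run_evaluators([IsNonEmpty()], context)
        print(results, failures)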

evaluate

def evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]

Decorator to attach online evaluators to a function using the global default config.

Equivalent to DEFAULT_CONFIG.evaluate(...).

Example:

from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x

Returns

Callable[[Callable[_P, _R]], Callable[_P, _R]] — A decorator that wraps the function with online evaluation.

Parameters

*evaluators : Evaluator | OnlineEvaluator Default: ()

Evaluators to attach. Can be Evaluator or OnlineEvaluator instances.

configure

def configure(
    default_sink: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None | Unset = UNSET,
    default_sample_rate: float | Callable[[SamplingContext], float | bool] | Unset = UNSET,
    sampling_mode: SamplingMode | Unset = UNSET,
    enabled: bool | Unset = UNSET,
    metadata: dict[str, Any] | None | Unset = UNSET,
    on_max_concurrency: OnMaxConcurrencyCallback | None | Unset = UNSET,
    on_sampling_error: OnSamplingErrorCallback | None | Unset = UNSET,
    on_error: OnErrorCallback | None | Unset = UNSET,
) -> None

Configure the global default OnlineEvalConfig.

Only provided values are updated; unset arguments are ignored. Pass None explicitly to clear default_sink, metadata, on_max_concurrency, on_sampling_error, or on_error.

Returns

None

Parameters

default_sink : EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None | Unset Default: UNSET

Default sink(s) for evaluators. Pass None to clear.

default_sample_rate : float | Callable[[SamplingContext], float | bool] | Unset Default: UNSET

Default sample rate for evaluators.

sampling_mode : SamplingMode | Unset Default: UNSET

Sampling mode ('independent' or 'correlated').

enabled : bool | Unset Default: UNSET

Whether online evaluation is enabled.

metadata : dict[str, Any] | None | Unset Default: UNSET

Metadata to include in evaluator contexts. Pass None to clear.

on_max_concurrency : OnMaxConcurrencyCallback | None | Unset Default: UNSET

Default handler for dropped evaluations. Pass None to clear.

on_sampling_error : OnSamplingErrorCallback | None | Unset Default: UNSET

Default handler for sample_rate exceptions. Pass None to clear.

on_error : OnErrorCallback | None | Unset Default: UNSET

Default handler for pipeline exceptions. Pass None to clear.
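
For example:

from pydantic_evals.online import configure

# Only the provided values are updated; other settings keep their current values.
configure(default_sample_rate=0.1, sampling_mode='correlated')

# Pass None explicitly to clear a previously set default sink.
configure(default_sink=None)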

wait_for_evaluations

@async

def wait_for_evaluations(timeout: float = 30.0) -> None

Wait for all pending background evaluation tasks and threads to complete.

This is useful in tests to deterministically wait for background evaluators to finish instead of relying on timing-based sleeps.

For async decorated functions, evaluators run as tasks on the caller’s event loop and are awaited directly. For sync decorated functions, evaluators run in background threads which are joined with the given timeout.

Returns

None

Parameters

timeout : float Default: 30.0

Maximum seconds to wait for each background thread. Defaults to 30.
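
For example, in a test (my_function is the decorated function from the module example above):

import asyncio

from pydantic_evals.online import wait_for_evaluations


async def run_test() -> None:
    assert await my_function(1) == 1
    # Wait deterministically for background evaluators instead of sleeping.
    await wait_for_evaluations(timeout=5.0)


asyncio.run(run_test())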

OnErrorLocation

The location within the online evaluation pipeline where an error occurred.

Default: Literal['sink', 'on_max_concurrency']

SamplingMode

Controls how per-evaluator sample rates interact across evaluators for a single call.

  • 'independent' (default): Each evaluator flips its own coin. With N evaluators each at rate r, the probability of any evaluation overhead is 1 − (1−r)^N.
  • 'correlated': A single random seed is generated per call and shared across evaluators. An evaluator runs when call_seed < rate, so lower-rate evaluators’ calls are always a subset of higher-rate ones. The probability of any overhead equals max(rate_i).

Default: Literal['independent', 'correlated']
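
For example, with three evaluators each at rate 0.5, 'independent' mode incurs evaluation overhead on 1 − (1 − 0.5)^3 = 87.5% of calls, while 'correlated' mode incurs it on only max(0.5, 0.5, 0.5) = 50%.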

OnMaxConcurrencyCallback

Callback invoked when an evaluation is dropped due to concurrency limits.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async.

Default: Callable[[EvaluatorContext], None | Awaitable[None]]

OnSamplingErrorCallback

Callback invoked when a sample_rate callable raises an exception.

Called synchronously before the decorated function runs. Receives the exception and the evaluator whose sample_rate failed. Must be sync (not async). If set, the evaluator is skipped. If not set, the exception propagates to the caller.

Default: Callable[[Exception, Evaluator], None]

OnErrorCallback

Callback invoked when an exception occurs in the online evaluation pipeline.

Receives the exception, the evaluator context, the evaluator instance, and a location string indicating where the error occurred. Can be sync or async.

Default: Callable[[Exception, EvaluatorContext, Evaluator, OnErrorLocation], None | Awaitable[None]]

SinkCallback

Type alias for bare callables accepted wherever an EvaluationSink is expected.

Auto-wrapped in CallbackSink when passed as a sink parameter.

Default: Callable[[Sequence[EvaluationResult], Sequence[EvaluatorFailure], EvaluatorContext], None | Awaitable[None]]

DEFAULT_CONFIG

The global default OnlineEvalConfig instance.

Module-level functions like evaluate() and configure() delegate to this instance.

Default: OnlineEvalConfig()