pydantic_ai — Concurrency

ConcurrencyLimitedModel

Bases: WrapperModel

A model wrapper that limits concurrent requests to the underlying model.

This wrapper applies concurrency limiting at the model level, ensuring that the number of concurrent requests to the model does not exceed the configured limit. This is useful for:

  • Respecting API rate limits
  • Managing resource usage
  • Sharing a concurrency pool across multiple models

Example usage:

from pydantic_ai import Agent
from pydantic_ai.models.concurrency import ConcurrencyLimitedModel

# Limit to 5 concurrent requests
model = ConcurrencyLimitedModel('openai:gpt-4o', limiter=5)
agent = Agent(model)

# Or share a limiter across multiple models
from pydantic_ai import ConcurrencyLimiter  # noqa: E402

shared_limiter = ConcurrencyLimiter(max_running=10, name='openai-pool')
model1 = ConcurrencyLimitedModel('openai:gpt-4o', limiter=shared_limiter)
model2 = ConcurrencyLimitedModel('openai:gpt-4o-mini', limiter=shared_limiter)

Methods

__init__
def __init__(
    wrapped: Model | KnownModelName,
    limiter: int | ConcurrencyLimit | AbstractConcurrencyLimiter,
)

Initialize the ConcurrencyLimitedModel.

Parameters

wrapped : Model | KnownModelName

The model to wrap, either a Model instance or a known model name.

limiter : int | ConcurrencyLimit | AbstractConcurrencyLimiter

The concurrency limit configuration. Can be:

  • An int: Simple limit on concurrent operations (unlimited queue).
  • A ConcurrencyLimit: Full configuration with optional backpressure.
  • An AbstractConcurrencyLimiter: A pre-created limiter for sharing across models.
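
Semantically, an int limit acts like a semaphore with an unbounded wait queue: at most that many requests run at once, and the rest wait. A minimal illustrative sketch of that behavior (using asyncio.Semaphore, not pydantic_ai's implementation):

```python
import asyncio

async def run_with_limit(tasks, max_running: int):
    """Run coroutines, allowing at most max_running to execute concurrently."""
    sem = asyncio.Semaphore(max_running)
    peak = 0     # highest concurrency observed
    running = 0  # currently executing

    async def guarded(coro):
        nonlocal peak, running
        async with sem:  # wait here if max_running tasks are already inside
            running += 1
            peak = max(peak, running)
            result = await coro
            running -= 1
            return result

    results = await asyncio.gather(*(guarded(t) for t in tasks))
    return results, peak

async def fake_request(i: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for a model call
    return i

results, peak = asyncio.run(run_with_limit([fake_request(i) for i in range(20)], 5))
```

All 20 requests complete, but no more than 5 are ever in flight at once.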

request

@async

def request(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
) -> ModelResponse

Make a request to the model with concurrency limiting.

Returns

ModelResponse

count_tokens

@async

def count_tokens(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
) -> RequestUsage

Count tokens with concurrency limiting.

Returns

RequestUsage

request_stream

@async

def request_stream(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
    run_context: RunContext[Any] | None = None,
) -> AsyncIterator[StreamedResponse]

Make a streaming request to the model with concurrency limiting.

Returns

AsyncIterator[StreamedResponse]
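
A detail worth noting for streaming: the concurrency slot must be held for the whole lifetime of the stream, not just the initial request, otherwise more streams than the limit could be open at once. A sketch of that pattern (assumed from the interface above, not the library source):

```python
import asyncio
from contextlib import asynccontextmanager

class SlotLimiter:
    """Illustrative limiter whose slot is held for a whole context block."""

    def __init__(self, max_running: int):
        self._sem = asyncio.Semaphore(max_running)

    @asynccontextmanager
    async def slot(self):
        await self._sem.acquire()  # acquired before the stream starts
        try:
            yield
        finally:
            self._sem.release()    # released only when the stream closes

async def stream_tokens(limiter: SlotLimiter):
    # The slot stays held across every yielded chunk.
    async with limiter.slot():
        for token in ("hello", "world"):
            await asyncio.sleep(0)  # stand-in for network latency
            yield token

async def main():
    limiter = SlotLimiter(max_running=2)
    return [tok async for tok in stream_tokens(limiter)]

tokens = asyncio.run(main())
```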

limit_model_concurrency

def limit_model_concurrency(
    model: Model | KnownModelName,
    limiter: AnyConcurrencyLimit,
) -> Model

Wrap a model with concurrency limiting.

This is a convenience function: if limiter is None, the model is returned unchanged.

Example:

from pydantic_ai.models.concurrency import limit_model_concurrency

model = limit_model_concurrency('openai:gpt-4o', limiter=5)

Returns

Model — The wrapped model with concurrency limiting, or the original model if limiter is None.

Parameters

model : Model | KnownModelName

The model to wrap.

limiter : AnyConcurrencyLimit

The concurrency limit configuration.
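
The wrap-or-passthrough shape of this function can be sketched like so (names here are illustrative stand-ins, not the pydantic_ai source):

```python
from dataclasses import dataclass

@dataclass
class FakeModel:
    name: str

@dataclass
class LimitedModel:
    wrapped: FakeModel
    limit: int

def limit_concurrency(model: FakeModel, limiter: "int | None") -> "FakeModel | LimitedModel":
    if limiter is None:
        return model  # no limiting requested: return the model unchanged
    return LimitedModel(wrapped=model, limit=limiter)

m = FakeModel("gpt-4o")
unwrapped = limit_concurrency(m, None)
wrapped = limit_concurrency(m, 5)
```

This makes the function safe to call unconditionally with an optional limit from configuration.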

AbstractConcurrencyLimiter

Bases: ABC

Abstract base class for concurrency limiters.

Subclass this to create custom concurrency limiters (e.g., Redis-backed distributed limiters).

Example:

from pydantic_ai.concurrency import AbstractConcurrencyLimiter


class RedisConcurrencyLimiter(AbstractConcurrencyLimiter):
    def __init__(self, redis_client, key: str, max_running: int):
        self._redis = redis_client
        self._key = key
        self._max_running = max_running

    async def acquire(self, source: str) -> None:
        # Implement Redis-based distributed locking
        ...

    def release(self) -> None:
        # Release the Redis lock
        ...

Methods

acquire

@abstractmethod

@async

def acquire(source: str) -> None

Acquire a slot, waiting if necessary.

Returns

None

Parameters

source : str

Identifier for observability (e.g., 'model:gpt-4o').

release

@abstractmethod

def release() -> None

Release a slot.

Returns

None

ConcurrencyLimiter

Bases: AbstractConcurrencyLimiter

A concurrency limiter that tracks waiting operations for observability.

This class wraps an anyio.CapacityLimiter and tracks the number of waiting operations. When an operation has to wait to acquire a slot, a span is created for observability purposes.

Attributes

name

Name of the limiter for observability.

Type: str | None

waiting_count

Number of operations currently waiting to acquire a slot.

Type: int

running_count

Number of operations currently running.

Type: int

available_count

Number of slots available.

Type: int

max_running

Maximum concurrent operations allowed.

Type: int
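
These counters are related: at any moment, running_count + available_count == max_running, while waiting_count tracks operations queued for a slot. A small bookkeeping sketch of that invariant (illustrative, not the library's internals):

```python
import asyncio

class CountingLimiter:
    """Semaphore-backed limiter that exposes observability counters."""

    def __init__(self, max_running: int):
        self.max_running = max_running
        self.waiting_count = 0
        self.running_count = 0
        self._sem = asyncio.Semaphore(max_running)

    @property
    def available_count(self) -> int:
        # Invariant: running_count + available_count == max_running
        return self.max_running - self.running_count

    async def acquire(self) -> None:
        self.waiting_count += 1   # counted as waiting until a slot is free
        await self._sem.acquire()
        self.waiting_count -= 1
        self.running_count += 1

    def release(self) -> None:
        self.running_count -= 1
        self._sem.release()

async def demo():
    lim = CountingLimiter(max_running=1)
    await lim.acquire()
    snapshot = (lim.running_count, lim.available_count, lim.waiting_count)
    lim.release()
    return snapshot

snapshot = asyncio.run(demo())
```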

Methods

__init__
def __init__(
    max_running: int,
    max_queued: int | None = None,
    name: str | None = None,
    tracer: Tracer | None = None,
)

Initialize the ConcurrencyLimiter.

Parameters

max_running : int

Maximum number of concurrent operations.

max_queued : int | None Default: None

Maximum queue depth before raising ConcurrencyLimitExceeded.

name : str | None Default: None

Optional name for this limiter, used for observability when sharing a limiter across multiple models or agents.

tracer : Tracer | None Default: None

OpenTelemetry tracer for span creation.

from_limit

@classmethod

def from_limit(
    cls,
    limit: int | ConcurrencyLimit,
    name: str | None = None,
    tracer: Tracer | None = None,
) -> Self

Create a ConcurrencyLimiter from an int or a ConcurrencyLimit configuration.

Returns

Self — A configured ConcurrencyLimiter.

Parameters

limit : int | ConcurrencyLimit

Either an int for simple limiting or a ConcurrencyLimit for full config.

name : str | None Default: None

Optional name for this limiter, used for observability.

tracer : Tracer | None Default: None

OpenTelemetry tracer for span creation.
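
The normalization this classmethod performs can be sketched as follows (illustrative dataclasses standing in for the real types): a bare int becomes a limiter with an unlimited queue, while a config object carries its max_queued through.

```python
from dataclasses import dataclass

@dataclass
class LimitConfig:          # stands in for ConcurrencyLimit
    max_running: int
    max_queued: "int | None" = None

@dataclass
class Limiter:              # stands in for ConcurrencyLimiter
    max_running: int
    max_queued: "int | None" = None
    name: "str | None" = None

    @classmethod
    def from_limit(cls, limit: "int | LimitConfig", name: "str | None" = None) -> "Limiter":
        if isinstance(limit, int):
            # Simple form: just a cap on concurrency, unlimited queue.
            return cls(max_running=limit, name=name)
        # Full form: propagate the backpressure setting.
        return cls(max_running=limit.max_running, max_queued=limit.max_queued, name=name)

a = Limiter.from_limit(5, name="pool")
b = Limiter.from_limit(LimitConfig(max_running=10, max_queued=3))
```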

acquire

@async

def acquire(source: str) -> None

Acquire a slot, creating a span if waiting is required.

Returns

None

Parameters

source : str

Identifier for the source of this acquisition (e.g., 'agent:my-agent' or 'model:gpt-4').

release
def release() -> None

Release a slot.

Returns

None

ConcurrencyLimit

Configuration for concurrency limiting with optional backpressure.

Constructor Parameters

max_running : int

Maximum number of concurrent operations allowed.

max_queued : int | None Default: None

Maximum number of operations waiting in the queue. If None, the queue is unlimited. If exceeded, raises ConcurrencyLimitExceeded.
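
The backpressure behavior of max_queued can be sketched like this (an illustrative semaphore-based limiter, with a local exception standing in for ConcurrencyLimitExceeded): once max_queued operations are already waiting, a new arrival fails fast instead of joining the queue.

```python
import asyncio

class QueueFullError(Exception):
    """Stands in for ConcurrencyLimitExceeded in this sketch."""

class BackpressureLimiter:
    def __init__(self, max_running: int, max_queued: "int | None" = None):
        self._sem = asyncio.Semaphore(max_running)
        self.max_queued = max_queued
        self.waiting = 0

    async def acquire(self) -> None:
        if self._sem.locked():  # no free slot: this caller would have to queue
            if self.max_queued is not None and self.waiting >= self.max_queued:
                raise QueueFullError("concurrency queue depth exceeded")
            self.waiting += 1
            try:
                await self._sem.acquire()
            finally:
                self.waiting -= 1
        else:
            await self._sem.acquire()

    def release(self) -> None:
        self._sem.release()

async def demo() -> bool:
    lim = BackpressureLimiter(max_running=1, max_queued=0)
    await lim.acquire()          # takes the only slot
    try:
        await lim.acquire()      # queue is full (max_queued=0): rejected
    except QueueFullError:
        return True
    return False

rejected = asyncio.run(demo())
```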

AnyConcurrencyLimit

Type alias for concurrency limit configuration.

Can be:

  • An int: Simple limit on concurrent operations (unlimited queue).
  • A ConcurrencyLimit: Full configuration with optional backpressure.
  • An AbstractConcurrencyLimiter: A pre-created limiter instance for sharing across multiple models/agents.
  • None: No concurrency limiting (default).

Type: TypeAlias Default: 'int | ConcurrencyLimit | AbstractConcurrencyLimiter | None'

ConcurrencyLimitExceeded

Bases: AgentRunError

Error raised when the concurrency queue depth exceeds max_queued.
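Because it derives from AgentRunError, callers can catch it specifically to shed load gracefully. A handling sketch (local exception classes stand in for pydantic_ai's AgentRunError and ConcurrencyLimitExceeded):

```python
import asyncio

class AgentRunError(Exception):
    """Stand-in for pydantic_ai's AgentRunError."""

class ConcurrencyLimitExceeded(AgentRunError):
    """Stand-in: raised when the queue depth exceeds max_queued."""

async def run_agent(overloaded: bool) -> str:
    if overloaded:
        raise ConcurrencyLimitExceeded("queue depth exceeded")
    return "ok"

async def run_with_shedding(overloaded: bool) -> str:
    # Catch the limit error specifically and degrade instead of crashing.
    try:
        return await run_agent(overloaded)
    except ConcurrencyLimitExceeded:
        return "busy, try again later"

normal = asyncio.run(run_with_shedding(False))
shed = asyncio.run(run_with_shedding(True))
```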