pydantic_ai — Concurrency
ConcurrencyLimitedModel

Bases: WrapperModel
A model wrapper that limits concurrent requests to the underlying model.
This wrapper applies concurrency limiting at the model level, ensuring that the number of concurrent requests to the model does not exceed the configured limit. This is useful for:
- Respecting API rate limits
- Managing resource usage
- Sharing a concurrency pool across multiple models
Example usage:

```python
from pydantic_ai import Agent
from pydantic_ai.models.concurrency import ConcurrencyLimitedModel

# Limit to 5 concurrent requests
model = ConcurrencyLimitedModel('openai:gpt-4o', limiter=5)
agent = Agent(model)

# Or share a limiter across multiple models
from pydantic_ai import ConcurrencyLimiter  # noqa: E402

shared_limiter = ConcurrencyLimiter(max_running=10, name='openai-pool')
model1 = ConcurrencyLimitedModel('openai:gpt-4o', limiter=shared_limiter)
model2 = ConcurrencyLimitedModel('openai:gpt-4o-mini', limiter=shared_limiter)
```
```python
def __init__(
    wrapped: Model | KnownModelName,
    limiter: int | ConcurrencyLimit | AbstractConcurrencyLimiter,
)
```

Initialize the ConcurrencyLimitedModel.

wrapped : Model | KnownModelName
The model to wrap, either a Model instance or a known model name.

limiter : int | ConcurrencyLimit | AbstractConcurrencyLimiter
The concurrency limit configuration. Can be:
- An int: a simple limit on concurrent operations (unlimited queue).
- A ConcurrencyLimit: full configuration with optional backpressure.
- An AbstractConcurrencyLimiter: a pre-created limiter for sharing across models.
```python
async def request(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
) -> ModelResponse
```
Make a request to the model with concurrency limiting.
```python
async def count_tokens(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
) -> RequestUsage
```
Count tokens with concurrency limiting.
```python
async def request_stream(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
    run_context: RunContext[Any] | None = None,
) -> AsyncIterator[StreamedResponse]
```

Make a streaming request to the model with concurrency limiting.

AsyncIterator[StreamedResponse] — The streamed response.
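A subtlety of streaming under a limiter is that the concurrency slot must stay held while the stream is consumed, not only while it is opened. The following stand-alone sketch shows that pattern with `asyncio.Semaphore`; it is illustrative only, not pydantic_ai's implementation:

```python
import asyncio
from contextlib import asynccontextmanager

# Stand-in for the limiter: at most 2 streams may be open at once.
semaphore = asyncio.Semaphore(2)

@asynccontextmanager
async def limited_stream(chunks):
    await semaphore.acquire()  # wait for a free slot before opening the stream
    try:
        async def gen():
            for chunk in chunks:
                yield chunk
        yield gen()
    finally:
        # The slot is released only after the consumer finishes iterating.
        semaphore.release()

async def main():
    async with limited_stream(['a', 'b', 'c']) as stream:
        return [chunk async for chunk in stream]

print(asyncio.run(main()))  # ['a', 'b', 'c']
```

Releasing in the `finally` block ties the slot's lifetime to the whole stream, so a slow consumer still counts against the limit.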
```python
def limit_model_concurrency(
    model: Model | KnownModelName,
    limiter: AnyConcurrencyLimit,
) -> Model
```

Wrap a model with concurrency limiting.

This is a convenience function for wrapping a model with concurrency limiting. If the limiter is None, the model is returned unchanged.

Example:

```python
from pydantic_ai.models.concurrency import limit_model_concurrency

model = limit_model_concurrency('openai:gpt-4o', limiter=5)
```

model : Model | KnownModelName
The model to wrap.

limiter : AnyConcurrencyLimit
The concurrency limit configuration.

Model — The wrapped model with concurrency limiting, or the original model if limiter is None.
AbstractConcurrencyLimiter

Bases: ABC
Abstract base class for concurrency limiters.
Subclass this to create custom concurrency limiters (e.g., Redis-backed distributed limiters).
Example:

```python
from pydantic_ai.concurrency import AbstractConcurrencyLimiter

class RedisConcurrencyLimiter(AbstractConcurrencyLimiter):
    def __init__(self, redis_client, key: str, max_running: int):
        self._redis = redis_client
        self._key = key
        self._max_running = max_running

    async def acquire(self, source: str) -> None:
        # Implement Redis-based distributed locking
        ...

    def release(self) -> None:
        # Release the Redis lock
        ...
```
```python
@abstractmethod
async def acquire(source: str) -> None
```

Acquire a slot, waiting if necessary.

source : str
Identifier for observability (e.g., 'model:gpt-4o').
```python
@abstractmethod
def release() -> None
```

Release a slot.
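The Redis skeleton above shows the class shape; for a concrete, self-contained variant, here is an in-process limiter built on `asyncio.Semaphore`. It is shown without the `AbstractConcurrencyLimiter` base so the sketch runs standalone; in real use it would subclass it and be passed as `limiter=`:

```python
import asyncio

class SemaphoreConcurrencyLimiter:
    """Minimal acquire/release limiter backed by asyncio.Semaphore."""

    def __init__(self, max_running: int):
        self._semaphore = asyncio.Semaphore(max_running)

    async def acquire(self, source: str) -> None:
        # `source` identifies the caller for observability; unused here.
        await self._semaphore.acquire()

    def release(self) -> None:
        self._semaphore.release()

async def demo():
    limiter = SemaphoreConcurrencyLimiter(max_running=2)
    running = 0
    peak = 0

    async def task():
        nonlocal running, peak
        await limiter.acquire(source='model:example')
        try:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulate a model request in flight
        finally:
            running -= 1
            limiter.release()

    # Six tasks contend for two slots; peak concurrency stays at 2.
    await asyncio.gather(*(task() for _ in range(6)))
    return peak

print(asyncio.run(demo()))  # 2
```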
ConcurrencyLimiter

Bases: AbstractConcurrencyLimiter
A concurrency limiter that tracks waiting operations for observability.
This class wraps an anyio.CapacityLimiter and tracks the number of waiting operations. When an operation has to wait to acquire a slot, a span is created for observability purposes.
Attributes:
- Name of the limiter for observability.
- Number of operations currently waiting to acquire a slot (int).
- Number of operations currently running (int).
- Number of slots available (int).
- Maximum concurrent operations allowed (int).
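The bookkeeping described above can be sketched with plain asyncio. The attribute names below are illustrative stand-ins chosen for this sketch, and the real class wraps `anyio.CapacityLimiter` rather than a semaphore:

```python
import asyncio

class TrackedLimiter:
    """Sketch: count waiting and running operations around a limited resource."""

    def __init__(self, max_running: int):
        self.max_running = max_running
        self.waiting = 0   # operations blocked on acquire()
        self.running = 0   # operations holding a slot
        self._semaphore = asyncio.Semaphore(max_running)

    @property
    def available_slots(self) -> int:
        return self.max_running - self.running

    async def acquire(self) -> None:
        self.waiting += 1
        try:
            await self._semaphore.acquire()
        finally:
            self.waiting -= 1
        self.running += 1

    def release(self) -> None:
        self.running -= 1
        self._semaphore.release()

async def demo():
    limiter = TrackedLimiter(max_running=1)
    await limiter.acquire()
    snapshot = (limiter.running, limiter.available_slots)
    limiter.release()
    return snapshot

print(asyncio.run(demo()))  # (1, 0)
```

In the real class this is where the observability hook lives: when `waiting` goes above zero, a span can be opened to record time spent queued.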
```python
def __init__(
    max_running: int,
    max_queued: int | None = None,
    name: str | None = None,
    tracer: Tracer | None = None,
)
```

Initialize the ConcurrencyLimiter.

max_running : int
Maximum number of concurrent operations.

max_queued : int | None Default: None
Maximum queue depth before raising ConcurrencyLimitExceeded.

name : str | None Default: None
Optional name for this limiter, used for observability when sharing a limiter across multiple models or agents.

tracer : Tracer | None Default: None
OpenTelemetry tracer for span creation.
```python
@classmethod
def from_limit(
    cls,
    limit: int | ConcurrencyLimit,
    name: str | None = None,
    tracer: Tracer | None = None,
) -> Self
```

Create a ConcurrencyLimiter from a ConcurrencyLimit configuration.

limit : int | ConcurrencyLimit
Either an int for simple limiting or a ConcurrencyLimit for full configuration.

name : str | None Default: None
Optional name for this limiter, used for observability.

tracer : Tracer | None Default: None
OpenTelemetry tracer for span creation.

Self — A configured ConcurrencyLimiter.
```python
async def acquire(source: str) -> None
```

Acquire a slot, creating a span if waiting is required.

source : str
Identifier for the source of this acquisition (e.g., 'agent:my-agent' or 'model:gpt-4').

```python
def release() -> None
```

Release a slot.
ConcurrencyLimit

Configuration for concurrency limiting with optional backpressure.

max_running : int
Maximum number of concurrent operations allowed.

max_queued : int | None
Maximum number of operations waiting in the queue. If None, the queue is unlimited. If exceeded, raises ConcurrencyLimitExceeded.
AnyConcurrencyLimit

Type alias for concurrency limit configuration.

Can be:
- An int: a simple limit on concurrent operations (unlimited queue).
- A ConcurrencyLimit: full configuration with optional backpressure.
- An AbstractConcurrencyLimiter: a pre-created limiter instance for sharing across multiple models/agents.
- None: no concurrency limiting (the default).

Type: TypeAlias Default: 'int | ConcurrencyLimit | AbstractConcurrencyLimiter | None'
ConcurrencyLimitExceeded

Bases: AgentRunError

Error raised when the concurrency queue depth exceeds max_queued.