
[Feature] Centralized Anthropic API token-bucket rate limiter across concurrent agent loops #1069

@cgcardona

Description


Problem

Every dispatched agent loop applies a cooperative per-loop delay via AC_MIN_TURN_DELAY_SECS=0.5 (an asyncio.sleep between LLM turns). This is not a real rate limiter — it is a naively cooperative delay that provides no protection against burst conditions:

  • Multiple agents finishing tool calls simultaneously and requesting the next LLM turn will all fire requests in the same moment, easily exceeding Anthropic's Tier 4 RPM/TPM limits
  • A 429 from Anthropic is currently retried with simple exponential backoff — all agents retry simultaneously, which triggers a thundering-herd second wave
  • The delay adds latency even when only one agent is running and the API is under no load

This becomes meaningfully worse once Coasts-style isolation is in place and more concurrent agents are feasible — more parallel runners means more concurrent API calls.

Proposed solution

Replace the per-loop asyncio.sleep with a centralized async token-bucket rate limiter shared across all agent loops in the process. The limiter enforces configurable requests-per-minute (RPM) and tokens-per-minute (TPM) ceilings, refilling each bucket at the corresponding rate.

Architecture

agentception process
  ┌────────────────────────────┐
  │ AnthropicRateLimiter       │  ← singleton, process-global
  │ ────────────────────────── │
  │ rpm_bucket: TokenBucket    │
  │ tpm_bucket: TokenBucket    │
  │ acquire(tokens) -> None    │  ← awaited before every LLM call
  └────────────────────────────┘
           ↑               ↑
     agent loop 1     agent loop 2   ... agent loop N

Implementation

agentception/services/rate_limiter.py

from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    capacity: float
    refill_rate: float  # tokens per second
    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)

    def __post_init__(self) -> None:
        self._tokens = self.capacity
        self._last_refill = time.monotonic()

    async def acquire(self, tokens: float = 1.0) -> None:
        # Holding the lock while sleeping serialises waiters FIFO, so a
        # starved caller cannot be overtaken indefinitely by new arrivals.
        async with self._lock:
            self._refill()
            while self._tokens < tokens:
                deficit = tokens - self._tokens
                wait = deficit / self.refill_rate
                await asyncio.sleep(wait)
                self._refill()
            self._tokens -= tokens

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self.capacity, self._tokens + elapsed * self.refill_rate)
        self._last_refill = now


class AnthropicRateLimiter:
    def __init__(self, rpm: int, tpm: int) -> None:
        self._rpm_bucket = TokenBucket(capacity=rpm, refill_rate=rpm / 60.0)
        self._tpm_bucket = TokenBucket(capacity=tpm, refill_rate=tpm / 60.0)

    async def acquire(self, estimated_tokens: int = 1000) -> None:
        """Await before every Anthropic API call. Blocks until capacity is available."""
        # Both buckets must grant capacity: one RPM token per request, plus
        # the estimated token usage against the TPM bucket.
        await asyncio.gather(
            self._rpm_bucket.acquire(1),
            self._tpm_bucket.acquire(float(estimated_tokens)),
        )


_limiter: AnthropicRateLimiter | None = None


def get_limiter() -> AnthropicRateLimiter:
    global _limiter
    if _limiter is None:
        raise RuntimeError("Rate limiter not initialised. Call init_limiter() at startup.")
    return _limiter


def init_limiter(rpm: int, tpm: int) -> None:
    global _limiter
    _limiter = AnthropicRateLimiter(rpm=rpm, tpm=tpm)

Startup (agentception/main.py lifespan):

init_limiter(rpm=settings.AC_ANTHROPIC_RPM, tpm=settings.AC_ANTHROPIC_TPM)

Agent loop (agentception/services/agent_loop.py or equivalent — wherever the client.messages.create call lives):

await get_limiter().acquire(estimated_tokens=prompt_token_estimate)
response = await client.messages.create(...)
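The prompt_token_estimate above is left undefined; since the limiter only needs an upper bound, a deliberately conservative character-count heuristic is enough. A minimal sketch (the estimate_tokens name, the ~4-chars-per-token ratio, and counting the full output budget against TPM are illustrative assumptions, not part of the proposal):

```python
def estimate_tokens(messages: list[dict[str, str]], max_output_tokens: int = 1024) -> int:
    # Rough upper bound: ~4 characters per token for English prose, plus the
    # full output budget, assuming the TPM ceiling covers input and output.
    prompt_chars = sum(len(m.get("content", "")) for m in messages)
    return prompt_chars // 4 + max_output_tokens
```

Overestimating is the safe direction here: the bucket refills continuously, so an overly cautious estimate only delays a call, while an underestimate risks a 429.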

Config additions (agentception/config.py):

AC_ANTHROPIC_RPM: int = 4000   # Tier 4 default; lower for Tier 3
AC_ANTHROPIC_TPM: int = 400000  # Tier 4 default

Remove AC_MIN_TURN_DELAY_SECS (dead config after this is merged).

429 handling

When a 429 is received despite the limiter (Anthropic's server-side burst protection), the limiter should:

  1. Parse the retry-after header (Anthropic always includes it)
  2. Drain the RPM bucket by the retry-after duration so subsequent callers also back off
  3. Re-raise the error for the caller to retry once the bucket refills

This prevents the thundering-herd second wave.
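The drain step could be sketched as a method on TokenBucket (the drain name and its semantics are an illustrative assumption, not part of the code above; the bucket is repeated here so the sketch stands alone):

```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    capacity: float
    refill_rate: float  # tokens per second
    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)

    def __post_init__(self) -> None:
        self._tokens = self.capacity
        self._last_refill = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        self._tokens = min(self.capacity, self._tokens + (now - self._last_refill) * self.refill_rate)
        self._last_refill = now

    async def drain(self, seconds: float) -> None:
        # Push the token count to (at most) zero, then subtract the tokens
        # that would refill during `seconds`, so every subsequent acquire()
        # waits out the retry-after window before any request is dispatched.
        async with self._lock:
            self._refill()
            self._tokens = min(self._tokens, 0.0) - seconds * self.refill_rate
```

In the agent loop's 429 handler this would be awaited with the parsed retry-after value before re-raising, e.g. via a hypothetical drain_rpm() wrapper on AnthropicRateLimiter that forwards to the RPM bucket.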

Acceptance criteria

  • A single AnthropicRateLimiter singleton is initialised at startup and shared across all agent loops
  • AC_MIN_TURN_DELAY_SECS config var and all references to it are deleted
  • 429 responses drain the RPM bucket by the retry-after duration
  • Under a simulated burst of 20 concurrent acquire() calls, no more than AC_ANTHROPIC_RPM calls are dispatched per minute
  • mypy clean, zero Any, unit tests with a mock clock
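The burst criterion could be exercised with a mock clock along these lines — a pytest-style sketch in which the fake clock, the scaled-down 5 RPM limit, and the inlined copy of the proposed TokenBucket are all assumptions standing in for the real module:

```python
import asyncio
import time
from dataclasses import dataclass, field
from unittest import mock


@dataclass
class TokenBucket:  # inlined copy of the proposed bucket, for a standalone sketch
    capacity: float
    refill_rate: float  # tokens per second
    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)

    def __post_init__(self) -> None:
        self._tokens = self.capacity
        self._last_refill = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        self._tokens = min(self.capacity, self._tokens + (now - self._last_refill) * self.refill_rate)
        self._last_refill = now

    async def acquire(self, tokens: float = 1.0) -> None:
        async with self._lock:
            self._refill()
            while self._tokens < tokens:
                await asyncio.sleep((tokens - self._tokens) / self.refill_rate)
                self._refill()
            self._tokens -= tokens


def test_burst_respects_rpm() -> None:
    fake_now = 0.0

    def fake_monotonic() -> float:
        return fake_now

    async def fake_sleep(seconds: float) -> None:
        nonlocal fake_now
        fake_now += seconds  # advance virtual time instead of really sleeping

    async def run() -> None:
        with mock.patch.object(time, "monotonic", fake_monotonic), \
                mock.patch.object(asyncio, "sleep", fake_sleep):
            bucket = TokenBucket(capacity=5.0, refill_rate=5 / 60.0)  # 5 RPM for the test
            # 20 concurrent acquires: 5 pass instantly, 15 must wait for refill,
            # i.e. at least 15 / (5/60) = 180 virtual seconds in total.
            await asyncio.gather(*(bucket.acquire(1.0) for _ in range(20)))
        assert 179.0 <= fake_now <= 185.0

    asyncio.run(run())
```

Because sleeps only advance a virtual clock, the whole burst runs in milliseconds while still proving that 20 acquires cannot complete in under a minute of bucket time at the configured rate.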

References

  • agentception/config.py: AC_MIN_TURN_DELAY_SECS
  • The agent loop implementation (wherever client.messages.create is called)
  • agentception/services/run_factory.py
