Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
2028c27
refactor(core): 🔨 modularize client and usage architecture
Mirrowel Jan 21, 2026
cd744cc
refactor(core): 🔨 finalize modular architecture and preserve legacy i…
Mirrowel Jan 21, 2026
26f2846
feat(core): ✨ add async credential waiting and quota group sync
Mirrowel Jan 21, 2026
1c86c22
feat(core): ✨ parse granular Google quota details and cleanup streaming
Mirrowel Jan 21, 2026
523ea31
feat(usage): ✨ implement granular tracking, cost calculation, and hooks
Mirrowel Jan 21, 2026
01dffe7
feat(core): ✨ integrate advanced logging and dynamic usage configuration
Mirrowel Jan 21, 2026
673ca0e
feat(usage): ✨ implement quota group aggregation and explicit initial…
Mirrowel Jan 21, 2026
3450c73
feat(core): ✨ enhance usage stats aggregation and enforce async clien…
Mirrowel Jan 21, 2026
790c01e
refactor(core): 🔨 transition availability checks to async and expose …
Mirrowel Jan 21, 2026
4f1cceb
feat(usage): ✨ track reasoning and cache write tokens
Mirrowel Jan 22, 2026
2136d98
refactor(core): 🔨 preserve legacy client and usage manager implementa…
Mirrowel Jan 22, 2026
dfd2070
feat(usage): ✨ track internal provider retries and enhance api docume…
Mirrowel Jan 22, 2026
22ef3f6
fix(usage): 🐛 prevent stale api data from overwriting local counts
Mirrowel Jan 22, 2026
7ed4c9c
feat(client): ✨ add Anthropic API compatibility handler
Mirrowel Jan 22, 2026
ba60834
refactor(usage): 🔨 centralize window aggregation and reconcile usage …
Mirrowel Jan 22, 2026
94231e0
refactor(client): 🔨 centralize request execution setup logic
Mirrowel Jan 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,4 @@ cache/

oauth_creds/

usage/
6 changes: 5 additions & 1 deletion src/proxy_app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ async def process_credential(provider: str, path: str, provider_instance):
max_concurrent_requests_per_key=max_concurrent_requests_per_key,
)

await client.initialize_usage_managers()

# Log loaded credentials summary (compact, always visible for deployment verification)
# _api_summary = ', '.join([f"{p}:{len(c)}" for p, c in api_keys.items()]) if api_keys else "none"
# _oauth_summary = ', '.join([f"{p}:{len(c)}" for p, c in oauth_credentials.items()]) if oauth_credentials else "none"
Expand Down Expand Up @@ -956,7 +958,9 @@ async def chat_completions(
is_streaming = request_data.get("stream", False)

if is_streaming:
response_generator = client.acompletion(request=request, **request_data)
response_generator = await client.acompletion(
request=request, **request_data
)
return StreamingResponse(
streaming_response_wrapper(
request, request_data, response_generator, raw_logger
Expand Down
File renamed without changes.
3,980 changes: 3,980 additions & 0 deletions src/rotator_library/_usage_manager_legacy.py

Large diffs are not rendered by default.

21 changes: 15 additions & 6 deletions src/rotator_library/background_refresher.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,13 @@ async def _run_provider_background_job(
# Run immediately on start if configured
if run_on_start:
try:
await provider.run_background_job(
self._client.usage_manager, credentials
)
usage_manager = self._client.usage_managers.get(provider_name)
if usage_manager is None:
lib_logger.debug(
f"Skipping {provider_name} {job_name}: no UsageManager"
)
return
await provider.run_background_job(usage_manager, credentials)
lib_logger.debug(f"{provider_name} {job_name}: initial run complete")
except Exception as e:
lib_logger.error(
Expand All @@ -247,9 +251,13 @@ async def _run_provider_background_job(
while True:
try:
await asyncio.sleep(interval)
await provider.run_background_job(
self._client.usage_manager, credentials
)
usage_manager = self._client.usage_managers.get(provider_name)
if usage_manager is None:
lib_logger.debug(
f"Skipping {provider_name} {job_name}: no UsageManager"
)
return
await provider.run_background_job(usage_manager, credentials)
lib_logger.debug(f"{provider_name} {job_name}: periodic run complete")
except asyncio.CancelledError:
lib_logger.debug(f"{provider_name} {job_name}: cancelled")
Expand All @@ -259,6 +267,7 @@ async def _run_provider_background_job(

async def _run(self):
"""The main loop for OAuth token refresh."""
await self._client.initialize_usage_managers()
# Initialize credentials (load persisted tiers) before starting
await self._initialize_credentials()

Expand Down
49 changes: 49 additions & 0 deletions src/rotator_library/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# SPDX-License-Identifier: LGPL-3.0-only
# Copyright (c) 2026 Mirrowel

"""
Client package for LLM API key rotation.

This package provides the RotatingClient and associated components
for intelligent credential rotation and retry logic.

Public API:
    RotatingClient: Main client class for making API requests
    StreamedAPIError: Exception for streaming errors

Components (for advanced usage):
    RequestExecutor: Unified retry/rotation logic
    CredentialFilter: Tier compatibility filtering
    ModelResolver: Model name resolution
    ProviderTransforms: Provider-specific transforms
    StreamingHandler: Streaming response processing
"""

# Primary public surface: the rotating client itself, plus the streaming
# error type (re-exported from core so callers need only this package).
from .rotating_client import RotatingClient
from ..core.errors import StreamedAPIError

# Also expose components for advanced usage
from .executor import RequestExecutor
from .filters import CredentialFilter
from .models import ModelResolver
from .transforms import ProviderTransforms
from .streaming import StreamingHandler
from .anthropic import AnthropicHandler
from .types import AvailabilityStats, RetryState, ExecutionResult

# Explicit export list — keep in sync with the imports above and the
# "Public API" / "Components" sections of the module docstring.
__all__ = [
    # Main public API
    "RotatingClient",
    "StreamedAPIError",
    # Components
    "RequestExecutor",
    "CredentialFilter",
    "ModelResolver",
    "ProviderTransforms",
    "StreamingHandler",
    "AnthropicHandler",
    # Types
    "AvailabilityStats",
    "RetryState",
    "ExecutionResult",
]
203 changes: 203 additions & 0 deletions src/rotator_library/client/anthropic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# SPDX-License-Identifier: LGPL-3.0-only
# Copyright (c) 2026 Mirrowel

"""
Anthropic API compatibility handler for RotatingClient.

This module provides Anthropic SDK compatibility methods that allow using
Anthropic's Messages API format with the credential rotation system.
"""

import json
import logging
import uuid
from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional

from ..anthropic_compat import (
AnthropicMessagesRequest,
AnthropicCountTokensRequest,
translate_anthropic_request,
openai_to_anthropic_response,
anthropic_streaming_wrapper,
anthropic_to_openai_messages,
anthropic_to_openai_tools,
)
from ..transaction_logger import TransactionLogger

if TYPE_CHECKING:
from .rotating_client import RotatingClient

lib_logger = logging.getLogger("rotator_library")


class AnthropicHandler:
    """
    Handler for Anthropic API compatibility methods.

    This class provides methods to handle Anthropic Messages API requests
    by translating them to OpenAI format, processing through the client's
    acompletion method, and converting responses back to Anthropic format.

    Example:
        handler = AnthropicHandler(client)
        response = await handler.messages(request, raw_request)
    """

    def __init__(self, client: "RotatingClient"):
        """
        Initialize the Anthropic handler.

        Args:
            client: The RotatingClient instance to use for completions
        """
        self._client = client

    async def messages(
        self,
        request: AnthropicMessagesRequest,
        raw_request: Optional[Any] = None,
        pre_request_callback: Optional[callable] = None,
    ) -> Any:
        """
        Handle Anthropic Messages API requests.

        This method accepts requests in Anthropic's format, translates them to
        OpenAI format internally, processes them through the existing acompletion
        method, and returns responses in Anthropic's format.

        Args:
            request: An AnthropicMessagesRequest object
            raw_request: Optional raw request object for disconnect checks
            pre_request_callback: Optional async callback before each API request

        Returns:
            For non-streaming: dict in Anthropic Messages format
            For streaming: AsyncGenerator yielding Anthropic SSE format strings
        """
        # Synthesize an Anthropic-style message id; it overrides whatever id
        # the upstream (OpenAI-format) response carried.
        request_id = f"msg_{uuid.uuid4().hex[:24]}"
        original_model = request.model

        # Extract provider from model for logging (models are expected as
        # "provider/model"; anything without a slash is logged as "unknown").
        provider = original_model.split("/")[0] if "/" in original_model else "unknown"

        # Create Anthropic transaction logger if request logging is enabled
        anthropic_logger = None
        if self._client.enable_request_logging:
            anthropic_logger = TransactionLogger(
                provider,
                original_model,
                enabled=True,
                api_format="ant",
            )
            # Log original Anthropic request
            anthropic_logger.log_request(
                request.model_dump(exclude_none=True),
                filename="anthropic_request.json",
            )

        # Translate Anthropic request to OpenAI format
        openai_request = translate_anthropic_request(request)

        # Pass parent log directory to acompletion for nested logging
        if anthropic_logger and anthropic_logger.log_dir:
            openai_request["_parent_log_dir"] = anthropic_logger.log_dir

        if request.stream:
            # Streaming response.
            # NOTE: acompletion must be awaited even in streaming mode — it is
            # a coroutine that resolves to the response stream. This mirrors
            # how the proxy's chat_completions endpoint invokes it; without
            # the await, the wrapper below would receive a bare coroutine.
            response_generator = await self._client.acompletion(
                request=raw_request,
                pre_request_callback=pre_request_callback,
                **openai_request,
            )

            # Create disconnect checker if raw_request provided
            is_disconnected = None
            if raw_request is not None and hasattr(raw_request, "is_disconnected"):
                is_disconnected = raw_request.is_disconnected

            # Return the streaming wrapper
            # Note: For streaming, the anthropic response logging happens in the wrapper
            return anthropic_streaming_wrapper(
                openai_stream=response_generator,
                original_model=original_model,
                request_id=request_id,
                is_disconnected=is_disconnected,
                transaction_logger=anthropic_logger,
            )
        else:
            # Non-streaming response
            response = await self._client.acompletion(
                request=raw_request,
                pre_request_callback=pre_request_callback,
                **openai_request,
            )

            # Convert OpenAI response to Anthropic format. model_dump() covers
            # pydantic-style responses; dict() handles plain mappings.
            openai_response = (
                response.model_dump()
                if hasattr(response, "model_dump")
                else dict(response)
            )
            anthropic_response = openai_to_anthropic_response(
                openai_response, original_model
            )

            # Override the ID with our request ID
            anthropic_response["id"] = request_id

            # Log Anthropic response
            if anthropic_logger:
                anthropic_logger.log_response(
                    anthropic_response,
                    filename="anthropic_response.json",
                )

            return anthropic_response

    async def count_tokens(
        self,
        request: AnthropicCountTokensRequest,
    ) -> dict:
        """
        Handle Anthropic count_tokens API requests.

        Counts the number of tokens that would be used by a Messages API request.
        This is useful for estimating costs and managing context windows.

        Args:
            request: An AnthropicCountTokensRequest object

        Returns:
            Dict with input_tokens count in Anthropic format
        """
        anthropic_request = request.model_dump(exclude_none=True)

        # Translate messages (plus the optional system prompt) into OpenAI
        # chat format so the client's tokenizer can count them.
        openai_messages = anthropic_to_openai_messages(
            anthropic_request.get("messages", []), anthropic_request.get("system")
        )

        # Count tokens for messages
        message_tokens = self._client.token_count(
            model=request.model,
            messages=openai_messages,
        )

        # Count tokens for tools if present
        tool_tokens = 0
        if request.tools:
            # Tools add tokens based on their definitions
            # Convert to JSON string and count tokens for tool definitions
            openai_tools = anthropic_to_openai_tools(
                [tool.model_dump() for tool in request.tools]
            )
            if openai_tools:
                # Serialize tools to count their token contribution.
                # NOTE(review): counting the raw JSON is an approximation of
                # how the provider tokenizes tool schemas — confirm if exact
                # parity with Anthropic's count is required.
                tools_text = json.dumps(openai_tools)
                tool_tokens = self._client.token_count(
                    model=request.model,
                    text=tools_text,
                )

        total_tokens = message_tokens + tool_tokens

        return {"input_tokens": total_tokens}
Loading
Loading