
Conversation

@Mirrowel (Owner)

A big refactor of `client.py` and `usage_manager.py` into a modular and extensible system that was VERY needed, making it much easier to add new features, providers, windows, and hooks.

This commit executes a major architectural refactor, decomposing the monolithic `client.py` and `usage_manager.py` files into modular, domain-specific packages to improve maintainability and separation of concerns.

- **Client Refactor**: `RotatingClient` is now a lightweight facade delegating to:
  - `RequestExecutor`: Unified retry and rotation logic.
  - `StreamingHandler`: Stream processing and error detection.
  - `CredentialFilter`: Tier and priority compatibility filtering.
  - `ModelResolver`: Model name resolution and whitelisting.
  - `ProviderTransforms`: Provider-specific request mutations.

- **Usage Manager Refactor**: `UsageManager` logic is now distributed across:
  - `TrackingEngine`: Usage recording and window management.
  - `LimitEngine`: Enforcement of cooldowns, caps, and fair cycle limits.
  - `SelectionEngine`: Credential selection strategies (balanced/sequential).
  - `CredentialRegistry`: Stable identity management for credentials.
  - `UsageStorage`: Resilient async JSON persistence.

- **Core Infrastructure**: Added `src/rotator_library/core` for shared types, error definitions, and centralized configuration loading.

BREAKING CHANGE: The files `src/rotator_library/client.py` and `src/rotator_library/usage_manager.py` have been deleted. `client.py` is replaced by the `client` package. `usage_manager.py` is replaced by the `usage` package. Direct imports from `rotator_library.usage_manager` must be updated to `rotator_library.usage` or top-level exports.
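For downstream code the migration is mechanical. A minimal sketch, assuming the new `usage` package and the top-level exports both expose `UsageManager` as the note above states:

```python
# Before the refactor:
# from rotator_library.usage_manager import UsageManager

# After the refactor -- import from the new package...
from rotator_library.usage import UsageManager

# ...or, equivalently, from the top-level exports:
# from rotator_library import UsageManager
```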
…mplementation

- Preserve the original monolithic implementation in `_client_legacy.py` and `_usage_manager_legacy.py`.
- Update `RequestExecutor` to support transaction logging, request sanitization, and consecutive quota failure detection.
- Implement `ConcurrentLimitChecker` in the usage limit engine to enforce `max_concurrent` constraints.
- Improve `StreamingHandler` with robust error buffering for fragmented JSON responses.
- Add fair cycle reset logic and quota baseline synchronization to the new `UsageManager`.
This overhaul introduces smart queuing for credential acquisition and shared quota tracking.

- Implement async waiting in `UsageManager`: calls to `acquire_credential` now block (up to a deadline) using asyncio primitives when keys are busy or on cooldown.
- Add Quota Group synchronization: request counts are now synced across models that share a specific quota pool (e.g., Antigravity variants).
- Add support for cached prompt token tracking in usage statistics.
- Refactor `RequestExecutor` to reuse a shared `httpx.AsyncClient` for better performance.
- Correct token counting for Antigravity models by including preprompt overhead.

BREAKING CHANGE: `UsageManager.acquire_credential` is now `async` and must be awaited. `RequestExecutor` now requires an `http_client` argument during initialization.
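The deadline-bounded waiting described above can be sketched with an `asyncio.Condition`; everything here (class, fields, helper names) is illustrative rather than the library's actual API:

```python
import asyncio
import time
from dataclasses import dataclass

@dataclass
class Credential:
    name: str
    active_requests: int = 0
    max_concurrent: int = 2

class CredentialPool:
    def __init__(self, creds: list[Credential]) -> None:
        self._creds = creds
        self._cond = asyncio.Condition()

    async def acquire_credential(self, deadline: float) -> Credential:
        """Block until a credential is free or the deadline passes."""
        async with self._cond:
            while True:
                for cred in self._creds:
                    if cred.active_requests < cred.max_concurrent:
                        cred.active_requests += 1
                        return cred
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError("no credential available before deadline")
                try:
                    # Wake up when release() notifies, or give up at the deadline.
                    await asyncio.wait_for(self._cond.wait(), timeout=remaining)
                except asyncio.TimeoutError:
                    raise TimeoutError("no credential available before deadline")

    async def release(self, cred: Credential) -> None:
        async with self._cond:
            cred.active_requests = max(0, cred.active_requests - 1)
            self._cond.notify_all()
```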
- **error_handler**: Implement logic to extract `quotaValue` and `quotaId` from Google/Gemini error responses for better rate limit observability.
- **streaming**: Remove legacy `UsageManager` support and the `__init__` method from `StreamingHandler`; usage recording is now delegated to `CredentialContext`.
- **client**: Handle `_parent_log_dir` internal parameter to configure transaction logger output directory.
This commit introduces a comprehensive overhaul of the usage tracking system to support more complex quota management scenarios and integration capabilities.

- **Granular Usage Scopes**: Usage windows can now be scoped to specific models, quota groups, or credentials via `WindowDefinition.applies_to`, enabling precise limit enforcement (e.g., shared model quotas vs. individual key limits); see the sketch after this list.
- **Cost Calculation**: Integrated `litellm` cost calculation for both standard and streaming requests. `approx_cost` is now tracked and persisted in usage statistics.
- **Provider Hooks**: Added `HookDispatcher` and `on_request_complete` interface, allowing provider plugins to intercept request results, override usage counts, or trigger exhaustion based on custom logic (a dispatcher sketch appears after these notes).
- **Usage API**: Introduced `UsageAPI` facade (`src/rotator_library/usage/integration/api.py`) to provide a stable interface for external components to query state and manage cooldowns.
- **Fair Cycle Enhancements**: Refined fair cycle tracking with support for global state persistence and configurable tracking modes (credential-level vs. group-level).
- **Configuration**: Expanded environment variable support for custom caps, fair cycle settings, and sequential fallback multipliers.
- **Persistence**: Updated storage schema to handle nested `model_usage` and `group_usage` statistics.
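A minimal sketch of how such scoped windows could be expressed. Only `WindowDefinition` and its `applies_to` field come from this changelog; the other fields and the enum are assumptions:

```python
from dataclasses import dataclass
from enum import Enum

class Scope(Enum):          # assumed enum; the real library may differ
    MODEL = "model"
    QUOTA_GROUP = "quota_group"
    CREDENTIAL = "credential"

@dataclass
class WindowDefinition:
    name: str               # e.g. "daily"
    duration_seconds: int   # rolling window length
    max_requests: int | None
    applies_to: Scope       # what the window's counters are keyed by

# A shared per-model daily quota vs. an individual per-key burst limit:
shared_daily = WindowDefinition("daily", 86_400, 1_500, Scope.MODEL)
per_key_burst = WindowDefinition("5m", 300, 10, Scope.CREDENTIAL)
```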

Also in this commit:
- feat(client): add automatic request validation via provider plugins
- fix(streaming): correctly track cached vs. uncached tokens in usage stats
- refactor: add backward compatibility shim in `src/rotator_library/usage_manager.py`
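A rough sketch of how an `on_request_complete` hook chain could be wired. Only the `HookDispatcher` and hook names come from this changelog; the signature is an assumption:

```python
from typing import Any, Awaitable, Callable

# Hypothetical hook signature: receives the request result and may return
# overrides (e.g. corrected usage counts) or None to leave it unchanged.
Hook = Callable[[dict[str, Any]], Awaitable[dict[str, Any] | None]]

class HookDispatcher:
    def __init__(self) -> None:
        self._hooks: list[Hook] = []

    def register(self, hook: Hook) -> None:
        self._hooks.append(hook)

    async def on_request_complete(self, result: dict[str, Any]) -> dict[str, Any]:
        # Run every registered hook; later hooks see earlier overrides.
        for hook in self._hooks:
            if (override := await hook(result)) is not None:
                result |= override
        return result
```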
Refines the request executor and client to support deep observability and dynamic quota management within the modular architecture.

- **Observability**:
  - Implement a sanitized LiteLLM logging callback to safely pipe provider logs to the library logger.
  - Capture response headers in `UsageManager`, `CredentialContext`, and failure logs to aid debugging.
  - Pass request headers to the failure logger for better context.
- **Usage Management**:
  - Implement `_apply_usage_reset_config` to dynamically generate rolling window definitions (e.g., daily limits) based on provider settings.
  - Fix `fair_cycle_key` resolution logic in the tracking engine.
- **Client & Executor**:
  - Support passing provider-specific LiteLLM parameters via `RequestExecutor`.
  - Update `BackgroundRefresher` to support the new usage manager registry and prevent crashes when managers are missing.
…ization

This commit overhauls how usage statistics are aggregated and initialized to support shared quotas and improve stability.

- **Quota Groups**: Implemented `_backfill_group_usage` to derive shared group statistics (e.g., tiered limits) from individual model windows. Updated `CooldownChecker` to enforce limits at both model and group levels.
- **Initialization**: Added `initialize_usage_managers` with async locking to `RotatingClient`. Updated `main.py` and `background_refresher.py` to invoke this explicitly, ensuring state is loaded before traffic.
- **Persistence**: Switched storage to `ResilientStateWriter` and `safe_read_json`, which use atomic writes to prevent data corruption (the pattern is sketched after this list).
- **Providers**: Refined quota calculations (calculating `quota_used` from fractions) for Chutes, Firmware, and NanoGPT.
- **Antigravity**: Updated system prompts to strictly enforce parallel tool calling behavior.
- **Logging**: Implemented batched logging for quota exhaustion events to reduce noise.
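`ResilientStateWriter` itself isn't shown in this changelog, but the corruption-safe behavior it implies is the classic write-to-temp-then-rename pattern, sketched here under that assumption:

```python
import json
import os
import tempfile

def safe_write_json(path: str, data: dict) -> None:
    """Write JSON atomically: readers see either the old file or the new
    one, never a partially written state."""
    directory = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())       # make sure bytes hit the disk
        os.replace(tmp, path)          # atomic on POSIX and Windows
    except BaseException:
        os.unlink(tmp)
        raise
```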
…t execution

This commit introduces comprehensive usage tracking and refactors the client execution flow for better observability and stability.

- Refactor `RotatingClient.acompletion` to be explicitly `async`, ensuring proper execution in `proxy_app`.
- Implement detailed usage summaries in `get_usage_stats`, including token caching percentages, approximate costs, and detailed provider states.
- Add granular logging in `RequestExecutor` to trace credential availability (displaying blocks by cooldowns, fair cycle, or caps) and current quota window saturation.
- Introduce debounced state saving in `UsageManager` to optimize storage I/O, and add logic for backfilling model usage data (a debouncing sketch follows below).

BREAKING CHANGE: `RotatingClient.acompletion` is now an `async` function and must be awaited by the caller.
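Debounced saving as described above typically coalesces a burst of updates into a single write; a minimal sketch, with all names illustrative:

```python
import asyncio
from typing import Awaitable, Callable

class DebouncedSaver:
    """Coalesce bursts of save requests into one write after a quiet period."""

    def __init__(self, save_fn: Callable[[], Awaitable[None]], delay: float = 2.0):
        self._save_fn = save_fn
        self._delay = delay
        self._task: asyncio.Task | None = None

    def schedule(self) -> None:
        # Each new request cancels the pending write and restarts the timer.
        if self._task is not None and not self._task.done():
            self._task.cancel()
        self._task = asyncio.get_running_loop().create_task(self._write_later())

    async def _write_later(self) -> None:
        try:
            await asyncio.sleep(self._delay)
            await self._save_fn()
        except asyncio.CancelledError:
            pass  # superseded by a newer schedule()
```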
…window manager

- Update `RequestExecutor` to await `usage_manager.get_availability_stats`, ensuring non-blocking execution during availability checks.
- Expose `window_manager` directly as a property on `UsageManager`.
- Refactor `RequestExecutor` to access `window_manager` directly instead of traversing via `limits.windows`.
Expanded the usage tracking system to capture and persist detailed token metrics, specifically reasoning (thinking) tokens and cache creation tokens.

- Updated client executors to extract `reasoning_tokens` and `cache_creation_tokens` from provider responses.
- Extended `UsageStats` and `WindowStats` models to store granular token breakdowns and explicit success/failure counts.
- Adapted storage and aggregation logic to persist and calculate these new metrics across quota windows.
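In data-model terms, this amounts to widening the stats records with fields like the following (a sketch; field names inferred from the bullets above, the real models live in the usage package):

```python
from dataclasses import dataclass

@dataclass
class WindowStats:
    prompt_tokens: int = 0
    completion_tokens: int = 0
    reasoning_tokens: int = 0         # "thinking" tokens
    cache_creation_tokens: int = 0
    cached_prompt_tokens: int = 0
    success_count: int = 0
    failure_count: int = 0
```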
…tions

- Archive the existing `RotatingClient` and `UsageManager` logic into new `_legacy` modules (`_client_legacy.py` and `_usage_manager_legacy.py`). This preserves the stable baseline implementation to facilitate reference and comparison during the ongoing core architecture refactor.
- Add `src/rotator_library/providers/example_provider.py` as a comprehensive reference template. This file documents the standard patterns for implementing providers with advanced usage management, quota groups, custom token counting, and background refresh jobs.
…ntation

This introduces a thread-safe mechanism using `contextvars` to accurately track and report internal API retries within providers.

- Implement `ContextVar` retry counting in `AntigravityProvider` to capture hidden API costs from internal retries (e.g., on empty responses or malformed calls).
- Update `ExampleProvider` with comprehensive patterns for custom usage handling, including retry counting and error-based cooldowns.
- Expand documentation for `UsageAPI` and `HookDispatcher` with detailed usage examples and architectural context.
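The `contextvars` pattern referenced here keeps a per-request retry counter that is safe under concurrency; a minimal sketch with illustrative function names:

```python
import contextvars

# One counter per execution context: concurrent requests each see their
# own value, without locks.
_retry_count: contextvars.ContextVar[int] = contextvars.ContextVar(
    "retry_count", default=0
)

def note_internal_retry() -> int:
    """Record one hidden internal retry for the current request."""
    count = _retry_count.get() + 1
    _retry_count.set(count)
    return count

def consume_retry_count() -> int:
    """Read and reset the counter when reporting usage for the request."""
    count = _retry_count.get()
    _retry_count.set(0)
    return count
```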
Updates the usage manager to prevent lower (stale) API usage values from overwriting higher (current) local counters during background synchronizations. API providers often return cached data or update in increments, causing local state regression.

- Implement `max(local, api)` logic for request counts to ensure monotonic growth
- Add `force` parameter to `UsageManager` and quota trackers to allow manual overrides
- Preserve accurate local tracking while allowing forced resets
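The rule reduces to a small pure function; a sketch of the described behavior:

```python
def merge_request_count(local: int, api: int, force: bool = False) -> int:
    # A stale API snapshot must never regress the local counter, so counts
    # only grow -- unless a manual reset explicitly forces the API value.
    return api if force else max(local, api)
```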
This introduces a compatibility layer that allows the `RotatingClient` to accept Anthropic-format requests, translating them to the internal OpenAI format for processing and converting responses back.

- Implement `AnthropicHandler` with support for `messages` and `count_tokens`.
- Integrate handler into `RotatingClient` to enable direct Anthropic SDK usage.

Also in this commit:
- feat(usage): add `force_refresh_quota` and `reload_usage_from_disk` for manual state management
- refactor(usage): implement `reload_from_disk` to sync local state without external calls
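A simplified sketch of the request-side translation (text-only; the real `AnthropicHandler` must also cover content blocks for tool use, images, etc.):

```python
def anthropic_to_openai_messages(payload: dict) -> list[dict]:
    """Map an Anthropic /v1/messages body to OpenAI-style chat messages."""
    messages = []
    if system := payload.get("system"):
        messages.append({"role": "system", "content": system})
    for msg in payload.get("messages", []):
        content = msg["content"]
        if isinstance(content, list):  # Anthropic content blocks
            content = "".join(
                block.get("text", "")
                for block in content
                if block.get("type") == "text"
            )
        messages.append({"role": msg["role"], "content": content})
    return messages
```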
@Mirrowel self-assigned this on Jan 22, 2026
@Mirrowel added the enhancement, Agent Monitored, and Priority labels on Jan 22, 2026
@mirrobot-agent (Contributor)

I'm starting a comprehensive review of this major core refactor. With 62 files changed and a significant shift towards modularity in `client.py` and `usage_manager.py`, I'll be taking a deep dive into the new architecture. Expect my full feedback shortly.


@mirrobot-agent bot left a comment


Overall Assessment

This is a phenomenal refactor of the library core. Moving from the monolithic `client.py` and `usage_manager.py` to a modular, engine-based architecture is a massive leap forward for the project's maintainability and extensibility. The new design is clean, robustly handles complex logic (like fair cycles and shared quota pools), and provides a much more intuitive API through the `UsageManager` and `RotatingClient` facades.

The code quality is exceptionally high, with great attention to detail in concurrency control, persistence resilience, and schema migrations.

Architectural Feedback

  • Modularity: The separation of concerns into SelectionEngine, LimitEngine, TrackingEngine, and WindowManager is excellent. It makes the logic much easier to reason about and test.
  • Credential Registry: The introduction of stable IDs based on OAuth metadata or API key hashes is a clever solution to the problem of tracking usage across path changes.
  • Context Manager Pattern: Using CredentialContext for credential acquisition and release ensures that resources are handled correctly even in the face of exceptions.

Key Suggestions

  • Shared Setup in Executor: _execute_non_streaming and _execute_streaming share a lot of initialization and filtering logic. Consolidating this into a helper would reduce duplication.
  • Logging Noise: Consider lowering the log level for credential releases in UsageManager to avoid flooding logs in high-traffic scenarios.
  • Engine Redundancy: The empty release method in TrackingEngine could be removed if it's not intended to hold logic, avoiding confusion about where concurrency state is managed.

Nitpicks and Minor Points

  • I noticed the StreamingHandler is now very cleanly integrated with the new context manager pattern.
  • The migration logic in UsageStorage for v1 -> v2 schema looks solid and handles the legacy path-based keys gracefully.

Questions for the Author

None. The documentation and implementation are very clear.

This review was generated by an AI assistant.

Comment on lines 357 to 370
```python
async def release(
    self,
    state: CredentialState,
    model: str,
) -> None:
    """
    Release a credential after request completes.

    Args:
        state: Credential state
        model: Model that was used
    """
    async with self._lock:
        return
```

This method is currently a no-op. While the actual release logic (decrementing `active_requests` and notifying conditions) is correctly handled in `UsageManager._release_credential`, keeping an empty method here might be confusing for future maintainers. Consider either implementing a hook here if needed or removing it if the `TrackingEngine` shouldn't be concerned with concurrency state.

Comment on lines +1716 to +1727
```python
    lib_logger.info(
        f"Released credential {self._mask_accessor(state.accessor)} "
        f"from {model} (remaining concurrent: {remaining}"
        f"{f'/{state.max_concurrent}' if state.max_concurrent else ''})"
    )
else:
    state.active_requests = max(0, state.active_requests - 1)
    lib_logger.info(
        f"Released credential {self._mask_accessor(state.accessor)} "
        f"from {model} (remaining concurrent: {state.active_requests}"
        f"{f'/{state.max_concurrent}' if state.max_concurrent else ''})"
    )
```

In high-concurrency environments, logging every credential release might generate excessive noise in the logs. Consider changing this to DEBUG level or adding a conditional check if detailed concurrency tracking is needed.

Comment on lines 163 to 180
```python
# Get the UsageManager for this provider
usage_manager = self._usage_managers.get(provider)
if not usage_manager:
    raise NoAvailableKeysError(f"No UsageManager for provider {provider}")

# Filter credentials by tier
filter_result = self._filter.filter_by_tier(
    context.credentials, model, provider
)
credentials = filter_result.all_usable

quota_group = usage_manager.get_model_quota_group(model)

await self._ensure_initialized(usage_manager, context, filter_result)
await self._validate_request(provider, model, context.kwargs)

if not credentials:
    raise NoAvailableKeysError(f"No compatible credentials for model {model}")
```

There is a significant amount of duplicated setup logic between _execute_non_streaming and _execute_streaming (filtering, initialization, validation). While this is already a huge improvement over the legacy code, moving this shared setup into a private helper method would further improve maintainability.

Repository owner deleted a comment from ellipsis-dev bot Jan 22, 2026
…counts

- Extract `_aggregate_model_windows` to unify how usage stats are summed across quota groups and credentials, reducing code duplication.
- Implement `_reconcile_window_counts` to ensure success and failure counters remain consistent (mathematically valid) when total request counts are updated from external quota sources.
- Enable synchronization and backfilling of credential-level windows to reflect aggregated model usage.
- Simplify `update_usage` and `_backfill_group_usage` logic by leveraging the new shared aggregation helpers.
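One plausible shape for such reconciliation, a sketch of the described invariant rather than the library's exact rule:

```python
def reconcile_window_counts(
    total: int, successes: int, failures: int
) -> tuple[int, int]:
    """Keep success/failure counters consistent with an externally updated
    total, so that successes + failures == total always holds."""
    known = successes + failures
    if total >= known:
        # Requests the external source saw but we didn't: count as successes.
        successes += total - known
    else:
        # External total is lower than local observations; keep as many
        # locally seen failures as fit, and let successes absorb the rest.
        failures = min(failures, total)
        successes = total - failures
    return successes, failures
```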
This commit extracts the initialization, credential filtering, and validation steps into a reusable `_prepare_execution` helper method within `RequestExecutor`. This ensures consistency and reduces code duplication between streaming and non-streaming request handlers.

Also in this commit:
- refactor(usage): remove unused `release` method from `TrackingEngine`
@mirrobot-agent bot left a comment

Follow-up Review

This update makes great strides in refining the core refactor. The architectural improvements in request execution and the more robust usage aggregation logic are excellent additions.

Assessment of New Changes:

  • Shared Setup: The consolidation in RequestExecutor is well-implemented and addresses the previous feedback perfectly.
  • Engine Cleanup: The removal of the redundant release method in TrackingEngine simplifies the tracking flow.
  • Improved Aggregation: The new _aggregate_model_windows and _reconcile_window_counts logic in UsageManager provides a much more resilient way to handle shared quota groups and out-of-sync external metrics. It's a significant improvement over the previous model-syncing approach.

Overall Status:
The PR is in excellent shape. The only remaining item is the logging level adjustment for credential releases, which I've flagged again to ensure it doesn't get missed. Once that's addressed, I'm fully confident in this refactor.

This review was generated by an AI assistant.

```python
async with lock:
    state.active_requests = max(0, state.active_requests - 1)
    remaining = state.active_requests
    lib_logger.info(
```

As suggested in the previous review, this log level should be changed to DEBUG. In high-concurrency scenarios, logging every credential release at INFO level creates significant noise.

```python
else:
    return await self._execute_non_streaming(context)

async def _prepare_execution(
```

Nicely done on consolidating the shared setup logic. The new _prepare_execution helper cleanly handles the duplicated filtering and initialization steps for both streaming and non-streaming paths.
