Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from cache.cache_factory import CacheFactory

from quota.quota_limiter import QuotaLimiter
from quota.token_usage_history import TokenUsageHistory
from quota.quota_limiter_factory import QuotaLimiterFactory

logger = logging.getLogger(__name__)
Expand All @@ -52,6 +53,7 @@ def __init__(self) -> None:
self._configuration: Optional[Configuration] = None
self._conversation_cache: Optional[Cache] = None
self._quota_limiters: list[QuotaLimiter] = []
self._token_usage_history: Optional[TokenUsageHistory] = None

def load_configuration(self, filename: str) -> None:
"""Load configuration from YAML file."""
Expand Down Expand Up @@ -180,5 +182,32 @@ def quota_limiters(self) -> list[QuotaLimiter]:
)
return self._quota_limiters

@property
def token_usage_history(self) -> Optional[TokenUsageHistory]:
"""
Provide the token usage history object for the application.

If token history is enabled in the loaded quota handlers configuration,
creates and caches a TokenUsageHistory instance and returns it. If
token history is disabled, returns None.

Returns:
TokenUsageHistory | None: The cached TokenUsageHistory instance
when enabled, otherwise `None`.

Raises:
LogicError: If the configuration has not been loaded.
"""
if self._configuration is None:
raise LogicError("logic error: configuration is not loaded")
if (
self._token_usage_history is None
and self._configuration.quota_handlers.enable_token_history
):
self._token_usage_history = TokenUsageHistory(
self._configuration.quota_handlers
)
return self._token_usage_history


configuration: AppConfig = AppConfig()
2 changes: 1 addition & 1 deletion src/quota/quota_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def connect(self) -> None:
self.connection.autocommit = True

def connected(self) -> bool:
"""Check if connection to cache is alive."""
"""Check if connection to quota limiter database is alive."""
if self.connection is None:
logger.warning("Not connected, need to reconnect later")
return False
Expand Down
43 changes: 43 additions & 0 deletions src/quota/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,46 @@
SET available=available+?, updated_at=?
WHERE id=? AND subject=?
"""

CREATE_TOKEN_USAGE_TABLE = """
CREATE TABLE IF NOT EXISTS token_usage (
user_id text NOT NULL,
provider text NOT NULL,
model text NOT NULL,
input_tokens int,
output_tokens int,
updated_at timestamp with time zone,
PRIMARY KEY(user_id, provider, model)
);
""" # noqa: S105

INIT_TOKEN_USAGE_FOR_USER = """
INSERT INTO token_usage (user_id, provider, model, input_tokens, output_tokens, updated_at)
VALUES (%s, %s, %s, 0, 0, %s)
""" # noqa: S105

CONSUME_TOKENS_FOR_USER_SQLITE = """
INSERT INTO token_usage (user_id, provider, model, input_tokens, output_tokens, updated_at)
VALUES (:user_id, :provider, :model, :input_tokens, :output_tokens, :updated_at)
ON CONFLICT (user_id, provider, model)
DO UPDATE
SET input_tokens=token_usage.input_tokens+:input_tokens,
output_tokens=token_usage.output_tokens+:output_tokens,
updated_at=:updated_at
WHERE token_usage.user_id=:user_id
AND token_usage.provider=:provider
AND token_usage.model=:model
""" # noqa: E501

CONSUME_TOKENS_FOR_USER_PG = """
INSERT INTO token_usage (user_id, provider, model, input_tokens, output_tokens, updated_at)
VALUES (%(user_id)s, %(provider)s, %(model)s, %(input_tokens)s, %(output_tokens)s, %(updated_at)s)
ON CONFLICT (user_id, provider, model)
DO UPDATE
SET input_tokens=token_usage.input_tokens+%(input_tokens)s,
output_tokens=token_usage.output_tokens+%(output_tokens)s,
updated_at=%(updated_at)s
WHERE token_usage.user_id=%(user_id)s
AND token_usage.provider=%(provider)s
AND token_usage.model=%(model)s
""" # noqa: E501
191 changes: 191 additions & 0 deletions src/quota/token_usage_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""Class with implementation of storage for token usage history.
One table named `token_usage` is used to store statistic about token usage
history. Input and output token count are stored for each triple (user_id,
provider, model). This triple is also used as a primary key to this table.
"""

import sqlite3
from datetime import datetime
import psycopg2

from log import get_logger

from quota.connect_pg import connect_pg
from quota.connect_sqlite import connect_sqlite
from quota.sql import (
CREATE_TOKEN_USAGE_TABLE,
CONSUME_TOKENS_FOR_USER_PG,
CONSUME_TOKENS_FOR_USER_SQLITE,
)

from models.config import QuotaHandlersConfiguration
from utils.connection_decorator import connection

logger = get_logger(__name__)


class TokenUsageHistory:
"""Class with implementation of storage for token usage history."""

def __init__(self, configuration: QuotaHandlersConfiguration) -> None:
"""Initialize token usage history storage.
Initialize TokenUsageHistory with the provided configuration and
establish a database connection.
Stores SQLite and PostgreSQL connection settings for later reconnection
attempts, initializes the internal connection state, and opens the
database connection.
Parameters:
configuration (QuotaHandlersConfiguration): Configuration
containing `sqlite` and `postgres` connection settings.
"""
# store the configuration, it will be used
# by reconnection logic later, if needed
self.sqlite_connection_config = configuration.sqlite
self.postgres_connection_config = configuration.postgres
self.connection = None
Comment on lines +47 to +49
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add type annotations for class attributes.

The coding guidelines require complete type annotations for all class attributes. Add type hints for sqlite_connection_config, postgres_connection_config, and connection.

As per coding guidelines, all class attributes must include type annotations.

🔎 Proposed fix
+from typing import Any
+from models.config import SQLiteDatabaseConfiguration, PostgreSQLDatabaseConfiguration
+
 class TokenUsageHistory:
     """Class with implementation of storage for token usage history."""
 
     def __init__(self, configuration: QuotaHandlersConfiguration) -> None:
         """Initialize token usage history storage.
 
         Initialize TokenUsageHistory with the provided configuration and
         establish a database connection.
 
         Stores SQLite and PostgreSQL connection settings for later reconnection
         attempts, initializes the internal connection state, and opens the
         database connection.
 
         Parameters:
             configuration (QuotaHandlersConfiguration): Configuration
             containing `sqlite` and `postgres` connection settings.
         """
         # store the configuration, it will be used
         # by reconnection logic later, if needed
-        self.sqlite_connection_config = configuration.sqlite
-        self.postgres_connection_config = configuration.postgres
-        self.connection = None
+        self.sqlite_connection_config: SQLiteDatabaseConfiguration | None = configuration.sqlite
+        self.postgres_connection_config: PostgreSQLDatabaseConfiguration | None = configuration.postgres
+        self.connection: Any | None = None

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/quota/token_usage_history.py around lines 47-49, the three class
attributes lack type annotations; add explicit type hints for
sqlite_connection_config, postgres_connection_config, and connection. Import
typing (e.g., from typing import Optional, Any) and annotate
sqlite_connection_config and postgres_connection_config with the concrete config
types from your configuration module (e.g., configuration.SQLiteConfig /
configuration.PostgresConfig) or use Any if those types are not exported, and
annotate connection as Optional[...] with the appropriate DB connection type
(e.g., sqlite3.Connection / asyncpg.Connection) or Optional[Any] if unknown;
ensure the annotations are added at the class attribute level (not just in
__init__) so static type checkers pick them up.


# initialize connection to DB
self.connect()

# pylint: disable=W0201
def connect(self) -> None:
"""Initialize connection to database.
Establish a database connection for token usage history and ensure required tables exist.
Selects PostgreSQL if its configuration is present, otherwise uses
SQLite; initializes the token_usage table, enables autocommit on the
connection, and ensures the connection is closed and the exception is
re-raised if table initialization fails.
Raises:
ValueError: If neither PostgreSQL nor SQLite configuration is provided.
"""
logger.info("Initializing connection to quota usage history database")
if self.postgres_connection_config is not None:
self.connection = connect_pg(self.postgres_connection_config)
if self.sqlite_connection_config is not None:
self.connection = connect_sqlite(self.sqlite_connection_config)
if self.connection is None:
return
Comment on lines +69 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

SQLite configuration silently overrides PostgreSQL when both are present.

If both postgres_connection_config and sqlite_connection_config are set, the SQLite connection will overwrite the PostgreSQL connection without any warning. Consider using elif to enforce mutual exclusivity or logging a warning when both are configured.

🔎 Proposed fix using elif
         logger.info("Initializing connection to quota usage history database")
         if self.postgres_connection_config is not None:
             self.connection = connect_pg(self.postgres_connection_config)
-        if self.sqlite_connection_config is not None:
+        elif self.sqlite_connection_config is not None:
             self.connection = connect_sqlite(self.sqlite_connection_config)
         if self.connection is None:
             return

Note: The same pattern exists in QuotaLimiter.connect() (lines 90-93 in quota_limiter.py). Consider addressing both for consistency.

🤖 Prompt for AI Agents
In src/quota/token_usage_history.py around lines 69 to 74 the current logic sets
self.connection to Postgres then immediately to SQLite if both configs exist,
silently overwriting Postgres; change the second conditional to an elif so only
one connection is chosen (or explicitly detect both and log/raise) — update to
use elif to enforce mutual exclusivity and add a warning log if both configs are
provided; apply the same fix in src/quota/quota_limiter.py at lines ~90-93 for
consistency.

Comment on lines +65 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Docstring claims ValueError is raised, but implementation returns silently.

The docstring states "Raises: ValueError: If neither PostgreSQL nor SQLite configuration is provided," but the code returns early without raising any exception when self.connection is None. Either update the docstring or add the ValueError.

🔎 Proposed fix to match docstring
         if self.postgres_connection_config is not None:
             self.connection = connect_pg(self.postgres_connection_config)
         if self.sqlite_connection_config is not None:
             self.connection = connect_sqlite(self.sqlite_connection_config)
         if self.connection is None:
-            return
+            raise ValueError("Neither PostgreSQL nor SQLite configuration is provided")

Or update the docstring to reflect the current behavior:

-        Raises:
-            ValueError: If neither PostgreSQL nor SQLite configuration is provided.
+        Note:
+            Returns early without establishing a connection if neither PostgreSQL
+            nor SQLite configuration is provided.
🤖 Prompt for AI Agents
In src/quota/token_usage_history.py around lines 65-74, the docstring says a
ValueError is raised when neither PostgreSQL nor SQLite config is provided, but
the code currently just returns when self.connection is None; update the
implementation to raise a ValueError with a clear message (e.g., "No database
configuration provided for quota usage history") instead of returning, so
behavior matches the docstring.


try:
self._initialize_tables()
except Exception as e:
self.connection.close()
logger.exception("Error initializing quota usage history database:\n%s", e)
raise

self.connection.autocommit = True

@connection
def consume_tokens( # pylint: disable=too-many-arguments,too-many-positional-arguments
self,
user_id: str,
provider: str,
model: str,
input_tokens: int,
output_tokens: int,
) -> None:
"""Consume tokens by given user.
Record token usage for a specific user/provider/model triple in persistent storage.
Parameters:
user_id (str): Identifier of the user whose token usage will be updated.
provider (str): Provider name associated with the usage (e.g., "openai").
model (str): Model name associated with the usage (e.g., "gpt-4").
input_tokens (int): Number of input tokens to add to the stored usage.
output_tokens (int): Number of output tokens to add to the stored usage.
Raises:
ValueError: If no database backend configuration (Postgres or SQLite) is available.
"""
logger.info(
"Token usage for user %s, provider %s and model %s changed by %d, %d tokens",
user_id,
provider,
model,
input_tokens,
output_tokens,
)
query_statement: str = ""
if self.postgres_connection_config is not None:
query_statement = CONSUME_TOKENS_FOR_USER_PG
if self.sqlite_connection_config is not None:
query_statement = CONSUME_TOKENS_FOR_USER_SQLITE
Comment on lines +117 to +120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Same precedence issue as in connect(): SQLite overrides PostgreSQL.

The query statement selection has the same issue where SQLite config overwrites PostgreSQL config if both are present. Use elif for consistency with the recommended fix in connect().

🔎 Proposed fix
         query_statement: str = ""
         if self.postgres_connection_config is not None:
             query_statement = CONSUME_TOKENS_FOR_USER_PG
-        if self.sqlite_connection_config is not None:
+        elif self.sqlite_connection_config is not None:
             query_statement = CONSUME_TOKENS_FOR_USER_SQLITE
🤖 Prompt for AI Agents
In src/quota/token_usage_history.py around lines 117 to 120, the selection of
query_statement uses two separate ifs so the SQLite branch can overwrite the
PostgreSQL branch when both configs are present; change the second conditional
to elif so PostgreSQL takes precedence (matching the connect() fix) and ensure
query_statement is set only once based on that precedence.


# check if the connection was established
if self.connection is None:
logger.warning("Not connected, need to reconnect later")
return

# timestamp to be used
updated_at = datetime.now()

# it is not possible to use context manager there, because SQLite does
# not support it
cursor = self.connection.cursor()
cursor.execute(
query_statement,
{
"user_id": user_id,
"provider": provider,
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"updated_at": updated_at,
},
)
cursor.close()

def connected(self) -> bool:
"""Check if connection to quota usage history database is alive.
Verify that the storage connection for token usage history is alive.
Returns:
`true` if the database connection is present and responds to a
simple query, `false` otherwise.
"""
if self.connection is None:
logger.warning("Not connected, need to reconnect later")
return False
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
logger.info("Connection to storage is ok")
return True
except (psycopg2.OperationalError, sqlite3.Error) as e:
logger.error("Disconnected from storage: %s", e)
return False
finally:
if cursor is not None:
try:
cursor.close()
except Exception: # pylint: disable=broad-exception-caught
logger.warning("Unable to close cursor")

def _initialize_tables(self) -> None:
"""Initialize tables used by quota limiter.
Ensure the token_usage table exists in the configured database.
Creates the table required to store per-user token usage history (input/output tokens per
user_id, provider, model) and commits the change to the database.
"""
# check if the connection was established
if self.connection is None:
logger.warning("Not connected, need to reconnect later")
return

logger.info("Initializing tables for token usage history")
cursor = self.connection.cursor()
cursor.execute(CREATE_TOKEN_USAGE_TABLE)
cursor.close()
self.connection.commit()
Loading