Skip to content

Commit c0e0d6c

Browse files
authored
Merge pull request #684 from tisnik/lcore-741-quota-limiter-scheduler
LCORE-741: quota limiter scheduler
2 parents 6feb4e3 + cb0ebdf commit c0e0d6c

File tree

3 files changed

+310
-0
lines changed

3 files changed

+310
-0
lines changed

src/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,7 @@
144144

145145
# Default embedding vector dimension for the sentence transformer model
146146
DEFAULT_EMBEDDING_DIMENSION = 768
147+
148+
# quota limiters constants
149+
USER_QUOTA_LIMITER = "user_limiter"
150+
CLUSTER_QUOTA_LIMITER = "cluster_limiter"

src/lightspeed_stack.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from configuration import configuration
1515
from llama_stack_configuration import generate_configuration
1616
from runners.uvicorn import start_uvicorn
17+
from runners.quota_scheduler import start_quota_scheduler
1718

1819
FORMAT = "%(message)s"
1920
logging.basicConfig(
@@ -120,6 +121,8 @@ def main() -> None:
120121
# (step is needed because process context isn’t shared).
121122
os.environ["LIGHTSPEED_STACK_CONFIG_PATH"] = args.config_file
122123

124+
# start the runners
125+
start_quota_scheduler(configuration.configuration)
123126
# if every previous steps don't fail, start the service on specified port
124127
start_uvicorn(configuration.service_configuration)
125128
logger.info("Lightspeed Core Stack finished")

src/runners/quota_scheduler.py

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
"""User and cluster quota scheduler runner."""
2+
3+
from typing import Any
4+
from threading import Thread
5+
from time import sleep
6+
7+
import sqlite3
8+
import psycopg2
9+
10+
import constants
11+
from log import get_logger
12+
from models.config import (
13+
Configuration,
14+
QuotaHandlersConfiguration,
15+
QuotaLimiterConfiguration,
16+
PostgreSQLDatabaseConfiguration,
17+
SQLiteDatabaseConfiguration,
18+
)
19+
20+
logger = get_logger(__name__)
21+
22+
23+
CREATE_QUOTA_TABLE = """
24+
CREATE TABLE IF NOT EXISTS quota_limits (
25+
id text NOT NULL,
26+
subject char(1) NOT NULL,
27+
quota_limit int NOT NULL,
28+
available int,
29+
updated_at timestamp with time zone,
30+
revoked_at timestamp with time zone,
31+
PRIMARY KEY(id, subject)
32+
);
33+
"""
34+
35+
36+
INCREASE_QUOTA_STATEMENT_PG = """
37+
UPDATE quota_limits
38+
SET available=available+%s, revoked_at=NOW()
39+
WHERE subject=%s
40+
AND revoked_at < NOW() - INTERVAL %s ;
41+
"""
42+
43+
44+
INCREASE_QUOTA_STATEMENT_SQLITE = """
45+
UPDATE quota_limits
46+
SET available=available+?, revoked_at=datetime('now')
47+
WHERE subject=?
48+
AND revoked_at < datetime('now', ?);
49+
"""
50+
51+
52+
RESET_QUOTA_STATEMENT_PG = """
53+
UPDATE quota_limits
54+
SET available=%s, revoked_at=NOW()
55+
WHERE subject=%s
56+
AND revoked_at < NOW() - INTERVAL %s ;
57+
"""
58+
59+
60+
RESET_QUOTA_STATEMENT_SQLITE = """
61+
UPDATE quota_limits
62+
SET available=?, revoked_at=datetime('now')
63+
WHERE subject=?
64+
AND revoked_at < datetime('now', ?);
65+
"""
66+
67+
68+
def quota_scheduler(config: QuotaHandlersConfiguration) -> bool:
69+
"""Quota scheduler task."""
70+
if config is None:
71+
logger.warning("Quota limiters are not configured, skipping")
72+
return False
73+
74+
if config.sqlite is None and config.postgres is None:
75+
logger.warning("Storage for quota limiter is not set, skipping")
76+
return False
77+
78+
if len(config.limiters) == 0:
79+
logger.warning("No limiters are setup, skipping")
80+
return False
81+
82+
connection = connect(config)
83+
if connection is None:
84+
logger.warning("Can not connect to database, skipping")
85+
return False
86+
87+
init_tables(connection)
88+
period = config.scheduler.period
89+
90+
increase_quota_statement = get_increase_quota_statement(config)
91+
reset_quota_statement = get_reset_quota_statement(config)
92+
93+
logger.info(
94+
"Quota scheduler started in separated thread with period set to %d seconds",
95+
period,
96+
)
97+
98+
while True:
99+
logger.info("Quota scheduler sync started")
100+
for limiter in config.limiters:
101+
try:
102+
quota_revocation(
103+
connection, limiter, increase_quota_statement, reset_quota_statement
104+
)
105+
except Exception as e: # pylint: disable=broad-exception-caught
106+
logger.error("Quota revoke error: %s", e)
107+
logger.info("Quota scheduler sync finished")
108+
sleep(period)
109+
# unreachable code
110+
connection.close()
111+
return True
112+
113+
114+
def get_increase_quota_statement(config: QuotaHandlersConfiguration) -> str:
115+
"""Get the SQL statement to increase quota."""
116+
if config.sqlite is not None:
117+
return INCREASE_QUOTA_STATEMENT_SQLITE
118+
return INCREASE_QUOTA_STATEMENT_PG
119+
120+
121+
def get_reset_quota_statement(config: QuotaHandlersConfiguration) -> str:
122+
"""Get the SQL statement to reset quota."""
123+
if config.sqlite is not None:
124+
return RESET_QUOTA_STATEMENT_SQLITE
125+
return RESET_QUOTA_STATEMENT_PG
126+
127+
128+
def quota_revocation(
129+
connection: Any,
130+
quota_limiter: QuotaLimiterConfiguration,
131+
increase_quota_statement: str,
132+
reset_quota_statement: str,
133+
) -> None:
134+
"""Quota revocation mechanism."""
135+
logger.info(
136+
"Quota revocation mechanism for limiter '%s' of type '%s'",
137+
quota_limiter.name,
138+
quota_limiter.type,
139+
)
140+
141+
if quota_limiter.type is None:
142+
raise ValueError("Limiter type not set, skipping revocation")
143+
144+
if quota_limiter.period is None:
145+
raise ValueError("Limiter period not set, skipping revocation")
146+
147+
subject_id = get_subject_id(quota_limiter.type)
148+
149+
if quota_limiter.quota_increase is not None:
150+
increase_quota(
151+
connection,
152+
increase_quota_statement,
153+
subject_id,
154+
quota_limiter.quota_increase,
155+
quota_limiter.period,
156+
)
157+
158+
if quota_limiter.initial_quota is not None and quota_limiter.initial_quota > 0:
159+
reset_quota(
160+
connection,
161+
reset_quota_statement,
162+
subject_id,
163+
quota_limiter.initial_quota,
164+
quota_limiter.period,
165+
)
166+
167+
168+
def increase_quota(
169+
connection: Any,
170+
update_statement: str,
171+
subject_id: str,
172+
increase_by: int,
173+
period: str,
174+
) -> None:
175+
"""Increase quota by specified amount."""
176+
logger.info(
177+
"Increasing quota for subject '%s' by %d when period %s is reached",
178+
subject_id,
179+
increase_by,
180+
period,
181+
)
182+
183+
# for compatibility with SQLite it is not possible to use context manager
184+
# there
185+
cursor = connection.cursor()
186+
cursor.execute(
187+
update_statement,
188+
(
189+
increase_by,
190+
subject_id,
191+
period,
192+
),
193+
)
194+
cursor.close()
195+
connection.commit()
196+
logger.info("Changed %d rows in database", cursor.rowcount)
197+
198+
199+
def reset_quota(
200+
connection: Any,
201+
update_statement: str,
202+
subject_id: str,
203+
reset_to: int,
204+
period: str,
205+
) -> None:
206+
"""Reset quota to specified amount."""
207+
logger.info(
208+
"Resetting quota for subject '%s' to %d when period %s is reached",
209+
subject_id,
210+
reset_to,
211+
period,
212+
)
213+
214+
# for compatibility with SQLite it is not possible to use context manager
215+
# there
216+
cursor = connection.cursor()
217+
cursor.execute(
218+
update_statement,
219+
(
220+
reset_to,
221+
subject_id,
222+
period,
223+
),
224+
)
225+
cursor.close()
226+
connection.commit()
227+
logger.info("Changed %d rows in database", cursor.rowcount)
228+
229+
230+
def get_subject_id(limiter_type: str) -> str:
231+
"""Get subject ID based on quota limiter type."""
232+
match limiter_type:
233+
case constants.USER_QUOTA_LIMITER:
234+
return "u"
235+
case constants.CLUSTER_QUOTA_LIMITER:
236+
return "c"
237+
case _:
238+
return "?"
239+
240+
241+
def connect(config: QuotaHandlersConfiguration) -> Any:
242+
"""Initialize connection to database."""
243+
logger.info("Initializing connection to quota limiter database")
244+
if config.postgres is not None:
245+
return connect_pg(config.postgres)
246+
if config.sqlite is not None:
247+
return connect_sqlite(config.sqlite)
248+
return None
249+
250+
251+
def connect_pg(config: PostgreSQLDatabaseConfiguration) -> Any:
252+
"""Initialize connection to PostgreSQL database."""
253+
logger.info("Connecting to PostgreSQL storage")
254+
connection = psycopg2.connect(
255+
host=config.host,
256+
port=config.port,
257+
user=config.user,
258+
password=config.password.get_secret_value(),
259+
dbname=config.db,
260+
sslmode=config.ssl_mode,
261+
# sslrootcert=config.ca_cert_path,
262+
gssencmode=config.gss_encmode,
263+
)
264+
if connection is not None:
265+
connection.autocommit = True
266+
return connection
267+
268+
269+
def connect_sqlite(config: SQLiteDatabaseConfiguration) -> Any:
270+
"""Initialize connection to database."""
271+
logger.info("Connecting to SQLite storage")
272+
# make sure the connection will have known state
273+
# even if SQLite is not alive
274+
connection = None
275+
try:
276+
connection = sqlite3.connect(database=config.db_path)
277+
except sqlite3.Error as e:
278+
if connection is not None:
279+
connection.close()
280+
logger.exception("Error initializing SQLite cache:\n%s", e)
281+
raise
282+
connection.autocommit = True
283+
return connection
284+
285+
286+
def init_tables(connection: Any) -> None:
287+
"""Initialize tables used by quota limiter."""
288+
logger.info("Initializing tables for quota limiter")
289+
cursor = connection.cursor()
290+
cursor.execute(CREATE_QUOTA_TABLE)
291+
cursor.close()
292+
connection.commit()
293+
294+
295+
def start_quota_scheduler(configuration: Configuration) -> None:
296+
"""Start user and cluster quota scheduler in separate thread."""
297+
logger.info("Starting quota scheduler")
298+
thread = Thread(
299+
target=quota_scheduler,
300+
daemon=True,
301+
args=(configuration.quota_handlers,),
302+
)
303+
thread.start()

0 commit comments

Comments
 (0)