Skip to content

Commit

Permalink
fix: sleep in cache for early requests
Browse files Browse the repository at this point in the history
  • Loading branch information
robcxyz committed May 17, 2024
1 parent 361ad4d commit 88b3777
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 63 deletions.
10 changes: 10 additions & 0 deletions balanced_backend/api/v1/endpoints/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from asyncio import sleep

from balanced_backend.cache.cache import cache
from balanced_backend.log import logger


async def sleep_while_empty_cache(cache_item):
while len(getattr(cache, cache_item)) == 0:
logger.info(f"Early cache request {cache_item}")
await sleep(1)
43 changes: 18 additions & 25 deletions balanced_backend/api/v1/endpoints/cmc.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import TYPE_CHECKING
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException

from balanced_backend.db import session_factory
from balanced_backend.api.v1.endpoints._utils import sleep_while_empty_cache
from balanced_backend.cache.cache import cache
from balanced_backend.cache import cmc

if TYPE_CHECKING:
from balanced_backend.models.cmc import (
Expand All @@ -18,45 +17,39 @@

@router.get("/summary")
async def get_cmc_summary() -> list['SummaryCMC']:
output = cache.cmc_summary
if len(output) == 0:
with session_factory() as session:
cmc.update_cmc_summary(session=session)
output = cache.cmc_summary
return output
await sleep_while_empty_cache('cmc_summary')
return cache.cmc_summary


@router.get("/ticker")
async def get_cmc_ticker() -> dict[str, 'TickerCMC']:
output = cache.cmc_tickers
if len(output) == 0:
with session_factory() as session:
cmc.update_cmc_tickers(session=session)
output = cache.cmc_tickers
return output
await sleep_while_empty_cache('cmc_tickers')
return cache.cmc_tickers


@router.get("/orderbook/{market_pair}")
async def get_cmc_orderbook(
market_pair: str,
) -> dict[str, 'OrderBookCMC']:
await sleep_while_empty_cache('cmc_orderbook')
try:
output = cache.cmc_orderbook[market_pair]
return cache.cmc_orderbook[market_pair]
except KeyError:
with session_factory() as session:
cmc.update_cmc_order_book(session=session)
output = cache.cmc_orderbook[market_pair]
return output
raise HTTPException(
status_code=204,
detail=f"market_pair not found - check /summary for available pairs."
)


@router.get("/trades/{market_pair}")
async def get_cmc_trades(
market_pair: str,
) -> dict[str, 'TradeCMC']:
await sleep_while_empty_cache('cmc_trades')
try:
output = cache.cmc_trades[market_pair]
return cache.cmc_trades[market_pair]
except KeyError:
with session_factory() as session:
cmc.update_cmc_trades(session=session)
output = cache.cmc_trades[market_pair]
return output
raise HTTPException(
status_code=204,
detail=f"market_pair not found - check /summary for available pairs."
)
46 changes: 19 additions & 27 deletions balanced_backend/api/v1/endpoints/coingecko.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import TYPE_CHECKING
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException

from balanced_backend.db import session_factory
from balanced_backend.api.v1.endpoints._utils import sleep_while_empty_cache
from balanced_backend.cache.cache import cache
from balanced_backend.cache import coingecko

if TYPE_CHECKING:
from balanced_backend.models.coingecko import (
Expand All @@ -18,51 +17,44 @@

@router.get("/pairs")
async def get_coingecko_summary() -> list['PairsCoinGecko']:
output = cache.coingecko_pairs
if len(output) == 0:
with session_factory() as session:
coingecko.update_coingecko_pairs(session=session)
output = cache.coingecko_pairs
return output
await sleep_while_empty_cache('coingecko_pairs')
return cache.coingecko_pairs


@router.get("/tickers")
async def get_coingecko_ticker() -> dict[str, 'TickerCoinGecko']:
output = cache.coingecko_tickers
if len(output) == 0:
with session_factory() as session:
coingecko.update_coingecko_tickers(session=session)
output = cache.coingecko_tickers
return output
await sleep_while_empty_cache('coingecko_tickers')
return cache.coingecko_tickers


@router.get("/orderbook")
async def get_coingecko_orderbook(
ticker_id: str = None,
) -> dict[str, 'OrderBookCoinGecko']:
await sleep_while_empty_cache('coingecko_orderbook')
if ticker_id is None:
return cache.coingecko_orderbook

try:
output = cache.coingecko_orderbook[ticker_id]
return cache.coingecko_orderbook[ticker_id]
except KeyError:
with session_factory() as session:
coingecko.update_coingecko_orderbook(session=session)
output = cache.coingecko_orderbook[ticker_id]
return output
raise HTTPException(
status_code=204,
detail=f"ticker_id not found - check /summary for available pairs."
)



@router.get("/historical_trades")
async def get_coingecko_trades(
ticker_id: str = None,
) -> dict[str, 'HistoricalCoinGecko']:
await sleep_while_empty_cache('coingecko_historical')
if ticker_id is None:
return cache.coingecko_historical

try:
output = cache.coingecko_historical[ticker_id]
return cache.coingecko_historical[ticker_id]
except KeyError:
with session_factory() as session:
coingecko.update_coingecko_historical(session=session)
output = cache.coingecko_historical[ticker_id]
return output
raise HTTPException(
status_code=204,
detail=f"ticker_id not found - check /summary for available pairs."
)
29 changes: 18 additions & 11 deletions balanced_backend/cache/cache_cron.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

from loguru import logger
from typing import TypedDict, Callable
from apscheduler.schedulers.background import BlockingScheduler
Expand All @@ -14,35 +16,35 @@ class Cron(TypedDict):
CRONS: list[Cron] = [
{
'func': cmc.update_cmc_summary,
'interval': 60 * 60,
'interval': 60,
},
{
'func': cmc.update_cmc_order_book,
'interval': 60 * 60,
'interval': 60,
},
{
'func': cmc.update_cmc_tickers,
'interval': 60 * 60,
'interval': 60,
},
{
'func': cmc.update_cmc_trades,
'interval': 60 * 60,
'interval': 60,
},
{
'func': coingecko.update_coingecko_pairs,
'interval': 60 * 60,
'interval': 60,
},
{
'func': coingecko.update_coingecko_orderbook,
'interval': 60 * 60,
'interval': 60,
},
{
'func': coingecko.update_coingecko_tickers,
'interval': 60 * 60,
'interval': 60,
},
{
'func': coingecko.update_coingecko_historical,
'interval': 60 * 60,
'interval': 60 * 5,
},
]

Expand All @@ -52,14 +54,19 @@ def run_cron_with_session(cron: Callable):
cron(session=session)


async def run_all_crons():
tasks = [run_cron_with_session(i['func']) for i in CRONS]
await asyncio.gather(*tasks)


def cache_cron():
logger.info("Starting metrics server.")
sched = BlockingScheduler()

for i in CRONS:
# Run the jobs immediately in order
run_cron_with_session(i['func'])
# Run the jobs immediately in parallel
asyncio.run(run_all_crons())

for i in CRONS:
# Then run them in the scheduler
sched.add_job(
func=run_cron_with_session,
Expand Down

0 comments on commit 88b3777

Please sign in to comment.