Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/api/endpoints/announcement.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fastapi import APIRouter

from app.config import settings
from app.core.config import settings

router = APIRouter(prefix="/announcement", tags=["announcement"])

Expand Down
2 changes: 1 addition & 1 deletion app/api/endpoints/catalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from app.services.token_store import token_store

MAX_RESULTS = 50
SOURCE_ITEMS_LIMIT = 10
SOURCE_ITEMS_LIMIT = 15

router = APIRouter()

Expand Down
2 changes: 1 addition & 1 deletion app/api/endpoints/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def _manifest_handler(response: Response, token: str):

@router.get("/manifest.json")
async def manifest():
manifest = await get_base_manifest()
manifest = get_base_manifest()
# since user is not logged in, return empty catalogs
manifest["catalogs"] = []
return manifest
Expand Down
2 changes: 2 additions & 0 deletions app/api/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from fastapi import APIRouter

from .endpoints.announcement import router as announcement_router
from .endpoints.catalogs import router as catalogs_router
from .endpoints.health import router as health_router
from .endpoints.manifest import router as manifest_router
Expand All @@ -19,3 +20,4 @@ async def root():
api_router.include_router(tokens_router)
api_router.include_router(health_router)
api_router.include_router(meta_router)
api_router.include_router(announcement_router)
11 changes: 10 additions & 1 deletion app/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ async def lifespan(app: FastAPI):
Manage application lifespan events (startup/shutdown).
"""
global catalog_updater
asyncio.create_task(migrate_tokens())
task = asyncio.create_task(migrate_tokens())

# Ensure background exceptions are surfaced in logs
def _on_done(t: asyncio.Task):
try:
t.result()
except Exception as exc:
logger.error(f"migrate_tokens background task failed: {exc}")

task.add_done_callback(_on_done)

# Startup
if settings.AUTO_UPDATE_CATALOGS:
Expand Down
17 changes: 12 additions & 5 deletions app/services/catalog.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime, timezone

from app.core.settings import CatalogConfig, UserSettings
from app.services.row_generator import RowGeneratorService
from app.services.scoring import ScoringService
Expand Down Expand Up @@ -130,30 +132,35 @@ async def get_dynamic_catalogs(
return catalogs

async def _add_item_based_rows(
self, catalogs: list, library_items: dict, content_type: str, language: str, loved_config, watched_config
self,
catalogs: list,
library_items: dict,
content_type: str,
language: str,
loved_config,
watched_config,
):
"""Helper to add 'Because you watched' and 'More like' rows."""

# Helper to parse date
def get_date(item):
import datetime

val = item.get("state", {}).get("lastWatched")
if val:
try:
if isinstance(val, str):
return datetime.datetime.fromisoformat(val.replace("Z", "+00:00"))
return datetime.fromisoformat(val.replace("Z", "+00:00"))
return val
except (ValueError, TypeError):
pass
# Fallback to mtime
val = item.get("_mtime")
if val:
try:
return datetime.datetime.fromisoformat(str(val).replace("Z", "+00:00"))
return datetime.fromisoformat(str(val).replace("Z", "+00:00"))
except (ValueError, TypeError):
pass
return datetime.datetime.min.replace(tzinfo=datetime.UTC)
return datetime.min.replace(tzinfo=timezone.utc)

# 1. More Like <Loved Item>
last_loved = None # Initialize for the watched check
Expand Down
9 changes: 7 additions & 2 deletions app/services/catalog_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ async def refresh_catalogs_for_credentials(token: str, credentials: dict[str, An
addon_installed = await stremio_service.is_addon_installed(auth_key)
if not addon_installed:
logger.info(f"[{redact_token(token)}] User has not installed addon. Removing token from redis")
await token_store.delete_token(key=token)
# Ensure we delete by token, not by raw Redis key
await token_store.delete_token(token=token)
return True
except Exception as e:
logger.exception(f"[{redact_token(token)}] Failed to check if addon is installed: {e}")
Expand All @@ -41,6 +42,7 @@ async def refresh_catalogs_for_credentials(token: str, credentials: dict[str, An
dynamic_catalog_service = DynamicCatalogService(stremio_service=stremio_service)

# Ensure user_settings is available
user_settings = get_default_settings()
if credentials.get("settings"):
try:
user_settings = UserSettings(**credentials["settings"])
Expand Down Expand Up @@ -140,7 +142,10 @@ async def _update_safe(key: str, payload: dict[str, Any]) -> None:

try:
async for key, payload in token_store.iter_payloads():
tasks.append(asyncio.create_task(_update_safe(key, payload)))
# Extract token from redis key prefix
prefix = token_store.KEY_PREFIX
tok = key[len(prefix) :] if key.startswith(prefix) else key # noqa
tasks.append(asyncio.create_task(_update_safe(tok, payload)))

if tasks:
logger.info(f"Starting background refresh for {len(tasks)} tokens...")
Expand Down
21 changes: 12 additions & 9 deletions app/services/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class DiscoveryEngine:

def __init__(self):
self.tmdb_service = TMDBService()
# Limit concurrent discovery calls to avoid rate limiting
self._sem = asyncio.Semaphore(10)

async def discover_recommendations(
self,
Expand Down Expand Up @@ -63,7 +65,7 @@ async def discover_recommendations(
for i in range(2):
params_rating = {
"with_genres": genre_ids,
"sort_by": "ratings.desc",
"sort_by": "vote_average.desc",
"vote_count.gte": 500,
"page": i + 1,
**base_params,
Expand All @@ -85,7 +87,7 @@ async def discover_recommendations(
for i in range(3):
params_rating = {
"with_keywords": keyword_ids,
"sort_by": "ratings.desc",
"sort_by": "vote_average.desc",
"vote_count.gte": 500,
"page": i + 1,
**base_params,
Expand All @@ -105,7 +107,7 @@ async def discover_recommendations(

params_rating = {
"with_cast": str(actor_id),
"sort_by": "ratings.desc",
"sort_by": "vote_average.desc",
"vote_count.gte": 500,
**base_params,
}
Expand All @@ -124,7 +126,7 @@ async def discover_recommendations(

params_rating = {
"with_crew": str(director_id),
"sort_by": "ratings.desc",
"sort_by": "vote_average.desc",
"vote_count.gte": 500,
**base_params,
}
Expand All @@ -143,7 +145,7 @@ async def discover_recommendations(

params_rating = {
"with_origin_country": country_ids,
"sort_by": "ratings.desc",
"sort_by": "vote_average.desc",
"vote_count.gte": 300,
**base_params,
}
Expand All @@ -154,11 +156,11 @@ async def discover_recommendations(
year = top_year[0][0]
# we store year in 10 years bucket
start_year = f"{year}-01-01"
end_year = f"{int(year) + 10}-12-31"
end_year = f"{int(year) + 9}-12-31"
params_rating = {
"primary_release_date.gte": start_year,
"primary_release_date.lte": end_year,
"sort_by": "ratings.desc",
"sort_by": "vote_average.desc",
"vote_count.gte": 500,
**base_params,
}
Expand All @@ -181,7 +183,8 @@ async def discover_recommendations(
async def _fetch_discovery(self, media_type: str, params: dict) -> list[dict]:
"""Helper to call TMDB discovery."""
try:
data = await self.tmdb_service.get_discover(media_type, **params)
return data.get("results", [])
async with self._sem:
data = await self.tmdb_service.get_discover(media_type, **params)
return data.get("results", [])
except Exception:
return []
7 changes: 7 additions & 0 deletions app/services/gemini.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

from google import genai
from loguru import logger

Expand Down Expand Up @@ -52,5 +54,10 @@ def generate_content(self, prompt: str) -> str:
logger.error(f"Error generating content: {e}")
return ""

async def generate_content_async(self, prompt: str) -> str:
"""Async wrapper to avoid blocking the event loop during network calls."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: self.generate_content(prompt))


gemini_service = GeminiService()
Loading