49 changes: 45 additions & 4 deletions src/services/gam_inventory_service.py
@@ -12,7 +12,7 @@
from datetime import datetime, timedelta
from typing import Any

from sqlalchemy import String, and_, create_engine, delete, func, or_, select
from sqlalchemy import String, and_, create_engine, delete, func, or_, select, text
from sqlalchemy.orm import Session, scoped_session, sessionmaker

from src.adapters.gam_inventory_discovery import (
@@ -526,9 +526,22 @@ def _write_custom_targeting_keys(self, tenant_id: str, keys: list, sync_time: da
BATCH_SIZE = 500

# Load existing key IDs once
# Use a fresh query to avoid stale connection issues
stmt = select(GAMInventory.inventory_id, GAMInventory.id).where(
and_(GAMInventory.tenant_id == tenant_id, GAMInventory.inventory_type == "custom_targeting_key")
)

# Explicitly expire_all() and test connection health before querying
# This prevents hanging on stale connections in long-running syncs
self.db.expire_all()

# Test connection is alive with a simple query (connection keep-alive)
try:
self.db.execute(text("SELECT 1")).scalar()
except Exception as e:
logger.warning(f"Connection test failed, will retry query: {e}")
# Connection is stale - SQLAlchemy will automatically reconnect on next query

existing = self.db.execute(stmt).all()
existing_ids = {row.inventory_id: row.id for row in existing}
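
Note: the manual SELECT 1 keep-alive above can also be complemented at the engine level. SQLAlchemy's pool_pre_ping option performs an equivalent liveness check every time a connection is checked out of the pool and transparently replaces dead connections, which removes most of the need for per-query checks in long-running syncs. A minimal sketch, assuming a hypothetical create_session_factory helper (the real engine setup for this service is not shown in this diff):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def create_session_factory(database_url: str):
    """Hypothetical helper: build an engine that tolerates long-running syncs."""
    engine = create_engine(
        database_url,
        pool_pre_ping=True,   # issue a lightweight "ping" on every connection checkout
        pool_recycle=1800,    # proactively recycle connections older than 30 minutes
    )
    return sessionmaker(bind=engine)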

@@ -653,12 +666,21 @@ def _convert_item_to_db_format(self, tenant_id: str, inventory_type: str, item,
raise ValueError(f"Unknown inventory type: {inventory_type}")

def _flush_batch(self, to_insert: list, to_update: list):
"""Flush a batch of inserts and updates to database.
"""Flush a batch of inserts and updates to database with timeout and connection recovery.

Args:
to_insert: List of items to insert
to_update: List of items to update
"""
from sqlalchemy.exc import DBAPIError, OperationalError

from src.adapters.gam.utils.timeout_handler import TimeoutError, timeout

@timeout(seconds=120) # 2 minute timeout for database operations
def _commit_with_timeout():
"""Commit with timeout to prevent indefinite hangs."""
self.db.commit()

try:
if to_insert:
logger.info(f"📝 Starting bulk insert of {len(to_insert)} items...")
@@ -668,9 +690,28 @@ def _flush_batch(self, to_insert: list, to_update: list):
logger.info(f"📝 Starting bulk update of {len(to_update)} items...")
self.db.bulk_update_mappings(GAMInventory, to_update)
logger.info(f"✅ Batch updated {len(to_update)} items")
logger.info("💾 Committing batch transaction...")
self.db.commit()
logger.info("💾 Committing batch transaction (120s timeout)...")
_commit_with_timeout()
logger.info("✅ Batch committed successfully")
except TimeoutError as e:
logger.error(f"⏰ Database commit timed out after 120s: {e}")
logger.error(f" Insert count: {len(to_insert)}, Update count: {len(to_update)}")
logger.error(" This usually indicates: lost connection, lock contention, or large transaction")
self.db.rollback()
raise TimeoutError(
"Database commit timed out after 120s - possible lost connection, lock contention, or large transaction"
) from e
except (OperationalError, DBAPIError) as e:
# Connection errors - log and re-raise with context
logger.error(f"❌ Database connection error during batch write: {e}")
logger.error(f" Insert count: {len(to_insert)}, Update count: {len(to_update)}")
logger.error(" Connection may have been lost during long-running sync")
self.db.rollback()
raise OperationalError(
"Database connection lost during batch write. This can happen in long-running syncs if the connection times out.",
params=None,
orig=e.orig if hasattr(e, "orig") else None,
) from e
except Exception as e:
logger.error(f"❌ Batch write failed: {e}", exc_info=True)
logger.error(f" Insert count: {len(to_insert)}, Update count: {len(to_update)}")
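
Since _flush_batch now surfaces a lost connection as an OperationalError instead of hanging indefinitely, a caller can recover by resetting the session and retrying the batch once. A hedged sketch of that caller-side pattern (the retry wrapper and its names are hypothetical, not part of this PR):

from sqlalchemy.exc import OperationalError

def flush_batch_with_retry(service, to_insert: list, to_update: list, retries: int = 1):
    """Hypothetical caller-side retry for batches interrupted by a dropped connection."""
    for attempt in range(retries + 1):
        try:
            service._flush_batch(to_insert, to_update)
            return
        except OperationalError:
            if attempt == retries:
                raise
            # _flush_batch already rolled back; rolling back again is a no-op that
            # ensures the session can check out a fresh connection from the pool.
            service.db.rollback()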