diff --git a/src/admin/blueprints/gam.py b/src/admin/blueprints/gam.py
index e7d06a159..5f5ec1f13 100644
--- a/src/admin/blueprints/gam.py
+++ b/src/admin/blueprints/gam.py
@@ -556,320 +556,8 @@ def get_gam_custom_targeting_keys(tenant_id):
         return jsonify({"error": str(e)}), 500
 
 
-@gam_bp.route("/sync-inventory", methods=["POST"])
-@log_admin_action("sync_gam_inventory")
-@require_tenant_access()
-def sync_gam_inventory(tenant_id):
-    """Trigger GAM inventory sync for a tenant (background job).
-
-    Request body:
-        mode: "full" (default) or "incremental"
-        - full: Complete reset - deletes all inventory and re-syncs everything
-        - incremental: Only fetches items modified since last successful sync
-    """
-    if session.get("role") == "viewer":
-        return jsonify({"success": False, "error": "Access denied"}), 403
-
-    try:
-        # Get sync mode from request body (default to "incremental" - safer since it doesn't delete data)
-        # Use force=True and silent=True to handle empty/malformed requests gracefully
-        request_data = request.get_json(force=True, silent=True) or {}
-        sync_mode = request_data.get("mode", "incremental")
-
-        logger.info(f"Inventory sync request - tenant: {tenant_id}, mode: {sync_mode}, request_data: {request_data}")
-
-        if sync_mode not in ["full", "incremental"]:
-            return jsonify({"success": False, "error": "Invalid sync mode. Must be 'full' or 'incremental'"}), 400
-
-        with get_db_session() as db_session:
-            # Get tenant and adapter config
-            tenant = db_session.scalars(select(Tenant).filter_by(tenant_id=tenant_id)).first()
-            if not tenant:
-                return jsonify({"success": False, "error": "Tenant not found"}), 404
-
-            from src.core.database.models import AdapterConfig, SyncJob
-
-            adapter_config = db_session.scalars(select(AdapterConfig).filter_by(tenant_id=tenant_id)).first()
-
-            if not adapter_config or not adapter_config.gam_network_code or not adapter_config.gam_refresh_token:
-                return (
-                    jsonify(
-                        {
-                            "success": False,
-                            "error": "Please connect your GAM account before trying to sync inventory. Go to Ad Server settings to configure GAM.",
-                        }
-                    ),
-                    400,
-                )
-
-            # Check for existing running sync
-            existing_sync = db_session.scalars(
-                select(SyncJob).where(
-                    SyncJob.tenant_id == tenant_id, SyncJob.status == "running", SyncJob.sync_type == "inventory"
-                )
-            ).first()
-
-            if existing_sync:
-                # Check if sync is stale (running for >1 hour with no progress updates)
-                from datetime import timedelta
-
-                # Make started_at timezone-aware if it's naive (from database)
-                started_at = existing_sync.started_at
-                if started_at.tzinfo is None:
-                    started_at = started_at.replace(tzinfo=UTC)
-
-                time_running = datetime.now(UTC) - started_at
-                is_stale = time_running > timedelta(hours=1) and not existing_sync.progress
-
-                if is_stale:
-                    # Mark stale sync as failed and allow new sync to start
-                    existing_sync.status = "failed"
-                    existing_sync.completed_at = datetime.now(UTC)
-                    existing_sync.error_message = (
-                        "Sync thread died (stale after 1+ hour with no progress) - marked as failed to allow fresh sync"
-                    )
-                    db_session.commit()
-                    logger.warning(
-                        f"Marked stale sync {existing_sync.sync_id} as failed (running since {existing_sync.started_at}, no progress)"
-                    )
-                else:
-                    # Sync is actually running, return 409
-                    return (
-                        jsonify(
-                            {
-                                "success": False,
-                                "in_progress": True,
-                                "sync_id": existing_sync.sync_id,
-                                "message": "Sync already in progress",
-                            }
-                        ),
-                        409,
-                    )
-
-            # Extract config values before starting background thread (avoid session issues)
-            gam_network_code = adapter_config.gam_network_code
-            gam_refresh_token = adapter_config.gam_refresh_token
-
-            # Get last successful sync time for incremental mode
-            last_sync_time = None
-            if sync_mode == "incremental":
-                last_successful_sync = db_session.scalars(
-                    select(SyncJob)
-                    .where(
-                        SyncJob.tenant_id == tenant_id,
-                        SyncJob.sync_type == "inventory",
-                        SyncJob.status == "completed",
-                    )
-                    .order_by(SyncJob.completed_at.desc())
-                ).first()
-
-                if last_successful_sync and last_successful_sync.completed_at:
-                    last_sync_time = last_successful_sync.completed_at
-                    logger.info(f"Incremental sync: using last successful sync time: {last_sync_time}")
-                else:
-                    logger.warning(
-                        "Incremental sync requested but no previous successful sync found - falling back to full sync"
-                    )
-                    sync_mode = "full"
-                    last_sync_time = None  # Reset since we're doing full sync
-
-            # Create sync job
-            sync_id = f"sync_{tenant_id}_{int(datetime.now(UTC).timestamp())}"
-            sync_job = SyncJob(
-                sync_id=sync_id,
-                tenant_id=tenant_id,
-                adapter_type="google_ad_manager",
-                sync_type="inventory",
-                status="pending",
-                started_at=datetime.now(UTC),
-                triggered_by="admin_ui",
-                triggered_by_id=session.get("user_email", "unknown"),
-            )
-            db_session.add(sync_job)
-            db_session.commit()
-
-            # Start background sync (using threading for now - can upgrade to Celery later)
-            import threading
-
-            def run_sync():
-                try:
-                    with get_db_session() as bg_session:
-                        # Update status to running
-                        bg_sync_job = bg_session.scalars(select(SyncJob).filter_by(sync_id=sync_id)).first()
-                        bg_sync_job.status = "running"
-                        bg_session.commit()
-
-                        # Create OAuth2 client
-                        from googleads import oauth2
-
-                        oauth2_client = oauth2.GoogleRefreshTokenClient(
-                            client_id=os.environ.get("GAM_OAUTH_CLIENT_ID"),
-                            client_secret=os.environ.get("GAM_OAUTH_CLIENT_SECRET"),
-                            refresh_token=gam_refresh_token,
-                        )
-
-                        # Create GAM client
-                        client = ad_manager.AdManagerClient(
-                            oauth2_client, "AdCP Sales Agent", network_code=gam_network_code
-                        )
-
-                        # Initialize GAM inventory discovery
-                        discovery = GAMInventoryDiscovery(client=client, tenant_id=tenant_id)
-
-                        # Helper function to update progress
-                        def update_progress(phase: str, phase_num: int, total_phases: int, count: int = 0):
-                            bg_sync_job.progress = {
-                                "phase": phase,
-                                "phase_num": phase_num,
-                                "total_phases": total_phases,
-                                "count": count,
-                            }
-                            bg_session.commit()
-
-                        # Perform inventory sync with progress tracking
-                        # Note: sync_mode and last_sync_time are captured from outer scope
-                        total_phases = 7 if sync_mode == "full" else 6  # Add delete phase for full reset
-                        from datetime import datetime as dt
-
-                        start_time = dt.now()
-
-                        # Phase 0: Full reset - delete all existing inventory (only for full sync)
-                        if sync_mode == "full":
-                            update_progress("Deleting Existing Inventory", 1, total_phases)
-                            from sqlalchemy import delete
-
-                            from src.core.database.models import GAMInventory
-
-                            stmt = delete(GAMInventory).where(GAMInventory.tenant_id == tenant_id)
-                            bg_session.execute(stmt)
-                            bg_session.commit()
-                            logger.info(f"Full reset: deleted all existing inventory for tenant {tenant_id}")
-
-                        # Adjust phase numbers if we did full reset
-                        phase_offset = 1 if sync_mode == "full" else 0
-
-                        # Initialize inventory service for streaming writes
-                        from src.services.gam_inventory_service import GAMInventoryService
-
-                        inventory_service = GAMInventoryService(bg_session)
-                        sync_time = dt.now()
-
-                        # Phase 1: Ad Units (fetch → write → clear memory)
-                        update_progress("Discovering Ad Units", 1 + phase_offset, total_phases)
-                        ad_units = discovery.discover_ad_units(since=last_sync_time)
-                        update_progress("Writing Ad Units to DB", 1 + phase_offset, total_phases, len(ad_units))
-                        inventory_service._write_inventory_batch(tenant_id, "ad_unit", ad_units, sync_time)
-                        ad_units_count = len(ad_units)
-                        discovery.ad_units.clear()  # Clear from memory
-                        logger.info(f"Wrote {ad_units_count} ad units to database")
-
-                        # Phase 2: Placements (fetch → write → clear memory)
-                        update_progress("Discovering Placements", 2 + phase_offset, total_phases)
-                        placements = discovery.discover_placements(since=last_sync_time)
-                        update_progress("Writing Placements to DB", 2 + phase_offset, total_phases, len(placements))
-                        inventory_service._write_inventory_batch(tenant_id, "placement", placements, sync_time)
-                        placements_count = len(placements)
-                        discovery.placements.clear()  # Clear from memory
-                        logger.info(f"Wrote {placements_count} placements to database")
-
-                        # Phase 3: Labels (fetch → write → clear memory)
-                        update_progress("Discovering Labels", 3 + phase_offset, total_phases)
-                        labels = discovery.discover_labels(since=last_sync_time)
-                        update_progress("Writing Labels to DB", 3 + phase_offset, total_phases, len(labels))
-                        inventory_service._write_inventory_batch(tenant_id, "label", labels, sync_time)
-                        labels_count = len(labels)
-                        discovery.labels.clear()  # Clear from memory
-                        logger.info(f"Wrote {labels_count} labels to database")
-
-                        # Phase 4: Custom Targeting Keys (fetch → write → clear memory)
-                        update_progress("Discovering Targeting Keys", 4 + phase_offset, total_phases)
-                        custom_targeting = discovery.discover_custom_targeting(fetch_values=False, since=last_sync_time)
-                        update_progress(
-                            "Writing Targeting Keys to DB",
-                            4 + phase_offset,
-                            total_phases,
-                            custom_targeting.get("total_keys", 0),
-                        )
-                        inventory_service._write_custom_targeting_keys(
-                            tenant_id, list(discovery.custom_targeting_keys.values()), sync_time
-                        )
-                        targeting_count = len(discovery.custom_targeting_keys)
-                        discovery.custom_targeting_keys.clear()  # Clear from memory
-                        discovery.custom_targeting_values.clear()  # Clear from memory
-                        logger.info(f"Wrote {targeting_count} targeting keys to database")
-
-                        # Phase 5: Audience Segments (fetch → write → clear memory)
-                        update_progress("Discovering Audience Segments", 5 + phase_offset, total_phases)
-                        audience_segments = discovery.discover_audience_segments(since=last_sync_time)
-                        update_progress(
-                            "Writing Audience Segments to DB", 5 + phase_offset, total_phases, len(audience_segments)
-                        )
-                        inventory_service._write_inventory_batch(
-                            tenant_id, "audience_segment", audience_segments, sync_time
-                        )
-                        segments_count = len(audience_segments)
-                        discovery.audience_segments.clear()  # Clear from memory
-                        logger.info(f"Wrote {segments_count} audience segments to database")
-
-                        # Phase 6: Mark stale inventory
-                        update_progress("Marking Stale Inventory", 6 + phase_offset, total_phases)
-                        inventory_service._mark_stale_inventory(tenant_id, sync_time)
-
-                        # Build result summary
-                        end_time = dt.now()
-                        result = {
-                            "tenant_id": tenant_id,
-                            "sync_time": end_time.isoformat(),
-                            "duration_seconds": (end_time - start_time).total_seconds(),
-                            "ad_units": {"total": ad_units_count},
-                            "placements": {"total": placements_count},
-                            "labels": {"total": labels_count},
-                            "custom_targeting": {
-                                "total_keys": targeting_count,
-                                "note": "Values lazy loaded on demand",
-                            },
-                            "audience_segments": {"total": segments_count},
-                            "streaming": True,
-                            "memory_optimized": True,
-                        }
-
-                        # Update sync job with success
-                        bg_sync_job.status = "completed"
-                        bg_sync_job.completed_at = datetime.now(UTC)
-                        bg_sync_job.summary = json.dumps(result)
-                        bg_session.commit()
-
-                        logger.info(f"Successfully synced GAM inventory for tenant {tenant_id}")
-
-                except Exception as e:
-                    logger.error(f"Error syncing GAM inventory for tenant {tenant_id}: {e}", exc_info=True)
-                    try:
-                        with get_db_session() as err_session:
-                            err_sync_job = err_session.scalars(select(SyncJob).filter_by(sync_id=sync_id)).first()
-                            if err_sync_job:
-                                err_sync_job.status = "failed"
-                                err_sync_job.completed_at = datetime.now(UTC)
-                                err_sync_job.error_message = str(e)
-                                err_session.commit()
-                    except:
-                        pass  # Ignore errors in error handling
-
-            # Start background thread
-            sync_thread = threading.Thread(target=run_sync, daemon=True)
-            sync_thread.start()
-
-            return jsonify(
-                {
-                    "success": True,
-                    "sync_id": sync_id,
-                    "message": "Inventory sync started in background",
-                    "status": "pending",
-                }
-            )
-
-    except Exception as e:
-        logger.error(f"Error starting GAM inventory sync for tenant {tenant_id}: {e}", exc_info=True)
-        return jsonify({"success": False, "error": str(e)}), 500
+# Removed: /sync-inventory endpoint - use /api/tenant/<tenant_id>/inventory/sync instead
+# The new endpoint is in inventory.py and uses background_sync_service.py
 
 
 @gam_bp.route("/sync-status/<sync_id>", methods=["GET"])
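For reference, a minimal sketch of calling the replacement endpoint named in the comment above. The host, tenant ID, and auth header are placeholders (not part of this diff); the JSON keys mirror the request parsing in the inventory.py hunk below:

    # Hypothetical client call (host/tenant/auth are placeholders, not part of this diff).
    import requests

    resp = requests.post(
        "https://example.com/api/tenant/acme/inventory/sync",
        json={"mode": "incremental"},  # or "full" for a destructive delete-and-resync
        headers={"Cookie": "session=..."},  # admin UI session auth assumed; exact scheme may differ
        timeout=30,
    )
    resp.raise_for_status()
    print(resp.json()["sync_id"])  # job ID to poll via the sync-status endpoint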
diff --git a/src/admin/blueprints/inventory.py b/src/admin/blueprints/inventory.py
index 3de553221..67a8e22b1 100644
--- a/src/admin/blueprints/inventory.py
+++ b/src/admin/blueprints/inventory.py
@@ -528,6 +528,7 @@ def sync_inventory(tenant_id):
 
         # Parse request body
         data = request.get_json() or {}
+        sync_mode = data.get("mode", "incremental")  # Default to incremental (safer)
         sync_types = data.get("types", None)
        custom_targeting_limit = data.get("custom_targeting_limit")
         audience_segment_limit = data.get("audience_segment_limit")
@@ -535,6 +536,7 @@ def sync_inventory(tenant_id):
         # Start background sync
         sync_id = start_inventory_sync_background(
             tenant_id=tenant_id,
+            sync_mode=sync_mode,
             sync_types=sync_types,
             custom_targeting_limit=custom_targeting_limit,
             audience_segment_limit=audience_segment_limit,
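One behavioral difference worth noting: the removed gam.py endpoint rejected unknown modes with a 400, while the hunk above passes `mode` through unvalidated; reading the service code below, an unrecognized value would silently behave like a full fetch without the delete phase. A minimal sketch of restoring that guard inside sync_inventory (names follow the hunk above; exact placement is an assumption):

    # Sketch: reject unknown sync modes before starting the background job,
    # mirroring the validation the removed gam.py endpoint performed.
    sync_mode = data.get("mode", "incremental")
    if sync_mode not in ("full", "incremental"):
        return jsonify({"success": False, "error": "Invalid sync mode. Must be 'full' or 'incremental'"}), 400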
diff --git a/src/services/background_sync_service.py b/src/services/background_sync_service.py
index 873a550b9..46296d9f9 100644
--- a/src/services/background_sync_service.py
+++ b/src/services/background_sync_service.py
@@ -24,6 +24,7 @@
 
 def start_inventory_sync_background(
     tenant_id: str,
+    sync_mode: str = "incremental",
     sync_types: list[str] | None = None,
     custom_targeting_limit: int | None = None,
     audience_segment_limit: int | None = None,
@@ -33,6 +34,7 @@
 
     Args:
         tenant_id: Tenant ID to sync
+        sync_mode: "full" (delete all and resync) or "incremental" (only sync changed items since last successful sync)
         sync_types: Optional list of inventory types to sync
         custom_targeting_limit: Optional limit on custom targeting values
         audience_segment_limit: Optional limit on audience segments
@@ -47,11 +49,39 @@ def start_inventory_sync_background(
     # Create sync job record
     with get_db_session() as db:
         # Check if sync already running
-        stmt = select(SyncJob).where(SyncJob.tenant_id == tenant_id, SyncJob.status == "running")
+        stmt = select(SyncJob).where(
+            SyncJob.tenant_id == tenant_id, SyncJob.status == "running", SyncJob.sync_type == "inventory"
+        )
         existing_sync = db.scalars(stmt).first()
 
         if existing_sync:
-            raise ValueError(f"Sync already running for tenant {tenant_id}: {existing_sync.sync_id}")
+            # Check if sync is stale (running for >1 hour with no progress updates)
+            from datetime import timedelta
+
+            # Make started_at timezone-aware if it's naive (from database)
+            started_at = existing_sync.started_at
+            if started_at.tzinfo is None:
+                started_at = started_at.replace(tzinfo=UTC)
+
+            time_running = datetime.now(UTC) - started_at
+            is_stale = time_running > timedelta(hours=1) and not existing_sync.progress_data
+
+            if is_stale:
+                # Mark stale sync as failed and allow new sync to start
+                existing_sync.status = "failed"
+                existing_sync.completed_at = datetime.now(UTC)
+                existing_sync.error_message = (
+                    "Sync thread died (stale after 1+ hour with no progress) - marked as failed to allow fresh sync"
+                )
+                db.commit()
+                logger.warning(
+                    f"Marked stale sync {existing_sync.sync_id} as failed (running since {existing_sync.started_at}, no progress)"
+                )
+            else:
+                # Sync is actually running, raise error
+                raise ValueError(
+                    f"Sync already running for tenant {tenant_id}: {existing_sync.sync_id} (started {started_at})"
+                )
 
         # Create new sync job
         sync_id = f"sync_{tenant_id}_{int(datetime.now(UTC).timestamp())}"
@@ -78,7 +108,7 @@ def start_inventory_sync_background(
     # Start background thread
     thread = threading.Thread(
         target=_run_sync_thread,
-        args=(tenant_id, sync_id, sync_types, custom_targeting_limit, audience_segment_limit),
+        args=(tenant_id, sync_id, sync_mode, sync_types, custom_targeting_limit, audience_segment_limit),
         daemon=True,
         name=f"sync-{sync_id}",
     )
@@ -95,16 +125,26 @@
 def _run_sync_thread(
     tenant_id: str,
     sync_id: str,
+    sync_mode: str,
     sync_types: list[str] | None,
     custom_targeting_limit: int | None,
     audience_segment_limit: int | None,
 ):
     """
-    Run the actual sync in a background thread.
+    Run the actual sync in a background thread with detailed phase-by-phase progress.
 
     This function runs in a separate thread and updates the SyncJob record
     as it progresses. If the thread is interrupted (container restart),
     the job will remain in 'running' state until cleaned up.
+
+    Progress tracking:
+    - Phase 0 (full mode only): Deleting existing inventory (1/7)
+    - Phase 1: Discovering Ad Units (2/7 or 1/6)
+    - Phase 2: Discovering Placements (3/7 or 2/6)
+    - Phase 3: Discovering Labels (4/7 or 3/6)
+    - Phase 4: Discovering Custom Targeting (5/7 or 4/6)
+    - Phase 5: Discovering Audience Segments (6/7 or 5/6)
+    - Phase 6: Marking Stale Inventory (7/7 or 6/6)
     """
     try:
         logger.info(f"[{sync_id}] Starting inventory sync for {tenant_id}")
@@ -176,29 +216,145 @@ def _run_sync_thread(
             oauth2_client, "AdCP Sales Agent", network_code=adapter_config.gam_network_code
         )
 
-        # Update progress: Starting discovery
-        _update_sync_progress(sync_id, {"phase": "Discovering inventory from GAM", "phase_num": 1, "total_phases": 2})
+        # Get last successful sync time for incremental mode
+        last_sync_time = None
+        if sync_mode == "incremental":
+            with get_db_session() as db:
+                from sqlalchemy import desc
+
+                last_successful_sync = db.scalars(
+                    select(SyncJob)
+                    .where(
+                        SyncJob.tenant_id == tenant_id,
+                        SyncJob.sync_type == "inventory",
+                        SyncJob.status == "completed",
+                    )
+                    .order_by(desc(SyncJob.completed_at))
+                ).first()
+
+                if last_successful_sync and last_successful_sync.completed_at:
+                    last_sync_time = last_successful_sync.completed_at
+                    logger.info(f"[{sync_id}] Incremental sync: using last successful sync time: {last_sync_time}")
+                else:
+                    logger.warning(
+                        f"[{sync_id}] Incremental sync requested but no previous successful sync found - falling back to full sync"
+                    )
+                    sync_mode = "full"
+                    last_sync_time = None
+
+        # Calculate total phases
+        total_phases = 7 if sync_mode == "full" else 6  # Add delete phase for full reset
+        phase_offset = 1 if sync_mode == "full" else 0
 
         # Initialize discovery
         discovery = GAMInventoryDiscovery(client=client, tenant_id=tenant_id)
-
-        # Perform sync
-        if sync_types:
-            result = discovery.sync_selective(
-                sync_types=sync_types,
-                custom_targeting_limit=custom_targeting_limit,
-                audience_segment_limit=audience_segment_limit,
+        start_time = datetime.now()
+
+        # Helper function to update progress
+        def update_progress(phase: str, phase_num: int, count: int = 0):
+            _update_sync_progress(
+                sync_id,
+                {
+                    "phase": phase,
+                    "phase_num": phase_num,
+                    "total_phases": total_phases,
+                    "count": count,
+                    "mode": sync_mode,
+                },
             )
-        else:
-            result = discovery.sync_all()
 
-        # Update progress: Saving to database
-        _update_sync_progress(sync_id, {"phase": "Saving to database", "phase_num": 2, "total_phases": 2})
+        # Phase 0: Full reset - delete all existing inventory (only for full sync)
+        if sync_mode == "full":
+            update_progress("Deleting Existing Inventory", 1)
+            with get_db_session() as db:
+                from sqlalchemy import delete
 
-        # Save to database (fresh session)
+                from src.core.database.models import GAMInventory
+
+                stmt = delete(GAMInventory).where(GAMInventory.tenant_id == tenant_id)
+                db.execute(stmt)
+                db.commit()
+                logger.info(f"[{sync_id}] Full reset: deleted all existing inventory for tenant {tenant_id}")
+
+        # Initialize inventory service for streaming writes
         with get_db_session() as db:
             inventory_service = GAMInventoryService(db)
-            inventory_service._save_inventory_to_db(tenant_id, discovery)
+            sync_time = datetime.now()
+
+            # Phase 1: Ad Units (fetch → write → clear memory)
+            update_progress("Discovering Ad Units", 1 + phase_offset)
+            ad_units = discovery.discover_ad_units(since=last_sync_time)
+            update_progress("Writing Ad Units to DB", 1 + phase_offset, len(ad_units))
+            inventory_service._write_inventory_batch(tenant_id, "ad_unit", ad_units, sync_time)
+            ad_units_count = len(ad_units)
+            discovery.ad_units.clear()  # Clear from memory
+            logger.info(f"[{sync_id}] Wrote {ad_units_count} ad units to database")
+
+            # Phase 2: Placements (fetch → write → clear memory)
+            update_progress("Discovering Placements", 2 + phase_offset)
+            placements = discovery.discover_placements(since=last_sync_time)
+            update_progress("Writing Placements to DB", 2 + phase_offset, len(placements))
+            inventory_service._write_inventory_batch(tenant_id, "placement", placements, sync_time)
+            placements_count = len(placements)
+            discovery.placements.clear()  # Clear from memory
+            logger.info(f"[{sync_id}] Wrote {placements_count} placements to database")
+
+            # Phase 3: Labels (fetch → write → clear memory)
+            update_progress("Discovering Labels", 3 + phase_offset)
+            labels = discovery.discover_labels(since=last_sync_time)
+            update_progress("Writing Labels to DB", 3 + phase_offset, len(labels))
+            inventory_service._write_inventory_batch(tenant_id, "label", labels, sync_time)
+            labels_count = len(labels)
+            discovery.labels.clear()  # Clear from memory
+            logger.info(f"[{sync_id}] Wrote {labels_count} labels to database")
+
+            # Phase 4: Custom Targeting Keys (fetch → write → clear memory)
+            update_progress("Discovering Targeting Keys", 4 + phase_offset)
+            custom_targeting = discovery.discover_custom_targeting(fetch_values=False, since=last_sync_time)
+            update_progress(
+                "Writing Targeting Keys to DB",
+                4 + phase_offset,
+                custom_targeting.get("total_keys", 0),
+            )
+            inventory_service._write_custom_targeting_keys(
+                tenant_id, list(discovery.custom_targeting_keys.values()), sync_time
+            )
+            targeting_count = len(discovery.custom_targeting_keys)
+            discovery.custom_targeting_keys.clear()  # Clear from memory
+            discovery.custom_targeting_values.clear()  # Clear from memory
+            logger.info(f"[{sync_id}] Wrote {targeting_count} targeting keys to database")
+
+            # Phase 5: Audience Segments (fetch → write → clear memory)
+            update_progress("Discovering Audience Segments", 5 + phase_offset)
+            audience_segments = discovery.discover_audience_segments(since=last_sync_time)
+            update_progress("Writing Audience Segments to DB", 5 + phase_offset, len(audience_segments))
+            inventory_service._write_inventory_batch(tenant_id, "audience_segment", audience_segments, sync_time)
+            segments_count = len(audience_segments)
+            discovery.audience_segments.clear()  # Clear from memory
+            logger.info(f"[{sync_id}] Wrote {segments_count} audience segments to database")
+
+            # Phase 6: Mark stale inventory
+            update_progress("Marking Stale Inventory", 6 + phase_offset)
+            inventory_service._mark_stale_inventory(tenant_id, sync_time)
+
+            # Build result summary
+            end_time = datetime.now()
+            result = {
+                "tenant_id": tenant_id,
+                "sync_time": end_time.isoformat(),
+                "duration_seconds": (end_time - start_time).total_seconds(),
+                "mode": sync_mode,
+                "ad_units": {"total": ad_units_count},
+                "placements": {"total": placements_count},
+                "labels": {"total": labels_count},
+                "custom_targeting": {
+                    "total_keys": targeting_count,
+                    "note": "Values lazy loaded on demand",
+                },
+                "audience_segments": {"total": segments_count},
+                "streaming": True,
+                "memory_optimized": True,
+            }
 
         # Mark complete
         _mark_sync_complete(sync_id, result)
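The staleness rule that now guards new syncs (in both the removed gam.py code and the service above) distills to a small pure function; a sketch for illustration only, not code in this diff (requires Python 3.11+ for datetime.UTC):

    from datetime import UTC, datetime, timedelta

    def is_sync_stale(started_at: datetime, progress_data: dict | None, max_age: timedelta = timedelta(hours=1)) -> bool:
        """A sync is stale when it has run past max_age without ever reporting progress."""
        if started_at.tzinfo is None:
            started_at = started_at.replace(tzinfo=UTC)  # DB timestamps may be naive
        return datetime.now(UTC) - started_at > max_age and not progress_data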
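And a usage sketch of the updated service entry point (the tenant ID is a placeholder; the ValueError path matches the non-stale branch above):

    # Sketch: trigger an incremental sync and handle the already-running case.
    from src.services.background_sync_service import start_inventory_sync_background

    try:
        sync_id = start_inventory_sync_background(tenant_id="acme", sync_mode="incremental")
        print(f"Started sync {sync_id}")
    except ValueError as e:
        # Raised when a non-stale inventory sync is already running for this tenant.
        print(f"Sync not started: {e}")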