316 changes: 2 additions & 314 deletions src/admin/blueprints/gam.py
@@ -556,320 +556,8 @@ def get_gam_custom_targeting_keys(tenant_id):
return jsonify({"error": str(e)}), 500


@gam_bp.route("/sync-inventory", methods=["POST"])
@log_admin_action("sync_gam_inventory")
@require_tenant_access()
def sync_gam_inventory(tenant_id):
"""Trigger GAM inventory sync for a tenant (background job).

Request body:
mode: "full" (default) or "incremental"
- full: Complete reset - deletes all inventory and re-syncs everything
- incremental: Only fetches items modified since last successful sync
"""
if session.get("role") == "viewer":
return jsonify({"success": False, "error": "Access denied"}), 403

try:
# Get sync mode from request body (default to "incremental" - safer since it doesn't delete data)
# Use force=True and silent=True to handle empty/malformed requests gracefully
request_data = request.get_json(force=True, silent=True) or {}
sync_mode = request_data.get("mode", "incremental")

logger.info(f"Inventory sync request - tenant: {tenant_id}, mode: {sync_mode}, request_data: {request_data}")

if sync_mode not in ["full", "incremental"]:
return jsonify({"success": False, "error": "Invalid sync mode. Must be 'full' or 'incremental'"}), 400

with get_db_session() as db_session:
# Get tenant and adapter config
tenant = db_session.scalars(select(Tenant).filter_by(tenant_id=tenant_id)).first()
if not tenant:
return jsonify({"success": False, "error": "Tenant not found"}), 404

from src.core.database.models import AdapterConfig, SyncJob

adapter_config = db_session.scalars(select(AdapterConfig).filter_by(tenant_id=tenant_id)).first()

if not adapter_config or not adapter_config.gam_network_code or not adapter_config.gam_refresh_token:
return (
jsonify(
{
"success": False,
"error": "Please connect your GAM account before trying to sync inventory. Go to Ad Server settings to configure GAM.",
}
),
400,
)

# Check for existing running sync
existing_sync = db_session.scalars(
select(SyncJob).where(
SyncJob.tenant_id == tenant_id, SyncJob.status == "running", SyncJob.sync_type == "inventory"
)
).first()

if existing_sync:
# Check if sync is stale (running for >1 hour with no progress updates)
from datetime import timedelta

# Make started_at timezone-aware if it's naive (from database)
started_at = existing_sync.started_at
if started_at.tzinfo is None:
started_at = started_at.replace(tzinfo=UTC)

time_running = datetime.now(UTC) - started_at
is_stale = time_running > timedelta(hours=1) and not existing_sync.progress

if is_stale:
# Mark stale sync as failed and allow new sync to start
existing_sync.status = "failed"
existing_sync.completed_at = datetime.now(UTC)
existing_sync.error_message = (
"Sync thread died (stale after 1+ hour with no progress) - marked as failed to allow fresh sync"
)
db_session.commit()
logger.warning(
f"Marked stale sync {existing_sync.sync_id} as failed (running since {existing_sync.started_at}, no progress)"
)
else:
# Sync is actually running, return 409
return (
jsonify(
{
"success": False,
"in_progress": True,
"sync_id": existing_sync.sync_id,
"message": "Sync already in progress",
}
),
409,
)

# Extract config values before starting background thread (avoid session issues)
gam_network_code = adapter_config.gam_network_code
gam_refresh_token = adapter_config.gam_refresh_token

# Get last successful sync time for incremental mode
last_sync_time = None
if sync_mode == "incremental":
last_successful_sync = db_session.scalars(
select(SyncJob)
.where(
SyncJob.tenant_id == tenant_id,
SyncJob.sync_type == "inventory",
SyncJob.status == "completed",
)
.order_by(SyncJob.completed_at.desc())
).first()

if last_successful_sync and last_successful_sync.completed_at:
last_sync_time = last_successful_sync.completed_at
logger.info(f"Incremental sync: using last successful sync time: {last_sync_time}")
else:
logger.warning(
"Incremental sync requested but no previous successful sync found - falling back to full sync"
)
sync_mode = "full"
last_sync_time = None # Reset since we're doing full sync

# Create sync job
sync_id = f"sync_{tenant_id}_{int(datetime.now(UTC).timestamp())}"
sync_job = SyncJob(
sync_id=sync_id,
tenant_id=tenant_id,
adapter_type="google_ad_manager",
sync_type="inventory",
status="pending",
started_at=datetime.now(UTC),
triggered_by="admin_ui",
triggered_by_id=session.get("user_email", "unknown"),
)
db_session.add(sync_job)
db_session.commit()

# Start background sync (using threading for now - can upgrade to Celery later)
import threading

def run_sync():
try:
with get_db_session() as bg_session:
# Update status to running
bg_sync_job = bg_session.scalars(select(SyncJob).filter_by(sync_id=sync_id)).first()
bg_sync_job.status = "running"
bg_session.commit()

# Create OAuth2 client
from googleads import oauth2

oauth2_client = oauth2.GoogleRefreshTokenClient(
client_id=os.environ.get("GAM_OAUTH_CLIENT_ID"),
client_secret=os.environ.get("GAM_OAUTH_CLIENT_SECRET"),
refresh_token=gam_refresh_token,
)

# Create GAM client
client = ad_manager.AdManagerClient(
oauth2_client, "AdCP Sales Agent", network_code=gam_network_code
)

# Initialize GAM inventory discovery
discovery = GAMInventoryDiscovery(client=client, tenant_id=tenant_id)

# Helper function to update progress
def update_progress(phase: str, phase_num: int, total_phases: int, count: int = 0):
bg_sync_job.progress = {
"phase": phase,
"phase_num": phase_num,
"total_phases": total_phases,
"count": count,
}
bg_session.commit()

# Perform inventory sync with progress tracking
# Note: sync_mode and last_sync_time are captured from outer scope
total_phases = 7 if sync_mode == "full" else 6 # Add delete phase for full reset
from datetime import datetime as dt

start_time = dt.now()

# Phase 0: Full reset - delete all existing inventory (only for full sync)
if sync_mode == "full":
update_progress("Deleting Existing Inventory", 1, total_phases)
from sqlalchemy import delete

from src.core.database.models import GAMInventory

stmt = delete(GAMInventory).where(GAMInventory.tenant_id == tenant_id)
bg_session.execute(stmt)
bg_session.commit()
logger.info(f"Full reset: deleted all existing inventory for tenant {tenant_id}")

# Adjust phase numbers if we did full reset
phase_offset = 1 if sync_mode == "full" else 0

# Initialize inventory service for streaming writes
from src.services.gam_inventory_service import GAMInventoryService

inventory_service = GAMInventoryService(bg_session)
sync_time = dt.now()

# Phase 1: Ad Units (fetch → write → clear memory)
update_progress("Discovering Ad Units", 1 + phase_offset, total_phases)
ad_units = discovery.discover_ad_units(since=last_sync_time)
update_progress("Writing Ad Units to DB", 1 + phase_offset, total_phases, len(ad_units))
inventory_service._write_inventory_batch(tenant_id, "ad_unit", ad_units, sync_time)
ad_units_count = len(ad_units)
discovery.ad_units.clear() # Clear from memory
logger.info(f"Wrote {ad_units_count} ad units to database")

# Phase 2: Placements (fetch → write → clear memory)
update_progress("Discovering Placements", 2 + phase_offset, total_phases)
placements = discovery.discover_placements(since=last_sync_time)
update_progress("Writing Placements to DB", 2 + phase_offset, total_phases, len(placements))
inventory_service._write_inventory_batch(tenant_id, "placement", placements, sync_time)
placements_count = len(placements)
discovery.placements.clear() # Clear from memory
logger.info(f"Wrote {placements_count} placements to database")

# Phase 3: Labels (fetch → write → clear memory)
update_progress("Discovering Labels", 3 + phase_offset, total_phases)
labels = discovery.discover_labels(since=last_sync_time)
update_progress("Writing Labels to DB", 3 + phase_offset, total_phases, len(labels))
inventory_service._write_inventory_batch(tenant_id, "label", labels, sync_time)
labels_count = len(labels)
discovery.labels.clear() # Clear from memory
logger.info(f"Wrote {labels_count} labels to database")

# Phase 4: Custom Targeting Keys (fetch → write → clear memory)
update_progress("Discovering Targeting Keys", 4 + phase_offset, total_phases)
custom_targeting = discovery.discover_custom_targeting(fetch_values=False, since=last_sync_time)
update_progress(
"Writing Targeting Keys to DB",
4 + phase_offset,
total_phases,
custom_targeting.get("total_keys", 0),
)
inventory_service._write_custom_targeting_keys(
tenant_id, list(discovery.custom_targeting_keys.values()), sync_time
)
targeting_count = len(discovery.custom_targeting_keys)
discovery.custom_targeting_keys.clear() # Clear from memory
discovery.custom_targeting_values.clear() # Clear from memory
logger.info(f"Wrote {targeting_count} targeting keys to database")

# Phase 5: Audience Segments (fetch → write → clear memory)
update_progress("Discovering Audience Segments", 5 + phase_offset, total_phases)
audience_segments = discovery.discover_audience_segments(since=last_sync_time)
update_progress(
"Writing Audience Segments to DB", 5 + phase_offset, total_phases, len(audience_segments)
)
inventory_service._write_inventory_batch(
tenant_id, "audience_segment", audience_segments, sync_time
)
segments_count = len(audience_segments)
discovery.audience_segments.clear() # Clear from memory
logger.info(f"Wrote {segments_count} audience segments to database")

# Phase 6: Mark stale inventory
update_progress("Marking Stale Inventory", 6 + phase_offset, total_phases)
inventory_service._mark_stale_inventory(tenant_id, sync_time)

# Build result summary
end_time = dt.now()
result = {
"tenant_id": tenant_id,
"sync_time": end_time.isoformat(),
"duration_seconds": (end_time - start_time).total_seconds(),
"ad_units": {"total": ad_units_count},
"placements": {"total": placements_count},
"labels": {"total": labels_count},
"custom_targeting": {
"total_keys": targeting_count,
"note": "Values lazy loaded on demand",
},
"audience_segments": {"total": segments_count},
"streaming": True,
"memory_optimized": True,
}

# Update sync job with success
bg_sync_job.status = "completed"
bg_sync_job.completed_at = datetime.now(UTC)
bg_sync_job.summary = json.dumps(result)
bg_session.commit()

logger.info(f"Successfully synced GAM inventory for tenant {tenant_id}")

except Exception as e:
logger.error(f"Error syncing GAM inventory for tenant {tenant_id}: {e}", exc_info=True)
try:
with get_db_session() as err_session:
err_sync_job = err_session.scalars(select(SyncJob).filter_by(sync_id=sync_id)).first()
if err_sync_job:
err_sync_job.status = "failed"
err_sync_job.completed_at = datetime.now(UTC)
err_sync_job.error_message = str(e)
err_session.commit()
except:
pass # Ignore errors in error handling

# Start background thread
sync_thread = threading.Thread(target=run_sync, daemon=True)
sync_thread.start()

return jsonify(
{
"success": True,
"sync_id": sync_id,
"message": "Inventory sync started in background",
"status": "pending",
}
)

except Exception as e:
logger.error(f"Error starting GAM inventory sync for tenant {tenant_id}: {e}", exc_info=True)
return jsonify({"success": False, "error": str(e)}), 500
# Removed: /sync-inventory endpoint - use /api/tenant/<tenant_id>/inventory/sync instead
# The new endpoint is in inventory.py and uses background_sync_service.py


@gam_bp.route("/sync-status/<sync_id>", methods=["GET"])
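For reviewers following the migration: a minimal sketch of how a client might call the replacement endpoint named in the comment above, assuming it accepts the same JSON body ("mode" of "incremental" or "full") and sits behind the admin session; the host name and cookie handling below are illustrative assumptions, not part of this diff.

import requests

# Hypothetical call to the consolidated endpoint in inventory.py; the URL path comes
# from the comment added in this diff, while the host and auth cookie are assumptions.
resp = requests.post(
    "https://sales-agent.example.com/api/tenant/tenant_123/inventory/sync",
    json={"mode": "incremental"},  # or "full" for a complete reset of stored inventory
    cookies={"session": "<admin session cookie>"},
    timeout=30,
)
print(resp.status_code, resp.json())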
2 changes: 2 additions & 0 deletions src/admin/blueprints/inventory.py
@@ -528,13 +528,15 @@ def sync_inventory(tenant_id):

# Parse request body
data = request.get_json() or {}
sync_mode = data.get("mode", "incremental") # Default to incremental (safer)
sync_types = data.get("types", None)
custom_targeting_limit = data.get("custom_targeting_limit")
audience_segment_limit = data.get("audience_segment_limit")

# Start background sync
sync_id = start_inventory_sync_background(
tenant_id=tenant_id,
sync_mode=sync_mode,
sync_types=sync_types,
custom_targeting_limit=custom_targeting_limit,
audience_segment_limit=audience_segment_limit,
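The two added lines above read a "mode" field from the request body and pass it through to start_inventory_sync_background, defaulting to the safer incremental mode. For context, a sketch of the full request body the handler now parses; the values shown and the semantics of "types" and the limit fields are assumptions inferred from the parameter names, not documented in this diff.

# Hypothetical request body for the inventory sync endpoint.
payload = {
    "mode": "full",                     # "incremental" (default) or "full"
    "types": ["ad_unit", "placement"],  # assumed: restrict sync to these inventory types
    "custom_targeting_limit": 500,      # assumed: cap on custom targeting keys fetched
    "audience_segment_limit": 1000,     # assumed: cap on audience segments fetched
}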