Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions src/a2a_server/adcp_a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
# Import core functions for direct calls (raw functions without FastMCP decorators)
from datetime import UTC, datetime

from sqlalchemy import select

from src.core.audit_logger import get_audit_logger
from src.core.auth_utils import get_principal_from_token
from src.core.config_loader import get_current_tenant
Expand Down Expand Up @@ -633,16 +635,13 @@ async def on_get_task_push_notification_config(

# Query database for config
with get_db_session() as db:
config = (
db.query(DBPushNotificationConfig)
.filter_by(
id=config_id,
tenant_id=tool_context.tenant_id,
principal_id=tool_context.principal_id,
is_active=True,
)
.first()
stmt = select(DBPushNotificationConfig).filter_by(
id=config_id,
tenant_id=tool_context.tenant_id,
principal_id=tool_context.principal_id,
is_active=True,
)
config = db.scalars(stmt).first()

if not config:
raise ServerError(NotFoundError(message=f"Push notification config not found: {config_id}"))
Expand Down Expand Up @@ -723,11 +722,10 @@ async def on_set_task_push_notification_config(
# Create or update configuration
with get_db_session() as db:
# Check if config exists
existing_config = (
db.query(DBPushNotificationConfig)
.filter_by(id=config_id, tenant_id=tool_context.tenant_id, principal_id=tool_context.principal_id)
.first()
stmt = select(DBPushNotificationConfig).filter_by(
id=config_id, tenant_id=tool_context.tenant_id, principal_id=tool_context.principal_id
)
existing_config = db.scalars(stmt).first()

if existing_config:
# Update existing config
Expand Down Expand Up @@ -797,11 +795,10 @@ async def on_list_task_push_notification_config(

# Query database for all active configs
with get_db_session() as db:
configs = (
db.query(DBPushNotificationConfig)
.filter_by(tenant_id=tool_context.tenant_id, principal_id=tool_context.principal_id, is_active=True)
.all()
stmt = select(DBPushNotificationConfig).filter_by(
tenant_id=tool_context.tenant_id, principal_id=tool_context.principal_id, is_active=True
)
configs = db.scalars(stmt).all()

# Convert to A2A format
configs_list = []
Expand Down Expand Up @@ -862,11 +859,10 @@ async def on_delete_task_push_notification_config(

# Query database and mark as inactive
with get_db_session() as db:
config = (
db.query(DBPushNotificationConfig)
.filter_by(id=config_id, tenant_id=tool_context.tenant_id, principal_id=tool_context.principal_id)
.first()
stmt = select(DBPushNotificationConfig).filter_by(
id=config_id, tenant_id=tool_context.tenant_id, principal_id=tool_context.principal_id
)
config = db.scalars(stmt).first()

if not config:
raise ServerError(NotFoundError(message=f"Push notification config not found: {config_id}"))
Expand Down
63 changes: 32 additions & 31 deletions src/adapters/gam/managers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from datetime import UTC, datetime, timedelta
from typing import Any

from sqlalchemy import func, select
from sqlalchemy.orm import Session

from src.adapters.gam.client import GAMClientManager
Expand Down Expand Up @@ -287,7 +288,8 @@ def get_sync_status(self, db_session: Session, sync_id: str) -> dict[str, Any] |
Returns:
Sync job status information or None if not found
"""
sync_job = db_session.query(SyncJob).filter_by(sync_id=sync_id, tenant_id=self.tenant_id).first()
stmt = select(SyncJob).filter_by(sync_id=sync_id, tenant_id=self.tenant_id)
sync_job = db_session.scalars(stmt).first()

if not sync_job:
return None
Expand Down Expand Up @@ -331,16 +333,20 @@ def get_sync_history(
Returns:
Sync history with pagination info
"""
query = db_session.query(SyncJob).filter_by(tenant_id=self.tenant_id)
stmt = select(SyncJob).filter_by(tenant_id=self.tenant_id)

if status_filter:
query = query.filter_by(status=status_filter)
stmt = stmt.filter_by(status=status_filter)

# Get total count
total = query.count()
count_stmt = select(func.count()).select_from(SyncJob).where(SyncJob.tenant_id == self.tenant_id)
if status_filter:
count_stmt = count_stmt.where(SyncJob.status == status_filter)
total = db_session.scalar(count_stmt)

# Get results
sync_jobs = query.order_by(SyncJob.started_at.desc()).limit(limit).offset(offset).all()
stmt = stmt.order_by(SyncJob.started_at.desc()).limit(limit).offset(offset)
sync_jobs = db_session.scalars(stmt).all()

results = []
for job in sync_jobs:
Expand Down Expand Up @@ -384,16 +390,13 @@ def needs_sync(self, db_session: Session, sync_type: str, max_age_hours: int = 2
"""
cutoff_time = datetime.now(UTC) - timedelta(hours=max_age_hours)

recent_sync = (
db_session.query(SyncJob)
.filter(
SyncJob.tenant_id == self.tenant_id,
SyncJob.sync_type == sync_type,
SyncJob.status == "completed",
SyncJob.completed_at >= cutoff_time,
)
.first()
stmt = select(SyncJob).where(
SyncJob.tenant_id == self.tenant_id,
SyncJob.sync_type == sync_type,
SyncJob.status == "completed",
SyncJob.completed_at >= cutoff_time,
)
recent_sync = db_session.scalars(stmt).first()

return recent_sync is None

Expand All @@ -409,16 +412,13 @@ def _get_recent_sync(self, db_session: Session, sync_type: str) -> dict[str, Any
"""
today = datetime.now(UTC).replace(hour=0, minute=0, second=0)

recent_sync = (
db_session.query(SyncJob)
.filter(
SyncJob.tenant_id == self.tenant_id,
SyncJob.sync_type == sync_type,
SyncJob.status.in_(["running", "completed"]),
SyncJob.started_at >= today,
)
.first()
stmt = select(SyncJob).where(
SyncJob.tenant_id == self.tenant_id,
SyncJob.sync_type == sync_type,
SyncJob.status.in_(["running", "completed"]),
SyncJob.started_at >= today,
)
recent_sync = db_session.scalars(stmt).first()

if not recent_sync:
return None
Expand Down Expand Up @@ -484,29 +484,30 @@ def get_sync_stats(self, db_session: Session, hours: int = 24) -> dict[str, Any]
# Count by status
status_counts = {}
for status in ["pending", "running", "completed", "failed"]:
count = (
db_session.query(SyncJob)
.filter(
count_stmt = (
select(func.count())
.select_from(SyncJob)
.where(
SyncJob.tenant_id == self.tenant_id,
SyncJob.status == status,
SyncJob.started_at >= since,
)
.count()
)
count = db_session.scalar(count_stmt)
status_counts[status] = count

# Get recent failures
recent_failures = (
db_session.query(SyncJob)
.filter(
stmt = (
select(SyncJob)
.where(
SyncJob.tenant_id == self.tenant_id,
SyncJob.status == "failed",
SyncJob.started_at >= since,
)
.order_by(SyncJob.started_at.desc())
.limit(5)
.all()
)
recent_failures = db_session.scalars(stmt).all()

failures = []
for job in recent_failures:
Expand Down
5 changes: 4 additions & 1 deletion src/adapters/gam/managers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,16 @@ def create_manual_order_workflow_step(
step_id = f"c{uuid.uuid4().hex[:5]}" # 6 chars total

# Use naming template from adapter config, or fallback to default
from sqlalchemy import select

from src.adapters.gam.utils.naming import apply_naming_template, build_order_name_context
from src.core.database.database_session import get_db_session
from src.core.database.models import AdapterConfig

order_name_template = "{campaign_name|promoted_offering} - {date_range}" # Default
with get_db_session() as db_session:
adapter_config = db_session.query(AdapterConfig).filter_by(tenant_id=self.tenant_id).first()
stmt = select(AdapterConfig).filter_by(tenant_id=self.tenant_id)
adapter_config = db_session.scalars(stmt).first()
if adapter_config and adapter_config.gam_order_name_template:
order_name_template = adapter_config.gam_order_name_template

Expand Down
29 changes: 17 additions & 12 deletions src/adapters/gam_reporting_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import pytz
from flask import Blueprint, jsonify, request, session
from sqlalchemy import select

from scripts.ops.gam_helper import get_ad_manager_client_for_tenant
from src.adapters.gam_reporting_service import GAMReportingService
Expand Down Expand Up @@ -117,7 +118,8 @@ def get_gam_reporting(tenant_id: str):

# Check if tenant is using GAM
with get_db_session() as db_session:
tenant = db_session.query(Tenant).filter_by(tenant_id=tenant_id).first()
stmt = select(Tenant).filter_by(tenant_id=tenant_id)
tenant = db_session.scalars(stmt).first()

if not tenant or tenant.ad_server != "google_ad_manager":
return jsonify({"error": "GAM reporting is only available for tenants using Google Ad Manager"}), 400
Expand Down Expand Up @@ -219,7 +221,8 @@ def get_advertiser_summary(tenant_id: str, advertiser_id: str):

# Check if tenant is using GAM
with get_db_session() as db_session:
tenant = db_session.query(Tenant).filter_by(tenant_id=tenant_id).first()
stmt = select(Tenant).filter_by(tenant_id=tenant_id)
tenant = db_session.scalars(stmt).first()

if not tenant or tenant.ad_server != "google_ad_manager":
return jsonify({"error": "GAM reporting is only available for tenants using Google Ad Manager"}), 400
Expand Down Expand Up @@ -282,7 +285,8 @@ def get_principal_reporting(tenant_id: str, principal_id: str):

# Get the principal's advertiser_id
with get_db_session() as db_session:
principal = db_session.query(Principal).filter_by(tenant_id=tenant_id, principal_id=principal_id).first()
stmt = select(Principal).filter_by(tenant_id=tenant_id, principal_id=principal_id)
principal = db_session.scalars(stmt).first()

if not principal:
return jsonify({"error": "Principal not found"}), 404
Expand Down Expand Up @@ -324,9 +328,8 @@ def get_principal_reporting(tenant_id: str, principal_id: str):
from src.core.database.models import AdapterConfig

with get_db_session() as db_session:
adapter_config = (
db_session.query(AdapterConfig).filter_by(tenant_id=tenant_id, adapter_type="google_ad_manager").first()
)
stmt = select(AdapterConfig).filter_by(tenant_id=tenant_id, adapter_type="google_ad_manager")
adapter_config = db_session.scalars(stmt).first()

if not adapter_config:
# Default to America/New_York if no config found
Expand Down Expand Up @@ -396,7 +399,8 @@ def get_country_breakdown(tenant_id: str):

# Check if tenant is using GAM
with get_db_session() as db_session:
tenant = db_session.query(Tenant).filter_by(tenant_id=tenant_id).first()
stmt = select(Tenant).filter_by(tenant_id=tenant_id)
tenant = db_session.scalars(stmt).first()

if not tenant or tenant.ad_server != "google_ad_manager":
return jsonify({"error": "GAM reporting is only available for tenants using Google Ad Manager"}), 400
Expand Down Expand Up @@ -479,7 +483,8 @@ def get_ad_unit_breakdown(tenant_id: str):

# Check if tenant is using GAM
with get_db_session() as db_session:
tenant = db_session.query(Tenant).filter_by(tenant_id=tenant_id).first()
stmt = select(Tenant).filter_by(tenant_id=tenant_id)
tenant = db_session.scalars(stmt).first()

if not tenant or tenant.ad_server != "google_ad_manager":
return jsonify({"error": "GAM reporting is only available for tenants using Google Ad Manager"}), 400
Expand Down Expand Up @@ -564,7 +569,8 @@ def get_principal_summary(tenant_id: str, principal_id: str):

# Get the principal's advertiser_id
with get_db_session() as db_session:
principal = db_session.query(Principal).filter_by(tenant_id=tenant_id, principal_id=principal_id).first()
stmt = select(Principal).filter_by(tenant_id=tenant_id, principal_id=principal_id)
principal = db_session.scalars(stmt).first()

if not principal:
return jsonify({"error": "Principal not found"}), 404
Expand Down Expand Up @@ -596,9 +602,8 @@ def get_principal_summary(tenant_id: str, principal_id: str):
from src.core.database.models import AdapterConfig

with get_db_session() as db_session:
adapter_config = (
db_session.query(AdapterConfig).filter_by(tenant_id=tenant_id, adapter_type="google_ad_manager").first()
)
stmt = select(AdapterConfig).filter_by(tenant_id=tenant_id, adapter_type="google_ad_manager")
adapter_config = db_session.scalars(stmt).first()

if not adapter_config:
# Default to America/New_York if no config found
Expand Down
5 changes: 4 additions & 1 deletion src/adapters/google_ad_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,16 @@ def create_media_buy(

# Automatic mode - create order directly
# Use naming template from adapter config, or fallback to default
from sqlalchemy import select

from src.adapters.gam.utils.naming import apply_naming_template, build_order_name_context
from src.core.database.database_session import get_db_session
from src.core.database.models import AdapterConfig

order_name_template = "{campaign_name|promoted_offering} - {date_range}" # Default
with get_db_session() as db_session:
adapter_config = db_session.query(AdapterConfig).filter_by(tenant_id=self.tenant_id).first()
stmt = select(AdapterConfig).filter_by(tenant_id=self.tenant_id)
adapter_config = db_session.scalars(stmt).first()
if adapter_config and adapter_config.gam_order_name_template:
order_name_template = adapter_config.gam_order_name_template

Expand Down
Loading
Loading