diff --git a/alembic/versions/e38f2f6f395a_add_mock_manual_approval_required.py b/alembic/versions/e38f2f6f395a_add_mock_manual_approval_required.py new file mode 100644 index 000000000..0ff6838c0 --- /dev/null +++ b/alembic/versions/e38f2f6f395a_add_mock_manual_approval_required.py @@ -0,0 +1,36 @@ +"""add_mock_manual_approval_required + +Revision ID: e38f2f6f395a +Revises: faaed3b71428 +Create Date: 2025-10-23 20:06:20.766732 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'e38f2f6f395a' +down_revision: Union[str, Sequence[str], None] = 'faaed3b71428' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Add mock_manual_approval_required column to adapter_config table + op.add_column('adapter_config', sa.Column('mock_manual_approval_required', sa.Boolean(), nullable=True)) + + # Set default value to False for existing rows + op.execute("UPDATE adapter_config SET mock_manual_approval_required = false WHERE adapter_type = 'mock'") + + # Make the column non-nullable after setting defaults + op.alter_column('adapter_config', 'mock_manual_approval_required', nullable=False, server_default=sa.false()) + + +def downgrade() -> None: + """Downgrade schema.""" + # Remove mock_manual_approval_required column + op.drop_column('adapter_config', 'mock_manual_approval_required') diff --git a/run_all_tests.sh b/run_all_tests.sh index c62245509..4a302728f 100755 --- a/run_all_tests.sh +++ b/run_all_tests.sh @@ -67,10 +67,15 @@ echo "" setup_docker_stack() { echo -e "${BLUE}🐳 Starting complete Docker stack (PostgreSQL + servers)...${NC}" - # Clean up any existing containers/volumes - echo "Cleaning up any existing Docker services and volumes..." - docker-compose down -v 2>/dev/null || true - docker volume prune -f 2>/dev/null || true + # Use unique project name to isolate from local dev environment + # This ensures test containers don't interfere with your running local containers + local TEST_PROJECT_NAME="adcp-test-$$" # $$ = process ID, ensures uniqueness + export COMPOSE_PROJECT_NAME="$TEST_PROJECT_NAME" + + # Clean up ONLY this test project's containers/volumes (not your local dev!) + echo "Cleaning up any existing TEST containers (project: $TEST_PROJECT_NAME)..." + docker-compose -p "$TEST_PROJECT_NAME" down -v 2>/dev/null || true + # DO NOT run docker volume prune - that affects ALL Docker volumes! # If ports are still in use, find new ones if lsof -i :${POSTGRES_PORT} >/dev/null 2>&1; then @@ -116,15 +121,15 @@ print(' '.join(map(str, ports))) # Build and start services echo "Building Docker images (this may take 2-3 minutes on first run)..." - if ! docker-compose build --progress=plain 2>&1 | grep -E "(Step|#|Building|exporting)" | tail -20; then + if ! docker-compose -p "$TEST_PROJECT_NAME" build --progress=plain 2>&1 | grep -E "(Step|#|Building|exporting)" | tail -20; then echo -e "${RED}❌ Docker build failed${NC}" exit 1 fi echo "Starting Docker services..." - if ! docker-compose up -d; then + if ! docker-compose -p "$TEST_PROJECT_NAME" up -d; then echo -e "${RED}❌ Docker services failed to start${NC}" - docker-compose logs + docker-compose -p "$TEST_PROJECT_NAME" logs exit 1 fi @@ -143,7 +148,7 @@ print(' '.join(map(str, ports))) fi # Check PostgreSQL - if docker-compose exec -T postgres pg_isready -U adcp_user >/dev/null 2>&1; then + if docker-compose -p "$TEST_PROJECT_NAME" exec -T postgres pg_isready -U adcp_user >/dev/null 2>&1; then echo -e "${GREEN}✓ PostgreSQL is ready (${elapsed}s)${NC}" break fi @@ -154,12 +159,12 @@ print(' '.join(map(str, ports))) # Run migrations echo "Running database migrations..." # Use docker-compose exec to run migrations inside the container - if ! docker-compose exec -T postgres psql -U adcp_user -d postgres -c "CREATE DATABASE adcp_test" 2>/dev/null; then + if ! docker-compose -p "$TEST_PROJECT_NAME" exec -T postgres psql -U adcp_user -d postgres -c "CREATE DATABASE adcp_test" 2>/dev/null; then echo "Database adcp_test already exists, continuing..." fi - # Export for tests - export DATABASE_URL="postgresql://adcp_user:test_password@localhost:${POSTGRES_PORT}/adcp_test" + # Export for tests - MUST match docker-compose.yml POSTGRES_PASSWORD + export DATABASE_URL="postgresql://adcp_user:secure_password_change_me@localhost:${POSTGRES_PORT}/adcp_test" echo -e "${GREEN}✓ Docker stack is ready${NC}" echo " PostgreSQL: localhost:${POSTGRES_PORT}" @@ -170,8 +175,9 @@ print(' '.join(map(str, ports))) # Docker teardown function teardown_docker_stack() { - echo -e "${BLUE}🐳 Stopping Docker stack...${NC}" - docker-compose down -v 2>/dev/null || true + echo -e "${BLUE}🐳 Stopping TEST Docker stack (project: $COMPOSE_PROJECT_NAME)...${NC}" + docker-compose -p "$COMPOSE_PROJECT_NAME" down -v 2>/dev/null || true + echo -e "${GREEN}✓ Test containers cleaned up (your local dev containers are untouched)${NC}" } # Trap to ensure cleanup on exit diff --git a/src/a2a_server/adcp_a2a_server.py b/src/a2a_server/adcp_a2a_server.py index 204eabe40..17cfbeb30 100644 --- a/src/a2a_server/adcp_a2a_server.py +++ b/src/a2a_server/adcp_a2a_server.py @@ -2215,6 +2215,18 @@ def main(): extended_agent_card_url="/agent.json", ) + # Add CORS middleware for browser compatibility (must be added early to wrap all responses) + from starlette.middleware.cors import CORSMiddleware + + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Allow all origins for A2A protocol + allow_credentials=True, + allow_methods=["*"], # Allow all HTTP methods + allow_headers=["*"], # Allow all headers + ) + logger.info("CORS middleware enabled for browser compatibility") + # Override the agent card endpoints to support tenant-specific URLs def create_dynamic_agent_card(request) -> AgentCard: """Create agent card with tenant-specific URL from request headers.""" @@ -2265,12 +2277,26 @@ def get_protocol(hostname: str) -> str: async def dynamic_agent_discovery(request): """Override for /.well-known/agent.json with tenant-specific URL.""" + from starlette.responses import Response + + # Handle OPTIONS preflight requests (CORS middleware will add headers) + if request.method == "OPTIONS": + return Response(status_code=204) + dynamic_card = create_dynamic_agent_card(request) + # CORS middleware automatically adds CORS headers return JSONResponse(dynamic_card.model_dump()) async def dynamic_agent_card_endpoint(request): """Override for /agent.json with tenant-specific URL.""" + from starlette.responses import Response + + # Handle OPTIONS preflight requests (CORS middleware will add headers) + if request.method == "OPTIONS": + return Response(status_code=204) + dynamic_card = create_dynamic_agent_card(request) + # CORS middleware automatically adds CORS headers return JSONResponse(dynamic_card.model_dump()) # Find and replace the existing routes to ensure proper A2A specification compliance @@ -2279,15 +2305,15 @@ async def dynamic_agent_card_endpoint(request): if hasattr(route, "path"): if route.path == "/.well-known/agent.json": # Replace with our dynamic endpoint (legacy compatibility) - new_routes.append(Route("/.well-known/agent.json", dynamic_agent_discovery, methods=["GET"])) + new_routes.append(Route("/.well-known/agent.json", dynamic_agent_discovery, methods=["GET", "OPTIONS"])) logger.info("Replaced /.well-known/agent.json with dynamic version") elif route.path == "/.well-known/agent-card.json": # Replace with our dynamic endpoint (primary A2A discovery) - new_routes.append(Route("/.well-known/agent-card.json", dynamic_agent_discovery, methods=["GET"])) + new_routes.append(Route("/.well-known/agent-card.json", dynamic_agent_discovery, methods=["GET", "OPTIONS"])) logger.info("Replaced /.well-known/agent-card.json with dynamic version") elif route.path == "/agent.json": # Replace with our dynamic endpoint - new_routes.append(Route("/agent.json", dynamic_agent_card_endpoint, methods=["GET"])) + new_routes.append(Route("/agent.json", dynamic_agent_card_endpoint, methods=["GET", "OPTIONS"])) logger.info("Replaced /agent.json with dynamic version") else: new_routes.append(route) diff --git a/src/adapters/mock_ad_server.py b/src/adapters/mock_ad_server.py index 5b478a4be..8984f29af 100644 --- a/src/adapters/mock_ad_server.py +++ b/src/adapters/mock_ad_server.py @@ -706,10 +706,23 @@ def _create_media_buy_immediate( else: self.log(f"Would return: Campaign ID '{media_buy_id}' with status 'pending_creative'") + # Build packages response with buyer_ref from original request + response_packages = [] + for idx, pkg in enumerate(packages): + pkg_dict = pkg.model_dump() + self.log(f"[DEBUG] MockAdapter: Package {idx} model_dump() = {pkg_dict}") + self.log(f"[DEBUG] MockAdapter: Package {idx} has package_id = {pkg_dict.get('package_id')}") + # Add buyer_ref from original request package if available + if request.packages and idx < len(request.packages): + pkg_dict["buyer_ref"] = request.packages[idx].buyer_ref + response_packages.append(pkg_dict) + + self.log(f"[DEBUG] MockAdapter: Returning {len(response_packages)} packages in response") return CreateMediaBuyResponse( buyer_ref=request.buyer_ref, # Required field per AdCP spec media_buy_id=media_buy_id, creative_deadline=datetime.now(UTC) + timedelta(days=2), + packages=response_packages, # Include packages with buyer_ref errors=None, # No errors for successful mock response ) @@ -1114,6 +1127,34 @@ def update_media_buy( budget: int | None, today: datetime, ) -> UpdateMediaBuyResponse: + """Update media buy in database (Mock adapter implementation).""" + import logging + from sqlalchemy import select + from sqlalchemy.orm import attributes + + from src.core.database.database_session import get_db_session + from src.core.database.models import MediaBuy, MediaPackage + + logger = logging.getLogger(__name__) + + with get_db_session() as session: + if action == "update_package_budget" and package_id and budget is not None: + # Update package budget in MediaPackage.package_config JSON + stmt = select(MediaPackage).where( + MediaPackage.package_id == package_id, + MediaPackage.media_buy_id == media_buy_id + ) + media_package = session.scalars(stmt).first() + if media_package: + # Update budget in package_config JSON + media_package.package_config["budget"] = float(budget) + # Flag the JSON field as modified so SQLAlchemy persists it + attributes.flag_modified(media_package, "package_config") + session.commit() + logger.info(f"[MockAdapter] Updated package {package_id} budget to {budget} in database") + else: + logger.warning(f"[MockAdapter] Package {package_id} not found for media buy {media_buy_id}") + return UpdateMediaBuyResponse(media_buy_id=media_buy_id, buyer_ref=buyer_ref) def get_config_ui_endpoint(self) -> str | None: diff --git a/src/admin/blueprints/creatives.py b/src/admin/blueprints/creatives.py index e64acb918..03e9584af 100644 --- a/src/admin/blueprints/creatives.py +++ b/src/admin/blueprints/creatives.py @@ -330,22 +330,20 @@ def approve_creative(tenant_id, creative_id, **kwargs): db_session.commit() - # Find webhook_url from workflow step if it exists - from src.core.database.models import ObjectWorkflowMapping, WorkflowStep - - stmt = select(ObjectWorkflowMapping).filter_by(object_type="creative", object_id=creative_id) - mapping = db_session.scalars(stmt).first() - - webhook_url = None - if mapping: - stmt = select(WorkflowStep).filter_by(step_id=mapping.step_id) - workflow_step = db_session.scalars(stmt).first() - if workflow_step and workflow_step.request_data: - webhook_url = workflow_step.request_data.get("webhook_url") - - # Call webhook if configured - if webhook_url: - creative_data = { + # Send webhook notification to principal + from src.core.database.models import PushNotificationConfig + + stmt_webhook = select(PushNotificationConfig).filter_by( + tenant_id=tenant_id, + principal_id=creative.principal_id, + is_active=True + ).order_by(PushNotificationConfig.created_at.desc()) + webhook_config = db_session.scalars(stmt_webhook).first() + + if webhook_config: + import requests + webhook_payload = { + "event": "creative_approved", "creative_id": creative.creative_id, "name": creative.name, "format": creative.format, @@ -353,7 +351,11 @@ def approve_creative(tenant_id, creative_id, **kwargs): "approved_by": approved_by, "approved_at": creative.approved_at.isoformat(), } - _call_webhook_for_creative_status(webhook_url, creative_id, "approved", creative_data, tenant_id) + try: + requests.post(webhook_config.url, json=webhook_payload, timeout=10) + logger.info(f"Sent webhook notification for approved creative {creative_id}") + except Exception as webhook_err: + logger.warning(f"Failed to send creative approval webhook: {webhook_err}") # Send Slack notification if configured stmt_tenant = select(Tenant).filter_by(tenant_id=tenant_id) @@ -475,22 +477,20 @@ def reject_creative(tenant_id, creative_id, **kwargs): db_session.commit() - # Find webhook_url from workflow step if it exists - from src.core.database.models import ObjectWorkflowMapping, WorkflowStep - - stmt = select(ObjectWorkflowMapping).filter_by(object_type="creative", object_id=creative_id) - mapping = db_session.scalars(stmt).first() + # Send webhook notification to principal + from src.core.database.models import PushNotificationConfig - webhook_url = None - if mapping: - stmt = select(WorkflowStep).filter_by(step_id=mapping.step_id) - workflow_step = db_session.scalars(stmt).first() - if workflow_step and workflow_step.request_data: - webhook_url = workflow_step.request_data.get("webhook_url") - - # Call webhook if configured - if webhook_url: - creative_data = { + stmt_webhook = select(PushNotificationConfig).filter_by( + tenant_id=tenant_id, + principal_id=creative.principal_id, + is_active=True + ).order_by(PushNotificationConfig.created_at.desc()) + webhook_config = db_session.scalars(stmt_webhook).first() + + if webhook_config: + import requests + webhook_payload = { + "event": "creative_rejected", "creative_id": creative.creative_id, "name": creative.name, "format": creative.format, @@ -499,7 +499,11 @@ def reject_creative(tenant_id, creative_id, **kwargs): "rejection_reason": rejection_reason, "rejected_at": creative.data["rejected_at"], } - _call_webhook_for_creative_status(webhook_url, creative_id, "rejected", creative_data, tenant_id) + try: + requests.post(webhook_config.url, json=webhook_payload, timeout=10) + logger.info(f"Sent webhook notification for rejected creative {creative_id}") + except Exception as webhook_err: + logger.warning(f"Failed to send creative rejection webhook: {webhook_err}") # Send Slack notification if configured stmt_tenant = select(Tenant).filter_by(tenant_id=tenant_id) diff --git a/src/admin/blueprints/operations.py b/src/admin/blueprints/operations.py index 8eac41eab..64f90e00f 100644 --- a/src/admin/blueprints/operations.py +++ b/src/admin/blueprints/operations.py @@ -160,7 +160,7 @@ def media_buy_detail(tenant_id, media_buy_id): from src.core.context_manager import ContextManager from src.core.database.database_session import get_db_session - from src.core.database.models import Creative, CreativeAssignment, MediaBuy, Principal, WorkflowStep + from src.core.database.models import Creative, CreativeAssignment, MediaBuy, MediaPackage, Principal, Product, WorkflowStep try: with get_db_session() as db_session: @@ -177,6 +177,24 @@ def media_buy_detail(tenant_id, media_buy_id): stmt = select(Principal).filter_by(tenant_id=tenant_id, principal_id=media_buy.principal_id) principal = db_session.scalars(stmt).first() + # Get packages for this media buy from MediaPackage table + stmt = select(MediaPackage).filter_by(media_buy_id=media_buy_id) + media_packages = db_session.scalars(stmt).all() + + packages = [] + for media_pkg in media_packages: + # Extract product_id from package_config JSONB + product_id = media_pkg.package_config.get("product_id") + product = None + if product_id: + stmt = select(Product).filter_by(tenant_id=tenant_id, product_id=product_id) + product = db_session.scalars(stmt).first() + + packages.append({ + "package": media_pkg, + "product": product, + }) + # Get creative assignments for this media buy stmt = ( select(CreativeAssignment, Creative) @@ -213,6 +231,11 @@ def media_buy_detail(tenant_id, media_buy_id): pending_approval_step = db_session.scalars(stmt).first() break + # Get computed readiness state (not just raw database status) + from src.admin.services.media_buy_readiness_service import MediaBuyReadinessService + readiness = MediaBuyReadinessService.get_readiness_state(media_buy_id, tenant_id, db_session) + computed_state = readiness["state"] + # Determine status message status_message = None if pending_approval_step: @@ -232,10 +255,13 @@ def media_buy_detail(tenant_id, media_buy_id): tenant_id=tenant_id, media_buy=media_buy, principal=principal, + packages=packages, workflow_steps=workflow_steps, pending_approval_step=pending_approval_step, status_message=status_message, creative_assignments_by_package=creative_assignments_by_package, + computed_state=computed_state, + readiness=readiness, ) except Exception as e: logger.error(f"Error viewing media buy: {e}", exc_info=True) @@ -296,8 +322,80 @@ def approve_media_buy(tenant_id, media_buy_id, **kwargs): ) attributes.flag_modified(step, "comments") - db_session.commit() - flash("Media buy approved successfully", "success") + # Get the media buy and update status + from src.core.database.models import MediaBuy, PushNotificationConfig + + stmt_buy = select(MediaBuy).filter_by(media_buy_id=media_buy_id, tenant_id=tenant_id) + media_buy = db_session.scalars(stmt_buy).first() + + if media_buy and media_buy.status == "pending_approval": + # Check if all creatives are approved before moving to scheduled + from src.core.database.models import Creative, CreativeAssignment + + stmt_assignments = select(CreativeAssignment).filter_by( + tenant_id=tenant_id, + media_buy_id=media_buy_id + ) + assignments = db_session.scalars(stmt_assignments).all() + + all_creatives_approved = True + if assignments: + creative_ids = [a.creative_id for a in assignments] + stmt_creatives = select(Creative).filter( + Creative.tenant_id == tenant_id, + Creative.creative_id.in_(creative_ids) + ) + creatives = db_session.scalars(stmt_creatives).all() + + # Check if any creatives are not approved + for creative in creatives: + if creative.status != "approved": + all_creatives_approved = False + break + else: + # No creatives assigned yet + all_creatives_approved = False + + # Update status based on creative approval state + if all_creatives_approved: + media_buy.status = "scheduled" + else: + # Keep it in a state that shows it needs creative approval + # Use "draft" which will be displayed as "needs_approval" or "needs_creatives" by readiness service + media_buy.status = "draft" + + media_buy.approved_at = datetime.now(UTC) + media_buy.approved_by = user_email + db_session.commit() + + # Send webhook notification to buyer + stmt_webhook = select(PushNotificationConfig).filter_by( + tenant_id=tenant_id, + principal_id=media_buy.principal_id, + is_active=True + ).order_by(PushNotificationConfig.created_at.desc()) + webhook_config = db_session.scalars(stmt_webhook).first() + + if webhook_config: + import requests + webhook_payload = { + "event": "media_buy_approved", + "media_buy_id": media_buy_id, + "buyer_ref": media_buy.buyer_ref, + "status": "scheduled", + "approved_at": media_buy.approved_at.isoformat(), + "approved_by": user_email, + } + try: + requests.post(webhook_config.url, json=webhook_payload, timeout=10) + logger.info(f"Sent webhook notification for approved media buy {media_buy_id}") + except Exception as webhook_err: + logger.warning(f"Failed to send webhook notification: {webhook_err}") + + flash("Media buy approved and scheduled successfully", "success") + else: + db_session.commit() + flash("Media buy approved successfully", "success") elif action == "reject": step.status = "rejected" diff --git a/src/admin/blueprints/settings.py b/src/admin/blueprints/settings.py index dcbc6bfd0..0c2d903d4 100644 --- a/src/admin/blueprints/settings.py +++ b/src/admin/blueprints/settings.py @@ -351,9 +351,12 @@ def update_adapter(tenant_id): elif new_adapter == "mock": if request.is_json: dry_run = request.json.get("mock_dry_run", False) + manual_approval = request.json.get("mock_manual_approval", False) else: dry_run = request.form.get("mock_dry_run") == "on" + manual_approval = request.form.get("mock_manual_approval") == "on" adapter_config_obj.mock_dry_run = dry_run + adapter_config_obj.mock_manual_approval_required = manual_approval # Update the tenant tenant.ad_server = new_adapter @@ -877,10 +880,18 @@ def update_business_rules(tenant_id): # Update approval workflow if "human_review_required" in data: - tenant.human_review_required = data.get("human_review_required") in [True, "true", "on", 1, "1"] + manual_approval_value = data.get("human_review_required") in [True, "true", "on", 1, "1"] + tenant.human_review_required = manual_approval_value + + # Also update the adapter's manual approval setting if using Mock adapter + if tenant.adapter_config and tenant.adapter_config.adapter_type == "mock": + tenant.adapter_config.mock_manual_approval_required = manual_approval_value elif not request.is_json: # Checkbox not present in form data means unchecked tenant.human_review_required = False + # Also update Mock adapter setting if applicable + if tenant.adapter_config and tenant.adapter_config.adapter_type == "mock": + tenant.adapter_config.mock_manual_approval_required = False # Update creative review settings if "approval_mode" in data: diff --git a/src/admin/services/media_buy_readiness_service.py b/src/admin/services/media_buy_readiness_service.py index cbd7c41c3..ae0f8afd7 100644 --- a/src/admin/services/media_buy_readiness_service.py +++ b/src/admin/services/media_buy_readiness_service.py @@ -236,11 +236,11 @@ def _compute_state( State hierarchy (in priority order): 1. failed - Media buy creation failed 2. paused - Explicitly paused - 3. completed - Flight ended - 4. live - Currently serving (in flight, all creatives approved, no blockers) - 5. scheduled - Ready and waiting for start date - 6. needs_approval - Has pending creatives - 7. needs_creatives - Missing creative assignments or has rejected creatives + 3. needs_approval - Media buy itself awaiting manual approval (NOT creative approval) + 4. completed - Flight ended + 5. live - Currently serving (in flight, all creatives approved, no blockers) + 6. scheduled - Ready and waiting for start date + 7. needs_creatives - Creatives need action (missing, pending approval, or rejected) 8. draft - Initial state, not configured """ # Check explicit status first @@ -250,6 +250,10 @@ def _compute_state( if media_buy.status == "paused": return "paused" + # Check if awaiting manual approval (highest priority - bypasses creative checks) + if media_buy.status == "pending_approval": + return "needs_approval" + # Check flight timing - ensure timezone-aware datetimes if media_buy.start_time: start_time = ( @@ -282,12 +286,14 @@ def _compute_state( if packages_total == 0: return "draft" - # Needs approval: has pending creatives - if creatives_pending > 0: - return "needs_approval" - - # Needs creatives: missing assignments, has rejected creatives, or has packages but no creatives - if packages_total > packages_with_creatives or creatives_rejected > 0 or creatives_total == 0: + # Needs creatives: ANY creative action needed (missing, pending, or rejected) + # This includes creatives pending approval - they are still in "needs creatives" state + if ( + packages_total > packages_with_creatives # Missing creative assignments + or creatives_pending > 0 # Creatives pending approval + or creatives_rejected > 0 # Rejected creatives need replacement + or creatives_total == 0 # No creatives at all + ): return "needs_creatives" # Fallback (shouldn't reach here if logic is complete) diff --git a/src/core/auth_utils.py b/src/core/auth_utils.py index cf5a376c9..9c3d69404 100644 --- a/src/core/auth_utils.py +++ b/src/core/auth_utils.py @@ -33,17 +33,6 @@ def _lookup_principal(session): principal = session.scalars(stmt).first() if principal: return principal.principal_id - - # Also check if it's the admin token for this specific tenant - stmt = select(Tenant).filter_by(tenant_id=tenant_id, is_active=True) - tenant = session.scalars(stmt).first() - if tenant and token == tenant.admin_token: - # Set tenant context for admin token - from src.core.utils.tenant_utils import serialize_tenant_to_dict - - tenant_dict = serialize_tenant_to_dict(tenant) - set_current_tenant(tenant_dict) - return f"admin_{tenant.tenant_id}" else: # No tenant specified - search globally stmt = select(Principal).filter_by(access_token=token) diff --git a/src/core/context_manager.py b/src/core/context_manager.py index 6fb9b4242..b619f7847 100644 --- a/src/core/context_manager.py +++ b/src/core/context_manager.py @@ -515,6 +515,53 @@ def get_contexts_for_principal(self, tenant_id: str, principal_id: str, limit: i finally: session.close() + def link_workflow_to_object( + self, + step_id: str, + object_type: str, + object_id: str, + action: str | None = None, + ) -> None: + """Link a workflow step to an object after the step is created. + + This is useful when you need to associate objects with a workflow step + after the step has already been created. + + Args: + step_id: The workflow step ID + object_type: Type of object (media_buy, creative, product, etc.) + object_id: The object's ID + action: Optional action being performed (defaults to step_type) + """ + session = self.session + try: + # Get the step to use its step_type as default action + stmt = select(WorkflowStep).filter_by(step_id=step_id) + step = session.scalars(stmt).first() + + if not step: + console.print(f"[yellow]⚠️ Step {step_id} not found, cannot link object[/yellow]") + return + + obj_mapping = ObjectWorkflowMapping( + object_type=object_type, + object_id=object_id, + step_id=step_id, + action=action or step.step_type, + created_at=datetime.now(UTC), + ) + session.add(obj_mapping) + session.commit() + console.print( + f"[green]✅ Linked {object_type} {object_id} to workflow step {step_id}[/green]" + ) + except Exception as e: + session.rollback() + console.print(f"[red]Failed to link object to workflow: {e}[/red]") + raise + finally: + session.close() + def _send_push_notifications(self, step: WorkflowStep, new_status: str, session: Any) -> None: """Send push notifications via registered webhooks for workflow step status changes. diff --git a/src/core/database/models.py b/src/core/database/models.py index 6bb15793f..2c5086010 100644 --- a/src/core/database/models.py +++ b/src/core/database/models.py @@ -504,6 +504,26 @@ class MediaBuy(Base): ) +class MediaPackage(Base): + """Media package model for structured querying of media buy packages. + + Stores packages separately from MediaBuy.raw_request for efficient lookups + by package_id, which is needed for creative assignments. + """ + __tablename__ = "media_packages" + + media_buy_id: Mapped[str] = mapped_column( + String(100), ForeignKey("media_buys.media_buy_id"), primary_key=True, nullable=False + ) + package_id: Mapped[str] = mapped_column(String(100), primary_key=True, nullable=False) + package_config: Mapped[dict] = mapped_column(JSONType, nullable=False) + + __table_args__ = ( + Index("idx_media_packages_media_buy", "media_buy_id"), + Index("idx_media_packages_package", "package_id"), + ) + + # DEPRECATED: Task and HumanTask models removed - replaced by WorkflowStep system # Tables may still exist in database for backward compatibility but are not used by application # Dashboard now uses only audit_logs table for activity tracking @@ -595,6 +615,9 @@ class AdapterConfig(Base): kevel_api_key: Mapped[str | None] = mapped_column(String(100), nullable=True) kevel_manual_approval_required: Mapped[bool] = mapped_column(Boolean, default=False) + # Mock + mock_manual_approval_required: Mapped[bool] = mapped_column(Boolean, default=False) + # Triton triton_station_id: Mapped[str | None] = mapped_column(String(50), nullable=True) triton_api_key: Mapped[str | None] = mapped_column(String(100), nullable=True) diff --git a/src/core/main.py b/src/core/main.py index 069dc4626..a69e29b52 100644 --- a/src/core/main.py +++ b/src/core/main.py @@ -286,15 +286,7 @@ def get_principal_from_token(token: str, tenant_id: str | None = None) -> str | # Also check if it's the admin token for this specific tenant stmt = select(Tenant).filter_by(tenant_id=tenant_id, is_active=True) tenant = session.scalars(stmt).first() - if tenant and token == tenant.admin_token: - console.print(f"[green]Token matches admin token for tenant '{tenant_id}'[/green]") - # Set tenant context for admin token - from src.core.utils.tenant_utils import serialize_tenant_to_dict - - tenant_dict = serialize_tenant_to_dict(tenant) - set_current_tenant(tenant_dict) - return f"{tenant_id}_admin" - console.print(f"[red]Token not found in tenant '{tenant_id}' and doesn't match admin token[/red]") + console.print(f"[red]Token not found in tenant '{tenant_id}'[/red]") return None else: console.print(f"[green]Found principal '{principal.principal_id}' in tenant '{tenant_id}'[/green]") @@ -335,17 +327,6 @@ def get_principal_from_token(token: str, tenant_id: str | None = None) -> str | f"[bold green]Set tenant context to '{tenant.tenant_id}' (from principal)[/bold green]" ) - # Check if this is the admin token for the tenant - if token == tenant.admin_token: - return f"{tenant.tenant_id}_admin" - else: - # Tenant was already set by caller - just check admin token - stmt = select(Tenant).filter_by(tenant_id=tenant_id, is_active=True) - tenant = session.scalars(stmt).first() - if tenant and token == tenant.admin_token: - console.print(f"[green]Token is admin token for tenant '{tenant_id}'[/green]") - return f"{tenant_id}_admin" - return principal.principal_id @@ -746,6 +727,7 @@ def get_adapter(principal: Principal, dry_run: bool = False, testing_context=Non adapter_type = config_row.adapter_type if adapter_type == "mock": adapter_config["dry_run"] = config_row.mock_dry_run + adapter_config["manual_approval_required"] = config_row.mock_manual_approval_required elif adapter_type == "google_ad_manager": adapter_config["network_code"] = config_row.gam_network_code adapter_config["refresh_token"] = config_row.gam_refresh_token @@ -770,6 +752,9 @@ def get_adapter(principal: Principal, dry_run: bool = False, testing_context=Non adapter_config["network_id"] = config_row.kevel_network_id adapter_config["api_key"] = config_row.kevel_api_key adapter_config["manual_approval_required"] = config_row.kevel_manual_approval_required + elif adapter_type == "mock": + adapter_config["dry_run"] = config_row.mock_dry_run or False + adapter_config["manual_approval_required"] = config_row.mock_manual_approval_required or False elif adapter_type == "triton": adapter_config["station_id"] = config_row.triton_station_id adapter_config["api_key"] = config_row.triton_api_key @@ -777,7 +762,8 @@ def get_adapter(principal: Principal, dry_run: bool = False, testing_context=Non if not selected_adapter: # Default to mock if no adapter specified selected_adapter = "mock" - adapter_config = {"enabled": True} + if not adapter_config: + adapter_config = {"enabled": True} # Create the appropriate adapter instance with tenant_id and testing context tenant_id = tenant["tenant_id"] @@ -1143,13 +1129,20 @@ def _verify_principal(media_buy_id: str, context: Context): principal_id = _get_principal_id_from_context(context) tenant = get_current_tenant() - # Query database for media buy + # Query database for media buy (try media_buy_id first, then buyer_ref) with get_db_session() as session: stmt = select(MediaBuyModel).where( MediaBuyModel.media_buy_id == media_buy_id, MediaBuyModel.tenant_id == tenant["tenant_id"] ) media_buy = session.scalars(stmt).first() + # If not found by media_buy_id, try buyer_ref (for backwards compatibility) + if not media_buy: + stmt = select(MediaBuyModel).where( + MediaBuyModel.buyer_ref == media_buy_id, MediaBuyModel.tenant_id == tenant["tenant_id"] + ) + media_buy = session.scalars(stmt).first() + if not media_buy: raise ValueError(f"Media buy '{media_buy_id}' not found.") @@ -2465,25 +2458,35 @@ def _sync_creatives_impl( f"variants={len(preview_result.get('previews', []))}" ) else: - # Preview generation failed for update - creative is invalid - error_msg = f"Creative validation failed: preview_creative returned no previews for update of {existing_creative.creative_id}" - logger.error(f"[sync_creatives] {error_msg}") - failed_creatives.append( - { - "creative_id": existing_creative.creative_id, - "error": error_msg, - "format": creative_format, - } - ) - failed_count += 1 - results.append( - SyncCreativeResult( - creative_id=existing_creative.creative_id, - action="failed", - errors=[error_msg], + # Preview generation returned no previews + # Only acceptable if creative has a media_url (direct URL to creative asset) + has_media_url = bool(creative.get("url") or data.get("url")) + + if has_media_url: + # Static creatives with media_url don't need previews + warning_msg = f"Preview generation returned no previews for {existing_creative.creative_id} (static creative with media_url)" + logger.warning(f"[sync_creatives] {warning_msg}") + # Continue with update - preview is optional for static creatives + else: + # Creative agent should have generated previews but didn't + error_msg = f"Preview generation failed for {existing_creative.creative_id}: no previews returned and no media_url provided" + logger.error(f"[sync_creatives] {error_msg}") + failed_creatives.append( + { + "creative_id": existing_creative.creative_id, + "error": error_msg, + "format": creative_format, + } ) - ) - continue # Skip this creative update + failed_count += 1 + results.append( + SyncCreativeResult( + creative_id=existing_creative.creative_id, + action="failed", + errors=[error_msg], + ) + ) + continue # Skip this creative, move to next except Exception as validation_error: # Creative agent validation failed for update (network error, agent down, etc.) @@ -2795,25 +2798,35 @@ def _sync_creatives_impl( f"variants={len(preview_result.get('previews', []))}" ) else: - # Preview generation failed - creative is invalid - error_msg = f"Creative validation failed: preview_creative returned no previews for {creative_id}" - logger.error(f"[sync_creatives] {error_msg}") - failed_creatives.append( - { - "creative_id": creative_id, - "error": error_msg, - "format": creative_format, - } - ) - failed_count += 1 - results.append( - SyncCreativeResult( - creative_id=creative_id, - action="failed", - errors=[error_msg], + # Preview generation returned no previews + # Only acceptable if creative has a media_url (direct URL to creative asset) + has_media_url = bool(creative.get("url") or data.get("url")) + + if has_media_url: + # Static creatives with media_url don't need previews + warning_msg = f"Preview generation returned no previews for {creative_id} (static creative with media_url)" + logger.warning(f"[sync_creatives] {warning_msg}") + # Continue with creative creation - preview is optional for static creatives + else: + # Creative agent should have generated previews but didn't + error_msg = f"Preview generation failed for {creative_id}: no previews returned and no media_url provided" + logger.error(f"[sync_creatives] {error_msg}") + failed_creatives.append( + { + "creative_id": creative_id, + "error": error_msg, + "format": creative_format, + } ) - ) - continue # Skip this creative + failed_count += 1 + results.append( + SyncCreativeResult( + creative_id=creative_id, + action="failed", + errors=[error_msg], + ) + ) + continue # Skip this creative, move to next except Exception as validation_error: # Creative agent validation failed (network error, agent down, etc.) @@ -4930,6 +4943,13 @@ async def _create_media_buy_impl( adapter.manual_approval_operations if hasattr(adapter, "manual_approval_operations") else [] ) + # DEBUG: Log manual approval settings + logger.info( + f"[DEBUG] Manual approval check - required: {manual_approval_required}, " + f"operations: {manual_approval_operations}, " + f"adapter type: {adapter.__class__.__name__}" + ) + # Check if auto-creation is disabled in tenant config auto_create_enabled = tenant.get("auto_create_media_buys", True) product_auto_create = True # Will be set correctly when we get products later @@ -4937,11 +4957,15 @@ async def _create_media_buy_impl( if manual_approval_required and "create_media_buy" in manual_approval_operations: # Update existing workflow step to require approval ctx_manager.update_workflow_step( - step.step_id, status="requires_approval", step_type="approval", owner="publisher" + step.step_id, + status="requires_approval", + add_comment={"user": "system", "comment": "Manual approval required for media buy creation"} ) # Workflow step already created above - no need for separate task - pending_media_buy_id = f"pending_{uuid.uuid4().hex[:8]}" + # Generate permanent media buy ID (not "pending_xxx") + # This ID will be used whether pending or approved - only status changes + media_buy_id = f"mb_{uuid.uuid4().hex[:12]}" response_msg = ( f"Manual approval required. Workflow Step ID: {step.step_id}. Context ID: {persistent_ctx.context_id}" @@ -4975,7 +4999,7 @@ async def _create_media_buy_impl( slack_notifier.notify_media_buy_event( event_type="approval_required", - media_buy_id=pending_media_buy_id, + media_buy_id=media_buy_id, principal_name=principal_name, details=notification_details, tenant_name=tenant.get("name", "Unknown"), @@ -4986,11 +5010,123 @@ async def _create_media_buy_impl( except Exception as e: console.print(f"[yellow]⚠️ Failed to send manual approval Slack notification: {e}[/yellow]") + # Generate permanent package IDs (not dependent on media buy ID) + # These IDs will be used whether the media buy is pending or approved + pending_packages = [] + raw_request_dict = req.model_dump(mode="json") # Serialize datetimes to JSON-compatible format + + for idx, pkg in enumerate(req.packages, 1): + # Generate permanent package ID using product_id and index + # Format: pkg_{product_id}_{timestamp_part}_{idx} + import secrets + package_id = f"pkg_{pkg.product_id}_{secrets.token_hex(4)}_{idx}" + + # Use product_id or buyer_ref for package name since Package schema doesn't have 'name' + pkg_name = f"Package {idx}" + if pkg.product_id: + pkg_name = f"{pkg.product_id} - Package {idx}" + elif pkg.buyer_ref: + pkg_name = f"{pkg.buyer_ref} - Package {idx}" + + # Serialize the full package to include all fields (budget, targeting, etc.) + # Use model_dump_internal to get complete package data + if hasattr(pkg, "model_dump_internal"): + pkg_dict = pkg.model_dump_internal() + elif hasattr(pkg, "model_dump"): + pkg_dict = pkg.model_dump(exclude_none=True, mode="python") + else: + pkg_dict = {} + + # Build response with complete package data (matching auto-approval path) + pending_packages.append({ + **pkg_dict, # Include all package fields (budget, targeting_overlay, creative_ids, etc.) + "package_id": package_id, + "name": pkg_name, + "buyer_ref": pkg.buyer_ref, # Include buyer_ref from request package + "status": TaskStatus.INPUT_REQUIRED, # Consistent with TaskStatus enum (requires approval) + }) + + # Update the package in raw_request with the generated package_id so UI can find it + raw_request_dict["packages"][idx - 1]["package_id"] = package_id + + # Create media buy record in the database with permanent ID + # Status is "pending_approval" but the ID is final + with get_db_session() as session: + pending_buy = MediaBuy( + media_buy_id=media_buy_id, + buyer_ref=req.buyer_ref, + principal_id=principal.principal_id, + tenant_id=tenant["tenant_id"], + status="pending_approval", + order_name=f"{req.buyer_ref} - {start_time.strftime('%Y-%m-%d')}", + advertiser_name=principal.name, + budget=total_budget, + currency=request_currency or "USD", # Use request_currency from validation above + start_date=start_time.date(), + end_date=end_time.date(), + start_time=start_time, + end_time=end_time, + raw_request=raw_request_dict, # Now includes package_id in each package + created_at=datetime.now(UTC), + ) + session.add(pending_buy) + session.commit() + console.print(f"[green]✅ Created media buy {media_buy_id} with status=pending_approval[/green]") + + # Create MediaPackage records for structured querying + # This enables the UI to display packages and creative assignments to work properly + with get_db_session() as session: + from src.core.database.models import MediaPackage as DBMediaPackage + + for pkg_data in pending_packages: + package_config = { + "package_id": pkg_data["package_id"], + "name": pkg_data.get("name"), + "status": pkg_data.get("status"), + } + # Add full package data from raw_request + for idx, req_pkg in enumerate(req.packages): + if idx == pending_packages.index(pkg_data): + package_config.update({ + "product_id": req_pkg.product_id, + "budget": req_pkg.budget.model_dump() if req_pkg.budget else None, + "targeting_overlay": req_pkg.targeting_overlay.model_dump() if req_pkg.targeting_overlay else None, + "creative_ids": req_pkg.creative_ids, + "format_ids_to_provide": req_pkg.format_ids_to_provide, + }) + break + + db_package = DBMediaPackage( + media_buy_id=media_buy_id, + package_id=pkg_data["package_id"], + package_config=package_config, + ) + session.add(db_package) + + session.commit() + console.print(f"[green]✅ Created {len(pending_packages)} MediaPackage records[/green]") + + # Link the workflow step to the media buy so the approval button shows in UI + with get_db_session() as session: + from src.core.database.models import ObjectWorkflowMapping + mapping = ObjectWorkflowMapping( + object_type="media_buy", + object_id=media_buy_id, + step_id=step.step_id, + action="create" + ) + session.add(mapping) + session.commit() + console.print(f"[green]✅ Linked workflow step {step.step_id} to media buy[/green]") + + # Return success response with packages awaiting approval + # The workflow_step_id in packages indicates approval is required return CreateMediaBuyResponse( buyer_ref=req.buyer_ref, - media_buy_id=pending_media_buy_id, + media_buy_id=media_buy_id, creative_deadline=None, - errors=[{"code": "APPROVAL_REQUIRED", "message": response_msg}], + packages=pending_packages, + workflow_step_id=step.step_id, # Client can track approval via this ID ) # Get products for the media buy to check product-level auto-creation settings @@ -5060,11 +5196,37 @@ async def _create_media_buy_impl( ) # Workflow step already created above - no need for separate task - pending_media_buy_id = f"pending_{uuid.uuid4().hex[:8]}" + # Generate permanent media buy ID (not "pending_xxx") + media_buy_id = f"mb_{uuid.uuid4().hex[:12]}" response_msg = f"Media buy requires approval due to {reason.lower()}. Workflow Step ID: {step.step_id}. Context ID: {persistent_ctx.context_id}" ctx_manager.add_message(persistent_ctx.context_id, "assistant", response_msg) + # Generate permanent package IDs and prepare response packages + response_packages = [] + for idx, pkg in enumerate(req.packages, 1): + # Generate permanent package ID + import secrets + package_id = f"pkg_{pkg.product_id}_{secrets.token_hex(4)}_{idx}" + + # Serialize the full package to include all fields (budget, targeting, etc.) + # Use model_dump_internal to get complete package data + if hasattr(pkg, "model_dump_internal"): + pkg_dict = pkg.model_dump_internal() + elif hasattr(pkg, "model_dump"): + pkg_dict = pkg.model_dump(exclude_none=True, mode="python") + else: + pkg_dict = {} + + # Build response with complete package data (matching auto-approval path) + response_packages.append({ + **pkg_dict, # Include all package fields (budget, targeting_overlay, creative_ids, etc.) + "package_id": package_id, + "name": f"{pkg.product_id} - Package {idx}", + "buyer_ref": pkg.buyer_ref, # Include buyer_ref from request + "status": TaskStatus.INPUT_REQUIRED, # Consistent with TaskStatus enum (requires approval) + }) + # Send Slack notification for configuration-based approval requirement try: # Get principal name for notification @@ -5095,7 +5257,7 @@ async def _create_media_buy_impl( slack_notifier.notify_media_buy_event( event_type="config_approval_required", - media_buy_id=pending_media_buy_id, + media_buy_id=media_buy_id, principal_name=principal_name, details=notification_details, tenant_name=tenant.get("name", "Unknown"), @@ -5108,6 +5270,8 @@ async def _create_media_buy_impl( return CreateMediaBuyResponse( buyer_ref=req.buyer_ref, + media_buy_id=media_buy_id, + packages=response_packages, # Include packages with buyer_ref workflow_step_id=step.step_id, ) @@ -5124,14 +5288,14 @@ async def _create_media_buy_impl( # Convert products to MediaPackages # If req.packages provided, use format_ids from request; otherwise use product.formats packages = [] - for product in products_in_buy: + for idx, product in enumerate(products_in_buy, 1): # Determine format_ids to use format_ids_to_use = [] # Check if this product has a corresponding package in the request with format_ids + matching_package = None if req.packages: # Find the package for this product - matching_package = None for pkg in req.packages: if pkg.product_id == product.product_id: matching_package = pkg @@ -5231,14 +5395,31 @@ def format_display(url: str | None, fid: str) -> str: if first_option.rate: cpm = float(first_option.rate) + # Generate permanent package ID (not product_id) + import secrets + package_id = f"pkg_{product.product_id}_{secrets.token_hex(4)}_{idx}" + + # Get buyer_ref and budget from matching request package if available + buyer_ref = None + budget = None + if matching_package: + if hasattr(matching_package, 'buyer_ref'): + buyer_ref = matching_package.buyer_ref + if hasattr(matching_package, 'budget'): + budget = matching_package.budget + packages.append( MediaPackage( - package_id=product.product_id, + package_id=package_id, name=product.name, delivery_type=product.delivery_type, cpm=cpm, impressions=int(total_budget / cpm * 1000), format_ids=format_ids_to_use, + targeting_overlay=matching_package.targeting_overlay if matching_package and hasattr(matching_package, 'targeting_overlay') else None, + buyer_ref=buyer_ref, + product_id=product.product_id, # Include product_id + budget=budget, # Include budget from request ) ) @@ -5257,6 +5438,10 @@ def format_display(url: str | None, fid: str) -> str: # Pass package_pricing_info for pricing model support (AdCP PR #88) try: response = adapter.create_media_buy(req, packages, start_time, end_time, package_pricing_info) + logger.info(f"[DEBUG] create_media_buy: Adapter returned response with {len(response.packages) if response.packages else 0} packages") + if response.packages: + for i, pkg in enumerate(response.packages): + logger.info(f"[DEBUG] create_media_buy: Response package {i} = {pkg}") except Exception as adapter_error: import traceback @@ -5300,6 +5485,51 @@ def format_display(url: str | None, fid: str) -> str: session.add(new_media_buy) session.commit() + # Populate media_packages table for structured querying + # This enables creative_assignments to work properly + if req.packages or (response.packages and len(response.packages) > 0): + with get_db_session() as session: + from src.core.database.models import MediaPackage as DBMediaPackage + + # Use response packages if available (has package_ids), otherwise generate from request + packages_to_save = response.packages if response.packages else [] + logger.info(f"[DEBUG] Saving {len(packages_to_save)} packages to media_packages table") + + for i, resp_package in enumerate(packages_to_save): + # Extract package_id from response - MUST be present, no fallback allowed + package_id = resp_package.get("package_id") + logger.info(f"[DEBUG] Package {i}: resp_package.get('package_id') = {package_id}") + + if not package_id: + error_msg = f"Adapter did not return package_id for package {i}. This is a critical bug in the adapter." + logger.error(error_msg) + raise ValueError(error_msg) + + logger.info(f"[DEBUG] Package {i}: Using package_id = {package_id}") + + # Store full package config as JSON + package_config = { + "package_id": package_id, + "name": resp_package.get("name"), # Include package name from adapter response + "product_id": resp_package.get("product_id"), + "budget": resp_package.get("budget"), + "targeting_overlay": resp_package.get("targeting_overlay"), + "creative_ids": resp_package.get("creative_ids"), + "creative_assignments": resp_package.get("creative_assignments"), + "format_ids_to_provide": resp_package.get("format_ids_to_provide"), + "status": resp_package.get("status"), + } + + db_package = DBMediaPackage( + media_buy_id=response.media_buy_id, + package_id=package_id, + package_config=package_config, + ) + session.add(db_package) + + session.commit() + logger.info(f"Saved {len(packages_to_save)} packages to media_packages table for media_buy {response.media_buy_id}") + # Handle creative_ids in packages if provided (immediate association) if req.packages: with get_db_session() as session: @@ -5321,9 +5551,34 @@ def format_display(url: str | None, fid: str) -> str: creatives_list = session.scalars(creative_stmt).all() creatives_map = {str(c.creative_id): c for c in creatives_list} + # Validate all creative IDs exist (match update_media_buy behavior) + found_creative_ids = set(creatives_map.keys()) + requested_creative_ids = set(all_creative_ids) + missing_ids = requested_creative_ids - found_creative_ids + + if missing_ids: + error_msg = f"Creative IDs not found: {', '.join(sorted(missing_ids))}" + logger.error(error_msg) + ctx_manager.update_workflow_step(step.step_id, status="failed", error_message=error_msg) + raise ToolError( + "CREATIVES_NOT_FOUND", + error_msg + ) + for i, package in enumerate(req.packages): if package.creative_ids: - package_id = f"{response.media_buy_id}_pkg_{i+1}" + # Use package_id from response (matches what's in media_packages table) + # NO FALLBACK - if adapter doesn't return package_id, fail loudly + package_id = None + if response.packages and i < len(response.packages): + package_id = response.packages[i].get("package_id") + logger.info(f"[DEBUG] Package {i}: response.packages[i] = {response.packages[i]}") + logger.info(f"[DEBUG] Package {i}: extracted package_id = {package_id}") + + if not package_id: + error_msg = f"Cannot assign creatives: Adapter did not return package_id for package {i}" + logger.error(error_msg) + raise ValueError(error_msg) # Get platform_line_item_id from response if available platform_line_item_id = None @@ -5337,10 +5592,9 @@ def format_display(url: str | None, fid: str) -> str: # Get creative from batch-loaded map creative = creatives_map.get(creative_id) + # This should never happen now due to validation above if not creative: - logger.warning( - f"Creative {creative_id} not found for package {package_id}, skipping assignment" - ) + logger.error(f"Creative {creative_id} not in map despite validation - this is a bug") continue # Create database assignment (always create, even if not yet uploaded to GAM) @@ -5422,36 +5676,66 @@ def format_display(url: str | None, fid: str) -> str: ) # Build packages list for response (AdCP v2.4 format) + # Use packages from adapter response (has package_ids) merged with request package fields response_packages = [] + + # Get adapter response packages (have package_ids) + adapter_packages = response.packages if response.packages else [] + for i, package in enumerate(req.packages): - # Serialize the package to dict to handle any nested Pydantic objects - # Use model_dump_internal to avoid validation that requires package_id (not set yet on request packages) + # Start with adapter response package (has package_id) + if i < len(adapter_packages): + # Get package_id and other fields from adapter response + response_package_dict = adapter_packages[i] if isinstance(adapter_packages[i], dict) else adapter_packages[i].model_dump() + else: + # Fallback if adapter didn't return enough packages + logger.warning(f"Adapter returned fewer packages than request. Using request package {i}") + response_package_dict = {} + + # CRITICAL: Save package_id from adapter response BEFORE merge + adapter_package_id = response_package_dict.get("package_id") + logger.info(f"[DEBUG] Package {i}: adapter_package_id from response = {adapter_package_id}") + + # Serialize the request package to get fields like buyer_ref, format_ids if hasattr(package, "model_dump_internal"): - package_dict = package.model_dump_internal() + request_package_dict = package.model_dump_internal() elif hasattr(package, "model_dump"): - # Fallback: use model_dump with exclude_none to avoid validation errors - package_dict = package.model_dump(exclude_none=True, mode="python") + request_package_dict = package.model_dump(exclude_none=True, mode="python") else: - package_dict = package + request_package_dict = package if isinstance(package, dict) else {} + + # Merge: Start with adapter response (has package_id), overlay request fields + package_dict = {**response_package_dict, **request_package_dict} + + # CRITICAL: Restore package_id from adapter (merge may have overwritten it with None from request) + if adapter_package_id: + package_dict["package_id"] = adapter_package_id + logger.info(f"[DEBUG] Package {i}: Forced package_id = {adapter_package_id}") + else: + # NO FALLBACK - adapter MUST return package_id + error_msg = f"Adapter did not return package_id for package {i}. Cannot build response." + logger.error(error_msg) + raise ValueError(error_msg) # Validate and convert format_ids (request field) to format_ids_to_provide (response field) - # Per AdCP spec: request has format_ids (array of FormatId), response has format_ids_to_provide (same) - # STRICT ENFORCEMENT: Only FormatId objects accepted, must be registered agents, formats must exist if "format_ids" in package_dict and package_dict["format_ids"]: validated_format_ids = await _validate_and_convert_format_ids( package_dict["format_ids"], tenant["tenant_id"], i ) package_dict["format_ids_to_provide"] = validated_format_ids - # Remove format_ids from response (only format_ids_to_provide should be in response) + # Remove format_ids from response del package_dict["format_ids"] - # Override/add response-specific fields (package_id and status are set by server) - response_package = { - **package_dict, - "package_id": f"{response.media_buy_id}_pkg_{i+1}", - "status": TaskStatus.WORKING, - } - response_packages.append(response_package) + # Determine package status + package_status = TaskStatus.WORKING + if package.creative_ids and len(package.creative_ids) > 0: + package_status = TaskStatus.COMPLETED + elif hasattr(package, 'format_ids_to_provide') and package.format_ids_to_provide: + package_status = TaskStatus.WORKING + + # Add status + package_dict["status"] = package_status + response_packages.append(package_dict) # Ensure buyer_ref is set (defensive check) buyer_ref_value = req.buyer_ref if req.buyer_ref else buyer_ref @@ -5990,7 +6274,14 @@ def _update_media_buy_impl( if currency_limit.max_daily_package_spend and req.packages: for pkg_update in req.packages: if pkg_update.budget: - package_budget = Decimal(str(pkg_update.budget)) + # Extract budget amount - handle both Budget object and legacy float + from src.core.schemas import Budget, extract_budget_amount + if isinstance(pkg_update.budget, Budget): + pkg_budget_amount, _ = extract_budget_amount(pkg_update.budget, request_currency) + else: + pkg_budget_amount = float(pkg_update.budget) + + package_budget = Decimal(str(pkg_budget_amount)) package_daily = package_budget / Decimal(str(flight_days)) if package_daily > currency_limit.max_daily_package_spend: @@ -6047,12 +6338,22 @@ def _update_media_buy_impl( # Handle budget updates if pkg_update.budget is not None: + # Extract budget amount - handle both Budget object and legacy float + from src.core.schemas import Budget, extract_budget_amount + + if isinstance(pkg_update.budget, Budget): + budget_amount, currency = extract_budget_amount(pkg_update.budget, "USD") + else: + # Legacy float format + budget_amount = float(pkg_update.budget) + currency = "USD" + result = adapter.update_media_buy( media_buy_id=req.media_buy_id, buyer_ref=req.buyer_ref or "", action="update_package_budget", package_id=pkg_update.package_id, - budget=int(pkg_update.budget), + budget=int(budget_amount), today=datetime.combine(today, datetime.min.time()), ) if result.errors: @@ -6066,15 +6367,68 @@ def _update_media_buy_impl( ) return result + # Track budget update in affected_packages + if not hasattr(req, "_affected_packages"): + req._affected_packages = [] + req._affected_packages.append( + { + "buyer_package_ref": pkg_update.package_id, + "changes_applied": { + "budget": { + "updated": budget_amount, + "currency": currency + } + }, + } + ) + # Handle creative_ids updates (AdCP v2.2.0+) if pkg_update.creative_ids is not None: + # Validate package_id is provided + if not pkg_update.package_id: + error_msg = "package_id is required when updating creative_ids" + ctx_manager.update_workflow_step(step.step_id, status="failed", error_message=error_msg) + return UpdateMediaBuyResponse( + media_buy_id=req.media_buy_id or "", + buyer_ref=req.buyer_ref or "", + errors=[{"code": "missing_package_id", "message": error_msg}], + ) + from sqlalchemy import select from src.core.database.database_session import get_db_session from src.core.database.models import Creative as DBCreative from src.core.database.models import CreativeAssignment as DBAssignment + from src.core.database.models import MediaBuy as MediaBuyModel with get_db_session() as session: + # Resolve media_buy_id (might be buyer_ref) + mb_stmt = select(MediaBuyModel).where( + MediaBuyModel.media_buy_id == req.media_buy_id, + MediaBuyModel.tenant_id == tenant["tenant_id"] + ) + media_buy_obj = session.scalars(mb_stmt).first() + + # Try buyer_ref if not found + if not media_buy_obj: + mb_stmt = select(MediaBuyModel).where( + MediaBuyModel.buyer_ref == req.media_buy_id, + MediaBuyModel.tenant_id == tenant["tenant_id"] + ) + media_buy_obj = session.scalars(mb_stmt).first() + + if not media_buy_obj: + error_msg = f"Media buy '{req.media_buy_id}' not found" + ctx_manager.update_workflow_step(step.step_id, status="failed", error_message=error_msg) + return UpdateMediaBuyResponse( + media_buy_id=req.media_buy_id or "", + buyer_ref=req.buyer_ref or "", + errors=[{"code": "media_buy_not_found", "message": error_msg}], + ) + + # Use the actual internal media_buy_id + actual_media_buy_id = media_buy_obj.media_buy_id + # Validate all creative IDs exist creative_stmt = select(DBCreative).where( DBCreative.tenant_id == tenant["tenant_id"], @@ -6096,7 +6450,7 @@ def _update_media_buy_impl( # Get existing assignments for this package assignment_stmt = select(DBAssignment).where( DBAssignment.tenant_id == tenant["tenant_id"], - DBAssignment.media_buy_id == req.media_buy_id, + DBAssignment.media_buy_id == actual_media_buy_id, DBAssignment.package_id == pkg_update.package_id, ) existing_assignments = session.scalars(assignment_stmt).all() @@ -6120,7 +6474,7 @@ def _update_media_buy_impl( assignment = DBAssignment( assignment_id=assignment_id, tenant_id=tenant["tenant_id"], - media_buy_id=req.media_buy_id, + media_buy_id=actual_media_buy_id, package_id=pkg_update.package_id, creative_id=creative_id, ) @@ -6178,6 +6532,42 @@ def _update_media_buy_impl( # Note: media_buys tuple stays as (CreateMediaBuyRequest, principal_id) + # Persist top-level budget update to database + from sqlalchemy import update + from src.core.database.models import MediaBuy + + with get_db_session() as db_session: + stmt = ( + update(MediaBuy) + .where(MediaBuy.media_buy_id == req.media_buy_id) + .values(budget=total_budget, currency=currency) + ) + db_session.execute(stmt) + db_session.commit() + logger.info(f"[update_media_buy] Updated MediaBuy {req.media_buy_id} budget to {total_budget} {currency}") + + # Track top-level budget update in affected_packages + # When top-level budget changes, all packages are affected + if not hasattr(req, "_affected_packages"): + req._affected_packages = [] + + # Get all packages for this media buy to report them as affected + if hasattr(existing_req, "packages") and existing_req.packages: + for pkg in existing_req.packages: + package_ref = pkg.package_id if hasattr(pkg, "package_id") and pkg.package_id else pkg.buyer_ref + if package_ref: + req._affected_packages.append( + { + "buyer_package_ref": package_ref, + "changes_applied": { + "budget": { + "updated": total_budget, + "currency": currency + } + }, + } + ) + # Note: Budget validation already done above (lines 4318-4336) # Package-level updates already handled above (lines 4266-4316) # Targeting updates are handled via packages (AdCP spec v2.4) @@ -6214,6 +6604,7 @@ def _update_media_buy_impl( # Build affected_packages from stored results affected_packages = getattr(req, "_affected_packages", []) + logger.info(f"[update_media_buy] Final affected_packages before return: {affected_packages}") return UpdateMediaBuyResponse( media_buy_id=req.media_buy_id or "", diff --git a/src/core/schemas.py b/src/core/schemas.py index 398e03531..d882c37e4 100644 --- a/src/core/schemas.py +++ b/src/core/schemas.py @@ -2657,6 +2657,9 @@ class MediaPackage(BaseModel): impressions: int format_ids: list[FormatId] # FormatId objects per AdCP spec targeting_overlay: Optional["Targeting"] = None + buyer_ref: Optional[str] = None # Optional buyer reference from request package + product_id: Optional[str] = None # Product ID for this package + budget: Optional["Budget"] = None # Budget information from request class PackagePerformance(BaseModel): @@ -2724,7 +2727,7 @@ class PackageUpdate(BaseModel): package_id: str active: bool | None = None # True to activate, False to pause - budget: float | None = None # New budget in dollars + budget: Budget | float | None = None # Budget object (AdCP spec) or legacy float impressions: int | None = None # Direct impression goal (overrides budget calculation) cpm: float | None = None # Update CPM rate daily_budget: float | None = None # Daily spend cap diff --git a/src/core/tools.py b/src/core/tools.py index c830395d0..d29de0703 100644 --- a/src/core/tools.py +++ b/src/core/tools.py @@ -41,6 +41,19 @@ GetSignalsRequest, ) +# Import all implementation functions from main.py at the top +from src.core.main import ( + _create_media_buy_impl, + _get_media_buy_delivery_impl, + _get_products_impl, + _list_authorized_properties_impl, + _list_creative_formats_impl, + _list_creatives_impl, + _sync_creatives_impl, + _update_media_buy_impl, + update_performance_index, # Note: This one doesn't follow _impl pattern yet +) + def get_principal_from_context(context: Context | None) -> str | None: """Extract principal ID from the FastMCP context or ToolContext. @@ -110,8 +123,6 @@ async def get_products_raw( Returns: GetProductsResponse containing matching products """ - # Use lazy import to avoid circular dependencies - from src.core.main import _get_products_impl from src.core.schema_helpers import create_get_products_request # Create request object using helper (handles generated schema variants) @@ -259,9 +270,6 @@ async def create_media_buy_raw( Returns: CreateMediaBuyResponse with media buy details """ - # Import here to avoid circular imports - from src.core.main import _create_media_buy_impl - # Call the shared implementation return await _create_media_buy_impl( buyer_ref=buyer_ref, @@ -315,9 +323,6 @@ def sync_creatives_raw( Returns: SyncCreativesResponse with synced creatives and assignments """ - # Import here to avoid circular imports - from src.core.main import _sync_creatives_impl - return _sync_creatives_impl( creatives=creatives, patch=patch, @@ -367,9 +372,6 @@ def list_creatives_raw( Returns: ListCreativesResponse with filtered creative assets and pagination info """ - # Import here to avoid circular imports - from src.core.main import _list_creatives_impl - return _list_creatives_impl( media_buy_id=media_buy_id, buyer_ref=buyer_ref, @@ -401,8 +403,6 @@ def list_creative_formats_raw( Returns: ListCreativeFormatsResponse with all available formats """ - from src.core.main import _list_creative_formats_impl - return _list_creative_formats_impl(req, context) @@ -413,8 +413,6 @@ def list_authorized_properties_raw( Delegates to shared implementation in main.py. """ - from src.core.main import _list_authorized_properties_impl - return _list_authorized_properties_impl(req, context) @@ -461,9 +459,6 @@ def update_media_buy_raw( Returns: UpdateMediaBuyResponse """ - # Import here to avoid circular imports - from src.core.main import _update_media_buy_impl - return _update_media_buy_impl( media_buy_id=media_buy_id, buyer_ref=buyer_ref, @@ -505,9 +500,6 @@ def get_media_buy_delivery_raw( Returns: GetMediaBuyDeliveryResponse with delivery metrics """ - # Import here to avoid circular imports - from src.core.main import _get_media_buy_delivery_impl - # Create request object req = GetMediaBuyDeliveryRequest( media_buy_ids=media_buy_ids, @@ -534,7 +526,4 @@ def update_performance_index_raw(media_buy_id: str, performance_data: list[dict[ Returns: UpdatePerformanceIndexResponse """ - # Import here to avoid circular imports - from src.core.main import _update_performance_index_impl - - return _update_performance_index_impl(media_buy_id, performance_data, context) + return update_performance_index(media_buy_id, performance_data, webhook_url=None, context=context) diff --git a/static/js/tenant_settings.js b/static/js/tenant_settings.js index 98fd0e90b..a7abe48e1 100644 --- a/static/js/tenant_settings.js +++ b/static/js/tenant_settings.js @@ -503,14 +503,27 @@ function saveBusinessRules() { fetch(`${config.scriptName}/tenant/${config.tenantId}/settings/business-rules`, { method: 'POST', - body: formData + body: formData, + redirect: 'manual' // Don't follow redirects automatically }) - .then(response => response.json()) - .then(data => { - if (data.success) { + .then(response => { + // Server returns 302 redirect on success, or 200/400 with JSON on error + if (response.type === 'opaqueredirect' || response.status === 302) { + // Success - redirect was initiated alert('Business rules saved successfully!'); + window.location.reload(); + } else if (response.ok) { + // Try to parse as JSON if not a redirect + return response.json().then(data => { + if (data.success) { + alert('Business rules saved successfully!'); + window.location.reload(); + } else { + alert('Error: ' + (data.error || data.message || 'Unknown error')); + } + }); } else { - alert('Error: ' + (data.error || data.message || 'Unknown error')); + throw new Error(`Server returned status ${response.status}`); } }) .catch(error => { diff --git a/templates/creative_management.html b/templates/creative_management.html index 715989500..cb0ac9846 100644 --- a/templates/creative_management.html +++ b/templates/creative_management.html @@ -67,7 +67,10 @@
No packages configured for this media buy.
+ {% endif %} +