diff --git a/alembic/versions/0d4fe6eb03ab_migrate_format_id_to_id_in_product_.py b/alembic/versions/0d4fe6eb03ab_migrate_format_id_to_id_in_product_.py new file mode 100644 index 000000000..ce2856756 --- /dev/null +++ b/alembic/versions/0d4fe6eb03ab_migrate_format_id_to_id_in_product_.py @@ -0,0 +1,98 @@ +"""Migrate format_id to id in product formats + +Revision ID: 0d4fe6eb03ab +Revises: 6ac7f95b69c6 +Create Date: 2025-10-16 13:11:37.726255 + +This migration fixes a schema inconsistency where product formats stored +format_id instead of id (the AdCP spec-compliant field name). + +The FormatReference Pydantic model uses serialization_alias="id" to ensure +JSON serialization uses "id", but older database records have "format_id". + +This migration: +1. Renames "format_id" → "id" in all format objects in products.formats JSONB column +2. Preserves all other fields (agent_url, etc.) +3. Handles both cases: format_id and id (idempotent) + +""" + +from collections.abc import Sequence + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "0d4fe6eb03ab" +down_revision: str | Sequence[str] | None = "6ac7f95b69c6" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Migrate format_id to id in product formats JSONB.""" + + # PostgreSQL JSONB update to rename format_id → id in each format object + # This uses jsonb_set and jsonb_build_object to transform each array element + op.execute( + """ + UPDATE products + SET formats = ( + SELECT jsonb_agg( + CASE + -- If format has format_id but not id, rename it + WHEN format_obj ? 'format_id' AND NOT format_obj ? 'id' THEN + jsonb_set( + format_obj - 'format_id', + '{id}', + format_obj->'format_id' + ) + -- Otherwise keep as-is (already has id, or missing both) + ELSE format_obj + END + ) + FROM jsonb_array_elements(formats) AS format_obj + ) + WHERE formats IS NOT NULL + AND formats != '[]'::jsonb + -- Only update products that have format_id (not id) in any format + AND EXISTS ( + SELECT 1 + FROM jsonb_array_elements(formats) AS fmt + WHERE fmt ? 'format_id' AND NOT fmt ? 'id' + ) + """ + ) + + +def downgrade() -> None: + """Revert id back to format_id in product formats JSONB.""" + + # Reverse migration: rename id → format_id + op.execute( + """ + UPDATE products + SET formats = ( + SELECT jsonb_agg( + CASE + -- If format has id but not format_id, rename it back + WHEN format_obj ? 'id' AND NOT format_obj ? 'format_id' THEN + jsonb_set( + format_obj - 'id', + '{format_id}', + format_obj->'id' + ) + -- Otherwise keep as-is + ELSE format_obj + END + ) + FROM jsonb_array_elements(formats) AS format_obj + ) + WHERE formats IS NOT NULL + AND formats != '[]'::jsonb + AND EXISTS ( + SELECT 1 + FROM jsonb_array_elements(formats) AS fmt + WHERE fmt ? 'id' AND NOT fmt ? 'format_id' + ) + """ + ) diff --git a/scripts/cleanup_legacy_products.py b/scripts/cleanup_legacy_products.py new file mode 100644 index 000000000..451025401 --- /dev/null +++ b/scripts/cleanup_legacy_products.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +"""Script to identify and optionally clean up legacy products. + +Legacy products are identified by: +1. String format IDs (instead of proper FormatId dict structure with agent_url) +2. Missing or invalid agent_url in formats +3. Using format_id instead of id (AdCP spec requires id) + +Usage: + python scripts/cleanup_legacy_products.py --dry-run # List legacy products + python scripts/cleanup_legacy_products.py --delete # Delete legacy products + python scripts/cleanup_legacy_products.py --fix # Convert to proper FormatId structure + +Note: The format_id → id migration is handled by Alembic migration 0d4fe6eb03ab. + This script identifies other issues (string IDs, missing agent_url, etc.) +""" + +import argparse +import logging +import sys +from pathlib import Path + +# Add project root to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import select + +from src.core.database.database_session import get_db_session +from src.core.database.models import Product + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def identify_legacy_products(tenant_id=None): + """Identify products with legacy format data. + + Args: + tenant_id: Optional tenant ID to filter by + + Returns: + List of (product, issues) tuples where issues is a list of problem descriptions + """ + legacy_products = [] + + with get_db_session() as session: + stmt = select(Product) + if tenant_id: + stmt = stmt.filter_by(tenant_id=tenant_id) + + products = session.scalars(stmt).all() + + for product in products: + issues = [] + + if not product.formats: + issues.append("No formats defined") + continue + + # Check each format + for idx, fmt in enumerate(product.formats): + if isinstance(fmt, str): + # Legacy: plain string format ID + issues.append(f"Format {idx}: String format ID '{fmt}' (should be dict)") + elif isinstance(fmt, dict): + # Check for required fields + if "agent_url" not in fmt: + issues.append(f"Format {idx}: Missing agent_url field") + if "id" not in fmt and "format_id" not in fmt: + issues.append(f"Format {idx}: Missing format_id field") + else: + issues.append(f"Format {idx}: Invalid type {type(fmt)}") + + if issues: + legacy_products.append((product, issues)) + + return legacy_products + + +def delete_legacy_products(legacy_products): + """Delete legacy products from database. + + Args: + legacy_products: List of (product, issues) tuples + """ + with get_db_session() as session: + for product, _issues in legacy_products: + logger.info(f"Deleting product {product.product_id} ({product.name})") + # Re-fetch product in this session + stmt = select(Product).filter_by(tenant_id=product.tenant_id, product_id=product.product_id) + product_in_session = session.scalars(stmt).first() + if product_in_session: + session.delete(product_in_session) + + session.commit() + logger.info(f"Deleted {len(legacy_products)} legacy products") + + +def fix_legacy_products(legacy_products, default_agent_url="http://localhost:8888"): + """Attempt to fix legacy products by converting string formats to proper structure. + + Args: + legacy_products: List of (product, issues) tuples + default_agent_url: Default agent URL to use for formats without one + """ + with get_db_session() as session: + for product, _issues in legacy_products: + logger.info(f"Fixing product {product.product_id} ({product.name})") + + # Re-fetch product in this session + stmt = select(Product).filter_by(tenant_id=product.tenant_id, product_id=product.product_id) + product_in_session = session.scalars(stmt).first() + if not product_in_session: + logger.warning(f"Product {product.product_id} not found in session") + continue + + fixed_formats = [] + for fmt in product_in_session.formats: + if isinstance(fmt, str): + # Convert string to proper FormatId structure + fixed_formats.append({"agent_url": default_agent_url, "id": fmt}) + elif isinstance(fmt, dict): + # Fix missing fields + fixed_fmt = dict(fmt) + if "agent_url" not in fixed_fmt: + fixed_fmt["agent_url"] = default_agent_url + if "id" not in fixed_fmt and "format_id" in fixed_fmt: + # Rename format_id to id (AdCP spec) + fixed_fmt["id"] = fixed_fmt.pop("format_id") + elif "id" not in fixed_fmt: + logger.error(f"Cannot fix format: {fmt} (no format_id or id field)") + continue + fixed_formats.append(fixed_fmt) + else: + logger.error(f"Cannot fix format: {fmt} (invalid type {type(fmt)})") + + if fixed_formats: + product_in_session.formats = fixed_formats + from sqlalchemy.orm import attributes + + attributes.flag_modified(product_in_session, "formats") + logger.info(f"Fixed {len(fixed_formats)} formats for product {product.product_id}") + + session.commit() + logger.info(f"Fixed {len(legacy_products)} legacy products") + + +def main(): + parser = argparse.ArgumentParser(description="Identify and clean up legacy products") + parser.add_argument("--tenant-id", help="Filter by tenant ID") + parser.add_argument( + "--dry-run", action="store_true", default=True, help="List legacy products without making changes (default)" + ) + parser.add_argument("--delete", action="store_true", help="Delete legacy products") + parser.add_argument( + "--fix", action="store_true", help="Fix legacy products by converting to proper FormatId structure" + ) + parser.add_argument( + "--default-agent-url", + default="http://localhost:8888", + help="Default agent URL for fixing products (default: http://localhost:8888)", + ) + + args = parser.parse_args() + + # Identify legacy products + logger.info("Scanning for legacy products...") + legacy_products = identify_legacy_products(args.tenant_id) + + if not legacy_products: + logger.info("✓ No legacy products found!") + return 0 + + # Print report + logger.info(f"\nFound {len(legacy_products)} legacy products:\n") + for product, issues in legacy_products: + logger.info(f"Product: {product.product_id} ({product.name})") + logger.info(f" Tenant: {product.tenant_id}") + for issue in issues: + logger.info(f" ⚠️ {issue}") + logger.info("") + + # Take action based on flags + if args.delete: + logger.info("Deleting legacy products...") + delete_legacy_products(legacy_products) + logger.info("✓ Done!") + elif args.fix: + logger.info(f"Fixing legacy products (using default agent URL: {args.default_agent_url})...") + fix_legacy_products(legacy_products, args.default_agent_url) + logger.info("✓ Done!") + else: + logger.info("Dry run - no changes made. Use --delete or --fix to make changes.") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/admin/blueprints/products.py b/src/admin/blueprints/products.py index 4de6f0567..7f28daeb7 100644 --- a/src/admin/blueprints/products.py +++ b/src/admin/blueprints/products.py @@ -802,13 +802,23 @@ def edit_product(tenant_id, product_id): return redirect(url_for("products.edit_product", tenant_id=tenant_id, product_id=product_id)) product.formats = formats + # Flag JSONB column as modified so SQLAlchemy generates UPDATE + from sqlalchemy.orm import attributes + + attributes.flag_modified(product, "formats") # Parse countries - from multi-select countries_list = request.form.getlist("countries") if countries_list and "ALL" not in countries_list: product.countries = countries_list + from sqlalchemy.orm import attributes + + attributes.flag_modified(product, "countries") else: product.countries = None + from sqlalchemy.orm import attributes + + attributes.flag_modified(product, "countries") # Get pricing based on line item type (GAM form) or delivery type (other adapters) line_item_type = form_data.get("line_item_type") diff --git a/tests/integration/test_product_formats_update.py b/tests/integration/test_product_formats_update.py new file mode 100644 index 000000000..2aa2692e2 --- /dev/null +++ b/tests/integration/test_product_formats_update.py @@ -0,0 +1,143 @@ +"""Test that product format updates are properly saved to database. + +This tests the bug fix for JSONB columns not being flagged as modified. +""" + +import pytest +from sqlalchemy import select +from sqlalchemy.orm import attributes + +from src.core.database.models import Product + + +@pytest.fixture +def sample_product(integration_db): + """Create a sample product for testing.""" + from src.core.database.database_session import get_db_session + from src.core.database.models import Tenant + + with get_db_session() as session: + # Create tenant + tenant = Tenant( + tenant_id="test_tenant", + name="Test Tenant", + subdomain="test", + ) + session.add(tenant) + session.flush() + + # Create product with initial formats + product = Product( + tenant_id="test_tenant", + product_id="test_product", + name="Test Product", + description="Test Description", + formats=[ + {"agent_url": "http://localhost:8888", "id": "old_format_1"}, + {"agent_url": "http://localhost:8888", "id": "old_format_2"}, + ], + targeting_template={}, + delivery_type="guaranteed", + property_tags=["all_inventory"], + ) + session.add(product) + session.commit() + + return product.product_id + + +@pytest.mark.requires_db +def test_product_formats_update_with_flag_modified(integration_db, sample_product): + """Test that updating product.formats with flag_modified saves changes.""" + from src.core.database.database_session import get_db_session + + # Update the product's formats + with get_db_session() as session: + stmt = select(Product).filter_by(product_id=sample_product) + product = session.scalars(stmt).first() + assert product is not None + + # Update formats + product.formats = [ + {"agent_url": "http://localhost:8888", "id": "new_format_1"}, + {"agent_url": "http://localhost:8888", "id": "new_format_2"}, + {"agent_url": "http://localhost:8888", "id": "new_format_3"}, + ] + + # Flag as modified (this is the fix) + attributes.flag_modified(product, "formats") + + session.commit() + + # Verify the changes were saved + with get_db_session() as session: + stmt = select(Product).filter_by(product_id=sample_product) + product = session.scalars(stmt).first() + assert product is not None + assert len(product.formats) == 3 + assert product.formats[0]["id"] == "new_format_1" + assert product.formats[1]["id"] == "new_format_2" + assert product.formats[2]["id"] == "new_format_3" + + +@pytest.mark.requires_db +def test_product_formats_update_without_flag_modified_fails(integration_db, sample_product): + """Test that updating product.formats WITHOUT flag_modified does NOT save changes. + + This demonstrates the bug that was fixed. + """ + from src.core.database.database_session import get_db_session + + # Update the product's formats WITHOUT flag_modified + with get_db_session() as session: + stmt = select(Product).filter_by(product_id=sample_product) + product = session.scalars(stmt).first() + assert product is not None + + # Update formats + product.formats = [ + {"agent_url": "http://localhost:8888", "id": "should_not_save_1"}, + {"agent_url": "http://localhost:8888", "id": "should_not_save_2"}, + ] + + # NOTE: NOT calling flag_modified - this is the bug + # attributes.flag_modified(product, "formats") + + session.commit() + + # Verify the changes were NOT saved (demonstrates the bug) + with get_db_session() as session: + stmt = select(Product).filter_by(product_id=sample_product) + product = session.scalars(stmt).first() + assert product is not None + # Changes were not saved - still has old formats + assert len(product.formats) == 2 + assert product.formats[0]["id"] == "old_format_1" + assert product.formats[1]["id"] == "old_format_2" + + +@pytest.mark.requires_db +def test_product_countries_update_with_flag_modified(integration_db, sample_product): + """Test that updating product.countries with flag_modified saves changes.""" + from src.core.database.database_session import get_db_session + + # Update the product's countries + with get_db_session() as session: + stmt = select(Product).filter_by(product_id=sample_product) + product = session.scalars(stmt).first() + assert product is not None + + # Update countries + product.countries = ["US", "CA", "GB"] + + # Flag as modified (this is the fix) + attributes.flag_modified(product, "countries") + + session.commit() + + # Verify the changes were saved + with get_db_session() as session: + stmt = select(Product).filter_by(product_id=sample_product) + product = session.scalars(stmt).first() + assert product is not None + assert product.countries == ["US", "CA", "GB"]