diff --git a/src/adcp/types/base.py b/src/adcp/types/base.py index ccc28d2..40dce9f 100644 --- a/src/adcp/types/base.py +++ b/src/adcp/types/base.py @@ -2,10 +2,189 @@ """Base model for AdCP types with spec-compliant serialization.""" +from collections.abc import Callable from typing import Any from pydantic import BaseModel +# Type alias to shorten long type annotations +MessageFormatter = Callable[[Any], str] + + +def _pluralize(count: int, singular: str, plural: str | None = None) -> str: + """Return singular or plural form based on count.""" + if count == 1: + return singular + return plural if plural else f"{singular}s" + + +# Registry of human-readable message formatters for response types. +# Key is the class name, value is a callable that takes the instance and returns a message. +_RESPONSE_MESSAGE_REGISTRY: dict[str, MessageFormatter] = {} + + +def _register_response_message(cls_name: str) -> Callable[[MessageFormatter], MessageFormatter]: + """Decorator to register a message formatter for a response type.""" + + def decorator(func: MessageFormatter) -> MessageFormatter: + _RESPONSE_MESSAGE_REGISTRY[cls_name] = func + return func + + return decorator + + +# Response message formatters +@_register_response_message("GetProductsResponse") +def _get_products_message(self: Any) -> str: + products = getattr(self, "products", None) + if products is None or len(products) == 0: + return "No products matched your requirements." + count = len(products) + return f"Found {count} {_pluralize(count, 'product')} matching your requirements." + + +@_register_response_message("ListCreativeFormatsResponse") +def _list_creative_formats_message(self: Any) -> str: + formats = getattr(self, "formats", None) + if formats is None: + return "No creative formats found." + count = len(formats) + return f"Found {count} supported creative {_pluralize(count, 'format')}." + + +@_register_response_message("GetSignalsResponse") +def _get_signals_message(self: Any) -> str: + signals = getattr(self, "signals", None) + if signals is None: + return "No signals found." + count = len(signals) + return f"Found {count} {_pluralize(count, 'signal')} available for targeting." + + +@_register_response_message("ListAuthorizedPropertiesResponse") +def _list_authorized_properties_message(self: Any) -> str: + domains = getattr(self, "publisher_domains", None) + if domains is None: + return "No authorized properties found." + count = len(domains) + return f"Authorized to represent {count} publisher {_pluralize(count, 'domain')}." + + +@_register_response_message("ListCreativesResponse") +def _list_creatives_message(self: Any) -> str: + creatives = getattr(self, "creatives", None) + if creatives is None: + return "No creatives found." + count = len(creatives) + return f"Found {count} {_pluralize(count, 'creative')} in the system." + + +@_register_response_message("CreateMediaBuyResponse1") +def _create_media_buy_success_message(self: Any) -> str: + media_buy_id = getattr(self, "media_buy_id", None) + packages = getattr(self, "packages", None) + package_count = len(packages) if packages else 0 + return ( + f"Media buy {media_buy_id} created with " + f"{package_count} {_pluralize(package_count, 'package')}." + ) + + +@_register_response_message("CreateMediaBuyResponse2") +def _create_media_buy_error_message(self: Any) -> str: + errors = getattr(self, "errors", None) + error_count = len(errors) if errors else 0 + return f"Media buy creation failed with {error_count} {_pluralize(error_count, 'error')}." + + +@_register_response_message("UpdateMediaBuyResponse1") +def _update_media_buy_success_message(self: Any) -> str: + media_buy_id = getattr(self, "media_buy_id", None) + return f"Media buy {media_buy_id} updated successfully." + + +@_register_response_message("UpdateMediaBuyResponse2") +def _update_media_buy_error_message(self: Any) -> str: + errors = getattr(self, "errors", None) + error_count = len(errors) if errors else 0 + return f"Media buy update failed with {error_count} {_pluralize(error_count, 'error')}." + + +@_register_response_message("SyncCreativesResponse1") +def _sync_creatives_success_message(self: Any) -> str: + creatives = getattr(self, "creatives", None) + creative_count = len(creatives) if creatives else 0 + return f"Synced {creative_count} {_pluralize(creative_count, 'creative')} successfully." + + +@_register_response_message("SyncCreativesResponse2") +def _sync_creatives_error_message(self: Any) -> str: + errors = getattr(self, "errors", None) + error_count = len(errors) if errors else 0 + return f"Creative sync failed with {error_count} {_pluralize(error_count, 'error')}." + + +@_register_response_message("ActivateSignalResponse1") +def _activate_signal_success_message(self: Any) -> str: + return "Signal activated successfully." + + +@_register_response_message("ActivateSignalResponse2") +def _activate_signal_error_message(self: Any) -> str: + errors = getattr(self, "errors", None) + error_count = len(errors) if errors else 0 + return f"Signal activation failed with {error_count} {_pluralize(error_count, 'error')}." + + +@_register_response_message("PreviewCreativeResponse1") +def _preview_creative_single_message(self: Any) -> str: + previews = getattr(self, "previews", None) + preview_count = len(previews) if previews else 0 + return f"Generated {preview_count} {_pluralize(preview_count, 'preview')}." + + +@_register_response_message("PreviewCreativeResponse2") +def _preview_creative_batch_message(self: Any) -> str: + results = getattr(self, "results", None) + result_count = len(results) if results else 0 + return f"Generated previews for {result_count} {_pluralize(result_count, 'manifest')}." + + +@_register_response_message("BuildCreativeResponse1") +def _build_creative_success_message(self: Any) -> str: + return "Creative built successfully." + + +@_register_response_message("BuildCreativeResponse2") +def _build_creative_error_message(self: Any) -> str: + errors = getattr(self, "errors", None) + error_count = len(errors) if errors else 0 + return f"Creative build failed with {error_count} {_pluralize(error_count, 'error')}." + + +@_register_response_message("GetMediaBuyDeliveryResponse") +def _get_media_buy_delivery_message(self: Any) -> str: + deliveries = getattr(self, "media_buy_deliveries", None) + if deliveries is None: + return "No delivery data available." + count = len(deliveries) + return f"Retrieved delivery data for {count} media {_pluralize(count, 'buy', 'buys')}." + + +@_register_response_message("ProvidePerformanceFeedbackResponse1") +def _provide_performance_feedback_success_message(self: Any) -> str: + return "Performance feedback recorded successfully." + + +@_register_response_message("ProvidePerformanceFeedbackResponse2") +def _provide_performance_feedback_error_message(self: Any) -> str: + errors = getattr(self, "errors", None) + error_count = len(errors) if errors else 0 + return ( + f"Performance feedback recording failed with " + f"{error_count} {_pluralize(error_count, 'error')}." + ) + class AdCPBaseModel(BaseModel): """Base model for AdCP types with spec-compliant serialization. @@ -24,3 +203,17 @@ def model_dump_json(self, **kwargs: Any) -> str: if "exclude_none" not in kwargs: kwargs["exclude_none"] = True return super().model_dump_json(**kwargs) + + def summary(self) -> str: + """Human-readable summary for protocol responses. + + Returns a standardized human-readable message suitable for MCP tool + results, A2A task communications, and REST API responses. + + For types without a registered formatter, returns a generic message + with the class name. + """ + formatter = _RESPONSE_MESSAGE_REGISTRY.get(self.__class__.__name__) + if formatter: + return formatter(self) + return f"{self.__class__.__name__} response" diff --git a/tests/test_public_api.py b/tests/test_public_api.py index 52240ff..9f35b3c 100644 --- a/tests/test_public_api.py +++ b/tests/test_public_api.py @@ -282,3 +282,53 @@ def test_public_api_has_version(): assert hasattr(adcp, "__version__"), "adcp package should export __version__" assert isinstance(adcp.__version__, str), "__version__ should be a string" assert len(adcp.__version__) > 0, "__version__ should not be empty" + + +def test_list_creative_formats_request_has_filter_params(): + """ListCreativeFormatsRequest type has filter parameters per AdCP spec. + + The SDK supports is_responsive and name_search parameters for filtering + creative formats. These parameters are part of the AdCP specification. + """ + from adcp import ListCreativeFormatsRequest + + model_fields = ListCreativeFormatsRequest.model_fields + + # Core filter parameters from AdCP spec + expected_fields = [ + "is_responsive", # Filter for responsive formats + "name_search", # Search formats by name (case-insensitive partial match) + "asset_types", # Filter by asset types (image, video, etc.) + "type", # Filter by format category (display, video, etc.) + "format_ids", # Return only specific format IDs + "min_width", # Minimum width filter + "max_width", # Maximum width filter + "min_height", # Minimum height filter + "max_height", # Maximum height filter + "context", # Context object for request + "ext", # Extension object + ] + + for field_name in expected_fields: + assert field_name in model_fields, ( + f"ListCreativeFormatsRequest missing field: {field_name}" + ) + + +def test_list_creative_formats_request_filter_params_types(): + """ListCreativeFormatsRequest filter parameters have correct types.""" + from adcp import ListCreativeFormatsRequest + + # Create request with filter parameters - should not raise + request = ListCreativeFormatsRequest( + is_responsive=True, + name_search="mobile", + ) + + assert request.is_responsive is True + assert request.name_search == "mobile" + + # Verify serialization includes the filter parameters + data = request.model_dump(exclude_none=True) + assert data["is_responsive"] is True + assert data["name_search"] == "mobile" diff --git a/tests/test_response_str.py b/tests/test_response_str.py new file mode 100644 index 0000000..36dab72 --- /dev/null +++ b/tests/test_response_str.py @@ -0,0 +1,350 @@ +"""Tests for .summary() method on response types. + +These tests verify that response types return human-readable messages +suitable for MCP tool results, A2A task communications, and REST API responses. +""" + +from __future__ import annotations + +from adcp.types._generated import ( + ActivateSignalResponse1, + ActivateSignalResponse2, + BuildCreativeResponse1, + BuildCreativeResponse2, + CreateMediaBuyResponse1, + CreateMediaBuyResponse2, + GetMediaBuyDeliveryResponse, + GetProductsResponse, + GetSignalsResponse, + ListAuthorizedPropertiesResponse, + ListCreativeFormatsResponse, + ListCreativesResponse, + PreviewCreativeResponse1, + PreviewCreativeResponse2, + ProvidePerformanceFeedbackResponse1, + ProvidePerformanceFeedbackResponse2, + SyncCreativesResponse1, + SyncCreativesResponse2, + UpdateMediaBuyResponse1, + UpdateMediaBuyResponse2, +) + + +class TestGetProductsResponseMessage: + """Tests for GetProductsResponse.summary().""" + + def test_singular_product(self): + """Single product uses singular form.""" + response = GetProductsResponse.model_construct( + products=[{"product_id": "p1", "name": "Test"}] + ) + assert response.summary() == "Found 1 product matching your requirements." + + def test_multiple_products(self): + """Multiple products uses plural form.""" + response = GetProductsResponse.model_construct( + products=[ + {"product_id": "p1", "name": "Test 1"}, + {"product_id": "p2", "name": "Test 2"}, + {"product_id": "p3", "name": "Test 3"}, + ] + ) + assert response.summary() == "Found 3 products matching your requirements." + + def test_zero_products(self): + """Zero products uses conversational message.""" + response = GetProductsResponse.model_construct(products=[]) + assert response.summary() == "No products matched your requirements." + + +class TestListCreativeFormatsResponseMessage: + """Tests for ListCreativeFormatsResponse.summary().""" + + def test_singular_format(self): + """Single format uses singular form.""" + response = ListCreativeFormatsResponse.model_construct( + formats=[{"format_id": "f1", "name": "Banner"}] + ) + assert response.summary() == "Found 1 supported creative format." + + def test_multiple_formats(self): + """Multiple formats uses plural form.""" + response = ListCreativeFormatsResponse.model_construct( + formats=[ + {"format_id": "f1", "name": "Banner 1"}, + {"format_id": "f2", "name": "Banner 2"}, + ] + ) + assert response.summary() == "Found 2 supported creative formats." + + +class TestGetSignalsResponseMessage: + """Tests for GetSignalsResponse.summary().""" + + def test_singular_signal(self): + """Single signal uses singular form.""" + response = GetSignalsResponse.model_construct(signals=[{"signal_id": "s1"}]) + assert response.summary() == "Found 1 signal available for targeting." + + def test_multiple_signals(self): + """Multiple signals uses plural form.""" + response = GetSignalsResponse.model_construct( + signals=[{"signal_id": "s1"}, {"signal_id": "s2"}] + ) + assert response.summary() == "Found 2 signals available for targeting." + + +class TestListAuthorizedPropertiesResponseMessage: + """Tests for ListAuthorizedPropertiesResponse.summary().""" + + def test_singular_domain(self): + """Single domain uses singular form.""" + response = ListAuthorizedPropertiesResponse.model_construct( + publisher_domains=["example.com"] + ) + assert response.summary() == "Authorized to represent 1 publisher domain." + + def test_multiple_domains(self): + """Multiple domains uses plural form.""" + response = ListAuthorizedPropertiesResponse.model_construct( + publisher_domains=["example.com", "test.com", "demo.com"] + ) + assert response.summary() == "Authorized to represent 3 publisher domains." + + +class TestListCreativesResponseMessage: + """Tests for ListCreativesResponse.summary().""" + + def test_singular_creative(self): + """Single creative uses singular form.""" + response = ListCreativesResponse.model_construct( + creatives=[{"creative_id": "c1"}] + ) + assert response.summary() == "Found 1 creative in the system." + + def test_multiple_creatives(self): + """Multiple creatives uses plural form.""" + response = ListCreativesResponse.model_construct( + creatives=[{"creative_id": "c1"}, {"creative_id": "c2"}] + ) + assert response.summary() == "Found 2 creatives in the system." + + +class TestCreateMediaBuyResponseMessage: + """Tests for CreateMediaBuyResponse success/error variants.""" + + def test_success_singular_package(self): + """Success with single package.""" + response = CreateMediaBuyResponse1.model_construct( + media_buy_id="mb_123", + buyer_ref="ref_456", + packages=[{"package_id": "pkg_1"}], + ) + assert response.summary() == "Media buy mb_123 created with 1 package." + + def test_success_multiple_packages(self): + """Success with multiple packages.""" + response = CreateMediaBuyResponse1.model_construct( + media_buy_id="mb_456", + buyer_ref="ref_789", + packages=[{"package_id": "pkg_1"}, {"package_id": "pkg_2"}], + ) + assert response.summary() == "Media buy mb_456 created with 2 packages." + + def test_error_singular(self): + """Error with single error.""" + response = CreateMediaBuyResponse2.model_construct( + errors=[{"code": "invalid", "message": "Failed"}] + ) + assert response.summary() == "Media buy creation failed with 1 error." + + def test_error_multiple(self): + """Error with multiple errors.""" + response = CreateMediaBuyResponse2.model_construct( + errors=[ + {"code": "invalid", "message": "Error 1"}, + {"code": "invalid", "message": "Error 2"}, + ] + ) + assert response.summary() == "Media buy creation failed with 2 errors." + + +class TestUpdateMediaBuyResponseMessage: + """Tests for UpdateMediaBuyResponse success/error variants.""" + + def test_success(self): + """Success message includes media buy ID.""" + response = UpdateMediaBuyResponse1.model_construct( + media_buy_id="mb_789", + packages=[], + ) + assert response.summary() == "Media buy mb_789 updated successfully." + + def test_error(self): + """Error message includes error count.""" + response = UpdateMediaBuyResponse2.model_construct( + errors=[{"code": "not_found", "message": "Not found"}] + ) + assert response.summary() == "Media buy update failed with 1 error." + + +class TestSyncCreativesResponseMessage: + """Tests for SyncCreativesResponse success/error variants.""" + + def test_success_singular(self): + """Success with single creative synced.""" + response = SyncCreativesResponse1.model_construct( + creatives=[{"creative_id": "c1", "action": "created"}] + ) + assert response.summary() == "Synced 1 creative successfully." + + def test_success_multiple(self): + """Success with multiple creatives synced.""" + response = SyncCreativesResponse1.model_construct( + creatives=[ + {"creative_id": "c1", "action": "created"}, + {"creative_id": "c2", "action": "updated"}, + {"creative_id": "c3", "action": "created"}, + ] + ) + assert response.summary() == "Synced 3 creatives successfully." + + def test_error(self): + """Error message includes error count.""" + response = SyncCreativesResponse2.model_construct( + errors=[{"code": "sync_failed", "message": "Failed"}] + ) + assert response.summary() == "Creative sync failed with 1 error." + + +class TestActivateSignalResponseMessage: + """Tests for ActivateSignalResponse success/error variants.""" + + def test_success(self): + """Success message is simple confirmation.""" + response = ActivateSignalResponse1.model_construct( + activation_status="active" + ) + assert response.summary() == "Signal activated successfully." + + def test_error(self): + """Error message includes error count.""" + response = ActivateSignalResponse2.model_construct( + errors=[{"code": "activation_failed", "message": "Failed"}] + ) + assert response.summary() == "Signal activation failed with 1 error." + + +class TestPreviewCreativeResponseMessage: + """Tests for PreviewCreativeResponse single/batch variants.""" + + def test_single_singular(self): + """Single request with one preview.""" + response = PreviewCreativeResponse1.model_construct( + response_type="single", + expires_at="2025-12-01T00:00:00Z", + previews=[{"preview_id": "p1"}], + ) + assert response.summary() == "Generated 1 preview." + + def test_single_multiple(self): + """Single request with multiple previews.""" + response = PreviewCreativeResponse1.model_construct( + response_type="single", + expires_at="2025-12-01T00:00:00Z", + previews=[{"preview_id": "p1"}, {"preview_id": "p2"}], + ) + assert response.summary() == "Generated 2 previews." + + def test_batch_singular(self): + """Batch request with one manifest.""" + response = PreviewCreativeResponse2.model_construct( + response_type="batch", + results=[{"manifest_id": "m1"}], + ) + assert response.summary() == "Generated previews for 1 manifest." + + def test_batch_multiple(self): + """Batch request with multiple manifests.""" + response = PreviewCreativeResponse2.model_construct( + response_type="batch", + results=[{"manifest_id": "m1"}, {"manifest_id": "m2"}], + ) + assert response.summary() == "Generated previews for 2 manifests." + + +class TestBuildCreativeResponseMessage: + """Tests for BuildCreativeResponse success/error variants.""" + + def test_success(self): + """Success message is simple confirmation.""" + response = BuildCreativeResponse1.model_construct( + assets=[{"url": "https://example.com/asset"}] + ) + assert response.summary() == "Creative built successfully." + + def test_error(self): + """Error message includes error count.""" + response = BuildCreativeResponse2.model_construct( + errors=[{"code": "build_failed", "message": "Failed"}] + ) + assert response.summary() == "Creative build failed with 1 error." + + +class TestGetMediaBuyDeliveryResponseMessage: + """Tests for GetMediaBuyDeliveryResponse.summary().""" + + def test_with_single_media_buy(self): + """Response with single media buy delivery data.""" + response = GetMediaBuyDeliveryResponse.model_construct( + media_buy_deliveries=[{"media_buy_id": "mb_123"}] + ) + assert response.summary() == "Retrieved delivery data for 1 media buy." + + def test_with_multiple_media_buys(self): + """Response with multiple media buy delivery data.""" + response = GetMediaBuyDeliveryResponse.model_construct( + media_buy_deliveries=[ + {"media_buy_id": "mb_123"}, + {"media_buy_id": "mb_456"}, + ] + ) + assert response.summary() == "Retrieved delivery data for 2 media buys." + + +class TestProvidePerformanceFeedbackResponseMessage: + """Tests for ProvidePerformanceFeedbackResponse success/error variants.""" + + def test_success(self): + """Success message is simple confirmation.""" + response = ProvidePerformanceFeedbackResponse1.model_construct( + acknowledged=True + ) + assert response.summary() == "Performance feedback recorded successfully." + + def test_error(self): + """Error message includes error count.""" + response = ProvidePerformanceFeedbackResponse2.model_construct( + errors=[{"code": "feedback_failed", "message": "Failed"}] + ) + assert response.summary() == "Performance feedback recording failed with 1 error." + + +class TestNonResponseTypeMessage: + """Tests for .summary() on non-response types.""" + + def test_request_type_returns_generic_message(self): + """Request types return generic message with class name.""" + from adcp.types import GetProductsRequest + + request = GetProductsRequest(brief="Test brief") + assert request.summary() == "GetProductsRequest response" + + def test_str_returns_pydantic_default(self): + """str() returns Pydantic's default representation for inspection.""" + from adcp.types import GetProductsRequest + + request = GetProductsRequest(brief="Test brief") + result = str(request) + # Should be Pydantic's default format, not a custom message + assert "GetProductsRequest" in result or "brief=" in result diff --git a/tests/test_simple_api.py b/tests/test_simple_api.py index f74d098..c6d4071 100644 --- a/tests/test_simple_api.py +++ b/tests/test_simple_api.py @@ -98,6 +98,44 @@ async def test_list_creative_formats_simple_api(): assert result.formats[0].format_id["id"] == "banner_300x250" +@pytest.mark.asyncio +async def test_list_creative_formats_with_filter_params(): + """Test client.simple.list_creative_formats with filter parameters. + + The SDK supports is_responsive and name_search parameters per the AdCP spec. + """ + from adcp.types import ListCreativeFormatsRequest + from adcp.types._generated import Format + + # Create mock response + mock_format = Format.model_construct( + format_id={"id": "responsive_banner"}, + name="Mobile Responsive Banner", + description="Responsive banner for mobile", + ) + mock_response = ListCreativeFormatsResponse.model_construct(formats=[mock_format]) + mock_result = TaskResult[ListCreativeFormatsResponse]( + status=TaskStatus.COMPLETED, data=mock_response, success=True + ) + + with patch.object(test_agent, "list_creative_formats", new=AsyncMock(return_value=mock_result)): + # Call with filter parameters + result = await test_agent.simple.list_creative_formats( + is_responsive=True, + name_search="mobile", + ) + + # Verify it returns unwrapped data + assert isinstance(result, ListCreativeFormatsResponse) + + # Verify the underlying call included the filter parameters + test_agent.list_creative_formats.assert_called_once() + call_args = test_agent.list_creative_formats.call_args[0][0] + assert isinstance(call_args, ListCreativeFormatsRequest) + assert call_args.is_responsive is True + assert call_args.name_search == "mobile" + + def test_simple_api_exists_on_client(): """Test that all clients have a .simple accessor.""" from adcp.testing import creative_agent, test_agent_a2a