From e580413d954fb9a31b6b078c1307724c54d38c37 Mon Sep 17 00:00:00 2001 From: Anton Karpets Date: Wed, 7 Feb 2024 11:04:56 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9BSource=20Amazon=20Seller=20Partner:?= =?UTF-8?q?=20fix=20date=20formatting=20for=20ledger=20reports=20with=20ag?= =?UTF-8?q?gregation=20by=20month=20(#34914)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../metadata.yaml | 2 +- .../schemas/GET_LEDGER_SUMMARY_VIEW_DATA.json | 2 +- .../source_amazon_seller_partner/streams.py | 63 +++++++++++-------- .../unit_tests/test_analytics_streams.py | 8 +-- .../unit_tests/test_streams.py | 25 +++++--- .../unit_tests/test_transform_function.py | 3 + .../sources/amazon-seller-partner.md | 1 + 7 files changed, 62 insertions(+), 42 deletions(-) diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/metadata.yaml b/airbyte-integrations/connectors/source-amazon-seller-partner/metadata.yaml index 74c48e7535cc..44f10006bdeb 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/metadata.yaml +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/metadata.yaml @@ -15,7 +15,7 @@ data: connectorSubtype: api connectorType: source definitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460 - dockerImageTag: 3.2.1 + dockerImageTag: 3.2.2 dockerRepository: airbyte/source-amazon-seller-partner documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-seller-partner githubIssueLabel: source-amazon-seller-partner diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/schemas/GET_LEDGER_SUMMARY_VIEW_DATA.json b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/schemas/GET_LEDGER_SUMMARY_VIEW_DATA.json index 5b7d9aa7c908..88a3305e3165 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/schemas/GET_LEDGER_SUMMARY_VIEW_DATA.json +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/schemas/GET_LEDGER_SUMMARY_VIEW_DATA.json @@ -4,7 +4,7 @@ "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "properties": { - "Date": { "type": ["null", "string"], "format": "date-time" }, + "Date": { "type": ["null", "string"], "format": "date" }, "FNSKU": { "type": ["null", "string"] }, "ASIN": { "type": ["null", "string"] }, "MSKU": { "type": ["null", "string"] }, diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py index bffea989b12f..2d54ae6ca078 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py @@ -122,7 +122,7 @@ def parse_response( self, response: requests.Response, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs ) -> Iterable[Mapping]: """ - :return an iterable containing each record in the response + Return an iterable containing each record in the response """ yield from response.json().get(self.data_field, []) @@ -146,10 +146,8 @@ class ReportProcessingStatus(str, Enum): class ReportsAmazonSPStream(HttpStream, ABC): - max_wait_seconds = 3600 """ - API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/reports-api/reports_2020-09-04.md - API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/reports-api-model/reports_2020-09-04.json + API docs: https://developer-docs.amazon.com/sp-api/docs/reports-api-v2021-06-30-reference Report streams are intended to work as following: - create a new report; @@ -160,16 +158,17 @@ class ReportsAmazonSPStream(HttpStream, ABC): - yield the report document (if report processing status is `DONE`) """ - replication_start_date_limit_in_days = 90 + max_wait_seconds = 3600 + replication_start_date_limit_in_days = 365 primary_key = None path_prefix = f"reports/{REPORTS_API_VERSION}" sleep_seconds = 30 data_field = "payload" result_key = None - availability_sla_days = ( - 1 # see data availability sla at https://developer-docs.amazon.com/sp-api/docs/report-type-values#vendor-retail-analytics-reports - ) + + # see data availability sla at https://developer-docs.amazon.com/sp-api/docs/report-type-values#vendor-retail-analytics-reports + availability_sla_days = 1 availability_strategy = None def __init__( @@ -203,6 +202,11 @@ def http_method(self) -> str: def http_method(self, value: str): self._http_method = value + @property + def retry_factor(self) -> float: + # https://developer-docs.amazon.com/sp-api/docs/reports-api-v2021-06-30-reference#post-reports2021-06-30reports + return 60.0 + @property def url_base(self) -> str: return self._url_base @@ -317,7 +321,7 @@ def read_records( ) -> Iterable[Mapping[str, Any]]: """ Create and retrieve the report. - Decrypt and parse the report is its fully proceed, then yield the report document records. + Decrypt and parse the report if it's fully processed, then yield the report document records. """ report_payload = {} stream_slice = stream_slice or {} @@ -365,11 +369,16 @@ def read_records( record["dataEndTime"] = report_end_date.strftime(DATE_FORMAT) yield record elif processing_status == ReportProcessingStatus.FATAL: - raise AirbyteTracedException(message=f"The report for stream '{self.name}' was not created - skip reading") + raise AirbyteTracedException( + internal_message=( + f"Failed to retrieve the report '{self.name}' for period {stream_slice['dataStartTime']}-{stream_slice['dataEndTime']} " + "due to Amazon Seller Partner platform issues. This will be read during the next sync." + ) + ) elif processing_status == ReportProcessingStatus.CANCELLED: - logger.warning(f"The report for stream '{self.name}' was cancelled or there is no data to return") + logger.warning(f"The report for stream '{self.name}' was cancelled or there is no data to return.") else: - raise Exception(f"Unknown response for stream '{self.name}'. Response body {report_payload}") + raise Exception(f"Unknown response for stream '{self.name}'. Response body: {report_payload}.") class IncrementalReportsAmazonSPStream(ReportsAmazonSPStream): @@ -447,6 +456,8 @@ class FulfilledShipmentsReports(IncrementalReportsAmazonSPStream): name = "GET_AMAZON_FULFILLED_SHIPMENTS_DATA_GENERAL" + # You can request up to one month of data in a single report + # https://developer-docs.amazon.com/sp-api/docs/report-type-values-fba#fba-sales-reports replication_start_date_limit_in_days = 30 @@ -560,6 +571,10 @@ class StrandedInventoryUiReport(IncrementalReportsAmazonSPStream): class XmlAllOrdersDataByOrderDataGeneral(IncrementalReportsAmazonSPStream): + name = "GET_XML_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL" + primary_key = "AmazonOrderID" + cursor_field = "LastUpdatedDate" + def parse_document(self, document): try: parsed = xmltodict.parse(document, attr_prefix="", cdata_key="value", force_list={"Message", "OrderItem"}) @@ -575,10 +590,6 @@ def parse_document(self, document): return result - name = "GET_XML_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL" - primary_key = "AmazonOrderID" - cursor_field = "LastUpdatedDate" - class MerchantListingsReportBackCompat(MerchantReports): name = "GET_MERCHANT_LISTINGS_DATA_BACK_COMPAT" @@ -627,6 +638,8 @@ class FlatFileArchivedOrdersDataByOrderDate(IncrementalReportsAmazonSPStream): class FlatFileReturnsDataByReturnDate(IncrementalReportsAmazonSPStream): name = "GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE" + # You can request up to 60 days of data in a single report + # https://developer-docs.amazon.com/sp-api/docs/report-type-values-returns replication_start_date_limit_in_days = 60 @@ -686,7 +699,6 @@ def _augmented_data(self, report_options) -> Mapping[str, Any]: class IncrementalAnalyticsStream(AnalyticsStream): - fixed_period_in_days = 0 @property @@ -714,7 +726,6 @@ def _report_data( def parse_response( self, response: requests.Response, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs ) -> Iterable[Mapping]: - payload = response.json() document = self.download_and_decompress_report_document(payload) @@ -930,7 +941,7 @@ class FlatFileOrdersReportsByLastUpdate(IncrementalReportsAmazonSPStream): class Orders(IncrementalAmazonSPStream): """ - API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/orders-api/ordersV0.md + API docs: https://developer-docs.amazon.com/sp-api/docs/orders-api-v0-reference API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/orders-api-model/ordersV0.json """ @@ -1050,10 +1061,11 @@ def __init__(self, *args, **kwargs): @staticmethod def get_transform_function(): - def transform_function(original_value: Any, field_schema: Dict[str, Any]) -> Any: + def transform_function(original_value: str, field_schema: Dict[str, Any]) -> str: if original_value and field_schema.get("format") == "date": + date_format = "MM/YYYY" if len(original_value) <= 7 else "MM/DD/YYYY" try: - transformed_value = pendulum.from_format(original_value, "MM/DD/YYYY").to_date_string() + transformed_value = pendulum.from_format(original_value, date_format).to_date_string() return transformed_value except ValueError: pass @@ -1068,7 +1080,7 @@ class LedgerSummaryViewReport(LedgerDetailedViewReports): class VendorDirectFulfillmentShipping(IncrementalAmazonSPStream): """ - API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/vendor-direct-fulfillment-shipping-api/vendorDirectFulfillmentShippingV1.md + API docs: https://developer-docs.amazon.com/sp-api/docs/vendor-direct-fulfillment-shipping-api-v1-reference API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/vendor-direct-fulfillment-shipping-api-model/vendorDirectFulfillmentShippingV1.json Returns a list of shipping labels created during the time frame that you specify. @@ -1178,7 +1190,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: class ListFinancialEventGroups(FinanceStream): """ - API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/finances-api/financesV0.md#listfinancialeventgroups + API docs: https://developer-docs.amazon.com/sp-api/docs/finances-api-reference#get-financesv0financialeventgroups API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/finances-api-model/financesV0.json """ @@ -1199,7 +1211,7 @@ def parse_response( class ListFinancialEvents(FinanceStream): """ - API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/finances-api/financesV0.md#listfinancialevents + API docs: https://developer-docs.amazon.com/sp-api/docs/finances-api-reference#get-financesv0financialevents API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/finances-api-model/financesV0.json """ @@ -1221,12 +1233,10 @@ def parse_response( class FbaCustomerReturnsReports(IncrementalReportsAmazonSPStream): - name = "GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA" class FlatFileSettlementV2Reports(IncrementalReportsAmazonSPStream): - name = "GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE" transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization) @@ -1265,7 +1275,6 @@ def stream_slices( You can search for these reports using the getReports operation. ``` """ - strict_start_date = pendulum.now("utc").subtract(days=90) utc_now = pendulum.now("utc").date().to_date_string() diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_analytics_streams.py b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_analytics_streams.py index 61905c59fd3a..daa53acd0730 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_analytics_streams.py +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_analytics_streams.py @@ -126,13 +126,13 @@ def test_get_updated_state(self, report_init_kwargs, current_stream_state, lates [{"dataStartTime": "2023-09-06T00:00:00Z", "dataEndTime": "2023-09-06T23:59:59Z"}], ), ( - "2023-05-01T00:00:00Z", - "2023-09-07T00:00:00Z", + "2022-05-01T00:00:00Z", + "2023-09-05T00:00:00Z", None, 0, [ - {"dataStartTime": "2023-05-01T00:00:00Z", "dataEndTime": "2023-07-29T23:59:59Z"}, - {"dataStartTime": "2023-07-30T00:00:00Z", "dataEndTime": "2023-09-07T00:00:00Z"}, + {"dataStartTime": "2022-05-01T00:00:00Z", "dataEndTime": "2023-04-30T23:59:59Z"}, + {"dataStartTime": "2023-05-01T00:00:00Z", "dataEndTime": "2023-09-05T00:00:00Z"}, ], ), ), diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_streams.py index 50a53adba3e9..c431e79fe19d 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_streams.py @@ -59,19 +59,19 @@ def test_report_data(self, report_init_kwargs): [{"dataStartTime": "2022-09-01T00:00:00Z", "dataEndTime": "2022-10-01T00:00:00Z"}], ), ( - "2022-09-01T00:00:00Z", - "2023-01-01T00:00:00Z", + "2021-05-01T00:00:00Z", + "2022-09-05T00:00:00Z", [ - {"dataStartTime": "2022-09-01T00:00:00Z", "dataEndTime": "2022-11-29T23:59:59Z"}, - {"dataStartTime": "2022-11-30T00:00:00Z", "dataEndTime": "2023-01-01T00:00:00Z"}, + {"dataStartTime": "2021-05-01T00:00:00Z", "dataEndTime": "2022-04-30T23:59:59Z"}, + {"dataStartTime": "2022-05-01T00:00:00Z", "dataEndTime": "2022-09-05T00:00:00Z"}, ], ), ( - "2022-10-01T00:00:00Z", + "2021-10-01T00:00:00Z", None, [ - {"dataStartTime": "2022-10-01T00:00:00Z", "dataEndTime": "2022-12-29T23:59:59Z"}, - {"dataStartTime": "2022-12-30T00:00:00Z", "dataEndTime": "2023-01-01T00:00:00Z"} + {"dataStartTime": "2021-10-01T00:00:00Z", "dataEndTime": "2022-09-30T23:59:59Z"}, + {"dataStartTime": "2022-10-01T00:00:00Z", "dataEndTime": "2023-01-01T00:00:00Z"} ], ), ( @@ -126,9 +126,16 @@ def test_read_records_retrieve_fatal(self, report_init_kwargs, mocker, requests_ ) stream = SomeReportStream(**report_init_kwargs) + stream_start = "2022-09-03T00:00:00Z" + stream_end = "2022-10-03T00:00:00Z" with pytest.raises(AirbyteTracedException) as e: - list(stream.read_records(sync_mode=SyncMode.full_refresh)) - assert e.value.message == "The report for stream 'GET_TEST_REPORT' was not created - skip reading" + list(stream.read_records( + sync_mode=SyncMode.full_refresh, stream_slice={"dataStartTime": stream_start, "dataEndTime": stream_end}) + ) + assert e.value.internal_message == ( + f"Failed to retrieve the report 'GET_TEST_REPORT' for period {stream_start}-{stream_end} " + "due to Amazon Seller Partner platform issues. This will be read during the next sync." + ) def test_read_records_retrieve_cancelled(self, report_init_kwargs, mocker, requests_mock, caplog): mocker.patch("time.sleep", lambda x: None) diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_transform_function.py b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_transform_function.py index a2e343c6f077..0acc47ef9079 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_transform_function.py +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_transform_function.py @@ -107,7 +107,10 @@ def test_transform_merchant_fyp_reports(report_init_kwargs, input_data, expected @pytest.mark.parametrize( ("input_data", "expected_data"), ( + ({"Date": "07/29/2022", "dataEndTime": "2022-07-31"}, {"Date": "2022-07-29", "dataEndTime": "2022-07-31"}), ({"Date": "7/29/2022", "dataEndTime": "2022-07-31"}, {"Date": "2022-07-29", "dataEndTime": "2022-07-31"}), + ({"Date": "07/2022", "dataEndTime": "2022-07-31"}, {"Date": "2022-07-01", "dataEndTime": "2022-07-31"}), + ({"Date": "7/2022", "dataEndTime": "2022-07-31"}, {"Date": "2022-07-01", "dataEndTime": "2022-07-31"}), ({"Date": "", "dataEndTime": "2022-07-31"}, {"Date": "", "dataEndTime": "2022-07-31"}), ), ) diff --git a/docs/integrations/sources/amazon-seller-partner.md b/docs/integrations/sources/amazon-seller-partner.md index fcc0077703d1..622c71c00302 100644 --- a/docs/integrations/sources/amazon-seller-partner.md +++ b/docs/integrations/sources/amazon-seller-partner.md @@ -153,6 +153,7 @@ Information about rate limits you may find [here](https://developer-docs.amazon. | Version | Date | Pull Request | Subject | |:---------|:-----------|:------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `3.2.2` | 2024-02-07 | [\#34914](https://github.com/airbytehq/airbyte/pull/34914) | Fix date formatting for ledger reports with aggregation by month | | `3.2.1` | 2024-01-30 | [\#34654](https://github.com/airbytehq/airbyte/pull/34654) | Fix date format in state message for streams with custom dates formatting | | `3.2.0` | 2024-01-26 | [\#34549](https://github.com/airbytehq/airbyte/pull/34549) | Update schemas for vendor analytics streams | | `3.1.0` | 2024-01-17 | [\#34283](https://github.com/airbytehq/airbyte/pull/34283) | Delete deprecated streams |