🐛 Source Amazon Seller Partner: improve report streams performance (#3…
Anton Karpets authored Jan 3, 2024
1 parent 3fbc900 commit ff8ecbd
Showing 7 changed files with 140 additions and 88 deletions.
@@ -15,7 +15,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
dockerImageTag: 3.0.0
dockerImageTag: 3.0.1
dockerRepository: airbyte/source-amazon-seller-partner
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-seller-partner
githubIssueLabel: source-amazon-seller-partner
@@ -109,7 +109,7 @@
"period_in_days": {
"title": "Period In Days",
"type": "integer",
"description": "Will be used for stream slicing for initial full_refresh sync when no updated state is present for reports that support sliced incremental sync.",
"description": "For syncs spanning a large date range, this option is used to request data in a smaller fixed window to improve sync reliability. This time window can be configured granularly by day.",
"default": 90,
"minimum": 1,
"order": 9
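The updated `period_in_days` description refers to the date-window slicing implemented in the streams module further down. As a rough illustration only (not part of the commit), the sketch below splits a long sync range into fixed windows of `period_in_days` days; the helper name and example dates are made up, and the format constant is assumed to mirror the connector's `DATE_TIME_FORMAT`.

```python
import pendulum

DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed to mirror the connector's constant


def split_into_windows(start: str, end: str, period_in_days: int = 90):
    """Yield dataStartTime/dataEndTime windows no longer than period_in_days days."""
    start_date, end_date = pendulum.parse(start), pendulum.parse(end)
    while start_date < end_date:
        window_end = start_date.add(days=period_in_days)
        yield {
            "dataStartTime": start_date.strftime(DATE_TIME_FORMAT),
            "dataEndTime": min(window_end.subtract(seconds=1), end_date).strftime(DATE_TIME_FORMAT),
        }
        start_date = window_end


# A one-year range with the default 90-day period produces five windows.
print(len(list(split_into_windows("2023-01-01T00:00:00Z", "2024-01-01T00:00:00Z"))))  # 5
```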
@@ -8,6 +8,7 @@
import json as json_lib
import time
from abc import ABC, abstractmethod
from enum import Enum
from io import StringIO
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union

@@ -136,6 +137,14 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return {self.cursor_field: latest_benchmark}


class ReportProcessingStatus(str, Enum):
cancelled = "CANCELLED"
done = "DONE"
fatal = "FATAL"
in_progress = "IN_PROGRESS"
in_queue = "IN_QUEUE"


class ReportsAmazonSPStream(HttpStream, ABC):
max_wait_seconds = 3600
"""
@@ -161,6 +170,7 @@ class ReportsAmazonSPStream(HttpStream, ABC):
availability_sla_days = (
1 # see data availability sla at https://developer-docs.amazon.com/sp-api/docs/report-type-values#vendor-retail-analytics-reports
)
availability_strategy = None

def __init__(
self,
@@ -311,28 +321,36 @@ def read_records(
"""
report_payload = {}
stream_slice = stream_slice or {}
is_processed = False
start_time = pendulum.now("utc")
seconds_waited = 0
try:
report_id = self._create_report(sync_mode, cursor_field, stream_slice, stream_state)["reportId"]
except DefaultBackoffException as e:
logger.warning(f"The report for stream '{self.name}' was cancelled due to several failed retry attempts. {e}")
return []
except requests.exceptions.HTTPError as e:
if e.response.status_code == requests.codes.FORBIDDEN:
logger.warning(
f"The endpoint {e.response.url} returned {e.response.status_code}: {e.response.reason}. "
"This is most likely due to insufficient permissions on the credentials in use. "
"Try to grant required permissions/scopes or re-authenticate."
)
return []
raise e

# create and retrieve the report
while not is_processed and seconds_waited < self.max_wait_seconds:
processed = False
while not processed and seconds_waited < self.max_wait_seconds:
report_payload = self._retrieve_report(report_id=report_id)
seconds_waited = (pendulum.now("utc") - start_time).seconds
is_processed = report_payload.get("processingStatus") not in ["IN_QUEUE", "IN_PROGRESS"]
time.sleep(self.sleep_seconds)
processed = report_payload.get("processingStatus") not in (ReportProcessingStatus.in_queue, ReportProcessingStatus.in_progress)
if not processed:
time.sleep(self.sleep_seconds)

is_done = report_payload.get("processingStatus") == "DONE"
is_cancelled = report_payload.get("processingStatus") == "CANCELLED"
is_fatal = report_payload.get("processingStatus") == "FATAL"
processing_status = report_payload.get("processingStatus")
report_end_date = pendulum.parse(report_payload.get("dataEndTime", stream_slice.get("dataEndTime")))

if is_done:
if processing_status == ReportProcessingStatus.done:
# retrieve and decrypt the report document
document_id = report_payload["reportDocumentId"]
request_headers = self.request_headers()
@@ -346,12 +364,12 @@ def read_records(
if report_end_date:
record["dataEndTime"] = report_end_date.strftime(DATE_FORMAT)
yield record
elif is_fatal:
elif processing_status == ReportProcessingStatus.fatal:
raise AirbyteTracedException(message=f"The report for stream '{self.name}' was not created - skip reading")
elif is_cancelled:
elif processing_status == ReportProcessingStatus.cancelled:
logger.warning(f"The report for stream '{self.name}' was cancelled or there is no data to return")
else:
raise Exception(f"Unknown response for stream `{self.name}`. Response body {report_payload}")
raise Exception(f"Unknown response for stream '{self.name}'. Response body {report_payload}")


class IncrementalReportsAmazonSPStream(ReportsAmazonSPStream):
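For context on the hunk above: the string literals previously compared against `processingStatus` are replaced with the `ReportProcessingStatus` enum, and the loop now sleeps only while the report is still pending. The following is a standalone sketch of that polling pattern, not the connector's actual code; `retrieve_report`, the timing values, and the payloads are illustrative.

```python
import time
from enum import Enum


class ReportProcessingStatus(str, Enum):
    cancelled = "CANCELLED"
    done = "DONE"
    fatal = "FATAL"
    in_progress = "IN_PROGRESS"
    in_queue = "IN_QUEUE"


def poll_report(retrieve_report, max_wait_seconds=3600, sleep_seconds=30):
    """Call retrieve_report() until the report leaves the pending states or the wait budget runs out."""
    seconds_waited, payload = 0, {}
    while seconds_waited < max_wait_seconds:
        payload = retrieve_report()
        # Because the enum subclasses str, members compare equal to the raw API strings,
        # e.g. ReportProcessingStatus.done == "DONE" is True.
        if payload.get("processingStatus") not in (ReportProcessingStatus.in_queue, ReportProcessingStatus.in_progress):
            return payload
        time.sleep(sleep_seconds)
        seconds_waited += sleep_seconds
    return payload
```

The 403 handling added at the top of `read_records` takes a similarly defensive approach: a permissions error is logged as a warning and the stream yields no records instead of failing the whole sync.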
@@ -731,7 +749,6 @@ def stream_slices(
start_date = pendulum.parse(state)

start_date = min(start_date, end_date)
slices = []

while start_date < end_date:
# If request only returns data on day level
@@ -741,16 +758,12 @@
slice_range = self.period_in_days

end_date_slice = start_date.add(days=slice_range)
slices.append(
{
"dataStartTime": start_date.strftime(DATE_TIME_FORMAT),
"dataEndTime": min(end_date_slice.subtract(seconds=1), end_date).strftime(DATE_TIME_FORMAT),
}
)
yield {
"dataStartTime": start_date.strftime(DATE_TIME_FORMAT),
"dataEndTime": min(end_date_slice.subtract(seconds=1), end_date).strftime(DATE_TIME_FORMAT),
}
start_date = end_date_slice

return slices


class BrandAnalyticsMarketBasketReports(IncrementalAnalyticsStream):
name = "GET_BRAND_ANALYTICS_MARKET_BASKET_REPORT"
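The `stream_slices` hunk above swaps a list that was built up front for a generator. A minimal, hypothetical comparison of the two styles (not connector code):

```python
def build_slices(windows):
    # previous style: every slice is materialized before the first report is requested
    slices = []
    for start, end in windows:
        slices.append({"dataStartTime": start, "dataEndTime": end})
    return slices


def yield_slices(windows):
    # new style: slices are produced lazily, so reading the first report
    # does not wait for the whole list to be computed and held in memory
    for start, end in windows:
        yield {"dataStartTime": start, "dataEndTime": end}
```

The unit-test change in the next file wraps `stream_slices(...)` in `list(...)` for the same reason: the method now returns a generator rather than a list.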
@@ -3,7 +3,6 @@
#


from typing import Any, Dict
from unittest.mock import patch

import pendulum
@@ -144,6 +143,6 @@ def test_stream_slices(self, report_init_kwargs, start_date, end_date, stream_st
stream = SomeIncrementalAnalyticsStream(**report_init_kwargs)
stream.fixed_period_in_days = fixed_period_in_days
with patch("pendulum.now", return_value=pendulum.parse("2023-09-09T00:00:00Z")):
assert stream.stream_slices(
sync_mode=SyncMode.incremental, cursor_field=[stream.cursor_field], stream_state=stream_state
assert list(
stream.stream_slices(sync_mode=SyncMode.incremental, cursor_field=[stream.cursor_field], stream_state=stream_state)
) == expected_slices
@@ -3,15 +3,19 @@
#


from typing import Any, Dict
from unittest.mock import patch

import pendulum
import pytest
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.utils import AirbyteTracedException
from source_amazon_seller_partner.streams import IncrementalReportsAmazonSPStream, ReportsAmazonSPStream, VendorDirectFulfillmentShipping
from source_amazon_seller_partner.streams import (
IncrementalReportsAmazonSPStream,
ReportProcessingStatus,
ReportsAmazonSPStream,
VendorDirectFulfillmentShipping,
)


class SomeReportStream(ReportsAmazonSPStream):
@@ -118,7 +122,7 @@ def test_read_records_retrieve_fatal(self, report_init_kwargs, mocker, requests_
"GET",
f"https://test.url/reports/2021-06-30/reports/{report_id}",
status_code=200,
json={"processingStatus": "FATAL", "dataEndTime": "2022-10-03T00:00:00Z"},
json={"processingStatus": ReportProcessingStatus.fatal, "dataEndTime": "2022-10-03T00:00:00Z"},
)

stream = SomeReportStream(**report_init_kwargs)
@@ -146,7 +150,7 @@ def test_read_records_retrieve_cancelled(self, report_init_kwargs, mocker, reque
"GET",
f"https://test.url/reports/2021-06-30/reports/{report_id}",
status_code=200,
json={"processingStatus": "CANCELLED", "dataEndTime": "2022-10-03T00:00:00Z"},
json={"processingStatus": ReportProcessingStatus.cancelled, "dataEndTime": "2022-10-03T00:00:00Z"},
)

stream = SomeReportStream(**report_init_kwargs)
@@ -174,7 +178,7 @@ def test_read_records_retrieve_done(self, report_init_kwargs, mocker, requests_m
"GET",
f"https://test.url/reports/2021-06-30/reports/{report_id}",
status_code=200,
json={"processingStatus": "DONE", "dataEndTime": "2022-10-03T00:00:00Z", "reportDocumentId": document_id},
json={"processingStatus": ReportProcessingStatus.done, "dataEndTime": "2022-10-03T00:00:00Z", "reportDocumentId": document_id},
)
requests_mock.register_uri(
"GET",
@@ -188,6 +192,32 @@ def test_read_records_retrieve_done(self, report_init_kwargs, mocker, requests_m
records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
assert records[0] == {"some_key": "some_value", "dataEndTime": "2022-10-03"}

def test_read_records_retrieve_forbidden(self, report_init_kwargs, mocker, requests_mock, caplog):
mocker.patch("time.sleep", lambda x: None)
requests_mock.register_uri(
"POST",
"https://api.amazon.com/auth/o2/token",
status_code=200,
json={"access_token": "access_token", "expires_in": "3600"},
)

report_id = "some_report_id"
requests_mock.register_uri(
"POST",
"https://test.url/reports/2021-06-30/reports",
status_code=403,
json={"reportId": report_id},
reason="Forbidden"
)

stream = SomeReportStream(**report_init_kwargs)
assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == []
assert (
"The endpoint https://test.url/reports/2021-06-30/reports returned 403: Forbidden. "
"This is most likely due to insufficient permissions on the credentials in use. "
"Try to grant required permissions/scopes or re-authenticate."
) in caplog.messages[-1]


class TestVendorDirectFulfillmentShipping:
@pytest.mark.parametrize(
19 changes: 12 additions & 7 deletions docs/integrations/sources/amazon-seller-partner-migrations.md
@@ -6,13 +6,18 @@ Streams `GET_FLAT_FILE_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL` and `GET_FLAT_FILE

The following streams now have date-time formatted fields:

| Stream | Affected fields |
|-----------------------------------------------|-------------------------------------------------------------------------------|
| `GET_AMAZON_FULFILLED_SHIPMENTS_DATA_GENERAL` | `estimated-arrival-date` |
| `GET_LEDGER_DETAIL_VIEW_DATA` | `Date and Time` |
| `GET_MERCHANTS_LISTINGS_FYP_REPORT` | `Status Change Date` |
| `GET_STRANDED_INVENTORY_UI_DATA` | `Date-to-take-auto-removal` |
| `GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE` | `settlement-start-date`, `settlement-end-date`, `deposit-date`, `posted-date` |
| Stream | Affected fields | Format change |
|-----------------------------------------------|-------------------------------------------------------------------------------|----------------------------------------------------------------------|
| `GET_AMAZON_FULFILLED_SHIPMENTS_DATA_GENERAL` | `estimated-arrival-date` | `string YYYY-MM-DDTHH:mm:ssZ` -> `date-time YYYY-MM-DDTHH:mm:ssZ` |
| `GET_LEDGER_DETAIL_VIEW_DATA` | `Date and Time` | `string YYYY-MM-DDTHH:mm:ssZ` -> `date-time YYYY-MM-DDTHH:mm:ssZ` |
| `GET_MERCHANTS_LISTINGS_FYP_REPORT` | `Status Change Date` | `string MMM D[,] YYYY` -> `date-time YYYY-MM-DD` |
| `GET_STRANDED_INVENTORY_UI_DATA` | `Date-to-take-auto-removal` | `string YYYY-MM-DDTHH:mm:ssZ` -> `date-time YYYY-MM-DDTHH:mm:ssZ` |
| `GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE` | `settlement-start-date`, `settlement-end-date`, `deposit-date`, `posted-date` | `string YYYY-MM-DDTHH:mm:ssZ` -> `date-time YYYY-MM-DDTHH:mm:ssZ` |
| `GET_MERCHANT_LISTINGS_ALL_DATA` | `open-date` | `string YYYY-MM-DD HH:mm:ss ZZZ` -> `date-time YYYY-MM-DDTHH:mm:ssZ` |
| `GET_MERCHANT_LISTINGS_DATA` | `open-date` | `string YYYY-MM-DD HH:mm:ss ZZZ` -> `date-time YYYY-MM-DDTHH:mm:ssZ` |
| `GET_MERCHANT_LISTINGS_INACTIVE_DATA` | `open-date` | `string YYYY-MM-DD HH:mm:ss ZZZ` -> `date-time YYYY-MM-DDTHH:mm:ssZ` |
| `GET_MERCHANT_LISTINGS_DATA_BACK_COMPAT` | `open-date` | `string YYYY-MM-DD HH:mm:ss ZZZ` -> `date-time YYYY-MM-DDTHH:mm:ssZ` |


Users will need to refresh the source schemas and reset these streams after upgrading.
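As a quick, illustrative check (not part of the commit), the affected fields can now be handled with a standard ISO 8601 parser after the schema refresh; the record below is made up.

```python
from datetime import datetime, timezone

# Hypothetical record after upgrading and refreshing the schema:
record = {"open-date": "2022-10-03T12:00:00Z"}

# Python's fromisoformat accepts the "Z" suffix only from 3.11 on, hence the replace().
parsed = datetime.fromisoformat(record["open-date"].replace("Z", "+00:00"))
assert parsed.tzinfo == timezone.utc
print(parsed.isoformat())  # 2022-10-03T12:00:00+00:00
```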
