Skip to content

Commit

Permalink
šŸ›Source Amazon Seller Partner: fix date formatting for ledger reportsā€¦
Browse files Browse the repository at this point in the history
… with aggregation by month (#34914)
  • Loading branch information
Anton Karpets authored Feb 7, 2024
1 parent 58de015 commit 454b846
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
dockerImageTag: 3.2.1
dockerImageTag: 3.2.2
dockerRepository: airbyte/source-amazon-seller-partner
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-seller-partner
githubIssueLabel: source-amazon-seller-partner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"type": "object",
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"Date": { "type": ["null", "string"], "format": "date-time" },
"Date": { "type": ["null", "string"], "format": "date" },
"FNSKU": { "type": ["null", "string"] },
"ASIN": { "type": ["null", "string"] },
"MSKU": { "type": ["null", "string"] },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def parse_response(
self, response: requests.Response, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs
) -> Iterable[Mapping]:
"""
:return an iterable containing each record in the response
Return an iterable containing each record in the response
"""
yield from response.json().get(self.data_field, [])

Expand All @@ -146,10 +146,8 @@ class ReportProcessingStatus(str, Enum):


class ReportsAmazonSPStream(HttpStream, ABC):
max_wait_seconds = 3600
"""
API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/reports-api/reports_2020-09-04.md
API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/reports-api-model/reports_2020-09-04.json
API docs: https://developer-docs.amazon.com/sp-api/docs/reports-api-v2021-06-30-reference
Report streams are intended to work as following:
- create a new report;
Expand All @@ -160,16 +158,17 @@ class ReportsAmazonSPStream(HttpStream, ABC):
- yield the report document (if report processing status is `DONE`)
"""

replication_start_date_limit_in_days = 90
max_wait_seconds = 3600
replication_start_date_limit_in_days = 365

primary_key = None
path_prefix = f"reports/{REPORTS_API_VERSION}"
sleep_seconds = 30
data_field = "payload"
result_key = None
availability_sla_days = (
1 # see data availability sla at https://developer-docs.amazon.com/sp-api/docs/report-type-values#vendor-retail-analytics-reports
)

# see data availability sla at https://developer-docs.amazon.com/sp-api/docs/report-type-values#vendor-retail-analytics-reports
availability_sla_days = 1
availability_strategy = None

def __init__(
Expand Down Expand Up @@ -203,6 +202,11 @@ def http_method(self) -> str:
def http_method(self, value: str):
self._http_method = value

@property
def retry_factor(self) -> float:
# https://developer-docs.amazon.com/sp-api/docs/reports-api-v2021-06-30-reference#post-reports2021-06-30reports
return 60.0

@property
def url_base(self) -> str:
return self._url_base
Expand Down Expand Up @@ -317,7 +321,7 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
"""
Create and retrieve the report.
Decrypt and parse the report is its fully proceed, then yield the report document records.
Decrypt and parse the report if it's fully processed, then yield the report document records.
"""
report_payload = {}
stream_slice = stream_slice or {}
Expand Down Expand Up @@ -365,11 +369,16 @@ def read_records(
record["dataEndTime"] = report_end_date.strftime(DATE_FORMAT)
yield record
elif processing_status == ReportProcessingStatus.FATAL:
raise AirbyteTracedException(message=f"The report for stream '{self.name}' was not created - skip reading")
raise AirbyteTracedException(
internal_message=(
f"Failed to retrieve the report '{self.name}' for period {stream_slice['dataStartTime']}-{stream_slice['dataEndTime']} "
"due to Amazon Seller Partner platform issues. This will be read during the next sync."
)
)
elif processing_status == ReportProcessingStatus.CANCELLED:
logger.warning(f"The report for stream '{self.name}' was cancelled or there is no data to return")
logger.warning(f"The report for stream '{self.name}' was cancelled or there is no data to return.")
else:
raise Exception(f"Unknown response for stream '{self.name}'. Response body {report_payload}")
raise Exception(f"Unknown response for stream '{self.name}'. Response body: {report_payload}.")


class IncrementalReportsAmazonSPStream(ReportsAmazonSPStream):
Expand Down Expand Up @@ -447,6 +456,8 @@ class FulfilledShipmentsReports(IncrementalReportsAmazonSPStream):

name = "GET_AMAZON_FULFILLED_SHIPMENTS_DATA_GENERAL"

# You can request up to one month of data in a single report
# https://developer-docs.amazon.com/sp-api/docs/report-type-values-fba#fba-sales-reports
replication_start_date_limit_in_days = 30


Expand Down Expand Up @@ -560,6 +571,10 @@ class StrandedInventoryUiReport(IncrementalReportsAmazonSPStream):


class XmlAllOrdersDataByOrderDataGeneral(IncrementalReportsAmazonSPStream):
name = "GET_XML_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"
primary_key = "AmazonOrderID"
cursor_field = "LastUpdatedDate"

def parse_document(self, document):
try:
parsed = xmltodict.parse(document, attr_prefix="", cdata_key="value", force_list={"Message", "OrderItem"})
Expand All @@ -575,10 +590,6 @@ def parse_document(self, document):

return result

name = "GET_XML_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL"
primary_key = "AmazonOrderID"
cursor_field = "LastUpdatedDate"


class MerchantListingsReportBackCompat(MerchantReports):
name = "GET_MERCHANT_LISTINGS_DATA_BACK_COMPAT"
Expand Down Expand Up @@ -627,6 +638,8 @@ class FlatFileArchivedOrdersDataByOrderDate(IncrementalReportsAmazonSPStream):
class FlatFileReturnsDataByReturnDate(IncrementalReportsAmazonSPStream):
name = "GET_FLAT_FILE_RETURNS_DATA_BY_RETURN_DATE"

# You can request up to 60 days of data in a single report
# https://developer-docs.amazon.com/sp-api/docs/report-type-values-returns
replication_start_date_limit_in_days = 60


Expand Down Expand Up @@ -686,7 +699,6 @@ def _augmented_data(self, report_options) -> Mapping[str, Any]:


class IncrementalAnalyticsStream(AnalyticsStream):

fixed_period_in_days = 0

@property
Expand Down Expand Up @@ -714,7 +726,6 @@ def _report_data(
def parse_response(
self, response: requests.Response, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs
) -> Iterable[Mapping]:

payload = response.json()

document = self.download_and_decompress_report_document(payload)
Expand Down Expand Up @@ -930,7 +941,7 @@ class FlatFileOrdersReportsByLastUpdate(IncrementalReportsAmazonSPStream):

class Orders(IncrementalAmazonSPStream):
"""
API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/orders-api/ordersV0.md
API docs: https://developer-docs.amazon.com/sp-api/docs/orders-api-v0-reference
API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/orders-api-model/ordersV0.json
"""

Expand Down Expand Up @@ -1050,10 +1061,11 @@ def __init__(self, *args, **kwargs):

@staticmethod
def get_transform_function():
def transform_function(original_value: Any, field_schema: Dict[str, Any]) -> Any:
def transform_function(original_value: str, field_schema: Dict[str, Any]) -> str:
if original_value and field_schema.get("format") == "date":
date_format = "MM/YYYY" if len(original_value) <= 7 else "MM/DD/YYYY"
try:
transformed_value = pendulum.from_format(original_value, "MM/DD/YYYY").to_date_string()
transformed_value = pendulum.from_format(original_value, date_format).to_date_string()
return transformed_value
except ValueError:
pass
Expand All @@ -1068,7 +1080,7 @@ class LedgerSummaryViewReport(LedgerDetailedViewReports):

class VendorDirectFulfillmentShipping(IncrementalAmazonSPStream):
"""
API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/vendor-direct-fulfillment-shipping-api/vendorDirectFulfillmentShippingV1.md
API docs: https://developer-docs.amazon.com/sp-api/docs/vendor-direct-fulfillment-shipping-api-v1-reference
API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/vendor-direct-fulfillment-shipping-api-model/vendorDirectFulfillmentShippingV1.json
Returns a list of shipping labels created during the time frame that you specify.
Expand Down Expand Up @@ -1178,7 +1190,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:

class ListFinancialEventGroups(FinanceStream):
"""
API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/finances-api/financesV0.md#listfinancialeventgroups
API docs: https://developer-docs.amazon.com/sp-api/docs/finances-api-reference#get-financesv0financialeventgroups
API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/finances-api-model/financesV0.json
"""

Expand All @@ -1199,7 +1211,7 @@ def parse_response(

class ListFinancialEvents(FinanceStream):
"""
API docs: https://github.com/amzn/selling-partner-api-docs/blob/main/references/finances-api/financesV0.md#listfinancialevents
API docs: https://developer-docs.amazon.com/sp-api/docs/finances-api-reference#get-financesv0financialevents
API model: https://github.com/amzn/selling-partner-api-models/blob/main/models/finances-api-model/financesV0.json
"""

Expand All @@ -1221,12 +1233,10 @@ def parse_response(


class FbaCustomerReturnsReports(IncrementalReportsAmazonSPStream):

name = "GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA"


class FlatFileSettlementV2Reports(IncrementalReportsAmazonSPStream):

name = "GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE"
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization)

Expand Down Expand Up @@ -1265,7 +1275,6 @@ def stream_slices(
You can search for these reports using the getReports operation.
```
"""

strict_start_date = pendulum.now("utc").subtract(days=90)
utc_now = pendulum.now("utc").date().to_date_string()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ def test_get_updated_state(self, report_init_kwargs, current_stream_state, lates
[{"dataStartTime": "2023-09-06T00:00:00Z", "dataEndTime": "2023-09-06T23:59:59Z"}],
),
(
"2023-05-01T00:00:00Z",
"2023-09-07T00:00:00Z",
"2022-05-01T00:00:00Z",
"2023-09-05T00:00:00Z",
None,
0,
[
{"dataStartTime": "2023-05-01T00:00:00Z", "dataEndTime": "2023-07-29T23:59:59Z"},
{"dataStartTime": "2023-07-30T00:00:00Z", "dataEndTime": "2023-09-07T00:00:00Z"},
{"dataStartTime": "2022-05-01T00:00:00Z", "dataEndTime": "2023-04-30T23:59:59Z"},
{"dataStartTime": "2023-05-01T00:00:00Z", "dataEndTime": "2023-09-05T00:00:00Z"},
],
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ def test_report_data(self, report_init_kwargs):
[{"dataStartTime": "2022-09-01T00:00:00Z", "dataEndTime": "2022-10-01T00:00:00Z"}],
),
(
"2022-09-01T00:00:00Z",
"2023-01-01T00:00:00Z",
"2021-05-01T00:00:00Z",
"2022-09-05T00:00:00Z",
[
{"dataStartTime": "2022-09-01T00:00:00Z", "dataEndTime": "2022-11-29T23:59:59Z"},
{"dataStartTime": "2022-11-30T00:00:00Z", "dataEndTime": "2023-01-01T00:00:00Z"},
{"dataStartTime": "2021-05-01T00:00:00Z", "dataEndTime": "2022-04-30T23:59:59Z"},
{"dataStartTime": "2022-05-01T00:00:00Z", "dataEndTime": "2022-09-05T00:00:00Z"},
],
),
(
"2022-10-01T00:00:00Z",
"2021-10-01T00:00:00Z",
None,
[
{"dataStartTime": "2022-10-01T00:00:00Z", "dataEndTime": "2022-12-29T23:59:59Z"},
{"dataStartTime": "2022-12-30T00:00:00Z", "dataEndTime": "2023-01-01T00:00:00Z"}
{"dataStartTime": "2021-10-01T00:00:00Z", "dataEndTime": "2022-09-30T23:59:59Z"},
{"dataStartTime": "2022-10-01T00:00:00Z", "dataEndTime": "2023-01-01T00:00:00Z"}
],
),
(
Expand Down Expand Up @@ -126,9 +126,16 @@ def test_read_records_retrieve_fatal(self, report_init_kwargs, mocker, requests_
)

stream = SomeReportStream(**report_init_kwargs)
stream_start = "2022-09-03T00:00:00Z"
stream_end = "2022-10-03T00:00:00Z"
with pytest.raises(AirbyteTracedException) as e:
list(stream.read_records(sync_mode=SyncMode.full_refresh))
assert e.value.message == "The report for stream 'GET_TEST_REPORT' was not created - skip reading"
list(stream.read_records(
sync_mode=SyncMode.full_refresh, stream_slice={"dataStartTime": stream_start, "dataEndTime": stream_end})
)
assert e.value.internal_message == (
f"Failed to retrieve the report 'GET_TEST_REPORT' for period {stream_start}-{stream_end} "
"due to Amazon Seller Partner platform issues. This will be read during the next sync."
)

def test_read_records_retrieve_cancelled(self, report_init_kwargs, mocker, requests_mock, caplog):
mocker.patch("time.sleep", lambda x: None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ def test_transform_merchant_fyp_reports(report_init_kwargs, input_data, expected
@pytest.mark.parametrize(
("input_data", "expected_data"),
(
({"Date": "07/29/2022", "dataEndTime": "2022-07-31"}, {"Date": "2022-07-29", "dataEndTime": "2022-07-31"}),
({"Date": "7/29/2022", "dataEndTime": "2022-07-31"}, {"Date": "2022-07-29", "dataEndTime": "2022-07-31"}),
({"Date": "07/2022", "dataEndTime": "2022-07-31"}, {"Date": "2022-07-01", "dataEndTime": "2022-07-31"}),
({"Date": "7/2022", "dataEndTime": "2022-07-31"}, {"Date": "2022-07-01", "dataEndTime": "2022-07-31"}),
({"Date": "", "dataEndTime": "2022-07-31"}, {"Date": "", "dataEndTime": "2022-07-31"}),
),
)
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/amazon-seller-partner.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ Information about rate limits you may find [here](https://developer-docs.amazon.

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `3.2.2` | 2024-02-07 | [\#34914](https://github.com/airbytehq/airbyte/pull/34914) | Fix date formatting for ledger reports with aggregation by month |
| `3.2.1` | 2024-01-30 | [\#34654](https://github.com/airbytehq/airbyte/pull/34654) | Fix date format in state message for streams with custom dates formatting |
| `3.2.0` | 2024-01-26 | [\#34549](https://github.com/airbytehq/airbyte/pull/34549) | Update schemas for vendor analytics streams |
| `3.1.0` | 2024-01-17 | [\#34283](https://github.com/airbytehq/airbyte/pull/34283) | Delete deprecated streams |
Expand Down

0 comments on commit 454b846

Please sign in to comment.