Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Amazon Ads: Add new tactic support for sponsored_display_report_stream #29212

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=3.0.0
LABEL io.airbyte.version=3.1.0
LABEL io.airbyte.name=airbyte/source-amazon-ads
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
dockerImageTag: 3.0.0
dockerImageTag: 3.1.0
dockerRepository: airbyte/source-amazon-ads
githubIssueLabel: source-amazon-ads
icon: amazonads.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def _get_init_report_body(self, report_date: str, record_type: str, profile):
body = {
"reportDate": report_date,
}
return {**body, "metrics": ",".join(metrics_list)}
yield {**body, "metrics": ",".join(metrics_list)}


METRICS_MAP_V3 = {
Expand Down Expand Up @@ -187,4 +187,4 @@ def _get_init_report_body(self, report_date: str, record_type: str, profile):
},
}

return body
yield body
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,4 @@ def _get_init_report_body(self, report_date: str, record_type: str, profile):
"reportDate": report_date,
"creativeType": "video",
}
return {**body, "metrics": ",".join(metrics_list)}
yield {**body, "metrics": ",".join(metrics_list)}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from enum import Enum

from .report_streams import RecordType, ReportStream

Expand Down Expand Up @@ -214,12 +213,7 @@

METRICS_TYPE_TO_ID_MAP = {"campaigns": "campaignId", "adGroups": "adGroupId", "productAds": "adId", "targets": "targetId", "asins": "asin"}


class Tactics(str, Enum):
T00001 = "T00001"
T00020 = "T00020"
T00030 = "T00030"
REMARKETING = "remarketing"
TACTICS = ["T00020", "T00030"]
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved


class SponsoredDisplayReportStream(ReportStream):
Expand All @@ -234,15 +228,15 @@ def report_init_endpoint(self, record_type: str) -> str:
metrics_type_to_id_map = METRICS_TYPE_TO_ID_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
if record_type == RecordType.ASINS and profile.accountInfo.type == "vendor":
return None
elif record_type == RecordType.PRODUCTADS and profile.accountInfo.type != "seller":
# Remove SKU from metrics since it is only available for seller accounts in Product Ad report
metrics_list = [m for m in metrics_list if m != "sku"]
return {
"reportDate": report_date,
# Only for most common T00020 tactic for now
"tactic": Tactics.T00020,
"metrics": ",".join(metrics_list),
}
for tactic in TACTICS:
metrics_list = self.metrics_map[record_type]
if record_type == RecordType.ASINS and profile.accountInfo.type == "vendor":
return None
elif record_type == RecordType.PRODUCTADS and profile.accountInfo.type != "seller":
# Remove SKU from metrics since it is only available for seller accounts in Product Ad report
metrics_list = [m for m in metrics_list if m != "sku"]
yield {
"reportDate": report_date,
"tactic": tactic,
"metrics": ",".join(metrics_list),
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,4 +316,4 @@ def _get_init_report_body(self, report_date: str, record_type: str, profile):
},
}

return body
yield body
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class ReportStream(BasicAmazonAdsStream, ABC):
# Check if the connector received an error like: 'Tactic T00020 is not supported for report API in marketplace A1C3SOZRARQ6R3.'
# https://docs.developer.amazonservices.com/en_UK/dev_guide/DG_Endpoints.html
(400, re.compile(r"^Tactic T00020 is not supported for report API in marketplace [A-Z\d]+\.$")),
(400, re.compile(r"^Tactic T00030 is not supported for report API in marketplace [A-Z\d]+\.$")),
# Check if the connector received an error: 'Report date is too far in the past. Reports are only available for 60 days.'
# In theory, it does not have to get such an error because the connector correctly calculates the start date,
# but from practice, we can still catch such errors from time to time.
Expand Down Expand Up @@ -385,39 +386,41 @@ def _init_reports(self, profile: Profile, report_date: str) -> List[ReportInfo]:
if len(self._report_record_types) > 0 and record_type not in self._report_record_types:
continue

report_init_body = self._get_init_report_body(report_date, record_type, profile)
if not report_init_body:
continue
# Some of the record types has subtypes. For example asins type
# for product report have keyword and targets subtypes and it
# represented as asins_keywords and asins_targets types. Those
# subtypes have mutually excluded parameters so we requesting
# different metric list for each record.
request_record_type = record_type.split("_")[0]
self.logger.info(f"Initiating report generation for {profile.profileId} profile with {record_type} type for {report_date} date")
response = self._send_http_request(
urljoin(self._url, self.report_init_endpoint(request_record_type)),
profile.profileId,
report_init_body,
)
if response.status_code != self.report_is_created:
error_msg = f"Unexpected HTTP status code {response.status_code} when registering {record_type}, {type(self).__name__} for {profile.profileId} profile: {response.text}"
if self._skip_known_errors(response):
self.logger.warning(error_msg)
break
raise ReportInitFailure(error_msg)

response = ReportInitResponse.parse_raw(response.text)
report_infos.append(
ReportInfo(
report_id=response.reportId,
record_type=record_type,
profile_id=profile.profileId,
status=Status.IN_PROGRESS,
metric_objects=[],
for report_init_body in self._get_init_report_body(report_date, record_type, profile):
if not report_init_body:
continue
# Some of the record types has subtypes. For example asins type
# for product report have keyword and targets subtypes and it
# represented as asins_keywords and asins_targets types. Those
# subtypes have mutually excluded parameters so we requesting
# different metric list for each record.
request_record_type = record_type.split("_")[0]
self.logger.info(
f"Initiating report generation for {profile.profileId} profile with {record_type} type for {report_date} date"
)
)
self.logger.info("Initiated successfully")
response = self._send_http_request(
urljoin(self._url, self.report_init_endpoint(request_record_type)),
profile.profileId,
report_init_body,
)
if response.status_code != self.report_is_created:
error_msg = f"Unexpected HTTP status code {response.status_code} when registering {record_type}, {type(self).__name__} for {profile.profileId} profile: {response.text}"
if self._skip_known_errors(response):
self.logger.warning(error_msg)
break
raise ReportInitFailure(error_msg)

response = ReportInitResponse.parse_raw(response.text)
report_infos.append(
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
ReportInfo(
report_id=response.reportId,
record_type=record_type,
profile_id=profile.profileId,
status=Status.IN_PROGRESS,
metric_objects=[],
)
)
self.logger.info("Initiated successfully")

return report_infos

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
SponsoredProductCampaigns,
SponsoredProductsReportStream,
)
from source_amazon_ads.streams.report_streams.display_report import TACTICS
from source_amazon_ads.streams.report_streams.report_streams import (
RecordType,
ReportGenerationFailure,
Expand Down Expand Up @@ -196,14 +197,14 @@ def test_display_report_stream(config):
stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock())
stream_slice = {"profile": profiles[0], "reportDate": "20210725"}
metrics = [m for m in stream.read_records(SyncMode.incremental, stream_slice=stream_slice)]
assert len(metrics) == METRICS_COUNT * len(stream.metrics_map)
assert len(metrics) == METRICS_COUNT * len(stream.metrics_map) * len(TACTICS)

profiles = make_profiles(profile_type="vendor")
stream = SponsoredDisplayReportStream(config, profiles, authenticator=mock.MagicMock())
stream_slice["profile"] = profiles[0]
metrics = [m for m in stream.read_records(SyncMode.incremental, stream_slice=stream_slice)]
# Skip asins record for vendor profiles
assert len(metrics) == METRICS_COUNT * (len(stream.metrics_map) - 1)
assert len(metrics) == METRICS_COUNT * (len(stream.metrics_map) - 1) * len(TACTICS)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -376,15 +377,15 @@ def test_display_report_stream_init_too_many_requests(mocker, config):
[
(
[
(lambda x: x <= 5, "SUCCESS", None),
(lambda x: x <= 10, "SUCCESS", None),
],
5,
10,
),
(
[
(lambda x: x > 5, "SUCCESS", None),
(lambda x: x > 10, "SUCCESS", None),
],
10,
20,
),
(
[
Expand All @@ -398,7 +399,7 @@ def test_display_report_stream_init_too_many_requests(mocker, config):
(lambda x: x >= 6 and x <= 10, None, "2021-01-02 03:23:05"),
(lambda x: x >= 11, "SUCCESS", "2021-01-02 03:24:06"),
],
15,
20,
),
(
[
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/amazon-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Information about expected report generation waiting time you may find [here](ht

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 3.1.0 | 2023-08-08 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Add `T00030` tactic support for `sponsored_display_report_stream` |
| 3.0.0 | 2023-07-24 | [27868](https://github.com/airbytehq/airbyte/pull/27868) | Fix attribution report stream schemas |
| 2.3.1 | 2023-07-11 | [28155](https://github.com/airbytehq/airbyte/pull/28155) | Bugfix: validation error when record values are missing |
| 2.3.0 | 2023-07-06 | [28002](https://github.com/airbytehq/airbyte/pull/28002) | Add sponsored_product_ad_group_suggested_keywords, sponsored_product_ad_group_bid_recommendations streams |
Expand Down Expand Up @@ -130,7 +131,7 @@ Information about expected report generation waiting time you may find [here](ht
| 0.1.19 | 2022-08-31 | [16191](https://github.com/airbytehq/airbyte/pull/16191) | Improved connector's input configuration validation |
| 0.1.18 | 2022-08-25 | [15951](https://github.com/airbytehq/airbyte/pull/15951) | Skip API error "Tactic T00020 is not supported for report API in marketplace A1C3SOZRARQ6R3." |
| 0.1.17 | 2022-08-24 | [15921](https://github.com/airbytehq/airbyte/pull/15921) | Skip API error "Report date is too far in the past." |
| 0.1.16 | 2022-08-23 | [15822](https://github.com/airbytehq/airbyte/pull/15822) | Set default value for 'region' if needed |
| 0.1.16 | 2022-08-23 | [15822](https://github.com/airbytehq/airbyte/pull/15822) | Set default value for `region` if needed |
| 0.1.15 | 2022-08-20 | [15816](https://github.com/airbytehq/airbyte/pull/15816) | Update STATE of incremental sync if no records |
| 0.1.14 | 2022-08-15 | [15637](https://github.com/airbytehq/airbyte/pull/15637) | Generate slices by lazy evaluation |
| 0.1.12 | 2022-08-09 | [15469](https://github.com/airbytehq/airbyte/pull/15469) | Define primary_key for all report streams |
Expand All @@ -140,7 +141,7 @@ Information about expected report generation waiting time you may find [here](ht
| 0.1.8 | 2022-05-04 | [12482](https://github.com/airbytehq/airbyte/pull/12482) | Update input configuration copy |
| 0.1.7 | 2022-04-27 | [11730](https://github.com/airbytehq/airbyte/pull/11730) | Update fields in source-connectors specifications |
| 0.1.6 | 2022-04-20 | [11659](https://github.com/airbytehq/airbyte/pull/11659) | Add adId to products report |
| 0.1.5 | 2022-04-08 | [11430](https://github.com/airbytehq/airbyte/pull/11430) | Added support OAuth2.0 |
| 0.1.5 | 2022-04-08 | [11430](https://github.com/airbytehq/airbyte/pull/11430) | Add support OAuth2.0 |
| 0.1.4 | 2022-02-21 | [10513](https://github.com/airbytehq/airbyte/pull/10513) | Increasing REPORT_WAIT_TIMEOUT for supporting report generation which takes longer time |
| 0.1.3 | 2021-12-28 | [8388](https://github.com/airbytehq/airbyte/pull/8388) | Add retry if recoverable error occured for reporting stream processing |
| 0.1.2 | 2021-10-01 | [6367](https://github.com/airbytehq/airbyte/pull/6461) | Add option to pull data for different regions. Add option to choose profiles we want to pull data. Add lookback |
Expand Down