From a6fb52ea9db051ae2229b374a30b3c2053014efc Mon Sep 17 00:00:00 2001 From: Serhii Chvaliuk Date: Wed, 23 Nov 2022 09:57:16 +0200 Subject: [PATCH] Source facebook marketing: check "breakdowns" combinations (#19645) Signed-off-by: Sergey Chvalyuk --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../source-facebook-marketing/Dockerfile | 2 +- .../source_facebook_marketing/source.py | 47 ++++++++++--------- .../source_facebook_marketing/utils.py | 10 ++++ .../unit_tests/test_source.py | 12 +---- .../sources/facebook-marketing.md | 1 + 7 files changed, 40 insertions(+), 36 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index ea11462818f5..6fbe4088dc8a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -437,7 +437,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.72 + dockerImageTag: 0.2.73 documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index de14185ee13f..2705fb703594 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3396,7 +3396,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.72" +- dockerImage: "airbyte/source-facebook-marketing:0.2.73" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.com/integrations/sources/facebook-marketing" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 5628c15c53eb..9def614575ff 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.72 +LABEL io.airbyte.version=0.2.73 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 76ac52e64c7e..ce412bda58f5 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -5,13 +5,14 @@ import logging from typing import Any, List, Mapping, Optional, Tuple, Type +import facebook_business import pendulum import requests from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from source_facebook_marketing.api import API -from source_facebook_marketing.spec import ConnectorConfig, InsightConfig +from source_facebook_marketing.spec import ConnectorConfig from source_facebook_marketing.streams import ( Activities, AdAccount, @@ -31,7 +32,7 @@ Videos, ) -from .utils import validate_end_date, validate_start_date +from .utils import read_full_refresh, validate_end_date, validate_start_date logger = logging.getLogger("airbyte") @@ -59,10 +60,18 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> try: api = API(account_id=config.account_id, access_token=config.access_token) logger.info(f"Select account {api.account}") - return True, None except requests.exceptions.RequestException as e: return False, e + # read one record from all custom insights streams + # to ensure that we have valid combination of "action_breakdowns" and "breakdowns" parameters + for stream in self.get_custom_insights_streams(api, config): + try: + next(read_full_refresh(stream), None) + except facebook_business.exceptions.FacebookRequestError as e: + return False, e._api_error_message + return True, None + def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: """Discovery method, returns available streams @@ -149,7 +158,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: ), ] - return self._update_insights_streams(insights=config.custom_insights, default_args=insights_args, streams=streams) + return streams + self.get_custom_insights_streams(api, config) def spec(self, *args, **kwargs) -> ConnectorSpecification: """Returns the spec for this integration. @@ -170,28 +179,20 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification: ), ) - def _update_insights_streams(self, insights: List[InsightConfig], default_args, streams) -> List[Type[Stream]]: - """Update method, if insights have values returns streams replacing the - default insights streams else returns streams - """ - if not insights: - return streams - - insights_custom_streams = list() - - for insight in insights: - args = dict( - api=default_args["api"], + def get_custom_insights_streams(self, api: API, config: ConnectorConfig) -> List[Type[Stream]]: + """return custom insights streams""" + streams = [] + for insight in config.custom_insights or []: + stream = AdsInsights( + api=api, name=f"Custom{insight.name}", fields=list(set(insight.fields)), breakdowns=list(set(insight.breakdowns)), action_breakdowns=list(set(insight.action_breakdowns)), time_increment=insight.time_increment, - start_date=insight.start_date or default_args["start_date"], - end_date=insight.end_date or default_args["end_date"], - insights_lookback_window=insight.insights_lookback_window or default_args["insights_lookback_window"], + start_date=insight.start_date or config.start_date, + end_date=insight.end_date or config.end_date, + insights_lookback_window=insight.insights_lookback_window or config.insights_lookback_window, ) - insight_stream = AdsInsights(**args) - insights_custom_streams.append(insight_stream) - - return streams + insights_custom_streams + streams.append(stream) + return streams diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/utils.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/utils.py index 392658ef1825..c5e3e2a99f1f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/utils.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/utils.py @@ -5,6 +5,8 @@ import logging import pendulum +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams import Stream from pendulum import DateTime logger = logging.getLogger("airbyte") @@ -40,3 +42,11 @@ def validate_end_date(start_date: DateTime, end_date: DateTime) -> DateTime: logger.warning(message) return start_date return end_date + + +def read_full_refresh(stream_instance: Stream): + slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh) + for _slice in slices: + records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh) + for record in records: + yield record diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py index 3b0a7e3e8bf1..ac0352b2e445 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py @@ -99,20 +99,12 @@ def test_spec(self): assert isinstance(spec, ConnectorSpecification) - def test_update_insights_streams(self, api, config): + def test_get_custom_insights_streams(self, api, config): config["custom_insights"] = [ {"name": "test", "fields": ["account_id"], "breakdowns": ["ad_format_asset"], "action_breakdowns": ["action_device"]}, ] - streams = SourceFacebookMarketing().streams(config) config = ConnectorConfig.parse_obj(config) - insights_args = dict( - api=api, - start_date=config.start_date, - end_date=config.end_date, - ) - assert SourceFacebookMarketing()._update_insights_streams( - insights=config.custom_insights, default_args=insights_args, streams=streams - ) + assert SourceFacebookMarketing().get_custom_insights_streams(api, config) def test_check_config(config_gen, requests_mock): diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index 671ded370b43..0034f2edc62a 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -127,6 +127,7 @@ Please be informed that the connector uses the `lookback_window` parameter to pe | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.2.73 | 2022-11-21 | [19645](https://github.com/airbytehq/airbyte/pull/19645) | Check "breakdowns" combinations | | 0.2.72 | 2022-11-04 | [18971](https://github.com/airbytehq/airbyte/pull/18971) | handle FacebookBadObjectError for empty results on async jobs | | 0.2.71 | 2022-10-31 | [18734](https://github.com/airbytehq/airbyte/pull/18734) | Reduce request record limit on retry | | 0.2.70 | 2022-10-26 | [18045](https://github.com/airbytehq/airbyte/pull/18045) | Upgrade FB SDK to v15.0 |