From 29d6b754a49d34f4f3714159100fa8e6a6239219 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 21 Nov 2022 12:04:47 +0200 Subject: [PATCH 1/6] update_insights_streams -> get_custom_insights_streams Signed-off-by: Sergey Chvalyuk --- .../source_facebook_marketing/source.py | 34 +++++++------------ .../unit_tests/test_source.py | 6 ++-- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 76ac52e64c7e..f7a694a78c9a 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -11,7 +11,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from source_facebook_marketing.api import API -from source_facebook_marketing.spec import ConnectorConfig, InsightConfig +from source_facebook_marketing.spec import ConnectorConfig from source_facebook_marketing.streams import ( Activities, AdAccount, @@ -149,7 +149,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: ), ] - return self._update_insights_streams(insights=config.custom_insights, default_args=insights_args, streams=streams) + return streams + self.get_custom_insights_streams(api, config) def spec(self, *args, **kwargs) -> ConnectorSpecification: """Returns the spec for this integration. @@ -170,28 +170,20 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification: ), ) - def _update_insights_streams(self, insights: List[InsightConfig], default_args, streams) -> List[Type[Stream]]: - """Update method, if insights have values returns streams replacing the - default insights streams else returns streams - """ - if not insights: - return streams - - insights_custom_streams = list() - - for insight in insights: - args = dict( - api=default_args["api"], + def get_custom_insights_streams(self, api: API, config: ConnectorConfig) -> List[Type[Stream]]: + """return custom insights streams""" + streams = [] + for insight in config.custom_insights or []: + stream = AdsInsights( + api=api, name=f"Custom{insight.name}", fields=list(set(insight.fields)), breakdowns=list(set(insight.breakdowns)), action_breakdowns=list(set(insight.action_breakdowns)), time_increment=insight.time_increment, - start_date=insight.start_date or default_args["start_date"], - end_date=insight.end_date or default_args["end_date"], - insights_lookback_window=insight.insights_lookback_window or default_args["insights_lookback_window"], + start_date=insight.start_date or config.start_date, + end_date=insight.end_date or config.end_date, + insights_lookback_window=insight.insights_lookback_window or config.insights_lookback_window, ) - insight_stream = AdsInsights(**args) - insights_custom_streams.append(insight_stream) - - return streams + insights_custom_streams + streams.append(stream) + return streams diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py index 3b0a7e3e8bf1..0c6bd38faebc 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py @@ -99,7 +99,7 @@ def test_spec(self): assert isinstance(spec, ConnectorSpecification) - def test_update_insights_streams(self, api, config): + def test_get_custom_insights_streams(self, api, config): config["custom_insights"] = [ {"name": "test", "fields": ["account_id"], "breakdowns": ["ad_format_asset"], "action_breakdowns": ["action_device"]}, ] @@ -110,9 +110,7 @@ def test_update_insights_streams(self, api, config): start_date=config.start_date, end_date=config.end_date, ) - assert SourceFacebookMarketing()._update_insights_streams( - insights=config.custom_insights, default_args=insights_args, streams=streams - ) + assert SourceFacebookMarketing().get_custom_insights_streams(api, config) def test_check_config(config_gen, requests_mock): From 5a0bf7f8eb76d84008efc55a25c00755f570672f Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 21 Nov 2022 12:22:11 +0200 Subject: [PATCH 2/6] read record from custom_insights_streams Signed-off-by: Sergey Chvalyuk --- .../source_facebook_marketing/source.py | 11 +++++++++-- .../source_facebook_marketing/utils.py | 10 ++++++++++ .../unit_tests/test_source.py | 6 ------ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index f7a694a78c9a..3cd2db7279da 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -5,6 +5,7 @@ import logging from typing import Any, List, Mapping, Optional, Tuple, Type +import facebook_business import pendulum import requests from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification @@ -31,7 +32,7 @@ Videos, ) -from .utils import validate_end_date, validate_start_date +from .utils import read_full_refresh, validate_end_date, validate_start_date logger = logging.getLogger("airbyte") @@ -59,10 +60,16 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> try: api = API(account_id=config.account_id, access_token=config.access_token) logger.info(f"Select account {api.account}") - return True, None except requests.exceptions.RequestException as e: return False, e + for stream in self.get_custom_insights_streams(api, config): + try: + next(read_full_refresh(stream), None) + except facebook_business.exceptions.FacebookRequestError as e: + return False, e._api_error_message + return True, None + def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: """Discovery method, returns available streams diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/utils.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/utils.py index 392658ef1825..c5e3e2a99f1f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/utils.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/utils.py @@ -5,6 +5,8 @@ import logging import pendulum +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams import Stream from pendulum import DateTime logger = logging.getLogger("airbyte") @@ -40,3 +42,11 @@ def validate_end_date(start_date: DateTime, end_date: DateTime) -> DateTime: logger.warning(message) return start_date return end_date + + +def read_full_refresh(stream_instance: Stream): + slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh) + for _slice in slices: + records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh) + for record in records: + yield record diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py index 0c6bd38faebc..ac0352b2e445 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py @@ -103,13 +103,7 @@ def test_get_custom_insights_streams(self, api, config): config["custom_insights"] = [ {"name": "test", "fields": ["account_id"], "breakdowns": ["ad_format_asset"], "action_breakdowns": ["action_device"]}, ] - streams = SourceFacebookMarketing().streams(config) config = ConnectorConfig.parse_obj(config) - insights_args = dict( - api=api, - start_date=config.start_date, - end_date=config.end_date, - ) assert SourceFacebookMarketing().get_custom_insights_streams(api, config) From c01b2c4fba1f84e107e289ce234d5a4c88856a77 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 21 Nov 2022 12:23:16 +0200 Subject: [PATCH 3/6] bump 0.2.73 Signed-off-by: Sergey Chvalyuk --- .../connectors/source-facebook-marketing/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 5628c15c53eb..9def614575ff 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.72 +LABEL io.airbyte.version=0.2.73 LABEL io.airbyte.name=airbyte/source-facebook-marketing From 07a77526647891f8a3e5a91cabab9e1853246b1c Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 21 Nov 2022 12:26:57 +0200 Subject: [PATCH 4/6] comment added Signed-off-by: Sergey Chvalyuk --- .../source_facebook_marketing/source.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 3cd2db7279da..ce412bda58f5 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -63,6 +63,8 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> except requests.exceptions.RequestException as e: return False, e + # read one record from all custom insights streams + # to ensure that we have valid combination of "action_breakdowns" and "breakdowns" parameters for stream in self.get_custom_insights_streams(api, config): try: next(read_full_refresh(stream), None) From 85f9b5437ea850d8729cf955c8513bd1033c65e1 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 21 Nov 2022 12:31:02 +0200 Subject: [PATCH 5/6] facebook-marketing.md updated Signed-off-by: Sergey Chvalyuk --- docs/integrations/sources/facebook-marketing.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index 671ded370b43..0034f2edc62a 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -127,6 +127,7 @@ Please be informed that the connector uses the `lookback_window` parameter to pe | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.2.73 | 2022-11-21 | [19645](https://github.com/airbytehq/airbyte/pull/19645) | Check "breakdowns" combinations | | 0.2.72 | 2022-11-04 | [18971](https://github.com/airbytehq/airbyte/pull/18971) | handle FacebookBadObjectError for empty results on async jobs | | 0.2.71 | 2022-10-31 | [18734](https://github.com/airbytehq/airbyte/pull/18734) | Reduce request record limit on retry | | 0.2.70 | 2022-10-26 | [18045](https://github.com/airbytehq/airbyte/pull/18045) | Upgrade FB SDK to v15.0 | From 9f6cbdd95f7ae110d645e4cb6be4e09e7eef9bec Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Mon, 21 Nov 2022 13:53:26 +0000 Subject: [PATCH 6/6] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 513cd1673eb7..3f746cb40ef5 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -437,7 +437,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.72 + dockerImageTag: 0.2.73 documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 7f715feddb29..38a271516d0b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3396,7 +3396,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.72" +- dockerImage: "airbyte/source-facebook-marketing:0.2.73" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.com/integrations/sources/facebook-marketing"