From 98f279da9fa4f4c2db1372a03325b7db0730150c Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 13 Jan 2023 13:49:16 +0100 Subject: [PATCH 1/4] Source Facebook Marketing: Videos stream remove filtering --- .../streams/streams.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py index 4af8d8392c05..07ac337c2e55 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py @@ -151,8 +151,27 @@ class Videos(FBMarketingIncrementalStream): entity_prefix = "video" def list_objects(self, params: Mapping[str, Any]) -> Iterable: + # Remove filtering as it is not working for this stream since 2023-01-13 + del(params["filtering"]) return self._api.account.get_ad_videos(params=params) + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """Main read method used by CDK""" + records_iter = self.list_objects(params=self.request_params(stream_state=stream_state)) + loaded_records_iter = (record.api_get(fields=self.fields, pending=self.use_batch) for record in records_iter) + if self.use_batch: + loaded_records_iter = self.execute_in_batch(loaded_records_iter) + start_date = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state else self._start_date + for record in loaded_records_iter: + if pendulum.parse(record.get(self.cursor_field)) > start_date: + yield record + class AdAccount(FBMarketingStream): """See: https://developers.facebook.com/docs/marketing-api/reference/ad-account""" From 3c6fb545fd13e23d901f85fd88d498ca3946fede Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 13 Jan 2023 13:59:36 +0100 Subject: [PATCH 2/4] Source Facebook Marketing: update docs --- .../connectors/source-facebook-marketing/Dockerfile | 2 +- docs/integrations/sources/facebook-marketing.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 82f065cbf77b..03e57ca90bfa 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.82 +LABEL io.airbyte.version=0.2.83 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index 4e5d041ce5f8..bd21d47fa2c5 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -133,6 +133,7 @@ Please be informed that the connector uses the `lookback_window` parameter to pe | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.2.83 | 2023-01-13 | [21149](https://github.com/airbytehq/airbyte/pull/21149) | Videos stream remove filtering | | 0.2.82 | 2023-01-09 | [21149](https://github.com/airbytehq/airbyte/pull/21149) | Fix AdAccount schema | | 0.2.81 | 2023-01-05 | [21057](https://github.com/airbytehq/airbyte/pull/21057) | Remove unsupported fields from request | | 0.2.80 | 2022-12-21 | [20736](https://github.com/airbytehq/airbyte/pull/20736) | Fix update next cursor | From 14d166dcb712a27382fcb16d446f597262335511 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 13 Jan 2023 15:09:41 +0100 Subject: [PATCH 3/4] Source Facebook Marketing: Ref --- .../streams/base_streams.py | 6 +++-- .../streams/streams.py | 27 +++++-------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py index d24929d5d164..7a1a8f628e74 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py @@ -15,7 +15,6 @@ from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from cached_property import cached_property from facebook_business.adobjects.abstractobject import AbstractObject -from facebook_business.adobjects.adimage import AdImage from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse from .common import deep_merge @@ -243,6 +242,9 @@ def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: """Don't have classic cursor filtering""" return {} + def get_record_deleted_status(self, record) -> bool: + return False + def read_records( self, sync_mode: SyncMode, @@ -261,7 +263,7 @@ def read_records( record_cursor_value = pendulum.parse(record[self.cursor_field]) if self._cursor_value and record_cursor_value < self._cursor_value: break - if not self._include_deleted and record[AdImage.Field.status] == AdImage.Status.deleted: + if not self._include_deleted and self.get_record_deleted_status(record): continue self._max_cursor_value = self._max_cursor_value or record_cursor_value diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py index 07ac337c2e55..d30a1ae58128 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py @@ -8,6 +8,8 @@ import pendulum import requests +from facebook_business.adobjects.adimage import AdImage + from airbyte_cdk.models import SyncMode from cached_property import cached_property from facebook_business.adobjects.abstractobject import AbstractObject @@ -145,32 +147,14 @@ def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: return {"since": since.int_timestamp} -class Videos(FBMarketingIncrementalStream): +class Videos(FBMarketingReversedIncrementalStream): """See: https://developers.facebook.com/docs/marketing-api/reference/video""" entity_prefix = "video" def list_objects(self, params: Mapping[str, Any]) -> Iterable: # Remove filtering as it is not working for this stream since 2023-01-13 - del(params["filtering"]) - return self._api.account.get_ad_videos(params=params) - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - """Main read method used by CDK""" - records_iter = self.list_objects(params=self.request_params(stream_state=stream_state)) - loaded_records_iter = (record.api_get(fields=self.fields, pending=self.use_batch) for record in records_iter) - if self.use_batch: - loaded_records_iter = self.execute_in_batch(loaded_records_iter) - start_date = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state else self._start_date - for record in loaded_records_iter: - if pendulum.parse(record.get(self.cursor_field)) > start_date: - yield record + return self._api.account.get_ad_videos(params=params, fields=self.fields) class AdAccount(FBMarketingStream): @@ -213,6 +197,9 @@ class Images(FBMarketingReversedIncrementalStream): def list_objects(self, params: Mapping[str, Any]) -> Iterable: return self._api.account.get_ad_images(params=params, fields=self.fields) + def get_record_deleted_status(self, record) -> bool: + return record[AdImage.Field.status] == AdImage.Status.deleted + class AdsInsightsAgeAndGender(AdsInsights): breakdowns = ["age", "gender"] From 793a4786ad36de5acb4e1b64392c9c3419d5b0fe Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Fri, 13 Jan 2023 17:21:23 +0000 Subject: [PATCH 4/4] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- .../source_facebook_marketing/streams/streams.py | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index c12e6d17482d..9bb9ef20d3b3 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -480,7 +480,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.82 + dockerImageTag: 0.2.83 documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 635af0131274..73ac6f81c33a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3500,7 +3500,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.82" +- dockerImage: "airbyte/source-facebook-marketing:0.2.83" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.com/integrations/sources/facebook-marketing" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py index d30a1ae58128..4d7d397983ef 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py @@ -8,12 +8,11 @@ import pendulum import requests -from facebook_business.adobjects.adimage import AdImage - from airbyte_cdk.models import SyncMode from cached_property import cached_property from facebook_business.adobjects.abstractobject import AbstractObject from facebook_business.adobjects.adaccount import AdAccount as FBAdAccount +from facebook_business.adobjects.adimage import AdImage from facebook_business.adobjects.user import User from .base_insight_streams import AdsInsights