Skip to content

Commit

Permalink
Source facebook marketing: check "breakdowns" combinations (#19645)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored Nov 23, 2022
1 parent 2edb5f3 commit a6fb52e
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.72
dockerImageTag: 0.2.73
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3396,7 +3396,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.72"
- dockerImage: "airbyte/source-facebook-marketing:0.2.73"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.com/integrations/sources/facebook-marketing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.2.72
LABEL io.airbyte.version=0.2.73
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import logging
from typing import Any, List, Mapping, Optional, Tuple, Type

import facebook_business
import pendulum
import requests
from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from source_facebook_marketing.api import API
from source_facebook_marketing.spec import ConnectorConfig, InsightConfig
from source_facebook_marketing.spec import ConnectorConfig
from source_facebook_marketing.streams import (
Activities,
AdAccount,
Expand All @@ -31,7 +32,7 @@
Videos,
)

from .utils import validate_end_date, validate_start_date
from .utils import read_full_refresh, validate_end_date, validate_start_date

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -59,10 +60,18 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
try:
api = API(account_id=config.account_id, access_token=config.access_token)
logger.info(f"Select account {api.account}")
return True, None
except requests.exceptions.RequestException as e:
return False, e

# read one record from all custom insights streams
# to ensure that we have valid combination of "action_breakdowns" and "breakdowns" parameters
for stream in self.get_custom_insights_streams(api, config):
try:
next(read_full_refresh(stream), None)
except facebook_business.exceptions.FacebookRequestError as e:
return False, e._api_error_message
return True, None

def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
"""Discovery method, returns available streams
Expand Down Expand Up @@ -149,7 +158,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
),
]

return self._update_insights_streams(insights=config.custom_insights, default_args=insights_args, streams=streams)
return streams + self.get_custom_insights_streams(api, config)

def spec(self, *args, **kwargs) -> ConnectorSpecification:
"""Returns the spec for this integration.
Expand All @@ -170,28 +179,20 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification:
),
)

def _update_insights_streams(self, insights: List[InsightConfig], default_args, streams) -> List[Type[Stream]]:
"""Update method, if insights have values returns streams replacing the
default insights streams else returns streams
"""
if not insights:
return streams

insights_custom_streams = list()

for insight in insights:
args = dict(
api=default_args["api"],
def get_custom_insights_streams(self, api: API, config: ConnectorConfig) -> List[Type[Stream]]:
"""return custom insights streams"""
streams = []
for insight in config.custom_insights or []:
stream = AdsInsights(
api=api,
name=f"Custom{insight.name}",
fields=list(set(insight.fields)),
breakdowns=list(set(insight.breakdowns)),
action_breakdowns=list(set(insight.action_breakdowns)),
time_increment=insight.time_increment,
start_date=insight.start_date or default_args["start_date"],
end_date=insight.end_date or default_args["end_date"],
insights_lookback_window=insight.insights_lookback_window or default_args["insights_lookback_window"],
start_date=insight.start_date or config.start_date,
end_date=insight.end_date or config.end_date,
insights_lookback_window=insight.insights_lookback_window or config.insights_lookback_window,
)
insight_stream = AdsInsights(**args)
insights_custom_streams.append(insight_stream)

return streams + insights_custom_streams
streams.append(stream)
return streams
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import logging

import pendulum
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from pendulum import DateTime

logger = logging.getLogger("airbyte")
Expand Down Expand Up @@ -40,3 +42,11 @@ def validate_end_date(start_date: DateTime, end_date: DateTime) -> DateTime:
logger.warning(message)
return start_date
return end_date


def read_full_refresh(stream_instance: Stream):
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh)
for _slice in slices:
records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
for record in records:
yield record
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,12 @@ def test_spec(self):

assert isinstance(spec, ConnectorSpecification)

def test_update_insights_streams(self, api, config):
def test_get_custom_insights_streams(self, api, config):
config["custom_insights"] = [
{"name": "test", "fields": ["account_id"], "breakdowns": ["ad_format_asset"], "action_breakdowns": ["action_device"]},
]
streams = SourceFacebookMarketing().streams(config)
config = ConnectorConfig.parse_obj(config)
insights_args = dict(
api=api,
start_date=config.start_date,
end_date=config.end_date,
)
assert SourceFacebookMarketing()._update_insights_streams(
insights=config.custom_insights, default_args=insights_args, streams=streams
)
assert SourceFacebookMarketing().get_custom_insights_streams(api, config)


def test_check_config(config_gen, requests_mock):
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Please be informed that the connector uses the `lookback_window` parameter to pe

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.73 | 2022-11-21 | [19645](https://github.com/airbytehq/airbyte/pull/19645) | Check "breakdowns" combinations |
| 0.2.72 | 2022-11-04 | [18971](https://github.com/airbytehq/airbyte/pull/18971) | handle FacebookBadObjectError for empty results on async jobs |
| 0.2.71 | 2022-10-31 | [18734](https://github.com/airbytehq/airbyte/pull/18734) | Reduce request record limit on retry |
| 0.2.70 | 2022-10-26 | [18045](https://github.com/airbytehq/airbyte/pull/18045) | Upgrade FB SDK to v15.0 |
Expand Down

0 comments on commit a6fb52e

Please sign in to comment.