Skip to content

Commit

Permalink
#17506 source snapchat marketing: retry failed request for refreshing…
Browse files Browse the repository at this point in the history
… access token (#17596)

* #17506 source snapchat marketing: improve error msg

* #17506 source snapchat marketing: improve error msg

* #17506 source snapchat-marketing: use CDK authenticator because the custom one is buggy

* #17506 source snapchat-marketing: retry 429 and 5xx when refreshing access token

* #17506 debug

* #17506 reformat code

* #17506 source snapchat: change max_tries to max_time

* #17506 rm debug code

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
davydov-d and octavia-squidington-iii authored Oct 7, 2022
1 parent 84f0299 commit 6ea207d
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@
- name: Snapchat Marketing
sourceDefinitionId: 200330b2-ea62-4d11-ac6d-cfe3e3f8ab2b
dockerRepository: airbyte/source-snapchat-marketing
dockerImageTag: 0.1.6
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/sources/snapchat-marketing
icon: snapchat.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10362,7 +10362,7 @@
type: "object"
additionalProperties: false
properties: {}
- dockerImage: "airbyte/source-snapchat-marketing:0.1.6"
- dockerImage: "airbyte/source-snapchat-marketing:0.1.8"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/snapchat-marketing"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_snapchat_marketing ./source_snapchat_marketing
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-snapchat-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
timeout_seconds: 60
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
timeout_seconds: 60
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib.parse import parse_qsl, urlparse

import backoff
import pendulum
import requests
from airbyte_cdk.models import SyncMode
Expand All @@ -15,6 +17,7 @@
from airbyte_cdk.sources.streams.core import IncrementalMixin, package_name_from_class
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader

# https://marketingapi.snapchat.com/docs/#core-metrics
Expand Down Expand Up @@ -95,6 +98,9 @@
]


logger = logging.getLogger("airbyte")


class GranularityType(Enum):
HOUR = "HOUR"
DAY = "DAY"
Expand Down Expand Up @@ -722,49 +728,49 @@ class CampaignsStatsLifetime(Lifetime, Stats):
parent = Campaigns


class SnapchatAdsOauth2Authenticator(Oauth2Authenticator):
"""Request example for API token extraction:
curl -X POST https://accounts.snapchat.com/login/oauth2/access_token \
-d "refresh_token={refresh_token}" \
-d "client_id={client_id}" \
-d "client_secret={client_secret}" \
-d "grant_type=refresh_token" \
"""

def __init__(self, config):
super().__init__(
token_refresh_endpoint="https://accounts.snapchat.com/login/oauth2/access_token",
client_id=config["client_id"],
client_secret=config["client_secret"],
refresh_token=config["refresh_token"],
)

class SnapchatOauth2Authenticator(Oauth2Authenticator):
@backoff.on_exception(
backoff.expo,
DefaultBackoffException,
on_backoff=lambda details: logger.info(
f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
),
max_time=300,
)
def refresh_access_token(self) -> Tuple[str, int]:
response_json = None
"""
returns a tuple of (access_token, token_lifespan_in_seconds)
"""
try:
response = requests.request(method="POST", url=self.token_refresh_endpoint, data=self.get_refresh_request_body())
response_json = response.json()
response = requests.request(
method="POST",
url=self.token_refresh_endpoint,
data=self.get_refresh_request_body(),
headers=self.get_refresh_access_token_headers(),
)
response.raise_for_status()
except requests.exceptions.RequestException as e:
if response_json and "error" in response_json:
raise Exception(
"Error refreshing access token. Error: {}; Error details: {}; Exception: {}".format(
response_json["error"], response_json["error_description"], e
)
) from e
raise Exception(f"Error refreshing access token: {e}") from e
else:
response_json = response.json()
return response_json["access_token"], response_json["expires_in"]
except requests.exceptions.RequestException as e:
if e.response.status_code == 429 or e.response.status_code >= 500:
raise DefaultBackoffException(request=e.response.request, response=e.response)
raise
except Exception as e:
raise Exception(f"Error while refreshing access token: {e}") from e


# Source
class SourceSnapchatMarketing(AbstractSource):
"""Source Snapchat Marketing helps to retrieve the different Ad data from Snapchat business account"""

def check_connection(self, logger, config) -> Tuple[bool, any]:

try:
auth = SnapchatAdsOauth2Authenticator(config)
auth = SnapchatOauth2Authenticator(
token_refresh_endpoint="https://accounts.snapchat.com/login/oauth2/access_token",
client_id=config["client_id"],
client_secret=config["client_secret"],
refresh_token=config["refresh_token"],
)
token = auth.get_access_token()
url = f"{SnapchatMarketingStream.url_base}me"

Expand All @@ -776,7 +782,6 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
return False, e

def streams(self, config: Mapping[str, Any]) -> List[Stream]:

# https://marketingapi.snapchat.com/docs/#core-metrics
# IMPORTANT: Metrics are finalized 48 hours after the end of the day in the Ad Account’s timezone.
DELAYED_DAYS = 2
Expand All @@ -786,7 +791,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# 2. when timezone is not specified, default account's timezone will be used automatically
default_end_date = pendulum.now().subtract(days=DELAYED_DAYS).to_date_string()
kwargs = {
"authenticator": SnapchatAdsOauth2Authenticator(config),
"authenticator": SnapchatOauth2Authenticator(
token_refresh_endpoint="https://accounts.snapchat.com/login/oauth2/access_token",
client_id=config["client_id"],
client_secret=config["client_secret"],
refresh_token=config["refresh_token"],
),
"start_date": config["start_date"],
"end_date": config.get("end_date", default_end_date),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
AdsStatsDaily,
AdsStatsLifetime,
Organizations,
SnapchatOauth2Authenticator,
SourceSnapchatMarketing,
)

Expand Down Expand Up @@ -369,3 +370,20 @@ def test_source_check_connection(requests_mock):

results = SourceSnapchatMarketing().check_connection(logger=None, config=source_config)
assert results == (True, None)


def test_retry_get_access_token(requests_mock):
requests_mock.register_uri(
"POST",
"https://accounts.snapchat.com/login/oauth2/access_token",
[{"status_code": 429}, {"status_code": 429}, {"status_code": 200, "json": {"access_token": "token", "expires_in": 3600}}],
)
auth = SnapchatOauth2Authenticator(
token_refresh_endpoint="https://accounts.snapchat.com/login/oauth2/access_token",
client_id="client_id",
client_secret="client_secret",
refresh_token="refresh_token",
)
token = auth.get_access_token()
assert len(requests_mock.request_history) == 3
assert token == "token"
3 changes: 2 additions & 1 deletion docs/integrations/sources/snapchat-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ Snapchat Marketing API has limitations to 1000 items per page.
## Changelog
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------- |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------|
| 0.1.8 | 2022-10-05 | [17596](https://github.com/airbytehq/airbyte/pull/17596) | Retry 429 and 5xx errors when refreshing access token |
| 0.1.6 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from specs |
| 0.1.5 | 2022-07-13 | [14577](https://github.com/airbytehq/airbyte/pull/14577) | Added stats streams hourly, daily, lifetime |
| 0.1.4 | 2021-12-07 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Update titles and descriptions |
Expand Down

0 comments on commit 6ea207d

Please sign in to comment.