Source Iterable - better processing of 401 and 429 errors (#18292)
* #829 source iterable - better processing of 401 and 429 errors

* source iterable: upd changelog

* #829 source iterable: fix unit tests

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
davydov-d and octavia-squidington-iii authored Oct 25, 2022
1 parent e933de0 commit 848046a
Showing 10 changed files with 127 additions and 66 deletions.
@@ -532,7 +532,7 @@
- name: Iterable
sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerRepository: airbyte/source-iterable
dockerImageTag: 0.1.19
dockerImageTag: 0.1.20
documentationUrl: https://docs.airbyte.com/integrations/sources/iterable
icon: iterable.svg
sourceType: api
@@ -5192,7 +5192,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-iterable:0.1.19"
- dockerImage: "airbyte/source-iterable:0.1.20"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/iterable"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/source-iterable
@@ -79,49 +79,68 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# end date is provided for integration tests only
start_date, end_date = config["start_date"], config.get("end_date")
date_range = {"start_date": start_date, "end_date": end_date}
return [
streams = [
Campaigns(authenticator=authenticator),
CampaignsMetrics(authenticator=authenticator, **date_range),
Channels(authenticator=authenticator),
EmailBounce(authenticator=authenticator, **date_range),
EmailClick(authenticator=authenticator, **date_range),
EmailComplaint(authenticator=authenticator, **date_range),
EmailOpen(authenticator=authenticator, **date_range),
EmailSend(authenticator=authenticator, **date_range),
EmailSendSkip(authenticator=authenticator, **date_range),
EmailSubscribe(authenticator=authenticator, **date_range),
EmailUnsubscribe(authenticator=authenticator, **date_range),
PushSend(authenticator=authenticator, **date_range),
PushSendSkip(authenticator=authenticator, **date_range),
PushOpen(authenticator=authenticator, **date_range),
PushUninstall(authenticator=authenticator, **date_range),
PushBounce(authenticator=authenticator, **date_range),
WebPushSend(authenticator=authenticator, **date_range),
WebPushClick(authenticator=authenticator, **date_range),
WebPushSendSkip(authenticator=authenticator, **date_range),
InAppSend(authenticator=authenticator, **date_range),
InAppOpen(authenticator=authenticator, **date_range),
InAppClick(authenticator=authenticator, **date_range),
InAppClose(authenticator=authenticator, **date_range),
InAppDelete(authenticator=authenticator, **date_range),
InAppDelivery(authenticator=authenticator, **date_range),
InAppSendSkip(authenticator=authenticator, **date_range),
InboxSession(authenticator=authenticator, **date_range),
InboxMessageImpression(authenticator=authenticator, **date_range),
SmsSend(authenticator=authenticator, **date_range),
SmsBounce(authenticator=authenticator, **date_range),
SmsClick(authenticator=authenticator, **date_range),
SmsReceived(authenticator=authenticator, **date_range),
SmsSendSkip(authenticator=authenticator, **date_range),
SmsUsageInfo(authenticator=authenticator, **date_range),
Purchase(authenticator=authenticator, **date_range),
CustomEvent(authenticator=authenticator, **date_range),
HostedUnsubscribeClick(authenticator=authenticator, **date_range),
Events(authenticator=authenticator),
Lists(authenticator=authenticator),
ListUsers(authenticator=authenticator),
MessageTypes(authenticator=authenticator),
Metadata(authenticator=authenticator),
Templates(authenticator=authenticator, **date_range),
Users(authenticator=authenticator, **date_range),
]
# Iterable supports two types of server-side API keys:
# - read-only
# - server-side
# The read-only key supports only a limited set of APIs, so streams it cannot access are filtered out here.
# A simple check is performed: a read operation on a stream that can only be accessed with a server-side API key.
# If that read succeeds, the other streams should be supported as well.
# More on this: https://support.iterable.com/hc/en-us/articles/360043464871-API-Keys-
users_stream = ListUsers(authenticator=authenticator)
for slice_ in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
users = users_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh)
# first slice is enough
break

if next(users, None):
streams.extend(
[
Users(authenticator=authenticator, **date_range),
ListUsers(authenticator=authenticator),
EmailBounce(authenticator=authenticator, **date_range),
EmailClick(authenticator=authenticator, **date_range),
EmailComplaint(authenticator=authenticator, **date_range),
EmailOpen(authenticator=authenticator, **date_range),
EmailSend(authenticator=authenticator, **date_range),
EmailSendSkip(authenticator=authenticator, **date_range),
EmailSubscribe(authenticator=authenticator, **date_range),
EmailUnsubscribe(authenticator=authenticator, **date_range),
PushSend(authenticator=authenticator, **date_range),
PushSendSkip(authenticator=authenticator, **date_range),
PushOpen(authenticator=authenticator, **date_range),
PushUninstall(authenticator=authenticator, **date_range),
PushBounce(authenticator=authenticator, **date_range),
WebPushSend(authenticator=authenticator, **date_range),
WebPushClick(authenticator=authenticator, **date_range),
WebPushSendSkip(authenticator=authenticator, **date_range),
InAppSend(authenticator=authenticator, **date_range),
InAppOpen(authenticator=authenticator, **date_range),
InAppClick(authenticator=authenticator, **date_range),
InAppClose(authenticator=authenticator, **date_range),
InAppDelete(authenticator=authenticator, **date_range),
InAppDelivery(authenticator=authenticator, **date_range),
InAppSendSkip(authenticator=authenticator, **date_range),
InboxSession(authenticator=authenticator, **date_range),
InboxMessageImpression(authenticator=authenticator, **date_range),
SmsSend(authenticator=authenticator, **date_range),
SmsBounce(authenticator=authenticator, **date_range),
SmsClick(authenticator=authenticator, **date_range),
SmsReceived(authenticator=authenticator, **date_range),
SmsSendSkip(authenticator=authenticator, **date_range),
SmsUsageInfo(authenticator=authenticator, **date_range),
Purchase(authenticator=authenticator, **date_range),
CustomEvent(authenticator=authenticator, **date_range),
HostedUnsubscribeClick(authenticator=authenticator, **date_range),
Events(authenticator=authenticator),
]
)
return streams
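
For illustration, the effect of this probe at the source level (a hedged sketch, not part of the commit: the API key values are placeholders, and the stream counts of 7 and 44 come from this PR's updated unit tests):

# Hypothetical usage sketch; key values are made up, counts match test_source_streams below.
from source_iterable.source import SourceIterable

config = {"api_key": "<server-side-key>", "start_date": "2019-10-10T00:00:00"}
print(len(SourceIterable().streams(config=config)))  # 44: the ListUsers probe returned a record

config["api_key"] = "<read-only-key>"  # the probe gets a 401 and yields nothing
print(len(SourceIterable().streams(config=config)))  # 7: only read-only-compatible streams remain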
@@ -26,7 +26,9 @@

class IterableStream(HttpStream, ABC):
raise_on_http_errors = True

# If we get a 401 error (API token disabled or deleted) on a stream slice, make no further requests within the current stream
# to avoid triggering 429 errors on other streams
ignore_further_slices = False
# Hardcode the value because it is not returned from the API
BACKOFF_TIME_CONSTANT = 10.0
# define date-time fields with potential wrong format
@@ -48,6 +50,7 @@ def data_field(self) -> str:
def check_unauthorized_key(self, response: requests.Response) -> bool:
if response.status_code == codes.UNAUTHORIZED:
self.logger.warn(f"Provided API Key has not sufficient permissions to read from stream: {self.data_field}")
self.ignore_further_slices = True
setattr(self, "raise_on_http_errors", False)
return False
return True
@@ -63,7 +66,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
return []
response_json = response.json()
records = response_json.get(self.data_field, [])

@@ -73,8 +76,18 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
def should_retry(self, response: requests.Response) -> bool:
if not self.check_unauthorized_key(response):
return False
else:
return super().should_retry(response)
return super().should_retry(response)

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
if self.ignore_further_slices:
return []
yield from super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)


class IterableExportStream(IterableStream, ABC):
@@ -169,7 +182,7 @@ def request_params(

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return None
return []
for obj in response.iter_lines():
record = json.loads(obj)
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
@@ -321,7 +334,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
return []
list_id = self._get_list_id(response.url)
for user in response.iter_lines():
yield {"email": user.decode(), "listId": list_id}
@@ -381,7 +394,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
return []
content = response.content.decode()
records = self._parse_csv_string_to_dict(content)

@@ -480,7 +493,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Put the rest of the fields in the `data` subobject.
"""
if not self.check_unauthorized_key(response):
yield from []
return []
jsonl_records = StringIO(response.text)
for record in jsonl_records:
record_dict = json.loads(record)
@@ -643,7 +656,7 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
return []
response_json = response.json()
records = response_json.get(self.data_field, [])
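
Taken together, the IterableStream changes follow one pattern: on the first 401 the stream warns, stops raising and retrying, and skips all of its remaining slices, so a dead or under-privileged key cannot burn through the rate limit and push other streams into 429 errors. A condensed sketch of that pattern (not the connector's exact code; the HttpStream base is replaced by a stub so the example runs on its own):

import requests
from requests import codes


class FakeHttpStream:
    # Stand-in for airbyte_cdk's HttpStream so the sketch is runnable in isolation.
    def should_retry(self, response: requests.Response) -> bool:
        return response.status_code == 429 or 500 <= response.status_code < 600

    def read_records(self, stream_slice=None, **kwargs):
        yield {"slice": stream_slice}  # pretend each slice yields one record


class SketchStream(FakeHttpStream):
    raise_on_http_errors = True
    ignore_further_slices = False  # flips to True after the first 401

    def check_unauthorized_key(self, response: requests.Response) -> bool:
        if response.status_code == codes.UNAUTHORIZED:
            # Stop raising and retrying, and skip the rest of this stream,
            # so we do not keep calling the API and trip 429s on other streams.
            self.ignore_further_slices = True
            self.raise_on_http_errors = False
            return False
        return True

    def should_retry(self, response: requests.Response) -> bool:
        if not self.check_unauthorized_key(response):
            return False  # a disabled or deleted key will not start working on retry
        return super().should_retry(response)

    def read_records(self, stream_slice=None, **kwargs):
        if self.ignore_further_slices:
            return  # short-circuit: no more requests for this stream
        yield from super().read_records(stream_slice=stream_slice, **kwargs)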

@@ -11,7 +11,7 @@ def catalog(request):
return ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(name=request.param, json_schema={}),
stream=AirbyteStream(name=request.param, json_schema={}, supported_sync_modes=["full_refresh"]),
sync_mode="full_refresh",
destination_sync_mode="append",
)
@@ -22,3 +22,8 @@ def catalog(request):
@pytest.fixture(name="config")
def config_fixture():
return {"api_key": 123, "start_date": "2019-10-10T00:00:00"}


@pytest.fixture()
def mock_lists_resp(mocker):
mocker.patch("source_iterable.streams.Lists.read_records", return_value=iter([{"id": 1}, {"id": 2}]))
@@ -45,7 +45,7 @@ def read_from_source(catalog):

@responses.activate
@pytest.mark.parametrize("catalog", (["email_send"]), indirect=True)
def test_email_stream(catalog, time_mock):
def test_email_stream(mock_lists_resp, catalog, time_mock):
DAYS_DURATION = 100
DAYS_PER_MINUTE_RATE = 8

@@ -59,12 +59,14 @@ def response_cb(req):
time_mock.tick(delta=datetime.timedelta(minutes=days / DAYS_PER_MINUTE_RATE))
return (200, {}, json.dumps({"createdAt": "2020"}))

responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json={"lists": [{"id": 1}]}, status=200)
responses.add_callback("GET", "https://api.iterable.com/api/export/data.json", callback=response_cb)

records = read_from_source(catalog)
assert records
assert sum(ranges) == DAYS_DURATION
assert len(responses.calls) == len(ranges)
# since read is called on source instance, under the hood .streams() is called which triggers one more http call
assert len(responses.calls) == len(ranges) + 1
assert ranges == [
AdjustableSliceGenerator.INITIAL_RANGE_DAYS,
*([int(DAYS_PER_MINUTE_RATE / AdjustableSliceGenerator.REQUEST_PER_MINUTE_LIMIT)] * 35),
@@ -76,17 +78,17 @@
"catalog, days_duration, days_per_minute_rate",
[
("email_send", 10, 200),
# tests are commented because they take a lot of time for completion
# ("email_send", 100, 200000),
# ("email_send", 10000, 200000),
# ("email_click", 1000, 20),
# ("email_open", 1000, 1),
# ("email_open", 1, 1000),
# ("email_open", 0, 1000000),
("email_send", 100, 200000),
("email_send", 10000, 200000),
("email_click", 1000, 20),
("email_open", 1000, 1),
("email_open", 1, 1000),
("email_open", 0, 1000000),
],
indirect=["catalog"],
)
def test_email_stream_chunked_encoding(catalog, days_duration, days_per_minute_rate, time_mock):
def test_email_stream_chunked_encoding(mocker, mock_lists_resp, catalog, days_duration, days_per_minute_rate, time_mock):
mocker.patch("time.sleep")
time_mock.move_to(pendulum.parse(TEST_START_DATE) + pendulum.Duration(days=days_duration))

ranges: List[int] = []
@@ -104,9 +106,11 @@ def response_cb(req):
time_mock.tick(delta=datetime.timedelta(minutes=days / days_per_minute_rate))
return (200, {}, json.dumps({"createdAt": "2020"}))

responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json={"lists": [{"id": 1}]}, status=200)
responses.add_callback("GET", "https://api.iterable.com/api/export/data.json", callback=response_cb)

records = read_from_source(catalog)
assert sum(ranges) == days_duration
assert len(ranges) == len(records)
assert len(responses.calls) == 3 * len(ranges)
# since read is called on source instance, under the hood .streams() is called which triggers one more http call
assert len(responses.calls) == 3 * len(ranges) + 1
@@ -4,20 +4,25 @@

from unittest.mock import MagicMock, patch

import pytest
import responses
from source_iterable.source import SourceIterable
from source_iterable.streams import Lists


def test_source_streams(config):
@responses.activate
@pytest.mark.parametrize("body, status, expected_streams", (({}, 401, 7), ({"lists": [{"id": 1}]}, 200, 44)))
def test_source_streams(mock_lists_resp, config, body, status, expected_streams):
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json=body, status=status)
streams = SourceIterable().streams(config=config)
assert len(streams) == 44
assert len(streams) == expected_streams


def test_source_check_connection_ok(config):
with patch.object(Lists, "read_records", return_value=iter([1])):
with patch.object(Lists, "read_records", return_value=iter([{"id": 1}])):
assert SourceIterable().check_connection(MagicMock(), config=config) == (True, None)


def test_source_check_connection_failed(config):
with patch.object(Lists, "read_records", return_value=0):
with patch.object(Lists, "read_records", return_value=iter([])):
assert SourceIterable().check_connection(MagicMock(), config=config)[0] is False
@@ -6,6 +6,7 @@
import pytest
import requests
import responses
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_iterable.streams import (
Campaigns,
@@ -173,3 +174,16 @@ def test_get_updated_state(current_state, record_date, expected_state):
latest_record={"profileUpdatedAt": pendulum.parse(record_date)},
)
assert state == expected_state


@responses.activate
def test_stream_stops_on_401(mock_lists_resp):
# no requests should be made after getting 401 error despite the multiple slices
users_stream = ListUsers(authenticator=NoAuth())
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json={}, status=401)
slices = 0
for slice_ in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
slices += 1
_ = list(users_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh))
assert len(responses.calls) == 1
assert slices > 1
1 change: 1 addition & 0 deletions docs/integrations/sources/iterable.md
@@ -92,6 +92,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------|
| 0.1.20 | 2022-10-21 | [18292](https://github.com/airbytehq/airbyte/pull/18292) | Better processing of 401 and 429 errors |
| 0.1.19 | 2022-10-05 | [17602](https://github.com/airbytehq/airbyte/pull/17602) | Add check for stream permissions |
| 0.1.18 | 2022-10-04 | [17573](https://github.com/airbytehq/airbyte/pull/17573) | Limit time range for SATs |
| 0.1.17 | 2022-09-02 | [16067](https://github.com/airbytehq/airbyte/pull/16067) | added new events streams |
