Skip to content

Commit

Permalink
Source Iterable: distinguish 401 from empty stream (#18537)
Browse files Browse the repository at this point in the history
* #829 source iterable: distinguish 401 from empty stream

* #829 source iterable: upd changelog

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
davydov-d and octavia-squidington-iii authored Oct 27, 2022
1 parent 2ff16df commit d1fb5a2
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@
- name: Iterable
sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerRepository: airbyte/source-iterable
dockerImageTag: 0.1.20
dockerImageTag: 0.1.21
documentationUrl: https://docs.airbyte.com/integrations/sources/iterable
icon: iterable.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5303,7 +5303,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-iterable:0.1.20"
- dockerImage: "airbyte/source-iterable:0.1.21"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/iterable"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.version=0.1.21
LABEL io.airbyte.name=airbyte/source-iterable
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

from typing import Any, List, Mapping, Tuple

import requests.exceptions
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

from .streams import (
AccessCheck,
Campaigns,
CampaignsMetrics,
Channels,
Expand Down Expand Up @@ -75,6 +77,18 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
return False, f"Unable to connect to Iterable API with the provided credentials - {e}"

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def all_streams_accessible():
access_check_stream = AccessCheck(authenticator=authenticator)
slice_ = next(iter(access_check_stream.stream_slices(sync_mode=SyncMode.full_refresh)))
try:
list(access_check_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh))
except requests.exceptions.RequestException as e:
if e.response.status_code == requests.codes.UNAUTHORIZED:
return False
raise
else:
return True

authenticator = TokenAuthenticator(token=config["api_key"], auth_header="Api-Key", auth_method="")
# end date is provided for integration tests only
start_date, end_date = config["start_date"], config.get("end_date")
Expand All @@ -95,13 +109,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# A simple check is done - a read operation on a stream that can be accessed only via a Server side API key.
# If read is successful - other streams should be supported as well.
# More on this - https://support.iterable.com/hc/en-us/articles/360043464871-API-Keys-
users_stream = ListUsers(authenticator=authenticator)
for slice_ in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
users = users_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh)
# first slice is enough
break

if next(users, None):
if all_streams_accessible():
streams.extend(
[
Users(authenticator=authenticator, **date_range),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,3 +668,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
class Users(IterableExportStreamRanged):
data_field = "user"
cursor_field = "profileUpdatedAt"


class AccessCheck(ListUsers):
# since 401 error is failed silently in all the streams,
# we need another class to distinguish an empty stream from 401 response
def check_unauthorized_key(self, response: requests.Response) -> bool:
# this allows not retrying 401 and raising the error upstream
return response.status_code != codes.UNAUTHORIZED
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@


@responses.activate
@pytest.mark.parametrize("body, status, expected_streams", (({}, 401, 7), ({"lists": [{"id": 1}]}, 200, 44)))
@pytest.mark.parametrize("body, status, expected_streams", ((b"", 401, 7), (b"", 200, 44), (b"alpha@gmail.com\nbeta@gmail.com", 200, 44)))
def test_source_streams(mock_lists_resp, config, body, status, expected_streams):
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json=body, status=status)
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", body=body, status=status)
streams = SourceIterable().streams(config=config)
assert len(streams) == expected_streams

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/iterable.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------|
| 0.1.21 | 2022-10-27 | [18537](https://github.com/airbytehq/airbyte/pull/18537) | Improve streams discovery |
| 0.1.20 | 2022-10-21 | [18292](https://github.com/airbytehq/airbyte/pull/18292) | Better processing of 401 and 429 errors |
| 0.1.19 | 2022-10-05 | [17602](https://github.com/airbytehq/airbyte/pull/17602) | Add check for stream permissions |
| 0.1.18 | 2022-10-04 | [17573](https://github.com/airbytehq/airbyte/pull/17573) | Limit time range for SATs |
Expand Down

0 comments on commit d1fb5a2

Please sign in to comment.