Skip to content

Commit

Permalink
Source Iterable: Add permission check for stream (#17602)
Browse files Browse the repository at this point in the history
* Source Iterable: Add permission check for stream

* Source Iterable: docs update

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
artem1205 and octavia-squidington-iii authored Oct 6, 2022
1 parent 559a79e commit 00d1f0d
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@
- name: Iterable
sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerRepository: airbyte/source-iterable
dockerImageTag: 0.1.18
dockerImageTag: 0.1.19
documentationUrl: https://docs.airbyte.io/integrations/sources/iterable
icon: iterable.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5002,7 +5002,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-iterable:0.1.18"
- dockerImage: "airbyte/source-iterable:0.1.19"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/iterable"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/source-iterable
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ python -m pytest unit_tests
#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/iterable:dev
docker build . -t airbyte/source-iterable:dev
```

You can also build the connector image via Gradle:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"airbyte-cdk",
"pendulum~=2.1.2",
"requests~=2.25",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from pendulum.datetime import DateTime
from requests import codes
from requests.exceptions import ChunkedEncodingError
from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice

Expand All @@ -24,6 +25,7 @@


class IterableStream(HttpStream, ABC):
raise_on_http_errors = True

# Hardcode the value because it is not returned from the API
BACKOFF_TIME_CONSTANT = 10.0
Expand All @@ -43,6 +45,13 @@ def data_field(self) -> str:
:return: Default field name to get data from response
"""

def check_unauthorized_key(self, response: requests.Response) -> bool:
if response.status_code == codes.UNAUTHORIZED:
self.logger.warn(f"Provided API Key has not sufficient permissions to read from stream: {self.data_field}")
setattr(self, "raise_on_http_errors", False)
return False
return True

def backoff_time(self, response: requests.Response) -> Optional[float]:
return self.BACKOFF_TIME_CONSTANT

Expand All @@ -53,12 +62,20 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return None

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
response_json = response.json()
records = response_json.get(self.data_field, [])

for record in records:
yield record

def should_retry(self, response: requests.Response) -> bool:
if not self.check_unauthorized_key(response):
return False
else:
return super().should_retry(response)


class IterableExportStream(IterableStream, ABC):
"""
Expand Down Expand Up @@ -151,6 +168,8 @@ def request_params(
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return None
for obj in response.iter_lines():
record = json.loads(obj)
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
Expand Down Expand Up @@ -301,6 +320,8 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
yield {"list_id": list_record["id"]}

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
list_id = self._get_list_id(response.url)
for user in response.iter_lines():
yield {"email": user.decode(), "listId": list_id}
Expand Down Expand Up @@ -359,6 +380,8 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
yield {"campaign_ids": campaign_ids}

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
content = response.content.decode()
records = self._parse_csv_string_to_dict(content)

Expand Down Expand Up @@ -456,7 +479,8 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Put common event fields at the top level.
Put the rest of the fields in the `data` subobject.
"""

if not self.check_unauthorized_key(response):
yield from []
jsonl_records = StringIO(response.text)
for record in jsonl_records:
record_dict = json.loads(record)
Expand Down Expand Up @@ -618,6 +642,8 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg
yield from super().read_records(stream_slice=stream_slice, **kwargs)

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
response_json = response.json()
records = response_json.get(self.data_field, [])

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/iterable.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------|
| 0.1.19 | 2022-10-05 | [17602](https://github.com/airbytehq/airbyte/pull/17602) | Add check for stream permissions |
| 0.1.18 | 2022-10-04 | [17573](https://github.com/airbytehq/airbyte/pull/17573) | Limit time range for SATs |
| 0.1.17 | 2022-09-02 | [16067](https://github.com/airbytehq/airbyte/pull/16067) | added new events streams |
| 0.1.16 | 2022-08-15 | [15670](https://github.com/airbytehq/airbyte/pull/15670) | Api key is passed via header |
Expand Down

0 comments on commit 00d1f0d

Please sign in to comment.