Skip to content

Commit

Permalink
🐛 Source File: add retry logic for Connection reset by peer - 104 e…
Browse files Browse the repository at this point in the history
…rror (#18428)
  • Loading branch information
bazarnov authored and nataly committed Nov 3, 2022
1 parent 0141ed0 commit cb91fe8
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@
- name: File
sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerRepository: airbyte/source-file
dockerImageTag: 0.2.27
dockerImageTag: 0.2.28
documentationUrl: https://docs.airbyte.com/integrations/sources/file
icon: file.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3234,7 +3234,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-file:0.2.27"
- dockerImage: "airbyte/source-file:0.2.28"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/file"
connectionSpecification:
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-file-secure/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM airbyte/source-file:0.2.26
FROM airbyte/source-file:0.2.28

WORKDIR /airbyte/integration_code
COPY source_file_secure ./source_file_secure
Expand All @@ -9,5 +9,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.26
LABEL io.airbyte.version=0.2.28
LABEL io.airbyte.name=airbyte/source-file-secure
11 changes: 6 additions & 5 deletions airbyte-integrations/connectors/source-file-secure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ and place them into `secrets/config.json`.

### Locally running the connector
```
python main_dev.py spec
python main_dev.py check --config secrets/config.json
python main_dev.py discover --config secrets/config.json
python main_dev.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
```

### Unit Tests
Expand All @@ -59,7 +59,8 @@ python -m pytest unit_tests
#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/source-file-secure:dev
docker build . -t airbyte/source-file-secure:dev \
&& python -m pytest -p source_acceptance_test.plugin
```

You can also build the connector image via Gradle:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_file ./source_file
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.27
LABEL io.airbyte.version=0.2.28
LABEL io.airbyte.name=airbyte/source-file
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"pyxlsb==1.0.9",
]

TEST_REQUIREMENTS = ["pytest~=6.2", "pytest-docker==1.0.0", "pytest-mock~=3.6.1"]
TEST_REQUIREMENTS = ["pytest~=6.2", "pytest-docker~=1.0.0", "pytest-mock~=3.6.1"]

setup(
name="source_file",
Expand Down
38 changes: 23 additions & 15 deletions airbyte-integrations/connectors/source-file/source_file/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Iterable
from urllib.parse import urlparse

import backoff
import boto3
import botocore
import google
Expand All @@ -24,6 +25,8 @@
from google.oauth2 import service_account
from yaml import safe_load

from .utils import backoff_handler


class ConfigurationError(Exception):
"""Client mis-configured"""
Expand Down Expand Up @@ -351,25 +354,30 @@ def dtype_to_json_type(current_type: str, dtype) -> str:
def reader(self) -> reader_class:
return self.reader_class(url=self._url, provider=self._provider, binary=self.binary_source, encoding=self.encoding)

@backoff.on_exception(backoff.expo, ConnectionResetError, on_backoff=backoff_handler, max_tries=5, max_time=60)
def read(self, fields: Iterable = None) -> Iterable[dict]:
"""Read data from the stream"""
with self.reader.open() as fp:
if self._reader_format in ["json", "jsonl"]:
yield from self.load_nested_json(fp)
elif self._reader_format == "yaml":
fields = set(fields) if fields else None
df = self.load_yaml(fp)
columns = fields.intersection(set(df.columns)) if fields else df.columns
df = df.where(pd.notnull(df), None)
yield from df[columns].to_dict(orient="records")
else:
fields = set(fields) if fields else None
if self.binary_source:
fp = self._cache_stream(fp)
for df in self.load_dataframes(fp):
try:
if self._reader_format in ["json", "jsonl"]:
yield from self.load_nested_json(fp)
elif self._reader_format == "yaml":
fields = set(fields) if fields else None
df = self.load_yaml(fp)
columns = fields.intersection(set(df.columns)) if fields else df.columns
df.replace({np.nan: None}, inplace=True)
yield from df[list(columns)].to_dict(orient="records")
df = df.where(pd.notnull(df), None)
yield from df[columns].to_dict(orient="records")
else:
fields = set(fields) if fields else None
if self.binary_source:
fp = self._cache_stream(fp)
for df in self.load_dataframes(fp):
columns = fields.intersection(set(df.columns)) if fields else df.columns
df.replace({np.nan: None}, inplace=True)
yield from df[list(columns)].to_dict(orient="records")
except ConnectionResetError:
logger.info(f"Catched `connection reset error - 104`, stream: {self.stream_name} ({self.reader.full_url})")
raise ConnectionResetError

def _cache_stream(self, fp):
"""cache stream to file"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import logging
from urllib.parse import parse_qs, urlencode, urlparse

# default logger
logger = logging.getLogger("airbyte")


def dropbox_force_download(url):
"""
Expand All @@ -17,3 +20,7 @@ def dropbox_force_download(url):
qs["dl"] = "1"
parse_result = parse_result._replace(query=urlencode(qs))
return parse_result.geturl()


def backoff_handler(details):
logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying...")
14 changes: 14 additions & 0 deletions airbyte-integrations/connectors/source-file/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,17 @@ def absolute_path():
@pytest.fixture
def test_files():
return "../integration_tests/sample_files"


@pytest.fixture
def test_read_config():
return {
"dataset_name": "integrationTestFile",
"format": "csv",
"url": "https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv",
"provider": {
"storage": "HTTPS",
"reader_impl": "gcsfs",
"user_agent": False,
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#


from unittest.mock import patch

import pytest
from pandas import read_csv, read_excel
from source_file.client import Client, ConfigurationError, URLFile
Expand Down Expand Up @@ -134,3 +136,14 @@ def test_open_gcs_url():
provider.update({"service_account_json": '{service_account_json": "service_account_json"}'})
with pytest.raises(ConfigurationError):
assert URLFile(url="", provider=provider)._open_gcs_url()


def test_read(test_read_config):
client = Client(**test_read_config)
client.sleep_on_retry_sec = 0 # just for test
with patch.object(client, "load_dataframes", side_effect=ConnectionResetError) as mock_method:
try:
return client.read(["date", "key"])
except ConnectionResetError:
print("Exception has been raised correctly!")
mock_method.assert_called()
3 changes: 2 additions & 1 deletion docs/integrations/sources/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ In order to read large files from a remote location, this connector uses the [sm
## Changelog

| Version | Date | Pull Request | Subject |
|---------|------------| -------------------------------------------------------- |----------------------------------------------------------|
| ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------- |
| 0.2.28 | 2022-10-27 | [18428](https://github.com/airbytehq/airbyte/pull/18428) | Added retry logic for `Connection reset error - 104` |
| 0.2.27 | 2022-10-26 | [18481](https://github.com/airbytehq/airbyte/pull/18481) | Fix check for wrong format |
| 0.2.26 | 2022-10-18 | [18116](https://github.com/airbytehq/airbyte/pull/18116) | Transform Dropbox shared link |
| 0.2.25 | 2022-10-14 | [17994](https://github.com/airbytehq/airbyte/pull/17994) | Handle `UnicodeDecodeError` during discover step. |
Expand Down

0 comments on commit cb91fe8

Please sign in to comment.