From a886ace52f219a1f41237eed8f8e0833b04ab184 Mon Sep 17 00:00:00 2001
From: Julien COUTAND
Date: Thu, 15 Feb 2024 18:33:19 +0100
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Source=20File:=20support=20ZIP=20fi?=
 =?UTF-8?q?le=20(#32354)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi
---
 .../client_storage_providers_test.py          |   1 +
 .../sample_files/test.csv.zip                 | Bin 0 -> 213 bytes
 .../connectors/source-file/metadata.yaml      |   2 +-
 .../source-file/source_file/client.py         |  22 ++++++++++++--
 .../source-file/unit_tests/test_client.py     |   5 ++++
 docs/integrations/sources/file.md             |  27 +++++++++---------
 6 files changed, 41 insertions(+), 16 deletions(-)
 create mode 100644 airbyte-integrations/connectors/source-file/integration_tests/sample_files/test.csv.zip

diff --git a/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py b/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py
index a1d70ec923b5..758c1118eae6 100644
--- a/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py
+++ b/airbyte-integrations/connectors/source-file/integration_tests/client_storage_providers_test.py
@@ -25,6 +25,7 @@ def check_read(config, expected_columns=10, expected_rows=42):
         ("ssh", "files/test.csv", "csv"),
         ("scp", "files/test.csv", "csv"),
         ("sftp", "files/test.csv", "csv"),
+        ("ssh", "files/test.csv.zip", "csv"),
         ("ssh", "files/test.csv.gz", "csv"),  # text in binary
         ("ssh", "files/test.pkl", "pickle"),  # binary
         ("sftp", "files/test.pkl.gz", "pickle"),  # binary in binary
diff --git a/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test.csv.zip b/airbyte-integrations/connectors/source-file/integration_tests/sample_files/test.csv.zip
new file mode 100644
index 0000000000000000000000000000000000000000..0c388a8f2b47b738ca10e797fef855c41da19728
GIT binary patch
literal 213
zcmWIWW@Zs#U|`^2;4H}u&rA3*TOY`C1dGTpl%y7y=p`4Ig@$l4FelyjNd@833T_5Q
zmKV$n3}E84hp&&PuF;i~442&kZb^8}E||I`Y|0CXR}wEIUrrI3y3FavF^7GPeo;w&
zg<9ea0p5&Ea?H4_lmJ@Hz`zK^OBz8egacS14nVU!z?+o~q>d2?{eW~Kh{FH?lG8N-

literal 0
HcmV?d00001

diff --git a/airbyte-integrations/connectors/source-file/metadata.yaml b/airbyte-integrations/connectors/source-file/metadata.yaml
index bc930144d4ba..3f58d1298337 100644
--- a/airbyte-integrations/connectors/source-file/metadata.yaml
+++ b/airbyte-integrations/connectors/source-file/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: file
   connectorType: source
   definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
-  dockerImageTag: 0.3.17
+  dockerImageTag: 0.4.0
   dockerRepository: airbyte/source-file
   documentationUrl: https://docs.airbyte.com/integrations/sources/file
   githubIssueLabel: source-file
diff --git a/airbyte-integrations/connectors/source-file/source_file/client.py b/airbyte-integrations/connectors/source-file/source_file/client.py
index 6fbb72ebc8b9..8aa587106684 100644
--- a/airbyte-integrations/connectors/source-file/source_file/client.py
+++ b/airbyte-integrations/connectors/source-file/source_file/client.py
@@ -5,10 +5,12 @@
 
 import json
 import logging
+import os
 import sys
 import tempfile
 import traceback
 import urllib
+import zipfile
 from os import environ
 from typing import Iterable
 from urllib.parse import urlparse
@@ -261,7 +263,8 @@ def __init__(self, dataset_name: str, url: str, provider: dict, format: str = No
         self._provider = provider
         self._reader_format = format or "csv"
         self._reader_options = reader_options or {}
-        self.binary_source = self._reader_format in self.binary_formats
+        self._is_zip = url.endswith(".zip")
+        self.binary_source = self._reader_format in self.binary_formats or self._is_zip
         self.encoding = self._reader_options.get("encoding")
 
     @property
@@ -422,6 +425,8 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
             fields = set(fields) if fields else None
             if self.binary_source:
                 fp = self._cache_stream(fp)
+            if self._is_zip:
+                fp = self._unzip(fp)
             for df in self.load_dataframes(fp):
                 columns = fields.intersection(set(df.columns)) if fields else df.columns
                 df.replace({np.nan: None}, inplace=True)
@@ -436,9 +441,20 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
             logger.error(f"{error_msg}\n{traceback.format_exc()}")
             raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err
 
+    def _unzip(self, fp):
+        """Unzip the archive to a temp dir and return a handle to its first file."""
+        self._tmp_dir = tempfile.TemporaryDirectory()  # keep a reference so the extracted file outlives this call
+        with zipfile.ZipFile(str(fp.name), "r") as zip_ref:
+            zip_ref.extractall(self._tmp_dir.name)
+        logger.info("Temp dir content: " + str(os.listdir(self._tmp_dir.name)))
+        final_file: str = os.path.join(self._tmp_dir.name, os.listdir(self._tmp_dir.name)[0])
+        logger.info("Picking up first file: " + final_file)
+        fp_tmp = open(final_file, "r")
+        return fp_tmp
+
     def _cache_stream(self, fp):
         """cache stream to file"""
-        fp_tmp = tempfile.TemporaryFile(mode="w+b")
+        fp_tmp = tempfile.NamedTemporaryFile(mode="w+b")
         fp_tmp.write(fp.read())
         fp_tmp.seek(0)
         fp.close()
@@ -454,6 +470,8 @@ def _stream_properties(self, fp, empty_schema: bool = False, read_sample_chunk:
         else:
             if self.binary_source:
                 fp = self._cache_stream(fp)
+            if self._is_zip:
+                fp = self._unzip(fp)
             df_list = self.load_dataframes(fp, skip_data=empty_schema, read_sample_chunk=read_sample_chunk)
             fields = {}
             for df in df_list:
diff --git a/airbyte-integrations/connectors/source-file/unit_tests/test_client.py b/airbyte-integrations/connectors/source-file/unit_tests/test_client.py
index 2c41ca37ce1a..71d745c1ef74 100644
--- a/airbyte-integrations/connectors/source-file/unit_tests/test_client.py
+++ b/airbyte-integrations/connectors/source-file/unit_tests/test_client.py
@@ -128,6 +128,11 @@ def test_cache_stream(client, absolute_path, test_files):
     f = f"{absolute_path}/{test_files}/test.csv"
     with open(f, mode="rb") as file:
         assert client._cache_stream(file)
+
+def test_unzip_stream(client, absolute_path, test_files):
+    f = f"{absolute_path}/{test_files}/test.csv.zip"
+    with open(f, mode="rb") as file:
+        assert client._unzip(file)
 
 
 def test_open_aws_url():
diff --git a/docs/integrations/sources/file.md b/docs/integrations/sources/file.md
index ed7509055dcf..4d4a66eac643 100644
--- a/docs/integrations/sources/file.md
+++ b/docs/integrations/sources/file.md
@@ -126,7 +126,7 @@ This connector does not support syncing unstructured data files such as raw text
 ## Supported sync modes
 
 | Feature                                   | Supported? |
-| ----------------------------------------- | ---------- |
+|-------------------------------------------|------------|
 | Full Refresh Sync                         | Yes        |
 | Incremental Sync                          | No         |
 | Replicate Incremental Deletes             | No         |
@@ -140,9 +140,9 @@ This source produces a single table for the target file as it replicates only on
 ## File / Stream Compression
 
 | Compression | Supported? |
-| ----------- | ---------- |
+|-------------|------------|
 | Gzip        | Yes        |
-| Zip         | No         |
+| Zip         | Yes        |
 | Bzip2       | No         |
 | Lzma        | No         |
 | Xz          | No         |
 | Snappy      | No         |
@@ -151,7 +151,7 @@ This source produces a single table for the target file as it replicates only on
 ## Storage Providers
 
 | Storage Providers      | Supported?                                      |
-| ---------------------- | ----------------------------------------------- |
+|------------------------|-------------------------------------------------|
 | HTTPS                  | Yes                                             |
 | Google Cloud Storage   | Yes                                             |
 | Amazon Web Services S3 | Yes                                             |
@@ -162,7 +162,7 @@ This source produces a single table for the target file as it replicates only on
 ### File Formats
 
 | Format                | Supported? |
-| --------------------- | ---------- |
+|-----------------------|------------|
 | CSV                   | Yes        |
 | JSON/JSONL            | Yes        |
 | HTML                  | No         |
@@ -183,24 +183,24 @@ Normally, Airbyte tries to infer the data type from the source, but you can use
 
 Here is a list of examples of possible file inputs:
 
-| Dataset Name      | Storage | URL                                                                                                                                                         | Reader Impl        | Service Account                                                   | Description                                                                                                                                                                                                            |
-| ----------------- | ------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------ | ----------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
-| epidemiology      | HTTPS   | [https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv](https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv) |                    |                                                                   | [COVID-19 Public dataset](https://console.cloud.google.com/marketplace/product/bigquery-public-datasets/covid19-public-data-program?filter=solution-type:dataset&id=7d6cc408-53c8-4485-a187-b8cb9a5c0b56) on BigQuery  |
+| Dataset Name      | Storage | URL                                                                                                                                                          | Reader Impl        | Service Account                                                    | Description                                                                                                                                                                                                             |
+|-------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|--------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| epidemiology      | HTTPS   | [https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv](https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv)  |                    |                                                                    | [COVID-19 Public dataset](https://console.cloud.google.com/marketplace/product/bigquery-public-datasets/covid19-public-data-program?filter=solution-type:dataset&id=7d6cc408-53c8-4485-a187-b8cb9a5c0b56) on BigQuery   |
 | hr_and_financials | GCS     | gs://airbyte-vault/financial.csv                                                                                                                             | smart_open or gcfs | `{"type": "service_account", "private_key_id": "XXXXXXXX", ...}`  | data from a private bucket, a service account is necessary                                                                                                                                                               |
-| landsat_index     | GCS     | gcp-public-data-landsat/index.csv.gz                                                                                                                        | smart_open         |                                                                   | Using smart_open, we don't need to specify the compression (note the gs:// is optional too, same for other providers)                                                                                                  |
+| landsat_index     | GCS     | gcp-public-data-landsat/index.csv.gz                                                                                                                         | smart_open         |                                                                    | Using smart_open, we don't need to specify the compression (note the gs:// is optional too, same for other providers)                                                                                                    |
 
 Examples with reader options:
 
 | Dataset Name  | Storage | URL                                             | Reader Impl | Reader Options                  | Description                                                                                                      |
-| ------------- | ------- | ----------------------------------------------- | ----------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ |
+|---------------|---------|-------------------------------------------------|-------------|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
 | landsat_index | GCS     | gs://gcp-public-data-landsat/index.csv.gz       | GCFS        | `{"compression": "gzip"}`       | Additional reader options to specify a compression option to `read_csv`                                         |
 | GDELT         | S3      | s3://gdelt-open-data/events/20190914.export.csv |             | `{"sep": "\t", "header": null}` | Tab-separated (TSV) data without a header row, from [AWS Open Data](https://registry.opendata.aws/gdelt/)       |
 | server_logs   | local   | /local/logs.log                                 |             | `{"sep": ";"}`                  | Assumes a local text file exists at `/tmp/airbyte_local/logs.log`, containing server log lines delimited by ';' |
 
 Example for SFTP:
 
 | Dataset Name | Storage | User | Password | Host           | URL                     | Reader Options                                                            | Description                                                                                                                                |
-| ------------ | ------- | ---- | -------- | -------------- | ----------------------- | ------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------- |
+|--------------|---------|------|----------|----------------|-------------------------|---------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|
 | Test Rebex   | SFTP    | demo | password | test.rebex.net | /pub/example/readme.txt | `{"sep": "\r\n", "header": null, "names": \["text"], "engine": "python"}` | We use the `python` engine for `read_csv` in order to handle delimiters of more than one character while providing our own column names. |
 
 Please see (or add) more at `airbyte-integrations/connectors/source-file/integration_tests/integration_source_test.py` for further usage examples.
 
@@ -217,6 +217,7 @@ In order to read large files from a remote location, this connector uses the [sm
 
 | Version | Date       | Pull Request                                              | Subject                                                                                                   |
 |:--------|:-----------|:----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
+| 0.4.0   | 2024-02-15 | [32354](https://github.com/airbytehq/airbyte/pull/32354)  | Add Zip File Support                                                                                      |
 | 0.3.17  | 2024-02-13 | [34678](https://github.com/airbytehq/airbyte/pull/34678)  | Add Fixed-Width File Support                                                                              |
 | 0.3.16  | 2024-02-12 | [35186](https://github.com/airbytehq/airbyte/pull/35186)  | Manage dependencies with Poetry                                                                           |
 | 0.3.15  | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599)  | Upgrade to airbyte/python-connector-base:1.0.1                                                            |
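
Reviewer note: below is a minimal, standalone sketch of the extraction flow that `_unzip` implements above (unpack the archive into a temporary directory, then read its first member). It is illustrative only: the helper name `read_first_zipped_file` and the generated sample archive are not part of the connector's API, and it assumes pandas is available, as it is for the connector itself.

```python
import os
import tempfile
import zipfile

import pandas as pd


def read_first_zipped_file(zip_path: str) -> pd.DataFrame:
    """Extract a ZIP archive and load its first member as CSV.

    Mirrors the connector's behavior of picking the first file found in
    the archive; multi-file archives would need explicit handling.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(tmp_dir)
        first_member = os.path.join(tmp_dir, os.listdir(tmp_dir)[0])
        # read_csv fully loads the data before the temp dir is removed
        # when the context manager exits.
        return pd.read_csv(first_member)


if __name__ == "__main__":
    # Build a throwaway test.csv.zip so the sketch is self-contained.
    with tempfile.TemporaryDirectory() as work_dir:
        archive = os.path.join(work_dir, "test.csv.zip")
        with zipfile.ZipFile(archive, "w") as zf:
            zf.writestr("test.csv", "id,name\n1,alpha\n2,beta\n")
        print(read_first_zipped_file(archive))
```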