✨ Source File: support ZIP file (airbytehq#32354)
Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
3 people authored and jatinyadav-cc committed Feb 26, 2024
1 parent d40dae3 commit d870438
Showing 6 changed files with 40 additions and 16 deletions.
@@ -25,6 +25,7 @@ def check_read(config, expected_columns=10, expected_rows=42):
("ssh", "files/test.csv", "csv"),
("scp", "files/test.csv", "csv"),
("sftp", "files/test.csv", "csv"),
("ssh", "files/test.csv.zip", "csv"),
("ssh", "files/test.csv.gz", "csv"), # text in binary
("ssh", "files/test.pkl", "pickle"), # binary
("sftp", "files/test.pkl.gz", "pickle"), # binary in binary
Binary file not shown.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/metadata.yaml
@@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerImageTag: 0.3.17
dockerImageTag: 0.4.0
dockerRepository: airbyte/source-file
documentationUrl: https://docs.airbyte.com/integrations/sources/file
githubIssueLabel: source-file
21 changes: 19 additions & 2 deletions airbyte-integrations/connectors/source-file/source_file/client.py
@@ -10,6 +10,7 @@
import tempfile
import traceback
import urllib
import zipfile
from os import environ, getcwd
from typing import Iterable
from urllib.parse import urlparse
@@ -292,7 +293,8 @@ def __init__(
self._provider = provider
self._reader_format = format or "csv"
self._reader_options = reader_options or {}
self.binary_source = self._reader_format in self.binary_formats or encryption_options
self._is_zip = url.endswith(".zip")
self.binary_source = self._reader_format in self.binary_formats or encryption_options or self._is_zip
self.encoding = self._reader_options.get("encoding")
self.encryption_options = encryption_options

@@ -465,6 +467,8 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
fields = frozenset(fields) if fields else None
# if self.binary_source:
# fp = self._cache_stream(fp)
# if self._is_zip:
# fp = self._unzip(fp)
for batch in self.load_dataframes(fp):
df = batch.to_pandas() if self._reader_format == "parquet" else batch
# for parquet files
@@ -491,9 +495,20 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
os.remove(file_path)
logger.info(f"The file at {file_path} has been deleted.")

def _unzip(self, fp):
tmp_dir = tempfile.TemporaryDirectory()
with zipfile.ZipFile(str(fp.name), "r") as zip_ref:
zip_ref.extractall(tmp_dir.name)

logger.info("Temp dir content: " + str(os.listdir(tmp_dir.name)))
final_file: str = os.path.join(tmp_dir.name, os.listdir(tmp_dir.name)[0])
logger.info("Pick up first file: " + final_file)
fp_tmp = open(final_file, "r")
return fp_tmp

def _cache_stream(self, fp):
"""cache stream to file"""
fp_tmp = tempfile.TemporaryFile(mode="w+b")
fp_tmp = tempfile.NamedTemporaryFile(mode="w+b")
fp_tmp.write(fp.read())
fp_tmp.seek(0)
fp.close()
@@ -511,6 +526,8 @@ def _stream_properties(self, fp, empty_schema: bool = False, read_sample_chunk:
# if self.binary_source:
# fp = self._cache_stream(fp)
# logger.info("Cache stream successs")
# if self._is_zip:
# fp = self._unzip(fp)
df_list = self.load_dataframes(fp, skip_data=False)
fields = {}
for df in df_list:
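For illustration only (not part of the commit), here is a minimal standalone sketch of the flow the new `_is_zip` / `_unzip` code path follows: cache the incoming stream to a named temporary file, extract the archive into a temporary directory, and hand the first extracted member to the CSV reader. The file name `sample.csv.zip` and the pandas call are assumptions made for the example.

```python
import os
import tempfile
import zipfile

import pandas as pd


def read_first_member_of_zip(fp):
    """Cache a binary stream to a named temp file, extract it, and return a
    text handle to the first extracted member (mirroring the connector's approach)."""
    # zipfile.ZipFile is handed a path (str(fp.name)), so the stream must first
    # be cached to a file that actually exists on disk with a usable name.
    cached = tempfile.NamedTemporaryFile(mode="w+b", delete=False)
    cached.write(fp.read())
    cached.flush()

    extract_dir = tempfile.mkdtemp()
    with zipfile.ZipFile(cached.name, "r") as zip_ref:
        zip_ref.extractall(extract_dir)

    # Like _unzip, assume a single file per archive and pick the first entry.
    first_member = os.path.join(extract_dir, os.listdir(extract_dir)[0])
    return open(first_member, "r")


if __name__ == "__main__":
    with open("sample.csv.zip", "rb") as raw:  # hypothetical local archive
        with read_first_member_of_zip(raw) as csv_fp:
            print(pd.read_csv(csv_fp).head())
```

This also appears to be the motivation for switching `_cache_stream` from `tempfile.TemporaryFile` to `tempfile.NamedTemporaryFile`: `_unzip` reopens the cached stream by `fp.name`, and an anonymous temporary file does not reliably expose a reopenable path.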
@@ -143,6 +143,11 @@ def test_cache_stream(client, absolute_path, test_files):
f = f"{absolute_path}/{test_files}/test.csv"
with open(f, mode="rb") as file:
assert client._cache_stream(file)

def test_unzip_stream(client, absolute_path, test_files):
f = f"{absolute_path}/{test_files}/test.csv.zip"
with open(f, mode="rb") as file:
assert client._unzip(file)


def test_open_aws_url():
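As a side note, a hedged sketch of how a `test.csv.zip` fixture like the one the new tests reference could be produced from the existing `test.csv` (the paths are assumptions; the actual fixture is the binary file added by this commit and not shown above):

```python
import zipfile

# Zip the existing CSV fixture so the connector's zip code path has something to read.
with zipfile.ZipFile("files/test.csv.zip", "w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.write("files/test.csv", arcname="test.csv")
```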
27 changes: 14 additions & 13 deletions docs/integrations/sources/file.md
@@ -126,7 +126,7 @@ This connector does not support syncing unstructured data files such as raw text
## Supported sync modes

| Feature | Supported? |
| ---------------------------------------- | ---------- |
|------------------------------------------|------------|
| Full Refresh Sync | Yes |
| Incremental Sync | No |
| Replicate Incremental Deletes | No |
@@ -140,9 +140,9 @@ This source produces a single table for the target file as it replicates only on
## File / Stream Compression

| Compression | Supported? |
| ----------- | ---------- |
|-------------|------------|
| Gzip | Yes |
| Zip | No |
| Zip | Yes |
| Bzip2 | No |
| Lzma | No |
| Xz | No |
@@ -151,7 +151,7 @@ This source produces a single table for the target file as it replicates only on
## Storage Providers

| Storage Providers | Supported? |
| ---------------------- | ----------------------------------------------- |
|------------------------|-------------------------------------------------|
| HTTPS | Yes |
| Google Cloud Storage | Yes |
| Amazon Web Services S3 | Yes |
@@ -162,7 +162,7 @@ This source produces a single table for the target file as it replicates only on
### File Formats

| Format | Supported? |
| --------------------- | ---------- |
|-----------------------|------------|
| CSV | Yes |
| JSON/JSONL | Yes |
| HTML | No |
@@ -183,24 +183,24 @@ Normally, Airbyte tries to infer the data type from the source, but you can use

Here is a list of examples of possible file inputs:

| Dataset Name | Storage | URL | Reader Impl | Service Account | Description |
| ----------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------ | -------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| epidemiology | HTTPS | [https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv](https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv) | | | [COVID-19 Public dataset](https://console.cloud.google.com/marketplace/product/bigquery-public-datasets/covid19-public-data-program?filter=solution-type:dataset&id=7d6cc408-53c8-4485-a187-b8cb9a5c0b56) on BigQuery |
| Dataset Name | Storage | URL | Reader Impl | Service Account | Description |
|-------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| epidemiology | HTTPS | [https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv](https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv) | | | [COVID-19 Public dataset](https://console.cloud.google.com/marketplace/product/bigquery-public-datasets/covid19-public-data-program?filter=solution-type:dataset&id=7d6cc408-53c8-4485-a187-b8cb9a5c0b56) on BigQuery |
| hr_and_financials | GCS | gs://airbyte-vault/financial.csv | smart_open or gcfs | `{"type": "service_account", "private_key_id": "XXXXXXXX", ...}` | data from a private bucket, a service account is necessary |
| landsat_index | GCS | gcp-public-data-landsat/index.csv.gz | smart_open | | Using smart_open, we don't need to specify the compression (note the gs:// is optional too, same for other providers) |
| landsat_index | GCS | gcp-public-data-landsat/index.csv.gz | smart_open | | Using smart_open, we don't need to specify the compression (note the gs:// is optional too, same for other providers) |

Examples with reader options:

| Dataset Name | Storage | URL | Reader Impl | Reader Options | Description |
| ------------- | ------- | ----------------------------------------------- | ----------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ |
| Dataset Name | Storage | URL | Reader Impl | Reader Options | Description |
|---------------|---------|-------------------------------------------------|-------------|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
| landsat_index | GCS | gs://gcp-public-data-landsat/index.csv.gz | GCFS | `{"compression": "gzip"}` | Additional reader options to specify a compression option to `read_csv` |
| GDELT | S3 | s3://gdelt-open-data/events/20190914.export.csv | | `{"sep": "\t", "header": null}` | Here is TSV data separated by tabs without header row from [AWS Open Data](https://registry.opendata.aws/gdelt/) |
| server_logs   | local   | /local/logs.log                                 |             | `{"sep": ";"}`                  | After making sure a local text file exists at `/tmp/airbyte_local/logs.log` containing server logs delimited by ';'                               |

Example for SFTP:

| Dataset Name | Storage | User | Password | Host | URL | Reader Options | Description |
| ------------ | ------- | ---- | -------- | --------------- | ----------------------- | ----------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------- |
| Dataset Name | Storage | User | Password | Host | URL | Reader Options | Description |
|--------------|---------|------|----------|-----------------|-------------------------|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------|
| Test Rebext | SFTP | demo | password | test.rebext.net | /pub/example/readme.txt | `{"sep": "\r\n", "header": null, "names": \["text"], "engine": "python"}` | We use `python` engine for `read_csv` in order to handle delimiter of more than 1 character while providing our own column names. |

Please see (or add) more at `airbyte-integrations/connectors/source-file/integration_tests/integration_source_test.py` for further usage examples.
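With Zip now supported, a zipped CSV can be referenced directly by URL. A minimal illustrative config sketch follows; the field names reflect the connector's spec to the best of my understanding, and the URL is hypothetical:

```python
# Hypothetical source-file configuration for a zipped CSV served over HTTPS.
config = {
    "dataset_name": "zipped_example",
    "format": "csv",
    "url": "https://example.com/exports/monthly_report.csv.zip",
    "provider": {"storage": "HTTPS"},
}
```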
@@ -217,6 +217,7 @@ In order to read large files from a remote location, this connector uses the [sm

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------|
| 0.4.0 | 2024-02-15 | [32354](https://github.com/airbytehq/airbyte/pull/32354) | Add Zip File Support |
| 0.3.17 | 2024-02-13 | [34678](https://github.com/airbytehq/airbyte/pull/34678) | Add Fixed-Width File Support |
| 0.3.16 | 2024-02-12 | [35186](https://github.com/airbytehq/airbyte/pull/35186) | Manage dependencies with Poetry |
| 0.3.15 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Upgrade to airbyte/python-connector-base:1.0.1 |