Skip to content

Commit

Permalink
✨ Source File: support ZIP file (#32354)
Browse files Browse the repository at this point in the history
Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
  • Loading branch information
3 people authored Feb 15, 2024
1 parent e3ab5b6 commit a886ace
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def check_read(config, expected_columns=10, expected_rows=42):
("ssh", "files/test.csv", "csv"),
("scp", "files/test.csv", "csv"),
("sftp", "files/test.csv", "csv"),
("ssh", "files/test.csv.zip", "csv"),
("ssh", "files/test.csv.gz", "csv"), # text in binary
("ssh", "files/test.pkl", "pickle"), # binary
("sftp", "files/test.pkl.gz", "pickle"), # binary in binary
Expand Down
Binary file not shown.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerImageTag: 0.3.17
dockerImageTag: 0.4.0
dockerRepository: airbyte/source-file
documentationUrl: https://docs.airbyte.com/integrations/sources/file
githubIssueLabel: source-file
Expand Down
22 changes: 20 additions & 2 deletions airbyte-integrations/connectors/source-file/source_file/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

import json
import logging
import os
import sys
import tempfile
import traceback
import urllib
import zipfile
from os import environ
from typing import Iterable
from urllib.parse import urlparse
Expand Down Expand Up @@ -261,7 +263,8 @@ def __init__(self, dataset_name: str, url: str, provider: dict, format: str = No
self._provider = provider
self._reader_format = format or "csv"
self._reader_options = reader_options or {}
self.binary_source = self._reader_format in self.binary_formats
self._is_zip = url.endswith(".zip")
self.binary_source = self._reader_format in self.binary_formats or self._is_zip
self.encoding = self._reader_options.get("encoding")

@property
Expand Down Expand Up @@ -422,6 +425,8 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
fields = set(fields) if fields else None
if self.binary_source:
fp = self._cache_stream(fp)
if self._is_zip:
fp = self._unzip(fp)
for df in self.load_dataframes(fp):
columns = fields.intersection(set(df.columns)) if fields else df.columns
df.replace({np.nan: None}, inplace=True)
Expand All @@ -436,9 +441,20 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
logger.error(f"{error_msg}\n{traceback.format_exc()}")
raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

def _unzip(self, fp):
tmp_dir = tempfile.TemporaryDirectory()
with zipfile.ZipFile(str(fp.name), "r") as zip_ref:
zip_ref.extractall(tmp_dir.name)

logger.info("Temp dir content: " + str(os.listdir(tmp_dir.name)))
final_file: str = os.path.join(tmp_dir.name, os.listdir(tmp_dir.name)[0])
logger.info("Pick up first file: " + final_file)
fp_tmp = open(final_file, "r")
return fp_tmp

def _cache_stream(self, fp):
"""cache stream to file"""
fp_tmp = tempfile.TemporaryFile(mode="w+b")
fp_tmp = tempfile.NamedTemporaryFile(mode="w+b")
fp_tmp.write(fp.read())
fp_tmp.seek(0)
fp.close()
Expand All @@ -454,6 +470,8 @@ def _stream_properties(self, fp, empty_schema: bool = False, read_sample_chunk:
else:
if self.binary_source:
fp = self._cache_stream(fp)
if self._is_zip:
fp = self._unzip(fp)
df_list = self.load_dataframes(fp, skip_data=empty_schema, read_sample_chunk=read_sample_chunk)
fields = {}
for df in df_list:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ def test_cache_stream(client, absolute_path, test_files):
f = f"{absolute_path}/{test_files}/test.csv"
with open(f, mode="rb") as file:
assert client._cache_stream(file)

def test_unzip_stream(client, absolute_path, test_files):
f = f"{absolute_path}/{test_files}/test.csv.zip"
with open(f, mode="rb") as file:
assert client._unzip(file)


def test_open_aws_url():
Expand Down
27 changes: 14 additions & 13 deletions docs/integrations/sources/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ This connector does not support syncing unstructured data files such as raw text
## Supported sync modes

| Feature | Supported? |
| ---------------------------------------- | ---------- |
|------------------------------------------|------------|
| Full Refresh Sync | Yes |
| Incremental Sync | No |
| Replicate Incremental Deletes | No |
Expand All @@ -140,9 +140,9 @@ This source produces a single table for the target file as it replicates only on
## File / Stream Compression

| Compression | Supported? |
| ----------- | ---------- |
|-------------|------------|
| Gzip | Yes |
| Zip | No |
| Zip | Yes |
| Bzip2 | No |
| Lzma | No |
| Xz | No |
Expand All @@ -151,7 +151,7 @@ This source produces a single table for the target file as it replicates only on
## Storage Providers

| Storage Providers | Supported? |
| ---------------------- | ----------------------------------------------- |
|------------------------|-------------------------------------------------|
| HTTPS | Yes |
| Google Cloud Storage | Yes |
| Amazon Web Services S3 | Yes |
Expand All @@ -162,7 +162,7 @@ This source produces a single table for the target file as it replicates only on
### File Formats

| Format | Supported? |
| --------------------- | ---------- |
|-----------------------|------------|
| CSV | Yes |
| JSON/JSONL | Yes |
| HTML | No |
Expand All @@ -183,24 +183,24 @@ Normally, Airbyte tries to infer the data type from the source, but you can use

Here are a list of examples of possible file inputs:

| Dataset Name | Storage | URL | Reader Impl | Service Account | Description |
| ----------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------ | -------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| epidemiology | HTTPS | [https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv](https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv) | | | [COVID-19 Public dataset](https://console.cloud.google.com/marketplace/product/bigquery-public-datasets/covid19-public-data-program?filter=solution-type:dataset&id=7d6cc408-53c8-4485-a187-b8cb9a5c0b56) on BigQuery |
| Dataset Name | Storage | URL | Reader Impl | Service Account | Description |
|-------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| epidemiology | HTTPS | [https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv](https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv) | | | [COVID-19 Public dataset](https://console.cloud.google.com/marketplace/product/bigquery-public-datasets/covid19-public-data-program?filter=solution-type:dataset&id=7d6cc408-53c8-4485-a187-b8cb9a5c0b56) on BigQuery |
| hr_and_financials | GCS | gs://airbyte-vault/financial.csv | smart_open or gcfs | `{"type": "service_account", "private_key_id": "XXXXXXXX", ...}` | data from a private bucket, a service account is necessary |
| landsat_index | GCS | gcp-public-data-landsat/index.csv.gz | smart_open | | Using smart_open, we don't need to specify the compression (note the gs:// is optional too, same for other providers) |
| landsat_index | GCS | gcp-public-data-landsat/index.csv.gz | smart_open | | Using smart_open, we don't need to specify the compression (note the gs:// is optional too, same for other providers) |

Examples with reader options:

| Dataset Name | Storage | URL | Reader Impl | Reader Options | Description |
| ------------- | ------- | ----------------------------------------------- | ----------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ |
| Dataset Name | Storage | URL | Reader Impl | Reader Options | Description |
|---------------|---------|-------------------------------------------------|-------------|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
| landsat_index | GCS | gs://gcp-public-data-landsat/index.csv.gz | GCFS | `{"compression": "gzip"}` | Additional reader options to specify a compression option to `read_csv` |
| GDELT | S3 | s3://gdelt-open-data/events/20190914.export.csv | | `{"sep": "\t", "header": null}` | Here is TSV data separated by tabs without header row from [AWS Open Data](https://registry.opendata.aws/gdelt/) |
| server_logs | local | /local/logs.log | | `{"sep": ";"}` | After making sure a local text file exists at `/tmp/airbyte_local/logs.log` with logs file from some server that are delimited by ';' delimiters |

Example for SFTP:

| Dataset Name | Storage | User | Password | Host | URL | Reader Options | Description |
| ------------ | ------- | ---- | -------- | --------------- | ----------------------- | ----------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------- |
| Dataset Name | Storage | User | Password | Host | URL | Reader Options | Description |
|--------------|---------|------|----------|-----------------|-------------------------|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------|
| Test Rebext | SFTP | demo | password | test.rebext.net | /pub/example/readme.txt | `{"sep": "\r\n", "header": null, "names": \["text"], "engine": "python"}` | We use `python` engine for `read_csv` in order to handle delimiter of more than 1 character while providing our own column names. |

Please see (or add) more at `airbyte-integrations/connectors/source-file/integration_tests/integration_source_test.py` for further usages examples.
Expand All @@ -217,6 +217,7 @@ In order to read large files from a remote location, this connector uses the [sm

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------|
| 0.4.0 | 2024-02-15 | [32354](https://github.com/airbytehq/airbyte/pull/32354) | Add Zip File Support |
| 0.3.17 | 2024-02-13 | [34678](https://github.com/airbytehq/airbyte/pull/34678) | Add Fixed-Width File Support |
| 0.3.16 | 2024-02-12 | [35186](https://github.com/airbytehq/airbyte/pull/35186) | Manage dependencies with Poetry |
| 0.3.15 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Upgrade to airbyte/python-connector-base:1.0.1 |
Expand Down

0 comments on commit a886ace

Please sign in to comment.