✨ Source File: add fixed width file format support #34678

Merged: 28 commits from mjgatz/34585-fwf-format-support-and-tests into master on Feb 13, 2024

Commits
9498a4e  Add fwf support and add tests (mjgatz, Jan 30, 2024)
15f8908  update tests and docs (michael-greene-gravie, Jan 30, 2024)
3fd4f0d  update docs (michael-greene-gravie, Jan 30, 2024)
4f2569b  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (mjgatz, Jan 30, 2024)
dc280b7  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (mjgatz, Jan 30, 2024)
befacc7  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (mjgatz, Jan 31, 2024)
e27cc7d  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (mjgatz, Jan 31, 2024)
01cb024  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (mjgatz, Feb 2, 2024)
b231c34  Fix formating (lazebnyi, Feb 6, 2024)
5728f86  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (mjgatz, Feb 6, 2024)
a732a69  Update file.md to reflect version bump (mjgatz, Feb 6, 2024)
8c9d04a  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (lazebnyi, Feb 9, 2024)
5644a54  Re-run CI (lazebnyi, Feb 9, 2024)
cdce40b  Fix unttests (lazebnyi, Feb 9, 2024)
5a0b4ad  Fix changelog (lazebnyi, Feb 9, 2024)
c31f689  Re-run CI (lazebnyi, Feb 9, 2024)
f616cf6  Add unittests and fix excel_binary load (lazebnyi, Feb 9, 2024)
9300afc  Fix formatting (lazebnyi, Feb 9, 2024)
fbc054f  Re-run CI (lazebnyi, Feb 9, 2024)
9f916ca  Add STARTED, RUNNING and COMPLETE status (lazebnyi, Feb 9, 2024)
08f45cb  Fix formtating (lazebnyi, Feb 9, 2024)
4493ee6  Fix unittests (lazebnyi, Feb 10, 2024)
f463770  Merge master to branch (lazebnyi, Feb 10, 2024)
3c8c109  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (mjgatz, Feb 10, 2024)
6d2ef9c  Merge branch 'master' of https://github.com/airbytehq/airbyte into mj… (mjgatz, Feb 13, 2024)
d77a041  Merge branch 'master' into mjgatz/34585-fwf-format-support-and-tests (lazebnyi, Feb 13, 2024)
c481a61  Bump version (lazebnyi, Feb 13, 2024)
b5c8f53  Merge master to branch (lazebnyi, Feb 13, 2024)
Files changed
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-file/.coveragerc
@@ -0,0 +1,3 @@
[run]
omit =
source_file/run.py
@@ -28,6 +28,9 @@ acceptance_tests:
extra_fields: no
exact_order: no
extra_records: yes
+file_types:
+  skip_test: yes
+  bypass_reason: "Source is not based on file based CDK"
full_refresh:
tests:
- config_path: "integration_tests/config.json"
@@ -20,6 +20,7 @@
"jsonl",
"excel",
"excel_binary",
"fwf",
"feather",
"parquet",
"yaml"
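With `"fwf"` added to the format enum, a connector config can select fixed-width parsing like any other format. A hypothetical sketch (field names mirror the connector's test configs; the path and reader options are made up):

```python
from source_file.client import Client

# Hypothetical config exercising the new format; "reader_options" is a JSON
# string whose keys are forwarded to the pandas reader (here, pd.read_fwf).
config = {
    "dataset_name": "demo",
    "format": "fwf",
    "url": "/tmp/demo.txt",  # hypothetical local fixture path
    "provider": {"storage": "local"},
    "reader_options": '{"colspecs": "infer"}',
}
client = Client(**config)
```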
@@ -30,6 +30,7 @@ def check_read(config, expected_columns=10, expected_rows=42):
("jsonl", "jsonl", 2, 6492, "jsonl"),
("excel", "xls", 8, 50, "demo"),
("excel", "xlsx", 8, 50, "demo"),
("fwf", "txt", 4, 2, "demo"),
("feather", "feather", 9, 3, "demo"),
("parquet", "parquet", 9, 3, "demo"),
("yaml", "yaml", 8, 3, "demo"),
@@ -0,0 +1,25 @@
{
"streams": [
{
"stream": {
"name": "test",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"$schema": "http://json-schema.org/schema#",
"type": "object",
"properties": {
"text": { "type": "string" },
"num": { "type": "number" },
"float": { "type": "number" },
"bool": { "type": "string" }
}
}
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
@@ -0,0 +1,3 @@
text      num  float  bool
short     1    0.2    true
long_text 33   0.0    false
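The fixture above is whitespace-aligned; a minimal sketch of how the new format parses it (the connector maps `"fwf"` to `pd.read_fwf` in `load_dataframes` below, and pandas infers column boundaries from shared runs of blank columns by default):

```python
import io

import pandas as pd

# Inline copy of the demo fixture; pd.read_fwf(colspecs="infer") detects
# column edges from the whitespace common to all sampled rows.
sample = (
    "text      num  float  bool\n"
    "short     1    0.2    true\n"
    "long_text 33   0.0    false\n"
)
df = pd.read_fwf(io.StringIO(sample))
print(df.shape)  # (2, 4) -- the 2 rows and 4 columns the integration test expects
```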
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/metadata.yaml
@@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
-dockerImageTag: 0.3.16
+dockerImageTag: 0.3.17
dockerRepository: airbyte/source-file
documentationUrl: https://docs.airbyte.com/integrations/sources/file
githubIssueLabel: source-file
@@ -172,6 +172,7 @@ def storage_scheme(self) -> str:
"""
storage_name = self._provider["storage"].upper()
parse_result = urlparse(self._url)

if storage_name == "GCS":
return "gs://"
elif storage_name == "S3":
@@ -191,7 +192,7 @@ def storage_scheme(self) -> str:
elif parse_result.scheme:
return parse_result.scheme

logger.error(f"Unknown Storage provider in: {self.full_url}")
logger.error(f"Unknown Storage provider in: {self._url}")
return ""

def _open_gcs_url(self) -> object:
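The one-line logging fix above (`self.full_url` to `self._url`) matters more than it looks: if, as the name suggests, `full_url` is itself derived from `storage_scheme`, formatting it inside `storage_scheme`'s failure path would re-enter the property. A hypothetical minimal model of the hazard (not the connector's actual code):

```python
class UrlModel:
    """Toy stand-in for URLFile, assuming full_url composes storage_scheme."""

    def __init__(self, url: str):
        self._url = url

    @property
    def full_url(self) -> str:
        return f"{self.storage_scheme}{self._url}"

    @property
    def storage_scheme(self) -> str:
        # Logging self.full_url here would call storage_scheme again and
        # recurse without limit; self._url is safe to format.
        print(f"Unknown Storage provider in: {self._url}")
        return ""
```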
@@ -328,6 +329,7 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False):
"html": pd.read_html,
"excel": pd.read_excel,
"excel_binary": pd.read_excel,
"fwf": pd.read_fwf,
"feather": pd.read_feather,
"parquet": pd.read_parquet,
"orc": pd.read_orc,
@@ -354,9 +356,9 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False):
yield record
if read_sample_chunk and bytes_read >= self.CSV_CHUNK_SIZE:
return
-elif self._reader_options == "excel_binary":
+elif self._reader_format == "excel_binary":
    reader_options["engine"] = "pyxlsb"
-    yield from reader(fp, **reader_options)
+    yield reader(fp, **reader_options)
elif self._reader_format == "excel":
# Use openpyxl to read new-style Excel (xlsx) file; return to pandas for others
try:
@@ -483,7 +485,7 @@ def streams(self, empty_schema: bool = False) -> Iterable:

def openpyxl_chunk_reader(self, file, **kwargs):
"""Use openpyxl lazy loading feature to read excel files (xlsx only) in chunks of 500 lines at a time"""
-work_book = load_workbook(filename=file, read_only=True)
+work_book = load_workbook(filename=file)
user_provided_column_names = kwargs.get("names")
for sheetname in work_book.sheetnames:
work_sheet = work_book[sheetname]
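Two fixes land in the `excel_binary` branch above: the predicate compared `self._reader_options` (a dict) instead of `self._reader_format`, and `yield from` was applied to the single DataFrame returned by `pd.read_excel`. Iterating a DataFrame yields its column labels, so the old code emitted strings where the caller expects whole frames. A small sketch of the difference:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

def broken(frame):
    yield from frame  # iterating a DataFrame yields its column labels

def fixed(frame):
    yield frame  # downstream code expects whole DataFrames

print(list(broken(df)))       # ['a', 'b']
print(type(next(fixed(df))))  # <class 'pandas.core.frame.DataFrame'>
```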
@@ -16,6 +16,7 @@
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
+AirbyteStreamStatus,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
FailureType,
@@ -24,6 +25,7 @@
)
from airbyte_cdk.sources import Source
from airbyte_cdk.utils import AirbyteTracedException, is_cloud_environment
+from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message

from .client import Client
from .utils import LOCAL_STORAGE_NAME, dropbox_force_download
@@ -61,6 +63,7 @@ class SourceFile(Source):
- read_json
- read_html
- read_excel
+- read_fwf
- read_feather
- read_parquet
- read_orc
@@ -170,14 +173,33 @@ def read(
fields = self.selected_fields(catalog, config)
name = client.stream_name

-logger.info(f"Reading {name} ({client.reader.full_url})...")
+configured_stream = catalog.streams[0]
+
+logger.info(f"Syncing stream: {name} ({client.reader.full_url})...")
+
+yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)
+
+record_counter = 0
try:
    for row in client.read(fields=fields):
        record = AirbyteRecordMessage(stream=name, data=row, emitted_at=int(datetime.now().timestamp()) * 1000)
+
+        record_counter += 1
+        if record_counter == 1:
+            logger.info(f"Marking stream {name} as RUNNING")
+            yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
+
        yield AirbyteMessage(type=Type.RECORD, record=record)
+
+    logger.info(f"Marking stream {name} as STOPPED")
+    yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)
+
except Exception as err:
    reason = f"Failed to read data of {name} at {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
    logger.error(reason)
+    logger.exception(f"Encountered an exception while reading stream {name}")
+    logger.info(f"Marking stream {name} as STOPPED")
+    yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
    raise err

@staticmethod
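`read()` now brackets the record stream with trace messages: STARTED before any records, RUNNING just before the first record, COMPLETE on success, and INCOMPLETE before re-raising on failure. A rough sketch, assuming the airbyte protocol models used elsewhere in this PR, of how a consumer observes the sequence (the updated unit tests below filter on the same message type):

```python
from airbyte_protocol.models.airbyte_protocol import Type as MessageType

def summarize(messages):
    """Tally records and stream-status transitions from a read() generator."""
    statuses, records = [], 0
    for message in messages:
        if message.type == MessageType.TRACE and message.trace.stream_status:
            statuses.append(message.trace.stream_status.status.value)
        elif message.type == MessageType.RECORD:
            records += 1
    return statuses, records

# A successful sync yields (["STARTED", "RUNNING", "COMPLETE"], <row count>);
# a failed one ends with "INCOMPLETE" before the exception propagates.
```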
@@ -20,6 +20,7 @@
"jsonl",
"excel",
"excel_binary",
"fwf",
"feather",
"parquet",
"yaml"
@@ -7,9 +7,10 @@

import pytest
from airbyte_cdk.utils import AirbyteTracedException
-from pandas import read_csv, read_excel
+from pandas import read_csv, read_excel, testing
from paramiko import SSHException
from source_file.client import Client, URLFile
+from source_file.utils import backoff_handler
from urllib3.exceptions import ProtocolError


@@ -34,21 +35,22 @@ def csv_format_client():


@pytest.mark.parametrize(
"storage, expected_scheme",
"storage, expected_scheme, url",
[
("GCS", "gs://"),
("S3", "s3://"),
("AZBLOB", "azure://"),
("HTTPS", "https://"),
("SSH", "scp://"),
("SCP", "scp://"),
("SFTP", "sftp://"),
("WEBHDFS", "webhdfs://"),
("LOCAL", "file://"),
("GCS", "gs://", "http://localhost"),
("S3", "s3://", "http://localhost"),
("AZBLOB", "azure://", "http://localhost"),
("HTTPS", "https://", "http://localhost"),
("SSH", "scp://", "http://localhost"),
("SCP", "scp://", "http://localhost"),
("SFTP", "sftp://", "http://localhost"),
("WEBHDFS", "webhdfs://", "http://localhost"),
("LOCAL", "file://", "http://localhost"),
("WRONG", "", ""),
],
)
def test_storage_scheme(storage, expected_scheme):
urlfile = URLFile(provider={"storage": storage}, url="http://localhost")
def test_storage_scheme(storage, expected_scheme, url):
urlfile = URLFile(provider={"storage": storage}, url=url)
assert urlfile.storage_scheme == expected_scheme


@@ -80,8 +82,27 @@ def test_load_dataframes_xlsb(config, absolute_path, test_files):
assert read_file.equals(expected)


-def test_load_nested_json(client, absolute_path, test_files):
-    f = f"{absolute_path}/{test_files}/formats/json/demo.json"
+@pytest.mark.parametrize("file_name, should_raise_error", [("test.xlsx", False), ("test_one_line.xlsx", True)])
+def test_load_dataframes_xlsx(config, absolute_path, test_files, file_name, should_raise_error):
+    config["format"] = "excel"
+    client = Client(**config)
+    f = f"{absolute_path}/{test_files}/{file_name}"
+    if should_raise_error:
+        with pytest.raises(AirbyteTracedException):
+            next(client.load_dataframes(fp=f))
+    else:
+        read_file = next(client.load_dataframes(fp=f))
+        expected = read_excel(f, engine="openpyxl")
+        assert read_file.equals(expected)
+
+
+@pytest.mark.parametrize("file_format, file_path", [("json", "formats/json/demo.json"), ("jsonl", "formats/jsonl/jsonl_nested.jsonl")])
+def test_load_nested_json(client, config, absolute_path, test_files, file_format, file_path):
+    if file_format == "jsonl":
+        config["format"] = file_format
+        client = Client(**config)
+    f = f"{absolute_path}/{test_files}/{file_path}"
with open(f, mode="rb") as file:
assert client.load_nested_json(fp=file)

@@ -189,3 +210,11 @@ def patched_open(self):
assert call_count == 7

assert sleep_mock.call_count == 5


def test_backoff_handler(caplog):
details = {"tries": 1, "wait": 1}
backoff_handler(details)
expected = [('airbyte', 20, 'Caught retryable error after 1 tries. Waiting 1 seconds then retrying...')]

assert caplog.record_tuples == expected
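The new test pins `backoff_handler`'s log output, which implies an implementation along these lines (inferred from the expected record tuple; the real `source_file/utils.py` may differ):

```python
import logging

logger = logging.getLogger("airbyte")

def backoff_handler(details):
    # `details` is the dict the `backoff` library passes to on_backoff hooks;
    # level 20 in the test's record tuple is logging.INFO.
    logger.info(
        f"Caught retryable error after {details['tries']} tries. "
        f"Waiting {details['wait']} seconds then retrying..."
    )
```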
@@ -21,6 +21,7 @@
Type,
)
from airbyte_cdk.utils import AirbyteTracedException
+from airbyte_protocol.models.airbyte_protocol import Type as MessageType
from source_file.source import SourceFile

logger = logging.getLogger("airbyte")
@@ -95,7 +96,8 @@ def test_nan_to_null(absolute_path, test_files):

source = SourceFile()
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-records = [r.record.data for r in records]
+
+records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == [
{"col1": "key1", "col2": 1.11, "col3": None},
{"col1": "key2", "col2": None, "col3": 2.22},
@@ -105,13 +107,14 @@

config.update({"format": "yaml", "url": f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"})
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-records = [r.record.data for r in records]
+records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == []

config.update({"provider": {"storage": "SSH", "user": "user", "host": "host"}})

with pytest.raises(Exception):
-    next(source.read(logger=logger, config=config, catalog=catalog))
+    for record in source.read(logger=logger, config=config, catalog=catalog):
+        pass


def test_spec(source):
@@ -176,7 +179,7 @@ def test_pandas_header_not_none(absolute_path, test_files):

source = SourceFile()
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-records = [r.record.data for r in records]
+records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == [
{"text11": "text21", "text12": "text22"},
]
Expand All @@ -195,7 +198,7 @@ def test_pandas_header_none(absolute_path, test_files):

source = SourceFile()
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-records = [r.record.data for r in records]
+records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == [
{"0": "text11", "1": "text12"},
{"0": "text21", "1": "text22"},
@@ -224,4 +227,4 @@ def test_incorrect_reader_options(absolute_path, test_files):
):
catalog = get_catalog({"0": {"type": ["string", "null"]}, "1": {"type": ["string", "null"]}})
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
-records = [r.record.data for r in records]
+records = [r.record.data for r in records if r.type == MessageType.RECORD]