✨ Source File: add fixed width file format support (airbytehq#34678)
Co-authored-by: mgreene <michael.greene@gravie.com>
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
4 people authored and jatinyadav-cc committed Feb 26, 2024
1 parent 8f6b88b commit e485365
Showing 15 changed files with 192 additions and 99 deletions.
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-file/.coveragerc
@@ -0,0 +1,3 @@
[run]
omit =
source_file/run.py
@@ -28,6 +28,9 @@ acceptance_tests:
extra_fields: no
exact_order: no
extra_records: yes
file_types:
skip_test: yes
bypass_reason: "Source is not based on file based CDK"
full_refresh:
tests:
- config_path: "integration_tests/config.json"
@@ -20,6 +20,7 @@
"jsonl",
"excel",
"excel_binary",
"fwf",
"feather",
"parquet",
"yaml"
@@ -30,6 +30,7 @@ def check_read(config, expected_columns=10, expected_rows=42):
("jsonl", "jsonl", 2, 6492, "jsonl"),
("excel", "xls", 8, 50, "demo"),
("excel", "xlsx", 8, 50, "demo"),
("fwf", "txt", 4, 2, "demo"),
("feather", "feather", 9, 3, "demo"),
("parquet", "parquet", 9, 3, "demo"),
("yaml", "yaml", 8, 3, "demo"),
@@ -0,0 +1,25 @@
{
"streams": [
{
"stream": {
"name": "test",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"$schema": "http://json-schema.org/schema#",
"type": "object",
"properties": {
"text": { "type": "string" },
"num": { "type": "number" },
"float": { "type": "number" },
"bool": { "type": "string" }
}
}
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
@@ -0,0 +1,3 @@
text      num  float  bool
short     1    0.2    true
long_text 33   0.0    false
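
As a sanity check, here is a minimal sketch of how pandas parses this fixture; the local path is hypothetical, and the expectations mirror the ("fwf", "txt", 4, 2, "demo") integration-test case added above:

import pandas as pd

# read_fwf infers fixed-width column boundaries from whitespace by default
# (colspecs="infer"), assuming the fixture is column-aligned as committed.
df = pd.read_fwf("demo.txt")  # hypothetical local copy of the fixture
assert list(df.columns) == ["text", "num", "float", "bool"]
assert df.shape == (2, 4)  # 2 data rows, 4 columns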
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/metadata.yaml
@@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerImageTag: 0.3.16
dockerImageTag: 0.3.17
dockerRepository: airbyte/source-file
documentationUrl: https://docs.airbyte.com/integrations/sources/file
githubIssueLabel: source-file
@@ -194,6 +194,7 @@ def storage_scheme(self) -> str:
"""
storage_name = self._provider["storage"].upper()
parse_result = urlparse(self._url)

if storage_name == "GCS":
return "gs://"
elif storage_name == "S3":
@@ -213,7 +214,7 @@ def storage_scheme(self) -> str:
elif parse_result.scheme:
return parse_result.scheme

logger.error(f"Unknown Storage provider in: {self.full_url}")
logger.error(f"Unknown Storage provider in: {self._url}")
return ""

def _open_gcs_url(self) -> object:
@@ -358,6 +359,7 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False)
"html": pd.read_html,
"excel": pd.read_excel,
"excel_binary": pd.read_excel,
"fwf": pd.read_fwf,
"feather": pd.read_feather,
"parquet": pq.ParquetFile,
"orc": pd.read_orc,
@@ -385,9 +387,9 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False)
yield record
if read_sample_chunk and bytes_read >= self.CSV_CHUNK_SIZE:
return
elif self._reader_options == "excel_binary":
elif self._reader_format == "excel_binary":
reader_options["engine"] = "pyxlsb"
yield from reader(fp, **reader_options)
yield reader(fp, **reader_options)
elif self._reader_format == "excel":
# Use openpyxl to read new-style Excel (xlsx) file; return to pandas for others
try:
@@ -562,7 +564,7 @@ def streams(self, empty_schema: bool = False) -> Iterable:

def openpyxl_chunk_reader(self, file, **kwargs):
"""Use openpyxl lazy loading feature to read excel files (xlsx only) in chunks of 500 lines at a time"""
work_book = load_workbook(filename=file, read_only=True)
work_book = load_workbook(filename=file)
user_provided_column_names = kwargs.get("names")
for sheetname in work_book.sheetnames:
work_sheet = work_book[sheetname]
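The client diff boils down to one new entry in the format-to-reader table, plus the excel_binary and openpyxl fixes. A condensed sketch of that dispatch pattern (names are illustrative, not the connector's exact code; the real table also covers json, html, orc, parquet, and more):

import pandas as pd

# The reader is chosen by the configured format string, so fixed-width
# support is a single mapping entry pointing at pandas' built-in read_fwf.
READERS = {
    "csv": pd.read_csv,
    "excel": pd.read_excel,
    "fwf": pd.read_fwf,  # the entry this commit adds
    "feather": pd.read_feather,
}

def load(fp, reader_format, **reader_options):
    if reader_format not in READERS:
        raise ValueError(f"Unsupported format: {reader_format}")
    return READERS[reader_format](fp, **reader_options)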
25 changes: 23 additions & 2 deletions airbyte-integrations/connectors/source-file/source_file/source.py
@@ -16,6 +16,7 @@
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStreamStatus,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
FailureType,
@@ -24,6 +25,7 @@
)
from airbyte_cdk.sources import Source
from airbyte_cdk.utils import AirbyteTracedException, is_cloud_environment
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message

from .client import Client, ConfigurationError
from .utils import LOCAL_STORAGE_NAME, dropbox_force_download
@@ -61,6 +63,7 @@ class SourceFile(Source):
- read_json
- read_html
- read_excel
- read_fwf
- read_feather
- read_parquet
- read_orc
@@ -172,15 +175,33 @@ def read(
fields = self.selected_fields(catalog, config)
name = client.stream_name

logger.info(f"Reading {name} ({client.reader.full_url})...")
logger.info(fields)
configured_stream = catalog.streams[0]

logger.info(f"Syncing stream: {name} ({client.reader.full_url})...")

yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)

record_counter = 0
try:
for row in client.read(fields=fields):
record = AirbyteRecordMessage(stream=name, data=row, emitted_at=int(datetime.now().timestamp()) * 1000)

record_counter += 1
if record_counter == 1:
logger.info(f"Marking stream {name} as RUNNING")
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)

yield AirbyteMessage(type=Type.RECORD, record=record)

logger.info(f"Marking stream {name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)

except Exception as err:
reason = f"Failed to read data of {name} at {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
logger.error(reason)
logger.exception(f"Encountered an exception while reading stream {name}")
logger.info(f"Marking stream {name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
raise err

@staticmethod
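The read() changes follow the Airbyte stream-status protocol: STARTED before the first read, RUNNING once the first record arrives, COMPLETE on success, INCOMPLETE on failure. A condensed sketch of that lifecycle (the status-helper import matches the diff above; the airbyte_cdk.models import path and the function name are assumptions):

from airbyte_cdk.models import AirbyteStreamStatus
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message

def read_rows_with_status(configured_stream, rows):
    # STARTED is emitted unconditionally; RUNNING only once a record exists.
    yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED)
    emitted = 0
    try:
        for row in rows:
            emitted += 1
            if emitted == 1:
                yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.RUNNING)
            yield row
        yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.COMPLETE)
    except Exception:
        # Mark the stream INCOMPLETE, then let the caller see the error.
        yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.INCOMPLETE)
        raise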
@@ -20,6 +20,7 @@
"jsonl",
"excel",
"excel_binary",
"fwf",
"feather",
"parquet",
"yaml"
@@ -7,9 +7,10 @@

import pytest
from airbyte_cdk.utils import AirbyteTracedException
from pandas import read_csv, read_excel
from pandas import read_csv, read_excel, testing
from paramiko import SSHException
from source_file.client import Client, URLFile
from source_file.utils import backoff_handler
from urllib3.exceptions import ProtocolError


@@ -34,21 +35,22 @@ def csv_format_client():


@pytest.mark.parametrize(
"storage, expected_scheme",
"storage, expected_scheme, url",
[
("GCS", "gs://"),
("S3", "s3://"),
("AZBLOB", "azure://"),
("HTTPS", "https://"),
("SSH", "scp://"),
("SCP", "scp://"),
("SFTP", "sftp://"),
("WEBHDFS", "webhdfs://"),
("LOCAL", "file://"),
("GCS", "gs://", "http://localhost"),
("S3", "s3://", "http://localhost"),
("AZBLOB", "azure://", "http://localhost"),
("HTTPS", "https://", "http://localhost"),
("SSH", "scp://", "http://localhost"),
("SCP", "scp://", "http://localhost"),
("SFTP", "sftp://", "http://localhost"),
("WEBHDFS", "webhdfs://", "http://localhost"),
("LOCAL", "file://", "http://localhost"),
("WRONG", "", ""),
],
)
def test_storage_scheme(storage, expected_scheme):
urlfile = URLFile(provider={"storage": storage}, url="http://localhost")
def test_storage_scheme(storage, expected_scheme, url):
urlfile = URLFile(provider={"storage": storage}, url=url)
assert urlfile.storage_scheme == expected_scheme
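
The extra url column lets the WRONG case exercise the fallback path with a scheme-less URL. A rough sketch of the lookup-then-fallback behavior being tested (abbreviated mapping; the real method handles more providers):

from urllib.parse import urlparse

SCHEMES = {"GCS": "gs://", "S3": "s3://", "AZBLOB": "azure://", "LOCAL": "file://"}

def storage_scheme(storage, url):
    # Known providers win; otherwise fall back to the URL's own scheme,
    # returning "" when neither yields anything (the WRONG case above).
    if storage.upper() in SCHEMES:
        return SCHEMES[storage.upper()]
    return urlparse(url).scheme or ""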


@@ -80,8 +82,27 @@ def test_load_dataframes_xlsb(config, absolute_path, test_files):
assert read_file.equals(expected)


def test_load_nested_json(client, absolute_path, test_files):
f = f"{absolute_path}/{test_files}/formats/json/demo.json"
@pytest.mark.parametrize("file_name, should_raise_error", [("test.xlsx", False), ("test_one_line.xlsx", True)])
def test_load_dataframes_xlsx(config, absolute_path, test_files, file_name, should_raise_error):
config["format"] = "excel"
client = Client(**config)
f = f"{absolute_path}/{test_files}/{file_name}"
if should_raise_error:
with pytest.raises(AirbyteTracedException):
next(client.load_dataframes(fp=f))
else:
read_file = next(client.load_dataframes(fp=f))
expected = read_excel(f, engine="openpyxl")
assert read_file.equals(expected)


@pytest.mark.parametrize("file_format, file_path", [("json", "formats/json/demo.json"),
("jsonl", "formats/jsonl/jsonl_nested.jsonl")])
def test_load_nested_json(client, config, absolute_path, test_files, file_format, file_path):
if file_format == "jsonl":
config["format"] = file_format
client = Client(**config)
f = f"{absolute_path}/{test_files}/{file_path}"
with open(f, mode="rb") as file:
assert client.load_nested_json(fp=file)

@@ -204,3 +225,11 @@ def patched_open(self):
assert call_count == 7

assert sleep_mock.call_count == 5


def test_backoff_handler(caplog):
details = {"tries": 1, "wait": 1}
backoff_handler(details)
expected = [('airbyte', 20, 'Caught retryable error after 1 tries. Waiting 1 seconds then retrying...')]

assert caplog.record_tuples == expected
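
The new test pins the handler's logger name ("airbyte"), level (20, i.e. INFO), and message. A sketch of a handler that satisfies it (the real implementation lives in source_file/utils.py; the details dict has the shape the backoff library passes to on_backoff handlers):

import logging

logger = logging.getLogger("airbyte")

def backoff_handler(details):
    # Emits exactly the INFO line the test above asserts on.
    logger.info(
        f"Caught retryable error after {details['tries']} tries."
        f" Waiting {details['wait']} seconds then retrying..."
    )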
@@ -24,6 +24,7 @@
from airbyte_cdk.utils import AirbyteTracedException

from source_file.client import ConfigurationError
from airbyte_protocol.models.airbyte_protocol import Type as MessageType
from source_file.source import SourceFile

logger = logging.getLogger("airbyte")
@@ -99,7 +100,8 @@ def test_nan_to_null(absolute_path, test_files):

source = SourceFile()
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
records = [r.record.data for r in records]

records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == [
{"col1": "key1", "col2": 1.11, "col3": None},
{"col1": "key2", "col2": None, "col3": 2.22},
@@ -109,13 +111,14 @@

config.update({"format": "yaml", "url": f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"})
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
records = [r.record.data for r in records]
records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == []

config.update({"provider": {"storage": "SSH", "user": "user", "host": "host"}})

with pytest.raises(Exception):
next(source.read(logger=logger, config=config, catalog=catalog))
for record in source.read(logger=logger, config=config, catalog=catalog):
pass


def test_spec(source):
Expand All @@ -136,9 +139,8 @@ def test_check_invalid_config(source, invalid_config):


def test_check_invalid_reader_options(source, invalid_reader_options_config):
expected = AirbyteConnectionStatus(status=Status.FAILED)
actual = source.check(logger=logger, config=invalid_reader_options_config)
assert actual.status == expected.status
with pytest.raises(AirbyteTracedException, match="Field 'reader_options' is not a valid JSON object. Please provide key-value pairs"):
source.check(logger=logger, config=invalid_reader_options_config)


def test_discover_dropbox_link(source, config_dropbox_link):
Expand All @@ -160,17 +162,17 @@ def test_discover(source, config, client):

def test_check_wrong_reader_options(source, config):
config["reader_options"] = '{encoding":"utf_16"}'
assert source.check(logger=logger, config=config) == AirbyteConnectionStatus(
status=Status.FAILED, message="Field 'reader_options' is not valid JSON object. https://www.json.org/"
)
with pytest.raises(AirbyteTracedException, match="Field 'reader_options' is not valid JSON object. https://www.json.org/"):
source.check(logger=logger, config=config)


def test_check_google_spreadsheets_url(source, config):
config["url"] = "https://docs.google.com/spreadsheets/d/"
assert source.check(logger=logger, config=config) == AirbyteConnectionStatus(
status=Status.FAILED,
message="Failed to load https://docs.google.com/spreadsheets/d/: please use the Official Google Sheets Source connector",
)
with pytest.raises(
AirbyteTracedException,
match="Failed to load https://docs.google.com/spreadsheets/d/: please use the Official Google Sheets Source connector",
):
source.check(logger=logger, config=config)


def test_pandas_header_not_none(absolute_path, test_files):
@@ -186,7 +188,7 @@ def test_pandas_header_not_none(absolute_path, test_files):

source = SourceFile()
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
records = [r.record.data for r in records]
records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == [
{"text11": "text21", "text12": "text22"},
]
@@ -205,7 +207,7 @@ def test_pandas_header_none(absolute_path, test_files):

source = SourceFile()
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
records = [r.record.data for r in records]
records = [r.record.data for r in records if r.type == MessageType.RECORD]
assert records == [
{"0": "text11", "1": "text12"},
{"0": "text21", "1": "text22"},
@@ -234,4 +236,4 @@ def test_incorrect_reader_options(absolute_path, test_files):
):
catalog = get_catalog({"0": {"type": ["string", "null"]}, "1": {"type": ["string", "null"]}})
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
records = [r.record.data for r in records]
records = [r.record.data for r in records if r.type == MessageType.RECORD]
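
Because read() now interleaves status messages with records, every assertion in this file first filters down to record messages. A tiny helper capturing that repeated pattern (the helper name is hypothetical; the import matches the one added above):

from airbyte_protocol.models.airbyte_protocol import Type as MessageType

def record_data(messages):
    # Keep only RECORD messages; STARTED/RUNNING/COMPLETE traces are dropped.
    return [m.record.data for m in messages if m.type == MessageType.RECORD]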
