Source file: do not read whole file on check and discover #24278

Merged · 4 commits · Mar 22, 2023
@@ -606,6 +606,9 @@
   icon: file.svg
   sourceType: file
   releaseStage: generally_available
+  allowedHosts:
+    hosts:
+      - "*"
 - name: Firebase Realtime Database
   sourceDefinitionId: acb5f973-a565-441e-992f-4946f3e65662
   dockerRepository: airbyte/source-firebase-realtime-database
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-file-secure/Dockerfile
@@ -1,4 +1,4 @@
-FROM airbyte/source-file:0.2.34
+FROM airbyte/source-file:0.2.35
 
 WORKDIR /airbyte/integration_code
 COPY source_file_secure ./source_file_secure
@@ -9,5 +9,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.2.34
+LABEL io.airbyte.version=0.2.35
 LABEL io.airbyte.name=airbyte/source-file-secure
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
@@ -17,5 +17,5 @@ COPY source_file ./source_file
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.2.34
+LABEL io.airbyte.version=0.2.35
 LABEL io.airbyte.name=airbyte/source-file
@@ -59,7 +59,7 @@ def test__read_file_not_found(provider_config, provider_name, file_path, file_format
 )
 def test__streams_from_ssh_providers(provider_config, provider_name, file_path, file_format):
     client = Client(dataset_name="output", format=file_format, url=file_path, provider=provider_config(provider_name))
-    streams = list(client.streams)
+    streams = list(client.streams())
     assert len(streams) == 1
     assert streams[0].json_schema["properties"] == {
         "header1": {"type": ["string", "null"]},
@@ -4,7 +4,6 @@
 
 
 from pathlib import Path
-from unittest.mock import patch
 
 import pytest
 from airbyte_cdk import AirbyteLogger
@@ -79,29 +78,3 @@ def run_load_nested_json_schema(config, expected_columns=10, expected_rows=42):
     df = data_list[0]
     assert len(df) == expected_rows  # DataFrame should have 42 items
     return df
-
-
-# https://github.com/airbytehq/alpha-beta-issues/issues/174
-# this is to ensure we make all conditions under which the bug is reproduced, i.e.
-# - chunk size < file size
-# - column type in the last chunk is not `string`
-@patch("source_file.client.Client.CSV_CHUNK_SIZE", 1)
Collaborator Author: dropped this test because it doesn't represent the expected behavior from now on.

-def test_csv_schema():
-    source = SourceFile()
-    file_path = str(SAMPLE_DIRECTORY.parent.joinpath("discover.csv"))
-    config = {"dataset_name": "test", "format": "csv", "url": file_path, "provider": {"storage": "local"}}
-    catalog = source.discover(logger=AirbyteLogger(), config=config).dict()
-    assert len(catalog["streams"]) == 1
-    schema = catalog["streams"][0]["json_schema"]
-    assert schema == {
-        "$schema": "http://json-schema.org/draft-07/schema#",
-        "properties": {
-            "Address": {"type": ["string", "null"]},
-            "City": {"type": ["string", "null"]},
-            "First Name": {"type": ["string", "null"]},
-            "Last Name": {"type": ["string", "null"]},
-            "State": {"type": ["string", "null"]},
-            "zip_code": {"type": ["string", "null"]},
-        },
-        "type": "object",
-    }
24 changes: 17 additions & 7 deletions airbyte-integrations/connectors/source-file/source_file/client.py
@@ -4,6 +4,7 @@
 
 
 import json
+import sys
 import tempfile
 import traceback
 import urllib
@@ -288,11 +289,12 @@ def load_yaml(self, fp):
         if self._reader_format == "yaml":
             return pd.DataFrame(safe_load(fp))
 
-    def load_dataframes(self, fp, skip_data=False) -> Iterable:
+    def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False) -> Iterable:
         """load and return the appropriate pandas dataframe.
         :param fp: file-like object to read from
         :param skip_data: limit reading data
+        :param read_sample_chunk: indicates whether only a single chunk should be read to generate the schema
         :return: a list of dataframe loaded from files described in the configuration
         """
         readers = {
@@ -321,11 +323,16 @@ def load_dataframes(self, fp, skip_data=False) -> Iterable:
         reader_options = {**self._reader_options}
         try:
             if self._reader_format == "csv":
+                bytes_read = 0
                 reader_options["chunksize"] = self.CSV_CHUNK_SIZE
                 if skip_data:
                     reader_options["nrows"] = 0
                     reader_options["index_col"] = 0
-                yield from reader(fp, **reader_options)
Collaborator Author: Read only self.CSV_CHUNK_SIZE bytes of data to generate the schema; otherwise a timeout is possible when a large file is read.

+                for record in reader(fp, **reader_options):
+                    bytes_read += sys.getsizeof(record)
+                    yield record
+                    if read_sample_chunk and bytes_read >= self.CSV_CHUNK_SIZE:
+                        return
             elif self._reader_options == "excel_binary":
                 reader_options["engine"] = "pyxlsb"
                 yield from reader(fp, **reader_options)
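
To make the sampling logic above concrete, here is a minimal standalone sketch of the same idea outside the Client class. It is illustrative only: CSV_CHUNK_SIZE and sample_dataframes are stand-in names, the constant's value is made up, and sys.getsizeof measures the in-memory size of each chunk rather than bytes consumed from the file, exactly as in the diff.

import sys

import pandas as pd

CSV_CHUNK_SIZE = 10_000  # stand-in for Client.CSV_CHUNK_SIZE; value chosen for illustration

def sample_dataframes(fp, read_sample_chunk=False):
    """Yield CSV chunks; when sampling, stop once roughly one chunk's worth of bytes was seen."""
    bytes_read = 0
    for chunk in pd.read_csv(fp, chunksize=CSV_CHUNK_SIZE):
        bytes_read += sys.getsizeof(chunk)
        yield chunk
        if read_sample_chunk and bytes_read >= CSV_CHUNK_SIZE:
            return  # enough rows seen to infer a schema; skip the rest of the file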
@@ -393,13 +400,17 @@ def _cache_stream(self, fp):
         fp.close()
         return fp_tmp
 
-    def _stream_properties(self, fp):
+    def _stream_properties(self, fp, empty_schema: bool = False, read_sample_chunk: bool = False):
+        """
+        empty_schema is used to check connectivity, i.e. we only read the header and do not produce stream properties
+        read_sample_chunk determines whether just one chunk should be read to generate the schema
+        """
         if self._reader_format == "yaml":
             df_list = [self.load_yaml(fp)]
         else:
             if self.binary_source:
                 fp = self._cache_stream(fp)
-            df_list = self.load_dataframes(fp, skip_data=False)
+            df_list = self.load_dataframes(fp, skip_data=empty_schema, read_sample_chunk=read_sample_chunk)
         fields = {}
         for df in df_list:
             for col in df.columns:
@@ -408,8 +419,7 @@ def _stream_properties(self, fp):
                     fields[col] = self.dtype_to_json_type(prev_frame_column_type, df[col].dtype)
         return {field: {"type": [fields[field], "null"]} for field in fields}
 
-    @property
-    def streams(self) -> Iterable:
+    def streams(self, empty_schema: bool = False) -> Iterable:
         """Discovers available streams"""
         # TODO handle discovery of directories of multiple files instead
         with self.reader.open() as fp:
@@ -419,6 +429,6 @@
             json_schema = {
                 "$schema": "http://json-schema.org/draft-07/schema#",
                 "type": "object",
-                "properties": self._stream_properties(fp),
+                "properties": self._stream_properties(fp, empty_schema=empty_schema, read_sample_chunk=True),
             }
             yield AirbyteStream(name=self.stream_name, json_schema=json_schema, supported_sync_modes=[SyncMode.full_refresh])
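
For the empty_schema path, the change relies on pandas parsing only the header when nrows=0 (the skip_data branch above), which is what makes the new check cheap even on large files. A small sketch of that effect, using a made-up inline CSV:

import io

import pandas as pd

csv_data = io.StringIO("header1,header2\n1,2\n3,4\n")

# nrows=0 parses the header row only; no data rows are materialized.
df = pd.read_csv(csv_data, nrows=0)
print(list(df.columns))  # ['header1', 'header2']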
@@ -79,7 +79,8 @@ def _get_client(self, config: Mapping):
 
         return client
 
-    def _validate_and_transform(self, config: Mapping[str, Any]):
+    @staticmethod
+    def _validate_and_transform(config: Mapping[str, Any]):
         if "reader_options" in config:
             try:
                 config["reader_options"] = json.loads(config["reader_options"])
@@ -108,9 +109,8 @@ def check(self, logger, config: Mapping) -> AirbyteConnectionStatus:
         client = self._get_client(config)
         source_url = client.reader.full_url
         try:
-            with client.reader.open():
-                list(client.streams)
-                return AirbyteConnectionStatus(status=Status.SUCCEEDED)
+            list(client.streams(empty_schema=True))
Collaborator Author: Read only the file header when running check to ensure the connection succeeds.

+            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
         except (TypeError, ValueError, ConfigurationError) as err:
             reason = f"Failed to load {source_url}\n Please check File Format and Reader Options are set correctly. \n{repr(err)}"
             logger.error(reason)
@@ -127,13 +127,13 @@ def discover(self, logger: AirbyteLogger, config: Mapping) -> AirbyteCatalog:
         """
         config = self._validate_and_transform(config)
         client = self._get_client(config)
-        name = client.stream_name
+        name, full_url = client.stream_name, client.reader.full_url
 
-        logger.info(f"Discovering schema of {name} at {client.reader.full_url}...")
+        logger.info(f"Discovering schema of {name} at {full_url}...")
         try:
-            streams = list(client.streams)
+            streams = list(client.streams())
         except Exception as err:
-            reason = f"Failed to discover schemas of {name} at {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
+            reason = f"Failed to discover schemas of {name} at {full_url}: {repr(err)}\n{traceback.format_exc()}"
             logger.error(reason)
             raise err
         return AirbyteCatalog(streams=streams)
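
Taken together, check and discover now take two different shortcuts: check builds streams with empty_schema=True and reads only the header, while discover calls client.streams(), which samples roughly one CSV chunk (read_sample_chunk=True) instead of the whole file. A hypothetical usage sketch, reusing the config shape from the deleted test; the import path and the local file path are assumptions, not part of this PR:

from airbyte_cdk import AirbyteLogger
from source_file import SourceFile  # assumed import path

config = {"dataset_name": "test", "format": "csv", "url": "/tmp/sample.csv", "provider": {"storage": "local"}}

source = SourceFile()
status = source.check(logger=AirbyteLogger(), config=config)      # header-only read
catalog = source.discover(logger=AirbyteLogger(), config=config)  # one sampled chunk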