Source file: do not read whole file on check and discover (#24278)
* #1681 source file: do not read whole file on check and discover

* #1681 source file: upd changelog

* #1681 source file: add allowed hosts

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
2 people authored and erohmensing committed Mar 22, 2023
1 parent 35622d3 commit c67e67f
Showing 10 changed files with 76 additions and 89 deletions.
@@ -601,11 +601,14 @@
 - name: File (CSV, JSON, Excel, Feather, Parquet)
   sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
   dockerRepository: airbyte/source-file
-  dockerImageTag: 0.2.34
+  dockerImageTag: 0.2.35
   documentationUrl: https://docs.airbyte.com/integrations/sources/file
   icon: file.svg
   sourceType: file
   releaseStage: generally_available
+  allowedHosts:
+    hosts:
+      - "*"
 - name: Firebase Realtime Database
   sourceDefinitionId: acb5f973-a565-441e-992f-4946f3e65662
   dockerRepository: airbyte/source-firebase-realtime-database
@@ -4358,7 +4358,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-file:0.2.34"
+- dockerImage: "airbyte/source-file:0.2.35"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/sources/file"
    connectionSpecification:
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-file-secure/Dockerfile
@@ -1,4 +1,4 @@
-FROM airbyte/source-file:0.2.34
+FROM airbyte/source-file:0.2.35
 
 WORKDIR /airbyte/integration_code
 COPY source_file_secure ./source_file_secure
@@ -9,5 +9,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.2.34
+LABEL io.airbyte.version=0.2.35
 LABEL io.airbyte.name=airbyte/source-file-secure
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
@@ -17,5 +17,5 @@ COPY source_file ./source_file
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.2.34
+LABEL io.airbyte.version=0.2.35
 LABEL io.airbyte.name=airbyte/source-file
@@ -59,7 +59,7 @@ def test__read_file_not_found(provider_config, provider_name, file_path, file_format):
 )
 def test__streams_from_ssh_providers(provider_config, provider_name, file_path, file_format):
     client = Client(dataset_name="output", format=file_format, url=file_path, provider=provider_config(provider_name))
-    streams = list(client.streams)
+    streams = list(client.streams())
     assert len(streams) == 1
     assert streams[0].json_schema["properties"] == {
         "header1": {"type": ["string", "null"]},
@@ -4,7 +4,6 @@
 
 
 from pathlib import Path
-from unittest.mock import patch
 
 import pytest
 from airbyte_cdk import AirbyteLogger
@@ -79,29 +78,3 @@ def run_load_nested_json_schema(config, expected_columns=10, expected_rows=42):
     df = data_list[0]
     assert len(df) == expected_rows # DataFrame should have 42 items
     return df
-
-
-# https://github.com/airbytehq/alpha-beta-issues/issues/174
-# this is to ensure we make all conditions under which the bug is reproduced, i.e.
-# - chunk size < file size
-# - column type in the last chunk is not `string`
-@patch("source_file.client.Client.CSV_CHUNK_SIZE", 1)
-def test_csv_schema():
-    source = SourceFile()
-    file_path = str(SAMPLE_DIRECTORY.parent.joinpath("discover.csv"))
-    config = {"dataset_name": "test", "format": "csv", "url": file_path, "provider": {"storage": "local"}}
-    catalog = source.discover(logger=AirbyteLogger(), config=config).dict()
-    assert len(catalog["streams"]) == 1
-    schema = catalog["streams"][0]["json_schema"]
-    assert schema == {
-        "$schema": "http://json-schema.org/draft-07/schema#",
-        "properties": {
-            "Address": {"type": ["string", "null"]},
-            "City": {"type": ["string", "null"]},
-            "First Name": {"type": ["string", "null"]},
-            "Last Name": {"type": ["string", "null"]},
-            "State": {"type": ["string", "null"]},
-            "zip_code": {"type": ["string", "null"]},
-        },
-        "type": "object",
-    }
24 changes: 17 additions & 7 deletions airbyte-integrations/connectors/source-file/source_file/client.py
@@ -4,6 +4,7 @@
 
 
 import json
+import sys
 import tempfile
 import traceback
 import urllib
@@ -288,11 +289,12 @@ def load_yaml(self, fp):
         if self._reader_format == "yaml":
             return pd.DataFrame(safe_load(fp))
 
-    def load_dataframes(self, fp, skip_data=False) -> Iterable:
+    def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False) -> Iterable:
         """load and return the appropriate pandas dataframe.
         :param fp: file-like object to read from
         :param skip_data: limit reading data
+        :param read_sample_chunk: indicates whether a single chunk should only be read to generate schema
         :return: a list of dataframe loaded from files described in the configuration
         """
         readers = {
@@ -321,11 +323,16 @@ def load_dataframes(self, fp, skip_data=False) -> Iterable:
         reader_options = {**self._reader_options}
         try:
             if self._reader_format == "csv":
+                bytes_read = 0
                 reader_options["chunksize"] = self.CSV_CHUNK_SIZE
                 if skip_data:
                     reader_options["nrows"] = 0
                     reader_options["index_col"] = 0
-                yield from reader(fp, **reader_options)
+                for record in reader(fp, **reader_options):
+                    bytes_read += sys.getsizeof(record)
+                    yield record
+                    if read_sample_chunk and bytes_read >= self.CSV_CHUNK_SIZE:
+                        return
             elif self._reader_options == "excel_binary":
                 reader_options["engine"] = "pyxlsb"
                 yield from reader(fp, **reader_options)
@@ -393,13 +400,17 @@ def _cache_stream(self, fp):
         fp.close()
         return fp_tmp
 
-    def _stream_properties(self, fp):
+    def _stream_properties(self, fp, empty_schema: bool = False, read_sample_chunk: bool = False):
+        """
+        empty_schema param is used to check connectivity, i.e. we only read a header and do not produce stream properties
+        read_sample_chunk is used to determine if just one chunk should be read to generate schema
+        """
         if self._reader_format == "yaml":
             df_list = [self.load_yaml(fp)]
         else:
             if self.binary_source:
                 fp = self._cache_stream(fp)
-            df_list = self.load_dataframes(fp, skip_data=False)
+            df_list = self.load_dataframes(fp, skip_data=empty_schema, read_sample_chunk=read_sample_chunk)
         fields = {}
         for df in df_list:
             for col in df.columns:
@@ -408,8 +419,7 @@ def _stream_properties(self, fp):
                 fields[col] = self.dtype_to_json_type(prev_frame_column_type, df[col].dtype)
         return {field: {"type": [fields[field], "null"]} for field in fields}
 
-    @property
-    def streams(self) -> Iterable:
+    def streams(self, empty_schema: bool = False) -> Iterable:
         """Discovers available streams"""
         # TODO handle discovery of directories of multiple files instead
         with self.reader.open() as fp:
@@ -419,6 +429,6 @@ def streams(self) -> Iterable:
             json_schema = {
                 "$schema": "http://json-schema.org/draft-07/schema#",
                 "type": "object",
-                "properties": self._stream_properties(fp),
+                "properties": self._stream_properties(fp, empty_schema=empty_schema, read_sample_chunk=True),
             }
             yield AirbyteStream(name=self.stream_name, json_schema=json_schema, supported_sync_modes=[SyncMode.full_refresh])
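The `load_dataframes` change above is the heart of the fix: CSV records are yielded chunk by chunk, and when only a schema sample is needed, iteration stops once roughly one chunk's worth of bytes has been read instead of consuming the whole file. A minimal standalone sketch of that pattern, with an illustrative byte budget and helper name rather than the connector's actual API:

```python
import io
import sys

import pandas as pd

CSV_CHUNK_SIZE = 10_000  # byte budget for schema sampling (illustrative value)


def load_dataframes(fp, read_sample_chunk=False):
    """Yield DataFrame chunks; stop early when only a schema sample is needed."""
    bytes_read = 0
    for chunk in pd.read_csv(fp, chunksize=1000):
        bytes_read += sys.getsizeof(chunk)  # approximate in-memory size of the chunk
        yield chunk
        if read_sample_chunk and bytes_read >= CSV_CHUNK_SIZE:
            return  # enough rows seen to infer a schema; skip the rest of the file


csv = "a,b\n" + "\n".join(f"{i},{i * 2}" for i in range(100_000))
sampled = list(load_dataframes(io.StringIO(csv), read_sample_chunk=True))
full = list(load_dataframes(io.StringIO(csv)))
assert len(sampled) < len(full)  # the sampling path reads far fewer chunks
```

Because the generator `return`s as soon as the budget is hit, discovery cost is bounded by the chunk size rather than the file size, which is exactly why the slow whole-file `test_csv_schema` reproduction test could be dropped.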
@@ -79,7 +79,8 @@ def _get_client(self, config: Mapping):
 
         return client
 
-    def _validate_and_transform(self, config: Mapping[str, Any]):
+    @staticmethod
+    def _validate_and_transform(config: Mapping[str, Any]):
         if "reader_options" in config:
             try:
                 config["reader_options"] = json.loads(config["reader_options"])
@@ -108,9 +109,8 @@ def check(self, logger, config: Mapping) -> AirbyteConnectionStatus:
         client = self._get_client(config)
         source_url = client.reader.full_url
         try:
-            with client.reader.open():
-                list(client.streams)
-                return AirbyteConnectionStatus(status=Status.SUCCEEDED)
+            list(client.streams(empty_schema=True))
+            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
         except (TypeError, ValueError, ConfigurationError) as err:
             reason = f"Failed to load {source_url}\n Please check File Format and Reader Options are set correctly. \n{repr(err)}"
             logger.error(reason)
@@ -127,13 +127,13 @@ def discover(self, logger: AirbyteLogger, config: Mapping) -> AirbyteCatalog:
         """
         config = self._validate_and_transform(config)
         client = self._get_client(config)
-        name = client.stream_name
+        name, full_url = client.stream_name, client.reader.full_url
 
-        logger.info(f"Discovering schema of {name} at {client.reader.full_url}...")
+        logger.info(f"Discovering schema of {name} at {full_url}...")
         try:
-            streams = list(client.streams)
+            streams = list(client.streams())
         except Exception as err:
-            reason = f"Failed to discover schemas of {name} at {client.reader.full_url}: {repr(err)}\n{traceback.format_exc()}"
+            reason = f"Failed to discover schemas of {name} at {full_url}: {repr(err)}\n{traceback.format_exc()}"
             logger.error(reason)
             raise err
         return AirbyteCatalog(streams=streams)
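The `check` path above no longer materializes any data: `client.streams(empty_schema=True)` ultimately passes `nrows=0` to the CSV reader, so pandas parses only the header row, which is enough to prove the file is reachable and parseable. A hedged sketch of that header-only probe (the helper name is hypothetical, not part of the connector):

```python
import io

import pandas as pd


def probe_csv_header(fp):
    """Parse only the CSV header: nrows=0 loads column names but zero data rows."""
    df = pd.read_csv(fp, nrows=0)
    return list(df.columns)


fp = io.StringIO("name,age\nalice,30\nbob,25\n")
columns = probe_csv_header(fp)  # data rows are never loaded into memory
```

For a multi-gigabyte remote file, this turns connectivity checking from a full download into reading a single line.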
2 changes: 1 addition & 1 deletion connectors.md
@@ -71,7 +71,7 @@
 | **Facebook Pages** | <img alt="Facebook Pages icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/facebook.svg" height="30"/> | Source | airbyte/source-facebook-pages:0.2.3 | beta | [link](https://docs.airbyte.com/integrations/sources/facebook-pages) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-facebook-pages) | <small>`010eb12f-837b-4685-892d-0a39f76a98f5`</small> |
 | **Fastbill** | <img alt="Fastbill icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/fastbill.svg" height="30"/> | Source | airbyte/source-fastbill:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/fastbill) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-fastbill) | <small>`eb3e9c1c-0467-4eb7-a172-5265e04ccd0a`</small> |
 | **Fauna** | <img alt="Fauna icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/fauna.svg" height="30"/> | Source | airbyte/source-fauna:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/fauna) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-fauna) | <small>`3825db3e-c94b-42ac-bd53-b5a9507ace2b`</small> |
-| **File (CSV, JSON, Excel, Feather, Parquet)** | <img alt="File (CSV, JSON, Excel, Feather, Parquet) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/file.svg" height="30"/> | Source | airbyte/source-file:0.2.34 | generally_available | [link](https://docs.airbyte.com/integrations/sources/file) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-file) | <small>`778daa7c-feaf-4db6-96f3-70fd645acc77`</small> |
+| **File (CSV, JSON, Excel, Feather, Parquet)** | <img alt="File (CSV, JSON, Excel, Feather, Parquet) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/file.svg" height="30"/> | Source | airbyte/source-file:0.2.35 | generally_available | [link](https://docs.airbyte.com/integrations/sources/file) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-file) | <small>`778daa7c-feaf-4db6-96f3-70fd645acc77`</small> |
 | **Firebase Realtime Database** | x | Source | airbyte/source-firebase-realtime-database:0.1.0 | alpha | [link](https://docs.airbyte.io/integrations/sources/firebase-realtime-database) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-firebase-realtime-database) | <small>`acb5f973-a565-441e-992f-4946f3e65662`</small> |
 | **Firebolt** | <img alt="Firebolt icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/firebolt.svg" height="30"/> | Source | airbyte/source-firebolt:0.2.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/firebolt) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-firebolt) | <small>`6f2ac653-8623-43c4-8950-19218c7caf3d`</small> |
 | **Flexport** | x | Source | airbyte/source-flexport:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/flexport) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-flexport) | <small>`f95337f1-2ad1-4baf-922f-2ca9152de630`</small> |