Skip to content

Commit

Permalink
Source S3: fix discovery issues (airbytehq#24157)
Browse files Browse the repository at this point in the history
* airbytehq#1652 airbytehq#1664 Source S3: fix discovery issues

* airbytehq#1652 airbytehq#1664 source s3: upd changelog

* airbytehq#1652 airbytehq#1664 source s3: review comments

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and adriennevermorel committed Mar 17, 2023
1 parent c990e3a commit 40ab566
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1730,7 +1730,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 2.0.1
dockerImageTag: 2.0.2
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13019,7 +13019,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:2.0.1"
- dockerImage: "airbyte/source-s3:2.0.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.1
LABEL io.airbyte.version=2.0.2
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.186.241:9000"
"endpoint": "http://10.0.132.182:9000"
},
"format": {
"filetype": "csv",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def _get_schema_dict_without_inference(self, file: Union[TextIO, BinaryIO]) -> M
quote_char = self.format.quote_char
reader = csv.reader([six.ensure_text(file.readline())], delimiter=delimiter, quotechar=quote_char)
field_names = next(reader)
file.seek(0) # the file may be reused later so return the cursor to the very beginning of the file as if nothing happened here
return {field_name.strip(): pyarrow.string() for field_name in field_names}

@wrap_exception((ValueError,))
Expand All @@ -220,11 +221,22 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) ->
https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
PyArrow returns lists of values for each column so we zip() these up into records which we then yield
"""
# In case master_schema is a user defined schema, it may miss some columns.
# We set their type to `string` as a default type in order to pass a schema with all the file columns to pyarrow
# so that pyarrow wouldn't need to infer data types of missing columns. Type inference may often break syncs:
# it reads a block of data and makes suggestions of its type based on that block. So if the next block contains data
# of different type, things get broken. To fix it you either have to increase block size or pass a predefined schema.
# Even if actual data type is changed because of this hack, it will not break sync because this data is written
# to `_ab_additional_properties` column which is not strictly typed ({'type': 'object'}). That's why this is helpful
# when a schema is defined by user and there's no space to increase a block size.
schema = self._get_schema_dict_without_inference(file)
schema.update(self._master_schema)

streaming_reader = pa_csv.open_csv(
file,
pa.csv.ReadOptions(**self._read_options()),
pa.csv.ParseOptions(**self._parse_options()),
pa.csv.ConvertOptions(**self._convert_options(self._master_schema)),
pa.csv.ConvertOptions(**self._convert_options(schema)),
)
still_reading = True
while still_reading:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ def get_json_schema(self) -> Mapping[str, Any]:
def _auto_inferred_schema(self) -> Dict[str, Any]:
file_reader = self.fileformatparser_class(self._format)
file_info_iterator = iter(list(self.get_time_ordered_file_infos()))
file_info = next(file_info_iterator)
file_info = next(file_info_iterator, None)
if not file_info:
return {}
storage_file = self.storagefile_class(file_info, self._provider)
with storage_file.open(file_reader.is_binary) as f:
return file_reader.get_inferred_schema(f, file_info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,3 +573,20 @@ def test_get_json_schema(self):
},
"type": "object",
}

def test_schema_no_files(self, mocker):
stream_instance = IncrementalFileStreamS3(
dataset="dummy",
provider={"bucket": "empty"},
format={"filetype": "csv"},
path_pattern="**/prefix*.csv"
)
mocker.patch.object(stream_instance, "_list_bucket", MagicMock(return_value=[]))
assert stream_instance.get_json_schema() == {
"properties": {
"_ab_additional_properties": {"type": "object"},
"_ab_source_file_last_modified": {"format": "date-time", "type": "string"},
"_ab_source_file_url": {"type": "string"}
},
"type": "object",
}
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
| **Reply.io** | <img alt="Reply.io icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/reply-io.svg" height="30" height="30"/> | Source | airbyte/source-reply-io:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/reply-io) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-reply-io) | <small>`8cc6537e-f8a6-423c-b960-e927af76116e`</small> |
| **Retently** | <img alt="Retently icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/retently.svg" height="30" height="30"/> | Source | airbyte/source-retently:0.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/retently) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-retently) | <small>`db04ecd1-42e7-4115-9cec-95812905c626`</small> |
| **Rocket.chat** | <img alt="Rocket.chat icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/rocket-chat.svg" height="30" height="30"/> | Source | airbyte/source-rocket-chat:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/rocket-chat) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-rocket-chat) | <small>`921d9608-3915-450b-8078-0af18801ea1b`</small> |
| **S3** | <img alt="S3 icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/s3.svg" height="30" height="30"/> | Source | airbyte/source-s3:2.0.1 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | <small>`69589781-7828-43c5-9f63-8925b1c1ccc2`</small> |
| **S3** | <img alt="S3 icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/s3.svg" height="30" height="30"/> | Source | airbyte/source-s3:2.0.2 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | <small>`69589781-7828-43c5-9f63-8925b1c1ccc2`</small> |
| **SAP Fieldglass** | <img alt="SAP Fieldglass icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sapfieldglass.svg" height="30" height="30"/> | Source | airbyte/source-sap-fieldglass:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sap-fieldglass) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sap-fieldglass) | <small>`ec5f3102-fb31-4916-99ae-864faf8e7e25`</small> |
| **SFTP** | <img alt="SFTP icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | <small>`a827c52e-791c-4135-a245-e233c5255199`</small> |
| **SFTP Bulk** | <img alt="SFTP Bulk icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp-bulk:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | <small>`31e3242f-dee7-4cdc-a4b8-8e06c5458517`</small> |
Expand Down
Loading

0 comments on commit 40ab566

Please sign in to comment.