From 212df52370de6f4db9c93cf20d6b1ccf3a9c18a5 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Thu, 23 Mar 2023 23:37:10 +0200 Subject: [PATCH 1/4] #1697 source S3: read a single record on check --- .../connectors/source-s3/Dockerfile | 2 +- .../formats/abstract_file_parser.py | 4 +++ .../formats/csv_parser.py | 4 +++ .../formats/jsonl_parser.py | 4 +++ .../formats/parquet_parser.py | 4 +++ .../source_s3/source_files_abstract/source.py | 8 +++--- .../source_s3/source_files_abstract/stream.py | 27 ++++++++++--------- docs/integrations/sources/s3.md | 1 + 8 files changed, 37 insertions(+), 17 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index 1af10885f602..a92039a60212 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=2.0.3 +LABEL io.airbyte.version=2.0.4 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py index 274c763bd0ac..d28c62067cf5 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py @@ -111,3 +111,7 @@ def json_schema_to_pyarrow_schema(cls, schema: Mapping[str, Any], reverse: bool def _validate_config(self, config: Mapping[str, Any]): pass + + @classmethod + def set_minimal_block_size(cls, format: Mapping[str, Any]): + pass diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py index 3594df3a1d95..97d2d7c15b8b 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py @@ -254,3 +254,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> for record_values in zip(*columnwise_record_values): # create our record of {col: value, col: value} by dict comprehension, iterating through all cols in batch_columns yield {batch_columns[i]: record_values[i] for i in range(len(batch_columns))} + + @classmethod + def set_minimal_block_size(cls, format: Mapping[str, Any]): + format["block_size"] = 1024 diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py index 7f0295676750..13558eea9dbc 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py @@ -110,3 +110,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> """ table = self._read_table(file, self._master_schema) yield from table.to_pylist() + + @classmethod + def set_minimal_block_size(cls, format: Mapping[str, Any]): + format["block_size"] = 0 diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py index bb9391a0e7b4..f2fc571a2dc0 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py @@ -143,3 +143,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> batch_columns[i]: self.convert_field_data(logical_types[batch_columns[i]], record_values[i]) for i in range(len(batch_columns)) } + + @classmethod + def set_minimal_block_size(cls, format: Mapping[str, Any]): + format["buffer_size"] = 2 diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py index 85408ee35b03..9384886b2aab 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py @@ -58,7 +58,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> The error object will be cast to string to display the problem to the user. """ try: - stream = self.stream_class(**config) + stream = self.stream_class.with_minimal_block_size(config) stream.fileformatparser_class(stream._format)._validate_config(config) for file_info in stream.filepath_iterator(): # TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams @@ -66,9 +66,9 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT) # just need first file here to test connection and valid patterns break - for slice_ in stream.stream_slices(sync_mode=SyncMode.full_refresh): - list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_)) - break + slice_ = next(stream.stream_slices(sync_mode=SyncMode.full_refresh), None) + if slice_: + next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_), None) except Exception as e: logger.error(format_exc()) return False, e diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 3cddaa645921..506a1833c49d 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -35,16 +35,12 @@ class ConfigurationError(Exception): class FileStream(Stream, ABC): - @property - def fileformatparser_map(self) -> Mapping[str, type]: - """Mapping where every key is equal 'filetype' and values are corresponding parser classes.""" - return { - "csv": CsvParser, - "parquet": ParquetParser, - "avro": AvroParser, - "jsonl": JsonlParser, - } - + file_formatparser_map = { + "csv": CsvParser, + "parquet": ParquetParser, + "avro": AvroParser, + "jsonl": JsonlParser, + } # TODO: make these user configurable in spec.json ab_additional_col = "_ab_additional_properties" ab_last_mod_col = "_ab_source_file_last_modified" @@ -100,6 +96,13 @@ def _parse_user_input_schema(schema: str) -> Dict[str, Any]: return py_schema + @classmethod + def with_minimal_block_size(cls, config: MutableMapping[str, Any]): + file_type = config["format"]["filetype"] + file_reader = cls.file_formatparser_map[file_type] + file_reader.set_minimal_block_size(config["format"]) + return cls(**config) + @property def name(self) -> str: return self.dataset @@ -114,10 +117,10 @@ def fileformatparser_class(self) -> type: :return: reference to the relevant fileformatparser class e.g. CsvParser """ filetype = self._format.get("filetype") - file_reader = self.fileformatparser_map.get(filetype) + file_reader = self.file_formatparser_map.get(filetype) if not file_reader: raise RuntimeError( - f"Detected mismatched file format '{filetype}'. Available values: '{list(self.fileformatparser_map.keys())}''." + f"Detected mismatched file format '{filetype}'. Available values: '{list(self.file_formatparser_map.keys())}''." ) return file_reader diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 3fc95e0b1052..907a00add15c 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -215,6 +215,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------| +| 2.0.4 | 2023-03-23 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Call `check` with a little block size to save time and memory. | | 2.0.3 | 2023-03-17 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Support legacy datetime format for the period of migration, fix time-zone conversion. | | 2.0.2 | 2023-03-16 | [24157](https://github.com/airbytehq/airbyte/pull/24157) | Return empty schema if `discover` finds no files; Do not infer extra data types when user defined schema is applied. | | 2.0.1 | 2023-03-06 | [23195](https://github.com/airbytehq/airbyte/pull/23195) | Fix datetime format string | From 3563406425650c6161a945938445f4d605dc1d59 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Thu, 23 Mar 2023 23:40:16 +0200 Subject: [PATCH 2/4] #1697 source s3: upd changelog --- docs/integrations/sources/s3.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 907a00add15c..27de37227f21 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -215,8 +215,8 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------| -| 2.0.4 | 2023-03-23 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Call `check` with a little block size to save time and memory. | -| 2.0.3 | 2023-03-17 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Support legacy datetime format for the period of migration, fix time-zone conversion. | +| 2.0.4 | 2023-03-23 | [24429](https://github.com/airbytehq/airbyte/pull/24429) | Call `check` with a little block size to save time and memory. | +| 2.0.3 | 2023-03-17 | [24178](https://github.com/airbytehq/airbyte/pull/24178) | Support legacy datetime format for the period of migration, fix time-zone conversion. | | 2.0.2 | 2023-03-16 | [24157](https://github.com/airbytehq/airbyte/pull/24157) | Return empty schema if `discover` finds no files; Do not infer extra data types when user defined schema is applied. | | 2.0.1 | 2023-03-06 | [23195](https://github.com/airbytehq/airbyte/pull/23195) | Fix datetime format string | | 2.0.0 | 2023-03-14 | [23189](https://github.com/airbytehq/airbyte/pull/23189) | Infer schema based on one file instead of all the files | From 9a9e917d81e9aff255425f90797157c4eefe01eb Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 24 Mar 2023 10:22:50 +0200 Subject: [PATCH 3/4] #1697 source s3: fix unit_tests --- .../connectors/source-s3/unit_tests/test_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py index a2e74da67380..c8687a53b6de 100644 --- a/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py +++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py @@ -456,7 +456,7 @@ def test_fileformatparser_map(self): stream_instance = IncrementalFileStreamS3( dataset="dummy", provider={"bucket": "test-test"}, format={}, path_pattern="**/prefix*.csv" ) - assert stream_instance.fileformatparser_map + assert stream_instance.file_formatparser_map @pytest.mark.parametrize( ("bucket", "path_prefix", "list_v2_objects", "expected_file_info"), From 7a0e242ddf3013c2b34e88d5585b8fe7bd02f7b0 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Mon, 27 Mar 2023 08:01:23 +0000 Subject: [PATCH 4/4] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-s3/integration_tests/config_minio.json | 2 +- connectors.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9401676cf27c..f422a62d243f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1749,7 +1749,7 @@ - name: S3 sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 dockerRepository: airbyte/source-s3 - dockerImageTag: 2.0.3 + dockerImageTag: 2.0.4 documentationUrl: https://docs.airbyte.com/integrations/sources/s3 icon: s3.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ba5945e8fe5c..4adc5adbeefe 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -13099,7 +13099,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-s3:2.0.3" +- dockerImage: "airbyte/source-s3:2.0.4" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/s3" changelogUrl: "https://docs.airbyte.com/integrations/sources/s3" diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json index 565fbdc69a02..5e8688ed9462 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json @@ -6,7 +6,7 @@ "aws_access_key_id": "123456", "aws_secret_access_key": "123456key", "path_prefix": "", - "endpoint": "http://10.0.89.123:9000" + "endpoint": "http://10.0.124.82:9000" }, "format": { "filetype": "csv", diff --git a/connectors.md b/connectors.md index aaee91ba2a01..e8e21735ebb4 100644 --- a/connectors.md +++ b/connectors.md @@ -200,7 +200,7 @@ | **Reply.io** | Reply.io icon | Source | airbyte/source-reply-io:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/reply-io) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-reply-io) | `8cc6537e-f8a6-423c-b960-e927af76116e` | | **Retently** | Retently icon | Source | airbyte/source-retently:0.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/retently) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-retently) | `db04ecd1-42e7-4115-9cec-95812905c626` | | **Rocket.chat** | Rocket.chat icon | Source | airbyte/source-rocket-chat:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/rocket-chat) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-rocket-chat) | `921d9608-3915-450b-8078-0af18801ea1b` | -| **S3** | S3 icon | Source | airbyte/source-s3:2.0.3 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | `69589781-7828-43c5-9f63-8925b1c1ccc2` | +| **S3** | S3 icon | Source | airbyte/source-s3:2.0.4 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | `69589781-7828-43c5-9f63-8925b1c1ccc2` | | **SAP Fieldglass** | SAP Fieldglass icon | Source | airbyte/source-sap-fieldglass:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sap-fieldglass) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sap-fieldglass) | `ec5f3102-fb31-4916-99ae-864faf8e7e25` | | **SFTP** | SFTP icon | Source | airbyte/source-sftp:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | `a827c52e-791c-4135-a245-e233c5255199` | | **SFTP Bulk** | SFTP Bulk icon | Source | airbyte/source-sftp-bulk:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | `31e3242f-dee7-4cdc-a4b8-8e06c5458517` |