diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 9401676cf27c..f422a62d243f 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -1749,7 +1749,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
- dockerImageTag: 2.0.3
+ dockerImageTag: 2.0.4
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index ba5945e8fe5c..4adc5adbeefe 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -13099,7 +13099,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:2.0.3"
+- dockerImage: "airbyte/source-s3:2.0.4"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile
index 1af10885f602..a92039a60212 100644
--- a/airbyte-integrations/connectors/source-s3/Dockerfile
+++ b/airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
-LABEL io.airbyte.version=2.0.3
+LABEL io.airbyte.version=2.0.4
LABEL io.airbyte.name=airbyte/source-s3
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
index 565fbdc69a02..5e8688ed9462 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
@@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
- "endpoint": "http://10.0.89.123:9000"
+ "endpoint": "http://10.0.124.82:9000"
},
"format": {
"filetype": "csv",
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
index 274c763bd0ac..d28c62067cf5 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
@@ -111,3 +111,7 @@ def json_schema_to_pyarrow_schema(cls, schema: Mapping[str, Any], reverse: bool
def _validate_config(self, config: Mapping[str, Any]):
pass
+
+ @classmethod
+ def set_minimal_block_size(cls, format: Mapping[str, Any]):
+ pass
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
index 3594df3a1d95..97d2d7c15b8b 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
@@ -254,3 +254,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) ->
for record_values in zip(*columnwise_record_values):
# create our record of {col: value, col: value} by dict comprehension, iterating through all cols in batch_columns
yield {batch_columns[i]: record_values[i] for i in range(len(batch_columns))}
+
+ @classmethod
+ def set_minimal_block_size(cls, format: Mapping[str, Any]):
+ format["block_size"] = 1024
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
index 7f0295676750..13558eea9dbc 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
@@ -110,3 +110,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) ->
"""
table = self._read_table(file, self._master_schema)
yield from table.to_pylist()
+
+ @classmethod
+ def set_minimal_block_size(cls, format: Mapping[str, Any]):
+ format["block_size"] = 0
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py
index bb9391a0e7b4..f2fc571a2dc0 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py
@@ -143,3 +143,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) ->
batch_columns[i]: self.convert_field_data(logical_types[batch_columns[i]], record_values[i])
for i in range(len(batch_columns))
}
+
+ @classmethod
+ def set_minimal_block_size(cls, format: Mapping[str, Any]):
+ format["buffer_size"] = 2
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
index 85408ee35b03..9384886b2aab 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
@@ -58,7 +58,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
The error object will be cast to string to display the problem to the user.
"""
try:
- stream = self.stream_class(**config)
+ stream = self.stream_class.with_minimal_block_size(config)
stream.fileformatparser_class(stream._format)._validate_config(config)
for file_info in stream.filepath_iterator():
# TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams
@@ -66,9 +66,9 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT)
# just need first file here to test connection and valid patterns
break
- for slice_ in stream.stream_slices(sync_mode=SyncMode.full_refresh):
- list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_))
- break
+ slice_ = next(stream.stream_slices(sync_mode=SyncMode.full_refresh), None)
+ if slice_:
+ next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_), None)
except Exception as e:
logger.error(format_exc())
return False, e
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
index 3cddaa645921..506a1833c49d 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
@@ -35,16 +35,12 @@ class ConfigurationError(Exception):
class FileStream(Stream, ABC):
- @property
- def fileformatparser_map(self) -> Mapping[str, type]:
- """Mapping where every key is equal 'filetype' and values are corresponding parser classes."""
- return {
- "csv": CsvParser,
- "parquet": ParquetParser,
- "avro": AvroParser,
- "jsonl": JsonlParser,
- }
-
+ file_formatparser_map = {
+ "csv": CsvParser,
+ "parquet": ParquetParser,
+ "avro": AvroParser,
+ "jsonl": JsonlParser,
+ }
# TODO: make these user configurable in spec.json
ab_additional_col = "_ab_additional_properties"
ab_last_mod_col = "_ab_source_file_last_modified"
@@ -100,6 +96,13 @@ def _parse_user_input_schema(schema: str) -> Dict[str, Any]:
return py_schema
+ @classmethod
+ def with_minimal_block_size(cls, config: MutableMapping[str, Any]):
+ file_type = config["format"]["filetype"]
+ file_reader = cls.file_formatparser_map[file_type]
+ file_reader.set_minimal_block_size(config["format"])
+ return cls(**config)
+
@property
def name(self) -> str:
return self.dataset
@@ -114,10 +117,10 @@ def fileformatparser_class(self) -> type:
:return: reference to the relevant fileformatparser class e.g. CsvParser
"""
filetype = self._format.get("filetype")
- file_reader = self.fileformatparser_map.get(filetype)
+ file_reader = self.file_formatparser_map.get(filetype)
if not file_reader:
raise RuntimeError(
- f"Detected mismatched file format '{filetype}'. Available values: '{list(self.fileformatparser_map.keys())}''."
+ f"Detected mismatched file format '{filetype}'. Available values: '{list(self.file_formatparser_map.keys())}''."
)
return file_reader
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py
index a2e74da67380..c8687a53b6de 100644
--- a/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py
@@ -456,7 +456,7 @@ def test_fileformatparser_map(self):
stream_instance = IncrementalFileStreamS3(
dataset="dummy", provider={"bucket": "test-test"}, format={}, path_pattern="**/prefix*.csv"
)
- assert stream_instance.fileformatparser_map
+ assert stream_instance.file_formatparser_map
@pytest.mark.parametrize(
("bucket", "path_prefix", "list_v2_objects", "expected_file_info"),
diff --git a/connectors.md b/connectors.md
index aaee91ba2a01..e8e21735ebb4 100644
--- a/connectors.md
+++ b/connectors.md
@@ -200,7 +200,7 @@
| **Reply.io** | | Source | airbyte/source-reply-io:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/reply-io) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-reply-io) | `8cc6537e-f8a6-423c-b960-e927af76116e` |
| **Retently** | | Source | airbyte/source-retently:0.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/retently) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-retently) | `db04ecd1-42e7-4115-9cec-95812905c626` |
| **Rocket.chat** | | Source | airbyte/source-rocket-chat:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/rocket-chat) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-rocket-chat) | `921d9608-3915-450b-8078-0af18801ea1b` |
-| **S3** | | Source | airbyte/source-s3:2.0.3 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | `69589781-7828-43c5-9f63-8925b1c1ccc2` |
+| **S3** | | Source | airbyte/source-s3:2.0.4 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | `69589781-7828-43c5-9f63-8925b1c1ccc2` |
| **SAP Fieldglass** | | Source | airbyte/source-sap-fieldglass:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sap-fieldglass) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sap-fieldglass) | `ec5f3102-fb31-4916-99ae-864faf8e7e25` |
| **SFTP** | | Source | airbyte/source-sftp:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | `a827c52e-791c-4135-a245-e233c5255199` |
| **SFTP Bulk** | | Source | airbyte/source-sftp-bulk:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | `31e3242f-dee7-4cdc-a4b8-8e06c5458517` |
diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md
index 3fc95e0b1052..27de37227f21 100644
--- a/docs/integrations/sources/s3.md
+++ b/docs/integrations/sources/s3.md
@@ -215,7 +215,8 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
-| 2.0.3 | 2023-03-17 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Support legacy datetime format for the period of migration, fix time-zone conversion. |
+| 2.0.4   | 2023-03-23 | [24429](https://github.com/airbytehq/airbyte/pull/24429)                                                          | Call `check` with a small block size to save time and memory.                                                          |
+| 2.0.3 | 2023-03-17 | [24178](https://github.com/airbytehq/airbyte/pull/24178) | Support legacy datetime format for the period of migration, fix time-zone conversion. |
| 2.0.2 | 2023-03-16 | [24157](https://github.com/airbytehq/airbyte/pull/24157) | Return empty schema if `discover` finds no files; Do not infer extra data types when user defined schema is applied. |
| 2.0.1 | 2023-03-06 | [23195](https://github.com/airbytehq/airbyte/pull/23195) | Fix datetime format string |
| 2.0.0 | 2023-03-14 | [23189](https://github.com/airbytehq/airbyte/pull/23189) | Infer schema based on one file instead of all the files |