From caf4c39724a1613a1fb13d200dd93cfa820c164e Mon Sep 17 00:00:00 2001
From: Artem Inzhyyants
Date: Tue, 10 Jan 2023 18:15:39 +0100
Subject: [PATCH 1/3] Source S3: update block size for json

---
 .../connectors/source-s3/Dockerfile           |  2 +-
 .../source-s3/integration_tests/spec.json     |  2 +-
 .../formats/jsonl_parser.py                   | 22 +++++++++++++------
 .../formats/jsonl_spec.py                     |  4 ++--
 4 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile
index e97e475cc4a2..5483d35fd9f4 100644
--- a/airbyte-integrations/connectors/source-s3/Dockerfile
+++ b/airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.27
+LABEL io.airbyte.version=0.1.28
 LABEL io.airbyte.name=airbyte/source-s3
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
index 2fcd27e2bcf9..8aa6266bd205 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
@@ -198,7 +198,7 @@
           "block_size": {
             "title": "Block Size",
             "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
-            "default": 10000,
+            "default": 0,
             "order": 2,
             "type": "integer"
           }
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
index b6d408dc1e4f..8e3ea463693b 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
@@ -1,17 +1,18 @@
 #
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
-
-
+import logging
 from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union
 
 import pyarrow as pa
-from pyarrow import json as pa_json
+from pyarrow import json as pa_json, ArrowNotImplementedError
 from source_s3.source_files_abstract.file_info import FileInfo
 
 from .abstract_file_parser import AbstractFileParser
 from .jsonl_spec import JsonlFormat
 
+logger = logging.getLogger("airbyte")
+
 
 class JsonlParser(AbstractFileParser):
     TYPE_MAP = {
@@ -50,8 +51,10 @@ def _read_options(self) -> Mapping[str, str]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
         build ReadOptions object like: pa.json.ReadOptions(**self._read_options())
+        Disable the block size parameter if it is set to 0.
""" - return {**{"block_size": self.format.block_size, "use_threads": True}} + return {**{"block_size": self.format.block_size if self.format.block_size else None, + "use_threads": True}} def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, str]: """ @@ -70,9 +73,14 @@ def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, return parse_options def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, Any] = None) -> pa.Table: - return pa_json.read_json( - file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema)) - ) + try: + return pa_json.read_json( + file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema)) + ) + except ArrowNotImplementedError as e: + message = "Possibly too small block size used. Please try to increase it or set to 0 disable this feature." + logger.warning(message) + raise ValueError(message) from e def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]: """ diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py index 6af676c9159e..98d9e44e8e2b 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py @@ -38,9 +38,9 @@ class Config: examples=["ignore", "infer", "error"], order=1, ) - + # Block size set to 0 as default value to disable this feature for most not-experienced users block_size: int = Field( - default=10000, + default=0, description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.", order=2, ) From b93c8f16cf1b022f1c3389dea8eb9205c6fba18a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 10 Jan 2023 18:19:01 +0100 Subject: [PATCH 2/3] Source S3: update docs --- docs/integrations/sources/s3.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 23bb3f120d1b..efdf8c1b2fee 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -205,10 +205,11 @@ The avro parser uses [fastavro](https://fastavro.readthedocs.io/en/latest/). 
 
 The Jsonl parser uses pyarrow; hence, only the line-delimited JSON format is supported. For more detailed info, please refer to the [docs](https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html)
 
 ## Changelog
 
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
+| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format |
 | 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format |
 | 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
 | 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file |

From 2bd5353fc75a90b6990d5bee8ec6c7cdf423132e Mon Sep 17 00:00:00 2001
From: Octavia Squidington III
Date: Tue, 10 Jan 2023 19:15:36 +0000
Subject: [PATCH 3/3] auto-bump connector version

---
 .../init/src/main/resources/seed/source_definitions.yaml | 2 +-
 .../init/src/main/resources/seed/source_specs.yaml       | 4 ++--
 .../source-s3/integration_tests/config_minio.json        | 2 +-
 .../source_files_abstract/formats/jsonl_parser.py        | 7 ++++---
 4 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 3f9684b03d1d..79ce2e6263f2 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -1422,7 +1422,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.27
+  dockerImageTag: 0.1.28
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index 41c07d46c771..5f798eba0a28 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -12563,7 +12563,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.27"
+- dockerImage: "airbyte/source-s3:0.1.28"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
@@ -12786,7 +12786,7 @@
                     \ from each file. If your data is particularly wide and failing\
                     \ during schema detection, increasing this should solve it. Beware\
                     \ of raising this too high as you could hit OOM errors."
-                  default: 10000
+                  default: 0
                   order: 2
                   type: "integer"
             schema:
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
index 6198d09c452e..c51152a102b0 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
@@ -6,7 +6,7 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.210.197:9000"
+    "endpoint": "http://10.0.231.175:9000"
   },
   "format": {
     "filetype": "csv",
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
index 8e3ea463693b..54eb6b4c33f4 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
@@ -1,11 +1,13 @@
 #
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
+
 import logging
 from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union
 
 import pyarrow as pa
-from pyarrow import json as pa_json, ArrowNotImplementedError
+from pyarrow import ArrowNotImplementedError
+from pyarrow import json as pa_json
 from source_s3.source_files_abstract.file_info import FileInfo
 
 from .abstract_file_parser import AbstractFileParser
@@ -53,8 +55,7 @@ def _read_options(self) -> Mapping[str, str]:
         build ReadOptions object like: pa.json.ReadOptions(**self._read_options())
         Disable the block size parameter if it is set to 0.
         """
-        return {**{"block_size": self.format.block_size if self.format.block_size else None,
-                   "use_threads": True}}
+        return {**{"block_size": self.format.block_size if self.format.block_size else None, "use_threads": True}}
 
     def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, str]:
         """
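
The core of all three patches is the block_size handling in _read_options: a value of 0 is translated to None so that pyarrow falls back to its own default block size, and an ArrowNotImplementedError raised during parsing is surfaced as a ValueError with a hint to adjust the setting. Below is a minimal, self-contained sketch of that behavior outside the connector; it assumes only pyarrow, and the helper name read_jsonl and the inline sample records are illustrative, not connector code.

# sketch_block_size.py -- illustrates the block_size convention introduced above.
import io

import pyarrow as pa
from pyarrow import ArrowNotImplementedError
from pyarrow import json as pa_json


def read_jsonl(data: bytes, block_size: int = 0) -> pa.Table:
    # 0 means "no explicit chunking": pass None so pyarrow uses its default block size.
    read_options = pa_json.ReadOptions(
        block_size=block_size if block_size else None,
        use_threads=True,
    )
    try:
        return pa_json.read_json(io.BytesIO(data), read_options)
    except ArrowNotImplementedError as e:
        # Mirror the parser's fallback: an undersized block can break JSONL parsing.
        raise ValueError(
            "Possibly the block size used is too small. "
            "Please try to increase it, or set it to 0 to disable this feature."
        ) from e


if __name__ == "__main__":
    sample = b'{"id": 1, "name": "a"}\n{"id": 2, "name": "b"}\n'
    print(read_jsonl(sample).num_rows)               # pyarrow default block size -> 2
    print(read_jsonl(sample, 1 << 20).column_names)  # explicit 1 MiB blocks -> ['id', 'name']

Mapping 0 to None rather than passing it through is the key design choice: it lets pyarrow pick a sensible chunk size on its own, so users who leave the new default untouched no longer hit failures that the old fixed 10000-byte default could cause on wide records.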