From 31edbd8baefeab01749d88c93698fe68e51e3bc7 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com> Date: Tue, 10 Jan 2023 20:53:42 +0100 Subject: [PATCH] Source S3: update block size for json (#21210) * Source S3: update block size for json * Source S3: update docs * auto-bump connector version Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 4 ++-- .../connectors/source-s3/Dockerfile | 2 +- .../integration_tests/config_minio.json | 2 +- .../source-s3/integration_tests/spec.json | 2 +- .../formats/jsonl_parser.py | 19 ++++++++++++++----- .../formats/jsonl_spec.py | 4 ++-- docs/integrations/sources/s3.md | 3 ++- 8 files changed, 24 insertions(+), 14 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 44cb29fd919e..8f9b622fa068 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1486,7 +1486,7 @@ - name: S3 sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 dockerRepository: airbyte/source-s3 - dockerImageTag: 0.1.27 + dockerImageTag: 0.1.28 documentationUrl: https://docs.airbyte.com/integrations/sources/s3 icon: s3.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e2281281ee1d..6528f9b8c170 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -12511,7 +12511,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-s3:0.1.27" +- dockerImage: "airbyte/source-s3:0.1.28" spec: documentationUrl: 
"https://docs.airbyte.com/integrations/sources/s3" changelogUrl: "https://docs.airbyte.com/integrations/sources/s3" @@ -12734,7 +12734,7 @@ \ from each file. If your data is particularly wide and failing\ \ during schema detection, increasing this should solve it. Beware\ \ of raising this too high as you could hit OOM errors." - default: 10000 + default: 0 order: 2 type: "integer" schema: diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index e97e475cc4a2..5483d35fd9f4 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.27 +LABEL io.airbyte.version=0.1.28 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json index 6198d09c452e..c51152a102b0 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json @@ -6,7 +6,7 @@ "aws_access_key_id": "123456", "aws_secret_access_key": "123456key", "path_prefix": "", - "endpoint": "http://10.0.210.197:9000" + "endpoint": "http://10.0.231.175:9000" }, "format": { "filetype": "csv", diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json index 2fcd27e2bcf9..8aa6266bd205 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json @@ -198,7 +198,7 @@ "block_size": { "title": "Block Size", "description": "The chunk size in bytes to 
process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.", - "default": 10000, + "default": 0, "order": 2, "type": "integer" } diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py index b6d408dc1e4f..54eb6b4c33f4 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py @@ -2,16 +2,19 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # - +import logging from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union import pyarrow as pa +from pyarrow import ArrowNotImplementedError from pyarrow import json as pa_json from source_s3.source_files_abstract.file_info import FileInfo from .abstract_file_parser import AbstractFileParser from .jsonl_spec import JsonlFormat +logger = logging.getLogger("airbyte") class JsonlParser(AbstractFileParser): TYPE_MAP = { @@ -50,8 +53,9 @@ def _read_options(self) -> Mapping[str, str]: """ https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html build ReadOptions object like: pa.json.ReadOptions(**self._read_options()) + Disable block size parameter if it is set to 0.
""" - return {**{"block_size": self.format.block_size, "use_threads": True}} + return {**{"block_size": self.format.block_size if self.format.block_size else None, "use_threads": True}} def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, str]: """ @@ -70,9 +74,14 @@ def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, return parse_options def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, Any] = None) -> pa.Table: - return pa_json.read_json( - file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema)) - ) + try: + return pa_json.read_json( + file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema)) + ) + except ArrowNotImplementedError as e: + message = "Possibly too small block size used. Please try to increase it or set to 0 disable this feature." + logger.warning(message) + raise ValueError(message) from e def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]: """ diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py index 6af676c9159e..98d9e44e8e2b 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py @@ -38,9 +38,9 @@ class Config: examples=["ignore", "infer", "error"], order=1, ) - + # Block size set to 0 as default value to disable this feature for most not-experienced users block_size: int = Field( - default=10000, + default=0, description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. 
Beware of raising this too high as you could hit OOM errors.", order=2, ) diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 23bb3f120d1b..efdf8c1b2fee 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -205,10 +205,11 @@ The avro parser uses [fastavro](https://fastavro.readthedocs.io/en/latest/). Cur The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is supported.For more detailed info, please refer to the [docs] (https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html) -## Changelog +## Changelog | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------| +| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format | | 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format | | 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | | 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file |