Skip to content

Commit

Permalink
Source S3: update block size for json (#21210)
Browse files Browse the repository at this point in the history
* Source S3: update block size for json

* Source S3: update docs

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
artem1205 and octavia-squidington-iii authored Jan 10, 2023
1 parent 2ea6612 commit 31edbd8
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1486,7 +1486,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.27
dockerImageTag: 0.1.28
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12511,7 +12511,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.27"
- dockerImage: "airbyte/source-s3:0.1.28"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
Expand Down Expand Up @@ -12734,7 +12734,7 @@
\ from each file. If your data is particularly wide and failing\
\ during schema detection, increasing this should solve it. Beware\
\ of raising this too high as you could hit OOM errors."
default: 10000
default: 0
order: 2
type: "integer"
schema:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.27
LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.210.197:9000"
"endpoint": "http://10.0.231.175:9000"
},
"format": {
"filetype": "csv",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
"block_size": {
"title": "Block Size",
"description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
"default": 10000,
"default": 0,
"order": 2,
"type": "integer"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import logging
from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union

import pyarrow as pa
from pyarrow import ArrowNotImplementedError
from pyarrow import json as pa_json
from source_s3.source_files_abstract.file_info import FileInfo

from .abstract_file_parser import AbstractFileParser
from .jsonl_spec import JsonlFormat

logger = logging.getLogger("airbyte")


class JsonlParser(AbstractFileParser):
TYPE_MAP = {
Expand Down Expand Up @@ -50,8 +53,9 @@ def _read_options(self) -> Mapping[str, str]:
"""
https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
build ReadOptions object like: pa.json.ReadOptions(**self._read_options())
Disable block size parameter if it set to 0.
"""
return {**{"block_size": self.format.block_size, "use_threads": True}}
return {**{"block_size": self.format.block_size if self.format.block_size else None, "use_threads": True}}

def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, str]:
"""
Expand All @@ -70,9 +74,14 @@ def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str,
return parse_options

def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, Any] = None) -> pa.Table:
return pa_json.read_json(
file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema))
)
try:
return pa_json.read_json(
file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema))
)
except ArrowNotImplementedError as e:
message = "Possibly too small block size used. Please try to increase it or set to 0 disable this feature."
logger.warning(message)
raise ValueError(message) from e

def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ class Config:
examples=["ignore", "infer", "error"],
order=1,
)

# Block size set to 0 as default value to disable this feature for most not-experienced users
block_size: int = Field(
default=10000,
default=0,
description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
order=2,
)
3 changes: 2 additions & 1 deletion docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,11 @@ The avro parser uses [fastavro](https://fastavro.readthedocs.io/en/latest/). Cur
The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is supported.For more detailed info, please refer to the [docs] (https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html)
## Changelog
## Changelog 21210
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format |
| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format |
| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
| 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file |
Expand Down

0 comments on commit 31edbd8

Please sign in to comment.