Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source S3: update block size for json #21210

Merged
merged 3 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.27
dockerImageTag: 0.1.28
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12563,7 +12563,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.27"
- dockerImage: "airbyte/source-s3:0.1.28"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
Expand Down Expand Up @@ -12786,7 +12786,7 @@
\ from each file. If your data is particularly wide and failing\
\ during schema detection, increasing this should solve it. Beware\
\ of raising this too high as you could hit OOM errors."
default: 10000
default: 0
order: 2
type: "integer"
schema:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.27
LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.210.197:9000"
"endpoint": "http://10.0.231.175:9000"
},
"format": {
"filetype": "csv",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
"block_size": {
"title": "Block Size",
"description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
"default": 10000,
"default": 0,
"order": 2,
"type": "integer"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import logging
from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union

import pyarrow as pa
from pyarrow import ArrowNotImplementedError
from pyarrow import json as pa_json
from source_s3.source_files_abstract.file_info import FileInfo

from .abstract_file_parser import AbstractFileParser
from .jsonl_spec import JsonlFormat

logger = logging.getLogger("airbyte")


class JsonlParser(AbstractFileParser):
TYPE_MAP = {
Expand Down Expand Up @@ -50,8 +53,9 @@ def _read_options(self) -> Mapping[str, str]:
"""
https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
build ReadOptions object like: pa.json.ReadOptions(**self._read_options())
Disable block size parameter if it set to 0.
"""
return {**{"block_size": self.format.block_size, "use_threads": True}}
return {**{"block_size": self.format.block_size if self.format.block_size else None, "use_threads": True}}

def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, str]:
"""
Expand All @@ -70,9 +74,14 @@ def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str,
return parse_options

def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, Any] = None) -> pa.Table:
return pa_json.read_json(
file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema))
)
try:
return pa_json.read_json(
file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema))
)
except ArrowNotImplementedError as e:
message = "Possibly too small block size used. Please try to increase it or set to 0 disable this feature."
logger.warning(message)
raise ValueError(message) from e

def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ class Config:
examples=["ignore", "infer", "error"],
order=1,
)

# Block size set to 0 as default value to disable this feature for most not-experienced users
block_size: int = Field(
default=10000,
default=0,
description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
order=2,
)
3 changes: 2 additions & 1 deletion docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,11 @@ The avro parser uses [fastavro](https://fastavro.readthedocs.io/en/latest/). Cur

The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is supported.For more detailed info, please refer to the [docs] (https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html)

## Changelog
## Changelog 21210

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format |
| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format |
| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
| 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file |
Expand Down