From e729abcfbb6d45dcf0929e367fcfc3f0aee6bafe Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Wed, 8 Feb 2023 11:34:47 +0200 Subject: [PATCH 1/4] #1467 source S3: validate CSV read options and convert options --- .../connectors/source-s3/Dockerfile | 2 +- .../formats/csv_parser.py | 65 +++++++++++++++---- .../source-s3/unit_tests/test_source.py | 25 ++++--- docs/integrations/sources/s3.md | 63 +++++++++--------- 4 files changed, 100 insertions(+), 55 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index 0e03ad049bbc..2a9757d5a0d5 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.30 +LABEL io.airbyte.version=0.1.31 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py index adb87b467220..ec49a5553884 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py @@ -11,6 +11,8 @@ import pyarrow import pyarrow as pa import six # type: ignore[import] +from airbyte_cdk.models import FailureType +from airbyte_cdk.utils.traced_exception import AirbyteTracedException from pyarrow import csv as pa_csv from source_s3.exceptions import S3Exception from source_s3.source_files_abstract.file_info import FileInfo @@ -51,22 +53,57 @@ def format(self) -> CsvFormat: self.format_model = CsvFormat.parse_obj(self._format) return self.format_model - def _validate_field_len(self, config: Mapping[str, Any], field_name: str): - if len(config.get("format", {}).get(field_name)) != 1: - raise ValueError(f"{field_name} should contain 1 character only") + @staticmethod + def _validate_field( + format_: Mapping[str, Any], field_name: str, allow_empty: bool = False, disallow_values: Optional[Tuple[Any, ...]] = None + ) -> Optional[str]: + disallow_values = disallow_values or () + field_value = format_.get(field_name) + if not field_value and allow_empty: + return + if len(format_.get(field_name)) != 1: + return f"{field_name} should contain 1 character only" + if field_value in disallow_values: + return f"{field_name} can not be {field_value}" + + @staticmethod + def _validate_read_options(format_: Mapping[str, Any]) -> Optional[str]: + options = format_.get("advanced_options", "{}") + try: + options = json.loads(options) + pa.csv.ReadOptions(**options) + except json.decoder.JSONDecodeError: + return "Malformed advanced read options!" + except TypeError as e: + return f"One or more read options are invalid: {str(e)}" + + @staticmethod + def _validate_convert_options(format_: Mapping[str, Any]) -> Optional[str]: + options = format_.get("additional_reader_options", "{}") + try: + options = json.loads(options) + pa.csv.ConvertOptions(**options) + except json.decoder.JSONDecodeError: + return "Malformed advanced read options!" + except TypeError as e: + return f"One or more read options are invalid: {str(e)}" def _validate_config(self, config: Mapping[str, Any]): - if config.get("format", {}).get("filetype") == "csv": - self._validate_field_len(config, "delimiter") - if config.get("format", {}).get("delimiter") in ("\r", "\n"): - raise ValueError("Delimiter cannot be \r or \n") - - self._validate_field_len(config, "quote_char") - - if config.get("format", {}).get("escape_char"): - self._validate_field_len(config, "escape_char") - - codecs.lookup(config.get("format", {}).get("encoding")) + format_ = config.get("format", {}) + for error_message in ( + self._validate_field(format_, "delimiter", disallow_values=("\r", "\n")), + self._validate_field(format_, "quote_char"), + self._validate_field(format_, "escape_char", allow_empty=True), + self._validate_read_options(format_), + self._validate_convert_options(format_), + ): + if error_message: + raise AirbyteTracedException(error_message, error_message, failure_type=FailureType.config_error) + + try: + codecs.lookup(format_.get("encoding")) + except LookupError: + raise AirbyteTracedException(error_message, error_message, failure_type=FailureType.config_error) def _read_options(self) -> Mapping[str, str]: """ diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py index 680d303e0a36..98588ad9e08e 100644 --- a/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py @@ -8,6 +8,7 @@ import pytest from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ConnectorSpecification +from airbyte_cdk.utils.traced_exception import AirbyteTracedException from source_s3 import SourceS3 from source_s3.source_files_abstract.spec import SourceFilesAbstractSpec @@ -39,23 +40,27 @@ def test_check_connection_exception(config): @pytest.mark.parametrize( - "delimiter, quote_char, escape_char, encoding, error_type", + "delimiter, quote_char, escape_char, encoding, read_options, convert_options", [ - ("string", "'", None, "utf8", ValueError), - ("\n", "'", None, "utf8", ValueError), - (",", ";,", None, "utf8", ValueError), - (",", "'", "escape", "utf8", ValueError), - (",", "'", None, "utf888", LookupError) + ("string", "'", None, "utf8", "{}", "{}"), + ("\n", "'", None, "utf8", "{}", "{}"), + (",", ";,", None, "utf8", "{}", "{}"), + (",", "'", "escape", "utf8", "{}", "{}"), + (",", "'", None, "utf888", "{}", "{}"), + (",", "'", None, "utf8", "{'compression': true}", "{}"), + (",", "'", None, "utf8", "{}", "{'compression: true}"), ], ids=[ "long_delimiter", "forbidden_delimiter_symbol", "long_quote_char", "long_escape_char", - "unknown_encoding" + "unknown_encoding", + "invalid read options", + "invalid convert options" ], ) -def test_check_connection_csv_validation_exception(delimiter, quote_char, escape_char, encoding, error_type): +def test_check_connection_csv_validation_exception(delimiter, quote_char, escape_char, encoding, read_options, convert_options): config = { "dataset": "test", "provider": { @@ -73,13 +78,15 @@ def test_check_connection_csv_validation_exception(delimiter, quote_char, escape "quote_char": quote_char, "escape_char": escape_char, "encoding": encoding, + "advanced_options": read_options, + "additional_reader_options": convert_options } } ok, error_msg = SourceS3().check_connection(logger, config=config) assert not ok assert error_msg - assert isinstance(error_msg, error_type) + assert isinstance(error_msg, AirbyteTracedException) def test_check_connection(config): diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index cdd3333b720c..51ac0d72b18a 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -207,35 +207,36 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo ## Changelog 21210 -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------| -| 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------| +| 0.1.31 | 2023-02-08 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Validate CSV read options and convert options | +| 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI | | 0.1.29 | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones. Warn a customer | -| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format | -| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format | -| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | -| 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file | -| 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays | -| 0.1.23 | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True` | -| 0.1.22 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state | -| 0.1.21 | 2022-09-20 | [16921](https://github.com/airbytehq/airbyte/pull/16921) | Upgrade pyarrow | -| 0.1.20 | 2022-09-12 | [16607](https://github.com/airbytehq/airbyte/pull/16607) | Fix for reading jsonl files containing nested structures | -| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas | -| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. | -| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet | -| 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns apeared to be non-present in master schema | -| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs | -| 0.1.14 | 2022-05-23 | [11967](https://github.com/airbytehq/airbyte/pull/11967) | Increase unit test coverage up to 90% | -| 0.1.13 | 2022-05-11 | [12730](https://github.com/airbytehq/airbyte/pull/12730) | Fixed empty options issue | -| 0.1.12 | 2022-05-11 | [12602](https://github.com/airbytehq/airbyte/pull/12602) | Added support for Avro file format | -| 0.1.11 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy | -| 0.1.10 | 2022-01-28 | [8252](https://github.com/airbytehq/airbyte/pull/8252) | Refactoring of files' metadata | -| 0.1.9 | 2022-01-06 | [9163](https://github.com/airbytehq/airbyte/pull/9163) | Work-around for web-UI, `backslash - t` converts to `tab` for `format.delimiter` field. | -| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | -| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) & [7058](https://github.com/airbytehq/airbyte/pull/7058) | Memory and performance optimisation. Advanced options for CSV parsing. | -| 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services | -| 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | -| 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference | -| 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly | -| 0.1.1 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990/commits/ff5f70662c5f84eabc03526cddfcc9d73c58c0f4) | Fixed documentation url in source definition | -| 0.1.0 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990) | Created S3 source connector | +| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format | +| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format | +| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | +| 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file | +| 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays | +| 0.1.23 | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True` | +| 0.1.22 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state | +| 0.1.21 | 2022-09-20 | [16921](https://github.com/airbytehq/airbyte/pull/16921) | Upgrade pyarrow | +| 0.1.20 | 2022-09-12 | [16607](https://github.com/airbytehq/airbyte/pull/16607) | Fix for reading jsonl files containing nested structures | +| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas | +| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. | +| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet | +| 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns apeared to be non-present in master schema | +| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs | +| 0.1.14 | 2022-05-23 | [11967](https://github.com/airbytehq/airbyte/pull/11967) | Increase unit test coverage up to 90% | +| 0.1.13 | 2022-05-11 | [12730](https://github.com/airbytehq/airbyte/pull/12730) | Fixed empty options issue | +| 0.1.12 | 2022-05-11 | [12602](https://github.com/airbytehq/airbyte/pull/12602) | Added support for Avro file format | +| 0.1.11 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy | +| 0.1.10 | 2022-01-28 | [8252](https://github.com/airbytehq/airbyte/pull/8252) | Refactoring of files' metadata | +| 0.1.9 | 2022-01-06 | [9163](https://github.com/airbytehq/airbyte/pull/9163) | Work-around for web-UI, `backslash - t` converts to `tab` for `format.delimiter` field. | +| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | +| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) & [7058](https://github.com/airbytehq/airbyte/pull/7058) | Memory and performance optimisation. Advanced options for CSV parsing. | +| 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services | +| 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | +| 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference | +| 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly | +| 0.1.1 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990/commits/ff5f70662c5f84eabc03526cddfcc9d73c58c0f4) | Fixed documentation url in source definition | +| 0.1.0 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990) | Created S3 source connector | From c378bf98583175200bbfefb42d47d6ac7a9a82c6 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Wed, 8 Feb 2023 11:36:39 +0200 Subject: [PATCH 2/4] #1467 source S3: upd changelog --- docs/integrations/sources/s3.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 51ac0d72b18a..57747c1ae969 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -209,7 +209,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------| -| 0.1.31 | 2023-02-08 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Validate CSV read options and convert options | +| 0.1.31 | 2023-02-08 | [22550](https://github.com/airbytehq/airbyte/pull/22550) | Validate CSV read options and convert options | | 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI | | 0.1.29 | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones. Warn a customer | | 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format | From 4fcf946e248e5f6ab9521c0e37d73bb198deffaf Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Thu, 9 Feb 2023 10:33:36 +0200 Subject: [PATCH 3/4] #1467 source s3: review fixes --- .../formats/csv_parser.py | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py index 1b03aded3c1b..ac76eecfe64d 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py @@ -61,32 +61,29 @@ def _validate_field( field_value = format_.get(field_name) if not field_value and allow_empty: return - if len(format_.get(field_name)) != 1: + if len(field_value) != 1: return f"{field_name} should contain 1 character only" if field_value in disallow_values: return f"{field_name} can not be {field_value}" - @staticmethod - def _validate_read_options(format_: Mapping[str, Any]) -> Optional[str]: - options = format_.get("advanced_options", "{}") + @classmethod + def _validate_options(cls, validator: Callable, options_name: str, format_: Mapping[str, Any]) -> Optional[str]: + options = format_.get(options_name, "{}") try: options = json.loads(options) - pa.csv.ReadOptions(**options) + validator(**options) except json.decoder.JSONDecodeError: return "Malformed advanced read options!" except TypeError as e: return f"One or more read options are invalid: {str(e)}" - @staticmethod - def _validate_convert_options(format_: Mapping[str, Any]) -> Optional[str]: - options = format_.get("additional_reader_options", "{}") - try: - options = json.loads(options) - pa.csv.ConvertOptions(**options) - except json.decoder.JSONDecodeError: - return "Malformed advanced read options!" - except TypeError as e: - return f"One or more read options are invalid: {str(e)}" + @classmethod + def _validate_read_options(cls, format_: Mapping[str, Any]) -> Optional[str]: + return cls._validate_options(pa.csv.ReadOptions, "advanced_options", format_) + + @classmethod + def _validate_convert_options(cls, format_: Mapping[str, Any]) -> Optional[str]: + return cls._validate_options(pa.csv.ConvertOptions, "additional_reader_options", format_) def _validate_config(self, config: Mapping[str, Any]): format_ = config.get("format", {}) From 0beffbf640d11098e6bcce9861813112450642ef Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Thu, 9 Feb 2023 15:00:21 +0000 Subject: [PATCH 4/4] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 42d71e5909a1..f74892cc8fa2 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1549,7 +1549,7 @@ - name: S3 sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 dockerRepository: airbyte/source-s3 - dockerImageTag: 0.1.30 + dockerImageTag: 0.1.31 documentationUrl: https://docs.airbyte.com/integrations/sources/s3 icon: s3.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e3a0db465242..c98c4e5a6410 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -12775,7 +12775,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-s3:0.1.30" +- dockerImage: "airbyte/source-s3:0.1.31" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/s3" changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"