
File-based CDK + Source S3 (v4): Pass configured file encoding to stream reader #29110

Merged: 24 commits merged into master from alex/file_based_encoding on Aug 9, 2023

Conversation

@girarda (Contributor) commented Aug 4, 2023

What

Enable the S3 v4 source to read files with various encodings.

How

  • CDK (csv): Pass the encoding field from the stream config to the stream reader
  • CDK (json): Pass "utf8" as the encoding to the stream reader
  • CDK (parquet and avro): Pass None as the encoding to the stream reader
  • S3 file reader: pass the encoding to smart_open.open

Recommended reading order

  1. airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_stream_reader.py
  2. airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py
  3. airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/avro_parser.py
  4. airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/jsonl_parser.py
  5. airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py
  6. airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py
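
At a glance, the change threads a new `encoding` argument through `open_file`. A minimal sketch of the resulting shape, with simplified signatures (the real definitions live in the files listed above; the `FileReadMode` values here are assumptions):

```python
import io
import logging
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional


class FileReadMode(Enum):
    READ = "r"          # text formats (csv, jsonl)
    READ_BINARY = "rb"  # binary formats (avro, parquet)


class AbstractFileBasedStreamReader(ABC):
    @abstractmethod
    def open_file(self, file, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> io.IOBase:
        """Open `file`, applying `encoding` in text mode; binary formats pass None."""


# What each parser passes as `encoding`:
#   csv     -> the `encoding` field from the stream config
#   jsonl   -> "utf8" (fixed)
#   avro    -> None
#   parquet -> None
```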

@octavia-squidington-iii added labels area/connectors (Connector related issues), CDK (Connector Development Kit), connectors/source/s3 on Aug 4, 2023
github-actions bot (Contributor) commented Aug 4, 2023

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete but the CI check is failing:

  1. Check for hidden checklists in your PR description

  2. Toggle the GitHub label checklist-action-run on/off to re-run the checklist CI.

@girarda girarda changed the title Add encoding to open_file interface File-based CDK + Source S3 (v4): Specify encoding Aug 4, 2023
@girarda girarda changed the title File-based CDK + Source S3 (v4): Specify encoding File-based CDK + Source S3 (v4): Pass configured file encoding to stream reader Aug 4, 2023
In source_s3/v4/stream_reader.py:

```diff
 try:
     params = {"client": self.s3_client}
 except Exception as exc:
     raise exc

 logger.debug(f"try to open {file.uri}")
 try:
-    result = smart_open.open(f"s3://{self.config.bucket}/{file.uri}", transport_params=params, mode=mode.value)
+    result = smart_open.open(f"s3://{self.config.bucket}/{file.uri}", transport_params=params, mode=mode.value, encoding=encoding)
```
girarda (author) commented:
It's a little tricky to test this until we have the adapter. I tested by merging this branch into the codebase, running the v4 source, and reading s3://airbyte-acceptance-test-source-s3/csv_tests/csv_encoded_as_cp1252.csv, which is encoded as cp1252.
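
To see why the wrong encoding breaks such a file, a quick illustration (not from the PR): cp1252 bytes are generally not valid UTF-8.

```python
data = "café".encode("cp1252")  # b'caf\xe9'
print(data.decode("cp1252"))    # café
data.decode("utf-8")            # raises UnicodeDecodeError
```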

```diff
@@ -95,7 +95,7 @@ def validate_quote_char(cls, v: str) -> str:

     @validator("escape_char")
     def validate_escape_char(cls, v: str) -> str:
-        if len(v) != 1:
+        if v is not None and len(v) != 1:
```
girarda (author) commented:
`escape_char` is an optional field, so the validator must handle None.
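
A minimal sketch of the guarded validator on a simplified pydantic (v1-style) model; `CsvFormat` here is a stand-in for the real config class:

```python
from typing import Optional

from pydantic import BaseModel, validator


class CsvFormat(BaseModel):  # stand-in for the real CSV format config
    escape_char: Optional[str] = None

    @validator("escape_char")
    def validate_escape_char(cls, v: Optional[str]) -> Optional[str]:
        # None means "no escape character"; only validate a provided value.
        if v is not None and len(v) != 1:
            raise ValueError("escape_char must be a single character")
        return v


CsvFormat()                  # ok: escape_char defaults to None
CsvFormat(escape_char="\\")  # ok: single character
# CsvFormat(escape_char="ab")  -> ValidationError
```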

```diff
@@ -16,6 +16,7 @@
 class JsonlParser(FileTypeParser):

     MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE = 1_000_000
+    ENCODING = "utf8"
```
girarda (author) commented:
Encoding isn't configurable in the legacy S3 source. We can move this to a config option if needed.
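
A hedged sketch of how the jsonl parser would hand its fixed encoding to the reader (assuming `FileReadMode` is importable from the stream-reader module listed in the reading order above):

```python
import json
import logging
from typing import Any, Dict, Iterable

from airbyte_cdk.sources.file_based.file_based_stream_reader import (
    AbstractFileBasedStreamReader,
    FileReadMode,
)

ENCODING = "utf8"

def parse_records(stream_reader: AbstractFileBasedStreamReader, file, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
    # The reader decodes with the parser's fixed encoding,
    # so the parser only ever sees str lines.
    with stream_reader.open_file(file, FileReadMode.READ, ENCODING, logger) as fp:
        for line in fp:
            yield json.loads(line)
```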

```diff
@@ -50,8 +53,8 @@ def parse_records(
 ) -> Iterable[Dict[str, Any]]:
     parquet_format = config.format[config.file_type] if config.format else ParquetFormat()
     if not isinstance(parquet_format, ParquetFormat):
         raise ValueError(f"Expected ParquetFormat, got {parquet_format}")  # FIXME test this branch!
```

```python
        self._expected_encoding = expected_encoding

    def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> io.IOBase:
        assert encoding == self._expected_encoding
```
girarda (author) commented:
Not a great test, but the actual decoding is done outside of the CDK.

maxi297 (Contributor) replied:
Why are we defining a MockFileBasedStreamReader class when we could just use `Mock(spec=AbstractFileBasedStreamReader)`? Having the mock would allow us to assert calls (and therefore validate arguments, like we're doing with this assert).
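
For illustration, a sketch of the suggested approach (assuming the import path from the reading order above; `remote_file` is a placeholder):

```python
import logging
from unittest.mock import Mock

from airbyte_cdk.sources.file_based.file_based_stream_reader import (
    AbstractFileBasedStreamReader,
    FileReadMode,
)

logger = logging.getLogger(__name__)
remote_file = Mock()  # placeholder for a RemoteFile

# spec= constrains the mock to the real interface and records every call.
reader = Mock(spec=AbstractFileBasedStreamReader)

# ... exercise the parser under test with `reader`; it would call, e.g.:
reader.open_file(remote_file, FileReadMode.READ, "cp1252", logger)

# The test then asserts on the recorded call instead of asserting
# inside a hand-written subclass:
reader.open_file.assert_called_once_with(remote_file, FileReadMode.READ, "cp1252", logger)
```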

@girarda girarda marked this pull request as ready for review August 4, 2023 23:01
@girarda girarda requested a review from a team as a code owner August 4, 2023 23:01
octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit 8a7af7278c) - ❌

⏲️ Total pipeline duration: 02mn57s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PRs that are ready for review. Set your PR to draft mode to avoid flooding the CI engine and upstream services on subsequent commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool, using the following command:

airbyte-ci connectors --name=source-s3 test

octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit 5fff44ba69) - ❌

⏲️ Total pipeline duration: 02mn45s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests


octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit 474de6e105) - ❌

⏲️ Total pipeline duration: 17mn42s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit bf2d50e068) - ❌

⏲️ Total pipeline duration: 18mn25s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit 19ce36dcd8) - ❌

⏲️ Total pipeline duration: 18mn15s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit 85f8d8e66f) - ❌

⏲️ Total pipeline duration: 18mn20s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


@girarda girarda requested review from maxi297 and clnoll August 7, 2023 18:45
clnoll (Contributor) left a comment:

🥇

maxi297 (Contributor) left a comment:

No blocker, as my comments only relate to testing.


```python
)
.set_stream_reader(TemporaryParquetFilesStreamReader(files=_single_parquet_file, file_type="parquet"))
.set_file_type("parquet")
.set_expected_read_error(ConfigValidationError, "Error creating stream config object. Contact Support if you need assistance.")
```
maxi297 (Contributor) commented:
I guess this is invalid because of the CSV, right? Could we have a better error message? Also, could this be a unit test?

octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit f0e56fe2a5) - ❌

⏲️ Total pipeline duration: 23mn00s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit b7aed85c08) - ❌

⏲️ Total pipeline duration: 22mn27s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit 983b40e46c) - ❌

⏲️ Total pipeline duration: 20mn12s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit 78cf5736c6) - ❌

⏲️ Total pipeline duration: 20mn22s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


@girarda girarda enabled auto-merge (squash) August 9, 2023 05:40
@girarda girarda disabled auto-merge August 9, 2023 05:40
@girarda girarda enabled auto-merge (squash) August 9, 2023 14:01
@girarda (Contributor, author) commented Aug 9, 2023:

/approve-and-merge reason="S3 pipeline will not pass because the connector version isn't incremented"

octavia-approvington (Contributor) commented:

This code is at peace with itself
[image: imagine karate kid approving]

@octavia-approvington octavia-approvington merged commit 0aa86cf into master Aug 9, 2023
@octavia-approvington octavia-approvington deleted the alex/file_based_encoding branch August 9, 2023 14:05
octavia-squidington-iii (Collaborator) commented:

source-s3 test report (commit ba6984c841) - ❌

⏲️ Total pipeline duration: 22mn15s

Steps:
  • Validate airbyte-integrations/connectors/source-s3/metadata.yaml
  • Connector version semver check
  • Connector version increment check
  • QA checks
  • Code format checks
  • Connector package install
  • Build source-s3 docker image for platform linux/x86_64
  • Unit tests
  • Integration tests
  • Acceptance tests


harrytou pushed a commit to KYVENetwork/airbyte that referenced this pull request on Sep 1, 2023:
File-based CDK + Source S3 (v4): Pass configured file encoding to stream reader (airbytehq#29110)

* Add encoding to open_file interface

* pass the encoding set in the config

* cleanup

* cleanup

* Automated Commit - Formatting Changes

* Add missing test

* Automated Commit - Formatting Changes

* Update infer_schema too

* Automated Commit - Formatting Changes

* Update unit test

* add a unit test

* fix

* format

* format

* remove newline

* use a mock

* fix

* format

---------

Co-authored-by: girarda <girarda@users.noreply.github.com>