✨ [file based cdk] S3 legacy config adapter #29145

Merged on Aug 9, 2023 (5 commits)
airbyte-integrations/connectors/source-s3/source_s3/v4/__init__.py
@@ -3,6 +3,8 @@
#

from .config import Config
+from .legacy_config_transformer import LegacyConfigTransformer
+from .source import SourceS3
from .stream_reader import SourceS3StreamReader

-__all__ = ["Config", "SourceS3StreamReader"]
+__all__ = ["Config", "LegacyConfigTransformer", "SourceS3", "SourceS3StreamReader"]
airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
@@ -9,6 +9,11 @@


class Config(AbstractFileBasedSpec):
+    """
+    NOTE: When this Spec is changed, legacy_config_transformer.py must also be modified to pick up the changes,
+    because it is responsible for converting legacy S3 v3 configs into v4 configs using the file-based CDK.
+    """
+
    @classmethod
    def documentation_url(cls) -> AnyUrl:
        return AnyUrl("https://docs.airbyte.com/integrations/sources/s3", scheme="https")
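
A hedged illustration of the note above (the "region" field is hypothetical, not part of the actual spec): adding a field to the spec means adding a matching mapping in LegacyConfigTransformer.convert, along the lines of:

    # hypothetical new provider field in the spec
    if legacy_config.provider.region:
        transformed_config["region"] = legacy_config.provider.region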
airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py (new file)
@@ -0,0 +1,60 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from datetime import datetime
from typing import Any, List, Mapping

from source_s3.source import SourceS3Spec

SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


Review comment on class LegacyConfigTransformer (Contributor): nit: can you add a comment in the spec that this file needs to be updated if the spec changes? I'm not sure how long we'll need this transformer and there are PRs in the pipeline (eg #17334)

Reply (Contributor Author): done!

class LegacyConfigTransformer:
    """
    Class that takes in S3 source configs in the legacy format and transforms them into
    configs that can be used by the new S3 source built with the file-based CDK.
    """

    @classmethod
    def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
        transformed_config = {
            "bucket": legacy_config.provider.bucket,
            "streams": [
                {
                    "name": legacy_config.dataset,
                    "file_type": legacy_config.format.filetype,
                    "globs": cls._create_globs(legacy_config.path_pattern, legacy_config.provider.path_prefix),
                    "validation_policy": "Emit Record",
                    # todo: add formats on a per-type basis in follow up PRs
                }
            ],
        }

        if legacy_config.provider.start_date:
            transformed_config["start_date"] = cls._transform_seconds_to_micros(legacy_config.provider.start_date)
        if legacy_config.provider.aws_access_key_id:
            transformed_config["aws_access_key_id"] = legacy_config.provider.aws_access_key_id
        if legacy_config.provider.aws_secret_access_key:
            transformed_config["aws_secret_access_key"] = legacy_config.provider.aws_secret_access_key
        if legacy_config.provider.endpoint:
            transformed_config["endpoint"] = legacy_config.provider.endpoint
        if legacy_config.user_schema and legacy_config.user_schema != "{}":
            transformed_config["streams"][0]["input_schema"] = legacy_config.user_schema

        return transformed_config

    @classmethod
    def _create_globs(cls, path_pattern: str, path_prefix: str) -> List[str]:
        if path_prefix:
            return [path_prefix + path_pattern]
        return [path_pattern]

Review comment on _create_globs (Contributor Author): we could also add trimming to the prefix/pattern to avoid extra / but I also didn't see any of that in the existing version so I kept it simpler
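
A hedged sketch of what that trimming could look like (not part of this PR; the join logic is illustrative only):

    @classmethod
    def _create_globs(cls, path_pattern: str, path_prefix: str) -> List[str]:
        if path_prefix:
            # collapse any duplicate "/" where the prefix meets the pattern
            return [path_prefix.rstrip("/") + "/" + path_pattern.lstrip("/")]
        return [path_pattern]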

    @classmethod
    def _transform_seconds_to_micros(cls, datetime_str: str) -> str:
        # Reformat "%Y-%m-%dT%H:%M:%SZ" timestamps into the microsecond-precision
        # format expected by v4 configs; strptime raises ValueError on malformed
        # input, which propagates to the caller.
        parsed_datetime = datetime.strptime(datetime_str, SECONDS_FORMAT)
        return datetime.strftime(parsed_datetime, MICROS_FORMAT)
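
A quick usage sketch of the transformer (the field values are illustrative and mirror the unit tests below):

    from source_s3.source import SourceS3Spec
    from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer

    legacy = SourceS3Spec(
        dataset="test_data",
        provider={"storage": "S3", "bucket": "test_bucket"},
        format={"filetype": "avro"},
        path_pattern="**/*.csv",
    )
    v4_config = LegacyConfigTransformer.convert(legacy)
    # -> {"bucket": "test_bucket", "streams": [{"name": "test_data", "file_type": "avro",
    #     "globs": ["**/*.csv"], "validation_policy": "Emit Record"}]}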
23 changes: 23 additions & 0 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/source.py
@@ -0,0 +1,23 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping

from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from source_s3.source import SourceS3Spec
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer


class SourceS3(FileBasedSource):
    def read_config(self, config_path: str) -> Mapping[str, Any]:
        """
        Used to override the default read_config so that when the new file-based S3 connector processes a config
        in the legacy format, it can be transformed into the new config. This happens in the entrypoint, before we
        validate the config against the new spec.
        """
        config = super().read_config(config_path)
        if not config.get("streams"):
            parsed_legacy_config = SourceS3Spec(**config)
            return LegacyConfigTransformer.convert(parsed_legacy_config)
        return config
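
A minimal sketch of the same dispatch, factored into a standalone helper (the helper name is hypothetical; in the class above, super().read_config handles the file I/O first):

    from typing import Any, Mapping

    from source_s3.source import SourceS3Spec
    from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer

    def adapt_if_legacy(config: Mapping[str, Any]) -> Mapping[str, Any]:
        # v4 configs already carry a "streams" list and pass through untouched
        if config.get("streams"):
            return config
        # anything without "streams" is treated as a legacy v3 config and converted
        return LegacyConfigTransformer.convert(SourceS3Spec(**config))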
(new unit test file for LegacyConfigTransformer)
@@ -0,0 +1,86 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import pytest
from source_s3.source import SourceS3Spec
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer


@pytest.mark.parametrize(
    "legacy_config, expected_config",
    [
        pytest.param(
            {
                "dataset": "test_data",
                "provider": {
                    "storage": "S3",
                    "bucket": "test_bucket",
                    "aws_access_key_id": "some_access_key",
                    "aws_secret_access_key": "some_secret",
                    "endpoint": "https://external-s3.com",
                    "path_prefix": "a_folder/",
                    "start_date": "2022-01-01T01:02:03Z",
                },
                "format": {
                    "filetype": "csv",
                    "delimiter": "^",
                    "quote_char": "|",
                    "escape_char": "!",
                    "double_quote": True,
                    "quoting_behavior": "Quote All",
                },
                "path_pattern": "**/*.csv",
                "schema": '{"col1": "string", "col2": "integer"}',
            },
            {
                "bucket": "test_bucket",
                "aws_access_key_id": "some_access_key",
                "aws_secret_access_key": "some_secret",
                "endpoint": "https://external-s3.com",
                "start_date": "2022-01-01T01:02:03.000000Z",
                "streams": [
                    {
                        "name": "test_data",
                        "file_type": "csv",
                        "globs": ["a_folder/**/*.csv"],
                        "validation_policy": "Emit Record",
                        "input_schema": '{"col1": "string", "col2": "integer"}',
                    }
                ],
            },
            id="test_convert_legacy_config",
        ),
        pytest.param(
            {
                "dataset": "test_data",
                "provider": {
                    "storage": "S3",
                    "bucket": "test_bucket",
                },
                "format": {
                    "filetype": "avro",
                },
                "path_pattern": "**/*.csv",
            },
            {
                "bucket": "test_bucket",
                "streams": [
                    {
                        "name": "test_data",
                        "file_type": "avro",
                        "globs": ["**/*.csv"],
                        "validation_policy": "Emit Record",
                    }
                ],
            },
            id="test_convert_no_optional_fields",
        ),
    ],
)
def test_convert_legacy_config(legacy_config, expected_config):
    parsed_legacy_config = SourceS3Spec(**legacy_config)
    actual_config = LegacyConfigTransformer.convert(parsed_legacy_config)

    assert actual_config == expected_config
5 changes: 2 additions & 3 deletions airbyte-integrations/connectors/source-s3/v4_main.py
@@ -6,11 +6,10 @@
import sys

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
-from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
-from source_s3.v4 import Config, SourceS3StreamReader
+from source_s3.v4 import Config, SourceS3, SourceS3StreamReader

if __name__ == "__main__":
    args = sys.argv[1:]
    catalog_path = AirbyteEntrypoint.extract_catalog(args)
-    source = FileBasedSource(SourceS3StreamReader(), Config, catalog_path)
+    source = SourceS3(SourceS3StreamReader(), Config, catalog_path)
    launch(source, args)
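
With this wiring, the v4 entrypoint can be driven locally with the standard Airbyte connector commands, e.g. python v4_main.py spec, python v4_main.py check --config secrets/config.json, or python v4_main.py read --config secrets/config.json --catalog catalog.json (the config and catalog paths here are illustrative).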