Skip to content

Commit

Permalink
✨ [file based cdk] S3 legacy config adapter (airbytehq#29145)
Browse files Browse the repository at this point in the history
* s3 adapter

* pr feedback and updates after rebasing master

* add comment

* formatting
  • Loading branch information
brianjlai authored and harrytou committed Sep 1, 2023
1 parent face13a commit 76afaf7
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#

from .config import Config
from .legacy_config_transformer import LegacyConfigTransformer
from .source import SourceS3
from .stream_reader import SourceS3StreamReader

# Public API of the v4 package. Only one export list is kept: an earlier, shorter
# assignment directly above it would simply be overwritten and is dead code.
__all__ = ["Config", "LegacyConfigTransformer", "SourceS3", "SourceS3StreamReader"]
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@


class Config(AbstractFileBasedSpec):
"""
NOTE: When this Spec is changed, legacy_config_transformer.py must also be modified to uptake the changes
because it is responsible for converting legacy S3 v3 configs into v4 configs using the File-Based CDK.
"""

@classmethod
def documentation_url(cls) -> AnyUrl:
return AnyUrl("https://docs.airbyte.com/integrations/sources/s3", scheme="https")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from datetime import datetime
from typing import Any, List, Mapping

from source_s3.source import SourceS3Spec

# Timestamp layout accepted from legacy configs (second precision) and the layout
# written into the transformed config (microsecond precision).
SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class LegacyConfigTransformer:
    """
    Class that takes in S3 source configs in the legacy format and transforms them into
    configs that can be used by the new S3 source built with the file-based CDK.
    """

    @classmethod
    def convert(cls, legacy_config: "SourceS3Spec") -> Mapping[str, Any]:
        """
        Build a file-based CDK config mapping from a parsed legacy config.

        The result always contains ``bucket`` and a single-entry ``streams`` list
        (name, file type, globs and an "Emit Record" validation policy). The optional
        ``start_date``, ``aws_access_key_id``, ``aws_secret_access_key`` and ``endpoint``
        keys are only added when the legacy provider carries a truthy value for them,
        and ``input_schema`` is only added for a non-empty user schema.

        :param legacy_config: parsed legacy S3 spec
        :return: the transformed config mapping
        :raises ValueError: if the legacy ``start_date`` is not in SECONDS_FORMAT
        """
        provider = legacy_config.provider

        stream_config = {
            "name": legacy_config.dataset,
            "file_type": legacy_config.format.filetype,
            "globs": cls._create_globs(legacy_config.path_pattern, provider.path_prefix),
            "validation_policy": "Emit Record",
            # todo: add formats on a per-type basis in follow up PRs
        }
        transformed_config = {
            "bucket": provider.bucket,
            "streams": [stream_config],
        }

        if provider.start_date:
            transformed_config["start_date"] = cls._transform_seconds_to_micros(provider.start_date)

        # These provider fields are copied verbatim, but only when they are set.
        for field_name in ("aws_access_key_id", "aws_secret_access_key", "endpoint"):
            field_value = getattr(provider, field_name)
            if field_value:
                transformed_config[field_name] = field_value

        # A schema of "{}" is treated the same as no schema at all.
        if legacy_config.user_schema and legacy_config.user_schema != "{}":
            stream_config["input_schema"] = legacy_config.user_schema

        return transformed_config

    @classmethod
    def _create_globs(cls, path_pattern: str, path_prefix: str) -> List[str]:
        """
        Return the single-element glob list for the stream.

        When a prefix is set it is prepended to the pattern as-is (no separator is
        inserted); otherwise the pattern is used unchanged.
        """
        if path_prefix:
            return [f"{path_prefix}{path_pattern}"]
        return [path_pattern]

    @classmethod
    def _transform_seconds_to_micros(cls, datetime_str: str) -> str:
        """
        Re-format a SECONDS_FORMAT timestamp string into MICROS_FORMAT.

        :raises ValueError: if ``datetime_str`` does not match SECONDS_FORMAT; the
            message names the offending value and the expected format.
        """
        try:
            parsed_datetime = datetime.strptime(datetime_str, SECONDS_FORMAT)
        except ValueError as e:
            # Same exception type as before, but with a message that points at the config value.
            raise ValueError(
                f"Timestamp '{datetime_str}' does not match the expected format '{SECONDS_FORMAT}'"
            ) from e
        return parsed_datetime.strftime(MICROS_FORMAT)
23 changes: 23 additions & 0 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping

from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from source_s3.source import SourceS3Spec
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer


class SourceS3(FileBasedSource):
    def read_config(self, config_path: str) -> Mapping[str, Any]:
        """
        Read the config and, if it is in the legacy format, convert it to the new one.

        Overrides the default read_config so that a legacy config handed to the new file-based
        S3 connector is transformed before the entrypoint validates it against the new spec.
        A config with no (or an empty) "streams" entry is treated as legacy.
        """
        raw_config = super().read_config(config_path)

        # A populated "streams" entry means the config is already in the new format.
        if raw_config.get("streams"):
            return raw_config

        legacy_spec = SourceS3Spec(**raw_config)
        return LegacyConfigTransformer.convert(legacy_spec)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import pytest
from source_s3.source import SourceS3Spec
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer


@pytest.mark.parametrize(
    "legacy_config, expected_config",
    [
        pytest.param(
            {
                "dataset": "test_data",
                "provider": {
                    "storage": "S3",
                    "bucket": "test_bucket",
                    "aws_access_key_id": "some_access_key",
                    "aws_secret_access_key": "some_secret",
                    "endpoint": "https://external-s3.com",
                    "path_prefix": "a_folder/",
                    "start_date": "2022-01-01T01:02:03Z",
                },
                "format": {
                    "filetype": "csv",
                    "delimiter": "^",
                    "quote_char": "|",
                    "escape_char": "!",
                    "double_quote": True,
                    "quoting_behavior": "Quote All",
                },
                "path_pattern": "**/*.csv",
                "schema": '{"col1": "string", "col2": "integer"}',
            },
            {
                "bucket": "test_bucket",
                "aws_access_key_id": "some_access_key",
                "aws_secret_access_key": "some_secret",
                "endpoint": "https://external-s3.com",
                "start_date": "2022-01-01T01:02:03.000000Z",
                "streams": [
                    {
                        "name": "test_data",
                        "file_type": "csv",
                        "globs": ["a_folder/**/*.csv"],
                        "validation_policy": "Emit Record",
                        "input_schema": '{"col1": "string", "col2": "integer"}',
                    },
                ],
            },
            id="test_convert_legacy_config",
        ),
        pytest.param(
            {
                "dataset": "test_data",
                "provider": {
                    "storage": "S3",
                    "bucket": "test_bucket",
                },
                "format": {
                    "filetype": "avro",
                },
                "path_pattern": "**/*.csv",
            },
            {
                "bucket": "test_bucket",
                "streams": [
                    {
                        "name": "test_data",
                        "file_type": "avro",
                        "globs": ["**/*.csv"],
                        "validation_policy": "Emit Record",
                    },
                ],
            },
            id="test_convert_no_optional_fields",
        ),
    ],
)
def test_convert_legacy_config(legacy_config, expected_config):
    """Parse each legacy config dict into a SourceS3Spec and check the transformed result."""
    legacy_spec = SourceS3Spec(**legacy_config)

    assert LegacyConfigTransformer.convert(legacy_spec) == expected_config
5 changes: 2 additions & 3 deletions airbyte-integrations/connectors/source-s3/v4_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import sys

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from source_s3.v4 import Config, SourceS3StreamReader
from source_s3.v4 import Config, SourceS3, SourceS3StreamReader

if __name__ == "__main__":
    # Everything after the program name is handed to both the catalog extraction and the launcher.
    args = sys.argv[1:]
    catalog_path = AirbyteEntrypoint.extract_catalog(args)
    # Build the source exactly once, using SourceS3. The earlier FileBasedSource(...)
    # construction was immediately overwritten and has been dropped.
    source = SourceS3(SourceS3StreamReader(), Config, catalog_path)
    launch(source, args)

0 comments on commit 76afaf7

Please sign in to comment.