Source S3: updates for compatibility with the concurrent CDK (#34591)
clnoll authored Feb 5, 2024
1 parent 5571fd0 commit 53d71f9
Showing 8 changed files with 60 additions and 31 deletions.
@@ -4,95 +4,95 @@ acceptance_tests:
       - config_path: secrets/config.json
         expect_records:
           path: integration_tests/expected_records/csv.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/config_iam_role.json
         expect_records:
           path: integration_tests/expected_records/csv.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_csv_custom_encoding_config.json
         expect_records:
           path: integration_tests/expected_records/legacy_csv_custom_encoding.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_csv_custom_format_config.json
         expect_records:
           path: integration_tests/expected_records/legacy_csv_custom_format.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_csv_user_schema_config.json
         expect_records:
           path: integration_tests/expected_records/legacy_csv_user_schema.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_csv_no_header_config.json
         expect_records:
           path: integration_tests/expected_records/legacy_csv_no_header.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_csv_skip_rows_config.json
         expect_records:
           path: integration_tests/expected_records/legacy_csv_skip_rows.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_csv_skip_rows_no_header_config.json
         expect_records:
           path: integration_tests/expected_records/legacy_csv_skip_rows_no_header.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_csv_with_nulls_config.json
         expect_records:
           path: integration_tests/expected_records/legacy_csv_with_nulls.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_csv_with_null_bools_config.json
         expect_records:
           path: integration_tests/expected_records/legacy_csv_with_null_bools.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_parquet_config.json
         expect_records:
           path: integration_tests/expected_records/parquet.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/parquet_dataset_config.json
         expect_records:
           path: integration_tests/expected_records/parquet_dataset.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
@@ -107,71 +107,71 @@ acceptance_tests:
       - config_path: secrets/v4_avro_config.json
         expect_records:
           path: integration_tests/expected_records/avro.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_jsonl_config.json
         expect_records:
           path: integration_tests/expected_records/jsonl.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/v4_jsonl_newlines_config.json
         expect_records:
           path: integration_tests/expected_records/jsonl_newlines.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/zip_config_csv.json
         expect_records:
           path: integration_tests/expected_records/zip_csv.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/zip_config_csv_custom_encoding.json
         expect_records:
           path: integration_tests/expected_records/zip_csv_custom_encoding.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/zip_config_jsonl.json
         expect_records:
           path: integration_tests/expected_records/zip_jsonl.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/zip_config_avro.json
         expect_records:
           path: integration_tests/expected_records/zip_avro.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/zip_config_parquet.json
         expect_records:
           path: integration_tests/expected_records/zip_parquet.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800
         file_types:
           skip_test: true
           bypass_reason: "To be testes with the last config"
       - config_path: secrets/unstructured_config.json
         expect_records:
           path: integration_tests/expected_records/unstructured.jsonl
-          exact_order: true
+          exact_order: false
         timeout_seconds: 1800

   connection:
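
Note on the `exact_order` change: once streams are read concurrently, records may plausibly arrive in a different order from run to run, which would explain why every `expect_records` block above stops requiring an exact order. The sketch below only illustrates the difference between an ordered and an order-insensitive comparison. `records_match` and `_canonical` are hypothetical names, and this is not the acceptance test framework's actual implementation.

```python
import json
from collections import Counter
from typing import Any, Iterable, Mapping


def _canonical(record: Mapping[str, Any]) -> str:
    # Serialize with sorted keys so that equal records map to equal strings.
    return json.dumps(record, sort_keys=True)


def records_match(
    actual: Iterable[Mapping[str, Any]],
    expected: Iterable[Mapping[str, Any]],
    exact_order: bool,
) -> bool:
    # Hypothetical comparison helper, for illustration only.
    actual_keys = [_canonical(r) for r in actual]
    expected_keys = [_canonical(r) for r in expected]
    if exact_order:
        # Ordered comparison: position matters.
        return actual_keys == expected_keys
    # Multiset comparison: same records with the same multiplicities, in any order.
    return Counter(actual_keys) == Counter(expected_keys)
```

With `exact_order=False`, two runs that emit the same records in a different interleaving compare as equal, while a missing or duplicated record is still detected.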
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: file
   connectorType: source
   definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
-  dockerImageTag: 4.4.1
+  dockerImageTag: 4.5.0
   dockerRepository: airbyte/source-s3
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   githubIssueLabel: source-s3
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/setup.py
@@ -6,7 +6,7 @@
 from setuptools import find_packages, setup

 MAIN_REQUIREMENTS = [
-    "airbyte-cdk[file-based]==0.59.2",  # temporarily pin until concurrency can be released
+    "airbyte-cdk[file-based]>=0.60.1",
     "smart-open[s3]==5.1.0",
     "wcmatch==8.4",
     "dill==0.3.4",
11 changes: 10 additions & 1 deletion airbyte-integrations/connectors/source-s3/source_s3/run.py
@@ -15,8 +15,17 @@

 def get_source(args: List[str]):
     catalog_path = AirbyteEntrypoint.extract_catalog(args)
+    config_path = AirbyteEntrypoint.extract_config(args)
+    state_path = AirbyteEntrypoint.extract_state(args)
     try:
-        return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
+        return SourceS3(
+            SourceS3StreamReader(),
+            Config,
+            SourceS3.read_catalog(catalog_path) if catalog_path else None,
+            SourceS3.read_config(config_path) if config_path else None,
+            SourceS3.read_state(state_path) if state_path else None,
+            cursor_cls=Cursor,
+        )
     except Exception:
         print(
             AirbyteMessage(
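
Note on the new constructor call: the hunk above passes the parsed catalog, config, and state instead of a catalog path, and substitutes `None` for any path that was not supplied on the command line. The same "read only if a path is present" pattern can be sketched in isolation as follows. `load_if_present` and `read_json` are hypothetical helpers for illustration; the commit itself uses inline conditional expressions and the `SourceS3.read_*` methods shown in the diff.

```python
import json
from pathlib import Path
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")


def load_if_present(path: Optional[str], reader: Callable[[str], T]) -> Optional[T]:
    # Hypothetical helper: call the reader only when a path was supplied.
    # A missing (None) or empty path yields None, mirroring `x if path else None`.
    return reader(path) if path else None


def read_json(path: str) -> Any:
    # Hypothetical reader used for the example: parse a JSON file from disk.
    return json.loads(Path(path).read_text())
```

A call such as `load_if_present(config_path, read_json)` then returns the parsed file when `config_path` is set and `None` otherwise, so the constructor never receives a raw path.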
12 changes: 8 additions & 4 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/source.py
@@ -6,7 +6,7 @@

 from airbyte_cdk.config_observation import emit_configuration_as_airbyte_control_message
 from airbyte_cdk.models import ConnectorSpecification
-from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
+from airbyte_cdk.sources.file_based.file_based_source import DEFAULT_CONCURRENCY, FileBasedSource
 from airbyte_cdk.utils import is_cloud_environment
 from source_s3.source import SourceS3Spec
 from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer
@@ -21,14 +21,17 @@


 class SourceS3(FileBasedSource):
-    def read_config(self, config_path: str) -> Mapping[str, Any]:
+    _concurrency_level = DEFAULT_CONCURRENCY
+
+    @classmethod
+    def read_config(cls, config_path: str) -> Mapping[str, Any]:
         """
         Used to override the default read_config so that when the new file-based S3 connector processes a config
         in the legacy format, it can be transformed into the new config. This happens in entrypoint before we
         validate the config against the new spec.
         """
         config = super().read_config(config_path)
-        if not self._is_v4_config(config):
+        if not SourceS3._is_v4_config(config):
             parsed_legacy_config = SourceS3Spec(**config)
             converted_config = LegacyConfigTransformer.convert(parsed_legacy_config)
             emit_configuration_as_airbyte_control_message(converted_config)
@@ -66,7 +69,8 @@ def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
             connectionSpecification=s4_spec,
         )

-    def _is_v4_config(self, config: Mapping[str, Any]) -> bool:
+    @staticmethod
+    def _is_v4_config(config: Mapping[str, Any]) -> bool:
         return "streams" in config

     @staticmethod
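
Note on the `@classmethod` / `@staticmethod` change: because `run.py` now calls `SourceS3.read_config(...)` before a `SourceS3` instance exists, `read_config` and the helper it uses can no longer depend on `self`. A minimal, self-contained sketch of that pattern follows. `BaseSource`, `LegacyAwareSource`, and `_convert_legacy` are hypothetical stand-ins, not the CDK's `FileBasedSource` or the connector's `LegacyConfigTransformer`.

```python
import json
from typing import Any, Mapping


class BaseSource:
    # Hypothetical stand-in for the CDK base class; not the real FileBasedSource.
    @classmethod
    def read_config(cls, config_path: str) -> Mapping[str, Any]:
        with open(config_path) as f:
            return json.load(f)


class LegacyAwareSource(BaseSource):
    @classmethod
    def read_config(cls, config_path: str) -> Mapping[str, Any]:
        # Zero-argument super() works inside a classmethod and binds to cls.
        config = super().read_config(config_path)
        if not cls._is_current_format(config):
            config = cls._convert_legacy(config)
        return config

    @staticmethod
    def _is_current_format(config: Mapping[str, Any]) -> bool:
        # Same check as in the diff: new-style configs carry a "streams" key.
        return "streams" in config

    @staticmethod
    def _convert_legacy(config: Mapping[str, Any]) -> Mapping[str, Any]:
        # Hypothetical conversion: wrap the legacy settings into a single stream.
        return {"streams": [dict(config)]}
```

`LegacyAwareSource.read_config(path)` can be called directly on the class, which is what allows the parsed config to be handed to the constructor as an argument.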
@@ -15,7 +15,13 @@
 class SourceTest(unittest.TestCase):
     def setUp(self) -> None:
         self._stream_reader = Mock(spec=SourceS3StreamReader)
-        self._source = SourceS3(self._stream_reader, Config, str(TEST_FILES_FOLDER.joinpath("catalog.json")))
+        self._source = SourceS3(
+            self._stream_reader,
+            Config,
+            SourceS3.read_catalog(str(TEST_FILES_FOLDER.joinpath("catalog.json"))),
+            SourceS3.read_config(str(TEST_FILES_FOLDER.joinpath("v3_config.json"))),
+            None,
+        )

     @patch("source_s3.v4.source.emit_configuration_as_airbyte_control_message")
     def test_given_config_is_v3_when_read_config_then_emit_new_config(self, emit_config_mock) -> None:
11 changes: 10 additions & 1 deletion airbyte-integrations/connectors/source-s3/v4_main.py
@@ -15,8 +15,17 @@

 def get_source(args: List[str]):
     catalog_path = AirbyteEntrypoint.extract_catalog(args)
+    config_path = AirbyteEntrypoint.extract_config(args)
+    state_path = AirbyteEntrypoint.extract_state(args)
     try:
-        return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
+        return SourceS3(
+            SourceS3StreamReader(),
+            Config,
+            SourceS3.read_catalog(catalog_path) if catalog_path else None,
+            SourceS3.read_config(config_path) if config_path else None,
+            SourceS3.read_state(state_path) if state_path else None,
+            cursor_cls=Cursor,
+        )
     except Exception:
         print(
             AirbyteMessage(
3 changes: 2 additions & 1 deletion docs/integrations/sources/s3.md
@@ -321,8 +321,9 @@ To perform the text extraction from PDF and Docx files, the connector uses the [

 | Version | Date | Pull Request | Subject |
 |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
+| 4.5.0 | 2024-02-01 | [34591](https://github.com/airbytehq/airbyte/pull/34591) | Run full refresh syncs concurrently |
 | 4.4.1 | 2024-01-30 | [34665](https://github.com/airbytehq/airbyte/pull/34665) | Pin moto & CDK version |
-| 4.4.0 | 2024-01-12 | [33818](https://github.com/airbytehq/airbyte/pull/33818) | Add IAM Role Authentication |
+| 4.4.0 | 2023-01-12 | [33818](https://github.com/airbytehq/airbyte/pull/33818) | Add IAM Role Authentication |
 | 4.3.1 | 2024-01-04 | [33937](https://github.com/airbytehq/airbyte/pull/33937) | Prepare for airbyte-lib |
 | 4.3.0 | 2023-12-14 | [33411](https://github.com/airbytehq/airbyte/pull/33411) | Bump CDK version to auto-set primary key for document file streams and support raw txt files |
 | 4.2.4 | 2023-12-06 | [33187](https://github.com/airbytehq/airbyte/pull/33187) | Bump CDK version to hide source-defined primary key |