Skip to content

Commit

Permalink
follow up to #36294: allow migrate sub stream state with custom parti…
Browse files Browse the repository at this point in the history
…tion router (#36590)
  • Loading branch information
lazebnyi authored Mar 28, 2024
1 parent 23ecdb0 commit c3c87ea
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor, SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig


def _is_already_migrated(stream_state: Mapping[str, Any]) -> bool:
Expand Down Expand Up @@ -40,10 +41,22 @@ def __init__(
self._config = config
self._parameters = parameters
self._partition_key_field = InterpolatedString.create(
self._partition_router.parent_stream_configs[0].parent_key, parameters=self._parameters
self._get_parent_key(self._partition_router), parameters=self._parameters
).eval(self._config)
self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config)

def _get_parent_key(self, partition_router: SubstreamPartitionRouter) -> str:
parent_stream_config = partition_router.parent_stream_configs[0]

# Retrieve the parent key with a condition, as properties are returned as a dictionary for custom components.
parent_key = (
parent_stream_config.parent_key
if isinstance(parent_stream_config, ParentStreamConfig)
else parent_stream_config.get("parent_key")
)

return parent_key

def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
if _is_already_migrated(stream_state):
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,20 @@ def create_legacy_to_per_partition_state_migration(
config: Mapping[str, Any],
declarative_stream: DeclarativeStreamModel,
) -> LegacyToPerPartitionStateMigration:
if not isinstance(declarative_stream.retriever, SimpleRetrieverModel):
retriever = declarative_stream.retriever
partition_router = retriever.partition_router

if not isinstance(retriever, SimpleRetrieverModel):
raise ValueError(
f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got {type(declarative_stream.retriever)}"
f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got {type(retriever)}"
)
if not isinstance(declarative_stream.retriever.partition_router, SubstreamPartitionRouterModel):
if not isinstance(partition_router, (SubstreamPartitionRouterModel, CustomPartitionRouterModel)):
raise ValueError(
f"LegacyToPerPartitionStateMigrations can only be applied on a SimpleRetriever with a Substream partition router. Got {type(declarative_stream.retriever.partition_router)}"
f"LegacyToPerPartitionStateMigrations can only be applied on a SimpleRetriever with a Substream partition router. Got {type(partition_router)}"
)
if not hasattr(partition_router, "parent_stream_configs"):
raise ValueError("LegacyToPerPartitionStateMigrations can only be applied with a parent stream configuration.")

return LegacyToPerPartitionStateMigration(declarative_stream.retriever.partition_router, declarative_stream.incremental_sync, config, declarative_stream.parameters) # type: ignore # The retriever type was already checked

def create_session_token_authenticator(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock

import pytest
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
from airbyte_cdk.sources.declarative.models import (
CustomRetriever,
DatetimeBasedCursor,
DeclarativeStream,
ParentStreamConfig,
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.models import CustomPartitionRouter, CustomRetriever, DatetimeBasedCursor, DeclarativeStream
from airbyte_cdk.sources.declarative.models import LegacyToPerPartitionStateMigration as LegacyToPerPartitionStateMigrationModel
from airbyte_cdk.sources.declarative.models import ParentStreamConfig, SimpleRetriever, SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ManifestComponentTransformer
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

factory = ModelToComponentFactory()

resolver = ManifestReferenceResolver()

transformer = ManifestComponentTransformer()


def test_migrate_a_valid_legacy_state_to_per_partition():
Expand Down Expand Up @@ -241,3 +250,46 @@ def _migrator_with_multiple_parent_streams():
config = {}
parameters = {}
return LegacyToPerPartitionStateMigration(partition_router, cursor, config, parameters)


@pytest.mark.parametrize(
"retriever_type, partition_router_class, is_parent_stream_config, expected_exception, expected_error_message",
[
(SimpleRetriever, CustomPartitionRouter, True, None, None),
(None, CustomPartitionRouter, True, ValueError, "LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got <class 'unittest.mock.MagicMock'>"),
(SimpleRetriever, None, False, ValueError, "LegacyToPerPartitionStateMigrations can only be applied on a SimpleRetriever with a Substream partition router. Got <class 'NoneType'>"),
(SimpleRetriever, CustomPartitionRouter, False, ValueError, "LegacyToPerPartitionStateMigrations can only be applied with a parent stream configuration."),
]
)
def test_create_legacy_to_per_partition_state_migration(
retriever_type,
partition_router_class,
is_parent_stream_config,
expected_exception,
expected_error_message,
):
partition_router = partition_router_class(type="CustomPartitionRouter", class_name="a_class_namer") if partition_router_class else None

stream = MagicMock()
stream.retriever = MagicMock(spec=retriever_type)
stream.retriever.partition_router = partition_router

content = """
state_migrations:
- type: LegacyToPerPartitionStateMigration
"""

resolved_manifest = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
state_migrations_manifest = transformer.propagate_types_and_parameters("", resolved_manifest["state_migrations"][0], {})

if is_parent_stream_config:
parent_stream_config = ParentStreamConfig(type="ParentStreamConfig", parent_key="id", partition_field="parent_id", stream=DeclarativeStream(type="DeclarativeStream", retriever=CustomRetriever(type="CustomRetriever", class_name="a_class_name")))
partition_router.parent_stream_configs = [parent_stream_config]

if expected_exception:
with pytest.raises(expected_exception) as excinfo:
factory.create_component(model_type=LegacyToPerPartitionStateMigrationModel, component_definition=state_migrations_manifest, config={}, declarative_stream=stream)
assert str(excinfo.value) == expected_error_message
else:
migration_instance = factory.create_component(model_type=LegacyToPerPartitionStateMigrationModel, component_definition=state_migrations_manifest, config={}, declarative_stream=stream)
assert migration_instance is not None

0 comments on commit c3c87ea

Please sign in to comment.