From 662ea2285a4898d4f5cc47007f856cd573aa96f3 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Mon, 1 Apr 2024 13:47:42 +0200 Subject: [PATCH 1/4] Change legacy state migration to partition key --- .../migrations/legacy_to_per_partition_state_migration.py | 6 +++--- .../migrations/test_legacy_to_per_partition_migration.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py index 6420b338e22f..5ebe96e24334 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py @@ -49,13 +49,13 @@ def _get_parent_key(self, partition_router: SubstreamPartitionRouter) -> str: parent_stream_config = partition_router.parent_stream_configs[0] # Retrieve the parent key with a condition, as properties are returned as a dictionary for custom components. - parent_key = ( - parent_stream_config.parent_key + partition_field = ( + parent_stream_config.partition_field if isinstance(parent_stream_config, ParentStreamConfig) else parent_stream_config.get("parent_key") ) - return parent_key + return partition_field def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: if _is_already_migrated(stream_state): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py b/airbyte-cdk/python/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py index 768b94b5e5b9..7fce15031ee1 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py @@ -38,11 +38,11 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): expected_state = { "states": [ { - "partition": {"id": "13506132"}, + "partition": {"parent_id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, { - "partition": {"id": "14351124"}, + "partition": {"parent_id": "14351124"}, "cursor": {"last_changed": "2022-12-27T08:35:39+00:00"} }, ] From f2668906e06d51542c7c1ac287b7193069b87bf2 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Mon, 1 Apr 2024 13:50:45 +0200 Subject: [PATCH 2/4] Update key name to partition_field in legacy migration --- .../migrations/legacy_to_per_partition_state_migration.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py index 5ebe96e24334..11d33d0f4cf3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py @@ -41,18 +41,18 @@ def __init__( self._config = config self._parameters = parameters self._partition_key_field = InterpolatedString.create( - self._get_parent_key(self._partition_router), parameters=self._parameters + self._get_partition_field(self._partition_router), parameters=self._parameters ).eval(self._config) self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config) - def _get_parent_key(self, partition_router: SubstreamPartitionRouter) -> str: + def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str: parent_stream_config = partition_router.parent_stream_configs[0] - # Retrieve the parent key with a condition, as properties are returned as a dictionary for custom components. + # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components. partition_field = ( parent_stream_config.partition_field if isinstance(parent_stream_config, ParentStreamConfig) - else parent_stream_config.get("parent_key") + else parent_stream_config.get("partition_field") ) return partition_field From c062d0c1439ad0f1704f96301966bc0f195cbeda Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 1 Apr 2024 07:04:32 -0700 Subject: [PATCH 3/4] self-heal buggy migrations --- ...legacy_to_per_partition_state_migration.py | 62 +++++++++++++++++-- .../test_legacy_to_per_partition_migration.py | 62 ++++++++++++------- 2 files changed, 98 insertions(+), 26 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py index 11d33d0f4cf3..f6dbc810859d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py @@ -1,6 +1,6 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -from typing import Any, Mapping +from typing import Any, List, Mapping from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration @@ -23,10 +23,24 @@ class LegacyToPerPartitionStateMigration(StateMigration): "last_changed": "2022-12-27T08:34:39+00:00" } Example output state: + { + "partition": {"parent_id": "13506132"}, + "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} + } + + Due to a bug in the original migration, this class also corrects a per_partition state that erroneously uses the parent_key + as the partition key instead of the partition_key_field + Example output state: { "partition": {"id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} } + + Example output state: + { + "partition": {"parent_id": "13506132"}, + "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} + } """ def __init__( @@ -43,6 +57,9 @@ def __init__( self._partition_key_field = InterpolatedString.create( self._get_partition_field(self._partition_router), parameters=self._parameters ).eval(self._config) + self._parent_key = InterpolatedString.create(self._get_parent_key(self._partition_router), parameters=self._parameters).eval( + self._config + ) self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config) def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str: @@ -57,9 +74,32 @@ def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> st return partition_field + def _get_parent_key(self, partition_router: SubstreamPartitionRouter) -> str: + parent_stream_config = partition_router.parent_stream_configs[0] + + # Retrieve the parent key with a condition, as properties are returned as a dictionary for custom components. + parent_key = ( + parent_stream_config.parent_key + if isinstance(parent_stream_config, ParentStreamConfig) + else parent_stream_config.get("parent_key") + ) + + return parent_key + def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: - if _is_already_migrated(stream_state): - return False + if "states" in stream_state: + states = stream_state["states"] + if not states: + # If states is empty, then the state is in the right format, but it is empty. No need to migrate + return False + first_state_message = stream_state["states"][0] + # If the state objects already have the expected partition_key_field, no need to migrate + if self._partition_key_field in first_state_message.get("partition"): + return False + # If the state objects have the parent_key field, we might need to migrate due to a bug in the original implementation + # see https://github.com/airbytehq/airbyte/pull/36719 for more details + elif not self._parent_key in first_state_message.get("partition"): + return False # There is exactly one parent stream number_of_parent_streams = len(self._partition_router.parent_stream_configs) @@ -85,5 +125,19 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: return True def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: - states = [{"partition": {self._partition_key_field: key}, "cursor": value} for key, value in stream_state.items()] + if "states" in stream_state: + states = self._create_states_from_bugged_migration(stream_state) + else: + states = self._create_states_from_legacy(stream_state) return {"states": states} + + def _create_states_from_legacy(self, stream_state: Mapping[str, Any]) -> List[Mapping[str, Any]]: + return [{"partition": {self._partition_key_field: key}, "cursor": value} for key, value in stream_state.items()] + + def _create_states_from_bugged_migration(self, stream_state: Mapping[str, Any]) -> List[Mapping[str, Any]]: + # If the state objects have the parent_key field, we might need to migrate due to a bug in the original implementation + # see https://github.com/airbytehq/airbyte/pull/36719 for more details + return [ + {"partition": {self._partition_key_field: state.get("partition").get(self._parent_key)}, "cursor": state.get("cursor")} + for state in stream_state["states"] + ] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py b/airbyte-cdk/python/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py index 7fce15031ee1..82715354521b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py @@ -21,16 +21,34 @@ transformer = ManifestComponentTransformer() -def test_migrate_a_valid_legacy_state_to_per_partition(): - input_state = { - "13506132": { - "last_changed": "2022-12-27T08:34:39+00:00" - }, - "14351124": { - "last_changed": "2022-12-27T08:35:39+00:00" - }, - } - +@pytest.mark.parametrize( + "input_state", [ + pytest.param({ + "13506132": { + "last_changed": "2022-12-27T08:34:39+00:00" + }, + "14351124": { + "last_changed": "2022-12-27T08:35:39+00:00" + }, + }, id="migrate_a_valid_legacy_state_to_per_partition"), + pytest.param( + { + "states": [ + { + "partition": {"id": "13506132"}, + "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} + }, + { + "partition": {"id": "14351124"}, + "cursor": {"last_changed": "2022-12-27T08:35:39+00:00"} + }, + ] + }, + id="migrate_per_partition_states_using_parent_key_instead_of_partition_field" + ) + ] +) +def test_migrate_a_valid_legacy_state_to_per_partition(input_state): migrator = _migrator() assert migrator.should_migrate(input_state) @@ -56,11 +74,11 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): pytest.param({ "states": [ { - "partition": {"id": "13506132"}, + "partition": {"paren_id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, { - "partition": {"id": "14351124"}, + "partition": {"paren_id": "14351124"}, "cursor": {"last_changed": "2022-12-27T08:35:39+00:00"} }, ] @@ -68,22 +86,22 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): pytest.param({ "states": [ { - "partition": {"id": "13506132"}, + "partition": {"parent_id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, { - "partition": {"id": "14351124"}, + "partition": {"parent_id": "14351124"}, }, ] }, id="test_should_not_migrate_state_without_a_cursor_component"), pytest.param({ "states": [ { - "partition": {"id": "13506132"}, + "partition": {"parent_id": "13506132"}, "cursor": {"updated_at": "2022-12-27T08:34:39+00:00"} }, { - "partition": {"id": "14351124"}, + "partition": {"parent_id": "14351124"}, "cursor": {"updated_at": "2022-12-27T08:35:39+00:00"} }, ] @@ -91,11 +109,11 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): pytest.param({ "states": [ { - "partition": {"id": "13506132"}, + "partition": {"parent_id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, { - "partition": {"id": "14351124"}, + "partition": {"parent_id": "14351124"}, "cursor": {"last_changed": "2022-12-27T08:35:39+00:00", "updated_at": "2021-01-01"} }, ] @@ -104,7 +122,7 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): { "states": [ { - "partition": {"id": "13506132"}, + "partition": {"parent_id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, { @@ -117,11 +135,11 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): { "states": [ { - "partition": {"id": "13506132", "another_id": "A"}, + "partition": {"parent_id": "13506132", "another_id": "A"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, { - "partition": {"id": "13506134"}, + "partition": {"parent_id": "13506134"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, ] @@ -135,7 +153,7 @@ def test_migrate_a_valid_legacy_state_to_per_partition(): "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, { - "partition": {"id": "13506134"}, + "partition": {"parent_id": "13506134"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} }, ] From 7819c2cf6b91ea25b236c2514535496f29f6dff9 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 1 Apr 2024 07:12:40 -0700 Subject: [PATCH 4/4] comment and fix lint --- .../migrations/legacy_to_per_partition_state_migration.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py index f6dbc810859d..b285be0c7fe0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py @@ -29,7 +29,9 @@ class LegacyToPerPartitionStateMigration(StateMigration): } Due to a bug in the original migration, this class also corrects a per_partition state that erroneously uses the parent_key - as the partition key instead of the partition_key_field + as the partition key instead of the partition_key_field. + See https://github.com/airbytehq/airbyte/pull/36719 for mor details + Example output state: { "partition": {"id": "13506132"}, @@ -81,7 +83,7 @@ def _get_parent_key(self, partition_router: SubstreamPartitionRouter) -> str: parent_key = ( parent_stream_config.parent_key if isinstance(parent_stream_config, ParentStreamConfig) - else parent_stream_config.get("parent_key") + else parent_stream_config.get("parent_key") # type: ignore # There is a type check. parent_stream_config is known to be a mapping if not a ParentStreamConfig ) return parent_key @@ -98,7 +100,7 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: return False # If the state objects have the parent_key field, we might need to migrate due to a bug in the original implementation # see https://github.com/airbytehq/airbyte/pull/36719 for more details - elif not self._parent_key in first_state_message.get("partition"): + elif self._parent_key not in first_state_message.get("partition"): return False # There is exactly one parent stream