Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

low-code: Replace use of parent_key with partition_field in per-partiton state #36720

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Any, Mapping
from typing import Any, List, Mapping

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
Expand All @@ -22,11 +22,27 @@ class LegacyToPerPartitionStateMigration(StateMigration):
"13506132": {
"last_changed": "2022-12-27T08:34:39+00:00"
}
Example output state:
{
"partition": {"parent_id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
}

Due to a bug in the original migration, this class also corrects a per_partition state that erroneously uses the parent_key
as the partition key instead of the partition_key_field.
See https://github.com/airbytehq/airbyte/pull/36719 for mor details

Example output state:
{
"partition": {"id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
}

Example output state:
{
"partition": {"parent_id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
}
"""

def __init__(
Expand All @@ -41,25 +57,51 @@ def __init__(
self._config = config
self._parameters = parameters
self._partition_key_field = InterpolatedString.create(
self._get_parent_key(self._partition_router), parameters=self._parameters
self._get_partition_field(self._partition_router), parameters=self._parameters
).eval(self._config)
self._parent_key = InterpolatedString.create(self._get_parent_key(self._partition_router), parameters=self._parameters).eval(
self._config
)
self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config)

def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
parent_stream_config = partition_router.parent_stream_configs[0]

# Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
partition_field = (
parent_stream_config.partition_field
if isinstance(parent_stream_config, ParentStreamConfig)
else parent_stream_config.get("partition_field")
)

return partition_field

def _get_parent_key(self, partition_router: SubstreamPartitionRouter) -> str:
parent_stream_config = partition_router.parent_stream_configs[0]

# Retrieve the parent key with a condition, as properties are returned as a dictionary for custom components.
parent_key = (
parent_stream_config.parent_key
if isinstance(parent_stream_config, ParentStreamConfig)
else parent_stream_config.get("parent_key")
else parent_stream_config.get("parent_key") # type: ignore # There is a type check. parent_stream_config is known to be a mapping if not a ParentStreamConfig
)

return parent_key

def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
if _is_already_migrated(stream_state):
return False
if "states" in stream_state:
states = stream_state["states"]
if not states:
# If states is empty, then the state is in the right format, but it is empty. No need to migrate
return False
first_state_message = stream_state["states"][0]
# If the state objects already have the expected partition_key_field, no need to migrate
if self._partition_key_field in first_state_message.get("partition"):
return False
# If the state objects have the parent_key field, we might need to migrate due to a bug in the original implementation
# see https://github.com/airbytehq/airbyte/pull/36719 for more details
elif self._parent_key not in first_state_message.get("partition"):
return False

# There is exactly one parent stream
number_of_parent_streams = len(self._partition_router.parent_stream_configs)
Expand All @@ -85,5 +127,19 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return True

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
states = [{"partition": {self._partition_key_field: key}, "cursor": value} for key, value in stream_state.items()]
if "states" in stream_state:
states = self._create_states_from_bugged_migration(stream_state)
else:
states = self._create_states_from_legacy(stream_state)
return {"states": states}

def _create_states_from_legacy(self, stream_state: Mapping[str, Any]) -> List[Mapping[str, Any]]:
return [{"partition": {self._partition_key_field: key}, "cursor": value} for key, value in stream_state.items()]

def _create_states_from_bugged_migration(self, stream_state: Mapping[str, Any]) -> List[Mapping[str, Any]]:
# If the state objects have the parent_key field, we might need to migrate due to a bug in the original implementation
# see https://github.com/airbytehq/airbyte/pull/36719 for more details
return [
{"partition": {self._partition_key_field: state.get("partition").get(self._parent_key)}, "cursor": state.get("cursor")}
for state in stream_state["states"]
]
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,46 @@
transformer = ManifestComponentTransformer()


def test_migrate_a_valid_legacy_state_to_per_partition():
input_state = {
"13506132": {
"last_changed": "2022-12-27T08:34:39+00:00"
},
"14351124": {
"last_changed": "2022-12-27T08:35:39+00:00"
},
}

@pytest.mark.parametrize(
"input_state", [
pytest.param({
"13506132": {
"last_changed": "2022-12-27T08:34:39+00:00"
},
"14351124": {
"last_changed": "2022-12-27T08:35:39+00:00"
},
}, id="migrate_a_valid_legacy_state_to_per_partition"),
pytest.param(
{
"states": [
{
"partition": {"id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
{
"partition": {"id": "14351124"},
"cursor": {"last_changed": "2022-12-27T08:35:39+00:00"}
},
]
},
id="migrate_per_partition_states_using_parent_key_instead_of_partition_field"
)
]
)
def test_migrate_a_valid_legacy_state_to_per_partition(input_state):
migrator = _migrator()

assert migrator.should_migrate(input_state)

expected_state = {
"states": [
{
"partition": {"id": "13506132"},
"partition": {"parent_id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
{
"partition": {"id": "14351124"},
"partition": {"parent_id": "14351124"},
"cursor": {"last_changed": "2022-12-27T08:35:39+00:00"}
},
]
Expand All @@ -56,46 +74,46 @@ def test_migrate_a_valid_legacy_state_to_per_partition():
pytest.param({
"states": [
{
"partition": {"id": "13506132"},
"partition": {"paren_id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
{
"partition": {"id": "14351124"},
"partition": {"paren_id": "14351124"},
"cursor": {"last_changed": "2022-12-27T08:35:39+00:00"}
},
]
}, id="test_should_not_migrate_a_per_partition_state"),
pytest.param({
"states": [
{
"partition": {"id": "13506132"},
"partition": {"parent_id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
{
"partition": {"id": "14351124"},
"partition": {"parent_id": "14351124"},
},
]
}, id="test_should_not_migrate_state_without_a_cursor_component"),
pytest.param({
"states": [
{
"partition": {"id": "13506132"},
"partition": {"parent_id": "13506132"},
"cursor": {"updated_at": "2022-12-27T08:34:39+00:00"}
},
{
"partition": {"id": "14351124"},
"partition": {"parent_id": "14351124"},
"cursor": {"updated_at": "2022-12-27T08:35:39+00:00"}
},
]
}, id="test_should_not_migrate_a_per_partition_state_with_wrong_cursor_field"),
pytest.param({
"states": [
{
"partition": {"id": "13506132"},
"partition": {"parent_id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
{
"partition": {"id": "14351124"},
"partition": {"parent_id": "14351124"},
"cursor": {"last_changed": "2022-12-27T08:35:39+00:00", "updated_at": "2021-01-01"}
},
]
Expand All @@ -104,7 +122,7 @@ def test_migrate_a_valid_legacy_state_to_per_partition():
{
"states": [
{
"partition": {"id": "13506132"},
"partition": {"parent_id": "13506132"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
{
Expand All @@ -117,11 +135,11 @@ def test_migrate_a_valid_legacy_state_to_per_partition():
{
"states": [
{
"partition": {"id": "13506132", "another_id": "A"},
"partition": {"parent_id": "13506132", "another_id": "A"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
{
"partition": {"id": "13506134"},
"partition": {"parent_id": "13506134"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
]
Expand All @@ -135,7 +153,7 @@ def test_migrate_a_valid_legacy_state_to_per_partition():
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
{
"partition": {"id": "13506134"},
"partition": {"parent_id": "13506134"},
"cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
},
]
Expand Down
Loading