feat(ingestion): Add fail-safe stale entity removal via configurable 'fail_safe_threshold' param. #6027

Merged
22 changes: 22 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/state/dbt_state.py
@@ -84,6 +84,28 @@ def get_urns_not_in(
elif type == "assertion":
yield from self._get_assertion_urns_not_in(other_checkpoint_state)

def get_percent_entities_changed(
self, old_checkpoint_state: "DbtCheckpointState"
) -> float:
old_count_all = 0
overlap_count_all = 0
for new_entities, old_entities in [
(self.encoded_node_urns, old_checkpoint_state.encoded_node_urns),
(self.encoded_assertion_urns, old_checkpoint_state.encoded_assertion_urns),
]:
(
overlap_count,
old_count,
_,
) = StaleEntityCheckpointStateBase.get_entity_overlap_and_cardinalities(
new_entities=new_entities, old_entities=old_entities
)
overlap_count_all += overlap_count
old_count_all += old_count
if old_count_all:
return overlap_count_all * 100.0 / old_count_all
return 0.0

def prepare_for_commit(self) -> None:
self.encoded_node_urns = list(set(self.encoded_node_urns))
self.encoded_assertion_urns = list(set(self.encoded_assertion_urns))
@@ -57,3 +57,18 @@ def get_urns_not_in(
yield from self._get_urns_not_in(
self.encoded_topic_urns, other_checkpoint_state.encoded_topic_urns
)

def get_percent_entities_changed(
self, old_checkpoint_state: "KafkaCheckpointState"
) -> float:
(
overlap_count,
old_count,
_,
) = StaleEntityCheckpointStateBase.get_entity_overlap_and_cardinalities(
new_entities=self.encoded_topic_urns,
old_entities=old_checkpoint_state.encoded_topic_urns,
)
if old_count:
return overlap_count * 100.0 / old_count
return 0.0
@@ -122,3 +122,27 @@ def get_urns_not_in(
yield from self._get_table_urns_not_in(other_checkpoint_state)
elif type == "view":
yield from self._get_view_urns_not_in(other_checkpoint_state)

def get_percent_entities_changed(
self, old_checkpoint_state: "BaseSQLAlchemyCheckpointState"
) -> float:
old_count_all = 0
overlap_count_all = 0
for new_entities, old_entities in [
(self.encoded_assertion_urns, old_checkpoint_state.encoded_assertion_urns),
(self.encoded_container_urns, old_checkpoint_state.encoded_container_urns),
(self.encoded_table_urns, old_checkpoint_state.encoded_table_urns),
(self.encoded_view_urns, old_checkpoint_state.encoded_view_urns),
]:
(
overlap_count,
old_count,
_,
) = StaleEntityCheckpointStateBase.get_entity_overlap_and_cardinalities(
new_entities=new_entities, old_entities=old_entities
)
overlap_count_all += overlap_count
old_count_all += old_count
if old_count_all:
return overlap_count_all * 100.0 / old_count_all
return 0.0
@@ -1,7 +1,7 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, cast
from typing import Dict, Generic, Iterable, List, Optional, Tuple, Type, TypeVar, cast

import pydantic

@@ -34,6 +34,12 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
default=True,
description=f"Soft-deletes the entities of type {', '.join(_entity_types)} in the last successful run but missing in the current run with stateful_ingestion enabled.",
)
fail_safe_threshold: float = pydantic.Field(
default=20.0,
description="Prevents a large number of soft deletions, and blocks the state from being committed, when the relative change percent in entities compared to the previous state exceeds the 'fail_safe_threshold' (e.g. due to an accidental change in the source configuration).",
le=100.0, # mypy does not work with pydantic.confloat. This is the recommended work-around.
ge=0.0,
)
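# --- Illustrative sketch (editor's addition, not part of this PR diff) ---
# The le/ge bounds above constrain fail_safe_threshold to the range [0.0, 100.0].
# A minimal standalone model mirroring the field shows the validation behaviour;
# the _FailSafeConfig name is hypothetical and exists only for this example.
import pydantic

class _FailSafeConfig(pydantic.BaseModel):
    fail_safe_threshold: float = pydantic.Field(default=20.0, ge=0.0, le=100.0)

assert _FailSafeConfig().fail_safe_threshold == 20.0  # default
assert _FailSafeConfig(fail_safe_threshold=100.0).fail_safe_threshold == 100.0  # accepted; the tests below pass 100.0 so the check never trips
try:
    _FailSafeConfig(fail_safe_threshold=150.0)  # above le=100.0
except pydantic.ValidationError:
    pass  # out-of-range values are rejected when the config is parsed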


@dataclass
@@ -83,6 +89,23 @@ def get_urns_not_in(
"""
pass

@abstractmethod
def get_percent_entities_changed(self, old_checkpoint_state: Derived) -> float:
"""
Returns the percentage of entities that have changed relative to `old_checkpoint_state`.
:param old_checkpoint_state: the old checkpoint state to compute the relative change percent against.
:return: (|intersection(self, old_checkpoint_state)| * 100.0 / |old_checkpoint_state|)
"""
pass

@staticmethod
def get_entity_overlap_and_cardinalities(
new_entities: List[str], old_entities: List[str]
) -> Tuple[int, int, int]:
new_set = set(new_entities)
old_set = set(old_entities)
return len(new_set.intersection(old_set)), len(old_set), len(new_set)
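# --- Worked example (editor's addition, not part of this PR diff) ---
# Uses the static helper defined just above. With four URNs in the old checkpoint
# and two of them still present in the new state, the helper returns
# (overlap, |old|, |new|) = (2, 4, 3), and the formula from the
# get_percent_entities_changed docstring gives 2 * 100.0 / 4 = 50.0.
# The URNs are made up purely for illustration.
old_entities = ["urn:li:dataset:a", "urn:li:dataset:b", "urn:li:dataset:c", "urn:li:dataset:d"]
new_entities = ["urn:li:dataset:a", "urn:li:dataset:b", "urn:li:dataset:x"]
(
    overlap_count,
    old_count,
    new_count,
) = StaleEntityCheckpointStateBase.get_entity_overlap_and_cardinalities(
    new_entities=new_entities, old_entities=old_entities
)
assert (overlap_count, old_count, new_count) == (2, 4, 3)
assert overlap_count * 100.0 / old_count == 50.0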


class StaleEntityRemovalHandler(
StatefulIngestionUsecaseHandlerBase[StaleEntityCheckpointStateBase]
@@ -217,6 +240,26 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
cur_checkpoint_state = cast(
StaleEntityCheckpointStateBase, cur_checkpoint.state
)

# Check if the entity delta is below the fail-safe threshold.
entity_difference_percent = cur_checkpoint_state.get_percent_entities_changed(
last_checkpoint_state
)
assert self.stateful_ingestion_config
if (
entity_difference_percent
> self.stateful_ingestion_config.fail_safe_threshold
):
# Log the failure. This would prevent the current state from getting committed.
self.source.get_report().report_failure(
"Stateful Ingestion",
f"Fail safe mode triggered, entity difference percent:{entity_difference_percent}"
" > fail_safe_threshold:{self.stateful_ingestion_config.fail_safe_threshold}",
)
# Bail so that we don't emit the stale entity removal workunits.
return

# Everything looks good, emit the soft-deletion workunits
for type in self.state_type_class.get_supported_types():
for urn in last_checkpoint_state.get_urns_not_in(
type=type, other_checkpoint_state=cur_checkpoint_state
3 changes: 3 additions & 0 deletions metadata-ingestion/tests/integration/dbt/test_dbt.py
@@ -273,6 +273,7 @@ def test_dbt_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
@@ -388,6 +389,7 @@ def test_dbt_state_backward_compatibility(
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
@@ -528,6 +530,7 @@ def test_dbt_stateful_tests(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
@@ -116,6 +116,7 @@ def test_kafka_ingest_with_stateful(
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
1 change: 1 addition & 0 deletions metadata-ingestion/tests/unit/test_glue_source.py
@@ -248,6 +248,7 @@ def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
5 changes: 4 additions & 1 deletion metadata-ingestion/tests/unit/test_kafka_source.py
@@ -183,7 +183,10 @@ def is_stateful_ingestion_configured(self):
kafka_source_patcher.is_local = True
KafkaSource.create(
{
"stateful_ingestion": {"enabled": "true"},
"stateful_ingestion": {
"enabled": "true",
"fail_safe_threshold": 100.0,
},
"connection": {"bootstrap": "localhost:9092"},
},
ctx,
1 change: 1 addition & 0 deletions smoke-test/tests/test_stateful_ingestion.py
@@ -59,6 +59,7 @@ def get_current_checkpoint_from_pipeline(
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": get_gms_url()}},