From ca2a61e74a3d348c03d46fbaee76ccdaae3c75b8 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Fri, 3 Jan 2025 18:11:48 +0530 Subject: [PATCH 1/5] fix(ingest/gc): logging and stopping fix --- .../source/gc/execution_request_cleanup.py | 14 ++++-- .../source/gc/soft_deleted_entity_cleanup.py | 49 +++++++++++-------- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py index 170a6ada3e336f..f4185491caedd3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py @@ -158,8 +158,11 @@ def _scroll_execution_requests( break params["scrollId"] = document["scrollId"] except Exception as e: - logger.error( - f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}" + self.report.failure( + title="failed to fetch next batch of execution requests", + message="failed to fetch next batch of execution requests", + context=str(self.instance_id), + exc=e, ) self.report.ergc_read_errors += 1 @@ -231,8 +234,11 @@ def _delete_entry(self, entry: CleanupRecord) -> None: self.graph.delete_entity(entry.urn, True) except Exception as e: self.report.ergc_delete_errors += 1 - logger.error( - f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}" + self.report.failure( + title="failed to delete ExecutionRequest", + message="failed to delete ExecutionRequest", + context=str(self.instance_id), + exc=e, ) def _reached_runtime_limit(self) -> bool: diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 4c0355834f9b4f..d55328f8c11e3b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -163,6 +163,8 @@ def delete_entity(self, urn: str) -> None: f"Dry run is on otherwise it would have deleted {urn} with hard deletion" ) return + if self._deletion_limit_reached() or self._times_up(): + return self._increment_removal_started_count() self.ctx.graph.delete_entity(urn=urn, hard=True) self.ctx.graph.delete_references_to_urn( @@ -203,11 +205,10 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]: for future in done: self._print_report() if future.exception(): - logger.error( - f"Failed to delete entity {futures[future]}: {future.exception()}" - ) self.report.failure( - f"Failed to delete entity {futures[future]}", + title="Failed to delete entity", + message=futures[future], + context=futures[future], exc=future.exception(), ) self.report.num_soft_deleted_entity_processed += 1 @@ -274,6 +275,28 @@ def _get_urns(self) -> Iterable[str]: ) yield from self._get_soft_deleted_queries() + def _times_up(self) -> bool: + if ( + self.config.runtime_limit_seconds + and time.time() - self.start_time > self.config.runtime_limit_seconds + ): + logger.info( + f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached" + ) + return True + return False + + def _deletion_limit_reached(self) -> bool: + if ( + self.config.limit_entities_delete + and self.report.num_hard_deleted > self.config.limit_entities_delete + ): + logger.info( + f"Limit of {self.config.limit_entities_delete} entities reached" + ) + return True + return False + def cleanup_soft_deleted_entities(self) -> None: if not self.config.enabled: return @@ -285,24 +308,8 @@ def cleanup_soft_deleted_entities(self) -> None: self._print_report() while len(futures) >= self.config.futures_max_at_time: futures = self._process_futures(futures) - if ( - self.config.limit_entities_delete - and self.report.num_hard_deleted > self.config.limit_entities_delete - ): - logger.info( - f"Limit of {self.config.limit_entities_delete} entities reached. Stopped adding more." - ) + if self._deletion_limit_reached() or self._times_up(): break - if ( - self.config.runtime_limit_seconds - and time.time() - self.start_time - > self.config.runtime_limit_seconds - ): - logger.info( - f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Not submitting more futures." - ) - break - future = executor.submit(self.delete_soft_deleted_entity, urn) futures[future] = urn From 8872b091f6e2e16ebaeccad1eb7f106c7baaf287 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Fri, 3 Jan 2025 18:16:29 +0530 Subject: [PATCH 2/5] missed location --- .../datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index d55328f8c11e3b..6281bd0ed3af26 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -208,7 +208,7 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]: self.report.failure( title="Failed to delete entity", message=futures[future], - context=futures[future], + context=str(futures[future]), exc=future.exception(), ) self.report.num_soft_deleted_entity_processed += 1 From 61c5b143f46d4929c49e5940bdf4d76527c56787 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Fri, 3 Jan 2025 19:39:52 +0530 Subject: [PATCH 3/5] code review feedback --- .../source/gc/execution_request_cleanup.py | 12 +++++++----- .../source/gc/soft_deleted_entity_cleanup.py | 16 ++++++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py index f4185491caedd3..f9a00d7f009058 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py @@ -141,7 +141,9 @@ def _scroll_execution_requests( break if self.report.ergc_read_errors >= self.config.max_read_errors: self.report.failure( - f"ergc({self.instance_id}): too many read errors, aborting." + title="Too many read errors, aborting", + message="Too many read errors, aborting", + context=str(self.instance_id), ) break try: @@ -159,8 +161,8 @@ def _scroll_execution_requests( params["scrollId"] = document["scrollId"] except Exception as e: self.report.failure( - title="failed to fetch next batch of execution requests", - message="failed to fetch next batch of execution requests", + title="Failed to fetch next batch of execution requests", + message="Failed to fetch next batch of execution requests", context=str(self.instance_id), exc=e, ) @@ -235,8 +237,8 @@ def _delete_entry(self, entry: CleanupRecord) -> None: except Exception as e: self.report.ergc_delete_errors += 1 self.report.failure( - title="failed to delete ExecutionRequest", - message="failed to delete ExecutionRequest", + title="Failed to delete ExecutionRequest", + message="Failed to delete ExecutionRequest", context=str(self.instance_id), exc=e, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 6281bd0ed3af26..cf810d05aa2ca1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -105,6 +105,8 @@ class SoftDeletedEntitiesReport(SourceReport): sample_hard_deleted_aspects_by_type: TopKDict[str, LossyList[str]] = field( default_factory=TopKDict ) + runtime_limit_reached: bool = False + deletion_limit_reached: bool = False class SoftDeletedEntitiesCleanup: @@ -207,8 +209,8 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]: if future.exception(): self.report.failure( title="Failed to delete entity", - message=futures[future], - context=str(futures[future]), + message="Failed to delete entity", + context=futures[future], exc=future.exception(), ) self.report.num_soft_deleted_entity_processed += 1 @@ -280,9 +282,8 @@ def _times_up(self) -> bool: self.config.runtime_limit_seconds and time.time() - self.start_time > self.config.runtime_limit_seconds ): - logger.info( - f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached" - ) + with self._report_lock: + self.report.runtime_limit_reached = True return True return False @@ -291,9 +292,8 @@ def _deletion_limit_reached(self) -> bool: self.config.limit_entities_delete and self.report.num_hard_deleted > self.config.limit_entities_delete ): - logger.info( - f"Limit of {self.config.limit_entities_delete} entities reached" - ) + with self._report_lock: + self.report.deletion_limit_reached = True return True return False From 483e68986582ed726251c346bdac7664ee1a7554 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Fri, 3 Jan 2025 20:39:58 +0530 Subject: [PATCH 4/5] add tests to increase coverage --- metadata-ingestion/tests/unit/cli/test_cli_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metadata-ingestion/tests/unit/cli/test_cli_utils.py b/metadata-ingestion/tests/unit/cli/test_cli_utils.py index c9693c75d96fe9..1b01731841e960 100644 --- a/metadata-ingestion/tests/unit/cli/test_cli_utils.py +++ b/metadata-ingestion/tests/unit/cli/test_cli_utils.py @@ -63,9 +63,11 @@ def test_correct_url_when_url_set(): def test_fixup_gms_url(): + assert cli_utils.fixup_gms_url(None) == "" assert cli_utils.fixup_gms_url("http://localhost:8080") == "http://localhost:8080" assert cli_utils.fixup_gms_url("http://localhost:8080/") == "http://localhost:8080" assert cli_utils.fixup_gms_url("http://abc.acryl.io") == "https://abc.acryl.io/gms" + assert cli_utils.fixup_gms_url("http://abc.acryl.io/") == "https://abc.acryl.io/gms" assert ( cli_utils.fixup_gms_url("http://abc.acryl.io/api/gms") == "https://abc.acryl.io/gms" From 99cd284991148c2349411e74b4efff9a8ab5dba5 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Fri, 3 Jan 2025 20:49:59 +0530 Subject: [PATCH 5/5] add test for report update --- .../tests/unit/cli/test_cli_utils.py | 2 -- metadata-ingestion/tests/unit/test_gc.py | 28 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/tests/unit/cli/test_cli_utils.py b/metadata-ingestion/tests/unit/cli/test_cli_utils.py index 1b01731841e960..c9693c75d96fe9 100644 --- a/metadata-ingestion/tests/unit/cli/test_cli_utils.py +++ b/metadata-ingestion/tests/unit/cli/test_cli_utils.py @@ -63,11 +63,9 @@ def test_correct_url_when_url_set(): def test_fixup_gms_url(): - assert cli_utils.fixup_gms_url(None) == "" assert cli_utils.fixup_gms_url("http://localhost:8080") == "http://localhost:8080" assert cli_utils.fixup_gms_url("http://localhost:8080/") == "http://localhost:8080" assert cli_utils.fixup_gms_url("http://abc.acryl.io") == "https://abc.acryl.io/gms" - assert cli_utils.fixup_gms_url("http://abc.acryl.io/") == "https://abc.acryl.io/gms" assert ( cli_utils.fixup_gms_url("http://abc.acryl.io/api/gms") == "https://abc.acryl.io/gms" diff --git a/metadata-ingestion/tests/unit/test_gc.py b/metadata-ingestion/tests/unit/test_gc.py index 8f00d5e064db85..fde9a3f2e0cf03 100644 --- a/metadata-ingestion/tests/unit/test_gc.py +++ b/metadata-ingestion/tests/unit/test_gc.py @@ -9,6 +9,34 @@ DataProcessCleanupConfig, DataProcessCleanupReport, ) +from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import ( + SoftDeletedEntitiesCleanup, + SoftDeletedEntitiesCleanupConfig, + SoftDeletedEntitiesReport, +) + + +class TestSoftDeletedEntitiesCleanup(unittest.TestCase): + def setUp(self): + self.ctx = PipelineContext(run_id="test_run") + self.ctx.graph = MagicMock() + self.config = SoftDeletedEntitiesCleanupConfig() + self.report = SoftDeletedEntitiesReport() + self.cleanup = SoftDeletedEntitiesCleanup( + self.ctx, self.config, self.report, dry_run=True + ) + + def test_update_report(self): + self.cleanup._update_report( + urn="urn:li:dataset:1", + entity_type="dataset", + ) + self.assertEqual(1, self.report.num_hard_deleted) + self.assertEqual(1, self.report.num_hard_deleted_by_type["dataset"]) + + def test_increment_retained_count(self): + self.cleanup._increment_retained_count() + self.assertEqual(1, self.report.num_soft_deleted_retained_due_to_age) class TestDataProcessCleanup(unittest.TestCase):