
feat(ingest/gc): Add dataflow and soft deleted entities cleanup #11102

Merged
merged 16 commits into datahub-project:master from metadata_cleanup_source on Sep 18, 2024

Conversation


@treff7es treff7es commented Aug 6, 2024

Adds a way to clean up DataFlows, DataJobs, DataProcessInstances, and soft-deleted entities.
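For anyone trying this out locally, a minimal programmatic recipe might look like the sketch below. The source type name ("datahub-gc") and the retention_days field are assumptions; keep_last_n, delete_empty_data_jobs, and delete_empty_data_flows are the config fields discussed in the review comments further down.

from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe for the new cleanup source; adjust the source type name
# to whatever the plugin is registered as in setup.py. The source deletes via
# ctx.graph, so the pipeline also needs a DataHub connection configured
# (e.g. a datahub_api section); it is omitted here for brevity.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "datahub-gc",  # assumed plugin name after the ingest/gc rename
            "config": {
                "retention_days": 10,  # assumed field: drop data process instances older than this
                "keep_last_n": 5,  # keep at most N runs per data job
                "delete_empty_data_jobs": True,
                "delete_empty_data_flows": True,
            },
        },
        "sink": {"type": "console"},  # deletions go through the graph; the sink is a placeholder
    }
)
pipeline.run()
pipeline.raise_from_status()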

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

Summary by CodeRabbit

  • New Features

    • Introduced a metadata cleanup mechanism to enhance the management of data jobs and flows.
    • Added support for configuring retention and deletion parameters for metadata entities.
  • Improvements

    • Streamlined processes for efficiently removing outdated or empty data jobs and flows, ensuring a more organized metadata landscape.


coderabbitai bot commented Aug 6, 2024

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The changes introduce a new metadata cleanup capability to the DataHub ingestion framework. A new entry point is added to the setup.py file, registering the MetadataCleanupSource class. The metadata_cleanup.py file implements a structured approach to managing metadata, including retention and deletion strategies for data jobs and flows, significantly enhancing overall metadata management and cleanliness.

Changes

Files and change summary:

  • metadata-ingestion/setup.py: Added a new entry point for MetadataCleanupSource to enable metadata cleanup operations.
  • metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py: Introduced MetadataCleanupConfig, MetadataCleanupSource, and related classes for managing metadata retention and cleanup.
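For context, the setup.py change boils down to one more source plugin entry point along these lines (the plugin name and entry-point group shown here are assumptions; the module and class names come from the table above):

# Sketch of the kind of entry point added in metadata-ingestion/setup.py (names are illustrative).
entry_points = {
    "datahub.ingestion.source.plugins": [
        "datahub-gc = datahub.ingestion.source.metadata_cleanup:MetadataCleanupSource",
    ],
}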

Poem

🐇 In the meadow where data flows,
A cleanup dance the rabbit knows.
With hops and jumps, we tidy up,
Old jobs and flows, we clear the cup.
A fresh start for metadata bright,
A joyful leap in the morning light! 🌼



@github-actions github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) on Aug 6, 2024
@treff7es treff7es force-pushed the metadata_cleanup_source branch from ba8200c to d4204da on August 6, 2024 at 11:36

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 018a106 and d4204da.

Files selected for processing (2)
  • metadata-ingestion/setup.py (1 hunks)
  • metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py (1 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py

1-1: json imported but unused

Remove unused import: json

(F401)


3-3: os imported but unused

Remove unused import: os

(F401)


8-8: typing.Set imported but unused

Remove unused import: typing.Set

(F401)


14-14: datahub.configuration.source_common.EnvConfigMixin imported but unused

Remove unused import

(F401)


15-15: datahub.configuration.source_common.PlatformInstanceConfigMixin imported but unused

Remove unused import

(F401)


18-18: datahub.emitter.mce_builder.make_dataset_urn_with_platform_instance imported but unused

Remove unused import

(F401)


19-19: datahub.emitter.mce_builder.make_user_urn imported but unused

Remove unused import

(F401)


21-21: datahub.emitter.sql_parsing_builder.SqlParsingBuilder imported but unused

Remove unused import: datahub.emitter.sql_parsing_builder.SqlParsingBuilder

(F401)


25-25: datahub.ingestion.api.decorators.capability imported but unused

Remove unused import: datahub.ingestion.api.decorators.capability

(F401)


33-33: datahub.ingestion.api.source.SourceCapability imported but unused

Remove unused import: datahub.ingestion.api.source.SourceCapability

(F401)


39-39: datahub.ingestion.source.usage.usage_common.BaseUsageConfig imported but unused

Remove unused import: datahub.ingestion.source.usage.usage_common.BaseUsageConfig

(F401)


40-40: datahub.sql_parsing.schema_resolver.SchemaResolver imported but unused

Remove unused import: datahub.sql_parsing.schema_resolver.SchemaResolver

(F401)


41-41: datahub.sql_parsing.sqlglot_lineage.sqlglot_lineage imported but unused

Remove unused import: datahub.sql_parsing.sqlglot_lineage.sqlglot_lineage

(F401)


242-243: Use a single if statement instead of nested if statements

Combine if statements using and

(SIM102)


372-372: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)

Additional comments not posted (7)
metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py (6)

123-155: LGTM!

The MetadataCleanupConfig class is well-defined and follows best practices.


158-163: LGTM!

The DataFlowEntity class is well-defined and follows best practices.


166-173: LGTM!

The DataJobEntity class is well-defined and follows best practices.


176-179: LGTM!

The MetadataCleanupSourceReport class is well-defined and follows best practices.


182-204: LGTM!

The MetadataCleanupSource class is well-defined and follows best practices.


46-99: LGTM!

The GraphQL queries are well-defined and follow best practices.

metadata-ingestion/setup.py (1)

687-687: LGTM!

The new entry point for the MetadataCleanupSource class is well-defined and follows best practices.

Comment on lines 18 to 21
make_dataset_urn_with_platform_instance,
make_user_urn,
)
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
Contributor


Remove unused imports.

The following imports are not used and should be removed: make_dataset_urn_with_platform_instance, make_user_urn, SqlParsingBuilder.

-from datahub.emitter.mce_builder import (
-    make_dataset_urn_with_platform_instance,
-    make_user_urn,
-)
-from datahub.emitter.sql_parsing_builder import SqlParsingBuilder

from dataclasses import dataclass, field
from datetime import datetime, timezone
from functools import partial
from typing import Dict, Iterable, List, Optional, Set
Contributor


Remove unused import.

The Set import from typing is not used and should be removed.

-from typing import Set

Comment on lines 14 to 16
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
Contributor


Remove unused imports.

The following imports are not used and should be removed: EnvConfigMixin, PlatformInstanceConfigMixin.

-from datahub.configuration.source_common import (
-    EnvConfigMixin,
-    PlatformInstanceConfigMixin,
-)

Comment on lines 1 to 3
import json
import logging
import os
Contributor


Remove unused imports.

The following imports are not used and should be removed: json, os.

-import json
-import os

Comment on lines 25 to 33
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import (
MetadataWorkUnitProcessor,
Source,
SourceCapability,
Contributor


Remove unused imports.

The following imports are not used and should be removed: capability, SourceCapability, BaseUsageConfig, SchemaResolver, sqlglot_lineage.

-from datahub.ingestion.api.decorators import (
-    capability,
-)
-from datahub.ingestion.api.source import (
-    SourceCapability,
-)
-from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
-from datahub.sql_parsing.schema_resolver import SchemaResolver
-from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage

Comment on lines 242 to 243
        if self.config.keep_last_n:
            if len(dpis) >= self.config.keep_last_n:
Contributor


Combine nested if statements.

Use a single if statement instead of nested if statements.

-        if self.config.keep_last_n:
-            if len(dpis) >= self.config.keep_last_n:
+        if self.config.keep_last_n and len(dpis) >= self.config.keep_last_n:


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Outside diff range, codebase verification and nitpick comments (1)
metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py (1)

104-136: Fix typos in field descriptions.

There are typos in the descriptions of delete_empty_data_jobs and delete_empty_data_flows.

- description="Wether to delete Data Jobs without runs"
+ description="Whether to delete Data Jobs without runs"
- description="Wether to delete Data Flows without runs"
+ description="Whether to delete Data Flows without runs"
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between d4204da and 98ce810.

Files selected for processing (1)
  • metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py (1 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py

358-358: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)

Additional comments not posted (13)
metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py (13)

139-144: LGTM!

The class DataFlowEntity is well-defined.


147-154: LGTM!

The class DataJobEntity is well-defined.


157-163: LGTM!

The class MetadataCleanupSourceReport is well-defined.


175-182: LGTM!

The __init__ method is well-defined and handles errors appropriately.


184-187: LGTM!

The create method is well-defined and follows best practices.


189-190: LGTM!

The get_report method is well-defined.


192-193: LGTM!

The get_workunit_processors method is well-defined and follows best practices.


195-217: LGTM!

The fetch_dpis method is well-defined, handles errors appropriately, and follows best practices.


219-232: LGTM!

The keep_last_n_dpi method is well-defined, handles errors appropriately, and follows best practices.


233-244: LGTM!

The delete_entity method is well-defined, handles errors appropriately, and follows best practices.


246-263: LGTM!

The delete_dpi_from_datajobs method is well-defined, handles errors appropriately, and follows best practices.


264-288: LGTM!

The remove_old_dpis method is well-defined, handles errors appropriately, and follows best practices.


290-314: LGTM!

The get_data_flows method is well-defined, handles errors appropriately, and follows best practices.

Comment on lines 1 to 23
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from functools import partial
from typing import Dict, Iterable, List, Optional

from pydantic import Field

from datahub.configuration import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.stats_collections import TopKDict
Contributor


Remove unused imports.

The following imports are not used and should be removed: defaultdict, partial, Field, datetime, timezone, LossyList.

- from collections import defaultdict
- from functools import partial
- from pydantic import Field
- from datetime import datetime, timezone
- from datahub.utilities.lossy_collections import LossyList

Comment on lines 316 to 368
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
    assert self.ctx.graph

    dataFlows: Dict[str, DataFlowEntity] = {}
    for flow in self.get_data_flows():
        dataFlows[flow.urn] = flow

    scroll_id: Optional[str] = None
    dataJobs: Dict[str, List[DataJobEntity]] = defaultdict(list)

    while True:
        result = self.ctx.graph.execute_graphql(
            DATAJOB_QUERY,
            {"query": "*", scroll_id: scroll_id if scroll_id else "null"},
        )
        scrollAcrossEntities = result.get("scrollAcrossEntities")
        if not scrollAcrossEntities:
            raise ValueError("Missing scrollAcrossEntities in response")

        scroll_id = scrollAcrossEntities.get("nextScrollId")
        for job in scrollAcrossEntities.get("searchResults"):
            datajob_entity = DataJobEntity(
                urn=job.get("entity").get("urn"),
                flow_urn=job.get("entity").get("dataFlow").get("urn"),
                lastIngested=job.get("entity").get("lastIngested"),
                jobId=job.get("entity").get("jobId"),
                dataPlatformInstance=job.get("entity").get("dataPlatformInstance"),
                total_runs=job.get("entity").get("runs").get("total"),
            )
            if datajob_entity.total_runs > 0:
                self.delete_dpi_from_datajobs(datajob_entity)
            if (
                datajob_entity.total_runs == 0
                and self.config.delete_empty_data_jobs
            ):
                logger.info(
                    f"Deleting datajob {datajob_entity.urn} because there are no runs"
                )
                self.delete_entity(datajob_entity.urn, "dataJob")
            else:
                dataJobs[datajob_entity.flow_urn].append(datajob_entity)

        for key in dataFlows.keys():
            if (
                not dataJobs.get(key) or len(dataJobs[key]) == 0
            ) and self.config.delete_empty_data_flows:
                logger.info(
                    f"Deleting dataflow {key} because there are not datajobs"
                )
                self.delete_entity(key, "dataFlow")

        if not scroll_id:
            break
Contributor


Use key in dict instead of key in dict.keys().

Remove .keys() for better readability and performance.

- for key in dataFlows.keys():
+ for key in dataFlows:


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 98ce810 and a7b5f95.

Files selected for processing (1)
  • metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py (1 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py

363-363: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)

Additional comments not posted (16)
metadata-ingestion/src/datahub/ingestion/source/metadata_cleanup.py (16)

104-136: LGTM!

The MetadataCleanupConfig class is well-defined with appropriate fields and descriptions.


140-144: LGTM!

The DataFlowEntity class is well-defined with appropriate fields.


148-154: LGTM!

The DataJobEntity class is well-defined with appropriate fields.


158-163: LGTM!

The MetadataCleanupSourceReport class is well-defined with appropriate fields.


180-187: LGTM!

The __init__ method is well-defined with appropriate initialization steps.


189-192: LGTM!

The create method is well-defined with appropriate steps to create an instance of MetadataCleanupSource.


194-195: LGTM!

The get_report method is well-defined with appropriate steps to return the report.


197-198: LGTM!

The get_workunit_processors method is well-defined with appropriate steps to return the work unit processors.


200-222: LGTM!

The fetch_dpis method is well-defined with appropriate steps to fetch data process instances for a given job URN.


224-236: LGTM!

The keep_last_n_dpi method is well-defined with appropriate steps to keep the last N data process instances for a given job.


238-249: LGTM!

The delete_entity method is well-defined with appropriate steps to delete an entity with a given URN and type.


251-267: LGTM!

The delete_dpi_from_datajobs method is well-defined with appropriate steps to delete data process instances from data jobs.


269-293: LGTM!

The remove_old_dpis method is well-defined with appropriate steps to remove old data process instances based on retention days.


295-319: LGTM!

The get_data_flows method is well-defined with appropriate steps to fetch data flows.


321-374: LGTM!

The get_workunits_internal method is well-defined with appropriate steps to fetch internal work units.

Tools
Ruff

363-363: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)


2-6: Remove unused imports.

The following imports are not used and should be removed: defaultdict, partial.

- from collections import defaultdict
- from functools import partial

Likely invalid or redundant comment.

Comment on lines 22 to 23
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.stats_collections import TopKDict
Contributor


Remove unused imports.

The following imports are not used and should be removed: LossyList.

- from datahub.utilities.lossy_collections import LossyList

from functools import partial
from typing import Dict, Iterable, List, Optional

from pydantic import Field
Contributor


Remove unused import.

The Field import from pydantic is not used and should be removed.

- from pydantic import Field

            else:
                dataJobs[datajob_entity.flow_urn].append(datajob_entity)

        for key in dataFlows.keys():
Contributor


Use key in dict instead of key in dict.keys().

Remove .keys() for better readability and performance.

- for key in dataFlows.keys():
+ for key in dataFlows:

Comment on lines 1 to 4
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
Contributor


Remove unused imports.

The following imports are not used and should be removed: datetime, timezone.

- from datetime import datetime, timezone

@treff7es treff7es changed the title feat(ingest/metadata_cleanup): Creating metadata cleanup source to clean up dataflows feat(ingest/gc): Add dataflow and soft deleted entities cleanup Sep 17, 2024

@dataclass
-class DataHubGcSourceReport(SourceReport):
+class DataHubGcSourceReport(DataProcessCleanupReport, SoftDeletedEntitiesReport):
Collaborator


in general, I tend to prefer nesting / composition instead of inheritance

e.g.

class DataHubGcSourceReport
  data_process_cleanup: DataProcessCleanupReport
  soft_deleted_entities_cleanup: SoftDeletedEntitiesReport

same for the configs

however, I know it's a larger change so we can definitely defer it to later as well
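A slightly fuller sketch of that composition approach (the defaults and exact shape here are illustrative, not what the PR does; SourceReport, DataProcessCleanupReport, and SoftDeletedEntitiesReport are the classes referenced above):

from dataclasses import dataclass, field

@dataclass
class DataHubGcSourceReport(SourceReport):
    # Each cleanup module keeps its own report object; the top-level report
    # nests them instead of inheriting from both report classes.
    data_process_cleanup: DataProcessCleanupReport = field(
        default_factory=DataProcessCleanupReport
    )
    soft_deleted_entities_cleanup: SoftDeletedEntitiesReport = field(
        default_factory=SoftDeletedEntitiesReport
    )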


    @classmethod
    def create(cls, config_dict, ctx):
        config = DataHubGcSourceConfig.parse_obj(config_dict)
        return cls(ctx, config)

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        return [partial(auto_workunit_reporter, self.get_report())]
Collaborator


add a comment here - this disables auto status and a couple other things on purpose
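A sketch of what that comment might look like (the wording is a suggestion, not from the PR):

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        # Only auto_workunit_reporter on purpose: this source deletes entities via the
        # graph client rather than emitting aspects, so auto-status and the other
        # default workunit processors are intentionally skipped.
        return [partial(auto_workunit_reporter, self.get_report())]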


entity_urn = Urn.create_from_string(urn)
self.report.num_soft_deleted_entity_removed += 1
self.report.num_soft_deleted_entity_removed_by_type[entity_urn.entity_type] = (
Collaborator


can use a defaultdict here and for other types in the report
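i.e. roughly along these lines (a sketch; which report class these fields live on is an assumption based on the snippet above):

from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class SoftDeletedEntitiesReport(SourceReport):
    num_soft_deleted_entity_removed: int = 0
    # defaultdict(int) avoids the get-or-default bookkeeping when counting by type:
    # self.report.num_soft_deleted_entity_removed_by_type[entity_urn.entity_type] += 1
    num_soft_deleted_entity_removed_by_type: Dict[str, int] = field(
        default_factory=lambda: defaultdict(int)
    )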

entity_urn.entity_type
].append(urn)

self.ctx.graph.delete_entity(urn, True)
Collaborator


Suggested change

-self.ctx.graph.delete_entity(urn, True)
+self.ctx.graph.delete_entity(urn, hard=True)

Collaborator


should this be in the gc directory?

@treff7es treff7es merged commit e7a3890 into datahub-project:master Sep 18, 2024
62 of 65 checks passed
@treff7es treff7es deleted the metadata_cleanup_source branch September 18, 2024 08:01