Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
kanavnarula authored Sep 18, 2024
2 parents 77a0372 + 31edb46 commit 5cf0d60
Show file tree
Hide file tree
Showing 107 changed files with 4,220 additions and 3,527 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- #11313 - `datahub get` will no longer return a key aspect for entities that don't exist.
- #11369 - The default datahub-rest sink mode has been changed to `ASYNC_BATCH`. This requires a server with version 0.14.0+.
- #11214 - The container properties aspect will produce an additional field that requires a corresponding upgrade of the server. Otherwise, the server can reject the aspects.

### Potential Downtime

Expand Down
22 changes: 22 additions & 0 deletions metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
source:
type: datahub-gc
config:
# Revoke access tokens that have passed their expiry
cleanup_expired_tokens: true
# Whether to truncate the Elasticsearch indices that can be safely truncated
truncate_indices: true

# Cleanup of DataProcessInstance entities
dataprocess_cleanup:
# Remove data process instances older than this many days
retention_days: 10
# Delete empty Data Jobs (i.e. no DataProcessInstance is associated with the DataJob)
delete_empty_data_jobs: true
# Delete empty Data Flows (i.e. no DataJob is associated with the DataFlow)
delete_empty_data_flows: true
# Whether to hard delete entities (true) or soft delete them (false)
hard_delete_entities: false
# Always keep the last n dataprocess instances
keep_last_n: 5
soft_deleted_entities_cleanup:
# Remove soft-deleted entities that were soft deleted more than this many days ago
retention_days: 10
8 changes: 8 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ContainerClass,
DomainsClass,
EmbedClass,
FabricTypeClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
Expand Down Expand Up @@ -190,6 +191,12 @@ def gen_containers(
created: Optional[int] = None,
last_modified: Optional[int] = None,
) -> Iterable[MetadataWorkUnit]:
# because of backwards compatibility with a past issue, container_key.env may be a valid env or an instance name
env = (
container_key.env
if container_key.env in vars(FabricTypeClass).values()
else None
)
container_urn = container_key.as_urn()
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
Expand All @@ -207,6 +214,7 @@ def gen_containers(
lastModified=(
TimeStamp(time=last_modified) if last_modified is not None else None
),
env=env if env is not None else None,
),
).as_workunit()

Expand Down
57 changes: 54 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import re
import time
from dataclasses import dataclass
from typing import Dict, Iterable
from functools import partial
from typing import Dict, Iterable, List, Optional

from pydantic import Field

Expand All @@ -15,8 +16,19 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.gc.dataprocess_cleanup import (
DataProcessCleanup,
DataProcessCleanupConfig,
DataProcessCleanupReport,
)
from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
SoftDeletedEntitiesCleanup,
SoftDeletedEntitiesCleanupConfig,
SoftDeletedEntitiesReport,
)

logger = logging.getLogger(__name__)

Expand All @@ -43,34 +55,73 @@ class DataHubGcSourceConfig(ConfigModel):
description="Sleep between truncation monitoring.",
)

dataprocess_cleanup: Optional[DataProcessCleanupConfig] = Field(
default=None,
description="Configuration for data process cleanup",
)

soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanupConfig] = Field(
default=None,
description="Configuration for soft deleted entities cleanup",
)


@dataclass
class DataHubGcSourceReport(DataProcessCleanupReport, SoftDeletedEntitiesReport):
    """Aggregated report for the GC source.

    Inherits the counters of the data-process and soft-deleted-entity
    cleanup sub-reports so a single report object can be shared with both
    cleanup helpers.
    """

    # Number of expired access tokens revoked during this run.
    expired_tokens_revoked: int = 0


@platform_name("DataHubGc")
@config_class(DataHubGcSourceConfig)
@support_status(SupportStatus.TESTING)
class DataHubGcSource(Source):
"""
DataHubGcSource is responsible for performing garbage collection tasks on DataHub.
This source performs the following tasks:
1. Cleans up expired tokens.
2. Truncates Elasticsearch indices based on configuration.
3. Cleans up data processes and soft-deleted entities if configured.
"""

def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
    """Wire up the GC source.

    Keeps the pipeline context and config, creates the shared report, and
    instantiates each optional cleanup helper only when its config section
    is present.
    """
    self.ctx = ctx
    self.config = config
    self.report = DataHubGcSourceReport()
    self.graph = ctx.require_graph("The DataHubGc source")

    # Helpers share self.report so all counters land in one place.
    self.dataprocess_cleanup: Optional[DataProcessCleanup] = (
        DataProcessCleanup(ctx, config.dataprocess_cleanup, self.report)
        if config.dataprocess_cleanup
        else None
    )
    self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = (
        SoftDeletedEntitiesCleanup(
            ctx, config.soft_deleted_entities_cleanup, self.report
        )
        if config.soft_deleted_entities_cleanup
        else None
    )

@classmethod
def create(cls, config_dict, ctx):
    """Standard source factory: parse the raw config dict into a
    DataHubGcSourceConfig and construct the source with it."""
    parsed = DataHubGcSourceConfig.parse_obj(config_dict)
    return cls(ctx, parsed)

# Overridden to disable the default workunit automations (e.g. the automatic
# status aspect), which are not needed here; only workunit reporting is kept.
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
    reporter = partial(auto_workunit_reporter, self.get_report())
    return [reporter]

def get_workunits_internal(
    self,
) -> Iterable[MetadataWorkUnit]:
    """Run the configured GC tasks in order.

    Token revocation, index truncation, and soft-deleted-entity cleanup are
    pure side effects; only the data-process cleanup emits workunits.
    """
    if self.config.cleanup_expired_tokens:
        self.revoke_expired_tokens()
    if self.config.truncate_indices:
        self.truncate_indices()
    if self.dataprocess_cleanup:
        yield from self.dataprocess_cleanup.get_workunits_internal()
    if self.soft_deleted_entities_cleanup:
        self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
    # The trailing `yield from []` from the original was dead code: the
    # `yield from` above already makes this function a generator, so it has
    # been removed.

def truncate_indices(self) -> None:
Expand Down
Loading

0 comments on commit 5cf0d60

Please sign in to comment.