Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add way to filter by aspect_type in metadata_sync #137

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
MetadataChangeLogClass,
MetadataChangeProposalClass,
)
from pydantic import BaseModel
from pydantic import BaseModel, Field

from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
Expand All @@ -21,6 +21,7 @@ class MetadataChangeEmitterConfig(BaseModel):
gms_server: Optional[str]
gms_auth_token: Optional[str]
aspects_to_exclude: Optional[List]
entity_type_to_exclude: List[str] = Field(default_factory=list)
extra_headers: Optional[Dict[str, str]]


Expand Down Expand Up @@ -59,6 +60,7 @@ def __init__(self, config: MetadataChangeEmitterConfig, ctx: PipelineContext):
if self.config.aspects_to_exclude
else self.DEFAULT_ASPECTS_EXCLUDE_SET
)

extra_headers_keys = (
list(self.config.extra_headers.keys())
if self.config.extra_headers
Expand All @@ -77,10 +79,18 @@ def act(self, event: EventEnvelope) -> None:
if event.event_type is METADATA_CHANGE_LOG_EVENT_V1_TYPE:
orig_event = cast(MetadataChangeLogClass, event.event)
logger.debug(f"received orig_event {orig_event}")
if orig_event.get("aspectName") not in self.aspects_exclude_set:
if (orig_event.get("aspectName") not in self.aspects_exclude_set) and (
orig_event.get("entityType") not in self.config.entity_type_to_exclude
if self.config.entity_type_to_exclude
else True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get some reporting / logging for this?

):
mcp = self.buildMcp(orig_event)
if mcp is not None:
self.emit(mcp)
else:
logger.debug(
f"skip emitting mcp for aspect as {orig_event.get('aspectName')} or entity type {orig_event.get('entityType')} on exclude list"
)

def buildMcp(
self, orig_event: MetadataChangeLogClass
Expand Down
Loading