feat(ingest/nifi): ingest process group as browse path v2, incremental lineage #10202
2acb3c6
to
b7b328c
Compare
entityUrn=job_urn,
aspect=DataJobInputOutputClass(
inputDatasets=inlets, outputDatasets=outlets, inputDatajobs=inputJobs
if self.config.incremental_lineage:
Ideally, I would also like to patch DataJobInfo so that the last_event_time custom property, which records when the DataJob last processed an event, does not get overwritten. However, DataJobPatchBuilder does not currently support patching this aspect. I will add this in a follow-up.
# As of now, container entities retrieval does not respect browsePathsV2 similar to container aspect.
# Consider enabling this when entities with browsePathsV2 pointing to container also get listed in container entities.
emit_process_group_as_container: bool = Field(
this is only needed until we show browsePathsV2 on entity cards, right?
yes
for patch_mcp in patch_builder.build():
yield MetadataWorkUnit(
id=f"{job_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp
)
eventually I would like to move this into auto_incremental_lineage
right!
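For readers following the thread, here is a minimal sketch of the difference between a full-aspect write and an incremental (patch) write of job lineage. It is not the DataHub implementation: `JobLineage`, `overwrite`, and `patch` are hypothetical names, and a plain dict stands in for the metadata store.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class JobLineage:
    """Hypothetical stand-in for a DataJob's input/output aspect."""

    input_datasets: List[str] = field(default_factory=list)
    output_datasets: List[str] = field(default_factory=list)


def overwrite(store: Dict[str, JobLineage], job_urn: str, new: JobLineage) -> None:
    # Full-aspect write: whatever was stored for this job before is lost.
    store[job_urn] = new


def patch(store: Dict[str, JobLineage], job_urn: str, new: JobLineage) -> None:
    # Incremental write: add only the edges that are not already present,
    # so lineage seen in earlier runs survives.
    current = store.setdefault(job_urn, JobLineage())
    for urn in new.input_datasets:
        if urn not in current.input_datasets:
            current.input_datasets.append(urn)
    for urn in new.output_datasets:
        if urn not in current.output_datasets:
            current.output_datasets.append(urn)
```

With `patch`, a run that only observes part of the flow adds to the stored edges instead of replacing them, which is the behaviour the `incremental_lineage` flag is meant to enable.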
def gen_process_group_key(self, process_group_id: str) -> ProcessGroupKey:
return ProcessGroupKey(
process_group_id=process_group_id, platform=NIFI, env=self.config.env
are process_group_ids globally unique within nifi? it seems like they're scoped to the flow
I think this is safe.
For a single NiFi deployment, clustered or standalone, the process group id would be unique. As of now, the entire deployment is represented as a single dataflow, since nothing else comes closer.
In future, when multiple NiFi deployments come into the picture (seems rare, given the complexity of a single NiFi deployment), adding platform-instance capability to the nifi source should handle the uniqueness.
sounds good
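A rough sketch of the uniqueness argument above, assuming the container id is derived by hashing the key fields. The function `container_guid`, its `instance` parameter, and the hashing scheme are illustrative assumptions, not the actual ProcessGroupKey behaviour.

```python
import hashlib
import json
from typing import Optional


def container_guid(
    process_group_id: str,
    platform: str = "nifi",
    env: str = "PROD",
    instance: Optional[str] = None,
) -> str:
    # Hypothetical key layout. Fields that are None are dropped, so adding an
    # optional instance field later does not change ids that never set it.
    key = {
        "platform": platform,
        "env": env,
        "instance": instance,
        "process_group_id": process_group_id,
    }
    key = {k: v for k, v in key.items() if v is not None}
    payload = json.dumps(key, sort_keys=True).encode("utf-8")
    return hashlib.md5(payload).hexdigest()
```

Within one deployment the process group id alone separates the keys. If two deployments ever reused an id, a distinct `instance` value per deployment would keep the derived ids apart.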
current_process_group = self.nifi_flow.processGroups[process_group_id]
parent_pg_id = current_process_group.parent_group_id
if parent_pg_id and not self._is_root_process_group(parent_pg_id):
the _is_root_process_group check is already done in the recursive call - imo it's clearer to not check it again here
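The suggestion can be sketched as follows, with the root check kept only as the base case of the recursion. This is an illustration under assumptions, not the code in the PR: `ProcessGroup` and `browse_path` are hypothetical, and the root group is assumed to be left out of the resulting path.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ProcessGroup:
    id: str
    name: str
    parent_group_id: Optional[str]


def browse_path(
    groups: Dict[str, ProcessGroup], root_id: str, pg_id: str
) -> List[str]:
    # The root check lives only here, as the base case of the recursion.
    if pg_id == root_id:
        return []
    pg = groups[pg_id]
    # No second root check on the parent: the recursive call handles it.
    parents = (
        browse_path(groups, root_id, pg.parent_group_id)
        if pg.parent_group_id
        else []
    )
    return parents + [pg.id]
```

Because the callee returns an empty path for the root, the caller only needs to know whether a parent exists at all.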
@@ -392,6 +400,10 @@ def __init__(self, config: NifiSourceConfig, ctx: PipelineContext) -> None:
if self.config.ca_file is not None:
self.session.verify = self.config.ca_file
# To keep track of process groups (containers) which have already been ingested
# Required, as we do not ingest all process groups but only those that have known ingress/egress processors
self.processed_pgs: List[str] = []
this is only used when emit_process_group_as_container is enabled right?
yes
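As a small sketch of what the tracking list is for, assuming each process group should be emitted as a container at most once: `containers_to_emit` is a hypothetical helper, and only the `processed_pgs` name comes from the diff.

```python
from typing import Iterable, Iterator, List


def containers_to_emit(
    pg_ids: Iterable[str], processed_pgs: List[str]
) -> Iterator[str]:
    # Yield each process group id the first time it is seen and record it,
    # so groups shared by several processors are emitted only once.
    for pg_id in pg_ids:
        if pg_id in processed_pgs:
            continue
        processed_pgs.append(pg_id)
        yield pg_id
```

A `set` would give constant-time membership checks; the list here simply mirrors the type used in the diff.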
CI failures are related to #10235
…l lineage (datahub-project#10202) Co-authored-by: Harshal Sheth <hsheth2@gmail.com>