feat(ingest/nifi): ingest process group as browse path v2

mayurinehate committed Apr 4, 2024
1 parent bad96ed commit dcfddfb

Showing 3 changed files with 506 additions and 65 deletions.
118 changes: 116 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/nifi.py
@@ -23,6 +23,7 @@
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import EnvConfigMixin
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ContainerKey, gen_containers
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
@@ -33,7 +34,10 @@
)
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import JobContainerSubTypes
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsV2Class,
DataFlowInfoClass,
DataJobInfoClass,
DataJobInputOutputClass,
@@ -70,6 +74,10 @@ class NifiAuthType(Enum):
BASIC_AUTH = "BASIC_AUTH"


class ProcessGroupKey(ContainerKey):
process_group_id: str
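
ProcessGroupKey subclasses DataHub's ContainerKey, so its fields are hashed into a stable
container URN. A minimal sketch of the intended usage, with a hypothetical process group ID
(the exact guid comes from ContainerKey's hashing):

    key = ProcessGroupKey(process_group_id="1234-abcd", platform="nifi", env="PROD")
    key.as_urn()  # -> "urn:li:container:<guid derived from the key fields>"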


class NifiSourceConfig(EnvConfigMixin):
site_url: str = Field(
description="URL for Nifi, ending with /nifi/. e.g. https://mynifi.domain/nifi/"
@@ -123,7 +131,15 @@ class NifiSourceConfig(EnvConfigMixin):
# root CA trusted by client system, e.g. self-signed certificates
ca_file: Optional[Union[bool, str]] = Field(
default=None,
description="Path to PEM file containing certs for the root CA(s) for the NiFi",
description="Path to PEM file containing certs for the root CA(s) for the NiFi."
"Set to False to disable SSL verification.",
)

# As of now, listing a container's entities respects the container aspect but not browsePathsV2.
# Consider enabling this by default once entities whose browsePathsV2 points to a container
# are also listed among that container's entities.
emit_process_group_as_container: bool = Field(
default=False,
description="Whether to emit Nifi process groups as container entities.",
)
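
A sketch of a source config that opts in to the new behaviour; the URL is the illustrative
one from the field description above, and every other field keeps its default:

    config = NifiSourceConfig(
        site_url="https://mynifi.domain/nifi/",
        emit_process_group_as_container=True,
    )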

@root_validator(skip_on_failure=True)
@@ -392,6 +408,10 @@ def __init__(self, config: NifiSourceConfig, ctx: PipelineContext) -> None:
if self.config.ca_file is not None:
self.session.verify = self.config.ca_file

# To keep track of process groups (containers) which have already been ingested
# Required, as we do not ingest all process groups but only those that have known ingress/egress processors
self.processed_pgs: List[str] = []

@cached_property
def rest_api_base_url(self):
return self.config.site_url[: -len("nifi/")] + "nifi-api/"
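
The slice strips the trailing "nifi/" from site_url before appending "nifi-api/", e.g.:

    # "https://mynifi.domain/nifi/" -> "https://mynifi.domain/" + "nifi-api/"
    #                               -> "https://mynifi.domain/nifi-api/"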
@@ -794,7 +814,7 @@ def delete_provenance(self, provenance_uri):
def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
rootpg = self.nifi_flow.root_process_group
flow_name = rootpg.name # self.config.site_name
-        flow_urn = builder.make_data_flow_urn(NIFI, rootpg.id, self.config.env)
+        flow_urn = self.make_flow_urn()
flow_properties = {}
if self.nifi_flow.clustered is not None:
flow_properties["clustered"] = str(self.nifi_flow.clustered)
@@ -927,9 +947,16 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
)
break

if self.config.emit_process_group_as_container:
# We emit process groups only for those NiFi components that qualify as datajobs
yield from self.construct_process_group_workunits(
component.parent_group_id
)

yield from self.construct_job_workunits(
job_urn,
job_name,
component.parent_group_id,
external_url=self.make_external_url(
component.parent_group_id, component.id, component.parent_rpg_id
),
@@ -951,6 +978,11 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
external_url=self.make_external_url(port.parent_group_id, port.id),
)

def make_flow_urn(self) -> str:
return builder.make_data_flow_urn(
NIFI, self.nifi_flow.root_process_group.id, self.config.env
)
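
For the root process group this yields DataHub's standard dataFlow URN; e.g., with a
hypothetical root group UUID and the default env:

    # make_data_flow_urn("nifi", "d37bea52-0181-1000", "PROD")
    # -> "urn:li:dataFlow:(nifi,d37bea52-0181-1000,PROD)"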

def process_provenance_events(self):
startDate = datetime.now(timezone.utc) - timedelta(
days=self.config.provenance_days
@@ -1083,6 +1115,7 @@ def construct_job_workunits(
self,
job_urn: str,
job_name: str,
parent_group_id: str,
external_url: str,
job_type: str,
description: Optional[str],
@@ -1107,6 +1140,10 @@
),
).as_workunit()

# If the dataJob had a container aspect, we would ideally emit only that,
# and browse path v2 would be generated automatically.
yield self.gen_browse_path_v2_workunit(job_urn, parent_group_id)

inlets.sort()
outlets.sort()
inputJobs.sort()
@@ -1118,6 +1155,83 @@
),
).as_workunit()

def gen_browse_path_v2_workunit(
self, entity_urn: str, process_group_id: str
) -> MetadataWorkUnit:
flow_urn = self.make_flow_urn()
return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=BrowsePathsV2Class(
path=[
BrowsePathEntryClass(id=flow_urn, urn=flow_urn),
*self.get_browse_path_entries(process_group_id),
]
),
).as_workunit()
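
For a job nested two process groups deep, with emit_process_group_as_container left at its
default of False, the emitted aspect would look roughly like this (group names hypothetical):

    BrowsePathsV2Class(
        path=[
            BrowsePathEntryClass(id=flow_urn, urn=flow_urn),
            BrowsePathEntryClass(id="etl"),         # outer process group, by name
            BrowsePathEntryClass(id="daily_load"),  # job's parent group, by name
        ]
    )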

def get_browse_path_entries(
self, process_group_id: str
) -> List[BrowsePathEntryClass]:
"""Browse path entries till current process group"""
if self._is_root_process_group(process_group_id):
return []

current_process_group = self.nifi_flow.processGroups[process_group_id]
parent_pg_id = current_process_group.parent_group_id
if parent_pg_id and not self._is_root_process_group(parent_pg_id):
parent_browse_path = self.get_browse_path_entries(parent_pg_id)
else:
parent_browse_path = []

if self.config.emit_process_group_as_container:
container_urn = self.gen_process_group_key(process_group_id).as_urn()
current_browse_entry = BrowsePathEntryClass(
id=container_urn, urn=container_urn
)
else:
current_browse_entry = BrowsePathEntryClass(id=current_process_group.name)
return parent_browse_path + [current_browse_entry]

def _is_root_process_group(self, process_group_id: str) -> bool:
return self.nifi_flow.root_process_group.id == process_group_id
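
get_browse_path_entries follows parent_group_id links until it reaches the root, so entries
come back in top-down order. A trace for a group at /root/etl/daily_load, with hypothetical IDs:

    # get_browse_path_entries("daily-load-id")
    #   = get_browse_path_entries("etl-id") + [<"daily_load" entry>]
    #   = ([] + [<"etl" entry>]) + [<"daily_load" entry>]
    #   = [<"etl" entry>, <"daily_load" entry>]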

def construct_process_group_workunits(
self, process_group_id: str
) -> Iterable[MetadataWorkUnit]:
if (
self._is_root_process_group(process_group_id)
or process_group_id in self.processed_pgs
):
return
self.processed_pgs.append(process_group_id)

pg = self.nifi_flow.processGroups[process_group_id]
container_key = self.gen_process_group_key(process_group_id)
yield from gen_containers(
container_key=container_key,
name=pg.name,
sub_types=[JobContainerSubTypes.NIFI_PROCESS_GROUP],
parent_container_key=(
self.gen_process_group_key(pg.parent_group_id)
if pg.parent_group_id
and not self._is_root_process_group(pg.parent_group_id)
else None
),
)

if pg.parent_group_id: # always true for non-root process group
yield from self.construct_process_group_workunits(pg.parent_group_id)

if self._is_root_process_group(pg.parent_group_id):
yield self.gen_browse_path_v2_workunit(
container_key.as_urn(), pg.parent_group_id
)
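
The recursion above emits containers child-first and relies on processed_pgs to short-circuit
repeat visits. A rough trace for a job under /root/etl/daily_load (hypothetical IDs):

    # construct_process_group_workunits("daily-load-id")
    #   emits the "daily_load" container, with "etl" as its parent container
    #   recurses: emits the "etl" container with no parent container (its parent is root)
    #     "etl" sits directly under root, so it also gets a BrowsePathsV2 of [flow_urn]
    # any later call for "daily-load-id" yields nothing (already in processed_pgs)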

def gen_process_group_key(self, process_group_id: str) -> ProcessGroupKey:
return ProcessGroupKey(
process_group_id=process_group_id, platform=NIFI, env=self.config.env
)

def construct_dataset_workunits(
self,
dataset_platform: str,