Skip to content

Commit

Permalink
incremental lineage for nifi, docs update
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Apr 4, 2024
1 parent dcfddfb commit b7b328c
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 47 deletions.
17 changes: 17 additions & 0 deletions metadata-ingestion/docs/sources/nifi/nifi_pre.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### Concept Mapping

| Source Concept | DataHub Concept | Notes |
| --------------------------------- | --------------------------------------------------------- | ----------------------- |
| `"Nifi"` | [Data Platform](../../metamodel/entities/dataPlatform.md) | |
| Nifi flow | [Data Flow](../../metamodel/entities/dataFlow.md) | |
| Nifi Ingress / Egress Processor | [Data Job](../../metamodel/entities/dataJob.md) | |
| Nifi Remote Port | [Data Job](../../metamodel/entities/dataJob.md) | |
| Nifi Port with remote connections | [Dataset](../../metamodel/entities/dataset.md) | |
| Nifi Process Group | [Container](../../metamodel/entities/container.md) | Subtype `Process Group` |

### Caveats
- This plugin extracts lineage information between external datasets and ingress/egress processors by analyzing provenance events. Please check your Nifi configuration to confirm the maximum retention period of provenance events, and make sure that ingestion runs frequently enough to read provenance events before they disappear.

- Only a limited set of ingress/egress processors is supported:
  - S3: `ListS3`, `FetchS3Object`, `PutS3Object`
  - SFTP: `ListSFTP`, `FetchSFTP`, `GetSFTP`, `PutSFTP`
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class BIContainerSubTypes(str, Enum):
QLIK_APP = "Qlik App"


class JobContainerSubTypes(str, Enum):
    """Subtype labels for container entities that group jobs.

    Members are plain strings (the class mixes in ``str``), so a member's
    value can be used wherever a subtype name string is expected.
    """

    # Label for Nifi process groups; the Nifi docs table lists
    # "Nifi Process Group" as a Container with subtype `Process Group`.
    NIFI_PROCESS_GROUP = "Process Group"


class BIAssetSubTypes(str, Enum):
# Generic SubTypes
REPORT = "Report"
Expand Down
49 changes: 28 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/nifi.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
DataPlatformInstanceClass,
DatasetPropertiesClass,
)
from datahub.specific.datajob import DataJobPatchBuilder

logger = logging.getLogger(__name__)
NIFI = "nifi"
Expand Down Expand Up @@ -142,6 +143,12 @@ class NifiSourceConfig(EnvConfigMixin):
description="Whether to emit Nifi process groups as container entities.",
)

incremental_lineage: bool = Field(
default=True,
description="When enabled, emits incremental/patch lineage for Nifi processors."
" When disabled, re-states lineage on each run.",
)

@root_validator(skip_on_failure=True)
def validate_auth_params(cla, values):
if values.get("auth") is NifiAuthType.CLIENT_CERT and not values.get(
Expand Down Expand Up @@ -380,21 +387,6 @@ def report_dropped(self, ent_name: str) -> None:
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.LINEAGE_COARSE, "Supported. See docs for limitations")
class NifiSource(Source):
"""
This plugin extracts the following:
- NiFi flow as `DataFlow` entity
- Ingress, egress processors, remote input and output ports as `DataJob` entity
- Input and output ports receiving remote connections as `Dataset` entity
- Lineage information between external datasets and ingress/egress processors by analyzing provenance events
Current limitations:
- Limited ingress/egress processors are supported
- S3: `ListS3`, `FetchS3Object`, `PutS3Object`
- SFTP: `ListSFTP`, `FetchSFTP`, `GetSFTP`, `PutSFTP`
"""

config: NifiSourceConfig
report: NifiSourceReport
Expand Down Expand Up @@ -1148,12 +1140,27 @@ def construct_job_workunits(
outlets.sort()
inputJobs.sort()

yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=DataJobInputOutputClass(
inputDatasets=inlets, outputDatasets=outlets, inputDatajobs=inputJobs
),
).as_workunit()
if self.config.incremental_lineage:
patch_builder: DataJobPatchBuilder = DataJobPatchBuilder(job_urn)
for inlet in inlets:
patch_builder.add_input_dataset(inlet)
for outlet in outlets:
patch_builder.add_output_dataset(outlet)
for inJob in inputJobs:
patch_builder.add_input_datajob(inJob)
for patch_mcp in patch_builder.build():
yield MetadataWorkUnit(
id=f"{job_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp
)
else:
yield MetadataChangeProposalWrapper(
entityUrn=job_urn,
aspect=DataJobInputOutputClass(
inputDatasets=inlets,
outputDatasets=outlets,
inputDatajobs=inputJobs,
),
).as_workunit()

def gen_browse_path_v2_workunit(
self, entity_urn: str, process_group_id: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,26 @@
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),aed63edf-e660-3f29-b56b-192cf6286889)",
"changeType": "UPSERT",
"changeType": "PATCH",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)"
]
}
"json": [
{
"op": "add",
"path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"value": {
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
}
}
]
},
"systemMetadata": {
"lastObserved": 1638532800000,
Expand Down Expand Up @@ -160,18 +170,41 @@
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"changeType": "UPSERT",
"changeType": "PATCH",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)"
]
}
"json": [
{
"op": "add",
"path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
}
},
{
"op": "add",
"path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"value": {
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
}
}
]
},
"systemMetadata": {
"lastObserved": 1638532800000,
Expand Down Expand Up @@ -246,16 +279,26 @@
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"changeType": "UPSERT",
"changeType": "PATCH",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
],
"outputDatasets": [],
"inputDatajobs": []
}
"json": [
{
"op": "add",
"path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
}
}
]
},
"systemMetadata": {
"lastObserved": 1638532800000,
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/tests/integration/nifi/test_nifi.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def test_nifi_ingest_cluster(loaded_nifi, pytestconfig, tmp_path, test_resources
"http://nifi02:9081/nifi/": "default",
"http://nifi03:9082/nifi/": "default",
},
"incremental_lineage": False,
},
},
"sink": {
Expand Down

0 comments on commit b7b328c

Please sign in to comment.