From 852267972c8efc1ceb5a0cbd71594d7ea2529d49 Mon Sep 17 00:00:00 2001 From: "nicholas.fwang" Date: Fri, 27 Oct 2023 01:57:43 +0900 Subject: [PATCH] refactor(ingestion/mongodb): Add platform_instance to mongodb (#8663) Co-authored-by: Harshal Sheth --- .../src/datahub/ingestion/source/mongodb.py | 16 +++++++++++++--- .../integration/mongodb/mongodb_mces_golden.json | 16 ++++++++-------- .../tests/integration/mongodb/test_mongodb.py | 1 + 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index f02b6845e40b5..890c5c64bd5e6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -11,7 +11,11 @@ from pymongo.mongo_client import MongoClient from datahub.configuration.common import AllowDenyPattern -from datahub.configuration.source_common import EnvConfigMixin +from datahub.configuration.source_common import ( + EnvConfigMixin, + PlatformInstanceConfigMixin, +) +from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SourceCapability, @@ -55,7 +59,7 @@ DENY_DATABASE_LIST = set(["admin", "config", "local"]) -class MongoDBConfig(EnvConfigMixin): +class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin): # See the MongoDB authentication docs for details and examples. # https://pymongo.readthedocs.io/en/stable/examples/authentication.html connect_uri: str = Field( @@ -199,6 +203,7 @@ def construct_schema_pymongo( @platform_name("MongoDB") @config_class(MongoDBConfig) @support_status(SupportStatus.CERTIFIED) +@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") @capability(SourceCapability.SCHEMA_METADATA, "Enabled by default") @dataclass class MongoDBSource(Source): @@ -320,7 +325,12 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.report.report_dropped(dataset_name) continue - dataset_urn = f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.config.env})" + dataset_urn = make_dataset_urn_with_platform_instance( + platform=platform, + name=dataset_name, + env=self.config.env, + platform_instance=self.config.platform_instance, + ) dataset_snapshot = DatasetSnapshot( urn=dataset_urn, diff --git a/metadata-ingestion/tests/integration/mongodb/mongodb_mces_golden.json b/metadata-ingestion/tests/integration/mongodb/mongodb_mces_golden.json index 1f662cfe514e2..e16101b137ac9 100644 --- a/metadata-ingestion/tests/integration/mongodb/mongodb_mces_golden.json +++ b/metadata-ingestion/tests/integration/mongodb/mongodb_mces_golden.json @@ -2,7 +2,7 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.emptyCollection,PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.emptyCollection,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { @@ -41,7 +41,7 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.firstCollection,PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.firstCollection,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { @@ -345,7 +345,7 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.largeCollection,PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.largeCollection,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { @@ -3988,7 +3988,7 @@ { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.secondCollection,PROD)", + "urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.secondCollection,PROD)", "aspects": [ { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { @@ -4135,7 +4135,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.emptyCollection,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.emptyCollection,PROD)", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -4150,7 +4150,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.firstCollection,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.firstCollection,PROD)", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -4165,7 +4165,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.largeCollection,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.largeCollection,PROD)", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -4180,7 +4180,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.secondCollection,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.secondCollection,PROD)", "changeType": "UPSERT", "aspectName": "status", "aspect": { diff --git a/metadata-ingestion/tests/integration/mongodb/test_mongodb.py b/metadata-ingestion/tests/integration/mongodb/test_mongodb.py index 5228c21223e24..56fb471d4c9f1 100644 --- a/metadata-ingestion/tests/integration/mongodb/test_mongodb.py +++ b/metadata-ingestion/tests/integration/mongodb/test_mongodb.py @@ -25,6 +25,7 @@ def test_mongodb_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time "username": "mongoadmin", "password": "examplepass", "maxDocumentSize": 25000, + "platform_instance": "instance", }, }, "sink": {