diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index 06861065ca6f2..21841b173c23d 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -13,7 +13,7 @@ from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.request_helper import make_curl_command -from datahub.emitter.serialization_helper import post_json_transform +from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeEvent, MetadataChangeProposal, @@ -153,10 +153,11 @@ def post_entity( aspect_value: Dict, cached_session_host: Optional[Tuple[Session, str]] = None, is_async: Optional[str] = "false", + system_metadata: Union[None, SystemMetadataClass] = None, ) -> int: endpoint: str = "/aspects/?action=ingestProposal" - proposal = { + proposal: Dict[str, Any] = { "proposal": { "entityType": entity_type, "entityUrn": urn, @@ -169,6 +170,12 @@ def post_entity( }, "async": is_async, } + + if system_metadata is not None: + proposal["proposal"]["systemMetadata"] = json.dumps( + pre_json_transform(system_metadata.to_obj()) + ) + payload = json.dumps(proposal) url = gms_host + endpoint curl_command = make_curl_command(session, "POST", url, payload) diff --git a/metadata-ingestion/src/datahub/cli/put_cli.py b/metadata-ingestion/src/datahub/cli/put_cli.py index 40af54c7c7e2e..989b1a6d02fd0 100644 --- a/metadata-ingestion/src/datahub/cli/put_cli.py +++ b/metadata-ingestion/src/datahub/cli/put_cli.py @@ -1,12 +1,12 @@ import logging -from typing import Optional +from typing import Optional, Union import click from click_default_group import DefaultGroup from datahub.cli.cli_utils import post_entity from datahub.configuration.config_loader import load_config_file -from datahub.emitter.mcp import 
MetadataChangeProposalWrapper +from datahub.emitter.mcp import MetadataChangeProposalWrapper, SystemMetadataClass from datahub.ingestion.graph.client import get_default_graph from datahub.metadata.schema_classes import ( DataPlatformInfoClass as DataPlatformInfo, @@ -36,9 +36,15 @@ def put() -> None: @click.option("--urn", required=True, type=str) @click.option("-a", "--aspect", required=True, type=str) @click.option("-d", "--aspect-data", required=True, type=str) +@click.option( + "--run-id", + type=str, + required=False, + help="Run ID into which we should log the aspect.", +) @upgrade.check_upgrade @telemetry.with_telemetry() -def aspect(urn: str, aspect: str, aspect_data: str) -> None: +def aspect(urn: str, aspect: str, aspect_data: str, run_id: Optional[str]) -> None: """Update a single aspect of an entity""" entity_type = guess_entity_type(urn) @@ -48,6 +54,10 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None: client = get_default_graph() + system_metadata: Union[None, SystemMetadataClass] = None + if run_id: + system_metadata = SystemMetadataClass(runId=run_id) + # TODO: Replace with client.emit, requires figuring out the correct subclass of _Aspect to create from the data status = post_entity( client._session, @@ -56,6 +66,7 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None: aspect_name=aspect, entity_type=entity_type, aspect_value=aspect_obj, + system_metadata=system_metadata, ) click.secho(f"Update succeeded with status {status}", fg="green") @@ -82,8 +93,11 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None: help="Logo URL that must be reachable from the DataHub UI.", required=True, ) +@click.option( + "--run-id", type=str, help="Run ID into which we should log the platform."
+) def platform( - ctx: click.Context, name: str, display_name: Optional[str], logo: str + ctx: click.Context, name: str, display_name: Optional[str], logo: str, run_id: str ) -> None: """ Create or update a dataplatform entity in DataHub @@ -104,11 +118,12 @@ def platform( logoUrl=logo, ) datahub_graph = get_default_graph() - datahub_graph.emit( - MetadataChangeProposalWrapper( - entityUrn=str(platform_urn), aspect=data_platform_info - ) + mcp = MetadataChangeProposalWrapper( + entityUrn=str(platform_urn), + aspect=data_platform_info, + systemMetadata=SystemMetadataClass(runId=run_id), ) + datahub_graph.emit(mcp) click.echo( f"✅ Successfully wrote data platform metadata for {platform_urn} to DataHub ({datahub_graph})" )