def ensure_has_system_metadata(
    event: Union[
        MetadataChangeProposal, MetadataChangeProposalWrapper, MetadataChangeEvent
    ]
) -> None:
    """Stamp *event* with the emitting client's package name and version.

    The event is modified in place and nothing is returned:

    * if ``event.systemMetadata`` is ``None`` it is replaced with a fresh
      ``SystemMetadataClass()``;
    * if that metadata's ``properties`` is ``None`` it is replaced with an
      empty dict;
    * ``properties["clientId"]`` is set to ``datahub.__package_name__`` and
      ``properties["clientVersion"]`` to ``datahub.__version__``, overwriting
      any values already stored under those keys.
    """
    if event.systemMetadata is None:
        event.systemMetadata = SystemMetadataClass()
    # Read the attribute back after the assignment so we always work with the
    # object the event actually holds.
    system_metadata = event.systemMetadata

    if system_metadata.properties is None:
        system_metadata.properties = {}
    client_properties = system_metadata.properties

    client_properties["clientId"] = datahub.__package_name__
    client_properties["clientVersion"] = datahub.__version__
HTTPError, RequestException -from datahub.cli.cli_utils import fixup_gms_url, get_system_auth +from datahub.cli.cli_utils import ( + ensure_has_system_metadata, + fixup_gms_url, + get_system_auth, +) from datahub.configuration.common import ConfigurationError, OperationalError from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -228,12 +232,8 @@ def emit_mce(self, mce: MetadataChangeEvent) -> None: snapshot_fqn = ( f"com.linkedin.metadata.snapshot.{mce.proposedSnapshot.RECORD_SCHEMA.name}" ) - system_metadata_obj = {} - if mce.systemMetadata is not None: - system_metadata_obj = { - "lastObserved": mce.systemMetadata.lastObserved, - "runId": mce.systemMetadata.runId, - } + ensure_has_system_metadata(mce) + system_metadata_obj = mce.systemMetadata.to_obj() snapshot = { "entity": {"value": {snapshot_fqn: mce_obj}}, "systemMetadata": system_metadata_obj, @@ -246,7 +246,7 @@ def emit_mcp( self, mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper] ) -> None: url = f"{self._gms_server}/aspects?action=ingestProposal" - + ensure_has_system_metadata(mcp) mcp_obj = pre_json_transform(mcp.to_obj()) payload = json.dumps({"proposal": mcp_obj}) @@ -256,6 +256,8 @@ def emit_mcps( self, mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]] ) -> None: url = f"{self._gms_server}/aspects?action=ingestProposalBatch" + for mcp in mcps: + ensure_has_system_metadata(mcp) mcp_objs = [pre_json_transform(mcp.to_obj()) for mcp in mcps] payload = json.dumps({"proposals": mcp_objs})