Skip to content

Commit

Permalink
move to use properties instead
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal committed Jul 5, 2024
1 parent ebe7b2d commit bbbd6dd
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
21 changes: 20 additions & 1 deletion metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
from requests.models import Response
from requests.sessions import Session

import datahub
from datahub.cli import config_utils
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.request_helper import make_curl_command
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import _Aspect
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import SystemMetadataClass, _Aspect
from datahub.utilities.urns.urn import Urn, guess_entity_type

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -689,3 +695,16 @@ def generate_access_token(
return token_name, response.json().get("data", {}).get("createAccessToken", {}).get(
"accessToken", None
)


def ensure_has_system_metadata(
event: Union[MetadataChangeProposal, MetadataChangeProposalWrapper, MetadataChangeEvent]
) -> None:
if event.systemMetadata is None:
event.systemMetadata = SystemMetadataClass()
metadata = event.systemMetadata
if metadata.properties is None:
metadata.properties = {}
props = metadata.properties
props["clientId"] = datahub.__package_name__
props["clientVersion"] = datahub.__version__
18 changes: 10 additions & 8 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException

from datahub.cli.cli_utils import fixup_gms_url, get_system_auth
from datahub.cli.cli_utils import (
ensure_has_system_metadata,
fixup_gms_url,
get_system_auth,
)
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -228,12 +232,8 @@ def emit_mce(self, mce: MetadataChangeEvent) -> None:
snapshot_fqn = (
f"com.linkedin.metadata.snapshot.{mce.proposedSnapshot.RECORD_SCHEMA.name}"
)
system_metadata_obj = {}
if mce.systemMetadata is not None:
system_metadata_obj = {
"lastObserved": mce.systemMetadata.lastObserved,
"runId": mce.systemMetadata.runId,
}
ensure_has_system_metadata(mce)
system_metadata_obj = mce.systemMetadata.to_obj()
snapshot = {
"entity": {"value": {snapshot_fqn: mce_obj}},
"systemMetadata": system_metadata_obj,
Expand All @@ -246,7 +246,7 @@ def emit_mcp(
self, mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper]
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposal"

ensure_has_system_metadata(mcp)
mcp_obj = pre_json_transform(mcp.to_obj())
payload = json.dumps({"proposal": mcp_obj})

Expand All @@ -256,6 +256,8 @@ def emit_mcps(
self, mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]]
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)

mcp_objs = [pre_json_transform(mcp.to_obj()) for mcp in mcps]
payload = json.dumps({"proposals": mcp_objs})
Expand Down

0 comments on commit bbbd6dd

Please sign in to comment.