Skip to content

Commit

Permalink
feat(ingest/audit): add client id and version in system metadata props (
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jul 8, 2024
1 parent 43bac36 commit 41b9e15
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 12 deletions.
23 changes: 22 additions & 1 deletion metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
from requests.models import Response
from requests.sessions import Session

import datahub
from datahub.cli import config_utils
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.request_helper import make_curl_command
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import _Aspect
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import SystemMetadataClass, _Aspect
from datahub.utilities.urns.urn import Urn, guess_entity_type

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -689,3 +695,18 @@ def generate_access_token(
return token_name, response.json().get("data", {}).get("createAccessToken", {}).get(
"accessToken", None
)


def ensure_has_system_metadata(
event: Union[
MetadataChangeProposal, MetadataChangeProposalWrapper, MetadataChangeEvent
]
) -> None:
if event.systemMetadata is None:
event.systemMetadata = SystemMetadataClass()
metadata = event.systemMetadata
if metadata.properties is None:
metadata.properties = {}
props = metadata.properties
props["clientId"] = datahub.__package_name__
props["clientVersion"] = datahub.__version__
20 changes: 12 additions & 8 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException

from datahub.cli.cli_utils import fixup_gms_url, get_system_auth
from datahub.cli.cli_utils import (
ensure_has_system_metadata,
fixup_gms_url,
get_system_auth,
)
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -228,12 +232,10 @@ def emit_mce(self, mce: MetadataChangeEvent) -> None:
snapshot_fqn = (
f"com.linkedin.metadata.snapshot.{mce.proposedSnapshot.RECORD_SCHEMA.name}"
)
system_metadata_obj = {}
if mce.systemMetadata is not None:
system_metadata_obj = {
"lastObserved": mce.systemMetadata.lastObserved,
"runId": mce.systemMetadata.runId,
}
ensure_has_system_metadata(mce)
# To make lint happy
assert mce.systemMetadata is not None
system_metadata_obj = mce.systemMetadata.to_obj()
snapshot = {
"entity": {"value": {snapshot_fqn: mce_obj}},
"systemMetadata": system_metadata_obj,
Expand All @@ -246,7 +248,7 @@ def emit_mcp(
self, mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper]
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposal"

ensure_has_system_metadata(mcp)
mcp_obj = pre_json_transform(mcp.to_obj())
payload = json.dumps({"proposal": mcp_obj})

Expand All @@ -256,6 +258,8 @@ def emit_mcps(
self, mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]]
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)

mcp_objs = [pre_json_transform(mcp.to_obj()) for mcp in mcps]
payload = json.dumps({"proposals": mcp_objs})
Expand Down
39 changes: 36 additions & 3 deletions metadata-ingestion/tests/unit/test_rest_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,15 @@
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
Expand Down Expand Up @@ -125,7 +133,15 @@
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
Expand Down Expand Up @@ -161,7 +177,15 @@
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
Expand Down Expand Up @@ -238,6 +262,15 @@
"value": '{"owners": [{"owner": "urn:li:corpuser:fbar", "type": "DATAOWNER"}], "ownerTypes": {}, "lastModified": {"time": 0, "actor": "urn:li:corpuser:fbar"}}',
"contentType": "application/json",
},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
}
},
),
Expand Down

0 comments on commit 41b9e15

Please sign in to comment.