Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/audit): add client id and version in system metadata props #10829

Merged
merged 4 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
from requests.models import Response
from requests.sessions import Session

import datahub
from datahub.cli import config_utils
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.request_helper import make_curl_command
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import _Aspect
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import SystemMetadataClass, _Aspect
from datahub.utilities.urns.urn import Urn, guess_entity_type

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -689,3 +695,18 @@ def generate_access_token(
return token_name, response.json().get("data", {}).get("createAccessToken", {}).get(
"accessToken", None
)


def ensure_has_system_metadata(
event: Union[
MetadataChangeProposal, MetadataChangeProposalWrapper, MetadataChangeEvent
]
) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put this in a separate utility file?

also, we don't need two methods - let's just have a inject_client_version_system_metadata or something that can handle both MCPs and MCEs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a single method. Do you have a preferred name for the separate file? If you can suggest a name that will be ok for you to approve do let me know and I'll move to using that.

if event.systemMetadata is None:
event.systemMetadata = SystemMetadataClass()
metadata = event.systemMetadata
if metadata.properties is None:
metadata.properties = {}
props = metadata.properties
props["clientId"] = datahub.__package_name__
props["clientVersion"] = datahub.__version__
20 changes: 12 additions & 8 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException

from datahub.cli.cli_utils import fixup_gms_url, get_system_auth
from datahub.cli.cli_utils import (
ensure_has_system_metadata,
fixup_gms_url,
get_system_auth,
)
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -228,12 +232,10 @@ def emit_mce(self, mce: MetadataChangeEvent) -> None:
snapshot_fqn = (
f"com.linkedin.metadata.snapshot.{mce.proposedSnapshot.RECORD_SCHEMA.name}"
)
system_metadata_obj = {}
if mce.systemMetadata is not None:
system_metadata_obj = {
"lastObserved": mce.systemMetadata.lastObserved,
"runId": mce.systemMetadata.runId,
}
ensure_has_system_metadata(mce)
# To make lint happy
assert mce.systemMetadata is not None
system_metadata_obj = mce.systemMetadata.to_obj()
snapshot = {
"entity": {"value": {snapshot_fqn: mce_obj}},
"systemMetadata": system_metadata_obj,
Expand All @@ -246,7 +248,7 @@ def emit_mcp(
self, mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper]
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposal"

ensure_has_system_metadata(mcp)
mcp_obj = pre_json_transform(mcp.to_obj())
payload = json.dumps({"proposal": mcp_obj})

Expand All @@ -256,6 +258,8 @@ def emit_mcps(
self, mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]]
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)

mcp_objs = [pre_json_transform(mcp.to_obj()) for mcp in mcps]
payload = json.dumps({"proposals": mcp_objs})
Expand Down
39 changes: 36 additions & 3 deletions metadata-ingestion/tests/unit/test_rest_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,15 @@
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
Expand Down Expand Up @@ -125,7 +133,15 @@
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
Expand Down Expand Up @@ -161,7 +177,15 @@
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
Expand Down Expand Up @@ -238,6 +262,15 @@
"value": '{"owners": [{"owner": "urn:li:corpuser:fbar", "type": "DATAOWNER"}], "ownerTypes": {}, "lastModified": {"time": 0, "actor": "urn:li:corpuser:fbar"}}',
"contentType": "application/json",
},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
}
},
),
Expand Down
Loading