Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): Add run-id option to put sub-command #11023

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.request_helper import make_curl_command
from datahub.emitter.serialization_helper import post_json_transform
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
Expand Down Expand Up @@ -166,10 +166,11 @@ def post_entity(
aspect_value: Dict,
cached_session_host: Optional[Tuple[Session, str]] = None,
is_async: Optional[str] = "false",
system_metadata: Union[None, SystemMetadataClass] = None,
) -> int:
endpoint: str = "/aspects/?action=ingestProposal"

proposal = {
proposal: dict[str, Any] = {
"proposal": {
"entityType": entity_type,
"entityUrn": urn,
Expand All @@ -182,6 +183,12 @@ def post_entity(
},
"async": is_async,
}

if system_metadata is not None:
proposal["proposal"]["systemMetadata"] = json.dumps(
pre_json_transform(system_metadata.to_obj())
)

payload = json.dumps(proposal)
url = gms_host + endpoint
curl_command = make_curl_command(session, "POST", url, payload)
Expand Down
20 changes: 13 additions & 7 deletions metadata-ingestion/src/datahub/cli/put_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from datahub.cli.cli_utils import post_entity
from datahub.configuration.config_loader import load_config_file
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp import MetadataChangeProposalWrapper, SystemMetadataClass
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.schema_classes import (
DataPlatformInfoClass as DataPlatformInfo,
Expand Down Expand Up @@ -36,9 +36,10 @@ def put() -> None:
@click.option("--urn", required=True, type=str)
@click.option("-a", "--aspect", required=True, type=str)
@click.option("-d", "--aspect-data", required=True, type=str)
@click.option("--run-id", type=str, help="Run ID into which we should log the aspect.")
@upgrade.check_upgrade
@telemetry.with_telemetry()
def aspect(urn: str, aspect: str, aspect_data: str) -> None:
def aspect(urn: str, aspect: str, aspect_data: str, run_id: str) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Add tests.

The function aspect is missing test coverage.

Do you want me to generate the unit testing code or open a GitHub issue to track this task?

"""Update a single aspect of an entity"""

entity_type = guess_entity_type(urn)
Expand All @@ -56,6 +57,7 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None:
aspect_name=aspect,
entity_type=entity_type,
aspect_value=aspect_obj,
system_metadata=SystemMetadataClass(runId=run_id),
)
click.secho(f"Update succeeded with status {status}", fg="green")

Expand All @@ -82,8 +84,11 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None:
help="Logo URL that must be reachable from the DataHub UI.",
required=True,
)
@click.option(
"--run-id", type=str, help="Run ID into which we should log the platform."
)
def platform(
ctx: click.Context, name: str, display_name: Optional[str], logo: str
ctx: click.Context, name: str, display_name: Optional[str], logo: str, run_id: str
Comment on lines +96 to +100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Add tests.

The function platform is missing test coverage.

Do you want me to generate the unit testing code or open a GitHub issue to track this task?

) -> None:
"""
Create or update a dataplatform entity in DataHub
Expand All @@ -104,11 +109,12 @@ def platform(
logoUrl=logo,
)
datahub_graph = get_default_graph()
datahub_graph.emit(
MetadataChangeProposalWrapper(
entityUrn=str(platform_urn), aspect=data_platform_info
)
mcp = MetadataChangeProposalWrapper(
entityUrn=str(platform_urn),
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
aspect=data_platform_info,
systemMetadata=SystemMetadataClass(runId=run_id),
)
datahub_graph.emit(mcp)
click.echo(
f"✅ Successfully wrote data platform metadata for {platform_urn} to DataHub ({datahub_graph})"
)
Loading