Skip to content

Commit

Permalink
feat(SDK) Add StructuredPropertyPatchBuilder in python sdk and provid…
Browse files Browse the repository at this point in the history
…e sample CRUD files (#10824)
  • Loading branch information
chriscollins3456 committed Jul 2, 2024
1 parent a7f4b71 commit b651d81
Show file tree
Hide file tree
Showing 10 changed files with 440 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

public interface ArrayMergingTemplate<T extends RecordTemplate> extends Template<T> {

static final String UNIT_SEPARATOR_DELIMITER = "␟";

/**
* Takes an Array field on the {@link RecordTemplate} subtype along with a set of key fields to
* transform into a map Avoids producing side effects by copying nodes, use resulting node and not
Expand Down Expand Up @@ -39,7 +41,15 @@ default JsonNode arrayFieldToMap(
JsonNode nodeClone = node.deepCopy();
if (!keyFields.isEmpty()) {
for (String keyField : keyFields) {
String key = node.get(keyField).asText();
String key;
// if the keyField has a unit separator, we are working with a nested key
if (keyField.contains(UNIT_SEPARATOR_DELIMITER)) {
String[] keyParts = keyField.split(UNIT_SEPARATOR_DELIMITER);
JsonNode keyObject = node.get(keyParts[0]);
key = keyObject.get(keyParts[1]).asText();
} else {
key = node.get(keyField).asText();
}
keyValue =
keyValue.get(key) == null
? (ObjectNode) keyValue.set(key, instance.objectNode()).get(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static com.linkedin.metadata.Constants.GLOSSARY_TERMS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.OWNERSHIP_ASPECT_NAME;
import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME;
import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class AspectTemplateEngine {
CHART_INFO_ASPECT_NAME,
DASHBOARD_INFO_ASPECT_NAME,
STRUCTURED_PROPERTIES_ASPECT_NAME,
STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME,
FORM_INFO_ASPECT_NAME)
.collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.linkedin.metadata.aspect.patch.template.structuredproperty;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.aspect.patch.template.CompoundKeyTemplate;
import com.linkedin.structured.StructuredPropertyDefinition;
import java.util.Collections;
import javax.annotation.Nonnull;

public class StructuredPropertyDefinitionTemplate
extends CompoundKeyTemplate<StructuredPropertyDefinition> {

private static final String ENTITY_TYPES_FIELD_NAME = "entityTypes";
private static final String ALLOWED_VALUES_FIELD_NAME = "allowedValues";
private static final String VALUE_FIELD_NAME = "value";
private static final String UNIT_SEPARATOR_DELIMITER = "␟";

@Override
public StructuredPropertyDefinition getSubtype(RecordTemplate recordTemplate)
throws ClassCastException {
if (recordTemplate instanceof StructuredPropertyDefinition) {
return (StructuredPropertyDefinition) recordTemplate;
}
throw new ClassCastException("Unable to cast RecordTemplate to StructuredPropertyDefinition");
}

@Override
public Class<StructuredPropertyDefinition> getTemplateType() {
return StructuredPropertyDefinition.class;
}

@Nonnull
@Override
public StructuredPropertyDefinition getDefault() {
StructuredPropertyDefinition definition = new StructuredPropertyDefinition();
definition.setQualifiedName("");
definition.setValueType(UrnUtils.getUrn("urn:li:dataType:datahub.string"));
definition.setEntityTypes(new UrnArray());

return definition;
}

@Nonnull
@Override
public JsonNode transformFields(JsonNode baseNode) {
JsonNode transformedNode =
arrayFieldToMap(baseNode, ENTITY_TYPES_FIELD_NAME, Collections.emptyList());

if (transformedNode.get(ALLOWED_VALUES_FIELD_NAME) == null) {
return transformedNode;
}

// allowedValues has a nested key - value.string or value.number depending on type. Mapping
// needs this nested key
JsonNode allowedValues = transformedNode.get(ALLOWED_VALUES_FIELD_NAME);
if (((ArrayNode) allowedValues).size() > 0) {
JsonNode allowedValue = ((ArrayNode) allowedValues).get(0);
JsonNode value = allowedValue.get(VALUE_FIELD_NAME);
String secondaryKeyName = value.fieldNames().next();
return arrayFieldToMap(
transformedNode,
ALLOWED_VALUES_FIELD_NAME,
Collections.singletonList(
VALUE_FIELD_NAME + UNIT_SEPARATOR_DELIMITER + secondaryKeyName));
}

return arrayFieldToMap(
transformedNode, ALLOWED_VALUES_FIELD_NAME, Collections.singletonList(VALUE_FIELD_NAME));
}

@Nonnull
@Override
public JsonNode rebaseFields(JsonNode patched) {
JsonNode patchedNode =
transformedMapToArray(patched, ENTITY_TYPES_FIELD_NAME, Collections.emptyList());

if (patchedNode.get(ALLOWED_VALUES_FIELD_NAME) == null) {
return patchedNode;
}
return transformedMapToArray(
patchedNode, ALLOWED_VALUES_FIELD_NAME, Collections.singletonList(VALUE_FIELD_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.aspect.patch.template.dataset.EditableSchemaMetadataTemplate;
import com.linkedin.metadata.aspect.patch.template.dataset.UpstreamLineageTemplate;
import com.linkedin.metadata.aspect.patch.template.form.FormInfoTemplate;
import com.linkedin.metadata.aspect.patch.template.structuredproperty.StructuredPropertyDefinitionTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.DefaultEntitySpec;
import com.linkedin.metadata.models.EntitySpec;
Expand Down Expand Up @@ -88,6 +89,8 @@ private AspectTemplateEngine populateTemplateEngine(Map<String, AspectSpec> aspe
aspectSpecTemplateMap.put(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME, new DataJobInputOutputTemplate());
aspectSpecTemplateMap.put(
STRUCTURED_PROPERTIES_ASPECT_NAME, new StructuredPropertiesTemplate());
aspectSpecTemplateMap.put(
STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, new StructuredPropertyDefinitionTemplate());
aspectSpecTemplateMap.put(FORM_INFO_ASPECT_NAME, new FormInfoTemplate());
return new AspectTemplateEngine(aspectSpecTemplateMap);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Create rest emitter
rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")

dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")


for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.add_structured_property("io.acryl.dataManagement.replicationSLA", 12)
.build()
):
rest_emitter.emit(patch_mcp)


log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Create rest emitter
rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")

dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")


for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.remove_structured_property("io.acryl.dataManagement.replicationSLA")
.build()
):
rest_emitter.emit(patch_mcp)


log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Create rest emitter
rest_emitter = DataHubRestEmitter(gms_server="http://localhost:8080")

dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")


for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.set_structured_property("io.acryl.dataManagement.replicationSLA", 120)
.build()
):
rest_emitter.emit(patch_mcp)


log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import logging

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
PropertyValueClass,
StructuredPropertyDefinitionClass,
)
from datahub.metadata.urns import StructuredPropertyUrn

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Create rest emitter
rest_emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# first, let's make an open ended structured property that allows one text value
text_property_urn = StructuredPropertyUrn("openTextProperty")
text_property_definition = StructuredPropertyDefinitionClass(
qualifiedName="io.acryl.openTextProperty",
displayName="Open Text Property",
valueType="urn:li:dataType:datahub.string",
cardinality="SINGLE",
entityTypes=[
"urn:li:entityType:datahub.dataset",
"urn:li:entityType:datahub.container",
],
description="This structured property allows a signle open ended response as a value",
immutable=False,
)

event_prop_1: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(text_property_urn),
aspect=text_property_definition,
)
rest_emitter.emit(event_prop_1)

# next, let's make a property that allows for multiple datahub entity urns as values
# This example property could be used to reference other users or groups in datahub
urn_property_urn = StructuredPropertyUrn("dataSteward")
urn_property_definition = StructuredPropertyDefinitionClass(
qualifiedName="io.acryl.dataManagement.dataSteward",
displayName="Data Steward",
valueType="urn:li:dataType:datahub.urn",
cardinality="MULTIPLE",
entityTypes=["urn:li:entityType:datahub.dataset"],
description="The data stewards of this asset are in charge of ensuring data cleanliness and governance",
immutable=True,
typeQualifier={
"allowedTypes": [
"urn:li:entityType:datahub.corpuser",
"urn:li:entityType:datahub.corpGroup",
]
}, # this line ensures only user or group urns can be assigned as values
)

event_prop_2: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(urn_property_urn),
aspect=urn_property_definition,
)
rest_emitter.emit(event_prop_2)

# finally, let's make a single select number property with a few allowed options
number_property_urn = StructuredPropertyUrn("replicationSLA")
number_property_definition = StructuredPropertyDefinitionClass(
qualifiedName="io.acryl.dataManagement.replicationSLA",
displayName="Retention Time",
valueType="urn:li:dataType:datahub.number",
cardinality="SINGLE",
entityTypes=[
"urn:li:entityType:datahub.dataset",
"urn:li:entityType:datahub.dataFlow",
],
description="SLA for how long data can be delayed before replicating to the destination cluster",
immutable=False,
allowedValues=[
PropertyValueClass(
value=30,
description="30 days, usually reserved for datasets that are ephemeral and contain pii",
),
PropertyValueClass(
value=90,
description="Use this for datasets that drive monthly reporting but contain pii",
),
PropertyValueClass(
value=365,
description="Use this for non-sensitive data that can be retained for longer",
),
],
)

event_prop_3: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(number_property_urn),
aspect=number_property_definition,
)
rest_emitter.emit(event_prop_3)
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
from typing import Union

from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.urns import StructuredPropertyUrn
from datahub.specific.structured_property import StructuredPropertyPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


# Get an emitter, either REST or Kafka, this example shows you both
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
USE_REST_EMITTER = True
if USE_REST_EMITTER:
gms_endpoint = "http://localhost:8080"
return DataHubRestEmitter(gms_server=gms_endpoint)
else:
kafka_server = "localhost:9092"
schema_registry_url = "http://localhost:8081"
return DatahubKafkaEmitter(
config=KafkaEmitterConfig(
connection=KafkaProducerConnectionConfig(
bootstrap=kafka_server, schema_registry_url=schema_registry_url
)
)
)


# input your unique structured property ID
property_urn = StructuredPropertyUrn("dataSteward")

with get_emitter() as emitter:
for patch_mcp in (
StructuredPropertyPatchBuilder(str(property_urn))
.set_display_name("test display name")
.set_cardinality("MULTIPLE")
.add_entity_type("urn:li:entityType:datahub.dataJob")
.build()
):
emitter.emit(patch_mcp)
Loading

0 comments on commit b651d81

Please sign in to comment.