Skip to content

Commit

Permalink
feat(structured-properties): immutable flag (datahub-project#10461)
Browse files Browse the repository at this point in the history
Co-authored-by: Chris Collins <chriscollins3456@gmail.com>
  • Loading branch information
david-leifker and chriscollins3456 authored May 9, 2024
1 parent c8bb7dd commit e7c7015
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ private void mapStructuredPropertyDefinition(
definition.setQualifiedName(gmsDefinition.getQualifiedName());
definition.setCardinality(
PropertyCardinality.valueOf(gmsDefinition.getCardinality().toString()));
definition.setImmutable(gmsDefinition.isImmutable());
definition.setValueType(createDataTypeEntity(gmsDefinition.getValueType()));
if (gmsDefinition.hasDisplayName()) {
definition.setDisplayName(gmsDefinition.getDisplayName());
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/properties.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type StructuredPropertyDefinition {
Entity types that this structured property can be applied to
"""
entityTypes: [EntityTypeEntity!]!

"""
Whether or not this structured property is immutable
"""
immutable: Boolean!
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface Props {
export function EditColumn({ propertyRow }: Props) {
const [isEditModalVisible, setIsEditModalVisible] = useState(false);

if (!propertyRow.structuredProperty) {
if (!propertyRow.structuredProperty || propertyRow.structuredProperty?.definition.immutable) {
return null;
}

Expand Down
1 change: 1 addition & 0 deletions datahub-web-react/src/graphql/fragments.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,7 @@ fragment structuredPropertyFields on StructuredPropertyEntity {
qualifiedName
description
cardinality
immutable
valueType {
info {
type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.linkedin.structured.StructuredProperties;
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.structured.StructuredPropertyValueAssignment;
import com.linkedin.util.Pair;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -38,9 +39,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

/** A Validator for StructuredProperties Aspect that is attached to entities like Datasets, etc. */
Expand Down Expand Up @@ -92,20 +95,22 @@ protected Stream<AspectValidationException> validateProposedAspects(
@Override
protected Stream<AspectValidationException> validatePreCommitAspects(
@Nonnull Collection<ChangeMCP> changeMCPs, @Nonnull RetrieverContext retrieverContext) {
return Stream.empty();
return validateImmutable(
changeMCPs.stream()
.filter(
i ->
ChangeType.DELETE.equals(i.getChangeType())
|| CHANGE_TYPES.contains(i.getChangeType()))
.collect(Collectors.toList()),
retrieverContext.getAspectRetriever());
}

public static Stream<AspectValidationException> validateProposedUpserts(
@Nonnull Collection<BatchItem> mcpItems, @Nonnull AspectRetriever aspectRetriever) {

ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection();

// Validate propertyUrns
Set<Urn> validPropertyUrns = validateStructuredPropertyUrns(mcpItems, exceptions);

// Fetch property aspects for further validation
Map<Urn, Map<String, Aspect>> allStructuredPropertiesAspects =
fetchPropertyAspects(validPropertyUrns, aspectRetriever);
fetchPropertyAspects(mcpItems, aspectRetriever, exceptions, false);

// Validate assignments
for (BatchItem i : exceptions.successful(mcpItems)) {
Expand All @@ -120,15 +125,13 @@ public static Stream<AspectValidationException> validateProposedUpserts(
softDeleteCheck(i, propertyAspects, "Cannot apply a soft deleted Structured Property value")
.ifPresent(exceptions::addException);

Aspect structuredPropertyDefinitionAspect =
propertyAspects.get(STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME);
if (structuredPropertyDefinitionAspect == null) {
StructuredPropertyDefinition structuredPropertyDefinition =
lookupPropertyDefinition(propertyUrn, allStructuredPropertiesAspects);
if (structuredPropertyDefinition == null) {
exceptions.addException(i, "Unexpected null value found.");
}

StructuredPropertyDefinition structuredPropertyDefinition =
new StructuredPropertyDefinition(structuredPropertyDefinitionAspect.data());
log.warn(
log.debug(
"Retrieved property definition for {}. {}", propertyUrn, structuredPropertyDefinition);
if (structuredPropertyDefinition != null) {
PrimitivePropertyValueArray values = structuredPropertyValueAssignment.getValues();
Expand Down Expand Up @@ -158,8 +161,73 @@ public static Stream<AspectValidationException> validateProposedUpserts(
return exceptions.streamAllExceptions();
}

public static Stream<AspectValidationException> validateImmutable(
@Nonnull Collection<ChangeMCP> changeMCPs, @Nonnull AspectRetriever aspectRetriever) {

ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection();
final Map<Urn, Map<String, Aspect>> allStructuredPropertiesAspects =
fetchPropertyAspects(changeMCPs, aspectRetriever, exceptions, true);

Set<Urn> immutablePropertyUrns =
allStructuredPropertiesAspects.keySet().stream()
.map(
stringAspectMap ->
Pair.of(
stringAspectMap,
lookupPropertyDefinition(stringAspectMap, allStructuredPropertiesAspects)))
.filter(defPair -> defPair.getSecond() != null && defPair.getSecond().isImmutable())
.map(Pair::getFirst)
.collect(Collectors.toSet());

// Validate immutable assignments
for (ChangeMCP i : exceptions.successful(changeMCPs)) {

// only apply immutable validation if previous properties exist
if (i.getPreviousRecordTemplate() != null) {
Map<Urn, StructuredPropertyValueAssignment> newImmutablePropertyMap =
i.getAspect(StructuredProperties.class).getProperties().stream()
.filter(assign -> immutablePropertyUrns.contains(assign.getPropertyUrn()))
.collect(
Collectors.toMap(
StructuredPropertyValueAssignment::getPropertyUrn, Function.identity()));
Map<Urn, StructuredPropertyValueAssignment> oldImmutablePropertyMap =
i.getPreviousAspect(StructuredProperties.class).getProperties().stream()
.filter(assign -> immutablePropertyUrns.contains(assign.getPropertyUrn()))
.collect(
Collectors.toMap(
StructuredPropertyValueAssignment::getPropertyUrn, Function.identity()));

// upsert/mutation path
newImmutablePropertyMap
.entrySet()
.forEach(
entry -> {
Urn propertyUrn = entry.getKey();
StructuredPropertyValueAssignment assignment = entry.getValue();

if (oldImmutablePropertyMap.containsKey(propertyUrn)
&& !oldImmutablePropertyMap.get(propertyUrn).equals(assignment)) {
exceptions.addException(
i, String.format("Cannot mutate an immutable property: %s", propertyUrn));
}
});

// delete path
oldImmutablePropertyMap.entrySet().stream()
.filter(entry -> !newImmutablePropertyMap.containsKey(entry.getKey()))
.forEach(
entry ->
exceptions.addException(
i,
String.format("Cannot delete an immutable property %s", entry.getKey())));
}
}

return exceptions.streamAllExceptions();
}

private static Set<Urn> validateStructuredPropertyUrns(
Collection<BatchItem> mcpItems, ValidationExceptionCollection exceptions) {
Collection<? extends BatchItem> mcpItems, ValidationExceptionCollection exceptions) {
Set<Urn> validPropertyUrns = new HashSet<>();

for (BatchItem i : exceptions.successful(mcpItems)) {
Expand Down Expand Up @@ -202,6 +270,17 @@ private static Set<Urn> validateStructuredPropertyUrns(
return validPropertyUrns;
}

private static Set<Urn> previousStructuredPropertyUrns(Collection<? extends BatchItem> mcpItems) {
return mcpItems.stream()
.filter(i -> i instanceof ChangeMCP)
.map(i -> ((ChangeMCP) i))
.filter(i -> i.getPreviousRecordTemplate() != null)
.flatMap(i -> i.getPreviousAspect(StructuredProperties.class).getProperties().stream())
.map(StructuredPropertyValueAssignment::getPropertyUrn)
.filter(propertyUrn -> propertyUrn.getEntityType().equals("structuredProperty"))
.collect(Collectors.toSet());
}

private static Optional<AspectValidationException> validateAllowedValues(
BatchItem item,
Urn propertyUrn,
Expand Down Expand Up @@ -338,14 +417,40 @@ private static String getValueTypeId(@Nonnull final Urn valueType) {
}

private static Map<Urn, Map<String, Aspect>> fetchPropertyAspects(
Set<Urn> structuredPropertyUrns, AspectRetriever aspectRetriever) {
if (structuredPropertyUrns.isEmpty()) {
@Nonnull Collection<? extends BatchItem> mcpItems,
AspectRetriever aspectRetriever,
@Nonnull ValidationExceptionCollection exceptions,
boolean includePrevious) {

// Validate propertyUrns
Set<Urn> validPropertyUrns =
Stream.concat(
validateStructuredPropertyUrns(mcpItems, exceptions).stream(),
includePrevious
? previousStructuredPropertyUrns(mcpItems).stream()
: Stream.empty())
.collect(Collectors.toSet());

if (validPropertyUrns.isEmpty()) {
return Collections.emptyMap();
} else {
return aspectRetriever.getLatestAspectObjects(
structuredPropertyUrns,
validPropertyUrns,
ImmutableSet.of(
Constants.STATUS_ASPECT_NAME, STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME));
}
}

@Nullable
private static StructuredPropertyDefinition lookupPropertyDefinition(
@Nonnull Urn propertyUrn,
@Nonnull Map<Urn, Map<String, Aspect>> allStructuredPropertiesAspects) {
Map<String, Aspect> propertyAspects =
allStructuredPropertiesAspects.getOrDefault(propertyUrn, Collections.emptyMap());
Aspect structuredPropertyDefinitionAspect =
propertyAspects.get(STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME);
return structuredPropertyDefinitionAspect == null
? null
: new StructuredPropertyDefinition(structuredPropertyDefinitionAspect.data());
}
}
Loading

0 comments on commit e7c7015

Please sign in to comment.