Skip to content

Commit

Permalink
feat(fieldpaths): prevent duplicate field paths (datahub-project#10590)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored and sleeperdeep committed Jun 25, 2024
1 parent 39e30d6 commit b35ca06
Show file tree
Hide file tree
Showing 3 changed files with 404 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.linkedin.metadata.aspect.hooks;

import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME;
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;

import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.ReadItem;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
import com.linkedin.schema.EditableSchemaFieldInfo;
import com.linkedin.schema.EditableSchemaFieldInfoArray;
import com.linkedin.schema.EditableSchemaMetadata;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaFieldArray;
import com.linkedin.schema.SchemaMetadata;
import com.linkedin.util.Pair;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Setter
@Getter
@Accessors(chain = true)
public class FieldPathMutator extends MutationHook {
@Nonnull private AspectPluginConfig config;

@Override
protected Stream<Pair<ChangeMCP, Boolean>> writeMutation(
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {

List<Pair<ChangeMCP, Boolean>> results = new LinkedList<>();

for (ChangeMCP item : changeMCPS) {
if (changeTypeFilter(item) && aspectFilter(item)) {
if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
results.add(Pair.of(item, processSchemaMetadataAspect(item)));
} else {
results.add(Pair.of(item, processEditableSchemaMetadataAspect(item)));
}
} else {
// no op
results.add(Pair.of(item, false));
}
}

return results.stream();
}

/*
TODO: After some time, this should no longer be required. Assuming at least 1 write has
occurred for all schema aspects.
*/
@Override
protected Stream<Pair<ReadItem, Boolean>> readMutation(
@Nonnull Collection<ReadItem> items, @Nonnull RetrieverContext retrieverContext) {
List<Pair<ReadItem, Boolean>> results = new LinkedList<>();

for (ReadItem item : items) {
if (aspectFilter(item)) {
if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
results.add(Pair.of(item, processSchemaMetadataAspect(item)));
} else {
results.add(Pair.of(item, processEditableSchemaMetadataAspect(item)));
}
} else {
// no op
results.add(Pair.of(item, false));
}
}

return results.stream();
}

private static boolean changeTypeFilter(BatchItem item) {
return !ChangeType.DELETE.equals(item.getChangeType())
&& !ChangeType.PATCH.equals(item.getChangeType());
}

private static boolean aspectFilter(ReadItem item) {
return item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)
|| item.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME);
}

private static boolean processEditableSchemaMetadataAspect(ReadItem item) {
boolean mutated = false;
final EditableSchemaMetadata schemaMetadata = item.getAspect(EditableSchemaMetadata.class);
EditableSchemaFieldInfoArray fields = schemaMetadata.getEditableSchemaFieldInfo();
List<EditableSchemaFieldInfo> replaceFields =
deduplicateFieldPaths(fields, EditableSchemaFieldInfo::getFieldPath);
if (!replaceFields.isEmpty()) {
schemaMetadata.setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(replaceFields));
mutated = true;
}
return mutated;
}

private static boolean processSchemaMetadataAspect(ReadItem item) {
boolean mutated = false;
final SchemaMetadata schemaMetadata = item.getAspect(SchemaMetadata.class);
SchemaFieldArray fields = schemaMetadata.getFields();
List<SchemaField> replaceFields = deduplicateFieldPaths(fields, SchemaField::getFieldPath);
if (!replaceFields.isEmpty()) {
schemaMetadata.setFields(new SchemaFieldArray(replaceFields));
mutated = true;
}
return mutated;
}

private static <T> List<T> deduplicateFieldPaths(
Collection<T> fields, Function<T, String> fieldPathExtractor) {

// preserve order
final LinkedHashMap<String, List<T>> grouped =
fields.stream()
.collect(
Collectors.groupingBy(fieldPathExtractor, LinkedHashMap::new, Collectors.toList()));

if (grouped.values().stream().anyMatch(v -> v.size() > 1)) {
log.warn(
"Duplicate field path(s) detected. Dropping duplicates: {}",
grouped.values().stream().filter(v -> v.size() > 1).collect(Collectors.toList()));
// return first
return grouped.values().stream().map(l -> l.get(0)).collect(Collectors.toList());
}

return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package com.linkedin.metadata.aspect.hooks;

import static com.linkedin.metadata.Constants.DOMAINS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME;
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.domain.Domains;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.schema.EditableSchemaFieldInfo;
import com.linkedin.schema.EditableSchemaFieldInfoArray;
import com.linkedin.schema.EditableSchemaMetadata;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaFieldArray;
import com.linkedin.schema.SchemaFieldDataType;
import com.linkedin.schema.SchemaMetadata;
import com.linkedin.schema.StringType;
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
import com.linkedin.test.metadata.aspect.batch.TestMCP;
import com.linkedin.util.Pair;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

public class FieldPathMutatorTest {

private EntityRegistry entityRegistry;
private RetrieverContext mockRetrieverContext;
private DatasetUrn testDatasetUrn;
private final FieldPathMutator test =
new FieldPathMutator().setConfig(mock(AspectPluginConfig.class));

@BeforeTest
public void init() throws URISyntaxException {
testDatasetUrn =
DatasetUrn.createFromUrn(
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)"));

entityRegistry = new TestEntityRegistry();
AspectRetriever mockAspectRetriever = mock(AspectRetriever.class);
when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry);
GraphRetriever mockGraphRetriever = mock(GraphRetriever.class);
mockRetrieverContext = mock(RetrieverContext.class);
when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever);
when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever);
}

@Test
public void testValidateIncorrectAspect() {
final Domains domains =
new Domains()
.setDomains(new UrnArray(ImmutableList.of(UrnUtils.getUrn("urn:li:domain:123"))));
assertEquals(
test.writeMutation(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(testDatasetUrn)
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(testDatasetUrn.getEntityType())
.getAspectSpec(DOMAINS_ASPECT_NAME))
.recordTemplate(domains)
.build()),
mockRetrieverContext)
.filter(Pair::getSecond)
.count(),
0);
}

@Test
public void testValidateNonDuplicatedSchemaFieldPath() {
final SchemaMetadata schema = getMockSchemaMetadataAspect(false);
assertEquals(
test.writeMutation(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(testDatasetUrn)
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(testDatasetUrn.getEntityType())
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.filter(Pair::getSecond)
.count(),
0);
}

@Test
public void testValidateDuplicatedSchemaFieldPath() {
final SchemaMetadata schema = getMockSchemaMetadataAspect(true);

List<Pair<ChangeMCP, Boolean>> result =
test.writeMutation(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(testDatasetUrn)
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(testDatasetUrn.getEntityType())
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.collect(Collectors.toList());

assertEquals(result.stream().filter(Pair::getSecond).count(), 1);
assertEquals(result.get(0).getFirst().getAspect(SchemaMetadata.class).getFields().size(), 1);
}

@Test
public void testValidateDeleteDuplicatedSchemaFieldPath() {
final SchemaMetadata schema = getMockSchemaMetadataAspect(true);

assertEquals(
test.writeMutation(
Set.of(
TestMCP.builder()
.changeType(ChangeType.DELETE)
.urn(testDatasetUrn)
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(testDatasetUrn.getEntityType())
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.filter(Pair::getSecond)
.count(),
0);
}

@Test
public void testValidateNonDuplicatedEditableSchemaFieldPath() {
final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false);
assertEquals(
test.writeMutation(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(testDatasetUrn)
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(testDatasetUrn.getEntityType())
.getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.filter(Pair::getSecond)
.count(),
0);
}

@Test
public void testValidateDuplicatedEditableSchemaFieldPath() {
final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(true);

List<Pair<ChangeMCP, Boolean>> result =
test.writeMutation(
Set.of(
TestMCP.builder()
.changeType(ChangeType.UPSERT)
.urn(testDatasetUrn)
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
.aspectSpec(
entityRegistry
.getEntitySpec(testDatasetUrn.getEntityType())
.getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
.recordTemplate(schema)
.build()),
mockRetrieverContext)
.collect(Collectors.toList());

assertEquals(result.stream().filter(Pair::getSecond).count(), 1);
assertEquals(
result
.get(0)
.getFirst()
.getAspect(EditableSchemaMetadata.class)
.getEditableSchemaFieldInfo()
.size(),
1);
}

private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) {
List<SchemaField> fields = new ArrayList<>();
fields.add(
new SchemaField()
.setType(
new SchemaFieldDataType()
.setType(SchemaFieldDataType.Type.create(new StringType())))
.setNullable(false)
.setNativeDataType("string")
.setFieldPath("test"));

if (duplicateFields) {
fields.add(
new SchemaField()
.setType(
new SchemaFieldDataType()
.setType(SchemaFieldDataType.Type.create(new StringType())))
.setNullable(false)
.setNativeDataType("string")
.setFieldPath("test"));
}

return new SchemaMetadata()
.setPlatform(testDatasetUrn.getPlatformEntity())
.setFields(new SchemaFieldArray(fields));
}

private EditableSchemaMetadata getMockEditableSchemaMetadataAspect(boolean duplicateFields) {

List<EditableSchemaFieldInfo> fields = new ArrayList<>();
fields.add(new EditableSchemaFieldInfo().setFieldPath("test"));

if (duplicateFields) {
fields.add(new EditableSchemaFieldInfo().setFieldPath("test"));
}

return new EditableSchemaMetadata()
.setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(fields));
}
}
Loading

0 comments on commit b35ca06

Please sign in to comment.