forked from datahub-project/datahub
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(fieldpaths): prevent duplicate field paths (datahub-project#10590)
- Loading branch information
1 parent
c9cf518
commit 4031193
Showing
3 changed files
with
404 additions
and
0 deletions.
There are no files selected for viewing
142 changes: 142 additions & 0 deletions
142
entity-registry/src/main/java/com/linkedin/metadata/aspect/hooks/FieldPathMutator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
package com.linkedin.metadata.aspect.hooks; | ||
|
||
import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME; | ||
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; | ||
|
||
import com.linkedin.events.metadata.ChangeType; | ||
import com.linkedin.metadata.aspect.ReadItem; | ||
import com.linkedin.metadata.aspect.RetrieverContext; | ||
import com.linkedin.metadata.aspect.batch.BatchItem; | ||
import com.linkedin.metadata.aspect.batch.ChangeMCP; | ||
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; | ||
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; | ||
import com.linkedin.schema.EditableSchemaFieldInfo; | ||
import com.linkedin.schema.EditableSchemaFieldInfoArray; | ||
import com.linkedin.schema.EditableSchemaMetadata; | ||
import com.linkedin.schema.SchemaField; | ||
import com.linkedin.schema.SchemaFieldArray; | ||
import com.linkedin.schema.SchemaMetadata; | ||
import com.linkedin.util.Pair; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.LinkedHashMap; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
import javax.annotation.Nonnull; | ||
import lombok.Getter; | ||
import lombok.Setter; | ||
import lombok.experimental.Accessors; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
@Setter | ||
@Getter | ||
@Accessors(chain = true) | ||
public class FieldPathMutator extends MutationHook { | ||
@Nonnull private AspectPluginConfig config; | ||
|
||
@Override | ||
protected Stream<Pair<ChangeMCP, Boolean>> writeMutation( | ||
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) { | ||
|
||
List<Pair<ChangeMCP, Boolean>> results = new LinkedList<>(); | ||
|
||
for (ChangeMCP item : changeMCPS) { | ||
if (changeTypeFilter(item) && aspectFilter(item)) { | ||
if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { | ||
results.add(Pair.of(item, processSchemaMetadataAspect(item))); | ||
} else { | ||
results.add(Pair.of(item, processEditableSchemaMetadataAspect(item))); | ||
} | ||
} else { | ||
// no op | ||
results.add(Pair.of(item, false)); | ||
} | ||
} | ||
|
||
return results.stream(); | ||
} | ||
|
||
/* | ||
TODO: After some time, this should no longer be required. Assuming at least 1 write has | ||
occurred for all schema aspects. | ||
*/ | ||
@Override | ||
protected Stream<Pair<ReadItem, Boolean>> readMutation( | ||
@Nonnull Collection<ReadItem> items, @Nonnull RetrieverContext retrieverContext) { | ||
List<Pair<ReadItem, Boolean>> results = new LinkedList<>(); | ||
|
||
for (ReadItem item : items) { | ||
if (aspectFilter(item)) { | ||
if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { | ||
results.add(Pair.of(item, processSchemaMetadataAspect(item))); | ||
} else { | ||
results.add(Pair.of(item, processEditableSchemaMetadataAspect(item))); | ||
} | ||
} else { | ||
// no op | ||
results.add(Pair.of(item, false)); | ||
} | ||
} | ||
|
||
return results.stream(); | ||
} | ||
|
||
private static boolean changeTypeFilter(BatchItem item) { | ||
return !ChangeType.DELETE.equals(item.getChangeType()) | ||
&& !ChangeType.PATCH.equals(item.getChangeType()); | ||
} | ||
|
||
private static boolean aspectFilter(ReadItem item) { | ||
return item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME) | ||
|| item.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME); | ||
} | ||
|
||
private static boolean processEditableSchemaMetadataAspect(ReadItem item) { | ||
boolean mutated = false; | ||
final EditableSchemaMetadata schemaMetadata = item.getAspect(EditableSchemaMetadata.class); | ||
EditableSchemaFieldInfoArray fields = schemaMetadata.getEditableSchemaFieldInfo(); | ||
List<EditableSchemaFieldInfo> replaceFields = | ||
deduplicateFieldPaths(fields, EditableSchemaFieldInfo::getFieldPath); | ||
if (!replaceFields.isEmpty()) { | ||
schemaMetadata.setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(replaceFields)); | ||
mutated = true; | ||
} | ||
return mutated; | ||
} | ||
|
||
private static boolean processSchemaMetadataAspect(ReadItem item) { | ||
boolean mutated = false; | ||
final SchemaMetadata schemaMetadata = item.getAspect(SchemaMetadata.class); | ||
SchemaFieldArray fields = schemaMetadata.getFields(); | ||
List<SchemaField> replaceFields = deduplicateFieldPaths(fields, SchemaField::getFieldPath); | ||
if (!replaceFields.isEmpty()) { | ||
schemaMetadata.setFields(new SchemaFieldArray(replaceFields)); | ||
mutated = true; | ||
} | ||
return mutated; | ||
} | ||
|
||
private static <T> List<T> deduplicateFieldPaths( | ||
Collection<T> fields, Function<T, String> fieldPathExtractor) { | ||
|
||
// preserve order | ||
final LinkedHashMap<String, List<T>> grouped = | ||
fields.stream() | ||
.collect( | ||
Collectors.groupingBy(fieldPathExtractor, LinkedHashMap::new, Collectors.toList())); | ||
|
||
if (grouped.values().stream().anyMatch(v -> v.size() > 1)) { | ||
log.warn( | ||
"Duplicate field path(s) detected. Dropping duplicates: {}", | ||
grouped.values().stream().filter(v -> v.size() > 1).collect(Collectors.toList())); | ||
// return first | ||
return grouped.values().stream().map(l -> l.get(0)).collect(Collectors.toList()); | ||
} | ||
|
||
return Collections.emptyList(); | ||
} | ||
} |
249 changes: 249 additions & 0 deletions
249
entity-registry/src/test/java/com/linkedin/metadata/aspect/hooks/FieldPathMutatorTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
package com.linkedin.metadata.aspect.hooks; | ||
|
||
import static com.linkedin.metadata.Constants.DOMAINS_ASPECT_NAME; | ||
import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME; | ||
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
import static org.testng.Assert.assertEquals; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import com.linkedin.common.UrnArray; | ||
import com.linkedin.common.urn.DatasetUrn; | ||
import com.linkedin.common.urn.UrnUtils; | ||
import com.linkedin.domain.Domains; | ||
import com.linkedin.events.metadata.ChangeType; | ||
import com.linkedin.metadata.aspect.AspectRetriever; | ||
import com.linkedin.metadata.aspect.GraphRetriever; | ||
import com.linkedin.metadata.aspect.RetrieverContext; | ||
import com.linkedin.metadata.aspect.batch.ChangeMCP; | ||
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; | ||
import com.linkedin.metadata.models.registry.EntityRegistry; | ||
import com.linkedin.schema.EditableSchemaFieldInfo; | ||
import com.linkedin.schema.EditableSchemaFieldInfoArray; | ||
import com.linkedin.schema.EditableSchemaMetadata; | ||
import com.linkedin.schema.SchemaField; | ||
import com.linkedin.schema.SchemaFieldArray; | ||
import com.linkedin.schema.SchemaFieldDataType; | ||
import com.linkedin.schema.SchemaMetadata; | ||
import com.linkedin.schema.StringType; | ||
import com.linkedin.test.metadata.aspect.TestEntityRegistry; | ||
import com.linkedin.test.metadata.aspect.batch.TestMCP; | ||
import com.linkedin.util.Pair; | ||
import java.net.URISyntaxException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import org.testng.annotations.BeforeTest; | ||
import org.testng.annotations.Test; | ||
|
||
public class FieldPathMutatorTest { | ||
|
||
private EntityRegistry entityRegistry; | ||
private RetrieverContext mockRetrieverContext; | ||
private DatasetUrn testDatasetUrn; | ||
private final FieldPathMutator test = | ||
new FieldPathMutator().setConfig(mock(AspectPluginConfig.class)); | ||
|
||
@BeforeTest | ||
public void init() throws URISyntaxException { | ||
testDatasetUrn = | ||
DatasetUrn.createFromUrn( | ||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)")); | ||
|
||
entityRegistry = new TestEntityRegistry(); | ||
AspectRetriever mockAspectRetriever = mock(AspectRetriever.class); | ||
when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry); | ||
GraphRetriever mockGraphRetriever = mock(GraphRetriever.class); | ||
mockRetrieverContext = mock(RetrieverContext.class); | ||
when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever); | ||
when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever); | ||
} | ||
|
||
@Test | ||
public void testValidateIncorrectAspect() { | ||
final Domains domains = | ||
new Domains() | ||
.setDomains(new UrnArray(ImmutableList.of(UrnUtils.getUrn("urn:li:domain:123")))); | ||
assertEquals( | ||
test.writeMutation( | ||
Set.of( | ||
TestMCP.builder() | ||
.changeType(ChangeType.UPSERT) | ||
.urn(testDatasetUrn) | ||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) | ||
.aspectSpec( | ||
entityRegistry | ||
.getEntitySpec(testDatasetUrn.getEntityType()) | ||
.getAspectSpec(DOMAINS_ASPECT_NAME)) | ||
.recordTemplate(domains) | ||
.build()), | ||
mockRetrieverContext) | ||
.filter(Pair::getSecond) | ||
.count(), | ||
0); | ||
} | ||
|
||
@Test | ||
public void testValidateNonDuplicatedSchemaFieldPath() { | ||
final SchemaMetadata schema = getMockSchemaMetadataAspect(false); | ||
assertEquals( | ||
test.writeMutation( | ||
Set.of( | ||
TestMCP.builder() | ||
.changeType(ChangeType.UPSERT) | ||
.urn(testDatasetUrn) | ||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) | ||
.aspectSpec( | ||
entityRegistry | ||
.getEntitySpec(testDatasetUrn.getEntityType()) | ||
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) | ||
.recordTemplate(schema) | ||
.build()), | ||
mockRetrieverContext) | ||
.filter(Pair::getSecond) | ||
.count(), | ||
0); | ||
} | ||
|
||
@Test | ||
public void testValidateDuplicatedSchemaFieldPath() { | ||
final SchemaMetadata schema = getMockSchemaMetadataAspect(true); | ||
|
||
List<Pair<ChangeMCP, Boolean>> result = | ||
test.writeMutation( | ||
Set.of( | ||
TestMCP.builder() | ||
.changeType(ChangeType.UPSERT) | ||
.urn(testDatasetUrn) | ||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) | ||
.aspectSpec( | ||
entityRegistry | ||
.getEntitySpec(testDatasetUrn.getEntityType()) | ||
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) | ||
.recordTemplate(schema) | ||
.build()), | ||
mockRetrieverContext) | ||
.collect(Collectors.toList()); | ||
|
||
assertEquals(result.stream().filter(Pair::getSecond).count(), 1); | ||
assertEquals(result.get(0).getFirst().getAspect(SchemaMetadata.class).getFields().size(), 1); | ||
} | ||
|
||
@Test | ||
public void testValidateDeleteDuplicatedSchemaFieldPath() { | ||
final SchemaMetadata schema = getMockSchemaMetadataAspect(true); | ||
|
||
assertEquals( | ||
test.writeMutation( | ||
Set.of( | ||
TestMCP.builder() | ||
.changeType(ChangeType.DELETE) | ||
.urn(testDatasetUrn) | ||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) | ||
.aspectSpec( | ||
entityRegistry | ||
.getEntitySpec(testDatasetUrn.getEntityType()) | ||
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) | ||
.recordTemplate(schema) | ||
.build()), | ||
mockRetrieverContext) | ||
.filter(Pair::getSecond) | ||
.count(), | ||
0); | ||
} | ||
|
||
@Test | ||
public void testValidateNonDuplicatedEditableSchemaFieldPath() { | ||
final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false); | ||
assertEquals( | ||
test.writeMutation( | ||
Set.of( | ||
TestMCP.builder() | ||
.changeType(ChangeType.UPSERT) | ||
.urn(testDatasetUrn) | ||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) | ||
.aspectSpec( | ||
entityRegistry | ||
.getEntitySpec(testDatasetUrn.getEntityType()) | ||
.getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) | ||
.recordTemplate(schema) | ||
.build()), | ||
mockRetrieverContext) | ||
.filter(Pair::getSecond) | ||
.count(), | ||
0); | ||
} | ||
|
||
@Test | ||
public void testValidateDuplicatedEditableSchemaFieldPath() { | ||
final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(true); | ||
|
||
List<Pair<ChangeMCP, Boolean>> result = | ||
test.writeMutation( | ||
Set.of( | ||
TestMCP.builder() | ||
.changeType(ChangeType.UPSERT) | ||
.urn(testDatasetUrn) | ||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) | ||
.aspectSpec( | ||
entityRegistry | ||
.getEntitySpec(testDatasetUrn.getEntityType()) | ||
.getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) | ||
.recordTemplate(schema) | ||
.build()), | ||
mockRetrieverContext) | ||
.collect(Collectors.toList()); | ||
|
||
assertEquals(result.stream().filter(Pair::getSecond).count(), 1); | ||
assertEquals( | ||
result | ||
.get(0) | ||
.getFirst() | ||
.getAspect(EditableSchemaMetadata.class) | ||
.getEditableSchemaFieldInfo() | ||
.size(), | ||
1); | ||
} | ||
|
||
private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { | ||
List<SchemaField> fields = new ArrayList<>(); | ||
fields.add( | ||
new SchemaField() | ||
.setType( | ||
new SchemaFieldDataType() | ||
.setType(SchemaFieldDataType.Type.create(new StringType()))) | ||
.setNullable(false) | ||
.setNativeDataType("string") | ||
.setFieldPath("test")); | ||
|
||
if (duplicateFields) { | ||
fields.add( | ||
new SchemaField() | ||
.setType( | ||
new SchemaFieldDataType() | ||
.setType(SchemaFieldDataType.Type.create(new StringType()))) | ||
.setNullable(false) | ||
.setNativeDataType("string") | ||
.setFieldPath("test")); | ||
} | ||
|
||
return new SchemaMetadata() | ||
.setPlatform(testDatasetUrn.getPlatformEntity()) | ||
.setFields(new SchemaFieldArray(fields)); | ||
} | ||
|
||
private EditableSchemaMetadata getMockEditableSchemaMetadataAspect(boolean duplicateFields) { | ||
|
||
List<EditableSchemaFieldInfo> fields = new ArrayList<>(); | ||
fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); | ||
|
||
if (duplicateFields) { | ||
fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); | ||
} | ||
|
||
return new EditableSchemaMetadata() | ||
.setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(fields)); | ||
} | ||
} |
Oops, something went wrong.