From b1af573476819887ac9eb5e4d3c9deab278db743 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Thu, 15 Aug 2024 13:54:03 -0500 Subject: [PATCH 1/2] fix(ingest): invalid urn should not fail full batch of changes --- .../entity/ebean/batch/AspectsBatchImpl.java | 20 ++++++---- .../ebean/batch/AspectsBatchImplTest.java | 39 +++++++++++++++++-- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 0808c29e8ea89..7371897c25585 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -170,16 +170,20 @@ public AspectsBatchImplBuilder mcps( mcps.stream() .map( mcp -> { - if (mcp.getChangeType().equals(ChangeType.PATCH)) { - return PatchItemImpl.PatchItemImplBuilder.build( - mcp, - auditStamp, - retrieverContext.getAspectRetriever().getEntityRegistry()); - } else { - return ChangeItemImpl.ChangeItemImplBuilder.build( - mcp, auditStamp, retrieverContext.getAspectRetriever()); + try { + if (mcp.getChangeType().equals(ChangeType.PATCH)) { + return PatchItemImpl.PatchItemImplBuilder.build(mcp, auditStamp, + retrieverContext.getAspectRetriever().getEntityRegistry()); + } else { + return ChangeItemImpl.ChangeItemImplBuilder.build(mcp, auditStamp, + retrieverContext.getAspectRetriever()); + } + } catch (IllegalArgumentException e) { + log.error("Invalid proposal, skipping and proceeding with batch: " + mcp, e); + return null; } }) + .filter(Objects::nonNull) .collect(Collectors.toList())); return this; } diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java index d2e7243d04560..54ebdefce544f 100644 --- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java +++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java @@ -1,22 +1,26 @@ package com.linkedin.metadata.entity.ebean.batch; -import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; -import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; -import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTIES_ASPECT_NAME; +import static com.linkedin.metadata.Constants.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import com.google.common.collect.ImmutableList; +import com.linkedin.common.FabricType; import com.linkedin.common.Status; +import com.linkedin.common.urn.DataPlatformUrn; +import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.ByteString; import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; +import com.linkedin.dataset.DatasetProperties; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.GraphRetriever; import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.patch.GenericJsonPatch; import com.linkedin.metadata.aspect.patch.PatchOperationType; +import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder; import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; import com.linkedin.metadata.entity.SearchRetriever; @@ -297,6 +301,35 @@ public void toUpsertBatchItemsProposedItemTest() { "Mutation to status aspect"); } + @Test + public void singleInvalidDoesntBreakBatch() { + MetadataChangeProposal proposal1 = + new DatasetPropertiesPatchBuilder() + .urn(new DatasetUrn(new DataPlatformUrn("platform"), "name", FabricType.PROD)) + .setDescription("something") + .setName("name") + .addCustomProperty("prop1", "propVal1") + .addCustomProperty("prop2", "propVal2") + .build(); + MetadataChangeProposal proposal2 = + new MetadataChangeProposal() + .setEntityType(DATASET_ENTITY_NAME) + .setAspectName(DATASET_PROPERTIES_ASPECT_NAME) + .setAspect(GenericRecordUtils.serializeAspect(new DatasetProperties())) + .setChangeType(ChangeType.UPSERT); + + + AspectsBatchImpl testBatch = + AspectsBatchImpl.builder() + .mcps(ImmutableList.of(proposal1, proposal2), AuditStampUtils.createDefaultAuditStamp(), retrieverContext) + .retrieverContext(retrieverContext).build(); + + assertEquals( + testBatch.toUpsertBatchItems(Map.of()).getSecond().size(), + 1, + "Expected 1 valid mcp to be passed through."); + } + /** Converts unsupported to status aspect */ @Getter @Setter From 69cb80ef2154c7c343eeabecf13f37cd213f8986 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Thu, 15 Aug 2024 14:38:56 -0500 Subject: [PATCH 2/2] spotless --- .../metadata/entity/ebean/batch/AspectsBatchImpl.java | 8 +++++--- .../entity/ebean/batch/AspectsBatchImplTest.java | 9 ++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 7371897c25585..3ec090a3db3a4 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -172,11 +172,13 @@ public AspectsBatchImplBuilder mcps( mcp -> { try { if (mcp.getChangeType().equals(ChangeType.PATCH)) { - return PatchItemImpl.PatchItemImplBuilder.build(mcp, auditStamp, + return PatchItemImpl.PatchItemImplBuilder.build( + mcp, + auditStamp, retrieverContext.getAspectRetriever().getEntityRegistry()); } else { - return ChangeItemImpl.ChangeItemImplBuilder.build(mcp, auditStamp, - retrieverContext.getAspectRetriever()); + return ChangeItemImpl.ChangeItemImplBuilder.build( + mcp, auditStamp, retrieverContext.getAspectRetriever()); } } catch (IllegalArgumentException e) { log.error("Invalid proposal, skipping and proceeding with batch: " + mcp, e); diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java index 54ebdefce544f..31dd868b4cb4a 100644 --- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java +++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java @@ -318,11 +318,14 @@ public void singleInvalidDoesntBreakBatch() { .setAspect(GenericRecordUtils.serializeAspect(new DatasetProperties())) .setChangeType(ChangeType.UPSERT); - AspectsBatchImpl testBatch = AspectsBatchImpl.builder() - .mcps(ImmutableList.of(proposal1, proposal2), AuditStampUtils.createDefaultAuditStamp(), retrieverContext) - .retrieverContext(retrieverContext).build(); + .mcps( + ImmutableList.of(proposal1, proposal2), + AuditStampUtils.createDefaultAuditStamp(), + retrieverContext) + .retrieverContext(retrieverContext) + .build(); assertEquals( testBatch.toUpsertBatchItems(Map.of()).getSecond().size(),