From bb4bc7050704fb0118234f577b21bc97f022982c Mon Sep 17 00:00:00 2001 From: Ryan Holstien Date: Tue, 5 Mar 2024 16:01:51 -0600 Subject: [PATCH 1/2] feat(config): add configuration to reprocess UI sourced events --- .../kafka/hook/UpdateIndicesHook.java | 21 +- .../kafka/hook/UpdateIndicesHookTest.java | 241 ++++++++++-------- .../src/main/resources/application.yml | 1 + 3 files changed, 145 insertions(+), 118 deletions(-) diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index 125f6c0cf443b..fa6a8d987fa21 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -29,32 +29,37 @@ }) public class UpdateIndicesHook implements MetadataChangeLogHook { - protected final UpdateIndicesService _updateIndicesService; - private final boolean _isEnabled; + protected final UpdateIndicesService updateIndicesService; + private final boolean isEnabled; + private final boolean reprocessUIEvents; public UpdateIndicesHook( UpdateIndicesService updateIndicesService, - @Nonnull @Value("${updateIndices.enabled:true}") Boolean isEnabled) { - _updateIndicesService = updateIndicesService; - _isEnabled = isEnabled; + @Nonnull @Value("${updateIndices.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${featureFlags.preProcessHooks.reprocessEnabled") + Boolean reprocessUIEvents) { + this.updateIndicesService = updateIndicesService; + this.isEnabled = isEnabled; + this.reprocessUIEvents = reprocessUIEvents; } @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override public void invoke(@Nonnull final MetadataChangeLog event) { if (event.getSystemMetadata() != null) { if (event.getSystemMetadata().getProperties() != null) { - if (UI_SOURCE.equals(event.getSystemMetadata().getProperties().get(APP_SOURCE))) { + if (UI_SOURCE.equals(event.getSystemMetadata().getProperties().get(APP_SOURCE)) + && !reprocessUIEvents) { // If coming from the UI, we pre-process the Update Indices hook as a fast path to avoid // Kafka lag return; } } } - _updateIndicesService.handleChangeEvent(event); + updateIndicesService.handleChangeEvent(event); } } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java index 89ad6105be9cb..cae67108b4ca0 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java @@ -84,54 +84,56 @@ public class UpdateIndicesHookTest { static final long LAST_OBSERVED_1 = 123L; static final long LAST_OBSERVED_2 = 456L; static final long LAST_OBSERVED_3 = 789L; - private UpdateIndicesHook _updateIndicesHook; - private GraphService _mockGraphService; - private EntitySearchService _mockEntitySearchService; - private TimeseriesAspectService _mockTimeseriesAspectService; - private SystemMetadataService _mockSystemMetadataService; - private SearchDocumentTransformer _searchDocumentTransformer; - private DataHubUpgradeKafkaListener _mockDataHubUpgradeKafkaListener; - private ConfigurationProvider _mockConfigurationProvider; - private EntityIndexBuilders _mockEntityIndexBuilders; - private Urn _actorUrn; - private UpdateIndicesService _updateIndicesService; + private UpdateIndicesHook updateIndicesHook; + private GraphService mockGraphService; + private EntitySearchService mockEntitySearchService; + private TimeseriesAspectService mockTimeseriesAspectService; + private SystemMetadataService mockSystemMetadataService; + private SearchDocumentTransformer searchDocumentTransformer; + private DataHubUpgradeKafkaListener mockDataHubUpgradeKafkaListener; + private ConfigurationProvider mockConfigurationProvider; + private EntityIndexBuilders mockEntityIndexBuilders; + private Urn actorUrn; + private UpdateIndicesService updateIndicesService; + private UpdateIndicesHook reprocessUIHook; @Value("${elasticsearch.index.maxArrayLength}") private int maxArrayLength; @BeforeMethod public void setupTest() { - _actorUrn = UrnUtils.getUrn(TEST_ACTOR_URN); - _mockGraphService = Mockito.mock(ElasticSearchGraphService.class); - _mockEntitySearchService = Mockito.mock(EntitySearchService.class); - _mockTimeseriesAspectService = Mockito.mock(TimeseriesAspectService.class); - _mockSystemMetadataService = Mockito.mock(SystemMetadataService.class); - _searchDocumentTransformer = new SearchDocumentTransformer(1000, 1000, 1000); - _mockDataHubUpgradeKafkaListener = Mockito.mock(DataHubUpgradeKafkaListener.class); - _mockConfigurationProvider = Mockito.mock(ConfigurationProvider.class); - _mockEntityIndexBuilders = Mockito.mock(EntityIndexBuilders.class); + actorUrn = UrnUtils.getUrn(TEST_ACTOR_URN); + mockGraphService = Mockito.mock(ElasticSearchGraphService.class); + mockEntitySearchService = Mockito.mock(EntitySearchService.class); + mockTimeseriesAspectService = Mockito.mock(TimeseriesAspectService.class); + mockSystemMetadataService = Mockito.mock(SystemMetadataService.class); + searchDocumentTransformer = new SearchDocumentTransformer(1000, 1000, 1000); + mockDataHubUpgradeKafkaListener = Mockito.mock(DataHubUpgradeKafkaListener.class); + mockConfigurationProvider = Mockito.mock(ConfigurationProvider.class); + mockEntityIndexBuilders = Mockito.mock(EntityIndexBuilders.class); ElasticSearchConfiguration elasticSearchConfiguration = new ElasticSearchConfiguration(); SystemUpdateConfiguration systemUpdateConfiguration = new SystemUpdateConfiguration(); systemUpdateConfiguration.setWaitForSystemUpdate(false); - Mockito.when(_mockConfigurationProvider.getElasticSearch()) + Mockito.when(mockConfigurationProvider.getElasticSearch()) .thenReturn(elasticSearchConfiguration); - _updateIndicesService = + updateIndicesService = new UpdateIndicesService( - _mockGraphService, - _mockEntitySearchService, - _mockTimeseriesAspectService, - _mockSystemMetadataService, - _searchDocumentTransformer, - _mockEntityIndexBuilders); - _updateIndicesService.initializeAspectRetriever( + mockGraphService, + mockEntitySearchService, + mockTimeseriesAspectService, + mockSystemMetadataService, + searchDocumentTransformer, + mockEntityIndexBuilders); + updateIndicesService.initializeAspectRetriever( EntityClientAspectRetriever.builder().entityRegistry(ENTITY_REGISTRY).build()); - _updateIndicesHook = new UpdateIndicesHook(_updateIndicesService, true); + updateIndicesHook = new UpdateIndicesHook(updateIndicesService, true, false); + reprocessUIHook = new UpdateIndicesHook(updateIndicesService, true, true); } @Test public void testFineGrainedLineageEdgesAreAdded() throws Exception { - _updateIndicesService.setGraphDiffMode(false); + updateIndicesService.setGraphDiffMode(false); Urn upstreamUrn = UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)"); @@ -139,11 +141,11 @@ public void testFineGrainedLineageEdgesAreAdded() throws Exception { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)"); MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn); - _updateIndicesHook.invoke(event); + updateIndicesHook.invoke(event); Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null, null); - Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); - Mockito.verify(_mockGraphService, Mockito.times(1)) + Mockito.verify(mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); + Mockito.verify(mockGraphService, Mockito.times(1)) .removeEdgesFromNode( Mockito.eq(downstreamUrn), Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))), @@ -155,7 +157,7 @@ public void testFineGrainedLineageEdgesAreAdded() throws Exception { @Test public void testFineGrainedLineageEdgesAreAddedRestate() throws Exception { - _updateIndicesService.setGraphDiffMode(false); + updateIndicesService.setGraphDiffMode(false); Urn upstreamUrn = UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)"); @@ -164,11 +166,11 @@ public void testFineGrainedLineageEdgesAreAddedRestate() throws Exception { "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)"); MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn, ChangeType.RESTATE); - _updateIndicesHook.invoke(event); + updateIndicesHook.invoke(event); Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null, null); - Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); - Mockito.verify(_mockGraphService, Mockito.times(1)) + Mockito.verify(mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); + Mockito.verify(mockGraphService, Mockito.times(1)) .removeEdgesFromNode( Mockito.eq(downstreamUrn), Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))), @@ -176,7 +178,7 @@ public void testFineGrainedLineageEdgesAreAddedRestate() throws Exception { newRelationshipFilter( new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING))); - Mockito.verify(_mockEntitySearchService, Mockito.times(1)) + Mockito.verify(mockEntitySearchService, Mockito.times(1)) .upsertDocument( Mockito.eq(DATASET_ENTITY_NAME), Mockito.any(), @@ -194,27 +196,27 @@ public void testInputFieldsEdgesAreAdded() throws Exception { String downstreamFieldPath = "users.count"; MetadataChangeLog event = createInputFieldsMCL(upstreamUrn, downstreamFieldPath); EntityRegistry mockEntityRegistry = createMockEntityRegistry(); - _updateIndicesService = + updateIndicesService = new UpdateIndicesService( - _mockGraphService, - _mockEntitySearchService, - _mockTimeseriesAspectService, - _mockSystemMetadataService, - _searchDocumentTransformer, - _mockEntityIndexBuilders); - _updateIndicesService.initializeAspectRetriever( + mockGraphService, + mockEntitySearchService, + mockTimeseriesAspectService, + mockSystemMetadataService, + searchDocumentTransformer, + mockEntityIndexBuilders); + updateIndicesService.initializeAspectRetriever( EntityClientAspectRetriever.builder().entityRegistry(mockEntityRegistry).build()); - _updateIndicesHook = new UpdateIndicesHook(_updateIndicesService, true); + updateIndicesHook = new UpdateIndicesHook(updateIndicesService, true, false); - _updateIndicesHook.invoke(event); + updateIndicesHook.invoke(event); Urn downstreamUrn = UrnUtils.getUrn( String.format("urn:li:schemaField:(%s,%s)", TEST_CHART_URN, downstreamFieldPath)); Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null, null); - Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); - Mockito.verify(_mockGraphService, Mockito.times(1)) + Mockito.verify(mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); + Mockito.verify(mockGraphService, Mockito.times(1)) .removeEdgesFromNode( Mockito.eq(downstreamUrn), Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))), @@ -227,19 +229,19 @@ public void testInputFieldsEdgesAreAdded() throws Exception { @Test public void testMCLProcessExhaustive() throws URISyntaxException { - _updateIndicesService.setGraphDiffMode(true); - _updateIndicesService.setSearchDiffMode(true); + updateIndicesService.setGraphDiffMode(true); + updateIndicesService.setSearchDiffMode(true); /* * newLineage */ MetadataChangeLog changeLog = createBaseChangeLog(); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // One new edge added - Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(1)).addEdge(Mockito.any()); // Update document - Mockito.verify(_mockEntitySearchService, Mockito.times(1)) + Mockito.verify(mockEntitySearchService, Mockito.times(1)) .upsertDocument( Mockito.eq(DATASET_ENTITY_NAME), Mockito.any(), @@ -248,161 +250,161 @@ public void testMCLProcessExhaustive() throws URISyntaxException { /* * restateNewLineage */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setSystemMetadata(changeLog); changeLog = setPreviousData(setToRestate(changeLog), changeLog); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // No edges added - Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); // Timestamp updated - Mockito.verify(_mockGraphService, Mockito.times(1)).upsertEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(1)).upsertEdge(Mockito.any()); // No document change - Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + Mockito.verify(mockEntitySearchService, Mockito.times(0)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); /* * addLineage */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setPreviousData(setToUpsert(changeLog), changeLog); UpstreamLineage currentLineage = addLineageEdge(createBaseLineageAspect()); changeLog = modifyAspect(changeLog, currentLineage); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // New edge added - Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(1)).addEdge(Mockito.any()); // Update timestamp of old edge - Mockito.verify(_mockGraphService, Mockito.times(1)).upsertEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(1)).upsertEdge(Mockito.any()); // Document update for new upstream - Mockito.verify(_mockEntitySearchService, Mockito.times(1)) + Mockito.verify(mockEntitySearchService, Mockito.times(1)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); /* * restateAddLineage */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setPreviousData(setToRestate(changeLog), changeLog); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // No new edges - Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); // Update timestamps of old edges - Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); // No document update - Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + Mockito.verify(mockEntitySearchService, Mockito.times(0)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); /* * noOpUpsert */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setPreviousData(setToUpsert(changeLog), changeLog); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // No new edges - Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); // Update timestamps of old edges - Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); // No document update - Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + Mockito.verify(mockEntitySearchService, Mockito.times(0)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); /* * restateNoOp */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setPreviousData(setToRestate(changeLog), changeLog); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // No new edges - Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); // Update timestamps of old edges - Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); // No document update - Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + Mockito.verify(mockEntitySearchService, Mockito.times(0)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); /* * systemMetadataChange */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setPreviousData(setToUpsert(changeLog), changeLog); changeLog = modifySystemMetadata(changeLog); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // No new edges - Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); // Update timestamps of old edges - Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); // No document update - Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + Mockito.verify(mockEntitySearchService, Mockito.times(0)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); /* * restateSystemMetadataChange */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setPreviousData(setToRestate(changeLog), changeLog); changeLog = modifySystemMetadata2(changeLog); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // No new edges - Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); // Update timestamps of old edges - Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); // No document update - Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + Mockito.verify(mockEntitySearchService, Mockito.times(0)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); /* * modifyNonSearchableField */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setPreviousData(setToUpsert(changeLog), changeLog); currentLineage = modifyNonSearchableField(currentLineage); changeLog = modifyAspect(changeLog, currentLineage); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // No new edges - Mockito.verify(_mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(0)).addEdge(Mockito.any()); // Update timestamps of old edges - Mockito.verify(_mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(2)).upsertEdge(Mockito.any()); // No document update - Mockito.verify(_mockEntitySearchService, Mockito.times(0)) + Mockito.verify(mockEntitySearchService, Mockito.times(0)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); /* * force reindexing */ - Mockito.clearInvocations(_mockGraphService, _mockEntitySearchService); + Mockito.clearInvocations(mockGraphService, mockEntitySearchService); changeLog = setPreviousDataToEmpty(setToRestate(changeLog)); changeLog = setSystemMetadataWithForceIndexing(changeLog); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); // Forced removal of all edges - Mockito.verify(_mockGraphService, Mockito.times(1)) + Mockito.verify(mockGraphService, Mockito.times(1)) .removeEdgesFromNode(Mockito.any(), Mockito.any(), Mockito.any()); // Forced add of edges - Mockito.verify(_mockGraphService, Mockito.times(2)).addEdge(Mockito.any()); + Mockito.verify(mockGraphService, Mockito.times(2)).addEdge(Mockito.any()); // Forced document update - Mockito.verify(_mockEntitySearchService, Mockito.times(1)) + Mockito.verify(mockEntitySearchService, Mockito.times(1)) .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); } @Test public void testMCLUIPreProcessed() throws Exception { - _updateIndicesService.setGraphDiffMode(true); - _updateIndicesService.setSearchDiffMode(true); + updateIndicesService.setGraphDiffMode(true); + updateIndicesService.setSearchDiffMode(true); Urn upstreamUrn = UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)"); @@ -412,12 +414,31 @@ public void testMCLUIPreProcessed() throws Exception { MetadataChangeLog changeLog = createUpstreamLineageMCLUIPreProcessed(upstreamUrn, downstreamUrn, ChangeType.UPSERT); - _updateIndicesHook.invoke(changeLog); + updateIndicesHook.invoke(changeLog); Mockito.verifyNoInteractions( - _mockEntitySearchService, - _mockGraphService, - _mockTimeseriesAspectService, - _mockSystemMetadataService); + mockEntitySearchService, + mockGraphService, + mockTimeseriesAspectService, + mockSystemMetadataService); + } + + @Test + public void testMCLUIPreProcessedReprocess() throws Exception { + updateIndicesService.setGraphDiffMode(true); + updateIndicesService.setSearchDiffMode(true); + Urn upstreamUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info2)"); + Urn downstreamUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo2)"); + + MetadataChangeLog changeLog = + createUpstreamLineageMCLUIPreProcessed(upstreamUrn, downstreamUrn, ChangeType.UPSERT); + reprocessUIHook.invoke(changeLog); + Mockito.verify(mockGraphService, Mockito.times(3)).addEdge(Mockito.any()); + Mockito.verify(mockEntitySearchService, Mockito.times(1)) + .upsertDocument(Mockito.any(), Mockito.any(), Mockito.any()); } private EntityRegistry createMockEntityRegistry() { @@ -494,7 +515,7 @@ private MetadataChangeLog createUpstreamLineageMCL( event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage)); event.setEntityUrn(Urn.createFromString(TEST_DATASET_URN)); event.setEntityType(DATASET_ENTITY_NAME); - event.setCreated(new AuditStamp().setActor(_actorUrn).setTime(EVENT_TIME)); + event.setCreated(new AuditStamp().setActor(actorUrn).setTime(EVENT_TIME)); return event; } @@ -531,7 +552,7 @@ private MetadataChangeLog createInputFieldsMCL(Urn upstreamUrn, String downstrea event.setAspect(GenericRecordUtils.serializeAspect(inputFields)); event.setEntityUrn(Urn.createFromString(TEST_CHART_URN)); event.setEntityType(Constants.CHART_ENTITY_NAME); - event.setCreated(new AuditStamp().setActor(_actorUrn).setTime(EVENT_TIME)); + event.setCreated(new AuditStamp().setActor(actorUrn).setTime(EVENT_TIME)); return event; } } diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index 9e82430378827..689abafcaa1b0 100644 --- a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -360,6 +360,7 @@ featureFlags: platformBrowseV2: ${PLATFORM_BROWSE_V2:false} # Enables the platform browse experience, instead of the entity-oriented browse default. preProcessHooks: uiEnabled: ${PRE_PROCESS_HOOKS_UI_ENABLED:true} # Circumvents Kafka for processing index updates for UI changes sourced from GraphQL to avoid processing delays + reprocessEnabled: ${PRE_PROCESS_HOOKS_UI_ENABLED:false} # If enabled, will reprocess UI sourced events asynchronously when reading from Kafka after pre-processing them synchronously showAcrylInfo: ${SHOW_ACRYL_INFO:false} # Show different CTAs within DataHub around moving to Managed DataHub. Set to true for the demo site. nestedDomainsEnabled: ${NESTED_DOMAINS_ENABLED:true} # Enables the nested Domains feature that allows users to have sub-Domains. If this is off, Domains appear "flat" again schemaFieldEntityFetchEnabled: ${SCHEMA_FIELD_ENTITY_FETCH_ENABLED:true} # Enables fetching for schema field entities from the database when we hydrate them on schema fields From f967192437ebefbce2638ddfe7b4aaec4b1d2eeb Mon Sep 17 00:00:00 2001 From: Ryan Holstien Date: Tue, 5 Mar 2024 16:34:49 -0600 Subject: [PATCH 2/2] typo --- .../com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index fa6a8d987fa21..6b27858421603 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -36,7 +36,7 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { public UpdateIndicesHook( UpdateIndicesService updateIndicesService, @Nonnull @Value("${updateIndices.enabled:true}") Boolean isEnabled, - @Nonnull @Value("${featureFlags.preProcessHooks.reprocessEnabled") + @Nonnull @Value("${featureFlags.preProcessHooks.reprocessEnabled:false}") Boolean reprocessUIEvents) { this.updateIndicesService = updateIndicesService; this.isEnabled = isEnabled;