From f1187c0e10b35e6feba006de6a8ba7ff883f5b93 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Fri, 9 Aug 2024 09:59:02 -0500 Subject: [PATCH 1/2] feat(mcl-processor): Update mcl processor hooks optionally run with different consumer groups --- .../kafka/MaeConsumerApplication.java | 7 +- .../metadata/kafka/MCLKafkaListener.java | 103 +++++++++++++ .../kafka/MCLKafkaListenerRegistrar.java | 120 +++++++++++++++ .../kafka/MetadataChangeLogProcessor.java | 140 ------------------ .../kafka/hook/MetadataChangeLogHook.java | 8 + .../kafka/hook/UpdateIndicesHook.java | 17 ++- .../event/EntityChangeEventGeneratorHook.java | 34 +++-- .../kafka/hook/form/FormAssignmentHook.java | 26 +++- .../hook/incident/IncidentsSummaryHook.java | 45 ++++-- .../ingestion/IngestionSchedulerHook.java | 30 ++-- .../hook/siblings/SiblingAssociationHook.java | 20 ++- .../kafka/hook/spring/MCLGMSSpringTest.java | 16 +- .../kafka/hook/spring/MCLMAESpringTest.java | 16 +- .../MCLSpringCommonTestConfiguration.java | 9 +- .../datahub/event/PlatformEventProcessor.java | 9 +- .../src/main/resources/application.yaml | 10 ++ .../kafka/KafkaEventConsumerFactory.java | 2 +- .../linkedin/gms/CommonApplicationConfig.java | 5 +- 18 files changed, 400 insertions(+), 217 deletions(-) create mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java create mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java delete mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java index f6533a6ac1d8a..617bc8e0b7303 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java @@ -18,8 +18,6 @@ "com.linkedin.metadata.service", "com.datahub.event", "com.linkedin.gms.factory.kafka", - "com.linkedin.gms.factory.kafka.common", - "com.linkedin.gms.factory.kafka.schemaregistry", "com.linkedin.metadata.boot.kafka", "com.linkedin.metadata.kafka", "com.linkedin.metadata.dao.producer", @@ -34,7 +32,10 @@ "com.linkedin.gms.factory.context", "com.linkedin.gms.factory.timeseries", "com.linkedin.gms.factory.assertion", - "com.linkedin.gms.factory.plugins" + "com.linkedin.gms.factory.plugins", + "com.linkedin.gms.factory.change", + "com.datahub.event.hook", + "com.linkedin.gms.factory.notifications" }, excludeFilters = { @ComponentScan.Filter( diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java new file mode 100644 index 0000000000000..70b452722abc7 --- /dev/null +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java @@ -0,0 +1,103 @@ +package com.linkedin.metadata.kafka; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.linkedin.metadata.EventUtils; +import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; +import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.mxe.MetadataChangeLog; +import 
io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +@Slf4j +public class MCLKafkaListener { + private static final Histogram kafkaLagStats = + MetricUtils.get() + .histogram( + MetricRegistry.name( + "com.linkedin.metadata.kafka.MetadataChangeLogProcessor", "kafkaLag")); + + private final String consumerGroupId; + private final List hooks; + + public MCLKafkaListener( + OperationContext systemOperationContext, + String consumerGroup, + List hooks) { + this.consumerGroupId = consumerGroup; + this.hooks = hooks; + this.hooks.forEach(hook -> hook.init(systemOperationContext)); + + log.info( + "Enabled MCL Hooks - Group: {} Hooks: {}", + consumerGroup, + hooks.stream().map(hook -> hook.getClass().getSimpleName()).collect(Collectors.toList())); + } + + public void consume(final ConsumerRecord consumerRecord) { + try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { + kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); + final GenericRecord record = consumerRecord.value(); + log.debug( + "Got MCL event consumer: {} key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}", + consumerGroupId, + consumerRecord.key(), + consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset(), + consumerRecord.serializedValueSize(), + consumerRecord.timestamp()); + MetricUtils.counter(this.getClass(), consumerGroupId + "_received_mcl_count").inc(); + + MetadataChangeLog event; + try { + event = EventUtils.avroToPegasusMCL(record); + } catch (Exception e) { + MetricUtils.counter( + this.getClass(), consumerGroupId + "_avro_to_pegasus_conversion_failure") + .inc(); + log.error("Error deserializing message due to: ", e); + log.error("Message: {}", record.toString()); + return; + } + + log.info( + "Invoking MCL hooks for consumer: {} urn: {}, aspect name: {}, entity type: {}, change type: {}", + consumerGroupId, + event.getEntityUrn(), + event.hasAspectName() ? event.getAspectName() : null, + event.hasEntityType() ? event.getEntityType() : null, + event.hasChangeType() ? event.getChangeType() : null); + + // Here - plug in additional "custom processor hooks" + for (MetadataChangeLogHook hook : this.hooks) { + log.info( + "Invoking MCL hook {} for urn: {}", + hook.getClass().getSimpleName(), + event.getEntityUrn()); + try (Timer.Context ignored = + MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency") + .time()) { + hook.invoke(event); + } catch (Exception e) { + // Just skip this hook and continue. - Note that this represents "at most once"// + // processing. + MetricUtils.counter(this.getClass(), hook.getClass().getSimpleName() + "_failure").inc(); + log.error( + "Failed to execute MCL hook with name {}", hook.getClass().getCanonicalName(), e); + } + } + // TODO: Manually commit kafka offsets after full processing. 
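+      // Because hook exceptions are caught and logged above, the consumer always advances past
+      // this record; a failed hook will not see the event again (the "at most once" semantics).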
+ MetricUtils.counter(this.getClass(), consumerGroupId + "_consumed_mcl_count").inc(); + log.info( + "Successfully completed MCL hooks for consumer: {} urn: {}", + consumerGroupId, + event.getEntityUrn()); + } + } +} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java new file mode 100644 index 0000000000000..fb2880f617d30 --- /dev/null +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java @@ -0,0 +1,120 @@ +package com.linkedin.metadata.kafka; + +import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition; +import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; +import com.linkedin.mxe.Topics; +import io.datahubproject.metadata.context.OperationContext; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Conditional; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpoint; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.config.MethodKafkaListenerEndpoint; +import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; +import org.springframework.stereotype.Component; + +@Slf4j +@EnableKafka +@Component +@Conditional(MetadataChangeLogProcessorCondition.class) +public class MCLKafkaListenerRegistrar implements InitializingBean { + + @Autowired + @Qualifier("systemOperationContext") + private OperationContext systemOperationContext; + + @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Autowired + @Qualifier("kafkaEventConsumer") + private KafkaListenerContainerFactory kafkaListenerContainerFactory; + + @Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}") + private String consumerGroupBase; + + @Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}") + private String mclVersionedTopicName; + + @Value( + "${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}") + private String mclTimeseriesTopicName; + + @Autowired private List metadataChangeLogHooks; + + @Override + public void afterPropertiesSet() { + Map> hookGroups = + getMetadataChangeLogHooks().stream() + .collect(Collectors.groupingBy(MetadataChangeLogHook::getConsumerGroupSuffix)); + + log.info( + "MetadataChangeLogProcessor Consumer Groups: {}", + hookGroups.keySet().stream().map(this::buildConsumerGroupName).collect(Collectors.toSet())); + + hookGroups.forEach( + (key, hooks) -> { + KafkaListenerEndpoint kafkaListenerEndpoint = + createListenerEndpoint( + buildConsumerGroupName(key), + List.of(mclVersionedTopicName, mclTimeseriesTopicName), + hooks); + 
registerMCLKafkaListener(kafkaListenerEndpoint, true); + }); + } + + public List getMetadataChangeLogHooks() { + return metadataChangeLogHooks.stream() + .filter(MetadataChangeLogHook::isEnabled) + .sorted(Comparator.comparing(MetadataChangeLogHook::executionOrder)) + .toList(); + } + + @SneakyThrows + public void registerMCLKafkaListener( + KafkaListenerEndpoint kafkaListenerEndpoint, boolean startImmediately) { + kafkaListenerEndpointRegistry.registerListenerContainer( + kafkaListenerEndpoint, kafkaListenerContainerFactory, startImmediately); + } + + private KafkaListenerEndpoint createListenerEndpoint( + String consumerGroupId, List topics, List hooks) { + MethodKafkaListenerEndpoint kafkaListenerEndpoint = + new MethodKafkaListenerEndpoint<>(); + kafkaListenerEndpoint.setId(consumerGroupId); + kafkaListenerEndpoint.setGroupId(consumerGroupId); + kafkaListenerEndpoint.setAutoStartup(true); + kafkaListenerEndpoint.setTopics(topics.toArray(new String[topics.size()])); + kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); + kafkaListenerEndpoint.setBean( + new MCLKafkaListener(systemOperationContext, consumerGroupId, hooks)); + try { + kafkaListenerEndpoint.setMethod( + MCLKafkaListener.class.getMethod("consume", ConsumerRecord.class)); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + + return kafkaListenerEndpoint; + } + + private String buildConsumerGroupName(@Nonnull String suffix) { + if (suffix.isEmpty()) { + return consumerGroupBase; + } else { + return String.join("-", consumerGroupBase, suffix); + } + } +} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java deleted file mode 100644 index 6112ad798d73d..0000000000000 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.linkedin.metadata.kafka; - -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; -import com.linkedin.metadata.EventUtils; -import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition; -import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; -import com.linkedin.metadata.kafka.hook.UpdateIndicesHook; -import com.linkedin.metadata.kafka.hook.event.EntityChangeEventGeneratorHook; -import com.linkedin.metadata.kafka.hook.form.FormAssignmentHook; -import com.linkedin.metadata.kafka.hook.incident.IncidentsSummaryHook; -import com.linkedin.metadata.kafka.hook.ingestion.IngestionSchedulerHook; -import com.linkedin.metadata.kafka.hook.siblings.SiblingAssociationHook; -import com.linkedin.metadata.utils.metrics.MetricUtils; -import com.linkedin.mxe.MetadataChangeLog; -import com.linkedin.mxe.Topics; -import io.datahubproject.metadata.context.OperationContext; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Conditional; -import 
org.springframework.context.annotation.Import; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -@Conditional(MetadataChangeLogProcessorCondition.class) -@Import({ - UpdateIndicesHook.class, - IngestionSchedulerHook.class, - EntityChangeEventGeneratorHook.class, - KafkaEventConsumerFactory.class, - SiblingAssociationHook.class, - FormAssignmentHook.class, - IncidentsSummaryHook.class, -}) -@EnableKafka -public class MetadataChangeLogProcessor { - - @Getter private final List hooks; - private final Histogram kafkaLagStats = - MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); - - @Autowired - public MetadataChangeLogProcessor( - @Qualifier("systemOperationContext") OperationContext systemOperationContext, - List metadataChangeLogHooks) { - this.hooks = - metadataChangeLogHooks.stream() - .filter(MetadataChangeLogHook::isEnabled) - .sorted(Comparator.comparing(MetadataChangeLogHook::executionOrder)) - .collect(Collectors.toList()); - log.info( - "Enabled hooks: {}", - this.hooks.stream() - .map(hook -> hook.getClass().getSimpleName()) - .collect(Collectors.toList())); - this.hooks.forEach(hook -> hook.init(systemOperationContext)); - } - - @KafkaListener( - id = "${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}", - topics = { - "${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}", - "${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}" - }, - containerFactory = "kafkaEventConsumer") - public void consume(final ConsumerRecord consumerRecord) { - try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { - kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); - final GenericRecord record = consumerRecord.value(); - log.info( - "Got MCL event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}", - consumerRecord.key(), - consumerRecord.topic(), - consumerRecord.partition(), - consumerRecord.offset(), - consumerRecord.serializedValueSize(), - consumerRecord.timestamp()); - MetricUtils.counter(this.getClass(), "received_mcl_count").inc(); - - MetadataChangeLog event; - try { - event = EventUtils.avroToPegasusMCL(record); - log.debug( - "Successfully converted Avro MCL to Pegasus MCL. urn: {}, key: {}", - event.getEntityUrn(), - event.getEntityKeyAspect()); - } catch (Exception e) { - MetricUtils.counter(this.getClass(), "avro_to_pegasus_conversion_failure").inc(); - log.error("Error deserializing message due to: ", e); - log.error("Message: {}", record.toString()); - return; - } - - log.info( - "Invoking MCL hooks for urn: {}, aspect name: {}, entity type: {}, change type: {}", - event.getEntityUrn(), - event.hasAspectName() ? event.getAspectName() : null, - event.hasEntityType() ? event.getEntityType() : null, - event.hasChangeType() ? 
event.getChangeType() : null); - - // Here - plug in additional "custom processor hooks" - for (MetadataChangeLogHook hook : this.hooks) { - if (!hook.isEnabled()) { - log.info(String.format("Skipping disabled hook %s", hook.getClass())); - continue; - } - log.info( - "Invoking MCL hook {} for urn: {}", - hook.getClass().getSimpleName(), - event.getEntityUrn()); - try (Timer.Context ignored = - MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency") - .time()) { - hook.invoke(event); - } catch (Exception e) { - // Just skip this hook and continue. - Note that this represents "at most once"// - // processing. - MetricUtils.counter(this.getClass(), hook.getClass().getSimpleName() + "_failure").inc(); - log.error( - "Failed to execute MCL hook with name {}", hook.getClass().getCanonicalName(), e); - } - } - // TODO: Manually commit kafka offsets after full processing. - MetricUtils.counter(this.getClass(), "consumed_mcl_count").inc(); - log.info("Successfully completed MCL hooks for urn: {}", event.getEntityUrn()); - } - } -} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/MetadataChangeLogHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/MetadataChangeLogHook.java index 145d1ded724cc..06a184c9f89f9 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/MetadataChangeLogHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/MetadataChangeLogHook.java @@ -18,6 +18,14 @@ default MetadataChangeLogHook init(@Nonnull OperationContext systemOperationCont return this; } + /** + * Suffix for the consumer group + * + * @return suffix + */ + @Nonnull + String getConsumerGroupSuffix(); + /** * Return whether the hook is enabled or not. 
If not enabled, the below invoke method is not * triggered diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index a0e304b26ea60..bd804b0f4424c 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -2,6 +2,7 @@ import static com.linkedin.metadata.Constants.*; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.gms.factory.common.GraphServiceFactory; import com.linkedin.gms.factory.common.SystemMetadataServiceFactory; import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; @@ -12,7 +13,9 @@ import com.linkedin.mxe.MetadataChangeLog; import io.datahubproject.metadata.context.OperationContext; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Import; import org.springframework.stereotype.Component; @@ -34,15 +37,27 @@ public class UpdateIndicesHook implements MetadataChangeLogHook { private final boolean isEnabled; private final boolean reprocessUIEvents; private OperationContext systemOperationContext; + @Getter private final String consumerGroupSuffix; + @Autowired public UpdateIndicesHook( UpdateIndicesService updateIndicesService, @Nonnull @Value("${updateIndices.enabled:true}") Boolean isEnabled, @Nonnull @Value("${featureFlags.preProcessHooks.reprocessEnabled:false}") - Boolean reprocessUIEvents) { + Boolean reprocessUIEvents, + @Nonnull @Value("${updateIndices.consumerGroupSuffix}") String consumerGroupSuffix) { this.updateIndicesService = updateIndicesService; this.isEnabled = isEnabled; this.reprocessUIEvents = reprocessUIEvents; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public UpdateIndicesHook( + UpdateIndicesService updateIndicesService, + @Nonnull Boolean isEnabled, + @Nonnull Boolean reprocessUIEvents) { + this(updateIndicesService, isEnabled, reprocessUIEvents, ""); } @Override diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java index 8dc98d77233ce..59d068a46d8c6 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java @@ -1,5 +1,6 @@ package com.linkedin.metadata.kafka.hook.event; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; @@ -29,6 +30,7 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -78,10 +80,11 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook { private static final Set SUPPORTED_OPERATIONS = 
ImmutableSet.of("CREATE", "UPSERT", "DELETE"); - private final EntityChangeEventGeneratorRegistry _entityChangeEventGeneratorRegistry; + private final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry; private final OperationContext systemOperationContext; - private final SystemEntityClient _entityClient; - private final Boolean _isEnabled; + private final SystemEntityClient entityClient; + private final Boolean isEnabled; + @Getter private final String consumerGroupSuffix; @Autowired public EntityChangeEventGeneratorHook( @@ -89,17 +92,28 @@ public EntityChangeEventGeneratorHook( @Nonnull @Qualifier("entityChangeEventGeneratorRegistry") final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry, @Nonnull final SystemEntityClient entityClient, - @Nonnull @Value("${entityChangeEvents.enabled:true}") Boolean isEnabled) { + @Nonnull @Value("${entityChangeEvents.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${entityChangeEvents.consumerGroupSuffix}") String consumerGroupSuffix) { this.systemOperationContext = systemOperationContext; - _entityChangeEventGeneratorRegistry = + this.entityChangeEventGeneratorRegistry = Objects.requireNonNull(entityChangeEventGeneratorRegistry); - _entityClient = Objects.requireNonNull(entityClient); - _isEnabled = isEnabled; + this.entityClient = Objects.requireNonNull(entityClient); + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public EntityChangeEventGeneratorHook( + @Nonnull OperationContext systemOperationContext, + @Nonnull final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry, + @Nonnull final SystemEntityClient entityClient, + @Nonnull Boolean isEnabled) { + this(systemOperationContext, entityChangeEventGeneratorRegistry, entityClient, isEnabled, ""); } @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override @@ -166,7 +180,7 @@ private List generateChangeEvents( @Nonnull final Aspect to, @Nonnull AuditStamp auditStamp) { final List> entityChangeEventGenerators = - _entityChangeEventGeneratorRegistry.getEntityChangeEventGenerators(aspectName).stream() + entityChangeEventGeneratorRegistry.getEntityChangeEventGenerators(aspectName).stream() // Note: Assumes that correct types have been registered for the aspect. 
.map(changeEventGenerator -> (EntityChangeEventGenerator) changeEventGenerator) .collect(Collectors.toList()); @@ -186,7 +200,7 @@ private boolean isEligibleForProcessing(final MetadataChangeLog log) { private void emitPlatformEvent( @Nonnull final PlatformEvent event, @Nonnull final String partitioningKey) throws Exception { - _entityClient.producePlatformEvent( + entityClient.producePlatformEvent( systemOperationContext, Constants.CHANGE_EVENT_PLATFORM_EVENT_NAME, partitioningKey, event); } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java index 8d093fe0b8a12..063fa6de92c83 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/form/FormAssignmentHook.java @@ -2,6 +2,7 @@ import static com.linkedin.metadata.Constants.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.linkedin.events.metadata.ChangeType; import com.linkedin.form.DynamicFormAssignment; @@ -15,6 +16,7 @@ import java.util.Objects; import java.util.Set; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -53,17 +55,25 @@ public class FormAssignmentHook implements MetadataChangeLogHook { ImmutableSet.of( ChangeType.UPSERT, ChangeType.CREATE, ChangeType.CREATE_ENTITY, ChangeType.RESTATE); - private final FormService _formService; - private final boolean _isEnabled; + private final FormService formService; + private final boolean isEnabled; private OperationContext systemOperationContext; + @Getter private final String consumerGroupSuffix; @Autowired public FormAssignmentHook( @Nonnull final FormService formService, - @Nonnull @Value("${forms.hook.enabled:true}") Boolean isEnabled) { - _formService = Objects.requireNonNull(formService, "formService is required"); - _isEnabled = isEnabled; + @Nonnull @Value("${forms.hook.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${forms.hook.consumerGroupSuffix}") String consumerGroupSuffix) { + this.formService = Objects.requireNonNull(formService, "formService is required"); + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public FormAssignmentHook(@Nonnull final FormService formService, @Nonnull Boolean isEnabled) { + this(formService, isEnabled, ""); } @Override @@ -74,12 +84,12 @@ public FormAssignmentHook init(@Nonnull OperationContext systemOperationContext) @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override public void invoke(@Nonnull final MetadataChangeLog event) { - if (_isEnabled && isEligibleForProcessing(event)) { + if (isEnabled && isEligibleForProcessing(event)) { if (isFormDynamicFilterUpdated(event)) { handleFormFilterUpdated(event); } @@ -96,7 +106,7 @@ private void handleFormFilterUpdated(@Nonnull final MetadataChangeLog event) { DynamicFormAssignment.class); // 2. Register a automation to assign it. 
- _formService.upsertFormAssignmentRunner( + formService.upsertFormAssignmentRunner( systemOperationContext, event.getEntityUrn(), formFilters); } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java index 7c03a11a81f7a..5483fed9116e1 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/incident/IncidentsSummaryHook.java @@ -2,6 +2,7 @@ import static com.linkedin.metadata.Constants.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.linkedin.common.IncidentSummaryDetails; import com.linkedin.common.IncidentSummaryDetailsArray; @@ -27,6 +28,7 @@ import java.util.Objects; import java.util.Set; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -57,20 +59,31 @@ public class IncidentsSummaryHook implements MetadataChangeLogHook { ImmutableSet.of(INCIDENT_INFO_ASPECT_NAME, STATUS_ASPECT_NAME); private OperationContext systemOperationContext; - private final IncidentService _incidentService; - private final boolean _isEnabled; + private final IncidentService incidentService; + private final boolean isEnabled; + @Getter private final String consumerGroupSuffix; /** Max number of incidents to allow in incident summary, limited to prevent HTTP errors */ - private final int _maxIncidentHistory; + private final int maxIncidentHistory; @Autowired public IncidentsSummaryHook( @Nonnull final IncidentService incidentService, - @Nonnull @Value("${incidents.hook.enabled:true}") Boolean isEnabled, - @Nonnull @Value("${incidents.hook.maxIncidentHistory:100}") Integer maxIncidentHistory) { - _incidentService = Objects.requireNonNull(incidentService, "incidentService is required"); - _isEnabled = isEnabled; - _maxIncidentHistory = maxIncidentHistory; + @Nonnull @Value("${incidents.hook.enabled}") Boolean isEnabled, + @Nonnull @Value("${incidents.hook.maxIncidentHistory}") Integer maxIncidentHistory, + @Nonnull @Value("${incidents.hook.consumerGroupSuffix}") String consumerGroupSuffix) { + this.incidentService = Objects.requireNonNull(incidentService, "incidentService is required"); + this.isEnabled = isEnabled; + this.maxIncidentHistory = maxIncidentHistory; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public IncidentsSummaryHook( + @Nonnull final IncidentService incidentService, + @Nonnull Boolean isEnabled, + @Nonnull Integer maxIncidentHistory) { + this(incidentService, isEnabled, maxIncidentHistory, ""); } @Override @@ -81,12 +94,12 @@ public IncidentsSummaryHook init(@Nonnull OperationContext systemOperationContex @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override public void invoke(@Nonnull final MetadataChangeLog event) { - if (_isEnabled && isEligibleForProcessing(event)) { + if (isEnabled && isEligibleForProcessing(event)) { log.debug("Urn {} received by Incident Summary Hook.", event.getEntityUrn()); final Urn urn = HookUtils.getUrnFromEvent(event, systemOperationContext.getEntityRegistry()); // Handle the deletion case. 
@@ -104,7 +117,7 @@ public void invoke(@Nonnull final MetadataChangeLog event) { private void handleIncidentSoftDeleted(@Nonnull final Urn incidentUrn) { // 1. Fetch incident info. IncidentInfo incidentInfo = - _incidentService.getIncidentInfo(systemOperationContext, incidentUrn); + incidentService.getIncidentInfo(systemOperationContext, incidentUrn); // 2. Retrieve associated urns. if (incidentInfo != null) { @@ -127,7 +140,7 @@ private void handleIncidentSoftDeleted(@Nonnull final Urn incidentUrn) { private void handleIncidentUpdated(@Nonnull final Urn incidentUrn) { // 1. Fetch incident info + status IncidentInfo incidentInfo = - _incidentService.getIncidentInfo(systemOperationContext, incidentUrn); + incidentService.getIncidentInfo(systemOperationContext, incidentUrn); // 2. Retrieve associated urns. if (incidentInfo != null) { @@ -179,14 +192,14 @@ private void addIncidentToSummary( IncidentsSummaryUtils.removeIncidentFromResolvedSummary(incidentUrn, summary); // Then, add to active. - IncidentsSummaryUtils.addIncidentToActiveSummary(details, summary, _maxIncidentHistory); + IncidentsSummaryUtils.addIncidentToActiveSummary(details, summary, maxIncidentHistory); } else if (IncidentState.RESOLVED.equals(status.getState())) { // First, ensure this isn't in any summaries anymore. IncidentsSummaryUtils.removeIncidentFromActiveSummary(incidentUrn, summary); // Then, add to resolved. - IncidentsSummaryUtils.addIncidentToResolvedSummary(details, summary, _maxIncidentHistory); + IncidentsSummaryUtils.addIncidentToResolvedSummary(details, summary, maxIncidentHistory); } // 3. Emit the change back! @@ -196,7 +209,7 @@ private void addIncidentToSummary( @Nonnull private IncidentsSummary getIncidentsSummary(@Nonnull final Urn entityUrn) { IncidentsSummary maybeIncidentsSummary = - _incidentService.getIncidentsSummary(systemOperationContext, entityUrn); + incidentService.getIncidentsSummary(systemOperationContext, entityUrn); return maybeIncidentsSummary == null ? 
new IncidentsSummary() .setResolvedIncidentDetails(new IncidentSummaryDetailsArray()) @@ -260,7 +273,7 @@ private boolean isIncidentUpdate(@Nonnull final MetadataChangeLog event) { private void updateIncidentSummary( @Nonnull final Urn entityUrn, @Nonnull final IncidentsSummary newSummary) { try { - _incidentService.updateIncidentsSummary(systemOperationContext, entityUrn, newSummary); + incidentService.updateIncidentsSummary(systemOperationContext, entityUrn, newSummary); } catch (Exception e) { log.error( String.format( diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java index c13f0f75708f7..5569fade7e6eb 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/ingestion/IngestionSchedulerHook.java @@ -15,6 +15,7 @@ import com.linkedin.mxe.MetadataChangeLog; import io.datahubproject.metadata.context.OperationContext; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -29,27 +30,36 @@ @Component @Import({EntityRegistryFactory.class, IngestionSchedulerFactory.class}) public class IngestionSchedulerHook implements MetadataChangeLogHook { - private final IngestionScheduler _scheduler; - private final boolean _isEnabled; + private final IngestionScheduler scheduler; + private final boolean isEnabled; private OperationContext systemOperationContext; + @Getter private final String consumerGroupSuffix; @Autowired public IngestionSchedulerHook( @Nonnull final IngestionScheduler scheduler, - @Nonnull @Value("${ingestionScheduler.enabled:true}") Boolean isEnabled) { - _scheduler = scheduler; - _isEnabled = isEnabled; + @Nonnull @Value("${ingestionScheduler.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${ingestionScheduler.consumerGroupSuffix}") String consumerGroupSuffix) { + this.scheduler = scheduler; + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public IngestionSchedulerHook( + @Nonnull final IngestionScheduler scheduler, @Nonnull Boolean isEnabled) { + this(scheduler, isEnabled, ""); } @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override public IngestionSchedulerHook init(@Nonnull OperationContext systemOperationContext) { this.systemOperationContext = systemOperationContext; - _scheduler.init(); + scheduler.init(); return this; } @@ -66,11 +76,11 @@ public void invoke(@Nonnull MetadataChangeLog event) { final Urn urn = getUrnFromEvent(event); if (ChangeType.DELETE.equals(event.getChangeType())) { - _scheduler.unscheduleNextIngestionSourceExecution(urn); + scheduler.unscheduleNextIngestionSourceExecution(urn); } else { // Update the scheduler to reflect the latest changes. 
final DataHubIngestionSourceInfo info = getInfoFromEvent(event); - _scheduler.scheduleNextIngestionSourceExecution(urn, info); + scheduler.scheduleNextIngestionSourceExecution(urn, info); } } } @@ -138,6 +148,6 @@ private DataHubIngestionSourceInfo getInfoFromEvent(final MetadataChangeLog even @VisibleForTesting IngestionScheduler scheduler() { - return _scheduler; + return scheduler; } } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java index f068679da7757..bbe0feed7de11 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -70,17 +71,28 @@ public class SiblingAssociationHook implements MetadataChangeLogHook { private final SystemEntityClient systemEntityClient; private final EntitySearchService entitySearchService; - private final boolean _isEnabled; + private final boolean isEnabled; private OperationContext systemOperationContext; + @Getter private final String consumerGroupSuffix; @Autowired public SiblingAssociationHook( @Nonnull final SystemEntityClient systemEntityClient, @Nonnull final EntitySearchService searchService, - @Nonnull @Value("${siblings.enabled:true}") Boolean isEnabled) { + @Nonnull @Value("${siblings.enabled:true}") Boolean isEnabled, + @Nonnull @Value("${siblings.consumerGroupSuffix}") String consumerGroupSuffix) { this.systemEntityClient = systemEntityClient; entitySearchService = searchService; - _isEnabled = isEnabled; + this.isEnabled = isEnabled; + this.consumerGroupSuffix = consumerGroupSuffix; + } + + @VisibleForTesting + public SiblingAssociationHook( + @Nonnull final SystemEntityClient systemEntityClient, + @Nonnull final EntitySearchService searchService, + @Nonnull Boolean isEnabled) { + this(systemEntityClient, searchService, isEnabled, ""); } @Value("${siblings.enabled:false}") @@ -99,7 +111,7 @@ public SiblingAssociationHook init(@Nonnull OperationContext systemOperationCont @Override public boolean isEnabled() { - return _isEnabled; + return isEnabled; } @Override diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLGMSSpringTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLGMSSpringTest.java index c2a8de161eafe..10f149e606295 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLGMSSpringTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLGMSSpringTest.java @@ -3,7 +3,7 @@ import static org.testng.AssertJUnit.*; import com.linkedin.gms.factory.config.ConfigurationProvider; -import com.linkedin.metadata.kafka.MetadataChangeLogProcessor; +import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar; import com.linkedin.metadata.kafka.hook.UpdateIndicesHook; import com.linkedin.metadata.kafka.hook.event.EntityChangeEventGeneratorHook; import com.linkedin.metadata.kafka.hook.incident.IncidentsSummaryHook; @@ -35,23 +35,23 @@ public class 
MCLGMSSpringTest extends AbstractTestNGSpringContextTests { @Test public void testHooks() { - MetadataChangeLogProcessor metadataChangeLogProcessor = - applicationContext.getBean(MetadataChangeLogProcessor.class); + MCLKafkaListenerRegistrar registrar = + applicationContext.getBean(MCLKafkaListenerRegistrar.class); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .noneMatch(hook -> hook instanceof IngestionSchedulerHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof UpdateIndicesHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof SiblingAssociationHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof EntityChangeEventGeneratorHook)); assertEquals( 1, - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .filter(hook -> hook instanceof IncidentsSummaryHook) .count()); } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java index 23de7707cc571..2049e974999b1 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java @@ -4,7 +4,7 @@ import static org.testng.AssertJUnit.assertTrue; import com.linkedin.gms.factory.config.ConfigurationProvider; -import com.linkedin.metadata.kafka.MetadataChangeLogProcessor; +import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar; import com.linkedin.metadata.kafka.hook.UpdateIndicesHook; import com.linkedin.metadata.kafka.hook.event.EntityChangeEventGeneratorHook; import com.linkedin.metadata.kafka.hook.incident.IncidentsSummaryHook; @@ -33,23 +33,23 @@ public class MCLMAESpringTest extends AbstractTestNGSpringContextTests { @Test public void testHooks() { - MetadataChangeLogProcessor metadataChangeLogProcessor = - applicationContext.getBean(MetadataChangeLogProcessor.class); + MCLKafkaListenerRegistrar registrar = + applicationContext.getBean(MCLKafkaListenerRegistrar.class); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .noneMatch(hook -> hook instanceof IngestionSchedulerHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof UpdateIndicesHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof SiblingAssociationHook)); assertTrue( - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .anyMatch(hook -> hook instanceof EntityChangeEventGeneratorHook)); assertEquals( 1, - metadataChangeLogProcessor.getHooks().stream() + registrar.getMetadataChangeLogHooks().stream() .filter(hook -> hook instanceof IncidentsSummaryHook) .count()); } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java 
b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java index f6f71a12a6951..68768051eccad 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java @@ -34,10 +34,13 @@ @ComponentScan( basePackages = { "com.linkedin.metadata.kafka", - "com.linkedin.gms.factory.kafka.common", - "com.linkedin.gms.factory.kafka.schemaregistry", + "com.linkedin.gms.factory.kafka", "com.linkedin.gms.factory.entity.update.indices", - "com.linkedin.gms.factory.timeline.eventgenerator" + "com.linkedin.gms.factory.timeline.eventgenerator", + "com.linkedin.metadata.dao.producer", + "com.linkedin.gms.factory.change", + "com.datahub.event.hook", + "com.linkedin.gms.factory.notifications" }) public class MCLSpringCommonTestConfiguration { diff --git a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java index c4116b314254c..358a2ac0c2ee3 100644 --- a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java +++ b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java @@ -3,9 +3,7 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.datahub.event.hook.BusinessAttributeUpdateHook; import com.datahub.event.hook.PlatformEventHook; -import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; import com.linkedin.metadata.EventUtils; import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.mxe.PlatformEvent; @@ -21,7 +19,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -29,7 +26,6 @@ @Slf4j @Component @Conditional(PlatformEventProcessorCondition.class) -@Import({BusinessAttributeUpdateHook.class, KafkaEventConsumerFactory.class}) @EnableKafka public class PlatformEventProcessor { @@ -49,6 +45,11 @@ public PlatformEventProcessor( platformEventHooks.stream() .filter(PlatformEventHook::isEnabled) .collect(Collectors.toList()); + log.info( + "Enabled platform hooks: {}", + this.hooks.stream() + .map(hook -> hook.getClass().getSimpleName()) + .collect(Collectors.toList())); this.hooks.forEach(PlatformEventHook::init); } diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 2514060ff2d61..5b3673ddca52c 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -296,10 +296,18 @@ metadataTests: siblings: enabled: ${ENABLE_SIBLING_HOOK:true} # enable to turn on automatic sibling associations for dbt + consumerGroupSuffix: ${SIBLINGS_HOOK_CONSUMER_GROUP_SUFFIX:} updateIndices: enabled: ${ENABLE_UPDATE_INDICES_HOOK:true} + consumerGroupSuffix: ${UPDATE_INDICES_CONSUMER_GROUP_SUFFIX:} ingestionScheduler: enabled: 
${ENABLE_INGESTION_SCHEDULER_HOOK:true} # enable to execute ingestion scheduling
+  consumerGroupSuffix: ${INGESTION_SCHEDULER_HOOK_CONSUMER_GROUP_SUFFIX:}
+incidents:
+  hook:
+    enabled: ${ENABLE_INCIDENTS_HOOK:true}
+    maxIncidentHistory: ${MAX_INCIDENT_HISTORY:100}
+    consumerGroupSuffix: ${INCIDENTS_HOOK_CONSUMER_GROUP_SUFFIX:}
 
 bootstrap:
   upgradeDefaultBrowsePaths:
@@ -376,6 +384,7 @@ featureFlags:
 
 entityChangeEvents:
   enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true}
+  consumerGroupSuffix: ${ECE_CONSUMER_GROUP_SUFFIX:}
 
 views:
   enabled: ${VIEWS_ENABLED:true}
@@ -460,6 +469,7 @@ springdoc.api-docs.groups.enabled: true
 forms:
   hook:
     enabled: { $FORMS_HOOK_ENABLED:true }
+    consumerGroupSuffix: ${FORMS_HOOK_CONSUMER_GROUP_SUFFIX:}
 
 businessAttribute:
   fetchRelatedEntitiesCount: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_COUNT:20000}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
index 9501b03482d04..aecb4f0afb12c 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
@@ -96,7 +96,7 @@ private static Map buildCustomizedProperties(
   }
 
   @Bean(name = "kafkaEventConsumer")
-  protected KafkaListenerContainerFactory createInstance(
+  protected KafkaListenerContainerFactory kafkaEventConsumer(
       @Qualifier("kafkaConsumerFactory")
           DefaultKafkaConsumerFactory kafkaConsumerFactory,
       @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
diff --git a/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java b/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java
index bc623c3cc983c..e47a2b4e278e4 100644
--- a/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java
+++ b/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java
@@ -37,7 +37,10 @@
       "com.linkedin.gms.factory.search",
       "com.linkedin.gms.factory.secret",
       "com.linkedin.gms.factory.timeseries",
-      "com.linkedin.gms.factory.plugins"
+      "com.linkedin.gms.factory.plugins",
+      "com.linkedin.gms.factory.change",
+      "com.datahub.event.hook",
+      "com.linkedin.gms.factory.notifications"
     })
 @PropertySource(value = "classpath:/application.yaml", factory = YamlPropertySourceFactory.class)
 @Configuration

From 712221458e4e92f1851d2aaac9827a68be9ea428 Mon Sep 17 00:00:00 2001
From: David Leifker
Date: Fri, 9 Aug 2024 10:57:48 -0500
Subject: [PATCH 2/2] add docs

---
 docs/how/kafka-config.md | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/docs/how/kafka-config.md b/docs/how/kafka-config.md
index 2f20e8b548f83..06c7418f16713 100644
--- a/docs/how/kafka-config.md
+++ b/docs/how/kafka-config.md
@@ -116,6 +116,27 @@ We've included an environment variable to customize the consumer group id, if yo
 
 - `KAFKA_CONSUMER_GROUP_ID`: The name of the kafka consumer's group id.
 
+#### datahub-mae-consumer MCL Hooks
+
+By default, all MetadataChangeLog processing hooks execute as part of the same kafka consumer group based on the
+previously mentioned `KAFKA_CONSUMER_GROUP_ID`.
+
+The various MCL hooks can also be separated into distinct consumer groups, which allows for controlling the
+parallelization and prioritization of the hooks.
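+
+Each hook declares an optional `consumerGroupSuffix`: a non-empty suffix is joined to the base consumer group id
+(`METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID`, default `generic-mae-consumer-job-client`) with a `-` separator,
+while an empty suffix (the default) keeps the hook in the shared group. A condensed, illustrative sketch of the
+`consumerGroupSuffix` settings added to `application.yaml` in the first patch (other keys omitted here):
+
+```yaml
+siblings:
+  consumerGroupSuffix: ${SIBLINGS_HOOK_CONSUMER_GROUP_SUFFIX:}
+updateIndices:
+  consumerGroupSuffix: ${UPDATE_INDICES_CONSUMER_GROUP_SUFFIX:}
+ingestionScheduler:
+  consumerGroupSuffix: ${INGESTION_SCHEDULER_HOOK_CONSUMER_GROUP_SUFFIX:}
+incidents:
+  hook:
+    consumerGroupSuffix: ${INCIDENTS_HOOK_CONSUMER_GROUP_SUFFIX:}
+entityChangeEvents:
+  consumerGroupSuffix: ${ECE_CONSUMER_GROUP_SUFFIX:}
+forms:
+  hook:
+    consumerGroupSuffix: ${FORMS_HOOK_CONSUMER_GROUP_SUFFIX:}
+```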
+
+For example, processing by the `UpdateIndicesHook` and the `SiblingAssociationHook` can be delayed by other hooks
+running in the same consumer group. Separating these hooks into their own groups can reduce the latency they incur
+from the other hooks. The `application.yaml` configuration assigns each hook's `consumerGroupSuffix`, surfaced
+through the following environment variables:
+
+| Environment Variable                           | Default | Description                                                                                   |
+|------------------------------------------------|---------|-----------------------------------------------------------------------------------------------|
+| SIBLINGS_HOOK_CONSUMER_GROUP_SUFFIX            | ''      | Siblings processing hook. Considered one of the primary hooks in the `datahub-mae-consumer`.  |
+| UPDATE_INDICES_CONSUMER_GROUP_SUFFIX           | ''      | Primary processing hook.                                                                      |
+| INGESTION_SCHEDULER_HOOK_CONSUMER_GROUP_SUFFIX | ''      | Scheduled ingestion hook.                                                                     |
+| INCIDENTS_HOOK_CONSUMER_GROUP_SUFFIX           | ''      | Incidents hook.                                                                               |
+| ECE_CONSUMER_GROUP_SUFFIX                      | ''      | Entity Change Event hook, which publishes to the Platform Events topic.                       |
+| FORMS_HOOK_CONSUMER_GROUP_SUFFIX               | ''      | Forms processing hook.                                                                        |
 
 ## Applying Configurations
 
 ### Docker
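As a worked illustration of the variables documented above, splitting two hooks into their own consumer groups via
the consumer container's environment might look like the following sketch (a hypothetical docker-compose override;
the service name and suffix values are assumptions, and the resulting group ids assume the default base id):

```yaml
services:
  datahub-mae-consumer:
    environment:
      # resulting group: generic-mae-consumer-job-client-update-indices
      - UPDATE_INDICES_CONSUMER_GROUP_SUFFIX=update-indices
      # resulting group: generic-mae-consumer-job-client-siblings
      - SIBLINGS_HOOK_CONSUMER_GROUP_SUFFIX=siblings
      # hooks whose suffix is left empty stay in: generic-mae-consumer-job-client
```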