diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
index ff8e7acd5..f6f091846 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
@@ -249,406 +249,357 @@ public void resume(@Nonnull String id) {
 
     @Override
     public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
         List<AnnotationValue<Topic>> topicAnnotations = method.getDeclaredAnnotationValuesByType(Topic.class);
-        AnnotationValue<KafkaListener> consumerAnnotation = method.getAnnotation(KafkaListener.class);
-
+        final AnnotationValue<KafkaListener> consumerAnnotation = method.getAnnotation(KafkaListener.class);
         if (CollectionUtils.isEmpty(topicAnnotations)) {
             topicAnnotations = beanDefinition.getDeclaredAnnotationValuesByType(Topic.class);
         }
+        if (consumerAnnotation == null || CollectionUtils.isEmpty(topicAnnotations)) {
+            return; // No topics to consume
+        }
+        final Class<?> beanType = beanDefinition.getBeanType();
+        final String groupId = consumerAnnotation.stringValue("groupId")
+                .filter(StringUtils::isNotEmpty)
+                .orElseGet(() -> applicationConfiguration.getName().orElse(beanType.getName()));
+        final String clientId = consumerAnnotation.stringValue("clientId")
+                .filter(StringUtils::isNotEmpty)
+                .orElseGet(() -> applicationConfiguration.getName().map(s -> s + '-' + NameUtils.hyphenate(beanType.getSimpleName())).orElse(null));
+        final OffsetStrategy offsetStrategy = consumerAnnotation.enumValue("offsetStrategy", OffsetStrategy.class)
+                .orElse(OffsetStrategy.AUTO);
+        final AbstractKafkaConsumerConfiguration<?, ?> consumerConfigurationDefaults = beanContext.findBean(AbstractKafkaConsumerConfiguration.class, Qualifiers.byName(groupId))
+                .orElse(defaultConsumerConfiguration);
+        final DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration = new DefaultKafkaConsumerConfiguration<>(consumerConfigurationDefaults);
+        final Properties properties = createConsumerProperties(method, consumerAnnotation, consumerConfiguration, clientId, groupId, offsetStrategy);
+        configureDeserializers(method, consumerConfiguration);
+        submitConsumerThreads(method, clientId, offsetStrategy, topicAnnotations, consumerAnnotation, consumerConfiguration, properties, beanType);
+    }
 
-        if (consumerAnnotation != null && !CollectionUtils.isEmpty(topicAnnotations)) {
-
-            Duration pollTimeout = method.getValue(KafkaListener.class, "pollTimeout", Duration.class)
-                    .orElse(Duration.ofMillis(100));
-
-            Duration sessionTimeout = method.getValue(KafkaListener.class, "sessionTimeout", Duration.class)
-                    .orElse(null);
-
-            Duration heartbeatInterval = method.getValue(KafkaListener.class, "heartbeatInterval", Duration.class)
-                    .orElse(null);
-
-            boolean isBatch = method.isTrue(KafkaListener.class, "batch");
-
-            Optional<Argument<?>> consumerArg = Arrays.stream(method.getArguments()).filter(arg -> Consumer.class.isAssignableFrom(arg.getType())).findFirst();
-            Optional<Argument<?>> ackArg = Arrays.stream(method.getArguments())
-                    .filter(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()) ||
-                            io.micronaut.messaging.Acknowledgement.class.isAssignableFrom(arg.getType()))
-                    .findFirst();
-
-            String groupId = consumerAnnotation.stringValue("groupId").orElse(null);
-
-            Class<?> beanType = beanDefinition.getBeanType();
-            if (StringUtils.isEmpty(groupId)) {
-                groupId = applicationConfiguration.getName().orElse(beanType.getName());
-            }
-
-            boolean hasUniqueGroupId = consumerAnnotation.isTrue("uniqueGroupId");
-            String uniqueGroupId = groupId;
-            if (hasUniqueGroupId) {
-                uniqueGroupId += "_" + UUID.randomUUID().toString();
-            }
-
-            String clientId = consumerAnnotation.stringValue("clientId").orElse(null);
-            if (StringUtils.isEmpty(clientId)) {
-                clientId = applicationConfiguration.getName().map(s -> s + '-' + NameUtils.hyphenate(beanType.getSimpleName())).orElse(null);
-            }
+    @Override
+    @PreDestroy
+    public void close() {
+        for (Consumer<?, ?> consumer : consumers.values()) {
+            consumer.wakeup();
+        }
+        consumers.clear();
+    }
 
-            OffsetStrategy offsetStrategy = consumerAnnotation.enumValue("offsetStrategy", OffsetStrategy.class).orElse(OffsetStrategy.AUTO);
-            int consumerThreads = consumerAnnotation.intValue("threads").orElse(1);
+    private Properties createConsumerProperties(final ExecutableMethod<?, ?> method, final AnnotationValue<KafkaListener> consumerAnnotation,
+                                                final DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, final String clientId,
+                                                final String groupId, final OffsetStrategy offsetStrategy) {
+        final Properties properties = consumerConfiguration.getConfig();
 
-            AbstractKafkaConsumerConfiguration<?, ?> consumerConfigurationDefaults = beanContext.findBean(AbstractKafkaConsumerConfiguration.class, Qualifiers.byName(groupId))
-                    .orElse(defaultConsumerConfiguration);
+        if (consumerAnnotation.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) {
+            properties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetReset.EARLIEST.name().toLowerCase());
+        }
 
-            DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration = new DefaultKafkaConsumerConfiguration<>(consumerConfigurationDefaults);
+        // enable auto commit offsets if necessary
+        properties.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(offsetStrategy == OffsetStrategy.AUTO));
 
-            Properties properties = consumerConfiguration.getConfig();
+        method.getValue(KafkaListener.class, "heartbeatInterval", Duration.class)
+                .map(Duration::toMillis)
+                .map(String::valueOf)
+                .ifPresent(heartbeatInterval -> properties.putIfAbsent(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval));
 
-            if (consumerAnnotation.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) {
-                properties.putIfAbsent(
-                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                        OffsetReset.EARLIEST.name().toLowerCase()
-                );
+        method.getValue(KafkaListener.class, "sessionTimeout", Duration.class)
+                .map(Duration::toMillis)
+                .map(String::valueOf)
+                .ifPresent(sessionTimeout -> properties.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout));
 
-            }
+        if (consumerAnnotation.isTrue("uniqueGroupId")) {
+            final String uniqueGroupId = groupId + "_" + UUID.randomUUID().toString();
+            properties.put(ConsumerConfig.GROUP_ID_CONFIG, uniqueGroupId);
+        } else {
+            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        }
 
-            // enable auto commit offsets if necessary
-            properties.putIfAbsent(
-                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                    String.valueOf(offsetStrategy == OffsetStrategy.AUTO)
-            );
-
-            if (heartbeatInterval != null) {
-                properties.putIfAbsent(
-                        ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
-                        String.valueOf(heartbeatInterval.toMillis())
-                );
-            }
+        if (clientId != null) {
+            properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+        }
 
-            if (sessionTimeout != null) {
-                long sessionTimeoutMillis = sessionTimeout.toMillis();
-                properties.putIfAbsent(
-                        ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
-                        String.valueOf(sessionTimeoutMillis)
-                );
-            }
+        properties.putAll(consumerAnnotation.getProperties("properties", "name"));
+        return properties;
+    }
 
-            properties.put(ConsumerConfig.GROUP_ID_CONFIG, hasUniqueGroupId ? uniqueGroupId : groupId);
+    private void debugDeserializationConfiguration(final ExecutableMethod<?, ?> method, final DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration,
+                                                   final Properties properties) {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        final Optional<?> keyDeserializer = consumerConfiguration.getKeyDeserializer();
+        if (consumerConfiguration.getKeyDeserializer().isPresent()) {
+            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), method);
+        } else {
+            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), method);
+        }
+        final Optional<?> valueDeserializer = consumerConfiguration.getValueDeserializer();
+        if (valueDeserializer.isPresent()) {
+            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), method);
+        } else {
+            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), method);
+        }
+    }
 
+    private void submitConsumerThreads(final ExecutableMethod<?, ?> method, final String clientId, final OffsetStrategy offsetStrategy,
+                                       final List<AnnotationValue<Topic>> topicAnnotations, final AnnotationValue<KafkaListener> consumerAnnotation,
+                                       final DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, final Properties properties, final Class<?> beanType) {
+        final int consumerThreads = consumerAnnotation.intValue("threads").orElse(1);
+        for (int i = 0; i < consumerThreads; i++) {
+            final String finalClientId;
             if (clientId != null) {
-                properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-            }
-
-            properties.putAll(consumerAnnotation.getProperties("properties", "name"));
-
-            configureDeserializers(method, consumerConfiguration);
-
-            if (LOG.isDebugEnabled()) {
-                Optional<?> kd = consumerConfiguration.getKeyDeserializer();
-                if (kd.isPresent()) {
-                    LOG.debug("Using key deserializer [{}] for Kafka listener: {}", kd.get(), method);
+                if (consumerThreads > 1) {
+                    finalClientId = clientId + '-' + clientIdGenerator.incrementAndGet();
                 } else {
-                    LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), method);
-                }
-
-                Optional<?> vd = consumerConfiguration.getValueDeserializer();
-                if (vd.isPresent()) {
-                    LOG.debug("Using value deserializer [{}] for Kafka listener: {}", vd.get(), method);
-                } else {
-                    LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), method);
+                    finalClientId = clientId;
                 }
+                properties.put(ConsumerConfig.CLIENT_ID_CONFIG, finalClientId);
+            } else {
+                finalClientId = "kafka-consumer-" + clientIdGenerator.incrementAndGet();
             }
+            submitConsumerThread(method, finalClientId, offsetStrategy, topicAnnotations, consumerAnnotation, consumerConfiguration, beanType);
+        }
+    }
 
-            for (int i = 0; i < consumerThreads; i++) {
-                String finalClientId;
-                if (clientId != null) {
-                    if (consumerThreads > 1) {
-                        finalClientId = clientId + '-' + clientIdGenerator.incrementAndGet();
-                    } else {
-                        finalClientId = clientId;
-                    }
-                    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, finalClientId);
-                } else {
-                    finalClientId = "kafka-consumer-" + clientIdGenerator.incrementAndGet();
-                }
-
-                Consumer<?, ?> kafkaConsumer = beanContext.createBean(Consumer.class, consumerConfiguration);
-
-                consumers.put(finalClientId, kafkaConsumer);
-
-                Object consumerBean = beanContext.getBean(beanType);
-
-                if (consumerBean instanceof ConsumerAware) {
-                    //noinspection unchecked
-                    ((ConsumerAware) consumerBean).setKafkaConsumer(kafkaConsumer);
-                }
-
-                for (AnnotationValue<Topic> topicAnnotation : topicAnnotations) {
-
-                    String[] topicNames = topicAnnotation.stringValues();
-                    String[] patterns = topicAnnotation.stringValues("patterns");
-                    boolean hasTopics = ArrayUtils.isNotEmpty(topicNames);
-                    boolean hasPatterns = ArrayUtils.isNotEmpty(patterns);
+    private void submitConsumerThread(final ExecutableMethod<?, ?> method, final String finalClientId, final OffsetStrategy offsetStrategy,
+                                      final List<AnnotationValue<Topic>> topicAnnotations, final AnnotationValue<KafkaListener> consumerAnnotation,
+                                      final DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration, final Class<?> beanType) {
+        final Consumer<?, ?> kafkaConsumer = beanContext.createBean(Consumer.class, consumerConfiguration);
+        consumers.put(finalClientId, kafkaConsumer);
+        final Object consumerBean = beanContext.getBean(beanType);
+        if (consumerBean instanceof ConsumerAware) {
+            //noinspection unchecked
+            ((ConsumerAware) consumerBean).setKafkaConsumer(kafkaConsumer);
+        }
+        setupConsumerSubscription(method, topicAnnotations, consumerBean, kafkaConsumer);
+        consumerSubscriptions.put(finalClientId, Collections.unmodifiableSet(kafkaConsumer.subscription()));
+        executorService.submit(() -> createConsumerThreadPollLoop(method, finalClientId, offsetStrategy, consumerAnnotation, consumerBean, kafkaConsumer));
+    }
 
-                    if (!hasTopics && !hasPatterns) {
-                        throw new MessagingSystemException("Either a topic or a topic must be specified for method: " + method);
+    @SuppressWarnings("squid:S2189")
+    private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method, final String finalClientId, final OffsetStrategy offsetStrategy,
+                                              final AnnotationValue<KafkaListener> consumerAnnotation, final Object consumerBean, final Consumer<?, ?> kafkaConsumer) {
+
+        final boolean isBatch = method.isTrue(KafkaListener.class, "batch");
+        final Duration pollTimeout = method.getValue(KafkaListener.class, "pollTimeout", Duration.class)
+                .orElseGet(() -> Duration.ofMillis(100));
+        final Optional<Argument<?>> consumerArg = Arrays.stream(method.getArguments())
+                .filter(arg -> Consumer.class.isAssignableFrom(arg.getType()))
+                .findFirst();
+        final Optional<Argument<?>> ackArg = Arrays.stream(method.getArguments())
+                .filter(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()) || io.micronaut.messaging.Acknowledgement.class.isAssignableFrom(arg.getType()))
+                .findFirst();
+
+        try {
+
+            final boolean trackPartitions = ackArg.isPresent() || offsetStrategy == OffsetStrategy.SYNC_PER_RECORD || offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD;
+            final Map<Argument<?>, Object> boundArguments = new HashMap<>(2);
+            consumerArg.ifPresent(argument -> boundArguments.put(argument, kafkaConsumer));
+
+            boolean consumerPaused = false;
+
+            //noinspection InfiniteLoopStatement
+            while (true) {
+                consumerAssignments.put(finalClientId, Collections.unmodifiableSet(kafkaConsumer.assignment()));
+                try {
+                    if (!consumerPaused && paused.contains(finalClientId)) {
+                        consumerPaused = true;
+                        LOG.debug("Pausing Kafka consumption for Consumer [{}] from topic partition: {}", finalClientId, kafkaConsumer.paused());
+                        kafkaConsumer.pause(kafkaConsumer.assignment());
+                        pausedConsumers.put(finalClientId, kafkaConsumer);
+                    }
+                    final ConsumerRecords<?, ?> consumerRecords = kafkaConsumer.poll(pollTimeout);
+                    if (consumerPaused && !paused.contains(finalClientId)) {
+                        LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", finalClientId, kafkaConsumer.paused());
+                        kafkaConsumer.resume(kafkaConsumer.paused());
+                        pausedConsumers.remove(finalClientId);
+                        consumerPaused = false;
                     }
-
-                    if (hasTopics) {
-                        List<String> topics = Arrays.asList(topicNames);
-                        if (consumerBean instanceof ConsumerRebalanceListener) {
-                            kafkaConsumer.subscribe(topics, (ConsumerRebalanceListener) consumerBean);
-                        } else {
-                            kafkaConsumer.subscribe(topics);
-                        }
-
-                        LOG.info("Kafka listener [{}] subscribed to topics: {}", method, topics);
+                    if (consumerRecords == null || consumerRecords.count() <= 0) {
+                        continue; // No consumer records to process
                     }
-
-                    if (hasPatterns) {
-                        for (String pattern : patterns) {
-                            Pattern p;
+                    final boolean failed;
+                    if (isBatch) {
+                        failed = !processConsumerRecordsAsBatch(method, consumerBean, kafkaConsumer, consumerAnnotation, boundArguments, consumerRecords);
+                    } else {
+                        failed = !processConsumerRecords(method, offsetStrategy, consumerBean, kafkaConsumer, consumerAnnotation,
+                                boundArguments, trackPartitions, ackArg, consumerRecords);
+                    }
+                    if (!failed) {
+                        if (offsetStrategy == OffsetStrategy.SYNC) {
                             try {
-                                p = Pattern.compile(pattern);
-                            } catch (Exception e) {
-                                throw new MessagingSystemException("Invalid topic pattern [" + pattern + "] for method [" + method + "]: " + e.getMessage(), e);
-                            }
-
-                            if (consumerBean instanceof ConsumerRebalanceListener) {
-                                kafkaConsumer.subscribe(p, (ConsumerRebalanceListener) consumerBean);
-                            } else {
-                                kafkaConsumer.subscribe(p);
+                                kafkaConsumer.commitSync();
+                            } catch (CommitFailedException e) {
+                                handleException(kafkaConsumer, consumerBean, null, e);
                             }
-
-                            LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", method, pattern);
+                        } else if (offsetStrategy == OffsetStrategy.ASYNC) {
+                            kafkaConsumer.commitAsync(resolveCommitCallback(consumerBean));
                         }
                     }
+
+                } catch (WakeupException e) {
+                    throw e;
+                } catch (Throwable e) {
+                    handleException(kafkaConsumer, consumerBean, null, e);
                 }
-                consumerSubscriptions.put(finalClientId, Collections.unmodifiableSet(kafkaConsumer.subscription()));
-                executorService.submit(() -> {
-                    try {
+            }
+        } catch (WakeupException e) {
+            // ignore for shutdown
+        } finally {
+            try {
+                if (offsetStrategy != OffsetStrategy.DISABLED) {
+                    kafkaConsumer.commitSync();
+                }
+            } catch (Throwable e) {
+                LOG.warn("Error committing Kafka offsets on shutdown: {}", e.getMessage(), e);
+            } finally {
+                kafkaConsumer.close();
+            }
+        }
+    }
 
-                        boolean trackPartitions = ackArg.isPresent() || offsetStrategy == OffsetStrategy.SYNC_PER_RECORD || offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD;
-                        Map<Argument<?>, Object> boundArguments = new HashMap<>(2);
-                        consumerArg.ifPresent(argument -> boundArguments.put(argument, kafkaConsumer));
+    private boolean processConsumerRecords(final ExecutableMethod<?, ?> method, final OffsetStrategy offsetStrategy, final Object consumerBean,
+                                           final Consumer<?, ?> kafkaConsumer, final AnnotationValue<KafkaListener> consumerAnnotation,
+                                           final Map<Argument<?>, Object> boundArguments, final boolean trackPartitions,
+                                           final Optional<Argument<?>> ackArg, final ConsumerRecords<?, ?> consumerRecords) {
+        final ExecutableBinder<ConsumerRecord<?, ?>> executableBinder = new DefaultExecutableBinder<>(boundArguments);
+        final Map<TopicPartition, OffsetAndMetadata> currentOffsets = trackPartitions ? new HashMap<>() : null;
+        for (final ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
 
-                        boolean consumerPaused = false;
+            LOG.trace("Kafka consumer [{}] received record: {}", method, consumerRecord);
 
-                        //noinspection InfiniteLoopStatement
-                        while (true) {
-                            consumerAssignments.put(finalClientId, Collections.unmodifiableSet(kafkaConsumer.assignment()));
-                            try {
-                                if (!consumerPaused && paused.contains(finalClientId)) {
-                                    consumerPaused = true;
-                                    LOG.debug("Pausing Kafka consumption for Consumer [{}] from topic partition: {}", finalClientId, kafkaConsumer.paused());
-                                    kafkaConsumer.pause(kafkaConsumer.assignment());
-                                    pausedConsumers.put(finalClientId, kafkaConsumer);
-                                }
+            if (trackPartitions) {
+                final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
+                final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null);
+                currentOffsets.put(topicPartition, offsetAndMetadata);
+            }
 
-                                ConsumerRecords<?, ?> consumerRecords = kafkaConsumer.poll(pollTimeout);
-                                boolean failed = false;
-                                if (consumerPaused && !paused.contains(finalClientId)) {
-                                    LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", finalClientId, kafkaConsumer.paused());
-                                    kafkaConsumer.resume(
-                                            kafkaConsumer.paused()
-                                    );
-                                    pausedConsumers.remove(finalClientId);
-                                    consumerPaused = false;
-                                }
+            ackArg.ifPresent(argument -> boundArguments.put(argument, (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(currentOffsets)));
+
+            try {
+                final BoundExecutable boundExecutable = executableBinder.bind(method, binderRegistry, consumerRecord);
+                final Object result = boundExecutable.invoke(consumerBean);
+                if (result != null) {
+                    final Flowable<?> resultFlowable;
+                    final boolean isBlocking;
+                    if (Publishers.isConvertibleToPublisher(result)) {
+                        resultFlowable = Publishers.convertPublisher(result, Flowable.class);
+                        isBlocking = method.hasAnnotation(Blocking.class);
+                    } else {
+                        resultFlowable = Flowable.just(result);
+                        isBlocking = true;
+                    }
+                    handleResultFlowable(consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, resultFlowable, isBlocking);
+                }
+            } catch (Throwable e) {
+                handleException(kafkaConsumer, consumerBean, consumerRecord, e);
+                return false;
+            }
 
-                                Map<TopicPartition, OffsetAndMetadata> currentOffsets = trackPartitions ? new HashMap<>() : null;
-                                if (consumerRecords != null && consumerRecords.count() > 0) {
-
-                                    if (isBatch) {
-
-                                        ExecutableBinder<ConsumerRecords<?, ?>> batchBinder = new DefaultExecutableBinder<>(boundArguments);
-                                        BoundExecutable boundExecutable = batchBinder.bind(method, batchBinderRegistry, consumerRecords);
-                                        Object result = boundExecutable.invoke(consumerBean);
-
-                                        // handle batch result
-                                        if (result != null) {
-                                            if (result.getClass().isArray()) {
-                                                result = Arrays.asList((Object[]) result);
-                                            }
-
-                                            boolean isPublisher = Publishers.isConvertibleToPublisher(result);
-                                            Flowable<?> resultFlowable;
-                                            if (result instanceof Iterable) {
-                                                resultFlowable = Flowable.fromIterable((Iterable) result);
-                                            } else {
-                                                if (isPublisher) {
-                                                    resultFlowable = Publishers.convertPublisher(result, Flowable.class);
-                                                } else {
-                                                    resultFlowable = Flowable.just(result);
-                                                }
-                                            }
-
-                                            Iterator<? extends ConsumerRecord<?, ?>> iterator = consumerRecords.iterator();
-                                            boolean isBlocking = !isPublisher || method.hasAnnotation(Blocking.class);
-
-                                            if (isBlocking) {
-                                                resultFlowable.blockingSubscribe(o -> {
-                                                    if (iterator.hasNext()) {
-                                                        ConsumerRecord<?, ?> consumerRecord = iterator.next();
-
-                                                        handleResultFlowable(
-                                                                consumerAnnotation,
-                                                                consumerBean,
-                                                                method,
-                                                                kafkaConsumer,
-                                                                consumerRecord,
-                                                                Flowable.just(o),
-                                                                isBlocking
-                                                        );
-                                                    }
-                                                });
-                                            } else {
-                                                resultFlowable.forEach(o -> {
-                                                    if (iterator.hasNext()) {
-                                                        ConsumerRecord<?, ?> consumerRecord = iterator.next();
-
-                                                        handleResultFlowable(
-                                                                consumerAnnotation,
-                                                                consumerBean,
-                                                                method,
-                                                                kafkaConsumer,
-                                                                consumerRecord,
-                                                                Flowable.just(o),
-                                                                isBlocking
-                                                        );
-                                                    }
-                                                });
-                                            }
-
-                                        }
+            if (offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) {
+                try {
+                    kafkaConsumer.commitSync(currentOffsets);
+                } catch (CommitFailedException e) {
+                    handleException(kafkaConsumer, consumerBean, consumerRecord, e);
+                }
+            } else if (offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) {
+                kafkaConsumer.commitAsync(currentOffsets, resolveCommitCallback(consumerBean));
+            }
+        }
+        return true;
+    }
 
-                                    } else {
-                                        ExecutableBinder<ConsumerRecord<?, ?>> executableBinder = new DefaultExecutableBinder<>(boundArguments);
-                                        for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
-
-                                            LOG.trace("Kafka consumer [{}] received record: {}", method, consumerRecord);
-
-                                            if (trackPartitions) {
-                                                currentOffsets.put(new TopicPartition(
-                                                                consumerRecord.topic(),
-                                                                consumerRecord.partition()),
-                                                        new OffsetAndMetadata(consumerRecord.offset() + 1, null)
-                                                );
-                                            }
-
-                                            if (ackArg.isPresent()) {
-                                                boundArguments.put(ackArg.get(), (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(
-                                                        currentOffsets
-                                                ));
-                                            }
-
-                                            try {
-                                                BoundExecutable boundExecutable = executableBinder.bind(method, binderRegistry, consumerRecord);
-                                                Object result = boundExecutable.invoke(
-                                                        consumerBean
-                                                );
-
-                                                if (result != null) {
-                                                    Flowable<?> resultFlowable;
-                                                    boolean isBlocking;
-                                                    if (Publishers.isConvertibleToPublisher(result)) {
-                                                        resultFlowable = Publishers.convertPublisher(result, Flowable.class);
-                                                        isBlocking = method.hasAnnotation(Blocking.class);
-                                                    } else {
-                                                        resultFlowable = Flowable.just(result);
-                                                        isBlocking = true;
-                                                    }
-
-                                                    handleResultFlowable(
-                                                            consumerAnnotation,
-                                                            consumerBean,
-                                                            method,
-                                                            kafkaConsumer,
-                                                            consumerRecord,
-                                                            resultFlowable,
-                                                            isBlocking
-                                                    );
-                                                }
-                                            } catch (Throwable e) {
-                                                handleException(kafkaConsumer, consumerBean, consumerRecord, e);
-                                                // break out of the poll loop so that re-delivery can be attempted
-                                                failed = true;
-                                                break;
-                                            }
-
-                                            if (offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) {
-                                                try {
-                                                    kafkaConsumer.commitSync(
-                                                            currentOffsets
-                                                    );
-                                                } catch (CommitFailedException e) {
-                                                    handleException(kafkaConsumer, consumerBean, consumerRecord, e);
-                                                }
-                                            } else if (offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) {
-                                                kafkaConsumer.commitAsync(currentOffsets, resolveCommitCallback(consumerBean));
-                                            }
-                                        }
-                                    }
-
-                                    if (!failed) {
-                                        if (offsetStrategy == OffsetStrategy.SYNC) {
-                                            try {
-                                                kafkaConsumer.commitSync();
-                                            } catch (CommitFailedException e) {
-                                                handleException(kafkaConsumer, consumerBean, null, e);
-                                            }
-                                        } else if (offsetStrategy == OffsetStrategy.ASYNC) {
-                                            kafkaConsumer.commitAsync(resolveCommitCallback(consumerBean));
-                                        }
-                                    }
-                                }
-
-                            } catch (WakeupException e) {
-                                throw e;
-                            } catch (Throwable e) {
-                                handleException(kafkaConsumer, consumerBean, null, e);
-                            }
-
-                        }
-                    } catch (WakeupException e) {
-                        // ignore for shutdown
-                    } finally {
-                        try {
-                            if (offsetStrategy != OffsetStrategy.DISABLED) {
-                                kafkaConsumer.commitSync();
-                            }
-                        } catch (Throwable e) {
-                            LOG.warn("Error committing Kafka offsets on shutdown: {}", e.getMessage(), e);
-                        } finally {
-                            kafkaConsumer.close();
-                        }
-                    }
-                });
-            }
-        }
-    }
+    private boolean processConsumerRecordsAsBatch(final ExecutableMethod<?, ?> method, final Object consumerBean, final Consumer<?, ?> kafkaConsumer,
+                                                  final AnnotationValue<KafkaListener> consumerAnnotation, final Map<Argument<?>, Object> boundArguments,
+                                                  final ConsumerRecords<?, ?> consumerRecords) {
+        final ExecutableBinder<ConsumerRecords<?, ?>> batchBinder = new DefaultExecutableBinder<>(boundArguments);
+        final BoundExecutable boundExecutable = batchBinder.bind(method, batchBinderRegistry, consumerRecords);
+        Object result = boundExecutable.invoke(consumerBean);
 
-    @Override
-    @PreDestroy
-    public void close() {
-        for (Consumer<?, ?> consumer : consumers.values()) {
-            consumer.wakeup();
+        if (result != null) {
+            if (result.getClass().isArray()) {
+                result = Arrays.asList((Object[]) result);
+            }
+
+            final boolean isPublisher = Publishers.isConvertibleToPublisher(result);
+            final Flowable<?> resultFlowable;
+            if (result instanceof Iterable) {
+                resultFlowable = Flowable.fromIterable((Iterable) result);
+            } else if (isPublisher) {
+                resultFlowable = Publishers.convertPublisher(result, Flowable.class);
+            } else {
+                resultFlowable = Flowable.just(result);
+            }
+
+            final Iterator<? extends ConsumerRecord<?, ?>> iterator = consumerRecords.iterator();
+            final boolean isBlocking = !isPublisher || method.hasAnnotation(Blocking.class);
+            if (isBlocking) {
+                resultFlowable.blockingSubscribe(o -> {
+                    if (iterator.hasNext()) {
+                        final ConsumerRecord<?, ?> consumerRecord = iterator.next();
+                        handleResultFlowable(consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, Flowable.just(o), isBlocking);
+                    }
+                });
+            } else {
+                resultFlowable.forEach(o -> {
+                    if (iterator.hasNext()) {
+                        final ConsumerRecord<?, ?> consumerRecord = iterator.next();
+                        handleResultFlowable(consumerAnnotation, consumerBean, method, kafkaConsumer, consumerRecord, Flowable.just(o), isBlocking);
+                    }
+                });
+            }
         }
-        consumers.clear();
+        return true;
     }
 
+    private static void setupConsumerSubscription(final ExecutableMethod<?, ?> method, final List<AnnotationValue<Topic>> topicAnnotations,
+                                                  final Object consumerBean, final Consumer<?, ?> kafkaConsumer) {
+        for (final AnnotationValue<Topic> topicAnnotation : topicAnnotations) {
+
+            final String[] topicNames = topicAnnotation.stringValues();
+            final String[] patterns = topicAnnotation.stringValues("patterns");
+            final boolean hasTopics = ArrayUtils.isNotEmpty(topicNames);
+            final boolean hasPatterns = ArrayUtils.isNotEmpty(patterns);
+
+            if (!hasTopics && !hasPatterns) {
+                throw new MessagingSystemException("Either a topic or a topic pattern must be specified for method: " + method);
+            }
+
+            if (hasTopics) {
+                final List<String> topics = Arrays.asList(topicNames);
+                if (consumerBean instanceof ConsumerRebalanceListener) {
+                    kafkaConsumer.subscribe(topics, (ConsumerRebalanceListener) consumerBean);
+                } else {
+                    kafkaConsumer.subscribe(topics);
+                }
+                LOG.info("Kafka listener [{}] subscribed to topics: {}", method, topics);
+            }
+
+            if (hasPatterns) {
+                for (final String pattern : patterns) {
+                    final Pattern compiledPattern;
+                    try {
+                        compiledPattern = Pattern.compile(pattern);
+                    } catch (Exception e) {
+                        throw new MessagingSystemException("Invalid topic pattern [" + pattern + "] for method [" + method + "]: " + e.getMessage(), e);
+                    }
+                    if (consumerBean instanceof ConsumerRebalanceListener) {
+                        kafkaConsumer.subscribe(compiledPattern, (ConsumerRebalanceListener) consumerBean);
+                    } else {
+                        kafkaConsumer.subscribe(compiledPattern);
+                    }
+                    LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", method, pattern);
+                }
+            }
+        }
+    }
 
-    private void handleException(Consumer<?, ?> kafkaConsumer, Object consumerBean, ConsumerRecord<?, ?> consumerRecord, Throwable e) {
-        KafkaListenerException kafkaListenerException = new KafkaListenerException(
-                e,
-                consumerBean,
-                kafkaConsumer,
-                consumerRecord
-
-        );
+    private void handleException(final Consumer<?, ?> kafkaConsumer, final Object consumerBean, final ConsumerRecord<?, ?> consumerRecord, final Throwable e) {
+        final KafkaListenerException kafkaListenerException = new KafkaListenerException(e, consumerBean, kafkaConsumer, consumerRecord);
         handleException(consumerBean, kafkaListenerException);
     }
 
-    private void handleException(Object consumerBean, KafkaListenerException kafkaListenerException) {
+    private void handleException(final Object consumerBean, final KafkaListenerException kafkaListenerException) {
         if (consumerBean instanceof KafkaListenerExceptionHandler) {
             ((KafkaListenerExceptionHandler) consumerBean).handle(kafkaListenerException);
         } else {
@@ -769,87 +720,72 @@ private void handleResultFlowable(
         }
     }
 
-    private Argument findBodyArgument(ExecutableMethod method) {
+    private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
         return Arrays.stream(method.getArguments())
                 .filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(Body.class))
                 .findFirst()
-                .orElseGet(() ->
-                        Arrays.stream(method.getArguments())
-                                .filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class))
-                                .findFirst()
-                                .orElse(null)
-                );
+                .orElseGet(() -> Arrays.stream(method.getArguments())
+                        .filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class))
+                        .findFirst()
+                        .orElse(null));
     }
 
-    private void configureDeserializers(ExecutableMethod method, DefaultKafkaConsumerConfiguration consumerConfiguration) {
-        Properties properties = consumerConfiguration.getConfig();
+    private void configureDeserializers(final ExecutableMethod<?, ?> method, final DefaultKafkaConsumerConfiguration<?, ?> consumerConfiguration) {
+        final Properties properties = consumerConfiguration.getConfig();
         // figure out the Key deserializer
-        Argument bodyArgument = findBodyArgument(method);
+        final Argument<?> bodyArgument = findBodyArgument(method);
 
-        if (!properties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-            if (!consumerConfiguration.getKeyDeserializer().isPresent()) {
-                Optional<Argument<?>> keyArgument = Arrays.stream(method.getArguments())
-                        .filter(arg -> arg.isAnnotationPresent(KafkaKey.class)).findFirst();
-
-                if (keyArgument.isPresent()) {
-                    consumerConfiguration.setKeyDeserializer(
-                            serdeRegistry.pickDeserializer(keyArgument.get())
-                    );
-                } else {
-                    //noinspection SingleStatementInBlock
-                    if (bodyArgument != null && ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
-                        Optional<Argument<?>> keyType = bodyArgument.getTypeVariable("K");
-                        if (keyType.isPresent()) {
-                            consumerConfiguration.setKeyDeserializer(
-                                    serdeRegistry.pickDeserializer(keyType.get())
-                            );
-                        } else {
-                            consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
-                        }
+        if (!properties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) && !consumerConfiguration.getKeyDeserializer().isPresent()) {
+            final Optional<Argument<?>> keyArgument = Arrays.stream(method.getArguments())
+                    .filter(arg -> arg.isAnnotationPresent(KafkaKey.class))
+                    .findFirst();
+            if (keyArgument.isPresent()) {
+                consumerConfiguration.setKeyDeserializer(serdeRegistry.pickDeserializer(keyArgument.get()));
+            } else {
+                //noinspection SingleStatementInBlock
+                if (bodyArgument != null && ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
+                    final Optional<Argument<?>> keyType = bodyArgument.getTypeVariable("K");
+                    if (keyType.isPresent()) {
+                        consumerConfiguration.setKeyDeserializer(serdeRegistry.pickDeserializer(keyType.get()));
                     } else {
                         consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
                     }
+                } else {
+                    consumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
                 }
             }
         }
 
         // figure out the Value deserializer
         if (!properties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) && !consumerConfiguration.getValueDeserializer().isPresent()) {
-
-            if (bodyArgument != null) {
-
+            if (bodyArgument == null) {
+                //noinspection SingleStatementInBlock
+                consumerConfiguration.setValueDeserializer(new StringDeserializer());
+            } else {
                 if (ConsumerRecord.class.isAssignableFrom(bodyArgument.getType())) {
-                    Optional<Argument<?>> valueType = bodyArgument.getTypeVariable("V");
+                    final Optional<Argument<?>> valueType = bodyArgument.getTypeVariable("V");
                     if (valueType.isPresent()) {
-                        consumerConfiguration.setValueDeserializer(
-                                serdeRegistry.pickDeserializer(valueType.get())
-                        );
+                        consumerConfiguration.setValueDeserializer(serdeRegistry.pickDeserializer(valueType.get()));
                     } else {
                         consumerConfiguration.setValueDeserializer(new StringDeserializer());
                     }
                 } else {
-                    boolean batch = method.isTrue(KafkaListener.class, "batch");
-
-                    consumerConfiguration.setValueDeserializer(
-                            serdeRegistry.pickDeserializer(batch ? getComponentType(bodyArgument) : bodyArgument)
-                    );
+                    final boolean batch = method.isTrue(KafkaListener.class, "batch");
+                    consumerConfiguration.setValueDeserializer(serdeRegistry.pickDeserializer(batch ? getComponentType(bodyArgument) : bodyArgument));
                 }
-            } else {
-                //noinspection SingleStatementInBlock
-                consumerConfiguration.setValueDeserializer(new StringDeserializer());
             }
         }
+        debugDeserializationConfiguration(method, consumerConfiguration, properties);
     }
 
-    private Argument getComponentType(Argument argument) {
-        Class argumentType = argument.getType();
+    private static Argument<?> getComponentType(final Argument<?> argument) {
+        final Class<?> argumentType = argument.getType();
         return argumentType.isArray()
                 ? Argument.of(argumentType.getComponentType())
                 : argument.getFirstTypeVariable().orElse(argument);
     }
 
-    private OffsetCommitCallback resolveCommitCallback(Object consumerBean) {
+    private static OffsetCommitCallback resolveCommitCallback(final Object consumerBean) {
         return (offsets, exception) -> {
             if (consumerBean instanceof OffsetCommitCallback) {
                 ((OffsetCommitCallback) consumerBean).onComplete(offsets, exception);
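
For orientation, here is a minimal sketch of the kind of listener bean this processor wires up. It is not part of the patch; the class, group, and topic names are invented for illustration, but every annotation member and interface used below (@KafkaListener with groupId/offsetReset/offsetStrategy/threads, @Topic, @KafkaKey, KafkaListenerExceptionHandler) appears in the diff above.

import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;

@KafkaListener(
        groupId = "demo-group",                          // hypothetical; otherwise process(..) falls back to the application or bean class name
        offsetReset = OffsetReset.EARLIEST,              // mapped to auto.offset.reset by createConsumerProperties(..)
        offsetStrategy = OffsetStrategy.SYNC_PER_RECORD, // exercises the per-record commit path in processConsumerRecords(..)
        threads = 2)                                     // submitConsumerThreads(..) derives a distinct client id per thread
public class DemoListener implements KafkaListenerExceptionHandler {

    @Topic("demo-topic") // hypothetical topic name
    public void receive(@KafkaKey String key, String body) {
        // configureDeserializers(..) picks the key/value deserializers from these argument types
    }

    @Override
    public void handle(KafkaListenerException exception) {
        // handleException(..) routes here because the bean implements KafkaListenerExceptionHandler
    }
}

Because the bean implements KafkaListenerExceptionHandler, a record that throws lands in handle(..) rather than the default handler, and with SYNC_PER_RECORD the poll loop commits each offset before moving on, so a failed record is re-polled rather than skipped.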