diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java index 7d147a876..20ca5582e 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java @@ -117,7 +117,6 @@ class KafkaClientIntroductionAdvice implements MethodInterceptor this.conversionService = conversionService; } - @SuppressWarnings("unchecked") @Override public final Object intercept(MethodInvocationContext context) { if (context.hasAnnotation(KafkaClient.class)) { @@ -126,23 +125,27 @@ public final Object intercept(MethodInvocationContext context) { } ProducerState producerState = getProducer(context); - InterceptedMethod interceptedMethod = InterceptedMethod.of(context); + InterceptedMethod interceptedMethod = InterceptedMethod.of(context, beanContext.getConversionService()); try { Argument returnType = interceptedMethod.returnTypeValue(); if (Argument.OBJECT_ARGUMENT.equalsType(returnType)) { returnType = Argument.of(RecordMetadata.class); } switch (interceptedMethod.resultType()) { - case COMPLETION_STAGE: + case COMPLETION_STAGE -> { CompletableFuture completableFuture = returnCompletableFuture(context, producerState, returnType); return interceptedMethod.handleResult(completableFuture); - case PUBLISHER: + } + case PUBLISHER -> { Flux returnFlowable = returnPublisher(context, producerState, returnType); return interceptedMethod.handleResult(returnFlowable); - case SYNCHRONOUS: + } + case SYNCHRONOUS -> { return returnSynchronous(context, producerState); - default: + } + default -> { return interceptedMethod.unsupported(); + } } } catch (Exception e) { return interceptedMethod.handleException(e); @@ -408,7 +411,7 @@ private Flux buildSendFlux(MethodInvocationContext conte } private Flux buildSendFluxForReactiveValue(MethodInvocationContext context, ProducerState producerState, Argument returnType, Object value) { - Flux valueFlowable = Flux.from(Publishers.convertPublisher(value, Publisher.class)); + Flux valueFlowable = Flux.from(Publishers.convertPublisher(beanContext.getConversionService(), value, Publisher.class)); Class javaReturnType = returnType.getType(); if (Iterable.class.isAssignableFrom(javaReturnType)) { diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index 96fe02ce4..737bf359d 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -535,7 +535,7 @@ private boolean processConsumerRecords(final ConsumerState consumerState, final Flux resultFlowable; final boolean isBlocking; if (Publishers.isConvertibleToPublisher(result)) { - resultFlowable = Flux.from(Publishers.convertPublisher(result, Publisher.class)); + resultFlowable = Flux.from(Publishers.convertPublisher(beanContext.getConversionService(), result, Publisher.class)); isBlocking = method.hasAnnotation(Blocking.class); } else { resultFlowable = Flux.just(result); @@ -660,7 +660,7 @@ private boolean processConsumerRecordsAsBatch(final ConsumerState consumerState, if (result instanceof Iterable iterable) { resultFlux = Flux.fromIterable(iterable); } else if (isPublisher) { - resultFlux = Flux.from(Publishers.convertPublisher(result, Publisher.class)); + resultFlux = Flux.from(Publishers.convertPublisher(beanContext.getConversionService(), result, Publisher.class)); } else { resultFlux = Flux.just(result); }