KafkaListener OffsetStrategy.DISABLED Manual Acknowledgement Failure (Reactive Type) #103

Closed
brianwyka opened this issue Mar 6, 2020 · 7 comments

@brianwyka
Contributor

brianwyka commented Mar 6, 2020

With a @KafkaListener that has the offset strategy disabled, calling Acknowledgement.ack() inside the listener throws an exception when the listener uses the reactive approach.

@KafkaListener(groupId = "archive-upload", offsetStrategy = OffsetStrategy.DISABLED)
public class ArchiveUploadMessageProcessor {

    private final ArchiveService archiveService;

    @Topic("${kafka-topics.archive-upload}")
    public Single<String> processMessage(@Body final PayloadArchiveUploadMessage payloadArchiveUploadMessage, final Acknowledgement acknowledgement) {
        val payloadKey = payloadArchiveUploadMessage.getPayloadRequest().getPayloadKey();
        val payloadKeyString = PayloadKeyFormatter.asString(payloadKey);
        return archive(payloadArchiveUploadMessage, payloadKeyString, acknowledgement)
                .doOnSuccess(payloadUrl -> acknowledgement.ack());
    }

Below is the exception:

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

My assumption is the error is triggered from the lambda implementation of Acknowledgement below in KafkaConsumerProcessor:

if (ackArg.isPresent()) {
    boundArguments.put(ackArg.get(), (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(
            currentOffsets
    ));
}

Relevant consumer thread configuration:

micronaut:
  executors:
    consumer:
      type: fixed
      nThreads: ${KAFKA_CONSUMER_THREAD_COUNT:4}
@brianwyka
Contributor Author

Perhaps it's happening in a different thread since it's chained in the reactive stream?
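
A minimal sketch of that suspicion, using only the JDK. `CompletableFuture` stands in for the RxJava `Single`, and the `archive-worker` thread and the `CallbackThreadDemo` class are hypothetical names for illustration. It shows that a callback chained onto an asynchronous result runs on whichever thread completes that result, not on the thread that built the chain:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadDemo {

    /** Returns the name of the thread that ran the success callback. */
    static String callbackThreadName() {
        // Hypothetical worker standing in for whatever thread completes the archive call.
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "archive-worker"));
        try {
            CompletableFuture<String> archive = new CompletableFuture<>();
            // Register the callback first, as a listener does when it builds its chain.
            CompletableFuture<String> callbackThread =
                    archive.thenApply(url -> Thread.currentThread().getName());
            // Complete the future from the worker; the callback then runs on that thread.
            worker.execute(() -> archive.complete("payload-url"));
            return callbackThread.join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("listener thread: " + Thread.currentThread().getName());
        System.out.println("callback thread: " + callbackThreadName());
    }
}
```

If the ack callback in the listener behaves the same way, the commit would be attempted from the completing thread rather than from the consumer's own thread.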

@brianwyka brianwyka changed the title KafkaListener OffsetStrategy.DISABLED Manual Acknowledgement Failure KafkaListener OffsetStrategy.DISABLED Manual Acknowledgement Failure (Reactive Type) Mar 6, 2020
@brianwyka
Contributor Author

After reading around the documentation a bit, I was able to find this:

Alternatively, you can use the @Blocking annotation to tell Micronaut to subscribe to the returned reactive type in a blocking manner which will result in blocking the poll loop, preventing offsets from being committed automatically:

After adding the @Blocking annotation, it works as expected. However, that kind of defeats the purpose of using the reactive style of programming in the first place. Is there any way to achieve this without the @Blocking annotation on the Kafka listener?

@ctoestreich
Contributor

ctoestreich commented Mar 6, 2020

@brianwyka Not sure that it will change the exception, but are you trying to be reactive on the way IN, the way OUT, or both? Just writing pseudo code here based on your example above.

public void processMessage(@Body final Single<PayloadArchiveUploadMessage> payloadArchiveUploadMessage, final Acknowledgement acknowledgement) {
    payloadArchiveUploadMessage.doOnSuccess((payload) -> {
        val payloadKey = payload.getPayloadRequest().getPayloadKey();
        val payloadKeyString = PayloadKeyFormatter.asString(payloadKey);
        archive(payload, payloadKeyString); // call out to the archive process? Is this reactive too?
        acknowledgement.ack();
    });
}

Also, should you synchronize the method?

@graemerocher
Contributor

Kafka consumers are inherently single-threaded, so you always have to acknowledge the message on the same thread that the listener executed on. That means waiting for all other publishers to complete.
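
A rough sketch of why the thread matters. `GuardedConsumer` is a hypothetical toy class, not the Kafka client; it assumes a guard that records which thread is currently inside the consumer and rejects any other thread that tries to enter at the same time. The real `KafkaConsumer` internals may differ in detail:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class SingleThreadGuardDemo {

    /** Toy consumer (hypothetical, not the Kafka client) that rejects concurrent use. */
    static final class GuardedConsumer {
        private final AtomicReference<Thread> owner = new AtomicReference<>();
        private final AtomicInteger commits = new AtomicInteger();
        private int depth; // only touched by the thread that currently owns the guard

        private void acquire() {
            Thread me = Thread.currentThread();
            // Re-entry by the owner is fine; any other thread must find the guard free.
            if (owner.get() != me && !owner.compareAndSet(null, me)) {
                throw new ConcurrentModificationException(
                        "consumer is not safe for multi-threaded access");
            }
            depth++;
        }

        private void release() {
            if (--depth == 0) {
                owner.set(null);
            }
        }

        /** Simulates one poll iteration that invokes the listener while holding the guard. */
        void poll(Runnable listener) {
            acquire();
            try {
                listener.run();
            } finally {
                release();
            }
        }

        void commitSync() {
            acquire();
            try {
                commits.incrementAndGet();
            } finally {
                release();
            }
        }

        int commits() {
            return commits.get();
        }
    }

    /** Commits from a second thread while the poll thread is inside the consumer. */
    static boolean commitFromOtherThreadFails() {
        GuardedConsumer consumer = new GuardedConsumer();
        AtomicBoolean failed = new AtomicBoolean();
        consumer.poll(() -> {
            Thread other = new Thread(() -> {
                try {
                    consumer.commitSync();
                } catch (ConcurrentModificationException e) {
                    failed.set(true);
                }
            }, "reactive-worker");
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return failed.get();
    }

    /** Commits from the same thread that is running the poll iteration. */
    static boolean commitFromPollThreadSucceeds() {
        GuardedConsumer consumer = new GuardedConsumer();
        consumer.poll(consumer::commitSync);
        return consumer.commits() == 1;
    }

    public static void main(String[] args) {
        System.out.println("other thread rejected: " + commitFromOtherThreadFails());
        System.out.println("poll thread accepted:  " + commitFromPollThreadSucceeds());
    }
}
```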

@brianwyka
Contributor Author

brianwyka commented Mar 7, 2020

@graemerocher is there any way to bind a method argument that acts as a callback invoked once the publishers have completed, such as a Supplier<KafkaConsumer> or Supplier<Acknowledgement>, so that it can be executed at the appropriate time and on the correct thread? Or perhaps a setting on the annotation, such as ackOnSuccess?

@brianwyka
Contributor Author

brianwyka commented Mar 7, 2020

@ctoestreich I am trying to be reactive on the way out, such that Micronaut treats it as a non-blocking subscription. I could also take the @Body parameter as a Single. Not sure what the benefit of that is, however... The archive method call inside the method is also reactive, so the example above would not work since it would never be subscribed to...

@brianwyka
Contributor Author

Thinking about it more, seems like it would be non-trivial to have some sort of callback (in reactive style) which executes in the same thread as the polling loop...
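
One possible shape for such a callback, as a sketch only and not something Micronaut is known to provide: the ack handed to the asynchronous work merely enqueues the offset, and the polling thread drains that queue on each pass of its loop and performs the commit itself. Everything here (`PollLoopAckDemo`, the `poll-loop` thread name, the offsets) is hypothetical, and the commit is simulated by recording which thread performed it:

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

final class PollLoopAckDemo {

    /** Runs a toy poll loop and returns the names of the threads that performed commits. */
    static Set<String> run(int records) {
        Queue<Long> pendingAcks = new ConcurrentLinkedQueue<>();
        Set<String> committingThreads = ConcurrentHashMap.newKeySet();
        ExecutorService workers = Executors.newFixedThreadPool(2);

        Thread pollLoop = new Thread(() -> {
            // One simulated poll: hand each record to a worker. The ack callback
            // only enqueues the offset, so it is safe to call from any thread.
            for (long offset = 0; offset < records; offset++) {
                final long acked = offset;
                CompletableFuture
                        .runAsync(() -> { /* pretend to archive the payload */ }, workers)
                        .thenRun(() -> pendingAcks.add(acked));
            }
            int committed = 0;
            while (committed < records) {
                // Drain acks on the polling thread; a real loop would commit here.
                while (pendingAcks.poll() != null) {
                    committingThreads.add(Thread.currentThread().getName());
                    committed++;
                }
                LockSupport.parkNanos(1_000_000L); // stand-in for the next poll call
            }
        }, "poll-loop");

        pollLoop.start();
        try {
            pollLoop.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return committingThreads;
    }

    public static void main(String[] args) {
        System.out.println("commits performed by: " + run(3));
    }
}
```

A real version would also have to decide which offset is safe to commit when records finish out of order, since committing an offset marks everything before it as processed.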


3 participants