Source doesn't resume partitions after a failed seek() operation #1333

Open
gygabyte opened this issue Mar 4, 2021 · 2 comments

gygabyte commented Mar 4, 2021

A typical way to achieve exactly-once semantics when consuming from Kafka is to store offsets outside Kafka, atomically with the associated state. For that, the Alpakka library provides the committablePartitionedManualOffsetSource source, where the offsets to start consuming from are supplied through the onAssign function. Its result is a Future containing the TopicPartition -> offset (Long) mapping for every partition assigned to this consumer.

However, because this operation is asynchronous, a rebalance can occur between the assignment and the completion of the Future that carries the offsets.
The source will then attempt to seek on all of these TopicPartitions, but some of them may no longer be assigned to this Kafka consumer.
https://github.com/akka/alpakka-kafka/blob/master/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala#L154
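The race described above can be reproduced without Kafka in a small model. This is only a sketch: `TopicPartition` and `StubConsumer` here are hypothetical stand-ins (not the Kafka or Alpakka classes), and the stub mimics nothing but the assignment check that makes seek() throw.

```scala
import scala.util.Try

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int) {
  override def toString: String = s"$topic-$partition"
}

// Hypothetical stub: models only the assignment check performed by seek().
final class StubConsumer(initial: Set[TopicPartition]) {
  private var assigned: Set[TopicPartition] = initial

  def rebalance(newAssignment: Set[TopicPartition]): Unit =
    assigned = newAssignment

  def seek(tp: TopicPartition, offset: Long): Unit =
    if (!assigned.contains(tp))
      throw new IllegalStateException(s"No current assignment for partition $tp")
}

object SeekRace {
  val tp0 = TopicPartition("topic", 0)
  val tp1 = TopicPartition("topic", 1)

  def run(): Map[TopicPartition, Try[Unit]] = {
    val consumer = new StubConsumer(Set(tp0, tp1))
    // Offsets were requested from the external store for the original assignment.
    val loadedOffsets = Map(tp0 -> 42L, tp1 -> 7L)
    // A rebalance revokes topic-1 before the offsets Future completes.
    consumer.rebalance(Set(tp0))
    // Seeking with the now stale offsets fails for the revoked partition only.
    loadedOffsets.map { case (tp, offset) => tp -> Try(consumer.seek(tp, offset)) }
  }
}
```

In this model the seek for `topic-0` succeeds while the one for `topic-1` fails, because the offsets map was built for an assignment that no longer holds.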

This results in a failure (in the internal KafkaConsumerActor) that is sent back to SubSourceLogic. However, SubSourceLogic only expects an AskTimeoutException and does not handle a Failure(Throwable) message:

https://github.com/akka/alpakka-kafka/blob/master/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala#L157
https://github.com/akka/alpakka-kafka/blob/master/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala#L287
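To illustrate why such a failure goes unnoticed, here is a minimal sketch using plain Scala Futures. It is not the SubSourceLogic code: `seekReply`, `pipeline` and `resumed` are hypothetical names, and `java.util.concurrent.TimeoutException` stands in for the ask timeout. The point is only that a `recover` matching a single exception type lets every other failure pass through, while the success callback never runs.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object RecoverGap {
  // Flipped only if the success path (resuming the partitions) is reached.
  val resumed = new AtomicBoolean(false)

  // Simulated reply to the seek request: it fails, but not with a timeout.
  def seekReply(): Future[Unit] =
    Future.failed(new IllegalStateException("No current assignment for partition topic-0"))

  def pipeline(): Future[String] =
    seekReply()
      .map { _ => resumed.set(true); "partitions resumed" }
      // Only the timeout case is handled; any other failure stays in the Future.
      .recover { case _: TimeoutException => "stage failed after timeout" }

  // Await.result rethrows the original exception, captured here as a Failure.
  def outcome(): Try[String] = Try(Await.result(pipeline(), 1.second))
}
```

Here `RecoverGap.outcome()` is a failure carrying the IllegalStateException and `resumed` stays false: neither the resume path nor the failure path ran, which matches the silent stall described in this report.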

The end result is that the Source does not resume the sources, but it does not fail the consumer/source either. It is a silent failure, with no indication that the stream will be unable to recover from it.

I can suggest two approaches for this:

  • Recover by failing the Source. However, this will cause more rebalances, which can trigger the same race condition again, and so on until no rebalance happens between the two operations (assignment and Future completion). It may also cause unnecessary thrashing/load on the external offset store from which the consumers retrieve their offsets.
    .recover {
      case _: Exception =>
        stageFailCB.invoke(
          new ConsumerFailed(
            s"$idLogPrefix Consumer failed during seek for partitions: ${offsets.keys.mkString(", ")}."
          )
        )
    }
  • Ignore seek() failures in the KafkaConsumerActor and just send back Done when finished. I believe this would also be a suitable approach, and it does not cause more rebalances or more chances for the race condition to keep occurring.
    case Seek(offsets) =>
      offsets.foreach {
        case (tp, offset) =>
          try {
            consumer.seek(tp, offset)
          } catch {
            // Log and continue so that one unassigned partition does not abort the remaining seeks.
            case NonFatal(e) =>
              log.warning("seek failed on consumer from {}, {} -> {}: {}", sender(), tp, offset, e.getMessage)
          }
      }
      sender() ! Done
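The second approach can be sketched in isolation as follows. This is an illustration under assumptions, not the actor code: `TolerantSeek`, `seekAll` and the `Partition` alias are hypothetical, and the seek operation is passed in as a function so the example runs without a Kafka consumer.

```scala
import scala.util.control.NonFatal

object TolerantSeek {
  // Hypothetical stand-in for TopicPartition, e.g. "topic-0".
  type Partition = String

  // Attempts every seek and returns the partitions whose seek failed,
  // instead of letting the first failure abort the whole batch.
  def seekAll(seek: (Partition, Long) => Unit, offsets: Map[Partition, Long]): List[Partition] =
    offsets.toList.flatMap { case (tp, offset) =>
      try {
        seek(tp, offset)
        Nil
      } catch {
        case NonFatal(e) =>
          println(s"seek failed for $tp -> $offset: ${e.getMessage}")
          List(tp)
      }
    }

  def demo(): List[Partition] = {
    // Only topic-0 is still assigned after the simulated rebalance.
    val assigned = Set("topic-0")
    val seek: (Partition, Long) => Unit = (tp, _) =>
      if (!assigned(tp))
        throw new IllegalStateException(s"No current assignment for partition $tp")
    seekAll(seek, Map("topic-0" -> 42L, "topic-1" -> 7L))
  }
}
```

Returning the failed partitions, rather than only logging them, is a design choice of this sketch: it would let the caller decide whether to ignore them or react, while the remaining partitions are still positioned correctly.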

I am running Scala 2.11 with version 2.0.7. It would be great if there could be another release for Scala 2.11.
This is a critical issue with a large impact in volatile environments (for example, consumers running on AWS spot instances), where consumers may come and go at any time.

seglo (Member) commented Mar 9, 2021

Thanks for raising the issue. What exception is raised when the seek fails? I think it would be acceptable to log a warning for such failures and carry on. Do you have some time to put up a PR?

gygabyte (Author) commented

Sorry for the delay in getting back to you.

The exception is:

java.lang.IllegalStateException: No current assignment for partition topic-0

I should have some time, but need to go through some internal legal stuff before submitting a PR.
