Alpakka Kafka suddenly stopped consuming without errors. #1264

Open · kkalavantavanich opened this issue Nov 25, 2020 · 3 comments

Versions used

Scala version: 2.12.11
Akka version: 2.6.3
Alpakka-Kafka version: 2.0.2

Expected Behavior

  1. Start the application, which starts the consumer.
  2. Consumer continues to consume messages.

Actual Behavior

  1. Start the application, which starts the Kafka consumer.
  2. Consumer stops consuming messages.

Relevant logs

The application logged "committing xxx" (normal behavior), then the log entries stopped appearing, without any errors.
The application was deployed in Docker and did not stop.
Syslogs look normal.
Kafka audit logs look normal.
Other consumers are still working and able to consume from Kafka.
Lag for this consumer group increased.

Reproducible Test Case

My understanding of this bug is minimal, and it is not reproducible. Please suggest possible ways to reproduce it.

Relevant code

// KafkaConsumerSettings, TransactionService, Message, CustomMessageDeserializer,
// and KafkaMessageHandler are project-specific classes from the OP's codebase.
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging // assumed backend for the `logger` used below
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class KafkaConsumer(
    settings: KafkaConsumerSettings,
    transactionService: TransactionService
)(implicit system: ActorSystem, executor: ExecutionContext)
    extends LazyLogging {

  val config: Config = settings.consumerConfig
  val schemaRegistry: String = "schemaregistry.company.com"

  private var consumerControl: Option[Consumer.Control] = None

  def consumerSettings: ConsumerSettings[String, Message] =
    ConsumerSettings(config, new StringDeserializer, new CustomMessageDeserializer)
      .withBootstrapServers(settings.brokers)
      .withGroupId(settings.groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, settings.resetStart)
      .withProperty(CustomMessageDeserializer.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry)
      // redundant: the deserializer instance passed above takes precedence
      // over this class-name property
      .withProperty(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "com.company.deserializer.CustomMessageDeserializer"
      )
      .withProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, settings.metadataMaxAge)
      .withProperty(
        SaslConfigs.SASL_JAAS_CONFIG,
        s"""org.apache.kafka.common.security.scram.ScramLoginModule required
           |    username="user"
           |    password="passw0rd";
           |""".stripMargin
      )

  def run: Consumer.Control = {
    val control = consumer(consumerSettings).run()
    consumerControl = Some(control)
    control
  }

  private def consumer(consumerSettings: ConsumerSettings[String, Message]): RunnableGraph[DrainingControl[Done]] =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topicPattern(settings.topic))
      // handle each message, yielding its committable offset
      .mapAsync(settings.workers)(KafkaMessageHandler(transactionService.handleTransactionMessage))
      // collect offsets for up to batchSize elements or 1 ms, then commit as a batch
      .groupedWithin(settings.batchSize, 1.millis)
      .map(CommittableOffsetBatch(_))
      .mapAsync(settings.workers) { offset =>
        logger.debug(s"committing ${offset.getOffsets}")
        // TODO - migrate to the undeprecated commit path
        offset.commitScaladsl()
      }
      .toMat(Sink.ignore)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)

  def stop: Unit =
    consumerControl.foreach(_.shutdown()) // foreach, not map: shutdown is a side effect
}
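
Regarding the TODO in the code above: a minimal sketch of the undeprecated commit path using Committer.sink, assuming (as the original's use of CommittableOffsetBatch suggests) that KafkaMessageHandler yields a Future[CommittableOffset]. Batch size and commit interval would move into CommitterSettings; consumerViaCommitterSink is an illustrative name, not from the original.

import akka.kafka.CommitterSettings
import akka.kafka.scaladsl.Committer

private def consumerViaCommitterSink(
    consumerSettings: ConsumerSettings[String, Message]
): RunnableGraph[DrainingControl[Done]] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topicPattern(settings.topic))
    .mapAsync(settings.workers)(KafkaMessageHandler(transactionService.handleTransactionMessage))
    // Committer.sink batches and commits the offsets, replacing the manual
    // groupedWithin / CommittableOffsetBatch / commitScaladsl() stages
    .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)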
mrubin commented Jan 13, 2021

We have been plagued by this as well. No changes other than swapping from Kafka's Java SDK to alpakka-kafka.

seglo (Contributor) commented Jan 13, 2021

There's likely an exception that's being swallowed. This is usually the result of the stream's materialized value not being handled correctly.

In the OP's case they should handle the Future[Done] in DrainingControl.isShutdown. It resolves when the stream shuts down, whether due to an error inside the stream or because the stream was shut down from outside via the draining control itself (DrainingControl.shutdown).
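
A minimal sketch of that handling, wired into the OP's run method; the callback and log messages are illustrative, not from the original:

import scala.util.{Failure, Success}

def run: DrainingControl[Done] = {
  val control = consumer(consumerSettings).run()
  consumerControl = Some(control)
  // DrainingControl.isShutdown fails with the stream's exception when an
  // error kills the stream, and succeeds after a clean shutdown via
  // DrainingControl.shutdown()/drainAndShutdown(). Handling it surfaces
  // failures that are otherwise silently swallowed.
  control.isShutdown.onComplete {
    case Success(_)  => logger.info("consumer stream shut down cleanly")
    case Failure(ex) => logger.error("consumer stream failed", ex)
  } // uses the implicit ExecutionContext already in scope
  control
}

On the shutdown path, DrainingControl.drainAndShutdown() can replace the bare Control.shutdown() so in-flight messages finish processing and committing before the stream stops.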

sandeepjindal commented

Thanks @seglo
