From 3dcc4d8512c809240d71c9f24fa1592aae745cff Mon Sep 17 00:00:00 2001 From: Erik LaBianca Date: Mon, 14 Oct 2019 22:09:37 -0400 Subject: [PATCH] Force checkpoint before shutting down terminated shards Fixes #37,#63 --- .../consumer/ConsumerProcessingManager.scala | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerProcessingManager.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerProcessingManager.scala index 607f3ce..6a8708d 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerProcessingManager.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerProcessingManager.scala @@ -22,22 +22,14 @@ import akka.actor.ActorRef import akka.pattern.ask import akka.util.Timeout import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{ - IRecordProcessor, - IShutdownNotificationAware -} -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{IRecordProcessor, IShutdownNotificationAware} +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{ShutdownReason, Worker} import com.amazonaws.services.kinesis.clientlibrary.types._ import com.amazonaws.services.kinesis.model.Record import com.typesafe.scalalogging.LazyLogging -import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{ - GracefulShutdown, - ProcessEvents, - ProcessingComplete -} +import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{GracefulShutdown, ProcessEvents, ProcessingComplete} import com.weightwatchers.reactive.kinesis.models.{CompoundSequenceNumber, ConsumerEvent} import org.joda.time.{DateTime, DateTimeZone} - import scala.collection.JavaConverters._ import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{Await, ExecutionContext, Future} @@ -129,9 +121,15 @@ private[consumer] class ConsumerProcessingManager( } override def shutdown(shutdownInput: ShutdownInput): Unit = { - logger.info( - s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}" - ) + if(shutdownInput.getShutdownReason == ShutdownReason.TERMINATE) { + logger.info( + s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}. Forcing checkpoint." + ) + shutdownInput.getCheckpointer.checkpoint() + } else + logger.info( + s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}" + ) shutdown(shutdownInput.getCheckpointer) }