Skip to content

Commit

Permalink
Force checkpoint before shutting down terminated shards
Browse files Browse the repository at this point in the history
  • Loading branch information
easel committed Oct 15, 2019
1 parent 6df65b0 commit 3dcc4d8
Showing 1 changed file with 12 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,14 @@ import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{
IRecordProcessor,
IShutdownNotificationAware
}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{IRecordProcessor, IShutdownNotificationAware}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{ShutdownReason, Worker}
import com.amazonaws.services.kinesis.clientlibrary.types._
import com.amazonaws.services.kinesis.model.Record
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{
GracefulShutdown,
ProcessEvents,
ProcessingComplete
}
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{GracefulShutdown, ProcessEvents, ProcessingComplete}
import com.weightwatchers.reactive.kinesis.models.{CompoundSequenceNumber, ConsumerEvent}
import org.joda.time.{DateTime, DateTimeZone}

import scala.collection.JavaConverters._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext, Future}
Expand Down Expand Up @@ -129,9 +121,15 @@ private[consumer] class ConsumerProcessingManager(
}

override def shutdown(shutdownInput: ShutdownInput): Unit = {
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}"
)
if(shutdownInput.getShutdownReason == ShutdownReason.TERMINATE) {
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}. Forcing checkpoint."
)
shutdownInput.getCheckpointer.checkpoint()
} else
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}"
)
shutdown(shutdownInput.getCheckpointer)
}

Expand Down

0 comments on commit 3dcc4d8

Please sign in to comment.