diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala index 520ac34c3..b9f5aeeaf 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/ConsumerGroupsAlgebra.scala @@ -156,7 +156,7 @@ object ConsumerGroupsAlgebra { def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long = offsetMap.get(partition) match { - case Some(value) => value + 1.toLong + case Some(value) => value + 1.toLong //Adding offsets by 1 as Kafka Admin algebra's getLatestOffsets method is having the same behaviour. case _ => 0 } @@ -211,10 +211,11 @@ object ConsumerGroupsAlgebra { } _ <- fs2.Stream.awakeEvery[F](lagPublishInterval).evalMap(_ => getLagOnDvsInternalCGTopic.flatMap( lagInfo => Logger[F].info( - s"""Total Offset Lag on ${dvsConsumersTopic} = ${lagInfo.totalLag.toString} , - Lag percentage = ${lagInfo.lagPercentage.toString} , - Total_Group_Offset = ${lagInfo.totalGroupOffset} , - Total_Largest_Offset = ${lagInfo.totalLargestOffset}""" + s"Lag details on ${dvsConsumersTopic}. Total_Offset_Lag = ${lagInfo.totalLag.toString}, " + + s"Lag_percentage = ${lagInfo.lagPercentage.toString}, " + + s"Total_Group_Offset = ${lagInfo.totalGroupOffset}, " + + s"Total_Largest_Offset = ${lagInfo.totalLargestOffset}, " + + s"Total_active_partitions = ${Option(lagInfo.partitionOffsets).map(_.size).getOrElse(0)}" ))).compile.drain } yield () }