Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
Added a comment and Added active partition count in the logger
Browse files Browse the repository at this point in the history
  • Loading branch information
abhivermaaa committed Jun 10, 2024
1 parent 8212ae2 commit 74f962d
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ object ConsumerGroupsAlgebra {

def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long =
offsetMap.get(partition) match {
case Some(value) => value + 1.toLong
case Some(value) => value + 1.toLong //Adding offsets by 1 as Kafka Admin algebra's getLatestOffsets method is having the same behaviour.
case _ => 0
}

Expand Down Expand Up @@ -211,10 +211,11 @@ object ConsumerGroupsAlgebra {
}
_ <- fs2.Stream.awakeEvery[F](lagPublishInterval).evalMap(_ => getLagOnDvsInternalCGTopic.flatMap(
lagInfo => Logger[F].info(
s"""Total Offset Lag on ${dvsConsumersTopic} = ${lagInfo.totalLag.toString} ,
Lag percentage = ${lagInfo.lagPercentage.toString} ,
Total_Group_Offset = ${lagInfo.totalGroupOffset} ,
Total_Largest_Offset = ${lagInfo.totalLargestOffset}"""
s"Lag details on ${dvsConsumersTopic}. Total_Offset_Lag = ${lagInfo.totalLag.toString}, " +
s"Lag_percentage = ${lagInfo.lagPercentage.toString}, " +
s"Total_Group_Offset = ${lagInfo.totalGroupOffset}, " +
s"Total_Largest_Offset = ${lagInfo.totalLargestOffset}, " +
s"Total_active_partitions = ${Option(lagInfo.partitionOffsets).map(_.size).getOrElse(0)}"
))).compile.drain
} yield ()
}
Expand Down

0 comments on commit 74f962d

Please sign in to comment.