Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
Added changes to ingestors/kafka/src/main/scala/hydra/kafka/algebras/…
Browse files Browse the repository at this point in the history
…ConsumerGroupsAlgebra.scala
  • Loading branch information
abhivermaaa committed May 8, 2024
1 parent d9a0390 commit 5052eda
Showing 1 changed file with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ object ConsumerGroupsAlgebra {
override def getConsumersForTopic(topicName: String): F[TopicConsumers] =
consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName)))


override def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]] = {

for {
groupOffsetsFromOffsetStream <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset())

// TODO: To be optimized
largestOffsets <- kAA.getLatestOffsets(dvsConsumersTopic.value)
.map(_.map(k => PartitionOffset
(
Expand Down Expand Up @@ -311,18 +313,15 @@ private object ConsumerGroupsStorageFacade {

private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) {

def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade = {
val res = this.copy(this.offsetMap + (key -> value))
println(this.offsetMap)
res
}
def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade =
this.copy(this.offsetMap + (key -> value))

def getAllPartitionOffset(): Map[Partition, Offset] = {
def getAllPartitionOffset(): Map[Partition, Offset] =
this.offsetMap
}

def removeOffset(key: Partition): ConsumerGroupsOffsetFacade =
this.copy(this.offsetMap - key)
this.copy(this.offsetMap - key)

}

private object ConsumerGroupsOffsetFacade {
Expand Down

0 comments on commit 5052eda

Please sign in to comment.