Skip to content

Commit

Permalink
Merge pull request #902 from fd4s/untyped-actor
Browse files Browse the repository at this point in the history
Move deserialization from KafkaConsumerActor to KafkaConsumer
  • Loading branch information
bplommer authored Mar 15, 2022
2 parents 5aa90d7 + 988b508 commit 19f5210
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 129 deletions.
61 changes: 43 additions & 18 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ object KafkaConsumer {
}
}(_.cancel)

private def startConsumerActor[F[_], K, V](
requests: QueueSource[F, Request[F, K, V]],
private def startConsumerActor[F[_]](
requests: QueueSource[F, Request[F]],
polls: QueueSource[F, Request.Poll[F]],
actor: KafkaConsumerActor[F, K, V]
actor: KafkaConsumerActor[F]
)(
implicit F: Async[F]
): Resource[F, FakeFiber[F]] =
Expand All @@ -115,26 +115,26 @@ object KafkaConsumer {
}

private def createKafkaConsumer[F[_], K, V](
requests: QueueSink[F, Request[F, K, V]],
requests: QueueSink[F, Request[F]],
settings: ConsumerSettings[F, K, V],
actor: FakeFiber[F],
polls: FakeFiber[F],
keyDes: Deserializer[F, K],
valueDes: Deserializer[F, V],
actor: KafkaConsumerActor[F],
fiber: FakeFiber[F],
streamIdRef: Ref[F, StreamId],
id: Int,
withConsumer: WithConsumer[F],
stopConsumingDeferred: Deferred[F, Unit]
)(implicit F: Async[F]): KafkaConsumer[F, K, V] =
new KafkaConsumer[F, K, V] {

private val fiber: FakeFiber[F] = actor.combine(polls)

override def partitionsMapStream
: Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]] = {
val chunkQueue: F[Queue[F, Option[Chunk[CommittableConsumerRecord[F, K, V]]]]] =
Queue.bounded(settings.maxPrefetchBatches - 1)

type PartitionRequest =
(Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason)
(Chunk[KafkaByteConsumerRecord], FetchCompletedReason)

type PartitionsMap = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]
type PartitionsMapQueue = Queue[F, Option[PartitionsMap]]
Expand All @@ -161,6 +161,23 @@ object KafkaConsumer {
.void
stopReqs <- Deferred[F, Unit]
} yield Stream.eval {
def committableConsumerRecord(
record: ConsumerRecord[K, V],
partition: TopicPartition
): CommittableConsumerRecord[F, K, V] =
CommittableConsumerRecord(
record = record,
offset = CommittableOffset(
topicPartition = partition,
consumerGroupId = actor.consumerGroupId,
offsetAndMetadata = new OffsetAndMetadata(
record.offset + 1L,
settings.recordMetadata(record)
),
commit = actor.offsetCommit
)
)

def fetchPartition(deferred: Deferred[F, PartitionRequest]): F[Unit] = {
val request = Request.Fetch(
partition,
Expand All @@ -173,7 +190,15 @@ object KafkaConsumer {
stopReqs.complete(()).void

case Right((chunk, reason)) =>
val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)
val c = chunk.traverse[F, CommittableConsumerRecord[F, K, V]] { rec =>
ConsumerRecord
.fromJava[F, K, V](rec, keyDes, valueDes)
.map(committableConsumerRecord(_, partition))
}

val enqueueChunk = c.flatMap { chunk =>
chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)
}

val completeRevoked =
stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked)
Expand Down Expand Up @@ -370,7 +395,7 @@ object KafkaConsumer {
}

private[this] def request[A](
request: (Either[Throwable, A] => F[Unit]) => Request[F, K, V]
request: (Either[Throwable, A] => F[Unit]) => Request[F]
): F[A] =
Deferred[F, Either[Throwable, A]].flatMap { deferred =>
requests.offer(request(deferred.complete(_).void)) >>
Expand Down Expand Up @@ -597,9 +622,9 @@ object KafkaConsumer {
id <- Resource.eval(F.delay(new Object().hashCode))
jitter <- Resource.eval(Jitter.default[F])
logging <- Resource.eval(Logging.default[F](id))
requests <- Resource.eval(Queue.unbounded[F, Request[F, K, V]])
requests <- Resource.eval(Queue.unbounded[F, Request[F]])
polls <- Resource.eval(Queue.bounded[F, Request.Poll[F]](1))
ref <- Resource.eval(Ref.of[F, State[F, K, V]](State.empty))
ref <- Resource.eval(Ref.of[F, State[F]](State.empty))
streamId <- Resource.eval(Ref.of[F, StreamId](0))
dispatcher <- Dispatcher[F]
stopConsumingDeferred <- Resource.eval(Deferred[F, Unit])
Expand All @@ -609,22 +634,22 @@ object KafkaConsumer {
implicit val logging0: Logging[F] = logging
implicit val dispatcher0: Dispatcher[F] = dispatcher

new KafkaConsumerActor(
new KafkaConsumerActor[F](
settings = settings,
keyDeserializer = keyDeserializer,
valueDeserializer = valueDeserializer,
ref = ref,
requests = requests,
withConsumer = withConsumer
)
}
actor <- startConsumerActor(requests, polls, actor)
actorFiber <- startConsumerActor(requests, polls, actor)
polls <- startPollScheduler(polls, settings.pollInterval)
} yield createKafkaConsumer(
requests,
settings,
keyDeserializer,
valueDeserializer,
actor,
polls,
actorFiber.combine(polls),
streamId,
id,
withConsumer,
Expand Down
Loading

0 comments on commit 19f5210

Please sign in to comment.