Skip to content

Commit

Permalink
Revert "Merge pull request #902 from fd4s/untyped-actor"
Browse files Browse the repository at this point in the history
This reverts commit 19f5210, reversing
changes made to 5aa90d7.
  • Loading branch information
bplommer committed May 13, 2022
1 parent 64eaa1d commit 75c5568
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 116 deletions.
54 changes: 13 additions & 41 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ object KafkaConsumer {
}
}(_.cancel)

private def startConsumerActor[F[_]](
requests: QueueSource[F, Request[F]],
private def startConsumerActor[F[_], K, V](
requests: QueueSource[F, Request[F, K, V]],
polls: QueueSource[F, Request.Poll[F]],
actor: KafkaConsumerActor[F]
actor: KafkaConsumerActor[F, K, V]
)(
implicit F: Async[F]
): Resource[F, FakeFiber[F]] =
Expand All @@ -116,11 +116,9 @@ object KafkaConsumer {
}

private def createKafkaConsumer[F[_], K, V](
requests: QueueSink[F, Request[F]],
requests: QueueSink[F, Request[F, K, V]],
settings: ConsumerSettings[F, K, V],
keyDes: Deserializer[F, K],
valueDes: Deserializer[F, V],
actor: KafkaConsumerActor[F],
actor: KafkaConsumerActor[F, K, V],
fiber: FakeFiber[F],
streamIdRef: Ref[F, StreamId],
id: Int,
Expand All @@ -135,7 +133,7 @@ object KafkaConsumer {
Queue.bounded(settings.maxPrefetchBatches - 1)

type PartitionResult =
(Chunk[KafkaByteConsumerRecord], FetchCompletedReason)
(Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason)

type PartitionsMap = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]
type PartitionsMapQueue = Queue[F, Option[PartitionsMap]]
Expand All @@ -162,23 +160,6 @@ object KafkaConsumer {
.void
stopReqs <- Deferred[F, Unit]
} yield Stream.eval {
def committableConsumerRecord(
record: ConsumerRecord[K, V],
partition: TopicPartition
): CommittableConsumerRecord[F, K, V] =
CommittableConsumerRecord(
record = record,
offset = CommittableOffset(
topicPartition = partition,
consumerGroupId = actor.consumerGroupId,
offsetAndMetadata = new OffsetAndMetadata(
record.offset + 1L,
settings.recordMetadata(record)
),
commit = actor.offsetCommit
)
)

def fetchPartition: F[Unit] = F.deferred[PartitionResult].flatMap { deferred =>
val callback: PartitionResult => F[Unit] =
deferred.complete(_).void
Expand Down Expand Up @@ -206,21 +187,12 @@ object KafkaConsumer {

assigned.ifM(storeFetch, completeRevoked)
} >> deferred.get

F.race(shutdown, fetch).flatMap {
case Left(()) =>
stopReqs.complete(()).void

case Right((chunk, reason)) =>
val c = chunk.traverse[F, CommittableConsumerRecord[F, K, V]] { rec =>
ConsumerRecord
.fromJava[F, K, V](rec, keyDes, valueDes)
.map(committableConsumerRecord(_, partition))
}

val enqueueChunk = c.flatMap { chunk =>
chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)
}
val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)

val completeRevoked =
stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked)
Expand Down Expand Up @@ -403,7 +375,7 @@ object KafkaConsumer {
}

private[this] def request[A](
request: (Either[Throwable, A] => F[Unit]) => Request[F]
request: (Either[Throwable, A] => F[Unit]) => Request[F, K, V]
): F[A] =
Deferred[F, Either[Throwable, A]].flatMap { deferred =>
requests.offer(request(deferred.complete(_).void)) >>
Expand Down Expand Up @@ -661,9 +633,9 @@ object KafkaConsumer {
id <- Resource.eval(F.delay(new Object().hashCode))
jitter <- Resource.eval(Jitter.default[F])
logging <- Resource.eval(Logging.default[F](id))
requests <- Resource.eval(Queue.unbounded[F, Request[F]])
requests <- Resource.eval(Queue.unbounded[F, Request[F, K, V]])
polls <- Resource.eval(Queue.bounded[F, Request.Poll[F]](1))
ref <- Resource.eval(Ref.of[F, State[F]](State.empty))
ref <- Resource.eval(Ref.of[F, State[F, K, V]](State.empty))
streamId <- Resource.eval(Ref.of[F, StreamId](0))
dispatcher <- Dispatcher[F]
stopConsumingDeferred <- Resource.eval(Deferred[F, Unit])
Expand All @@ -673,8 +645,10 @@ object KafkaConsumer {
implicit val logging0: Logging[F] = logging
implicit val dispatcher0: Dispatcher[F] = dispatcher

new KafkaConsumerActor[F](
new KafkaConsumerActor(
settings = settings,
keyDeserializer = keyDeserializer,
valueDeserializer = valueDeserializer,
ref = ref,
requests = requests,
withConsumer = withConsumer
Expand All @@ -685,8 +659,6 @@ object KafkaConsumer {
} yield createKafkaConsumer(
requests,
settings,
keyDeserializer,
valueDeserializer,
actor,
actorFiber.combine(polls),
streamId,
Expand Down
Loading

0 comments on commit 75c5568

Please sign in to comment.