Add initial logging #113

Merged · 1 commit · Apr 1, 2019
39 changes: 22 additions & 17 deletions src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -26,7 +26,7 @@ import fs2.concurrent.Queue
 import fs2.kafka.internal.KafkaConsumerActor._
 import fs2.kafka.internal.instances._
 import fs2.kafka.internal.syntax._
-import fs2.kafka.internal.{KafkaConsumerActor, Synchronized}
+import fs2.kafka.internal._
 import fs2.{Chunk, Stream}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
@@ -339,7 +339,8 @@ private[kafka] object KafkaConsumer {
     requests: Queue[F, Request[F, K, V]],
     settings: ConsumerSettings[K, V],
     actor: Fiber[F, Unit],
-    polls: Fiber[F, Unit]
+    polls: Fiber[F, Unit],
+    id: Int
   )(implicit F: Concurrent[F]): KafkaConsumer[F, K, V] =
     new KafkaConsumer[F, K, V] {
       override val fiber: Fiber[F, Unit] = {
@@ -606,7 +607,7 @@ private[kafka] object KafkaConsumer {
       }
 
       override def toString: String =
-        "KafkaConsumer$" + System.identityHashCode(this)
+        "KafkaConsumer$" + id
     }
 
   def consumerResource[F[_], K, V](
@@ -620,20 +621,24 @@ private[kafka] object KafkaConsumer {
     Resource.liftF(Queue.bounded[F, Request[F, K, V]](1)).flatMap { polls =>
       Resource.liftF(Ref.of[F, State[F, K, V]](State.empty)).flatMap { ref =>
         Resource.liftF(Jitter.default[F]).flatMap { implicit jitter =>
-          executionContextResource(settings).flatMap { executionContext =>
-            createConsumer(settings, executionContext).flatMap { synchronized =>
-              val actor =
-                new KafkaConsumerActor(
-                  settings = settings,
-                  executionContext = executionContext,
-                  ref = ref,
-                  requests = requests,
-                  synchronized = synchronized
-                )
-
-              startConsumerActor(requests, polls, actor).flatMap { actor =>
-                startPollScheduler(polls, settings.pollInterval).map { polls =>
-                  createKafkaConsumer(requests, settings, actor, polls)
+          Resource.liftF(F.delay(new Object().hashCode)).flatMap { id =>
+            Resource.liftF(Logging.default[F](id)).flatMap { implicit logging =>
+              executionContextResource(settings).flatMap { executionContext =>
+                createConsumer(settings, executionContext).flatMap { synchronized =>
+                  val actor =
+                    new KafkaConsumerActor(
+                      settings = settings,
+                      executionContext = executionContext,
+                      ref = ref,
+                      requests = requests,
+                      synchronized = synchronized
+                    )
+
+                  startConsumerActor(requests, polls, actor).flatMap { actor =>
+                    startPollScheduler(polls, settings.pollInterval).map { polls =>
+                      createKafkaConsumer(requests, settings, actor, polls, id)
+                    }
+                  }
                 }
               }
             }
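Note: `consumerResource` now generates a pseudo-unique consumer `id` via `new Object().hashCode` and builds an implicit `Logging[F]` from it, so the consumer's log lines and its `toString` share one identifier. The new `fs2.kafka.internal.Logging` file itself is not part of the diff shown here; a minimal sketch of the shape implied by the call sites, with hypothetical println-based internals, might look like:

```scala
import cats.effect.Sync

// Minimal sketch only, inferred from the `Logging.default[F](id)` and
// `log(...)` call sites in this diff; the real fs2.kafka.internal.Logging
// and LogEntry files are not shown here, and the println-based output is
// an assumption.
sealed abstract class LogEntry {
  def message: String
}

trait Logging[F[_]] {
  def log(entry: LogEntry): F[Unit]
}

object Logging {
  def default[F[_]](id: Int)(implicit F: Sync[F]): F[Logging[F]] =
    F.pure {
      new Logging[F] {
        override def log(entry: LogEntry): F[Unit] =
          // Prefix each line with the consumer id so concurrent consumers
          // can be told apart in shared output.
          F.delay(println(s"KafkaConsumer$$$id: ${entry.message}"))
      }
    }
}
```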
84 changes: 70 additions & 14 deletions src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
@@ -36,6 +36,7 @@ import fs2.kafka._
 import fs2.kafka.internal.KafkaConsumerActor._
 import fs2.kafka.internal.instances._
 import fs2.kafka.internal.syntax._
+import fs2.kafka.internal.LogEntry._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition

@@ -69,9 +70,12 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
 )(
   implicit F: ConcurrentEffect[F],
   context: ContextShift[F],
+  logging: Logging[F],
   jitter: Jitter[F],
   timer: Timer[F]
 ) {
+  import logging._
+
   private[this] def withConsumer[A](f: Consumer[K, V] => F[A]): F[A] =
     synchronized.use { consumer =>
       context.evalOn(executionContext) {
@@ -148,8 +152,11 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
 
     subscribe
       .flatTap {
-        case Left(_)  => F.unit
-        case Right(_) => ref.update(_.asSubscribed)
+        case Left(_) => F.unit
+        case Right(_) =>
+          ref
+            .updateAndGet(_.asSubscribed)
+            .log(SubscribedTopics(topics, _))
       }
       .flatMap(deferred.complete)
   }
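`SubscribedTopics(topics, _)` is one of the `LogEntry` constructors this diff references (others include `SubscribedPattern`, `StoredFetch`, `AssignedPartitions`, `RevokedPartitions`, `StoredRecords`, `CompletedFetchesWithRecords`, and `StoredOnRebalance`); they come from the new `fs2.kafka.internal.LogEntry` file, which is not included in this section. Judging from the call sites, each entry captures the triggering event together with the `State` that resulted. A hedged sketch of one constructor, building on the `LogEntry` sketch above:

```scala
import cats.data.NonEmptyList

// Hypothetical sketch: the constructor name and argument order match the
// call site above, but the field types and message text are assumptions;
// the real definitions live in the new fs2.kafka.internal.LogEntry file.
final case class SubscribedTopics[F[_], K, V](
  topics: NonEmptyList[String],
  state: State[F, K, V]
) extends LogEntry {
  override def message: String =
    s"Consumer subscribed to topics [${topics.toList.mkString(", ")}]. Current state [$state]."
}
```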
@@ -170,8 +177,11 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
 
     subscribe
       .flatTap {
-        case Left(_)  => F.unit
-        case Right(_) => ref.update(_.asSubscribed)
+        case Left(_) => F.unit
+        case Right(_) =>
+          ref
+            .updateAndGet(_.asSubscribed)
+            .log(SubscribedPattern(pattern, _))
      }
      .flatMap(deferred.complete)
   }
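The recurring `ref.updateAndGet(...).log(SomeEntry(..., _))` pattern implies a syntax extension for effect values, presumably added to `fs2.kafka.internal.syntax` (also outside this diff): run the effect, build a `LogEntry` from its result, and log it. A sketch under that assumption, reusing the `Logging` and `LogEntry` sketches above:

```scala
import cats.FlatMap
import cats.syntax.flatMap._

object LoggingSyntax {
  // Hypothetical sketch of the syntax the `.log(...)` calls would need;
  // the name LoggingOps is made up. Given an F[A], build a LogEntry from
  // the A inside (here: the State after the update) and log it.
  implicit final class LoggingOps[F[_], A](private val fa: F[A]) extends AnyVal {
    def log(entry: A => LogEntry)(
      implicit F: FlatMap[F],
      logging: Logging[F]
    ): F[Unit] =
      fa.flatMap(a => logging.log(entry(a)))
  }
}
```

Note the switch from `ref.update` to `ref.updateAndGet` at every logged site: `updateAndGet` returns the state after the update, so the logged entry reflects the transition that just took place.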
@@ -254,7 +264,9 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
     }
 
     def storeFetch =
-      ref.update(_.withFetch(partition, deferred))
+      ref
+        .updateAndGet(_.withFetch(partition, deferred))
+        .log(StoredFetch(partition, deferred, _))
 
     def completeRevoked =
       deferred.complete((Chunk.empty, FetchCompletedReason.TopicPartitionRevoked))
@@ -285,7 +297,10 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
   }
 
   private[this] def assigned(assigned: NonEmptySet[TopicPartition]): F[Unit] =
-    ref.get.flatMap(_.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned)))
+    ref.get.flatMap { state =>
+      log(AssignedPartitions(assigned, state)) >>
+        state.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned))
+    }
 
   private[this] def revoked(revoked: NonEmptySet[TopicPartition]): F[Unit] =
     ref.get.flatMap { state =>
@@ -296,27 +311,34 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
       val withRecords = records intersect revokedFetches
       val withoutRecords = revokedFetches diff records
 
+      val logRevoked =
+        log(RevokedPartitions(revoked, state))
+
       val completeWithRecords =
         if (withRecords.nonEmpty) {
           state.fetches.filterKeysStrictList(withRecords).traverse {
             case (partition, partitionFetches) =>
               val records = Chunk.buffer(state.records(partition))
               partitionFetches.traverse(_.completeRevoked(records))
-          } >> ref.update(_.withoutFetchesAndRecords(withRecords))
+          } >> ref
+            .updateAndGet(_.withoutFetchesAndRecords(withRecords))
+            .log(RevokedFetchesWithRecords(state.records.filterKeysStrict(withRecords), _))
         } else F.unit
 
       val completeWithoutRecords =
         if (withoutRecords.nonEmpty) {
           state.fetches
             .filterKeysStrictValuesList(withoutRecords)
             .traverse(_.traverse(_.completeRevoked(Chunk.empty))) >>
-            ref.update(_.withoutFetches(withoutRecords))
+            ref
+              .updateAndGet(_.withoutFetches(withoutRecords))
+              .log(RevokedFetchesWithoutRecords(withoutRecords, _))
         } else F.unit
 
       val onRevoked =
         state.onRebalances.foldLeft(F.unit)(_ >> _.onRevoked(revoked))
 
-      completeWithRecords >> completeWithoutRecords >> onRevoked
+      logRevoked >> completeWithRecords >> completeWithoutRecords >> onRevoked
     }
 
   private[this] def assignment(
@@ -330,7 +352,11 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
       } else F.pure(Left(NotSubscribedException()))
 
       val withOnRebalance =
-        onRebalance.fold(F.unit)(on => ref.update(_.withOnRebalance(on)))
+        onRebalance.fold(F.unit) { on =>
+          ref
+            .updateAndGet(_.withOnRebalance(on))
+            .log(StoredOnRebalance(on, _))
+        }
 
       assigned.flatMap(deferred.complete) >> withOnRebalance
     }
@@ -425,7 +451,12 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
     ref.get.flatMap { state =>
       if (state.fetches.isEmpty) {
         if (batch.isEmpty) F.unit
-        else ref.update(_.withRecords(records(batch)))
+        else {
+          val storeRecords = records(batch)
+          ref
+            .updateAndGet(_.withRecords(storeRecords))
+            .log(StoredRecords(storeRecords, _))
+        }
       } else {
         val newRecords = records(batch)
         val allRecords = state.records combine newRecords
@@ -442,12 +473,17 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
             case (partition, fetches) =>
               val records = Chunk.buffer(allRecords(partition))
               fetches.traverse(_.completeRecords(records))
-          } >> ref.update(_.withoutFetchesAndRecords(canBeCompleted))
+          } >> ref
+            .updateAndGet(_.withoutFetchesAndRecords(canBeCompleted))
+            .log(CompletedFetchesWithRecords(allRecords.filterKeysStrict(canBeCompleted), _))
         } else F.unit
 
       val store =
         if (canBeStored.nonEmpty) {
-          ref.update(_.withRecords(newRecords.filterKeysStrict(canBeStored)))
+          val storeRecords = newRecords.filterKeysStrict(canBeStored)
+          ref
+            .updateAndGet(_.withRecords(storeRecords))
+            .log(StoredRecords(storeRecords, _))
         } else F.unit
 
       complete >> store
@@ -491,6 +527,9 @@ private[kafka] object KafkaConsumerActor {
 
     def completeRecords(chunk: Chunk[CommittableMessage[F, K, V]]): F[Unit] =
       deferred.complete((chunk, FetchCompletedReason.FetchedRecords))
+
+    override def toString: String =
+      "FetchRequest$" + System.identityHashCode(this)
   }
 
   final case class State[F[_], K, V](
@@ -526,6 +565,20 @@ private[kafka] object KafkaConsumerActor {
 
     def asSubscribed: State[F, K, V] =
       if (subscribed) this else copy(subscribed = true)
+
+    override def toString: String = {
+      val fetchesString =
+        fetches.toList
+          .sortBy { case (tp, _) => tp }
+          .mkStringAppend {
+            case (append, (tp, fs)) =>
+              append(tp.toString)
+              append(" -> ")
+              append(fs.mkString("[", ", ", "]"))
+          }("", ", ", "")
+
+      s"State(fetches = Map($fetchesString), records = Map(${recordsString(records)}), onRebalances = $onRebalances, subscribed = $subscribed)"
+    }
   }
 
   object State {
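`mkStringAppend` and `recordsString` in the new `State#toString` are helpers from `fs2.kafka.internal.syntax` that are not shown in this diff (the `sortBy` on `TopicPartition` keys likewise relies on the ordering imported from `fs2.kafka.internal.instances._`). The call shape `list.mkStringAppend { case (append, a) => ... }(start, sep, end)` suggests an `mkString` variant that hands the callback an `append` function backed by a single builder; a sketch under that assumption:

```scala
object ListSyntax {
  // Hypothetical sketch of mkStringAppend as used in State#toString; the
  // real helper lives in fs2.kafka.internal.syntax, not shown in this diff.
  implicit final class ListOps[A](private val list: List[A]) extends AnyVal {
    def mkStringAppend(f: (String => Unit, A) => Unit)(
      start: String,
      sep: String,
      end: String
    ): String = {
      val builder = new StringBuilder(start)
      var first = true
      list.foreach { a =>
        if (first) first = false else builder.append(sep)
        // Hand the callback an append function that writes into the one
        // shared builder, avoiding intermediate string concatenation.
        f(s => { builder.append(s); () }, a)
      }
      builder.append(end).toString
    }
  }
}
```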
@@ -554,7 +607,10 @@ private[kafka] object KafkaConsumerActor {
   final case class OnRebalance[F[_], K, V](
     onAssigned: NonEmptySet[TopicPartition] => F[Unit],
     onRevoked: NonEmptySet[TopicPartition] => F[Unit]
-  )
+  ) {
+    override def toString: String =
+      "OnRebalance$" + System.identityHashCode(this)
+  }
 
   sealed abstract class Request[F[_], K, V]
 
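Taken together, the `toString` overrides on `KafkaConsumer`, `FetchRequest`, `State`, and `OnRebalance` keep log output readable: `State` renders its maps inline, while the others render as a name plus a stable number, so separate log lines can be correlated to the same in-flight instance. A small self-contained illustration of the identity-hash naming (the demo object is not part of the PR):

```scala
// Illustration of the identity-hash naming used by FetchRequest and
// OnRebalance above; this demo object is not part of the PR.
object ToStringDemo extends App {
  final class OnRebalanceLike {
    override def toString: String =
      "OnRebalance$" + System.identityHashCode(this)
  }

  val a = new OnRebalanceLike
  val b = new OnRebalanceLike
  println(a) // e.g. "OnRebalance$1829164700" -- stable for this instance
  println(b) // a different number, so two instances stay distinguishable
}
```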