Skip to content

Commit

Permalink
Merge pull request #129 from ovotech/no-internal-storage
Browse files Browse the repository at this point in the history
Change to not store records internally in the actor
  • Loading branch information
vlovgr authored May 9, 2019
2 parents 8400d62 + d941864 commit 75ef99f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 95 deletions.
150 changes: 64 additions & 86 deletions src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -209,40 +209,25 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
.updateAndGet(_.withRebalancing(true))
.flatMap { state =>
val fetches = state.fetches.keySetStrict
val records = state.records.keySetStrict

val revokedFetches = revoked intersect fetches
val withRecords = records intersect revokedFetches
val withoutRecords = revokedFetches diff records

val logRevoked =
log(RevokedPartitions(revoked, state))

val completeWithRecords =
if (withRecords.nonEmpty) {
state.fetches.filterKeysStrictList(withRecords).traverse {
case (partition, partitionFetches) =>
val records = Chunk.vector(state.records(partition).toVector)
partitionFetches.values.toList.traverse(_.completeRevoked(records))
} >> ref
.updateAndGet(_.withoutFetchesAndRecords(withRecords))
.log(RevokedFetchesWithRecords(state.records.filterKeysStrict(withRecords), _))
} else F.unit

val completeWithoutRecords =
if (withoutRecords.nonEmpty) {
if (revokedFetches.nonEmpty) {
state.fetches
.filterKeysStrictValuesList(withoutRecords)
.filterKeysStrictValuesList(revokedFetches)
.traverse(_.values.toList.traverse(_.completeRevoked(Chunk.empty))) >>
ref
.updateAndGet(_.withoutFetches(withoutRecords))
.log(RevokedFetchesWithoutRecords(withoutRecords, _))
.updateAndGet(_.withoutFetches(revokedFetches))
.log(RevokedFetchesWithoutRecords(revokedFetches, _))
} else F.unit

val onRevoked =
state.onRebalances.foldLeft(F.unit)(_ >> _.onRevoked(revoked))

logRevoked >> completeWithRecords >> completeWithoutRecords >> onRevoked
logRevoked >> completeWithoutRecords >> onRevoked
}

private[this] def assignment(
Expand Down Expand Up @@ -335,70 +320,76 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
F.suspend[Either[KafkaConsumerRecords, ConsumerRecords]] {
val assigned = consumer.assignment.toSet
val requested = state.fetches.keySetStrict
val available = state.records.keySetStrict

val resume = (requested intersect assigned) diff available
val pause = assigned diff resume
if (requested.nonEmpty) {
val resume = requested intersect assigned
val pause = assigned diff resume

if (pause.nonEmpty)
consumer.pause(pause.asJava)

if (resume.nonEmpty)
consumer.resume(resume.asJava)

val batch =
consumer.poll(pollTimeout)

if (pause.nonEmpty)
consumer.pause(pause.asJava)
if (settings.shiftDeserialization)
records(batch).map(_.asRight)
else F.pure(batch.asLeft)
} else {
if (assigned.nonEmpty)
consumer.pause(assigned.asJava)

if (resume.nonEmpty)
consumer.resume(resume.asJava)
val batch =
consumer.poll(Duration.ZERO)

val batch = consumer.poll(pollTimeout)
if (settings.shiftDeserialization)
records(batch).map(_.asRight)
else F.pure(batch.asLeft)
if (batch.isEmpty)
F.pure(Right(Map.empty))
else
F.raiseError(
new IllegalStateException(
s"Received [${batch.count}] unexpected records"
)
)
}
}
}.flatMap {
case Left(batch) => records(batch)
case Right(records) => F.pure(records)
case Left(batch) => records(batch)
}

def handleBatch(newRecords: ConsumerRecords): F[Unit] =
ref.get.flatMap { state =>
if (state.fetches.isEmpty) {
if (newRecords.isEmpty) F.unit
else {
ref
.updateAndGet(_.withRecords(newRecords))
.log(StoredRecords(newRecords, _))
if (newRecords.nonEmpty) {
ref.get.flatMap { state =>
val canBeCompleted = newRecords.keySetStrict

val checkUnexpected = {
val unexpected = canBeCompleted diff state.fetches.keySetStrict
if (unexpected.isEmpty) F.unit
else
F.raiseError(
new IllegalStateException(
s"Received unexpected records for partitions [${unexpected.mkString(", ")}]"
)
)
}
} else {
val allRecords = state.records combine newRecords

if (allRecords.nonEmpty) {
val requested = state.fetches.keySetStrict

val canBeCompleted = allRecords.keySetStrict intersect requested
val canBeStored = newRecords.keySetStrict diff canBeCompleted

val complete =
if (canBeCompleted.nonEmpty) {
state.fetches.filterKeysStrictList(canBeCompleted).traverse {
case (partition, fetches) =>
val records = Chunk.vector(allRecords(partition).toVector)
fetches.values.toList.traverse(_.completeRecords(records))
} >> ref
.updateAndGet(_.withoutFetchesAndRecords(canBeCompleted))
.log(
CompletedFetchesWithRecords(allRecords.filterKeysStrict(canBeCompleted), _)
)
} else F.unit

val store =
if (canBeStored.nonEmpty) {
val storeRecords = newRecords.filterKeysStrict(canBeStored)
ref
.updateAndGet(_.withRecords(storeRecords))
.log(StoredRecords(storeRecords, _))
} else F.unit

complete >> store
} else F.unit

val completeFetches =
state.fetches.filterKeysStrictList(canBeCompleted).traverse {
case (partition, fetches) =>
val records = Chunk.vector(newRecords(partition).toVector)
fetches.values.toList.traverse(_.completeRecords(records))
}

val removeFetches =
ref
.updateAndGet(_.withoutFetches(canBeCompleted))
.log(CompletedFetchesWithRecords(newRecords, _))

checkUnexpected >> completeFetches >> removeFetches
}
}
} else F.unit

def handlePendingCommits(initialRebalancing: Boolean): F[Unit] =
ref.get.flatMap { state =>
Expand Down Expand Up @@ -454,7 +445,6 @@ private[kafka] object KafkaConsumerActor {

final case class State[F[_], K, V](
fetches: Map[TopicPartition, Map[StreamId, FetchRequest[F, K, V]]],
records: Map[TopicPartition, NonEmptyVector[CommittableMessage[F, K, V]]],
pendingCommits: Chain[Request.Commit[F, K, V]],
onRebalances: Chain[OnRebalance[F, K, V]],
rebalancing: Boolean,
Expand Down Expand Up @@ -492,17 +482,6 @@ private[kafka] object KafkaConsumerActor {
def withoutFetches(partitions: Set[TopicPartition]): State[F, K, V] =
copy(fetches = fetches.filterKeysStrict(!partitions.contains(_)))

def withRecords(
records: Map[TopicPartition, NonEmptyVector[CommittableMessage[F, K, V]]]
): State[F, K, V] =
copy(records = this.records combine records)

def withoutFetchesAndRecords(partitions: Set[TopicPartition]): State[F, K, V] =
copy(
fetches = fetches.filterKeysStrict(!partitions.contains(_)),
records = records.filterKeysStrict(!partitions.contains(_))
)

def withPendingCommit(pendingCommit: Request.Commit[F, K, V]): State[F, K, V] =
copy(pendingCommits = pendingCommits append pendingCommit)

Expand All @@ -529,15 +508,14 @@ private[kafka] object KafkaConsumerActor {
append(fs.mkString("[", ", ", "]"))
}("", ", ", "")

s"State(fetches = Map($fetchesString), records = Map(${recordsString(records)}), pendingCommits = $pendingCommits, onRebalances = $onRebalances, rebalancing = $rebalancing, subscribed = $subscribed, streaming = $streaming)"
s"State(fetches = Map($fetchesString), pendingCommits = $pendingCommits, onRebalances = $onRebalances, rebalancing = $rebalancing, subscribed = $subscribed, streaming = $streaming)"
}
}

object State {
def empty[F[_], K, V]: State[F, K, V] =
State(
fetches = Map.empty,
records = Map.empty,
pendingCommits = Chain.empty,
onRebalances = Chain.empty,
rebalancing = false,
Expand Down
9 changes: 0 additions & 9 deletions src/main/scala/fs2/kafka/internal/LogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,6 @@ private[kafka] object LogEntry {
s"Completed fetches with records for partitions [${recordsString(records)}]. Current state [$state]."
}

final case class RevokedFetchesWithRecords[F[_], K, V](
records: Records[F, K, V],
state: State[F, K, V]
) extends LogEntry {
override def level: LogLevel = Debug
override def message: String =
s"Revoked fetches with records for partitions [${recordsString(records)}]. Current state [$state]."
}

final case class RevokedFetchesWithoutRecords[F[_], K, V](
partitions: Set[TopicPartition],
state: State[F, K, V]
Expand Down

0 comments on commit 75ef99f

Please sign in to comment.