Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change to not store records internally in the actor #129

Merged
merged 1 commit into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 64 additions & 86 deletions src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -209,40 +209,25 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
.updateAndGet(_.withRebalancing(true))
.flatMap { state =>
val fetches = state.fetches.keySetStrict
val records = state.records.keySetStrict

val revokedFetches = revoked intersect fetches
val withRecords = records intersect revokedFetches
val withoutRecords = revokedFetches diff records

val logRevoked =
log(RevokedPartitions(revoked, state))

val completeWithRecords =
if (withRecords.nonEmpty) {
state.fetches.filterKeysStrictList(withRecords).traverse {
case (partition, partitionFetches) =>
val records = Chunk.vector(state.records(partition).toVector)
partitionFetches.values.toList.traverse(_.completeRevoked(records))
} >> ref
.updateAndGet(_.withoutFetchesAndRecords(withRecords))
.log(RevokedFetchesWithRecords(state.records.filterKeysStrict(withRecords), _))
} else F.unit

val completeWithoutRecords =
if (withoutRecords.nonEmpty) {
if (revokedFetches.nonEmpty) {
state.fetches
.filterKeysStrictValuesList(withoutRecords)
.filterKeysStrictValuesList(revokedFetches)
.traverse(_.values.toList.traverse(_.completeRevoked(Chunk.empty))) >>
ref
.updateAndGet(_.withoutFetches(withoutRecords))
.log(RevokedFetchesWithoutRecords(withoutRecords, _))
.updateAndGet(_.withoutFetches(revokedFetches))
.log(RevokedFetchesWithoutRecords(revokedFetches, _))
} else F.unit

val onRevoked =
state.onRebalances.foldLeft(F.unit)(_ >> _.onRevoked(revoked))

logRevoked >> completeWithRecords >> completeWithoutRecords >> onRevoked
logRevoked >> completeWithoutRecords >> onRevoked
}

private[this] def assignment(
Expand Down Expand Up @@ -335,70 +320,76 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
F.suspend[Either[KafkaConsumerRecords, ConsumerRecords]] {
val assigned = consumer.assignment.toSet
val requested = state.fetches.keySetStrict
val available = state.records.keySetStrict

val resume = (requested intersect assigned) diff available
val pause = assigned diff resume
if (requested.nonEmpty) {
val resume = requested intersect assigned
val pause = assigned diff resume

if (pause.nonEmpty)
consumer.pause(pause.asJava)

if (resume.nonEmpty)
consumer.resume(resume.asJava)

val batch =
consumer.poll(pollTimeout)

if (pause.nonEmpty)
consumer.pause(pause.asJava)
if (settings.shiftDeserialization)
records(batch).map(_.asRight)
else F.pure(batch.asLeft)
} else {
if (assigned.nonEmpty)
consumer.pause(assigned.asJava)

if (resume.nonEmpty)
consumer.resume(resume.asJava)
val batch =
consumer.poll(Duration.ZERO)

val batch = consumer.poll(pollTimeout)
if (settings.shiftDeserialization)
records(batch).map(_.asRight)
else F.pure(batch.asLeft)
if (batch.isEmpty)
F.pure(Right(Map.empty))
else
F.raiseError(
new IllegalStateException(
s"Received [${batch.count}] unexpected records"
)
)
}
}
}.flatMap {
case Left(batch) => records(batch)
case Right(records) => F.pure(records)
case Left(batch) => records(batch)
}

def handleBatch(newRecords: ConsumerRecords): F[Unit] =
ref.get.flatMap { state =>
if (state.fetches.isEmpty) {
if (newRecords.isEmpty) F.unit
else {
ref
.updateAndGet(_.withRecords(newRecords))
.log(StoredRecords(newRecords, _))
if (newRecords.nonEmpty) {
ref.get.flatMap { state =>
val canBeCompleted = newRecords.keySetStrict

val checkUnexpected = {
val unexpected = canBeCompleted diff state.fetches.keySetStrict
if (unexpected.isEmpty) F.unit
else
F.raiseError(
new IllegalStateException(
s"Received unexpected records for partitions [${unexpected.mkString(", ")}]"
)
)
}
} else {
val allRecords = state.records combine newRecords

if (allRecords.nonEmpty) {
val requested = state.fetches.keySetStrict

val canBeCompleted = allRecords.keySetStrict intersect requested
val canBeStored = newRecords.keySetStrict diff canBeCompleted

val complete =
if (canBeCompleted.nonEmpty) {
state.fetches.filterKeysStrictList(canBeCompleted).traverse {
case (partition, fetches) =>
val records = Chunk.vector(allRecords(partition).toVector)
fetches.values.toList.traverse(_.completeRecords(records))
} >> ref
.updateAndGet(_.withoutFetchesAndRecords(canBeCompleted))
.log(
CompletedFetchesWithRecords(allRecords.filterKeysStrict(canBeCompleted), _)
)
} else F.unit

val store =
if (canBeStored.nonEmpty) {
val storeRecords = newRecords.filterKeysStrict(canBeStored)
ref
.updateAndGet(_.withRecords(storeRecords))
.log(StoredRecords(storeRecords, _))
} else F.unit

complete >> store
} else F.unit

val completeFetches =
state.fetches.filterKeysStrictList(canBeCompleted).traverse {
case (partition, fetches) =>
val records = Chunk.vector(newRecords(partition).toVector)
fetches.values.toList.traverse(_.completeRecords(records))
}

val removeFetches =
ref
.updateAndGet(_.withoutFetches(canBeCompleted))
.log(CompletedFetchesWithRecords(newRecords, _))

checkUnexpected >> completeFetches >> removeFetches
}
}
} else F.unit

def handlePendingCommits(initialRebalancing: Boolean): F[Unit] =
ref.get.flatMap { state =>
Expand Down Expand Up @@ -454,7 +445,6 @@ private[kafka] object KafkaConsumerActor {

final case class State[F[_], K, V](
fetches: Map[TopicPartition, Map[StreamId, FetchRequest[F, K, V]]],
records: Map[TopicPartition, NonEmptyVector[CommittableMessage[F, K, V]]],
pendingCommits: Chain[Request.Commit[F, K, V]],
onRebalances: Chain[OnRebalance[F, K, V]],
rebalancing: Boolean,
Expand Down Expand Up @@ -492,17 +482,6 @@ private[kafka] object KafkaConsumerActor {
def withoutFetches(partitions: Set[TopicPartition]): State[F, K, V] =
copy(fetches = fetches.filterKeysStrict(!partitions.contains(_)))

def withRecords(
records: Map[TopicPartition, NonEmptyVector[CommittableMessage[F, K, V]]]
): State[F, K, V] =
copy(records = this.records combine records)

def withoutFetchesAndRecords(partitions: Set[TopicPartition]): State[F, K, V] =
copy(
fetches = fetches.filterKeysStrict(!partitions.contains(_)),
records = records.filterKeysStrict(!partitions.contains(_))
)

def withPendingCommit(pendingCommit: Request.Commit[F, K, V]): State[F, K, V] =
copy(pendingCommits = pendingCommits append pendingCommit)

Expand All @@ -529,15 +508,14 @@ private[kafka] object KafkaConsumerActor {
append(fs.mkString("[", ", ", "]"))
}("", ", ", "")

s"State(fetches = Map($fetchesString), records = Map(${recordsString(records)}), pendingCommits = $pendingCommits, onRebalances = $onRebalances, rebalancing = $rebalancing, subscribed = $subscribed, streaming = $streaming)"
s"State(fetches = Map($fetchesString), pendingCommits = $pendingCommits, onRebalances = $onRebalances, rebalancing = $rebalancing, subscribed = $subscribed, streaming = $streaming)"
}
}

object State {
def empty[F[_], K, V]: State[F, K, V] =
State(
fetches = Map.empty,
records = Map.empty,
pendingCommits = Chain.empty,
onRebalances = Chain.empty,
rebalancing = false,
Expand Down
9 changes: 0 additions & 9 deletions src/main/scala/fs2/kafka/internal/LogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,6 @@ private[kafka] object LogEntry {
s"Completed fetches with records for partitions [${recordsString(records)}]. Current state [$state]."
}

final case class RevokedFetchesWithRecords[F[_], K, V](
records: Records[F, K, V],
state: State[F, K, V]
) extends LogEntry {
override def level: LogLevel = Debug
override def message: String =
s"Revoked fetches with records for partitions [${recordsString(records)}]. Current state [$state]."
}

final case class RevokedFetchesWithoutRecords[F[_], K, V](
partitions: Set[TopicPartition],
state: State[F, K, V]
Expand Down