
Long-lived transactions #1156

Open · wants to merge 6 commits into base: series/2.x

Conversation

vitoracle

Enhancement

We've had to use Kafka transactions at work and, fortunately, fs2-kafka has a pretty nice API through TransactionalKafkaProducer (thanks :D).

However, in our use case, we had to hold the Kafka transaction open for a long period of time, since we were producing substantial amounts of records into Kafka. We also wanted the whole operation to be atomic.

Unfortunately, it seems to me that TransactionalKafkaProducer's produce method begins and commits/aborts a transaction around only a single chunk of ProducerRecords. This is not enough for us, as we receive chunks periodically (from a stream).

Proposal

API

An API such as:

for {
  producer <- TransactionalKafkaProducer.stream { ... }
  _        <- Stream.eval(producer.beginTransaction)
  _        <- Stream.eval(producer.produce { ... })
  _        <- Stream.eval(producer.commitTransaction)
} yield ()

would suffice.

However, this can easily introduce invalid state: the consumer of this API could, for example, attempt to commit or abort a transaction that does not even exist. The user could also forget to commit/abort the transaction, intentionally or not.

A better approach would be to create a Transaction class, similar to a TransactionalKafkaProducer except that it can only be obtained within a Resource context. This ensures that the transaction is always finalized when the resource is released.

This is what is achieved here with:

/**
  * Creates a new [[Transaction]] in the `Resource` context. This operation will block until the
  * previous transaction, if any, is finished. Once this resource is released, the transaction and
  * offsets will be committed, or aborted if an exception is thrown.
  */
def createTransaction: Resource[F, Transaction.WithoutOffsets[F, K, V]]

Example:

(for {
  producer    <- TransactionalKafkaProducer.resource { ... }
  transaction <- producer.createTransaction // blocks until acquired 
} yield transaction).use { transaction => 
  transaction.produce { ... } *> transaction.produce { ... }
}
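To make the release semantics above concrete, here is a minimal plain-Scala sketch (not fs2-kafka's actual implementation) of what the Resource finalizer does: begin the transaction on acquisition, then commit on successful completion or abort if the body raised. The `runTransaction` helper and its callback parameters are hypothetical names for illustration only.

```scala
// Hypothetical sketch of a Resource-style commit-or-abort release step,
// written with try/catch instead of cats-effect's Resource for clarity.
def runTransaction[A](begin: () => Unit, commit: () => Unit, abort: () => Unit)(
    body: => A
): A = {
  begin()
  try {
    val a = body
    commit() // normal release: commit the transaction (and offsets)
    a
  } catch {
    case e: Throwable =>
      abort() // release on error: abort the transaction
      throw e
  }
}
```

The key property is that exactly one of `commit` or `abort` always runs, which is what obtaining the Transaction only through a Resource guarantees.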

We also need to ensure the resource is only acquired when the Semaphore has a permit available. For that, I had to make an internal change, exposing WithTransactionalProducer's internal Semaphore:

private[kafka] sealed abstract class WithTransactionalProducer[F[_]] {
  ...

  def semaphore: Semaphore[F]
}

We do have ExclusiveAccess[F, A]; however, it does not suffice, since it only holds the permit for a single operation (F[A] => F[A]).
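The distinction can be sketched in plain Scala, using java.util.concurrent.Semaphore to stand in for cats.effect.std.Semaphore (the `TransactionGate` class and method names here are hypothetical, for illustration only): the permit must be held for the whole lifetime of the transaction, not just one operation.

```scala
import java.util.concurrent.Semaphore

// Hypothetical sketch: the single permit is acquired when the transaction
// resource is acquired and released only when it is finalized, so concurrent
// callers block until the previous transaction finishes.
final class TransactionGate {
  private val permit = new Semaphore(1)

  // Roughly Resource.make(semaphore.acquire)(_ => semaphore.release):
  def withTransaction[A](body: => A): A = {
    permit.acquire() // blocks until the previous transaction is finished
    try body
    finally permit.release() // always released, even on exception
  }

  def availablePermits: Int = permit.availablePermits()
}
```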

Batch committing

Within Transaction's implementation, a Ref[F, CommittableOffsetBatch[F]] is used to store all the batches to be committed once the resource is released. Batches are merged together in every call to produce.
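A simplified sketch of that accumulation, with a plain var and Map standing in for Ref[F, CommittableOffsetBatch[F]] (the `OffsetBatchSketch` class and its method names are hypothetical): every produce merges its offsets into one batch, committed in a single step when the resource is released.

```scala
// Hypothetical sketch of merging committable offsets across produce calls.
final class OffsetBatchSketch {
  // topic-partition -> highest offset seen so far
  private var batch: Map[String, Long] = Map.empty

  // Called on every produce: merge the new offsets into the pending batch.
  def mergeOffsets(offsets: Map[String, Long]): Unit =
    batch = offsets.foldLeft(batch) { case (acc, (tp, off)) =>
      acc.updated(tp, math.max(acc.getOrElse(tp, -1L), off))
    }

  // Called once on release: commit the accumulated batch in one go.
  def commitBatch(): Map[String, Long] = {
    val toCommit = batch
    batch = Map.empty
    toCommit
  }
}
```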

Transaction leaks

Leaks are still possible. Here is an example from a test that causes one:

for {
  globalStateRef <- IO
    .ref[Option[Transaction.WithoutOffsets[IO, String, String]]](None)
  makeProducer = TransactionalKafkaProducer.resource(
    TransactionalProducerSettings(
      s"id-$topic",
      producerSettings[IO]
        .withRetries(Int.MaxValue)
    )
  )
  _ <- makeProducer.flatMap(producer => producer.createTransaction).use {
    transaction =>
      globalStateRef.set(Some(transaction))
      // once this resource is released, the transaction will finalize
  }
  toProduce = ProducerRecords.one(ProducerRecord(topic, "key-0", "value-0"))
  _ <- globalStateRef.get.flatMap(_.get.produceWithoutOffsets(toProduce)) // by now, the transaction is already over, so this throws TransactionLeakedException
} yield ()

Within Transaction's implementation, a Ref[F, Boolean] is used to indicate whether or not a transaction has been closed. If there's a call to produce when the transaction is over, it throws TransactionLeakedException.
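That guard can be sketched in plain Scala, with an AtomicBoolean standing in for Ref[F, Boolean] (the `LeakGuardedTransaction` class and its members are hypothetical names): once the resource finalizer flips the flag, any later produce fails fast instead of writing outside the transaction.

```scala
import java.util.concurrent.atomic.AtomicBoolean

final class TransactionLeakedException
    extends RuntimeException("transaction has already been finalized")

// Hypothetical sketch of the closed-flag leak guard.
final class LeakGuardedTransaction {
  private val closed = new AtomicBoolean(false)

  def produce(record: String): String =
    if (closed.get()) throw new TransactionLeakedException
    else s"produced: $record"

  // Called by the Resource finalizer when the transaction ends.
  def close(): Unit = closed.set(true)
}
```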

It would be nice to hear any input you have on how to prevent this, if possible.

All in all, this approach is pretty much identical to what zio-kafka currently does with its transaction implementation. But I'd like to hear your opinion on how we can make this better.

bplommer requested review from vlovgr and LMnet on March 24, 2023 at 16:18
@bplommer (Member)

Thanks for this! It looks really thoughtful - much appreciated. I'm a bit (very, very) behind with core library maintenance so I want to catch up on that first, but I'll try to look at this properly soon.

@vitoracle (Author)

@bplommer No worries! We are already using this in production, but it'd be cool if it were integrated into the library so we don't have to maintain a fork.

Seems like the CI fails during header checks, is that something I can fix?

@bplommer (Member)

Seems like the CI fails during header checks, is that something I can fix?

I've updated it against the base branch, that should fix this.

@bplommer (Member)

Oh, there are some new files that the update didn't touch; you need to run `sbt headerCreate` to add the missing headers.

@vitoracle (Author)

@bplommer Should work now; at least it works locally, unless I'm missing something.
