Random cancellation of calls in KafkaAdminClient #986

Closed
vshalts opened this issue May 24, 2022 · 1 comment · Fixed by #1017

vshalts commented May 24, 2022

Hi, I found a bug in the implementation of cancelToken (https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala#L198). It breaks the contract of the cancellation token returned from Async.async. The current implementation starts the cancellation immediately and returns a token that cancels that cancellation, rather than a token that cancels the future. As a result, the cancellation races with the completion of the future in https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala#L215, so requests to the admin client are cancelled at random and for no reason.

Here is an example that illustrates the issue, together with a proposed fix:

import cats.effect.syntax.all._
import cats.syntax.all._
import cats.effect._
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.internals.KafkaFutureImpl

import scala.concurrent.duration._

object TestFs2 extends IOApp {

  implicit final class KafkaFutureSyntax[A](
      private val future: KafkaFuture[A]
  ) extends AnyVal {

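    // Current behaviour: running this effect starts the cancellation right away;
    // the returned token only cancels that background fiber, not the future.
    def cancelToken[F[_]](implicit F: Async[F]): F[Option[F[Unit]]] =
      F.blocking { future.cancel(true); () }.start.map(_.cancel.some)

    // Proposed fix: running this effect only builds the token;
    // the future is cancelled when the token itself is run.
    def cancelTokenFixed[F[_]](implicit F: Async[F]): F[Option[F[Unit]]] =
      F.delay(F.blocking { future.cancel(true); () }.start.void.some)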

  }

  override def run(args: List[String]): IO[ExitCode] = {
    for {
      _ <- IO.println("Testing original fs2 cancelToken")
      f  = new KafkaFutureImpl[String]()
      _ <- f.cancelToken[IO] // create the cancel token, but never run it
      _ <- IO.sleep(1.seconds)
      _ <- IO.println(f.isCancelled.toString) // true: this is the bug, the future is cancelled although the token was never run

      _     <- IO.println("Testing fixed cancelToken")
      f      = new KafkaFutureImpl[String]()
      token <- f.cancelTokenFixed[IO] // create cancel token, but do not call it yet
      _     <- IO.sleep(1.seconds)
      _     <- IO.println(f.isCancelled.toString) // false, as expected
      _     <- token.getOrElse(IO.unit) // call cancellation token (if we have it)
      _     <- IO.sleep(1.seconds)
      _     <- IO.println(f.isCancelled.toString) // true, as expected

    } yield ExitCode.Success
  }

}
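
To show where such a token sits in the Async.async contract, here is a minimal sketch. It is not the fs2-kafka implementation: the helper name awaitKafkaFuture and the object CancelableSketch are hypothetical, and the error handling is simplified. The point is only that the effect passed to Async.async registers the callback and returns a description of the cancellation, which the runtime runs if and only if the fiber is cancelled.

```scala
import cats.effect.{Async, IO, IOApp}
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.internals.KafkaFutureImpl

import scala.concurrent.duration._

object CancelableSketch extends IOApp.Simple {

  // Hypothetical helper for illustration only, not the fs2-kafka code.
  def awaitKafkaFuture[F[_], A](future: KafkaFuture[A])(implicit F: Async[F]): F[A] =
    F.async[A] { cb =>
      F.delay[Option[F[Unit]]] {
        // Register the completion callback once the effect actually runs.
        future.whenComplete { (a: A, t: Throwable) =>
          if (t ne null) cb(Left(t)) else cb(Right(a))
        }
        // Only describe the cancellation here; nothing is cancelled yet.
        val cancel: F[Unit] = F.blocking { future.cancel(true); () }
        Some(cancel)
      }
    }

  val run: IO[Unit] =
    for {
      f      <- IO(new KafkaFutureImpl[String]())
      fiber  <- awaitKafkaFuture[IO, String](f).start
      _      <- IO.sleep(100.millis) // give the fiber time to register its callback
      before <- IO(f.isCancelled)    // expected to be false: nobody cancelled anything
      _      <- fiber.cancel         // the runtime runs the token returned above
      after  <- IO(f.isCancelled)    // expected to be true once the token has run
      _      <- IO.println(s"before=$before after=$after")
    } yield ()
}
```

With this shape the future is left alone until the surrounding fiber is cancelled, which is the behaviour the fixed token above is meant to restore.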

Could you please fix this and release a version without the bug? Thanks!

@bplommer bplommer self-assigned this Jul 13, 2022
@bplommer bplommer added the bug Something isn't working label Jul 13, 2022
bplommer (Member) commented

Thanks!
