Skip to content

Commit

Permalink
Kafka 2.4.0-RC1 (#971)
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo authored and ennru committed Nov 26, 2019
1 parent 06ba87e commit ada44cd
Show file tree
Hide file tree
Showing 18 changed files with 78 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package akka.kafka.benchmarks

import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Arrays, Properties, UUID}
Expand Down Expand Up @@ -39,6 +40,8 @@ private[benchmarks] trait PerfFixtureHelpers extends LazyLogging {

val producerTimeout = 6 minutes
val logPercentStep = 25
val adminClientCloseTimeout = Duration.ofSeconds(5)
val producerCloseTimeout = adminClientCloseTimeout

def randomId(): String = PerfFixtureHelpers.randomId()

Expand All @@ -50,7 +53,7 @@ private[benchmarks] trait PerfFixtureHelpers extends LazyLogging {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost)
val admin = AdminClient.create(props)
val producer = createTopicAndFill(ft, props, admin)
admin.close(5, TimeUnit.SECONDS)
admin.close(adminClientCloseTimeout)
producer
}

Expand All @@ -64,9 +67,9 @@ private[benchmarks] trait PerfFixtureHelpers extends LazyLogging {
logger.info(s"Reusing existing topic $ft")
} else {
val producer = createTopicAndFill(ft, props, admin)
producer.close(5, TimeUnit.SECONDS)
producer.close(producerCloseTimeout)
}
admin.close(5, TimeUnit.SECONDS)
admin.close(adminClientCloseTimeout)
}

private def createTopicAndFill(ft: FilledTopic, props: Properties, admin: AdminClient) = {
Expand Down
67 changes: 24 additions & 43 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ val Scala212 = "2.12.10"
val Scala213 = "2.13.1"
val akkaVersion = if (Nightly) "2.6.0" else "2.5.23"
val AkkaBinaryVersion = if (Nightly) "2.6" else "2.5"
val kafkaVersion = "2.1.1"
val kafkaVersion = "2.4.0"
val embeddedKafkaVersion = kafkaVersion
val kafkaVersionForDocs = "21"
val embeddedKafka = "io.github.seglo" %% "embedded-kafka" % embeddedKafkaVersion // "io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion
// this depends on Kafka, and should be upgraded to such latest version
// that depends on the same Kafka version, as is defined above
val embeddedKafkaSchemaRegistry = "5.1.1"
val kafkaVersionForDocs = "24"
val scalatestVersion = "3.0.8"
val testcontainersVersion = "1.12.2"
val slf4jVersion = "1.7.26"
Expand All @@ -30,15 +34,13 @@ val silencer = {
)
}

// this depends on Kafka, and should be upgraded to such latest version
// that depends on the same Kafka version, as is defined above
val embeddedKafkaSchemaRegistry = "5.1.2"

resolvers in ThisBuild ++= Seq(
// for Embedded Kafka
Resolver.bintrayRepo("manub", "maven"),
// for Embedded Kafka 2.4.0
Resolver.bintrayRepo("seglo", "maven"),
// for Jupiter interface (JUnit 5)
Resolver.jcenterRepo
Resolver.jcenterRepo,
// for release candidate builds of Apache Kafka
"Apache Staging" at "https://repository.apache.org/content/groups/staging/"
)

TaskKey[Unit]("verifyCodeStyle") := {
Expand Down Expand Up @@ -221,25 +223,11 @@ lazy val testkit = project
"org.testcontainers" % "kafka" % testcontainersVersion % Provided,
"org.scalatest" %% "scalatest" % scalatestVersion % Provided,
"junit" % "junit" % "4.12" % Provided,
"org.junit.jupiter" % "junit-jupiter-api" % JupiterKeys.junitJupiterVersion.value % Provided
) ++ silencer ++ {
if (scalaBinaryVersion.value == "2.13") Seq()
else
Seq(
"org.apache.kafka" %% "kafka" % kafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12"),
"com.fasterxml.jackson.core" % "jackson-databind" % "2.9.10" % Provided, // Kafka pulls in jackson databind ApacheV2
"org.apache.commons" % "commons-compress" % "1.19" % Provided, // embedded Kafka pulls in Avro which pulls in commons-compress 1.8.1
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion % Provided exclude ("log4j", "log4j")
)
},
Compile / unmanagedSources / excludeFilter := {
if (scalaBinaryVersion.value == "2.13") {
HiddenFileFilter ||
"EmbeddedKafkaLike.scala" ||
"EmbeddedKafkaTest.java" ||
"EmbeddedKafkaJunit4Test.java"
} else (Test / unmanagedSources / excludeFilter).value
},
"org.junit.jupiter" % "junit-jupiter-api" % JupiterKeys.junitJupiterVersion.value % Provided,
"org.apache.kafka" %% "kafka" % embeddedKafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12"),
"org.apache.commons" % "commons-compress" % "1.19" % Provided, // embedded Kafka pulls in Avro which pulls in commons-compress 1.8.1
embeddedKafka % Provided exclude ("log4j", "log4j")
) ++ silencer,
mimaPreviousArtifacts := Set(
organization.value %% name.value % previousStableVersion.value
.getOrElse(throw new Error("Unable to determine previous version"))
Expand Down Expand Up @@ -275,7 +263,8 @@ lazy val tests = project
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test,
// Schema registry uses Glassfish which uses java.util.logging
"org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
"org.mockito" % "mockito-core" % "2.24.5" % Test
"org.mockito" % "mockito-core" % "2.24.5" % Test,
embeddedKafka % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12")
) ++ silencer ++ {
scalaBinaryVersion.value match {
case "2.13" =>
Expand All @@ -293,20 +282,12 @@ lazy val tests = project
Test / parallelExecution := false,
IntegrationTest / parallelExecution := false,
Test / unmanagedSources / excludeFilter := {
if (scalaBinaryVersion.value == "2.13") {
HiddenFileFilter ||
"MultiConsumerSpec.scala" ||
"ReconnectSpec.scala" ||
"EmbeddedKafkaSampleSpec.scala" ||
"TransactionsSpec.scala" ||
"SerializationSpec.scala" ||
"PartitionExamples.scala" ||
"ConnectionCheckerSpec.scala" ||
"EmbeddedKafkaWithSchemaRegistryTest.java" ||
"AssignmentTest.java" ||
"ProducerExampleTest.java" ||
"SerializationTest.java"
} else (Test / unmanagedSources / excludeFilter).value
HiddenFileFilter ||
// TODO: Remove ignore once `"io.github.embeddedkafka" %% "embedded-kafka-schema-registry"` is
// available for Kafka 2.4.0
"SerializationTest.java" ||
"SerializationSpec.scala" ||
"EmbeddedKafkaWithSchemaRegistryTest.java"
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

package akka.kafka.internal
import java.util.concurrent.TimeUnit
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
Expand All @@ -15,8 +15,8 @@ import akka.kafka.ProducerSettings
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
import akka.stream.{Attributes, FlowShape, Supervision}
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Supervision}
import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}

import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -108,7 +108,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
if (producer != null) {
// Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
// https://github.com/akka/alpakka-kafka/pull/318
producer.close(0L, TimeUnit.MILLISECONDS)
producer.close(Duration.ofSeconds(0))
}
failStage(ex)
}
Expand Down Expand Up @@ -215,7 +215,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
producer.close(stage.settings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
producer.close(Duration.ofMillis(stage.settings.closeTimeout.toMillis))
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,15 @@ private final class TransactionalProducerStageLogic[K, V, P](
maybeCommitTransaction()
}

private def maybeCommitTransaction(beginNewTransaction: Boolean = true): Unit = {
private def maybeCommitTransaction(beginNewTransaction: Boolean = true,
abortEmptyTransactionOnComplete: Boolean = false): Unit = {
val awaitingConf = awaitingConfirmation.get
batchOffsets match {
case batch: NonemptyTransactionBatch if awaitingConf == 0 =>
commitTransaction(batch, beginNewTransaction)
case _: EmptyTransactionBatch if awaitingConf == 0 && abortEmptyTransactionOnComplete =>
log.debug("Aborting empty transaction because we're completing.")
abortTransaction()
case _ if awaitingConf > 0 =>
suspendDemand()
scheduleOnce(commitSchedulerKey, messageDrainInterval)
Expand All @@ -162,7 +166,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
override def onCompletionSuccess(): Unit = {
log.debug("Committing final transaction before shutdown")
cancelTimer(commitSchedulerKey)
maybeCommitTransaction(beginNewTransaction = false)
maybeCommitTransaction(beginNewTransaction = false, abortEmptyTransactionOnComplete = true)
super.onCompletionSuccess()
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/javadsl/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.annotation.ApiMayChange
import akka.kafka.ConsumerMessage.Committable
import akka.kafka.ProducerMessage._
import akka.kafka.{scaladsl, CommitterSettings, ConsumerMessage, ProducerSettings}
import akka.stream.javadsl.{Flow, FlowWithContext, Keep, Sink}
import akka.stream.javadsl.{Flow, FlowWithContext, Sink}
import akka.{japi, Done, NotUsed}
import org.apache.kafka.clients.producer.ProducerRecord

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

package akka.kafka.testkit.internal

import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Arrays, Properties}

import scala.jdk.CollectionConverters._

import akka.actor.ActorSystem
import akka.kafka.testkit.KafkaTestkitSettings
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerSettings}
Expand Down Expand Up @@ -111,7 +111,7 @@ trait KafkaTestKit {
*/
def cleanUpAdminClient(): Unit =
if (adminClientVar != null) {
adminClientVar.close(60, TimeUnit.SECONDS)
adminClientVar.close(Duration.ofSeconds(60))
adminClientVar = null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package akka.kafka.testkit.scaladsl

import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -56,7 +57,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
}

def cleanUp(): Unit = {
if (testProducer ne null) testProducer.close(60, TimeUnit.SECONDS)
if (testProducer ne null) testProducer.close(Duration.ofSeconds(60))
cleanUpAdminClient()
TestKit.shutdownActorSystem(system)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,9 @@ public void onStop(Set<TopicPartition> currentTps, RestrictedConsumer consumer)
assertDone(control.isShutdown());
assertEquals(messageCount, resultOf(control.drainAndShutdown(executor)).size());

assertThat(revoked.get(), is(Collections.emptySet()));
assertThat(assigned.get(), hasItem(tp));
assertThat(stopped.get(), hasItem(tp));
assertThat(revoked.get(), hasItem(tp)); // revoke of partitions occurs after stop
}

@Test
Expand Down
8 changes: 7 additions & 1 deletion tests/src/test/scala/akka/kafka/internal/ConsumerDummy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package akka.kafka.internal

import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -48,6 +50,7 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def commitAsync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
callback: OffsetCommitCallback): Unit = ???
override def seek(partition: TopicPartition, offset: Long): Unit = ???
override def seek(partition: TopicPartition, offsetAndMetadata: OffsetAndMetadata): Unit = ???
override def seekToBeginning(partitions: java.util.Collection[TopicPartition]): Unit = ???
override def seekToEnd(partitions: java.util.Collection[TopicPartition]): Unit = ???
override def position(partition: TopicPartition): Long = ???
Expand Down Expand Up @@ -80,12 +83,15 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
timeout: java.time.Duration): Unit = ???
override def committed(partition: TopicPartition, timeout: java.time.Duration): OffsetAndMetadata = ???
override def committed(partitions: util.Set[TopicPartition]): util.Map[TopicPartition, OffsetAndMetadata] = ???
override def committed(partitions: util.Set[TopicPartition],
timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ???

override def partitionsFor(topic: String, timeout: java.time.Duration): java.util.List[PartitionInfo] = ???
override def listTopics(timeout: java.time.Duration): java.util.Map[String, java.util.List[PartitionInfo]] = ???
override def beginningOffsets(partitions: java.util.Collection[TopicPartition],
timeout: java.time.Duration): java.util.Map[TopicPartition, java.lang.Long] = ???
override def endOffsets(partitions: java.util.Collection[TopicPartition],
timeout: java.time.Duration): java.util.Map[TopicPartition, java.lang.Long] = ???
override def poll(timeout: java.time.Duration): ConsumerRecords[K, V] = ???

}
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ object PartitionedSourceSpec {
log.debug(s"seek($partition, $offset)")
seeks = seeks.updated(partition, offset)
}
def seek(partition: TopicPartition, offsetAndMeta: OffsetAndMetadata): Unit =
override def seek(partition: TopicPartition, offsetAndMeta: OffsetAndMetadata): Unit =
seek(partition, offsetAndMeta.offset)
override def paused(): java.util.Set[TopicPartition] = tpsPaused.asJava
override def pause(partitions: java.util.Collection[TopicPartition]): Unit = {
Expand Down
16 changes: 9 additions & 7 deletions tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package akka.kafka.internal

import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.CompletableFuture

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffset}
Expand All @@ -30,9 +30,9 @@ import org.mockito.stubbing.Answer
import org.mockito.verification.VerificationMode
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

import scala.jdk.CollectionConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

class ProducerSpec(_system: ActorSystem)
Expand Down Expand Up @@ -555,7 +555,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu
}
})
Mockito
.when(result.close(mockito.ArgumentMatchers.any[Long], mockito.ArgumentMatchers.any[TimeUnit]))
.when(result.close(mockito.ArgumentMatchers.any[java.time.Duration]))
.thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock) =
closed = true
Expand All @@ -570,14 +570,16 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu

def verifyClosed() = {
Mockito.verify(mock).flush()
Mockito.verify(mock).close(mockito.ArgumentMatchers.any[Long], mockito.ArgumentMatchers.any[TimeUnit])
Mockito.verify(mock).close(mockito.ArgumentMatchers.any[java.time.Duration])
}

def verifyForceClosedInCallback() = {
val inOrder = Mockito.inOrder(mock)
inOrder.verify(mock, atLeastOnce()).close(mockito.ArgumentMatchers.eq(0L), mockito.ArgumentMatchers.any[TimeUnit])
// the force close from the async callback `failStageCb`
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[java.time.Duration])
// the flush and close from `closeProducer`
inOrder.verify(mock).flush()
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[Long], mockito.ArgumentMatchers.any[TimeUnit])
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[java.time.Duration])
}

def verifyNoMoreInteractions() =
Expand Down Expand Up @@ -608,7 +610,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu
val inOrder = Mockito.inOrder(mock)
inOrder.verify(mock).abortTransaction()
inOrder.verify(mock).flush()
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[Long], mockito.ArgumentMatchers.any[TimeUnit])
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[java.time.Duration])
}
}

Expand Down
Loading

0 comments on commit ada44cd

Please sign in to comment.