Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka 2.4.0-RC1 #971

Merged
merged 1 commit into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package akka.kafka.benchmarks

import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Arrays, Properties, UUID}
Expand Down Expand Up @@ -39,6 +40,8 @@ private[benchmarks] trait PerfFixtureHelpers extends LazyLogging {

val producerTimeout = 6 minutes
val logPercentStep = 25
val adminClientCloseTimeout = Duration.ofSeconds(5)
val producerCloseTimeout = adminClientCloseTimeout

def randomId(): String = PerfFixtureHelpers.randomId()

Expand All @@ -50,7 +53,7 @@ private[benchmarks] trait PerfFixtureHelpers extends LazyLogging {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost)
val admin = AdminClient.create(props)
val producer = createTopicAndFill(ft, props, admin)
admin.close(5, TimeUnit.SECONDS)
admin.close(adminClientCloseTimeout)
producer
}

Expand All @@ -64,9 +67,9 @@ private[benchmarks] trait PerfFixtureHelpers extends LazyLogging {
logger.info(s"Reusing existing topic $ft")
} else {
val producer = createTopicAndFill(ft, props, admin)
producer.close(5, TimeUnit.SECONDS)
producer.close(producerCloseTimeout)
}
admin.close(5, TimeUnit.SECONDS)
admin.close(adminClientCloseTimeout)
}

private def createTopicAndFill(ft: FilledTopic, props: Properties, admin: AdminClient) = {
Expand Down
67 changes: 24 additions & 43 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ val Scala212 = "2.12.10"
val Scala213 = "2.13.1"
val akkaVersion = if (Nightly) "2.6.0" else "2.5.23"
val AkkaBinaryVersion = if (Nightly) "2.6" else "2.5"
val kafkaVersion = "2.1.1"
val kafkaVersion = "2.4.0"
val embeddedKafkaVersion = kafkaVersion
val kafkaVersionForDocs = "21"
val embeddedKafka = "io.github.seglo" %% "embedded-kafka" % embeddedKafkaVersion // "io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion
// this depends on Kafka, and should be upgraded to such latest version
// that depends on the same Kafka version, as is defined above
val embeddedKafkaSchemaRegistry = "5.1.1"
val kafkaVersionForDocs = "24"
val scalatestVersion = "3.0.8"
val testcontainersVersion = "1.12.2"
val slf4jVersion = "1.7.26"
Expand All @@ -30,15 +34,13 @@ val silencer = {
)
}

// this depends on Kafka, and should be upgraded to such latest version
// that depends on the same Kafka version, as is defined above
val embeddedKafkaSchemaRegistry = "5.1.2"

resolvers in ThisBuild ++= Seq(
// for Embedded Kafka
Resolver.bintrayRepo("manub", "maven"),
// for Embedded Kafka 2.4.0
Resolver.bintrayRepo("seglo", "maven"),
// for Jupiter interface (JUnit 5)
Resolver.jcenterRepo
Resolver.jcenterRepo,
// for release candidate builds of Apache Kafka
"Apache Staging" at "https://repository.apache.org/content/groups/staging/"
)

TaskKey[Unit]("verifyCodeStyle") := {
Expand Down Expand Up @@ -221,25 +223,11 @@ lazy val testkit = project
"org.testcontainers" % "kafka" % testcontainersVersion % Provided,
"org.scalatest" %% "scalatest" % scalatestVersion % Provided,
"junit" % "junit" % "4.12" % Provided,
"org.junit.jupiter" % "junit-jupiter-api" % JupiterKeys.junitJupiterVersion.value % Provided
) ++ silencer ++ {
if (scalaBinaryVersion.value == "2.13") Seq()
else
Seq(
"org.apache.kafka" %% "kafka" % kafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12"),
"com.fasterxml.jackson.core" % "jackson-databind" % "2.9.10" % Provided, // Kafka pulls in jackson databind ApacheV2
"org.apache.commons" % "commons-compress" % "1.19" % Provided, // embedded Kafka pulls in Avro which pulls in commons-compress 1.8.1
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion % Provided exclude ("log4j", "log4j")
)
},
Compile / unmanagedSources / excludeFilter := {
if (scalaBinaryVersion.value == "2.13") {
HiddenFileFilter ||
"EmbeddedKafkaLike.scala" ||
"EmbeddedKafkaTest.java" ||
"EmbeddedKafkaJunit4Test.java"
} else (Test / unmanagedSources / excludeFilter).value
},
"org.junit.jupiter" % "junit-jupiter-api" % JupiterKeys.junitJupiterVersion.value % Provided,
"org.apache.kafka" %% "kafka" % embeddedKafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12"),
"org.apache.commons" % "commons-compress" % "1.19" % Provided, // embedded Kafka pulls in Avro which pulls in commons-compress 1.8.1
embeddedKafka % Provided exclude ("log4j", "log4j")
) ++ silencer,
mimaPreviousArtifacts := Set(
organization.value %% name.value % previousStableVersion.value
.getOrElse(throw new Error("Unable to determine previous version"))
Expand Down Expand Up @@ -275,7 +263,8 @@ lazy val tests = project
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test,
// Schema registry uses Glassfish which uses java.util.logging
"org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
"org.mockito" % "mockito-core" % "2.24.5" % Test
"org.mockito" % "mockito-core" % "2.24.5" % Test,
embeddedKafka % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12")
) ++ silencer ++ {
scalaBinaryVersion.value match {
case "2.13" =>
Expand All @@ -293,20 +282,12 @@ lazy val tests = project
Test / parallelExecution := false,
IntegrationTest / parallelExecution := false,
Test / unmanagedSources / excludeFilter := {
if (scalaBinaryVersion.value == "2.13") {
HiddenFileFilter ||
"MultiConsumerSpec.scala" ||
"ReconnectSpec.scala" ||
"EmbeddedKafkaSampleSpec.scala" ||
"TransactionsSpec.scala" ||
"SerializationSpec.scala" ||
"PartitionExamples.scala" ||
"ConnectionCheckerSpec.scala" ||
"EmbeddedKafkaWithSchemaRegistryTest.java" ||
"AssignmentTest.java" ||
"ProducerExampleTest.java" ||
"SerializationTest.java"
} else (Test / unmanagedSources / excludeFilter).value
HiddenFileFilter ||
// TODO: Remove ignore once `"io.github.embeddedkafka" %% "embedded-kafka-schema-registry"` is
// available for Kafka 2.4.0
"SerializationTest.java" ||
"SerializationSpec.scala" ||
"EmbeddedKafkaWithSchemaRegistryTest.java"
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

package akka.kafka.internal
import java.util.concurrent.TimeUnit
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
Expand All @@ -15,8 +15,8 @@ import akka.kafka.ProducerSettings
import akka.kafka.internal.ProducerStage.{MessageCallback, ProducerCompletionState}
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
import akka.stream.{Attributes, FlowShape, Supervision}
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Supervision}
import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}

import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -108,7 +108,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
if (producer != null) {
// Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
// https://github.com/akka/alpakka-kafka/pull/318
producer.close(0L, TimeUnit.MILLISECONDS)
producer.close(Duration.ofSeconds(0))
}
failStage(ex)
}
Expand Down Expand Up @@ -215,7 +215,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
producer.close(stage.settings.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
producer.close(Duration.ofMillis(stage.settings.closeTimeout.toMillis))
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,15 @@ private final class TransactionalProducerStageLogic[K, V, P](
maybeCommitTransaction()
}

private def maybeCommitTransaction(beginNewTransaction: Boolean = true): Unit = {
private def maybeCommitTransaction(beginNewTransaction: Boolean = true,
abortEmptyTransactionOnComplete: Boolean = false): Unit = {
val awaitingConf = awaitingConfirmation.get
batchOffsets match {
case batch: NonemptyTransactionBatch if awaitingConf == 0 =>
commitTransaction(batch, beginNewTransaction)
case _: EmptyTransactionBatch if awaitingConf == 0 && abortEmptyTransactionOnComplete =>
log.debug("Aborting empty transaction because we're completing.")
abortTransaction()
case _ if awaitingConf > 0 =>
suspendDemand()
scheduleOnce(commitSchedulerKey, messageDrainInterval)
Expand All @@ -162,7 +166,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
override def onCompletionSuccess(): Unit = {
log.debug("Committing final transaction before shutdown")
cancelTimer(commitSchedulerKey)
maybeCommitTransaction(beginNewTransaction = false)
maybeCommitTransaction(beginNewTransaction = false, abortEmptyTransactionOnComplete = true)
super.onCompletionSuccess()
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/javadsl/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.annotation.ApiMayChange
import akka.kafka.ConsumerMessage.Committable
import akka.kafka.ProducerMessage._
import akka.kafka.{scaladsl, CommitterSettings, ConsumerMessage, ProducerSettings}
import akka.stream.javadsl.{Flow, FlowWithContext, Keep, Sink}
import akka.stream.javadsl.{Flow, FlowWithContext, Sink}
import akka.{japi, Done, NotUsed}
import org.apache.kafka.clients.producer.ProducerRecord

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

package akka.kafka.testkit.internal

import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Arrays, Properties}

import scala.jdk.CollectionConverters._

import akka.actor.ActorSystem
import akka.kafka.testkit.KafkaTestkitSettings
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerSettings}
Expand Down Expand Up @@ -111,7 +111,7 @@ trait KafkaTestKit {
*/
def cleanUpAdminClient(): Unit =
if (adminClientVar != null) {
adminClientVar.close(60, TimeUnit.SECONDS)
adminClientVar.close(Duration.ofSeconds(60))
adminClientVar = null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package akka.kafka.testkit.scaladsl

import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -56,7 +57,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
}

def cleanUp(): Unit = {
if (testProducer ne null) testProducer.close(60, TimeUnit.SECONDS)
if (testProducer ne null) testProducer.close(Duration.ofSeconds(60))
cleanUpAdminClient()
TestKit.shutdownActorSystem(system)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,9 @@ public void onStop(Set<TopicPartition> currentTps, RestrictedConsumer consumer)
assertDone(control.isShutdown());
assertEquals(messageCount, resultOf(control.drainAndShutdown(executor)).size());

assertThat(revoked.get(), is(Collections.emptySet()));
assertThat(assigned.get(), hasItem(tp));
assertThat(stopped.get(), hasItem(tp));
assertThat(revoked.get(), hasItem(tp)); // revoke of partitions occurs after stop
seglo marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
Expand Down
8 changes: 7 additions & 1 deletion tests/src/test/scala/akka/kafka/internal/ConsumerDummy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package akka.kafka.internal

import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -48,6 +50,7 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def commitAsync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
callback: OffsetCommitCallback): Unit = ???
override def seek(partition: TopicPartition, offset: Long): Unit = ???
override def seek(partition: TopicPartition, offsetAndMetadata: OffsetAndMetadata): Unit = ???
override def seekToBeginning(partitions: java.util.Collection[TopicPartition]): Unit = ???
override def seekToEnd(partitions: java.util.Collection[TopicPartition]): Unit = ???
override def position(partition: TopicPartition): Long = ???
Expand Down Expand Up @@ -80,12 +83,15 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
timeout: java.time.Duration): Unit = ???
override def committed(partition: TopicPartition, timeout: java.time.Duration): OffsetAndMetadata = ???
override def committed(partitions: util.Set[TopicPartition]): util.Map[TopicPartition, OffsetAndMetadata] = ???
override def committed(partitions: util.Set[TopicPartition],
timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ???

override def partitionsFor(topic: String, timeout: java.time.Duration): java.util.List[PartitionInfo] = ???
override def listTopics(timeout: java.time.Duration): java.util.Map[String, java.util.List[PartitionInfo]] = ???
override def beginningOffsets(partitions: java.util.Collection[TopicPartition],
timeout: java.time.Duration): java.util.Map[TopicPartition, java.lang.Long] = ???
override def endOffsets(partitions: java.util.Collection[TopicPartition],
timeout: java.time.Duration): java.util.Map[TopicPartition, java.lang.Long] = ???
override def poll(timeout: java.time.Duration): ConsumerRecords[K, V] = ???

}
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ object PartitionedSourceSpec {
log.debug(s"seek($partition, $offset)")
seeks = seeks.updated(partition, offset)
}
def seek(partition: TopicPartition, offsetAndMeta: OffsetAndMetadata): Unit =
override def seek(partition: TopicPartition, offsetAndMeta: OffsetAndMetadata): Unit =
seek(partition, offsetAndMeta.offset)
override def paused(): java.util.Set[TopicPartition] = tpsPaused.asJava
override def pause(partitions: java.util.Collection[TopicPartition]): Unit = {
Expand Down
16 changes: 9 additions & 7 deletions tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package akka.kafka.internal

import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.CompletableFuture

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffset}
Expand All @@ -30,9 +30,9 @@ import org.mockito.stubbing.Answer
import org.mockito.verification.VerificationMode
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

import scala.jdk.CollectionConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

class ProducerSpec(_system: ActorSystem)
Expand Down Expand Up @@ -555,7 +555,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu
}
})
Mockito
.when(result.close(mockito.ArgumentMatchers.any[Long], mockito.ArgumentMatchers.any[TimeUnit]))
.when(result.close(mockito.ArgumentMatchers.any[java.time.Duration]))
.thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock) =
closed = true
Expand All @@ -570,14 +570,16 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu

def verifyClosed() = {
Mockito.verify(mock).flush()
Mockito.verify(mock).close(mockito.ArgumentMatchers.any[Long], mockito.ArgumentMatchers.any[TimeUnit])
Mockito.verify(mock).close(mockito.ArgumentMatchers.any[java.time.Duration])
}

def verifyForceClosedInCallback() = {
val inOrder = Mockito.inOrder(mock)
inOrder.verify(mock, atLeastOnce()).close(mockito.ArgumentMatchers.eq(0L), mockito.ArgumentMatchers.any[TimeUnit])
// the force close from the async callback `failStageCb`
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[java.time.Duration])
// the flush and close from `closeProducer`
inOrder.verify(mock).flush()
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[Long], mockito.ArgumentMatchers.any[TimeUnit])
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[java.time.Duration])
}

def verifyNoMoreInteractions() =
Expand Down Expand Up @@ -608,7 +610,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu
val inOrder = Mockito.inOrder(mock)
inOrder.verify(mock).abortTransaction()
inOrder.verify(mock).flush()
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[Long], mockito.ArgumentMatchers.any[TimeUnit])
inOrder.verify(mock).close(mockito.ArgumentMatchers.any[java.time.Duration])
}
}

Expand Down
Loading