From 3c78c34df0c55f3b5dec9717d54749a3f5dc751e Mon Sep 17 00:00:00 2001 From: Ming Liu Date: Thu, 24 Sep 2020 18:59:56 +0000 Subject: [PATCH] finatra-kafka-streams: Compile Finatra Kafka Streams against both Kafka 2.2 and 2.5 Problem ------- Some code in the current Finatra Kafka Streams code base depends directly on Kafka 2.2 internals, which blocks the 2.5 upgrade. Solution -------- After some experimentation, we think the best way to upgrade to Kafka 2.5 is to: 1. Change finatra-kafka-streams so that the code with a direct Kafka 2.2 dependency is moved into a separate folder. 2. Add a filter in build.sbt to control which version-specific folder is included in the build. 3. Keep building finatra-kafka with 2.2 by default in this phab, while allowing us to easily switch to building with 2.5. Result -------- No public API changes. This should have no impact on current Kafka 2.2 customers. Differential Revision: https://phabricator.twitter.biz/D545900 --- CHANGELOG.rst | 3 + build.sbt | 12 ++ .../prerestore/PreRestoreState.scala | 20 +-- ...PreRestoreWordCountServerFeatureTest.scala | 3 +- .../src/main/java/BUILD | 1 + ...titioningKafkaClientSupplierSupplier.scala | 6 +- .../internal/ClientStateAndHostInfo.scala | 3 +- .../StaticPartitioningStreamAssignor.scala | 22 --- .../internal/StaticTaskAssignor.scala | 0 .../internal/TaskAssignments.scala | 0 ...titioningKafkaClientSupplierSupplier.scala | 26 +++ .../src/main/scala/BUILD | 5 +- .../partitioning/StaticPartitioning.scala | 26 ++- .../main/scala-kafka2.2/CompatibleUtils.scala | 43 +++++ .../RocksDBStoreFactory.scala | 2 +- .../CompatibleUtils.scala} | 16 +- .../scala-kafka2.5/RocksDBStoreFactory.scala | 10 ++ .../kafka-streams/src/main/scala/BUILD | 5 +- .../KafkaStreamsTwitterServer.scala | 165 ++++++++++++------ .../config/KafkaStreamsConfig.scala | 3 + .../flushing/AsyncTransformer.scala | 7 +- .../internal/utils/KafkaFlagUtils.scala | 8 +- ...traTransformerLifecycleKeyValueStore.scala | 6 +- .../StatelessKafkaStreamsTwitterServer.scala | 4 +- ...traRocksDbKeyValueBytesStoreSupplier.scala | 2 +- .../test/scala-kafka2.2/KafkaTestUtil.scala | 20 +++ .../test/scala-kafka2.5/KafkaTestUtil.scala | 19 ++ .../kafka-streams/src/test/scala/BUILD | 1 + .../test/FinatraTopologyTester.scala | 2 +- .../test/KafkaStreamsFeatureTest.scala | 12 +- .../transformer/FinatraTransformerTest.scala | 12 +- .../stores/PersistentTimerStoreTest.scala | 7 +- .../FinatraKeyValueStoreLatencyTest.scala | 7 +- kafka/src/test/scala/BUILD | 2 + 34 files changed, 342 insertions(+), 138 deletions(-) rename kafka-streams/kafka-streams-static-partitioning/src/main/{scala/com/twitter/finatra/kafkastreams/partitioning/internal => scala-kafka2.2}/StaticPartitioningKafkaClientSupplierSupplier.scala (79%) rename kafka-streams/kafka-streams-static-partitioning/src/main/{scala/com/twitter/finatra/kafkastreams/partitioning => scala-kafka2.2}/internal/ClientStateAndHostInfo.scala (76%) rename kafka-streams/kafka-streams-static-partitioning/src/main/{scala/com/twitter/finatra/kafkastreams/partitioning => scala-kafka2.2}/internal/StaticPartitioningStreamAssignor.scala (64%) rename kafka-streams/kafka-streams-static-partitioning/src/main/{scala/com/twitter/finatra/kafkastreams/partitioning => scala-kafka2.2}/internal/StaticTaskAssignor.scala (100%) rename kafka-streams/kafka-streams-static-partitioning/src/main/{scala/com/twitter/finatra/kafkastreams/partitioning => scala-kafka2.2}/internal/TaskAssignments.scala (100%) create mode 100644 
kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.5/StaticPartitioningKafkaClientSupplierSupplier.scala create mode 100644 kafka-streams/kafka-streams/src/main/scala-kafka2.2/CompatibleUtils.scala rename kafka-streams/kafka-streams/src/main/{scala/org/apache/kafka/streams/state/internals => scala-kafka2.2}/RocksDBStoreFactory.scala (74%) rename kafka-streams/kafka-streams/src/main/{scala/com/twitter/finatra/kafkastreams/internal/utils/TopologyReflectionUtils.scala => scala-kafka2.5/CompatibleUtils.scala} (52%) create mode 100644 kafka-streams/kafka-streams/src/main/scala-kafka2.5/RocksDBStoreFactory.scala create mode 100644 kafka-streams/kafka-streams/src/test/scala-kafka2.2/KafkaTestUtil.scala create mode 100644 kafka-streams/kafka-streams/src/test/scala-kafka2.5/KafkaTestUtil.scala diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 9ee2e58c67..67eb1d0771 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -42,6 +42,9 @@ Added Changed ~~~~~~~ +* finatra-kafka-streams: Update and separate the finatra-kafka-streams code base which has a direct + dependency on Kafka 2.2. Code which cannot easily be upgraded is moved into a separate build + target. ``PHAB_ID=D545900`` * inject-core: `c.t.inject.Injector` is now an abstract class. Use `Injector.apply` to create a new instance (versus the `new Injector(...)` before). ``PHAB_ID=D543297`` diff --git a/build.sbt b/build.sbt index 7dc8da8ce4..fbf4d023d3 100644 --- a/build.sbt +++ b/build.sbt @@ -1177,6 +1177,10 @@ lazy val kafkaStreamsStaticPartitioning = name := "finatra-kafka-streams-static-partitioning", moduleName := "finatra-kafka-streams-static-partitioning", ScoverageKeys.coverageExcludedPackages := ";.*", + unmanagedSourceDirectories in Compile += { + val sourceDir = (sourceDirectory in Compile).value + sourceDir / "scala-kafka2.2" + }, excludeDependencies in Test ++= kafkaStreamsExclusionRules, excludeDependencies ++= kafkaStreamsExclusionRules, excludeFilter in unmanagedResources := "BUILD", @@ -1237,6 +1241,14 @@ lazy val kafkaStreams = (project in file("kafka-streams/kafka-streams")) name := "finatra-kafka-streams", moduleName := "finatra-kafka-streams", ScoverageKeys.coverageExcludedPackages := ";.*", + unmanagedSourceDirectories in Compile += { + val sourceDir = (sourceDirectory in Compile).value + sourceDir / "scala-kafka2.2" + }, + unmanagedSourceDirectories in Test += { + val testDir = (sourceDirectory in Test).value + testDir / "scala-kafka2.2" + }, libraryDependencies ++= Seq( "com.twitter" %% "util-jvm" % versions.twLibVersion, "it.unimi.dsi" % "fastutil" % versions.fastutil, diff --git a/kafka-streams/kafka-streams-prerestore/src/main/scala/com/twitter/finatra/kafkastreams/prerestore/PreRestoreState.scala b/kafka-streams/kafka-streams-prerestore/src/main/scala/com/twitter/finatra/kafkastreams/prerestore/PreRestoreState.scala index 820d461fd2..0c148890da 100644 --- a/kafka-streams/kafka-streams-prerestore/src/main/scala/com/twitter/finatra/kafkastreams/prerestore/PreRestoreState.scala +++ b/kafka-streams/kafka-streams-prerestore/src/main/scala/com/twitter/finatra/kafkastreams/prerestore/PreRestoreState.scala @@ -5,9 +5,10 @@ import com.twitter.finatra.annotations.Experimental import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer import com.twitter.finatra.kafkastreams.internal.utils.ReflectionUtils import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning +import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils + import java.util.Properties import 
java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.common.Metric import org.apache.kafka.common.utils.Utils @@ -53,7 +54,7 @@ trait PreRestoreState extends KafkaStreamsTwitterServer with StaticPartitioning info(s"Pre-restore complete.") //Reset the thread id and start Kafka Streams as if we weren't using pre-restore mode - resetStreamThreadId() + CompatibleUtils.resetStreamThreadId() PreRestoreState.super.createAndStartKafkaStreams() } catch { case NonFatal(e) => @@ -99,21 +100,6 @@ trait PreRestoreState extends KafkaStreamsTwitterServer with StaticPartitioning properties } - //HACK: Reset StreamThread's so the kafka broker doesn't see 2x the consumer client.ids (since thread number is part of client id) - private def resetStreamThreadId(): Unit = { - try { - val streamThreadClass = classOf[StreamThread] - val streamThreadIdSequenceField = - streamThreadClass.getDeclaredField("STREAM_THREAD_ID_SEQUENCE") - streamThreadIdSequenceField.setAccessible(true) - val streamThreadIdSequence = streamThreadIdSequenceField.get(null).asInstanceOf[AtomicInteger] - streamThreadIdSequence.set(1) - } catch { - case NonFatal(e) => - error("Error resetting stream threads", e) - } - } - private def findRestoreConsumerLagMetrics(kafkaStreams: KafkaStreams): Seq[Metric] = { for { thread <- getThreads(kafkaStreams).toSeq diff --git a/kafka-streams/kafka-streams-prerestore/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount/PreRestoreWordCountServerFeatureTest.scala b/kafka-streams/kafka-streams-prerestore/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount/PreRestoreWordCountServerFeatureTest.scala index e9c0849f57..035edc62f5 100644 --- a/kafka-streams/kafka-streams-prerestore/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount/PreRestoreWordCountServerFeatureTest.scala +++ b/kafka-streams/kafka-streams-prerestore/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount/PreRestoreWordCountServerFeatureTest.scala @@ -3,6 +3,7 @@ package com.twitter.finatra.kafkastreams.integration.wordcount import com.twitter.conversions.DurationOps._ import com.twitter.finatra.http.EmbeddedHttpServer import com.twitter.finatra.kafka.serde.ScalaSerdes +import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils import com.twitter.finatra.kafkastreams.test.KafkaStreamsMultiServerFeatureTest import com.twitter.util.{Await, Duration} import org.apache.kafka.common.serialization.Serdes @@ -64,7 +65,7 @@ class PreRestoreWordCountServerFeatureTest extends KafkaStreamsMultiServerFeatur server.close() Await.result(server.mainResult) server.clearStats() - resetStreamThreadId() + CompatibleUtils.resetStreamThreadId() } private def testRestartWithoutPrerestore(): Unit = { diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/java/BUILD b/kafka-streams/kafka-streams-static-partitioning/src/main/java/BUILD index 0b63949b48..a1566edb6f 100644 --- a/kafka-streams/kafka-streams-static-partitioning/src/main/java/BUILD +++ b/kafka-streams/kafka-streams-static-partitioning/src/main/java/BUILD @@ -1,4 +1,5 @@ java_library( + # only include when compiling with kafka 2.2 sources = ["**/*.java"], compiler_option_sets = [], provides = artifact( diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/StaticPartitioningKafkaClientSupplierSupplier.scala 
b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/StaticPartitioningKafkaClientSupplierSupplier.scala similarity index 79% rename from kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/StaticPartitioningKafkaClientSupplierSupplier.scala rename to kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/StaticPartitioningKafkaClientSupplierSupplier.scala index 1bf5720d7a..77636746af 100644 --- a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/StaticPartitioningKafkaClientSupplierSupplier.scala +++ b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/StaticPartitioningKafkaClientSupplierSupplier.scala @@ -1,10 +1,12 @@ package com.twitter.finatra.kafkastreams.partitioning.internal import java.util -import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig} +import org.apache.kafka.clients.consumer._ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier -class StaticPartitioningKafkaClientSupplierSupplier(numApplicationInstances: Int) +class StaticPartitioningKafkaClientSupplierSupplier( + numApplicationInstances: Int, + serverConfig: String) extends DefaultKafkaClientSupplier { override def getConsumer(config: util.Map[String, AnyRef]): Consumer[Array[Byte], Array[Byte]] = { diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/ClientStateAndHostInfo.scala b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/ClientStateAndHostInfo.scala similarity index 76% rename from kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/ClientStateAndHostInfo.scala rename to kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/ClientStateAndHostInfo.scala index 8f7897f173..da24fb12a6 100644 --- a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/ClientStateAndHostInfo.scala +++ b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/ClientStateAndHostInfo.scala @@ -1,12 +1,13 @@ package com.twitter.finatra.kafkastreams.partitioning.internal import com.twitter.finatra.streams.queryable.thrift.domain.ServiceShardId +import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning import org.apache.kafka.streams.processor.internals.assignment.ClientState import org.apache.kafka.streams.state.HostInfo case class ClientStateAndHostInfo[ID](id: ID, clientState: ClientState, hostInfo: HostInfo) { val serviceShardId: ServiceShardId = { - StaticPartitioningStreamAssignor.parseShardId(hostInfo.host()) + StaticPartitioning.parseShardId(hostInfo.host()) } } diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/StaticPartitioningStreamAssignor.scala b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/StaticPartitioningStreamAssignor.scala similarity index 64% rename from kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/StaticPartitioningStreamAssignor.scala rename to kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/StaticPartitioningStreamAssignor.scala index d83f9d3c66..f97d153e36 100644 
--- a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/StaticPartitioningStreamAssignor.scala +++ b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/StaticPartitioningStreamAssignor.scala @@ -1,6 +1,5 @@ package com.twitter.finatra.kafkastreams.partitioning.internal -import com.twitter.finatra.streams.queryable.thrift.domain.ServiceShardId import com.twitter.finatra.streams.queryable.thrift.partitioning.StaticServiceShardPartitioner import com.twitter.inject.Logging import java.util @@ -10,31 +9,10 @@ import org.apache.kafka.streams.processor.TaskId import org.apache.kafka.streams.processor.internals.OverridableStreamsPartitionAssignor import org.apache.kafka.streams.processor.internals.assignment.{ClientState, TaskAssignor} import scala.collection.JavaConverters._ -import scala.util.control.NonFatal object StaticPartitioningStreamAssignor { val StreamsPreRestoreConfig = "streams.prerestore" val ApplicationNumInstances = "application.num.instances" - - def parseShardId(applicationServerHost: String): ServiceShardId = { - val firstPeriodIndex = applicationServerHost.indexOf('.') - - val shardId = - try { - applicationServerHost.substring(0, firstPeriodIndex).toInt - } catch { - case NonFatal(e) => - throw new Exception( - "Finatra Kafka Stream's StaticPartitioning functionality requires the " + - "'kafka.application.server' flag value to be specified as '<shard_id>.<unused_hostname>:<unused_port>'" + - " where unused_hostname can be empty and unused_port must be > 0. As an example, to configure the server" + - " that represents shard #5, you can set 'kafka.application.server=5.:80'. In this example, port 80 is unused and does not" + - " need to represent an actual open port" - ) - } - - ServiceShardId(shardId) - } } class StaticPartitioningStreamAssignor extends OverridableStreamsPartitionAssignor with Logging { diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/StaticTaskAssignor.scala b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/StaticTaskAssignor.scala similarity index 100% rename from kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/StaticTaskAssignor.scala rename to kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/StaticTaskAssignor.scala diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/TaskAssignments.scala b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/TaskAssignments.scala similarity index 100% rename from kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/internal/TaskAssignments.scala rename to kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.2/internal/TaskAssignments.scala diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.5/StaticPartitioningKafkaClientSupplierSupplier.scala b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.5/StaticPartitioningKafkaClientSupplierSupplier.scala new file mode 100644 index 0000000000..0954d09dd6 --- /dev/null +++ b/kafka-streams/kafka-streams-static-partitioning/src/main/scala-kafka2.5/StaticPartitioningKafkaClientSupplierSupplier.scala @@ -0,0 +1,26 @@ +package com.twitter.finatra.kafkastreams.partitioning.internal + +import 
java.util +import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier + +class StaticPartitioningKafkaClientSupplierSupplier( + numApplicationInstances: Int, + serverConfig: String) + extends DefaultKafkaClientSupplier { + + override def getConsumer(config: util.Map[String, AnyRef]): Consumer[Array[Byte], Array[Byte]] = { + val applicationServerHost = Utils.getHost(serverConfig) + + val serviceShardId = StaticPartitioning.parseShardId(applicationServerHost) + + config.put( + "group.instance.id", + serviceShardId.id.toString + ) + + super.getConsumer(config) + } +} diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/BUILD b/kafka-streams/kafka-streams-static-partitioning/src/main/scala/BUILD index 92c6833e79..256ad6a7f8 100644 --- a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/BUILD +++ b/kafka-streams/kafka-streams-static-partitioning/src/main/scala/BUILD @@ -1,5 +1,8 @@ scala_library( - sources = ["**/*.scala"], + sources = [ + "../scala-kafka2.2/**/*.scala", + "com/**/*.scala", + ], compiler_option_sets = ["fatal_warnings"], provides = scala_artifact( org = "com.twitter", diff --git a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/StaticPartitioning.scala b/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/StaticPartitioning.scala index a195451c3e..421886c264 100644 --- a/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/StaticPartitioning.scala +++ b/kafka-streams/kafka-streams-static-partitioning/src/main/scala/com/twitter/finatra/kafkastreams/partitioning/StaticPartitioning.scala @@ -3,10 +3,32 @@ package com.twitter.finatra.kafkastreams.partitioning import com.twitter.app.Flag import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer import com.twitter.finatra.kafkastreams.partitioning.internal.StaticPartitioningKafkaClientSupplierSupplier +import com.twitter.finatra.streams.queryable.thrift.domain.ServiceShardId import org.apache.kafka.streams.KafkaClientSupplier +import scala.util.control.NonFatal object StaticPartitioning { val PreRestoreSignalingPort = 0 //TODO: Hack to signal our assignor that we are in PreRestore mode + + def parseShardId(applicationServerHost: String): ServiceShardId = { + val firstPeriodIndex = applicationServerHost.indexOf('.') + + val shardId = + try { + applicationServerHost.substring(0, firstPeriodIndex).toInt + } catch { + case NonFatal(e) => + throw new Exception( + "Finatra Kafka Stream's StaticPartitioning functionality requires the " + + "'kafka.application.server' flag value to be specified as '<shard_id>.<unused_hostname>:<unused_port>'" + + " where unused_hostname can be empty and unused_port must be > 0. As an example, to configure the server" + + " that represents shard #5, you can set 'kafka.application.server=5.:80'. 
In this example, port 80 is unused and does not" + + " need to represent an actual open port" + ) + } + + ServiceShardId(shardId) + } } trait StaticPartitioning extends KafkaStreamsTwitterServer { @@ -20,6 +42,8 @@ trait StaticPartitioning extends KafkaStreamsTwitterServer { /* Protected */ override def kafkaStreamsClientSupplier: KafkaClientSupplier = { - new StaticPartitioningKafkaClientSupplierSupplier(numApplicationInstances()) + new StaticPartitioningKafkaClientSupplierSupplier( + numApplicationInstances(), + applicationServerConfig()) } } diff --git a/kafka-streams/kafka-streams/src/main/scala-kafka2.2/CompatibleUtils.scala b/kafka-streams/kafka-streams/src/main/scala-kafka2.2/CompatibleUtils.scala new file mode 100644 index 0000000000..7d06acf079 --- /dev/null +++ b/kafka-streams/kafka-streams/src/main/scala-kafka2.2/CompatibleUtils.scala @@ -0,0 +1,43 @@ +package com.twitter.finatra.kafkastreams.internal.utils + +import java.util.concurrent.atomic.AtomicInteger +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.processor.{ProcessorContext, StateStore} +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder +import org.apache.kafka.streams.processor.internals.StreamThread +import org.apache.kafka.streams.state.internals.WrappedStateStore + +private[kafkastreams] object CompatibleUtils { + + private val internalTopologyBuilderField = + ReflectionUtils.getFinalField(classOf[Topology], "internalTopologyBuilder") + + def isStateless(topology: Topology): Boolean = { + val internalTopologyBuilder = getInternalTopologyBuilder(topology) + + internalTopologyBuilder.getStateStores.isEmpty + } + + private def getInternalTopologyBuilder(topology: Topology): InternalTopologyBuilder = { + internalTopologyBuilderField + .get(topology) + .asInstanceOf[InternalTopologyBuilder] + } + + def getUnwrappedStateStore[K, V](name: String, processorContext: ProcessorContext): StateStore = { + val unwrappedStateStore = processorContext + .getStateStore(name).asInstanceOf[WrappedStateStore[_]] + .wrapped() + + unwrappedStateStore + } + + def resetStreamThreadId() = { + val streamThreadClass = classOf[StreamThread] + val streamThreadIdSequenceField = streamThreadClass + .getDeclaredField("STREAM_THREAD_ID_SEQUENCE") + streamThreadIdSequenceField.setAccessible(true) + val streamThreadIdSequence = streamThreadIdSequenceField.get(null).asInstanceOf[AtomicInteger] + streamThreadIdSequence.set(1) + } +} diff --git a/kafka-streams/kafka-streams/src/main/scala/org/apache/kafka/streams/state/internals/RocksDBStoreFactory.scala b/kafka-streams/kafka-streams/src/main/scala-kafka2.2/RocksDBStoreFactory.scala similarity index 74% rename from kafka-streams/kafka-streams/src/main/scala/org/apache/kafka/streams/state/internals/RocksDBStoreFactory.scala rename to kafka-streams/kafka-streams/src/main/scala-kafka2.2/RocksDBStoreFactory.scala index 28cadb7f14..05365906b1 100644 --- a/kafka-streams/kafka-streams/src/main/scala/org/apache/kafka/streams/state/internals/RocksDBStoreFactory.scala +++ b/kafka-streams/kafka-streams/src/main/scala-kafka2.2/RocksDBStoreFactory.scala @@ -4,7 +4,7 @@ package org.apache.kafka.streams.state.internals * Factory to allow us to call the package private RocksDBStore constructor */ object RocksDBStoreFactory { - def create(name: String): RocksDBStore = { + def create(name: String, metricScope: String): RocksDBStore = { new RocksDBStore(name) } } diff --git 
a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/utils/TopologyReflectionUtils.scala b/kafka-streams/kafka-streams/src/main/scala-kafka2.5/CompatibleUtils.scala similarity index 52% rename from kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/utils/TopologyReflectionUtils.scala rename to kafka-streams/kafka-streams/src/main/scala-kafka2.5/CompatibleUtils.scala index d066447f09..591b10f634 100644 --- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/utils/TopologyReflectionUtils.scala +++ b/kafka-streams/kafka-streams/src/main/scala-kafka2.5/CompatibleUtils.scala @@ -1,9 +1,11 @@ package com.twitter.finatra.kafkastreams.internal.utils import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.processor.{ProcessorContext, StateStore} import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder +import org.apache.kafka.streams.state.internals.WrappedStateStore -private[kafkastreams] object TopologyReflectionUtils { +private[kafkastreams] object CompatibleUtils { private val internalTopologyBuilderField = ReflectionUtils.getFinalField(classOf[Topology], "internalTopologyBuilder") @@ -11,7 +13,7 @@ private[kafkastreams] object TopologyReflectionUtils { def isStateless(topology: Topology): Boolean = { val internalTopologyBuilder = getInternalTopologyBuilder(topology) - internalTopologyBuilder.getStateStores.isEmpty + internalTopologyBuilder.stateStores.isEmpty } private def getInternalTopologyBuilder(topology: Topology): InternalTopologyBuilder = { @@ -19,4 +21,14 @@ private[kafkastreams] object TopologyReflectionUtils { .get(topology) .asInstanceOf[InternalTopologyBuilder] } + + def getUnwrappedStateStore[K, V](name: String, processorContext: ProcessorContext): StateStore = { + val unwrappedStateStore = processorContext + .getStateStore(name).asInstanceOf[WrappedStateStore[_, K, V]] + .wrapped().asInstanceOf[StateStore] + + unwrappedStateStore + } + + def resetStreamThreadId() = {} } diff --git a/kafka-streams/kafka-streams/src/main/scala-kafka2.5/RocksDBStoreFactory.scala b/kafka-streams/kafka-streams/src/main/scala-kafka2.5/RocksDBStoreFactory.scala new file mode 100644 index 0000000000..181871efa8 --- /dev/null +++ b/kafka-streams/kafka-streams/src/main/scala-kafka2.5/RocksDBStoreFactory.scala @@ -0,0 +1,10 @@ +package org.apache.kafka.streams.state.internals + +/** + * Factory to allow us to call the package private RocksDBStore constructor + */ +object RocksDBStoreFactory { + def create(name: String, metricScope: String): RocksDBStore = { + new RocksDBStore(name, metricScope) + } +} diff --git a/kafka-streams/kafka-streams/src/main/scala/BUILD b/kafka-streams/kafka-streams/src/main/scala/BUILD index 7bba712bd3..e5c37bd29f 100644 --- a/kafka-streams/kafka-streams/src/main/scala/BUILD +++ b/kafka-streams/kafka-streams/src/main/scala/BUILD @@ -13,9 +13,9 @@ jar_library( scala_library( sources = [ + "../scala-kafka2.2/*.scala", "com/twitter/finatra/kafkastreams/**/*.scala", - "com/twitter/finatra/streams/**/*.scala", - "org/**/*.scala", + "org/apache/kafka/streams/state/**/*.scala", ], compiler_option_sets = ["fatal_warnings"], provides = scala_artifact( @@ -25,6 +25,7 @@ scala_library( ), strict_deps = True, dependencies = [ + # NOTE: remove this direct rocksDB dependency when building 2.5 ":rocksdb-5.14.2", "3rdparty/jvm/it/unimi/dsi:fastutil", "3rdparty/jvm/org/agrona", diff --git 
a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/KafkaStreamsTwitterServer.scala b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/KafkaStreamsTwitterServer.scala index daede567da..e1d5ca7f4b 100644 --- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/KafkaStreamsTwitterServer.scala +++ b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/KafkaStreamsTwitterServer.scala @@ -39,16 +39,16 @@ import org.apache.kafka.streams.{ /** * A [[com.twitter.server.TwitterServer]] that supports configuring a KafkaStreams topology. * - * To use, override the [[configureKafkaStreams]] method to setup your topology. + * To use, override the [[configureKafkaStreams]] method to setup your topology. * - * {{{ + * {{{ * import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer * - * object MyKafkaStreamsTwitterServerMain extends MyKafkaStreamsTwitterServer + * object MyKafkaStreamsTwitterServerMain extends MyKafkaStreamsTwitterServer * - * class MyKafkaStreamsTwitterServer extends KafkaStreamsTwitterServer { + * class MyKafkaStreamsTwitterServer extends KafkaStreamsTwitterServer { * - * override def configureKafkaStreams(streamsBuilder: StreamsBuilder): Unit = { + * override def configureKafkaStreams(streamsBuilder: StreamsBuilder): Unit = { * streamsBuilder.asScala * .stream("dp-it-devel-tweetid-to-interaction")( * Consumed.`with`(ScalaSerdes.Long, ScalaSerdes.Thrift[MigratorInteraction]) @@ -78,32 +78,44 @@ abstract class KafkaStreamsTwitterServer flagWithKafkaDefault[String](StreamsConfig.PROCESSING_GUARANTEE_CONFIG) private val cacheMaxBytesBuffering = flagWithKafkaDefault[Long](StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) - private val metadataMaxAge = flagWithKafkaDefault[Long](StreamsConfig.METADATA_MAX_AGE_CONFIG) + private val metadataMaxAge = + flagWithKafkaDefault[Long](StreamsConfig.METADATA_MAX_AGE_CONFIG) // ConsumerConfig default flags private val consumerMaxPollRecords = consumerFlagWithKafkaDefault[Int](ConsumerConfig.MAX_POLL_RECORDS_CONFIG) private val consumerMaxPollInterval = - consumerFlagWithKafkaDefault[Int](ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) + consumerFlagWithKafkaDefault[Int]( + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ) private val consumerAutoOffsetReset = - consumerFlagWithKafkaDefault[String](ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) + consumerFlagWithKafkaDefault[String]( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + ) private val consumerSessionTimeout = consumerFlagWithKafkaDefault[Int](ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) private val consumerHeartbeatInterval = - consumerFlagWithKafkaDefault[Int](ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG) + consumerFlagWithKafkaDefault[Int]( + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG + ) private val consumerFetchMin = consumerFlagWithKafkaDefault[Int](ConsumerConfig.FETCH_MIN_BYTES_CONFIG) private val consumerFetchMaxWait = consumerFlagWithKafkaDefault[Int](ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG) private val consumerMaxPartitionFetch = - consumerFlagWithKafkaDefault[Int](ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG) + consumerFlagWithKafkaDefault[Int]( + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ) private val consumerRequestTimeout = consumerFlagWithKafkaDefault[Int](ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG) private val consumerConnectionsMaxIdle = - consumerFlagWithKafkaDefault[Long](ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG) + consumerFlagWithKafkaDefault[Long]( + 
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG + ) // ProducerConfig default flag - private val producerLinger = producerFlagWithKafkaDefault[Int](ProducerConfig.LINGER_MS_CONFIG) + private val producerLinger = + producerFlagWithKafkaDefault[Long](ProducerConfig.LINGER_MS_CONFIG) // Configs with customized default private val replicationFactor = @@ -112,8 +124,12 @@ abstract class KafkaStreamsTwitterServer 3 ) // We set it to 3 for durability and reliability. protected[kafkastreams] val applicationServerConfig = - kafkaFlag(StreamsConfig.APPLICATION_SERVER_CONFIG, s"localhost:$defaultAdminPort") - private[finatra] val stateDir = kafkaFlag(StreamsConfig.STATE_DIR_CONFIG, "kafka-stream-state") + kafkaFlag( + StreamsConfig.APPLICATION_SERVER_CONFIG, + s"localhost:$defaultAdminPort" + ) + private[finatra] val stateDir = + kafkaFlag(StreamsConfig.STATE_DIR_CONFIG, "kafka-stream-state") private val metricsRecordingLevel = kafkaFlag(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO") @@ -147,9 +163,9 @@ abstract class KafkaStreamsTwitterServer * Callback method which is executed after the injector is created and before any other lifecycle * methods. * - * Use the provided StreamsBuilder to create your KafkaStreams topology. + * Use the provided StreamsBuilder to create your KafkaStreams topology. * - * @note It is NOT expected that you block in this method as you will prevent completion + * @note It is NOT expected that you block in this method as you will prevent completion * of the server lifecycle. * @param builder */ @@ -184,7 +200,9 @@ abstract class KafkaStreamsTwitterServer closeKafkaStreamsOnExit(kafkaStreams) kafkaStreams.start() - while (!kafkaStreams.state().isRunning) { + // after upgrading to 2.5, we can switch to the isRunningOrRebalancing() method + while (kafkaStreams.state() != KafkaStreams.State.RUNNING && + kafkaStreams.state() != KafkaStreams.State.REBALANCING) { Thread.sleep(100) debug("Waiting for Initial Kafka Streams Startup") } @@ -209,11 +227,11 @@ abstract class KafkaStreamsTwitterServer * Callback method which is executed after the injector is created and before KafkaStreams is * configured. * - * Use the provided KafkaStreamsConfig and augment to configure your KafkaStreams topology. + * Use the provided KafkaStreamsConfig and augment to configure your KafkaStreams topology. * - * Example: + * Example: * - * {{{ + * {{{ * override def streamsProperties(config: KafkaStreamsConfig): KafkaStreamsConfig = { * super * .streamsProperties(config) @@ -229,18 +247,24 @@ abstract class KafkaStreamsTwitterServer * } * }}} * - * + * * @param config the default KafkaStreamsConfig defined at [[createKafkaStreamsProperties]] * - * @return a KafkaStreamsConfig with your additional configurations applied. + * @return a KafkaStreamsConfig with your additional configurations applied. 
*/ - protected def streamsProperties(config: KafkaStreamsConfig): KafkaStreamsConfig = config + protected def streamsProperties( + config: KafkaStreamsConfig + ): KafkaStreamsConfig = config protected[finatra] def createKafkaStreamsProperties(): Properties = { var defaultConfig = new KafkaStreamsConfig() + // Set upgradeFrom when we first move to the 2.5 binary + //.upgradeFrom("2.2") .metricReporter[KafkaStreamsFinagleMetricsReporter] - .metricsRecordingLevelConfig(RecordingLevel.forName(metricsRecordingLevel())) + .metricsRecordingLevelConfig( + RecordingLevel.forName(metricsRecordingLevel()) + ) .metricsSampleWindow(60.seconds) .applicationServer(applicationServerConfig()) .dest(bootstrapServer()) @@ -251,31 +275,57 @@ abstract class KafkaStreamsTwitterServer .cacheMaxBuffering(cacheMaxBytesBuffering().bytes) .numStandbyReplicas(numStandbyReplicas()) .metadataMaxAge(metadataMaxAge().milliseconds) - .processingGuarantee(ProcessingGuarantee.valueOf(processingGuarantee().toUpperCase)) + .processingGuarantee( + ProcessingGuarantee.valueOf(processingGuarantee().toUpperCase) + ) .defaultKeySerde[AvoidDefaultSerde] .defaultValueSerde[AvoidDefaultSerde] - .withConfig(InstanceMetadataProducerInterceptor.KafkaInstanceKeyFlagName, instanceKey()) - .producer.metricReporter[KafkaStreamsFinagleMetricsReporter] - .producer.metricsRecordingLevel(RecordingLevel.forName(metricsRecordingLevel())) - .producer.metricsSampleWindow(60.seconds) - .producer.interceptor[PublishTimeProducerInterceptor] - .producer.interceptor[InstanceMetadataProducerInterceptor] - .producer.linger(producerLinger().milliseconds) - .consumer.fetchMin(consumerFetchMin().bytes) - .consumer.fetchMaxWait(consumerFetchMaxWait().milliseconds) - .consumer.sessionTimeout(consumerSessionTimeout().milliseconds) - .consumer.heartbeatInterval(consumerHeartbeatInterval().milliseconds) - .consumer.metricReporter[KafkaStreamsFinagleMetricsReporter] - .consumer.metricsRecordingLevel(RecordingLevel.forName(metricsRecordingLevel())) - .consumer.metricsSampleWindow(60.seconds) - .consumer.autoOffsetReset(OffsetResetStrategy.valueOf( consumerAutoOffsetReset().toUpperCase)) - .consumer.maxPollRecords(consumerMaxPollRecords()) - .consumer.maxPollInterval(consumerMaxPollInterval().milliseconds) - .consumer.maxPartitionFetch(consumerMaxPartitionFetch().bytes) - .consumer.requestTimeout(consumerRequestTimeout().milliseconds) - .consumer.connectionsMaxIdle(consumerConnectionsMaxIdle().milliseconds) - .consumer.interceptor[KafkaStreamsMonitoringConsumerInterceptor] + .withConfig( + InstanceMetadataProducerInterceptor.KafkaInstanceKeyFlagName, + instanceKey() + ) + .producer + .metricReporter[KafkaStreamsFinagleMetricsReporter] + .producer + .metricsRecordingLevel(RecordingLevel.forName(metricsRecordingLevel())) + .producer + .metricsSampleWindow(60.seconds) + .producer + .interceptor[PublishTimeProducerInterceptor] + .producer + .interceptor[InstanceMetadataProducerInterceptor] + .producer + .linger(producerLinger().milliseconds) + .consumer + .fetchMin(consumerFetchMin().bytes) + .consumer + .fetchMaxWait(consumerFetchMaxWait().milliseconds) + .consumer + .sessionTimeout(consumerSessionTimeout().milliseconds) + .consumer + .heartbeatInterval(consumerHeartbeatInterval().milliseconds) + .consumer + .metricReporter[KafkaStreamsFinagleMetricsReporter] + .consumer + .metricsRecordingLevel(RecordingLevel.forName(metricsRecordingLevel())) + .consumer + .metricsSampleWindow(60.seconds) + .consumer + .autoOffsetReset( + 
OffsetResetStrategy.valueOf(consumerAutoOffsetReset().toUpperCase) + ) + .consumer + .maxPollRecords(consumerMaxPollRecords()) + .consumer + .maxPollInterval(consumerMaxPollInterval().milliseconds) + .consumer + .maxPartitionFetch(consumerMaxPartitionFetch().bytes) + .consumer + .requestTimeout(consumerRequestTimeout().milliseconds) + .consumer + .connectionsMaxIdle(consumerConnectionsMaxIdle().milliseconds) + .consumer + .interceptor[KafkaStreamsMonitoringConsumerInterceptor] if (applicationId().nonEmpty) { defaultConfig = defaultConfig.applicationId(applicationId()) @@ -304,11 +354,16 @@ abstract class KafkaStreamsTwitterServer /* Private */ - private def closeKafkaStreamsOnExit(kafkaStreamsToClose: KafkaStreams): Unit = { + private def closeKafkaStreamsOnExit( + kafkaStreamsToClose: KafkaStreams + ): Unit = { onExit { info("Closing kafka streams") try { - kafkaStreams.close(defaultCloseGracePeriod.inMillis, TimeUnit.MILLISECONDS) + kafkaStreams.close( + defaultCloseGracePeriod.inMillis, + TimeUnit.MILLISECONDS + ) } catch { case e: Throwable => error("Error while closing kafka streams", e) @@ -320,9 +375,13 @@ abstract class KafkaStreamsTwitterServer private def monitorStateChanges(streams: KafkaStreams): Unit = { streams.setStateListener(new FinatraStateChangeListener(streams)) - streams.setGlobalStateRestoreListener(new FinatraStateRestoreListener(streamsStatsReceiver)) + streams.setGlobalStateRestoreListener( + new FinatraStateRestoreListener(streamsStatsReceiver) + ) - streamsStatsReceiver.provideGauge("totalTimeRebalancing")(totalTimeRebalancing.get()) + streamsStatsReceiver.provideGauge("totalTimeRebalancing")( + totalTimeRebalancing.get() + ) streamsStatsReceiver.provideGauge("state") { streams.state match { @@ -343,7 +402,9 @@ abstract class KafkaStreamsTwitterServer timeStartedRebalancingOpt = Some(System.currentTimeMillis()) } else { for (timeStartedRebalancing <- timeStartedRebalancingOpt) { - totalTimeRebalancing.addAndGet(System.currentTimeMillis - timeStartedRebalancing) + totalTimeRebalancing.addAndGet( + System.currentTimeMillis - timeStartedRebalancing + ) timeStartedRebalancingOpt = None } } diff --git a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/config/KafkaStreamsConfig.scala b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/config/KafkaStreamsConfig.scala index b4109a7bc2..13489ab060 100644 --- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/config/KafkaStreamsConfig.scala +++ b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/config/KafkaStreamsConfig.scala @@ -159,6 +159,9 @@ class KafkaStreamsConfig( def stateDir(directory: String): This = withConfig(StreamsConfig.STATE_DIR_CONFIG, directory) + def upgradeFrom(version: String): This = + withConfig(StreamsConfig.UPGRADE_FROM_CONFIG, version) + def windowStoreChangeLogAdditionalRetention(duration: Duration): This = withConfig(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, duration) } diff --git a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/flushing/AsyncTransformer.scala b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/flushing/AsyncTransformer.scala index 757210b67f..0955ed7a99 100644 --- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/flushing/AsyncTransformer.scala +++ b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/flushing/AsyncTransformer.scala @@ -3,7 
+3,8 @@ package com.twitter.finatra.kafkastreams.flushing import com.twitter.finagle.stats.{Stat, StatsReceiver, Verbosity} import com.twitter.finatra.kafkastreams.internal.utils.ProcessorContextLogging import com.twitter.util.{Duration, Future} -import java.util.concurrent.ConcurrentHashMap +import java.util.Collections +import java.util import org.apache.kafka.streams.processor.internals.{ InternalProcessorContext, ProcessorRecordContext @@ -55,8 +56,8 @@ abstract class AsyncTransformer[K1, V1, K2, V2]( @volatile private var flushOutputRecordsCancellable: Cancellable = _ - private val outstandingResults = ConcurrentHashMap .newKeySet[(K2, V2, ProcessorRecordContext)](maxOutstandingFuturesPerTask) + val oldList = new util.LinkedList[(K2, V2, ProcessorRecordContext)]() + val outstandingResults = Collections.synchronizedList(oldList) private val outstandingResultsGauge = statsReceiver.addGauge(Verbosity.Debug, "outstandingResults")(numOutstandingResults) diff --git a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/utils/KafkaFlagUtils.scala b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/utils/KafkaFlagUtils.scala index cdaea35dd6..ce7759cd32 100644 --- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/utils/KafkaFlagUtils.scala +++ b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/utils/KafkaFlagUtils.scala @@ -134,6 +134,12 @@ private[kafkastreams] trait KafkaFlagUtils extends App { s"Kafka doesn't have a default value for $key, please provide a default value" ) } - configKey.defaultValue.asInstanceOf[T] + + if (configKey.defaultValue.isInstanceOf[Integer] && key == ProducerConfig.LINGER_MS_CONFIG) { + // compatibility workaround: the linger.ms default is an Int in Kafka 2.2 and a Long in 2.5 + configKey.defaultValue.asInstanceOf[Integer].toLong.asInstanceOf[T] + } else { + configKey.defaultValue.asInstanceOf[T] + } } } diff --git a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/transformer/stores/internal/FinatraTransformerLifecycleKeyValueStore.scala b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/transformer/stores/internal/FinatraTransformerLifecycleKeyValueStore.scala index c64327c35a..6b79c46961 100644 --- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/transformer/stores/internal/FinatraTransformerLifecycleKeyValueStore.scala +++ b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/transformer/stores/internal/FinatraTransformerLifecycleKeyValueStore.scala @@ -7,7 +7,7 @@ import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.errors.InvalidStateStoreException import org.apache.kafka.streams.processor.{ProcessorContext, StateStore, TaskId} import org.apache.kafka.streams.state.KeyValueIterator -import org.apache.kafka.streams.state.internals.WrappedStateStore +import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils /** * A FinatraKeyValueStore which allows FinatraTransformer#getKeyValueStore to retrieve a key value @@ -33,9 +33,7 @@ case class FinatraTransformerLifecycleKeyValueStore[K, V]( debug(s"init ${processorContext.taskId}") this._processorContext = processorContext - val unwrappedStateStore = processorContext - .getStateStore(name).asInstanceOf[WrappedStateStore[_]] - .wrapped() + val unwrappedStateStore = CompatibleUtils.getUnwrappedStateStore[K, V](name, processorContext) this.keyValueStore = 
unwrappedStateStore match { case cachingStore: CachingFinatraKeyValueStoreImpl[K, V] => diff --git a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/utils/StatelessKafkaStreamsTwitterServer.scala b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/utils/StatelessKafkaStreamsTwitterServer.scala index 512aa0db56..a83476aad5 100644 --- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/utils/StatelessKafkaStreamsTwitterServer.scala +++ b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/utils/StatelessKafkaStreamsTwitterServer.scala @@ -1,7 +1,7 @@ package com.twitter.finatra.kafkastreams.utils import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer -import com.twitter.finatra.kafkastreams.internal.utils.TopologyReflectionUtils +import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils import org.apache.kafka.streams.Topology /** @@ -15,7 +15,7 @@ abstract class StatelessKafkaStreamsTwitterServer extends KafkaStreamsTwitterSer override def createKafkaStreamsTopology(): Topology = { val topology = super.createKafkaStreamsTopology() - if (!TopologyReflectionUtils.isStateless(topology)) { + if (!CompatibleUtils.isStateless(topology)) { throw new UnsupportedOperationException( "This server is using StatelessKafkaStreamsTwitterServer but it is not a stateless topology" ) diff --git a/kafka-streams/kafka-streams/src/main/scala/org/apache/kafka/streams/state/internals/FinatraRocksDbKeyValueBytesStoreSupplier.scala b/kafka-streams/kafka-streams/src/main/scala/org/apache/kafka/streams/state/internals/FinatraRocksDbKeyValueBytesStoreSupplier.scala index 9254bfae29..4e85743b72 100644 --- a/kafka-streams/kafka-streams/src/main/scala/org/apache/kafka/streams/state/internals/FinatraRocksDbKeyValueBytesStoreSupplier.scala +++ b/kafka-streams/kafka-streams/src/main/scala/org/apache/kafka/streams/state/internals/FinatraRocksDbKeyValueBytesStoreSupplier.scala @@ -17,7 +17,7 @@ class FinatraRocksDbKeyValueBytesStoreSupplier( override val name: String) extends KeyValueBytesStoreSupplier { - override def get: RocksDBStore = new RocksDBStore(name) + override def get: RocksDBStore = RocksDBStoreFactory.create(name, "rocksdb-state") override def metricsScope: String = "rocksdb-state" } diff --git a/kafka-streams/kafka-streams/src/test/scala-kafka2.2/KafkaTestUtil.scala b/kafka-streams/kafka-streams/src/test/scala-kafka2.2/KafkaTestUtil.scala new file mode 100644 index 0000000000..67357a7e61 --- /dev/null +++ b/kafka-streams/kafka-streams/src/test/scala-kafka2.2/KafkaTestUtil.scala @@ -0,0 +1,20 @@ +package com.twitter.finatra.kafkastreams.test + +import org.apache.kafka.streams.TopologyTestDriver +import org.apache.kafka.streams.processor.StateStore +import org.apache.kafka.test.NoOpRecordCollector +import org.apache.kafka.streams.processor.internals.RecordCollector + +/** + * Test utilities that differ between the Kafka 2.2 and 2.5 builds + */ +object KafkaTestUtil { + def createNoopRecord(): RecordCollector = { + new NoOpRecordCollector + } + + def getStore(driver: TopologyTestDriver, name: String): StateStore = { + driver.getStateStore(name) + } + +} diff --git a/kafka-streams/kafka-streams/src/test/scala-kafka2.5/KafkaTestUtil.scala b/kafka-streams/kafka-streams/src/test/scala-kafka2.5/KafkaTestUtil.scala new file mode 100644 index 0000000000..5e19bded33 --- /dev/null +++ b/kafka-streams/kafka-streams/src/test/scala-kafka2.5/KafkaTestUtil.scala @@ -0,0 +1,19 @@ +package 
com.twitter.finatra.kafkastreams.test + +import org.apache.kafka.streams.TopologyTestDriver +import org.apache.kafka.streams.processor.StateStore +import org.apache.kafka.streams.processor.internals.RecordCollector +import org.apache.kafka.test.MockRecordCollector + +/** + * Test utilities that differ between the Kafka 2.2 and 2.5 builds + */ +object KafkaTestUtil { + def createNoopRecord(): RecordCollector = { + new MockRecordCollector + } + + def getStore(driver: TopologyTestDriver, name: String): StateStore = { + driver.getStateStore(name) + } +} diff --git a/kafka-streams/kafka-streams/src/test/scala/BUILD b/kafka-streams/kafka-streams/src/test/scala/BUILD index 3cba376b3b..09ac8916f1 100644 --- a/kafka-streams/kafka-streams/src/test/scala/BUILD +++ b/kafka-streams/kafka-streams/src/test/scala/BUILD @@ -1,6 +1,7 @@ scala_library( name = "test-deps", sources = [ + "../scala-kafka2.2/*.scala", "com/twitter/finatra/kafkastreams/test/*.scala", "com/twitter/inject/*.scala", ], diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/test/FinatraTopologyTester.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/test/FinatraTopologyTester.scala index ec88c9fc6e..a24178dfef 100644 --- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/test/FinatraTopologyTester.scala +++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/test/FinatraTopologyTester.scala @@ -180,7 +180,7 @@ case class FinatraTopologyTester private ( def getKeyValueStore[K, V](name: String): KeyValueStore[K, V] = { driver - .getStateStore(name) + .getKeyValueStore(name) .asInstanceOf[KeyValueStore[K, V]] } diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/test/KafkaStreamsFeatureTest.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/test/KafkaStreamsFeatureTest.scala index 9d9b32544f..f3f6040bc6 100644 --- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/test/KafkaStreamsFeatureTest.scala +++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/test/KafkaStreamsFeatureTest.scala @@ -2,12 +2,11 @@ package com.twitter.finatra.kafkastreams.test import com.twitter.conversions.DurationOps._ import com.twitter.finatra.kafka.test._ +import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils import com.twitter.inject.Test import com.twitter.util.Duration import java.io.File -import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.streams.processor.internals.StreamThread /** * Extensible abstract test class used when testing a single KafkaStreamsTwitterServer in your test. 
@@ -142,14 +141,7 @@ abstract class AbstractKafkaStreamsFeatureTest extends Test with EmbeddedKafka { ) } - //HACK: Reset StreamThread's id after each test so that each test starts from a known fresh state - //Without this hack, tests would need to always wildcard the thread number when asserting stats protected def resetStreamThreadId(): Unit = { - val streamThreadClass = classOf[StreamThread] - val streamThreadIdSequenceField = streamThreadClass - .getDeclaredField("STREAM_THREAD_ID_SEQUENCE") - streamThreadIdSequenceField.setAccessible(true) - val streamThreadIdSequence = streamThreadIdSequenceField.get(null).asInstanceOf[AtomicInteger] - streamThreadIdSequence.set(1) + CompatibleUtils.resetStreamThreadId() } } diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/FinatraTransformerTest.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/FinatraTransformerTest.scala index 980305284d..0192171417 100644 --- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/FinatraTransformerTest.scala +++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/FinatraTransformerTest.scala @@ -6,20 +6,16 @@ import com.twitter.finatra.kafkastreams.config.KafkaStreamsConfig import com.twitter.finatra.kafkastreams.transformer.domain.Time import com.twitter.finatra.kafkastreams.transformer.stores.CachingKeyValueStores import com.twitter.finatra.kafkastreams.transformer.watermarks.Watermark +import com.twitter.finatra.kafkastreams.test.KafkaTestUtil import com.twitter.inject.Test import com.twitter.inject.server.InMemoryStatsReceiverUtility import com.twitter.util.Duration import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.processor._ -import org.apache.kafka.streams.processor.internals.{ProcessorNode, RecordCollector, ToInternal} +import org.apache.kafka.streams.processor.internals._ import org.apache.kafka.streams.state.internals.{FinatraStores, WrappedStateStore} -import org.apache.kafka.test.{ - InternalMockProcessorContext, - MockProcessorNode, - NoOpRecordCollector, - TestUtils -} +import org.apache.kafka.test.{InternalMockProcessorContext, MockProcessorNode, TestUtils} import org.mockito.{ArgumentMatcher, ArgumentMatchers, Mockito} class FinatraTransformerTest extends Test with com.twitter.util.mock.Mockito { @@ -157,7 +153,7 @@ class FinatraTransformerTest extends Test with com.twitter.util.mock.Mockito { } override def recordCollector(): RecordCollector = { - new NoOpRecordCollector + KafkaTestUtil.createNoopRecord() } override def forward[K, V](key: K, value: V, to: To): Unit = {} diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/stores/PersistentTimerStoreTest.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/stores/PersistentTimerStoreTest.scala index d7e98cbe74..2b878be137 100644 --- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/stores/PersistentTimerStoreTest.scala +++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/stores/PersistentTimerStoreTest.scala @@ -6,6 +6,7 @@ import com.twitter.finatra.kafkastreams.transformer.stores.internal.{ FinatraKeyValueStoreImpl, Timer } +import com.twitter.finatra.kafkastreams.test.KafkaTestUtil import 
com.twitter.finatra.kafkastreams.transformer.watermarks.Watermark import com.twitter.finatra.streams.transformer.internal.domain.TimerSerde import com.twitter.inject.Test @@ -14,7 +15,7 @@ import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.common.utils.LogContext import org.apache.kafka.streams.processor.internals.MockStreamsMetrics import org.apache.kafka.streams.state.internals.{RocksDBStoreFactory, ThreadCache} -import org.apache.kafka.test.{InternalMockProcessorContext, NoOpRecordCollector, TestUtils} +import org.apache.kafka.test.{InternalMockProcessorContext, TestUtils} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -26,11 +27,11 @@ class PersistentTimerStoreTest extends Test { TestUtils.tempDirectory, Serdes.String, Serdes.String, - new NoOpRecordCollector, + KafkaTestUtil.createNoopRecord, new ThreadCache(new LogContext("testCache"), 0, new MockStreamsMetrics(new Metrics())) ) - private val rocksDBStore = RocksDBStoreFactory.create("mystore") + private val rocksDBStore = RocksDBStoreFactory.create("mystore", "TestMetrics") private val keyValueStore = new FinatraKeyValueStoreImpl[Timer[TimerKey], Array[Byte]]( _rocksDBStore = rocksDBStore, diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/stores/internal/FinatraKeyValueStoreLatencyTest.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/stores/internal/FinatraKeyValueStoreLatencyTest.scala index 81f8f84b18..549557f8ae 100644 --- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/stores/internal/FinatraKeyValueStoreLatencyTest.scala +++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/transformer/stores/internal/FinatraKeyValueStoreLatencyTest.scala @@ -4,13 +4,14 @@ import com.twitter.finagle.stats.InMemoryStatsReceiver import com.twitter.finatra.kafka.serde.ScalaSerdes import com.twitter.inject.Test import com.twitter.inject.server.InMemoryStatsReceiverUtility +import com.twitter.finatra.kafkastreams.test.KafkaTestUtil import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.common.utils.LogContext import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.processor.internals.MockStreamsMetrics import org.apache.kafka.streams.state.internals.{RocksDBStoreFactory, ThreadCache} -import org.apache.kafka.test.{InternalMockProcessorContext, NoOpRecordCollector, TestUtils} +import org.apache.kafka.test.{InternalMockProcessorContext, TestUtils} import scala.collection.JavaConverters._ class FinatraKeyValueStoreLatencyTest extends Test { @@ -19,13 +20,13 @@ class FinatraKeyValueStoreLatencyTest extends Test { TestUtils.tempDirectory, Serdes.String, Serdes.String, - new NoOpRecordCollector, + KafkaTestUtil.createNoopRecord(), new ThreadCache(new LogContext("testCache"), 0, new MockStreamsMetrics(new Metrics())) ) private val statsReceiver = new InMemoryStatsReceiver() private val statsUtil = new InMemoryStatsReceiverUtility(statsReceiver) - private val rocksDbStore = RocksDBStoreFactory.create("FinatraKeyValueStoreTest") + private val rocksDbStore = RocksDBStoreFactory.create("FinatraKeyValueStoreTest", "TestMetrics") private val keyValueStore = new MetricsFinatraKeyValueStore[Int, String]( new FinatraKeyValueStoreImpl(rocksDbStore, rocksDbStore, ScalaSerdes.Int, Serdes.String), statsReceiver = statsReceiver) diff --git 
a/kafka/src/test/scala/BUILD b/kafka/src/test/scala/BUILD index 8093de9592..ff9441332d 100644 --- a/kafka/src/test/scala/BUILD +++ b/kafka/src/test/scala/BUILD @@ -20,6 +20,8 @@ junit_tests( "finatra/kafka/src/main/java", "finatra/kafka/src/test/resources", "finatra/kafka/src/test/thrift:thrift-scala", + # EmbeddedKafkaCluster.java in 2.5 has a dependency on kafka-streams code. + "3rdparty/jvm/org/apache/kafka:kafka-streams", ], )
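
Note on the build.sbt filter (Solution step 2): the settings added in this patch hard-code the scala-kafka2.2 directory, so switching to 2.5 means editing each project. Below is a minimal sketch of how the choice could instead be driven from a single setting; the kafkaVersion key is hypothetical and not part of this patch:

    // Hypothetical sbt sketch: one setting picks the version-specific source folder.
    val kafkaVersion = settingKey[String]("Kafka version to compile against: 2.2 or 2.5")

    lazy val kafkaStreams = (project in file("kafka-streams/kafka-streams"))
      .settings(
        kafkaVersion := "2.2", // flip to "2.5" to build against Kafka 2.5
        // Mirrors the unmanagedSourceDirectories additions in this patch,
        // but derives the folder name from the setting.
        unmanagedSourceDirectories in Compile += {
          (sourceDirectory in Compile).value / s"scala-kafka${kafkaVersion.value}"
        },
        unmanagedSourceDirectories in Test += {
          (sourceDirectory in Test).value / s"scala-kafka${kafkaVersion.value}"
        }
      )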
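
For readers of the StaticPartitioning change: a small usage sketch of the 'kafka.application.server' flag format that parseShardId expects, using only code introduced in this patch plus Kafka's Utils helper (the values are illustrative):

    import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning
    import org.apache.kafka.common.utils.Utils

    // The flag encodes the shard id in the host portion:
    // "<shard_id>.<unused_hostname>:<unused_port>", e.g. shard #5 with an
    // empty hostname and a dummy (unused) port.
    val serverConfig = "5.:80"
    val host = Utils.getHost(serverConfig)               // "5."
    val shardId = StaticPartitioning.parseShardId(host)  // ServiceShardId(5)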
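
Design note on the scala-kafka2.5 supplier: instead of the custom 2.2 partition assignor, it leans on Kafka's static group membership (KIP-345, available since Kafka 2.3) by setting group.instance.id to the shard id. A sketch of the equivalent plain consumer configuration, assuming a hypothetical group id of "my-streams-app":

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    val props = new Properties()
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app")
    // A stable instance id lets the broker hand the same partitions back to a
    // restarting member without triggering a full rebalance.
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "5") // the service shard id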