finatra-kafka-streams: Make Finatra Kafka Streams compile against both Kafka 2.2 and 2.5

Problem
-------
Some code in the current Finatra Kafka Streams code base depends directly on Kafka 2.2 internals, which blocks the upgrade to Kafka 2.5.

Solution
--------
 After some experiments, we think the best way to upgrade to Kafka 2.5 is:
 1. Move the code in finatra-kafka-streams that depends directly on Kafka 2.2 into a separate source folder.
 2. In build.sbt, add a filter that controls which version-specific sources are included (a sketch follows this list).
 3. With this phab, finatra-kafka-streams still builds against Kafka 2.2 by default, but we can now easily switch to building against 2.5.
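
As a rough illustration of step 2, a version switch in build.sbt could look like the sketch below. This is not the exact build definition from this commit (the hunks further down add the `scala-kafka2.2` folder unconditionally); the `kafka.version` system property and the `scala-kafka2.5` folder name are assumptions made for illustration only.

```scala
// Hedged sketch, not the committed build.sbt: pick the version-specific
// source folder based on an assumed `kafka.version` system property.
// `scala-kafka2.5` is a hypothetical name; only `scala-kafka2.2` appears
// in this commit.
lazy val kafkaVersion: String = sys.props.getOrElse("kafka.version", "2.2")

lazy val kafkaStreams = (project in file("kafka-streams/kafka-streams"))
  .settings(
    unmanagedSourceDirectories in Compile += {
      val sourceDir = (sourceDirectory in Compile).value
      if (kafkaVersion.startsWith("2.2")) sourceDir / "scala-kafka2.2"
      else sourceDir / "scala-kafka2.5"
    }
  )
```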

Result
--------
  No public API change. This should have no impact on current Kafka 2.2 customers.

Differential Revision: https://phabricator.twitter.biz/D545900
Ming Liu authored and jenkins committed Sep 24, 2020
1 parent 1c1b55c commit 3c78c34
Showing 34 changed files with 342 additions and 138 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
@@ -42,6 +42,9 @@ Added

Changed
~~~~~~~
* finatra-kafka-streams: Update and separate the parts of the Finatra Kafka Streams code base that
  depend directly on Kafka 2.2. Any code that cannot easily be upgraded is moved into a separate
  build target. ``PHAB_ID=D545900``

* inject-core: `c.t.inject.Injector` is now an abstract class. Use `Injector.apply` to create
a new instance (versus the `new Injector(...)` before). ``PHAB_ID=D543297``
12 changes: 12 additions & 0 deletions build.sbt
@@ -1177,6 +1177,10 @@ lazy val kafkaStreamsStaticPartitioning =
name := "finatra-kafka-streams-static-partitioning",
moduleName := "finatra-kafka-streams-static-partitioning",
ScoverageKeys.coverageExcludedPackages := "<empty>;.*",
unmanagedSourceDirectories in Compile += {
val sourceDir = (sourceDirectory in Compile).value
sourceDir / "scala-kafka2.2"
},
excludeDependencies in Test ++= kafkaStreamsExclusionRules,
excludeDependencies ++= kafkaStreamsExclusionRules,
excludeFilter in unmanagedResources := "BUILD",
@@ -1237,6 +1241,14 @@ lazy val kafkaStreams = (project in file("kafka-streams/kafka-streams"))
name := "finatra-kafka-streams",
moduleName := "finatra-kafka-streams",
ScoverageKeys.coverageExcludedPackages := "<empty>;.*",
unmanagedSourceDirectories in Compile += {
val sourceDir = (sourceDirectory in Compile).value
sourceDir / "scala-kafka2.2"
},
unmanagedSourceDirectories in Test += {
val testDir = (sourceDirectory in Test).value
testDir / "scala-kafka2.2"
},
libraryDependencies ++= Seq(
"com.twitter" %% "util-jvm" % versions.twLibVersion,
"it.unimi.dsi" % "fastutil" % versions.fastutil,
@@ -5,9 +5,10 @@ import com.twitter.finatra.annotations.Experimental
import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer
import com.twitter.finatra.kafkastreams.internal.utils.ReflectionUtils
import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning
import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils

import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.Metric
import org.apache.kafka.common.utils.Utils
@@ -53,7 +54,7 @@ trait PreRestoreState extends KafkaStreamsTwitterServer with StaticPartitioning
info(s"Pre-restore complete.")

//Reset the thread id and start Kafka Streams as if we weren't using pre-restore mode
resetStreamThreadId()
CompatibleUtils.resetStreamThreadId()
PreRestoreState.super.createAndStartKafkaStreams()
} catch {
case NonFatal(e) =>
@@ -99,21 +100,6 @@
properties
}

//HACK: Reset StreamThread's so the kafka broker doesn't see 2x the consumer client.ids (since thread number is part of client id)
private def resetStreamThreadId(): Unit = {
try {
val streamThreadClass = classOf[StreamThread]
val streamThreadIdSequenceField =
streamThreadClass.getDeclaredField("STREAM_THREAD_ID_SEQUENCE")
streamThreadIdSequenceField.setAccessible(true)
val streamThreadIdSequence = streamThreadIdSequenceField.get(null).asInstanceOf[AtomicInteger]
streamThreadIdSequence.set(1)
} catch {
case NonFatal(e) =>
error("Error resetting stream threads", e)
}
}

private def findRestoreConsumerLagMetrics(kafkaStreams: KafkaStreams): Seq[Metric] = {
for {
thread <- getThreads(kafkaStreams).toSeq
@@ -3,6 +3,7 @@ package com.twitter.finatra.kafkastreams.integration.wordcount
import com.twitter.conversions.DurationOps._
import com.twitter.finatra.http.EmbeddedHttpServer
import com.twitter.finatra.kafka.serde.ScalaSerdes
import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils
import com.twitter.finatra.kafkastreams.test.KafkaStreamsMultiServerFeatureTest
import com.twitter.util.{Await, Duration}
import org.apache.kafka.common.serialization.Serdes
@@ -64,7 +65,7 @@ class PreRestoreWordCountServerFeatureTest extends KafkaStreamsMultiServerFeatureTest
server.close()
Await.result(server.mainResult)
server.clearStats()
resetStreamThreadId()
CompatibleUtils.resetStreamThreadId()
}

private def testRestartWithoutPrerestore(): Unit = {
@@ -1,4 +1,5 @@
java_library(
# only include when compiling with kafka2.2
sources = ["**/*.java"],
compiler_option_sets = [],
provides = artifact(
@@ -1,10 +1,12 @@
package com.twitter.finatra.kafkastreams.partitioning.internal

import java.util
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier

class StaticPartitioningKafkaClientSupplierSupplier(numApplicationInstances: Int)
class StaticPartitioningKafkaClientSupplierSupplier(
numApplicationInstances: Int,
serverConfig: String)
extends DefaultKafkaClientSupplier {

override def getConsumer(config: util.Map[String, AnyRef]): Consumer[Array[Byte], Array[Byte]] = {
@@ -1,12 +1,13 @@
package com.twitter.finatra.kafkastreams.partitioning.internal

import com.twitter.finatra.streams.queryable.thrift.domain.ServiceShardId
import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning
import org.apache.kafka.streams.processor.internals.assignment.ClientState
import org.apache.kafka.streams.state.HostInfo

case class ClientStateAndHostInfo[ID](id: ID, clientState: ClientState, hostInfo: HostInfo) {

val serviceShardId: ServiceShardId = {
StaticPartitioningStreamAssignor.parseShardId(hostInfo.host())
StaticPartitioning.parseShardId(hostInfo.host())
}
}
@@ -1,6 +1,5 @@
package com.twitter.finatra.kafkastreams.partitioning.internal

import com.twitter.finatra.streams.queryable.thrift.domain.ServiceShardId
import com.twitter.finatra.streams.queryable.thrift.partitioning.StaticServiceShardPartitioner
import com.twitter.inject.Logging
import java.util
@@ -10,31 +9,10 @@ import org.apache.kafka.streams.processor.TaskId
import org.apache.kafka.streams.processor.internals.OverridableStreamsPartitionAssignor
import org.apache.kafka.streams.processor.internals.assignment.{ClientState, TaskAssignor}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

object StaticPartitioningStreamAssignor {
val StreamsPreRestoreConfig = "streams.prerestore"
val ApplicationNumInstances = "application.num.instances"

def parseShardId(applicationServerHost: String): ServiceShardId = {
val firstPeriodIndex = applicationServerHost.indexOf('.')

val shardId =
try {
applicationServerHost.substring(0, firstPeriodIndex).toInt
} catch {
case NonFatal(e) =>
throw new Exception(
"Finatra Kafka Stream's StaticPartitioning functionality requires the " +
"'kafka.application.server' flag value to be specified as '<kafka.current.shard>.<unused_hostname>:<unused_port>" +
" where unused_hostname can be empty and unused_port must be > 0. As an example, to configure the server" +
" that represents shard #5, you can set 'kafka.application.server=5.:80'. In this example, port 80 is unused and does not" +
" need to represent an actual open port"
)
}

ServiceShardId(shardId)
}
}

class StaticPartitioningStreamAssignor extends OverridableStreamsPartitionAssignor with Logging {
@@ -0,0 +1,26 @@
package com.twitter.finatra.kafkastreams.partitioning.internal

import java.util
import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier

class StaticPartitioningKafkaClientSupplierSupplier(
numApplicationInstances: Int,
serverConfig: String)
extends DefaultKafkaClientSupplier {

override def getConsumer(config: util.Map[String, AnyRef]): Consumer[Array[Byte], Array[Byte]] = {
val applicationServerHost = Utils.getHost(serverConfig)

val serviceShardId = StaticPartitioning.parseShardId(applicationServerHost)

config.put(
"group.instance.id",
serviceShardId.id.toString
)

super.getConsumer(config)
}
}
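
For context on the `group.instance.id` setting above: Kafka 2.3+ supports static group membership (KIP-345), where a stable instance id lets a consumer rejoin its group after a restart without triggering an immediate rebalance. Below is a small, hedged sketch of the consumer override this supplier effectively produces for shard 5 (flag value `kafka.application.server=5.:80`); everything other than the `group.instance.id` entry is illustrative and not taken from this commit.

```scala
// Hedged sketch of the effective consumer override for shard 5.
import java.util.{HashMap => JHashMap}
import org.apache.kafka.clients.consumer.ConsumerConfig

val config = new JHashMap[String, AnyRef]()
// ...other settings are populated by Kafka Streams (bootstrap servers, group.id, ...)...
// Static membership (Kafka 2.3+): the shard id doubles as the stable instance id.
// Equivalent to the "group.instance.id" string literal used in the diff above.
config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "5")
```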
@@ -1,5 +1,8 @@
scala_library(
sources = ["**/*.scala"],
sources = [
"../scala-kafka2.2/**/*.scala",
"com/**/*.scala",
],
compiler_option_sets = ["fatal_warnings"],
provides = scala_artifact(
org = "com.twitter",
@@ -3,10 +3,32 @@ package com.twitter.finatra.kafkastreams.partitioning
import com.twitter.app.Flag
import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer
import com.twitter.finatra.kafkastreams.partitioning.internal.StaticPartitioningKafkaClientSupplierSupplier
import com.twitter.finatra.streams.queryable.thrift.domain.ServiceShardId
import org.apache.kafka.streams.KafkaClientSupplier
import scala.util.control.NonFatal

object StaticPartitioning {
val PreRestoreSignalingPort = 0 //TODO: Hack to signal our assignor that we are in PreRestore mode

def parseShardId(applicationServerHost: String): ServiceShardId = {
val firstPeriodIndex = applicationServerHost.indexOf('.')

val shardId =
try {
applicationServerHost.substring(0, firstPeriodIndex).toInt
} catch {
case NonFatal(e) =>
throw new Exception(
"Finatra Kafka Stream's StaticPartitioning functionality requires the " +
"'kafka.application.server' flag value to be specified as '<kafka.current.shard>.<unused_hostname>:<unused_port>" +
" where unused_hostname can be empty and unused_port must be > 0. As an example, to configure the server" +
" that represents shard #5, you can set 'kafka.application.server=5.:80'. In this example, port 80 is unused and does not" +
" need to represent an actual open port"
)
}

ServiceShardId(shardId)
}
}

trait StaticPartitioning extends KafkaStreamsTwitterServer {
@@ -20,6 +42,8 @@ trait StaticPartitioning extends KafkaStreamsTwitterServer {
/* Protected */

override def kafkaStreamsClientSupplier: KafkaClientSupplier = {
new StaticPartitioningKafkaClientSupplierSupplier(numApplicationInstances())
new StaticPartitioningKafkaClientSupplierSupplier(
numApplicationInstances(),
applicationServerConfig())
}
}
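
A quick, hedged sketch of how the relocated `parseShardId` behaves for the flag format described in its error message (the values are illustrative):

```scala
import com.twitter.finatra.streams.queryable.thrift.domain.ServiceShardId
import org.apache.kafka.common.utils.Utils

// For shard #5 the flag is "5.:80": the hostname part is empty and port 80 is unused.
val host: String = Utils.getHost("5.:80")                            // "5."
val shardId: ServiceShardId = StaticPartitioning.parseShardId(host)  // ServiceShardId(5)

// A conventional hostname has no "<shard>." prefix, so parsing fails fast
// with the explanatory exception shown in the diff above:
// StaticPartitioning.parseShardId("kafka-host")  // throws Exception
```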
@@ -0,0 +1,43 @@
package com.twitter.finatra.kafkastreams.internal.utils

import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{ProcessorContext, StateStore}
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
import org.apache.kafka.streams.processor.internals.StreamThread
import org.apache.kafka.streams.state.internals.WrappedStateStore

private[kafkastreams] object CompatibleUtils {

private val internalTopologyBuilderField =
ReflectionUtils.getFinalField(classOf[Topology], "internalTopologyBuilder")

def isStateless(topology: Topology): Boolean = {
val internalTopologyBuilder = getInternalTopologyBuilder(topology)

internalTopologyBuilder.getStateStores.isEmpty
}

private def getInternalTopologyBuilder(topology: Topology): InternalTopologyBuilder = {
internalTopologyBuilderField
.get(topology)
.asInstanceOf[InternalTopologyBuilder]
}

def getUnwrappedStateStore[K, V](name: String, processorContext: ProcessorContext): StateStore = {
val unwrappedStateStore = processorContext
.getStateStore(name).asInstanceOf[WrappedStateStore[_]]
.wrapped()

unwrappedStateStore
}

def resetStreamThreadId() = {
val streamThreadClass = classOf[StreamThread]
val streamThreadIdSequenceField = streamThreadClass
.getDeclaredField("STREAM_THREAD_ID_SEQUENCE")
streamThreadIdSequenceField.setAccessible(true)
val streamThreadIdSequence = streamThreadIdSequenceField.get(null).asInstanceOf[AtomicInteger]
streamThreadIdSequence.set(1)
}
}
@@ -4,7 +4,7 @@ package org.apache.kafka.streams.state.internals
* Factory to allow us to call the package private RocksDBStore constructor
*/
object RocksDBStoreFactory {
def create(name: String): RocksDBStore = {
def create(name: String, metricScope: String): RocksDBStore = {
new RocksDBStore(name)
}
}
@@ -1,22 +1,34 @@
package com.twitter.finatra.kafkastreams.internal.utils

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{ProcessorContext, StateStore}
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
import org.apache.kafka.streams.state.internals.WrappedStateStore

private[kafkastreams] object TopologyReflectionUtils {
private[kafkastreams] object CompatibleUtils {

private val internalTopologyBuilderField =
ReflectionUtils.getFinalField(classOf[Topology], "internalTopologyBuilder")

def isStateless(topology: Topology): Boolean = {
val internalTopologyBuilder = getInternalTopologyBuilder(topology)

internalTopologyBuilder.getStateStores.isEmpty
internalTopologyBuilder.stateStores.isEmpty
}

private def getInternalTopologyBuilder(topology: Topology): InternalTopologyBuilder = {
internalTopologyBuilderField
.get(topology)
.asInstanceOf[InternalTopologyBuilder]
}

def getUnwrappedStateStore[K, V](name: String, processorContext: ProcessorContext): StateStore = {
val unwrappedStateStore = processorContext
.getStateStore(name).asInstanceOf[WrappedStateStore[_, K, V]]
.wrapped().asInstanceOf[StateStore]

unwrappedStateStore
}

def resetStreamThreadId() = {}
}
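
The two `CompatibleUtils` objects above share the same package and method signatures, so callers such as `PreRestoreState` compile unchanged against either Kafka version; only the source folder on the compile path changes. A hedged sketch of a call site (the method name is illustrative):

```scala
import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils

// Sketch of a version-agnostic caller.
def restartForPreRestore(): Unit = {
  // With the Kafka 2.2 sources this resets StreamThread.STREAM_THREAD_ID_SEQUENCE via
  // reflection so restarted threads reuse client ids; with the 2.5 sources it is a no-op.
  CompatibleUtils.resetStreamThreadId()
  // ...createAndStartKafkaStreams()...
}
```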
@@ -0,0 +1,10 @@
package org.apache.kafka.streams.state.internals

/**
* Factory to allow us to call the package private RocksDBStore constructor
*/
object RocksDBStoreFactory {
def create(name: String, metricScope: String): RocksDBStore = {
new RocksDBStore(name, metricScope)
}
}
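
With the factory signature now identical in both source sets, store-creating code can stay version-agnostic: the Kafka 2.2 variant simply ignores `metricScope` and constructs the store with just the name, while the 2.5 variant passes it through. A hedged sketch of a call site (the store name and scope are illustrative):

```scala
import org.apache.kafka.streams.state.internals.{RocksDBStore, RocksDBStoreFactory}

// The same call compiles against both Kafka 2.2 and 2.5; only the factory body differs.
val store: RocksDBStore = RocksDBStoreFactory.create("word-count-store", "rocksdb-state")
```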
5 changes: 3 additions & 2 deletions kafka-streams/kafka-streams/src/main/scala/BUILD
@@ -13,9 +13,9 @@ jar_library(

scala_library(
sources = [
"../scala-kafka2.2/*.scala",
"com/twitter/finatra/kafkastreams/**/*.scala",
"com/twitter/finatra/streams/**/*.scala",
"org/**/*.scala",
"org/apache/kafka/streams/state/**/*.scala",
],
compiler_option_sets = ["fatal_warnings"],
provides = scala_artifact(
@@ -25,6 +25,7 @@ scala_library(
),
strict_deps = True,
dependencies = [
# NOTE: remove this direct rocksDB dependency when building 2.5
":rocksdb-5.14.2",
"3rdparty/jvm/it/unimi/dsi:fastutil",
"3rdparty/jvm/org/agrona",
