external/kafka/pom.xml (2 changes: 1 addition & 1 deletion)

@@ -51,7 +51,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.2.1</version>
+      <version>0.9.0.0</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
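The version bump above is the whole of the build change; the kafka_${scala.binary.version} artifact id keeps the Kafka jars matched to the project's Scala version. For reference, the equivalent sbt coordinate would be the one-liner below (a sketch for readers outside Maven; the PR itself touches only the pom):

    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.9.0.0"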
KafkaTestUtils.scala

@@ -33,8 +33,8 @@ import kafka.api.Request
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

 import org.apache.spark.{Logging, SparkConf}
@@ -57,7 +57,7 @@ private[kafka] class KafkaTestUtils extends Logging {

   private var zookeeper: EmbeddedZookeeper = _

-  private var zkClient: ZkClient = _
+  private var zkUtils: ZkUtils = _

   // Kafka broker related configurations
   private val brokerHost = "localhost"
@@ -84,9 +84,9 @@
     s"$brokerHost:$brokerPort"
   }

-  def zookeeperClient: ZkClient = {
+  def zookeeperUtils: ZkUtils = {
     assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
-    Option(zkClient).getOrElse(
+    Option(zkUtils).getOrElse(
       throw new IllegalStateException("Zookeeper client is not yet initialized"))
   }

@@ -96,8 +96,8 @@
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
-      ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout)
+    zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())
     zkReady = true
   }
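This hunk is the core migration pattern of the PR: Kafka 0.9 replaces the directly constructed ZkClient (with its ZKStringSerializer) by a kafka.utils.ZkUtils wrapper that also carries the ZooKeeper security flag. A minimal standalone sketch of the new setup, with placeholder connection string and timeouts:

    import kafka.utils.ZkUtils
    import org.apache.kafka.common.security.JaasUtils

    // Placeholder values; KafkaTestUtils takes these from its own fields.
    val zkConnect = "localhost:2181"
    val sessionTimeoutMs = 6000
    val connectionTimeoutMs = 6000

    // 0.9 style: create the underlying client, then wrap it in ZkUtils.
    val zkClient = ZkUtils.createZkClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs)
    val zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())

    // ... use zkUtils; closing it also closes the wrapped client, which is
    // what the teardown hunk below relies on.
    zkUtils.close()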

@@ -140,9 +140,9 @@

     brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }

-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
     }

     if (zookeeper != null) {
@@ -153,7 +153,7 @@

   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
   def createTopic(topic: String): Unit = {
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
     // wait until metadata is propagated
     waitUntilMetadataIsPropagated(topic, 0)
   }
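AdminUtils.createTopic changes the same way: the first argument is now the ZkUtils wrapper instead of a raw ZkClient, while the topic name and the partition/replication counts (both 1 here) are unchanged. A sketch, assuming an initialized zkUtils and an illustrative topic name:

    import kafka.admin.AdminUtils

    // zkUtils is a kafka.utils.ZkUtils as built above; "test-topic" is illustrative.
    AdminUtils.createTopic(zkUtils, "test-topic", 1, 1)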
@@ -234,7 +234,7 @@
       case Some(partitionState) =>
         val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr

-        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
           Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
           leaderAndInSyncReplicas.isr.size >= 1
ReliableKafkaReceiver.scala

@@ -27,8 +27,8 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
 import kafka.message.MessageAndMetadata
 import kafka.serializer.Decoder
-import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZkUtils}
+import org.apache.kafka.common.security.JaasUtils

 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
@@ -65,8 +65,8 @@ class ReliableKafkaReceiver[
   /** High level consumer to connect to Kafka. */
   private var consumerConnector: ConsumerConnector = null

-  /** zkClient to connect to Zookeeper to commit the offsets. */
-  private var zkClient: ZkClient = null
+  /** zkUtils to connect to Zookeeper to commit the offsets. */
+  private var zkUtils: ZkUtils = null

   /**
    * A HashMap to manage the offset for each topic/partition, this HashMap is called in
@@ -118,8 +118,9 @@
     consumerConnector = Consumer.create(consumerConfig)
     logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")

-    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
-      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(consumerConfig.zkConnect,
+      consumerConfig.zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs)
+    zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())

     messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
       topics.values.sum, "KafkaMessageHandler")
@@ -155,9 +155,9 @@
       consumerConnector = null
     }

-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
     }

     if (blockGenerator != null) {
@@ -233,7 +234,7 @@
    * metadata schema in Zookeeper.
    */
   private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
-    if (zkClient == null) {
+    if (zkUtils == null) {
       val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
       stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
       return
@@ -244,7 +245,7 @@
         val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
         val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"

-        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+        zkUtils.updatePersistentPath(zkPath, offset.toString)
       } catch {
         case e: Exception =>
           logWarning(s"Exception during commit offset $offset for topic" +
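From the caller's side this is the static-to-instance move: ZkUtils.updatePersistentPath(zkClient, path, data) becomes zkUtils.updatePersistentPath(path, data). A minimal sketch of committing a single offset under the standard consumer-group layout (helper name and values are illustrative):

    import kafka.common.TopicAndPartition
    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}

    def commitOne(zkUtils: ZkUtils, groupId: String, tp: TopicAndPartition, offset: Long): Unit = {
      val dirs = new ZKGroupTopicDirs(groupId, tp.topic)
      // Offsets live under /consumers/<group>/offsets/<topic>/<partition>, stored as strings.
      zkUtils.updatePersistentPath(s"${dirs.consumerOffsetDir}/${tp.partition}", offset.toString)
    }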
ReliableKafkaStreamSuite.scala

@@ -25,7 +25,7 @@ import scala.language.postfixOps
 import scala.util.Random

 import kafka.serializer.StringDecoder
-import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{ZKGroupTopicDirs}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.concurrent.Eventually
@@ -143,6 +143,6 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
     val topicDirs = new ZKGroupTopicDirs(groupId, topic)
     val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
-    ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
+    kafkaTestUtils.zookeeperUtils.readDataMaybeNull(zkPath)._1.map(_.toLong)
   }
 }
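The read side mirrors the commit path: readDataMaybeNull is likewise an instance method in 0.9 and returns an (Option[String], Stat) pair, with None when the node does not exist. A sketch matching the commit helper above (names illustrative):

    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}

    def readOne(zkUtils: ZkUtils, groupId: String, topic: String, partition: Int): Option[Long] = {
      val dirs = new ZKGroupTopicDirs(groupId, topic)
      // _1 is the Option[String] payload; None means no offset has been committed yet.
      zkUtils.readDataMaybeNull(s"${dirs.consumerOffsetDir}/$partition")._1.map(_.toLong)
    }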
project/MimaExcludes.scala (4 changes: 4 additions & 0 deletions)

@@ -281,6 +281,10 @@ object MimaExcludes {
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry$")
+    ) ++ Seq(
+      // SPARK-13252 Bump up Kafka to 0.9.0.0
+      ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.zookeeperClient"),
+      ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$zkClient")
     )
     case v if v.startsWith("1.6") =>
       Seq(