diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index 68d52e9339b3..ef32b2bbff55 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -51,7 +51,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.2.1</version>
+      <version>0.9.0.0</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index a76fa6671a4b..2063996f2842 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -33,8 +33,8 @@ import kafka.api.Request
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
 import org.apache.spark.{Logging, SparkConf}
@@ -57,7 +57,7 @@ private[kafka] class KafkaTestUtils extends Logging {
 
   private var zookeeper: EmbeddedZookeeper = _
 
-  private var zkClient: ZkClient = _
+  private var zkUtils: ZkUtils = _
 
   // Kafka broker related configurations
   private val brokerHost = "localhost"
@@ -84,9 +84,9 @@ private[kafka] class KafkaTestUtils extends Logging {
     s"$brokerHost:$brokerPort"
   }
 
-  def zookeeperClient: ZkClient = {
+  def zookeeperUtils: ZkUtils = {
     assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
-    Option(zkClient).getOrElse(
+    Option(zkUtils).getOrElse(
       throw new IllegalStateException("Zookeeper client is not yet initialized"))
   }
 
@@ -96,8 +96,8 @@ private[kafka] class KafkaTestUtils extends Logging {
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
-      ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout)
+    zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())
     zkReady = true
   }
 
@@ -140,9 +140,9 @@ private[kafka] class KafkaTestUtils extends Logging {
 
     brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
 
-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
     }
 
     if (zookeeper != null) {
@@ -153,7 +153,7 @@ private[kafka] class KafkaTestUtils extends Logging {
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
   def createTopic(topic: String): Unit = {
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
     // wait until metadata is propagated
     waitUntilMetadataIsPropagated(topic, 0)
   }
@@ -234,7 +234,7 @@ private[kafka] class KafkaTestUtils extends Logging {
         case Some(partitionState) =>
           val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
 
-          ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+          zkUtils.getLeaderForPartition(topic, partition).isDefined &&
             Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
             leaderAndInSyncReplicas.isr.size >= 1
 
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index a872781b78ee..0c8c01bcc4bc 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -27,8 +27,8 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
 import kafka.message.MessageAndMetadata
 import kafka.serializer.Decoder
-import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZkUtils}
+import org.apache.kafka.common.security.JaasUtils
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
@@ -65,8 +65,8 @@ class ReliableKafkaReceiver[
   /** High level consumer to connect to Kafka. */
   private var consumerConnector: ConsumerConnector = null
 
-  /** zkClient to connect to Zookeeper to commit the offsets. */
-  private var zkClient: ZkClient = null
+  /** zkUtils to connect to Zookeeper to commit the offsets. */
+  private var zkUtils: ZkUtils = null
 
   /**
    * A HashMap to manage the offset for each topic/partition, this HashMap is called in
@@ -118,8 +118,9 @@ class ReliableKafkaReceiver[
     consumerConnector = Consumer.create(consumerConfig)
     logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
 
-    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
-      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(consumerConfig.zkConnect, consumerConfig
+      .zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs)
+    zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())
 
     messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
       topics.values.sum, "KafkaMessageHandler")
@@ -155,9 +156,9 @@ class ReliableKafkaReceiver[
       consumerConnector = null
     }
 
-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
     }
 
     if (blockGenerator != null) {
@@ -233,7 +234,7 @@ class ReliableKafkaReceiver[
    * metadata schema in Zookeeper.
    */
   private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
-    if (zkClient == null) {
+    if (zkUtils == null) {
       val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
       stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
       return
@@ -244,7 +245,7 @@ class ReliableKafkaReceiver[
       val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
       val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
 
-      ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+      zkUtils.updatePersistentPath(zkPath, offset.toString)
     } catch {
       case e: Exception =>
         logWarning(s"Exception during commit offset $offset for topic" +
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 7b9aee39ffb7..a2706ecb9a81 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
 import scala.util.Random
 
 import kafka.serializer.StringDecoder
-import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{ZKGroupTopicDirs}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.concurrent.Eventually
 
@@ -143,6 +143,6 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
     val topicDirs = new ZKGroupTopicDirs(groupId, topic)
     val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
-    ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
+    kafkaTestUtils.zookeeperUtils.readDataMaybeNull(zkPath)._1.map(_.toLong)
   }
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 14e3c90f1b0d..ea271940a1db 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -281,6 +281,10 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry$")
+      ) ++ Seq(
+        // SPARK-13252 Bump up Kafka to 0.9.0.0
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.zookeeperClient"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$zkClient")
       )
     case v if v.startsWith("1.6") =>
       Seq(
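Note for reviewers: the core API change when moving from Kafka 0.8 to 0.9 is that kafka.utils.ZkUtils becomes an instantiable wrapper around ZkClient (ZkUtils.createZkClient effectively takes over the string-serialization role that the now-removed ZKStringSerializer played), so the former static ZkUtils calls that took an explicit ZkClient become instance methods. Below is a minimal, self-contained sketch of that pattern using only the calls exercised in this patch; the object name, connection string, timeouts, and group/topic names are placeholders, not anything from the Spark code above.

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.security.JaasUtils

// Hypothetical standalone illustration of the 0.9-style ZkUtils usage.
object ZkUtilsMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings; point these at a real ZooKeeper ensemble.
    val zkConnect = "localhost:2181"
    val sessionTimeoutMs = 6000
    val connectionTimeoutMs = 6000

    // 0.8 style (removed by this patch):
    //   zkClient = new ZkClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)
    // 0.9 style: create the ZkClient through the helper, then wrap it in a
    // ZkUtils instance that also carries the ZooKeeper security flag.
    val zkClient = ZkUtils.createZkClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs)
    val zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())

    try {
      // Calls that were static ZkUtils methods taking an explicit ZkClient in
      // 0.8 are instance methods in 0.9, e.g. the offset commit and read-back
      // used by ReliableKafkaReceiver and its test suite.
      val topicDirs = new ZKGroupTopicDirs("test-group", "test-topic")
      val zkPath = s"${topicDirs.consumerOffsetDir}/0"
      zkUtils.updatePersistentPath(zkPath, "42")
      val offset = zkUtils.readDataMaybeNull(zkPath)._1.map(_.toLong)
      println(s"Committed offset read back from ZK: $offset")
    } finally {
      // Closing the ZkUtils wrapper also closes the underlying ZkClient.
      zkUtils.close()
    }
  }
}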