Commit f46b5c8

SPARK-13252: Bump up Kafka to 0.9.0.0
1 parent 140ddef

5 files changed: +30 −25 lines

external/kafka/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -51,7 +51,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.2.1</version>
+      <version>0.9.0.0</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
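
For reference, a hypothetical sbt equivalent of the bumped Maven dependency (not part of this commit; the %% operator appends the Scala binary version, matching kafka_${scala.binary.version} above):

    // Hypothetical sbt equivalent of the Maven dependency bump
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.9.0.0"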

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala

Lines changed: 12 additions & 12 deletions

@@ -33,8 +33,8 @@ import kafka.api.Request
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

 import org.apache.spark.{Logging, SparkConf}
@@ -57,7 +57,7 @@ private[kafka] class KafkaTestUtils extends Logging {

   private var zookeeper: EmbeddedZookeeper = _

-  private var zkClient: ZkClient = _
+  private var zkUtils: ZkUtils = _

   // Kafka broker related configurations
   private val brokerHost = "localhost"
@@ -84,9 +84,9 @@ private[kafka] class KafkaTestUtils extends Logging {
     s"$brokerHost:$brokerPort"
   }

-  def zookeeperClient: ZkClient = {
+  def zookeeperUtils: ZkUtils = {
     assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
-    Option(zkClient).getOrElse(
+    Option(zkUtils).getOrElse(
       throw new IllegalStateException("Zookeeper client is not yet initialized"))
   }

@@ -96,8 +96,8 @@ private[kafka] class KafkaTestUtils extends Logging {
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
-      ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout)
+    zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())
     zkReady = true
   }

@@ -140,9 +140,9 @@ private[kafka] class KafkaTestUtils extends Logging {

     brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }

-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
     }

     if (zookeeper != null) {
@@ -153,7 +153,7 @@ private[kafka] class KafkaTestUtils extends Logging {

   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
   def createTopic(topic: String): Unit = {
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
     // wait until metadata is propagated
     waitUntilMetadataIsPropagated(topic, 0)
   }
@@ -234,7 +234,7 @@ private[kafka] class KafkaTestUtils extends Logging {
       case Some(partitionState) =>
         val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr

-        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
           Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
           leaderAndInSyncReplicas.isr.size >= 1
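
The diff above follows the Kafka 0.9 migration pattern: a raw ZkClient is still created, but every ZooKeeper operation now goes through a ZkUtils wrapper, which AdminUtils and the other helpers accept in place of the client. A minimal standalone sketch of that pattern; the connection string, timeouts, and topic name are placeholder values, not part of this commit:

    import kafka.admin.AdminUtils
    import kafka.utils.ZkUtils
    import org.apache.kafka.common.security.JaasUtils

    // Build the low-level client, then wrap it; in 0.9 the wrapper, not the
    // raw client, is what topic administration takes.
    val zkClient = ZkUtils.createZkClient("localhost:2181", 6000, 6000)
    val zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())
    try {
      // was AdminUtils.createTopic(zkClient, ...) against Kafka 0.8
      AdminUtils.createTopic(zkUtils, "test-topic", 1, 1)
    } finally {
      zkUtils.close() // also closes the wrapped ZkClient
    }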

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 12 additions & 11 deletions

@@ -27,8 +27,8 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
 import kafka.message.MessageAndMetadata
 import kafka.serializer.Decoder
-import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZkUtils}
+import org.apache.kafka.common.security.JaasUtils

 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
@@ -65,8 +65,8 @@ class ReliableKafkaReceiver[
   /** High level consumer to connect to Kafka. */
   private var consumerConnector: ConsumerConnector = null

-  /** zkClient to connect to Zookeeper to commit the offsets. */
-  private var zkClient: ZkClient = null
+  /** zkUtils to connect to Zookeeper to commit the offsets. */
+  private var zkUtils: ZkUtils = null

   /**
    * A HashMap to manage the offset for each topic/partition, this HashMap is called in
@@ -118,8 +118,9 @@ class ReliableKafkaReceiver[
     consumerConnector = Consumer.create(consumerConfig)
     logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")

-    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
-      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(consumerConfig.zkConnect, consumerConfig
+      .zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs)
+    zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())

     messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
       topics.values.sum, "KafkaMessageHandler")
@@ -155,9 +156,9 @@ class ReliableKafkaReceiver[
       consumerConnector = null
     }

-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
     }

     if (blockGenerator != null) {
@@ -233,7 +234,7 @@ class ReliableKafkaReceiver[
    * metadata schema in Zookeeper.
    */
   private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
-    if (zkClient == null) {
+    if (zkUtils == null) {
       val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
       stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
       return
@@ -244,7 +245,7 @@ class ReliableKafkaReceiver[
         val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
         val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"

-        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+        zkUtils.updatePersistentPath(zkPath, offset.toString)
       } catch {
         case e: Exception =>
           logWarning(s"Exception during commit offset $offset for topic" +

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -143,6 +143,6 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
     val topicDirs = new ZKGroupTopicDirs(groupId, topic)
     val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
-    ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
+    kafkaTestUtils.zookeeperUtils.readDataMaybeNull(zkPath)._1.map(_.toLong)
   }
 }
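
Reading an offset back mirrors the write: readDataMaybeNull is now called on the ZkUtils instance exposed by the renamed zookeeperUtils accessor, and its first tuple element is the optional node payload. An illustrative read; the ZK path and group/topic names are made-up examples, and kafkaTestUtils is assumed to be an already-initialized KafkaTestUtils:

    // Fetch a committed offset, if any, from the consumer-offsets ZK layout.
    val zkPath = "/consumers/test-group/offsets/test-topic/0" // illustrative path
    val committed: Option[Long] =
      kafkaTestUtils.zookeeperUtils.readDataMaybeNull(zkPath)._1.map(_.toLong)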

project/MimaExcludes.scala

Lines changed: 4 additions & 0 deletions

@@ -231,6 +231,10 @@ object MimaExcludes {
        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metadataCleaner"),
        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint"),
        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint")
+      ) ++ Seq(
+        // SPARK-13252 Bump up Kafka to 0.9.0.0
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.zookeeperClient"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$zkClient")
       )
     case v if v.startsWith("1.6") =>
       Seq(

0 commit comments