From bb9f635a556bc1ed2f9fd9593a5a3f2c6af62cf9 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 30 Jul 2016 13:19:48 +0900 Subject: [PATCH 1/3] Remove deprecated API in KafkaTestUtils --- .../streaming/kafka010/KafkaTestUtils.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 19192e4b9594..75f4f423227c 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -30,10 +30,10 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request -import kafka.producer.{KeyedMessage, Producer, ProducerConfig} -import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.ZkUtils +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.StringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.SparkConf @@ -68,7 +68,7 @@ private[kafka010] class KafkaTestUtils extends Logging { private var server: KafkaServer = _ // Kafka producer - private var producer: Producer[String, String] = _ + private var producer: KafkaProducer[String, String] = _ // Flag to test whether the system is correctly started private var zkReady = false @@ -178,8 +178,9 @@ private[kafka010] class KafkaTestUtils extends Logging { /** Send the array of messages to the Kafka broker */ def sendMessages(topic: String, messages: Array[String]): Unit = { - producer = new Producer[String, String](new ProducerConfig(producerConfiguration)) - producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) + producer = new KafkaProducer[String, String](producerConfiguration) + val records = messages.map { new ProducerRecord[String, String](topic, _) } + records.map(producer.send) producer.close() producer = null } @@ -198,10 +199,12 @@ private[kafka010] class KafkaTestUtils extends Logging { private def producerConfiguration: Properties = { val props = new Properties() - props.put("metadata.broker.list", brokerAddress) - props.put("serializer.class", classOf[StringEncoder].getName) + props.put("bootstrap.servers", brokerAddress) + props.put("value.serializer", classOf[StringSerializer].getName) + // Key serializer is required. + props.put("key.serializer", classOf[StringSerializer].getName) // wait for all in-sync replicas to ack sends - props.put("request.required.acks", "-1") + props.put("acks", "all") props } From b7a53f2902f5de0c4bca1dbaa76e4c70bfd92b86 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 30 Jul 2016 15:31:24 +0900 Subject: [PATCH 2/3] Use foreach instead of map --- .../org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 75f4f423227c..f94fbc40f4a7 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -180,7 +180,7 @@ private[kafka010] class KafkaTestUtils extends Logging { def sendMessages(topic: String, messages: Array[String]): Unit = { producer = new KafkaProducer[String, String](producerConfiguration) val records = messages.map { new ProducerRecord[String, String](topic, _) } - records.map(producer.send) + records.foreach(producer.send) producer.close() producer = null } From a1ef31129faa3c571bfd58c196ce3757eec890b8 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 30 Jul 2016 23:08:17 +0900 Subject: [PATCH 3/3] Use only foreach --- .../org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index f94fbc40f4a7..ecabe1c365b4 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -179,8 +179,9 @@ private[kafka010] class KafkaTestUtils extends Logging { /** Send the array of messages to the Kafka broker */ def sendMessages(topic: String, messages: Array[String]): Unit = { producer = new KafkaProducer[String, String](producerConfiguration) - val records = messages.map { new ProducerRecord[String, String](topic, _) } - records.foreach(producer.send) + messages.foreach { message => + producer.send(new ProducerRecord[String, String](topic, message)) + } producer.close() producer = null }