@@ -22,28 +22,29 @@ import scala.util.Random
 import kafka.serializer.StringDecoder
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import org.scalatest.BeforeAndAfter
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 
-class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
+  val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
   var sc: SparkContext = _
-  before {
+  override def beforeAll {
+    sc = new SparkContext(sparkConf)
+
     setupKafka()
   }
 
-  after {
+  override def afterAll {
     if (sc != null) {
       sc.stop
       sc = null
     }
     tearDownKafka()
   }
 
-  test("Kafka RDD basic usage") {
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    sc = new SparkContext(sparkConf)
+  test("basic usage") {
     val topic = "topicbasic"
     createTopic(topic)
     val messages = Set("the", "quick", "brown", "fox")
@@ -62,11 +63,8 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     assert(received === messages)
   }
 
-  test("Kafka RDD integration") {
+  test("iterator boundary conditions") {
     // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
-
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    sc = new SparkContext(sparkConf)
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
@@ -111,26 +109,29 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
   // get an rdd from the committed consumer offsets until the latest leader offsets,
   private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
     val groupId = kc.kafkaParams("group.id")
-    for {
-      topicPartitions <- kc.getPartitions(topics).right.toOption
-      from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+    def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
+      kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
         kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
           offs.map(kv => kv._1 -> kv._2.offset)
         }
       )
-      until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
-    } yield {
-      val leaders = until.map { case (tp, lo) =>
-        tp -> Broker(lo.host, lo.port)
-      }.toMap
-      val offsetRanges = from.map { case (tp, f) =>
-        val u = until(tp)
-        OffsetRange(tp.topic, tp.partition, f, u.offset)
-      }.toArray
-
-      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
-        sc, kc.kafkaParams, offsetRanges, leaders,
-        (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+    }
+    kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
+      consumerOffsets(topicPartitions).flatMap { from =>
+        kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
+          val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
+            OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
+          }.toArray
+
+          val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
+            tp -> Broker(lo.host, lo.port)
+          }.toMap
+
+          KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+            sc, kc.kafkaParams, offsetRanges, leaders,
+            (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+        }
+      }
+    }
   }
 }
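
For reference, the batch-read pattern this suite exercises can also be driven outside a test through the simpler KafkaUtils.createRDD overload (no explicit leaders or message handler). Below is a minimal sketch of that usage; the object name, broker address, and offsets are illustrative assumptions, not values taken from the suite, and the topic is assumed to already hold at least four messages in partition 0.

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object KafkaRDDBasicExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaRDDBasicExample")
    val sc = new SparkContext(conf)

    // Placeholder broker address; the suite uses an embedded broker instead.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Read exactly offsets [0, 4) of partition 0, i.e. the first four messages of the topic.
    val offsetRanges = Array(OffsetRange("topicbasic", 0, 0, 4))

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)

    // Each element is (key, message); print the message payloads.
    rdd.map(_._2).collect().foreach(println)

    sc.stop()
  }
}

With explicit OffsetRanges the read is deterministic, which is what lets the suite assert exact message sets and probe off-by-one boundaries.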