Closed
Changes from all commits (74 commits)
cff3a44
[SPARK-12177] [STREAMING] Update KafkaDStreams to new Kafka 0.9 Consu…
nikit-os Jan 10, 2016
af0c7b9
Getting rid of an extra dependency
markgrover Jan 7, 2016
ff0a427
Merge pull request #1 from markgrover/kafka09-integration
nikit-os Jan 11, 2016
e5f638f
Fix Kafka params in Java example
nikit-os Jan 11, 2016
e0f1275
Removing Broker.scala since it's not needed
markgrover Jan 19, 2016
71ea192
Scala style related changes to not exceed the line length
markgrover Jan 19, 2016
6d27bbd
Formatting changes for styling, also updating scaladoc for functions.
markgrover Jan 19, 2016
6960736
Adding missing leaderhost parameter to various methods in OffsetRange…
markgrover Jan 19, 2016
bb8cef8
Fixing a compilation failure related to previous commit
markgrover Jan 20, 2016
90b80da
Merge pull request #2 from markgrover/kafka09-integration
nikit-os Jan 20, 2016
4f56ffe
Refactoring Java/Scala code to have use the package name newapi inste…
markgrover Jan 26, 2016
d5b7b56
Fixing a scalastyle error
markgrover Jan 26, 2016
e2402f8
Merge branch 'master' into kafka09-integration
markgrover Jan 26, 2016
eedbddc
Fixing scalastyle errors related to import ordering
markgrover Jan 26, 2016
adfd335
Fixing a compilation error
markgrover Jan 27, 2016
a0f052a
More import ordering fixes
markgrover Jan 27, 2016
d2bc2a9
Function declaration formatting in KafkaUtils and other minor formatt…
markgrover Jan 27, 2016
c530fa6
Removing support for Kafka 0.8 completely. You can use the new api or…
markgrover Jan 27, 2016
0763979
Making the new api KafkaTestUtils to not use zkClient explicitly
markgrover Jan 27, 2016
b35ea65
Removing the separate copy of KafkaTestUtils
markgrover Jan 27, 2016
b3f163e
Fixing a scalastyle error
markgrover Jan 27, 2016
41a8816
Adding excludes for binary compat testing. The changes were for 2.0 a…
markgrover Jan 29, 2016
c5657f2
Implementing some of the review feedback from Mario
markgrover Jan 29, 2016
b9cde40
More feedback implemented from Mario
markgrover Jan 30, 2016
82223f8
Getting rid of some duplicate code
markgrover Jan 30, 2016
b3f90cb
Merge branch 'master' into kafka09-integration
markgrover Feb 1, 2016
d9c3b2a
Getting rid of extra whitespace
markgrover Feb 1, 2016
4d4220f
Merge branch 'master' into kafka09-integration
markgrover Feb 2, 2016
d932d75
Updating the new kafka-newapi module to be 2.11 based as well
markgrover Feb 2, 2016
6cd558a
Update KafkaRDD.scala
mariobriggs Feb 2, 2016
903a8b8
Correcting the package name in the wordcount example
markgrover Feb 2, 2016
1f77501
Getting rid of extra parameter of batch interval in examples. Paramet…
markgrover Feb 2, 2016
fd7df3e
Adding a change that will conflict with Mario's change for now
markgrover Feb 2, 2016
8fae471
Merge pull request #2 from mariobriggs/patch-1
markgrover Feb 2, 2016
7dcb848
Changes related to poll time.
markgrover Feb 2, 2016
edfca9f
Updating the import ordering
markgrover Feb 2, 2016
e20dfd0
Fixing up some minor things in the Example
markgrover Feb 3, 2016
424b8f8
Correcting comment
markgrover Feb 3, 2016
2842273
Reintroducing if loop because some times buffer may return empty
markgrover Feb 3, 2016
10a18b8
WIP regarding getting rid of spark.kafka.poll.time in Kafka Params
markgrover Feb 3, 2016
971d74e
WIP refactor of old KafkaRDD
markgrover Feb 5, 2016
b881d39
With this change, the only files that still need to be taken care of …
markgrover Feb 5, 2016
1de98c4
Everything seems to be refactored except KafkaUtils and Java code
markgrover Feb 5, 2016
58e0e01
Refactoring of Kafka Utils. Left is java code, test code, and example…
markgrover Feb 5, 2016
839be24
Complete refactor of KafkaUtils for Scala and potentially Java and ch…
markgrover Feb 5, 2016
4be1cc0
Refactor of DirectKafkaStreamSuite
markgrover Feb 5, 2016
ab69c10
Taken care of KafkaClusterSuite
markgrover Feb 5, 2016
dccb0f9
KafkaRDDSuite is done too. Now Java code and Java tests are the only …
markgrover Feb 6, 2016
0ff2b14
Fixing examples pom file to not have a reference to the old package name
markgrover Feb 6, 2016
2c4ae74
Adding Java tests to the original place
markgrover Feb 6, 2016
647ec61
Adding the Java example. It should compile but it doesn't seem to be …
markgrover Feb 6, 2016
2089b07
Fixing some test bugs and adding some improvements related to clean up
markgrover Feb 7, 2016
a4825c0
Selective revert of the previous patch since the randoms weren't need…
markgrover Feb 7, 2016
f773a32
Test improvements. Now we are only down to only the new tests failing
markgrover Feb 7, 2016
c32518e
Fixing scala style but also minor bug fix in OffsetRanges and polltime
markgrover Feb 7, 2016
cc95e38
Finally figured out how to read the configuration variable from withi…
markgrover Feb 7, 2016
f195ec2
Adding ignore for a test that was failing. This is because of a kafka…
markgrover Feb 7, 2016
f52296c
Adding some extra functions in OffsetRanges
markgrover Feb 8, 2016
1c772fd
Adding excludes for binary compatibility
markgrover Feb 8, 2016
feb6993
Getting rid of unnecessary getPartitions method in the base class, ma…
markgrover Feb 8, 2016
f0e3499
Fixing a Java test bug
markgrover Feb 8, 2016
ec4b4c5
WIP - offset recovery test is failing but this is testing to have no …
markgrover Feb 8, 2016
abd167c
Removing all the extra separate subproject code
markgrover Feb 8, 2016
15d217c
WIP - offset recovery test is failing but this is testing to have no …
markgrover Feb 8, 2016
846a85a
Merge branch 'master' into kafka09-integration
markgrover Feb 8, 2016
eb6ea49
Merge branch 'kafka09-integration' into kafka09-integration-refactor-…
markgrover Feb 9, 2016
1f45b96
Merge branch 'kafka09-integration-refactor-params' of github.com:mark…
markgrover Feb 9, 2016
b2f13ba
Fixing the test by clearing the collection
markgrover Feb 9, 2016
7081b97
Updating the excludes
markgrover Feb 9, 2016
9983e7d
Making NewKafkaCluster class public
markgrover Feb 9, 2016
c422cd5
Merge branch 'master' into kafka09-integration
markgrover Feb 12, 2016
075226e
Merge branch 'master' into kafka09-integration
markgrover Mar 2, 2016
993c6fd
Taking the SNAPSHOT out since 0.9.0.1 has now been released
markgrover Mar 2, 2016
229b773
Getting rid of TODOs since Kafka 0.9.0.1 is now released
markgrover Mar 3, 2016
125 changes: 125 additions & 0 deletions examples/src/main/java/org/apache/spark/examples/streaming/JavaNewDirectKafkaWordCount.java
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.regex.Pattern;

import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;

/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: JavaNewDirectKafkaWordCount <brokers> <topics>
* <brokers> is a list of one or more Kafka brokers
* <topics> is a list of one or more kafka topics to consume from
*
* Example:
* $ bin/run-example streaming.JavaNewDirectKafkaWordCount
* broker1-host:port,broker2-host:port topic1,topic2
*/

public final class JavaNewDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaNewDirectKafkaWordCount <brokers> <topics>\n" +
          " <brokers> is a list of one or more Kafka brokers\n" +
          " <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    // StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];
    String topics = args[1];

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("Direct Kafka Java Wordcount (New Consumer " +
        "API)");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createNewDirectStream(
        jssc,
        String.class,
        String.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x)).iterator();
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
95 changes: 95 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/streaming/DirectNewKafkaWordCount.scala
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.kafka.clients.consumer.ConsumerConfig

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: DirectNewKafkaWordCount <brokers> <topics>
* <brokers> is a list of one or more Kafka brokers
* <topics> is a list of one or more kafka topics to consume from
* <groupId> is the name of kafka consumer group
* <auto.offset.reset> What to do when there is no initial offset in Kafka or
* if the current offset does not exist any more on the server
* earliest: automatically reset the offset to the earliest offset
* latest: automatically reset the offset to the latest offset
* Example:
* $ bin/run-example streaming.DirectNewKafkaWordCount broker1-host:port,broker2-host:port \
* topic1,topic2 my-consumer-group earliest
*
*/
object DirectNewKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println(s"""
        |Usage: DirectNewKafkaWordCount <brokers> <topics> <groupId> \
        |<auto.offset.reset>
        | <brokers> is a list of one or more Kafka brokers
        | <topics> is a list of one or more kafka topics to consume from
        | <groupId> is the name of kafka consumer group (can be arbitrary)
        | <auto.offset.reset> What to do when there is no initial offset
        | in Kafka or if the current offset does not exist
        | any more on the server
        | earliest: automatically reset the offset
        | to the earliest offset
        | latest: automatically reset the offset
        | to the latest offset
        """.stripMargin)
      System.exit(1)
    }

    // StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics, groupId, offsetReset) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("Direct Kafka Wordcount (New Consumer API)")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
    )
    val messages = KafkaUtils.createNewDirectStream[String, String](ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println
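
The word-count example above never looks at which Kafka offsets each batch covered. As a hedged illustration only (this snippet is not part of the PR's diff), and assuming the new-consumer direct stream keeps exposing HasOffsetRanges on its RDDs the way the existing 0.8 direct stream does, the usual offset-tracking pattern applied to the messages stream from the example would look roughly like this:

// Sketch only: assumes the RDDs produced by the new-API direct stream implement
// HasOffsetRanges, as in the 0.8 integration; `messages` is the stream created above.
import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // Cast the RDD returned by the stream itself, before any shuffle, because only
  // that RDD still carries the Kafka topic/partition/offset metadata.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

An application that manages its own offsets would persist these ranges (for example, to ZooKeeper or a database) after the batch's output action succeeds, instead of printing them.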
2 changes: 1 addition & 1 deletion external/kafka/pom.xml
@@ -51,7 +51,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.8.2.1</version>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -28,9 +28,6 @@ import kafka.serializer.Decoder
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator

/**
* A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
@@ -62,59 +59,19 @@ class DirectKafkaInputDStream[
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
) extends InputDStream[R](_ssc) with Logging {
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)
) extends DirectKafkaInputDStreamBase[K, V, U, T, R] (_ssc, kafkaParams, fromOffsets) {

// Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
private[streaming] override def name: String = s"Kafka direct stream [$id]"

protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData


/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/
override protected[streaming] val rateController: Option[RateController] = {
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new DirectKafkaRateController(id,
RateEstimator.create(ssc.conf, context.graph.batchDuration)))
} else {
None
}
}

protected val kc = new KafkaCluster(kafkaParams)

private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRatePerPartition", 0)
protected def maxMessagesPerPartition: Option[Long] = {
val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
val numPartitions = currentOffsets.keys.size

val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0)
.map { limit =>
if (maxRateLimitPerPartition > 0) {
Math.min(maxRateLimitPerPartition, (limit / numPartitions))
} else {
limit / numPartitions
}
}.getOrElse(maxRateLimitPerPartition)

if (effectiveRateLimitPerPartition > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
} else {
None
}
}

protected var currentOffsets = fromOffsets

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
protected final override def latestLeaderOffsets(retries: Int): Map[TopicAndPartition,
LeaderOffset] = {
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
// Either.fold would confuse @tailrec, do it manually
if (o.isLeft) {
@@ -131,48 +88,10 @@
}
}

// limits the maximum number of messages per partition
protected def clamp(
leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
maxMessagesPerPartition.map { mmp =>
leaderOffsets.map { case (tp, lo) =>
tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
}
}.getOrElse(leaderOffsets)
}

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

// Report the record number and metadata of this batch interval to InputInfoTracker.
val offsetRanges = currentOffsets.map { case (tp, fo) =>
val uo = untilOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
}.map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
}.mkString("\n")
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata = Map(
"offsets" -> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo = StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
Some(rdd)
}

override def start(): Unit = {
}

def stop(): Unit = {
override def getRdd(untilOffsets: Map[TopicAndPartition, LeaderOffset]): KafkaRDD[K, V, U, T,
R] = {
KafkaRDD[K, V, U, T, R](context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
messageHandler)
}

private[streaming]
@@ -204,11 +123,4 @@
}
}

/**
* A RateController to retrieve the rate from RateEstimator.
*/
private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit = ()
}
}
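
Read as a whole, the DirectKafkaInputDStream diff removes the rate controller, the per-partition clamping, and the compute method from the 0.8-based stream and moves that shared logic into a parent class (the PR's DirectKafkaInputDStreamBase), so that each concrete stream only has to provide latestLeaderOffsets and getRdd. The following is a minimal, self-contained sketch of that template-method split; every name and type in it is a simplified stand-in, not the PR's actual base class:

// Simplified, hypothetical sketch of the template-method refactor implied by the diff;
// these names and types are stand-ins, not the PR's DirectKafkaInputDStreamBase.
object DirectStreamRefactorSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderOffset(offset: Long)

  abstract class DirectStreamBase(maxRetries: Int) {
    protected var currentOffsets: Map[TopicPartition, Long] = Map.empty

    // Each concrete stream (old simple consumer vs. new 0.9 consumer) supplies these two:
    protected def latestLeaderOffsets(retries: Int): Map[TopicPartition, LeaderOffset]
    protected def getRdd(untilOffsets: Map[TopicPartition, LeaderOffset]): Seq[String]

    // Shared per-batch logic: cap how far each partition may advance, build the batch,
    // then record the new positions. Rate estimation is omitted for brevity.
    private def clamp(maxMessages: Long,
        leaderOffsets: Map[TopicPartition, LeaderOffset]): Map[TopicPartition, LeaderOffset] =
      leaderOffsets.map { case (tp, lo) =>
        tp -> LeaderOffset(math.min(currentOffsets.getOrElse(tp, 0L) + maxMessages, lo.offset))
      }

    final def compute(maxMessages: Long): Seq[String] = {
      val untilOffsets = clamp(maxMessages, latestLeaderOffsets(maxRetries))
      val batch = getRdd(untilOffsets)
      currentOffsets = untilOffsets.map { case (tp, lo) => tp -> lo.offset }
      batch
    }
  }
}

In the actual diff, getRdd builds a KafkaRDD and compute additionally reports StreamInputInfo to the input-info tracker; the sketch keeps only the control flow that moved into the base class.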