diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index de95ea90137e..c1ef396907db 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -8,9 +8,9 @@ The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 [
### Linking
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
- groupId = org.apache.spark
- artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
- version = {{site.SPARK_VERSION_SHORT}}
+ groupId = org.apache.spark
+ artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
### Creating a Direct Stream
Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010
@@ -44,6 +44,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea
Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
+ import java.util.*;
+ import org.apache.spark.SparkConf;
+ import org.apache.spark.TaskContext;
+ import org.apache.spark.api.java.*;
+ import org.apache.spark.api.java.function.*;
+ import org.apache.spark.streaming.api.java.*;
+ import org.apache.spark.streaming.kafka010.*;
+ import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.common.TopicPartition;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import scala.Tuple2;
+
+ Map<String, Object> kafkaParams = new HashMap<>();
+ kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
+ kafkaParams.put("key.deserializer", StringDeserializer.class);
+ kafkaParams.put("value.deserializer", StringDeserializer.class);
+ kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
+ kafkaParams.put("auto.offset.reset", "latest");
+ kafkaParams.put("enable.auto.commit", false);
+
+ Collection<String> topics = Arrays.asList("topicA", "topicB");
+
+ final JavaInputDStream<ConsumerRecord<String, String>> stream =
+   KafkaUtils.createDirectStream(
+     streamingContext,
+     LocationStrategies.PreferConsistent(),
+     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
+   );
+
+ stream.mapToPair(
+   new PairFunction<ConsumerRecord<String, String>, String, String>() {
+     @Override
+     public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
+       return new Tuple2<>(record.key(), record.value());
+     }
+   });
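+
+ As a minimal usage sketch (assuming the `streamingContext` above is a `JavaStreamingContext`), note that nothing is actually consumed until the context is started:
+
+ // Start the computation and block until it stops or fails. Depending on the Spark
+ // version, the enclosing method may need to declare or handle InterruptedException.
+ streamingContext.start();
+ streamingContext.awaitTermination();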
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create
+ // Import dependencies and create kafka params as in Create Direct Stream above
+
+ OffsetRange[] offsetRanges = {
+   // topic, partition, inclusive starting offset, exclusive ending offset
+   OffsetRange.create("test", 0, 0, 100),
+   OffsetRange.create("test", 1, 0, 100)
+ };
+
+ JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
+   sparkContext,
+   kafkaParams,
+   offsetRanges,
+   LocationStrategies.PreferConsistent()
+ );
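+
+ The result is an ordinary batch RDD. For example, a minimal sketch (reusing the imports from the direct stream example above) that pulls out the record values and collects them to the driver:
+
+ List<String> values = rdd.map(
+   new Function<ConsumerRecord<String, String>, String>() {
+     @Override
+     public String call(ConsumerRecord<String, String> record) {
+       // keep only the message payload, dropping topic/partition/offset metadata
+       return record.value();
+     }
+   }).collect();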
@@ -103,6 +153,20 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no
}
+ stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+   @Override
+   public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+     final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+     rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
+       @Override
+       public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
+         OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
+         System.out.println(
+           o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
+       }
+     });
+   }
+ });
@@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic. By
stream.foreachRDD { rdd =>
- val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
- stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
+ stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.
+ stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+   @Override
+   public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+     // some time later, after outputs have completed
+     ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
+   }
+ });
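+
+ If you want to know whether an asynchronous commit actually succeeded, a sketch of the callback variant follows. It assumes the two-argument `commitAsync(offsetRanges, callback)` overload on `CanCommitOffsets` and additional imports of `org.apache.kafka.clients.consumer.OffsetCommitCallback` and `OffsetAndMetadata`, and it would be placed inside the same `foreachRDD` call as above:
+
+ ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges,
+   new OffsetCommitCallback() {
+     @Override
+     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+       // exception is null when the commit succeeded
+       if (exception != null) {
+         System.err.println("Offset commit failed: " + exception);
+       }
+     }
+   });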
@@ -141,7 +214,7 @@ For data stores that support transactions, saving offsets in the same transactio
-    // begin from the the offsets committed to the database
+    // begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
- new TopicPartition(resultSet.string("topic")), resultSet.int("partition")) -> resultSet.long("offset")
+ new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
@@ -155,16 +228,46 @@ For data stores that support transactions, saving offsets in the same transactio
val results = yourCalculation(rdd)
- yourTransactionBlock {
- // update results
- // update offsets where the end of existing offsets matches the beginning of this batch of offsets
- // assert that offsets were updated correctly
- }
+ // begin your transaction
+ // update results
+ // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+ // assert that offsets were updated correctly
+ // end your transaction
}
+ // The details depend on your data store, but the general idea looks like this
+
+ // begin from the offsets committed to the database
+ Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+ for (resultSet : selectOffsetsFromYourDatabase) {
+   fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
+ }
+
+ JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
+   streamingContext,
+   LocationStrategies.PreferConsistent(),
+   ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
+ );
+
+ stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+   @Override
+   public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+     Object results = yourCalculation(rdd);
+
+     // begin your transaction
+
+     // update results
+     // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+     // assert that offsets were updated correctly
+
+     // end your transaction
+   }
+ });
@@ -185,6 +288,14 @@ The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html
)
+ Map<String, Object> kafkaParams = new HashMap<>();
+ // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
+ kafkaParams.put("security.protocol", "SSL");
+ kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
+ kafkaParams.put("ssl.truststore.password", "test1234");
+ kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
+ kafkaParams.put("ssl.keystore.password", "test1234");
+ kafkaParams.put("ssl.key.password", "test1234");