- import org.apache.spark.streaming.Duration
- import org.apache.spark.streaming.kinesis._
- import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-
- val kinesisStream = KinesisUtils.createStream(
- streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
- [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
-
- See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
+ import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream
+ import org.apache.spark.streaming.{Seconds, StreamingContext}
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+ val kinesisStream = KinesisInputDStream.builder
+ .streamingContext(streamingContext)
+ .endpointUrl([endpoint URL])
+ .regionName([region name])
+ .streamName([Kinesis stream name])
+ .initialPositionInStream([initial position])
+ .checkpointAppName([Kinesis app name])
+ .checkpointInterval([checkpoint interval])
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .build()
+
+ See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisInputDStream)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example.
- import org.apache.spark.streaming.Duration;
- import org.apache.spark.streaming.kinesis.*;
- import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
- JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream(
- streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
- [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
+ import org.apache.spark.storage.StorageLevel;
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+ import org.apache.spark.streaming.Seconds;
+ import org.apache.spark.streaming.StreamingContext;
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+ KinesisInputDStream kinesisStream = KinesisInputDStream.builder
+ .streamingContext(streamingContext)
+ .endpointUrl([endpoint URL])
+ .regionName([region name])
+ .streamName([Kinesis stream name])
+ .initialPositionInStream([initial position])
+ .checkpointAppName([Kinesis app name])
+ .checkpointInterval([checkpoint interval])
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .build();
- See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
+ See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisInputDStream.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
- from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+ from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
- kinesisStream = KinesisUtils.createStream(
- streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
- [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
+ kinesisStream = KinesisUtils.createStream(
+ streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+ [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
@@ -70,27 +87,40 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
-
- import org.apache.spark.streaming.Duration
- import org.apache.spark.streaming.kinesis._
- import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-
- val kinesisStream = KinesisUtils.createStream[T](
- streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
- [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
- [message handler])
+ import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream
+ import org.apache.spark.streaming.{Seconds, StreamingContext}
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+ val kinesisStream = KinesisInputDStream.builder
+ .streamingContext(streamingContext)
+ .endpointUrl([endpoint URL])
+ .regionName([region name])
+ .streamName([Kinesis stream name])
+ .initialPositionInStream([initial position])
+ .checkpointAppName([Kinesis app name])
+ .checkpointInterval([checkpoint interval])
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .buildWithMessageHandler([message handler])
-
- import org.apache.spark.streaming.Duration;
- import org.apache.spark.streaming.kinesis.*;
- import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
- JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream(
- streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
- [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
- [message handler], [class T]);
+ import org.apache.spark.storage.StorageLevel;
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+ import org.apache.spark.streaming.Seconds;
+ import org.apache.spark.streaming.StreamingContext;
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+ KinesisInputDStream kinesisStream = KinesisInputDStream.builder
+ .streamingContext(streamingContext)
+ .endpointUrl([endpoint URL])
+ .regionName([region name])
+ .streamName([Kinesis stream name])
+ .initialPositionInStream([initial position])
+ .checkpointAppName([Kinesis app name])
+ .checkpointInterval([checkpoint interval])
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .buildWithMessageHandler([message handler]);
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index f14117b708a0..cde2c4b04c0c 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
import scala.util.Random
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.PutRecordRequest
@@ -34,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
-import org.apache.spark.streaming.kinesis.KinesisUtils
+import org.apache.spark.streaming.kinesis.KinesisInputDStream
/**
@@ -135,8 +134,16 @@ object KinesisWordCountASL extends Logging {
// Create the Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i =>
- KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
- InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
+ KinesisInputDStream.builder
+ .streamingContext(ssc)
+ .streamName(streamName)
+ .endpointUrl(endpointUrl)
+ .regionName(regionName)
+ .initialPositionInStream(InitialPositionInStream.LATEST)
+ .checkpointAppName(appName)
+ .checkpointInterval(kinesisCheckpointInterval)
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .build()
}
// Union all the streams