Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 69 additions & 39 deletions docs/streaming-kinesis-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,58 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
For Python applications, you will have to add this above library and its dependencies when deploying your application. See the *Deploying* subsection below.
**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**

2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream of byte array as follows:
2. **Programming:** In the streaming application code, import `KinesisInputDStream` and create the input DStream of byte array as follows:

<div class="codetabs">
<div data-lang="scala" markdown="1">
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()

See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisInputDStream)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example.

</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.kinesis.*;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build();

See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.

</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
Expand All @@ -70,27 +87,40 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m

<div class="codetabs">
<div data-lang="scala" markdown="1">

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream[T](
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
[message handler])
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.buildWithMessageHandler([message handler])

</div>
<div data-lang="java" markdown="1">

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.kinesis.*;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
[message handler], [class T]);
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.buildWithMessageHandler([message handler]);

</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.nio.ByteBuffer
import scala.util.Random

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.PutRecordRequest
Expand All @@ -34,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.kinesis.KinesisInputDStream


/**
Expand Down Expand Up @@ -135,8 +134,16 @@ object KinesisWordCountASL extends Logging {

// Create the Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i =>
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
KinesisInputDStream.builder
.streamingContext(ssc)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointAppName(appName)
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
}

// Union all the streams
Expand Down