29 changes: 18 additions & 11 deletions docs/streaming-kinesis-integration.md
@@ -32,7 +32,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
[appName], [streamingContext], [stream name], [endpoint URL], [regionName], [awsAccessKeyId], [awsSecretKey], [checkpoint interval], [initial position], [storage level])

See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the Running the Example section for instructions on how to run the example.
@@ -44,29 +44,36 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
[appName], [javaStreamingContext], [stream name], [endpoint URL], [regionName], [awsAccessKeyId], [awsSecretKey], [checkpoint interval], [initial position], [storage level]);

See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.

</div>
</div>
- `[app name]`: Name of the application, used by Kinesis to tie this Kinesis application to the Kinesis stream (a complete call is sketched after this parameter list). Note: this application name overrides any app name set in the StreamingContext's SparkConf

- `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
- `streamingContext`: StreamingContext possibly containing a SparkConf with an application name used by Kinesis to tie this Kinesis application to the Kinesis stream

- `[Kinesis stream name]`: The Kinesis stream that this streaming application receives from
- `[stream name]`: The Kinesis stream that this streaming application receives from
- The application name used in the streaming context becomes the Kinesis application name
- The application name must be unique for a given account and region.
- The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization.
- The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table created in the specified region (can be separate from the Kinesis stream region) during Kinesis Client Library initialization.
- Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table.


- `[endpoint URL]`: Valid Kinesis endpoint URLs can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).

- `[region name]`: The region used by the Kinesis Client Library for DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)

- `[aws access key id]`: AWS access key id

- `[aws secret key]`: AWS secret key

- `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.

- `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details).

- `[storage level]`: Can be one of the following: `StorageLevel.MEMORY_AND_DISK_2`, `StorageLevel.MEMORY_ONLY_2`, `StorageLevel.MEMORY_AND_DISK`, etc. (see the Spark RDD StorageLevel documentation for more details).
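
Putting the parameters together, here is a minimal Scala sketch of a complete call. The app name, stream name, endpoint, and region values are placeholders, and reading the credentials from environment variables is purely for illustration:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    val sparkConf = new SparkConf().setAppName("myKinesisApp")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Match the Kinesis checkpoint interval to the 2-second batch interval for starters.
    val kinesisCheckpointInterval = Seconds(2)

    // All values below are placeholders -- substitute your own stream, endpoint,
    // region, and credentials.
    val kinesisStream = KinesisUtils.createStream(
      "myKinesisApp", ssc, "myKinesisStream",
      "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
      sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_KEY"),
      kinesisCheckpointInterval, InitialPositionInStream.LATEST,
      StorageLevel.MEMORY_AND_DISK_2)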

3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).

@@ -99,7 +106,7 @@

- The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load.

- As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning when possible.
- As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning from the start when possible.

- Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details.
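
Since the bundled example creates one Kinesis input DStream per shard, the usual pattern is to union them into a single DStream before processing. A minimal Scala sketch, assuming the createStream parameters from the list above and a shard count `numShards` (e.g. obtained via `AmazonKinesisClient.describeStream`) are already in scope:

    // One Kinesis input DStream per shard, unioned into a single DStream.
    // appName, streamName, endpointUrl, regionName, awsAccessKeyId, awsSecretKey,
    // kinesisCheckpointInterval, and numShards are assumed to be defined already.
    val kinesisStreams = (0 until numShards).map { _ =>
      KinesisUtils.createStream(
        appName, ssc, streamName, endpointUrl, regionName,
        awsAccessKeyId, awsSecretKey, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }
    val unionStreams = ssc.union(kinesisStreams)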

@@ -115,19 +122,19 @@ To run the example,

- Set up Kinesis stream (see earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created.

- Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials.
- Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials or pass the credentials in explicitly. Explicitly-passed credentials take precedence.

- In the Spark root directory, run the example as

<div class="codetabs">
<div data-lang="scala" markdown="1">

bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]
bin/run-example streaming.KinesisWordCountASL [app name] [stream name] [endpoint URL] [region name] [aws access key id] [aws secret key]

</div>
<div data-lang="java" markdown="1">

bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]
bin/run-example streaming.JavaKinesisWordCountASL [app name] [stream name] [endpoint URL] [region name] [aws access key id] [aws secret key]

</div>
</div>
@@ -136,7 +143,7 @@

- To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.

bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
bin/run-example streaming.KinesisWordCountProducerASL [stream name] [endpoint URL] [aws access key id] [aws secret key] 1000 10

This will push 1000 lines per second, with 10 random numbers per line, to the Kinesis stream. This data should then be received and processed by the running example.
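
Internally the producer just writes records to the stream through the Kinesis `PutRecord` API. The following Scala sketch shows the idea; it is illustrative only (the variable names `streamName`, `endpointUrl`, `awsAccessKeyId`, and `awsSecretKey` are assumed to be defined, and the bundled KinesisWordCountProducerASL differs in detail):

    import java.nio.ByteBuffer
    import scala.util.Random
    import com.amazonaws.auth.BasicAWSCredentials
    import com.amazonaws.services.kinesis.AmazonKinesisClient
    import com.amazonaws.services.kinesis.model.PutRecordRequest

    // Illustrative sketch: create a Kinesis client against the given endpoint.
    val kinesisClient = new AmazonKinesisClient(
      new BasicAWSCredentials(awsAccessKeyId, awsSecretKey))
    kinesisClient.setEndpoint(endpointUrl)

    // Push one batch of 1000 lines, each holding 10 random single-digit numbers.
    for (_ <- 1 to 1000) {
      val line = Seq.fill(10)(Random.nextInt(10)).mkString(" ")
      val request = new PutRecordRequest()
        .withStreamName(streamName)
        .withData(ByteBuffer.wrap(line.getBytes()))
        .withPartitionKey(Random.nextInt(100).toString)
      kinesisClient.putRecord(request)
    }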

extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -33,8 +33,7 @@
import org.apache.spark.streaming.kinesis.KinesisUtils;

import scala.Tuple2;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.common.collect.Lists;
@@ -52,30 +51,26 @@
*
* Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
*
* This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
* in the following order of precedence:
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
* Java System Properties - aws.accessKeyId and aws.secretKey
* Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
* Instance profile credentials - delivered through the Amazon EC2 metadata service
*
* Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>
* <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
* <endpoint-url> is the endpoint of the Kinesis service
* (ie. https://kinesis.us-east-1.amazonaws.com)
* This example requires the AWS credentials to be passed as args.
*
* Example:
* $ export AWS_ACCESS_KEY_ID=<your-access-key>
* $ export AWS_SECRET_KEY=<your-secret-key>
* $ $SPARK_HOME/bin/run-example \
* org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
* https://kinesis.us-east-1.amazonaws.com
* Usage: JavaKinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name> \
* <aws-access-key-id> <aws-secret-key>
* <app-name> name of the consumer app
* <stream-name> name of the Kinesis stream (e.g. mySparkStream)
* <endpoint-url> endpoint of the Kinesis service
* (e.g. https://kinesis.us-east-1.amazonaws.com)
* <region-name> region name for DynamoDB and CloudWatch backing services
* <aws-access-key-id> AWS access key id
* <aws-secret-key> AWS secret key
*
* Note that the number of workers/threads should be 1 more than the number of receivers.
* This leaves one thread available for actually processing the data.
* Examples:
* $ $SPARK_HOME/bin/run-example \
* org.apache.spark.examples.streaming.JavaKinesisWordCountASL myKinesisApp myKinesisStream \
* https://kinesis.us-east-1.amazonaws.com us-east-1 <aws-access-key-id> <aws-secret-key>
*
* There is a companion helper class called KinesisWordCountProducerASL which puts dummy data
* onto the Kinesis stream.
* onto the Kinesis stream.
*
* Usage instructions for KinesisWordCountProducerASL are provided in the class definition.
*/
public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
@@ -88,65 +83,79 @@ private JavaKinesisWordCountASL() {

public static void main(String[] args) {
/* Check that all required args were passed in. */
if (args.length < 2) {
if (args.length < 6) {
System.err.println(
"Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n" +
"Usage: JavaKinesisWordCountASL <app-name> <stream-name> <endpoint-url>" +
" <region-name> <aws-access-key-id> <aws-secret-key>\n" +
" <app-name> is the name of the consumer app\n" +
" <stream-name> is the name of the Kinesis stream\n" +
" <endpoint-url> is the endpoint of the Kinesis service\n" +
" (e.g. https://kinesis.us-east-1.amazonaws.com)\n");
" (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
" <region-name> region name for DynamoDB and CloudWatch backing services\n" +
" <aws-access-key-id> is the AWS Access Key Id\n" +
" <aws-secret-key> is the AWS Secret Key");
System.exit(1);
}

StreamingExamples.setStreamingLogLevels();

/* Populate the appropriate variables from the given args */
String streamName = args[0];
String endpointUrl = args[1];
String appName = args[0];
String streamName = args[1];
String endpointUrl = args[2];
String regionName = args[3];
String awsAccessKeyId = args[4];
String awsSecretKey = args[5];

/* Set the batch interval to a fixed 2000 millis (2 seconds) */
Duration batchInterval = new Duration(2000);

/* Create a Kinesis client in order to determine the number of shards for the given stream */
/* Create Kinesis client in order to determine the number of shards for the given stream */
AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
new DefaultAWSCredentialsProviderChain());
new BasicAWSCredentials(awsAccessKeyId, awsSecretKey));
kinesisClient.setEndpoint(endpointUrl);

/* Determine the number of shards from the stream */
int numShards = kinesisClient.describeStream(streamName)
.getStreamDescription().getShards().size();

/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */
/* Create 1 Kinesis Worker/Receiver/DStream for each shard */
int numStreams = numShards;

/* Setup the Spark config. */
SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount");

/* Kinesis checkpoint interval. Same as batchInterval for this example. */
Duration checkpointInterval = batchInterval;

/* Setup the Spark config. */
SparkConf sparkConfig = new SparkConf().setAppName(appName);

/* Setup the StreamingContext */
JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);

/* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
for (int i = 0; i < numStreams; i++) {
streamsList.add(
KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
KinesisUtils.createStream(appName, jssc, streamName, endpointUrl,
regionName, awsAccessKeyId, awsSecretKey, checkpointInterval,
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
);
}

/* Union all the streams if there is more than 1 stream */
JavaDStream<byte[]> unionStreams;
if (streamsList.size() > 1) {
unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
unionStreams = jssc.union(streamsList.get(0),
streamsList.subList(1, streamsList.size()));
} else {
/* Otherwise, just use the 1 stream */
unionStreams = streamsList.get(0);
}

/*
* Split each line of the union'd DStreams into multiple words using flatMap to produce the collection.
* Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR.
* Split each line of the union'd DStreams into multiple words using flatMap
* to produce the collection.
* Convert lines of byte[] to multiple Strings by first converting to String,
* then splitting on WORD_SEPARATOR.
*/
JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
@Override