diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 379eb513d521..2dae960810fb 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -32,7 +32,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream val kinesisStream = KinesisUtils.createStream( - streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]) + [appName], [streamingContext], [stream name], [endpoint URL], [regionName], [awsAccessKeyId], [awsSecretKey], [checkpoint interval], [initial position], [storage level]) See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$) and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the Running the Example section for instructions on how to run the example. @@ -44,29 +44,36 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream( - streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]); + [appName], [javaStreamingContext], [stream name], [endpoint URL], [regionName], [awsAccessKeyId], [awsSecretKey], [checkpoint interval], [initial position], [storage level]); See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html) and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example. + - `app name`: Name of the application used by Kinesis to tie this Kinesis application to the Kinesis stream. Note: this application name will override the one from StreamingContext.SparkConf - - `streamingContext`: StreamingContext containg an application name used by Kinesis to tie this Kinesis application to the Kinesis stream + - `streamingContext`: StreamingContext possibly containing a SparkConf with an application name used by Kinesis to tie this Kinesis application to the Kinesis stream - - `[Kinesis stream name]`: The Kinesis stream that this streaming application receives from + - `[stream name]`: The Kinesis stream that this streaming application receives from - The application name used in the streaming context becomes the Kinesis application name - The application name must be unique for a given account and region. - - The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization. + - The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table created in the specified region (can be separate from the Kinesis stream region) during Kinesis Client Library initialization. - Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table. - - `[endpoint URL]`: Valid Kinesis endpoints URL can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
+ - `[region name]`: The region used by the Kinesis Client Library for DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + + - `[aws access key id]`: AWS access key id + + - `[aws secret key]`: AWS secret key + - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application. - `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details). + - `[storage level]`: Can be one of the following `StorageLevel.MEMORY_AND_DISK_2`, `StorageLevel.MEMORY_ONLY_2`, `StorageLevel.MEMORY_AND_DISK`, etc. (See Spark RDD StorageLevel documentation for more details). 3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). @@ -99,7 +106,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m - The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load. - - As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning when possible. + - As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning from the start when possible. - Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details. @@ -115,19 +122,19 @@ To run the example, - Set up Kinesis stream (see earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created. -- Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials. +- Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials or pass the credentials in explicitly. Explicitly-passed credentials will take precedence. - In the Spark root directory, run the example as
- bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL] + bin/run-example streaming.KinesisWordCountASL [app name] [stream name] [endpoint URL] [region name] [aws access key id] [aws secret key]
- bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL] + bin/run-example streaming.JavaKinesisWordCountASL [app name] [stream name] [endpoint URL] [region name] [aws access key id] [aws secret key]
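For orientation, here is a minimal, self-contained Scala sketch of the new `createStream` signature documented in the earlier section. All concrete values (app name, stream name, endpoint, region, credentials) are placeholders rather than values from this patch; per the changes in `KinesisUtils.scala` and `KinesisReceiver.scala` below, passing `null` for the two credential arguments falls back to the `DefaultAWSCredentialsProviderChain`.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisStreamSketch {
  def main(args: Array[String]): Unit = {
    // Batch interval for the streaming application; the checkpoint interval is
    // set to the same value, as suggested in the parameter list above.
    val batchInterval = Milliseconds(2000)
    val ssc = new StreamingContext(new SparkConf().setAppName("myKinesisApp"), batchInterval)

    // Placeholder values -- replace with your own stream, endpoint, region and credentials.
    val kinesisStream = KinesisUtils.createStream(
      "myKinesisApp",                             // [app name]
      ssc,                                        // [streamingContext]
      "mySparkStream",                            // [stream name]
      "https://kinesis.us-east-1.amazonaws.com",  // [endpoint URL]
      "us-east-1",                                // [region name] for DynamoDB and CloudWatch
      "myAwsAccessKeyId",                         // [aws access key id] (or null)
      "myAwsSecretKey",                           // [aws secret key] (or null)
      batchInterval,                              // [checkpoint interval]
      InitialPositionInStream.LATEST,             // [initial position]
      StorageLevel.MEMORY_AND_DISK_2)             // [storage level]

    // The stream delivers raw Array[Byte] records; decode and process as needed.
    val lines = kinesisStream.map(bytes => new String(bytes))
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The same argument order applies to the Java-friendly overload, with a `JavaStreamingContext` in place of the `StreamingContext`.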
@@ -136,7 +143,7 @@ To run the example, - To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer. - bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10 + bin/run-example streaming.KinesisWordCountProducerASL [stream name] [endpoint URL] [aws access key id] [aws secret key id] 1000 10 This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example. diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index b0bff27a61c1..d7a24bc799a6 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -33,8 +33,7 @@ import org.apache.spark.streaming.kinesis.KinesisUtils; import scala.Tuple2; - -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.google.common.collect.Lists; @@ -52,30 +51,26 @@ * * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region * - * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials - * in the following order of precedence: - * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - * Java System Properties - aws.accessKeyId and aws.secretKey - * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - * Instance profile credentials - delivered through the Amazon EC2 metadata service - * - * Usage: JavaKinesisWordCountASL - * is the name of the Kinesis stream (ie. mySparkStream) - * is the endpoint of the Kinesis service - * (ie. https://kinesis.us-east-1.amazonaws.com) + * This example requires the AWS credentials to be passed as args. * - * Example: - * $ export AWS_ACCESS_KEY_ID= - * $ export AWS_SECRET_KEY= - * $ $SPARK_HOME/bin/run-example \ - * org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com + * Usage: JavaKinesisWordCountASL \ + * + * name of the consumer app + * name of the Kinesis stream (ie. mySparkStream) + * endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * region name for DynamoDB and CloudWatch backing services + * AWS access key id + * AWS secret key * - * Note that number of workers/threads should be 1 more than the number of receivers. - * This leaves one thread available for actually processing the data. + * Examples: + * $ SPARK_HOME/bin/run-example \ + * org.apache.spark.examples.streaming.JavaKinesisWordCountASL myKinesisApp myKinesisStream\ + * https://kinesis.us-east-1.amazonaws.com us-east-1 * * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data - * onto the Kinesis stream. + * onto the Kinesis stream. + * * Usage instructions for KinesisWordCountProducerASL are provided in the class definition. 
*/ public final class JavaKinesisWordCountASL { // needs to be public for access from run-example @@ -88,41 +83,51 @@ private JavaKinesisWordCountASL() { public static void main(String[] args) { /* Check that all required args were passed in. */ - if (args.length < 2) { + if (args.length < 6) { System.err.println( - "Usage: JavaKinesisWordCountASL \n" + + "Usage: JavaKinesisWordCountASL " + + " \n" + + " is the name of the consumer app\n" + " is the name of the Kinesis stream\n" + " is the endpoint of the Kinesis service\n" + - " (e.g. https://kinesis.us-east-1.amazonaws.com)\n"); + " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" + + " region name for DynamoDB and CloudWatch backing services\n" + + " is the AWS Access Key Id\n" + + " is the AWS Secret Key"); System.exit(1); } StreamingExamples.setStreamingLogLevels(); /* Populate the appropriate variables from the given args */ - String streamName = args[0]; - String endpointUrl = args[1]; + String appName = args[0]; + String streamName = args[1]; + String endpointUrl = args[2]; + String regionName = args[3]; + String awsAccessKeyId = args[4]; + String awsSecretKey = args[5]; + /* Set the batch interval to a fixed 2000 millis (2 seconds) */ Duration batchInterval = new Duration(2000); - /* Create a Kinesis client in order to determine the number of shards for the given stream */ + /* Create Kinesis client in order to determine the number of shards for the given stream */ AmazonKinesisClient kinesisClient = new AmazonKinesisClient( - new DefaultAWSCredentialsProviderChain()); + new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)); kinesisClient.setEndpoint(endpointUrl); /* Determine the number of shards from the stream */ int numShards = kinesisClient.describeStream(streamName) .getStreamDescription().getShards().size(); - /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */ + /* Create 1 Kinesis Worker/Receiver/DStream for each shard */ int numStreams = numShards; - /* Setup the Spark config. */ - SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount"); - /* Kinesis checkpoint interval. Same as batchInterval for this example. */ Duration checkpointInterval = batchInterval; + /* Setup the Spark config. */ + SparkConf sparkConfig = new SparkConf().setAppName(appName); + /* Setup the StreamingContext */ JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval); @@ -130,23 +135,27 @@ public static void main(String[] args) { List> streamsList = new ArrayList>(numStreams); for (int i = 0; i < numStreams; i++) { streamsList.add( - KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval, - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()) + KinesisUtils.createStream(appName, jssc, streamName, endpointUrl, + regionName, awsAccessKeyId, awsSecretKey, checkpointInterval, + InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()) ); } /* Union all the streams if there is more than 1 stream */ JavaDStream unionStreams; if (streamsList.size() > 1) { - unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size())); + unionStreams = jssc.union(streamsList.get(0), + streamsList.subList(1, streamsList.size())); } else { /* Otherwise, just use the 1 stream */ unionStreams = streamsList.get(0); } /* - * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection. 
- * Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR. + * Split each line of the union'd DStreams into multiple words using flatMap + * to produce the collection. + * Convert lines of byte[] to multiple Strings by first converting to String, + * then splitting on WORD_SEPARATOR. */ JavaDStream words = unionStreams.flatMap(new FlatMapFunction() { @Override diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index 32da0858d1a1..749c83054b0c 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -18,20 +18,25 @@ package org.apache.spark.examples.streaming import java.nio.ByteBuffer + import scala.util.Random + +import org.apache.log4j.Level +import org.apache.log4j.Logger import org.apache.spark.Logging import org.apache.spark.SparkConf +import org.apache.spark.annotation.Experimental import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.Milliseconds import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisUtils -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain + +import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.model.PutRecordRequest -import org.apache.log4j.Logger -import org.apache.log4j.Level + /** * Kinesis Spark Streaming WordCount example. @@ -46,43 +51,43 @@ import org.apache.log4j.Level * * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region * - * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials - * in the following order of precedence: - * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - * Java System Properties - aws.accessKeyId and aws.secretKey - * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - * Instance profile credentials - delivered through the Amazon EC2 metadata service + * This example requires the AWS credentials to be passed as args. * - * Usage: KinesisWordCountASL - * is the name of the Kinesis stream (ie. mySparkStream) - * is the endpoint of the Kinesis service + * Usage: KinesisWordCountASL \ + * + * name of the consumer app + * name of the Kinesis stream (ie. mySparkStream) + * endpoint of the Kinesis service * (ie. https://kinesis.us-east-1.amazonaws.com) + * region name for DynamoDB and CloudWatch backing services + * AWS access key id + * AWS secret key * * Example: - * $ export AWS_ACCESS_KEY_ID= - * $ export AWS_SECRET_KEY= - * $ $SPARK_HOME/bin/run-example \ - * org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com + * $ SPARK_HOME/bin/run-example \ + * org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \ + * https://kinesis.us-east-1.amazonaws.com us-east-1 * - * - * Note that number of workers/threads should be 1 more than the number of receivers. 
- * This leaves one thread available for actually processing the data. + * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data + * onto the Kinesis stream. * - * There is a companion helper class below called KinesisWordCountProducerASL which puts - * dummy data onto the Kinesis stream. - * Usage instructions for KinesisWordCountProducerASL are provided in that class definition. + * Usage instructions for KinesisWordCountProducerASL are provided in the class definition. */ -private object KinesisWordCountASL extends Logging { +object KinesisWordCountASL extends Logging { def main(args: Array[String]) { /* Check that all required args were passed in. */ - if (args.length < 2) { + if (args.length < 6) { System.err.println( """ - |Usage: KinesisWordCount + |Usage: JavaKinesisWordCountASL + | + | is the name of the consumer app | is the name of the Kinesis stream | is the endpoint of the Kinesis service | (e.g. https://kinesis.us-east-1.amazonaws.com) + | region name for DynamoDB and CloudWatch backing services + | is the AWS Access Key Id + | is the AWS Secret Key """.stripMargin) System.exit(1) } @@ -90,30 +95,37 @@ private object KinesisWordCountASL extends Logging { StreamingExamples.setStreamingLogLevels() /* Populate the appropriate variables from the given args */ - val Array(streamName, endpointUrl) = args + val Array(appName, streamName, endpointUrl, regionName, awsAccessKeyId, awsSecretKey) = args + + /* Spark Streaming batch interval */ + val batchInterval = Milliseconds(2000) - /* Determine the number of shards from the stream */ - val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) + /* Create the low-level Kinesis Client from the AWS Java SDK. */ + /* Determine the number of shards from the stream. */ + val kinesisClient = new AmazonKinesisClient( + new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)) kinesisClient.setEndpoint(endpointUrl) + val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() .size() /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */ val numStreams = numShards - /* Setup the and SparkConfig and StreamingContext */ - /* Spark Streaming batch interval */ - val batchInterval = Milliseconds(2000) - val sparkConfig = new SparkConf().setAppName("KinesisWordCount") - val ssc = new StreamingContext(sparkConfig, batchInterval) - /* Kinesis checkpoint interval. Same as batchInterval for this example. */ - val kinesisCheckpointInterval = batchInterval + val checkpointInterval = batchInterval + + /* Setup the and SparkConfig */ + val sparkConfig = new SparkConf().setAppName(appName) + + /* Setup the StreamingContext */ + val ssc = new StreamingContext(sparkConfig, batchInterval) /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */ val kinesisStreams = (0 until numStreams).map { i => - KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval, - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) + KinesisUtils.createStream(appName, ssc, streamName, endpointUrl, regionName, awsAccessKeyId, + awsSecretKey, checkpointInterval, InitialPositionInStream.LATEST, + StorageLevel.MEMORY_AND_DISK_2) } /* Union all the streams */ @@ -125,7 +137,7 @@ private object KinesisWordCountASL extends Logging { /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. 
*/ val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) - + /* Print the first 10 wordCounts */ wordCounts.print() @@ -136,74 +148,76 @@ private object KinesisWordCountASL extends Logging { } /** - * Usage: KinesisWordCountProducerASL - * + * Usage: KinesisWordCountProducerASL \ + * * is the name of the Kinesis stream (ie. mySparkStream) - * is the endpoint of the Kinesis service + * is the endpoint of the Kinesis service * (ie. https://kinesis.us-east-1.amazonaws.com) + * is the AWS Access Key Id + * is the AWS Secret Key * is the rate of records per second to put onto the stream * is the rate of records per second to put onto the stream * * Example: - * $ export AWS_ACCESS_KEY_ID= - * $ export AWS_SECRET_KEY= - * $ $SPARK_HOME/bin/run-example \ + * $ SPARK_HOME/bin/run-example \ * org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com 10 5 + * https://kinesis.us-east-1.amazonaws.com 10 5 */ -private object KinesisWordCountProducerASL { +object KinesisWordCountProducerASL { def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: KinesisWordCountProducerASL " + - " ") + if (args.length < 6) { + System.err.println("Usage: KinesisWordCountProducerASL " + + " ") System.exit(1) } StreamingExamples.setStreamingLogLevels() /* Populate the appropriate variables from the given args */ - val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args + val Array(stream, endpoint, awsAccessKeyId, awsSecretKey, recordsPerSecond, wordsPerRecord) + = args /* Generate the records and return the totals */ - val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt) + val totals = generate(stream, endpoint, awsAccessKeyId, awsSecretKey, recordsPerSecond.toInt, + wordsPerRecord.toInt) - /* Print the array of (index, total) tuples */ + /* Print the array of (word, total) tuples */ println("Totals") - totals.foreach(total => println(total.toString())) + totals.foreach(println(_)) } def generate(stream: String, endpoint: String, + awsAccessKeyId: String, + awsSecretKey: String, recordsPerSecond: Int, - wordsPerRecord: Int): Seq[(Int, Int)] = { - - val MaxRandomInts = 10 - - /* Create the Kinesis client */ - val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) + wordsPerRecord: Int): Seq[(String, Int)] = { + + val randomWords = List("spark","you","are","my","father") + val totals = scala.collection.mutable.Map[String, Int]() + + /* Create the low-level Kinesis Client from the AWS Java SDK. 
*/ + val kinesisClient = new AmazonKinesisClient( + new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)) kinesisClient.setEndpoint(endpoint) - + println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" + - s" $recordsPerSecond records per second and $wordsPerRecord words per record"); - - val totals = new Array[Int](MaxRandomInts) - /* Put String records onto the stream per the given recordPerSec and wordsPerRecord */ + s" $recordsPerSecond records per second and $wordsPerRecord words per record"); + + /* Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord */ for (i <- 1 to 5) { - /* Generate recordsPerSec records to put onto the stream */ val records = (1 to recordsPerSecond.toInt).map { recordNum => - /* - * Randomly generate each wordsPerRec words between 0 (inclusive) - * and MAX_RANDOM_INTS (exclusive) - */ + /* Randomly generate wordsPerRecord number of words */ val data = (1 to wordsPerRecord.toInt).map(x => { - /* Generate the random int */ - val randomInt = Random.nextInt(MaxRandomInts) + /* Get a random index to a word */ + val randomWordIdx = Random.nextInt(randomWords.size) + val randomWord = randomWords(randomWordIdx) - /* Keep track of the totals */ - totals(randomInt) += 1 + /* Increment total count to compare to server counts later */ + totals(randomWord) = totals.getOrElse(randomWord, 0) + 1 - randomInt.toString() + randomWord }).mkString(" ") /* Create a partitionKey based on recordNum */ @@ -222,9 +236,8 @@ private object KinesisWordCountProducerASL { Thread.sleep(1000) println("Sent " + recordsPerSecond + " records") } - - /* Convert the totals to (index, total) tuple */ - (0 to (MaxRandomInts - 1)).zip(totals) + /* Convert the totals to (index, total) tuple */ + totals.toSeq.sortBy(_._1) } } @@ -233,7 +246,6 @@ private object KinesisWordCountProducerASL { * This has been lifted from the examples/ project to remove the circular dependency. */ private[streaming] object StreamingExamples extends Logging { - /** Set reasonable logging levels for streaming if the user has not configured log4j. */ def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/BasicAWSCredentialsProvider.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/BasicAWSCredentialsProvider.scala new file mode 100644 index 000000000000..ff3d3743456f --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/BasicAWSCredentialsProvider.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.annotation.DeveloperApi + +import com.amazonaws.auth.AWSCredentials +import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.auth.BasicAWSCredentials + +/** + * :: DeveloperAPI :: + * + * Implementation of AWSCredentialsProvider for BasicAWSCredentials. + * (Surprisingly, this is not part of the AWS Java SDK.) + * + * Note: This is intentionally not Serializable to match the other AWSCredentials implementations. + * Making this Serializable could lead to a refactoring that would introduce a + * NotSerializableExceptions when migrating to different AWSCredentials impls such as + * DefaultAWSCredentialsProviderChain. + * In other words, I'm following the existing Non-Serializable hierarchy dictated by AWS. + * + * @param awsAccessKeyId AWS Access Key Id + * @param awsSecretKey AWS Secret Key + */ +@DeveloperApi +class BasicAWSCredentialsProvider(awsAccessKeyId: String, awsSecretKey: String) + extends AWSCredentialsProvider { + + override def getCredentials(): AWSCredentials = + new BasicAWSCredentials(awsAccessKeyId, awsSecretKey) + override def refresh() {} + override def toString(): String = getClass().getSimpleName() +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala index 588e86a1887e..1c9b0c218ae1 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala @@ -48,7 +48,7 @@ private[kinesis] class KinesisCheckpointState( /** * Advance the checkpoint clock by the checkpoint interval. */ - def advanceCheckpoint() = { + def advanceCheckpoint(): Unit = { checkpointClock.advance(checkpointInterval.milliseconds) } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index a7fe4476cacb..1647242b7677 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -16,10 +16,10 @@ */ package org.apache.spark.streaming.kinesis -import java.net.InetAddress import java.util.UUID import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.Duration import org.apache.spark.streaming.receiver.Receiver @@ -27,6 +27,7 @@ import org.apache.spark.util.Utils import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.regions.RegionUtils import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream @@ -37,11 +38,10 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
* This implementation relies on the Kinesis Client Library (KCL) Worker as described here: * https://github.com/awslabs/amazon-kinesis-client - * This is a custom receiver used with StreamingContext.receiverStream(Receiver) - * as described here: - * http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * Instances of this class will get shipped to the Spark Streaming Workers - * to run within a Spark Executor. + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a + * Spark Executor. * * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams * by the Kinesis Client Library. If you change the App name or Stream name, @@ -49,6 +49,10 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker * DynamoDB table with the same name this Kinesis application. * @param streamName Kinesis stream name * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName The region used by the Kinesis Client Library for + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) + * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. * See the Kinesis Spark Streaming documentation for more * details on the different types of checkpoints. @@ -65,11 +69,14 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker private[kinesis] class KinesisReceiver( appName: String, streamName: String, - endpointUrl: String, + endpointUrl: String, + regionName: String, + awsAccessKeyId: String, + awsSecretKey: String, checkpointInterval: Duration, initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel) - extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => + storageLevel: StorageLevel + ) extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => /* * The following vars are built in the onStart() method which executes in the Spark Worker after @@ -80,38 +87,16 @@ private[kinesis] class KinesisReceiver( * workerId should be based on the ip address of the actual Spark Worker where this code runs * (not the Driver's ip address.) */ - var workerId: String = null - - /* - * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials - * in the following order of precedence: - * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - * Java System Properties - aws.accessKeyId and aws.secretKey - * Credential profiles file at the default location (~/.aws/credentials) shared by all - * AWS SDKs and the AWS CLI - * Instance profile credentials delivered through the Amazon EC2 metadata service - */ - var credentialsProvider: AWSCredentialsProvider = null - - /* KCL config instance. */ - var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null - - /* - * RecordProcessorFactory creates impls of IRecordProcessor. - * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the - * IRecordProcessor.processRecords() method. - * We're using our custom KinesisRecordProcessor in this case. 
- */ - var recordProcessorFactory: IRecordProcessorFactory = null + private var workerId: String = null /* * Create a Kinesis Worker. * This is the core client abstraction from the Kinesis Client Library (KCL). * We pass the RecordProcessorFactory from above as well as the KCL config instance. - * A Kinesis Worker can process 1..* shards from the given stream - each with its - * own RecordProcessor. + * A Kinesis Worker can process 1..* shards from the given stream. + * Each shard is assigned its own IRecordProcessor. */ - var worker: Worker = null + private var worker: Worker = null /** * This is called when the KinesisReceiver starts and must be non-blocking. @@ -120,16 +105,27 @@ private[kinesis] class KinesisReceiver( */ override def onStart() { workerId = Utils.localHostName() + ":" + UUID.randomUUID() - credentialsProvider = new DefaultAWSCredentialsProviderChain() - kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName, - credentialsProvider, workerId).withKinesisEndpoint(endpointUrl) + + // KCL config instance + val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName, + resolveAWSCredentialsProvider(), workerId).withKinesisEndpoint(endpointUrl) .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500) - recordProcessorFactory = new IRecordProcessorFactory { + .withRegionName(regionName) + + /* + * RecordProcessorFactory creates impls of IRecordProcessor. + * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the + * IRecordProcessor.processRecords() method. + * We're using our custom KinesisRecordProcessor in this case. + */ + val recordProcessorFactory = new IRecordProcessorFactory { override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver, workerId, new KinesisCheckpointState(checkpointInterval)) } + worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration) worker.run() + logInfo(s"Started receiver with workerId $workerId") } @@ -139,12 +135,57 @@ private[kinesis] class KinesisReceiver( * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown. */ override def onStop() { - worker.shutdown() - logInfo(s"Shut down receiver with workerId $workerId") + if (worker != null) { + worker.shutdown() + logInfo(s"Stopped receiver for workerId $workerId") + worker = null + } workerId = null - credentialsProvider = null - kinesisClientLibConfiguration = null - recordProcessorFactory = null - worker = null + } + + /* + * If non-null aws credentials are passed in, use BasicAWSCredentialsProvider. + * Otherwise, use the DefaultAWSCredentialsProviderChain. 
+ * + * Note: DefaultAWSCredentialsProviderChain searches for credentials in the following order of + * precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file at the default location (~/.aws/credentials) shared by all + * AWS SDKs and the AWS CLI + * Instance profile credentials delivered through the Amazon EC2 metadata service + * + * @return non-Serializable AWSCredentialsProvider + */ + def resolveAWSCredentialsProvider(): AWSCredentialsProvider = { + if (awsAccessKeyId != null && awsSecretKey != null) { + logInfo("Using BasicAWSCredentialsProvider") + new BasicAWSCredentialsProvider(awsAccessKeyId, awsSecretKey) + } else { + logInfo("Using DefaultAWSCredentialsProviderChain") + new DefaultAWSCredentialsProviderChain() + } + } + + /* + * If non-null regionName is specified, validate it and use this for DynamoDB and CloudWatch. + * Otherwise, try the region from the endpointUrl + * + * @return regionName + * + * @throws InvalidArgumentException if either regionName or endpoint are not valid + */ + def resolveRegionName(): String = { + if (regionName != null) { // regionName is non-null and expected to be valid + logInfo(s"Trying regionName $regionName") + if (RegionUtils.getRegion(regionName) == null) { + throw new IllegalArgumentException(s"regionName $regionName is not valid") + } + regionName + } else { // regionName is null, so try to use the region from the endpointUrl + logInfo(s"Trying regionName from $endpointUrl") + // This throws IllegalArgumentException if endpointUrl does not represent a valid region + RegionUtils.getRegionByEndpoint(endpointUrl).getName() + } } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index af8cd875b454..f65e743c4e2a 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -35,7 +35,10 @@ import com.amazonaws.services.kinesis.model.Record /** * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. * This implementation operates on the Array[Byte] from the KinesisReceiver. - * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup. + * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each + * shard in the Kinesis stream upon startup. This is normally done in separate threads, + * but the KCLs within the KinesisReceivers will balance themselves out if you create + * multiple Receivers. * * @param receiver Kinesis receiver * @param workerId for logging purposes @@ -47,8 +50,8 @@ private[kinesis] class KinesisRecordProcessor( workerId: String, checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging { - /* shardId to be populated during initialize() */ - var shardId: String = _ + // shardId to be populated during initialize() + private var shardId: String = _ /** * The Kinesis Client Library calls this method during IRecordProcessor initialization. @@ -56,8 +59,8 @@ private[kinesis] class KinesisRecordProcessor( * @param shardId assigned by the KCL to this particular RecordProcessor. 
*/ override def initialize(shardId: String) { - logInfo(s"Initialize: Initializing workerId $workerId with shardId $shardId") this.shardId = shardId + logInfo(s"Initialized workerId $workerId with shardId $shardId") } /** @@ -73,12 +76,17 @@ private[kinesis] class KinesisRecordProcessor( if (!receiver.isStopped()) { try { /* - * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming - * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the - * internally-configured Spark serializer (kryo, etc). - * This is not desirable, so we instead store a raw Array[Byte] and decouple - * ourselves from Spark's internal serialization strategy. - */ + * Notes: + * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming + * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the + * internally-configured Spark serializer (kryo, etc). + * 2) This is not desirable, so we instead store a raw Array[Byte] and decouple + * ourselves from Spark's internal serialization strategy. + * 3) For performance, the BlockGenerator is asynchronously queuing elements within its + * memory before creating blocks. This prevents the small block scenario, but requires + * that you register callbacks to know when a block has been generated and stored + * (WAL is sufficient for storage) before can checkpoint back to the source. + */ batch.foreach(record => receiver.store(record.getData().array())) logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") @@ -116,7 +124,7 @@ private[kinesis] class KinesisRecordProcessor( logError(s"Exception: WorkerId $workerId encountered and exception while storing " + " or checkpointing a batch for workerId $workerId and shardId $shardId.", e) - /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/ + /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */ throw e } } @@ -190,7 +198,7 @@ private[kinesis] object KinesisRecordProcessor extends Logging { logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e) retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis) } - /* Throw: Shutdown has been requested by the Kinesis Client Library.*/ + /* Throw: Shutdown has been requested by the Kinesis Client Library. */ case _: ShutdownException => { logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e) throw e diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index 96f4399accd3..fa7d7d07775b 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -36,9 +36,22 @@ object KinesisUtils { /** * Create an InputDStream that pulls messages from a Kinesis stream. * :: Experimental :: + * + * Notes: + * If appName is passed in, it will be favored over the SparkConf appName for Kinesis purposes. + * This is useful for spark shell and other contexts that provide a pre-built SparkContext. + * + * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. This usually requires deleting the backing + * DynamoDB table with the same name this Kinesis application. 
* @param ssc StreamingContext object * @param streamName Kinesis stream name * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName The region used by the Kinesis Client Library for the following: + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param awsAccessKeyId AWS AccessKeyId (if null, we use DefaultAWSCredentialsProviderChain) + * @param awsSecretKey AWS SecretKey (if null, we use DefaultAWSCredentialsProviderChain) * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. * See the Kinesis Spark Streaming documentation for more * details on the different types of checkpoints. @@ -48,7 +61,54 @@ object KinesisUtils { * per Kinesis' limit of 24 hours * (InitialPositionInStream.TRIM_HORIZON) or * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects + * Checkpoint info will override this setting, however. + * @param storageLevel Storage and replication level to use for storing the received objects + * (StorageLevel.MEMORY_AND_DISK_2, StorageLevel.MEMORY_ONLY_2, etc) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + @Experimental + def createStream( + appName: String, + ssc: StreamingContext, + streamName: String, + endpointUrl: String, + regionName: String, + awsAccessKeyId: String, + awsSecretKey: String, + checkpointInterval: Duration, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel + ): ReceiverInputDStream[Array[Byte]] = { + ssc.receiverStream(new KinesisReceiver(appName, streamName, endpointUrl, regionName, + awsAccessKeyId, awsSecretKey, checkpointInterval, initialPositionInStream, storageLevel)) + } + + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * :: Experimental :: + + * Notes: + * This maintains the original API from the initial KinesisUtils implementation. + * The appName will be the value in SparkConf.appName. + * The endpointUrl's region will be used for DynamoDB and CloudWatch. + * The DefaultAWSCredentialsProviderChain will be used for authenticating against Kinesis. + * + * @param ssc StreamingContext object + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + * See the Kinesis Spark Streaming documentation for more + * details on the different types of checkpoints. + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * Checkpoint info will override this setting, however. 
+ * @param storageLevel Storage and replication level to use for storing the received objects + * (StorageLevel.MEMORY_AND_DISK_2, StorageLevel.MEMORY_ONLY_2, etc) * * @return ReceiverInputDStream[Array[Byte]] */ @@ -59,14 +119,72 @@ object KinesisUtils { endpointUrl: String, checkpointInterval: Duration, initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = { - ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, - checkpointInterval, initialPositionInStream, storageLevel)) + storageLevel: StorageLevel + ): ReceiverInputDStream[Array[Byte]] = { + ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, null, null, + null, checkpointInterval, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * :: Experimental :: + * + * Notes: + * If appName is passed in, it will be favored over the SparkConf appName for Kinesis purposes. + * This is useful for spark shell and other contexts that provide a pre-built SparkContext. + * + * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. This usually requires deleting the backing + * DynamoDB table with the same name this Kinesis application. + * @param jssc Java StreamingContext object + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName The region used by the Kinesis Client Library for the following: + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) + * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + * See the Kinesis Spark Streaming documentation for more + * details on the different types of checkpoints. + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param storageLevel Storage level to use for storing the received objects + * (StorageLevel.MEMORY_AND_DISK_2, StorageLevel.MEMORY_ONLY_2, etc) + * + * @return JavaReceiverInputDStream[Array[Byte]] + */ + @Experimental + def createStream( + appName: String, + jssc: JavaStreamingContext, + streamName: String, + endpointUrl: String, + regionName: String, + awsAccessKeyId: String, + awsSecretKey: String, + checkpointInterval: Duration, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel + ): JavaReceiverInputDStream[Array[Byte]] = { + jssc.receiverStream(new KinesisReceiver(appName, streamName, endpointUrl, regionName, + awsAccessKeyId, awsSecretKey, checkpointInterval, initialPositionInStream, storageLevel)) } /** * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. * :: Experimental :: + * + * Notes: + * This maintains the original API from the initial KinesisUtils implementation. + * The appName will be the value in SparkConf.appName. 
+ * The endpointUrl's region will be used for DynamoDB and CloudWatch. + * The DefaultAWSCredentialsProviderChain will be used for authenticating against Kinesis. + * * @param jssc Java StreamingContext object * @param streamName Kinesis stream name * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) @@ -80,18 +198,20 @@ object KinesisUtils { * (InitialPositionInStream.TRIM_HORIZON) or * the tip of the stream (InitialPositionInStream.LATEST). * @param storageLevel Storage level to use for storing the received objects + * (StorageLevel.MEMORY_AND_DISK_2, StorageLevel.MEMORY_ONLY_2, etc) * * @return JavaReceiverInputDStream[Array[Byte]] */ @Experimental def createStream( - jssc: JavaStreamingContext, - streamName: String, - endpointUrl: String, + jssc: JavaStreamingContext, + streamName: String, + endpointUrl: String, checkpointInterval: Duration, initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = { - jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName, - endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)) + storageLevel: StorageLevel + ): JavaReceiverInputDStream[Array[Byte]] = { + jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName, endpointUrl, null, + null, null, checkpointInterval, initialPositionInStream, storageLevel)) } } diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java deleted file mode 100644 index 87954a31f60c..000000000000 --- a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.kinesis; - -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.LocalJavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.junit.Test; - -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - -/** - * Demonstrate the use of the KinesisUtils Java API - */ -public class JavaKinesisStreamSuite extends LocalJavaStreamingContext { - @Test - public void testKinesisStream() { - // Tests the API, does not actually test data receiving - JavaDStream kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", new Duration(2000), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()); - - ssc.stop(); - } -} diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index 255fe6581960..94519cbfa35e 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -40,6 +40,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorC import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain /** * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor @@ -47,11 +48,17 @@ import com.amazonaws.services.kinesis.model.Record class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter with MockitoSugar { - val app = "TestKinesisReceiver" - val stream = "mySparkStream" - val endpoint = "endpoint-url" + val explicitAppName = "myExplicitAppName" + val streamName = "mySparkStream" + val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" + val regionName = "us-east-1" val workerId = "dummyWorkerId" - val shardId = "dummyShardId" + val awsAccessKeyId = "awsAccessKeyId" + val awsSecretKey = "awsSecretKey" + val batchInterval = Seconds(1) + val checkpointInterval = Seconds(2) + val initialPosition = InitialPositionInStream.LATEST + val storageLevel = StorageLevel.MEMORY_AND_DISK_2 val record1 = new Record() record1.setData(ByteBuffer.wrap("Spark In Action".getBytes())) @@ -81,13 +88,69 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft checkpointStateMock, currentClockMock) } - test("kinesis utils api") { - val ssc = new StreamingContext(master, framework, batchDuration) - // Tests the API, does not actually test data receiving - val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", Seconds(2), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2); - ssc.stop() + test("Use regionName when it's provided and valid") { + val receiver = new KinesisReceiver(explicitAppName, streamName, endpointUrl, regionName, + awsAccessKeyId, awsSecretKey, checkpointInterval, initialPosition, storageLevel) + + assert(receiver.resolveRegionName() === "us-east-1") + } + + test("Throw exception when regionName is provided, but not valid") { + val receiver = new KinesisReceiver(explicitAppName, streamName, 
endpointUrl, "janitor-monkey", + awsAccessKeyId, awsSecretKey, checkpointInterval, initialPosition, storageLevel) + + intercept[IllegalArgumentException] { + receiver.resolveRegionName() + } + } + + test("Use endpoint url when regionName not provided") { + val receiver = new KinesisReceiver(explicitAppName, streamName, endpointUrl, null, + awsAccessKeyId, awsSecretKey, checkpointInterval, initialPosition, storageLevel) + + assert(receiver.resolveRegionName() === "us-west-2") + } + + test("Throw exception when region cannot be derived from endpointUrl") { + val receiver = new KinesisReceiver(explicitAppName, streamName, + "https://kinesis.eu-chaos-monkey-2.amazonaws.com", null, awsAccessKeyId, awsSecretKey, + checkpointInterval, initialPosition, storageLevel) + + intercept[IllegalArgumentException] { + receiver.resolveRegionName() + } + } + + test("Use DefaultAWSCredentialsProviderChain when awsAccessKeyId and awsSecretKey are null") { + val receiver = new KinesisReceiver(explicitAppName, streamName, endpointUrl, regionName, + null, null, checkpointInterval, initialPosition, storageLevel) + + assert(receiver.resolveAWSCredentialsProvider() + .isInstanceOf[DefaultAWSCredentialsProviderChain]) + } + + test("Use DefaultAWSCredentialsProviderChain when awsAccessKeyId is null") { + val receiver = new KinesisReceiver(explicitAppName, streamName, endpointUrl, regionName, + null, awsSecretKey, checkpointInterval, initialPosition, storageLevel) + + assert(receiver.resolveAWSCredentialsProvider() + .isInstanceOf[DefaultAWSCredentialsProviderChain]) + } + + test("Use DefaultAWSCredentialsProviderChain when awsSecretKey is null") { + val receiver = new KinesisReceiver(explicitAppName, streamName, endpointUrl, regionName, + awsAccessKeyId, null, checkpointInterval, initialPosition, storageLevel) + + assert(receiver.resolveAWSCredentialsProvider() + .isInstanceOf[DefaultAWSCredentialsProviderChain]) + } + + test("Use BasicAWSCredentialsProvider when both awsAccessKeyId and awsSecretKey are non-null") { + val receiver = new KinesisReceiver(explicitAppName, streamName, endpointUrl, regionName, + awsAccessKeyId, awsSecretKey, checkpointInterval, initialPosition, storageLevel) + + assert(receiver.resolveAWSCredentialsProvider() + .isInstanceOf[BasicAWSCredentialsProvider]) } test("process records including store and checkpoint") {