Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T,
awsCredentialsOption: Option[SerializableAWSCredentials]
awsCredentialsPool: AWSCredentialPool
) extends ReceiverInputDStream[T](_ssc) {

private[streaming]
Expand All @@ -61,7 +61,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
isBlockIdValid = isBlockIdValid,
retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
messageHandler = messageHandler,
awsCredentialsOption = awsCredentialsOption)
awsCredentialsOption = awsCredentialsPool.getKinesisCredentials())
} else {
logWarning("Kinesis sequence number information was not present with some block metadata," +
" it may not be possible to recover from failures")
Expand All @@ -71,6 +71,6 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](

override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption)
checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsPool)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,66 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
override def getAWSSecretKey: String = secretKey
}

private[kinesis]
object Utilities {
def createCredentials(awsAccessKey: String, awsSecretKey: String) : Option[SerializableAWSCredentials] = {
if(awsAccessKey != null && awsSecretKey != null)
Some(new SerializableAWSCredentials(awsAccessKey, awsSecretKey))
else
None
}
}

case class AWSCredentialPool(
kinesisCredentials: Option[SerializableAWSCredentials],
dynamoDBCredentials: Option[SerializableAWSCredentials],
cloudWatchCredentials: Option[SerializableAWSCredentials]) extends Object with Logging {

def this() {
this(None, None, None);
}

def this(kinesisAwsAccessKey: String, kinesisAwsSecretKey: String,
dynamoDBAwsAccessKey: String, dynamoDBAwsSecretKey: String,
cloudWatchAwsAccessKey: String, cloudWatchAwsSecretKey: String) {
this(Utilities.createCredentials(kinesisAwsAccessKey, kinesisAwsSecretKey),
Utilities.createCredentials(dynamoDBAwsAccessKey, dynamoDBAwsSecretKey),
Utilities.createCredentials(cloudWatchAwsAccessKey, cloudWatchAwsSecretKey));
}

def getKinesisCredentials() = kinesisCredentials

/**
* If AWS credential is provided, return a AWSCredentialProvider returning that credential.
* Otherwise, return the DefaultAWSCredentialsProviderChain.
*/
private def resolveAWSCredentialsProvider(credentialsName: String, credentials: Option[SerializableAWSCredentials]): AWSCredentialsProvider = {
credentials match {
case Some(awsCredentials) =>
logInfo("Using provided AWS credentials for " + credentialsName)
new AWSCredentialsProvider {
override def getCredentials: AWSCredentials = awsCredentials
override def refresh(): Unit = { }
}
case None =>
logInfo("Using DefaultAWSCredentialsProviderChain for " + credentialsName)
new DefaultAWSCredentialsProviderChain()
}
}

def getKinesisCredentialsProvider() = {
resolveAWSCredentialsProvider("kinesis", kinesisCredentials)
}

def getDynamoDbCredentialsProvider() = {
resolveAWSCredentialsProvider("dynamoDb", dynamoDBCredentials)
}

def getCloudWatchCredentialsProvider() = {
resolveAWSCredentialsProvider("cloudWatch", cloudWatchCredentials)
}
}

/**
* Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
* This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
Expand Down Expand Up @@ -78,8 +138,7 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
* See the Kinesis Spark Streaming documentation for more
* details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects
* @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
* the credentials
* @param awsCredentialPool CredentialPool which contains the optional SerializableAWSCredentials for Kinesis, DynamoDb and CloudWatch.
*/
private[kinesis] class KinesisReceiver[T](
val streamName: String,
Expand All @@ -90,7 +149,7 @@ private[kinesis] class KinesisReceiver[T](
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T,
awsCredentialsOption: Option[SerializableAWSCredentials])
awsCredentialPool: AWSCredentialPool)
extends Receiver[T](storageLevel) with Logging { receiver =>

/*
Expand Down Expand Up @@ -148,9 +207,11 @@ private[kinesis] class KinesisReceiver[T](

kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
// KCL config instance
val awsCredProvider = resolveAWSCredentialsProvider()
val kinesisCredProvider = awsCredentialPool.getKinesisCredentialsProvider()
val dynamoDbCredProvider = awsCredentialPool.getDynamoDbCredentialsProvider()
val cloudWatchCredProvider = awsCredentialPool.getCloudWatchCredentialsProvider()
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
new KinesisClientLibConfiguration(checkpointAppName, streamName, kinesisCredProvider, dynamoDbCredProvider, cloudWatchCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
Expand Down Expand Up @@ -299,25 +360,6 @@ private[kinesis] class KinesisReceiver[T](
}
}

/**
* If AWS credential is provided, return a AWSCredentialProvider returning that credential.
* Otherwise, return the DefaultAWSCredentialsProviderChain.
*/
private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
awsCredentialsOption match {
case Some(awsCredentials) =>
logInfo("Using provided AWS credentials")
new AWSCredentialsProvider {
override def getCredentials: AWSCredentials = awsCredentials
override def refresh(): Unit = { }
}
case None =>
logInfo("Using DefaultAWSCredentialsProviderChain")
new DefaultAWSCredentialsProviderChain()
}
}


/**
* Class to handle blocks generated by this receiver's block generator. Specifically, in
* the context of the Kinesis Receiver, this handler does the following.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@ package org.apache.spark.streaming.kinesis

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Random, Success, Try}

import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
import scala.util.{ Failure, Random, Success, Try }
import com.amazonaws.auth.{ AWSCredentials, DefaultAWSCredentialsProviderChain }
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model._

import org.apache.spark.Logging
import com.amazonaws.auth.profile.ProfileCredentialsProvider

/**
* Shared utility methods for performing Kinesis tests that actually transfer data.
Expand Down Expand Up @@ -66,6 +64,12 @@ private[kinesis] class KinesisTestUtils extends Logging {
new DynamoDB(dynamoDBClient)
}

private lazy val profileDynamoDb = {
val dynamoDBClient = new AmazonDynamoDBClient(KinesisTestUtils.getAWSProfileCredentials(KinesisTestUtils.DYNAMODB_PROFILE))
dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
new DynamoDB(dynamoDBClient)
}

protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
if (!aggregate) {
new SimpleDataGenerator(kinesisClient)
Expand Down Expand Up @@ -131,6 +135,14 @@ private[kinesis] class KinesisTestUtils extends Logging {
val table = dynamoDB.getTable(tableName)
table.delete()
table.waitForDelete()
val defaultAccountAccessKeyId = KinesisTestUtils.getAWSCredentials().getAWSAccessKeyId;
val profileAccountAccessKeyId = KinesisTestUtils.getAWSProfileCredentials(KinesisTestUtils.DYNAMODB_PROFILE).getAWSAccessKeyId;
if (!defaultAccountAccessKeyId.equals(profileAccountAccessKeyId)) {
logInfo(s"Deleting tables in AWS account with profile name $KinesisTestUtils.DYNAMODB_PROFILE");
val table = profileDynamoDb.getTable(tableName)
table.delete()
table.waitForDelete()
}
} catch {
case e: Exception =>
logWarning(s"Could not delete DynamoDB table $tableName")
Expand Down Expand Up @@ -179,6 +191,10 @@ private[kinesis] object KinesisTestUtils {
val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
val envVarNameForCredentialPoolTests = "ENABLE_CREDENTIAL_POOL_TESTS"
final val DYNAMODB_PROFILE = "dynamoDB"
final val CLOUDWATCH_PROFILE = "cloudWatch"
var cleanUpProfileDynamoTables = false

lazy val shouldRunTests = {
val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
Expand Down Expand Up @@ -221,13 +237,51 @@ private[kinesis] object KinesisTestUtils {
case Failure(e) =>
throw new Exception(
s"""
|Kinesis tests enabled using environment variable $envVarNameForEnablingTests
|but could not find AWS credentials. Please follow instructions in AWS documentation
|to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
|can find the credentials.
|Kinesis tests that actually send data has been enabled by setting the environment
|variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
|DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
|By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
|To change this endpoint URL to a different region, you can set the environment variable
|$endVarNameForEndpoint to the desired endpoint URL
|(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
""".stripMargin)
}
}

def getAWSProfileCredentials(profile: String): AWSCredentials = {
Try { new ProfileCredentialsProvider(profile).getCredentials() } match {
case Success(cred) => {
if (profile.equals(KinesisTestUtils.DYNAMODB_PROFILE)) {
KinesisTestUtils.cleanUpProfileDynamoTables = true;
}
cred
}
case Failure(e) =>
println(
s"""
|Could not load the AWS credentials for profile $profile.
|
|Credentialpool tests verify the ability to uses Kinesis stream, Dynamo DB and CloudWatch Metrics in
|separate AWS accounts. This requires a separate set of AWS credentials to be stored in system with the profile
|names "dynamoDB" and "cloudWatch" along with the default credentials.
|In absence of these credentials only the underlying API will be tested.
""")
Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
case Success(cred) => cred
case Failure(e) =>
throw new Exception(
s"""
|Kinesis tests that actually send data has been enabled by setting the environment
|variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
|DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
|By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
|To change this endpoint URL to a different region, you can set the environment variable
|$endVarNameForEndpoint to the desired endpoint URL
|(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
""".stripMargin)
}
}
}
}

/** A wrapper interface that will allow us to consolidate the code for synthetic data generation. */
Expand Down
Loading