@@ -82,8 +82,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
@transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
@transient private val isBlockIdValid: Array[Boolean] = Array.empty,
val retryTimeoutMs: Int = 10000,
val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
val kinesisCredsProvider: SerializableCredentialsProvider = DefaultCredentialsProvider
val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
val kinesisCreds: SparkAWSCredentials = DefaultCredentials
) extends BlockRDD[T](sc, _blockIds) {

require(_blockIds.length == arrayOfseqNumberRanges.length,
@@ -109,7 +109,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
}

def getBlockFromKinesis(): Iterator[T] = {
val credentials = kinesisCredsProvider.provider.getCredentials
val credentials = kinesisCreds.provider.getCredentials
partition.seqNumberRanges.ranges.iterator.flatMap { range =>
new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
range, retryTimeoutMs).map(messageHandler)
@@ -22,24 +22,28 @@ import scala.reflect.ClassTag
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo

private[kinesis] class KinesisInputDStream[T: ClassTag](
_ssc: StreamingContext,
streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointAppName: String,
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T,
kinesisCredsProvider: SerializableCredentialsProvider
val streamName: String,
val endpointUrl: String,
val regionName: String,
val initialPositionInStream: InitialPositionInStream,
val checkpointAppName: String,
val checkpointInterval: Duration,
val _storageLevel: StorageLevel,
val messageHandler: Record => T,
val kinesisCreds: SparkAWSCredentials,
val dynamoDBCreds: Option[SparkAWSCredentials],
val cloudWatchCreds: Option[SparkAWSCredentials]
) extends ReceiverInputDStream[T](_ssc) {

private[streaming]
@@ -61,7 +65,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
isBlockIdValid = isBlockIdValid,
retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
messageHandler = messageHandler,
kinesisCredsProvider = kinesisCredsProvider)
kinesisCreds = kinesisCreds)
} else {
logWarning("Kinesis sequence number information was not present with some block metadata," +
" it may not be possible to recover from failures")
@@ -71,7 +75,238 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](

override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
checkpointAppName, checkpointInterval, storageLevel, messageHandler,
kinesisCredsProvider)
checkpointAppName, checkpointInterval, _storageLevel, messageHandler,
kinesisCreds, dynamoDBCreds, cloudWatchCreds)
}
}

@InterfaceStability.Evolving
object KinesisInputDStream {
/**
* Builder for [[KinesisInputDStream]] instances.
*
* @since 2.2.0
*/
@InterfaceStability.Evolving
class Builder {
// Required params
private var streamingContext: Option[StreamingContext] = None
private var streamName: Option[String] = None
private var checkpointAppName: Option[String] = None

// Params with defaults
private var endpointUrl: Option[String] = None
private var regionName: Option[String] = None
private var initialPositionInStream: Option[InitialPositionInStream] = None
private var checkpointInterval: Option[Duration] = None
private var storageLevel: Option[StorageLevel] = None
private var kinesisCredsProvider: Option[SparkAWSCredentials] = None
private var dynamoDBCredsProvider: Option[SparkAWSCredentials] = None
private var cloudWatchCredsProvider: Option[SparkAWSCredentials] = None

/**
* Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a
* required parameter.
*
* @param ssc [[StreamingContext]] used to construct Kinesis DStreams
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def streamingContext(ssc: StreamingContext): Builder = {
streamingContext = Option(ssc)
this
}

/**
* Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a
* required parameter.
*
* @param jssc [[JavaStreamingContext]] used to construct Kinesis DStreams
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def streamingContext(jssc: JavaStreamingContext): Builder = {
streamingContext = Option(jssc.ssc)
this
}

/**
* Sets the name of the Kinesis stream that the DStream will read from. This is a required
* parameter.
*
* @param streamName Name of Kinesis stream that the DStream will read from
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def streamName(streamName: String): Builder = {
this.streamName = Option(streamName)
this
}

/**
* Sets the KCL application name to use when checkpointing state to DynamoDB. This is a
* required parameter.
*
* @param appName Value to use for the KCL app name (used when creating the DynamoDB checkpoint
* table and when writing metrics to CloudWatch)
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def checkpointAppName(appName: String): Builder = {
checkpointAppName = Option(appName)
this
}

/**
* Sets the AWS Kinesis endpoint URL. Defaults to "https://kinesis.us-east-1.amazonaws.com" if
* no custom value is specified.
*
* @param url Kinesis endpoint URL to use
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def endpointUrl(url: String): Builder = {
endpointUrl = Option(url)
this
}

/**
* Sets the AWS region to construct clients for. Defaults to "us-east-1" if no custom value
* is specified.
*
* @param regionName Name of AWS region to use (e.g. "us-west-2")
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def regionName(regionName: String): Builder = {
this.regionName = Option(regionName)
this
}

/**
* Sets the initial position data is read from in the Kinesis stream. Defaults to
* [[InitialPositionInStream.LATEST]] if no custom value is specified.
*
* @param initialPosition InitialPositionInStream value specifying where Spark Streaming
* will start reading records in the Kinesis stream from
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = {
initialPositionInStream = Option(initialPosition)
this
}

/**
* Sets how often the KCL application state is checkpointed to DynamoDB. Defaults to the Spark
* Streaming batch interval if no custom value is specified.
*
* @param interval [[Duration]] specifying how often the KCL state should be checkpointed to
* DynamoDB.
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def checkpointInterval(interval: Duration): Builder = {
checkpointInterval = Option(interval)
this
}

/**
* Sets the storage level of the blocks for the DStream created. Defaults to
* [[StorageLevel.MEMORY_AND_DISK_2]] if no custom value is specified.
*
* @param storageLevel [[StorageLevel]] to use for the DStream data blocks
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def storageLevel(storageLevel: StorageLevel): Builder = {
this.storageLevel = Option(storageLevel)
this
}

/**
* Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS Kinesis
* endpoint. Defaults to [[DefaultCredentials]] if no custom value is specified.
*
* @param credentials [[SparkAWSCredentials]] to use for Kinesis authentication
*/
def kinesisCredentials(credentials: SparkAWSCredentials): Builder = {
kinesisCredsProvider = Option(credentials)
this
}

/**
* Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS DynamoDB
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for DynamoDB authentication
*/
def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder = {
dynamoDBCredsProvider = Option(credentials)
this
}

/**
* Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for CloudWatch authentication
*/
def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder = {
cloudWatchCredsProvider = Option(credentials)
this
}

/**
* Create a new instance of [[KinesisInputDStream]] with configured parameters and the provided
* message handler.
*
* @param handler Function converting [[Record]] instances read by the KCL to DStream type [[T]]
* @return Instance of [[KinesisInputDStream]] constructed with configured parameters
*/
def buildWithMessageHandler[T: ClassTag](
handler: Record => T): KinesisInputDStream[T] = {
val ssc = getRequiredParam(streamingContext, "streamingContext")
new KinesisInputDStream(
ssc,
getRequiredParam(streamName, "streamName"),
endpointUrl.getOrElse(DEFAULT_KINESIS_ENDPOINT_URL),
regionName.getOrElse(DEFAULT_KINESIS_REGION_NAME),
initialPositionInStream.getOrElse(DEFAULT_INITIAL_POSITION_IN_STREAM),
getRequiredParam(checkpointAppName, "checkpointAppName"),
checkpointInterval.getOrElse(ssc.graph.batchDuration),
storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
handler,
kinesisCredsProvider.getOrElse(DefaultCredentials),
dynamoDBCredsProvider,
cloudWatchCredsProvider)
}

/**
* Create a new instance of [[KinesisInputDStream]] with configured parameters and using the
* default message handler, which returns Array[Byte].
*
* @return Instance of [[KinesisInputDStream]] constructed with configured parameters
*/
def build(): KinesisInputDStream[Array[Byte]] = buildWithMessageHandler(defaultMessageHandler)

private def getRequiredParam[T](param: Option[T], paramName: String): T = param.getOrElse {
throw new IllegalArgumentException(s"No value provided for required parameter $paramName")
}
}

/**
* Creates a [[KinesisInputDStream.Builder]] for constructing [[KinesisInputDStream]] instances.
*
* @since 2.2.0
*
* @return [[KinesisInputDStream.Builder]] instance
*/
def builder: Builder = new Builder

private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = {
if (record == null) return null
val byteBuffer = record.getData()
val byteArray = new Array[Byte](byteBuffer.remaining())
byteBuffer.get(byteArray)
byteArray
}

Review thread on defaultMessageHandler:

Contributor: would you mind just re-using the code in KinesisUtils instead of copying the code?

Author: How about keeping it here and refactoring KinesisUtils to use it? I think this is what I was intending to do originally, just forgot to update the code.

Contributor: sounds good
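For illustration, a sketch of buildWithMessageHandler with a handler that mirrors defaultMessageHandler but decodes each record as a UTF-8 string; the stream and app names are hypothetical, and ssc is assumed to be an existing StreamingContext:

import java.nio.charset.StandardCharsets

// Hypothetical names; the body repeats the byte extraction above because
// defaultMessageHandler is private[kinesis] and not callable from user code.
val stringStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")
  .buildWithMessageHandler { record =>
    val buf = record.getData()
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }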

private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String =
"https://kinesis.us-east-1.amazonaws.com"
private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: InitialPositionInStream =
InitialPositionInStream.LATEST
private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
}
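Putting the builder together, a minimal usage sketch based only on the setters defined above; the names are hypothetical, ssc is an existing StreamingContext, and omitted setters fall back to the documented defaults (us-east-1 endpoint and region, LATEST, the batch interval, MEMORY_AND_DISK_2):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Hypothetical stream/app names and region.
val byteStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build() // yields a DStream[Array[Byte]] via defaultMessageHandler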
@@ -70,9 +70,14 @@ import org.apache.spark.util.Utils
* See the Kinesis Spark Streaming documentation for more
* details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects
* @param kinesisCredsProvider SerializableCredentialsProvider instance that will be used to
* generate the AWSCredentialsProvider instance used for KCL
* authorization.
* @param kinesisCreds SparkAWSCredentials instance that will be used to generate the
* AWSCredentialsProvider passed to the KCL to authorize Kinesis API calls.
* @param cloudWatchCreds Optional SparkAWSCredentials instance that will be used to generate the
* AWSCredentialsProvider passed to the KCL to authorize CloudWatch API
* calls. Will use kinesisCreds if value is None.
* @param dynamoDBCreds Optional SparkAWSCredentials instance that will be used to generate the
* AWSCredentialsProvider passed to the KCL to authorize DynamoDB API calls.
* Will use kinesisCreds if value is None.
*/
private[kinesis] class KinesisReceiver[T](
val streamName: String,
@@ -83,7 +88,9 @@ private[kinesis] class KinesisReceiver[T](
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T,
kinesisCredsProvider: SerializableCredentialsProvider)
kinesisCreds: SparkAWSCredentials,
dynamoDBCreds: Option[SparkAWSCredentials],
cloudWatchCreds: Option[SparkAWSCredentials])
extends Receiver[T](storageLevel) with Logging { receiver =>

/*
@@ -140,10 +147,13 @@
workerId = Utils.localHostName() + ":" + UUID.randomUUID()

kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
val kinesisProvider = kinesisCreds.provider
val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
checkpointAppName,
streamName,
kinesisCredsProvider.provider,
kinesisProvider,
dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
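To make the fallback above concrete (DynamoDB and CloudWatch reuse the Kinesis provider unless explicitly overridden), a hedged sketch of supplying distinct credentials through the builder; kinesisRoleCreds and dynamoCreds are assumed SparkAWSCredentials instances constructed elsewhere:

// Assumed: kinesisRoleCreds and dynamoCreds are SparkAWSCredentials built
// elsewhere. CloudWatch credentials are not set, so the KCL falls back to
// kinesisRoleCreds for CloudWatch calls, per the getOrElse above.
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")
  .kinesisCredentials(kinesisRoleCreds)
  .dynamoDBCredentials(dynamoCreds)
  .build()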