Changes from all commits
@@ -48,7 +48,7 @@ private[kinesis] class KinesisCheckpointState(
  /**
   * Advance the checkpoint clock by the checkpoint interval.
   */
-  def advanceCheckpoint() = {
+  def advanceCheckpoint(): Unit = {
    checkpointClock.advance(checkpointInterval.milliseconds)
  }
}
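An aside on why the one-token change above is worth making: without an explicit return type, Scala infers one from the last expression of the method body, so a later edit to the body can silently change the method's public signature. A minimal standalone sketch (hypothetical names, not from this patch):

    class ClockExample {
      private var ticks: Long = 0L

      // Inferred result type is Long: callers may silently start depending on it.
      def advanceInferred() = { ticks += 1; ticks }

      // Explicit Unit: side-effect-only intent is pinned down in the signature.
      def advanceExplicit(): Unit = { ticks += 1 }
    }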
@@ -16,39 +16,40 @@
 */
package org.apache.spark.streaming.kinesis

import java.net.InetAddress
import java.util.UUID

+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils

-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker

+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+  extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
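Why the Serializable mix-in: Spark serializes a Receiver on the driver and ships it to an executor, and the stock BasicAWSCredentials class does not implement java.io.Serializable. A small standalone check of that constraint (illustrative only, not part of the patch):

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
    import com.amazonaws.auth.BasicAWSCredentials

    object WhySerializableWrapper {
      def main(args: Array[String]): Unit = {
        val oos = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          // Fails: BasicAWSCredentials is a plain Java class with no Serializable marker.
          oos.writeObject(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"))
        } catch {
          case e: NotSerializableException => println(s"cannot ship to executor: $e")
        }
      }
    }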

/**
 * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
 * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
 * https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
- * as described here:
- * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers
- * to run within a Spark Executor.
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a
+ * Spark Executor.
 *
 * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
 *                by the Kinesis Client Library. If you change the App name or Stream name,
 *                the KCL will throw errors. This usually requires deleting the backing
 *                DynamoDB table with the same name as this Kinesis application.
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Region name used by the Kinesis Client Library for
+ *                   DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
 *                           See the Kinesis Spark Streaming documentation for more
 *                           details on the different types of checkpoints.
@@ -59,92 +60,103 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
 * (InitialPositionInStream.TRIM_HORIZON) or

[Review comment (Contributor)]
@param initialPositionInStream seems to be missing.
Also, it appears to have moved below checkpointInterval, which is a bit confusing. I hit this while testing.
The @param docs may need to be re-ordered as well.

 * the tip of the stream (InitialPositionInStream.LATEST).
 * @param storageLevel Storage level to use for storing the received objects
- *
- * @return ReceiverInputDStream[Array[Byte]]
+ * @param awsCredentialsOption Optional AWS credentials, used when the user directly specifies
+ *                             the credentials
 */
private[kinesis] class KinesisReceiver(
    appName: String,
    streamName: String,
    endpointUrl: String,
-    checkpointInterval: Duration,
+    regionName: String,
    initialPositionInStream: InitialPositionInStream,
-    storageLevel: StorageLevel)
-  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
-
-  /*
-   * The following vars are built in the onStart() method which executes in the Spark Worker after
-   * this code is serialized and shipped remotely.
-   */
-
-  /*
-   * workerId should be based on the ip address of the actual Spark Worker where this code runs
-   * (not the Driver's ip address.)
-   */
-  var workerId: String = null
+    checkpointInterval: Duration,

[Review comment (Contributor)]
I think this goes above initialPositionInStream.

+    storageLevel: StorageLevel,
+    awsCredentialsOption: Option[SerializableAWSCredentials]
+  ) extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
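To make the reshaped constructor concrete, a hedged sketch of wiring it up with explicit credentials; all values are placeholders, and named arguments sidestep the parameter-order concern raised in the comment above:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    val receiver = new KinesisReceiver(
      appName = "myKinesisApp",        // also names the KCL's DynamoDB lease table
      streamName = "myStream",
      endpointUrl = "https://kinesis.us-east-1.amazonaws.com",
      regionName = "us-east-1",
      initialPositionInStream = InitialPositionInStream.LATEST,
      checkpointInterval = Seconds(2),
      storageLevel = StorageLevel.MEMORY_AND_DISK_2,
      awsCredentialsOption = Some(SerializableAWSCredentials("ACCESS_KEY", "SECRET_KEY")))

Note that the class is private[kinesis], so user code reaches it through the KinesisUtils.createStream entry points rather than instantiating it directly; the sketch only illustrates the parameters.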

  /*
-   * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials
-   * in the following order of precedence:
-   * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
-   * Java System Properties - aws.accessKeyId and aws.secretKey
-   * Credential profiles file at the default location (~/.aws/credentials) shared by all
-   * AWS SDKs and the AWS CLI
-   * Instance profile credentials delivered through the Amazon EC2 metadata service
+   * =================================================================================
+   * The following vars are initialized in the onStart() method which executes in the
+   * Spark worker after this Receiver is serialized and shipped to the worker.
+   * =================================================================================
   */
-  var credentialsProvider: AWSCredentialsProvider = null

-  /* KCL config instance. */
-  var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null

-  /*
-   * RecordProcessorFactory creates impls of IRecordProcessor.
-   * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
-   * IRecordProcessor.processRecords() method.
-   * We're using our custom KinesisRecordProcessor in this case.
+  /**
+   * workerId is used by the KCL; it should be based on the IP address of the actual Spark
+   * Worker where this code runs (not the driver's IP address).
   */
-  var recordProcessorFactory: IRecordProcessorFactory = null
+  private var workerId: String = null

-  /*
-   * Create a Kinesis Worker.
-   * This is the core client abstraction from the Kinesis Client Library (KCL).
-   * We pass the RecordProcessorFactory from above as well as the KCL config instance.
-   * A Kinesis Worker can process 1..* shards from the given stream - each with its
-   * own RecordProcessor.
+  /**
+   * Worker is the core client abstraction from the Kinesis Client Library (KCL).
+   * A worker can process more than one shard from the given stream.
+   * Each shard is assigned its own IRecordProcessor and the worker runs multiple such
+   * processors.
   */
-  var worker: Worker = null
+  private var worker: Worker = null

  /**
-   * This is called when the KinesisReceiver starts and must be non-blocking.
-   * The KCL creates and manages the receiving/processing thread pool through the Worker.run()
-   * method.
+   * This is called when the KinesisReceiver starts and must be non-blocking.
+   * The KCL creates and manages the receiving/processing thread pool through Worker.run().
   */
  override def onStart() {
    workerId = Utils.localHostName() + ":" + UUID.randomUUID()
-    credentialsProvider = new DefaultAWSCredentialsProviderChain()
-    kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
-      credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
-      .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500)
-    recordProcessorFactory = new IRecordProcessorFactory {
+
+    // KCL config instance
+    val awsCredProvider = resolveAWSCredentialsProvider()
+    val kinesisClientLibConfiguration =
+      new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, workerId)
+      .withKinesisEndpoint(endpointUrl)
+      .withInitialPositionInStream(initialPositionInStream)
+      .withTaskBackoffTimeMillis(500)
+      .withRegionName(regionName)
+
+    /*
+     * RecordProcessorFactory creates impls of IRecordProcessor.
+     * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+     * IRecordProcessor.processRecords() method.
+     * We're using our custom KinesisRecordProcessor in this case.
+     */
+    val recordProcessorFactory = new IRecordProcessorFactory {
      override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver,
        workerId, new KinesisCheckpointState(checkpointInterval))
    }

    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
    worker.run()
+
+    logInfo(s"Started receiver with workerId $workerId")
  }

  /**
   * This is called when the KinesisReceiver stops.
   * The KCL worker.shutdown() method stops the receiving/processing threads.
   * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
   */
  override def onStop() {
-    worker.shutdown()
-    logInfo(s"Shut down receiver with workerId $workerId")
+    if (worker != null) {
+      worker.shutdown()
+      logInfo(s"Stopped receiver for workerId $workerId")
+      worker = null
+    }
    workerId = null
-    credentialsProvider = null
-    kinesisClientLibConfiguration = null
-    recordProcessorFactory = null
-    worker = null
  }

+  /**
+   * If AWS credentials are provided, return an AWSCredentialsProvider returning those
+   * credentials. Otherwise, return the DefaultAWSCredentialsProviderChain.
+   */
+  private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
+    awsCredentialsOption match {
+      case Some(awsCredentials) =>
+        logInfo("Using provided AWS credentials")
+        new AWSCredentialsProvider {
+          override def getCredentials: AWSCredentials = awsCredentials
+          override def refresh(): Unit = { }
+        }
+      case None =>
+        logInfo("Using DefaultAWSCredentialsProviderChain")
+        new DefaultAWSCredentialsProviderChain()
+    }
+  }
}
@@ -35,7 +35,10 @@ import com.amazonaws.services.kinesis.model.Record
/**
 * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
 * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * shard in the Kinesis stream upon startup. This is normally done in separate threads,
+ * but the KCLs within the KinesisReceivers will balance themselves out if you create
+ * multiple Receivers.
*
* @param receiver Kinesis receiver
* @param workerId for logging purposes
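The "balance themselves out" remark above is the basis for the usual deployment pattern: create one receiver per shard (or at least several receivers) and union the resulting streams. A hedged sketch, assuming the KinesisUtils.createStream overload that pairs with this receiver, with ssc, appName, streamName, endpointUrl, regionName, and numShards supplied by the caller:

    // One receiver per shard is a common starting point.
    val kinesisStreams = (0 until numShards).map { _ =>
      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
        InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
    }
    val unioned = ssc.union(kinesisStreams)

The KCL workers inside the receivers then negotiate shard leases among themselves through the shared DynamoDB table.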
@@ -47,17 +50,17 @@ private[kinesis] class KinesisRecordProcessor(
    workerId: String,
    checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {

-  /* shardId to be populated during initialize() */
-  var shardId: String = _
+  // shardId to be populated during initialize()
+  private var shardId: String = _

  /**
   * The Kinesis Client Library calls this method during IRecordProcessor initialization.
   *
   * @param shardId assigned by the KCL to this particular RecordProcessor.
   */
  override def initialize(shardId: String) {
-    logInfo(s"Initialize: Initializing workerId $workerId with shardId $shardId")
    this.shardId = shardId
+    logInfo(s"Initialized workerId $workerId with shardId $shardId")
  }

/**
@@ -73,12 +76,17 @@ private[kinesis] class KinesisRecordProcessor(
    if (!receiver.isStopped()) {
      try {
        /*
-         * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
-         * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
-         * internally-configured Spark serializer (kryo, etc).
-         * This is not desirable, so we instead store a raw Array[Byte] and decouple
-         * ourselves from Spark's internal serialization strategy.
-         */
+         * Notes:
+         * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
+         *    Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
+         *    internally-configured Spark serializer (kryo, etc).
+         * 2) This is not desirable, so we instead store a raw Array[Byte] and decouple
+         *    ourselves from Spark's internal serialization strategy.
+         * 3) For performance, the BlockGenerator is asynchronously queuing elements within its
+         *    memory before creating blocks. This prevents the small-block scenario, but requires
+         *    that you register callbacks to know when a block has been generated and stored
+         *    (WAL is sufficient for storage) before you can checkpoint back to the source.
+         */
        batch.foreach(record => receiver.store(record.getData().array()))

        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
@@ -116,7 +124,7 @@ private[kinesis] class KinesisRecordProcessor(
logError(s"Exception: WorkerId $workerId encountered and exception while storing " +
" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)

-        /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/
+        /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
[Review comment (Contributor)]
Use // instead of /* */.

throw e
}
}
@@ -190,7 +198,7 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
}
-      /* Throw: Shutdown has been requested by the Kinesis Client Library.*/
+      /* Throw: Shutdown has been requested by the Kinesis Client Library. */
[Review comment (Contributor)]
Use // instead of /* */.

case _: ShutdownException => {
logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
throw e
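For reference, the retry helper whose recursive call appears above follows a call-by-name pattern. A simplified standalone sketch (the real helper also special-cases KCL throttling and shutdown exceptions, as the surrounding cases show; the checkpointer in the usage line is a hypothetical caller):

    import scala.util.Random

    def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
      try {
        expression  // evaluated lazily on each attempt, not once at call time
      } catch {
        case e: Throwable if numRetriesLeft > 1 =>
          Thread.sleep(Random.nextInt(maxBackOffMillis))
          retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
      }
    }

    // Example call: retryRandom(checkpointer.checkpoint(), 4, 100)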