Commit ca4257a
[SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] Updates to the Kinesis API
SPARK-6514 - Use correct region
SPARK-5960 - Allow AWS Credentials to be directly passed
SPARK-6656 - Specify kinesis application name explicitly
SPARK-7679 - Upgrade to latest KCL and AWS SDK

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6147 from tdas/kinesis-api-update and squashes the following commits:

f23ea77 [Tathagata Das] Updated versions and updated APIs
373b201 [Tathagata Das] Updated Kinesis API
1 parent 2ca60ac commit ca4257a

File tree: 6 files changed, +348 −120 lines

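Taken together, these changes surface an explicit region name and optional explicit AWS credentials in the Kinesis receiver. Below is a minimal usage sketch of how an application might drive the updated receiver-based API. The public entry point lives in KinesisUtils.scala, which is among the six changed files but not shown in this excerpt, so the exact createStream parameter order here is an assumption inferred from the KinesisReceiver constructor in the diff below; the stream and app names are placeholders.

// Hypothetical usage sketch, assuming KinesisUtils.createStream mirrors the
// KinesisReceiver constructor shown in this commit (appName, streamName,
// endpointUrl, regionName, initialPositionInStream, checkpointInterval, storageLevel).
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(2))

// SPARK-6656: the Kinesis application name is now passed explicitly rather than
// being derived from the Spark app name; SPARK-6514: the region is passed explicitly.
val stream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",                             // Kinesis application name (backing DynamoDB table)
  "myStream",                                 // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com",  // endpoint URL
  "us-east-1",                                // region used for DynamoDB and CloudWatch
  InitialPositionInStream.LATEST,
  Seconds(2),                                 // checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2)

// SPARK-5960 presumably adds a variant that appends explicit credentials:
// KinesisUtils.createStream(ssc, ..., awsAccessKeyId, awsSecretKey)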

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ private[kinesis] class KinesisCheckpointState(
   /**
    * Advance the checkpoint clock by the checkpoint interval.
    */
-  def advanceCheckpoint() = {
+  def advanceCheckpoint(): Unit = {
     checkpointClock.advance(checkpointInterval.milliseconds)
   }
 }
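The only change here adds an explicit Unit return type. For context, this class gates how often the record processor checkpoints back to Kinesis. A minimal, self-contained sketch of that clock-gating pattern follows; the class and member names below are illustrative, not the actual KinesisCheckpointState API, and it uses a plain system clock rather than Spark's Clock abstraction.

// Illustrative sketch of interval-gated checkpointing, not the actual Spark class.
class CheckpointState(checkpointIntervalMillis: Long) {
  // Holds the earliest time at which the next checkpoint is allowed.
  private var nextCheckpointTimeMillis: Long = System.currentTimeMillis()

  // True once the interval has elapsed since the clock was last advanced.
  def shouldCheckpoint(): Boolean =
    System.currentTimeMillis() > nextCheckpointTimeMillis

  // Mirrors advanceCheckpoint() above: push the gate forward by one interval.
  def advanceCheckpoint(): Unit = {
    nextCheckpointTimeMillis += checkpointIntervalMillis
  }
}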

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 82 additions & 70 deletions
@@ -16,39 +16,40 @@
  */
 package org.apache.spark.streaming.kinesis

-import java.net.InetAddress
 import java.util.UUID

+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
+
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils

-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+  extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable

 /**
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
  * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
  * https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
- * as described here:
- * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers
- * to run within a Spark Executor.
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a
+ * Spark Executor.
  *
  * @param appName Kinesis application name. Kinesis apps are mapped to Kinesis streams
  *                by the Kinesis Client Library. If you change the app name or stream name,
  *                the KCL will throw errors. This usually requires deleting the backing
  *                DynamoDB table with the same name as this Kinesis application.
  * @param streamName Kinesis stream name
  * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Region name used by the Kinesis Client Library for
+ *                   DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
  * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
  *                           See the Kinesis Spark Streaming documentation for more
  *                           details on the different types of checkpoints.
@@ -59,92 +60,103 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
  *                                (InitialPositionInStream.TRIM_HORIZON) or
  *                                the tip of the stream (InitialPositionInStream.LATEST).
  * @param storageLevel Storage level to use for storing the received objects
- *
- * @return ReceiverInputDStream[Array[Byte]]
+ * @param awsCredentialsOption Optional AWS credentials, used when the user directly specifies
+ *                             the credentials
  */
 private[kinesis] class KinesisReceiver(
     appName: String,
     streamName: String,
     endpointUrl: String,
-    checkpointInterval: Duration,
+    regionName: String,
     initialPositionInStream: InitialPositionInStream,
-    storageLevel: StorageLevel)
-  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
-
-  /*
-   * The following vars are built in the onStart() method which executes in the Spark Worker after
-   * this code is serialized and shipped remotely.
-   */
-
-  /*
-   * workerId should be based on the ip address of the actual Spark Worker where this code runs
-   * (not the Driver's ip address.)
-   */
-  var workerId: String = null
+    checkpointInterval: Duration,
+    storageLevel: StorageLevel,
+    awsCredentialsOption: Option[SerializableAWSCredentials]
+  ) extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>

   /*
-   * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials
-   * in the following order of precedence:
-   * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
-   * Java System Properties - aws.accessKeyId and aws.secretKey
-   * Credential profiles file at the default location (~/.aws/credentials) shared by all
-   *   AWS SDKs and the AWS CLI
-   * Instance profile credentials delivered through the Amazon EC2 metadata service
+   * =================================================================================
+   * The following vars are initialized in the onStart() method which executes in the
+   * Spark worker after this Receiver is serialized and shipped to the worker.
+   * =================================================================================
    */
-  var credentialsProvider: AWSCredentialsProvider = null
-
-  /* KCL config instance. */
-  var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null

-  /*
-   * RecordProcessorFactory creates impls of IRecordProcessor.
-   * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
-   * IRecordProcessor.processRecords() method.
-   * We're using our custom KinesisRecordProcessor in this case.
+  /**
+   * The workerId used by the KCL, based on the IP address of the actual Spark Worker
+   * where this code runs (not the driver's IP address).
    */
-  var recordProcessorFactory: IRecordProcessorFactory = null
+  private var workerId: String = null

-  /*
-   * Create a Kinesis Worker.
-   * This is the core client abstraction from the Kinesis Client Library (KCL).
-   * We pass the RecordProcessorFactory from above as well as the KCL config instance.
-   * A Kinesis Worker can process 1..* shards from the given stream - each with its
-   * own RecordProcessor.
+  /**
+   * Worker is the core client abstraction from the Kinesis Client Library (KCL).
+   * A worker can process more than one shard from the given stream.
+   * Each shard is assigned its own IRecordProcessor and the worker runs multiple such
+   * processors.
    */
-  var worker: Worker = null
+  private var worker: Worker = null

   /**
-   * This is called when the KinesisReceiver starts and must be non-blocking.
-   * The KCL creates and manages the receiving/processing thread pool through the Worker.run()
-   * method.
+   * This is called when the KinesisReceiver starts and must be non-blocking.
+   * The KCL creates and manages the receiving/processing thread pool through Worker.run().
    */
   override def onStart() {
     workerId = Utils.localHostName() + ":" + UUID.randomUUID()
-    credentialsProvider = new DefaultAWSCredentialsProviderChain()
-    kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
-      credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
-      .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500)
-    recordProcessorFactory = new IRecordProcessorFactory {
+
+    // KCL config instance
+    val awsCredProvider = resolveAWSCredentialsProvider()
+    val kinesisClientLibConfiguration =
+      new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, workerId)
+      .withKinesisEndpoint(endpointUrl)
+      .withInitialPositionInStream(initialPositionInStream)
+      .withTaskBackoffTimeMillis(500)
+      .withRegionName(regionName)
+
+    /*
+     * RecordProcessorFactory creates impls of IRecordProcessor.
+     * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+     * IRecordProcessor.processRecords() method.
+     * We're using our custom KinesisRecordProcessor in this case.
+     */
+    val recordProcessorFactory = new IRecordProcessorFactory {
       override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver,
         workerId, new KinesisCheckpointState(checkpointInterval))
     }
+
     worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
     worker.run()
+
     logInfo(s"Started receiver with workerId $workerId")
   }

   /**
-   * This is called when the KinesisReceiver stops.
-   * The KCL worker.shutdown() method stops the receiving/processing threads.
-   * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
+   * This is called when the KinesisReceiver stops.
+   * The KCL worker.shutdown() method stops the receiving/processing threads.
+   * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
    */
   override def onStop() {
-    worker.shutdown()
-    logInfo(s"Shut down receiver with workerId $workerId")
+    if (worker != null) {
+      worker.shutdown()
+      logInfo(s"Stopped receiver for workerId $workerId")
+      worker = null
+    }
     workerId = null
-    credentialsProvider = null
-    kinesisClientLibConfiguration = null
-    recordProcessorFactory = null
-    worker = null
+  }
+
+  /**
+   * If AWS credentials are provided, return an AWSCredentialsProvider returning those
+   * credentials. Otherwise, return the DefaultAWSCredentialsProviderChain.
+   */
+  private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
+    awsCredentialsOption match {
+      case Some(awsCredentials) =>
+        logInfo("Using provided AWS credentials")
+        new AWSCredentialsProvider {
+          override def getCredentials: AWSCredentials = awsCredentials
+          override def refresh(): Unit = { }
+        }
+      case None =>
+        logInfo("Using DefaultAWSCredentialsProviderChain")
+        new DefaultAWSCredentialsProviderChain()
+    }
   }
 }
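A short sketch of the new credentials plumbing, based only on the case class and constructor shown above. The argument values are placeholders, and since KinesisReceiver is private[kinesis], code like this would only compile inside the org.apache.spark.streaming.kinesis package; applications would go through KinesisUtils instead.

// Sketch: explicit credentials (SPARK-5960) versus the default provider chain.
// Assumes the imports from the file above; all values are placeholders.
val explicitCreds: Option[SerializableAWSCredentials] =
  Some(SerializableAWSCredentials("YOUR_ACCESS_KEY_ID", "YOUR_SECRET_KEY"))

val receiver = new KinesisReceiver(
  "myKinesisApp",                             // appName (SPARK-6656: passed explicitly)
  "myStream",                                 // streamName
  "https://kinesis.us-east-1.amazonaws.com",  // endpointUrl
  "us-east-1",                                // regionName (SPARK-6514)
  InitialPositionInStream.TRIM_HORIZON,
  checkpointInterval = Seconds(2),
  storageLevel = StorageLevel.MEMORY_AND_DISK_2,
  awsCredentialsOption = explicitCreds)       // or None for DefaultAWSCredentialsProviderChain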

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala

Lines changed: 20 additions & 12 deletions
@@ -35,7 +35,10 @@ import com.amazonaws.services.kinesis.model.Record
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
  * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * shard in the Kinesis stream upon startup. This is normally done in separate threads,
+ * but the KCLs within the KinesisReceivers will balance themselves out if you create
+ * multiple Receivers.
  *
  * @param receiver Kinesis receiver
  * @param workerId for logging purposes
@@ -47,17 +50,17 @@ private[kinesis] class KinesisRecordProcessor(
     workerId: String,
     checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {

-  /* shardId to be populated during initialize() */
-  var shardId: String = _
+  // shardId to be populated during initialize()
+  private var shardId: String = _

   /**
    * The Kinesis Client Library calls this method during IRecordProcessor initialization.
    *
    * @param shardId assigned by the KCL to this particular RecordProcessor.
    */
   override def initialize(shardId: String) {
-    logInfo(s"Initialize: Initializing workerId $workerId with shardId $shardId")
     this.shardId = shardId
+    logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }

   /**
@@ -73,12 +76,17 @@ private[kinesis] class KinesisRecordProcessor(
     if (!receiver.isStopped()) {
       try {
         /*
-         * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
-         * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
-         * internally-configured Spark serializer (kryo, etc).
-         * This is not desirable, so we instead store a raw Array[Byte] and decouple
-         * ourselves from Spark's internal serialization strategy.
-         */
+         * Notes:
+         * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
+         *    Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
+         *    internally-configured Spark serializer (kryo, etc).
+         * 2) This is not desirable, so we instead store a raw Array[Byte] and decouple
+         *    ourselves from Spark's internal serialization strategy.
+         * 3) For performance, the BlockGenerator asynchronously queues elements within its
+         *    memory before creating blocks. This prevents the small-block scenario, but requires
+         *    that you register callbacks to know when a block has been generated and stored
+         *    (WAL is sufficient for storage) before you can checkpoint back to the source.
+         */
         batch.foreach(record => receiver.store(record.getData().array()))

         logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
@@ -116,7 +124,7 @@ private[kinesis] class KinesisRecordProcessor(
         logError(s"Exception: WorkerId $workerId encountered an exception while storing " +
           " or checkpointing a batch for workerId $workerId and shardId $shardId.", e)

-        /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/
+        /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
        throw e
       }
     }
@@ -190,7 +198,7 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
         logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
         retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
       }
-      /* Throw: Shutdown has been requested by the Kinesis Client Library.*/
+      /* Throw: Shutdown has been requested by the Kinesis Client Library. */
       case _: ShutdownException => {
         logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
         throw e