diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 45dc3c388cb8..32add5f49ab9 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator /** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */ private[kinesis] case class SequenceNumberRange( - streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) + streamName: String, + shardId: String, + fromSeqNumber: String, + toSeqNumber: String, + recordCount: Int) /** Class representing an array of Kinesis sequence number ranges */ private[kinesis] @@ -138,6 +142,8 @@ class KinesisSequenceRangeIterator( private val client = new AmazonKinesisClient(credentials) private val streamName = range.streamName private val shardId = range.shardId + // AWS limits to maximum of 10k records per get call + private val maxGetRecordsLimit = 10000 private var toSeqNumberReceived = false private var lastSeqNumber: String = null @@ -155,12 +161,14 @@ class KinesisSequenceRangeIterator( // If the internal iterator has not been initialized, // then fetch records from starting sequence number - internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber) + internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber, + range.recordCount) } else if (!internalIterator.hasNext) { // If the internal iterator does not have any more records, // then fetch more records after the last consumed sequence number - internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) + internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber, + range.recordCount) } if (!internalIterator.hasNext) { @@ -193,9 +201,12 @@ class KinesisSequenceRangeIterator( /** * Get records starting from or after the given sequence number. */ - private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = { + private def getRecords( + iteratorType: ShardIteratorType, + seqNum: String, + recordCount: Int): Iterator[Record] = { val shardIterator = getKinesisIterator(iteratorType, seqNum) - val result = getRecordsAndNextKinesisIterator(shardIterator) + val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount) result._1 } @@ -204,10 +215,12 @@ class KinesisSequenceRangeIterator( * to get records from Kinesis), and get the next shard iterator for next consumption. */ private def getRecordsAndNextKinesisIterator( - shardIterator: String): (Iterator[Record], String) = { + shardIterator: String, + recordCount: Int): (Iterator[Record], String) = { val getRecordsRequest = new GetRecordsRequest getRecordsRequest.setRequestCredentials(credentials) getRecordsRequest.setShardIterator(shardIterator) + getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit)) val getRecordsResult = retryOrTimeout[GetRecordsResult]( s"getting records using shard iterator") { client.getRecords(getRecordsRequest) diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 393e56a39320..e36cde5aa3d8 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -216,7 +216,8 @@ private[kinesis] class KinesisReceiver[T]( if (records.size > 0) { val dataIterator = records.iterator().asScala.map(messageHandler) val metadata = SequenceNumberRange(streamName, shardId, - records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber()) + records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber(), + records.size()) blockGenerator.addMultipleDataWithCallback(dataIterator, metadata) } } diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index 18a5a1509a33..2c7b9c58e6fa 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -51,7 +51,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }} shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => val seqNumRange = SequenceNumberRange( - testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last) + testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size) (shardId, seqNumRange) } allRanges = shardIdToRange.values.toSeq @@ -181,7 +181,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) // Create the necessary ranges to use in the RDD val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)( - SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))) + SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 1))) val realRanges = Array.tabulate(numPartitionsInKinesis) { i => val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis))) SequenceNumberRanges(Array(range)) diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 404b673c0117..5a99776e510d 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -119,13 +119,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun // Generate block info data for testing val seqNumRanges1 = SequenceNumberRanges( - SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")) + SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67)) val blockId1 = StreamBlockId(kinesisStream.id, 123) val blockInfo1 = ReceivedBlockInfo( 0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None)) val seqNumRanges2 = SequenceNumberRanges( - SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb")) + SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89)) val blockId2 = StreamBlockId(kinesisStream.id, 345) val blockInfo2 = ReceivedBlockInfo( 0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))