From 523cc0e2cc70bf2f7fdcf472391d67ef42203b3e Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Tue, 27 Feb 2018 08:49:20 -0800 Subject: [PATCH] Fix preparing a checkpoint at SHARD_END (#301) Fix IllegalArgumentException: Sequence number must be numeric, when preparing a checkpoint at SHARD_END --- .../lib/worker/SequenceNumberValidator.java | 6 ++- .../RecordProcessorCheckpointerTest.java | 39 +++++++++++++++++++ .../worker/SequenceNumberValidatorTest.java | 2 +- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java index 96af5f7c3..8cebbf33f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java @@ -70,12 +70,14 @@ public class SequenceNumberValidator { */ void validateSequenceNumber(String sequenceNumber) throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException { - if (!isDigits(sequenceNumber)) { + boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber); + + if (!atShardEnd && !isDigits(sequenceNumber)) { LOG.info("Sequence number must be numeric, but was " + sequenceNumber); throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber); } try { - if (validateWithGetIterator) { + if (!atShardEnd &&validateWithGetIterator) { proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber); LOG.info("Validated sequence number " + sequenceNumber + " with shard id " + shardId); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java index a3153aec8..67c36d200 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointerTest.java @@ -168,6 +168,21 @@ public final void testCheckpointExtendedSequenceNumber() throws Exception { Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); } + /** + * Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}. + */ + @Test + public final void testCheckpointAtShardEnd() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END; + processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); + processingCheckpointer.checkpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + } + + /** * Test method for * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}. @@ -299,6 +314,30 @@ public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); } + /** + * Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}. + */ + @Test + public final void testPrepareCheckpointAtShardEnd() throws Exception { + RecordProcessorCheckpointer processingCheckpointer = + new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory); + processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber); + ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END; + processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber); + IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber()); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint()); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + + // Checkpoint using preparedCheckpoint + preparedCheckpoint.checkpoint(); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); + Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint()); + Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); + } + + /** * Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order. */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java index aae93f296..51d1376d5 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java @@ -87,7 +87,7 @@ private void nonNumericValueValidationTest(SequenceNumberValidator validator, boolean validateWithGetIterator) { String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(), - SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(), + SentinelCheckpoint.TRIM_HORIZON.toString(), SentinelCheckpoint.AT_TIMESTAMP.toString() }; for (String nonNumericString : nonNumericStrings) {