Skip to content

Commit

Permalink
Fix preparing a checkpoint at SHARD_END (#301)
Browse files Browse the repository at this point in the history
Fix IllegalArgumentException: Sequence number must be numeric, when preparing a checkpoint at SHARD_END
  • Loading branch information
wdbaruni authored and pfifer committed Feb 27, 2018
1 parent 24916ba commit 523cc0e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ public class SequenceNumberValidator {
*/
void validateSequenceNumber(String sequenceNumber)
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
if (!isDigits(sequenceNumber)) {
boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber);

if (!atShardEnd && !isDigits(sequenceNumber)) {
LOG.info("Sequence number must be numeric, but was " + sequenceNumber);
throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber);
}
try {
if (validateWithGetIterator) {
if (!atShardEnd &&validateWithGetIterator) {
proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber);
LOG.info("Validated sequence number " + sequenceNumber + " with shard id " + shardId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ public final void testCheckpointExtendedSequenceNumber() throws Exception {
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
}

/**
* Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}.
*/
@Test
public final void testCheckpointAtShardEnd() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
processingCheckpointer.checkpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
}


/**
* Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}.
Expand Down Expand Up @@ -299,6 +314,30 @@ public final void testPrepareCheckpointExtendedSequenceNumber() throws Exception
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}

/**
* Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}.
*/
@Test
public final void testPrepareCheckpointAtShardEnd() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber());
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());

// Checkpoint using preparedCheckpoint
preparedCheckpoint.checkpoint();
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}


/**
* Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private void nonNumericValueValidationTest(SequenceNumberValidator validator,
boolean validateWithGetIterator) {

String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(),
SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(),
SentinelCheckpoint.TRIM_HORIZON.toString(),
SentinelCheckpoint.AT_TIMESTAMP.toString() };

for (String nonNumericString : nonNumericStrings) {
Expand Down

0 comments on commit 523cc0e

Please sign in to comment.