From aa47fce30bec2dab91a1e3c9a2148fad06b3f33e Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Fri, 5 Aug 2016 09:28:07 -0700 Subject: [PATCH] Added Time Based Iterator Support Added support for time based iterators. Time based iterators are only used if there is no current checkpoint for that shard, otherwise the sequence number of the checkpoint is used. --- NOTICE.txt | 2 +- pom.xml | 2 +- .../lib/checkpoint/SentinelCheckpoint.java | 8 +- .../lib/worker/InitialPositionInStream.java | 12 +- .../InitialPositionInStreamExtended.java | 78 ++++ .../lib/worker/InitializeTask.java | 9 +- .../worker/KinesisClientLibConfiguration.java | 39 +- .../lib/worker/KinesisDataFetcher.java | 67 +++- .../clientlibrary/lib/worker/ProcessTask.java | 6 +- .../lib/worker/ShardConsumer.java | 5 +- .../lib/worker/ShardSyncTask.java | 12 +- .../lib/worker/ShardSyncTaskManager.java | 6 +- .../clientlibrary/lib/worker/ShardSyncer.java | 72 +++- .../lib/worker/ShutdownTask.java | 6 +- .../lib/worker/StreamConfig.java | 9 +- .../clientlibrary/lib/worker/Worker.java | 18 +- .../clientlibrary/proxies/IKinesisProxy.java | 39 +- .../clientlibrary/proxies/KinesisProxy.java | 42 ++- ...etricsCollectingKinesisProxyDecorator.java | 37 +- .../types/ExtendedSequenceNumber.java | 23 +- .../worker/BlockOnParentShardTaskTest.java | 1 - .../KinesisClientLibConfigurationTest.java | 58 ++- .../lib/worker/KinesisDataFetcherTest.java | 69 ++-- .../lib/worker/ProcessTaskTest.java | 6 +- .../worker/SequenceNumberValidatorTest.java | 6 +- .../lib/worker/ShardConsumerTest.java | 108 +++++- .../worker/ShardSyncTaskIntegrationTest.java | 7 +- .../lib/worker/ShardSyncerTest.java | 336 +++++++++++++++--- .../lib/worker/ShutdownTaskTest.java | 41 +-- .../clientlibrary/lib/worker/WorkerTest.java | 57 ++- .../proxies/KinesisLocalFileProxy.java | 88 ++++- .../util/KinesisLocalFileDataCreator.java | 19 +- 32 files changed, 1040 insertions(+), 248 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java diff --git a/NOTICE.txt b/NOTICE.txt index 79e11d382..650c34d73 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1,3 +1,3 @@ AmazonKinesisClientLibrary -Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. diff --git a/pom.xml b/pom.xml index f6648d81b..b2da769e1 100644 --- a/pom.xml +++ b/pom.xml @@ -93,7 +93,7 @@ com.amazonaws DynamoDBLocal - 1.10.5.1 + 1.11.0.1 test diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/SentinelCheckpoint.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/SentinelCheckpoint.java index 65e00d221..d4442b82e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/SentinelCheckpoint.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/checkpoint/SentinelCheckpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -31,5 +31,9 @@ public enum SentinelCheckpoint { /** * We've completely processed all records in this shard. */ - SHARD_END; + SHARD_END, + /** + * Start from the record at or after the specified server-side timestamp. + */ + AT_TIMESTAMP } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream.java index 241683b18..94f9b4556 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -19,14 +19,18 @@ * This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents). */ public enum InitialPositionInStream { - /** * Start after the most recent data record (fetch new data). */ LATEST, - + /** * Start from the oldest available data record. */ - TRIM_HORIZON; + TRIM_HORIZON, + + /** + * Start from the record at or after the specified server-side timestamp. + */ + AT_TIMESTAMP } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java new file mode 100644 index 000000000..6a9948c7e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStreamExtended.java @@ -0,0 +1,78 @@ +/* + * Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.Date; + +/** + * Class that houses the entities needed to specify the position in the stream from where a new application should + * start. + */ +class InitialPositionInStreamExtended { + + private final InitialPositionInStream position; + private final Date timestamp; + + /** + * This is scoped as private to forbid callers from using it directly and to convey the intent to use the + * static methods instead. + * + * @param position One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The Amazon Kinesis Client Library will start + * fetching records from this position when the application starts up if there are no checkpoints. + * If there are checkpoints, we will process records from the checkpoint position. + * @param timestamp The timestamp to use with the AT_TIMESTAMP value for initialPositionInStream. + */ + private InitialPositionInStreamExtended(final InitialPositionInStream position, final Date timestamp) { + this.position = position; + this.timestamp = timestamp; + } + + /** + * Get the initial position in the stream where the application should start from. + * + * @return The initial position in stream. + */ + protected InitialPositionInStream getInitialPositionInStream() { + return this.position; + } + + /** + * Get the timestamp from where we need to start the application. + * Valid only for initial position of type AT_TIMESTAMP, returns null for other positions. + * + * @return The timestamp from where we need to start the application. + */ + protected Date getTimestamp() { + return this.timestamp; + } + + protected static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) { + switch (position) { + case LATEST: + return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null); + case TRIM_HORIZON: + return new InitialPositionInStreamExtended(InitialPositionInStream.TRIM_HORIZON, null); + default: + throw new IllegalArgumentException("Invalid InitialPosition: " + position); + } + } + + protected static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) { + if (timestamp == null) { + throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP"); + } + return new InitialPositionInStreamExtended(InitialPositionInStream.AT_TIMESTAMP, timestamp); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 617813a48..262b98c76 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ class InitializeTask implements ITask { private final RecordProcessorCheckpointer recordProcessorCheckpointer; // Back off for this interval if we encounter a problem (exception) private final long backoffTimeMillis; + private final StreamConfig streamConfig; /** * Constructor. @@ -50,13 +51,15 @@ class InitializeTask implements ITask { ICheckpoint checkpoint, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis) { + long backoffTimeMillis, + StreamConfig streamConfig) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.checkpoint = checkpoint; this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.dataFetcher = dataFetcher; this.backoffTimeMillis = backoffTimeMillis; + this.streamConfig = streamConfig; } /* @@ -74,7 +77,7 @@ public TaskResult call() { LOG.debug("Initializing ShardId " + shardInfo.getShardId()); ExtendedSequenceNumber initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId()); - dataFetcher.initialize(initialCheckpoint.getSequenceNumber()); + dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 8e07e5012..0d45d359d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.Date; import java.util.Set; import com.amazonaws.ClientConfiguration; @@ -185,6 +186,7 @@ public class KinesisClientLibConfiguration { private int maxLeasesToStealAtOneTime; private int initialLeaseTableReadCapacity; private int initialLeaseTableWriteCapacity; + private InitialPositionInStreamExtended initialPositionInStreamExtended; /** * Constructor. @@ -263,7 +265,6 @@ public KinesisClientLibConfiguration(String applicationName, * with a call to Amazon Kinesis before checkpointing for calls to * {@link RecordProcessorCheckpointer#checkpoint(String)} * @param regionName The region name for the service - * */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES @@ -330,6 +331,8 @@ public KinesisClientLibConfiguration(String applicationName, this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + this.initialPositionInStreamExtended = + InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); } // Check if value is positive, otherwise throw an exception @@ -580,6 +583,22 @@ public int getInitialLeaseTableWriteCapacity() { return initialLeaseTableWriteCapacity; } + /** + * Keeping it protected to forbid outside callers from depending on this internal object. + * @return The initialPositionInStreamExtended object. + */ + protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() { + return initialPositionInStreamExtended; + } + + /** + * @return The timestamp from where we need to start the application. + * Valid only for initial position of type AT_TIMESTAMP, returns null for other positions. + */ + public Date getTimestampAtInitialPositionInStream() { + return initialPositionInStreamExtended.getTimestamp(); + } + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** * @param tableName name of the lease table in DynamoDB @@ -600,13 +619,25 @@ public KinesisClientLibConfiguration withKinesisEndpoint(String kinesisEndpoint) } /** - * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library will start - * fetching records from this position when the application starts up if there are no checkpoints. If there - * are checkpoints, we will process records from the checkpoint position. + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library + * will start fetching records from this position when the application starts up if there are no checkpoints. + * If there are checkpoints, we will process records from the checkpoint position. * @return KinesisClientLibConfiguration */ public KinesisClientLibConfiguration withInitialPositionInStream(InitialPositionInStream initialPositionInStream) { this.initialPositionInStream = initialPositionInStream; + this.initialPositionInStreamExtended = + InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + return this; + } + + /** + * @param timestamp The timestamp to use with the AT_TIMESTAMP value for initialPositionInStream. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withTimestampAtInitialPositionInStream(Date timestamp) { + this.initialPositionInStream = InitialPositionInStream.AT_TIMESTAMP; + this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp); return this; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 14d0448cc..2ce3152a2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -25,6 +25,8 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import java.util.Date; + /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. */ @@ -41,8 +43,7 @@ class KinesisDataFetcher { /** * * @param kinesisProxy Kinesis proxy - * @param shardId shardId (we'll fetch data for this shard) - * @param checkpoint used to get current checkpoint from which to start fetching records + * @param shardInfo The shardInfo object. */ public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) { this.shardId = shardInfo.getShardId(); @@ -83,17 +84,18 @@ public GetRecordsResult getRecords(int maxRecords) { /** * Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number. * @param initialCheckpoint Current checkpoint sequence number for this shard. - * + * @param initialPositionInStream The initialPositionInStream. */ - public void initialize(String initialCheckpoint) { + public void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream) { LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint); - advanceIteratorTo(initialCheckpoint); + advanceIteratorTo(initialCheckpoint, initialPositionInStream); isInitialized = true; } - - public void initialize(ExtendedSequenceNumber initialCheckpoint) { + + public void initialize(ExtendedSequenceNumber initialCheckpoint, + InitialPositionInStreamExtended initialPositionInStream) { LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber()); - advanceIteratorTo(initialCheckpoint.getSequenceNumber()); + advanceIteratorTo(initialCheckpoint.getSequenceNumber(), initialPositionInStream); isInitialized = true; } @@ -101,14 +103,17 @@ public void initialize(ExtendedSequenceNumber initialCheckpoint) { * Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number. * * @param sequenceNumber advance the iterator to the record at this sequence number. + * @param initialPositionInStream The initialPositionInStream. */ - void advanceIteratorTo(String sequenceNumber) { + void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { if (sequenceNumber == null) { throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId); } else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) { - nextIterator = getIterator(ShardIteratorType.LATEST.toString(), null); + nextIterator = getIterator(ShardIteratorType.LATEST.toString()); } else if (sequenceNumber.equals(SentinelCheckpoint.TRIM_HORIZON.toString())) { - nextIterator = getIterator(ShardIteratorType.TRIM_HORIZON.toString(), null); + nextIterator = getIterator(ShardIteratorType.TRIM_HORIZON.toString()); + } else if (sequenceNumber.equals(SentinelCheckpoint.AT_TIMESTAMP.toString())) { + nextIterator = getIterator(initialPositionInStream.getTimestamp()); } else if (sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString())) { nextIterator = null; } else { @@ -120,8 +125,8 @@ void advanceIteratorTo(String sequenceNumber) { } /** - * @param iteratorType - * @param sequenceNumber + * @param iteratorType The iteratorType - either AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. + * @param sequenceNumber The sequenceNumber. * * @return iterator or null if we catch a ResourceNotFound exception */ @@ -139,6 +144,40 @@ private String getIterator(String iteratorType, String sequenceNumber) { return iterator; } + /** + * @param iteratorType The iteratorType - either TRIM_HORIZON or LATEST. + * @return iterator or null if we catch a ResourceNotFound exception + */ + private String getIterator(String iteratorType) { + String iterator = null; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Calling getIterator for " + shardId + " and iterator type " + iteratorType); + } + iterator = kinesisProxy.getIterator(shardId, iteratorType); + } catch (ResourceNotFoundException e) { + LOG.info("Caught ResourceNotFoundException when getting an iterator for shard " + shardId, e); + } + return iterator; + } + + /** + * @param timestamp The timestamp. + * @return iterator or null if we catch a ResourceNotFound exception + */ + private String getIterator(Date timestamp) { + String iterator = null; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Calling getIterator for " + shardId + " and timestamp " + timestamp); + } + iterator = kinesisProxy.getIterator(shardId, timestamp); + } catch (ResourceNotFoundException e) { + LOG.info("Caught ResourceNotFoundException when getting an iterator for shard " + shardId, e); + } + return iterator; + } + /** * @return the shardEndReached */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 47ee7a5d6..db0970d5b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -259,8 +259,8 @@ private GetRecordsResult getRecordsResult() { * Advance the iterator to after the greatest processed sequence number (remembered by * recordProcessorCheckpointer). */ - dataFetcher.advanceIteratorTo( - recordProcessorCheckpointer.getLargestPermittedCheckpointValue().getSequenceNumber()); + dataFetcher.advanceIteratorTo(recordProcessorCheckpointer.getLargestPermittedCheckpointValue() + .getSequenceNumber(), streamConfig.getInitialPositionInStream()); // Try a second time - if we fail this time, expose the failure. try { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 8445e6843..10dacc047 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -252,7 +252,8 @@ private ITask getNextTask() { checkpoint, recordProcessorCheckpointer, dataFetcher, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + streamConfig); break; case PROCESSING: nextTask = diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java index f0db8cda9..ddfb8459b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ class ShardSyncTask implements ITask { private final IKinesisProxy kinesisProxy; private final ILeaseManager leaseManager; - private InitialPositionInStream initialPosition; + private InitialPositionInStreamExtended initialPosition; private final boolean cleanupLeasesUponShardCompletion; private final long shardSyncTaskIdleTimeMillis; private final TaskType taskType = TaskType.SHARDSYNC; @@ -41,13 +41,13 @@ class ShardSyncTask implements ITask { /** * @param kinesisProxy Used to fetch information about the stream (e.g. shard list) * @param leaseManager Used to fetch and create leases - * @param initialPosition One of LATEST or TRIM_HORIZON. Amazon Kinesis Client Library will start processing records - * from this point in the stream (when an application starts up for the first time) except for shards that - * already have a checkpoint (and their descendant shards). + * @param initialPositionInStream One of LATEST, TRIM_HORIZON or AT_TIMESTAMP. Amazon Kinesis Client Library will + * start processing records from this point in the stream (when an application starts up for the first time) + * except for shards that already have a checkpoint (and their descendant shards). */ ShardSyncTask(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, - InitialPositionInStream initialPositionInStream, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion, long shardSyncTaskIdleTimeMillis) { this.kinesisProxy = kinesisProxy; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java index 7fffe1230..c1bfae760 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ class ShardSyncTaskManager { private final ILeaseManager leaseManager; private final IMetricsFactory metricsFactory; private final ExecutorService executorService; - private final InitialPositionInStream initialPositionInStream; + private final InitialPositionInStreamExtended initialPositionInStream; private boolean cleanupLeasesUponShardCompletion; private final long shardSyncIdleTimeMillis; @@ -61,7 +61,7 @@ class ShardSyncTaskManager { */ ShardSyncTaskManager(final IKinesisProxy kinesisProxy, final ILeaseManager leaseManager, - final InitialPositionInStream initialPositionInStream, + final InitialPositionInStreamExtended initialPositionInStream, final boolean cleanupLeasesUponShardCompletion, final long shardSyncIdleTimeMillis, final IMetricsFactory metricsFactory, diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java index 97298d489..52944200f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -59,7 +59,7 @@ private ShardSyncer() { static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, - InitialPositionInStream initialPositionInStream, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards); @@ -82,7 +82,7 @@ static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, */ static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, - InitialPositionInStream initialPositionInStream, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards); @@ -106,7 +106,7 @@ static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisP // CHECKSTYLE:OFF CyclomaticComplexity private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, - InitialPositionInStream initialPosition, + InitialPositionInStreamExtended initialPosition, boolean cleanupLeasesOfCompletedShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { List shards = getShardList(kinesisProxy); @@ -327,15 +327,15 @@ private static List getShardList(IKinesisProxy kinesisProxy) throws Kines * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail * before creating all the leases. * - * @param shardIds Set of all shardIds in Kinesis (we'll create new leases based on this set) + * @param shards List of all shards in Kinesis (we'll create new leases based on this set) * @param currentLeases List of current leases - * @param initialPosition One of LATEST or TRIM_HORIZON. We'll start fetching records from that location in the - * shard (when an application starts up for the first time - and there are no checkpoints). + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that + * location in the shard (when an application starts up for the first time - and there are no checkpoints). * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard */ static List determineNewLeasesToCreate(List shards, List currentLeases, - InitialPositionInStream initialPosition) { + InitialPositionInStreamExtended initialPosition) { Map shardIdToNewLeaseMap = new HashMap(); Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards); @@ -364,7 +364,32 @@ static List determineNewLeasesToCreate(List shards, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap, memoizationContext); - if (isDescendant) { + + /** + * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the + * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will add a + * lease just like we do for TRIM_HORIZON. However we will only return back records with server-side + * timestamp at or after the specified initial position timestamp. + * + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5 - shards till epoch 102 + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * + * Current leases: empty set + * + * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with + * timestamp value 206. We will then create new leases for all the shards (with checkpoint set to + * AT_TIMESTAMP), including the ancestor shards with epoch less than 206. However as we begin + * processing the ancestor shards, their checkpoints would be updated to SHARD_END and their leases + * would then be deleted since they won't have records with server-side timestamp at/after 206. And + * after that we will begin processing the descendant shards with epoch at/after 206 and we will + * return the records that meet the timestamp requirement for these shards. + */ + if (isDescendant && !initialPosition.getInitialPositionInStream() + .equals(InitialPositionInStream.AT_TIMESTAMP)) { newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); } else { newLease.setCheckpoint(convertToCheckpoint(initialPosition)); @@ -388,8 +413,10 @@ static List determineNewLeasesToCreate(List shards, * Create leases for the ancestors of this shard as required. * See javadoc of determineNewLeasesToCreate() for rules and example. * - * @param shardIds Ancestors of these shards will be considered for addition into the new lease map - * @param shardIdsOfCurrentLeases + * @param shardId The shardId to check. + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that + * location in the shard (when an application starts up for the first time - and there are no checkpoints). + * @param shardIdsOfCurrentLeases The shardIds for the current leases. * @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream. * @param shardIdToLeaseMapOfNewShards Add lease POJOs corresponding to ancestors to this map. * @param memoizationContext Memoization of shards that have been evaluated as part of the evaluation @@ -397,7 +424,7 @@ static List determineNewLeasesToCreate(List shards, */ // CHECKSTYLE:OFF CyclomaticComplexity static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, - InitialPositionInStream initialPosition, + InitialPositionInStreamExtended initialPosition, Set shardIdsOfCurrentLeases, Map shardIdToShardMapOfAllKinesisShards, Map shardIdToLeaseMapOfNewShards, @@ -449,7 +476,9 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, shardIdToLeaseMapOfNewShards.put(parentShardId, lease); } - if (descendantParentShardIds.contains(parentShardId)) { + if (descendantParentShardIds.contains(parentShardId) + && !initialPosition.getInitialPositionInStream() + .equals(InitialPositionInStream.AT_TIMESTAMP)) { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); } else { lease.setCheckpoint(convertToCheckpoint(initialPosition)); @@ -457,8 +486,13 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(String shardId, } } } else { - // This shard should be included, if the customer wants to process all records in the stream. - if (initialPosition.equals(InitialPositionInStream.TRIM_HORIZON)) { + // This shard should be included, if the customer wants to process all records in the stream or + // if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a lease just like we do + // for TRIM_HORIZON. However we will only return back records with server-side timestamp at or + // after the specified initial position timestamp. + if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON) + || initialPosition.getInitialPositionInStream() + .equals(InitialPositionInStream.AT_TIMESTAMP)) { isDescendant = true; } } @@ -737,13 +771,15 @@ static List getOpenShards(List allShards) { return openShards; } - private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStream position) { + private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended position) { ExtendedSequenceNumber checkpoint = null; - if (position.equals(InitialPositionInStream.TRIM_HORIZON)) { + if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) { checkpoint = ExtendedSequenceNumber.TRIM_HORIZON; - } else if (position.equals(InitialPositionInStream.LATEST)) { + } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) { checkpoint = ExtendedSequenceNumber.LATEST; + } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { + checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP; } return checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index ecf4873ec..3ce052036 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ class ShutdownTask implements ITask { private final ShutdownReason reason; private final IKinesisProxy kinesisProxy; private final ILeaseManager leaseManager; - private final InitialPositionInStream initialPositionInStream; + private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; @@ -56,7 +56,7 @@ class ShutdownTask implements ITask { RecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownReason reason, IKinesisProxy kinesisProxy, - InitialPositionInStream initialPositionInStream, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, ILeaseManager leaseManager, long backoffTimeMillis) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java index 2b7120fdf..b5c283fbe 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ class StreamConfig { private final int maxRecords; private final long idleTimeInMilliseconds; private final boolean callProcessRecordsEvenForEmptyRecordList; - private InitialPositionInStream initialPositionInStream; + private InitialPositionInStreamExtended initialPositionInStream; private final boolean validateSequenceNumberBeforeCheckpointing; /** @@ -42,7 +42,7 @@ class StreamConfig { long idleTimeInMilliseconds, boolean callProcessRecordsEvenForEmptyRecordList, boolean validateSequenceNumberBeforeCheckpointing, - InitialPositionInStream initialPositionInStream) { + InitialPositionInStreamExtended initialPositionInStream) { this.streamProxy = proxy; this.maxRecords = maxRecords; this.idleTimeInMilliseconds = idleTimeInMilliseconds; @@ -82,7 +82,7 @@ boolean shouldCallProcessRecordsEvenForEmptyRecordList() { /** * @return the initialPositionInStream */ - InitialPositionInStream getInitialPositionInStream() { + InitialPositionInStreamExtended getInitialPositionInStream() { return initialPositionInStream; } @@ -92,5 +92,4 @@ InitialPositionInStream getInitialPositionInStream() { boolean shouldValidateSequenceNumberBeforeCheckpointing() { return validateSequenceNumberBeforeCheckpointing; } - } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 6aec2346f..50861c290 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -64,7 +64,7 @@ public class Worker implements Runnable { private final String applicationName; private final IRecordProcessorFactory recordProcessorFactory; private final StreamConfig streamConfig; - private final InitialPositionInStream initialPosition; + private final InitialPositionInStreamExtended initialPosition; private final ICheckpoint checkpointTracker; private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing @@ -212,8 +212,8 @@ public Worker( config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldValidateSequenceNumberBeforeCheckpointing(), - config.getInitialPositionInStream()), - config.getInitialPositionInStream(), + config.getInitialPositionInStreamExtended()), + config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), @@ -258,9 +258,9 @@ public Worker( * @param applicationName Name of the Kinesis application * @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param streamConfig Stream configuration - * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching data - * from this location in the stream when an application starts up for the first time and there are no - * checkpoints. If there are checkpoints, we start from the checkpoint position. + * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start + * fetching data from this location in the stream when an application starts up for the first time and + * there are no checkpoints. If there are checkpoints, we start from the checkpoint position. * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in @@ -277,7 +277,7 @@ public Worker( Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, - InitialPositionInStream initialPositionInStream, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, @@ -946,8 +946,8 @@ public Worker build() { config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldValidateSequenceNumberBeforeCheckpointing(), - config.getInitialPositionInStream()), - config.getInitialPositionInStream(), + config.getInitialPositionInStreamExtended()), + config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java index 8dbb97fa3..df7f951d0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/IKinesisProxy.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; import java.nio.ByteBuffer; +import java.util.Date; import java.util.List; import java.util.Set; @@ -72,7 +73,16 @@ GetRecordsResult get(String shardIterator, int maxRecords) /** * Fetch a shard iterator from the specified position in the shard. - * + * This is to fetch a shard iterator for ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER which + * requires the starting sequence number. + * + * NOTE: Currently this method continues to fetch iterators for ShardIteratorTypes TRIM_HORIZON, LATEST, + * AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER. + * But this behavior will change in the next release, after which this method will only serve + * AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER ShardIteratorTypes. + * We recommend users who call this method directly to use the appropriate getIterator method based on the + * ShardIteratorType. + * * @param shardId Shard id * @param iteratorEnum one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER * @param sequenceNumber the sequence number - must be null unless iteratorEnum is AT_SEQUENCE_NUMBER or @@ -84,6 +94,31 @@ GetRecordsResult get(String shardIterator, int maxRecords) String getIterator(String shardId, String iteratorEnum, String sequenceNumber) throws ResourceNotFoundException, InvalidArgumentException; + /** + * Fetch a shard iterator from the specified position in the shard. + * This is to fetch a shard iterator for ShardIteratorType LATEST or TRIM_HORIZON which doesn't require a starting + * sequence number. + * + * @param shardId Shard id + * @param iteratorEnum Either TRIM_HORIZON or LATEST. + * @return shard iterator which can be used to read data from Kinesis. + * @throws ResourceNotFoundException The Kinesis stream or shard was not found + * @throws InvalidArgumentException Invalid input parameters + */ + String getIterator(String shardId, String iteratorEnum) throws ResourceNotFoundException, InvalidArgumentException; + + /** + * Fetch a shard iterator from the specified position in the shard. + * This is to fetch a shard iterator for ShardIteratorType AT_TIMESTAMP which requires the timestamp field. + * + * @param shardId Shard id + * @param timestamp The timestamp. + * @return shard iterator which can be used to read data from Kinesis. + * @throws ResourceNotFoundException The Kinesis stream or shard was not found + * @throws InvalidArgumentException Invalid input parameters + */ + String getIterator(String shardId, Date timestamp) throws ResourceNotFoundException, InvalidArgumentException; + /** * @param sequenceNumberForOrdering (optional) used for record ordering * @param explicitHashKey optionally supplied transformation of partitionkey diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index 0cf36da74..ad929c210 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -40,6 +41,7 @@ import com.amazonaws.services.kinesis.model.PutRecordResult; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.StreamStatus; /** @@ -263,12 +265,50 @@ public Set getAllShardIds() throws ResourceNotFoundException { */ @Override public String getIterator(String shardId, String iteratorType, String sequenceNumber) { + if (!iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString()) || !iteratorType.equals( + ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString())) { + LOG.info("This method should only be used for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER " + + "ShardIteratorTypes. For methods to use with other ShardIteratorTypes, see IKinesisProxy.java"); + } final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); getShardIteratorRequest.setStreamName(streamName); getShardIteratorRequest.setShardId(shardId); getShardIteratorRequest.setShardIteratorType(iteratorType); getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber); + getShardIteratorRequest.setTimestamp(null); + final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest); + return response.getShardIterator(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getIterator(String shardId, String iteratorType) { + final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); + getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); + getShardIteratorRequest.setStreamName(streamName); + getShardIteratorRequest.setShardId(shardId); + getShardIteratorRequest.setShardIteratorType(iteratorType); + getShardIteratorRequest.setStartingSequenceNumber(null); + getShardIteratorRequest.setTimestamp(null); + final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest); + return response.getShardIterator(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getIterator(String shardId, Date timestamp) { + final GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); + getShardIteratorRequest.setRequestCredentials(credentialsProvider.getCredentials()); + getShardIteratorRequest.setStreamName(streamName); + getShardIteratorRequest.setShardId(shardId); + getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP); + getShardIteratorRequest.setStartingSequenceNumber(null); + getShardIteratorRequest.setTimestamp(timestamp); final GetShardIteratorResult response = client.getShardIterator(getShardIteratorRequest); return response.getShardIterator(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java index 7263351e6..d27fc6a1f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/MetricsCollectingKinesisProxyDecorator.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies; import java.nio.ByteBuffer; +import java.util.Date; import java.util.List; import java.util.Set; @@ -128,6 +129,40 @@ public String getIterator(String shardId, String iteratorEnum, String sequenceNu } } + /** + * {@inheritDoc} + */ + @Override + public String getIterator(String shardId, String iteratorEnum) + throws ResourceNotFoundException, InvalidArgumentException { + long startTime = System.currentTimeMillis(); + boolean success = false; + try { + String response = other.getIterator(shardId, iteratorEnum); + success = true; + return response; + } finally { + MetricsHelper.addSuccessAndLatency(getIteratorMetric, startTime, success, MetricsLevel.DETAILED); + } + } + + /** + * {@inheritDoc} + */ + @Override + public String getIterator(String shardId, Date timestamp) + throws ResourceNotFoundException, InvalidArgumentException { + long startTime = System.currentTimeMillis(); + boolean success = false; + try { + String response = other.getIterator(shardId, timestamp); + success = true; + return response; + } finally { + MetricsHelper.addSuccessAndLatency(getIteratorMetric, startTime, success, MetricsLevel.DETAILED); + } + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java index 0202a17a6..1ed7ed67e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ExtendedSequenceNumber.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -36,9 +36,10 @@ public class ExtendedSequenceNumber implements Comparable expectedRecords = new ArrayList(); GetRecordsResult response = new GetRecordsResult(); response.setRecords(expectedRecords); - - when(kinesis.getIterator(SHARD_ID, iteratorType, null)).thenReturn(iterator); + when(kinesis.getIterator(SHARD_ID, initialPositionInStream.getTimestamp())).thenReturn(iterator); when(kinesis.getIterator(SHARD_ID, AT_SEQUENCE_NUMBER, seqNo)).thenReturn(iterator); + when(kinesis.getIterator(SHARD_ID, iteratorType)).thenReturn(iterator); when(kinesis.get(iterator, MAX_RECORDS)).thenReturn(response); ICheckpoint checkpoint = mock(ICheckpoint.class); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); - - fetcher.initialize(seqNo); + fetcher.initialize(seqNo, initialPositionInStream); List actualRecords = fetcher.getRecords(MAX_RECORDS).getRecords(); Assert.assertEquals(expectedRecords, actualRecords); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index 5385d05e9..6576e47f8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -66,7 +66,8 @@ private static class RecordSubclass extends Record {} private final boolean callProcessRecordsForEmptyRecordList = true; // We don't want any of these tests to run checkpoint validation private final boolean skipCheckpointValidationValue = false; - private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST; + private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); private @Mock KinesisDataFetcher mockDataFetcher; private @Mock IRecordProcessor mockRecordProcessor; @@ -84,7 +85,8 @@ public void setUpProcessTask() { // Set up process task final StreamConfig config = new StreamConfig(null, maxRecords, idleTimeMillis, callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, initialPositionInStream); + skipCheckpointValidationValue, + INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null); processTask = new ProcessTask( shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java index ce222f9ee..aae93f296 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidatorTest.java @@ -86,9 +86,9 @@ private void nonNumericValueValidationTest(SequenceNumberValidator validator, IKinesisProxy proxy, boolean validateWithGetIterator) { - String[] nonNumericStrings = - { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(), - SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString() }; + String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(), + SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(), + SentinelCheckpoint.AT_TIMESTAMP.toString() }; for (String nonNumericString : nonNumericStrings) { try { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 27f8f13c5..d8d393778 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -33,6 +34,7 @@ import java.io.File; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.ListIterator; import java.util.concurrent.ExecutionException; @@ -77,7 +79,8 @@ public class ShardConsumerTest { private final boolean cleanupLeasesOfCompletedShards = true; // We don't want any of these tests to run checkpoint validation private final boolean skipCheckpointValidationValue = false; - private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST; + private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // ... a non-final public class, and so can be mocked and spied. @@ -102,8 +105,7 @@ public final void testInitializationStateUponFailure() throws Exception { 1, 10, callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, - initialPositionInStream); + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -153,8 +155,7 @@ public final void testInitializationStateUponSubmissionFailure() throws Exceptio 1, 10, callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, - initialPositionInStream); + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -198,8 +199,7 @@ public final void testRecordProcessorThrowable() throws Exception { 1, 10, callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, - initialPositionInStream); + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -282,13 +282,102 @@ public final void testConsumeShard() throws Exception { TestStreamlet processor = new TestStreamlet(); + StreamConfig streamConfig = + new StreamConfig(fileBasedProxy, + maxRecords, + idleTimeMS, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null); + ShardConsumer consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis); + + assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); // check on parent shards + Thread.sleep(50L); + consumer.consumeShard(); // start initialization + assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + consumer.consumeShard(); // initialize + Thread.sleep(50L); + + // We expect to process all records in numRecs calls + for (int i = 0; i < numRecs;) { + boolean newTaskSubmitted = consumer.consumeShard(); + if (newTaskSubmitted) { + LOG.debug("New processing task was submitted, call # " + i); + assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING))); + // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES + i += maxRecords; + } + Thread.sleep(50L); + } + + assertThat(processor.getShutdownReason(), nullValue()); + consumer.beginShutdown(); + Thread.sleep(50L); + assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN))); + consumer.beginShutdown(); + assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + + String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString()); + List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); + file.delete(); + } + + /** + * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()} + * that starts from initial position of type AT_TIMESTAMP. + */ + @Test + public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception { + int numRecs = 7; + BigInteger startSeqNum = BigInteger.ONE; + Date timestamp = new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP + 3); + InitialPositionInStreamExtended atTimestamp = + InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp); + String streamShardId = "kinesis-0-0"; + String testConcurrencyToken = "testToken"; + File file = + KinesisLocalFileDataCreator.generateTempDataFile(1, + "kinesis-0-", + numRecs, + startSeqNum, + "unitTestSCT002"); + + IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); + + final int maxRecords = 2; + final int idleTimeMS = 0; // keep unit tests fast + ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); + checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); + @SuppressWarnings("unchecked") + ILeaseManager leaseManager = mock(ILeaseManager.class); + when(leaseManager.getLease(anyString())).thenReturn(null); + + TestStreamlet processor = new TestStreamlet(); + StreamConfig streamConfig = new StreamConfig(fileBasedProxy, maxRecords, idleTimeMS, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, - initialPositionInStream); + atTimestamp); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null); ShardConsumer consumer = @@ -334,9 +423,10 @@ public final void testConsumeShard() throws Exception { executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); - String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString(), null); + String iterator = fileBasedProxy.getIterator(streamShardId, timestamp); List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); + assertEquals(4, processor.getProcessedRecords().size()); file.delete(); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java index 6843efbd0..307596e39 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTaskIntegrationTest.java @@ -120,8 +120,11 @@ public final void testCall() throws DependencyException, InvalidStateException, } leaseManager.deleteAll(); Set shardIds = kinesisProxy.getAllShardIds(); - ShardSyncTask syncTask = - new ShardSyncTask(kinesisProxy, leaseManager, InitialPositionInStream.LATEST, false, 0L); + ShardSyncTask syncTask = new ShardSyncTask(kinesisProxy, + leaseManager, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), + false, + 0L); syncTask.call(); List leases = leaseManager.listLeases(); Set leaseKeys = new HashSet(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index f02943b4b..b8f6ae563 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -59,9 +60,14 @@ // CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES public class ShardSyncerTest { private static final Log LOG = LogFactory.getLog(ShardSyncer.class); - private final InitialPositionInStream latestPosition = InitialPositionInStream.LATEST; + private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); + private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = + InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000L)); private final boolean cleanupLeasesOfCompletedShards = true; - AmazonDynamoDB ddbClient = DynamoDBEmbedded.create(); + AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); LeaseManager leaseManager = new KinesisClientLeaseManager("tempTestTable", ddbClient); private static final int EXPONENT = 128; /** @@ -111,8 +117,7 @@ public final void testDetermineNewLeasesToCreateNoShards() { List shards = new ArrayList(); List leases = new ArrayList(); - Assert.assertTrue( - ShardSyncer.determineNewLeasesToCreate(shards, leases, InitialPositionInStream.LATEST).isEmpty()); + Assert.assertTrue(ShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty()); } /** @@ -131,7 +136,7 @@ public final void testDetermineNewLeasesToCreate0Leases0Reshards() { shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); List newLeases = - ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST); + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); Assert.assertEquals(2, newLeases.size()); Set expectedLeaseShardIds = new HashSet(); expectedLeaseShardIds.add(shardId0); @@ -154,7 +159,7 @@ public final void testDetermineNewLeasesToCreate0Leases0Reshards() { public final void testBootstrapShardLeasesAtTrimHorizon() throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException, KinesisClientLibIOException { - testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.TRIM_HORIZON); + testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_TRIM_HORIZON); } /** @@ -170,7 +175,7 @@ public final void testBootstrapShardLeasesAtTrimHorizon() public final void testBootstrapShardLeasesAtLatest() throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException, KinesisClientLibIOException { - testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.LATEST); + testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST); } /** @@ -189,9 +194,7 @@ public final void testCheckAndCreateLeasesForNewShardsAtLatest() dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); - ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, - leaseManager, - InitialPositionInStream.LATEST, + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_LATEST, cleanupLeasesOfCompletedShards); List newLeases = leaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet(); @@ -223,9 +226,7 @@ public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); - ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, - leaseManager, - InitialPositionInStream.TRIM_HORIZON, + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards); List newLeases = leaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet(); @@ -240,6 +241,37 @@ public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() dataFile.delete(); } + /** + * @throws KinesisClientLibIOException + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws IOException + */ + @Test + public final void testCheckAndCreateLeasesForNewShardsAtTimestamp() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, + ProvisionedThroughputException, IOException { + List shards = constructShardListForGraphA(); + File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 1, "testBootstrap1"); + dataFile.deleteOnExit(); + IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); + + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_AT_TIMESTAMP, + cleanupLeasesOfCompletedShards); + List newLeases = leaseManager.listLeases(); + Set expectedLeaseShardIds = new HashSet(); + for (int i = 0; i < 11; i++) { + expectedLeaseShardIds.add("shardId-" + i); + } + Assert.assertEquals(expectedLeaseShardIds.size(), newLeases.size()); + for (KinesisClientLease lease1 : newLeases) { + Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey())); + Assert.assertEquals(ExtendedSequenceNumber.AT_TIMESTAMP, lease1.getCheckpoint()); + } + dataFile.delete(); + } + /** * @throws KinesisClientLibIOException * @throws DependencyException @@ -259,9 +291,7 @@ public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpen() dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); - ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, - leaseManager, - InitialPositionInStream.TRIM_HORIZON, + ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards); dataFile.delete(); } @@ -275,9 +305,10 @@ public final void testCheckAndCreateLeasesForNewShardsWhenParentIsOpen() */ @Test public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShard() - throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, - IOException { - testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl(null, Integer.MAX_VALUE); + throws KinesisClientLibIOException, DependencyException, InvalidStateException, + ProvisionedThroughputException, IOException { + testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(null, + Integer.MAX_VALUE, INITIAL_POSITION_TRIM_HORIZON); } /** @@ -295,8 +326,8 @@ public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShar // From the Shard Graph, the max count of calling could be 10 int maxCallingCount = 10; for (int c = 1; c <= maxCallingCount; c = c + 2) { - testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl( - ExceptionThrowingLeaseManagerMethods.DELETELEASE, c); + testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( + ExceptionThrowingLeaseManagerMethods.DELETELEASE, c, INITIAL_POSITION_TRIM_HORIZON); // Need to clean up lease manager every time after calling ShardSyncer leaseManager.deleteAll(); } @@ -317,8 +348,8 @@ public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShar // From the Shard Graph, the max count of calling could be 10 int maxCallingCount = 10; for (int c = 1; c <= maxCallingCount; c = c + 2) { - testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl( - ExceptionThrowingLeaseManagerMethods.LISTLEASES, c); + testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( + ExceptionThrowingLeaseManagerMethods.LISTLEASES, c, INITIAL_POSITION_TRIM_HORIZON); // Need to clean up lease manager every time after calling ShardSyncer leaseManager.deleteAll(); } @@ -339,8 +370,8 @@ public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShar // From the Shard Graph, the max count of calling could be 10 int maxCallingCount = 5; for (int c = 1; c <= maxCallingCount; c = c + 2) { - testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl( - ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c); + testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( + ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, c,INITIAL_POSITION_TRIM_HORIZON); // Need to clean up lease manager every time after calling ShardSyncer leaseManager.deleteAll(); } @@ -352,7 +383,7 @@ public final void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShar // 2). exceptionTime is a very big or negative value. private void retryCheckAndCreateLeaseForNewShards(IKinesisProxy kinesisProxy, ExceptionThrowingLeaseManagerMethods exceptionMethod, - int exceptionTime) + int exceptionTime, InitialPositionInStreamExtended position) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException { if (exceptionMethod != null) { ExceptionThrowingLeaseManager exceptionThrowingLeaseManager = @@ -364,7 +395,7 @@ private void retryCheckAndCreateLeaseForNewShards(IKinesisProxy kinesisProxy, try { ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, exceptionThrowingLeaseManager, - InitialPositionInStream.TRIM_HORIZON, + position, cleanupLeasesOfCompletedShards); return; } catch (LeasingException e) { @@ -376,28 +407,116 @@ private void retryCheckAndCreateLeaseForNewShards(IKinesisProxy kinesisProxy, } else { ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, - InitialPositionInStream.TRIM_HORIZON, + position, cleanupLeasesOfCompletedShards); } } + /** + * @throws KinesisClientLibIOException + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws IOException + */ + @Test + public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShard() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, + ProvisionedThroughputException, IOException { + testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl(null, + Integer.MAX_VALUE, INITIAL_POSITION_AT_TIMESTAMP); + } + + /** + * @throws KinesisClientLibIOException + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws IOException + */ + @Test + public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithDeleteLeaseExceptions() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, + IOException { + // Define the max calling count for lease manager methods. + // From the Shard Graph, the max count of calling could be 10 + int maxCallingCount = 10; + for (int c = 1; c <= maxCallingCount; c = c + 2) { + testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( + ExceptionThrowingLeaseManagerMethods.DELETELEASE, + c, INITIAL_POSITION_AT_TIMESTAMP); + // Need to clean up lease manager every time after calling ShardSyncer + leaseManager.deleteAll(); + } + } + + /** + * @throws KinesisClientLibIOException + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws IOException + */ + @Test + public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithListLeasesExceptions() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, + IOException { + // Define the max calling count for lease manager methods. + // From the Shard Graph, the max count of calling could be 10 + int maxCallingCount = 10; + for (int c = 1; c <= maxCallingCount; c = c + 2) { + testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( + ExceptionThrowingLeaseManagerMethods.LISTLEASES, + c, INITIAL_POSITION_AT_TIMESTAMP); + // Need to clean up lease manager every time after calling ShardSyncer + leaseManager.deleteAll(); + } + } + + /** + * @throws KinesisClientLibIOException + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws IOException + */ + @Test + public final void testCheckAndCreateLeasesForNewShardsAtTimestampAndClosedShardWithCreateLeaseExceptions() + throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, + IOException { + // Define the max calling count for lease manager methods. + // From the Shard Graph, the max count of calling could be 10 + int maxCallingCount = 5; + for (int c = 1; c <= maxCallingCount; c = c + 2) { + testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( + ExceptionThrowingLeaseManagerMethods.CREATELEASEIFNOTEXISTS, + c, INITIAL_POSITION_AT_TIMESTAMP); + // Need to clean up lease manager every time after calling ShardSyncer + leaseManager.deleteAll(); + } + } + // Real implementation of testing CheckAndCreateLeasesForNewShards with different leaseManager types. - private void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl( - ExceptionThrowingLeaseManagerMethods exceptionMethod, int exceptionTime) + private void testCheckAndCreateLeasesForNewShardsAtSpecifiedPositionAndClosedShardImpl( + ExceptionThrowingLeaseManagerMethods exceptionMethod, + int exceptionTime, + InitialPositionInStreamExtended position) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { + ExtendedSequenceNumber extendedSequenceNumber = + new ExtendedSequenceNumber(position.getInitialPositionInStream().toString()); List shards = constructShardListForGraphA(); File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); - retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime); + retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime, position); List newLeases = leaseManager.listLeases(); Map expectedShardIdToCheckpointMap = new HashMap(); for (int i = 0; i < 11; i++) { - expectedShardIdToCheckpointMap.put("shardId-" + i, ExtendedSequenceNumber.TRIM_HORIZON); + expectedShardIdToCheckpointMap.put("shardId-" + i, extendedSequenceNumber); } Assert.assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size()); for (KinesisClientLease lease1 : newLeases) { @@ -415,7 +534,7 @@ private void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardImpl leaseManager.updateLease(childShardLease); expectedShardIdToCheckpointMap.put(childShardLease.getLeaseKey(), new ExtendedSequenceNumber("34290")); - retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime); + retryCheckAndCreateLeaseForNewShards(kinesisProxy, exceptionMethod, exceptionTime, position); newLeases = leaseManager.listLeases(); Assert.assertEquals(expectedShardIdToCheckpointMap.size(), newLeases.size()); @@ -449,11 +568,11 @@ public final void testBootstrapShardLeasesCleanupGarbage() garbageLease.setCheckpoint(new ExtendedSequenceNumber("999")); leaseManager.createLeaseIfNotExists(garbageLease); Assert.assertEquals(garbageShardId, leaseManager.getLease(garbageShardId).getLeaseKey()); - testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream.LATEST); + testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST); Assert.assertNull(leaseManager.getLease(garbageShardId)); } - private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream initialPosition) + private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition) throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException, KinesisClientLibIOException { List shards = new ArrayList(); @@ -463,7 +582,7 @@ private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream shards.add(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange)); String shardId1 = "shardId-1"; shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); - File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 10, "testBootstrap1"); + File dataFile = KinesisLocalFileDataCreator.generateTempDataFile(shards, 2, "testBootstrap1"); dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); @@ -475,7 +594,8 @@ private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStream expectedLeaseShardIds.add(shardId1); for (KinesisClientLease lease1 : newLeases) { Assert.assertTrue(expectedLeaseShardIds.contains(lease1.getLeaseKey())); - Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.toString()), lease1.getCheckpoint()); + Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().toString()), + lease1.getCheckpoint()); } dataFile.delete(); } @@ -495,11 +615,11 @@ public final void testDetermineNewLeasesToCreateStartingPosition() { String shardId1 = "shardId-1"; shards.add(ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); - Set initialPositions = new HashSet(); - initialPositions.add(InitialPositionInStream.LATEST); - initialPositions.add(InitialPositionInStream.TRIM_HORIZON); + Set initialPositions = new HashSet(); + initialPositions.add(INITIAL_POSITION_LATEST); + initialPositions.add(INITIAL_POSITION_TRIM_HORIZON); - for (InitialPositionInStream initialPosition : initialPositions) { + for (InitialPositionInStreamExtended initialPosition : initialPositions) { List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, initialPosition); Assert.assertEquals(2, newLeases.size()); @@ -508,7 +628,8 @@ public final void testDetermineNewLeasesToCreateStartingPosition() { expectedLeaseShardIds.add(shardId1); for (KinesisClientLease lease : newLeases) { Assert.assertTrue(expectedLeaseShardIds.contains(lease.getLeaseKey())); - Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.toString()), lease.getCheckpoint()); + Assert.assertEquals(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().toString()), + lease.getCheckpoint()); } } } @@ -532,7 +653,7 @@ public final void testDetermineNewLeasesToCreateIgnoreClosedShard() { ShardObjectHelper.newSequenceNumberRange("405", null))); List newLeases = - ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST); + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); Assert.assertEquals(1, newLeases.size()); Assert.assertEquals(lastShardId, newLeases.get(0).getLeaseKey()); } @@ -557,7 +678,7 @@ public final void testDetermineNewLeasesToCreateSplitMergeLatest1() { currentLeases.add(newLease("shardId-5")); List newLeases = - ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST); + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -595,7 +716,7 @@ public final void testDetermineNewLeasesToCreateSplitMergeLatest2() { currentLeases.add(newLease("shardId-7")); List newLeases = - ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.LATEST); + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -631,7 +752,7 @@ public final void testDetermineNewLeasesToCreateSplitMergeHorizon1() { currentLeases.add(newLease("shardId-5")); List newLeases = - ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON); + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -671,7 +792,7 @@ public final void testDetermineNewLeasesToCreateSplitMergeHorizon2() { currentLeases.add(newLease("shardId-7")); List newLeases = - ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON); + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); Map expectedShardIdCheckpointMap = new HashMap(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.TRIM_HORIZON); @@ -700,7 +821,7 @@ public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesTrim() { List shards = constructShardListForGraphB(); List currentLeases = new ArrayList(); List newLeases = - ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, InitialPositionInStream.TRIM_HORIZON); + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); Map expectedShardIdCheckpointMap = new HashMap(); for (int i = 0; i < 11; i++) { @@ -716,6 +837,110 @@ public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesTrim() { } } + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (3, 4, 5) + */ + @Test + public final void testDetermineNewLeasesToCreateSplitMergeAtTimestamp1() { + List shards = constructShardListForGraphA(); + List currentLeases = new ArrayList(); + + + currentLeases.add(newLease("shardId-3")); + currentLeases.add(newLease("shardId-4")); + currentLeases.add(newLease("shardId-5")); + + List newLeases = + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); + Map expectedShardIdCheckpointMap = new HashMap(); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + + Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); + for (KinesisClientLease lease : newLeases) { + Assert.assertTrue("Unexpected lease: " + lease, + expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); + Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); + } + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) + * Shard structure (each level depicts a stream segment): + * 0 1 2 3 4 5- shards till epoch 102 + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + * Current leases: (4, 5, 7) + */ + @Test + public final void testDetermineNewLeasesToCreateSplitMergeAtTimestamp2() { + List shards = constructShardListForGraphA(); + List currentLeases = new ArrayList(); + + currentLeases.add(newLease("shardId-4")); + currentLeases.add(newLease("shardId-5")); + currentLeases.add(newLease("shardId-7")); + + List newLeases = + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); + Map expectedShardIdCheckpointMap = new HashMap(); + expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); + expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); + + Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); + for (KinesisClientLease lease : newLeases) { + Assert.assertTrue("Unexpected lease: " + lease, + expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); + Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); + } + } + + /** + * Test CheckIfDescendantAndAddNewLeasesForAncestors (initial position AT_TIMESTAMP) + * For shard graph B (see the construct method doc for structure). + * Current leases: empty set + */ + @Test + public final void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() { + List shards = constructShardListForGraphB(); + List currentLeases = new ArrayList(); + List newLeases = + ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); + Map expectedShardIdCheckpointMap = + new HashMap(); + for (int i = 0; i < shards.size(); i++) { + String expectedShardId = "shardId-" + i; + expectedShardIdCheckpointMap.put(expectedShardId, ExtendedSequenceNumber.AT_TIMESTAMP); + } + + Assert.assertEquals(expectedShardIdCheckpointMap.size(), newLeases.size()); + for (KinesisClientLease lease : newLeases) { + Assert.assertTrue("Unexpected lease: " + lease, + expectedShardIdCheckpointMap.containsKey(lease.getLeaseKey())); + Assert.assertEquals(expectedShardIdCheckpointMap.get(lease.getLeaseKey()), lease.getCheckpoint()); + } + } + + /* * Helper method to construct a shard list for graph A. Graph A is defined below. * Shard structure (y-axis is epochs): @@ -808,8 +1033,7 @@ List constructShardListForGraphB() { @Test public final void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() { Map memoizationContext = new HashMap<>(); - Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, - latestPosition, + Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null, null, @@ -824,8 +1048,7 @@ public final void testCheckIfDescendantAndAddNewLeasesForAncestorsTrimmedShard() String shardId = "shardId-trimmed"; Map kinesisShards = new HashMap(); Map memoizationContext = new HashMap<>(); - Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, - latestPosition, + Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null, kinesisShards, null, @@ -844,8 +1067,7 @@ public final void testCheckIfDescendantAndAddNewLeasesForAncestorsForShardWithCu shardIdsOfCurrentLeases.add(shardId); Map newLeaseMap = new HashMap(); Map memoizationContext = new HashMap<>(); - Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, - latestPosition, + Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, @@ -872,8 +1094,7 @@ public final void testCheckIfDescendantAndAddNewLeasesForAncestors2P2ANotDescend kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null)); Map memoizationContext = new HashMap<>(); - Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, - latestPosition, + Assert.assertFalse(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, @@ -902,8 +1123,7 @@ public final void testCheckIfDescendantAndAddNewLeasesForAncestors2P2A1PDescenda kinesisShards.put(shardId, shard); Map memoizationContext = new HashMap<>(); - Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, - latestPosition, + Assert.assertTrue(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 6b77f8182..a2302ad0f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -42,6 +42,9 @@ */ public class ShutdownTaskTest { private static final long TASK_BACKOFF_TIME_MILLIS = 1L; + private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + Set defaultParentShardIds = new HashSet<>(); String defaultConcurrencyToken = "testToken4398"; String defaultShardId = "shardId-0000397840"; @@ -88,16 +91,15 @@ public final void testCallWhenApplicationDoesNotCheckpoint() { IKinesisProxy kinesisProxy = mock(IKinesisProxy.class); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); boolean cleanupLeasesOfCompletedShards = false; - ShutdownTask task = - new ShutdownTask(defaultShardInfo, - defaultRecordProcessor, - checkpointer, - ShutdownReason.TERMINATE, - kinesisProxy, - InitialPositionInStream.TRIM_HORIZON, - cleanupLeasesOfCompletedShards , - leaseManager, - TASK_BACKOFF_TIME_MILLIS); + ShutdownTask task = new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + leaseManager, + TASK_BACKOFF_TIME_MILLIS); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); @@ -114,16 +116,15 @@ public final void testCallWhenSyncingShardsThrows() { when(kinesisProxy.getShardList()).thenReturn(null); ILeaseManager leaseManager = mock(KinesisClientLeaseManager.class); boolean cleanupLeasesOfCompletedShards = false; - ShutdownTask task = - new ShutdownTask(defaultShardInfo, - defaultRecordProcessor, - checkpointer, - ShutdownReason.TERMINATE, - kinesisProxy, - InitialPositionInStream.TRIM_HORIZON, - cleanupLeasesOfCompletedShards , - leaseManager, - TASK_BACKOFF_TIME_MILLIS); + ShutdownTask task = new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + leaseManager, + TASK_BACKOFF_TIME_MILLIS); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index f4cd9307b..f0b426714 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -29,6 +29,7 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -80,7 +81,6 @@ import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Unit tests of Worker. @@ -101,7 +101,10 @@ public class WorkerTest { private final boolean cleanupLeasesUponShardCompletion = true; // We don't want any of these tests to run checkpoint validation private final boolean skipCheckpointValidationValue = false; - private final InitialPositionInStream initialPositionInStream = InitialPositionInStream.LATEST; + private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST); + private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = @@ -167,9 +170,7 @@ public final void testCreateOrGetShardConsumer() { new StreamConfig(proxy, maxRecords, idleTimeInMilliseconds, - callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, - initialPositionInStream); + callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); final String testConcurrencyToken = "testToken"; final String anotherConcurrencyToken = "anotherTestToken"; final String dummyKinesisShardId = "kinesis-0-0"; @@ -182,9 +183,7 @@ public final void testCreateOrGetShardConsumer() { Worker worker = new Worker(stageName, - streamletFactory, - streamConfig, - InitialPositionInStream.LATEST, + streamletFactory, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -219,9 +218,7 @@ public final void testCleanupShardConsumers() { new StreamConfig(proxy, maxRecords, idleTimeInMilliseconds, - callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, - initialPositionInStream); + callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); final String concurrencyToken = "testToken"; final String anotherConcurrencyToken = "anotherTestToken"; final String dummyKinesisShardId = "kinesis-0-0"; @@ -235,9 +232,7 @@ public final void testCleanupShardConsumers() { Worker worker = new Worker(stageName, - streamletFactory, - streamConfig, - InitialPositionInStream.LATEST, + streamletFactory, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -283,9 +278,7 @@ public final void testInitializationFailureWithRetries() { new StreamConfig(proxy, maxRecords, idleTimeInMilliseconds, - callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, - initialPositionInStream); + callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); @SuppressWarnings("unchecked") ILeaseManager leaseManager = mock(ILeaseManager.class); @@ -295,8 +288,7 @@ public final void testInitializationFailureWithRetries() { Worker worker = new Worker(stageName, recordProcessorFactory, - streamConfig, - InitialPositionInStream.TRIM_HORIZON, + streamConfig, INITIAL_POSITION_TRIM_HORIZON, shardPollInterval, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -621,6 +613,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { /** * Returns executor service that will be owned by the worker. This is useful to test the scenario * where worker shuts down the executor service also during shutdown flow. + * * @return Executor service that will be owned by the worker. */ private WorkerThreadPoolExecutor getWorkerThreadPoolExecutor() { @@ -665,7 +658,7 @@ private void runAndTestWorker(int numShards, int threadPoolSize) throws Exceptio List initialLeases = new ArrayList(); for (Shard shard : shardList) { KinesisClientLease lease = ShardSyncer.newKCLLease(shard); - lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + lease.setCheckpoint(ExtendedSequenceNumber.AT_TIMESTAMP); initialLeases.add(lease); } runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard); @@ -719,7 +712,7 @@ private WorkerThread runWorker(List shardList, final long epsilonMillis = 1000L; final long idleTimeInMilliseconds = 2L; - AmazonDynamoDB ddbClient = DynamoDBEmbedded.create(); + AmazonDynamoDB ddbClient = DynamoDBEmbedded.create().amazonDynamoDB(); LeaseManager leaseManager = new KinesisClientLeaseManager("foo", ddbClient); leaseManager.createLeaseTableIfNotExists(1L, 1L); for (KinesisClientLease initialLease : initialLeases) { @@ -733,19 +726,17 @@ private WorkerThread runWorker(List shardList, epsilonMillis, metricsFactory); - StreamConfig streamConfig = - new StreamConfig(kinesisProxy, - maxRecords, - idleTimeInMilliseconds, - callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, - initialPositionInStream); + final Date timestamp = new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP); + StreamConfig streamConfig = new StreamConfig(kinesisProxy, + maxRecords, + idleTimeInMilliseconds, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); Worker worker = new Worker(stageName, recordProcessorFactory, - streamConfig, - InitialPositionInStream.TRIM_HORIZON, + streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -843,7 +834,8 @@ private void verifyAllRecordsOfEachShardWithOnlyOneProcessorWereConsumedExactlyO findShardIdsAndStreamLetsOfShardsWithOnlyOneProcessor(recordProcessorFactory); for (Shard shard : shardList) { String shardId = shard.getShardId(); - String iterator = fileBasedProxy.getIterator(shardId, ShardIteratorType.TRIM_HORIZON.toString(), null); + String iterator = + fileBasedProxy.getIterator(shardId, new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP)); List expectedRecords = fileBasedProxy.get(iterator, numRecs).getRecords(); if (shardIdsAndStreamLetsOfShardsWithOnlyOneProcessor.containsKey(shardId)) { verifyAllRecordsWereConsumedExactlyOnce(expectedRecords, @@ -859,7 +851,8 @@ private void verifyAllRecordsOfEachShardWereConsumedAtLeastOnce(List shar Map> shardStreamletsRecords) { for (Shard shard : shardList) { String shardId = shard.getShardId(); - String iterator = fileBasedProxy.getIterator(shardId, ShardIteratorType.TRIM_HORIZON.toString(), null); + String iterator = + fileBasedProxy.getIterator(shardId, new Date(KinesisLocalFileDataCreator.STARTING_TIMESTAMP)); List expectedRecords = fileBasedProxy.get(iterator, numRecs).getRecords(); verifyAllRecordsWereConsumedAtLeastOnce(expectedRecords, shardStreamletsRecords.get(shardId)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java index a346b5c6d..db70b5de4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisLocalFileProxy.java @@ -25,6 +25,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -65,7 +66,9 @@ public enum LocalFileFields { /** Partition key associated with data record. */ PARTITION_KEY(2), /** Data. */ - DATA(3); + DATA(3), + /** Approximate arrival timestamp. */ + APPROXIMATE_ARRIVAL_TIMESTAMP(4); private final int position; @@ -149,7 +152,7 @@ private void populateDataRecordsFromFile(String file) throws IOException { String[] strArr = str.split(","); if (strArr.length != NUM_FIELDS_IN_FILE) { throw new InvalidArgumentException("Unexpected input in file." - + "Expected format (shardId, sequenceNumber, partitionKey, dataRecord)"); + + "Expected format (shardId, sequenceNumber, partitionKey, dataRecord, timestamp)"); } String shardId = strArr[LocalFileFields.SHARD_ID.getPosition()]; Record record = new Record(); @@ -157,6 +160,9 @@ private void populateDataRecordsFromFile(String file) throws IOException { record.setPartitionKey(strArr[LocalFileFields.PARTITION_KEY.getPosition()]); ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(strArr[LocalFileFields.DATA.getPosition()])); record.setData(byteBuffer); + Date timestamp = + new Date(Long.parseLong(strArr[LocalFileFields.APPROXIMATE_ARRIVAL_TIMESTAMP.getPosition()])); + record.setApproximateArrivalTimestamp(timestamp); List shardRecords = shardedDataRecords.get(shardId); if (shardRecords == null) { shardRecords = new ArrayList(); @@ -221,11 +227,8 @@ static IteratorInfo deserializeIterator(String iterator) { return new IteratorInfo(splits[0], splits[1]); } - /* - * (non-Javadoc) - * - * @see com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy#getIterator(java.lang.String, - * java.lang.String, java.lang.String) + /** + * {@inheritDoc} */ @Override public String getIterator(String shardId, String iteratorEnum, String sequenceNumber) @@ -262,6 +265,77 @@ public String getIterator(String shardId, String iteratorEnum, String sequenceNu } } + /** + * {@inheritDoc} + */ + @Override + public String getIterator(String shardId, String iteratorEnum) + throws ResourceNotFoundException, InvalidArgumentException { + /* + * If we don't have records in this shard, any iterator will return the empty list. Using a + * sequence number of 1 on an empty shard will give this behavior. + */ + List shardRecords = shardedDataRecords.get(shardId); + if (shardRecords == null) { + throw new ResourceNotFoundException(shardId + " does not exist"); + } + if (shardRecords.isEmpty()) { + return serializeIterator(shardId, "1"); + } + + final String serializedIterator; + if (ShardIteratorType.LATEST.toString().equals(iteratorEnum)) { + /* + * If we do have records, LATEST should return an iterator that can be used to read the + * last record. Our iterators are inclusive for convenience. + */ + Record last = shardRecords.get(shardRecords.size() - 1); + serializedIterator = serializeIterator(shardId, last.getSequenceNumber()); + } else if (ShardIteratorType.TRIM_HORIZON.toString().equals(iteratorEnum)) { + serializedIterator = serializeIterator(shardId, shardRecords.get(0).getSequenceNumber()); + } else { + throw new IllegalArgumentException("IteratorEnum value was invalid: " + iteratorEnum); + } + return serializedIterator; + } + + /** + * {@inheritDoc} + */ + @Override + public String getIterator(String shardId, Date timestamp) + throws ResourceNotFoundException, InvalidArgumentException { + /* + * If we don't have records in this shard, any iterator will return the empty list. Using a + * sequence number of 1 on an empty shard will give this behavior. + */ + List shardRecords = shardedDataRecords.get(shardId); + if (shardRecords == null) { + throw new ResourceNotFoundException(shardId + " does not exist"); + } + if (shardRecords.isEmpty()) { + return serializeIterator(shardId, "1"); + } + + final String serializedIterator; + if (timestamp != null) { + String seqNumAtTimestamp = findSequenceNumberAtTimestamp(shardRecords, timestamp); + serializedIterator = serializeIterator(shardId, seqNumAtTimestamp); + } else { + throw new IllegalArgumentException("Timestamp must be specified for AT_TIMESTAMP iterator"); + } + return serializedIterator; + } + + private String findSequenceNumberAtTimestamp(final List shardRecords, final Date timestamp) { + for (Record rec : shardRecords) { + if (rec.getApproximateArrivalTimestamp().getTime() >= timestamp.getTime()) { + return rec.getSequenceNumber(); + } + } + return null; + } + /* * (non-Javadoc) * diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java index 795f2db94..e5e4419a0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/proxies/util/KinesisLocalFileDataCreator.java @@ -51,6 +51,17 @@ public class KinesisLocalFileDataCreator { private static final int PARTITION_KEY_LENGTH = 10; private static final int DATA_LENGTH = 40; + /** + * Starting timestamp - also referenced in KinesisLocalFileProxyTest. + */ + public static final long STARTING_TIMESTAMP = 1462345678910L; + + /** + * This is used to allow few records to have the same timestamps (to mimic real life scenarios). + * Records 5n-1 and 5n will have the same timestamp (n > 0). + */ + private static final int DIVISOR = 5; + private KinesisLocalFileDataCreator() { } @@ -96,6 +107,7 @@ public static File generateTempDataFile(List shardList, int numRecordsPer fileWriter.write(serializedShardList); fileWriter.newLine(); BigInteger sequenceNumberIncrement = new BigInteger("0"); + long timestamp = STARTING_TIMESTAMP; for (int i = 0; i < numRecordsPerShard; i++) { for (Shard shard : shardList) { BigInteger sequenceNumber = @@ -112,7 +124,12 @@ public static File generateTempDataFile(List shardList, int numRecordsPer String partitionKey = PARTITION_KEY_PREFIX + shard.getShardId() + generateRandomString(PARTITION_KEY_LENGTH); String data = generateRandomString(DATA_LENGTH); - String line = shard.getShardId() + "," + sequenceNumber + "," + partitionKey + "," + data; + + // Allow few records to have the same timestamps (to mimic real life scenarios). + timestamp = (i % DIVISOR == 0) ? timestamp : timestamp + 1; + String line = shard.getShardId() + "," + sequenceNumber + "," + partitionKey + "," + data + "," + + timestamp; + fileWriter.write(line); fileWriter.newLine(); sequenceNumberIncrement = sequenceNumberIncrement.add(BigInteger.ONE);