From 14c68296f032d428326f7777f8b0393b0126258d Mon Sep 17 00:00:00 2001 From: xiaoyu meng Date: Tue, 9 Oct 2018 17:29:59 -0700 Subject: [PATCH] Introducing HierarchicalShardSyncer inorder to run multiple Schedulers in a JVM (#395) * Run multiple instance of scheduler on one JVM * handling creation of shardSyncer in DynamoDBLeaseManagementFactory and LeaseManagementConfig * remove multi-threading unit test and do some small refactorings * refectoring * deprecate ShardSyncer and use HierarchichalShardSyncer instead; change the order for metricsFactory and HierarchichalShardSyncer in ShardConsumerArgument * fix typos and use mock object of shardSyncer * delete improper comments * fix comments * remove duplicated comments --- .../amazon/kinesis/coordinator/Scheduler.java | 11 +- ...ncer.java => HierarchicalShardSyncer.java} | 22 ++-- .../kinesis/leases/LeaseManagementConfig.java | 3 + .../amazon/kinesis/leases/ShardSyncTask.java | 4 +- .../kinesis/leases/ShardSyncTaskManager.java | 61 ++++++++++ .../DynamoDBLeaseManagementFactory.java | 10 +- .../leases/exceptions/ShardSyncer.java | 46 ++++++++ .../kinesis/lifecycle/ConsumerStates.java | 1 + .../lifecycle/ShardConsumerArgument.java | 4 +- .../kinesis/lifecycle/ShutdownTask.java | 8 +- ....java => HierarchicalShardSyncerTest.java} | 108 +++++++++++------- .../leases/ShardSyncTaskIntegrationTest.java | 4 +- .../kinesis/lifecycle/ConsumerStatesTest.java | 6 +- .../kinesis/lifecycle/ShutdownTaskTest.java | 14 ++- 14 files changed, 237 insertions(+), 65 deletions(-) rename amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/{ShardSyncer.java => HierarchicalShardSyncer.java} (98%) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java rename amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/{ShardSyncerTest.java => HierarchicalShardSyncerTest.java} (94%) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 99de1ebda..f8596419e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -47,6 +47,7 @@ import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.lifecycle.LifecycleConfig; @@ -113,6 +114,7 @@ public class Scheduler implements Runnable { private final ShardDetector shardDetector; private final boolean ignoreUnexpetedChildShards; private final AggregatorUtil aggregatorUtil; + private final HierarchicalShardSyncer hierarchicalShardSyncer; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -195,6 +197,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardDetector = this.shardSyncTaskManager.shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); + this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); } /** @@ -239,7 +242,8 @@ private void initialize() { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { log.info("Syncing Kinesis shard info"); ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory); + cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, + metricsFactory); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); @@ -575,8 +579,9 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, shardDetector, - metricsFactory, - aggregatorUtil); + aggregatorUtil, + hierarchicalShardSyncer, + metricsFactory); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java similarity index 98% rename from amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 1e021b6d0..c61bf935c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -30,13 +30,12 @@ import org.apache.commons.lang3.StringUtils; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; @@ -54,24 +53,27 @@ * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it * and begun processing it's child shards. */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) @Slf4j -public class ShardSyncer { +@KinesisClientInternalApi +public class HierarchicalShardSyncer { + /** * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards * (e.g. at startup, or when we reach end of a shard). - * + * + * @param shardDetector * @param leaseRefresher * @param initialPosition * @param cleanupLeasesOfCompletedShards * @param ignoreUnexpectedChildShards + * @param scope * @throws DependencyException * @throws InvalidStateException * @throws ProvisionedThroughputException * @throws KinesisClientLibIOException */ // CHECKSTYLE:OFF CyclomaticComplexity - public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector, + public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope) throws DependencyException, InvalidStateException, @@ -152,7 +154,7 @@ private static Set findInconsistentShardIds(final Map shardIdToShardMap, + synchronized void assertClosedShardsAreCoveredOrAbsent(final Map shardIdToShardMap, final Map> shardIdToChildShardIdsMap, final Set shardIdsOfClosedShards) throws KinesisClientLibIOException { final String exceptionMessageSuffix = "This can happen if we constructed the list of shards " @@ -181,7 +183,7 @@ static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map shardIdToShardMap, final Set childShardIds) throws KinesisClientLibIOException { BigInteger minStartingHashKeyOfChildren = null; @@ -583,7 +585,7 @@ static boolean isCandidateForCleanup(final Lease lease, final Set curren * @throws ProvisionedThroughputException * @throws KinesisClientLibIOException */ - private static synchronized void cleanupLeasesOfFinishedShards(final Collection currentLeases, + private synchronized void cleanupLeasesOfFinishedShards(final Collection currentLeases, final Map shardIdToShardMap, final Map> shardIdToChildShardIdsMap, final List trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { @@ -625,7 +627,7 @@ private static synchronized void cleanupLeasesOfFinishedShards(final Collection< * @throws InvalidStateException * @throws DependencyException */ - static synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set childShardIds, + synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set childShardIds, final Map trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Lease leaseForClosedShard = trackedLeases.get(closedShardId); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index d19c583e3..5c98bae68 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -231,6 +231,8 @@ static class LeaseManagementThreadPool extends ThreadPoolExecutor { */ private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK; + private HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer(); + private LeaseManagementFactory leaseManagementFactory; public LeaseManagementFactory leaseManagementFactory() { @@ -258,6 +260,7 @@ public LeaseManagementFactory leaseManagementFactory() { cacheMissWarningModulus(), initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), + hierarchicalShardSyncer(), tableCreatorCallback()); } return leaseManagementFactory; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index dd98206a8..046efdea4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -48,6 +48,8 @@ public class ShardSyncTask implements ConsumerTask { private final boolean ignoreUnexpectedChildShards; private final long shardSyncTaskIdleTimeMillis; @NonNull + private final HierarchicalShardSyncer hierarchicalShardSyncer; + @NonNull private final MetricsFactory metricsFactory; private final TaskType taskType = TaskType.SHARDSYNC; @@ -62,7 +64,7 @@ public TaskResult call() { final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION); try { - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPosition, + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index dcc13dbd6..d97c9b904 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -50,8 +50,68 @@ public class ShardSyncTaskManager { @NonNull private final ExecutorService executorService; @NonNull + private final HierarchicalShardSyncer hierarchicalShardSyncer; + @NonNull private final MetricsFactory metricsFactory; + /** + * Constructor. + * + *

NOTE: This constructor is deprecated and will be removed in a future release.

+ * + * @param shardDetector + * @param leaseRefresher + * @param initialPositionInStream + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIdleTimeMillis + * @param executorService + * @param metricsFactory + */ + @Deprecated + public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion, + boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService, + MetricsFactory metricsFactory) { + this.shardDetector = shardDetector; + this.leaseRefresher = leaseRefresher; + this.initialPositionInStream = initialPositionInStream; + this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; + this.executorService = executorService; + this.hierarchicalShardSyncer = new HierarchicalShardSyncer(); + this.metricsFactory = metricsFactory; + } + + /** + * Constructor. + * + * @param shardDetector + * @param leaseRefresher + * @param initialPositionInStream + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIdleTimeMillis + * @param executorService + * @param hierarchicalShardSyncer + * @param metricsFactory + */ + public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, + InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion, + boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService, + HierarchicalShardSyncer hierarchicalShardSyncer, MetricsFactory metricsFactory) { + this.shardDetector = shardDetector; + this.leaseRefresher = leaseRefresher; + this.initialPositionInStream = initialPositionInStream; + this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; + this.executorService = executorService; + this.hierarchicalShardSyncer = hierarchicalShardSyncer; + this.metricsFactory = metricsFactory; + } + private ConsumerTask currentTask; private Future future; @@ -82,6 +142,7 @@ private synchronized boolean checkAndSubmitNextTask() { cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, + hierarchicalShardSyncer, metricsFactory), metricsFactory); future = executorService.submit(currentTask); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 3fe692b2c..1ec3e0b39 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementFactory; @@ -50,6 +51,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final ExecutorService executorService; @NonNull private final InitialPositionInStreamExtended initialPositionInStream; + @NonNull + private final HierarchicalShardSyncer hierarchicalShardSyncer; + private final long failoverTimeMillis; private final long epsilonMillis; private final int maxLeasesForWorker; @@ -162,7 +166,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); + new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); } /** @@ -191,6 +195,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity + * @param hierarchicalShardSyncer * @param tableCreatorCallback */ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, @@ -203,6 +208,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { this.kinesisClient = kinesisClient; this.streamName = streamName; @@ -227,6 +233,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi this.cacheMissWarningModulus = cacheMissWarningModulus; this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; + this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.tableCreatorCallback = tableCreatorCallback; } @@ -253,6 +260,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac ignoreUnexpectedChildShards, shardSyncIntervalMillis, executorService, + hierarchicalShardSyncer, metricsFactory); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java new file mode 100644 index 000000000..4e9245f62 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -0,0 +1,46 @@ +package software.amazon.kinesis.leases.exceptions; + +import lombok.NonNull; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.ShardDetector; +import software.amazon.kinesis.metrics.MetricsScope; + +/** + * Helper class to sync leases with shards of the Kinesis stream. + * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). + * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it + * and begun processing it's child shards. + * + *

NOTE: This class is deprecated and will be removed in a future release.

+ */ +@Deprecated +public class ShardSyncer { + private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer(); + + /** + *

NOTE: This method is deprecated and will be removed in a future release.

+ * + * @param shardDetector + * @param leaseRefresher + * @param initialPosition + * @param cleanupLeasesOfCompletedShards + * @param ignoreUnexpectedChildShards + * @param scope + * @throws DependencyException + * @throws InvalidStateException + * @throws ProvisionedThroughputException + * @throws KinesisClientLibIOException + */ + @Deprecated + public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector, + final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, + final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, + final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, + KinesisClientLibIOException { + HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index d3ce82c2e..ef0a8d75e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -495,6 +495,7 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con argument.leaseRefresher(), argument.taskBackoffTimeMillis(), argument.recordsPublisher(), + argument.hierarchicalShardSyncer(), argument.metricsFactory()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index 4fcda0762..d5ec57feb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -24,6 +24,7 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; @@ -65,7 +66,8 @@ public class ShardConsumerArgument { private final boolean ignoreUnexpectedChildShards; @NonNull private final ShardDetector shardDetector; + private final AggregatorUtil aggregatorUtil; + private final HierarchicalShardSyncer hierarchicalShardSyncer; @NonNull private final MetricsFactory metricsFactory; - private final AggregatorUtil aggregatorUtil; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index a07dc7838..1466dd027 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -25,7 +25,7 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.leases.ShardSyncer; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.metrics.MetricsFactory; @@ -66,6 +66,8 @@ public class ShutdownTask implements ConsumerTask { @NonNull private final RecordsPublisher recordsPublisher; @NonNull + private final HierarchicalShardSyncer hierarchicalShardSyncer; + @NonNull private final MetricsFactory metricsFactory; private final TaskType taskType = TaskType.SHUTDOWN; @@ -123,8 +125,8 @@ public TaskResult call() { if (reason == ShutdownReason.SHARD_END) { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPositionInStream, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, + initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java similarity index 94% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 0a567ab91..78d68b79f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -43,6 +43,7 @@ import java.util.stream.IntStream; import org.apache.commons.lang3.StringUtils; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; @@ -63,7 +64,7 @@ @RunWith(MockitoJUnitRunner.class) // CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES -public class ShardSyncerTest { +public class HierarchicalShardSyncerTest { private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended .newInitialPosition(InitialPositionInStream.LATEST); private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended @@ -76,6 +77,9 @@ public class ShardSyncerTest { private final boolean cleanupLeasesOfCompletedShards = true; private final boolean ignoreUnexpectedChildShards = false; + + private HierarchicalShardSyncer hierarchicalShardSyncer; + /** * Old/Obsolete max value of a sequence number (2^128 -1). */ @@ -86,6 +90,11 @@ public class ShardSyncerTest { @Mock private DynamoDBLeaseRefresher dynamoDBLeaseRefresher; + @Before + public void setup() { + hierarchicalShardSyncer = new HierarchicalShardSyncer(); + } + /** * Test determineNewLeasesToCreate() where there are no shards */ @@ -94,7 +103,7 @@ public void testDetermineNewLeasesToCreateNoShards() { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); - assertThat(ShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(), + assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); } @@ -111,7 +120,7 @@ public void testDetermineNewLeasesToCreate0Leases0Reshards() { ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); final List currentLeases = Collections.emptyList(); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -138,7 +147,7 @@ public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() { final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -151,7 +160,7 @@ public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() { */ @Test public void testBootstrapShardLeasesAtTrimHorizon() throws Exception { - testCheckAndCreateLeasesForNewShards(INITIAL_POSITION_TRIM_HORIZON); + testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_TRIM_HORIZON); } /** @@ -159,11 +168,11 @@ public void testBootstrapShardLeasesAtTrimHorizon() throws Exception { */ @Test public void testBootstrapShardLeasesAtLatest() throws Exception { - testCheckAndCreateLeasesForNewShards(INITIAL_POSITION_LATEST); + testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST); } @Test - public void testCheckAndCreateLeasesForNewShardsAtLatest() throws Exception { + public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception { final List shards = constructShardListForGraphA(); final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); @@ -172,7 +181,8 @@ public void testCheckAndCreateLeasesForNewShardsAtLatest() throws Exception { when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, cleanupLeasesOfCompletedShards, false, SCOPE); final Set expectedShardIds = new HashSet<>( @@ -197,12 +207,12 @@ public void testCheckAndCreateLeasesForNewShardsAtLatest() throws Exception { @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception { - testCheckAndCreateLeaseForNewShards(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON); + testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON); } @Test public void testCheckAndCreateLeasesForNewShardsAtTimestamp() throws Exception { - testCheckAndCreateLeaseForNewShards(constructShardListForGraphA(), INITIAL_POSITION_AT_TIMESTAMP); + testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_AT_TIMESTAMP); } @Test(expected = KinesisClientLibIOException.class) @@ -217,7 +227,7 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpen() throws Except when(shardDetector.listShards()).thenReturn(shards); try { - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE); } finally { verify(shardDetector).listShards(); @@ -251,7 +261,8 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, cleanupLeasesOfCompletedShards, true, SCOPE); final List leases = leaseCaptor.getAllValues(); @@ -307,7 +318,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); // Initial call: No leases present, create leases. - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -322,7 +333,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -382,7 +393,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseEx .when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); // Initial call: Call to create leases. - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -398,7 +409,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseEx try { // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); } finally { List deleteLeases = leaseDeleteCaptor.getAllValues(); @@ -421,7 +433,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseEx verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class)); // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); deleteLeases = leaseDeleteCaptor.getAllValues(); @@ -481,7 +494,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc try { // Initial call: Call to create leases. Fails on ListLeases - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); } finally { verify(shardDetector, times(1)).listShards(); @@ -490,7 +504,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); // Second call: Leases not present, leases will be created. - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -504,7 +519,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); final List deleteLeases = leaseDeleteCaptor.getAllValues(); @@ -569,7 +585,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx try { // Initial call: No leases present, create leases. Create lease Fails - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); } finally { verify(shardDetector, times(1)).listShards(); @@ -577,7 +594,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -591,7 +609,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); final List deleteLeases = leaseDeleteCaptor.getAllValues(); @@ -653,7 +672,7 @@ public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception { when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); @@ -665,7 +684,7 @@ public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception { verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); } - private void testCheckAndCreateLeasesForNewShards(InitialPositionInStreamExtended initialPosition) + private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition) throws Exception { final String shardId0 = "shardId-0"; final String shardId1 = "shardId-1"; @@ -673,10 +692,10 @@ private void testCheckAndCreateLeasesForNewShards(InitialPositionInStreamExtende final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange), ShardObjectHelper.newShard(shardId1, null, null, sequenceRange)); - testCheckAndCreateLeaseForNewShards(shards, initialPosition); + testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition); } - private void testCheckAndCreateLeaseForNewShards(final List shards, + private void testCheckAndCreateLeaseForShardsIfMissing(final List shards, final InitialPositionInStreamExtended initialPosition) throws Exception { final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); @@ -684,7 +703,8 @@ private void testCheckAndCreateLeaseForNewShards(final List shards, when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); - ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, false, SCOPE); final List leases = leaseCaptor.getAllValues(); @@ -720,7 +740,7 @@ public void testDetermineNewLeasesToCreateStartingPosition() { final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); for (InitialPositionInStreamExtended initialPosition : initialPositions) { - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, initialPosition); assertThat(newLeases.size(), equalTo(2)); @@ -743,7 +763,7 @@ public void testDetermineNewLeasesToCreateIgnoreClosedShard() { ShardObjectHelper.newShard(lastShardId, null, null, ShardObjectHelper.newSequenceNumberRange("405", null))); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); assertThat(newLeases.size(), equalTo(1)); @@ -766,7 +786,7 @@ public void testDetermineNewLeasesToCreateSplitMergeLatest1() { final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); final Map expectedShardIdCheckpointMap = new HashMap<>(); @@ -801,7 +821,7 @@ public void testDetermineNewLeasesToCreateSplitMergeLatest2() { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_LATEST); final Map expectedShardIdCheckpointMap = new HashMap<>(); @@ -834,7 +854,7 @@ public void testDetermineNewLeasesToCreateSplitMergeHorizon1() { final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -869,7 +889,7 @@ public void testDetermineNewLeasesToCreateSplitMergeHorizon2() { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -899,7 +919,7 @@ public void testDetermineNewLeasesToCreateGraphBNoInitialLeasesTrim() { final List shards = constructShardListForGraphB(); final List currentLeases = new ArrayList<>(); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_TRIM_HORIZON); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -934,7 +954,7 @@ public void testDetermineNewLeasesToCreateSplitMergeAtTimestamp1() { final List currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"), newLease("shardId-5")); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -968,7 +988,7 @@ public void testDetermineNewLeasesToCreateSplitMergeAtTimestamp2() { final List currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"), newLease("shardId-7")); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -995,7 +1015,7 @@ public void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() { final List shards = constructShardListForGraphB(); final List currentLeases = new ArrayList<>(); - final List newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, INITIAL_POSITION_AT_TIMESTAMP); final Set leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final List checkpoints = newLeases.stream().map(Lease::checkpoint) @@ -1092,7 +1112,8 @@ private List constructShardListForGraphB() { public void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() { final Map memoizationContext = new HashMap<>(); - assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null, + assertThat(HierarchicalShardSyncer + .checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null, null, memoizationContext), equalTo(false)); } @@ -1104,7 +1125,8 @@ public void testCheckIfDescendantAndAddNewLeasesForAncestorsTrimmedShard() { final String shardId = "shardId-trimmed"; final Map memoizationContext = new HashMap<>(); - assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null, + assertThat(HierarchicalShardSyncer + .checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null, new HashMap<>(), null, memoizationContext), equalTo(false)); } @@ -1120,7 +1142,8 @@ public void testCheckIfDescendantAndAddNewLeasesForAncestorsForShardWithCurrentL final Map kinesisShards = new HashMap<>(); kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, null, null, null)); - assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, + assertThat( + HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(true)); assertThat(newLeaseMap.isEmpty(), equalTo(true)); } @@ -1142,7 +1165,8 @@ public void testCheckIfDescendantAndAddNewLeasesForAncestors2P2ANotDescendant() kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null)); kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null)); - assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, + assertThat( + HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(false)); assertThat(newLeaseMap.isEmpty(), equalTo(true)); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java index cffb7154d..a89e8e560 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java @@ -63,6 +63,7 @@ public class ShardSyncTaskIntegrationTest { private LeaseRefresher leaseRefresher; private ShardDetector shardDetector; + private HierarchicalShardSyncer hierarchicalShardSyncer; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -97,6 +98,7 @@ public void setup() { shardDetector = new KinesisShardDetector(kinesisClient, STREAM_NAME, 500L, 50, LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS); + hierarchicalShardSyncer = new HierarchicalShardSyncer(); } /** @@ -117,7 +119,7 @@ public final void testCall() throws DependencyException, InvalidStateException, Set shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet()); ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L, - NULL_METRICS_FACTORY); + hierarchicalShardSyncer, NULL_METRICS_FACTORY); syncTask.call(); List leases = leaseRefresher.listLeases(); Set leaseKeys = new HashSet<>(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 6444d4200..f41d773b8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -46,6 +46,7 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.processor.Checkpointer; @@ -86,6 +87,8 @@ public class ConsumerStatesTest { @Mock private ShardDetector shardDetector; @Mock + private HierarchicalShardSyncer hierarchicalShardSyncer; + @Mock private MetricsFactory metricsFactory; @Mock private ProcessRecordsInput processRecordsInput; @@ -109,7 +112,8 @@ public void setup() { taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, metricsFactory, new AggregatorUtil()); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(), + hierarchicalShardSyncer, metricsFactory); consumer = spy( new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index b17b4ca35..07fb92aef 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -17,6 +17,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -33,6 +34,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -73,6 +75,8 @@ public class ShutdownTaskTest { private LeaseRefresher leaseRefresher; @Mock private ShardDetector shardDetector; + @Mock + private HierarchicalShardSyncer hierarchicalShardSyncer; @Before public void setUp() throws Exception { @@ -86,7 +90,7 @@ public void setUp() throws Exception { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, TERMINATE_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - NULL_METRICS_FACTORY); + hierarchicalShardSyncer, NULL_METRICS_FACTORY); } /** @@ -104,9 +108,15 @@ public final void testCallWhenApplicationDoesNotCheckpoint() { * Test method for {@link ShutdownTask#call()}. */ @Test - public final void testCallWhenSyncingShardsThrows() { + public final void testCallWhenSyncingShardsThrows() throws Exception { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(shardDetector.listShards()).thenReturn(null); + doAnswer((invocation) -> { + throw new KinesisClientLibIOException("KinesisClientLibIOException"); + }).when(hierarchicalShardSyncer) + .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, + NULL_METRICS_FACTORY.createMetrics()); TaskResult result = task.call(); assertNotNull(result.getException());