From c7957d9a86db30bd5cc9593e2908cdd876dca1bb Mon Sep 17 00:00:00 2001 From: Lucien Luc Date: Fri, 19 Apr 2024 12:08:43 -0700 Subject: [PATCH 1/4] Change agedFailoverTimeMultiplier config to doPriorityLeaseTaking --- .../KinesisClientLibConfiguration.java | 7 ++- .../config/MultiLangDaemonConfiguration.java | 2 +- .../MultiLangDaemonConfigurationTest.java | 10 ++-- .../kinesis/leases/LeaseManagementConfig.java | 15 +++--- .../dynamodb/DynamoDBLeaseCoordinator.java | 10 ++-- .../DynamoDBLeaseManagementFactory.java | 12 ++--- .../leases/dynamodb/DynamoDBLeaseTaker.java | 46 +++++++++++-------- .../DynamoDBLeaseCoordinatorTest.java | 4 +- .../dynamodb/DynamoDBLeaseTakerTest.java | 27 +++++++++++ 9 files changed, 84 insertions(+), 49 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index 12a8fc9c8..d1358be47 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -217,7 +217,7 @@ public class KinesisClientLibConfiguration { private AwsCredentialsProvider dynamoDBCredentialsProvider; private AwsCredentialsProvider cloudWatchCredentialsProvider; private long failoverTimeMillis; - private int agedFailoverTimeMultiplier; + private boolean doPriorityLeaseTaking; private String workerIdentifier; private long shardSyncIntervalMillis; private int maxRecords; @@ -960,9 +960,8 @@ public KinesisClientLibConfiguration withFailoverTimeMillis(long failoverTimeMil return this; } - public KinesisClientLibConfiguration withAgedFailoverTimeMultiplier(int agedFailoverTimeMultiplier) { - checkIsValuePositive("AgedFailoverTimeMultiplier", agedFailoverTimeMultiplier); - this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier; + public KinesisClientLibConfiguration withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) { + this.doPriorityLeaseTaking = doPriorityLeaseTaking; return this; } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index dbaa14f9e..b857b6c39 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -87,7 +87,7 @@ public void setWorkerId(String workerId) { @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long failoverTimeMillis; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) - private int agedFailoverTimeMultiplier; + private Boolean doPriorityLeaseTaking; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long shardSyncIntervalMillis; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index 2bbdde9ff..6c419bdc1 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -91,14 +91,14 @@ public void testSetPrimitiveValue() { } @Test - public void testSetAgedFailoverTimeMultiplier() { + public void testSetDoPriorityLeaseTaking() { MultiLangDaemonConfiguration configuration = baseConfiguration(); - configuration.setAgedFailoverTimeMultiplier(5); + configuration.setDoPriorityLeaseTaking(Boolean.FALSE); - MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration - .resolvedConfiguration(shardRecordProcessorFactory); + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration( + shardRecordProcessorFactory); - assertThat(resolvedConfiguration.leaseManagementConfig.agedFailoverTimeMultiplier(), equalTo(5)); + assertThat(resolvedConfiguration.leaseManagementConfig.doPriorityLeaseTaking(), equalTo(false)); } @Test diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 58c9f3820..2c01d4716 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -57,7 +57,7 @@ public class LeaseManagementConfig { public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis(); public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis(); public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; - public static final int DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER = 3; + public static final boolean DEFAULT_DO_PRIORITY_LEASE_TAKING = true; public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3; @@ -104,13 +104,14 @@ public class LeaseManagementConfig { private long failoverTimeMillis = 10000L; /** - * Multiplier for the failoverTimeMillis in which leases which are expired for an extended period of time defined by - * (agedFailoverTimeMultiplier * failoverTimeMillis) are taken with priority, disregarding the target - * but obeying the maximum limit per worker. + * Whether workers should take very expired leases at priority. A very expired lease is when a worker does not + * renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at + * priority for a worker which disregards the target leases for the worker but obeys + * {@link LeaseManagementConfig#maxLeasesForWorker} * - *

Default value: 3

+ *

Default value: true

*/ - private int agedFailoverTimeMultiplier = DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER; + private boolean doPriorityLeaseTaking = DEFAULT_DO_PRIORITY_LEASE_TAKING; /** * Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. @@ -380,7 +381,7 @@ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer lease workerIdentifier(), executorService(), failoverTimeMillis(), - agedFailoverTimeMultiplier(), + doPriorityLeaseTaking(), epsilonMillis(), maxLeasesForWorker(), maxLeasesToStealAtOneTime(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index ef2b236f5..c7b83d822 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -153,7 +153,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, final long initialLeaseTableWriteCapacity, final MetricsFactory metricsFactory) { this(leaseRefresher, workerIdentifier, leaseDurationMillis, - LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker, + LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); @@ -168,8 +168,8 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, * Identifies the worker (e.g. useful to track lease ownership) * @param leaseDurationMillis * Duration of a lease - * @param agedFailoverTimeMultiplier - * Multiplier to determine when leases should be taken at priority + * @param doPriorityLeaseTaking + * Whether to do priority lease taking for very expired leases * @param epsilonMillis * Allow for some variance when calculating lease expirations * @param maxLeasesForWorker @@ -186,7 +186,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, final String workerIdentifier, final long leaseDurationMillis, - final int agedFailoverTimeMultiplier, + final boolean doPriorityLeaseTaking, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, @@ -199,7 +199,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime) - .withVeryOldLeaseDurationNanosMultiplier(agedFailoverTimeMultiplier); + .withDoPriorityLeaseTaking(doPriorityLeaseTaking); this.leaseRenewer = new DynamoDBLeaseRenewer( leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index e447397ba..3d4573d0c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -71,7 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private Function customShardDetectorProvider; private final long failoverTimeMillis; - private final int agedFailoverTimeMultiplier; + private final boolean doPriorityLeaseTaking; private final long epsilonMillis; private final int maxLeasesForWorker; private final int maxLeasesToStealAtOneTime; @@ -563,7 +563,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, LeaseCleanupConfig leaseCleanupConfig) { this(kinesisClient, dynamoDBClient, tableName, workerIdentifier, executorService, failoverTimeMillis, - LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker, + LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -581,7 +581,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, * @param workerIdentifier * @param executorService * @param failoverTimeMillis - * @param agedFailoverTimeMultiplier + * @param doPriorityLeaseTaking * @param epsilonMillis * @param maxLeasesForWorker * @param maxLeasesToStealAtOneTime @@ -610,7 +610,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final long failoverTimeMillis, - final int agedFailoverTimeMultiplier, final long epsilonMillis, + final boolean doPriorityLeaseTaking, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -628,7 +628,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, this.workerIdentifier = workerIdentifier; this.executorService = executorService; this.failoverTimeMillis = failoverTimeMillis; - this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier; + this.doPriorityLeaseTaking = doPriorityLeaseTaking; this.epsilonMillis = epsilonMillis; this.maxLeasesForWorker = maxLeasesForWorker; this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; @@ -661,7 +661,7 @@ public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory met return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(), workerIdentifier, failoverTimeMillis, - agedFailoverTimeMultiplier, + doPriorityLeaseTaking, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index e77767387..76717ae40 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -69,7 +69,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; - + private boolean doPriorityLeaseTaking = true; private int veryOldLeaseDurationNanosMultiplier = 3; private long lastScanTimeNanos = 0L; @@ -124,6 +124,11 @@ public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultiplier(int veryOldLea return this; } + public DynamoDBLeaseTaker withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) { + this.doPriorityLeaseTaking = doPriorityLeaseTaking; + return this; + } + /** * Max leases to steal from a more loaded Worker at one time (for load balancing). * Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), @@ -441,25 +446,28 @@ Set computeLeasesToTake(List expiredLeases, Callable timePro // If there are leases that have been expired for an extended period of // time, take them with priority, disregarding the target (computed // later) but obeying the maximum limit per worker. - long currentNanoTime; - try { - currentNanoTime = timeProvider.call(); - } catch (Exception e) { - throw new DependencyException("Exception caught from timeProvider", e); - } - final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos); - final List veryOldLeases = allLeases.values().stream() - .filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos()) - .collect(Collectors.toList()); - - if (!veryOldLeases.isEmpty()) { - Collections.shuffle(veryOldLeases); - veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size())); - HashSet result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount)); - if (veryOldLeaseCount > 0) { - log.info("Taking leases that have been expired for a long time: {}", result); + if (doPriorityLeaseTaking) { + long currentNanoTime; + try { + currentNanoTime = timeProvider.call(); + } catch (Exception e) { + throw new DependencyException("Exception caught from timeProvider", e); + } + final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos); + final List veryOldLeases = allLeases.values() + .stream() + .filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos()) + .collect(Collectors.toList()); + + if (!veryOldLeases.isEmpty()) { + Collections.shuffle(veryOldLeases); + veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size())); + HashSet result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount)); + if (veryOldLeaseCount > 0) { + log.info("Taking leases that have been expired for a long time: {}", result); + } + return result; } - return result; } if (numLeasesToReachTarget <= 0) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java index 77b3666c5..97914bff8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java @@ -19,7 +19,7 @@ public class DynamoDBLeaseCoordinatorTest { private static final String WORKER_ID = UUID.randomUUID().toString(); - private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5; + private static final boolean DO_PRIORITY_LEASE_TAKING = true; private static final long LEASE_DURATION_MILLIS = 5000L; private static final long EPSILON_MILLIS = 25L; private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; @@ -40,7 +40,7 @@ public class DynamoDBLeaseCoordinatorTest { @Before public void setup() { this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, - VERY_OLD_LEASE_DURATION_MULTIPLIER, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, + DO_PRIORITY_LEASE_TAKING, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index e3a918ffa..bf62a50e1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -48,6 +48,7 @@ public class DynamoDBLeaseTakerTest { private static final String WORKER_IDENTIFIER = "foo"; private static final long LEASE_DURATION_MILLIS = 1000L; + private static final int DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER = 3; private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5; private static final long MOCK_CURRENT_TIME = 10000000000L; @@ -151,6 +152,32 @@ public void test_veryOldLeaseDurationNanosMultiplierGetsCorrectLeases() throws E assertEquals(expectedOutput, output); } + @Test + public void test_disableDoPriorityLeaseTakingGetsCorrectLeases() throws Exception { + long veryOldThreshold = MOCK_CURRENT_TIME - + (TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER); + DynamoDBLeaseTaker dynamoDBLeaseTakerWithCustomMultiplier = + new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory) + .withDoPriorityLeaseTaking(false); + final List allLeases = new ArrayList<>(); + allLeases.add(createLease("foo", "2", MOCK_CURRENT_TIME)); + allLeases.add(createLease("bar", "3", veryOldThreshold - 1)); + allLeases.add(createLease("baz", "4", veryOldThreshold + 1)); + final List expiredLeases = allLeases.subList(1, 3); + + dynamoDBLeaseTakerWithCustomMultiplier.allLeases.putAll( + allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()))); + when(leaseRefresher.listLeases()).thenReturn(allLeases); + when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); + when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME); + + Set output = dynamoDBLeaseTakerWithCustomMultiplier.computeLeasesToTake(expiredLeases, timeProvider); + final Set expectedOutput = new HashSet<>(); + expectedOutput.add(createLease("bar", "3", veryOldThreshold - 1)); + expectedOutput.add(createLease("baz", "4", veryOldThreshold + 1)); + assertEquals(expectedOutput, output); + } + private Lease createLease(String leaseOwner, String leaseKey) { final Lease lease = new Lease(); lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); From 99794ab2ab06e4b3707f842322733b2fae91d4ef Mon Sep 17 00:00:00 2001 From: Lucien Luc Date: Fri, 19 Apr 2024 15:25:03 -0700 Subject: [PATCH 2/4] Minor comment updates. Update test. Rename config variable. --- .../KinesisClientLibConfiguration.java | 6 ++-- .../config/MultiLangDaemonConfiguration.java | 2 +- .../MultiLangDaemonConfigurationTest.java | 6 ++-- .../kinesis/leases/LeaseManagementConfig.java | 9 +++--- .../dynamodb/DynamoDBLeaseCoordinator.java | 10 +++---- .../DynamoDBLeaseManagementFactory.java | 13 ++++----- .../leases/dynamodb/DynamoDBLeaseTaker.java | 8 +++--- .../dynamodb/DynamoDBLeaseTakerTest.java | 28 +++++++++++-------- 8 files changed, 43 insertions(+), 39 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index d1358be47..d8d9068db 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -217,7 +217,7 @@ public class KinesisClientLibConfiguration { private AwsCredentialsProvider dynamoDBCredentialsProvider; private AwsCredentialsProvider cloudWatchCredentialsProvider; private long failoverTimeMillis; - private boolean doPriorityLeaseTaking; + private boolean enablePriorityLeaseAssignment; private String workerIdentifier; private long shardSyncIntervalMillis; private int maxRecords; @@ -960,8 +960,8 @@ public KinesisClientLibConfiguration withFailoverTimeMillis(long failoverTimeMil return this; } - public KinesisClientLibConfiguration withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) { - this.doPriorityLeaseTaking = doPriorityLeaseTaking; + public KinesisClientLibConfiguration withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) { + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; return this; } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index b857b6c39..8b6bc5e64 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -87,7 +87,7 @@ public void setWorkerId(String workerId) { @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long failoverTimeMillis; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) - private Boolean doPriorityLeaseTaking; + private Boolean enablePriorityLeaseAssignment; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long shardSyncIntervalMillis; @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index 6c419bdc1..da18e6590 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -91,14 +91,14 @@ public void testSetPrimitiveValue() { } @Test - public void testSetDoPriorityLeaseTaking() { + public void testSetEnablePriorityLeaseAssignment() { MultiLangDaemonConfiguration configuration = baseConfiguration(); - configuration.setDoPriorityLeaseTaking(Boolean.FALSE); + configuration.setEnablePriorityLeaseAssignment(false); MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration( shardRecordProcessorFactory); - assertThat(resolvedConfiguration.leaseManagementConfig.doPriorityLeaseTaking(), equalTo(false)); + assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false)); } @Test diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 2c01d4716..aef4d87e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -57,7 +57,7 @@ public class LeaseManagementConfig { public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis(); public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis(); public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; - public static final boolean DEFAULT_DO_PRIORITY_LEASE_TAKING = true; + public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true; public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3; @@ -107,11 +107,12 @@ public class LeaseManagementConfig { * Whether workers should take very expired leases at priority. A very expired lease is when a worker does not * renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at * priority for a worker which disregards the target leases for the worker but obeys - * {@link LeaseManagementConfig#maxLeasesForWorker} + * {@link LeaseManagementConfig#maxLeasesForWorker}. New leases for new shards due to shard mutation are + * considered to be very expired and taken with priority. * *

Default value: true

*/ - private boolean doPriorityLeaseTaking = DEFAULT_DO_PRIORITY_LEASE_TAKING; + private boolean enablePriorityLeaseAssignment = DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT; /** * Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. @@ -381,7 +382,7 @@ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer lease workerIdentifier(), executorService(), failoverTimeMillis(), - doPriorityLeaseTaking(), + enablePriorityLeaseAssignment(), epsilonMillis(), maxLeasesForWorker(), maxLeasesToStealAtOneTime(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index c7b83d822..647f3c357 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -153,7 +153,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, final long initialLeaseTableWriteCapacity, final MetricsFactory metricsFactory) { this(leaseRefresher, workerIdentifier, leaseDurationMillis, - LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker, + LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); @@ -168,8 +168,8 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, * Identifies the worker (e.g. useful to track lease ownership) * @param leaseDurationMillis * Duration of a lease - * @param doPriorityLeaseTaking - * Whether to do priority lease taking for very expired leases + * @param enablePriorityLeaseAssignment + * Whether to do priority lease assignment for very expired leases * @param epsilonMillis * Allow for some variance when calculating lease expirations * @param maxLeasesForWorker @@ -186,7 +186,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, final String workerIdentifier, final long leaseDurationMillis, - final boolean doPriorityLeaseTaking, + final boolean enablePriorityLeaseAssignment, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, @@ -199,7 +199,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) .withMaxLeasesForWorker(maxLeasesForWorker) .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime) - .withDoPriorityLeaseTaking(doPriorityLeaseTaking); + .withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment); this.leaseRenewer = new DynamoDBLeaseRenewer( leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 3d4573d0c..7d9ebeefa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -71,7 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private Function customShardDetectorProvider; private final long failoverTimeMillis; - private final boolean doPriorityLeaseTaking; + private final boolean enablePriorityLeaseAssignment; private final long epsilonMillis; private final int maxLeasesForWorker; private final int maxLeasesToStealAtOneTime; @@ -563,7 +563,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, LeaseCleanupConfig leaseCleanupConfig) { this(kinesisClient, dynamoDBClient, tableName, workerIdentifier, executorService, failoverTimeMillis, - LeaseManagementConfig.DEFAULT_DO_PRIORITY_LEASE_TAKING, epsilonMillis, maxLeasesForWorker, + LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -581,7 +581,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, * @param workerIdentifier * @param executorService * @param failoverTimeMillis - * @param doPriorityLeaseTaking + * @param enablePriorityLeaseAssignment * @param epsilonMillis * @param maxLeasesForWorker * @param maxLeasesToStealAtOneTime @@ -610,7 +610,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final long failoverTimeMillis, - final boolean doPriorityLeaseTaking, final long epsilonMillis, + final boolean enablePriorityLeaseAssignment, final long epsilonMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -628,7 +628,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, this.workerIdentifier = workerIdentifier; this.executorService = executorService; this.failoverTimeMillis = failoverTimeMillis; - this.doPriorityLeaseTaking = doPriorityLeaseTaking; + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; this.epsilonMillis = epsilonMillis; this.maxLeasesForWorker = maxLeasesForWorker; this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; @@ -660,8 +660,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) { return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(), workerIdentifier, - failoverTimeMillis, - doPriorityLeaseTaking, + failoverTimeMillis, enablePriorityLeaseAssignment, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 76717ae40..7020a94b1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -69,7 +69,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { // TODO: Remove these defaults and use the defaults in the config private int maxLeasesForWorker = Integer.MAX_VALUE; private int maxLeasesToStealAtOneTime = 1; - private boolean doPriorityLeaseTaking = true; + private boolean enablePriorityLeaseAssignment = true; private int veryOldLeaseDurationNanosMultiplier = 3; private long lastScanTimeNanos = 0L; @@ -124,8 +124,8 @@ public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultiplier(int veryOldLea return this; } - public DynamoDBLeaseTaker withDoPriorityLeaseTaking(boolean doPriorityLeaseTaking) { - this.doPriorityLeaseTaking = doPriorityLeaseTaking; + public DynamoDBLeaseTaker withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) { + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; return this; } @@ -446,7 +446,7 @@ Set computeLeasesToTake(List expiredLeases, Callable timePro // If there are leases that have been expired for an extended period of // time, take them with priority, disregarding the target (computed // later) but obeying the maximum limit per worker. - if (doPriorityLeaseTaking) { + if (enablePriorityLeaseAssignment) { long currentNanoTime; try { currentNanoTime = timeProvider.call(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index bf62a50e1..1700460f1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -153,28 +153,32 @@ public void test_veryOldLeaseDurationNanosMultiplierGetsCorrectLeases() throws E } @Test - public void test_disableDoPriorityLeaseTakingGetsCorrectLeases() throws Exception { + public void test_disableEnablePriorityLeaseAssignmentGetsCorrectLeases() throws Exception { long veryOldThreshold = MOCK_CURRENT_TIME - (TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER); - DynamoDBLeaseTaker dynamoDBLeaseTakerWithCustomMultiplier = + DynamoDBLeaseTaker dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment = new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory) - .withDoPriorityLeaseTaking(false); + .withEnablePriorityLeaseAssignment(false); final List allLeases = new ArrayList<>(); - allLeases.add(createLease("foo", "2", MOCK_CURRENT_TIME)); - allLeases.add(createLease("bar", "3", veryOldThreshold - 1)); - allLeases.add(createLease("baz", "4", veryOldThreshold + 1)); - final List expiredLeases = allLeases.subList(1, 3); - - dynamoDBLeaseTakerWithCustomMultiplier.allLeases.putAll( + allLeases.add(createLease("bar", "2", MOCK_CURRENT_TIME)); + allLeases.add(createLease("bar", "3", MOCK_CURRENT_TIME)); + allLeases.add(createLease("bar", "4", MOCK_CURRENT_TIME)); + allLeases.add(createLease("baz", "5", veryOldThreshold - 1)); + allLeases.add(createLease("baz", "6", veryOldThreshold + 1)); + allLeases.add(createLease(null, "7")); + final List expiredLeases = allLeases.subList(3, 6); + + dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.allLeases.putAll( allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()))); when(leaseRefresher.listLeases()).thenReturn(allLeases); when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME); - Set output = dynamoDBLeaseTakerWithCustomMultiplier.computeLeasesToTake(expiredLeases, timeProvider); + Set output = dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.computeLeasesToTake(expiredLeases, timeProvider); final Set expectedOutput = new HashSet<>(); - expectedOutput.add(createLease("bar", "3", veryOldThreshold - 1)); - expectedOutput.add(createLease("baz", "4", veryOldThreshold + 1)); + expectedOutput.add(createLease("baz", "5", veryOldThreshold - 1)); + expectedOutput.add(createLease("baz", "6", veryOldThreshold + 1)); + expectedOutput.add(createLease(null, "7")); assertEquals(expectedOutput, output); } From 917dec6334b94f4bc4acf32754eade573b10d5d8 Mon Sep 17 00:00:00 2001 From: Lucien Luc Date: Fri, 19 Apr 2024 15:26:48 -0700 Subject: [PATCH 3/4] Rename one variable --- .../kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java index 97914bff8..7347b4cb4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java @@ -19,7 +19,7 @@ public class DynamoDBLeaseCoordinatorTest { private static final String WORKER_ID = UUID.randomUUID().toString(); - private static final boolean DO_PRIORITY_LEASE_TAKING = true; + private static final boolean ENABLE_PRIORITY_LEASE_ASSIGNMENT = true; private static final long LEASE_DURATION_MILLIS = 5000L; private static final long EPSILON_MILLIS = 25L; private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; @@ -40,7 +40,7 @@ public class DynamoDBLeaseCoordinatorTest { @Before public void setup() { this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, - DO_PRIORITY_LEASE_TAKING, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, + ENABLE_PRIORITY_LEASE_ASSIGNMENT, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); } From 1963438f3d56df3603cf38f9bd14844c32c01823 Mon Sep 17 00:00:00 2001 From: Lucien Luc Date: Fri, 19 Apr 2024 15:29:07 -0700 Subject: [PATCH 4/4] Fix comment --- .../kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 647f3c357..6c0803f21 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -169,7 +169,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, * @param leaseDurationMillis * Duration of a lease * @param enablePriorityLeaseAssignment - * Whether to do priority lease assignment for very expired leases + * Whether to enable priority lease assignment for very expired leases * @param epsilonMillis * Allow for some variance when calculating lease expirations * @param maxLeasesForWorker