Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change agedFailoverTimeMultiplier config to enablePriorityLeaseAssignment #1317

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public class KinesisClientLibConfiguration {
private AwsCredentialsProvider dynamoDBCredentialsProvider;
private AwsCredentialsProvider cloudWatchCredentialsProvider;
private long failoverTimeMillis;
private int agedFailoverTimeMultiplier;
private boolean enablePriorityLeaseAssignment;
private String workerIdentifier;
private long shardSyncIntervalMillis;
private int maxRecords;
Expand Down Expand Up @@ -960,9 +960,8 @@ public KinesisClientLibConfiguration withFailoverTimeMillis(long failoverTimeMil
return this;
}

public KinesisClientLibConfiguration withAgedFailoverTimeMultiplier(int agedFailoverTimeMultiplier) {
checkIsValuePositive("AgedFailoverTimeMultiplier", agedFailoverTimeMultiplier);
this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
public KinesisClientLibConfiguration withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) {
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void setWorkerId(String workerId) {
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long failoverTimeMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private int agedFailoverTimeMultiplier;
private Boolean enablePriorityLeaseAssignment;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long shardSyncIntervalMillis;
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ public void testSetPrimitiveValue() {
}

@Test
public void testSetAgedFailoverTimeMultiplier() {
public void testSetEnablePriorityLeaseAssignment() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setAgedFailoverTimeMultiplier(5);
configuration.setEnablePriorityLeaseAssignment(false);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration
.resolvedConfiguration(shardRecordProcessorFactory);
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration(
shardRecordProcessorFactory);

assertThat(resolvedConfiguration.leaseManagementConfig.agedFailoverTimeMultiplier(), equalTo(5));
assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class LeaseManagementConfig {
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
public static final int DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER = 3;
public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;


Expand Down Expand Up @@ -104,13 +104,15 @@ public class LeaseManagementConfig {
private long failoverTimeMillis = 10000L;

/**
* Multiplier for the failoverTimeMillis in which leases which are expired for an extended period of time defined by
* (agedFailoverTimeMultiplier * failoverTimeMillis) are taken with priority, disregarding the target
* but obeying the maximum limit per worker.
* Whether workers should take very expired leases at priority. A very expired lease is when a worker does not
* renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also mentioned newly created leases due to shard mutation that are not assigned to any worker are also considered very-expired ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

* priority for a worker which disregards the target leases for the worker but obeys
* {@link LeaseManagementConfig#maxLeasesForWorker}. New leases for new shards due to shard mutation are
* considered to be very expired and taken with priority.
*
* <p>Default value: 3 </p>
* <p>Default value: true </p>
*/
private int agedFailoverTimeMultiplier = DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER;
private boolean enablePriorityLeaseAssignment = DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT;

/**
* Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
Expand Down Expand Up @@ -380,7 +382,7 @@ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer lease
workerIdentifier(),
executorService(),
failoverTimeMillis(),
agedFailoverTimeMultiplier(),
enablePriorityLeaseAssignment(),
epsilonMillis(),
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory) {
this(leaseRefresher, workerIdentifier, leaseDurationMillis,
LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
Expand All @@ -168,8 +168,8 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
* Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis
* Duration of a lease
* @param agedFailoverTimeMultiplier
* Multiplier to determine when leases should be taken at priority
* @param enablePriorityLeaseAssignment
* Whether to enable priority lease assignment for very expired leases
* @param epsilonMillis
* Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker
Expand All @@ -186,7 +186,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
final int agedFailoverTimeMultiplier,
final boolean enablePriorityLeaseAssignment,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
Expand All @@ -199,7 +199,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime)
.withVeryOldLeaseDurationNanosMultiplier(agedFailoverTimeMultiplier);
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
this.leaseRenewer = new DynamoDBLeaseRenewer(
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private Function<StreamConfig, ShardDetector> customShardDetectorProvider;

private final long failoverTimeMillis;
private final int agedFailoverTimeMultiplier;
private final boolean enablePriorityLeaseAssignment;
private final long epsilonMillis;
private final int maxLeasesForWorker;
private final int maxLeasesToStealAtOneTime;
Expand Down Expand Up @@ -563,7 +563,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
LeaseCleanupConfig leaseCleanupConfig) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis,
LeaseManagementConfig.DEFAULT_AGED_FAILOVER_TIME_MULTIPLIER, epsilonMillis, maxLeasesForWorker,
LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
Expand All @@ -581,7 +581,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param agedFailoverTimeMultiplier
* @param enablePriorityLeaseAssignment
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
Expand Down Expand Up @@ -610,7 +610,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis,
final int agedFailoverTimeMultiplier, final long epsilonMillis,
final boolean enablePriorityLeaseAssignment, final long epsilonMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
Expand All @@ -628,7 +628,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
this.agedFailoverTimeMultiplier = agedFailoverTimeMultiplier;
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
Expand Down Expand Up @@ -660,8 +660,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) {
return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(),
workerIdentifier,
failoverTimeMillis,
agedFailoverTimeMultiplier,
failoverTimeMillis, enablePriorityLeaseAssignment,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// TODO: Remove these defaults and use the defaults in the config
private int maxLeasesForWorker = Integer.MAX_VALUE;
private int maxLeasesToStealAtOneTime = 1;

private boolean enablePriorityLeaseAssignment = true;
private int veryOldLeaseDurationNanosMultiplier = 3;
private long lastScanTimeNanos = 0L;

Expand Down Expand Up @@ -124,6 +124,11 @@ public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultiplier(int veryOldLea
return this;
}

public DynamoDBLeaseTaker withEnablePriorityLeaseAssignment(boolean enablePriorityLeaseAssignment) {
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
return this;
}

/**
* Max leases to steal from a more loaded Worker at one time (for load balancing).
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
Expand Down Expand Up @@ -441,25 +446,28 @@ Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timePro
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
long currentNanoTime;
try {
currentNanoTime = timeProvider.call();
} catch (Exception e) {
throw new DependencyException("Exception caught from timeProvider", e);
}
final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
final List<Lease> veryOldLeases = allLeases.values().stream()
.filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos())
.collect(Collectors.toList());

if (!veryOldLeases.isEmpty()) {
Collections.shuffle(veryOldLeases);
veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size()));
HashSet<Lease> result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount));
if (veryOldLeaseCount > 0) {
log.info("Taking leases that have been expired for a long time: {}", result);
if (enablePriorityLeaseAssignment) {
long currentNanoTime;
try {
currentNanoTime = timeProvider.call();
} catch (Exception e) {
throw new DependencyException("Exception caught from timeProvider", e);
}
final long nanoThreshold = currentNanoTime - (veryOldLeaseDurationNanosMultiplier * leaseDurationNanos);
final List<Lease> veryOldLeases = allLeases.values()
.stream()
.filter(lease -> nanoThreshold > lease.lastCounterIncrementNanos())
.collect(Collectors.toList());

if (!veryOldLeases.isEmpty()) {
Collections.shuffle(veryOldLeases);
veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size()));
HashSet<Lease> result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount));
if (veryOldLeaseCount > 0) {
log.info("Taking leases that have been expired for a long time: {}", result);
}
return result;
}
return result;
}

if (numLeasesToReachTarget <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class DynamoDBLeaseCoordinatorTest {

private static final String WORKER_ID = UUID.randomUUID().toString();
private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
private static final boolean ENABLE_PRIORITY_LEASE_ASSIGNMENT = true;
private static final long LEASE_DURATION_MILLIS = 5000L;
private static final long EPSILON_MILLIS = 25L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
Expand All @@ -40,7 +40,7 @@ public class DynamoDBLeaseCoordinatorTest {
@Before
public void setup() {
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
VERY_OLD_LEASE_DURATION_MULTIPLIER, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER,
ENABLE_PRIORITY_LEASE_ASSIGNMENT, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER,
MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class DynamoDBLeaseTakerTest {

private static final String WORKER_IDENTIFIER = "foo";
private static final long LEASE_DURATION_MILLIS = 1000L;
private static final int DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER = 3;
private static final int VERY_OLD_LEASE_DURATION_MULTIPLIER = 5;
private static final long MOCK_CURRENT_TIME = 10000000000L;

Expand Down Expand Up @@ -151,6 +152,36 @@ public void test_veryOldLeaseDurationNanosMultiplierGetsCorrectLeases() throws E
assertEquals(expectedOutput, output);
}

@Test
public void test_disableEnablePriorityLeaseAssignmentGetsCorrectLeases() throws Exception {
long veryOldThreshold = MOCK_CURRENT_TIME -
(TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS) * DEFAULT_VERY_OLD_LEASE_DURATION_MULTIPLIER);
DynamoDBLeaseTaker dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment =
new DynamoDBLeaseTaker(leaseRefresher, WORKER_IDENTIFIER, LEASE_DURATION_MILLIS, metricsFactory)
.withEnablePriorityLeaseAssignment(false);
final List<Lease> allLeases = new ArrayList<>();
allLeases.add(createLease("bar", "2", MOCK_CURRENT_TIME));
allLeases.add(createLease("bar", "3", MOCK_CURRENT_TIME));
allLeases.add(createLease("bar", "4", MOCK_CURRENT_TIME));
allLeases.add(createLease("baz", "5", veryOldThreshold - 1));
allLeases.add(createLease("baz", "6", veryOldThreshold + 1));
allLeases.add(createLease(null, "7"));
final List<Lease> expiredLeases = allLeases.subList(3, 6);

dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.allLeases.putAll(
allLeases.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity())));
when(leaseRefresher.listLeases()).thenReturn(allLeases);
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(MOCK_CURRENT_TIME);

Set<Lease> output = dynamoDBLeaseTakerWithDisabledPriorityLeaseAssignment.computeLeasesToTake(expiredLeases, timeProvider);
final Set<Lease> expectedOutput = new HashSet<>();
expectedOutput.add(createLease("baz", "5", veryOldThreshold - 1));
expectedOutput.add(createLease("baz", "6", veryOldThreshold + 1));
expectedOutput.add(createLease(null, "7"));
assertEquals(expectedOutput, output);
}

private Lease createLease(String leaseOwner, String leaseKey) {
final Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
Expand Down