Skip to content

Commit

Permalink
Treat unassigned leases (as well as expired ones) as available-to-be-…
Browse files Browse the repository at this point in the history
…taken

If a lease is 'unassigned' (it has no lease owner) then it should be
considered available for taking in `DynamoDBLeaseTaker`. Prior to this
change, the only ways `DynamoDBLeaseTaker` could take leases for a
scheduler was either by incremental lease stealing, or waiting for the
lease to expire by not having been updated in `failoverTimeMillis` - which
could be slow if `failoverTimeMillis` was set reasonably high (with it set
to just 30s, I've seen new instances take over 3 minutes to take all leases
from old instances in a deployment).

This would be one half of a fix for
#845 - the other
half of the fix is invoking `evictLease()` (setting the lease owner to
null) on graceful shutdown of a scheduler.
  • Loading branch information
rtyley committed Jan 18, 2022
1 parent fda91d9 commit f9d5077
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) {
}
}

/**
 * Reports whether this lease may be taken, i.e. it is either unassigned or expired.
 *
 * @param leaseDurationNanos duration of lease in nanoseconds
 * @param asOfNanos time in nanoseconds to check expiration as-of
 * @return true if the lease is unassigned or has expired as-of the given time, false otherwise
 */
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) {
    // An unassigned lease is available regardless of its expiry state, so check that first.
    if (isUnassigned()) {
        return true;
    }
    return isExpired(leaseDurationNanos, asOfNanos);
}

/**
* Sets lastCounterIncrementNanos
*
Expand Down Expand Up @@ -310,6 +319,10 @@ public void leaseOwner(String leaseOwner) {
this.leaseOwner = leaseOwner;
}

/**
 * Tells whether this lease currently has no owner.
 *
 * @return true if {@code leaseOwner} is null, false otherwise
 */
public boolean isUnassigned() {
    return null == leaseOwner;
}

/**
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
Expand All @@ -44,6 +45,10 @@

import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingInt;
import static java.util.stream.Collectors.toList;

/**
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
*/
Expand Down Expand Up @@ -191,9 +196,7 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
return takenLeases;
}

List<Lease> expiredLeases = getExpiredLeases();

Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
Set<Lease> leasesToTake = computeLeasesToTake(getAvailableLeases());
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);

Set<String> untakenLeaseKeys = new HashSet<>();
Expand Down Expand Up @@ -357,50 +360,39 @@ private void updateAllLeases(Callable<Long> timeProvider)
}

/**
* @return list of leases that were expired as of our last scan.
* @return list of leases that were available (unassigned or expired) as of our last scan.
*/
private List<Lease> getExpiredLeases() {
List<Lease> expiredLeases = new ArrayList<>();

for (Lease lease : allLeases.values()) {
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
expiredLeases.add(lease);
}
}

return expiredLeases;
private List<Lease> getAvailableLeases() {
    // Select every lease that is unassigned or expired relative to the last scan time.
    // The caller shuffles this list and removes entries from it, so collect into an explicit
    // ArrayList: Collectors.toList() makes no guarantee that the list it returns is mutable.
    return allLeases.values().stream()
            .filter(lease -> lease.isAvailable(leaseDurationNanos, lastScanTimeNanos))
            .collect(Collectors.toCollection(ArrayList::new));
}

/**
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
* @return set of leases to take.
* @param availableLeases list of leases we determined to be available (unassigned or expired)
* @return set of leases to take.
*/
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
private Set<Lease> computeLeasesToTake(List<Lease> availableLeases) {
Map<String, Integer> leaseCountPerWorker = computeActiveLeaseCountsByWorker(availableLeases);
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
List<Lease> veryOldLeases = new ArrayList<>();

final int numAvailableLeases = expiredLeases.size();
int numLeases = 0;
int numWorkers = 0;
final int numAvailableLeases = availableLeases.size();
final int numLeases = allLeases.size();
final int numWorkers = leaseCountPerWorker.size();
int numLeasesToReachTarget = 0;
int leaseSpillover = 0;
int veryOldLeaseCount = 0;

try {
numLeases = allLeases.size();
numWorkers = leaseCounts.size();

if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any.
return leasesToTake;
}


int target;
if (numWorkers >= numLeases) {
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
Expand All @@ -426,17 +418,17 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
}
}

int myCount = leaseCounts.get(workerIdentifier);
int myCount = leaseCountPerWorker.get(workerIdentifier);
numLeasesToReachTarget = target - myCount;

int currentLeaseCount = leaseCounts.get(workerIdentifier);
int currentLeaseCount = leaseCountPerWorker.get(workerIdentifier);
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
veryOldLeases = allLeases.values().stream()
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
.collect(Collectors.toList());
.collect(toList());

if (!veryOldLeases.isEmpty()) {
Collections.shuffle(veryOldLeases);
Expand All @@ -453,17 +445,17 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
return leasesToTake;
}

// Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases);
// Shuffle availableLeases so workers don't all try to contend for the same leases.
Collections.shuffle(availableLeases);

if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
if (availableLeases.size() > 0) {
// If we have available leases, get up to <needed> leases from availableLeases
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(availableLeases.remove(0));
}
} else {
// If there are no expired leases and we need a lease, consider stealing.
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCountPerWorker, numLeasesToReachTarget, target);
for (Lease leaseToSteal : leasesToSteal) {
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
Expand All @@ -481,7 +473,7 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
}

} finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
Expand Down Expand Up @@ -574,36 +566,28 @@ private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int ne
}

/**
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
* leases.
* Count leases by host, excluding all leases considered 'available to be taken' (ie unassigned or expired)
* using the supplied list. Always includes myself, but otherwise only includes hosts that are actively
* updating leases. There will be no entry for unassigned leases - ie the returned map will only have
* entries for non-null worker ids - so a good active-worker count is given by the size of the map.
*
* @param expiredLeases list of leases that are currently expired
* @param excludingAvailableLeases list of 'available' (unassigned/expired) leases to exclude from the count
* @return map of workerIdentifier to lease count
*/
@VisibleForTesting
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = new HashMap<>();
Map<String, Integer> computeActiveLeaseCountsByWorker(List<Lease> excludingAvailableLeases) {
// The set will give much faster lookup than the original list, an
// important optimization when the list is large
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
Set<Lease> availableLeasesSet = new HashSet<>(excludingAvailableLeases);

// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
for (Lease lease : allLeases.values()) {
if (!expiredLeasesSet.contains(lease)) {
String leaseOwner = lease.leaseOwner();
Integer oldCount = leaseCounts.get(leaseOwner);
if (oldCount == null) {
leaseCounts.put(leaseOwner, 1);
} else {
leaseCounts.put(leaseOwner, oldCount + 1);
}
}
}
Map<String, Integer> activeLeaseCountsByWorker = allLeases.values().stream()
.filter(lease -> !lease.isUnassigned() && !availableLeasesSet.contains(lease))
.collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1)));

// If I have no leases, I wasn't represented in leaseCounts. Let's fix that.
leaseCounts.putIfAbsent(workerIdentifier, 0);
// If I have no leases, I won't be represented in activeLeaseCountsByWorker. Let's fix that.
activeLeaseCountsByWorker.putIfAbsent(workerIdentifier, 0);

return leaseCounts;
return activeLeaseCountsByWorker;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public final void testStringJoin() {
}

@Test
public void test_computeLeaseCounts_noExpiredLease() throws Exception {
public void test_computeActiveLeaseCountsByWorker_noAvailableLeases() throws Exception {
final List<Lease> leases = new ImmutableList.Builder<Lease>()
.add(createLease(null, "1"))
.add(createLease("foo", "2"))
Expand All @@ -125,18 +125,17 @@ public void test_computeLeaseCounts_noExpiredLease() throws Exception {
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);

final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of());
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(ImmutableList.of());

final Map<String, Integer> expectedOutput = new HashMap<>();
expectedOutput.put(null, 1);
expectedOutput.put("foo", 1);
expectedOutput.put("bar", 1);
expectedOutput.put("baz", 1);
assertEquals(expectedOutput, actualOutput);
}

@Test
public void test_computeLeaseCounts_withExpiredLease() throws Exception {
public void test_computeActiveLeaseCountsByWorker_withAvailableLeases() throws Exception {
final List<Lease> leases = new ImmutableList.Builder<Lease>()
.add(createLease("foo", "2"))
.add(createLease("bar", "3"))
Expand All @@ -149,7 +148,7 @@ public void test_computeLeaseCounts_withExpiredLease() throws Exception {
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);

final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases);
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(leases);

final Map<String, Integer> expectedOutput = new HashMap<>();
expectedOutput.put("foo", 0);
Expand Down

0 comments on commit f9d5077

Please sign in to comment.