Skip to content

Commit

Permalink
Treat unassigned leases (as well as expired ones) as available-to-be-…
Browse files Browse the repository at this point in the history
…taken

If a lease is 'unassigned' (it has no lease owner) then it should be
considered available for taking in `DynamoDBLeaseTaker`. Prior to this
change, the only ways `DynamoDBLeaseTaker` could take leases for a
scheduler was either by incremental lease stealing, or waiting for the
lease to expire by not having been updated in `failoverTimeMillis` - which
could be slow if `failoverTimeMillis` was set reasonably high (with it set
to just 30s, I've seen new instances take over 3 minutes to take all leases
from old instances in a deployment).

This would be one half of the a fix for
#845 - the other
half of the fix is invoking `evictLease()` (setting the lease owner to
null) on graceful shutdown of a scheduler.
  • Loading branch information
rtyley committed Nov 25, 2021
1 parent 25714f5 commit 5cdc9b5
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) {
}
}

/**
* @param leaseDurationNanos duration of lease in nanoseconds
* @param asOfNanos time in nanoseconds to check expiration as-of
* @return true if lease is available (unassigned or expired as-of given time), false otherwise
*/
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) {
return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos);
}

/**
* Sets lastCounterIncrementNanos
*
Expand Down Expand Up @@ -299,6 +308,10 @@ public void leaseOwner(String leaseOwner) {
this.leaseOwner = leaseOwner;
}

public boolean isUnassigned() {
return leaseOwner == null;
}

/**
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
Expand All @@ -42,6 +42,10 @@
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingInt;
import static java.util.stream.Collectors.toList;

/**
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
*/
Expand Down Expand Up @@ -183,9 +187,7 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
return takenLeases;
}

List<Lease> expiredLeases = getExpiredLeases();

Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
Set<Lease> leasesToTake = computeLeasesToTake(getAvailableLeases());
Set<String> untakenLeaseKeys = new HashSet<>();

for (Lease lease : leasesToTake) {
Expand Down Expand Up @@ -320,50 +322,40 @@ private void updateAllLeases(Callable<Long> timeProvider)
}

/**
* @return list of leases that were expired as of our last scan.
* @return list of leases that were available (unassigned or expired) as of our last scan.
*/
private List<Lease> getExpiredLeases() {
List<Lease> expiredLeases = new ArrayList<>();

for (Lease lease : allLeases.values()) {
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
expiredLeases.add(lease);
}
}

return expiredLeases;
private List<Lease> getAvailableLeases() {
return allLeases.values().stream().filter(
lease -> lease.isAvailable(leaseDurationNanos, lastScanTimeNanos)
).collect(toList());
}

/**
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
* @param availableLeases list of leases we determined to be available (unassigned or expired)
* @return set of leases to take.
*/
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
private Set<Lease> computeLeasesToTake(List<Lease> availableLeases) {
Map<String, Integer> leaseCountPerWorker = computeActiveLeaseCountsByWorker(availableLeases);
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
List<Lease> veryOldLeases = new ArrayList<>();

final int numAvailableLeases = expiredLeases.size();
int numLeases = 0;
int numWorkers = 0;
final int numAvailableLeases = availableLeases.size();
final int numLeases = allLeases.size();
final int numWorkers = leaseCountPerWorker.size();
int numLeasesToReachTarget = 0;
int leaseSpillover = 0;
int veryOldLeaseCount = 0;

try {
numLeases = allLeases.size();
numWorkers = leaseCounts.size();

if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any.
return leasesToTake;
}


int target;
if (numWorkers >= numLeases) {
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
Expand All @@ -389,17 +381,17 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
}
}

int myCount = leaseCounts.get(workerIdentifier);
int myCount = leaseCountPerWorker.get(workerIdentifier);
numLeasesToReachTarget = target - myCount;

int currentLeaseCount = leaseCounts.get(workerIdentifier);
int currentLeaseCount = leaseCountPerWorker.get(workerIdentifier);
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
veryOldLeases = allLeases.values().stream()
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
.collect(Collectors.toList());
.collect(toList());

if (!veryOldLeases.isEmpty()) {
Collections.shuffle(veryOldLeases);
Expand All @@ -416,17 +408,17 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
return leasesToTake;
}

// Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases);
// Shuffle availableLeases so workers don't all try to contend for the same leases.
Collections.shuffle(availableLeases);

if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
if (availableLeases.size() > 0) {
// If we have available leases, get up to <needed> leases from availableLeases
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(availableLeases.remove(0));
}
} else {
// If there are no expired leases and we need a lease, consider stealing.
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCountPerWorker, numLeasesToReachTarget, target);
for (Lease leaseToSteal : leasesToSteal) {
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
Expand All @@ -444,7 +436,7 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
}

} finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
Expand Down Expand Up @@ -536,40 +528,28 @@ private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int ne
}

/**
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
* leases.
* Count leases by host, excluding all leases considered 'available to be taken' (ie unassigned or expired)
* using the supplied list. Always includes myself, but otherwise only includes hosts that are actively
* updating leases. There will be no entry for unassigned leases - ie the returned map will only have
* entries for non-null worker ids - so a good active-worker count is given by the size of the map.
*
* @param expiredLeases list of leases that are currently expired
* @param excludingAvailableLeases list of 'available' (unassigned/expired) leases to exclude from the count
* @return map of workerIdentifier to lease count
*/
@VisibleForTesting
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = new HashMap<>();
Map<String, Integer> computeActiveLeaseCountsByWorker(List<Lease> excludingAvailableLeases) {
// The set will give much faster lookup than the original list, an
// important optimization when the list is large
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
Set<Lease> availableLeasesSet = new HashSet<>(excludingAvailableLeases);

// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
for (Lease lease : allLeases.values()) {
if (!expiredLeasesSet.contains(lease)) {
String leaseOwner = lease.leaseOwner();
Integer oldCount = leaseCounts.get(leaseOwner);
if (oldCount == null) {
leaseCounts.put(leaseOwner, 1);
} else {
leaseCounts.put(leaseOwner, oldCount + 1);
}
}
}
Map<String, Integer> activeLeaseCountsByWorker = allLeases.values().stream()
.filter(lease -> !lease.isUnassigned() && !availableLeasesSet.contains(lease))
.collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1)));

// If I have no leases, I wasn't represented in leaseCounts. Let's fix that.
Integer myCount = leaseCounts.get(workerIdentifier);
if (myCount == null) {
myCount = 0;
leaseCounts.put(workerIdentifier, myCount);
}
// If I have no leases, I won't be represented in activeLeaseCountsByWorker. Let's fix that.
activeLeaseCountsByWorker.putIfAbsent(workerIdentifier, 0);

return leaseCounts;
return activeLeaseCountsByWorker;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public final void testStringJoin() {
}

@Test
public void test_computeLeaseCounts_noExpiredLease() throws Exception {
public void test_computeActiveLeaseCountsByWorker_noAvailableLeases() throws Exception {
final List<Lease> leases = new ImmutableList.Builder<Lease>()
.add(createLease(null, "1"))
.add(createLease("foo", "2"))
Expand All @@ -125,18 +125,17 @@ public void test_computeLeaseCounts_noExpiredLease() throws Exception {
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);

final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of());
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(ImmutableList.of());

final Map<String, Integer> expectedOutput = new HashMap<>();
expectedOutput.put(null, 1);
expectedOutput.put("foo", 1);
expectedOutput.put("bar", 1);
expectedOutput.put("baz", 1);
assertEquals(expectedOutput, actualOutput);
}

@Test
public void test_computeLeaseCounts_withExpiredLease() throws Exception {
public void test_computeActiveLeaseCountsByWorker_withAvailableLeases() throws Exception {
final List<Lease> leases = new ImmutableList.Builder<Lease>()
.add(createLease("foo", "2"))
.add(createLease("bar", "3"))
Expand All @@ -149,7 +148,7 @@ public void test_computeLeaseCounts_withExpiredLease() throws Exception {
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);

final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases);
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(leases);

final Map<String, Integer> expectedOutput = new HashMap<>();
expectedOutput.put("foo", 0);
Expand Down

0 comments on commit 5cdc9b5

Please sign in to comment.