Skip to content

Commit

Permalink
Treat unassigned leases (as well as expired ones) as available-to-be-…
Browse files Browse the repository at this point in the history
…taken

If a lease is 'unassigned' (it has no lease owner) then it should be
considered available for taking in `DynamoDBLeaseTaker`. Prior to this
change, the only ways `DynamoDBLeaseTaker` could take leases for a
scheduler was either by incremental lease stealing, or waiting for the
lease to expire by not having been updated in `failoverTimeMillis` - which
could be slow if `failoverTimeMillis` was set reasonably high (with it set
to just 30s, I've seen new instances take over 3 minutes to take all leases
from old instances in a deployment).

This would be one half of a fix for
#845 - the other
half of the fix is invoking `evictLease()` (setting the lease owner to
null) on graceful shutdown of a scheduler.
  • Loading branch information
rtyley committed Oct 4, 2021
1 parent e638c17 commit 613a96f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) {
}
}

/**
 * Reports whether this lease can be taken, i.e. it is unassigned or it has expired.
 *
 * @param leaseDurationNanos duration of lease in nanoseconds
 * @param asOfNanos time in nanoseconds to check expiration as-of
 * @return true if the lease is unassigned or expired as-of the given time, false otherwise
 */
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) {
    // An unassigned lease is available regardless of its expiry state.
    if (isUnassigned()) {
        return true;
    }
    return isExpired(leaseDurationNanos, asOfNanos);
}

/**
* Sets lastCounterIncrementNanos
*
Expand Down Expand Up @@ -299,6 +308,10 @@ public void leaseOwner(String leaseOwner) {
this.leaseOwner = leaseOwner;
}

/**
 * @return true if this lease has no lease owner set (the owner is null), false otherwise
 */
public boolean isUnassigned() {
    final boolean hasOwner = leaseOwner != null;
    return !hasOwner;
}

/**
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
Expand All @@ -40,8 +40,7 @@
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingInt;
import static java.util.stream.Collectors.*;

/**
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
Expand Down Expand Up @@ -183,9 +182,7 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
return takenLeases;
}

List<Lease> expiredLeases = getExpiredLeases();

Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
Set<Lease> leasesToTake = computeLeasesToTake(getAvailableLeases());
Set<String> untakenLeaseKeys = new HashSet<>();

for (Lease lease : leasesToTake) {
Expand Down Expand Up @@ -320,49 +317,39 @@ private void updateAllLeases(Callable<Long> timeProvider)
}

/**
* @return list of leases that were expired as of our last scan.
* @return list of leases that were available (unassigned or expired) as of our last scan.
*/
private List<Lease> getExpiredLeases() {
List<Lease> expiredLeases = new ArrayList<>();

for (Lease lease : allLeases.values()) {
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
expiredLeases.add(lease);
}
}

return expiredLeases;
private List<Lease> getAvailableLeases() {
    // Collect into an explicit ArrayList: computeLeasesToTake shuffles this list and removes
    // elements from it, and Collectors.toList() makes no guarantee that its result is mutable.
    final List<Lease> availableLeases = new ArrayList<>();
    for (Lease lease : allLeases.values()) {
        // Availability is evaluated against the time of the last scan, not the current time.
        if (lease.isAvailable(leaseDurationNanos, lastScanTimeNanos)) {
            availableLeases.add(lease);
        }
    }
    return availableLeases;
}

/**
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
* @param availableLeases list of leases we determined to be available (unassigned or expired)
* @return set of leases to take.
*/
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
private Set<Lease> computeLeasesToTake(List<Lease> availableLeases) {
Map<String, Integer> leaseCountPerWorker = computeActiveLeaseCountsByWorker(availableLeases);
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
List<Lease> veryOldLeases = new ArrayList<>();

int numLeases = 0;
int numWorkers = 0;
final int numLeases = allLeases.size();
final int numWorkers = leaseCountPerWorker.size();
int numLeasesToReachTarget = 0;
int leaseSpillover = 0;
int veryOldLeaseCount = 0;

try {
numLeases = allLeases.size();
numWorkers = leaseCounts.size();

if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any.
return leasesToTake;
}


int target;
if (numWorkers >= numLeases) {
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
Expand All @@ -388,17 +375,17 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
}
}

int myCount = leaseCounts.get(workerIdentifier);
int myCount = leaseCountPerWorker.get(workerIdentifier);
numLeasesToReachTarget = target - myCount;

int currentLeaseCount = leaseCounts.get(workerIdentifier);
int currentLeaseCount = leaseCountPerWorker.get(workerIdentifier);
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
veryOldLeases = allLeases.values().stream()
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
.collect(Collectors.toList());
.collect(toList());

if (!veryOldLeases.isEmpty()) {
Collections.shuffle(veryOldLeases);
Expand All @@ -415,17 +402,17 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
return leasesToTake;
}

// Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases);
// Shuffle availableLeases so workers don't all try to contend for the same leases.
Collections.shuffle(availableLeases);

if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
if (availableLeases.size() > 0) {
// If we have available leases, get up to <needed> leases from availableLeases
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(availableLeases.remove(0));
}
} else {
// If there are no expired leases and we need a lease, consider stealing.
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCountPerWorker, numLeasesToReachTarget, target);
for (Lease leaseToSteal : leasesToSteal) {
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
Expand All @@ -438,12 +425,12 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
log.info(
"Worker {} saw {} total leases, {} available leases, {} "
+ "workers. Target is {} leases, I have {} leases, I will take {} leases",
workerIdentifier, numLeases, expiredLeases.size(), numWorkers, target, myCount,
workerIdentifier, numLeases, availableLeases.size(), numWorkers, target, myCount,
leasesToTake.size());
}

} finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("ExpiredLeases", availableLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
Expand Down Expand Up @@ -535,25 +522,29 @@ private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int ne
}

/**
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
* leases.
* Count leases by host, excluding all leases considered 'available to be taken' (ie unassigned or expired)
* using the supplied list. Always includes myself, but otherwise only includes hosts that are actively
* updating leases. There will be no entry for unassigned leases - ie the returned map will only have
* entries for non-null worker ids - so a good active-worker count is given by the size of the map.
*
* @param expiredLeases list of leases that are currently expired
* @param excludingAvailableLeases list of 'available' (unassigned/expired) leases to exclude from the count
* @return map of workerIdentifier to lease count
*/
private Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
private Map<String, Integer> computeActiveLeaseCountsByWorker(List<Lease> excludingAvailableLeases) {
// Tallies, per lease owner, the leases in allLeases that are not in the supplied list,
// then guarantees an entry for this worker and drops any entry keyed by null.

// The set will give much faster lookup than the original list, an
// important optimization when the list is large
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
Set<Lease> availableLeasesSet = new HashSet<>(excludingAvailableLeases);

Map<String, Integer> leaseCounts = allLeases.values().stream()
.filter(lease -> !expiredLeasesSet.contains(lease))
// Every lease that survives the filter contributes 1 to its owner's total.
// NOTE(review): groupingBy appears to throw NullPointerException when the classifier returns
// null, so this presumably relies on every unassigned lease being present in
// excludingAvailableLeases - confirm against the Collectors.groupingBy implementation.
Map<String, Integer> activeLeaseCountsByWorker = allLeases.values().stream()
.filter(lease -> !availableLeasesSet.contains(lease))
.collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1)));

// If I have no leases, I won't be represented in leaseCounts. Let's fix that.
leaseCounts.putIfAbsent(workerIdentifier, 0);
// If I have no leases, I won't be represented in activeLeaseCountsByWorker. Let's fix that.
activeLeaseCountsByWorker.putIfAbsent(workerIdentifier, 0);

activeLeaseCountsByWorker.remove(null); // explicitly exclude leases that are unassigned

return leaseCounts;
return activeLeaseCountsByWorker;
}

/**
Expand Down

0 comments on commit 613a96f

Please sign in to comment.