Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Treat unassigned leases (as well as expired ones) as available-to-be-taken #848

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) {
}
}

/**
* @param leaseDurationNanos duration of lease in nanoseconds
* @param asOfNanos time in nanoseconds to check expiration as-of
* @return true if lease is available (unassigned or expired as-of given time), false otherwise
*/
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) {
return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos);
}

/**
* Sets lastCounterIncrementNanos
*
Expand Down Expand Up @@ -310,6 +319,10 @@ public void leaseOwner(String leaseOwner) {
this.leaseOwner = leaseOwner;
}

public boolean isUnassigned() {
return leaseOwner == null;
}

/**
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
Expand All @@ -44,6 +45,10 @@

import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingInt;
import static java.util.stream.Collectors.toList;

/**
* An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}.
*/
Expand Down Expand Up @@ -191,9 +196,7 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
return takenLeases;
}

List<Lease> expiredLeases = getExpiredLeases();

Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
Set<Lease> leasesToTake = computeLeasesToTake(getAvailableLeases());
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);

Set<String> untakenLeaseKeys = new HashSet<>();
Expand Down Expand Up @@ -357,50 +360,39 @@ private void updateAllLeases(Callable<Long> timeProvider)
}

/**
* @return list of leases that were expired as of our last scan.
* @return list of leases that were available (unassigned or expired) as of our last scan.
*/
private List<Lease> getExpiredLeases() {
List<Lease> expiredLeases = new ArrayList<>();

for (Lease lease : allLeases.values()) {
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
expiredLeases.add(lease);
}
}

return expiredLeases;
private List<Lease> getAvailableLeases() {
return allLeases.values().stream().filter(
lease -> lease.isAvailable(leaseDurationNanos, lastScanTimeNanos)
).collect(toList());
}

/**
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
* @return set of leases to take.
* @param availableLeases list of leases we determined to be available (unassigned or expired) * @return set of leases to take.
*/
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
private Set<Lease> computeLeasesToTake(List<Lease> availableLeases) {
Map<String, Integer> leaseCountPerWorker = computeActiveLeaseCountsByWorker(availableLeases);
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
List<Lease> veryOldLeases = new ArrayList<>();

final int numAvailableLeases = expiredLeases.size();
int numLeases = 0;
int numWorkers = 0;
final int numAvailableLeases = availableLeases.size();
final int numLeases = allLeases.size();
final int numWorkers = leaseCountPerWorker.size();
int numLeasesToReachTarget = 0;
int leaseSpillover = 0;
int veryOldLeaseCount = 0;

try {
numLeases = allLeases.size();
numWorkers = leaseCounts.size();

if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any.
return leasesToTake;
}


int target;
if (numWorkers >= numLeases) {
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
Expand All @@ -426,17 +418,17 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
}
}

int myCount = leaseCounts.get(workerIdentifier);
int myCount = leaseCountPerWorker.get(workerIdentifier);
numLeasesToReachTarget = target - myCount;

int currentLeaseCount = leaseCounts.get(workerIdentifier);
int currentLeaseCount = leaseCountPerWorker.get(workerIdentifier);
// If there are leases that have been expired for an extended period of
// time, take them with priority, disregarding the target (computed
// later) but obeying the maximum limit per worker.
veryOldLeases = allLeases.values().stream()
.filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
> veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
.collect(Collectors.toList());
.collect(toList());

if (!veryOldLeases.isEmpty()) {
Collections.shuffle(veryOldLeases);
Expand All @@ -453,17 +445,17 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
return leasesToTake;
}

// Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases);
// Shuffle availableLeases so workers don't all try to contend for the same leases.
Collections.shuffle(availableLeases);

if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
if (availableLeases.size() > 0) {
// If we have available leases, get up to <needed> leases from availableLeases
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(availableLeases.remove(0));
}
} else {
// If there are no expired leases and we need a lease, consider stealing.
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCountPerWorker, numLeasesToReachTarget, target);
for (Lease leaseToSteal : leasesToSteal) {
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
Expand All @@ -481,7 +473,7 @@ private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
}

} finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
Expand Down Expand Up @@ -574,36 +566,28 @@ private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int ne
}

/**
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
* leases.
* Count leases by host, excluding all leases considered 'available to be taken' (ie unassigned or expired)
* using the supplied list. Always includes myself, but otherwise only includes hosts that are actively
* updating leases. There will be no entry for unassigned leases - ie the returned map will only have
* entries for non-null worker ids - so a good active-worker count is given by the size of the map.
*
* @param expiredLeases list of leases that are currently expired
* @param excludingAvailableLeases list of 'available' (unassigned/expired) leases to exclude from the count
* @return map of workerIdentifier to lease count
*/
@VisibleForTesting
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = new HashMap<>();
Map<String, Integer> computeActiveLeaseCountsByWorker(List<Lease> excludingAvailableLeases) {
// The set will give much faster lookup than the original list, an
// important optimization when the list is large
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
Set<Lease> availableLeasesSet = new HashSet<>(excludingAvailableLeases);

// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
for (Lease lease : allLeases.values()) {
if (!expiredLeasesSet.contains(lease)) {
String leaseOwner = lease.leaseOwner();
Integer oldCount = leaseCounts.get(leaseOwner);
if (oldCount == null) {
leaseCounts.put(leaseOwner, 1);
} else {
leaseCounts.put(leaseOwner, oldCount + 1);
}
}
}
Map<String, Integer> activeLeaseCountsByWorker = allLeases.values().stream()
.filter(lease -> !lease.isUnassigned() && !availableLeasesSet.contains(lease))
.collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1)));
Comment on lines +584 to +585
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here filtering for !lease.isUnassigned() is what now stops the groupingBy Lease::leaseOwner from failing with the NullPointerException: element cannot be mapped to a null key error reported in #861 - given that unassigned leases are filtered out, there is no attempt to insert a null key entry in the map, so the code doesn't crash.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still want to keep this change given that #861 has been merged?


// If I have no leases, I wasn't represented in leaseCounts. Let's fix that.
leaseCounts.putIfAbsent(workerIdentifier, 0);
// If I have no leases, I won't be represented in activeLeaseCountsByWorker. Let's fix that.
activeLeaseCountsByWorker.putIfAbsent(workerIdentifier, 0);

return leaseCounts;
return activeLeaseCountsByWorker;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public final void testStringJoin() {
}

@Test
public void test_computeLeaseCounts_noExpiredLease() throws Exception {
public void test_computeActiveLeaseCountsByWorker_noAvailableLeases() throws Exception {
final List<Lease> leases = new ImmutableList.Builder<Lease>()
.add(createLease(null, "1"))
.add(createLease("foo", "2"))
Expand All @@ -125,18 +125,17 @@ public void test_computeLeaseCounts_noExpiredLease() throws Exception {
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);

final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of());
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(ImmutableList.of());

final Map<String, Integer> expectedOutput = new HashMap<>();
expectedOutput.put(null, 1);
expectedOutput.put("foo", 1);
expectedOutput.put("bar", 1);
expectedOutput.put("baz", 1);
assertEquals(expectedOutput, actualOutput);
}

@Test
public void test_computeLeaseCounts_withExpiredLease() throws Exception {
public void test_computeActiveLeaseCountsByWorker_withAvailableLeases() throws Exception {
final List<Lease> leases = new ImmutableList.Builder<Lease>()
.add(createLease("foo", "2"))
.add(createLease("bar", "3"))
Expand All @@ -149,7 +148,7 @@ public void test_computeLeaseCounts_withExpiredLease() throws Exception {
when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope());
when(timeProvider.call()).thenReturn(1000L);

final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases);
final Map<String, Integer> actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(leases);

final Map<String, Integer> expectedOutput = new HashMap<>();
expectedOutput.put("foo", 0);
Expand Down