diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 4074db229..7f3029287 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -195,6 +195,15 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) { } } + /** + * @param leaseDurationNanos duration of lease in nanoseconds + * @param asOfNanos time in nanoseconds to check expiration as-of + * @return true if lease is available (unassigned or expired as-of given time), false otherwise + */ + public boolean isAvailable(long leaseDurationNanos, long asOfNanos) { + return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos); + } + /** * Sets lastCounterIncrementNanos * @@ -310,6 +319,10 @@ public void leaseOwner(String leaseOwner) { this.leaseOwner = leaseOwner; } + public boolean isUnassigned() { + return leaseOwner == null; + } + /** * Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 365b1cbaf..065875561 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -28,6 +28,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -44,6 +45,10 @@ import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.summingInt; +import static java.util.stream.Collectors.toList; + /** * An implementation of {@link LeaseTaker} that uses DynamoDB via {@link LeaseRefresher}. */ @@ -191,9 +196,7 @@ synchronized Map takeLeases(Callable timeProvider) return takenLeases; } - List expiredLeases = getExpiredLeases(); - - Set leasesToTake = computeLeasesToTake(expiredLeases); + Set leasesToTake = computeLeasesToTake(getAvailableLeases()); leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake); Set untakenLeaseKeys = new HashSet<>(); @@ -357,50 +360,39 @@ private void updateAllLeases(Callable timeProvider) } /** - * @return list of leases that were expired as of our last scan. + * @return list of leases that were available (unassigned or expired) as of our last scan. */ - private List getExpiredLeases() { - List expiredLeases = new ArrayList<>(); - - for (Lease lease : allLeases.values()) { - if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) { - expiredLeases.add(lease); - } - } - - return expiredLeases; + private List getAvailableLeases() { + return allLeases.values().stream().filter( + lease -> lease.isAvailable(leaseDurationNanos, lastScanTimeNanos) + ).collect(toList()); } /** * Compute the number of leases I should try to take based on the state of the system. * - * @param expiredLeases list of leases we determined to be expired - * @return set of leases to take. + * @param availableLeases list of leases we determined to be available (unassigned or expired) * @return set of leases to take. */ - private Set computeLeasesToTake(List expiredLeases) { - Map leaseCounts = computeLeaseCounts(expiredLeases); + private Set computeLeasesToTake(List availableLeases) { + Map leaseCountPerWorker = computeActiveLeaseCountsByWorker(availableLeases); Set leasesToTake = new HashSet<>(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); List veryOldLeases = new ArrayList<>(); - final int numAvailableLeases = expiredLeases.size(); - int numLeases = 0; - int numWorkers = 0; + final int numAvailableLeases = availableLeases.size(); + final int numLeases = allLeases.size(); + final int numWorkers = leaseCountPerWorker.size(); int numLeasesToReachTarget = 0; int leaseSpillover = 0; int veryOldLeaseCount = 0; try { - numLeases = allLeases.size(); - numWorkers = leaseCounts.size(); - if (numLeases == 0) { // If there are no leases, I shouldn't try to take any. return leasesToTake; } - int target; if (numWorkers >= numLeases) { // If we have n leases and n or more workers, each worker can have up to 1 lease, including myself. @@ -426,17 +418,17 @@ private Set computeLeasesToTake(List expiredLeases) { } } - int myCount = leaseCounts.get(workerIdentifier); + int myCount = leaseCountPerWorker.get(workerIdentifier); numLeasesToReachTarget = target - myCount; - int currentLeaseCount = leaseCounts.get(workerIdentifier); + int currentLeaseCount = leaseCountPerWorker.get(workerIdentifier); // If there are leases that have been expired for an extended period of // time, take them with priority, disregarding the target (computed // later) but obeying the maximum limit per worker. veryOldLeases = allLeases.values().stream() .filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos() > veryOldLeaseDurationNanosMultiplier * leaseDurationNanos) - .collect(Collectors.toList()); + .collect(toList()); if (!veryOldLeases.isEmpty()) { Collections.shuffle(veryOldLeases); @@ -453,17 +445,17 @@ private Set computeLeasesToTake(List expiredLeases) { return leasesToTake; } - // Shuffle expiredLeases so workers don't all try to contend for the same leases. - Collections.shuffle(expiredLeases); + // Shuffle availableLeases so workers don't all try to contend for the same leases. + Collections.shuffle(availableLeases); - if (expiredLeases.size() > 0) { - // If we have expired leases, get up to leases from expiredLeases - for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) { - leasesToTake.add(expiredLeases.remove(0)); + if (availableLeases.size() > 0) { + // If we have available leases, get up to leases from availableLeases + for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) { + leasesToTake.add(availableLeases.remove(0)); } } else { // If there are no expired leases and we need a lease, consider stealing. - List leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target); + List leasesToSteal = chooseLeasesToSteal(leaseCountPerWorker, numLeasesToReachTarget, target); for (Lease leaseToSteal : leasesToSteal) { log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}", workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(), @@ -481,7 +473,7 @@ private Set computeLeasesToTake(List expiredLeases) { } } finally { - scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED); scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED); @@ -574,36 +566,28 @@ private List chooseLeasesToSteal(Map leaseCounts, int ne } /** - * Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding - * leases. + * Count leases by host, excluding all leases considered 'available to be taken' (ie unassigned or expired) + * using the supplied list. Always includes myself, but otherwise only includes hosts that are actively + * updating leases. There will be no entry for unassigned leases - ie the returned map will only have + * entries for non-null worker ids - so a good active-worker count is given by the size of the map. * - * @param expiredLeases list of leases that are currently expired + * @param excludingAvailableLeases list of 'available' (unassigned/expired) leases to exclude from the count * @return map of workerIdentifier to lease count */ @VisibleForTesting - Map computeLeaseCounts(List expiredLeases) { - Map leaseCounts = new HashMap<>(); + Map computeActiveLeaseCountsByWorker(List excludingAvailableLeases) { // The set will give much faster lookup than the original list, an // important optimization when the list is large - Set expiredLeasesSet = new HashSet<>(expiredLeases); + Set availableLeasesSet = new HashSet<>(excludingAvailableLeases); - // Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired. - for (Lease lease : allLeases.values()) { - if (!expiredLeasesSet.contains(lease)) { - String leaseOwner = lease.leaseOwner(); - Integer oldCount = leaseCounts.get(leaseOwner); - if (oldCount == null) { - leaseCounts.put(leaseOwner, 1); - } else { - leaseCounts.put(leaseOwner, oldCount + 1); - } - } - } + Map activeLeaseCountsByWorker = allLeases.values().stream() + .filter(lease -> !lease.isUnassigned() && !availableLeasesSet.contains(lease)) + .collect(groupingBy(Lease::leaseOwner, summingInt(lease -> 1))); - // If I have no leases, I wasn't represented in leaseCounts. Let's fix that. - leaseCounts.putIfAbsent(workerIdentifier, 0); + // If I have no leases, I won't be represented in activeLeaseCountsByWorker. Let's fix that. + activeLeaseCountsByWorker.putIfAbsent(workerIdentifier, 0); - return leaseCounts; + return activeLeaseCountsByWorker; } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java index 193970f6f..fbe8cb08f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerTest.java @@ -111,7 +111,7 @@ public final void testStringJoin() { } @Test - public void test_computeLeaseCounts_noExpiredLease() throws Exception { + public void test_computeActiveLeaseCountsByWorker_noAvailableLeases() throws Exception { final List leases = new ImmutableList.Builder() .add(createLease(null, "1")) .add(createLease("foo", "2")) @@ -125,10 +125,9 @@ public void test_computeLeaseCounts_noExpiredLease() throws Exception { when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); when(timeProvider.call()).thenReturn(1000L); - final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(ImmutableList.of()); + final Map actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(ImmutableList.of()); final Map expectedOutput = new HashMap<>(); - expectedOutput.put(null, 1); expectedOutput.put("foo", 1); expectedOutput.put("bar", 1); expectedOutput.put("baz", 1); @@ -136,7 +135,7 @@ public void test_computeLeaseCounts_noExpiredLease() throws Exception { } @Test - public void test_computeLeaseCounts_withExpiredLease() throws Exception { + public void test_computeActiveLeaseCountsByWorker_withAvailableLeases() throws Exception { final List leases = new ImmutableList.Builder() .add(createLease("foo", "2")) .add(createLease("bar", "3")) @@ -149,7 +148,7 @@ public void test_computeLeaseCounts_withExpiredLease() throws Exception { when(metricsFactory.createMetrics()).thenReturn(new NullMetricsScope()); when(timeProvider.call()).thenReturn(1000L); - final Map actualOutput = dynamoDBLeaseTaker.computeLeaseCounts(leases); + final Map actualOutput = dynamoDBLeaseTaker.computeActiveLeaseCountsByWorker(leases); final Map expectedOutput = new HashMap<>(); expectedOutput.put("foo", 0);