diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index b4999bec3..a2d05e6d7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -202,6 +202,10 @@ private void runShardSync() { log.warn( "Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.", shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + } else { + log.info("Submitted shard sync task for stream {} because of reason {}", + shardSyncTaskManager.shardDetector().streamIdentifier().streamName(), + shardSyncResponse.reasonForDecision()); } } else { log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(), @@ -222,6 +226,14 @@ private void runShardSync() { } } + /** + * Retrieve all the streams, along with their associated leases + * @param streamIdentifiersToFilter + * @return + * @throws DependencyException + * @throws ProvisionedThroughputException + * @throws InvalidStateException + */ private Map> getStreamToLeasesMap( final Set streamIdentifiersToFilter) throws DependencyException, ProvisionedThroughputException, InvalidStateException { @@ -242,6 +254,13 @@ private Map> getStreamToLeasesMap( } } + + /** + * Given a list of leases for a stream, determine if a shard sync is necessary. + * @param streamIdentifier + * @param leases + * @return + */ @VisibleForTesting ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List leases) { if (CollectionUtils.isNullOrEmpty(leases)) { @@ -272,12 +291,24 @@ ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases, initialPosition, inconsistentShardIds, multiStreamArgs); - log.debug("{} - Num new leases to create: {}", streamIdentifier, newLeasesToCreate.size()); + log.info("{} - Number of new leases to create: {}", streamIdentifier, newLeasesToCreate.size()); + + final Set createdLeases = new HashSet<>(); + for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); boolean success = false; try { - leaseRefresher.createLeaseIfNotExists(lease); + if(leaseRefresher.createLeaseIfNotExists(lease)) { + createdLeases.add(lease); + } success = true; - } finally { + } + finally { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); } } + log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases); final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); return true; @@ -398,6 +405,7 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId isDescendant = true; // We don't need to add leases of its ancestors, // because we'd have done it when creating a lease for this shard. + log.debug("{} - Shard {} is a descendant shard of an existing shard. Skipping lease creation", streamIdentifier, shardId); } else { final Shard shard = shardIdToShardMapOfAllKinesisShards.get(shardId); @@ -474,9 +482,12 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId if (descendantParentShardIds.contains(parentShardId) && !initialPosition.getInitialPositionInStream() .equals(InitialPositionInStream.AT_TIMESTAMP)) { + log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", lease.leaseKey(), lease.checkpoint()); lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); } else { - lease.checkpoint(convertToCheckpoint(initialPosition)); + final ExtendedSequenceNumber newCheckpoint = convertToCheckpoint(initialPosition); + log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", lease.leaseKey(), newCheckpoint, lease.checkpoint()); + lease.checkpoint(newCheckpoint); } } } @@ -512,7 +523,7 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId * Helper method to get parent shardIds of the current shard - includes the parent shardIds if: * a/ they are not null * b/ if they exist in the current shard map (i.e. haven't expired) - * + * * @param shard Will return parents of this shard * @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream. * @return Set of parentShardIds @@ -538,6 +549,12 @@ public synchronized Lease createLeaseForChildShard(final ChildShard childShard, : newKCLLeaseForChildShard(childShard); } + /** + * Generate a lease object for the given Child Shard. Checkpoint is set to TRIM_HORIZON + * @param childShard Shard for which a lease should be created + * @return Lease for the shard + * @throws InvalidStateException If the child shard has no parent shards + */ private static Lease newKCLLeaseForChildShard(final ChildShard childShard) throws InvalidStateException { Lease newLease = new Lease(); newLease.leaseKey(childShard.shardId()); @@ -571,7 +588,7 @@ private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childS /** * Helper method to create a new Lease POJO for a shard. * Note: Package level access only for testing purposes - * + * * @param shard * @return */ @@ -611,7 +628,7 @@ private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdent /** * Helper method to construct a shardId->Shard map for the specified list of shards. - * + * * @param shards List of shards * @return ShardId->Shard map */ @@ -622,7 +639,7 @@ static Map constructShardIdToShardMap(final List shards) { /** * Helper method to return all the open shards for a stream. * Note: Package level access only for testing purposes. - * + * * @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list. * @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active. */ @@ -633,7 +650,7 @@ static List getOpenShards(final List allShards, final String strea private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) { ExtendedSequenceNumber checkpoint = null; - + if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) { checkpoint = ExtendedSequenceNumber.TRIM_HORIZON; } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) { @@ -641,7 +658,7 @@ private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionI } else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) { checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP; } - + return checkpoint; } @@ -688,7 +705,7 @@ private static class StartingSequenceNumberAndShardIdBasedComparator implements * We assume that lease1 and lease2 are: * a/ not null, * b/ shards (if found) have non-null starting sequence numbers - * + * * {@inheritDoc} */ @Override @@ -698,18 +715,18 @@ public int compare(final Lease lease1, final Lease lease2) { final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs); final Shard shard1 = shardIdToShardMap.get(shardId1); final Shard shard2 = shardIdToShardMap.get(shardId2); - + // If we found shards for the two leases, use comparison of the starting sequence numbers if (shard1 != null && shard2 != null) { BigInteger sequenceNumber1 = new BigInteger(shard1.sequenceNumberRange().startingSequenceNumber()); BigInteger sequenceNumber2 = new BigInteger(shard2.sequenceNumberRange().startingSequenceNumber()); - result = sequenceNumber1.compareTo(sequenceNumber2); + result = sequenceNumber1.compareTo(sequenceNumber2); } - + if (result == 0) { result = shardId1.compareTo(shardId2); } - + return result; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index a5928c2a7..f9e52e1c5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -145,10 +145,18 @@ private int leasesPendingDeletion() { return deletionQueue.size(); } + /** + * + * @return true if the 'Completed Lease Stopwatch' has elapsed more time than the 'Completed Lease Cleanup Interval' + */ private boolean timeToCheckForCompletedShard() { return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis; } + /** + * + * @return true if the 'Garbage Lease Stopwatch' has elapsed more time than the 'Garbage Lease Cleanup Interval' + */ private boolean timeToCheckForGarbageShard() { return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis; } @@ -230,6 +238,15 @@ private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws Dep return true; } + /** + * Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a lease, return false. Otherwise return true + * @param lease + * @param shardInfo + * @return + * @throws DependencyException + * @throws ProvisionedThroughputException + * @throws InvalidStateException + */ private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException { for (String parentShard : lease.parentShardIds()) { final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard)); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 361db9f92..acb61a380 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -169,7 +169,7 @@ public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @No request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) .attributeDefinitions(serializer.getAttributeDefinitions()) .billingMode(billingMode).build(); - }else{ + } else { request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) .attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput) .build(); @@ -429,7 +429,7 @@ private List list(Integer limit, Integer maxPages, StreamIdentifier strea @Override public boolean createLeaseIfNotExists(@NonNull final Lease lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - log.debug("Creating lease {}", lease); + log.debug("Creating lease: {}", lease); PutItemRequest request = PutItemRequest.builder().tableName(table).item(serializer.toDynamoRecord(lease)) .expected(serializer.getDynamoNonexistantExpectation()).build(); @@ -452,6 +452,7 @@ public boolean createLeaseIfNotExists(@NonNull final Lease lease) } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("create", lease.leaseKey(), e); } + log.info("Created lease: {}",lease); return true; } @@ -476,7 +477,7 @@ public Lease getLease(@NonNull final String leaseKey) return null; } else { final Lease lease = serializer.fromDynamoRecord(dynamoRecord); - log.debug("Got lease {}", lease); + log.debug("Retrieved lease: {}", lease); return lease; } } catch (ExecutionException e) { @@ -535,6 +536,7 @@ public boolean renewLease(@NonNull final Lease lease) } lease.leaseCounter(lease.leaseCounter() + 1); + log.debug("Renewed lease with key {}", lease.leaseKey()); return true; } @@ -582,6 +584,8 @@ public boolean takeLease(@NonNull final Lease lease, @NonNull final String owner lease.ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint() + 1); } + log.info("Transferred lease {} ownership from {} to {}", lease.leaseKey(), oldOwner, owner); + return true; } @@ -620,6 +624,8 @@ public boolean evictLease(@NonNull final Lease lease) lease.leaseOwner(null); lease.leaseCounter(lease.leaseCounter() + 1); + + log.info("Evicted lease with leaseKey {}", lease.leaseKey()); return true; } @@ -648,6 +654,7 @@ public void deleteAll() throws DependencyException, InvalidStateException, Provi } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("deleteAll", lease.leaseKey(), e); } + log.debug("Deleted lease {} from table {}", lease.leaseKey(), table); } } @@ -675,6 +682,8 @@ public void deleteLease(@NonNull final Lease lease) } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("delete", lease.leaseKey(), e); } + + log.info("Deleted lease with leaseKey {}", lease.leaseKey()); } /** @@ -683,7 +692,7 @@ public void deleteLease(@NonNull final Lease lease) @Override public boolean updateLease(@NonNull final Lease lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - log.debug("Updating lease {}", lease); + log.debug("Updating lease: {}", lease); final AWSExceptionManager exceptionManager = createExceptionManager(); exceptionManager.add(ConditionalCheckFailedException.class, t -> t); @@ -711,6 +720,7 @@ public boolean updateLease(@NonNull final Lease lease) } lease.leaseCounter(lease.leaseCounter() + 1); + log.info("Updated lease {}.", lease.leaseKey()); return true; } @@ -738,6 +748,8 @@ public void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField) } catch (DynamoDbException | TimeoutException e) { throw convertAndRethrowExceptions("update", lease.leaseKey(), e); } + + log.info("Updated lease without expectation {}.", lease); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index e457b5ec3..ab2d38c5d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -242,7 +242,7 @@ public Lease getCurrentlyHeldLease(String leaseKey) { /** * Internal method to return a lease with a specific lease key only if we currently hold it. - * + * * @param leaseKey key of lease to return * @param now current timestamp for old-ness checking * @return non-authoritative copy of the held lease, or null if we don't currently hold it @@ -309,6 +309,7 @@ public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String o long startTime = System.currentTimeMillis(); boolean success = false; try { + log.info("Updating lease from {} to {}", authoritativeLease, lease); synchronized (authoritativeLease) { authoritativeLease.update(lease); boolean updatedLease = leaseRefresher.updateLease(authoritativeLease); @@ -325,7 +326,7 @@ public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String o /* * Remove only if the value currently in the map is the same as the authoritative lease. We're * guarding against a pause after the concurrency token check above. It plays out like so: - * + * * 1) Concurrency token check passes * 2) Pause. Lose lease, re-acquire lease. This requires at least one lease counter update. * 3) Unpause. leaseRefresher.updateLease fails conditional write due to counter updates, returns @@ -333,7 +334,7 @@ public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String o * 4) ownedLeases.remove(key, value) doesn't do anything because authoritativeLease does not * .equals() the re-acquired version in the map on the basis of lease counter. This is what we want. * If we just used ownedLease.remove(key), we would have pro-actively removed a lease incorrectly. - * + * * Note that there is a subtlety here - Lease.equals() deliberately does not check the concurrency * token, but it does check the lease counter, so this scheme works. */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index a90ef56e0..4a4f086fd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -259,6 +259,7 @@ private Set updateStaleLeasesWithLatestState(long updateAllLeasesEndTime, leasesToTake = leasesToTake.stream().map(lease -> { if (lease.isMarkedForLeaseSteal()) { try { + log.debug("Updating stale lease {}.", lease.leaseKey()); return leaseRefresher.getLease(lease.leaseKey()); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { log.warn("Failed to fetch latest state of the lease {} that needs to be stolen, " @@ -408,7 +409,7 @@ private Set computeLeasesToTake(List expiredLeases) { target = 1; } else { /* - * numWorkers must be < numLeases. + * if we have made it here, it means there are more leases than workers * * Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases) */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index c2c5c790e..c436f38a8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -267,13 +267,14 @@ private void createLeasesForChildShardsIfNotExist() } } } - // Attempt create leases for child shards. for(ChildShard childShard : childShards) { final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId()); if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) { + log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey); final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); - log.info("Shard {}: Created child shard lease: {}", shardInfo.shardId(), leaseToCreate.leaseKey()); + + log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate); } } }