Skip to content

Commit

Permalink
Increased logging verbosity around lease management. Also included ad…
Browse files Browse the repository at this point in the history
…ditional javadocs for methods (#1040)

Co-authored-by: Ryan Pelaez <rmpelaez@amazon.com>
  • Loading branch information
pelaezryan and Ryan Pelaez authored Feb 13, 2023
1 parent 34f19c5 commit 9fb58a2
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ private void runShardSync() {
log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
} else {
log.info("Submitted shard sync task for stream {} because of reason {}",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName(),
shardSyncResponse.reasonForDecision());
}
} else {
log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(),
Expand All @@ -222,6 +226,14 @@ private void runShardSync() {
}
}

/**
* Retrieve all the streams, along with their associated leases
* @param streamIdentifiersToFilter
* @return
* @throws DependencyException
* @throws ProvisionedThroughputException
* @throws InvalidStateException
*/
private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(
final Set<StreamIdentifier> streamIdentifiersToFilter)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
Expand All @@ -242,6 +254,13 @@ private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(
}
}


/**
* Given a list of leases for a stream, determine if a shard sync is necessary.
* @param streamIdentifier
* @param leases
* @return
*/
@VisibleForTesting
ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) {
if (CollectionUtils.isNullOrEmpty(leases)) {
Expand Down Expand Up @@ -272,12 +291,24 @@ ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Leas
}
}

/**
* Object containing metadata about the state of a shard sync
*/
@Value
@Accessors(fluent = true)
@VisibleForTesting
static class ShardSyncResponse {

/**
* Flag to determine if a shard sync is necessary or not
*/
private final boolean shouldDoShardSync;

private final boolean isHoleDetected;

/**
* Reason behind the state of 'shouldDoShardSync' flag
*/
private final String reasonForDecision;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ void initialize() {

for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) {
try {
log.info("Initialization attempt {}", (i + 1));
log.info("Initializing LeaseCoordinator");
log.info("Initializing LeaseCoordinator attempt {}", (i + 1));
leaseCoordinator.initialize();

if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,24 @@ public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final Shard
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases,
initialPosition, inconsistentShardIds, multiStreamArgs);
log.debug("{} - Num new leases to create: {}", streamIdentifier, newLeasesToCreate.size());
log.info("{} - Number of new leases to create: {}", streamIdentifier, newLeasesToCreate.size());

final Set<Lease> createdLeases = new HashSet<>();

for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
boolean success = false;
try {
leaseRefresher.createLeaseIfNotExists(lease);
if(leaseRefresher.createLeaseIfNotExists(lease)) {
createdLeases.add(lease);
}
success = true;
} finally {
}
finally {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
}
}
log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases);
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
return true;
Expand Down Expand Up @@ -398,6 +405,7 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId
isDescendant = true;
// We don't need to add leases of its ancestors,
// because we'd have done it when creating a lease for this shard.
log.debug("{} - Shard {} is a descendant shard of an existing shard. Skipping lease creation", streamIdentifier, shardId);
} else {

final Shard shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
Expand Down Expand Up @@ -474,9 +482,12 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", lease.leaseKey(), lease.checkpoint());
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.checkpoint(convertToCheckpoint(initialPosition));
final ExtendedSequenceNumber newCheckpoint = convertToCheckpoint(initialPosition);
log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", lease.leaseKey(), newCheckpoint, lease.checkpoint());
lease.checkpoint(newCheckpoint);
}
}
}
Expand Down Expand Up @@ -512,7 +523,7 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
* a/ they are not null
* b/ if they exist in the current shard map (i.e. haven't expired)
*
*
* @param shard Will return parents of this shard
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
Expand All @@ -538,6 +549,12 @@ public synchronized Lease createLeaseForChildShard(final ChildShard childShard,
: newKCLLeaseForChildShard(childShard);
}

/**
* Generate a lease object for the given Child Shard. Checkpoint is set to TRIM_HORIZON
* @param childShard Shard for which a lease should be created
* @return Lease for the shard
* @throws InvalidStateException If the child shard has no parent shards
*/
private static Lease newKCLLeaseForChildShard(final ChildShard childShard) throws InvalidStateException {
Lease newLease = new Lease();
newLease.leaseKey(childShard.shardId());
Expand Down Expand Up @@ -571,7 +588,7 @@ private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childS
/**
* Helper method to create a new Lease POJO for a shard.
* Note: Package level access only for testing purposes
*
*
* @param shard
* @return
*/
Expand Down Expand Up @@ -611,7 +628,7 @@ private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdent

/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
*
*
* @param shards List of shards
* @return ShardId->Shard map
*/
Expand All @@ -622,7 +639,7 @@ static Map<String, Shard> constructShardIdToShardMap(final List<Shard> shards) {
/**
* Helper method to return all the open shards for a stream.
* Note: Package level access only for testing purposes.
*
*
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
Expand All @@ -633,15 +650,15 @@ static List<Shard> getOpenShards(final List<Shard> allShards, final String strea

private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;

if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
checkpoint = ExtendedSequenceNumber.LATEST;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}

return checkpoint;
}

Expand Down Expand Up @@ -688,7 +705,7 @@ private static class StartingSequenceNumberAndShardIdBasedComparator implements
* We assume that lease1 and lease2 are:
* a/ not null,
* b/ shards (if found) have non-null starting sequence numbers
*
*
* {@inheritDoc}
*/
@Override
Expand All @@ -698,18 +715,18 @@ public int compare(final Lease lease1, final Lease lease2) {
final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs);
final Shard shard1 = shardIdToShardMap.get(shardId1);
final Shard shard2 = shardIdToShardMap.get(shardId2);

// If we found shards for the two leases, use comparison of the starting sequence numbers
if (shard1 != null && shard2 != null) {
BigInteger sequenceNumber1 = new BigInteger(shard1.sequenceNumberRange().startingSequenceNumber());
BigInteger sequenceNumber2 = new BigInteger(shard2.sequenceNumberRange().startingSequenceNumber());
result = sequenceNumber1.compareTo(sequenceNumber2);
result = sequenceNumber1.compareTo(sequenceNumber2);
}

if (result == 0) {
result = shardId1.compareTo(shardId2);
}

return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,18 @@ private int leasesPendingDeletion() {
return deletionQueue.size();
}

/**
*
* @return true if the 'Completed Lease Stopwatch' has elapsed more time than the 'Completed Lease Cleanup Interval'
*/
private boolean timeToCheckForCompletedShard() {
return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis;
}

/**
*
* @return true if the 'Garbage Lease Stopwatch' has elapsed more time than the 'Garbage Lease Cleanup Interval'
*/
private boolean timeToCheckForGarbageShard() {
return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis;
}
Expand Down Expand Up @@ -230,6 +238,15 @@ private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws Dep
return true;
}

/**
* Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a lease, return false. Otherwise return true
* @param lease
* @param shardInfo
* @return
* @throws DependencyException
* @throws ProvisionedThroughputException
* @throws InvalidStateException
*/
private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
for (String parentShard : lease.parentShardIds()) {
final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @No
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();
}else{
} else {
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
.build();
Expand Down Expand Up @@ -429,7 +429,7 @@ private List<Lease> list(Integer limit, Integer maxPages, StreamIdentifier strea
@Override
public boolean createLeaseIfNotExists(@NonNull final Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
log.debug("Creating lease {}", lease);
log.debug("Creating lease: {}", lease);

PutItemRequest request = PutItemRequest.builder().tableName(table).item(serializer.toDynamoRecord(lease))
.expected(serializer.getDynamoNonexistantExpectation()).build();
Expand All @@ -452,6 +452,7 @@ public boolean createLeaseIfNotExists(@NonNull final Lease lease)
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("create", lease.leaseKey(), e);
}
log.info("Created lease: {}",lease);
return true;
}

Expand All @@ -476,7 +477,7 @@ public Lease getLease(@NonNull final String leaseKey)
return null;
} else {
final Lease lease = serializer.fromDynamoRecord(dynamoRecord);
log.debug("Got lease {}", lease);
log.debug("Retrieved lease: {}", lease);
return lease;
}
} catch (ExecutionException e) {
Expand Down Expand Up @@ -535,6 +536,7 @@ public boolean renewLease(@NonNull final Lease lease)
}

lease.leaseCounter(lease.leaseCounter() + 1);
log.debug("Renewed lease with key {}", lease.leaseKey());
return true;
}

Expand Down Expand Up @@ -582,6 +584,8 @@ public boolean takeLease(@NonNull final Lease lease, @NonNull final String owner
lease.ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint() + 1);
}

log.info("Transferred lease {} ownership from {} to {}", lease.leaseKey(), oldOwner, owner);

return true;
}

Expand Down Expand Up @@ -620,6 +624,8 @@ public boolean evictLease(@NonNull final Lease lease)

lease.leaseOwner(null);
lease.leaseCounter(lease.leaseCounter() + 1);

log.info("Evicted lease with leaseKey {}", lease.leaseKey());
return true;
}

Expand Down Expand Up @@ -648,6 +654,7 @@ public void deleteAll() throws DependencyException, InvalidStateException, Provi
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("deleteAll", lease.leaseKey(), e);
}
log.debug("Deleted lease {} from table {}", lease.leaseKey(), table);
}
}

Expand Down Expand Up @@ -675,6 +682,8 @@ public void deleteLease(@NonNull final Lease lease)
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("delete", lease.leaseKey(), e);
}

log.info("Deleted lease with leaseKey {}", lease.leaseKey());
}

/**
Expand All @@ -683,7 +692,7 @@ public void deleteLease(@NonNull final Lease lease)
@Override
public boolean updateLease(@NonNull final Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
log.debug("Updating lease {}", lease);
log.debug("Updating lease: {}", lease);

final AWSExceptionManager exceptionManager = createExceptionManager();
exceptionManager.add(ConditionalCheckFailedException.class, t -> t);
Expand Down Expand Up @@ -711,6 +720,7 @@ public boolean updateLease(@NonNull final Lease lease)
}

lease.leaseCounter(lease.leaseCounter() + 1);
log.info("Updated lease {}.", lease.leaseKey());
return true;
}

Expand Down Expand Up @@ -738,6 +748,8 @@ public void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField)
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("update", lease.leaseKey(), e);
}

log.info("Updated lease without expectation {}.", lease);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public Lease getCurrentlyHeldLease(String leaseKey) {

/**
* Internal method to return a lease with a specific lease key only if we currently hold it.
*
*
* @param leaseKey key of lease to return
* @param now current timestamp for old-ness checking
* @return non-authoritative copy of the held lease, or null if we don't currently hold it
Expand Down Expand Up @@ -309,6 +309,7 @@ public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String o
long startTime = System.currentTimeMillis();
boolean success = false;
try {
log.info("Updating lease from {} to {}", authoritativeLease, lease);
synchronized (authoritativeLease) {
authoritativeLease.update(lease);
boolean updatedLease = leaseRefresher.updateLease(authoritativeLease);
Expand All @@ -325,15 +326,15 @@ public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String o
/*
* Remove only if the value currently in the map is the same as the authoritative lease. We're
* guarding against a pause after the concurrency token check above. It plays out like so:
*
*
* 1) Concurrency token check passes
* 2) Pause. Lose lease, re-acquire lease. This requires at least one lease counter update.
* 3) Unpause. leaseRefresher.updateLease fails conditional write due to counter updates, returns
* false.
* 4) ownedLeases.remove(key, value) doesn't do anything because authoritativeLease does not
* .equals() the re-acquired version in the map on the basis of lease counter. This is what we want.
* If we just used ownedLease.remove(key), we would have pro-actively removed a lease incorrectly.
*
*
* Note that there is a subtlety here - Lease.equals() deliberately does not check the concurrency
* token, but it does check the lease counter, so this scheme works.
*/
Expand Down
Loading

0 comments on commit 9fb58a2

Please sign in to comment.