Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increased logging verbosity around lease management #1040

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ private void runShardSync() {
log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
} else {
log.info("Submitted shard sync task for stream {} because of reason {}",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName(),
shardSyncResponse.reasonForDecision());
}
} else {
log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(),
Expand All @@ -222,6 +226,14 @@ private void runShardSync() {
}
}

/**
* Retrieve all the streams, along with their associated leases
* @param streamIdentifiersToFilter
* @return
* @throws DependencyException
* @throws ProvisionedThroughputException
* @throws InvalidStateException
*/
private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(
final Set<StreamIdentifier> streamIdentifiersToFilter)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
Expand All @@ -242,6 +254,13 @@ private Map<StreamIdentifier, List<Lease>> getStreamToLeasesMap(
}
}


/**
* Given a list of leases for a stream, determine if a shard sync is necessary.
* @param streamIdentifier
* @param leases
* @return
*/
@VisibleForTesting
ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Lease> leases) {
if (CollectionUtils.isNullOrEmpty(leases)) {
Expand Down Expand Up @@ -272,12 +291,24 @@ ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Leas
}
}

/**
* Object containing metadata about the state of a shard sync
*/
@Value
@Accessors(fluent = true)
@VisibleForTesting
static class ShardSyncResponse {

/**
* Flag to determine if a shard sync is necessary or not
*/
private final boolean shouldDoShardSync;

private final boolean isHoleDetected;

/**
* Reason behind the state of 'shouldDoShardSync' flag
*/
private final String reasonForDecision;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ void initialize() {

for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) {
try {
log.info("Initialization attempt {}", (i + 1));
log.info("Initializing LeaseCoordinator");
log.info("Initializing LeaseCoordinator attempt {}", (i + 1));
leaseCoordinator.initialize();

if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,24 @@ public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final Shard
new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap);
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(leaseSynchronizer, latestShards, currentLeases,
initialPosition, inconsistentShardIds, multiStreamArgs);
log.debug("{} - Num new leases to create: {}", streamIdentifier, newLeasesToCreate.size());
log.info("{} - Number of new leases to create: {}", streamIdentifier, newLeasesToCreate.size());

final Set<Lease> createdLeases = new HashSet<>();

for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis();
boolean success = false;
try {
leaseRefresher.createLeaseIfNotExists(lease);
if(leaseRefresher.createLeaseIfNotExists(lease)) {
createdLeases.add(lease);
}
success = true;
} finally {
}
finally {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
}
}
log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases);
final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate);
return true;
Expand Down Expand Up @@ -398,6 +405,7 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId
isDescendant = true;
// We don't need to add leases of its ancestors,
// because we'd have done it when creating a lease for this shard.
log.debug("{} - Shard {} is a descendant shard of an existing shard. Skipping lease creation", streamIdentifier, shardId);
} else {

final Shard shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
Expand Down Expand Up @@ -474,9 +482,12 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId
if (descendantParentShardIds.contains(parentShardId)
&& !initialPosition.getInitialPositionInStream()
.equals(InitialPositionInStream.AT_TIMESTAMP)) {
log.info("Setting Lease '{}' checkpoint to 'TRIM_HORIZON'. Checkpoint was previously set to {}", lease.leaseKey(), lease.checkpoint());
lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
} else {
lease.checkpoint(convertToCheckpoint(initialPosition));
final ExtendedSequenceNumber newCheckpoint = convertToCheckpoint(initialPosition);
log.info("Setting Lease '{}' checkpoint to '{}'. Checkpoint was previously set to {}", lease.leaseKey(), newCheckpoint, lease.checkpoint());
lease.checkpoint(newCheckpoint);
}
}
}
Expand Down Expand Up @@ -512,7 +523,7 @@ static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId
* Helper method to get parent shardIds of the current shard - includes the parent shardIds if:
* a/ they are not null
* b/ if they exist in the current shard map (i.e. haven't expired)
*
*
* @param shard Will return parents of this shard
* @param shardIdToShardMapOfAllKinesisShards ShardId->Shard map containing all shards obtained via DescribeStream.
* @return Set of parentShardIds
Expand All @@ -538,6 +549,12 @@ public synchronized Lease createLeaseForChildShard(final ChildShard childShard,
: newKCLLeaseForChildShard(childShard);
}

/**
* Generate a lease object for the given Child Shard. Checkpoint is set to TRIM_HORIZON
* @param childShard Shard for which a lease should be created
* @return Lease for the shard
* @throws InvalidStateException If the child shard has no parent shards
*/
private static Lease newKCLLeaseForChildShard(final ChildShard childShard) throws InvalidStateException {
Lease newLease = new Lease();
newLease.leaseKey(childShard.shardId());
Expand Down Expand Up @@ -571,7 +588,7 @@ private static Lease newKCLMultiStreamLeaseForChildShard(final ChildShard childS
/**
* Helper method to create a new Lease POJO for a shard.
* Note: Package level access only for testing purposes
*
*
* @param shard
* @return
*/
Expand Down Expand Up @@ -611,7 +628,7 @@ private static Lease newKCLMultiStreamLease(final Shard shard, final StreamIdent

/**
* Helper method to construct a shardId->Shard map for the specified list of shards.
*
*
* @param shards List of shards
* @return ShardId->Shard map
*/
Expand All @@ -622,7 +639,7 @@ static Map<String, Shard> constructShardIdToShardMap(final List<Shard> shards) {
/**
* Helper method to return all the open shards for a stream.
* Note: Package level access only for testing purposes.
*
*
* @param allShards All shards returved via DescribeStream. We assume this to represent a consistent shard list.
* @return List of open shards (shards at the tip of the stream) - may include shards that are not yet active.
*/
Expand All @@ -633,15 +650,15 @@ static List<Shard> getOpenShards(final List<Shard> allShards, final String strea

private static ExtendedSequenceNumber convertToCheckpoint(final InitialPositionInStreamExtended position) {
ExtendedSequenceNumber checkpoint = null;

if (position.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
checkpoint = ExtendedSequenceNumber.TRIM_HORIZON;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
checkpoint = ExtendedSequenceNumber.LATEST;
} else if (position.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
checkpoint = ExtendedSequenceNumber.AT_TIMESTAMP;
}

return checkpoint;
}

Expand Down Expand Up @@ -688,7 +705,7 @@ private static class StartingSequenceNumberAndShardIdBasedComparator implements
* We assume that lease1 and lease2 are:
* a/ not null,
* b/ shards (if found) have non-null starting sequence numbers
*
*
* {@inheritDoc}
*/
@Override
Expand All @@ -698,18 +715,18 @@ public int compare(final Lease lease1, final Lease lease2) {
final String shardId2 = shardIdFromLeaseDeducer.apply(lease2, multiStreamArgs);
final Shard shard1 = shardIdToShardMap.get(shardId1);
final Shard shard2 = shardIdToShardMap.get(shardId2);

// If we found shards for the two leases, use comparison of the starting sequence numbers
if (shard1 != null && shard2 != null) {
BigInteger sequenceNumber1 = new BigInteger(shard1.sequenceNumberRange().startingSequenceNumber());
BigInteger sequenceNumber2 = new BigInteger(shard2.sequenceNumberRange().startingSequenceNumber());
result = sequenceNumber1.compareTo(sequenceNumber2);
result = sequenceNumber1.compareTo(sequenceNumber2);
}

if (result == 0) {
result = shardId1.compareTo(shardId2);
}

return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,18 @@ private int leasesPendingDeletion() {
return deletionQueue.size();
}

/**
*
* @return true if the 'Completed Lease Stopwatch' has elapsed more time than the 'Completed Lease Cleanup Interval'
*/
private boolean timeToCheckForCompletedShard() {
return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis;
}

/**
*
* @return true if the 'Garbage Lease Stopwatch' has elapsed more time than the 'Garbage Lease Cleanup Interval'
*/
private boolean timeToCheckForGarbageShard() {
return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis;
}
Expand Down Expand Up @@ -230,6 +238,15 @@ private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws Dep
return true;
}

/**
* Check if the all of the parent shards for a given lease have an ongoing lease. If any one parent still has a lease, return false. Otherwise return true
* @param lease
* @param shardInfo
* @return
* @throws DependencyException
* @throws ProvisionedThroughputException
* @throws InvalidStateException
*/
private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
for (String parentShard : lease.parentShardIds()) {
final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @No
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();
}else{
} else {
request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions()).provisionedThroughput(throughput)
.build();
Expand Down Expand Up @@ -429,7 +429,7 @@ private List<Lease> list(Integer limit, Integer maxPages, StreamIdentifier strea
@Override
public boolean createLeaseIfNotExists(@NonNull final Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
log.debug("Creating lease {}", lease);
log.debug("Creating lease: {}", lease);

PutItemRequest request = PutItemRequest.builder().tableName(table).item(serializer.toDynamoRecord(lease))
.expected(serializer.getDynamoNonexistantExpectation()).build();
Expand All @@ -452,6 +452,7 @@ public boolean createLeaseIfNotExists(@NonNull final Lease lease)
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("create", lease.leaseKey(), e);
}
log.info("Created lease: {}",lease);
return true;
}

Expand All @@ -476,7 +477,7 @@ public Lease getLease(@NonNull final String leaseKey)
return null;
} else {
final Lease lease = serializer.fromDynamoRecord(dynamoRecord);
log.debug("Got lease {}", lease);
log.debug("Retrieved lease: {}", lease);
return lease;
}
} catch (ExecutionException e) {
Expand Down Expand Up @@ -535,6 +536,7 @@ public boolean renewLease(@NonNull final Lease lease)
}

lease.leaseCounter(lease.leaseCounter() + 1);
log.debug("Renewed lease with key {}", lease.leaseKey());
return true;
}

Expand Down Expand Up @@ -582,6 +584,8 @@ public boolean takeLease(@NonNull final Lease lease, @NonNull final String owner
lease.ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint() + 1);
}

log.info("Transferred lease {} ownership from {} to {}", lease.leaseKey(), oldOwner, owner);

return true;
}

Expand Down Expand Up @@ -620,6 +624,8 @@ public boolean evictLease(@NonNull final Lease lease)

lease.leaseOwner(null);
lease.leaseCounter(lease.leaseCounter() + 1);

log.info("Evicted lease with leaseKey {}", lease.leaseKey());
return true;
}

Expand Down Expand Up @@ -648,6 +654,7 @@ public void deleteAll() throws DependencyException, InvalidStateException, Provi
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("deleteAll", lease.leaseKey(), e);
}
log.debug("Deleted lease {} from table {}", lease.leaseKey(), table);
}
}

Expand Down Expand Up @@ -675,6 +682,8 @@ public void deleteLease(@NonNull final Lease lease)
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("delete", lease.leaseKey(), e);
}

log.info("Deleted lease with leaseKey {}", lease.leaseKey());
}

/**
Expand All @@ -683,7 +692,7 @@ public void deleteLease(@NonNull final Lease lease)
@Override
public boolean updateLease(@NonNull final Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
log.debug("Updating lease {}", lease);
log.debug("Updating lease: {}", lease);

final AWSExceptionManager exceptionManager = createExceptionManager();
exceptionManager.add(ConditionalCheckFailedException.class, t -> t);
Expand Down Expand Up @@ -711,6 +720,7 @@ public boolean updateLease(@NonNull final Lease lease)
}

lease.leaseCounter(lease.leaseCounter() + 1);
log.info("Updated lease {}.", lease.leaseKey());
return true;
}

Expand Down Expand Up @@ -738,6 +748,8 @@ public void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField)
} catch (DynamoDbException | TimeoutException e) {
throw convertAndRethrowExceptions("update", lease.leaseKey(), e);
}

log.info("Updated lease without expectation {}.", lease);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public Lease getCurrentlyHeldLease(String leaseKey) {

/**
* Internal method to return a lease with a specific lease key only if we currently hold it.
*
*
* @param leaseKey key of lease to return
* @param now current timestamp for old-ness checking
* @return non-authoritative copy of the held lease, or null if we don't currently hold it
Expand Down Expand Up @@ -309,6 +309,7 @@ public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String o
long startTime = System.currentTimeMillis();
boolean success = false;
try {
log.info("Updating lease from {} to {}", authoritativeLease, lease);
synchronized (authoritativeLease) {
authoritativeLease.update(lease);
boolean updatedLease = leaseRefresher.updateLease(authoritativeLease);
Expand All @@ -325,15 +326,15 @@ public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String o
/*
* Remove only if the value currently in the map is the same as the authoritative lease. We're
* guarding against a pause after the concurrency token check above. It plays out like so:
*
*
* 1) Concurrency token check passes
* 2) Pause. Lose lease, re-acquire lease. This requires at least one lease counter update.
* 3) Unpause. leaseRefresher.updateLease fails conditional write due to counter updates, returns
* false.
* 4) ownedLeases.remove(key, value) doesn't do anything because authoritativeLease does not
* .equals() the re-acquired version in the map on the basis of lease counter. This is what we want.
* If we just used ownedLease.remove(key), we would have pro-actively removed a lease incorrectly.
*
*
* Note that there is a subtlety here - Lease.equals() deliberately does not check the concurrency
* token, but it does check the lease counter, so this scheme works.
*/
Expand Down
Loading