Skip to content

Commit

Permalink
Fixed NPE in LeaseCleanupManager. (#1061)
Browse files Browse the repository at this point in the history
  • Loading branch information
stair-aws authored Mar 10, 2023
1 parent 27b166c commit 504ea10
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public void shutdown() {
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
final Lease lease = leasePendingDeletion.lease();
if (lease == null) {
log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.",
lease.leaseKey());
log.warn("Cannot enqueue {} for {} as instance doesn't hold the lease for that shard.",
leasePendingDeletion.shardInfo(), leasePendingDeletion.streamIdentifier());
} else {
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
if (!deletionQueue.add(leasePendingDeletion)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,14 @@
@RunWith(MockitoJUnitRunner.class)
public class LeaseCleanupManagerTest {

private ShardInfo shardInfo;
private StreamIdentifier streamIdentifier;
private String concurrencyToken = "1234";

private String shardId = "shardId";
private String splitParent = "splitParent";
private String mergeParent1 = "mergeParent-1";
private String mergeParent2 = "mergeParent-2";

private Duration maxFutureWait = Duration.ofSeconds(1);
private long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
private static final ShardInfo SHARD_INFO = new ShardInfo("shardId", "concurrencyToken",
Collections.emptySet(), ExtendedSequenceNumber.LATEST);

private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");

private final long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
private final long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
private final long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
private boolean cleanupLeasesOfCompletedShards = true;
private LeaseCleanupManager leaseCleanupManager;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
Expand All @@ -73,9 +68,6 @@ public class LeaseCleanupManagerTest {

@Before
public void setUp() throws Exception {
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
streamIdentifier = StreamIdentifier.singleStreamInstance("streamName");
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
garbageLeaseCleanupIntervalMillis);
Expand Down Expand Up @@ -112,10 +104,8 @@ public final void testSubsequentShutdowns() {
*/
@Test
public final void testParentShardLeaseDeletedSplitCase() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);

verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 1);
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForSplit(),
ExtendedSequenceNumber.LATEST, 1);
}

/**
Expand All @@ -124,10 +114,8 @@ public final void testParentShardLeaseDeletedSplitCase() throws Exception {
*/
@Test
public final void testParentShardLeaseDeletedMergeCase() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);

verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 1);
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForMerge(),
ExtendedSequenceNumber.LATEST, 1);
}

/**
Expand All @@ -136,15 +124,14 @@ public final void testParentShardLeaseDeletedMergeCase() throws Exception {
*/
@Test
public final void testNoLeasesDeletedWhenNotEnabled() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
cleanupLeasesOfCompletedShards = false;

leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
garbageLeaseCleanupIntervalMillis);

verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0);
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForSplit(),
ExtendedSequenceNumber.LATEST, 0);
}

/**
Expand All @@ -155,10 +142,8 @@ public final void testNoLeasesDeletedWhenNotEnabled() throws Exception {
public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception {
List<ChildShard> childShards = childShardsForSplit();

shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);

verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, ExtendedSequenceNumber.LATEST, false, 0);
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShards,
ExtendedSequenceNumber.LATEST, false, 0);
}

/**
Expand All @@ -179,46 +164,48 @@ public final void testParentShardLeaseNotDeletedWhenChildIsAtTimestamp() throws
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP);
}

private final void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber)
private void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber)
throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);

verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0);
verifyExpectedDeletedLeasesCompletedShardCase(SHARD_INFO, childShardsForMerge(), extendedSequenceNumber, 0);
}

/**
* Tests that if a lease's parents are still present, we do not delete the lease.
*/
@Test
public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.singleton("parent"),
final ShardInfo shardInfo = new ShardInfo("shardId-0", "concurrencyToken", Collections.singleton("parent"),
ExtendedSequenceNumber.LATEST);

verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0);
}

/**
* Verify {@link NullPointerException} is not thrown when a null lease is enqueued.
*/
@Test
public void testEnqueueNullLease() {
leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(null, SHARD_INFO));
}

/**
* Tests ResourceNotFound case for if a shard expires, that we delete the lease when shardExpired is found.
*/
@Test
public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
final Lease heldLease = LeaseHelper.createLease(SHARD_INFO.shardId(), "leaseOwner",
Collections.singleton("parentShardId"));

testLeaseDeletedWhenShardDoesNotExist(heldLease);
}

/**
* Tests ResourceNotFound case when completed lease cleanup is disabled.
* @throws Exception
*/
@Test
public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
final Lease heldLease = LeaseHelper.createLease(SHARD_INFO.shardId(), "leaseOwner",
Collections.singleton("parentShardId"));

cleanupLeasesOfCompletedShards = false;

Expand All @@ -229,32 +216,31 @@ public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseD
testLeaseDeletedWhenShardDoesNotExist(heldLease);
}

public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception {
private void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception {
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
when(leaseCoordinator.getCurrentlyHeldLease(SHARD_INFO.shardId())).thenReturn(heldLease);
when(shardDetector.getChildShards(any(String.class))).thenThrow(ResourceNotFoundException.class);
when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease);

leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo, shardDetector));
leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(heldLease, SHARD_INFO));
leaseCleanupManager.cleanupLeases();

verify(shardDetector, times(1)).getChildShards(shardInfo.shardId());
verify(leaseRefresher, times(1)).deleteLease(heldLease);
verify(shardDetector).getChildShards(SHARD_INFO.shardId());
verify(leaseRefresher).deleteLease(heldLease);
}

private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
ExtendedSequenceNumber extendedSequenceNumber,
int expectedDeletedLeases) throws Exception {
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases);
}

private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
ExtendedSequenceNumber extendedSequenceNumber,
boolean childShardLeasesPresent,
int expectedDeletedLeases) throws Exception {

final Lease lease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", shardInfo.parentShardIds(),
childShards.stream().map(c -> c.shardId()).collect(Collectors.toSet()));
childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()));
final List<Lease> childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease(
ShardInfo.getLeaseKey(shardInfo, c.shardId()), "leaseOwner", Collections.singleton(shardInfo.shardId()),
Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList());
Expand All @@ -273,15 +259,15 @@ private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shard
}
}

leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo, shardDetector));
leaseCleanupManager.enqueueForDeletion(createLeasePendingDeletion(lease, shardInfo));
leaseCleanupManager.cleanupLeases();

verify(shardDetector, times(1)).getChildShards(shardInfo.shardId());
verify(shardDetector).getChildShards(shardInfo.shardId());
verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class));
}

private List<ChildShard> childShardsForSplit() {
List<String> parentShards = Arrays.asList(splitParent);
final List<String> parentShards = Collections.singletonList("splitParent");

ChildShard leftChild = ChildShard.builder()
.shardId("leftChild")
Expand All @@ -294,11 +280,11 @@ private List<ChildShard> childShardsForSplit() {
.hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"))
.build();

return Arrays.asList(leftChild, rightChild);
return Arrays.asList(leftChild, rightChild);
}

private List<ChildShard> childShardsForMerge() {
List<String> parentShards = Arrays.asList(mergeParent1, mergeParent2);
final List<String> parentShards = Arrays.asList("mergeParent1", "mergeParent2");

ChildShard child = ChildShard.builder()
.shardId("onlyChild")
Expand All @@ -308,4 +294,8 @@ private List<ChildShard> childShardsForMerge() {

return Collections.singletonList(child);
}

private LeasePendingDeletion createLeasePendingDeletion(final Lease lease, final ShardInfo shardInfo) {
return new LeasePendingDeletion(STREAM_IDENTIFIER, lease, shardInfo, shardDetector);
}
}

0 comments on commit 504ea10

Please sign in to comment.