Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add a method to support renewing a lock in LockService class #74

Merged
merged 2 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,38 @@ public void deleteLock(final String lockId, ActionListener<Boolean> listener) {
}));
}

/**
* Attempt to renew a lock.
* It is used to give an extended valid period to a lock. The start time of the lock will be updated to
* the current time when the method get called, and the duration of the lock remains.
* It works as long as the lock is not acquired by others, and no matter if the lock is expired of not.
*
* @param lock a {@code LockModel} to be renewed.
* @param listener a {@code ActionListener} that has onResponse and onFailure that is used to
* return the renewed lock if renewal succeed, otherwise return null.
*/
public void renewLock(final LockModel lock, ActionListener<LockModel> listener) {
if (lock == null) {
logger.debug("Lock is null. Nothing to renew.");
listener.onResponse(null);
} else {
logger.debug("Renewing lock: {}. The lock was acquired or renewed on: {}, and the duration was {} sec.",
lock, lock.getLockTime(), lock.getLockDurationSeconds());
final LockModel lockToRenew = new LockModel(lock, getNow(), lock.getLockDurationSeconds(), false);
updateLock(lockToRenew, ActionListener.wrap(
renewedLock -> {
logger.debug("Renewed lock: {}. It is supposed to be valid for another {} sec from {}.",
renewedLock, renewedLock.getLockDurationSeconds(), renewedLock.getLockTime());
listener.onResponse(renewedLock);
},
exception -> {
logger.debug("Failed to renew lock: {}.", lock);
listener.onFailure(exception);
}
));
}
}

private Instant getNow() {
return testInstant != null ? testInstant : Instant.now();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,4 +449,45 @@ public void testMultiThreadAcquireLock() throws Exception {
));
assertTrue("Test timed out - possibly leaked into other tests", latch.await(30L, TimeUnit.SECONDS));
}

public void testRenewLock() throws Exception {
String uniqSuffix = "_lock_renew";
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);

lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
// Set the time of LockService (the 'lockTime' of acquired locks) to a fixed time.
Instant now = Instant.now();
lockService.setTime(now);
lockService.renewLock(lock, ActionListener.wrap(
renewedLock -> {
assertNotNull("Expected to successfully renew lock", renewedLock);
assertEquals("lock_time is expected to be the renewal time.", now, renewedLock.getLockTime());
assertEquals("lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(), renewedLock.getLockDurationSeconds());
lockService.release(lock, ActionListener.wrap(
released -> {
assertTrue("Failed to release lock.", released);
lockService.deleteLock(lock.getLockId(), ActionListener.wrap(
deleted -> {
assertTrue("Failed to delete lock.", deleted);
latch.countDown();
},
exception -> fail(exception.getMessage())
));
},
exception -> fail(exception.getMessage())
));
},
exception -> fail(exception.getMessage())
));
},
exception -> fail(exception.getMessage())
));
latch.await(5L, TimeUnit.SECONDS);
}
}