Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update LockRenewalOperation API #14705

Merged
merged 13 commits into from
Sep 4, 2020
Prev Previous commit
Next Next commit
Update LockRenewalOperation to return a completion Mono.
  • Loading branch information
conniey committed Sep 4, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit aad313f93b0b2c3997362d5d294b4a6f6c82b6a3
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.models.LockRenewalStatus;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
@@ -30,6 +29,7 @@ class LockRenewalOperation implements AutoCloseable {
private final AtomicReference<Throwable> throwable = new AtomicReference<>();
private final AtomicReference<LockRenewalStatus> status = new AtomicReference<>(LockRenewalStatus.RUNNING);
private final MonoProcessor<Void> cancellationProcessor = MonoProcessor.create();
private final Mono<Void> completionMono;

private final String lockToken;
private final boolean isSession;
@@ -73,7 +73,31 @@ class LockRenewalOperation implements AutoCloseable {
}

this.lockedUntil.set(lockedUntil);
this.subscription = getRenewLockOperation(lockedUntil, maxLockRenewalDuration);
final Flux<Instant> renewLockOperation = getRenewLockOperation(lockedUntil, maxLockRenewalDuration);
this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until),
error -> {
logger.error("token[{}]. Error occurred while renewing lock token.", error);
status.set(LockRenewalStatus.FAILED);
throwable.set(error);
cancellationProcessor.onComplete();
}, () -> {
if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
logger.verbose("token[{}]. Renewing session lock task completed.", lockToken);
}

cancellationProcessor.onComplete();
});

this.completionMono = renewLockOperation.then();
}

/**
* Gets a mono that completes when the operation does.
*
* @return A mono that completes when the renewal operation does.
*/
Mono<Void> getCompletionOperation() {
return completionMono;
}

/**
@@ -146,10 +170,11 @@ public void close() {
* @param maxLockRenewalDuration Duration to renew lock for.
* @return The subscription for the operation.
*/
private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Duration maxLockRenewalDuration) {
private Flux<OffsetDateTime> getRenewLockOperation(OffsetDateTime initialLockedUntil,
Duration maxLockRenewalDuration) {
if (maxLockRenewalDuration.isZero()) {
status.set(LockRenewalStatus.COMPLETE);
return Disposables.single();
return Flux.empty();
}

final OffsetDateTime now = OffsetDateTime.now();
@@ -174,7 +199,6 @@ private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Dura
sink.next(initialInterval);

final Flux<Object> cancellationSignals = Flux.first(cancellationProcessor, Mono.delay(maxLockRenewalDuration));

return Flux.switchOnNext(emitterProcessor.map(interval -> Mono.delay(interval)
.thenReturn(Flux.create(s -> s.next(interval)))))
.takeUntilOther(cancellationSignals)
@@ -189,19 +213,6 @@ private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Dura

sink.next(MessageUtils.adjustServerTimeout(next));
return offsetDateTime;
})
.subscribe(until -> lockedUntil.set(until),
error -> {
logger.error("token[{}]. Error occurred while renewing lock token.", error);
status.set(LockRenewalStatus.FAILED);
throwable.set(error);
cancellationProcessor.onComplete();
}, () -> {
if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
logger.verbose("token[{}]. Renewing session lock task completed.", lockToken);
}

cancellationProcessor.onComplete();
});
});
}
}
Original file line number Diff line number Diff line change
@@ -449,7 +449,7 @@ public Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOption
* @throws IllegalArgumentException if {@code lockToken} is an empty string.
* @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed.
*/
public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) {
public Mono<Void> getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) {
if (isDisposed.get()) {
throw logger.logExceptionAsError(new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewMessageLock")));
@@ -470,7 +470,8 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m
final LockRenewalOperation operation = new LockRenewalOperation(lockToken, maxLockRenewalDuration, false,
this::renewMessageLock);
renewalContainer.addOrUpdate(lockToken, Instant.now().plus(maxLockRenewalDuration), operation);
return operation;

return operation.getCompletionOperation();
}

/**
@@ -484,7 +485,7 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m
* @throws IllegalArgumentException if {@code lockToken} is an empty string.
* @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed.
*/
public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
public Mono<Void> getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
if (isDisposed.get()) {
throw logger.logExceptionAsError(new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewSessionLock")));
@@ -506,7 +507,7 @@ public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration m
this::renewSessionLock);

renewalContainer.addOrUpdate(sessionId, Instant.now().plus(maxLockRenewalDuration), operation);
return operation;
return operation.getCompletionOperation();
}

/**