Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void onNext(T amqpChannel) {

final ConcurrentLinkedDeque<ChannelSubscriber<T>> currentSubscribers = subscribers;
logger.info("namespace[{}] entityPath[{}]: Next AMQP channel received, updating {} current "
+ "subscribers: {}", fullyQualifiedNamespace, entityPath, subscribers.size(), subscribers);
+ "subscribers", fullyQualifiedNamespace, entityPath, subscribers.size());

currentSubscribers.forEach(subscription -> subscription.onNext(amqpChannel));

Expand Down Expand Up @@ -232,7 +232,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

subscribers.add(subscriber);
logger.info("Added a subscriber {} to AMQP channel processor. Total "
logger.verbose("Added a subscriber {} to AMQP channel processor. Total "
+ "subscribers = {}", subscriber, subscribers.size());

if (!isRetryPending.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,9 @@ protected Message decodeDelivery(Delivery delivery) {
delivery.settle();
return message;
}

@Override
public String toString() {
return String.format("link name: [%s], entity path: [%s]", receiver.getName(), entityPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPat

if (isDisposed()) {
return Mono.error(logger.logExceptionAsError(new IllegalStateException(String.format(
"Cannot create send link '%s' from a closed session. entityPath[%s]", linkName, entityPath))));
"Cannot create receive link '%s' from a closed session. entityPath[%s]", linkName, entityPath))));
}

final LinkSubscription<AmqpReceiveLink> existingLink = openReceiveLinks.get(linkName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ private static Flux<Long> retry(Flux<Throwable> source, AmqpRetryPolicy retryPol
return source.zipWith(Flux.range(1, retryPolicy.getMaxRetries() + 1),
(error, attempt) -> {
if (attempt > retryPolicy.getMaxRetries()) {
LOGGER.warning("Retry attempts are exhausted. Current: {}. Max: {}.", retryPolicy.getMaxRetries(),
attempt);

LOGGER.warning("Retry attempts are exhausted. Current: {}. Max: {}.", attempt,
retryPolicy.getMaxRetries());
throw Exceptions.propagate(error);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

import java.time.Duration;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down Expand Up @@ -111,6 +109,7 @@ public boolean isTerminated() {
@Override
public void onSubscribe(Subscription subscription) {
Objects.requireNonNull(subscription, "'subscription' cannot be null");
logger.info("Setting new subscription for receive link processor");

if (!Operators.setOnce(UPSTREAM, this, subscription)) {
throw logger.logExceptionAsError(new IllegalStateException("Cannot set upstream twice."));
Expand Down Expand Up @@ -176,7 +175,7 @@ public void onNext(AmqpReceiveLink next) {
},
() -> {
if (parentConnection.isDisposed() || isTerminated()
|| upstream == Operators.cancelledSubscription()) {
|| UPSTREAM.get(this) == Operators.cancelledSubscription()) {
logger.info("Terminal state reached. Disposing of link processor.");
dispose();
} else {
Expand All @@ -187,7 +186,6 @@ public void onNext(AmqpReceiveLink next) {
if (existing != null) {
existing.dispose();
}

requestUpstream();
}
}),
Expand Down Expand Up @@ -258,36 +256,18 @@ public void subscribe(CoreSubscriber<? super Message> actual) {
@Override
public void onError(Throwable throwable) {
Objects.requireNonNull(throwable, "'throwable' is required.");
logger.info("Error on receive link {}", currentLink, throwable);

if (isTerminated() || isCancelled) {
logger.info("AmqpReceiveLinkProcessor is terminated. Cannot process another error.", throwable);
Operators.onErrorDropped(throwable, currentContext());
return;
}

final int attempt = retryAttempts.incrementAndGet();
final Duration retryInterval = retryPolicy.calculateRetryDelay(throwable, attempt);

final AmqpReceiveLink link = currentLink;
final String linkName = link != null ? link.getLinkName() : "n/a";
final String entityPath = link != null ? link.getEntityPath() : "n/a";

if (retryInterval != null && !parentConnection.isDisposed() && upstream != Operators.cancelledSubscription()) {
logger.warning("linkName[{}] entityPath[{}]. Transient error occurred. Attempt: {}. Retrying after {} ms.",
linkName, entityPath, attempt, retryInterval.toMillis(), throwable);

retrySubscription = Mono.delay(retryInterval).subscribe(i -> requestUpstream());

return;
}

if (parentConnection.isDisposed()) {
logger.info("Parent connection is disposed. Not reopening on error.");
}

logger.warning("linkName[{}] entityPath[{}]. Non-retryable error occurred in AMQP receive link.",
linkName, entityPath, throwable);

lastError = throwable;
isTerminated.set(true);

Expand All @@ -304,11 +284,13 @@ public void onError(Throwable throwable) {
*/
@Override
public void onComplete() {
this.upstream = Operators.cancelledSubscription();
logger.info("Receive link completed {}", currentLink);
UPSTREAM.set(this, Operators.cancelledSubscription());
}

@Override
public void dispose() {
logger.info("Disposing receive link {}", currentLink);
if (isTerminated.getAndSet(true)) {
return;
}
Expand Down Expand Up @@ -359,10 +341,10 @@ private void requestUpstream() {
if (isTerminated()) {
logger.info("Processor is terminated. Not requesting another link.");
return;
} else if (upstream == null) {
} else if (UPSTREAM.get(this) == null) {
logger.info("There is no upstream. Not requesting another link.");
return;
} else if (upstream == Operators.cancelledSubscription()) {
} else if (UPSTREAM.get(this) == Operators.cancelledSubscription()) {
logger.info("Upstream is cancelled or complete. Not requesting another link.");
return;
}
Expand All @@ -375,7 +357,7 @@ private void requestUpstream() {
}

logger.info("Requesting a new AmqpReceiveLink from upstream.");
upstream.request(1L);
UPSTREAM.get(this).request(1L);
}

private void onDispose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,44 +271,6 @@ void newLinkOnClose() {
Assertions.assertNull(processor.getError());
}

/**
* Verifies that we can get the next AMQP link when the first one encounters a retryable error.
*/
@Test
void newLinkOnRetryableError() {
// Arrange
final AmqpReceiveLink[] connections = new AmqpReceiveLink[]{link1, link2};

final AmqpReceiveLinkProcessor processor = createSink(connections).subscribeWith(linkProcessor);
final FluxSink<AmqpEndpointState> endpointSink = endpointProcessor.sink();

when(link2.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE)));
when(link2.receive()).thenReturn(Flux.just(message2));

final AmqpException amqpException = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-error",
new AmqpErrorContext("test-namespace"));
when(retryPolicy.calculateRetryDelay(amqpException, 1)).thenReturn(Duration.ofSeconds(1));

// Act & Assert
// Verify that we get the first connection.
StepVerifier.create(processor)
.then(() -> {
endpointSink.next(AmqpEndpointState.ACTIVE);
messageProcessorSink.next(message1);
})
.expectNext(message1)
.then(() -> {
endpointSink.error(amqpException);
})
.expectNext(message2)
.thenCancel()
.verify();

Assertions.assertTrue(processor.isTerminated());
Assertions.assertFalse(processor.hasError());
Assertions.assertNull(processor.getError());
}

/**
* Verifies that an error is propagated when the first connection encounters a non-retryable error.
*/
Expand Down Expand Up @@ -387,54 +349,6 @@ void noSubscribersWhenTerminated() {
verifyZeroInteractions(subscription);
}

/**
* Verifies it keeps trying to get a link and stops after retries are exhausted.
*/
@Test
void retriesUntilExhausted() {
// Arrange
final Duration delay = Duration.ofSeconds(1);
final AmqpReceiveLink[] connections = new AmqpReceiveLink[]{link1, link2, link3};
final Message message3 = mock(Message.class);

final AmqpReceiveLinkProcessor processor = createSink(connections).subscribeWith(linkProcessor);
final FluxSink<AmqpEndpointState> endpointSink = endpointProcessor.sink();

final DirectProcessor<AmqpEndpointState> link2StateProcessor = DirectProcessor.create();
final FluxSink<AmqpEndpointState> link2StateSink = link2StateProcessor.sink();

when(link2.getEndpointStates()).thenReturn(link2StateProcessor);
when(link2.receive()).thenReturn(Flux.never());

when(link3.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE)));
when(link3.receive()).thenReturn(Flux.never());

// Simulates two busy signals, but our retry policy says to try only once.
final AmqpException amqpException = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-error",
new AmqpErrorContext("test-namespace"));
final AmqpException amqpException2 = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-error",
new AmqpErrorContext("test-namespace"));
when(retryPolicy.calculateRetryDelay(amqpException, 1)).thenReturn(delay);
when(retryPolicy.calculateRetryDelay(amqpException2, 2)).thenReturn(null);

// Act & Assert
// Verify that we get the first connection.
StepVerifier.create(processor)
.then(() -> {
endpointSink.next(AmqpEndpointState.ACTIVE);
messageProcessorSink.next(message1);
})
.expectNext(message1)
.then(() -> endpointSink.error(amqpException))
.thenAwait(delay)
.then(() -> link2StateSink.error(amqpException2))
.expectErrorSatisfies(error -> Assertions.assertSame(amqpException2, error))
.verify();

Assertions.assertTrue(processor.isTerminated());
Assertions.assertTrue(processor.hasError());
Assertions.assertSame(amqpException2, processor.getError());
}

/**
* Does not request another link when parent connection is closed.
Expand Down