Skip to content

Commit

Permalink
Added few comments and todo
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Dec 18, 2023
1 parent 55f9541 commit 8495648
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

class UnsignedCounterBarrier {
class IncomingMessagesUnsignedCounterBarrier {
private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1)
private final Lock awaitLock = new ReentrantLock();
private final Condition isZero = awaitLock.newCondition();

private static final Log logger = LogFactory.getLog(UnsignedCounterBarrier.class);
private static final Log logger = LogFactory.getLog(IncomingMessagesUnsignedCounterBarrier.class);

public UnsignedCounterBarrier(long initialValue) {
public IncomingMessagesUnsignedCounterBarrier(long initialValue) {
counter = new AtomicLong(initialValue);
}

public UnsignedCounterBarrier() {
public IncomingMessagesUnsignedCounterBarrier() {
this(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reaso
if (metadata != null) {
outcome = metadata.get(SettleMetadata.class)
.map(SettleMetadata::getOutcome)
.orElseGet(() -> messageOutCome != null ? messageOutCome
: MessageAcknowledgementConfiguration.Outcome.FAILED /* TODO get outcome from reason */);
.orElseGet(() -> messageOutCome /* TODO get outcome from reason */);
} else {
outcome = messageOutCome != null ? messageOutCome
: MessageAcknowledgementConfiguration.Outcome.FAILED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ public class SolaceInboundMessage<T> implements ContextAwareMessage<T>, Metadata
private final SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler;
private final SolaceConnectorIncomingConfiguration ic;
private final T payload;
private final UnsignedCounterBarrier unacknowledgedMessageTracker;
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker;

private Metadata metadata;

public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler,
SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler,
SolaceConnectorIncomingConfiguration ic, UnsignedCounterBarrier unacknowledgedMessageTracker) {
SolaceConnectorIncomingConfiguration ic, IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) {
this.msg = message;
this.unacknowledgedMessageTracker = unacknowledgedMessageTracker;
this.payload = (T) convertPayload();
Expand Down Expand Up @@ -103,7 +103,7 @@ public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {
.atMost(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts())
.onFailure().transform((throwable -> {
SolaceLogging.log.unsuccessfulToTopic(ic.getConsumerQueueErrorTopic().get(), ic.getChannel());
throw new RuntimeException(throwable);
throw new RuntimeException(throwable); // TODO How to catch this exception in tests
}))
.await().atMost(Duration.ofSeconds(30));

Expand All @@ -115,14 +115,14 @@ public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {

MessageAcknowledgementConfiguration.Outcome outcome = ic.getConsumerQueueEnableNacks()
&& ic.getConsumerQueueDiscardMessagesOnFailure() && solaceErrorTopicPublisherHandler == null
? MessageAcknowledgementConfiguration.Outcome.REJECTED
: MessageAcknowledgementConfiguration.Outcome.FAILED;
? MessageAcknowledgementConfiguration.Outcome.REJECTED // will move message to DMQ is enabled on queue & message
: MessageAcknowledgementConfiguration.Outcome.FAILED; // will redeliver the message
if (outcome == MessageAcknowledgementConfiguration.Outcome.REJECTED) {
this.unacknowledgedMessageTracker.decrement();
}
return ic.getConsumerQueueEnableNacks()
? nackHandler.handle(this, reason, nackMetadata, outcome)
: Uni.createFrom().voidItem().subscribeAsCompletionStage();
: Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO Disconnect and reconnect the receiver in order to redeliver the message. Required when nacks are not supported by the broker version.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private long waitTimeout = -1;

// Assuming we won't ever exceed the limit of an unsigned long...
private final UnsignedCounterBarrier unacknowledgedMessageTracker = new UnsignedCounterBarrier();
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier();

public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
this.channel = ic.getChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

class UnsignedCounterBarrier {
class OutgoingMessagesUnsignedCounterBarrier {
private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1)
private final Lock awaitLock = new ReentrantLock();
private final Condition isZero = awaitLock.newCondition();

private static final Log logger = LogFactory.getLog(UnsignedCounterBarrier.class);
private static final Log logger = LogFactory.getLog(OutgoingMessagesUnsignedCounterBarrier.class);

public UnsignedCounterBarrier(long initialValue) {
public OutgoingMessagesUnsignedCounterBarrier(long initialValue) {
counter = new AtomicLong(initialValue);
}

public UnsignedCounterBarrier() {
public OutgoingMessagesUnsignedCounterBarrier() {
this(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class SolaceOutgoingChannel
private long waitTimeout = -1;

// Assuming we won't ever exceed the limit of an unsigned long...
private final UnsignedCounterBarrier publishedMessagesTracker = new UnsignedCounterBarrier();
private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier();

public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration oc, MessagingService solace) {
this.channel = oc.getChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,26 @@ Message<?> consumeAndPublish(SolaceInboundMessage<?> p) {
*
* @param p
*/
@Incoming("dynamic-destination-in")
@Outgoing("dynamic-destination-out")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
Message<?> consumeAndPublishToDynamicTopic(SolaceInboundMessage<?> p) {
Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8));
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message
.createPubSubOutboundMetadata();
Message<?> outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> {
CompletableFuture completableFuture = new CompletableFuture();
p.ack();
completableFuture.complete(null);
return completableFuture;
}, (throwable) -> {
CompletableFuture completableFuture = new CompletableFuture();
p.nack(throwable, p.getMetadata());
completableFuture.complete(null);
return completableFuture;
});
return outboundMessage;
}
// @Incoming("dynamic-destination-in")
// @Outgoing("dynamic-destination-out")
// @Acknowledgment(Acknowledgment.Strategy.MANUAL)
// Message<?> consumeAndPublishToDynamicTopic(SolaceInboundMessage<?> p) {
// Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8));
// SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
// .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message
// .createPubSubOutboundMetadata();
// Message<?> outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> {
// CompletableFuture completableFuture = new CompletableFuture();
// p.ack();
// completableFuture.complete(null);
// return completableFuture;
// }, (throwable) -> {
// CompletableFuture completableFuture = new CompletableFuture();
// p.nack(throwable, p.getMetadata());
// completableFuture.complete(null);
// return completableFuture;
// });
// return outboundMessage;
// }

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
quarkus.solace.host=
quarkus.solace.vpn=
quarkus.solace.authentication.basic.username=
quarkus.solace.authentication.basic.password=
quarkus.solace.host=tcps://mr-connection-h0zr2jc6v7f.messaging.solace.cloud:55443
quarkus.solace.vpn=sjthotak_solace
quarkus.solace.authentication.basic.username=solace-cloud-client
quarkus.solace.authentication.basic.password=qu03808nfjfprlk3ck458u7bv4

mp.messaging.outgoing.hello-out.connector=quarkus-solace
mp.messaging.outgoing.hello-out.producer.topic=
mp.messaging.outgoing.hello-out.producer.topic=quarkus/static/topic
#mp.messaging.outgoing.hello-out.producer.back-pressure.strategy=wait
#mp.messaging.outgoing.hello-out.producer.back-pressure.buffer-capacity=1
#mp.messaging.outgoing.hello-out.producer.waitForPublishReceipt=false

mp.messaging.incoming.hello-in.connector=quarkus-solace
mp.messaging.incoming.hello-in.consumer.queue.enable-nacks=true
mp.messaging.incoming.hello-in.consumer.queue.name=
mp.messaging.incoming.hello-in.consumer.queue.name=queue.orders.outgoing
mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive
mp.messaging.incoming.hello-in.consumer.queue.discard-messages-on-failure=false
mp.messaging.incoming.hello-in.consumer.queue.publish-to-error-topic-on-failure=true
mp.messaging.incoming.hello-in.consumer.queue.error.topic=solace/quarkus/error

mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace
mp.messaging.incoming.dynamic-destination-in.consumer.queue.enable-nacks=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=
mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=queue.orders.outgoing
mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive
mp.messaging.incoming.dynamic-destination-in.consumer.queue.discard-messages-on-failure=false
mp.messaging.incoming.dynamic-destination-in.consumer.queue.publish-to-error-topic-on-failure=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.error.topic=solace/quarkus/error

mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace
mp.messaging.outgoing.dynamic-destination-out.producer.topic=
mp.messaging.outgoing.dynamic-destination-out.producer.topic=quarkus/dynamic/topic

0 comments on commit 8495648

Please sign in to comment.