Skip to content

Commit

Permalink
Improve condition
Browse files Browse the repository at this point in the history
  • Loading branch information
ngocnhan-tran1996 committed Oct 19, 2024
1 parent f330752 commit 366f9da
Showing 1 changed file with 38 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@
*
* @author Gary Russell
* @author Christian Tzolov
* @author Ngoc Nhan
* @since 2.4
*
*/
Expand Down Expand Up @@ -107,29 +108,29 @@ public RabbitStreamTemplate(Environment environment, String streamName) {


private Producer createOrGetProducer() {
this.lock.lock();
try {
if (this.producer == null) {
ProducerBuilder builder = this.environment.producerBuilder();
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
}
if (this.producer == null) {
this.lock.lock();
try {
ProducerBuilder builder = this.environment.producerBuilder();
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
}
}
finally {
this.lock.unlock();
}
return this.producer;
}
finally {
this.lock.unlock();
}
return this.producer;
}

@Override
Expand Down Expand Up @@ -305,24 +306,13 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
}
else {
int code = confStatus.getCode();
String errorMessage;
switch (code) {
case Constants.CODE_MESSAGE_ENQUEUEING_FAILED:
errorMessage = "Message Enqueueing Failed";
break;
case Constants.CODE_PRODUCER_CLOSED:
errorMessage = "Producer Closed";
break;
case Constants.CODE_PRODUCER_NOT_AVAILABLE:
errorMessage = "Producer Not Available";
break;
case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT:
errorMessage = "Publish Confirm Timeout";
break;
default:
errorMessage = "Unknown code: " + code;
break;
}
String errorMessage = switch (code) {
case Constants.CODE_MESSAGE_ENQUEUEING_FAILED -> "Message Enqueueing Failed";
case Constants.CODE_PRODUCER_CLOSED -> "Producer Closed";
case Constants.CODE_PRODUCER_NOT_AVAILABLE -> "Producer Not Available";
case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT -> "Publish Confirm Timeout";
default -> "Unknown code: " + code;
};
StreamSendException ex = new StreamSendException(errorMessage, code);
observation.error(ex);
observation.stop();
Expand All @@ -339,15 +329,15 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
*/
@Override
public void close() {
this.lock.lock();
try {
if (this.producer != null) {
this.producer.close();
this.producer = null;
if (this.producer != null) {
this.lock.lock();
try {
this.producer.close();
this.producer = null;
}
finally {
this.lock.unlock();
}
}
finally {
this.lock.unlock();
}
}

Expand Down

0 comments on commit 366f9da

Please sign in to comment.