From 1d68e870a58e31b1af71d8aaf87f073c891da2fe Mon Sep 17 00:00:00 2001 From: laststem Date: Fri, 18 Oct 2024 23:07:49 +0900 Subject: [PATCH 1/2] Make use compareAndSet instead lock --- .../stream/producer/RabbitStreamTemplate.java | 109 +++++++----------- 1 file changed, 40 insertions(+), 69 deletions(-) diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java index 0751e7dbd3..b201fdf010 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java @@ -17,8 +17,7 @@ package org.springframework.rabbit.stream.producer; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.springframework.amqp.core.Message; @@ -55,6 +54,7 @@ * * @author Gary Russell * @author Christian Tzolov + * @author Jeonggi Kim * @since 2.4 * */ @@ -62,8 +62,6 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, Application protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR - private final Lock lock = new ReentrantLock(); - private ApplicationContext applicationContext; private final Environment environment; @@ -74,9 +72,9 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, Application private MessageConverter messageConverter = new SimpleMessageConverter(); - private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter(); + private StreamMessageConverter streamMessageConverter = new DefaultStreamMessageConverter(); - private boolean streamConverterSet; + private final AtomicBoolean producerInitialized = new AtomicBoolean(false); private Producer producer; @@ -107,29 +105,23 @@ public RabbitStreamTemplate(Environment environment, String streamName) { private Producer createOrGetProducer() { - this.lock.lock(); - try { - if (this.producer == null) { - ProducerBuilder builder = this.environment.producerBuilder(); - if (this.superStreamRouting == null) { - builder.stream(this.streamName); - } - else { - builder.superStream(this.streamName) - .routing(this.superStreamRouting); - } - this.producerCustomizer.accept(this.beanName, builder); - this.producer = builder.build(); - if (!this.streamConverterSet) { - ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier( - () -> this.producer.messageBuilder()); - } + if (this.producerInitialized.compareAndSet(false, true)) { + ProducerBuilder builder = this.environment.producerBuilder(); + if (this.superStreamRouting == null) { + builder.stream(this.streamName); + } + else { + builder.superStream(this.streamName) + .routing(this.superStreamRouting); + } + this.producerCustomizer.accept(this.beanName, builder); + this.producer = builder.build(); + if (this.streamMessageConverter instanceof DefaultStreamMessageConverter) { + ((DefaultStreamMessageConverter) this.streamMessageConverter).setBuilderSupplier( + () -> this.producer.messageBuilder()); } - return this.producer; - } - finally { - this.lock.unlock(); } + return this.producer; } @Override @@ -139,13 +131,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws @Override public void setBeanName(String name) { - this.lock.lock(); - try { - this.beanName = name; - } - finally { - this.lock.unlock(); - } + throwIfProducerAlreadyInitialized(); + this.beanName = name; } /** @@ -154,13 +141,8 @@ public void setBeanName(String name) { * @since 3.0 */ public void setSuperStreamRouting(Function superStreamRouting) { - this.lock.lock(); - try { - this.superStreamRouting = superStreamRouting; - } - finally { - this.lock.unlock(); - } + throwIfProducerAlreadyInitialized(); + this.superStreamRouting = superStreamRouting; } @@ -176,18 +158,12 @@ public void setMessageConverter(MessageConverter messageConverter) { /** * Set a converter to convert from {@link Message} to {@link com.rabbitmq.stream.Message} * for {@link #send(Message)} and {@link #convertAndSend(Object)} methods. - * @param streamConverter the converter. + * @param streamMessageConverter the converter. */ - public void setStreamConverter(StreamMessageConverter streamConverter) { - Assert.notNull(streamConverter, "'streamConverter' cannot be null"); - this.lock.lock(); - try { - this.streamConverter = streamConverter; - this.streamConverterSet = true; - } - finally { - this.lock.unlock(); - } + public void setStreamConverter(StreamMessageConverter streamMessageConverter) { + Assert.notNull(streamMessageConverter, "'streamMessageConverter' cannot be null"); + throwIfProducerAlreadyInitialized(); + this.streamMessageConverter = streamMessageConverter; } /** @@ -196,12 +172,13 @@ public void setStreamConverter(StreamMessageConverter streamConverter) { */ public void setProducerCustomizer(ProducerCustomizer producerCustomizer) { Assert.notNull(producerCustomizer, "'producerCustomizer' cannot be null"); - this.lock.lock(); - try { - this.producerCustomizer = producerCustomizer; - } - finally { - this.lock.unlock(); + throwIfProducerAlreadyInitialized(); + this.producerCustomizer = producerCustomizer; + } + + private void throwIfProducerAlreadyInitialized() { + if (producerInitialized.get()) { + throw new IllegalStateException("producer is already initialized"); } } @@ -223,14 +200,14 @@ public MessageConverter messageConverter() { @Override public StreamMessageConverter streamMessageConverter() { - return this.streamConverter; + return this.streamMessageConverter; } @Override public CompletableFuture send(Message message) { CompletableFuture future = new CompletableFuture<>(); - observeSend(this.streamConverter.fromMessage(message), future); + observeSend(this.streamMessageConverter.fromMessage(message), future); return future; } @@ -339,15 +316,9 @@ private ConfirmationHandler handleConfirm(CompletableFuture future, Obs */ @Override public void close() { - this.lock.lock(); - try { - if (this.producer != null) { - this.producer.close(); - this.producer = null; - } - } - finally { - this.lock.unlock(); + Producer producer = this.producer; + if (this.producerInitialized.compareAndSet(true, false)) { + producer.close(); } } From fac0cddca2a84d3c2cf88dcc2e910a659f387681 Mon Sep 17 00:00:00 2001 From: laststem Date: Sat, 19 Oct 2024 00:07:43 +0900 Subject: [PATCH 2/2] fix ci --- .../rabbit/stream/producer/RabbitStreamTemplate.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java index b201fdf010..04bc58cf67 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java @@ -177,7 +177,7 @@ public void setProducerCustomizer(ProducerCustomizer producerCustomizer) { } private void throwIfProducerAlreadyInitialized() { - if (producerInitialized.get()) { + if (this.producerInitialized.get()) { throw new IllegalStateException("producer is already initialized"); } }