Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use compareAndSet instead lock #2862

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package org.springframework.rabbit.stream.producer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import org.springframework.amqp.core.Message;
Expand Down Expand Up @@ -55,15 +54,14 @@
*
* @author Gary Russell
* @author Christian Tzolov
* @author Jeonggi Kim
* @since 2.4
*
*/
public class RabbitStreamTemplate implements RabbitStreamOperations, ApplicationContextAware, BeanNameAware {

protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR

private final Lock lock = new ReentrantLock();

private ApplicationContext applicationContext;

private final Environment environment;
Expand All @@ -74,9 +72,9 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, Application

private MessageConverter messageConverter = new SimpleMessageConverter();

private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter();
private StreamMessageConverter streamMessageConverter = new DefaultStreamMessageConverter();

private boolean streamConverterSet;
private final AtomicBoolean producerInitialized = new AtomicBoolean(false);

private Producer producer;

Expand Down Expand Up @@ -107,29 +105,23 @@ public RabbitStreamTemplate(Environment environment, String streamName) {


private Producer createOrGetProducer() {
this.lock.lock();
try {
if (this.producer == null) {
ProducerBuilder builder = this.environment.producerBuilder();
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
}
if (this.producerInitialized.compareAndSet(false, true)) {
ProducerBuilder builder = this.environment.producerBuilder();
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (this.streamMessageConverter instanceof DefaultStreamMessageConverter) {
((DefaultStreamMessageConverter) this.streamMessageConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
}
return this.producer;
}
finally {
this.lock.unlock();
}
return this.producer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly the problem we are trying to cover with a Lock.
Imaging the situation when several threads concurrently call this method.
So, the first one calls compareAndSet() and enters into a block to create a producer.
The second one got false for the compareAndSet() and goes to the end of the method.
Since the first one has not finished its work for producer initialization, the second one ends up with a null.

Do I miss anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. my mistake.

}

@Override
Expand All @@ -139,13 +131,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws

@Override
public void setBeanName(String name) {
this.lock.lock();
try {
this.beanName = name;
}
finally {
this.lock.unlock();
}
throwIfProducerAlreadyInitialized();
this.beanName = name;
}

/**
Expand All @@ -154,13 +141,8 @@ public void setBeanName(String name) {
* @since 3.0
*/
public void setSuperStreamRouting(Function<com.rabbitmq.stream.Message, String> superStreamRouting) {
this.lock.lock();
try {
this.superStreamRouting = superStreamRouting;
}
finally {
this.lock.unlock();
}
throwIfProducerAlreadyInitialized();
this.superStreamRouting = superStreamRouting;
}


Expand All @@ -176,18 +158,12 @@ public void setMessageConverter(MessageConverter messageConverter) {
/**
* Set a converter to convert from {@link Message} to {@link com.rabbitmq.stream.Message}
* for {@link #send(Message)} and {@link #convertAndSend(Object)} methods.
* @param streamConverter the converter.
* @param streamMessageConverter the converter.
*/
public void setStreamConverter(StreamMessageConverter streamConverter) {
Assert.notNull(streamConverter, "'streamConverter' cannot be null");
this.lock.lock();
try {
this.streamConverter = streamConverter;
this.streamConverterSet = true;
}
finally {
this.lock.unlock();
}
public void setStreamConverter(StreamMessageConverter streamMessageConverter) {
Assert.notNull(streamMessageConverter, "'streamMessageConverter' cannot be null");
throwIfProducerAlreadyInitialized();
this.streamMessageConverter = streamMessageConverter;
}

/**
Expand All @@ -196,12 +172,13 @@ public void setStreamConverter(StreamMessageConverter streamConverter) {
*/
public void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
Assert.notNull(producerCustomizer, "'producerCustomizer' cannot be null");
this.lock.lock();
try {
this.producerCustomizer = producerCustomizer;
}
finally {
this.lock.unlock();
throwIfProducerAlreadyInitialized();
this.producerCustomizer = producerCustomizer;
}

private void throwIfProducerAlreadyInitialized() {
if (this.producerInitialized.get()) {
throw new IllegalStateException("producer is already initialized");
}
}

Expand All @@ -223,14 +200,14 @@ public MessageConverter messageConverter() {

@Override
public StreamMessageConverter streamMessageConverter() {
return this.streamConverter;
return this.streamMessageConverter;
}


@Override
public CompletableFuture<Boolean> send(Message message) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
observeSend(this.streamConverter.fromMessage(message), future);
observeSend(this.streamMessageConverter.fromMessage(message), future);
return future;
}

Expand Down Expand Up @@ -339,15 +316,9 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
*/
@Override
public void close() {
this.lock.lock();
try {
if (this.producer != null) {
this.producer.close();
this.producer = null;
}
}
finally {
this.lock.unlock();
Producer producer = this.producer;
if (this.producerInitialized.compareAndSet(true, false)) {
producer.close();
}
}

Expand Down