Skip to content

Commit

Permalink
Improve conditions in code
Browse files Browse the repository at this point in the history
* Reduce `else` condition and modernize switch pattern
* Check condition after calling lock method
* Some additional code clean up
  • Loading branch information
ngocnhan-tran1996 authored and artembilan committed Oct 21, 2024
1 parent f330752 commit 33da2a6
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 193 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@
*
* @author Gary Russell
* @author Christian Tzolov
* @author Ngoc Nhan
* @since 2.4
*
*/
Expand Down Expand Up @@ -107,29 +108,31 @@ public RabbitStreamTemplate(Environment environment, String streamName) {


private Producer createOrGetProducer() {
this.lock.lock();
try {
if (this.producer == null) {
ProducerBuilder builder = this.environment.producerBuilder();
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
if (this.producer == null) {
this.lock.lock();
try {
if (this.producer == null) {
ProducerBuilder builder = this.environment.producerBuilder();
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
}
}
}
return this.producer;
}
finally {
this.lock.unlock();
finally {
this.lock.unlock();
}
}
return this.producer;
}

@Override
Expand Down Expand Up @@ -305,24 +308,13 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
}
else {
int code = confStatus.getCode();
String errorMessage;
switch (code) {
case Constants.CODE_MESSAGE_ENQUEUEING_FAILED:
errorMessage = "Message Enqueueing Failed";
break;
case Constants.CODE_PRODUCER_CLOSED:
errorMessage = "Producer Closed";
break;
case Constants.CODE_PRODUCER_NOT_AVAILABLE:
errorMessage = "Producer Not Available";
break;
case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT:
errorMessage = "Publish Confirm Timeout";
break;
default:
errorMessage = "Unknown code: " + code;
break;
}
String errorMessage = switch (code) {
case Constants.CODE_MESSAGE_ENQUEUEING_FAILED -> "Message Enqueueing Failed";
case Constants.CODE_PRODUCER_CLOSED -> "Producer Closed";
case Constants.CODE_PRODUCER_NOT_AVAILABLE -> "Producer Not Available";
case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT -> "Publish Confirm Timeout";
default -> "Unknown code: " + code;
};
StreamSendException ex = new StreamSendException(errorMessage, code);
observation.error(ex);
observation.stop();
Expand All @@ -339,15 +331,17 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
*/
@Override
public void close() {
this.lock.lock();
try {
if (this.producer != null) {
this.producer.close();
this.producer = null;
if (this.producer != null) {
this.lock.lock();
try {
if (this.producer != null) {
this.producer.close();
this.producer = null;
}
}
finally {
this.lock.unlock();
}
}
finally {
this.lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.MergedAnnotation;
import org.springframework.core.annotation.MergedAnnotations;
import org.springframework.core.annotation.MergedAnnotations.SearchStrategy;
import org.springframework.core.convert.ConversionService;
Expand Down Expand Up @@ -357,7 +358,7 @@ else if (source instanceof Method method) {
}
return !name.contains("$MockitoMock$");
})
.map(ann -> ann.synthesize())
.map(MergedAnnotation::synthesize)
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -893,7 +894,7 @@ private void addToMap(Map<String, Object> map, String key, Object value, Class<?
}
}
else {
if (value instanceof String && !StringUtils.hasText((String) value)) {
if (value instanceof String string && !StringUtils.hasText(string)) {
putEmpty(map, key);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,12 @@ public Date nextRelease() {
if (this.messages.isEmpty() || this.timeout <= 0) {
return null;
}
else if (this.currentSize >= this.bufferLimit) {
if (this.currentSize >= this.bufferLimit) {
// release immediately, we're already over the limit
return new Date();
}
else {
return new Date(System.currentTimeMillis() + this.timeout);
}

return new Date(System.currentTimeMillis() + this.timeout);
}

@Override
Expand All @@ -122,9 +121,8 @@ public Collection<MessageBatch> releaseBatches() {
if (batch == null) {
return Collections.emptyList();
}
else {
return Collections.singletonList(batch);
}

return Collections.singletonList(batch);
}

private MessageBatch doReleaseBatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,13 @@ private AbstractMessageListenerContainer createContainer() {
.acceptIfNotNull(this.retryDeclarationInterval, container::setRetryDeclarationInterval);
return container;
}
else {
DirectMessageListenerContainer container = new DirectMessageListenerContainer(this.connectionFactory);
JavaUtils.INSTANCE
.acceptIfNotNull(this.consumersPerQueue, container::setConsumersPerQueue)
.acceptIfNotNull(this.taskScheduler, container::setTaskScheduler)
.acceptIfNotNull(this.monitorInterval, container::setMonitorInterval);
return container;
}

DirectMessageListenerContainer container = new DirectMessageListenerContainer(this.connectionFactory);
JavaUtils.INSTANCE
.acceptIfNotNull(this.consumersPerQueue, container::setConsumersPerQueue)
.acceptIfNotNull(this.taskScheduler, container::setTaskScheduler)
.acceptIfNotNull(this.monitorInterval, container::setMonitorInterval);
return container;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public BeanDefinition parse(Element element, ParserContext parserContext) {
}

List<Element> childElements = DomUtils.getChildElementsByTagName(element, LISTENER_ELEMENT);
for (int i = 0; i < childElements.size(); i++) {
parseListener(childElements.get(i), element, parserContext, containerList);
for (Element childElement : childElements) {
parseListener(childElement, element, parserContext, containerList);
}

parserContext.popAndRegisterContainingComponent();
Expand Down Expand Up @@ -190,8 +190,8 @@ private void parseListener(Element listenerEle, Element containerEle, ParserCont
else {
String[] names = StringUtils.commaDelimitedListToStringArray(queues);
List<RuntimeBeanReference> values = new ManagedList<>();
for (int i = 0; i < names.length; i++) {
values.add(new RuntimeBeanReference(names[i].trim()));
for (String name : names) {
values.add(new RuntimeBeanReference(name.trim()));
}
containerDef.getPropertyValues().add("queues", values);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,12 +27,14 @@
import org.springframework.amqp.rabbit.retry.MessageKeyGenerator;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.NewMessageIdentifier;
import org.springframework.lang.Nullable;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.interceptor.MethodArgumentsKeyGenerator;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.interceptor.NewMethodArgumentsIdentifier;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
* Convenient factory bean for creating a stateful retry interceptor for use in a message listener container, giving you
Expand All @@ -47,6 +49,7 @@
*
* @author Dave Syer
* @author Gary Russell
* @author Ngoc Nhan
*
* @see RetryOperations#execute(org.springframework.retry.RetryCallback, org.springframework.retry.RecoveryCallback,
* org.springframework.retry.RetryState)
Expand Down Expand Up @@ -90,9 +93,8 @@ private NewMethodArgumentsIdentifier createNewItemIdentifier() {
if (StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier == null) {
return !message.getMessageProperties().isRedelivered();
}
else {
return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message);
}

return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message);
};
}

Expand Down Expand Up @@ -120,40 +122,33 @@ else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecovere
private MethodArgumentsKeyGenerator createKeyGenerator() {
return args -> {
Message message = argToMessage(args);
Assert.notNull(message, "The 'args' must not convert to null");
if (StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator == null) {
String messageId = message.getMessageProperties().getMessageId();
if (messageId == null && message.getMessageProperties().isRedelivered()) {
message.getMessageProperties().setFinalRetryForMessageWithNoId(true);
}
return messageId;
}
else {
return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message);
}
return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message);
};
}

@SuppressWarnings("unchecked")
@Nullable
private Message argToMessage(Object[] args) {
Object arg = args[1];
Message message = null;
if (arg instanceof Message msg) {
message = msg;
return msg;
}
else if (arg instanceof List) {
message = ((List<Message>) arg).get(0);
if (arg instanceof List<?> list) {
return (Message) list.get(0);
}
return message;
return null;
}

@Override
public Class<?> getObjectType() {
return StatefulRetryOperationsInterceptor.class;
}

@Override
public boolean isSingleton() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setTargetConnectionFactories(Map<Object, ConnectionFactory> targetCo
Assert.noNullElements(targetConnectionFactories.values().toArray(),
"'targetConnectionFactories' cannot have null values.");
this.targetConnectionFactories.putAll(targetConnectionFactories);
targetConnectionFactories.values().stream().forEach(cf -> checkConfirmsAndReturns(cf));
targetConnectionFactories.values().forEach(this::checkConfirmsAndReturns);
}

/**
Expand Down Expand Up @@ -293,7 +293,7 @@ public void destroy() {

@Override
public void resetConnection() {
this.targetConnectionFactories.values().forEach(factory -> factory.resetConnection());
this.targetConnectionFactories.values().forEach(ConnectionFactory::resetConnection);
this.defaultTargetConnectionFactory.resetConnection();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ public static void unRegisterConsumerChannel() {
@Nullable
public static Channel getConsumerChannel() {
ChannelHolder channelHolder = consumerChannel.get();
Channel channel = null;
if (channelHolder != null) {
channel = channelHolder.getChannel();
}
return channel;
return channelHolder != null
? channelHolder.getChannel()
: null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
* expired. It also holds {@link CorrelationData} for
* the client to correlate a confirm with a sent message.
* @author Gary Russell
* @author Ngoc Nhan
* @since 1.0.1
*
*/
Expand Down Expand Up @@ -115,7 +116,7 @@ public void setReturned(boolean isReturned) {
* @since 2.2.10
*/
public boolean waitForReturnIfNeeded() throws InterruptedException {
return this.returned ? this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS) : true;
return !this.returned || this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS);
}

/**
Expand Down
Loading

0 comments on commit 33da2a6

Please sign in to comment.