Skip to content

Commit 2c5914b

Browse files
neetkee
authored and artembilan committed
Minor fixes and improvements
1 parent b10b068 commit 2c5914b

File tree

13 files changed

+21
-26
lines changed

13 files changed

+21
-26
lines changed

CONTRIBUTING.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ Please carefully follow the whitespace and formatting conventions already presen
151151
8. Latin-1 (ISO-8859-1) encoding for Java sources; use `native2ascii` to convert
152152
if necessary
153153

154-
## Add Apache license header to all new classes
154+
== Add Apache license header to all new classes
155155

156156
[source, java]
157157
----

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ task distZip(type: Zip, dependsOn: [docsZip]) { //, schemaZip]) {
384384
description = "Builds -${classifier} archive, containing all jars and docs, " +
385385
"suitable for community download page."
386386

387-
ext.baseDir = "${project.name}-${project.version}";
387+
ext.baseDir = "${project.name}-${project.version}"
388388

389389
from('src/dist') {
390390
include 'readme.txt'

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,8 @@ public BrokerAddress getBrokerAddress(int i) {
383383

384384
public BrokerAddress[] getBrokerAddresses() {
385385
List<BrokerAddress> addresses = new ArrayList<BrokerAddress>();
386-
for (int i = 0; i < this.kafkaPorts.length; i++) {
387-
addresses.add(new BrokerAddress("127.0.0.1", this.kafkaPorts[i]));
386+
for (int kafkaPort : this.kafkaPorts) {
387+
addresses.add(new BrokerAddress("127.0.0.1", kafkaPort));
388388
}
389389
return addresses.toArray(new BrokerAddress[0]);
390390
}

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchLoggingErrorHandler.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.util.Iterator;
2019

2120
import org.apache.commons.logging.LogFactory;
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2222
import org.apache.kafka.clients.consumer.ConsumerRecords;
2323

2424
import org.springframework.core.log.LogAccessor;
@@ -28,7 +28,6 @@
2828
*
2929
* @author Gary Russell
3030
* @since 1.1
31-
*
3231
*/
3332
public class BatchLoggingErrorHandler implements BatchErrorHandler {
3433

@@ -42,9 +41,8 @@ public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
4241
message.append("null ");
4342
}
4443
else {
45-
Iterator<?> iterator = data.iterator();
46-
while (iterator.hasNext()) {
47-
message.append(iterator.next()).append('\n');
44+
for (ConsumerRecord<?, ?> record : data) {
45+
message.append(record).append('\n');
4846
}
4947
}
5048
LOGGER.error(thrownException, () -> message.substring(0, message.length() - 1));

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ public enum AckMode {
231231
*/
232232
public ContainerProperties(String... topics) {
233233
Assert.notEmpty(topics, "An array of topics must be provided");
234-
this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
234+
this.topics = topics.clone();
235235
this.topicPattern = null;
236236
this.topicPartitions = null;
237237
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,10 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Objec
126126
this.templates = templates;
127127
this.transactional = templates.values().iterator().next().isTransactional();
128128
Boolean tx = this.transactional;
129-
Assert.isTrue(!templates.values()
129+
Assert.isTrue(templates.values()
130130
.stream()
131131
.map(t -> t.isTransactional())
132-
.filter(t -> !t.equals(tx))
133-
.findFirst()
134-
.isPresent(), "All templates must have the same setting for transactional");
132+
.allMatch(t -> t.equals(tx)), "All templates must have the same setting for transactional");
135133
this.destinationResolver = destinationResolver;
136134
}
137135

@@ -172,7 +170,7 @@ private KafkaTemplate<Object, Object> findTemplateForValue(Object value) {
172170
if (key.isPresent()) {
173171
return (KafkaTemplate<Object, Object>) this.templates.get(key.get());
174172
}
175-
LOGGER.warn(() -> "Failed to find a template for " + value.getClass() + " attemting to use the last entry");
173+
LOGGER.warn(() -> "Failed to find a template for " + value.getClass() + " attempting to use the last entry");
176174
return (KafkaTemplate<Object, Object>) this.templates.values()
177175
.stream()
178176
.reduce((first, second) -> second)

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1344,7 +1344,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
13441344
while (iterator.hasNext()) {
13451345
records.add(iterator.next());
13461346
}
1347-
((RemainingRecordsErrorHandler) this.errorHandler).handle(decorateException(e), records, this.consumer,
1347+
this.errorHandler.handle(decorateException(e), records, this.consumer,
13481348
KafkaMessageListenerContainer.this.container);
13491349
}
13501350
else {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ protected final Object invokeHandler(Object data, Acknowledgment acknowledgment,
288288
if (this.hasAckParameter && acknowledgment == null) {
289289
throw new ListenerExecutionFailedException("invokeHandler Failed",
290290
new IllegalStateException("No Acknowledgment available as an argument, "
291-
+ "the listener container must have a MANUAL Ackmode to populate the Acknowledgment.",
291+
+ "the listener container must have a MANUAL AckMode to populate the Acknowledgment.",
292292
ex));
293293
}
294294
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +

spring-kafka/src/main/java/org/springframework/kafka/support/SendResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.kafka.clients.producer.RecordMetadata;
2121

2222
/**
23-
* Result for a Listenablefuture after a send.
23+
* Result for a ListenableFuture after a send.
2424
*
2525
* @param <K> the key type.
2626
* @param <V> the value type.

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@
3636
public interface MessageConverter {
3737

3838
@Nullable
39-
static String getGroupid() {
40-
String groupId = KafkaUtils.getConsumerGroupId();
41-
return groupId == null ? null : groupId;
39+
static String getGroupId() {
40+
return KafkaUtils.getConsumerGroupId();
4241
}
4342

4443
default void commonHeaders(Acknowledgment acknowledgment, Consumer<?, ?> consumer, Map<String, Object> rawHeaders,
@@ -52,7 +51,7 @@ default void commonHeaders(Acknowledgment acknowledgment, Consumer<?, ?> consume
5251
rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType);
5352
rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp);
5453
JavaUtils.INSTANCE
55-
.acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupid(),
54+
.acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupId(),
5655
(key, val) -> rawHeaders.put(key, val))
5756
.acceptIfNotNull(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment, (key, val) -> rawHeaders.put(key, val))
5857
.acceptIfNotNull(KafkaHeaders.CONSUMER, consumer, (key, val) -> rawHeaders.put(key, val));

0 commit comments

Comments
 (0)