Skip to content

Commit 4d4eb80

Browse files
garyrussell and artembilan
authored and committed
Bump kafka-clients to 2.2.0
- fix deprecations
1 parent 06c34c5 commit 4d4eb80

File tree

6 files changed

+78
-68
lines changed

6 files changed

+78
-68
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ subprojects { subproject ->
7878
junit4Version = '4.12'
7979
junitJupiterVersion = '5.4.0'
8080
junitPlatformVersion = '1.4.0'
81-
kafkaVersion = '2.1.1'
81+
kafkaVersion = '2.2.0'
8282
log4jVersion = '2.11.2'
8383
mockitoVersion = '2.24.0'
8484
scalaVersion = '2.11'

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
9494
*/
9595
public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
9696

97-
private static final int DEFAULT_ADMIN_TIMEOUT = 30;
97+
private static final Duration DEFAULT_ADMIN_TIMEOUT = Duration.ofSeconds(10);
9898

9999
private final int count;
100100

@@ -116,7 +116,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
116116

117117
private int[] kafkaPorts;
118118

119-
private int adminTimeout = DEFAULT_ADMIN_TIMEOUT;
119+
private Duration adminTimeout = DEFAULT_ADMIN_TIMEOUT;
120120

121121
private String brokerListProperty;
122122

@@ -157,12 +157,12 @@ public EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions
157157
/**
158158
* Specify the properties to configure Kafka Broker before start, e.g.
159159
* {@code auto.create.topics.enable}, {@code transaction.state.log.replication.factor} etc.
160-
* @param brokerProperties the properties to use for configuring Kafka Broker(s).
160+
* @param properties the properties to use for configuring Kafka Broker(s).
161161
* @return this for chaining configuration.
162162
* @see KafkaConfig
163163
*/
164-
public EmbeddedKafkaBroker brokerProperties(Map<String, String> brokerProperties) {
165-
this.brokerProperties.putAll(brokerProperties);
164+
public EmbeddedKafkaBroker brokerProperties(Map<String, String> properties) {
165+
this.brokerProperties.putAll(properties);
166166
return this;
167167
}
168168

@@ -180,13 +180,13 @@ public EmbeddedKafkaBroker brokerProperty(String property, Object value) {
180180
/**
181181
* Set explicit ports on which the kafka brokers will listen. Useful when running an
182182
* embedded broker that you want to access from other processes.
183-
* @param kafkaPorts the ports.
183+
* @param ports the ports.
184184
* @return the {@link EmbeddedKafkaBroker}.
185185
*/
186-
public EmbeddedKafkaBroker kafkaPorts(int... kafkaPorts) {
187-
Assert.isTrue(kafkaPorts.length == this.count, "A port must be provided for each instance ["
188-
+ this.count + "], provided: " + Arrays.toString(kafkaPorts) + ", use 0 for a random port");
189-
this.kafkaPorts = kafkaPorts;
186+
public EmbeddedKafkaBroker kafkaPorts(int... ports) {
187+
Assert.isTrue(ports.length == this.count, "A port must be provided for each instance ["
188+
+ this.count + "], provided: " + Arrays.toString(ports) + ", use 0 for a random port");
189+
this.kafkaPorts = ports;
190190
return this;
191191
}
192192

@@ -197,7 +197,7 @@ public EmbeddedKafkaBroker kafkaPorts(int... kafkaPorts) {
197197
* @since 2.2
198198
*/
199199
public void setAdminTimeout(int adminTimeout) {
200-
this.adminTimeout = adminTimeout;
200+
this.adminTimeout = Duration.ofSeconds(adminTimeout);
201201
}
202202

203203
@Override
@@ -247,42 +247,42 @@ private Properties createBrokerProperties(int i) {
247247
/**
248248
* Add topics to the existing broker(s) using the configured number of partitions.
249249
* The broker(s) must be running.
250-
* @param topics the topics.
250+
* @param topicsToAdd the topics.
251251
*/
252-
public void addTopics(String... topics) {
252+
public void addTopics(String... topicsToAdd) {
253253
Assert.notNull(this.zookeeper, "Broker must be started before this method can be called");
254-
HashSet<String> set = new HashSet<>(Arrays.asList(topics));
254+
HashSet<String> set = new HashSet<>(Arrays.asList(topicsToAdd));
255255
createKafkaTopics(set);
256256
this.topics.addAll(set);
257257
}
258258

259259
/**
260260
* Add topics to the existing broker(s).
261261
* The broker(s) must be running.
262-
* @param topics the topics.
262+
* @param topicsToAdd the topics.
263263
* @since 2.2
264264
*/
265-
public void addTopics(NewTopic... topics) {
265+
public void addTopics(NewTopic... topicsToAdd) {
266266
Assert.notNull(this.zookeeper, "Broker must be started before this method can be called");
267-
for (NewTopic topic : topics) {
267+
for (NewTopic topic : topicsToAdd) {
268268
Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic);
269269
Assert.isTrue(topic.replicationFactor() <= this.count
270270
&& (topic.replicasAssignments() == null
271271
|| topic.replicasAssignments().size() <= this.count),
272272
() -> "Embedded kafka does not support the requested replication factor: " + topic);
273273
}
274274

275-
doWithAdmin(admin -> createTopics(admin, Arrays.asList(topics)));
275+
doWithAdmin(admin -> createTopics(admin, Arrays.asList(topicsToAdd)));
276276
}
277277

278278
/**
279279
* Create topics in the existing broker(s) using the configured number of partitions.
280-
* @param topics the topics.
280+
* @param topicsToCreate the topics.
281281
*/
282-
private void createKafkaTopics(Set<String> topics) {
282+
private void createKafkaTopics(Set<String> topicsToCreate) {
283283
doWithAdmin(admin -> {
284284
createTopics(admin,
285-
topics.stream()
285+
topicsToCreate.stream()
286286
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
287287
.collect(Collectors.toList()));
288288
});
@@ -291,7 +291,7 @@ private void createKafkaTopics(Set<String> topics) {
291291
private void createTopics(AdminClient admin, List<NewTopic> newTopics) {
292292
CreateTopicsResult createTopics = admin.createTopics(newTopics);
293293
try {
294-
createTopics.all().get(this.adminTimeout, TimeUnit.SECONDS);
294+
createTopics.all().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS);
295295
}
296296
catch (Exception e) {
297297
throw new KafkaException(e);
@@ -312,7 +312,7 @@ public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
312312
}
313313
finally {
314314
if (admin != null) {
315-
admin.close(this.adminTimeout, TimeUnit.SECONDS);
315+
admin.close(this.adminTimeout);
316316
}
317317
}
318318
}
@@ -453,16 +453,16 @@ public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) {
453453
/**
454454
* Subscribe a consumer to one or more of the embedded topics.
455455
* @param consumer the consumer.
456-
* @param topics the topics.
456+
* @param topicsToConsume the topics.
457457
*/
458-
public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topics) {
459-
HashSet<String> diff = new HashSet<>(Arrays.asList(topics));
458+
public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume) {
459+
HashSet<String> diff = new HashSet<>(Arrays.asList(topicsToConsume));
460460
diff.removeAll(new HashSet<>(this.topics));
461461
assertThat(this.topics)
462462
.as("topic(s):'" + diff + "' are not in embedded topic list")
463-
.containsAll(new HashSet<>(Arrays.asList(topics)));
463+
.containsAll(new HashSet<>(Arrays.asList(topicsToConsume)));
464464
final AtomicBoolean assigned = new AtomicBoolean();
465-
consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener() {
465+
consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() {
466466

467467
@Override
468468
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.time.Duration;
1920
import java.util.Collections;
2021
import java.util.HashMap;
2122
import java.util.Iterator;
@@ -85,7 +86,7 @@
8586
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, ApplicationContextAware,
8687
ApplicationListener<ContextStoppedEvent>, DisposableBean {
8788

88-
private static final int DEFAULT_PHYSICAL_CLOSE_TIMEOUT = 30;
89+
private static final Duration DEFAULT_PHYSICAL_CLOSE_TIMEOUT = Duration.ofSeconds(30);
8990

9091
private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class); // NOSONAR
9192

@@ -103,7 +104,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
103104

104105
private Serializer<V> valueSerializer;
105106

106-
private int physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
107+
private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
107108

108109
private String transactionIdPrefix;
109110

@@ -153,7 +154,7 @@ public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
153154
* @since 1.0.7
154155
*/
155156
public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
156-
this.physicalCloseTimeout = physicalCloseTimeout;
157+
this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout);
157158
}
158159

159160
/**
@@ -216,16 +217,16 @@ public boolean transactionCapable() {
216217

217218
@SuppressWarnings("resource")
218219
@Override
219-
public void destroy() throws Exception { //NOSONAR
220+
public void destroy() {
220221
CloseSafeProducer<K, V> producerToClose = this.producer;
221222
this.producer = null;
222223
if (producerToClose != null) {
223-
producerToClose.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
224+
producerToClose.delegate.close(this.physicalCloseTimeout);
224225
}
225226
producerToClose = this.cache.poll();
226227
while (producerToClose != null) {
227228
try {
228-
producerToClose.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
229+
producerToClose.delegate.close(this.physicalCloseTimeout);
229230
}
230231
catch (Exception e) {
231232
logger.error("Exception while closing producer", e);
@@ -234,7 +235,7 @@ public void destroy() throws Exception { //NOSONAR
234235
}
235236
synchronized (this.consumerProducers) {
236237
this.consumerProducers.forEach(
237-
(k, v) -> v.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
238+
(k, v) -> v.delegate.close(this.physicalCloseTimeout));
238239
this.consumerProducers.clear();
239240
}
240241
}
@@ -337,11 +338,11 @@ Producer<K, V> createTransactionalProducerForPartition() {
337338
}
338339
}
339340

340-
private void removeConsumerProducer(CloseSafeProducer<K, V> producer) {
341+
private void removeConsumerProducer(CloseSafeProducer<K, V> producerToRemove) {
341342
synchronized (this.consumerProducers) {
342343
Iterator<Entry<String, CloseSafeProducer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
343344
while (iterator.hasNext()) {
344-
if (iterator.next().getValue().equals(producer)) {
345+
if (iterator.next().getValue().equals(producerToRemove)) {
345346
iterator.remove();
346347
break;
347348
}
@@ -380,12 +381,12 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
380381
}
381382

382383
@Override
383-
public void closeProducerFor(String transactionIdSuffix) {
384+
public void closeProducerFor(String suffix) {
384385
if (this.producerPerConsumerPartition) {
385386
synchronized (this.consumerProducers) {
386-
CloseSafeProducer<K, V> removed = this.consumerProducers.remove(transactionIdSuffix);
387+
CloseSafeProducer<K, V> removed = this.consumerProducers.remove(suffix);
387388
if (removed != null) {
388-
removed.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
389+
removed.delegate.close(this.physicalCloseTimeout);
389390
}
390391
}
391392
}
@@ -524,22 +525,29 @@ public void abortTransaction() throws ProducerFencedException {
524525

525526
@Override
526527
public void close() {
527-
close(0, null);
528+
close(null);
528529
}
529530

530531
@Override
532+
@SuppressWarnings("deprecation")
533+
@Deprecated
531534
public void close(long timeout, @Nullable TimeUnit unit) {
535+
close(Duration.ofMillis(unit.toMillis(timeout)));
536+
}
537+
538+
@Override
539+
public void close(Duration timeout) {
532540
if (this.cache != null) {
533541
if (this.txFailed) {
534542
if (logger.isWarnEnabled()) {
535543
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
536544
+ "broker restarted during transaction: " + this);
537545
}
538-
if (unit == null) {
546+
if (timeout == null) {
539547
this.delegate.close();
540548
}
541549
else {
542-
this.delegate.close(timeout, unit);
550+
this.delegate.close(timeout);
543551
}
544552
if (this.removeConsumerProducer != null) {
545553
this.removeConsumerProducer.accept(this);
@@ -550,11 +558,11 @@ public void close(long timeout, @Nullable TimeUnit unit) {
550558
synchronized (this) {
551559
if (!this.cache.contains(this)
552560
&& !this.cache.offer(this)) {
553-
if (unit == null) {
561+
if (timeout == null) {
554562
this.delegate.close();
555563
}
556564
else {
557-
this.delegate.close(timeout, unit);
565+
this.delegate.close(timeout);
558566
}
559567
}
560568
}

0 commit comments

Comments (0)