Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -124,9 +123,7 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();

private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();

private final Set<CloseSafeProducer<K, V>> threadBoundProducersAll = ConcurrentHashMap.newKeySet();
private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<>();

private final AtomicInteger epoch = new AtomicInteger();

Expand Down Expand Up @@ -725,15 +722,15 @@ public void destroy() {
}
});
this.cache.clear();
this.threadBoundProducersAll.forEach(prod -> {
this.threadBoundProducers.values().forEach(prod -> {
try {
prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
}
catch (Exception e) {
LOGGER.error(e, "Exception while closing producer");
}
});
this.threadBoundProducersAll.clear();
this.threadBoundProducers.clear();
this.epoch.incrementAndGet();
}

Expand Down Expand Up @@ -800,10 +797,9 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
}

private Producer<K, V> getOrCreateThreadBoundProducer() {
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get(Thread.currentThread());
if (tlProducer != null && (tlProducer.closed || this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {
closeThreadBoundProducer();
this.threadBoundProducersAll.remove(tlProducer);
tlProducer = null;
}
if (tlProducer == null) {
Expand All @@ -812,8 +808,7 @@ private Producer<K, V> getOrCreateThreadBoundProducer() {
for (Listener<K, V> listener : this.listeners) {
listener.producerAdded(tlProducer.clientId, tlProducer);
}
this.threadBoundProducers.set(tlProducer);
this.threadBoundProducersAll.add(tlProducer);
this.threadBoundProducers.put(Thread.currentThread(), tlProducer);
}
return tlProducer;
}
Expand Down Expand Up @@ -949,10 +944,8 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
*/
@Override
public void closeThreadBoundProducer() {
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.remove(Thread.currentThread());
if (tlProducer != null) {
this.threadBoundProducers.remove();
this.threadBoundProducersAll.remove(tlProducer);
tlProducer.closeDelegate(this.physicalCloseTimeout, this.listeners);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
Expand Down Expand Up @@ -113,7 +114,7 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo

private final boolean transactional;

private final ThreadLocal<Producer<K, V>> producers = new ThreadLocal<>();
private final Map<Thread, Producer<K, V>> producers = new ConcurrentHashMap<>();

private final Map<String, String> micrometerTags = new HashMap<>();

Expand Down Expand Up @@ -616,7 +617,8 @@ public <T> T execute(ProducerCallback<K, V, T> callback) {
public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
Assert.notNull(callback, "'callback' cannot be null");
Assert.state(this.transactional, "Producer factory does not support transactions");
Producer<K, V> producer = this.producers.get();
Thread currentThread = Thread.currentThread();
Producer<K, V> producer = this.producers.get(currentThread);
Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");

producer = this.producerFactory.createProducer(this.transactionIdPrefix);
Expand All @@ -629,7 +631,7 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
throw e;
}

this.producers.set(producer);
this.producers.put(currentThread, producer);
try {
T result = callback.doInOperations(this);
try {
Expand All @@ -648,7 +650,7 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
throw e;
}
finally {
this.producers.remove();
this.producers.remove(currentThread);
closeProducer(producer, false);
}
}
Expand Down Expand Up @@ -727,7 +729,7 @@ private ConsumerRecord<K, V> receiveOne(TopicPartition topicPartition, long offs
}

private Producer<K, V> producerForOffsets() {
Producer<K, V> producer = this.producers.get();
Producer<K, V> producer = this.producers.get(Thread.currentThread());
if (producer == null) {
@SuppressWarnings("unchecked")
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
Expand Down Expand Up @@ -876,7 +878,7 @@ private void failureTimer(@Nullable Object sample, Exception exception, Producer
*/
@Override
public boolean inTransaction() {
return this.transactional && (this.producers.get() != null
return this.transactional && (this.producers.get(Thread.currentThread()) != null
|| TransactionSynchronizationManager.getResource(this.producerFactory) != null
|| TransactionSynchronizationManager.isActualTransactionActive());
}
Expand All @@ -900,7 +902,7 @@ protected Producer<K, V> getTheProducer(@SuppressWarnings("unused") @Nullable St
}
}
if (transactionalProducer) {
Producer<K, V> producer = this.producers.get();
Producer<K, V> producer = this.producers.get(Thread.currentThread());
if (producer != null) {
return producer;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,20 +38,20 @@
*/
public abstract class AbstractConsumerSeekAware implements ConsumerSeekAware {

private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<Thread, ConsumerSeekCallback> callbackForThread = new ConcurrentHashMap<>();

private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

private final Map<ConsumerSeekCallback, List<TopicPartition>> callbacksToTopic = new ConcurrentHashMap<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
this.callbackForThread.put(Thread.currentThread(), callback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
ConsumerSeekCallback threadCallback = this.callbackForThread.get();
ConsumerSeekCallback threadCallback = this.callbackForThread.get(Thread.currentThread());
if (threadCallback != null) {
assignments.keySet().forEach(tp -> {
this.callbacks.put(tp, threadCallback);
Expand All @@ -78,7 +78,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

@Override
public void unregisterSeekCallback() {
this.callbackForThread.remove();
this.callbackForThread.remove(Thread.currentThread());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,7 +35,8 @@ public interface ConsumerSeekAware {
/**
* Register the callback to use when seeking at some arbitrary time. When used with a
* {@code ConcurrentMessageListenerContainer} or the same listener instance in multiple
* containers listeners should store the callback in a {@code ThreadLocal}.
* containers listeners should store the callback in a {@code ThreadLocal} or a map keyed
* by the thread.
* @param callback the callback.
*/
default void registerSeekCallback(ConsumerSeekCallback callback) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -52,9 +54,9 @@
public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor
implements AfterRollbackProcessor<K, V> {

private final ThreadLocal<BackOffExecution> backOffs = new ThreadLocal<>(); // Intentionally not static
private final Map<Thread, BackOffExecution> backOffs = new ConcurrentHashMap<>();

private final ThreadLocal<Long> lastIntervals = new ThreadLocal<>(); // Intentionally not static
private final Map<Thread, Long> lastIntervals = new ConcurrentHashMap<>();

private final BackOff backOff;

Expand Down Expand Up @@ -182,8 +184,9 @@ public boolean isProcessInTransaction() {
@Override
public void clearThreadState() {
super.clearThreadState();
this.backOffs.remove();
this.lastIntervals.remove();
Thread currentThread = Thread.currentThread();
this.backOffs.remove(currentThread);
this.lastIntervals.remove(currentThread);
}

private static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand All @@ -46,7 +47,7 @@
*/
class FailedRecordTracker implements RecoveryStrategy {

private final ThreadLocal<Map<TopicPartition, FailedRecord>> failures = new ThreadLocal<>(); // intentionally not static
private final Map<Thread, Map<TopicPartition, FailedRecord>> failures = new ConcurrentHashMap<>();

private final ConsumerAwareRecordRecoverer recoverer;

Expand Down Expand Up @@ -76,7 +77,7 @@ class FailedRecordTracker implements RecoveryStrategy {
Assert.notNull(backOff, "'backOff' cannot be null");
if (recoverer == null) {
this.recoverer = (rec, consumer, thr) -> {
Map<TopicPartition, FailedRecord> map = this.failures.get();
Map<TopicPartition, FailedRecord> map = this.failures.get(Thread.currentThread());
FailedRecord failedRecord = null;
if (map != null) {
failedRecord = map.get(new TopicPartition(rec.topic(), rec.partition()));
Expand Down Expand Up @@ -172,11 +173,8 @@ public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
attemptRecovery(record, exception, null, consumer);
return true;
}
Map<TopicPartition, FailedRecord> map = this.failures.get();
if (map == null) {
this.failures.set(new HashMap<>());
map = this.failures.get();
}
Thread currentThread = Thread.currentThread();
Map<TopicPartition, FailedRecord> map = this.failures.computeIfAbsent(currentThread, t -> new HashMap<>());
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
this.retryListeners.forEach(rl ->
Expand All @@ -190,7 +188,7 @@ public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
attemptRecovery(record, exception, topicPartition, consumer);
map.remove(topicPartition);
if (map.isEmpty()) {
this.failures.remove();
this.failures.remove(currentThread);
}
return true;
}
Expand Down Expand Up @@ -233,14 +231,14 @@ private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @
catch (RuntimeException e) {
this.retryListeners.forEach(rl -> rl.recoveryFailed(record, exception, e));
if (tp != null && this.resetStateOnRecoveryFailure) {
this.failures.get().remove(tp);
this.failures.get(Thread.currentThread()).remove(tp);
}
throw e;
}
}

void clearThreadState() {
this.failures.remove();
this.failures.remove(Thread.currentThread());
}

ConsumerAwareRecordRecoverer getRecoverer() {
Expand All @@ -254,7 +252,7 @@ ConsumerAwareRecordRecoverer getRecoverer() {
* @since 2.5
*/
int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
Map<TopicPartition, FailedRecord> map = this.failures.get();
Map<TopicPartition, FailedRecord> map = this.failures.get(Thread.currentThread());
if (map == null) {
return 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -58,7 +60,7 @@ class FallbackBatchErrorHandler extends ExceptionClassifier implements CommonErr

private final CommonErrorHandler seeker = new SeekAfterRecoverFailsOrInterrupted();

private final ThreadLocal<Boolean> retrying = ThreadLocal.withInitial(() -> false);
private final Map<Thread, Boolean> retrying = new ConcurrentHashMap<>();

private final List<RetryListener> retryListeners = new ArrayList<>();

Expand Down Expand Up @@ -145,22 +147,22 @@ public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?,
this.logger.error(thrownException, "Called with no records; consumer exception");
return;
}
this.retrying.set(true);
this.retrying.put(Thread.currentThread(), true);
try {
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners, getClassifier(),
this.reclassifyOnExceptionChange);
}
finally {
this.retrying.set(false);
this.retrying.remove(Thread.currentThread());
}
}

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {

if (this.retrying.get()) {
if (Boolean.TRUE.equals(this.retrying.get(Thread.currentThread()))) {
consumer.pause(consumer.assignment());
publishPause.run();
}
Expand Down
Loading