Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix deadlock between pc-control and pc-broker-poll threads where partitions are revoked #548

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ endif::[]
=== Fixes

* fix: RunLength offset decoding returns 0 base offset after no-progress commit - related to (#546)
* fix: Transactional PConsumer stuck while rebalancing - related to (#541)

== 0.5.2.5

Expand Down
1 change: 1 addition & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,7 @@ endif::[]
=== Fixes

* fix: RunLength offset decoding returns 0 base offset after no-progress commit - related to (#546)
* fix: Transactional PConsumer stuck while rebalancing - related to (#541)

== 0.5.2.5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public Duration getTimeBetweenCommits() {
@Getter(PROTECTED)
private final BlockingQueue<ControllerEventMessage<K, V>> workMailBox = new LinkedBlockingQueue<>(); // Thread safe, highly performant, non blocking

private final AtomicBoolean isRebalanceInProgress = new AtomicBoolean(false);
/**
* An inbound message to the controller.
* <p>
Expand Down Expand Up @@ -356,21 +357,35 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
* <p>
* Make sure the calling thread is the thread which performs commit - i.e. is the {@link OffsetCommitter}.
*/
@SneakyThrows
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.debug("Partitions revoked {}, state: {}", partitions, state);
isRebalanceInProgress.set(true);
while (this.producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false))
Thread.sleep(100); //wait for the transaction to finish committing

numberOfAssignedPartitions = numberOfAssignedPartitions - partitions.size();

try {
// commit any offsets from revoked partitions BEFORE truncation
this.producerManager.ifPresent(pm -> {
try{
pm.preAcquireOffsetsToCommit();
} catch (Exception exc){
throw new InternalRuntimeException(exc);
}
});

commitOffsetsThatAreReady();

// truncate the revoked partitions
wm.onPartitionsRevoked(partitions);
} catch (Exception e) {
throw new InternalRuntimeException("onPartitionsRevoked event error", e);
} finally {
isRebalanceInProgress.set(false);
}

//
try {
usersConsumerRebalanceListener.ifPresent(listener -> listener.onPartitionsRevoked(partitions));
Expand Down Expand Up @@ -677,7 +692,6 @@ protected <R> void controlLoop(Function<PollContextInternal<K, V>, List<R>> user
Consumer<R> callback) throws TimeoutException, ExecutionException, InterruptedException {
maybeWakeupPoller();

//
final boolean shouldTryCommitNow = maybeAcquireCommitLock();

// make sure all work that's been completed are arranged ready for commit
Expand Down Expand Up @@ -748,7 +762,7 @@ private void maybeWakeupPoller() {
* @return true if committing should either way be attempted now
*/
private boolean maybeAcquireCommitLock() throws TimeoutException, InterruptedException {
final boolean shouldTryCommitNow = isTimeToCommitNow() && wm.isDirty();
final boolean shouldTryCommitNow = isTimeToCommitNow() && wm.isDirty() && !isRebalanceInProgress.get();
// could do this optimistically as well, and only get the lock if it's time to commit, so is not frequent
if (shouldTryCommitNow && options.isUsingTransactionCommitMode()) {
// get into write lock queue, so that no new work can be started from here on
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
*/
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import io.confluent.parallelconsumer.internal.PCModule;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION;
import static io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils.GroupOption.REUSE_GROUP;
import static java.time.Duration.ofSeconds;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.number.OrderingComparison.greaterThan;

/**
* Originally created to reproduce the bug #541 https://github.com/confluentinc/parallel-consumer/issues/541
* <p>
* This test reproduces the potential deadlock situation when a rebalance occurs
* using EoS with transactional producer configuration.
* The solution aims to avoid the deadlock by reordering the acquisition of locks
* in the {@link io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor#onPartitionsRevoked(Collection)} method.
*
* @author Nacho Munoz
*/
@Slf4j
class RebalanceEoSDeadlockTest extends BrokerIntegrationTest<String, String> {

private static final String PC_CONTROL = "pc-control";
public static final String PC_BROKER_POLL = "pc-broker-poll";
Consumer<String, String> consumer;
Producer<String, String> producer;

CountDownLatch rebalanceLatch;
private long sleepTimeMs = 0L;

ParallelEoSStreamProcessor<String, String> pc;

{
super.numPartitions = 2;
}

private String outputTopic;
@BeforeEach
void setup() {
rebalanceLatch = new CountDownLatch(1);
setupTopic();
outputTopic = setupTopic("output-topic");
producer = getKcu().createNewProducer(KafkaClientUtils.ProducerMode.TRANSACTIONAL);
consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);
var pcOptions = ParallelConsumerOptions.<String,String>builder()
.commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
.consumer(consumer)
.produceLockAcquisitionTimeout(Duration.ofMinutes(2))
.producer(producer)
.ordering(PARTITION) // just so we dont need to use keys
.build();

pc = new ParallelEoSStreamProcessor<>(pcOptions, new PCModule<>(pcOptions)) {

@Override
protected void commitOffsetsThatAreReady() throws TimeoutException, InterruptedException {
final var threadName = Thread.currentThread().getName();
if (threadName.contains(PC_CONTROL)) {
log.info("Delaying pc-control thread {}ms to force the potential deadlock on rebalance", sleepTimeMs);
ThreadUtils.sleepQuietly(sleepTimeMs);
}

super.commitOffsetsThatAreReady();

if (threadName.contains(PC_BROKER_POLL)) {
// onPartitionsRevoked managed to commit the offsets
rebalanceLatch.countDown();
}
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
super.onPartitionsRevoked(partitions);
}
};

pc.subscribe(UniSets.of(topic));
}
final static long SLEEP_TIME_MS = 3000L;
@SneakyThrows
@RepeatedTest(5)
void noDeadlockOnRevoke() {
this.sleepTimeMs = (long) (SLEEP_TIME_MS + (Math.random() * 1000));
var numberOfRecordsToProduce = 100L;
var count = new AtomicLong();

getKcu().produceMessages(topic, numberOfRecordsToProduce);
pc.setTimeBetweenCommits(ofSeconds(1));
// consume some records
pc.pollAndProduce((recordContexts) -> {
count.getAndIncrement();
log.debug("Processed record, count now {} - offset: {}", count, recordContexts.offset());
return new ProducerRecord<>(outputTopic, recordContexts.key(), recordContexts.value());
});

await().timeout(Duration.ofSeconds(30)).untilAtomic(count,is(greaterThan(5L)));
log.debug("Records are getting consumed");

// cause rebalance
final Duration newPollTimeout = Duration.ofSeconds(5);
log.debug("Creating new consumer in same group and subscribing to same topic set with a no record timeout of {}, expect this phase to take entire timeout...", newPollTimeout);
try (var newConsumer = getKcu().createNewConsumer(REUSE_GROUP)) {
newConsumer.subscribe(UniLists.of(topic));
newConsumer.poll(newPollTimeout);

if (!rebalanceLatch.await(30, TimeUnit.SECONDS)) {
Assertions.fail("Rebalance did not finished");
}
log.debug("Test finished");
}

}
}