Skip to content

Commit

Permalink
[fix] [broker] Replication stopped due to unload topic failed (#21947)
Browse files Browse the repository at this point in the history
### Motivation

**Steps to reproduce the issue**
- Enable replication.
- Send `10` messages to the local cluster then close the producer.
- Call `pulsar-admin topics unload <topic>` and get an error due to the internal producer of the replicator close failing.
- The topic closed failed, so we assumed the topic could work as expected, but the replication stopped.

**Root cause**
- `pulsar-admin topics unload <topic>`  will wait for the clients(including `consumers & producers & replicators`) to close successfully, and it will fail if clients can not be closed successfully.
- `replicator.producer` close failed causing the Admin API to fail, but there is a scheduled task that will retry to close `replicator.producer` which causes replication to stop. see https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java#L209

### Modifications

since the "replicator.producer.closeAsync()" will retry after it fails, the topic unload should be successful.

(cherry picked from commit 49f6a9f)
  • Loading branch information
poorbarcode committed Feb 28, 2024
1 parent 89e061c commit 7339f97
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ protected synchronized CompletableFuture<Void> closeProducerAsync() {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> future = producer.closeAsync();
future.thenRun(() -> {
return future.thenRun(() -> {
STATE_UPDATER.set(this, State.Stopped);
this.producer = null;
// deactivate further read
Expand All @@ -201,7 +201,6 @@ protected synchronized CompletableFuture<Void> closeProducerAsync() {
brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS);
return null;
});
return future;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.junit.Assert;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -51,6 +57,29 @@ public void cleanup() throws Exception {
super.cleanup();
}

private void waitReplicatorStarted(String topicName) {
Awaitility.await().untilAsserted(() -> {
Optional<Topic> topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get();
assertTrue(topicOptional2.isPresent());
PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
assertFalse(persistentTopic2.getProducers().isEmpty());
});
}

/**
* Override "AbstractReplicator.producer" by {@param producer} and return the original value.
*/
private ProducerImpl overrideProducerForReplicator(AbstractReplicator replicator, ProducerImpl newProducer)
throws Exception {
Field producerField = AbstractReplicator.class.getDeclaredField("producer");
producerField.setAccessible(true);
ProducerImpl originalValue = (ProducerImpl) producerField.get(replicator);
synchronized (replicator) {
producerField.set(replicator, newProducer);
}
return originalValue;
}

@Test
public void testReplicatorProducerStatInTopic() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
Expand Down Expand Up @@ -86,18 +115,13 @@ public void testReplicatorProducerStatInTopic() throws Exception {
public void testCreateRemoteConsumerFirst() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
// Wait for replicator started.
Awaitility.await().untilAsserted(() -> {
Optional<Topic> topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get();
assertTrue(topicOptional2.isPresent());
PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
assertFalse(persistentTopic2.getProducers().isEmpty());
});

// The topic in cluster2 has a replicator created producer(schema Auto_Produce), but does not have any schema。
// Verify: the consumer of this cluster2 can create successfully.
Consumer<String> consumer2 = client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName("s1")
.subscribe();;

// Wait for replicator started.
waitReplicatorStarted(topicName);
// cleanup.
producer1.close();
consumer2.close();
Expand All @@ -106,4 +130,34 @@ public void testCreateRemoteConsumerFirst() throws Exception {
admin2.topics().delete(topicName);
});
}

@Test
public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
admin1.topics().createNonPartitionedTopic(topicName);
// Wait for replicator started.
waitReplicatorStarted(topicName);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
PersistentReplicator replicator =
(PersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
// Mock an error when calling "replicator.disconnect()"
ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class);
Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new Exception("mocked ex")));
ProducerImpl originalProducer = overrideProducerForReplicator(replicator, mockProducer);
// Verify: since the "replicator.producer.closeAsync()" will retry after it failed, the topic unload should be
// successful.
admin1.topics().unload(topicName);
// Verify: After "replicator.producer.closeAsync()" retry again, the "replicator.producer" will be closed
// successful.
overrideProducerForReplicator(replicator, originalProducer);
Awaitility.await().untilAsserted(() -> {
Assert.assertFalse(replicator.isConnected());
});
// cleanup.
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
}
}

0 comments on commit 7339f97

Please sign in to comment.