Skip to content

Commit

Permalink
Fix bug where producer for geo-replication is not closed when topic i…
Browse files Browse the repository at this point in the history
…s unloaded (apache#7735)

### Motivation

When a topic is unloaded and moved to another broker, the producer for geo-replication often remains unclosed. Because of this, geo-replication is not possible on the broker to which the topic was moved and messages accumulate in the replication backlog.

```
18:56:55.166 [pulsar-io-21-6] ERROR o.a.pulsar.client.impl.ProducerImpl  - [persistent://xxx/yyy/zzz] [pulsar.repl.dc2] Failed to create producer: Producer with name 'pulsar.repl.dc2' is already connected to topic
```

When this issue occurs, the following log is output on the broker where the topic is unloaded.

```
17:14:36.424 [bookkeeper-ml-workers-OrderedExecutor-18-0] INFO  o.a.p.b.s.persistent.PersistentTopic - [persistent://xxx/yyy/zzz] Un-fencing topic...
```

Unloaded topics are usually fenced to prevent new clients from connecting. In this case, however, the producers reconnected to the topic because it had been unfenced, and the replicator was restarted.

I think this is due to apache#5271. If a topic is fenced to close or delete, we should not unfence it.

### Modifications

When closing or deleting the `PersistentTopic` instance, set the `isClosingOrDeleting` flag to true. If `isClosingOrDeleting` is true, do not unfence the topic unless closing or deleting fails.
  • Loading branch information
Masahiro Sakamoto authored and Livio committed Sep 5, 2020
1 parent d124e8a commit 42f85ca
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ protected TopicStatsHelper initialValue() {
private volatile double lastUpdatedAvgPublishRateInByte = 0;

public volatile int maxUnackedMessagesOnSubscription = -1;
private volatile boolean isClosingOrDeleting = false;

private static class TopicStatsHelper {
public double averageMsgSize;
Expand Down Expand Up @@ -346,9 +347,9 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont

private void decrementPendingWriteOpsAndCheck() {
long pending = pendingWriteOps.decrementAndGet();
if (pending == 0 && isFenced) {
if (pending == 0 && isFenced && !isClosingOrDeleting) {
synchronized (this) {
if (isFenced) {
if (isFenced && !isClosingOrDeleting) {
messageDeduplication.resetHighestSequenceIdPushed();
log.info("[{}] Un-fencing topic...", topic);
// signal to managed ledger that we are ready to resume by creating a new ledger
Expand Down Expand Up @@ -844,7 +845,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,

lock.writeLock().lock();
try {
if (isFenced) {
if (isClosingOrDeleting) {
log.warn("[{}] Topic is already being closed or deleted", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
} else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
Expand All @@ -853,7 +854,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions did not catch up"));
}

isFenced = true; // Avoid clients reconnections while deleting
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
Expand All @@ -864,7 +865,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
isFenced = false;
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
});
Expand All @@ -885,7 +886,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
deleteSchemaFuture.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
isFenced = false;
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
Expand All @@ -907,7 +908,7 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
log.info("[{}] Topic is already deleted {}", topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
isFenced = false;
unfenceTopicToResume();
log.error("[{}] Error deleting topic", topic, exception);
deleteFuture.completeExceptionally(new PersistenceException(exception));
}
Expand All @@ -916,12 +917,12 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
});
} else {
isFenced = false;
unfenceTopicToResume();
deleteFuture.completeExceptionally(new TopicBusyException(
"Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
}
}).exceptionally(ex->{
isFenced = false;
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic."));
return null;
Expand Down Expand Up @@ -951,8 +952,8 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
try {
// closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all resources to be closed.
if (!isFenced || closeWithoutWaitingClientDisconnect) {
isFenced = true;
if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
fenceTopicToCloseOrDelete();
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
Expand Down Expand Up @@ -998,7 +999,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
isFenced = false;
unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
return null;
});
Expand Down Expand Up @@ -2210,4 +2211,14 @@ public CompactedTopic getCompactedTopic() {
public boolean isSystemTopic() {
return false;
}

private void fenceTopicToCloseOrDelete() {
isClosingOrDeleting = true;
isFenced = true;
}

private void unfenceTopicToResume() {
isFenced = false;
isClosingOrDeleting = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -868,26 +868,80 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}
}

@Test
public void testCloseTopic() throws Exception {
// create topic
PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();

Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
isFencedField.setAccessible(true);
Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
isClosingOrDeletingField.setAccessible(true);

assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));

// 1. close topic
topic.close().get();
assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
assertTrue((boolean) isFencedField.get(topic));
assertTrue((boolean) isClosingOrDeletingField.get(topic));

// 2. publish message to closed topic
ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
final CountDownLatch latch = new CountDownLatch(1);
topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
assertTrue(exception instanceof BrokerServiceException.TopicFencedException);
latch.countDown();
});
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue((boolean) isFencedField.get(topic));
assertTrue((boolean) isClosingOrDeletingField.get(topic));
}

@Test
public void testDeleteTopic() throws Exception {
// create topic
PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();

Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
isFencedField.setAccessible(true);
Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
isClosingOrDeletingField.setAccessible(true);

assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));

String role = "appid1";
// 1. delete inactive topic
topic.delete().get();
assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
assertTrue((boolean) isFencedField.get(topic));
assertTrue((boolean) isClosingOrDeletingField.get(topic));

// 2. delete topic with producer
// 2. publish message to deleted topic
ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
final CountDownLatch latch = new CountDownLatch(1);
topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
assertTrue(exception instanceof BrokerServiceException.TopicFencedException);
latch.countDown();
});
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue((boolean) isFencedField.get(topic));
assertTrue((boolean) isClosingOrDeletingField.get(topic));

// 3. delete topic with producer
topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false);
topic.addProducer(producer);

assertTrue(topic.delete().isCompletedExceptionally());
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));
topic.removeProducer(producer);

// 3. delete topic with subscriber
// 4. delete topic with subscriber
CommandSubscribe cmd = CommandSubscribe.newBuilder().setConsumerId(1).setTopic(successTopicName)
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();

Expand All @@ -897,6 +951,8 @@ public void testDeleteTopic() throws Exception {
f1.get();

assertTrue(topic.delete().isCompletedExceptionally());
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));
topic.unsubscribe(successSubName);
}

Expand Down Expand Up @@ -1146,6 +1202,14 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
any(OpenCursorCallback.class), any());

doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((CloseCallback) invocationOnMock.getArguments()[0]).closeComplete(null);
return null;
}
}).when(ledgerMock).asyncClose(any(CloseCallback.class), any());

// call deleteLedgerComplete on ledger asyncDelete
doAnswer(new Answer<Object>() {
@Override
Expand Down

0 comments on commit 42f85ca

Please sign in to comment.