Skip to content

Commit

Permalink
Cleanup already deleted namespace topics. (#12597)
Browse files Browse the repository at this point in the history
Cherry pick from #7473.

#7473 has fix the `Cleanup already deleted namespace topics` issue, but with #8129 involved, changes have been 
changed back.

### Motivation
We are having frequent issues when user removes cluster from the global namespace where broker from removed-cluster fails to unload topic and namespace bundle still loaded with the broker. It happens when broker from removed-cluster receives below error
```
17:38:52.199 [pulsar-io-22-28] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://prop/global/ns/tp1][east -> west] Failed to close dispatch rate limiter: org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection
:
17:38:52.199 [pulsar-io-22-28] WARN  org.apache.pulsar.broker.service.AbstractReplicator - [persistent://prop/global/ns/tp1][east -> west]] Exception: 'org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection' occured while trying to close the producer. retrying again in 0.1 s
:
17:38:52.351 [pulsar-io-22-37] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://prop/global/ns/tp1] Error closing topic
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
```

### Modification
Source Broker should return explicit error-code when producer is already closed and dest-broker from removed-cluster should handle this error and clean up the replicator and topic gracefully.
  • Loading branch information
Technoboy- authored Nov 3, 2021
1 parent 22d6537 commit 6b3fb41
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1579,9 +1579,8 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {

CompletableFuture<Producer> producerFuture = producers.get(producerId);
if (producerFuture == null) {
log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId);
commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
"Producer was not registered on the connection");
log.info("[{}] Producer {} was not registered on the connection", remoteAddress, producerId);
ctx.writeAndFlush(Commands.newSuccess(requestId));
return;
}

Expand Down Expand Up @@ -1626,8 +1625,8 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {

CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture == null) {
log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
log.info("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress);
ctx.writeAndFlush(Commands.newSuccess(requestId));
return;
}

Expand Down

0 comments on commit 6b3fb41

Please sign in to comment.