Skip to content

Commit

Permalink
Do not close the socket when the broker failed to acquire ownership f…
Browse files Browse the repository at this point in the history
…or namespace bundle

### Motivation

When the broker failed to acquire the ownership of a namespace bundle by
`LockBusyException`. It means there is another broker that has acquired
the metadata store path and didn't release that path. For example:

Broker 1:

```
2024-01-24T23:35:36,626+0000 [metadata-store-10-1] WARN  org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup <role> for topic persistent://<tenant>/<ns>/<topic> with error org.apache.pulsar.broker.PulsarServerException: Failed to acquire ownership for namespace bundle <tenant>/<ns>/0x50000000_0x51000000
   Caused by: java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException$LockBusyException: Resource at /namespace/<tenant>/<ns>/0x50000000_0x51000000 is already locked
```

Broker 2:

```
2024-01-24T23:35:36,650+0000 [broker-topic-workers-OrderedExecutor-3-0] INFO  org.apache.pulsar.broker.PulsarService - Loaded 1 topics on <tenant>/<ns>/0x50000000_0x51000000 -- time taken: 0.044 seconds
```

After broker 2 released the lock at 23:35:36,650, the lookup request to
broker 1 should tell the client that namespace bundle
0x50000000_0x51000000 is currently being unloaded and in the next retry
the client will connect to the new owner broker.

Here is another typical error:

```
2024-01-24T23:57:57,264+0000 [pulsar-io-4-5] INFO  org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup <role> for topic persistent://<tenant>/<ns>/<topic> with error Namespace bundle <tenant>/<ns>/0x0d000000_0x0e000000 is being unloaded
```

Though after apache/pulsar#21211, the server
error becomes `MetadataError` rather than `ServiceNotReady`.

However, since the `ServerError` is `ServiceNotReady`, the client will
close the connection. If there are many other producers or consumers on
the same connection, they will all reestablish connection to the broker,
which is unnecessary and brings much pressure to broker side.

### Modifications

In `checkServerError`, when the error code is `ServiceNotReady`, check
the error message as well, if it hit the case in `handleLookupError`, do
not close the connection.
  • Loading branch information
BewareMyPower committed Jan 30, 2024
1 parent d1dd08b commit 7237831
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
13 changes: 9 additions & 4 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1469,10 +1469,15 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
return promise.getFuture();
}

void ClientConnection::checkServerError(ServerError error) {
void ClientConnection::checkServerError(ServerError error, const std::string& message) {
switch (error) {
case proto::ServerError::ServiceNotReady:
close(ResultDisconnected);
// See
// https://github.com/apache/pulsar/blob/1952f94769d9dc80908d159be6e6ce1ff48b83fb/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java#L334
// These errors should be retryable and we should not close the connection
if (message.find("Failed to lookup") != std::string::npos) {
close(ResultDisconnected);
}
break;
case proto::ServerError::TooManyRequests:
// TODO: Implement maxNumberOfRejectedRequestPerConnection like
Expand Down Expand Up @@ -1573,7 +1578,7 @@ void ClientConnection::handlePartitionedMetadataResponse(
<< partitionMetadataResponse.request_id()
<< " error: " << partitionMetadataResponse.error()
<< " msg: " << partitionMetadataResponse.message());
checkServerError(partitionMetadataResponse.error());
checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message());
lookupDataPromise->setFailed(
getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message()));
} else {
Expand Down Expand Up @@ -1650,7 +1655,7 @@ void ClientConnection::handleLookupTopicRespose(
LOG_ERROR(cnxString_ << "Failed lookup req_id: " << lookupTopicResponse.request_id()
<< " error: " << lookupTopicResponse.error()
<< " msg: " << lookupTopicResponse.message());
checkServerError(lookupTopicResponse.error());
checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message());
lookupDataPromise->setFailed(
getResult(lookupTopicResponse.error(), lookupTopicResponse.message()));
} else {
Expand Down
2 changes: 1 addition & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

friend class PulsarFriend;

void checkServerError(ServerError error);
void checkServerError(ServerError error, const std::string& message);

void handleSendReceipt(const proto::CommandSendReceipt&);
void handleSendError(const proto::CommandSendError&);
Expand Down

0 comments on commit 7237831

Please sign in to comment.