[C++] Fix connection is not closed when broker closes the connection to proxy (apache#15009)

### Motivation

Recently we found that the C++ client might not refresh the OAuth2 token
after a reconnection. When the Pulsar proxy is enabled, the
`AuthResponse` might be sent by the proxy, which leads to a disconnection
from the broker side. See https://github.com/apache/pulsar/blob/94cc46a66fbe322472dbb657803d21320e59079c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L687

The proxy then returns a `ServiceNotReady` error to the client as the
result of the topic lookup. See

https://github.com/apache/pulsar/blob/d1a302e60a8f61615800679522016f21dcd01295/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java#L150-L154

https://github.com/apache/pulsar/blob/d1a302e60a8f61615800679522016f21dcd01295/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java#L242-L247

However, in this case, the C++ client only completes the lookup future
with `ResultConnectError`. The `ClientConnection` object is still cached
in the pool and will be reused for subsequent interactions.

### Modifications

As the Java client does, add a `checkServerError` method to
`ClientConnection`, which closes the socket on a `ServiceNotReady` error.
We don't call `close` directly here, so that the execution of
`handleIncomingCommand` is not interrupted. `close` will be called later
in `handleRead`, because the read fails and `err` will be
`boost::asio::error::bad_descriptor`.
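
The deferred-close idea described above can be illustrated with a minimal, self-contained sketch. It does not use Boost.Asio or the real Pulsar classes: `FakeSocket`, `SketchConnection`, `handleLookupError`, and the local `ServerError` enum are hypothetical stand-ins, and the sketch only models the ordering (the socket is closed first, the full `close` happens on the next read).

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for proto::ServerError; not the real generated enum.
enum class ServerError { UnknownError, ServiceNotReady, TooManyRequests, AuthorizationError };

// Hypothetical stand-in for the underlying socket.
struct FakeSocket {
    bool open = true;
    void close() { open = false; }
};

class SketchConnection {
   public:
    // Only closes the socket; the connection state is left untouched here.
    void checkServerError(ServerError error) {
        switch (error) {
            case ServerError::ServiceNotReady:
            case ServerError::TooManyRequests:
                socket_.close();
                break;
            default:
                break;
        }
    }

    // Stand-in for the lookup-failure branch of handleIncomingCommand:
    // the handler keeps running after the socket has been closed.
    void handleLookupError(ServerError error) {
        checkServerError(error);
        events_.push_back("lookup failed");
    }

    // Stand-in for handleRead: a read on a closed socket fails,
    // and that failure is what triggers the full close.
    void handleRead() {
        if (!socket_.open) {
            events_.push_back("read failed");
            close();
        }
    }

    void close() {
        if (closed_) return;
        closed_ = true;
        events_.push_back("connection closed");
    }

    bool isClosed() const { return closed_; }
    const std::vector<std::string>& events() const { return events_; }

   private:
    FakeSocket socket_;
    bool closed_ = false;
    std::vector<std::string> events_;
};
```

In this sketch, calling `handleLookupError(ServerError::ServiceNotReady)` leaves `isClosed()` false until `handleRead()` runs, which mirrors the order of the "Failed lookup", "Read operation failed", and "Connection closed" lines in the logs below.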

### Verifying this change

It's hard to mock the `ServiceNotReady` case in a unit test. I tested
this patch in my private environment with the OAuth2 token set to expire
quickly. We can see the following logs (hosts are hidden):

```
2022-04-03 19:00:54.357 INFO  [0x110af6600] HandlerBase:142 | [persistent://public/default/topic-1, test-oauth2-50-7] Schedule reconnection in 0.1 s
2022-04-03 19:00:54.458 INFO  [0x7000056de000] HandlerBase:64 | [persistent://public/default/topic-1, test-oauth2-50-7] Getting connection from pool
2022-04-03 19:00:54.953 ERROR [0x7000056de000] ClientConnection:1001 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Failed lookup req_id: 90 error: ServiceUnitNotReady msg: Disconnected from server at test-oauth2-broker.sndev.svc.cluster.local/172.20.116.73:6650
2022-04-03 19:00:54.953 INFO  [0x7000056de000] HandlerBase:142 | [persistent://public/default/topic-1, test-oauth2-50-7] Schedule reconnection in 0.182 s
2022-04-03 19:00:54.953 ERROR [0x7000056de000] ClientConnection:597 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Read operation failed: Bad file descriptor
2022-04-03 19:00:54.953 INFO  [0x7000056de000] ClientConnection:1560 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Connection closed
2022-04-03 19:00:54.953 INFO  [0x7000056de000] ClientConnection:263 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Destroyed connection
2022-04-03 19:00:55.136 INFO  [0x7000056de000] HandlerBase:64 | [persistent://public/default/topic-1, test-oauth2-50-7] Getting connection from pool
2022-04-03 19:00:55.136 INFO  [0x7000056de000] ConnectionPool:86 | Deleting stale connection from pool for SERVICE_URL:6651 use_count: -1 @ 0x0
2022-04-03 19:00:55.137 INFO  [0x7000056de000] ClientConnection:189 | [<none> -> SERVICE_URL:6651] Create ClientConnection, timeout=10000
2022-04-03 19:00:55.250 INFO  [0x7000056de000] ConnectionPool:96 | Created connection for SERVICE_URL:6651
2022-04-03 19:00:55.702 INFO  [0x7000056de000] ClientConnection:375 | [LOCAL_IP:51081 -> REMOTE_IP:6651] Connected to broker
2022-04-03 19:00:57.750 INFO  [0x7000056de000] ProducerImpl:189 | [persistent://public/default/topic-1, test-oauth2-50-7] Created producer on broker [LOCAL_IP:50675 -> REMOTE_IP:6651]
```

We can see that a new connection was established after handling the
`ServiceUnitNotReady` error (the local port changed from 50660 to 51081).
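
The "Deleting stale connection from pool" step in the logs can be sketched as follows. This is only an illustration under assumptions: `SketchPool` and `PooledConnection` are hypothetical names, and the real `ConnectionPool` may decide staleness differently. The sketch shows why closing the connection matters: a closed (or expired) entry is dropped and a fresh connection is created on the next lookup.

```cpp
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for ClientConnection.
struct PooledConnection {
    explicit PooledConnection(int id) : id(id) {}
    int id;
    bool closed = false;
};

// Hypothetical pool keyed by service URL, holding weak references.
class SketchPool {
   public:
    std::shared_ptr<PooledConnection> getConnection(const std::string& serviceUrl) {
        auto it = pool_.find(serviceUrl);
        if (it != pool_.end()) {
            std::shared_ptr<PooledConnection> cnx = it->second.lock();
            if (cnx && !cnx->closed) {
                return cnx;  // healthy cached connection: reuse it
            }
            pool_.erase(it);  // expired or closed: delete the stale entry
        }
        auto fresh = std::make_shared<PooledConnection>(nextId_++);
        pool_[serviceUrl] = fresh;
        return fresh;
    }

   private:
    std::map<std::string, std::weak_ptr<PooledConnection>> pool_;
    int nextId_ = 1;
};
```

Without the fix, the cached connection would never be marked closed in a pool like this, so every reconnection attempt would keep receiving the same unusable connection.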
BewareMyPower authored and haiilan committed Apr 6, 2022
1 parent 682a7ea commit e0163a3
Showing 3 changed files with 42 additions and 13 deletions.
48 changes: 37 additions & 11 deletions pulsar-client-cpp/lib/ClientConnection.cc
@@ -910,13 +910,16 @@ void ClientConnection::handleIncomingCommand() {
         if (partitionMetadataResponse.has_error()) {
             LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
                                  << partitionMetadataResponse.request_id()
-                                 << " error: " << partitionMetadataResponse.error());
+                                 << " error: " << partitionMetadataResponse.error()
+                                 << " msg: " << partitionMetadataResponse.message());
+            checkServerError(partitionMetadataResponse.error());
+            lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error()));
         } else {
             LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
                                  << partitionMetadataResponse.request_id()
                                  << " with empty response: ");
+            lookupDataPromise->setFailed(ResultConnectError);
         }
-        lookupDataPromise->setFailed(ResultConnectError);
     } else {
         LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>();
         lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
@@ -994,13 +997,16 @@ void ClientConnection::handleIncomingCommand() {
         if (lookupTopicResponse.has_error()) {
             LOG_ERROR(cnxString_
                       << "Failed lookup req_id: " << lookupTopicResponse.request_id()
-                      << " error: " << lookupTopicResponse.error());
+                      << " error: " << lookupTopicResponse.error()
+                      << " msg: " << lookupTopicResponse.message());
+            checkServerError(lookupTopicResponse.error());
+            lookupDataPromise->setFailed(getResult(lookupTopicResponse.error()));
         } else {
             LOG_ERROR(cnxString_
                       << "Failed lookup req_id: " << lookupTopicResponse.request_id()
                       << " with empty response: ");
+            lookupDataPromise->setFailed(ResultConnectError);
         }
-        lookupDataPromise->setFailed(ResultConnectError);
     } else {
         LOG_DEBUG(cnxString_
                   << "Received lookup response from server. req_id: "
@@ -1511,15 +1517,10 @@ void ClientConnection::close(Result result) {
         return;
     }
     state_ = Disconnected;
-    boost::system::error_code err;
-    if (socket_) {
-        socket_->close(err);
-        if (err) {
-            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
-        }
-    }

+    closeSocket();
     if (tlsSocket_) {
+        boost::system::error_code err;
         tlsSocket_->lowest_layer().close(err);
         if (err) {
             LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
@@ -1664,4 +1665,29 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con
     return promise.getFuture();
 }

+void ClientConnection::closeSocket() {
+    boost::system::error_code err;
+    if (socket_) {
+        socket_->close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+        }
+    }
+}
+
+void ClientConnection::checkServerError(const proto::ServerError& error) {
+    switch (error) {
+        case proto::ServerError::ServiceNotReady:
+            closeSocket();
+            break;
+        case proto::ServerError::TooManyRequests:
+            // TODO: Implement maxNumberOfRejectedRequestPerConnection like
+            // https://github.com/apache/pulsar/pull/274
+            closeSocket();
+            break;
+        default:
+            break;
+    }
+}
+
 } // namespace pulsar
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ClientConnection.h
@@ -340,6 +340,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     friend class PulsarFriend;

     bool isTlsAllowInsecureConnection_ = false;
+
+    void closeSocket();
+    void checkServerError(const proto::ServerError& error);
 };
 } // namespace pulsar

4 changes: 2 additions & 2 deletions pulsar-client-cpp/tests/AuthTokenTest.cc
@@ -176,11 +176,11 @@ TEST(AuthPluginToken, testNoAuth) {

     Producer producer;
     Result result = client.createProducer(topicName, producer);
-    ASSERT_EQ(ResultConnectError, result);
+    ASSERT_EQ(ResultAuthorizationError, result);

     Consumer consumer;
     result = client.subscribe(topicName, subName, consumer);
-    ASSERT_EQ(ResultConnectError, result);
+    ASSERT_EQ(ResultAuthorizationError, result);
 }

 TEST(AuthPluginToken, testNoAuthWithHttp) {
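
The expectation in `testNoAuth` changes because a failed lookup now completes the promise with `getResult(error)` instead of always using `ResultConnectError`. The sketch below illustrates that difference under assumptions: the enums are small hypothetical subsets, `toResult` stands in for the real `getResult`, and the exact mapping in the library may contain more cases.

```cpp
// Hypothetical subset of proto::ServerError.
enum class ServerError { UnknownError, AuthorizationError, ServiceNotReady };

// Hypothetical subset of pulsar::Result.
enum class Result {
    ResultUnknownError,
    ResultConnectError,
    ResultAuthorizationError,
    ResultServiceUnitNotReady
};

// Stand-in for getResult: translate a server error into a client result (assumed mapping).
Result toResult(ServerError error) {
    switch (error) {
        case ServerError::AuthorizationError:
            return Result::ResultAuthorizationError;
        case ServerError::ServiceNotReady:
            return Result::ResultServiceUnitNotReady;
        default:
            return Result::ResultUnknownError;
    }
}

// Hypothetical lookup response: an optional server error.
struct LookupResponse {
    bool hasError;
    ServerError error;
};

// Behaviour before the patch: every failed lookup surfaced as a connect error.
Result lookupFailureBefore(const LookupResponse&) { return Result::ResultConnectError; }

// Behaviour after the patch: the server error is propagated when present.
Result lookupFailureAfter(const LookupResponse& response) {
    return response.hasError ? toResult(response.error) : Result::ResultConnectError;
}
```

With an `AuthorizationError` response, the "before" path yields `ResultConnectError` and the "after" path yields `ResultAuthorizationError`, which matches the updated assertions.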