[C++] Fix connection is not closed when broker closes the connection to proxy #15009

BewareMyPower
Contributor

@BewareMyPower BewareMyPower commented Apr 3, 2022

Motivation

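Recently we found that the C++ client might not refresh the OAuth2 token
after a reconnection. When the Pulsar proxy is enabled, the AuthResponse
might be sent by the proxy, which causes the broker to close the
connection. See https://github.com/apache/pulsar/blob/94cc46a66fbe322472dbb657803d21320e59079c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L687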

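Then the proxy returns a ServiceNotReady error to the client as the
result of the topic lookup. See

https://github.com/apache/pulsar/blob/d1a302e60a8f61615800679522016f21dcd01295/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java#L150-L154

    clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
        if (t != null) {
            log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, t.getMessage());
            proxyConnection.ctx().writeAndFlush(
                Commands.newLookupErrorResponse(ServerError.ServiceNotReady, t.getMessage(), clientRequestId));

https://github.com/apache/pulsar/blob/d1a302e60a8f61615800679522016f21dcd01295/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java#L242-L247

    clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
        if (t != null) {
            log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
                t.getMessage(), t);
            proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
                t.getMessage(), clientRequestId));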

However, in this case the C++ client only completes the lookup future
with ResultConnectError. The ClientConnection object stays cached in the
pool and is reused for subsequent interactions.
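
To illustrate why the cached connection is a problem, here is a minimal sketch. It is not the actual client code: FakeConnection, FakePool, reusedAfterFailedLookup and the service URL are hypothetical names made up for this example, and the real pool is more involved. It only shows that a pool which hands out any cached connection that is not closed keeps returning the same object after a failed lookup, unless something closes it.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for ClientConnection: it only tracks whether it is closed.
struct FakeConnection {
    bool closed = false;
};

// Hypothetical stand-in for the connection pool: one cached connection per URL.
class FakePool {
   public:
    std::shared_ptr<FakeConnection> get(const std::string& url) {
        auto it = cache_.find(url);
        if (it != cache_.end() && !it->second->closed) {
            return it->second;  // still considered usable, so it is reused
        }
        auto cnx = std::make_shared<FakeConnection>();
        cache_[url] = cnx;  // replace the missing or closed entry
        return cnx;
    }

   private:
    std::map<std::string, std::shared_ptr<FakeConnection>> cache_;
};

// Returns true if the pool hands out the same connection again after a
// simulated lookup failure.
inline bool reusedAfterFailedLookup(bool closeOnError) {
    const std::string url = "pulsar+ssl://proxy.example:6651";  // made-up URL
    FakePool pool;
    auto first = pool.get(url);
    // Simulate a lookup that failed with ServiceNotReady.
    if (closeOnError) {
        first->closed = true;
    }
    auto second = pool.get(url);
    return first == second;
}
```

With closeOnError set to false the same connection comes back from the pool, which corresponds to the behavior described above. With closeOnError set to true the pool creates a fresh connection.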

Modifications

Like the Java client, add a checkServerError method to ClientConnection
that closes the socket on a ServiceNotReady error. We don't call close
directly here, so that the execution of handleIncomingCommand is not
interrupted. Instead, close will be called in handleRead, because err
will be boost::asio::error::bad_descriptor.

Verifying this change

It's hard to mock the ServiceNotReady case in a unit test. I tested this
patch in my private environment with an OAuth2 token that expires
quickly, and saw the following logs (hosts are hidden):

2022-04-03 19:00:54.357 INFO  [0x110af6600] HandlerBase:142 | [persistent://public/default/topic-1, test-oauth2-50-7] Schedule reconnection in 0.1 s
2022-04-03 19:00:54.458 INFO  [0x7000056de000] HandlerBase:64 | [persistent://public/default/topic-1, test-oauth2-50-7] Getting connection from pool
2022-04-03 19:00:54.953 ERROR [0x7000056de000] ClientConnection:1001 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Failed lookup req_id: 90 error: ServiceUnitNotReady msg: Disconnected from server at test-oauth2-broker.sndev.svc.cluster.local/172.20.116.73:6650
2022-04-03 19:00:54.953 INFO  [0x7000056de000] HandlerBase:142 | [persistent://public/default/topic-1, test-oauth2-50-7] Schedule reconnection in 0.182 s
2022-04-03 19:00:54.953 ERROR [0x7000056de000] ClientConnection:597 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Read operation failed: Bad file descriptor
2022-04-03 19:00:54.953 INFO  [0x7000056de000] ClientConnection:1560 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Connection closed
2022-04-03 19:00:54.953 INFO  [0x7000056de000] ClientConnection:263 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Destroyed connection
2022-04-03 19:00:55.136 INFO  [0x7000056de000] HandlerBase:64 | [persistent://public/default/topic-1, test-oauth2-50-7] Getting connection from pool
2022-04-03 19:00:55.136 INFO  [0x7000056de000] ConnectionPool:86 | Deleting stale connection from pool for SERVICE_URL:6651 use_count: -1 @ 0x0
2022-04-03 19:00:55.137 INFO  [0x7000056de000] ClientConnection:189 | [<none> -> SERVICE_URL:6651] Create ClientConnection, timeout=10000
2022-04-03 19:00:55.250 INFO  [0x7000056de000] ConnectionPool:96 | Created connection for SERVICE_URL:6651
2022-04-03 19:00:55.702 INFO  [0x7000056de000] ClientConnection:375 | [LOCAL_IP:51081 -> REMOTE_IP:6651] Connected to broker
2022-04-03 19:00:57.750 INFO  [0x7000056de000] ProducerImpl:189 | [persistent://public/default/topic-1, test-oauth2-50-7] Created producer on broker [LOCAL_IP:50675 -> REMOTE_IP:6651]

We can see that a new connection was established after handling the
ServiceUnitNotReady error (the local port changed from 50660 to 51081).

@BewareMyPower BewareMyPower added type/bug The PR fixed a bug or issue reported a bug component/client-c++ doc-not-needed Your PR changes do not impact docs release/2.9.3 release/2.8.4 release/2.10.1 labels Apr 3, 2022
@BewareMyPower BewareMyPower added this to the 2.11.0 milestone Apr 3, 2022
@BewareMyPower BewareMyPower requested review from merlimat and tuteng April 3, 2022 05:58
@BewareMyPower BewareMyPower self-assigned this Apr 3, 2022
@BewareMyPower BewareMyPower changed the title [C++] Fix connection is not closed when broker closes the connection to proxy [WIP][C++] Fix connection is not closed when broker closes the connection to proxy Apr 3, 2022
@BewareMyPower BewareMyPower marked this pull request as draft April 3, 2022 10:30
@BewareMyPower BewareMyPower force-pushed the bewaremypower/fix-token-not-refreshed branch from 2acbd4b to 5e22e53 Compare April 3, 2022 11:09
@BewareMyPower BewareMyPower marked this pull request as ready for review April 3, 2022 11:10
@BewareMyPower BewareMyPower changed the title [WIP][C++] Fix connection is not closed when broker closes the connection to proxy [C++] Fix connection is not closed when broker closes the connection to proxy Apr 3, 2022
@BewareMyPower
Contributor Author

In addition to the test above, I also tried scheduling a reconnection to a remote endpoint periodically:

    Producer producer;
    auto result = client.createProducer("topic-1", producer);
    ASSERT_EQ(result, ResultOk);

    for (int i = 0; i < 100000; i++) {
        auto state = PulsarFriend::reconnection(producer);
        LOG_INFO(i << " reconnection: " << state);
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

The helper method in PulsarFriend is:

    static HandlerBase::State reconnection(Producer producer) {
        auto impl = std::static_pointer_cast<ProducerImpl>(producer.impl_);
        // Drop the current connection and reset the state to Pending before
        // scheduling a reconnection.
        impl->connection_.reset();
        impl->state_ = HandlerBase::State::Pending;
        HandlerBase::scheduleReconnection(std::static_pointer_cast<HandlerBase>(impl));
        // Poll every 10 ms, up to 1000 times, until the producer is ready again.
        for (int i = 0; i < 1000; i++) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::lock_guard<std::mutex> lock(impl->mutex_);
            if (impl->state_ == HandlerBase::State::Ready) {
                break;
            }
        }
        // Return the final state, which is not Ready if the polling gave up.
        std::lock_guard<std::mutex> lock(impl->mutex_);
        return impl->state_;
    }
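
The polling loop in the helper above could also be written as a small reusable function. The following is just a sketch using the standard library; waitUntil is a hypothetical name and is not part of the client or its test utilities.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Polls `condition` every `interval` until it returns true or `timeout`
// has elapsed. After the deadline, the condition is checked one last time
// and that result is returned.
inline bool waitUntil(const std::function<bool()>& condition,
                      std::chrono::milliseconds timeout,
                      std::chrono::milliseconds interval = std::chrono::milliseconds(10)) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (std::chrono::steady_clock::now() < deadline) {
        if (condition()) {
            return true;
        }
        std::this_thread::sleep_for(interval);
    }
    return condition();
}
```

A caller would pass a lambda that takes the lock and checks the state, so the lock is held only for the duration of each check and not while sleeping.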

@BewareMyPower BewareMyPower merged commit 8c7e1d9 into apache:master Apr 4, 2022
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-token-not-refreshed branch April 5, 2022 13:58
BewareMyPower added a commit that referenced this pull request Apr 5, 2022
(cherry picked from commit 8c7e1d9)
BewareMyPower added a commit that referenced this pull request Apr 5, 2022
(cherry picked from commit 8c7e1d9)
@BewareMyPower BewareMyPower added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Apr 5, 2022
BewareMyPower added a commit that referenced this pull request Apr 5, 2022
(cherry picked from commit 8c7e1d9)
@BewareMyPower BewareMyPower added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Apr 5, 2022
Lannnnh pushed a commit to Lannnnh/pulsar that referenced this pull request Apr 6, 2022
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
Labels
cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life cherry-picked/branch-2.10 doc-not-needed Your PR changes do not impact docs release/2.8.4 release/2.9.3 release/2.10.1 type/bug The PR fixed a bug or issue reported a bug
4 participants