Skip to content

Commit 60f262b

Browse files
committed
Fix producer or consumer creation not being retried on connection failure
Fixes #391

### Motivation

When `connectionFailed` is called, the creation of the producer or consumer fails immediately without any retry, regardless of whether the result is retryable.

### Modifications

Check whether the result is retryable in `connectionFailed` for `ProducerImpl` and `ConsumerImpl`, and fail the creation only for non-retryable errors or the timeout error. Register another timer in `HandlerBase` to propagate the timeout error to `connectionFailed`. Add `testRetryUntilSucceed`, `testRetryTimeout` and `testNoRetry` to verify that the client retries according to the result returned by `ClientImpl::getConnection`.
1 parent 1f94dd9 commit 60f262b

9 files changed

+178
-9
lines changed

lib/ClientImpl.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517517
}
518518
}
519519

520-
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
520+
GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
521521
Promise<Result, ClientConnectionPtr> promise;
522522

523523
const auto topicNamePtr = TopicName::get(topic);
@@ -562,7 +562,7 @@ const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddr
562562
}
563563
}
564564

565-
Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string& logicalAddress, size_t key) {
565+
GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
566566
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
567567
Promise<Result, ClientConnectionPtr> promise;
568568
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)

lib/ClientImpl.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,14 @@ class TopicName;
6363
using TopicNamePtr = std::shared_ptr<TopicName>;
6464

6565
using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
66+
using GetConnectionFuture = Future<Result, ClientConnectionPtr>;
6667

6768
std::string generateRandomName();
6869

6970
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
7071
public:
7172
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
72-
~ClientImpl();
73+
virtual ~ClientImpl();
7374

7475
/**
7576
* @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema
@@ -95,9 +96,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9596

9697
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9798

98-
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
99+
// Use virtual method to test
100+
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
99101

100-
Future<Result, ClientConnectionPtr> connect(const std::string& logicalAddress, size_t key);
102+
GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
101103

102104
void closeAsync(CloseCallback callback);
103105
void shutdown();

lib/ConsumerImpl.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ void ConsumerImpl::connectionFailed(Result result) {
277277
// Keep a reference to ensure object is kept alive
278278
auto ptr = get_shared_this_ptr();
279279

280-
if (consumerCreatedPromise_.setFailed(result)) {
280+
if (!isResultRetryable(result) && consumerCreatedPromise_.setFailed(result)) {
281281
state_ = Failed;
282282
}
283283
}

lib/Future.h

+9
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ class Future {
116116

117117
Result get(Type &result) { return state_->get(result); }
118118

119+
static Future<Result, Type> failed(Result result);
120+
119121
private:
120122
InternalStatePtr<Result, Type> state_;
121123

@@ -144,6 +146,13 @@ class Promise {
144146
InternalStatePtr<Result, Type> state_;
145147
};
146148

149+
// Returns a future obtained from a local promise on which setFailed(result)
// has already been called.
// Defined out of line, after the Promise class, presumably because the body
// needs the complete Promise type.
template <typename Result, typename Type>
150+
inline Future<Result, Type> Future<Result, Type>::failed(Result result) {
151+
Promise<Result, Type> promise;
152+
promise.setFailed(result);
153+
return promise.getFuture();
154+
}
155+
147156
} // namespace pulsar
148157

149158
#endif

lib/HandlerBase.cc

+17-2
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,31 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
4242
backoff_(backoff),
4343
epoch_(0),
4444
timer_(executor_->createDeadlineTimer()),
45+
creationTimer_(executor_->createDeadlineTimer()),
4546
reconnectionPending_(false) {}
4647

47-
HandlerBase::~HandlerBase() { timer_->cancel(); }
48+
// Cancels both timers owned by the handler: timer_ and creationTimer_.
HandlerBase::~HandlerBase() {
49+
// The error-code overload of cancel() is used and the code is discarded;
// presumably this keeps the destructor from throwing - confirm against the
// timer type behind DeadlineTimerPtr.
ASIO_ERROR ignored;
50+
timer_->cancel(ignored);
51+
creationTimer_->cancel(ignored);
52+
}
4853

4954
// Moves the handler from NotStarted to Pending and calls grabCnx(), then arms
// creationTimer_ so that connectionFailed(ResultTimeout) is invoked once
// operationTimeut_ has elapsed.
void HandlerBase::start() {
5055
// guard against concurrent state changes such as closing
5156
State state = NotStarted;
5257
if (state_.compare_exchange_strong(state, Pending)) {
5358
grabCnx();
5459
}
60+
// NOTE(review): the timer is armed even when the compare-exchange above
// fails, and nothing in this function cancels it on success - confirm that a
// late connectionFailed(ResultTimeout) is harmless in those cases.
creationTimer_->expires_from_now(operationTimeut_);
61+
// Only a weak reference is captured, so the pending wait does not keep the
// handler alive; `this` is used only after the weak pointer has been locked.
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
62+
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
63+
auto self = weakSelf.lock();
64+
// Proceed only if the handler still exists and the wait completed without an
// error code (presumably a cancelled wait reports an error - confirm).
if (self && !error) {
65+
connectionFailed(ResultTimeout);
66+
// Also cancel timer_ after reporting the timeout; error code is discarded.
ASIO_ERROR ignored;
67+
timer_->cancel(ignored);
68+
}
69+
});
5570
}
5671

5772
ClientConnectionWeakPtr HandlerBase::getCnx() const {
@@ -96,7 +111,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
96111
ClientImplPtr client = client_.lock();
97112
if (!client) {
98113
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
99-
connectionFailed(ResultConnectError);
114+
connectionFailed(ResultAlreadyClosed);
100115
reconnectionPending_ = false;
101116
return;
102117
}

lib/HandlerBase.h

+1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
140140

141141
private:
142142
DeadlineTimerPtr timer_;
143+
DeadlineTimerPtr creationTimer_;
143144

144145
mutable std::mutex connectionMutex_;
145146
std::atomic<bool> reconnectionPending_;

lib/ProducerImpl.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ void ProducerImpl::connectionFailed(Result result) {
176176
// if producers are lazy, then they should always try to restart
177177
// so don't change the state and allow reconnections
178178
return;
179-
} else if (producerCreatedPromise_.setFailed(result)) {
179+
} else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(result)) {
180180
state_ = Failed;
181181
}
182182
}

tests/ClientTest.cc

+67
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <future>
2626
#include <sstream>
2727

28+
#include "MockClientImpl.h"
2829
#include "PulsarAdminHelper.h"
2930
#include "PulsarFriend.h"
3031
#include "WaitUtils.h"
@@ -36,6 +37,7 @@
3637
DECLARE_LOG_OBJECT()
3738

3839
using namespace pulsar;
40+
using testing::AtLeast;
3941

4042
static std::string lookupUrl = "pulsar://localhost:6650";
4143
static std::string adminUrl = "http://localhost:8080/";
@@ -434,3 +436,68 @@ TEST(ClientTest, testConnectionClose) {
434436
client.close();
435437
}
436438
}
439+
440+
// Checks that creating a producer and then a consumer both end with ResultOk
// when the mocked getConnection fails kFailCount times with ResultRetryable
// before delegating to the real implementation.
// Presumably requires a reachable broker at lookupUrl, since the final call
// goes through getConnectionReal - confirm against the test environment.
TEST(ClientTest, testRetryUntilSucceed) {
441+
auto clientImpl = std::make_shared<MockClientImpl>(lookupUrl);
442+
constexpr int kFailCount = 3;
443+
// kFailCount failing calls plus one delegated call, once for the producer and
// once for the consumer.
EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2);
444+
std::atomic_int count{0};
445+
ON_CALL(*clientImpl, getConnection)
446+
.WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) {
447+
if (count++ < kFailCount) {
448+
return GetConnectionFuture::failed(ResultRetryable);
449+
}
450+
return clientImpl->getConnectionReal(topic, index);
451+
});
452+
453+
auto topic = "client-test-retry-until-succeed";
454+
ASSERT_EQ(ResultOk, clientImpl->createProducer(topic).result);
455+
// Reset the counter so the subscribe below sees kFailCount failures again.
count = 0;
456+
ASSERT_EQ(ResultOk, clientImpl->subscribe(topic).result);
457+
ASSERT_EQ(ResultOk, clientImpl->close());
458+
}
459+
460+
// Checks that producer and consumer creation end with ResultTimeout after
// roughly the configured 2-second operation timeout when every getConnection
// call fails with ResultRetryable.
TEST(ClientTest, testRetryTimeout) {
461+
auto clientImpl =
462+
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2));
463+
// Presumably at least two attempts each for the producer and the consumer.
EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2));
464+
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) {
465+
return GetConnectionFuture::failed(ResultRetryable);
466+
});
467+
468+
auto topic = "client-test-retry-timeout";
469+
{
470+
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
471+
ASSERT_EQ(ResultTimeout, result.result);
472+
// NOTE(review): the 100 ms window above the timeout looks timing-sensitive
// and may fail on a loaded machine - confirm it is stable in CI.
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "producer: " << result.timeMs << " ms";
473+
}
474+
{
475+
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
476+
ASSERT_EQ(ResultTimeout, result.result);
477+
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "consumer: " << result.timeMs << " ms";
478+
}
479+
480+
ASSERT_EQ(ResultOk, clientImpl->close());
481+
}
482+
483+
// Checks that producer and consumer creation fail with
// ResultAuthenticationError in under one second, far below the 100-second
// operation timeout, when getConnection fails with that result.
// NOTE(review): unlike the other retry tests in this file, this one never
// calls clientImpl->close() - confirm that is intentional.
TEST(ClientTest, testNoRetry) {
484+
auto clientImpl =
485+
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100));
486+
// Exactly two calls in total; presumably one for the producer and one for
// the consumer, i.e. no retries.
EXPECT_CALL(*clientImpl, getConnection).Times(2);
487+
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) {
488+
return GetConnectionFuture::failed(ResultAuthenticationError);
489+
});
490+
491+
auto topic = "client-test-no-retry";
492+
{
493+
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
494+
ASSERT_EQ(ResultAuthenticationError, result.result);
495+
ASSERT_TRUE(result.timeMs < 1000) << "producer: " << result.timeMs << " ms";
496+
}
497+
{
498+
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
499+
LOG_INFO("It takes " << result.timeMs << " ms to subscribe");
500+
ASSERT_EQ(ResultAuthenticationError, result.result);
501+
ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms";
502+
}
503+
}

tests/MockClientImpl.h

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <gmock/gmock.h>
22+
23+
#include <chrono>
24+
#include <future>
25+
26+
#include "lib/ClientImpl.h"
27+
28+
namespace pulsar {
29+
30+
// Test double for ClientImpl: getConnection is replaced by a gmock method,
// and blocking wrappers are added around the asynchronous create, subscribe
// and close calls.
class MockClientImpl : public ClientImpl {
31+
public:
32+
// Outcome of a blocking wrapper: the Result passed to the callback and the
// elapsed time in milliseconds until the callback ran.
struct SyncOpResult {
33+
Result result;
34+
long timeMs;
35+
};
36+
MockClientImpl(const std::string& serviceUrl, ClientConfiguration conf = {})
37+
: ClientImpl(serviceUrl, conf) {}
38+
39+
// The return type is parenthesized because it contains a comma.
MOCK_METHOD((Future<Result, ClientConnectionPtr>), getConnection, (const std::string&, size_t),
40+
(override));
41+
42+
// Calls createProducerAsync with a default-constructed configuration and
// blocks until its callback delivers the result.
SyncOpResult createProducer(const std::string& topic) {
43+
using namespace std::chrono;
44+
auto start = high_resolution_clock::now();
45+
std::promise<SyncOpResult> promise;
46+
// Capturing locals by reference is safe here only because this function
// blocks on the future below until the callback has run.
createProducerAsync(topic, {}, [&start, &promise](Result result, Producer) {
47+
// NOTE(review): count() yields milliseconds::rep, which is brace-initialized
// into `long timeMs` below; where long is narrower than that type this is a
// narrowing conversion - confirm it builds on every targeted platform.
auto timeMs = duration_cast<milliseconds>(high_resolution_clock::now() - start).count();
48+
promise.set_value({result, timeMs});
49+
});
50+
return promise.get_future().get();
51+
}
52+
53+
// Calls subscribeAsync with the fixed subscription name "sub" and a
// default-constructed configuration, then blocks until its callback delivers
// the result.
SyncOpResult subscribe(const std::string& topic) {
54+
using namespace std::chrono;
55+
auto start = std::chrono::high_resolution_clock::now();
56+
std::promise<SyncOpResult> promise;
57+
subscribeAsync(topic, "sub", {}, [&start, &promise](Result result, Consumer) {
58+
auto timeMs = duration_cast<milliseconds>(high_resolution_clock::now() - start).count();
59+
promise.set_value({result, timeMs});
60+
});
61+
return promise.get_future().get();
62+
}
63+
64+
// Forwards to the base-class ClientImpl::getConnection, bypassing the mock.
GetConnectionFuture getConnectionReal(const std::string& topic, size_t key) {
65+
return ClientImpl::getConnection(topic, key);
66+
}
67+
68+
// Calls closeAsync and blocks until its callback delivers the result.
Result close() {
69+
std::promise<Result> promise;
70+
closeAsync([&promise](Result result) { promise.set_value(result); });
71+
return promise.get_future().get();
72+
}
73+
};
74+
75+
} // namespace pulsar

0 commit comments

Comments
 (0)