Skip to content

Commit 711aaaf

Browse files
committed
Fix creating producer or consumer is not retried for connection failure
Fixes #391 ### Motivation When `connectionFailed` is called, no matter if the result is retryable the creation of producer or consumer will fail without retry. ### Modifications Check if the result is retryable in `connectionFailed` for `ProducerImpl` and `ConsumerImpl` and only fail for non-retryable errors or the timeout error. Register another timer in `HandlerBase` to propagate the timeout error to `connectionFailed`. Add `testRetryUntilSucceed`, `testRetryTimeout`, `testNoRetry` to verify client could retry according to the result returned by `ClientImpl::getConnection`. On the other handle, check all `close()` calls in `ClientConnection` and pass the correct result. For example, a handshake error should be retryable so we must call `close(ResultRetryable)`.
1 parent c7e53ac commit 711aaaf

12 files changed

+197
-33
lines changed

Diff for: lib/ClientConnection.cc

+14-19
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ ClientConnection::~ClientConnection() {
278278
void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) {
279279
if (!cmdConnected.has_server_version()) {
280280
LOG_ERROR(cnxString_ << "Server version is not set");
281-
close();
281+
close(ResultUnknownError);
282282
return;
283283
}
284284

@@ -451,7 +451,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
451451
Url service_url;
452452
if (!Url::parse(physicalAddress_, service_url)) {
453453
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
454-
close();
454+
close(ResultInvalidUrl);
455455
return;
456456
}
457457
}
@@ -489,12 +489,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
489489
}
490490
});
491491
} else {
492-
if (err == ASIO::error::operation_aborted) {
493-
// TCP connect timeout, which is not retryable
494-
close();
495-
} else {
496-
close(ResultRetryable);
497-
}
492+
close(ResultRetryable);
498493
}
499494
} else {
500495
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
@@ -504,8 +499,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
504499

505500
void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
506501
if (err) {
507-
LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
508-
close();
502+
LOG_WARN(cnxString_ << "Handshake failed: " << err.message());
503+
close(ResultRetryable);
509504
return;
510505
}
511506

@@ -532,7 +527,7 @@ void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const Shar
532527
}
533528
if (err) {
534529
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
535-
close();
530+
close(ResultRetryable);
536531
return;
537532
}
538533

@@ -546,7 +541,7 @@ void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const Share
546541
}
547542
if (err) {
548543
LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message());
549-
close();
544+
close(ResultRetryable);
550545
return;
551546
}
552547
}
@@ -567,14 +562,14 @@ void ClientConnection::tcpConnectAsync() {
567562
std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
568563
if (!Url::parse(hostUrl, service_url)) {
569564
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
570-
close();
565+
close(ResultInvalidUrl);
571566
return;
572567
}
573568

574569
if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") {
575570
LOG_ERROR(cnxString_ << "Invalid Url protocol '" << service_url.protocol()
576571
<< "'. Valid values are 'pulsar' and 'pulsar+ssl'");
577-
close();
572+
close(ResultInvalidUrl);
578573
return;
579574
}
580575

@@ -592,8 +587,8 @@ void ClientConnection::tcpConnectAsync() {
592587
void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
593588
if (err) {
594589
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
595-
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
596-
close();
590+
LOG_WARN(hostUrl << "Resolve error: " << err << " : " << err.message());
591+
close(ResultRetryable);
597592
return;
598593
}
599594

@@ -629,8 +624,8 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::itera
629624
}
630625
});
631626
} else {
632-
LOG_WARN(cnxString_ << "No IP address found");
633-
close();
627+
LOG_ERROR(cnxString_ << "No IP address found");
628+
close(ResultConnectError);
634629
return;
635630
}
636631
}
@@ -885,7 +880,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
885880
// Handle Pulsar Connected
886881
if (incomingCmd.type() != BaseCommand::CONNECTED) {
887882
// Wrong cmd
888-
close();
883+
close(ResultRetryable);
889884
} else {
890885
handlePulsarConnected(incomingCmd.connected());
891886
}

Diff for: lib/ClientConnection.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
157157
*
158158
* `detach` should only be false when the connection pool is closed.
159159
*/
160-
void close(Result result = ResultConnectError, bool detach = true);
160+
void close(Result result, bool detach = true);
161161

162162
bool isClosed() const;
163163

Diff for: lib/ClientImpl.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517517
}
518518
}
519519

520-
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
520+
GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
521521
Promise<Result, ClientConnectionPtr> promise;
522522

523523
const auto topicNamePtr = TopicName::get(topic);
@@ -562,7 +562,7 @@ const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddr
562562
}
563563
}
564564

565-
Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string& logicalAddress, size_t key) {
565+
GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
566566
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
567567
Promise<Result, ClientConnectionPtr> promise;
568568
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)

Diff for: lib/ClientImpl.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,14 @@ class TopicName;
6363
using TopicNamePtr = std::shared_ptr<TopicName>;
6464

6565
using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
66+
using GetConnectionFuture = Future<Result, ClientConnectionPtr>;
6667

6768
std::string generateRandomName();
6869

6970
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
7071
public:
7172
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
72-
~ClientImpl();
73+
virtual ~ClientImpl();
7374

7475
/**
7576
* @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema
@@ -95,9 +96,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9596

9697
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9798

98-
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
99+
// Use virtual method to test
100+
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
99101

100-
Future<Result, ClientConnectionPtr> connect(const std::string& logicalAddress, size_t key);
102+
GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
101103

102104
void closeAsync(CloseCallback callback);
103105
void shutdown();

Diff for: lib/ConsumerImpl.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ void ConsumerImpl::connectionFailed(Result result) {
277277
// Keep a reference to ensure object is kept alive
278278
auto ptr = get_shared_this_ptr();
279279

280-
if (consumerCreatedPromise_.setFailed(result)) {
280+
if (!isResultRetryable(result) && consumerCreatedPromise_.setFailed(result)) {
281281
state_ = Failed;
282282
}
283283
}

Diff for: lib/Future.h

+9
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ class Future {
116116

117117
Result get(Type &result) { return state_->get(result); }
118118

119+
static Future<Result, Type> failed(Result result);
120+
119121
private:
120122
InternalStatePtr<Result, Type> state_;
121123

@@ -144,6 +146,13 @@ class Promise {
144146
InternalStatePtr<Result, Type> state_;
145147
};
146148

149+
template <typename Result, typename Type>
150+
inline Future<Result, Type> Future<Result, Type>::failed(Result result) {
151+
Promise<Result, Type> promise;
152+
promise.setFailed(result);
153+
return promise.getFuture();
154+
}
155+
147156
} // namespace pulsar
148157

149158
#endif

Diff for: lib/HandlerBase.cc

+17-2
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,31 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
4242
backoff_(backoff),
4343
epoch_(0),
4444
timer_(executor_->createDeadlineTimer()),
45+
creationTimer_(executor_->createDeadlineTimer()),
4546
reconnectionPending_(false) {}
4647

47-
HandlerBase::~HandlerBase() { timer_->cancel(); }
48+
HandlerBase::~HandlerBase() {
49+
ASIO_ERROR ignored;
50+
timer_->cancel(ignored);
51+
creationTimer_->cancel(ignored);
52+
}
4853

4954
void HandlerBase::start() {
5055
// guard against concurrent state changes such as closing
5156
State state = NotStarted;
5257
if (state_.compare_exchange_strong(state, Pending)) {
5358
grabCnx();
5459
}
60+
creationTimer_->expires_from_now(operationTimeut_);
61+
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
62+
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
63+
auto self = weakSelf.lock();
64+
if (self && !error) {
65+
connectionFailed(ResultTimeout);
66+
ASIO_ERROR ignored;
67+
timer_->cancel(ignored);
68+
}
69+
});
5570
}
5671

5772
ClientConnectionWeakPtr HandlerBase::getCnx() const {
@@ -96,7 +111,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
96111
ClientImplPtr client = client_.lock();
97112
if (!client) {
98113
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
99-
connectionFailed(ResultConnectError);
114+
connectionFailed(ResultAlreadyClosed);
100115
reconnectionPending_ = false;
101116
return;
102117
}

Diff for: lib/HandlerBase.h

+1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
140140

141141
private:
142142
DeadlineTimerPtr timer_;
143+
DeadlineTimerPtr creationTimer_;
143144

144145
mutable std::mutex connectionMutex_;
145146
std::atomic<bool> reconnectionPending_;

Diff for: lib/ProducerImpl.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ void ProducerImpl::connectionFailed(Result result) {
176176
// if producers are lazy, then they should always try to restart
177177
// so don't change the state and allow reconnections
178178
return;
179-
} else if (producerCreatedPromise_.setFailed(result)) {
179+
} else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(result)) {
180180
state_ = Failed;
181181
}
182182
}

Diff for: tests/BasicEndToEndTest.cc

+3-3
Original file line numberDiff line numberDiff line change
@@ -1245,7 +1245,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
12451245
std::this_thread::sleep_for(std::chrono::seconds(1));
12461246
} while (!clientConnectionPtr);
12471247
oldConnections.push_back(clientConnectionPtr);
1248-
clientConnectionPtr->close();
1248+
clientConnectionPtr->close(ResultConnectError);
12491249
}
12501250
LOG_INFO("checking message " << i);
12511251
ASSERT_EQ(producer.send(msg), ResultOk);
@@ -1264,7 +1264,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
12641264
ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(cImpl);
12651265
ClientConnectionPtr clientConnectionPtr = clientConnectionWeakPtr.lock();
12661266
oldConnections.push_back(clientConnectionPtr);
1267-
clientConnectionPtr->close();
1267+
clientConnectionPtr->close(ResultConnectError);
12681268

12691269
while (consumer.receive(msg, 30000) == ResultOk) {
12701270
consumer.acknowledge(msg);
@@ -1316,7 +1316,7 @@ void testHandlerReconnectionPartitionProducers(bool lazyStartPartitionedProducer
13161316
std::this_thread::sleep_for(std::chrono::seconds(1));
13171317
} while (!clientConnectionPtr);
13181318
oldConnections.push_back(clientConnectionPtr);
1319-
clientConnectionPtr->close();
1319+
clientConnectionPtr->close(ResultConnectError);
13201320
}
13211321
ASSERT_EQ(producer.send(msg), ResultOk);
13221322
}

Diff for: tests/ClientTest.cc

+68-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <future>
2626
#include <sstream>
2727

28+
#include "MockClientImpl.h"
2829
#include "PulsarAdminHelper.h"
2930
#include "PulsarFriend.h"
3031
#include "WaitUtils.h"
@@ -36,6 +37,7 @@
3637
DECLARE_LOG_OBJECT()
3738

3839
using namespace pulsar;
40+
using testing::AtLeast;
3941

4042
static std::string lookupUrl = "pulsar://localhost:6650";
4143
static std::string adminUrl = "http://localhost:8080/";
@@ -410,7 +412,7 @@ TEST(ClientTest, testConnectionClose) {
410412
auto executor = PulsarFriend::getExecutor(*cnx);
411413
// Simulate the close() happens in the event loop
412414
executor->postWork([cnx, &client, numConnections] {
413-
cnx->close();
415+
cnx->close(ResultConnectError);
414416
ASSERT_EQ(PulsarFriend::getConnections(client).size(), numConnections - 1);
415417
LOG_INFO("Connection refcnt: " << cnx.use_count() << " after close");
416418
});
@@ -434,3 +436,68 @@ TEST(ClientTest, testConnectionClose) {
434436
client.close();
435437
}
436438
}
439+
440+
TEST(ClientTest, testRetryUntilSucceed) {
441+
auto clientImpl = std::make_shared<MockClientImpl>(lookupUrl);
442+
constexpr int kFailCount = 3;
443+
EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2);
444+
std::atomic_int count{0};
445+
ON_CALL(*clientImpl, getConnection)
446+
.WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) {
447+
if (count++ < kFailCount) {
448+
return GetConnectionFuture::failed(ResultRetryable);
449+
}
450+
return clientImpl->getConnectionReal(topic, index);
451+
});
452+
453+
auto topic = "client-test-retry-until-succeed";
454+
ASSERT_EQ(ResultOk, clientImpl->createProducer(topic).result);
455+
count = 0;
456+
ASSERT_EQ(ResultOk, clientImpl->subscribe(topic).result);
457+
ASSERT_EQ(ResultOk, clientImpl->close());
458+
}
459+
460+
TEST(ClientTest, testRetryTimeout) {
461+
auto clientImpl =
462+
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2));
463+
EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2));
464+
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) {
465+
return GetConnectionFuture::failed(ResultRetryable);
466+
});
467+
468+
auto topic = "client-test-retry-timeout";
469+
{
470+
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
471+
ASSERT_EQ(ResultTimeout, result.result);
472+
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "producer: " << result.timeMs << " ms";
473+
}
474+
{
475+
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
476+
ASSERT_EQ(ResultTimeout, result.result);
477+
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "consumer: " << result.timeMs << " ms";
478+
}
479+
480+
ASSERT_EQ(ResultOk, clientImpl->close());
481+
}
482+
483+
TEST(ClientTest, testNoRetry) {
484+
auto clientImpl =
485+
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100));
486+
EXPECT_CALL(*clientImpl, getConnection).Times(2);
487+
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) {
488+
return GetConnectionFuture::failed(ResultAuthenticationError);
489+
});
490+
491+
auto topic = "client-test-no-retry";
492+
{
493+
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
494+
ASSERT_EQ(ResultAuthenticationError, result.result);
495+
ASSERT_TRUE(result.timeMs < 1000) << "producer: " << result.timeMs << " ms";
496+
}
497+
{
498+
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
499+
LOG_INFO("It takes " << result.timeMs << " ms to subscribe");
500+
ASSERT_EQ(ResultAuthenticationError, result.result);
501+
ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms";
502+
}
503+
}

0 commit comments

Comments
 (0)