Skip to content

Commit 68b4244

Browse files
Fix creating producer or consumer is not retried for connection failure (apache#396)
Fixes apache#391 ### Motivation When `connectionFailed` is called, no matter if the result is retryable the creation of producer or consumer will fail without retry. ### Modifications Check if the result is retryable in `connectionFailed` for `ProducerImpl` and `ConsumerImpl` and only fail for non-retryable errors or the timeout error. Register another timer in `HandlerBase` to propagate the timeout error to `connectionFailed`. Add `testRetryUntilSucceed`, `testRetryTimeout`, `testNoRetry` to verify client could retry according to the result returned by `ClientImpl::getConnection`.
1 parent 3983500 commit 68b4244

12 files changed

+239
-19
lines changed

lib/ClientConnection.cc

+8-3
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ static Result getResult(ServerError serverError, const std::string& message) {
8383
case ServiceNotReady:
8484
return (message.find("the broker do not have test listener") == std::string::npos)
8585
? ResultRetryable
86-
: ResultServiceUnitNotReady;
86+
: ResultConnectError;
8787

8888
case ProducerBlockedQuotaExceededError:
8989
return ResultProducerBlockedQuotaExceededError;
@@ -508,8 +508,13 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
508508

509509
void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
510510
if (err) {
511-
LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
512-
close();
511+
if (err.value() == ASIO::ssl::error::stream_truncated) {
512+
LOG_WARN(cnxString_ << "Handshake failed: " << err.message());
513+
close(ResultRetryable);
514+
} else {
515+
LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
516+
close();
517+
}
513518
return;
514519
}
515520

lib/ClientConnectionAdaptor.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ inline void checkServerError(Connection& connection, ServerError error, const st
3737
// "Namespace bundle ... is being unloaded"
3838
// "KeeperException$..."
3939
// "Failed to acquire ownership for namespace bundle ..."
40+
// "the broker do not have test listener"
4041
// Before https://github.com/apache/pulsar/pull/21211, the error of the 1st and 2nd messages
4142
// is ServiceNotReady. Before https://github.com/apache/pulsar/pull/21993, the error of the 3rd
4243
// message is ServiceNotReady.
4344
if (message.find("Failed to acquire ownership") == std::string::npos &&
4445
message.find("KeeperException") == std::string::npos &&
45-
message.find("is being unloaded") == std::string::npos) {
46+
message.find("is being unloaded") == std::string::npos &&
47+
message.find("the broker do not have test listener") == std::string::npos) {
4648
connection.close(ResultDisconnected);
4749
}
4850
break;

lib/ClientImpl.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
517517
}
518518
}
519519

520-
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
520+
GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
521521
Promise<Result, ClientConnectionPtr> promise;
522522

523523
const auto topicNamePtr = TopicName::get(topic);
@@ -562,7 +562,7 @@ const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddr
562562
}
563563
}
564564

565-
Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string& logicalAddress, size_t key) {
565+
GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
566566
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
567567
Promise<Result, ClientConnectionPtr> promise;
568568
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)

lib/ClientImpl.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,14 @@ class TopicName;
6363
using TopicNamePtr = std::shared_ptr<TopicName>;
6464

6565
using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
66+
using GetConnectionFuture = Future<Result, ClientConnectionPtr>;
6667

6768
std::string generateRandomName();
6869

6970
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
7071
public:
7172
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
72-
~ClientImpl();
73+
virtual ~ClientImpl();
7374

7475
/**
7576
* @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema
@@ -95,9 +96,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9596

9697
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9798

98-
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
99+
// Use virtual method to test
100+
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
99101

100-
Future<Result, ClientConnectionPtr> connect(const std::string& logicalAddress, size_t key);
102+
GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
101103

102104
void closeAsync(CloseCallback callback);
103105
void shutdown();

lib/ConsumerImpl.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ void ConsumerImpl::connectionFailed(Result result) {
277277
// Keep a reference to ensure object is kept alive
278278
auto ptr = get_shared_this_ptr();
279279

280-
if (consumerCreatedPromise_.setFailed(result)) {
280+
if (!isResultRetryable(result) && consumerCreatedPromise_.setFailed(result)) {
281281
state_ = Failed;
282282
}
283283
}

lib/Future.h

+9
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ class Future {
116116

117117
Result get(Type &result) { return state_->get(result); }
118118

119+
static Future<Result, Type> failed(Result result);
120+
119121
private:
120122
InternalStatePtr<Result, Type> state_;
121123

@@ -144,6 +146,13 @@ class Promise {
144146
InternalStatePtr<Result, Type> state_;
145147
};
146148

149+
template <typename Result, typename Type>
150+
inline Future<Result, Type> Future<Result, Type>::failed(Result result) {
151+
Promise<Result, Type> promise;
152+
promise.setFailed(result);
153+
return promise.getFuture();
154+
}
155+
147156
} // namespace pulsar
148157

149158
#endif

lib/HandlerBase.cc

+18-3
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,31 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
4242
backoff_(backoff),
4343
epoch_(0),
4444
timer_(executor_->createDeadlineTimer()),
45+
creationTimer_(executor_->createDeadlineTimer()),
4546
reconnectionPending_(false) {}
4647

47-
HandlerBase::~HandlerBase() { timer_->cancel(); }
48+
HandlerBase::~HandlerBase() {
49+
ASIO_ERROR ignored;
50+
timer_->cancel(ignored);
51+
creationTimer_->cancel(ignored);
52+
}
4853

4954
void HandlerBase::start() {
5055
// guard against concurrent state changes such as closing
5156
State state = NotStarted;
5257
if (state_.compare_exchange_strong(state, Pending)) {
5358
grabCnx();
5459
}
60+
creationTimer_->expires_from_now(operationTimeut_);
61+
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
62+
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
63+
auto self = weakSelf.lock();
64+
if (self && !error) {
65+
connectionFailed(ResultTimeout);
66+
ASIO_ERROR ignored;
67+
timer_->cancel(ignored);
68+
}
69+
});
5570
}
5671

5772
ClientConnectionWeakPtr HandlerBase::getCnx() const {
@@ -96,7 +111,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
96111
ClientImplPtr client = client_.lock();
97112
if (!client) {
98113
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
99-
connectionFailed(ResultConnectError);
114+
connectionFailed(ResultAlreadyClosed);
100115
reconnectionPending_ = false;
101116
return;
102117
}
@@ -108,7 +123,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
108123
connectionOpened(cnx).addListener([this, self](Result result, bool) {
109124
// Do not use bool, only Result.
110125
reconnectionPending_ = false;
111-
if (isResultRetryable(result)) {
126+
if (result != ResultOk && isResultRetryable(result)) {
112127
scheduleReconnection();
113128
}
114129
});

lib/HandlerBase.h

+1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
140140

141141
private:
142142
DeadlineTimerPtr timer_;
143+
DeadlineTimerPtr creationTimer_;
143144

144145
mutable std::mutex connectionMutex_;
145146
std::atomic<bool> reconnectionPending_;

lib/ProducerImpl.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ void ProducerImpl::connectionFailed(Result result) {
176176
// if producers are lazy, then they should always try to restart
177177
// so don't change the state and allow reconnections
178178
return;
179-
} else if (producerCreatedPromise_.setFailed(result)) {
179+
} else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(result)) {
180180
state_ = Failed;
181181
}
182182
}

lib/ResultUtils.h

+28-1
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,39 @@
1818
*/
1919
#pragma once
2020

21+
#include <assert.h>
2122
#include <pulsar/Result.h>
2223

24+
#include <unordered_set>
25+
2326
namespace pulsar {
2427

2528
inline bool isResultRetryable(Result result) {
26-
return result == ResultRetryable || result == ResultDisconnected;
29+
assert(result != ResultOk);
30+
if (result == ResultRetryable || result == ResultDisconnected) {
31+
return true;
32+
}
33+
34+
static const std::unordered_set<int> fatalResults{ResultConnectError,
35+
ResultTimeout,
36+
ResultAuthenticationError,
37+
ResultAuthorizationError,
38+
ResultInvalidUrl,
39+
ResultInvalidConfiguration,
40+
ResultIncompatibleSchema,
41+
ResultTopicNotFound,
42+
ResultOperationNotSupported,
43+
ResultNotAllowedError,
44+
ResultChecksumError,
45+
ResultCryptoError,
46+
ResultConsumerAssignError,
47+
ResultProducerBusy,
48+
ResultConsumerBusy,
49+
ResultLookupError,
50+
ResultTooManyLookupRequestException,
51+
ResultProducerBlockedQuotaExceededException,
52+
ResultProducerBlockedQuotaExceededError};
53+
return fatalResults.find(static_cast<int>(result)) == fatalResults.cend();
2754
}
2855

2956
} // namespace pulsar

tests/ClientTest.cc

+71-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <future>
2626
#include <sstream>
2727

28+
#include "MockClientImpl.h"
2829
#include "PulsarAdminHelper.h"
2930
#include "PulsarFriend.h"
3031
#include "WaitUtils.h"
@@ -36,6 +37,7 @@
3637
DECLARE_LOG_OBJECT()
3738

3839
using namespace pulsar;
40+
using testing::AtLeast;
3941

4042
static std::string lookupUrl = "pulsar://localhost:6650";
4143
static std::string adminUrl = "http://localhost:8080/";
@@ -248,7 +250,7 @@ TEST(ClientTest, testWrongListener) {
248250

249251
Client client(lookupUrl, ClientConfiguration().setListenerName("test"));
250252
Producer producer;
251-
ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer));
253+
ASSERT_EQ(ResultConnectError, client.createProducer(topic, producer));
252254
ASSERT_EQ(ResultProducerNotInitialized, producer.close());
253255
ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
254256
ASSERT_EQ(ResultOk, client.close());
@@ -257,7 +259,7 @@ TEST(ClientTest, testWrongListener) {
257259
// creation of Consumer or Reader could fail with ResultConnectError.
258260
client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
259261
Consumer consumer;
260-
ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer));
262+
ASSERT_EQ(ResultConnectError, client.subscribe(topic, "sub", consumer));
261263
ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());
262264

263265
ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
@@ -266,7 +268,7 @@ TEST(ClientTest, testWrongListener) {
266268
client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
267269

268270
Consumer multiTopicsConsumer;
269-
ASSERT_EQ(ResultServiceUnitNotReady,
271+
ASSERT_EQ(ResultConnectError,
270272
client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"},
271273
"sub", multiTopicsConsumer));
272274

@@ -278,7 +280,7 @@ TEST(ClientTest, testWrongListener) {
278280

279281
// Currently Reader can only read a non-partitioned topic in C++ client
280282
Reader reader;
281-
ASSERT_EQ(ResultServiceUnitNotReady,
283+
ASSERT_EQ(ResultConnectError,
282284
client.createReader(topic + "-partition-0", MessageId::earliest(), {}, reader));
283285
ASSERT_EQ(ResultConsumerNotInitialized, reader.close());
284286
ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
@@ -434,3 +436,68 @@ TEST(ClientTest, testConnectionClose) {
434436
client.close();
435437
}
436438
}
439+
440+
TEST(ClientTest, testRetryUntilSucceed) {
441+
auto clientImpl = std::make_shared<MockClientImpl>(lookupUrl);
442+
constexpr int kFailCount = 3;
443+
EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2);
444+
std::atomic_int count{0};
445+
ON_CALL(*clientImpl, getConnection)
446+
.WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) {
447+
if (count++ < kFailCount) {
448+
return GetConnectionFuture::failed(ResultRetryable);
449+
}
450+
return clientImpl->getConnectionReal(topic, index);
451+
});
452+
453+
auto topic = "client-test-retry-until-succeed";
454+
ASSERT_EQ(ResultOk, clientImpl->createProducer(topic).result);
455+
count = 0;
456+
ASSERT_EQ(ResultOk, clientImpl->subscribe(topic).result);
457+
ASSERT_EQ(ResultOk, clientImpl->close());
458+
}
459+
460+
TEST(ClientTest, testRetryTimeout) {
461+
auto clientImpl =
462+
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2));
463+
EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2));
464+
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) {
465+
return GetConnectionFuture::failed(ResultRetryable);
466+
});
467+
468+
auto topic = "client-test-retry-timeout";
469+
{
470+
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
471+
ASSERT_EQ(ResultTimeout, result.result);
472+
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "producer: " << result.timeMs << " ms";
473+
}
474+
{
475+
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
476+
ASSERT_EQ(ResultTimeout, result.result);
477+
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "consumer: " << result.timeMs << " ms";
478+
}
479+
480+
ASSERT_EQ(ResultOk, clientImpl->close());
481+
}
482+
483+
TEST(ClientTest, testNoRetry) {
484+
auto clientImpl =
485+
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100));
486+
EXPECT_CALL(*clientImpl, getConnection).Times(2);
487+
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) {
488+
return GetConnectionFuture::failed(ResultAuthenticationError);
489+
});
490+
491+
auto topic = "client-test-no-retry";
492+
{
493+
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
494+
ASSERT_EQ(ResultAuthenticationError, result.result);
495+
ASSERT_TRUE(result.timeMs < 1000) << "producer: " << result.timeMs << " ms";
496+
}
497+
{
498+
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
499+
LOG_INFO("It takes " << result.timeMs << " ms to subscribe");
500+
ASSERT_EQ(ResultAuthenticationError, result.result);
501+
ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms";
502+
}
503+
}

0 commit comments

Comments
 (0)