Skip to content

Commit c59a2f3

Browse files
committed
Revert unrelated changes
1 parent c4d1473 commit c59a2f3

File tree

4 files changed

+22
-17
lines changed

4 files changed

+22
-17
lines changed

lib/ClientConnection.cc

+17-12
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ ClientConnection::~ClientConnection() {
278278
void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) {
279279
if (!cmdConnected.has_server_version()) {
280280
LOG_ERROR(cnxString_ << "Server version is not set");
281-
close(ResultUnknownError);
281+
close();
282282
return;
283283
}
284284

@@ -451,7 +451,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
451451
Url service_url;
452452
if (!Url::parse(physicalAddress_, service_url)) {
453453
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
454-
close(ResultInvalidUrl);
454+
close();
455455
return;
456456
}
457457
}
@@ -489,7 +489,12 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
489489
}
490490
});
491491
} else {
492-
close(ResultRetryable);
492+
if (err == ASIO::error::operation_aborted) {
493+
// TCP connect timeout, which is not retryable
494+
close();
495+
} else {
496+
close(ResultRetryable);
497+
}
493498
}
494499
} else {
495500
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
@@ -499,8 +504,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
499504

500505
void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
501506
if (err) {
502-
LOG_WARN(cnxString_ << "Handshake failed: " << err.message());
503-
close(ResultRetryable);
507+
LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
508+
close();
504509
return;
505510
}
506511

@@ -527,7 +532,7 @@ void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const Shar
527532
}
528533
if (err) {
529534
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
530-
close(ResultRetryable);
535+
close();
531536
return;
532537
}
533538

@@ -562,14 +567,14 @@ void ClientConnection::tcpConnectAsync() {
562567
std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
563568
if (!Url::parse(hostUrl, service_url)) {
564569
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
565-
close(ResultInvalidUrl);
570+
close();
566571
return;
567572
}
568573

569574
if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") {
570575
LOG_ERROR(cnxString_ << "Invalid Url protocol '" << service_url.protocol()
571576
<< "'. Valid values are 'pulsar' and 'pulsar+ssl'");
572-
close(ResultInvalidUrl);
577+
close();
573578
return;
574579
}
575580

@@ -588,7 +593,7 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::itera
588593
if (err) {
589594
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
590595
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
591-
close(ResultConnectError);
596+
close();
592597
return;
593598
}
594599

@@ -624,8 +629,8 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::itera
624629
}
625630
});
626631
} else {
627-
LOG_ERROR(cnxString_ << "No IP address found");
628-
close(ResultConnectError);
632+
LOG_WARN(cnxString_ << "No IP address found");
633+
close();
629634
return;
630635
}
631636
}
@@ -880,7 +885,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
880885
// Handle Pulsar Connected
881886
if (incomingCmd.type() != BaseCommand::CONNECTED) {
882887
// Wrong cmd
883-
close(ResultRetryable);
888+
close();
884889
} else {
885890
handlePulsarConnected(incomingCmd.connected());
886891
}

lib/ClientConnection.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
157157
*
158158
* `detach` should only be false when the connection pool is closed.
159159
*/
160-
void close(Result result, bool detach = true);
160+
void close(Result result = ResultConnectError, bool detach = true);
161161

162162
bool isClosed() const;
163163

tests/BasicEndToEndTest.cc

+3-3
Original file line numberDiff line numberDiff line change
@@ -1245,7 +1245,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
12451245
std::this_thread::sleep_for(std::chrono::seconds(1));
12461246
} while (!clientConnectionPtr);
12471247
oldConnections.push_back(clientConnectionPtr);
1248-
clientConnectionPtr->close(ResultConnectError);
1248+
clientConnectionPtr->close();
12491249
}
12501250
LOG_INFO("checking message " << i);
12511251
ASSERT_EQ(producer.send(msg), ResultOk);
@@ -1264,7 +1264,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
12641264
ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(cImpl);
12651265
ClientConnectionPtr clientConnectionPtr = clientConnectionWeakPtr.lock();
12661266
oldConnections.push_back(clientConnectionPtr);
1267-
clientConnectionPtr->close(ResultConnectError);
1267+
clientConnectionPtr->close();
12681268

12691269
while (consumer.receive(msg, 30000) == ResultOk) {
12701270
consumer.acknowledge(msg);
@@ -1316,7 +1316,7 @@ void testHandlerReconnectionPartitionProducers(bool lazyStartPartitionedProducer
13161316
std::this_thread::sleep_for(std::chrono::seconds(1));
13171317
} while (!clientConnectionPtr);
13181318
oldConnections.push_back(clientConnectionPtr);
1319-
clientConnectionPtr->close(ResultConnectError);
1319+
clientConnectionPtr->close();
13201320
}
13211321
ASSERT_EQ(producer.send(msg), ResultOk);
13221322
}

tests/ClientTest.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ TEST(ClientTest, testConnectionClose) {
412412
auto executor = PulsarFriend::getExecutor(*cnx);
413413
// Simulate the close() happens in the event loop
414414
executor->postWork([cnx, &client, numConnections] {
415-
cnx->close(ResultConnectError);
415+
cnx->close();
416416
ASSERT_EQ(PulsarFriend::getConnections(client).size(), numConnections - 1);
417417
LOG_INFO("Connection refcnt: " << cnx.use_count() << " after close");
418418
});

0 commit comments

Comments
 (0)