Skip to content

Commit

Permalink
Add the connection timeout when connecting master (#1172)
Browse files Browse the repository at this point in the history
  • Loading branch information
caipengbo authored Dec 12, 2022
1 parent e6aa898 commit 1de4e93
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
13 changes: 10 additions & 3 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "status.h"
#include "storage/batch_debugger.h"
#include "thread_util.h"
#include "time_util.h"

Status FeedSlaveThread::Start() {
try {
Expand Down Expand Up @@ -236,18 +237,24 @@ void ReplicationThread::CallbacksStateMachine::Start() {
handlers_.emplace_front(CallbacksStateMachine::WRITE, "auth write", authWriteCB);
}

uint64_t last_connect_timestamp = 0;
int connect_timeout_ms = 3100;

while (!repl_->stop_flag_ && bev == nullptr) {
Status s = Util::SockConnect(repl_->host_, repl_->port_, &cfd);
if (Util::GetTimeStampMS() - last_connect_timestamp < 1000) {
// prevent frequent re-connect when the master is down with the connection refused error
sleep(1);
}
last_connect_timestamp = Util::GetTimeStampMS();
Status s = Util::SockConnect(repl_->host_, repl_->port_, &cfd, connect_timeout_ms);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to connect the master, err: " << s.Msg();
sleep(1);
continue;
}
bev = bufferevent_socket_new(repl_->base_, cfd, BEV_OPT_CLOSE_ON_FREE);
if (bev == nullptr) {
close(cfd);
LOG(ERROR) << "[replication] Failed to create the event socket";
sleep(1);
continue;
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/common/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ Status SockConnect(const std::string &host, uint32_t port, int *fd, int conn_tim
sin.sin_port = htons(port);

fcntl(*fd, F_SETFL, O_NONBLOCK);
if (connect(*fd, reinterpret_cast<sockaddr *>(&sin), sizeof(sin))) {
return Status::FromErrno();
}
connect(*fd, reinterpret_cast<sockaddr *>(&sin), sizeof(sin));

auto retmask = Util::aeWait(*fd, AE_WRITABLE, conn_timeout);
if ((retmask & AE_WRITABLE) == 0 || (retmask & AE_ERROR) != 0 || (retmask & AE_HUP) != 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/common/scope_exit.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ struct ScopeExit {
if (enabled_) f_();
}

void Enable() { enabled_ = false; }
void Enable() { enabled_ = true; }

void Disable() { enabled_ = true; }
void Disable() { enabled_ = false; }

bool enabled_;
F f_;
Expand Down

0 comments on commit 1de4e93

Please sign in to comment.