Skip to content

Commit

Permalink
use auto
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice committed Mar 9, 2023
1 parent c372910 commit e0e9436
Showing 1 changed file with 41 additions and 43 deletions.
84 changes: 41 additions & 43 deletions src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class CommandFetchMeta : public Commander {
svr->stats_.IncrFullSyncCounter();

// Feed-replica-meta thread
std::thread t = GET_OR_RET(Util::CreateThread("feed-repl-info", [svr, repl_fd, ip, bev = conn->GetBufferEvent()] {
auto t = GET_OR_RET(Util::CreateThread("feed-repl-info", [svr, repl_fd, ip, bev = conn->GetBufferEvent()] {
svr->IncrFetchFileThread();
auto exit = MakeScopeExit([svr, bev] {
bufferevent_free(bev);
Expand Down Expand Up @@ -273,48 +273,46 @@ class CommandFetchFile : public Commander {
conn->NeedNotFreeBufferEvent(); // Feed-replica-file thread will close the replica bufferevent
conn->EnableFlag(Redis::Connection::kCloseAsync);

std::thread t =
GET_OR_RET(Util::CreateThread("feed-repl-file", [svr, repl_fd, ip, files, bev = conn->GetBufferEvent()]() {
auto exit = MakeScopeExit([bev] { bufferevent_free(bev); });
svr->IncrFetchFileThread();

for (const auto &file : files) {
if (svr->IsStopped()) break;

uint64_t file_size = 0, max_replication_bytes = 0;
if (svr->GetConfig()->max_replication_mb > 0) {
max_replication_bytes = (svr->GetConfig()->max_replication_mb * MiB) / svr->GetFetchFileThreadNum();
}
auto start = std::chrono::high_resolution_clock::now();
auto fd = UniqueFD(Engine::Storage::ReplDataManager::OpenDataFile(svr->storage_, file, &file_size));
if (!fd) break;

// Send file size and content
if (Util::SockSend(repl_fd, std::to_string(file_size) + CRLF).IsOK() &&
Util::SockSendFile(repl_fd, *fd, file_size).IsOK()) {
LOG(INFO) << "[replication] Succeed sending file " << file << " to " << ip;
} else {
LOG(WARNING) << "[replication] Fail to send file " << file << " to " << ip
<< ", error: " << strerror(errno);
break;
}
fd.Close();

// Sleep if the speed of sending file is more than replication speed limit
auto end = std::chrono::high_resolution_clock::now();
uint64_t duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
auto shortest = static_cast<uint64_t>(static_cast<double>(file_size) /
static_cast<double>(max_replication_bytes) * (1000 * 1000));
if (max_replication_bytes > 0 && duration < shortest) {
LOG(INFO) << "[replication] Need to sleep " << (shortest - duration) / 1000
<< " ms since of sending files too quickly";
usleep(shortest - duration);
}
}
auto now = static_cast<time_t>(Util::GetTimeStamp());
svr->storage_->SetCheckpointAccessTime(now);
svr->DecrFetchFileThread();
}));
auto t = GET_OR_RET(Util::CreateThread("feed-repl-file", [svr, repl_fd, ip, files, bev = conn->GetBufferEvent()]() {
auto exit = MakeScopeExit([bev] { bufferevent_free(bev); });
svr->IncrFetchFileThread();

for (const auto &file : files) {
if (svr->IsStopped()) break;

uint64_t file_size = 0, max_replication_bytes = 0;
if (svr->GetConfig()->max_replication_mb > 0) {
max_replication_bytes = (svr->GetConfig()->max_replication_mb * MiB) / svr->GetFetchFileThreadNum();
}
auto start = std::chrono::high_resolution_clock::now();
auto fd = UniqueFD(Engine::Storage::ReplDataManager::OpenDataFile(svr->storage_, file, &file_size));
if (!fd) break;

// Send file size and content
if (Util::SockSend(repl_fd, std::to_string(file_size) + CRLF).IsOK() &&
Util::SockSendFile(repl_fd, *fd, file_size).IsOK()) {
LOG(INFO) << "[replication] Succeed sending file " << file << " to " << ip;
} else {
LOG(WARNING) << "[replication] Fail to send file " << file << " to " << ip << ", error: " << strerror(errno);
break;
}
fd.Close();

// Sleep if the speed of sending file is more than replication speed limit
auto end = std::chrono::high_resolution_clock::now();
uint64_t duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
auto shortest = static_cast<uint64_t>(static_cast<double>(file_size) /
static_cast<double>(max_replication_bytes) * (1000 * 1000));
if (max_replication_bytes > 0 && duration < shortest) {
LOG(INFO) << "[replication] Need to sleep " << (shortest - duration) / 1000
<< " ms since of sending files too quickly";
usleep(shortest - duration);
}
}
auto now = static_cast<time_t>(Util::GetTimeStamp());
svr->storage_->SetCheckpointAccessTime(now);
svr->DecrFetchFileThread();
}));

if (auto s = Util::ThreadDetach(t); !s) {
return s;
Expand Down

0 comments on commit e0e9436

Please sign in to comment.