Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize write binlog by narrowing down critical secion #2129

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ elseif(${BUILD_TYPE} STREQUAL RELWITHDEBINFO)
set(LIB_BUILD_TYPE RELWITHDEBINFO)
else()
set(LIB_BUILD_TYPE RELEASE)
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -g -DNDEBUG")
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
endif()

if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
Expand Down
3 changes: 1 addition & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,7 @@ std::string FlushallCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t
std::string flushdb_cmd("flushdb");
RedisAppendLenUint64(content, flushdb_cmd.size(), "$");
RedisAppendContent(content, flushdb_cmd);
return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void FlushallCmd::Execute() {
Expand Down
21 changes: 20 additions & 1 deletion src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <utility>

#include "include/pika_binlog_transverter.h"
#include "pstd/include/pstd_defer.h"
#include "pstd_status.h"

using pstd::Status;
Expand Down Expand Up @@ -168,7 +169,25 @@ Status Binlog::Put(const std::string& item) {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
}
Status s = Put(item.c_str(), static_cast<int>(item.size()));
uint32_t filenum = 0;
uint32_t term = 0;
uint64_t offset = 0;
uint64_t logic_id = 0;

Lock();
DEFER {
Unlock();
};

Status s = GetProducerStatus(&filenum, &offset, &term, &logic_id);
if (!s.ok()) {
return s;
}
logic_id++;
std::string data = PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst,
time(nullptr), term, logic_id, filenum, offset, item, {});

s = Put(data.c_str(), static_cast<int>(data.size()));
if (!s.ok()) {
binlog_io_error_.store(true);
}
Expand Down
3 changes: 1 addition & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,7 @@ std::string Cmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_i
RedisAppendContent(content, v);
}

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); }
Expand Down
23 changes: 1 addition & 22 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,31 +355,12 @@ Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std

LogOffset log_offset;

stable_logger_->Logger()->Lock();
// std::this_thread::sleep_for(std::chrono::seconds(20));
// build BinlogItem
uint32_t filenum = 0;
uint32_t term = 0;
uint64_t offset = 0;
uint64_t logic_id = 0;
Status s = stable_logger_->Logger()->GetProducerStatus(&filenum, &offset, &term, &logic_id);
if (!s.ok()) {
stable_logger_->Logger()->Unlock();
return s;
}
BinlogItem item;
item.set_exec_time(time(nullptr));
item.set_term_id(term);
item.set_logic_id(logic_id + 1);
item.set_filenum(filenum);
item.set_offset(offset);
// make sure stable log and mem log consistent
s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
Status s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
if (!s.ok()) {
stable_logger_->Logger()->Unlock();
return s;
}
stable_logger_->Logger()->Unlock();

g_pika_server->SignalAuxiliary();
return Status::OK();
Expand Down Expand Up @@ -409,9 +390,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
return Status::OK();
}

stable_logger_->Logger()->Lock();
Status s = InternalAppendLog(attribute, cmd_ptr, nullptr, nullptr);
stable_logger_->Logger()->Unlock();

InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
return Status::OK();
Expand Down
25 changes: 7 additions & 18 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ std::string SetCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logi
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);
return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
} else {
return Cmd::ToBinlog(exec_time, term_id, logic_id, filenum, offset);
}
Expand Down Expand Up @@ -510,9 +509,7 @@ std::string SetnxCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t lo
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void SetexCmd::DoInitial() {
Expand Down Expand Up @@ -561,8 +558,7 @@ std::string SetexCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t lo
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);
return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void PsetexCmd::DoInitial() {
Expand Down Expand Up @@ -610,8 +606,7 @@ std::string PsetexCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t l
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);
return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void DelvxCmd::DoInitial() {
Expand Down Expand Up @@ -891,9 +886,7 @@ std::string ExpireCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t l
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void PexpireCmd::DoInitial() {
Expand Down Expand Up @@ -938,9 +931,7 @@ std::string PexpireCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void ExpireatCmd::DoInitial() {
Expand Down Expand Up @@ -997,9 +988,7 @@ std::string PexpireatCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);

return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, term_id, logic_id, filenum, offset,
content, {});
return content;
}

void PexpireatCmd::Do(std::shared_ptr<Slot> slot) {
Expand Down