Skip to content

Commit

Permalink
u
Browse files Browse the repository at this point in the history
  • Loading branch information
AntiTopQuark committed Sep 12, 2024
1 parent 24df710 commit 92000fa
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void FeedSlaveThread::loop() {
// first batch here to work around this issue instead of waiting for enough batch size.
bool is_first_repl_batch = true;
uint32_t yield_microseconds = 2 * 1000;
std::string &batches_bulk = conn_->GetOutputBuffer();
std::string &batches_bulk = conn_->GetSlaveOutputBuffer();
batches_bulk.clear();
size_t updates_in_batches = 0;
while (!IsStopped()) {
Expand Down
11 changes: 6 additions & 5 deletions src/common/parse_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,22 @@
#include <limits>

#include "bit_util.h"
#include <iostream>

StatusOr<std::uint64_t> ParseSizeAndUnit(const std::string &v) {
auto [num, rest] = GET_OR_RET(TryParseInt<std::uint64_t>(v.c_str(), 10));

if (*rest == 0) {
return num;
} else if (util::EqualICase(rest, "k")) {
} else if (util::EqualICase(rest, "k") || util::EqualICase(rest, "kb")) {
return util::CheckedShiftLeft(num, 10);
} else if (util::EqualICase(rest, "m")) {
} else if (util::EqualICase(rest, "m") || util::EqualICase(rest, "mb")) {
return util::CheckedShiftLeft(num, 20);
} else if (util::EqualICase(rest, "g")) {
} else if (util::EqualICase(rest, "g") || util::EqualICase(rest, "gb")) {
return util::CheckedShiftLeft(num, 30);
} else if (util::EqualICase(rest, "t")) {
} else if (util::EqualICase(rest, "t") || util::EqualICase(rest, "tb")) {
return util::CheckedShiftLeft(num, 40);
} else if (util::EqualICase(rest, "p")) {
} else if (util::EqualICase(rest, "p") || util::EqualICase(rest, "pb")) {
return util::CheckedShiftLeft(num, 50);
}

Expand Down
35 changes: 1 addition & 34 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -684,44 +684,11 @@ bool Connection::CheckClientReachOutputBufferLimits(size_t reply_bytes) {
}

std::string Connection::CheckClientReachOutputBufferLimits(const std::string &msg) {
auto memSize = msg.size() + GetOutputBuffer().capacity() + evbuffer_get_length(Output());
auto memSize = msg.size() + GetOutputBuffer().capacity() + GetSlaveOutputBuffer().capacity() + evbuffer_get_length(Output());
if (CheckClientReachOutputBufferLimits(memSize)) {
SetReachOutputBufferLimit(true);
return "";
}
return msg;
}

size_t Connection::GetConnectionMemoryUsed() const {
size_t total_memory = sizeof(*this); // 包含所有成员变量的静态内存大小

total_memory += name_.capacity();
total_memory += ns_.capacity();
total_memory += ip_.capacity();
total_memory += announce_ip_.capacity();
total_memory += addr_.capacity();
total_memory += last_cmd_.capacity();
total_memory += output_buffer_.capacity();
total_memory += evbuffer_get_length(Output()) + evbuffer_get_length(Input());

for (const auto &channel : subscribe_channels_) {
total_memory += channel.capacity();
}
for (const auto &pattern : subscribe_patterns_) {
total_memory += pattern.capacity();
}
for (const auto &channel : subscribe_shard_channels_) {
total_memory += channel.capacity();
}
for (const auto &cmd : multi_cmds_) {
total_memory += cmd.capacity();
}

if (saved_current_command_) {
total_memory += saved_current_command_->GetMemoryUsage();
}

return total_memory;
}

} // namespace redis
3 changes: 2 additions & 1 deletion src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class Connection : public EvbufCallbackBase<Connection> {
void SetOutputBufferSoftLimitReachedTime(int64_t time) { output_buffer_soft_limit_reached_time_ = time; }
int64_t GetOutputBufferSoftLimitReachedTime() const { return output_buffer_soft_limit_reached_time_; }
inline std::string &GetOutputBuffer() { return output_buffer_; }
size_t GetConnectionMemoryUsed() const;
inline std::string &GetSlaveOutputBuffer() { return slave_output_buffer_; }
void SetReachOutputBufferLimit(bool reach) { is_reach_output_buffer_limit_ = reach; }
bool IsReachOutputBufferLimit() const { return is_reach_output_buffer_limit_; }

Expand Down Expand Up @@ -220,6 +220,7 @@ class Connection : public EvbufCallbackBase<Connection> {
RESP protocol_version_ = RESP::v2;
int64_t output_buffer_soft_limit_reached_time_ = 0;
std::string output_buffer_;
std::string slave_output_buffer_;
mutable bool is_reach_output_buffer_limit_ = false;
};

Expand Down
13 changes: 2 additions & 11 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ redis::Connection *Worker::removeConnection(int fd) {
if (iter != conns_.end()) {
conn = iter->second;
conn->GetOutputBuffer().clear();
conn->GetSlaveOutputBuffer().clear();
conns_.erase(iter);
srv->DecrClientNum();
}
Expand Down Expand Up @@ -577,7 +578,7 @@ void Worker::KickoutReachOutputBufferLimitsClients() {

for (auto &it : conns_) {
auto conn = it.second;
if (conn->CheckClientReachOutputBufferLimits(conn->GetOutputBuffer().capacity() +
if (conn->CheckClientReachOutputBufferLimits(conn->GetOutputBuffer().capacity() + conn->GetSlaveOutputBuffer().capacity() +
evbuffer_get_length(conn->Output()))) {
to_be_killed_conns.emplace_back(it.first, conn->GetID());
}
Expand All @@ -590,16 +591,6 @@ void Worker::KickoutReachOutputBufferLimitsClients() {
}
}

size_t Worker::GetConnectionsMemoryUsed() {
size_t mem = 0;
std::lock_guard<std::mutex> guard(conns_mu_);

for (auto &it : conns_) {
mem += it.second->GetConnectionMemoryUsed();
}
return mem;
}

void WorkerThread::Start() {
auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); });

Expand Down
1 change: 0 additions & 1 deletion src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
lua_State *Lua() { return lua_; }
std::map<int, redis::Connection *> GetConnections() const { return conns_; }
std::mutex &GetConnectionsMutex() { return conns_mu_; }
size_t GetConnectionsMemoryUsed();
Server *srv;

private:
Expand Down

0 comments on commit 92000fa

Please sign in to comment.