fix: reduce memory consumption during migration (#4017)
* refactor: reduce memory consumption for RestoreStreamer
* fix: add Throttling into RestoreStreamer::WriteBucket
BorysTheDev authored Nov 3, 2024
1 parent 5a597cf commit e4b468d
Showing 3 changed files with 31 additions and 51 deletions.
61 changes: 25 additions & 36 deletions src/server/journal/streamer.cc
@@ -90,6 +90,7 @@ void JournalStreamer::Write(std::string_view str) {
DVLOG(2) << "Writing " << str.size() << " bytes";

size_t total_pending = pending_buf_.size() + str.size();

if (in_flight_bytes_ > 0) {
// We can not flush data while there are in flight requests because AsyncWrite
// is not atomic. Therefore, we just aggregate.
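
The hunk above is the write path the rest of the commit leans on: while an asynchronous write is still in flight, new data is only appended to `pending_buf_`, which is why producers further up the stack must be throttled. Below is a minimal, self-contained sketch of that aggregation pattern; the class, the `AsyncWriteFn` callback, and the synchronous fake sink in `main` are illustrative stand-ins, not the actual Dragonfly API.

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <string_view>
#include <utility>

// Stand-in writer: aggregate while a previous async write is in flight,
// then flush everything that accumulated in a single follow-up write.
class AggregatingWriter {
 public:
  // The sink reports completion through the callback it is handed.
  using AsyncWriteFn = std::function<void(std::string, std::function<void()>)>;

  explicit AggregatingWriter(AsyncWriteFn sink) : sink_(std::move(sink)) {}

  void Write(std::string_view str) {
    if (in_flight_bytes_ > 0) {
      // The async write is not atomic, so we may not issue a second one
      // concurrently; we just aggregate into the pending buffer.
      pending_buf_.append(str);
      return;
    }
    Send(std::string(str));
  }

 private:
  void Send(std::string buf) {
    in_flight_bytes_ = buf.size();
    sink_(std::move(buf), [this] {
      in_flight_bytes_ = 0;
      if (!pending_buf_.empty()) {
        // Flush whatever accumulated while the previous write was in flight.
        Send(std::exchange(pending_buf_, {}));
      }
    });
  }

  AsyncWriteFn sink_;
  std::string pending_buf_;
  size_t in_flight_bytes_ = 0;
};

int main() {
  // Fake sink that "completes" immediately and reports the write size.
  AggregatingWriter writer([](std::string buf, std::function<void()> done) {
    std::cout << "wrote " << buf.size() << " bytes\n";
    done();
  });
  writer.Write("hello ");
  writer.Write("world");
}
```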
@@ -212,17 +213,11 @@ void RestoreStreamer::Run() {
if (fiber_cancelled_)
return;

bool written = false;
cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_);
if (WriteBucket(it)) {
written = true;
}
WriteBucket(it);
});
if (written) {
ThrottleIfNeeded();
}

if (++last_yield >= 100) {
ThisFiber::Yield();
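
In the hunk above, Run() no longer tracks whether a bucket produced output and no longer throttles itself; WriteBucket owns the throttling, and the loop only drives the cursor and yields the fiber every 100 steps. A simplified sketch of that traversal shape follows; Table, Bucket, and the callbacks are placeholders standing in for PrimeTable and the real fiber primitives, not the actual types.

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

// Placeholder bucket; the real table stores hashtable segments.
struct Bucket {
  int id = 0;
};

// Placeholder table with cursor-based traversal: visit one bucket per step,
// return the next cursor, and 0 once the full pass is complete.
struct Table {
  std::vector<Bucket> buckets;

  uint64_t Traverse(uint64_t cursor, const std::function<void(Bucket&)>& cb) {
    if (buckets.empty()) return 0;
    cb(buckets[cursor]);
    uint64_t next = cursor + 1;
    return next < buckets.size() ? next : 0;
  }
};

// Shape of the snapshot loop: the per-bucket writer throttles internally,
// so this loop only keeps the cursor moving and yields periodically to keep
// the shard responsive.
void RunSnapshot(Table& table, const std::function<void(Bucket&)>& write_bucket,
                 const std::function<void()>& yield) {
  uint64_t cursor = 0;
  uint64_t last_yield = 0;
  do {
    cursor = table.Traverse(cursor, write_bucket);
    if (++last_yield >= 100) {
      yield();
      last_yield = 0;
    }
  } while (cursor != 0);
}

int main() {
  Table table{std::vector<Bucket>(250)};
  int written = 0, yields = 0;
  RunSnapshot(
      table, [&](Bucket&) { ++written; }, [&] { ++yields; });
  std::cout << written << " buckets written, " << yields << " yields\n";
}
```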
@@ -282,18 +277,15 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
return my_slots_.Contains(slot_id);
}

bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
bool written = false;

void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
if (it.GetVersion() < snapshot_version_) {
FiberAtomicGuard fg;
it.SetVersion(snapshot_version_);
string key_buffer; // we can reuse it
for (; !it.is_done(); ++it) {
const auto& pv = it->second;
string_view key = it->first.GetSlice(&key_buffer);
if (ShouldWrite(key)) {
written = true;

uint64_t expire = 0;
if (pv.HasExpire()) {
auto eit = db_slice_->databases()[0]->expire.Find(it->first);
@@ -304,8 +296,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
}
}
}

return written;
ThrottleIfNeeded();
}

void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
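
With the change above, WriteBucket no longer returns whether it wrote anything and instead calls ThrottleIfNeeded() itself, so back-pressure is applied once per serialized bucket on every path that reaches it, including the change-callback path through OnDbChange. A minimal sketch of that shape, using placeholder types and callbacks rather than the real PrimeTable/DbSlice API:

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

struct Entry {
  std::string key;
  std::string value;
};

// Placeholder for a hashtable bucket with a version stamp.
struct BucketView {
  uint64_t version = 0;
  std::vector<Entry> entries;
};

class BucketWriter {
 public:
  BucketWriter(uint64_t snapshot_version, std::function<bool(std::string_view)> should_write,
               std::function<void(const Entry&)> write_entry, std::function<void()> throttle)
      : snapshot_version_(snapshot_version),
        should_write_(std::move(should_write)),
        write_entry_(std::move(write_entry)),
        throttle_(std::move(throttle)) {}

  void WriteBucket(BucketView& bucket) {
    if (bucket.version < snapshot_version_) {
      // Stamp the bucket first so it is serialized at most once per snapshot,
      // whether we got here from the traversal loop or from a change callback.
      bucket.version = snapshot_version_;
      for (const Entry& e : bucket.entries) {
        if (should_write_(e.key)) {  // e.g. the key hashes into a migrated slot
          write_entry_(e);
        }
      }
    }
    // Throttle after every bucket, unconditionally, so the pending output
    // buffer cannot grow without bound between throttle points.
    throttle_();
  }

 private:
  uint64_t snapshot_version_;
  std::function<bool(std::string_view)> should_write_;
  std::function<void(const Entry&)> write_entry_;
  std::function<void()> throttle_;
};

int main() {
  BucketView bucket{0, {{"a", "1"}, {"b", "2"}}};
  BucketWriter writer(
      /*snapshot_version=*/1, [](std::string_view) { return true; },
      [](const Entry&) { /* serialize the entry */ }, [] { /* apply back-pressure */ });
  writer.WriteBucket(bucket);  // serializes both entries, then throttles
  writer.WriteBucket(bucket);  // version already stamped: only throttles
}
```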
@@ -332,33 +323,31 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
string expire_str = absl::StrCat(expire_ms);
args.push_back(expire_str);

io::StringSink value_dump_sink;
SerializerBase::DumpObject(pv, &value_dump_sink);
args.push_back(value_dump_sink.str());
io::StringSink restore_cmd_sink;
{ // to destroy extra copy
io::StringSink value_dump_sink;
SerializerBase::DumpObject(pv, &value_dump_sink);
args.push_back(value_dump_sink.str());

args.push_back("ABSTTL"); // Means expire string is since epoch
args.push_back("ABSTTL"); // Means expire string is since epoch

if (pk.IsSticky()) {
args.push_back("STICK");
}

WriteCommand(journal::Entry::Payload("RESTORE", ArgSlice(args)));
}
if (pk.IsSticky()) {
args.push_back("STICK");
}

void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) {
journal::Entry entry(0, // txid
journal::Op::COMMAND, // single command
0, // db index
1, // shard count
0, // slot-id, but it is ignored at this level
cmd_payload);
journal::Entry entry(0, // txid
journal::Op::COMMAND, // single command
0, // db index
1, // shard count
0, // slot-id, but it is ignored at this level
journal::Entry::Payload("RESTORE", ArgSlice(args)));

// TODO: From WriteEntry to till Write we tripple copy the PrimeValue. It's ver in-efficient and
JournalWriter writer{&restore_cmd_sink};
writer.Write(entry);
}
// TODO: From DumpObject to till Write we tripple copy the PrimeValue. It's very inefficient and
// will burn CPU for large values.
io::StringSink sink;
JournalWriter writer{&sink};
writer.Write(entry);
Write(sink.str());
Write(restore_cmd_sink.str());
}

} // namespace dfly
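
The rewritten WriteEntry above keeps the value dump and the journal entry inside an inner scope, so the intermediate copies are destroyed before the fully framed RESTORE command is handed to Write(); for a large value, only one serialized copy is alive at send time. A small sketch of this scoping trick under simplified assumptions — DumpValue and AppendCommand below are placeholders for SerializerBase::DumpObject and JournalWriter, not the real signatures:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <string_view>
#include <vector>

// Placeholder for SerializerBase::DumpObject: produce the serialized value.
std::string DumpValue(std::string_view value) { return std::string(value); }

// Placeholder for JournalWriter: frame a command into the output buffer.
void AppendCommand(std::string* out, const std::vector<std::string>& args) {
  for (const std::string& a : args) {
    out->append(a);
    out->push_back(' ');
  }
}

std::string BuildRestoreCommand(std::string_view key, std::string_view value,
                                uint64_t expire_ms) {
  std::string restore_cmd;  // the only buffer that outlives the scope below
  {
    // Intermediate copies: the value dump plus the argument vector holding
    // another copy of it. Both are confined to this scope.
    std::vector<std::string> args = {"RESTORE", std::string(key), std::to_string(expire_ms),
                                     DumpValue(value), "ABSTTL"};
    AppendCommand(&restore_cmd, args);
  }  // args (and its copy of the dump) are freed here, before the send
  return restore_cmd;
}

int main() {
  // Peak memory now holds roughly one framed copy of the value at send time.
  std::cout << BuildRestoreCommand("key:1", "payload", 0) << "\n";
}
```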
3 changes: 1 addition & 2 deletions src/server/journal/streamer.h
@@ -97,9 +97,8 @@ class RestoreStreamer : public JournalStreamer {
bool ShouldWrite(cluster::SlotId slot_id) const;

// Returns whether anything was written
bool WriteBucket(PrimeTable::bucket_iterator it);
void WriteBucket(PrimeTable::bucket_iterator it);
void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
void WriteCommand(journal::Entry::Payload cmd_payload);

DbSlice* db_slice_;
DbTableArray db_array_;
18 changes: 5 additions & 13 deletions tests/dragonfly/cluster_test.py
@@ -1994,37 +1994,29 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_


@pytest.mark.skip("Takes more than 10 minutes")
@dfly_args({"proactor_threads": 12, "cluster_mode": "yes"})
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFactory):
# Check data migration from one node to another
instances = [
df_factory.create(
maxmemory="15G",
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
vmodule="streamer=9",
)
for i in range(2)
for i in range(3)
]

replica = df_factory.create(
port=BASE_PORT + 3,
admin_port=BASE_PORT + 3 + 1000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
)

df_factory.start_all(instances + [replica])
df_factory.start_all(instances)

nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
for i in range(1, len(instances)):
nodes[i].slots = []

await replica.admin_client().execute_command(f"replicaof localhost {nodes[0].instance.port}")

await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

await nodes[0].client.execute_command("DEBUG POPULATE 22500000 test 1000 RAND SLOTS 0 16383")
await nodes[0].client.execute_command("DEBUG POPULATE 5000000 test 1000 RAND SLOTS 0 16383")

await asyncio.sleep(2)
