Skip to content

Commit

Permalink
chore(tiering): Faster smallbins serialization dragonflydb#2
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed Jul 26, 2024
1 parent e2d65a0 commit 0008c57
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
}

void CompactObj::Materialize(std::string_view blob, bool is_raw) {
CHECK(IsExternal()) << int(taglen_);
// CHECK(IsExternal()) << int(taglen_);

DCHECK_GT(blob.size(), kInlineLen);

Expand Down
9 changes: 9 additions & 0 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,15 @@ class CompactObj {

bool HasAllocated() const;

// Returns the bits of mask_ selected by kEncMask; all other mask bits are
// cleared in the result.
uint8_t GetEncodingMask() const {
  const uint8_t encoding_bits = mask_ & kEncMask;
  return encoding_bits;
}

// Overwrites the kEncMask bits of mask_ with the corresponding bits of `mask`.
// Bits of mask_ outside kEncMask are preserved; bits of `mask` outside
// kEncMask are ignored.
void SetEncodingMask(uint8_t mask) {
  mask_ = (mask_ & ~kEncMask) | (mask & kEncMask);
}

private:
void EncodeString(std::string_view str);
size_t DecodedLen(size_t sz) const;
Expand Down
5 changes: 4 additions & 1 deletion src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
constexpr uint8_t RDB_TYPE_SBF = 33;
constexpr uint8_t RDB_TYPE_TIERED_SEGMENT = 34;

// Returns true when `type` denotes a value (object) type: either one accepted
// by __rdbIsObjectType or one of the Dragonfly-specific RDB_TYPE_* extensions
// declared above, including RDB_TYPE_TIERED_SEGMENT.
constexpr bool rdbIsObjectTypeDF(uint8_t type) {
  // Single return expression: every extension type must be part of this
  // disjunction, otherwise the loader rejects it as an invalid rdb type.
  return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
         (type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
         (type == RDB_TYPE_SBF) || (type == RDB_TYPE_TIERED_SEGMENT);
}

// Opcodes: Range 200-240 is used by DF extensions.
Expand All @@ -40,6 +41,8 @@ constexpr uint8_t RDB_OPCODE_JOURNAL_BLOB = 210;
// so it is always sent at the end of the RDB stream.
constexpr uint8_t RDB_OPCODE_JOURNAL_OFFSET = 211;

constexpr uint8_t RDB_OPCODE_TIERED_PAGE = 212;

constexpr uint8_t RDB_OPCODE_DF_MASK = 220; /* Mask for key properties */

// RDB_OPCODE_DF_MASK define 4byte field with next flags
Expand Down
91 changes: 85 additions & 6 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class RdbLoaderBase::OpaqueObjLoader {
void operator()(const LzfString& lzfstr);
void operator()(const unique_ptr<LoadTrace>& ptr);
void operator()(const RdbSBF& src);
// Visitor overload for the RdbTieredSegment alternative of the loaded value.
void operator()(const RdbTieredSegment& src);

std::error_code ec() const {
return ec_;
Expand Down Expand Up @@ -481,6 +482,10 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) {
pv_->SetSBF(sbf);
}

// Visitor overload for tiered segments. It never produces a value: reaching it
// aborts the process via CHECK.
// NOTE(review): RdbLoader::LoadKeyValPair diverts RDB_TYPE_TIERED_SEGMENT items
// to HandleSmallItems, which rewrites them to RDB_TYPE_STRING before Add();
// presumably that is why this path is considered unreachable - confirm for
// other callers of the visitor.
void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbTieredSegment& /*src*/) {
  CHECK(false) << "unreachable";
}

void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count();

Expand Down Expand Up @@ -1385,6 +1390,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
case RDB_TYPE_SBF:
iores = ReadSBF();
break;
case RDB_TYPE_TIERED_SEGMENT:
iores = ReadTieredSegment();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;

Expand Down Expand Up @@ -1878,6 +1886,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
}

// Reads the descriptor of a tiered (externally stored) value.
//
// The descriptor is three consecutive length-encoded integers, read in the same
// order RdbSerializer::SaveExternalSegment writes them: offset, length and
// encoding mask. No payload bytes are read here.
//
// Returns an OpaqueObj holding the RdbTieredSegment tagged with
// RDB_TYPE_TIERED_SEGMENT, or the error of the first LoadLen call that fails.
auto RdbLoaderBase::ReadTieredSegment() -> io::Result<OpaqueObj> {
  RdbTieredSegment segment;
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.offset);
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.length);
  // NOTE(review): enc_mask is a uint8_t while LoadLen presumably yields a wider
  // integer, so an out-of-range value is silently truncated here - consider
  // rejecting values above UINT8_MAX as a corrupted file.
  SET_OR_UNEXPECT(LoadLen(nullptr), segment.enc_mask);
  return OpaqueObj{segment, RDB_TYPE_TIERED_SEGMENT};
}

template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
auto ec = EnsureRead(sizeof(T));
if (ec)
Expand Down Expand Up @@ -1983,7 +1999,7 @@ error_code RdbLoader::Load(io::Source* src) {
/* Read type. */
SET_OR_RETURN(FetchType(), type);

DVLOG(2) << "Opcode type: " << type;
DVLOG(0) << "Opcode type: " << type;

/* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) {
Expand Down Expand Up @@ -2117,6 +2133,15 @@ error_code RdbLoader::Load(io::Source* src) {
continue;
}

if (type == RDB_OPCODE_TIERED_PAGE) {
size_t offset;
SET_OR_RETURN(LoadLen(nullptr), offset);
// CHECK(false) << "Offload pages if needed, tiering?";
SET_OR_RETURN(FetchGenericString(), small_items_pages_[offset]);
VLOG(0) << "Found tiered page " << offset;
continue;
}

if (!rdbIsObjectTypeDF(type)) {
return RdbError(errc::invalid_rdb_type);
}
Expand All @@ -2126,6 +2151,11 @@ error_code RdbLoader::Load(io::Source* src) {
settings.Reset();
} // main load loop

// Flush all small items
HandleSmallItems(true);

FlushAllShards();

DVLOG(1) << "RdbLoad loop finished";

if (stop_early_) {
Expand Down Expand Up @@ -2531,20 +2561,38 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {

item->is_sticky = settings->is_sticky;

ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;

auto& out_buf = shard_buf_[sid];
std::move(cleanup).Cancel();

if (item->val.rdb_type == RDB_TYPE_TIERED_SEGMENT) {
auto segment = get<RdbTieredSegment>(item->val.obj);
VLOG(0) << "Found tiered segment " << segment.offset;
{
size_t offset = segment.offset / tiering::kPageSize * tiering::kPageSize;
auto& items = small_items_[offset];
small_items_sizes_.erase({items.size(), offset});
items.push_back(item);
small_items_sizes_.insert({items.size(), offset});
}
HandleSmallItems(false); // don't flush
return kOk;
}

Add(item);
return kOk;
}

// Queues a loaded item on the buffer of the shard that owns its key.
//
// The shard is chosen by Shard(item->key, shard_set->size()). Once that shard's
// buffer holds kBufSize items it is handed to FlushShardAsync.
void RdbLoader::Add(Item* item) {
  ShardId sid = Shard(item->key, shard_set->size());

  auto& out_buf = shard_buf_[sid];
  out_buf.emplace_back(item);

  // Batch items per shard so a flush is dispatched once per kBufSize items
  // rather than once per item.
  constexpr size_t kBufSize = 128;
  if (out_buf.size() >= kBufSize) {
    FlushShardAsync(sid);
  }
}

void RdbLoader::LoadScriptFromAux(string&& body) {
Expand All @@ -2559,6 +2607,37 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
}
}

void RdbLoader::HandleSmallItems(bool flush) {
while (!small_items_.empty() && (flush || small_items_.size() > 1000)) {
auto [_, offset] = small_items_sizes_.extract(small_items_sizes_.begin()).value();
VLOG(0) << "Wantin offset " << offset;
auto node = small_items_.extract(offset);

VLOG(0) << "Handling small group with offset " << offset << " entries " << node.mapped().size();
auto page = small_items_pages_[offset];

for (Item* item : node.mapped()) {
RdbTieredSegment segment = get<RdbTieredSegment>(item->val.obj);
VLOG(0) << "Loading length " << segment.length;

base::PODArray<char> arr(segment.length, nullptr);
memcpy(arr.begin(), page.data() + (segment.offset - offset), segment.length);

CompactObj co;
co.SetEncodingMask(segment.enc_mask);
co.Materialize({arr.data(), arr.size()}, true); // todo: skip double copy

arr.resize(co.Size());
co.GetString(arr.data());

item->val.rdb_type = RDB_TYPE_STRING;
item->val.obj = std::move(arr);

Add(item);
}
}
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::CapturingReplyBuilder crb{};
ConnectionContext cntx{nullptr, nullptr, &crb};
Expand Down
29 changes: 27 additions & 2 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//
#pragma once

#include <absl/container/btree_set.h>
#include <absl/container/flat_hash_map.h>

#include <system_error>

extern "C" {
Expand Down Expand Up @@ -54,8 +57,18 @@ class RdbLoaderBase {
std::vector<Filter> filters;
};

using RdbVariant =
std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>, RdbSBF>;
// Descriptor of a value stored in a tiered page rather than inline in the RDB
// stream. The loader fills it from three length-encoded integers.
struct RdbTieredSegment {
  size_t offset = 0;     // offset of the segment; the loader rounds it down to find its page
  size_t length = 0;     // number of bytes the segment occupies
  uint8_t enc_mask = 0;  // encoding mask applied to the CompactObj before materializing
};

// A tiered page: its offset together with the raw page bytes.
struct RdbTieredPage {
  size_t offset = 0;  // offset identifying the page
  std::string blob;   // raw page contents
};

// Payload alternatives a loaded RDB value can hold (stored in OpaqueObj::obj).
// RdbTieredSegment is the alternative produced by ReadTieredSegment for values
// whose bytes live in a tiered page.
using RdbVariant = std::variant<long long, base::PODArray<char>, LzfString,
std::unique_ptr<LoadTrace>, RdbSBF, RdbTieredSegment>;

struct OpaqueObj {
RdbVariant obj;
Expand Down Expand Up @@ -148,6 +161,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadRedisJson();
::io::Result<OpaqueObj> ReadJson();
::io::Result<OpaqueObj> ReadSBF();
::io::Result<OpaqueObj> ReadTieredSegment();

std::error_code SkipModuleData();
std::error_code HandleCompressedBlob(int op_type);
Expand All @@ -168,10 +182,13 @@ class RdbLoaderBase {

size_t bytes_read_ = 0;
size_t source_limit_ = SIZE_MAX;

base::PODArray<uint8_t> compr_buf_;
std::unique_ptr<DecompressImpl> decompress_impl_;

JournalReader journal_reader_{nullptr, 0};
std::optional<uint64_t> journal_offset_ = std::nullopt;

RdbVersion rdb_version_ = RDB_VERSION;
};

Expand Down Expand Up @@ -259,10 +276,13 @@ class RdbLoader : protected RdbLoaderBase {
void FlushShardAsync(ShardId sid);
void FlushAllShards();

void Add(Item* item);
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);

void LoadScriptFromAux(std::string&& value);

void HandleSmallItems(bool flush);

// Load index definition from RESP string describing it in FT.CREATE format,
// issues an FT.CREATE call, but does not start indexing
void LoadSearchIndexDefFromAux(std::string&& value);
Expand All @@ -285,6 +305,11 @@ class RdbLoader : protected RdbLoaderBase {
std::function<void()> full_sync_cut_cb;

base::MPSCIntrusiveQueue<Item> item_queue_;

// Tiered-segment items waiting to be materialized, grouped by the page-aligned
// offset of the page containing them (filled in LoadKeyValPair).
absl::flat_hash_map<size_t /* offset */, std::vector<Item*>> small_items_;
// One (entry count, page offset) pair per group in small_items_. Ordered with
// std::greater<>, so begin() is the group with the most entries;
// HandleSmallItems takes groups from the front.
absl::btree_set<std::pair<size_t /* num entries*/, size_t /* offset */>, std::greater<>>
small_items_sizes_;
// Page blobs read from RDB_OPCODE_TIERED_PAGE records, keyed by the offset
// stored in the record.
absl::flat_hash_map<size_t /* offset */, std::string> small_items_pages_;
};

} // namespace dfly
22 changes: 20 additions & 2 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
unsigned compact_enc = pv.Encoding();
switch (type) {
case OBJ_STRING:
if (pv.IsExternal())
return RDB_TYPE_TIERED_SEGMENT;
return RDB_TYPE_STRING;
case OBJ_LIST:
if (compact_enc == OBJ_ENCODING_QUICKLIST)
Expand Down Expand Up @@ -311,8 +313,9 @@ RdbSerializer::~RdbSerializer() {
std::error_code RdbSerializer::SaveValue(const PrimeValue& pv) {
std::error_code ec;
if (pv.ObjType() == OBJ_STRING) {
auto opt_int = pv.TryGetInt();
if (opt_int) {
if (pv.IsExternal()) {
ec = SaveExternalSegment(pv);
} else if (auto opt_int = pv.TryGetInt(); opt_int) {
ec = SaveLongLongAsString(*opt_int);
} else {
ec = SaveString(pv.GetSlice(&tmp_str_));
Expand Down Expand Up @@ -681,6 +684,14 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
return {};
}

// Writes the descriptor of an externally stored value instead of its bytes:
// the external slice's offset, its length, and the value's encoding mask, each
// through SaveLen and in that order. Returns the first error encountered.
std::error_code RdbSerializer::SaveExternalSegment(const PrimeValue& pv) {
  const auto slice = pv.GetExternalSlice();
  const size_t segment_offset = slice.first;
  const size_t segment_length = slice.second;
  VLOG(0) << "Saving external segment " << segment_offset << " " << segment_length;

  RETURN_ON_ERR(SaveLen(segment_offset));
  RETURN_ON_ERR(SaveLen(segment_length));

  const uint8_t enc_mask = pv.GetEncodingMask();
  return SaveLen(enc_mask);
}

/* Save a long long value as either an encoded string or a string. */
error_code RdbSerializer::SaveLongLongAsString(int64_t value) {
uint8_t buf[32];
Expand Down Expand Up @@ -1635,6 +1646,13 @@ size_t RdbSerializer::GetTempBufferSize() const {
return SerializerBase::GetTempBufferSize() + tmp_str_.size();
}

// Writes one tiered page record: the RDB_OPCODE_TIERED_PAGE opcode, the page
// offset via SaveLen, then the page bytes via SaveString.
//
// offset: offset identifying the page; the loader uses it as the lookup key.
// page:   raw page contents.
// Returns the first error reported by the underlying write calls.
error_code RdbSerializer::SaveTieringPage(size_t offset, std::string_view page) {
  // Log metadata only: the page is binary payload that the loader slices into
  // item values, so streaming it into the log would dump stored data and
  // unprintable bytes.
  VLOG(0) << "Saving tiering page " << offset << " of size " << page.size();
  RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_TIERED_PAGE));
  RETURN_ON_ERR(SaveLen(offset));
  return SaveString(page);
}

void RdbSerializer::FlushIfNeeded(SerializerBase::FlushState flush_state) {
if (flush_fun_) {
flush_fun_(SerializedLen(), flush_state);
Expand Down
3 changes: 3 additions & 0 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ class RdbSerializer : public SerializerBase {

size_t GetTempBufferSize() const override;

std::error_code SaveTieringPage(size_t offset, std::string_view page);

private:
// Might preempt if flush_fun_ is used
std::error_code SaveObject(const PrimeValue& pv);
Expand All @@ -237,6 +239,7 @@ class RdbSerializer : public SerializerBase {
std::error_code SaveStreamObject(const PrimeValue& obj);
std::error_code SaveJsonObject(const PrimeValue& pv);
std::error_code SaveSBFObject(const PrimeValue& pv);
std::error_code SaveExternalSegment(const PrimeValue& pv);

std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);
Expand Down
Loading

0 comments on commit 0008c57

Please sign in to comment.