Skip to content

Commit

Permalink
Implement the unify WAL iterator (apache#2040)
Browse files Browse the repository at this point in the history
Implement the unified WAL iterator for kvrocks, similar to the DBIterator.
It will wrap rocksdb WAL Iterator and can return different item types.
It is possible to implement their processing logic depending on the concrete type.
  • Loading branch information
caipengbo authored Jan 26, 2024
1 parent ba6f61e commit 201ba83
Show file tree
Hide file tree
Showing 5 changed files with 725 additions and 17 deletions.
129 changes: 125 additions & 4 deletions src/storage/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "db_util.h"

namespace engine {
DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot)
DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot)
: storage_(storage), read_options_(std::move(read_options)), slot_(slot) {
metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName);
metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_));
Expand Down Expand Up @@ -80,7 +80,7 @@ void DBIterator::Reset() {
if (metadata_iter_) metadata_iter_.reset();
}

void DBIterator::Seek(const std::string& target) {
void DBIterator::Seek(const std::string &target) {
if (!metadata_iter_) return;

// Iterate with the slot id but storage didn't enable slot id encoding
Expand Down Expand Up @@ -112,7 +112,7 @@ std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
return std::make_unique<SubKeyIterator>(storage_, read_options_, type, std::move(prefix));
}

SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
: storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) {
if (type_ == kRedisStream) {
cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName);
Expand Down Expand Up @@ -145,7 +145,7 @@ Slice SubKeyIterator::UserKey() const {
return internal_key.GetSubKey();
}

rocksdb::ColumnFamilyHandle* SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; }
rocksdb::ColumnFamilyHandle *SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; }

Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); }

Expand All @@ -164,4 +164,125 @@ void SubKeyIterator::Reset() {
if (iter_) iter_.reset();
}

rocksdb::Status WALBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) {
if (slot_ != -1 && slot_ != ExtractSlotId(key)) {
return rocksdb::Status::OK();
}
items_.emplace_back(WALItem::Type::kTypePut, column_family_id, key.ToString(), value.ToString());
return rocksdb::Status::OK();
}

rocksdb::Status WALBatchExtractor::DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) {
if (slot_ != -1 && slot_ != ExtractSlotId(key)) {
return rocksdb::Status::OK();
}
items_.emplace_back(WALItem::Type::kTypeDelete, column_family_id, key.ToString(), std::string{});
return rocksdb::Status::OK();
}

rocksdb::Status WALBatchExtractor::DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key,
const rocksdb::Slice &end_key) {
items_.emplace_back(WALItem::Type::kTypeDeleteRange, column_family_id, begin_key.ToString(), end_key.ToString());
return rocksdb::Status::OK();
}

void WALBatchExtractor::LogData(const rocksdb::Slice &blob) {
items_.emplace_back(WALItem::Type::kTypeLogData, 0, blob.ToString(), std::string{});
};

void WALBatchExtractor::Clear() { items_.clear(); }

WALBatchExtractor::Iter WALBatchExtractor::GetIter() { return Iter(&items_); }

bool WALBatchExtractor::Iter::Valid() { return items_ && cur_ < items_->size(); }

void WALBatchExtractor::Iter::Next() { cur_++; }

WALItem WALBatchExtractor::Iter::Value() {
if (!Valid()) {
return {};
}
return (*items_)[cur_];
}

void WALIterator::Reset() {
if (iter_) {
iter_.reset();
}
if (batch_iter_) {
batch_iter_.reset();
}
extractor_.Clear();
next_batch_seq_ = 0;
}

bool WALIterator::Valid() const { return (batch_iter_ && batch_iter_->Valid()) || (iter_ && iter_->Valid()); }

void WALIterator::nextBatch() {
if (!iter_ || !iter_->Valid()) {
Reset();
return;
}

auto batch = iter_->GetBatch();
if (batch.sequence != next_batch_seq_ || !batch.writeBatchPtr) {
Reset();
return;
}

extractor_.Clear();

auto s = batch.writeBatchPtr->Iterate(&extractor_);
if (!s.ok()) {
Reset();
return;
}

next_batch_seq_ += batch.writeBatchPtr->Count();
batch_iter_ = std::make_unique<WALBatchExtractor::Iter>(extractor_.GetIter());
}

void WALIterator::Seek(rocksdb::SequenceNumber seq) {
if (slot_ != -1 && !storage_->IsSlotIdEncoded()) {
Reset();
return;
}

auto s = storage_->GetWALIter(seq, &iter_);
if (!s.IsOK()) {
Reset();
return;
}

next_batch_seq_ = seq;

nextBatch();
}

WALItem WALIterator::Item() {
if (batch_iter_ && batch_iter_->Valid()) {
return batch_iter_->Value();
}
return {};
}

rocksdb::SequenceNumber WALIterator::NextSequenceNumber() const { return next_batch_seq_; }

void WALIterator::Next() {
if (!Valid()) {
Reset();
return;
}

if (batch_iter_ && batch_iter_->Valid()) {
batch_iter_->Next();
if (batch_iter_->Valid()) {
return;
}
}

iter_->Next();
nextBatch();
}

} // namespace engine
82 changes: 82 additions & 0 deletions src/storage/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,86 @@ class DBIterator {
std::unique_ptr<SubKeyIterator> subkey_iter_;
};

struct WALItem {
enum class Type : uint8_t {
kTypeInvalid = 0,
kTypeLogData = 1,
kTypePut = 2,
kTypeDelete = 3,
kTypeDeleteRange = 4,
};

WALItem() = default;
WALItem(WALItem::Type t, uint32_t cf_id, std::string k, std::string v)
: type(t), column_family_id(cf_id), key(std::move(k)), value(std::move(v)) {}

WALItem::Type type = WALItem::Type::kTypeInvalid;
uint32_t column_family_id = 0;
std::string key;
std::string value;
};

class WALBatchExtractor : public rocksdb::WriteBatch::Handler {
public:
// If set slot, storage must enable slot id encoding
explicit WALBatchExtractor(int slot = -1) : slot_(slot) {}

rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) override;

rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override;

rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key,
const rocksdb::Slice &end_key) override;

void LogData(const rocksdb::Slice &blob) override;

void Clear();

class Iter {
friend class WALBatchExtractor;

public:
bool Valid();
void Next();
WALItem Value();

private:
explicit Iter(std::vector<WALItem> *items) : items_(items), cur_(0) {}
std::vector<WALItem> *items_;
size_t cur_;
};

WALBatchExtractor::Iter GetIter();

private:
std::vector<WALItem> items_;
int slot_;
};

class WALIterator {
public:
explicit WALIterator(engine::Storage *storage, int slot = -1)
: storage_(storage), slot_(slot), extractor_(slot), next_batch_seq_(0){};
~WALIterator() = default;

bool Valid() const;
void Seek(rocksdb::SequenceNumber seq);
void Next();
WALItem Item();

rocksdb::SequenceNumber NextSequenceNumber() const;
void Reset();

private:
void nextBatch();

engine::Storage *storage_;
int slot_;

std::unique_ptr<rocksdb::TransactionLogIterator> iter_;
WALBatchExtractor extractor_;
std::unique_ptr<WALBatchExtractor::Iter> batch_iter_;
rocksdb::SequenceNumber next_batch_seq_;
};

} // namespace engine
11 changes: 11 additions & 0 deletions src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ bool InternalKey::operator==(const InternalKey &that) const {
return version_ == that.version_;
}

// Must slot encoded
uint16_t ExtractSlotId(Slice ns_key) {
uint8_t namespace_size = 0;
GetFixed8(&ns_key, &namespace_size);
ns_key.remove_prefix(namespace_size);

uint16_t slot_id = HASH_SLOTS_SIZE;
GetFixed16(&ns_key, &slot_id);
return slot_id;
}

template <typename T>
std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded) {
uint8_t namespace_size = 0;
Expand Down
1 change: 1 addition & 0 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ struct KeyNumStats {
uint64_t avg_ttl = 0;
};

[[nodiscard]] uint16_t ExtractSlotId(Slice ns_key);
template <typename T = Slice>
[[nodiscard]] std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded);
[[nodiscard]] std::string ComposeNamespaceKey(const Slice &ns, const Slice &key, bool slot_id_encoded);
Expand Down
Loading

0 comments on commit 201ba83

Please sign in to comment.