Skip to content

Commit

Permalink
Merge branch 'unstable' into cpctor
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk authored Jan 26, 2024
2 parents c87a176 + 201ba83 commit 5f61cb8
Show file tree
Hide file tree
Showing 5 changed files with 725 additions and 17 deletions.
129 changes: 125 additions & 4 deletions src/storage/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "db_util.h"

namespace engine {
DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot)
DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot)
: storage_(storage), read_options_(std::move(read_options)), slot_(slot) {
metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName);
metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_));
Expand Down Expand Up @@ -80,7 +80,7 @@ void DBIterator::Reset() {
if (metadata_iter_) metadata_iter_.reset();
}

void DBIterator::Seek(const std::string& target) {
void DBIterator::Seek(const std::string &target) {
if (!metadata_iter_) return;

// Iterate with the slot id but storage didn't enable slot id encoding
Expand Down Expand Up @@ -112,7 +112,7 @@ std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
return std::make_unique<SubKeyIterator>(storage_, read_options_, type, std::move(prefix));
}

SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
: storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) {
if (type_ == kRedisStream) {
cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName);
Expand Down Expand Up @@ -145,7 +145,7 @@ Slice SubKeyIterator::UserKey() const {
return internal_key.GetSubKey();
}

rocksdb::ColumnFamilyHandle* SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; }
rocksdb::ColumnFamilyHandle *SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; }

Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); }

Expand All @@ -164,4 +164,125 @@ void SubKeyIterator::Reset() {
if (iter_) iter_.reset();
}

rocksdb::Status WALBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) {
if (slot_ != -1 && slot_ != ExtractSlotId(key)) {
return rocksdb::Status::OK();
}
items_.emplace_back(WALItem::Type::kTypePut, column_family_id, key.ToString(), value.ToString());
return rocksdb::Status::OK();
}

rocksdb::Status WALBatchExtractor::DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) {
if (slot_ != -1 && slot_ != ExtractSlotId(key)) {
return rocksdb::Status::OK();
}
items_.emplace_back(WALItem::Type::kTypeDelete, column_family_id, key.ToString(), std::string{});
return rocksdb::Status::OK();
}

rocksdb::Status WALBatchExtractor::DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key,
const rocksdb::Slice &end_key) {
items_.emplace_back(WALItem::Type::kTypeDeleteRange, column_family_id, begin_key.ToString(), end_key.ToString());
return rocksdb::Status::OK();
}

void WALBatchExtractor::LogData(const rocksdb::Slice &blob) {
items_.emplace_back(WALItem::Type::kTypeLogData, 0, blob.ToString(), std::string{});
};

void WALBatchExtractor::Clear() { items_.clear(); }

WALBatchExtractor::Iter WALBatchExtractor::GetIter() { return Iter(&items_); }

bool WALBatchExtractor::Iter::Valid() { return items_ && cur_ < items_->size(); }

void WALBatchExtractor::Iter::Next() { cur_++; }

WALItem WALBatchExtractor::Iter::Value() {
if (!Valid()) {
return {};
}
return (*items_)[cur_];
}

void WALIterator::Reset() {
if (iter_) {
iter_.reset();
}
if (batch_iter_) {
batch_iter_.reset();
}
extractor_.Clear();
next_batch_seq_ = 0;
}

bool WALIterator::Valid() const { return (batch_iter_ && batch_iter_->Valid()) || (iter_ && iter_->Valid()); }

void WALIterator::nextBatch() {
if (!iter_ || !iter_->Valid()) {
Reset();
return;
}

auto batch = iter_->GetBatch();
if (batch.sequence != next_batch_seq_ || !batch.writeBatchPtr) {
Reset();
return;
}

extractor_.Clear();

auto s = batch.writeBatchPtr->Iterate(&extractor_);
if (!s.ok()) {
Reset();
return;
}

next_batch_seq_ += batch.writeBatchPtr->Count();
batch_iter_ = std::make_unique<WALBatchExtractor::Iter>(extractor_.GetIter());
}

void WALIterator::Seek(rocksdb::SequenceNumber seq) {
if (slot_ != -1 && !storage_->IsSlotIdEncoded()) {
Reset();
return;
}

auto s = storage_->GetWALIter(seq, &iter_);
if (!s.IsOK()) {
Reset();
return;
}

next_batch_seq_ = seq;

nextBatch();
}

WALItem WALIterator::Item() {
if (batch_iter_ && batch_iter_->Valid()) {
return batch_iter_->Value();
}
return {};
}

rocksdb::SequenceNumber WALIterator::NextSequenceNumber() const { return next_batch_seq_; }

void WALIterator::Next() {
if (!Valid()) {
Reset();
return;
}

if (batch_iter_ && batch_iter_->Valid()) {
batch_iter_->Next();
if (batch_iter_->Valid()) {
return;
}
}

iter_->Next();
nextBatch();
}

} // namespace engine
82 changes: 82 additions & 0 deletions src/storage/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,86 @@ class DBIterator {
std::unique_ptr<SubKeyIterator> subkey_iter_;
};

struct WALItem {
enum class Type : uint8_t {
kTypeInvalid = 0,
kTypeLogData = 1,
kTypePut = 2,
kTypeDelete = 3,
kTypeDeleteRange = 4,
};

WALItem() = default;
WALItem(WALItem::Type t, uint32_t cf_id, std::string k, std::string v)
: type(t), column_family_id(cf_id), key(std::move(k)), value(std::move(v)) {}

WALItem::Type type = WALItem::Type::kTypeInvalid;
uint32_t column_family_id = 0;
std::string key;
std::string value;
};

class WALBatchExtractor : public rocksdb::WriteBatch::Handler {
public:
// If set slot, storage must enable slot id encoding
explicit WALBatchExtractor(int slot = -1) : slot_(slot) {}

rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) override;

rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override;

rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key,
const rocksdb::Slice &end_key) override;

void LogData(const rocksdb::Slice &blob) override;

void Clear();

class Iter {
friend class WALBatchExtractor;

public:
bool Valid();
void Next();
WALItem Value();

private:
explicit Iter(std::vector<WALItem> *items) : items_(items), cur_(0) {}
std::vector<WALItem> *items_;
size_t cur_;
};

WALBatchExtractor::Iter GetIter();

private:
std::vector<WALItem> items_;
int slot_;
};

class WALIterator {
public:
explicit WALIterator(engine::Storage *storage, int slot = -1)
: storage_(storage), slot_(slot), extractor_(slot), next_batch_seq_(0){};
~WALIterator() = default;

bool Valid() const;
void Seek(rocksdb::SequenceNumber seq);
void Next();
WALItem Item();

rocksdb::SequenceNumber NextSequenceNumber() const;
void Reset();

private:
void nextBatch();

engine::Storage *storage_;
int slot_;

std::unique_ptr<rocksdb::TransactionLogIterator> iter_;
WALBatchExtractor extractor_;
std::unique_ptr<WALBatchExtractor::Iter> batch_iter_;
rocksdb::SequenceNumber next_batch_seq_;
};

} // namespace engine
11 changes: 11 additions & 0 deletions src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ bool InternalKey::operator==(const InternalKey &that) const {
return version_ == that.version_;
}

// Must slot encoded
uint16_t ExtractSlotId(Slice ns_key) {
uint8_t namespace_size = 0;
GetFixed8(&ns_key, &namespace_size);
ns_key.remove_prefix(namespace_size);

uint16_t slot_id = HASH_SLOTS_SIZE;
GetFixed16(&ns_key, &slot_id);
return slot_id;
}

template <typename T>
std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded) {
uint8_t namespace_size = 0;
Expand Down
1 change: 1 addition & 0 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ struct KeyNumStats {
uint64_t avg_ttl = 0;
};

[[nodiscard]] uint16_t ExtractSlotId(Slice ns_key);
template <typename T = Slice>
[[nodiscard]] std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded);
[[nodiscard]] std::string ComposeNamespaceKey(const Slice &ns, const Slice &key, bool slot_id_encoded);
Expand Down
Loading

0 comments on commit 5f61cb8

Please sign in to comment.