Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an unify key-value iterator for Kvrocks #2004

Merged
merged 6 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions src/storage/iterator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "iterator.h"

#include <cluster/redis_slot.h>

#include "db_util.h"

namespace engine {
DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot)
: storage_(storage), read_options_(std::move(read_options)), slot_(slot) {
metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName);
metadata_iter_ = std::move(util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_)));
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
}

void DBIterator::Next() {
metadata_iter_->Next();
nextUntilValid();
}

void DBIterator::nextUntilValid() {
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
// slot_ != -1 means we would like to iterate all keys in the slot
// so we can skip the afterwards keys if the slot id doesn't match
if (slot_ != -1 && metadata_iter_->Valid()) {
auto [_, user_key] = ExtractNamespaceKey(metadata_iter_->key(), storage_->IsSlotIdEncoded());
// Release the iterator if the slot id doesn't match
if (GetSlotIdFromKey(user_key.ToString()) != slot_) {
Reset();
return;
}
}

while (metadata_iter_->Valid()) {
Metadata metadata(kRedisNone, false);
// Skip the metadata if it's expired
if (metadata.Decode(metadata_iter_->value()).ok() && !metadata.Expired()) {
metadata_ = metadata;
break;
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
}
metadata_iter_->Next();
}
}

bool DBIterator::Valid() const { return metadata_iter_ && metadata_iter_->Valid(); }

Slice DBIterator::Key() const { return Valid() ? metadata_iter_->key() : Slice(); }

std::tuple<Slice, Slice> DBIterator::UserKey() const {
if (!Valid()) {
return {};
}
return ExtractNamespaceKey(metadata_iter_->key(), slot_ != -1);
}

Slice DBIterator::Value() const { return Valid() ? metadata_iter_->value() : Slice(); }

RedisType DBIterator::Type() const { return Valid() ? metadata_.Type() : kRedisNone; }

void DBIterator::Reset() {
if (metadata_iter_) metadata_iter_.reset();
}

void DBIterator::Seek(const std::string& target) {
metadata_iter_->Reset();

// Iterate with the slot id but storage didn't enable slot id encoding
if (slot_ != -1 && !storage_->IsSlotIdEncoded()) {
Reset();
return;
}
std::string prefix = target;
if (slot_ != -1) {
// Use the slot id as the prefix if it's specified
prefix = ComposeSlotKeyPrefix(kDefaultNamespace, slot_) + target;
}

metadata_iter_->Seek(prefix);
nextUntilValid();
}

std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
if (!Valid()) {
return nullptr;
}

// The string/json type doesn't have sub keys
RedisType type = metadata_.Type();
if (type == kRedisNone || type == kRedisString || type == kRedisJson) {
return nullptr;
}

auto prefix = InternalKey(Key(), "", metadata_.version, storage_->IsSlotIdEncoded()).Encode();
return std::make_unique<SubKeyIterator>(storage_, read_options_, type, prefix);
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
}

SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
: storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) {
if (type_ == kRedisStream) {
cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName);
} else {
cf_handle_ = storage_->GetCFHandle(kSubkeyColumnFamilyName);
}
iter_ = std::move(util::UniqueIterator(storage_->NewIterator(read_options_, cf_handle_)));
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
}

void SubKeyIterator::Next() {
iter_->Next();
git-hulk marked this conversation as resolved.
Show resolved Hide resolved

if (!Valid()) return;

if (!iter_->key().starts_with(prefix_)) {
Reset();
}
}

bool SubKeyIterator::Valid() const { return iter_ && iter_->Valid(); }

Slice SubKeyIterator::Key() const { return Valid() ? iter_->key() : Slice(); }

Slice SubKeyIterator::UserKey() const {
if (!Valid()) return {};

const InternalKey internal_key(iter_->key(), storage_->IsSlotIdEncoded());
return internal_key.GetSubKey();
}

Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); }

void SubKeyIterator::Seek() {
iter_->Reset();
iter_->Seek(prefix_);

if (!iter_->Valid()) return;
// For the subkey iterator, it MUST contain the prefix key itself
if (!iter_->key().starts_with(prefix_)) {
Reset();
}
}

void SubKeyIterator::Reset() {
if (iter_) iter_.reset();
}

} // namespace engine
82 changes: 82 additions & 0 deletions src/storage/iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#pragma once

#include <rocksdb/iterator.h>
#include <rocksdb/options.h>

#include "storage.h"

namespace engine {

class SubKeyIterator {
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
public:
explicit SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix);
~SubKeyIterator() = default;
bool Valid() const;
void Seek();
void Next();
// return the raw key in rocksdb
Slice Key() const;
// return the user key without prefix
Slice UserKey() const;
Slice Value() const;
void Reset();

private:
Storage *storage_;
rocksdb::ReadOptions read_options_;
RedisType type_;
std::string prefix_;
std::unique_ptr<rocksdb::Iterator> iter_;
rocksdb::ColumnFamilyHandle *cf_handle_ = nullptr;
};

class DBIterator {
public:
explicit DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot = -1);
~DBIterator() = default;

bool Valid() const;
void Seek(const std::string &target = "");
void Next();
// return the raw key in rocksdb
Slice Key() const;
// return the namespace and user key without prefix
std::tuple<Slice, Slice> UserKey() const;
Slice Value() const;
RedisType Type() const;
void Reset();
std::unique_ptr<SubKeyIterator> GetSubKeyIterator() const;

private:
void nextUntilValid();

Storage *storage_;
rocksdb::ReadOptions read_options_;
int slot_ = -1;
Metadata metadata_ = Metadata(kRedisNone, false);

rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = nullptr;
std::unique_ptr<rocksdb::Iterator> metadata_iter_;
std::unique_ptr<SubKeyIterator> subkey_iter_;
};

} // namespace engine
Loading