Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement index updating for numeric and tag field #2115

Merged
merged 6 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,30 @@ class [[nodiscard]] Status {
enum Code : unsigned char {
NotOK = 1,
NotFound,
NotSupported,
InvalidArgument,

// DB
DBOpenErr,
DBBackupErr,
DBGetWALErr,
DBBackupFileErr,

// Replication
DBMismatched,

// Redis
RedisUnknownCmd,
RedisInvalidCmd,
RedisParseErr,
RedisExecErr,
RedisReplicationConflict,

// Cluster
ClusterDown,
ClusterInvalidInfo,

// Slot
SlotImport,

// Network
NetSendErr,

// Blocking
BlockingCmd,

// Search
NoPrefixMatched,
TypeMismatched,
};

Status() : impl_{nullptr} {}
Expand Down
12 changes: 6 additions & 6 deletions src/common/string_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ bool EqualICase(std::string_view lhs, std::string_view rhs) {
[](char l, char r) { return std::tolower(l) == std::tolower(r); });
}

std::string Trim(std::string in, const std::string &chars) {
std::string Trim(std::string in, std::string_view chars) {
if (in.empty()) return in;

in.erase(0, in.find_first_not_of(chars));
Expand All @@ -56,7 +56,7 @@ std::string Trim(std::string in, const std::string &chars) {
return in;
}

std::vector<std::string> Split(const std::string &in, const std::string &delim) {
std::vector<std::string> Split(std::string_view in, std::string_view delim) {
std::vector<std::string> out;

if (in.empty()) {
Expand All @@ -71,8 +71,8 @@ std::vector<std::string> Split(const std::string &in, const std::string &delim)

size_t begin = 0, end = in.find_first_of(delim);
do {
std::string elem = in.substr(begin, end - begin);
if (!elem.empty()) out.push_back(std::move(elem));
std::string_view elem = in.substr(begin, end - begin);
if (!elem.empty()) out.emplace_back(elem.begin(), elem.end());
if (end == std::string::npos) break;
begin = end + 1;
end = in.find_first_of(delim, begin);
Expand Down Expand Up @@ -228,7 +228,7 @@ std::vector<std::string> RegexMatch(const std::string &str, const std::string &r
return out;
}

std::string StringToHex(const std::string &input) {
std::string StringToHex(std::string_view input) {
static const char hex_digits[] = "0123456789ABCDEF";
std::string output;
output.reserve(input.length() * 2);
Expand Down Expand Up @@ -331,7 +331,7 @@ std::vector<std::string> TokenizeRedisProtocol(const std::string &value) {
/* escape string where all the non-printable characters
* (tested with isprint()) are turned into escapes in
* the form "\n\r\a...." or "\x<hex-number>". */
std::string EscapeString(const std::string &s) {
std::string EscapeString(std::string_view s) {
std::string str;
str.reserve(s.size());

Expand Down
8 changes: 4 additions & 4 deletions src/common/string_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ std::string Float2String(double d);
std::string ToLower(std::string in);
bool EqualICase(std::string_view lhs, std::string_view rhs);
std::string BytesToHuman(uint64_t n);
std::string Trim(std::string in, const std::string &chars);
std::vector<std::string> Split(const std::string &in, const std::string &delim);
std::string Trim(std::string in, std::string_view chars);
std::vector<std::string> Split(std::string_view in, std::string_view delim);
std::vector<std::string> Split2KV(const std::string &in, const std::string &delim);
bool HasPrefix(const std::string &str, const std::string &prefix);
int StringMatch(const std::string &pattern, const std::string &in, int nocase);
int StringMatchLen(const char *p, size_t plen, const char *s, size_t slen, int nocase);
std::vector<std::string> RegexMatch(const std::string &str, const std::string &regex);
std::string StringToHex(const std::string &input);
std::string StringToHex(std::string_view input);
std::vector<std::string> TokenizeRedisProtocol(const std::string &value);
std::string EscapeString(const std::string &s);
std::string EscapeString(std::string_view s);

} // namespace util
123 changes: 118 additions & 5 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

#include <variant>

#include "parse_util.h"
#include "search/search_encoding.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "string_util.h"
#include "types/redis_hash.h"

namespace redis {
Expand Down Expand Up @@ -79,12 +83,12 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key, c
auto s = db.Type(key, &type);
if (!s.ok()) return {Status::NotOK, s.ToString()};

if (type != static_cast<RedisType>(on_data_type)) {
if (type != static_cast<RedisType>(metadata.on_data_type)) {
// not the expected type, stop record
return {Status::NotOK, "this data type cannot be indexed"};
return {Status::TypeMismatched};
}

auto retriever = GET_OR_RET(FieldValueRetriever::Create(on_data_type, key, indexer->storage, ns));
auto retriever = GET_OR_RET(FieldValueRetriever::Create(metadata.on_data_type, key, indexer->storage, ns));

FieldValues values;
for (const auto &[field, info] : fields) {
Expand All @@ -99,20 +103,129 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key, c
return values;
}

Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current, const std::string &ns) {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
}

auto iter = fields.find(field);
if (iter == fields.end()) {
return {Status::NotOK, "No such field to do index updating"};
}

auto *metadata = iter->second.get();
auto *storage = indexer->storage;
auto ns_key = ComposeNamespaceKey(ns, name, storage->IsSlotIdEncoded());
if (auto tag = dynamic_cast<SearchTagFieldMetadata *>(metadata)) {
const char delim[] = {tag->separator, '\0'};
auto original_tags = util::Split(original, delim);
auto current_tags = util::Split(current, delim);

std::set<std::string> tags_to_delete(original_tags.begin(), original_tags.end());
std::set<std::string> tags_to_add(current_tags.begin(), current_tags.end());

for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) {
if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) {
it = tags_to_delete.erase(it);
tags_to_add.erase(jt);
} else {
++it;
}
}

if (tags_to_add.empty() && tags_to_delete.empty()) {
// no change, skip index updating
return Status::OK();
}

auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);

for (const auto &tag : tags_to_delete) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());

batch->Delete(cf_handle, index_key.Encode());
}

for (const auto &tag : tags_to_add) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());

batch->Put(cf_handle, index_key.Encode(), Slice());
}

auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
} else if (auto numeric [[maybe_unused]] = dynamic_cast<SearchNumericFieldMetadata *>(metadata)) {
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName);

if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
auto sub_key = ConstructNumericFieldSubkey(field, original_num, key);
auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());

batch->Delete(cf_handle, index_key.Encode());
}

if (!current.empty()) {
auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
auto sub_key = ConstructNumericFieldSubkey(field, current_num, key);
auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());

batch->Put(cf_handle, index_key.Encode(), Slice());
}

auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
} else {
return {Status::NotOK, "Unexpected field type"};
}

return Status::OK();
}

Status IndexUpdater::Update(const FieldValues &original, std::string_view key, const std::string &ns) {
auto current = GET_OR_RET(Record(key, ns));

for (const auto &[field, _] : fields) {
std::string_view original_val, current_val;

if (auto it = original.find(field); it != original.end()) {
original_val = it->second;
}
if (auto it = current.find(field); it != current.end()) {
current_val = it->second;
}

GET_OR_RET(UpdateIndex(field, key, original_val, current_val, ns));
}

return Status::OK();
}

void GlobalIndexer::Add(IndexUpdater updater) {
auto &up = updaters.emplace_back(std::move(updater));
for (const auto &prefix : up.prefixes) {
prefix_map.emplace(prefix, &up);
}
}

StatusOr<IndexUpdater::FieldValues> GlobalIndexer::Record(std::string_view key, const std::string &ns) {
StatusOr<GlobalIndexer::RecordResult> GlobalIndexer::Record(std::string_view key, const std::string &ns) {
auto iter = prefix_map.longest_prefix(key);
if (iter != prefix_map.end()) {
return iter.value()->Record(key, ns);
auto updater = iter.value();
return std::make_pair(updater, GET_OR_RET(updater->Record(key, ns)));
}

return {Status::NoPrefixMatched};
}

Status GlobalIndexer::Update(const RecordResult &original, std::string_view key, const std::string &ns) {
return original.first->Update(original.second, key, ns);
}

} // namespace redis
12 changes: 10 additions & 2 deletions src/search/indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,22 @@ struct FieldValueRetriever {
struct IndexUpdater {
using FieldValues = std::map<std::string, std::string>;

SearchOnDataType on_data_type;
std::string name;
SearchMetadata metadata;
std::vector<std::string> prefixes;
std::map<std::string, std::unique_ptr<SearchFieldMetadata>> fields;
GlobalIndexer *indexer = nullptr;

StatusOr<FieldValues> Record(std::string_view key, const std::string &ns);
Status UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current, const std::string &ns);
Status Update(const FieldValues &original, std::string_view key, const std::string &ns);
};

struct GlobalIndexer {
using FieldValues = IndexUpdater::FieldValues;
using RecordResult = std::pair<IndexUpdater *, FieldValues>;

std::deque<IndexUpdater> updaters;
tsl::htrie_map<char, IndexUpdater *> prefix_map;

Expand All @@ -86,7 +93,8 @@ struct GlobalIndexer {
explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {}

void Add(IndexUpdater updater);
StatusOr<IndexUpdater::FieldValues> Record(std::string_view key, const std::string &ns);
StatusOr<RecordResult> Record(std::string_view key, const std::string &ns);
static Status Update(const RecordResult &original, std::string_view key, const std::string &ns);
};

} // namespace redis
2 changes: 2 additions & 0 deletions src/search/search_encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct SearchFieldMetadata {

void DecodeFlag(uint8_t flag) { noindex = flag & 1; }

virtual ~SearchFieldMetadata() = default;

virtual void Encode(std::string *dst) const { PutFixed8(dst, MakeFlag()); }

virtual rocksdb::Status Decode(Slice *input) {
Expand Down
8 changes: 6 additions & 2 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) {
rocksdb::ColumnFamilyOptions cf_options(options);
auto res = util::DBOpen(options, config_->db_dir);
if (res) {
std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName,
kPropagateColumnFamilyName, kStreamColumnFamilyName};
std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName,
kPubSubColumnFamilyName, kPropagateColumnFamilyName,
kStreamColumnFamilyName, kSearchColumnFamilyName};
std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
if (!s.ok()) {
Expand Down Expand Up @@ -339,6 +340,7 @@ Status Storage::Open(DBOpenMode mode) {
column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts);
column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts);
column_families.emplace_back(kStreamColumnFamilyName, subkey_opts);
column_families.emplace_back(kSearchColumnFamilyName, subkey_opts);

std::vector<std::string> old_column_families;
auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families);
Expand Down Expand Up @@ -730,6 +732,8 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
return cf_handles_[4];
} else if (name == kStreamColumnFamilyName) {
return cf_handles_[5];
} else if (name == kSearchColumnFamilyName) {
return cf_handles_[6];
}
return cf_handles_[0];
}
Expand Down
2 changes: 2 additions & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ enum ColumnFamilyID {
kColumnFamilyIDPubSub,
kColumnFamilyIDPropagate,
kColumnFamilyIDStream,
kColumnFamilyIDSearch,
};

enum DBOpenMode {
Expand All @@ -73,6 +74,7 @@ constexpr const char *kMetadataColumnFamilyName = "metadata";
constexpr const char *kSubkeyColumnFamilyName = "default";
constexpr const char *kPropagateColumnFamilyName = "propagate";
constexpr const char *kStreamColumnFamilyName = "stream";
constexpr const char *kSearchColumnFamilyName = "search";

constexpr const char *kPropagateScriptCommand = "script";

Expand Down
Loading