Skip to content

Commit

Permalink
Implement index updating for numeric and tag field
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice committed Feb 24, 2024
1 parent 7571034 commit 0f70aa0
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 15 deletions.
12 changes: 6 additions & 6 deletions src/common/string_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ bool EqualICase(std::string_view lhs, std::string_view rhs) {
[](char l, char r) { return std::tolower(l) == std::tolower(r); });
}

std::string Trim(std::string in, const std::string &chars) {
std::string Trim(std::string in, std::string_view chars) {
if (in.empty()) return in;

in.erase(0, in.find_first_not_of(chars));
Expand All @@ -56,7 +56,7 @@ std::string Trim(std::string in, const std::string &chars) {
return in;
}

std::vector<std::string> Split(const std::string &in, const std::string &delim) {
std::vector<std::string> Split(std::string_view in, std::string_view delim) {
std::vector<std::string> out;

if (in.empty()) {
Expand All @@ -71,8 +71,8 @@ std::vector<std::string> Split(const std::string &in, const std::string &delim)

size_t begin = 0, end = in.find_first_of(delim);
do {
std::string elem = in.substr(begin, end - begin);
if (!elem.empty()) out.push_back(std::move(elem));
std::string_view elem = in.substr(begin, end - begin);
if (!elem.empty()) out.emplace_back(elem.begin(), elem.end());
if (end == std::string::npos) break;
begin = end + 1;
end = in.find_first_of(delim, begin);
Expand Down Expand Up @@ -228,7 +228,7 @@ std::vector<std::string> RegexMatch(const std::string &str, const std::string &r
return out;
}

std::string StringToHex(const std::string &input) {
std::string StringToHex(std::string_view input) {
static const char hex_digits[] = "0123456789ABCDEF";
std::string output;
output.reserve(input.length() * 2);
Expand Down Expand Up @@ -331,7 +331,7 @@ std::vector<std::string> TokenizeRedisProtocol(const std::string &value) {
/* escape string where all the non-printable characters
* (tested with isprint()) are turned into escapes in
* the form "\n\r\a...." or "\x<hex-number>". */
std::string EscapeString(const std::string &s) {
std::string EscapeString(std::string_view s) {
std::string str;
str.reserve(s.size());

Expand Down
8 changes: 4 additions & 4 deletions src/common/string_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ std::string Float2String(double d);
std::string ToLower(std::string in);
bool EqualICase(std::string_view lhs, std::string_view rhs);
std::string BytesToHuman(uint64_t n);
std::string Trim(std::string in, const std::string &chars);
std::vector<std::string> Split(const std::string &in, const std::string &delim);
std::string Trim(std::string in, std::string_view chars);
std::vector<std::string> Split(std::string_view in, std::string_view delim);
std::vector<std::string> Split2KV(const std::string &in, const std::string &delim);
bool HasPrefix(const std::string &str, const std::string &prefix);
int StringMatch(const std::string &pattern, const std::string &in, int nocase);
int StringMatchLen(const char *p, size_t plen, const char *s, size_t slen, int nocase);
std::vector<std::string> RegexMatch(const std::string &str, const std::string &regex);
std::string StringToHex(const std::string &input);
std::string StringToHex(std::string_view input);
std::vector<std::string> TokenizeRedisProtocol(const std::string &value);
std::string EscapeString(const std::string &s);
std::string EscapeString(std::string_view s);

} // namespace util
85 changes: 83 additions & 2 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

#include <variant>

#include "parse_util.h"
#include "search/search_encoding.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "string_util.h"
#include "types/redis_hash.h"

namespace redis {
Expand Down Expand Up @@ -79,12 +83,12 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key, c
auto s = db.Type(key, &type);
if (!s.ok()) return {Status::NotOK, s.ToString()};

if (type != static_cast<RedisType>(on_data_type)) {
if (type != static_cast<RedisType>(metadata.on_data_type)) {
// not the expected type, stop record
return {Status::NotOK, "this data type cannot be indexed"};
}

auto retriever = GET_OR_RET(FieldValueRetriever::Create(on_data_type, key, indexer->storage, ns));
auto retriever = GET_OR_RET(FieldValueRetriever::Create(metadata.on_data_type, key, indexer->storage, ns));

FieldValues values;
for (const auto &[field, info] : fields) {
Expand All @@ -99,6 +103,83 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key, c
return values;
}

Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current, const std::string &ns) {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
}

auto iter = fields.find(field);
if (iter == fields.end()) {
return {Status::NotOK, "No such field to do index updating"};
}

auto *metadata = iter->second.get();
auto *storage = indexer->storage;
auto ns_key = ComposeNamespaceKey(ns, name, storage->IsSlotIdEncoded());
if (auto tag = dynamic_cast<SearchTagFieldMetadata *>(metadata)) {
const char delim[] = {tag->separator, '\0'};
auto original_tags = util::Split(original, delim);
auto current_tags = util::Split(current, delim);

std::set<std::string> tags_to_delete(original_tags.begin(), original_tags.end());
std::set<std::string> tags_to_add(current_tags.begin(), current_tags.end());

for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) {
if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) {
it = tags_to_delete.erase(it);
tags_to_add.erase(jt);
} else {
++it;
}
}

auto batch = storage->GetWriteBatchBase();
for (const auto &tag : tags_to_delete) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());

batch->Delete(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode());
}

for (const auto &tag : tags_to_add) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());

batch->Put(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode(), Slice());
}

auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
} else if ([[maybe_unused]] auto numeric = dynamic_cast<SearchNumericFieldMetadata *>(metadata)) {
auto batch = storage->GetWriteBatchBase();

if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
auto sub_key = ConstructNumericFieldSubkey(field, original_num, key);
auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());

batch->Delete(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode());
}

if (!current.empty()) {
auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
auto sub_key = ConstructNumericFieldSubkey(field, current_num, key);
auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded());

batch->Put(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode(), Slice());
}

auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
} else {
return {Status::NotOK, "Unexpected field type"};
}

return Status::OK();
}

void GlobalIndexer::Add(IndexUpdater updater) {
auto &up = updaters.emplace_back(std::move(updater));
for (const auto &prefix : up.prefixes) {
Expand Down
5 changes: 4 additions & 1 deletion src/search/indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ struct FieldValueRetriever {
struct IndexUpdater {
using FieldValues = std::map<std::string, std::string>;

SearchOnDataType on_data_type;
std::string name;
SearchMetadata metadata;
std::vector<std::string> prefixes;
std::map<std::string, std::unique_ptr<SearchFieldMetadata>> fields;
GlobalIndexer *indexer = nullptr;

StatusOr<FieldValues> Record(std::string_view key, const std::string &ns);
Status UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current, const std::string &ns);
};

struct GlobalIndexer {
Expand Down
2 changes: 2 additions & 0 deletions src/search/search_encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct SearchFieldMetadata {

void DecodeFlag(uint8_t flag) { noindex = flag & 1; }

virtual ~SearchFieldMetadata() = default;

virtual void Encode(std::string *dst) const { PutFixed8(dst, MakeFlag()); }

virtual rocksdb::Status Decode(Slice *input) {
Expand Down
8 changes: 6 additions & 2 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) {
rocksdb::ColumnFamilyOptions cf_options(options);
auto res = util::DBOpen(options, config_->db_dir);
if (res) {
std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName,
kPropagateColumnFamilyName, kStreamColumnFamilyName};
std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName,
kPubSubColumnFamilyName, kPropagateColumnFamilyName,
kStreamColumnFamilyName, kSearchColumnFamilyName};
std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
if (!s.ok()) {
Expand Down Expand Up @@ -339,6 +340,7 @@ Status Storage::Open(DBOpenMode mode) {
column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts);
column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts);
column_families.emplace_back(kStreamColumnFamilyName, subkey_opts);
column_families.emplace_back(kSearchColumnFamilyName, subkey_opts);

std::vector<std::string> old_column_families;
auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families);
Expand Down Expand Up @@ -730,6 +732,8 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
return cf_handles_[4];
} else if (name == kStreamColumnFamilyName) {
return cf_handles_[5];
} else if (name == kSearchColumnFamilyName) {
return cf_handles_[6];
}
return cf_handles_[0];
}
Expand Down
2 changes: 2 additions & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ enum ColumnFamilyID {
kColumnFamilyIDPubSub,
kColumnFamilyIDPropagate,
kColumnFamilyIDStream,
kColumnFamilyIDSearch,
};

enum DBOpenMode {
Expand All @@ -73,6 +74,7 @@ constexpr const char *kMetadataColumnFamilyName = "metadata";
constexpr const char *kSubkeyColumnFamilyName = "default";
constexpr const char *kPropagateColumnFamilyName = "propagate";
constexpr const char *kStreamColumnFamilyName = "stream";
constexpr const char *kSearchColumnFamilyName = "search";

constexpr const char *kPropagateScriptCommand = "script";

Expand Down

0 comments on commit 0f70aa0

Please sign in to comment.