From 0f70aa01143cd686d3ac52996f7db3725b3d8dce Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sat, 24 Feb 2024 00:31:25 +0900 Subject: [PATCH 1/5] Implement index updating for numeric and tag field --- src/common/string_util.cc | 12 ++--- src/common/string_util.h | 8 ++-- src/search/indexer.cc | 85 +++++++++++++++++++++++++++++++++++- src/search/indexer.h | 5 ++- src/search/search_encoding.h | 2 + src/storage/storage.cc | 8 +++- src/storage/storage.h | 2 + 7 files changed, 107 insertions(+), 15 deletions(-) diff --git a/src/common/string_util.cc b/src/common/string_util.cc index b3004a3afa1..ae41f918af9 100644 --- a/src/common/string_util.cc +++ b/src/common/string_util.cc @@ -47,7 +47,7 @@ bool EqualICase(std::string_view lhs, std::string_view rhs) { [](char l, char r) { return std::tolower(l) == std::tolower(r); }); } -std::string Trim(std::string in, const std::string &chars) { +std::string Trim(std::string in, std::string_view chars) { if (in.empty()) return in; in.erase(0, in.find_first_not_of(chars)); @@ -56,7 +56,7 @@ std::string Trim(std::string in, const std::string &chars) { return in; } -std::vector Split(const std::string &in, const std::string &delim) { +std::vector Split(std::string_view in, std::string_view delim) { std::vector out; if (in.empty()) { @@ -71,8 +71,8 @@ std::vector Split(const std::string &in, const std::string &delim) size_t begin = 0, end = in.find_first_of(delim); do { - std::string elem = in.substr(begin, end - begin); - if (!elem.empty()) out.push_back(std::move(elem)); + std::string_view elem = in.substr(begin, end - begin); + if (!elem.empty()) out.emplace_back(elem.begin(), elem.end()); if (end == std::string::npos) break; begin = end + 1; end = in.find_first_of(delim, begin); @@ -228,7 +228,7 @@ std::vector RegexMatch(const std::string &str, const std::string &r return out; } -std::string StringToHex(const std::string &input) { +std::string StringToHex(std::string_view input) { static const char hex_digits[] = "0123456789ABCDEF"; std::string output; output.reserve(input.length() * 2); @@ -331,7 +331,7 @@ std::vector TokenizeRedisProtocol(const std::string &value) { /* escape string where all the non-printable characters * (tested with isprint()) are turned into escapes in * the form "\n\r\a...." or "\x". */ -std::string EscapeString(const std::string &s) { +std::string EscapeString(std::string_view s) { std::string str; str.reserve(s.size()); diff --git a/src/common/string_util.h b/src/common/string_util.h index 2ebd7639673..d23ebad7b90 100644 --- a/src/common/string_util.h +++ b/src/common/string_util.h @@ -28,15 +28,15 @@ std::string Float2String(double d); std::string ToLower(std::string in); bool EqualICase(std::string_view lhs, std::string_view rhs); std::string BytesToHuman(uint64_t n); -std::string Trim(std::string in, const std::string &chars); -std::vector Split(const std::string &in, const std::string &delim); +std::string Trim(std::string in, std::string_view chars); +std::vector Split(std::string_view in, std::string_view delim); std::vector Split2KV(const std::string &in, const std::string &delim); bool HasPrefix(const std::string &str, const std::string &prefix); int StringMatch(const std::string &pattern, const std::string &in, int nocase); int StringMatchLen(const char *p, size_t plen, const char *s, size_t slen, int nocase); std::vector RegexMatch(const std::string &str, const std::string ®ex); -std::string StringToHex(const std::string &input); +std::string StringToHex(std::string_view input); std::vector TokenizeRedisProtocol(const std::string &value); -std::string EscapeString(const std::string &s); +std::string EscapeString(std::string_view s); } // namespace util diff --git a/src/search/indexer.cc b/src/search/indexer.cc index f608d3df6dc..e73c82f4523 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -22,7 +22,11 @@ #include +#include "parse_util.h" +#include "search/search_encoding.h" #include "storage/redis_metadata.h" +#include "storage/storage.h" +#include "string_util.h" #include "types/redis_hash.h" namespace redis { @@ -79,12 +83,12 @@ StatusOr IndexUpdater::Record(std::string_view key, c auto s = db.Type(key, &type); if (!s.ok()) return {Status::NotOK, s.ToString()}; - if (type != static_cast(on_data_type)) { + if (type != static_cast(metadata.on_data_type)) { // not the expected type, stop record return {Status::NotOK, "this data type cannot be indexed"}; } - auto retriever = GET_OR_RET(FieldValueRetriever::Create(on_data_type, key, indexer->storage, ns)); + auto retriever = GET_OR_RET(FieldValueRetriever::Create(metadata.on_data_type, key, indexer->storage, ns)); FieldValues values; for (const auto &[field, info] : fields) { @@ -99,6 +103,83 @@ StatusOr IndexUpdater::Record(std::string_view key, c return values; } +Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original, + std::string_view current, const std::string &ns) { + if (original == current) { + // the value of this field is unchanged, no need to update + return Status::OK(); + } + + auto iter = fields.find(field); + if (iter == fields.end()) { + return {Status::NotOK, "No such field to do index updating"}; + } + + auto *metadata = iter->second.get(); + auto *storage = indexer->storage; + auto ns_key = ComposeNamespaceKey(ns, name, storage->IsSlotIdEncoded()); + if (auto tag = dynamic_cast(metadata)) { + const char delim[] = {tag->separator, '\0'}; + auto original_tags = util::Split(original, delim); + auto current_tags = util::Split(current, delim); + + std::set tags_to_delete(original_tags.begin(), original_tags.end()); + std::set tags_to_add(current_tags.begin(), current_tags.end()); + + for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) { + if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) { + it = tags_to_delete.erase(it); + tags_to_add.erase(jt); + } else { + ++it; + } + } + + auto batch = storage->GetWriteBatchBase(); + for (const auto &tag : tags_to_delete) { + auto sub_key = ConstructTagFieldSubkey(field, tag, key); + auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); + + batch->Delete(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode()); + } + + for (const auto &tag : tags_to_add) { + auto sub_key = ConstructTagFieldSubkey(field, tag, key); + auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); + + batch->Put(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode(), Slice()); + } + + auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!s.ok()) return {Status::NotOK, s.ToString()}; + } else if ([[maybe_unused]] auto numeric = dynamic_cast(metadata)) { + auto batch = storage->GetWriteBatchBase(); + + if (!original.empty()) { + auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end()))); + auto sub_key = ConstructNumericFieldSubkey(field, original_num, key); + auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); + + batch->Delete(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode()); + } + + if (!current.empty()) { + auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end()))); + auto sub_key = ConstructNumericFieldSubkey(field, current_num, key); + auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); + + batch->Put(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode(), Slice()); + } + + auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!s.ok()) return {Status::NotOK, s.ToString()}; + } else { + return {Status::NotOK, "Unexpected field type"}; + } + + return Status::OK(); +} + void GlobalIndexer::Add(IndexUpdater updater) { auto &up = updaters.emplace_back(std::move(updater)); for (const auto &prefix : up.prefixes) { diff --git a/src/search/indexer.h b/src/search/indexer.h index e153d55584c..4112fee636f 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -69,12 +69,15 @@ struct FieldValueRetriever { struct IndexUpdater { using FieldValues = std::map; - SearchOnDataType on_data_type; + std::string name; + SearchMetadata metadata; std::vector prefixes; std::map> fields; GlobalIndexer *indexer = nullptr; StatusOr Record(std::string_view key, const std::string &ns); + Status UpdateIndex(const std::string &field, std::string_view key, std::string_view original, + std::string_view current, const std::string &ns); }; struct GlobalIndexer { diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h index 2acec050dde..1637a504c28 100644 --- a/src/search/search_encoding.h +++ b/src/search/search_encoding.h @@ -73,6 +73,8 @@ struct SearchFieldMetadata { void DecodeFlag(uint8_t flag) { noindex = flag & 1; } + virtual ~SearchFieldMetadata() = default; + virtual void Encode(std::string *dst) const { PutFixed8(dst, MakeFlag()); } virtual rocksdb::Status Decode(Slice *input) { diff --git a/src/storage/storage.cc b/src/storage/storage.cc index a74e49d3523..56cda4aede7 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -231,8 +231,9 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) { rocksdb::ColumnFamilyOptions cf_options(options); auto res = util::DBOpen(options, config_->db_dir); if (res) { - std::vector cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName, - kPropagateColumnFamilyName, kStreamColumnFamilyName}; + std::vector cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, + kPubSubColumnFamilyName, kPropagateColumnFamilyName, + kStreamColumnFamilyName, kSearchColumnFamilyName}; std::vector cf_handles; auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles); if (!s.ok()) { @@ -339,6 +340,7 @@ Status Storage::Open(DBOpenMode mode) { column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts); column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts); column_families.emplace_back(kStreamColumnFamilyName, subkey_opts); + column_families.emplace_back(kSearchColumnFamilyName, subkey_opts); std::vector old_column_families; auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families); @@ -730,6 +732,8 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) { return cf_handles_[4]; } else if (name == kStreamColumnFamilyName) { return cf_handles_[5]; + } else if (name == kSearchColumnFamilyName) { + return cf_handles_[6]; } return cf_handles_[0]; } diff --git a/src/storage/storage.h b/src/storage/storage.h index 0e20425a68d..e36e77bdb46 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -57,6 +57,7 @@ enum ColumnFamilyID { kColumnFamilyIDPubSub, kColumnFamilyIDPropagate, kColumnFamilyIDStream, + kColumnFamilyIDSearch, }; enum DBOpenMode { @@ -73,6 +74,7 @@ constexpr const char *kMetadataColumnFamilyName = "metadata"; constexpr const char *kSubkeyColumnFamilyName = "default"; constexpr const char *kPropagateColumnFamilyName = "propagate"; constexpr const char *kStreamColumnFamilyName = "stream"; +constexpr const char *kSearchColumnFamilyName = "search"; constexpr const char *kPropagateScriptCommand = "script"; From fd26e03a009a82ff7328c703108982812afcb6f5 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sat, 24 Feb 2024 10:08:20 +0900 Subject: [PATCH 2/5] fix --- src/search/indexer.cc | 26 +++++++++++++++++++++++--- src/search/indexer.h | 6 +++++- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/search/indexer.cc b/src/search/indexer.cc index e73c82f4523..c13bbc400f9 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -152,7 +152,7 @@ Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch()); if (!s.ok()) return {Status::NotOK, s.ToString()}; - } else if ([[maybe_unused]] auto numeric = dynamic_cast(metadata)) { + } else if (auto numeric [[maybe_unused]] = dynamic_cast(metadata)) { auto batch = storage->GetWriteBatchBase(); if (!original.empty()) { @@ -180,6 +180,25 @@ Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, return Status::OK(); } +Status IndexUpdater::Update(const FieldValues &original, std::string_view key, const std::string &ns) { + auto current = GET_OR_RET(Record(key, ns)); + + for (const auto &[field, _] : fields) { + std::string_view original_val, current_val; + + if (auto it = original.find(field); it != original.end()) { + original_val = it->second; + } + if (auto it = current.find(field); it != current.end()) { + current_val = it->second; + } + + GET_OR_RET(UpdateIndex(field, key, original_val, current_val, ns)); + } + + return Status::OK(); +} + void GlobalIndexer::Add(IndexUpdater updater) { auto &up = updaters.emplace_back(std::move(updater)); for (const auto &prefix : up.prefixes) { @@ -187,10 +206,11 @@ void GlobalIndexer::Add(IndexUpdater updater) { } } -StatusOr GlobalIndexer::Record(std::string_view key, const std::string &ns) { +StatusOr GlobalIndexer::Record(std::string_view key, const std::string &ns) { auto iter = prefix_map.longest_prefix(key); if (iter != prefix_map.end()) { - return iter.value()->Record(key, ns); + auto updater = iter.value(); + return std::make_pair(updater, GET_OR_RET(updater->Record(key, ns))); } return {Status::NoPrefixMatched}; diff --git a/src/search/indexer.h b/src/search/indexer.h index 4112fee636f..f2d588414a7 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -78,9 +78,13 @@ struct IndexUpdater { StatusOr Record(std::string_view key, const std::string &ns); Status UpdateIndex(const std::string &field, std::string_view key, std::string_view original, std::string_view current, const std::string &ns); + Status Update(const FieldValues &original, std::string_view key, const std::string &ns); }; struct GlobalIndexer { + using FieldValues = IndexUpdater::FieldValues; + using RecordResult = std::pair; + std::deque updaters; tsl::htrie_map prefix_map; @@ -89,7 +93,7 @@ struct GlobalIndexer { explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {} void Add(IndexUpdater updater); - StatusOr Record(std::string_view key, const std::string &ns); + StatusOr Record(std::string_view key, const std::string &ns); }; } // namespace redis From 1dd2ea8a44dd456bcb59a9d72a0699f1455f0894 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sat, 24 Feb 2024 10:14:24 +0900 Subject: [PATCH 3/5] add GlobalIndex::Update --- src/search/indexer.cc | 4 ++++ src/search/indexer.h | 1 + 2 files changed, 5 insertions(+) diff --git a/src/search/indexer.cc b/src/search/indexer.cc index c13bbc400f9..5b0ac0451f7 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -216,4 +216,8 @@ StatusOr GlobalIndexer::Record(std::string_view key return {Status::NoPrefixMatched}; } +Status GlobalIndexer::Update(const RecordResult &original, std::string_view key, const std::string &ns) { + return original.first->Update(original.second, key, ns); +} + } // namespace redis diff --git a/src/search/indexer.h b/src/search/indexer.h index f2d588414a7..001ade1346d 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -94,6 +94,7 @@ struct GlobalIndexer { void Add(IndexUpdater updater); StatusOr Record(std::string_view key, const std::string &ns); + static Status Update(const RecordResult &original, std::string_view key, const std::string &ns); }; } // namespace redis From 5d68bc520e9294c6da1f23cab14207ec12ad9e8f Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sat, 24 Feb 2024 16:47:55 +0900 Subject: [PATCH 4/5] add new status code --- src/common/status.h | 14 +++----------- src/search/indexer.cc | 2 +- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/src/common/status.h b/src/common/status.h index ade19f86d0e..b425ea5b238 100644 --- a/src/common/status.h +++ b/src/common/status.h @@ -37,38 +37,30 @@ class [[nodiscard]] Status { enum Code : unsigned char { NotOK = 1, NotFound, + NotSupported, + InvalidArgument, // DB DBOpenErr, DBBackupErr, DBGetWALErr, - DBBackupFileErr, - - // Replication - DBMismatched, // Redis RedisUnknownCmd, RedisInvalidCmd, RedisParseErr, RedisExecErr, - RedisReplicationConflict, // Cluster ClusterDown, ClusterInvalidInfo, - // Slot - SlotImport, - - // Network - NetSendErr, - // Blocking BlockingCmd, // Search NoPrefixMatched, + TypeMismatched, }; Status() : impl_{nullptr} {} diff --git a/src/search/indexer.cc b/src/search/indexer.cc index 5b0ac0451f7..902516f8f1a 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -85,7 +85,7 @@ StatusOr IndexUpdater::Record(std::string_view key, c if (type != static_cast(metadata.on_data_type)) { // not the expected type, stop record - return {Status::NotOK, "this data type cannot be indexed"}; + return {Status::TypeMismatched}; } auto retriever = GET_OR_RET(FieldValueRetriever::Create(metadata.on_data_type, key, indexer->storage, ns)); From 1b9ec94e5c56f6a8f602424247d5279f7982c841 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sun, 25 Feb 2024 13:39:58 +0900 Subject: [PATCH 5/5] add skip --- src/search/indexer.cc | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/search/indexer.cc b/src/search/indexer.cc index 902516f8f1a..4576e280510 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -135,32 +135,40 @@ Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, } } + if (tags_to_add.empty() && tags_to_delete.empty()) { + // no change, skip index updating + return Status::OK(); + } + auto batch = storage->GetWriteBatchBase(); + auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName); + for (const auto &tag : tags_to_delete) { auto sub_key = ConstructTagFieldSubkey(field, tag, key); auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); - batch->Delete(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode()); + batch->Delete(cf_handle, index_key.Encode()); } for (const auto &tag : tags_to_add) { auto sub_key = ConstructTagFieldSubkey(field, tag, key); auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); - batch->Put(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode(), Slice()); + batch->Put(cf_handle, index_key.Encode(), Slice()); } auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch()); if (!s.ok()) return {Status::NotOK, s.ToString()}; } else if (auto numeric [[maybe_unused]] = dynamic_cast(metadata)) { auto batch = storage->GetWriteBatchBase(); + auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName); if (!original.empty()) { auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end()))); auto sub_key = ConstructNumericFieldSubkey(field, original_num, key); auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); - batch->Delete(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode()); + batch->Delete(cf_handle, index_key.Encode()); } if (!current.empty()) { @@ -168,7 +176,7 @@ Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, auto sub_key = ConstructNumericFieldSubkey(field, current_num, key); auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); - batch->Put(storage->GetCFHandle(engine::kSearchColumnFamilyName), index_key.Encode(), Slice()); + batch->Put(cf_handle, index_key.Encode(), Slice()); } auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());