diff --git a/src/common/status.h b/src/common/status.h index ade19f86d0e..b425ea5b238 100644 --- a/src/common/status.h +++ b/src/common/status.h @@ -37,38 +37,30 @@ class [[nodiscard]] Status { enum Code : unsigned char { NotOK = 1, NotFound, + NotSupported, + InvalidArgument, // DB DBOpenErr, DBBackupErr, DBGetWALErr, - DBBackupFileErr, - - // Replication - DBMismatched, // Redis RedisUnknownCmd, RedisInvalidCmd, RedisParseErr, RedisExecErr, - RedisReplicationConflict, // Cluster ClusterDown, ClusterInvalidInfo, - // Slot - SlotImport, - - // Network - NetSendErr, - // Blocking BlockingCmd, // Search NoPrefixMatched, + TypeMismatched, }; Status() : impl_{nullptr} {} diff --git a/src/common/string_util.cc b/src/common/string_util.cc index b3004a3afa1..ae41f918af9 100644 --- a/src/common/string_util.cc +++ b/src/common/string_util.cc @@ -47,7 +47,7 @@ bool EqualICase(std::string_view lhs, std::string_view rhs) { [](char l, char r) { return std::tolower(l) == std::tolower(r); }); } -std::string Trim(std::string in, const std::string &chars) { +std::string Trim(std::string in, std::string_view chars) { if (in.empty()) return in; in.erase(0, in.find_first_not_of(chars)); @@ -56,7 +56,7 @@ std::string Trim(std::string in, const std::string &chars) { return in; } -std::vector Split(const std::string &in, const std::string &delim) { +std::vector Split(std::string_view in, std::string_view delim) { std::vector out; if (in.empty()) { @@ -71,8 +71,8 @@ std::vector Split(const std::string &in, const std::string &delim) size_t begin = 0, end = in.find_first_of(delim); do { - std::string elem = in.substr(begin, end - begin); - if (!elem.empty()) out.push_back(std::move(elem)); + std::string_view elem = in.substr(begin, end - begin); + if (!elem.empty()) out.emplace_back(elem.begin(), elem.end()); if (end == std::string::npos) break; begin = end + 1; end = in.find_first_of(delim, begin); @@ -228,7 +228,7 @@ std::vector RegexMatch(const std::string &str, const std::string &r 
return out; } -std::string StringToHex(const std::string &input) { +std::string StringToHex(std::string_view input) { static const char hex_digits[] = "0123456789ABCDEF"; std::string output; output.reserve(input.length() * 2); @@ -331,7 +331,7 @@ std::vector TokenizeRedisProtocol(const std::string &value) { /* escape string where all the non-printable characters * (tested with isprint()) are turned into escapes in * the form "\n\r\a...." or "\x". */ -std::string EscapeString(const std::string &s) { +std::string EscapeString(std::string_view s) { std::string str; str.reserve(s.size()); diff --git a/src/common/string_util.h b/src/common/string_util.h index 2ebd7639673..d23ebad7b90 100644 --- a/src/common/string_util.h +++ b/src/common/string_util.h @@ -28,15 +28,15 @@ std::string Float2String(double d); std::string ToLower(std::string in); bool EqualICase(std::string_view lhs, std::string_view rhs); std::string BytesToHuman(uint64_t n); -std::string Trim(std::string in, const std::string &chars); -std::vector Split(const std::string &in, const std::string &delim); +std::string Trim(std::string in, std::string_view chars); +std::vector Split(std::string_view in, std::string_view delim); std::vector Split2KV(const std::string &in, const std::string &delim); bool HasPrefix(const std::string &str, const std::string &prefix); int StringMatch(const std::string &pattern, const std::string &in, int nocase); int StringMatchLen(const char *p, size_t plen, const char *s, size_t slen, int nocase); std::vector RegexMatch(const std::string &str, const std::string &regex); -std::string StringToHex(const std::string &input); +std::string StringToHex(std::string_view input); std::vector TokenizeRedisProtocol(const std::string &value); -std::string EscapeString(const std::string &s); +std::string EscapeString(std::string_view s); } // namespace util diff --git a/src/search/indexer.cc b/src/search/indexer.cc index f608d3df6dc..4576e280510 100644 --- a/src/search/indexer.cc +++ 
b/src/search/indexer.cc @@ -22,7 +22,11 @@ #include +#include "parse_util.h" +#include "search/search_encoding.h" #include "storage/redis_metadata.h" +#include "storage/storage.h" +#include "string_util.h" #include "types/redis_hash.h" namespace redis { @@ -79,12 +83,12 @@ StatusOr IndexUpdater::Record(std::string_view key, c auto s = db.Type(key, &type); if (!s.ok()) return {Status::NotOK, s.ToString()}; - if (type != static_cast(on_data_type)) { + if (type != static_cast(metadata.on_data_type)) { // not the expected type, stop record - return {Status::NotOK, "this data type cannot be indexed"}; + return {Status::TypeMismatched}; } - auto retriever = GET_OR_RET(FieldValueRetriever::Create(on_data_type, key, indexer->storage, ns)); + auto retriever = GET_OR_RET(FieldValueRetriever::Create(metadata.on_data_type, key, indexer->storage, ns)); FieldValues values; for (const auto &[field, info] : fields) { @@ -99,6 +103,110 @@ StatusOr IndexUpdater::Record(std::string_view key, c return values; } +Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original, + std::string_view current, const std::string &ns) { + if (original == current) { + // the value of this field is unchanged, no need to update + return Status::OK(); + } + + auto iter = fields.find(field); + if (iter == fields.end()) { + return {Status::NotOK, "No such field to do index updating"}; + } + + auto *metadata = iter->second.get(); + auto *storage = indexer->storage; + auto ns_key = ComposeNamespaceKey(ns, name, storage->IsSlotIdEncoded()); + if (auto tag = dynamic_cast(metadata)) { + const char delim[] = {tag->separator, '\0'}; + auto original_tags = util::Split(original, delim); + auto current_tags = util::Split(current, delim); + + std::set tags_to_delete(original_tags.begin(), original_tags.end()); + std::set tags_to_add(current_tags.begin(), current_tags.end()); + + for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) { + if (auto jt = 
tags_to_add.find(*it); jt != tags_to_add.end()) { + it = tags_to_delete.erase(it); + tags_to_add.erase(jt); + } else { + ++it; + } + } + + if (tags_to_add.empty() && tags_to_delete.empty()) { + // no change, skip index updating + return Status::OK(); + } + + auto batch = storage->GetWriteBatchBase(); + auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName); + + for (const auto &tag : tags_to_delete) { + auto sub_key = ConstructTagFieldSubkey(field, tag, key); + auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); + + batch->Delete(cf_handle, index_key.Encode()); + } + + for (const auto &tag : tags_to_add) { + auto sub_key = ConstructTagFieldSubkey(field, tag, key); + auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); + + batch->Put(cf_handle, index_key.Encode(), Slice()); + } + + auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!s.ok()) return {Status::NotOK, s.ToString()}; + } else if (auto numeric [[maybe_unused]] = dynamic_cast(metadata)) { + auto batch = storage->GetWriteBatchBase(); + auto cf_handle = storage->GetCFHandle(engine::kSearchColumnFamilyName); + + if (!original.empty()) { + auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end()))); + auto sub_key = ConstructNumericFieldSubkey(field, original_num, key); + auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); + + batch->Delete(cf_handle, index_key.Encode()); + } + + if (!current.empty()) { + auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end()))); + auto sub_key = ConstructNumericFieldSubkey(field, current_num, key); + auto index_key = InternalKey(ns_key, sub_key, this->metadata.version, storage->IsSlotIdEncoded()); + + batch->Put(cf_handle, index_key.Encode(), Slice()); + } + + auto s = storage->Write(storage->DefaultWriteOptions(), 
batch->GetWriteBatch()); + if (!s.ok()) return {Status::NotOK, s.ToString()}; + } else { + return {Status::NotOK, "Unexpected field type"}; + } + + return Status::OK(); +} + +Status IndexUpdater::Update(const FieldValues &original, std::string_view key, const std::string &ns) { + auto current = GET_OR_RET(Record(key, ns)); + + for (const auto &[field, _] : fields) { + std::string_view original_val, current_val; + + if (auto it = original.find(field); it != original.end()) { + original_val = it->second; + } + if (auto it = current.find(field); it != current.end()) { + current_val = it->second; + } + + GET_OR_RET(UpdateIndex(field, key, original_val, current_val, ns)); + } + + return Status::OK(); +} + void GlobalIndexer::Add(IndexUpdater updater) { auto &up = updaters.emplace_back(std::move(updater)); for (const auto &prefix : up.prefixes) { @@ -106,13 +214,18 @@ void GlobalIndexer::Add(IndexUpdater updater) { } } -StatusOr GlobalIndexer::Record(std::string_view key, const std::string &ns) { +StatusOr GlobalIndexer::Record(std::string_view key, const std::string &ns) { auto iter = prefix_map.longest_prefix(key); if (iter != prefix_map.end()) { - return iter.value()->Record(key, ns); + auto updater = iter.value(); + return std::make_pair(updater, GET_OR_RET(updater->Record(key, ns))); } return {Status::NoPrefixMatched}; } +Status GlobalIndexer::Update(const RecordResult &original, std::string_view key, const std::string &ns) { + return original.first->Update(original.second, key, ns); +} + } // namespace redis diff --git a/src/search/indexer.h b/src/search/indexer.h index e153d55584c..001ade1346d 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -69,15 +69,22 @@ struct FieldValueRetriever { struct IndexUpdater { using FieldValues = std::map; - SearchOnDataType on_data_type; + std::string name; + SearchMetadata metadata; std::vector prefixes; std::map> fields; GlobalIndexer *indexer = nullptr; StatusOr Record(std::string_view key, const std::string &ns); 
+ Status UpdateIndex(const std::string &field, std::string_view key, std::string_view original, + std::string_view current, const std::string &ns); + Status Update(const FieldValues &original, std::string_view key, const std::string &ns); }; struct GlobalIndexer { + using FieldValues = IndexUpdater::FieldValues; + using RecordResult = std::pair; + std::deque updaters; tsl::htrie_map prefix_map; @@ -86,7 +93,8 @@ struct GlobalIndexer { explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {} void Add(IndexUpdater updater); - StatusOr Record(std::string_view key, const std::string &ns); + StatusOr Record(std::string_view key, const std::string &ns); + static Status Update(const RecordResult &original, std::string_view key, const std::string &ns); }; } // namespace redis diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h index 2acec050dde..1637a504c28 100644 --- a/src/search/search_encoding.h +++ b/src/search/search_encoding.h @@ -73,6 +73,8 @@ struct SearchFieldMetadata { void DecodeFlag(uint8_t flag) { noindex = flag & 1; } + virtual ~SearchFieldMetadata() = default; + virtual void Encode(std::string *dst) const { PutFixed8(dst, MakeFlag()); } virtual rocksdb::Status Decode(Slice *input) { diff --git a/src/storage/storage.cc b/src/storage/storage.cc index a74e49d3523..56cda4aede7 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -231,8 +231,9 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) { rocksdb::ColumnFamilyOptions cf_options(options); auto res = util::DBOpen(options, config_->db_dir); if (res) { - std::vector cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName, - kPropagateColumnFamilyName, kStreamColumnFamilyName}; + std::vector cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, + kPubSubColumnFamilyName, kPropagateColumnFamilyName, + kStreamColumnFamilyName, kSearchColumnFamilyName}; std::vector cf_handles; auto s = 
(*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles); if (!s.ok()) { @@ -339,6 +340,7 @@ Status Storage::Open(DBOpenMode mode) { column_families.emplace_back(kPubSubColumnFamilyName, pubsub_opts); column_families.emplace_back(kPropagateColumnFamilyName, propagate_opts); column_families.emplace_back(kStreamColumnFamilyName, subkey_opts); + column_families.emplace_back(kSearchColumnFamilyName, subkey_opts); std::vector old_column_families; auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families); @@ -730,6 +732,8 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) { return cf_handles_[4]; } else if (name == kStreamColumnFamilyName) { return cf_handles_[5]; + } else if (name == kSearchColumnFamilyName) { + return cf_handles_[6]; } return cf_handles_[0]; } diff --git a/src/storage/storage.h b/src/storage/storage.h index 0e20425a68d..e36e77bdb46 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -57,6 +57,7 @@ enum ColumnFamilyID { kColumnFamilyIDPubSub, kColumnFamilyIDPropagate, kColumnFamilyIDStream, + kColumnFamilyIDSearch, }; enum DBOpenMode { @@ -73,6 +74,7 @@ constexpr const char *kMetadataColumnFamilyName = "metadata"; constexpr const char *kSubkeyColumnFamilyName = "default"; constexpr const char *kPropagateColumnFamilyName = "propagate"; constexpr const char *kStreamColumnFamilyName = "stream"; +constexpr const char *kSearchColumnFamilyName = "search"; constexpr const char *kPropagateScriptCommand = "script";