Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/unstable' into fix_GEORADIUSBY…
Browse files Browse the repository at this point in the history
…MEMBER_store_return
  • Loading branch information
enjoy-binbin committed Sep 11, 2023
2 parents 2e13291 + b9c7359 commit 2b965b5
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 57 deletions.
8 changes: 6 additions & 2 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,9 @@ StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &k
std::string *restore_cmds) {
std::string bytes = encoded_metadata.ToString();
Metadata metadata(kRedisNone, false);
metadata.Decode(bytes);
if (auto s = metadata.Decode(bytes); !s.ok()) {
return {Status::NotOK, s.ToString()};
}

if (metadata.Type() != kRedisString && metadata.Type() != kRedisStream && metadata.size == 0) {
return KeyMigrationResult::kUnderlyingStructEmpty;
Expand Down Expand Up @@ -617,7 +619,9 @@ StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &k
}
case kRedisStream: {
StreamMetadata stream_md(false);
stream_md.Decode(bytes);
if (auto s = stream_md.Decode(bytes); !s.ok()) {
return {Status::NotOK, s.ToString()};
}

auto s = migrateStream(key, stream_md, restore_cmds);
if (!s.IsOK()) {
Expand Down
10 changes: 7 additions & 3 deletions src/commands/cmd_script.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,13 @@ class CommandScript : public Commander {
}

if (args_.size() == 2 && subcommand_ == "flush") {
svr->ScriptFlush();
auto s = svr->Propagate(engine::kPropagateScriptCommand, args_);
if (!s.IsOK()) {
auto s = svr->ScriptFlush();
if (!s) {
LOG(ERROR) << "Failed to flush scripts: " << s.Msg();
return s;
}
s = svr->Propagate(engine::kPropagateScriptCommand, args_);
if (!s) {
LOG(ERROR) << "Failed to propagate script command: " << s.Msg();
return s;
}
Expand Down
7 changes: 4 additions & 3 deletions src/common/db_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ StatusOr<std::unique_ptr<T>> WrapOutPtrToUnique(Args&&... args) {
return ptr;
}

inline rocksdb::Status DBOpenForReadOnly(const rocksdb::DBOptions& db_options, const std::string& dbname,
const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
[[nodiscard]] inline rocksdb::Status DBOpenForReadOnly(
const rocksdb::DBOptions& db_options, const std::string& dbname,
const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
return rocksdb::DB::OpenForReadOnly(db_options, dbname, column_families, handles, dbptr);
}

Expand Down
12 changes: 9 additions & 3 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,11 @@ Status Server::AsyncCompactDB(const std::string &begin_key, const std::string &e
std::unique_ptr<Slice> begin = nullptr, end = nullptr;
if (!begin_key.empty()) begin = std::make_unique<Slice>(begin_key);
if (!end_key.empty()) end = std::make_unique<Slice>(end_key);
storage->Compact(begin.get(), end.get());

auto s = storage->Compact(begin.get(), end.get());
if (!s.ok()) {
LOG(ERROR) << "[task runner] Failed to do compaction: " << s.ToString();
}

std::lock_guard<std::mutex> lg(db_job_mu_);
db_compacting_ = false;
Expand Down Expand Up @@ -1538,10 +1542,12 @@ void Server::ScriptReset() {
lua::DestroyState(lua);
}

void Server::ScriptFlush() {
Status Server::ScriptFlush() {
auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
storage->FlushScripts(storage->DefaultWriteOptions(), cf);
auto s = storage->FlushScripts(storage->DefaultWriteOptions(), cf);
if (!s.ok()) return {Status::NotOK, s.ToString()};
ScriptReset();
return Status::OK();
}

// Generally, we store data into RocksDB and just replicate WAL instead of propagating
Expand Down
2 changes: 1 addition & 1 deletion src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class Server {
Status ScriptGet(const std::string &sha, std::string *body) const;
Status ScriptSet(const std::string &sha, const std::string &body) const;
void ScriptReset();
void ScriptFlush();
Status ScriptFlush();

Status Propagate(const std::string &channel, const std::vector<std::string> &tokens) const;
Status ExecPropagatedCommand(const std::vector<std::string> &tokens);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
}

Metadata metadata(kRedisNone);
metadata.Decode(value);
auto s = metadata.Decode(value);
if (!s.ok()) return s;

if (metadata.Type() == kRedisString) {
command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
Expand Down
5 changes: 4 additions & 1 deletion src/storage/compaction_checker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ void CompactionChecker::PickCompactionFiles(const std::string &cf_name) {
if (best_delete_ratio > 0.1 && !best_start_key.empty() && !best_stop_key.empty()) {
LOG(INFO) << "[compaction checker] Going to compact the key in file: " << best_filename
<< ", delete ratio: " << best_delete_ratio;
storage_->Compact(&best_start_key, &best_stop_key);
auto s = storage_->Compact(&best_start_key, &best_stop_key);
if (!s.ok()) {
LOG(ERROR) << "[compaction checker] Failed to do compaction: " << s.ToString();
}
}
}
51 changes: 35 additions & 16 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,25 @@ rocksdb::Status Database::GetMetadata(RedisType type, const Slice &ns_key, Metad
std::string bytes;
auto s = GetRawMetadata(ns_key, &bytes);
if (!s.ok()) return s;
metadata->Decode(bytes);
s = metadata->Decode(bytes);
if (!s.ok()) return s;

if (metadata->Expired()) {
metadata->Decode(old_metadata);
// error discarded here since it already failed
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound(kErrMsgKeyExpired);
}
if (metadata->Type() != type &&
(metadata->size > 0 || metadata->Type() == kRedisString || metadata->Type() == kRedisStream)) {
metadata->Decode(old_metadata);
// error discarded here since it already failed
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::InvalidArgument(kErrMsgWrongType);
}
if (metadata->size == 0 && type != kRedisStream &&
type != kRedisBloomFilter) { // stream and bloom is allowed to be empty
metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no elements");
// error discarded here since it already failed
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no element found");
}
return s;
}
Expand All @@ -83,7 +87,9 @@ rocksdb::Status Database::Expire(const Slice &user_key, uint64_t timestamp) {
LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
metadata.Decode(value);

s = metadata.Decode(value);
if (!s.ok()) return s;
if (metadata.Expired()) {
return rocksdb::Status::NotFound(kErrMsgKeyExpired);
}
Expand Down Expand Up @@ -114,7 +120,8 @@ rocksdb::Status Database::Del(const Slice &user_key) {
rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
Metadata metadata(kRedisNone, false);
metadata.Decode(value);
s = metadata.Decode(value);
if (!s.ok()) return s;
if (metadata.Expired()) {
return rocksdb::Status::NotFound(kErrMsgKeyExpired);
}
Expand Down Expand Up @@ -155,7 +162,8 @@ rocksdb::Status Database::MDel(const std::vector<Slice> &keys, uint64_t *deleted
if (statuses[i].IsNotFound()) continue;

Metadata metadata(kRedisNone, false);
metadata.Decode(pin_values[i]);
auto s = metadata.Decode(pin_values[i]);
if (!s.ok()) continue;
if (metadata.Expired()) continue;

batch->Delete(metadata_cf_handle_, lock_keys[i]);
Expand All @@ -181,7 +189,8 @@ rocksdb::Status Database::Exists(const std::vector<Slice> &keys, int *ret) {
if (!s.ok() && !s.IsNotFound()) return s;
if (s.ok()) {
Metadata metadata(kRedisNone, false);
metadata.Decode(value);
s = metadata.Decode(value);
if (!s.ok()) return s;
if (!metadata.Expired()) *ret += 1;
}
}
Expand All @@ -200,15 +209,18 @@ rocksdb::Status Database::TTL(const Slice &user_key, int64_t *ttl) {
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Metadata metadata(kRedisNone, false);
metadata.Decode(value);
s = metadata.Decode(value);
if (!s.ok()) return s;
*ttl = metadata.TTL();

return rocksdb::Status::OK();
}

void Database::GetKeyNumStats(const std::string &prefix, KeyNumStats *stats) { Keys(prefix, nullptr, stats); }
rocksdb::Status Database::GetKeyNumStats(const std::string &prefix, KeyNumStats *stats) {
return Keys(prefix, nullptr, stats);
}

void Database::Keys(const std::string &prefix, std::vector<std::string> *keys, KeyNumStats *stats) {
rocksdb::Status Database::Keys(const std::string &prefix, std::vector<std::string> *keys, KeyNumStats *stats) {
uint16_t slot_id = 0;
std::string ns_prefix;
if (namespace_ != kDefaultNamespace || keys != nullptr) {
Expand Down Expand Up @@ -236,7 +248,8 @@ void Database::Keys(const std::string &prefix, std::vector<std::string> *keys, K
break;
}
Metadata metadata(kRedisNone, false);
metadata.Decode(iter->value());
auto s = metadata.Decode(iter->value());
if (!s.ok()) continue;
if (metadata.Expired()) {
if (stats) stats->n_expired++;
continue;
Expand Down Expand Up @@ -267,6 +280,8 @@ void Database::Keys(const std::string &prefix, std::vector<std::string> *keys, K
if (stats && stats->n_expires > 0) {
stats->avg_ttl = ttl_sum / stats->n_expires / 1000;
}

return rocksdb::Status::OK();
}

rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const std::string &prefix,
Expand Down Expand Up @@ -312,7 +327,9 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
break;
}
Metadata metadata(kRedisNone, false);
metadata.Decode(iter->value());
auto s = metadata.Decode(iter->value());
if (!s.ok()) continue;

if (metadata.Expired()) continue;
std::tie(std::ignore, user_key) = ExtractNamespaceKey<std::string>(iter->key(), storage_->IsSlotIdEncoded());
keys->emplace_back(user_key);
Expand Down Expand Up @@ -432,7 +449,8 @@ rocksdb::Status Database::Dump(const Slice &user_key, std::vector<std::string> *
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Metadata metadata(kRedisNone, false);
metadata.Decode(value);
s = metadata.Decode(value);
if (!s.ok()) return s;

infos->emplace_back("namespace");
infos->emplace_back(namespace_);
Expand Down Expand Up @@ -484,7 +502,8 @@ rocksdb::Status Database::Type(const Slice &user_key, RedisType *type) {
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Metadata metadata(kRedisNone, false);
metadata.Decode(value);
s = metadata.Decode(value);
if (!s.ok()) return s;
if (metadata.Expired()) {
*type = kRedisNone;
} else {
Expand Down
5 changes: 3 additions & 2 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ class Database {
rocksdb::Status Dump(const Slice &user_key, std::vector<std::string> *infos);
rocksdb::Status FlushDB();
rocksdb::Status FlushAll();
void GetKeyNumStats(const std::string &prefix, KeyNumStats *stats);
void Keys(const std::string &prefix, std::vector<std::string> *keys = nullptr, KeyNumStats *stats = nullptr);
rocksdb::Status GetKeyNumStats(const std::string &prefix, KeyNumStats *stats);
rocksdb::Status Keys(const std::string &prefix, std::vector<std::string> *keys = nullptr,
KeyNumStats *stats = nullptr);
rocksdb::Status Scan(const std::string &cursor, uint64_t limit, const std::string &prefix,
std::vector<std::string> *keys, std::string *end_cursor = nullptr);
rocksdb::Status RandomKey(const std::string &cursor, std::string *key);
Expand Down
4 changes: 2 additions & 2 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ class Metadata {
bool ExpireAt(uint64_t expired_ts) const;

virtual void Encode(std::string *dst) const;
virtual rocksdb::Status Decode(Slice *input);
rocksdb::Status Decode(Slice input);
[[nodiscard]] virtual rocksdb::Status Decode(Slice *input);
[[nodiscard]] rocksdb::Status Decode(Slice input);

bool operator==(const Metadata &that) const;
virtual ~Metadata() = default;
Expand Down
19 changes: 10 additions & 9 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,28 @@ class Storage {
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();

rocksdb::Status Get(const rocksdb::ReadOptions &options, const rocksdb::Slice &key, std::string *value);
rocksdb::Status Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Slice &key, std::string *value);
[[nodiscard]] rocksdb::Status Get(const rocksdb::ReadOptions &options, const rocksdb::Slice &key, std::string *value);
[[nodiscard]] rocksdb::Status Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Slice &key, std::string *value);
void MultiGet(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family, size_t num_keys,
const rocksdb::Slice *keys, rocksdb::PinnableSlice *values, rocksdb::Status *statuses);
rocksdb::Iterator *NewIterator(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family);
rocksdb::Iterator *NewIterator(const rocksdb::ReadOptions &options);

rocksdb::Status Write(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
[[nodiscard]] rocksdb::Status Write(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
const rocksdb::WriteOptions &DefaultWriteOptions() { return write_opts_; }
rocksdb::ReadOptions DefaultScanOptions() const;
rocksdb::ReadOptions DefaultMultiGetOptions() const;
rocksdb::Status Delete(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle,
const rocksdb::Slice &key);
rocksdb::Status DeleteRange(const std::string &first_key, const std::string &last_key);
rocksdb::Status FlushScripts(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle);
[[nodiscard]] rocksdb::Status Delete(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle,
const rocksdb::Slice &key);
[[nodiscard]] rocksdb::Status DeleteRange(const std::string &first_key, const std::string &last_key);
[[nodiscard]] rocksdb::Status FlushScripts(const rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle);
bool WALHasNewData(rocksdb::SequenceNumber seq) { return seq <= LatestSeqNumber(); }
Status InWALBoundary(rocksdb::SequenceNumber seq);
Status WriteToPropagateCF(const std::string &key, const std::string &value);

rocksdb::Status Compact(const rocksdb::Slice *begin, const rocksdb::Slice *end);
[[nodiscard]] rocksdb::Status Compact(const rocksdb::Slice *begin, const rocksdb::Slice *end);
rocksdb::DB *GetDB();
bool IsClosing() const { return db_closing_; }
std::string GetName() const { return config_->db_name; }
Expand Down
10 changes: 6 additions & 4 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,21 @@ rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key, BitmapMetadata *metadat
metadata->Encode(&old_metadata);
auto s = GetRawMetadata(ns_key, raw_value);
if (!s.ok()) return s;
metadata->Decode(*raw_value);
s = metadata->Decode(*raw_value);
if (!s.ok()) return s;

if (metadata->Expired()) {
metadata->Decode(old_metadata);
// error discarded here since it already failed
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound(kErrMsgKeyExpired);
}
if (metadata->Type() == kRedisString) return s;
if (metadata->Type() != kRedisBitmap && metadata->size > 0) {
metadata->Decode(old_metadata);
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::InvalidArgument(kErrMsgWrongType);
}
if (metadata->size == 0) {
metadata->Decode(old_metadata);
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no elements");
}
return s;
Expand Down
10 changes: 8 additions & 2 deletions src/types/redis_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ std::vector<rocksdb::Status> String::getRawValues(const std::vector<Slice> &keys
if (!statuses[i].ok()) continue;
(*raw_values)[i].assign(pin_values[i].data(), pin_values[i].size());
Metadata metadata(kRedisNone, false);
metadata.Decode((*raw_values)[i]);
auto s = metadata.Decode((*raw_values)[i]);
if (!s.ok()) {
(*raw_values)[i].clear();
statuses[i] = s;
continue;
}
if (metadata.Expired()) {
(*raw_values)[i].clear();
statuses[i] = rocksdb::Status::NotFound(kErrMsgKeyExpired);
Expand All @@ -73,7 +78,8 @@ rocksdb::Status String::getRawValue(const std::string &ns_key, std::string *raw_
if (!s.ok()) return s;

Metadata metadata(kRedisNone, false);
metadata.Decode(*raw_value);
s = metadata.Decode(*raw_value);
if (!s.ok()) return s;
if (metadata.Expired()) {
raw_value->clear();
return rocksdb::Status::NotFound(kErrMsgKeyExpired);
Expand Down
Loading

0 comments on commit 2b965b5

Please sign in to comment.