From 5cbc123ee05f0b6dbd95b504ae262d00d03549be Mon Sep 17 00:00:00 2001 From: Nuo Xu <44099579+chrisxu333@users.noreply.github.com> Date: Sat, 13 Jan 2024 19:26:19 -0800 Subject: [PATCH 1/3] Add support of new command: RESET (#1999) Co-authored-by: hulk --- src/commands/cmd_server.cc | 37 ++++++++++++++- src/server/worker.cc | 10 ++++ src/server/worker.h | 1 + tests/gocase/unit/reset/reset_test.go | 67 +++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 tests/gocase/unit/reset/reset_test.go diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index d94d81e73ec..d921c7666c1 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1222,6 +1222,39 @@ class CommandAnalyze : public Commander { std::vector command_args_; }; +class CommandReset : public Commander { + public: + Status Execute(Server *srv, Connection *conn, std::string *output) override { + // 1. Discards the current MULTI transaction block, if one exists. + if (conn->IsFlagEnabled(Connection::kMultiExec)) { + conn->ResetMultiExec(); + } + // 2. Unwatches all keys WATCHed by the connection. + srv->ResetWatchedKeys(conn); + // 3. Disables CLIENT TRACKING, if in use. (not yet supported) + // 4. Sets the connection to READWRITE mode. + // 5. Cancels the connection's ASKING mode, if previously set. (not yet supported) + // 6. Sets CLIENT REPLY to ON. (not yet supported) + // 9. Exits MONITOR mode, when applicable. + if (conn->IsFlagEnabled(Connection::kMonitor)) { + conn->Owner()->QuitMonitorConn(conn); + } + // 10. Aborts Pub/Sub's subscription state (SUBSCRIBE and PSUBSCRIBE), when appropriate. + if (conn->SubscriptionsCount() != 0) { + conn->UnsubscribeAll(); + } + if (conn->PSubscriptionsCount() != 0) { + conn->PUnsubscribeAll(); + } + // 11. Deauthenticates the connection, requiring a call AUTH to reauthenticate when authentication is enabled. + conn->SetNamespace(kDefaultNamespace); + conn->BecomeAdmin(); + // 12. Turns off NO-EVICT / NO-TOUCH mode. (not yet supported) + *output = redis::SimpleString("RESET"); + return Status::OK(); + } +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loading", 0, 0, 0), MakeCmdAttr("ping", -1, "read-only", 0, 0, 0), MakeCmdAttr("select", 2, "read-only", 0, 0, 0), @@ -1257,6 +1290,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loadin MakeCmdAttr("slaveof", 3, "read-only exclusive no-script", 0, 0, 0), MakeCmdAttr("stats", 1, "read-only", 0, 0, 0), MakeCmdAttr("rdb", -3, "write exclusive", 0, 0, 0), - MakeCmdAttr("analyze", -1, "", 0, 0, 0), ) - + MakeCmdAttr("analyze", -1, "", 0, 0, 0), + MakeCmdAttr("reset", -1, "multi pub-sub", 0, 0, 0), ) } // namespace redis diff --git a/src/server/worker.cc b/src/server/worker.cc index 47042d030b7..1e8fed37441 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -464,6 +464,16 @@ void Worker::BecomeMonitorConn(redis::Connection *conn) { conn->EnableFlag(redis::Connection::kMonitor); } +void Worker::QuitMonitorConn(redis::Connection *conn) { + { + std::lock_guard guard(conns_mu_); + monitor_conns_.erase(conn->GetFD()); + conns_[conn->GetFD()] = conn; + } + srv->DecrMonitorClientNum(); + conn->DisableFlag(redis::Connection::kMonitor); +} + void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) { std::unique_lock lock(conns_mu_); diff --git a/src/server/worker.h b/src/server/worker.h index a9f618e57f5..b6918ba9296 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -62,6 +62,7 @@ class Worker : EventCallbackBase, EvconnlistenerBase { Status EnableWriteEvent(int fd); Status Reply(int fd, const std::string &reply); void BecomeMonitorConn(redis::Connection *conn); + void QuitMonitorConn(redis::Connection *conn); void FeedMonitorConns(redis::Connection *conn, const std::string &response); std::string GetClientsStr(); diff --git a/tests/gocase/unit/reset/reset_test.go b/tests/gocase/unit/reset/reset_test.go new file mode 100644 index 00000000000..9d16edb13a9 --- /dev/null +++ b/tests/gocase/unit/reset/reset_test.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package reset + +import ( + "context" + "fmt" + "testing" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/stretchr/testify/require" +) + +func TestReset(t *testing.T) { + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + t.Run("reset with ongoing txn", func(t *testing.T) { + require.NoError(t, rdb.Set(ctx, "x", "30", 0).Err()) + require.NoError(t, rdb.Do(ctx, "multi").Err()) + require.NoError(t, rdb.Set(ctx, "x", "40", 0).Err()) + require.NoError(t, rdb.Do(ctx, "reset").Err()) + + v1 := rdb.Do(ctx, "get", "x").Val() + require.Equal(t, "30", fmt.Sprintf("%v", v1)) + }) + + t.Run("unwatch keys", func(t *testing.T) { + require.NoError(t, rdb.Set(ctx, "x", 30, 0).Err()) + require.NoError(t, rdb.Do(ctx, "watch", "x").Err()) + require.NoError(t, rdb.Do(ctx, "multi").Err()) + require.NoError(t, rdb.Do(ctx, "ping").Err()) + require.NoError(t, rdb.Do(ctx, "reset").Err()) + + require.NoError(t, rdb.Set(ctx, "x", 40, 0).Err()) + require.NoError(t, rdb.Do(ctx, "multi").Err()) + require.NoError(t, rdb.Do(ctx, "ping").Err()) + require.Equal(t, rdb.Do(ctx, "exec").Val(), []interface{}{"PONG"}) + }) + + t.Run("unsub and punsub", func(t *testing.T) { + require.NoError(t, rdb.Do(ctx, "subscribe", "chan1").Err()) + require.NoError(t, rdb.Do(ctx, "reset").Err()) + require.Equal(t, rdb.Do(ctx, "subscribe", "chan2").Val(), []interface{}{"subscribe", "chan2", (int64)(1)}) + }) +} From 7cbf0a577859da1f9e41364a56931a1402f02145 Mon Sep 17 00:00:00 2001 From: Twice Date: Sun, 14 Jan 2024 14:53:01 +0900 Subject: [PATCH 2/3] Replace manual RedisType check by calling IsSingleKVType (#2013) --- src/storage/compact_filter.cc | 2 +- src/storage/iterator.cc | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc index 91c6751a0a8..5353608eea9 100644 --- a/src/storage/compact_filter.cc +++ b/src/storage/compact_filter.cc @@ -91,7 +91,7 @@ bool SubKeyFilter::IsMetadataExpired(const InternalKey &ikey, const Metadata &me // `util::GetTimeStampMS() - 300000` means extending 5 minutes for expired items, // to prevent them from being recycled once they reach the expiration time. uint64_t lazy_expired_ts = util::GetTimeStampMS() - 300000; - return metadata.Type() == kRedisString // metadata key was overwrite by set command + return metadata.IsSingleKVType() // metadata key was overwrite by set command || metadata.ExpireAt(lazy_expired_ts) || ikey.GetVersion() != metadata.version; } diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc index 12238ceafc2..58e283b165c 100644 --- a/src/storage/iterator.cc +++ b/src/storage/iterator.cc @@ -103,9 +103,8 @@ std::unique_ptr DBIterator::GetSubKeyIterator() const { return nullptr; } - // The string/json type doesn't have sub keys RedisType type = metadata_.Type(); - if (type == kRedisNone || type == kRedisString || type == kRedisJson) { + if (type == kRedisNone || metadata_.IsSingleKVType()) { return nullptr; } From 9d656f1476583651e2fae20ed1c37cf35d0679ca Mon Sep 17 00:00:00 2001 From: Myth Date: Sun, 14 Jan 2024 15:59:07 +0800 Subject: [PATCH 3/3] Add the ApplyBatch command for data migration scenario (#2010) Co-authored-by: git-hulk --- src/commands/cmd_server.cc | 36 +++++++++++- src/storage/storage.cc | 8 ++- src/storage/storage.h | 1 + .../gocase/unit/applybatch/applybatch_test.go | 58 +++++++++++++++++++ 4 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 tests/gocase/unit/applybatch/applybatch_test.go diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index d921c7666c1..e0700af8829 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1255,6 +1255,39 @@ class CommandReset : public Commander { } }; +class CommandApplyBatch : public Commander { + public: + Status Parse(const std::vector &args) override { + raw_batch_ = args[1]; + if (args.size() > 2) { + if (args.size() > 3) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + if (!util::EqualICase(args[2], "lowpri")) { + return {Status::RedisParseErr, "only support LOWPRI option"}; + } + low_pri_ = true; + } + return Commander::Parse(args); + } + + Status Execute(Server *svr, Connection *conn, std::string *output) override { + size_t size = raw_batch_.size(); + auto options = svr->storage->DefaultWriteOptions(); + options.low_pri = low_pri_; + auto s = svr->storage->ApplyWriteBatch(options, std::move(raw_batch_)); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + *output = redis::Integer(size); + return Status::OK(); + } + + private: + std::string raw_batch_; + bool low_pri_ = false; +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loading", 0, 0, 0), MakeCmdAttr("ping", -1, "read-only", 0, 0, 0), MakeCmdAttr("select", 2, "read-only", 0, 0, 0), @@ -1291,5 +1324,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loadin MakeCmdAttr("stats", 1, "read-only", 0, 0, 0), MakeCmdAttr("rdb", -3, "write exclusive", 0, 0, 0), MakeCmdAttr("analyze", -1, "", 0, 0, 0), - MakeCmdAttr("reset", -1, "multi pub-sub", 0, 0, 0), ) + MakeCmdAttr("reset", -1, "multi pub-sub", 0, 0, 0), + MakeCmdAttr("applybatch", -2, "write no-multi", 0, 0, 0), ) } // namespace redis diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 08e79076974..a600813fae8 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -661,16 +661,18 @@ rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rock } Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) { + return ApplyWriteBatch(write_opts_, std::move(raw_batch)); +} + +Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) { if (db_size_limit_reached_) { return {Status::NotOK, "reach space limit"}; } - auto batch = rocksdb::WriteBatch(std::move(raw_batch)); - auto s = db_->Write(write_opts_, &batch); + auto s = db_->Write(options, &batch); if (!s.ok()) { return {Status::NotOK, s.ToString()}; } - return Status::OK(); } diff --git a/src/storage/storage.h b/src/storage/storage.h index f0134cbfac1..7e37d82f7ea 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -132,6 +132,7 @@ class Storage { Status RestoreFromCheckpoint(); Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr *iter); Status ReplicaApplyWriteBatch(std::string &&raw_batch); + Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch); rocksdb::SequenceNumber LatestSeqNumber(); [[nodiscard]] rocksdb::Status Get(const rocksdb::ReadOptions &options, const rocksdb::Slice &key, std::string *value); diff --git a/tests/gocase/unit/applybatch/applybatch_test.go b/tests/gocase/unit/applybatch/applybatch_test.go new file mode 100644 index 00000000000..275b8663bf9 --- /dev/null +++ b/tests/gocase/unit/applybatch/applybatch_test.go @@ -0,0 +1,58 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. + */ + +package applybatch + +import ( + "context" + "encoding/hex" + "testing" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/stretchr/testify/require" +) + +func TestApplyBatch_Basic(t *testing.T) { + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + t.Run("Make sure the apply batch command works", func(t *testing.T) { + // SET a 1 + batch, err := hex.DecodeString("04000000000000000100000003013105010D0B5F5F6E616D6573706163656106010000000031") + require.NoError(t, err) + r := rdb.Do(ctx, "ApplyBatch", string(batch)) + val, err := r.Int64() + require.NoError(t, err) + require.EqualValues(t, len(batch), val) + require.Equal(t, "1", rdb.Get(ctx, "a").Val()) + + // HSET hash field value + batch, err = hex.DecodeString("05000000000000000200000003013201210B5F5F6E616D65737061636500000004686173683076F331696342A76669656C640576616C75650501100B5F5F6E616D657370616365686173681102000000003076F331696342A700000002") + require.NoError(t, err) + r = rdb.Do(ctx, "ApplyBatch", string(batch)) + val, err = r.Int64() + require.NoError(t, err) + require.EqualValues(t, len(batch), val) + require.Equal(t, "value", rdb.HGet(ctx, "hash", "field").Val()) + }) +}