Skip to content

Commit

Permalink
Merge branch 'unstable' into impl-zrandmember
Browse files Browse the repository at this point in the history
  • Loading branch information
JxLi0921 authored Jan 14, 2024
2 parents 8b69810 + 9d656f1 commit dd2ca96
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 8 deletions.
71 changes: 69 additions & 2 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,72 @@ class CommandAnalyze : public Commander {
std::vector<std::string> command_args_;
};

class CommandReset : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
// 1. Discards the current MULTI transaction block, if one exists.
if (conn->IsFlagEnabled(Connection::kMultiExec)) {
conn->ResetMultiExec();
}
// 2. Unwatches all keys WATCHed by the connection.
srv->ResetWatchedKeys(conn);
// 3. Disables CLIENT TRACKING, if in use. (not yet supported)
// 4. Sets the connection to READWRITE mode.
// 5. Cancels the connection's ASKING mode, if previously set. (not yet supported)
// 6. Sets CLIENT REPLY to ON. (not yet supported)
// 9. Exits MONITOR mode, when applicable.
if (conn->IsFlagEnabled(Connection::kMonitor)) {
conn->Owner()->QuitMonitorConn(conn);
}
// 10. Aborts Pub/Sub's subscription state (SUBSCRIBE and PSUBSCRIBE), when appropriate.
if (conn->SubscriptionsCount() != 0) {
conn->UnsubscribeAll();
}
if (conn->PSubscriptionsCount() != 0) {
conn->PUnsubscribeAll();
}
// 11. Deauthenticates the connection, requiring a call AUTH to reauthenticate when authentication is enabled.
conn->SetNamespace(kDefaultNamespace);
conn->BecomeAdmin();
// 12. Turns off NO-EVICT / NO-TOUCH mode. (not yet supported)
*output = redis::SimpleString("RESET");
return Status::OK();
}
};

class CommandApplyBatch : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
raw_batch_ = args[1];
if (args.size() > 2) {
if (args.size() > 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
if (!util::EqualICase(args[2], "lowpri")) {
return {Status::RedisParseErr, "only support LOWPRI option"};
}
low_pri_ = true;
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
size_t size = raw_batch_.size();
auto options = svr->storage->DefaultWriteOptions();
options.low_pri = low_pri_;
auto s = svr->storage->ApplyWriteBatch(options, std::move(raw_batch_));
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::Integer(size);
return Status::OK();
}

private:
std::string raw_batch_;
bool low_pri_ = false;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading", 0, 0, 0),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0, 0, 0),
MakeCmdAttr<CommandSelect>("select", 2, "read-only", 0, 0, 0),
Expand Down Expand Up @@ -1257,6 +1323,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loadin
MakeCmdAttr<CommandSlaveOf>("slaveof", 3, "read-only exclusive no-script", 0, 0, 0),
MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0, 0, 0),
MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive", 0, 0, 0),
MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0, 0), )

MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0, 0),
MakeCmdAttr<CommandReset>("reset", -1, "multi pub-sub", 0, 0, 0),
MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", 0, 0, 0), )
} // namespace redis
10 changes: 10 additions & 0 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,16 @@ void Worker::BecomeMonitorConn(redis::Connection *conn) {
conn->EnableFlag(redis::Connection::kMonitor);
}

void Worker::QuitMonitorConn(redis::Connection *conn) {
{
std::lock_guard<std::mutex> guard(conns_mu_);
monitor_conns_.erase(conn->GetFD());
conns_[conn->GetFD()] = conn;
}
srv->DecrMonitorClientNum();
conn->DisableFlag(redis::Connection::kMonitor);
}

void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) {
std::unique_lock<std::mutex> lock(conns_mu_);

Expand Down
1 change: 1 addition & 0 deletions src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
Status EnableWriteEvent(int fd);
Status Reply(int fd, const std::string &reply);
void BecomeMonitorConn(redis::Connection *conn);
void QuitMonitorConn(redis::Connection *conn);
void FeedMonitorConns(redis::Connection *conn, const std::string &response);

std::string GetClientsStr();
Expand Down
2 changes: 1 addition & 1 deletion src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ bool SubKeyFilter::IsMetadataExpired(const InternalKey &ikey, const Metadata &me
// `util::GetTimeStampMS() - 300000` means extending 5 minutes for expired items,
// to prevent them from being recycled once they reach the expiration time.
uint64_t lazy_expired_ts = util::GetTimeStampMS() - 300000;
return metadata.Type() == kRedisString // metadata key was overwrite by set command
return metadata.IsSingleKVType() // metadata key was overwrite by set command
|| metadata.ExpireAt(lazy_expired_ts) || ikey.GetVersion() != metadata.version;
}

Expand Down
3 changes: 1 addition & 2 deletions src/storage/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
return nullptr;
}

// The string/json type doesn't have sub keys
RedisType type = metadata_.Type();
if (type == kRedisNone || type == kRedisString || type == kRedisJson) {
if (type == kRedisNone || metadata_.IsSingleKVType()) {
return nullptr;
}

Expand Down
8 changes: 5 additions & 3 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -661,16 +661,18 @@ rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rock
}

Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return ApplyWriteBatch(write_opts_, std::move(raw_batch));
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
if (db_size_limit_reached_) {
return {Status::NotOK, "reach space limit"};
}

auto batch = rocksdb::WriteBatch(std::move(raw_batch));
auto s = db_->Write(write_opts_, &batch);
auto s = db_->Write(options, &batch);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class Storage {
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();

[[nodiscard]] rocksdb::Status Get(const rocksdb::ReadOptions &options, const rocksdb::Slice &key, std::string *value);
Expand Down
58 changes: 58 additions & 0 deletions tests/gocase/unit/applybatch/applybatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package applybatch

import (
"context"
"encoding/hex"
"testing"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/stretchr/testify/require"
)

func TestApplyBatch_Basic(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

t.Run("Make sure the apply batch command works", func(t *testing.T) {
// SET a 1
batch, err := hex.DecodeString("04000000000000000100000003013105010D0B5F5F6E616D6573706163656106010000000031")
require.NoError(t, err)
r := rdb.Do(ctx, "ApplyBatch", string(batch))
val, err := r.Int64()
require.NoError(t, err)
require.EqualValues(t, len(batch), val)
require.Equal(t, "1", rdb.Get(ctx, "a").Val())

// HSET hash field value
batch, err = hex.DecodeString("05000000000000000200000003013201210B5F5F6E616D65737061636500000004686173683076F331696342A76669656C640576616C75650501100B5F5F6E616D657370616365686173681102000000003076F331696342A700000002")
require.NoError(t, err)
r = rdb.Do(ctx, "ApplyBatch", string(batch))
val, err = r.Int64()
require.NoError(t, err)
require.EqualValues(t, len(batch), val)
require.Equal(t, "value", rdb.HGet(ctx, "hash", "field").Val())
})
}
67 changes: 67 additions & 0 deletions tests/gocase/unit/reset/reset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package reset

import (
"context"
"fmt"
"testing"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/stretchr/testify/require"
)

func TestReset(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

t.Run("reset with ongoing txn", func(t *testing.T) {
require.NoError(t, rdb.Set(ctx, "x", "30", 0).Err())
require.NoError(t, rdb.Do(ctx, "multi").Err())
require.NoError(t, rdb.Set(ctx, "x", "40", 0).Err())
require.NoError(t, rdb.Do(ctx, "reset").Err())

v1 := rdb.Do(ctx, "get", "x").Val()
require.Equal(t, "30", fmt.Sprintf("%v", v1))
})

t.Run("unwatch keys", func(t *testing.T) {
require.NoError(t, rdb.Set(ctx, "x", 30, 0).Err())
require.NoError(t, rdb.Do(ctx, "watch", "x").Err())
require.NoError(t, rdb.Do(ctx, "multi").Err())
require.NoError(t, rdb.Do(ctx, "ping").Err())
require.NoError(t, rdb.Do(ctx, "reset").Err())

require.NoError(t, rdb.Set(ctx, "x", 40, 0).Err())
require.NoError(t, rdb.Do(ctx, "multi").Err())
require.NoError(t, rdb.Do(ctx, "ping").Err())
require.Equal(t, rdb.Do(ctx, "exec").Val(), []interface{}{"PONG"})
})

t.Run("unsub and punsub", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "subscribe", "chan1").Err())
require.NoError(t, rdb.Do(ctx, "reset").Err())
require.Equal(t, rdb.Do(ctx, "subscribe", "chan2").Val(), []interface{}{"subscribe", "chan2", (int64)(1)})
})
}

0 comments on commit dd2ca96

Please sign in to comment.