Skip to content

Commit

Permalink
Merge branch 'apache:unstable' into add-BZMPOP
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangsx-1 authored Jun 13, 2023
2 parents e85da52 + e89af52 commit bbd38d6
Show file tree
Hide file tree
Showing 14 changed files with 332 additions and 6 deletions.
25 changes: 24 additions & 1 deletion src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "commander.h"
#include "commands/scan_base.h"
#include "common/io_util.h"
#include "config/config.h"
#include "error_constants.h"
#include "server/redis_connection.h"
Expand Down Expand Up @@ -867,6 +868,24 @@ class CommandFlushBackup : public Commander {

class CommandSlaveOf : public Commander {
public:
static Status IsTryingToReplicateItself(Server *svr, const std::string &host, uint32_t port) {
auto ip_addresses = util::LookupHostByName(host);
if (!ip_addresses) {
return {Status::NotOK, "Can not resolve hostname: " + host};
}
for (auto &ip : *ip_addresses) {
if (util::MatchListeningIP(svr->GetConfig()->binds, ip) && port == svr->GetConfig()->port) {
return {Status::NotOK, "can't replicate itself"};
}
for (std::pair<std::string, uint32_t> &host_port_pair : svr->GetSlaveHostAndPort()) {
if (host_port_pair.first == ip && host_port_pair.second == port) {
return {Status::NotOK, "can't replicate your own replicas"};
}
}
}
return Status::OK();
}

Status Parse(const std::vector<std::string> &args) override {
host_ = args[1];
const auto &port = args[2];
Expand Down Expand Up @@ -914,7 +933,11 @@ class CommandSlaveOf : public Commander {
return Status::OK();
}

auto s = svr->AddMaster(host_, port_, false);
auto s = IsTryingToReplicateItself(svr, host_, port_);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
s = svr->AddMaster(host_, port_, false);
if (s.IsOK()) {
*output = redis::SimpleString("OK");
LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr()
Expand Down
69 changes: 69 additions & 0 deletions src/commands/cmd_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*
*/

#include <cstdint>

#include "commander.h"
#include "commands/scan_base.h"
#include "error_constants.h"
Expand Down Expand Up @@ -292,6 +294,72 @@ class CommandSInter : public Commander {
}
};

/*
* description:
* syntax: `SINTERCARD numkeys key [key ...] [LIMIT limit]`
*
* limit: the valid limit is an non-negative integer.
*/
class CommandSInterCard : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_numkey = ParseInt<int>(args[1], 10);
if (!parse_numkey) {
return {Status::RedisParseErr, errValueNotInteger};
}

if (*parse_numkey <= 0) {
return {Status::RedisParseErr, errValueMustBePositive};
}
numkeys_ = *parse_numkey;

// command: for example, SINTERCARD 2 key1 key2 LIMIT 1
auto arg_sz = args.size();
if (arg_sz == numkeys_ + 4 && util::ToLower(args[numkeys_ + 2]) == "limit") {
auto parse_limit = ParseInt<int>(args[numkeys_ + 3], 10);
if (!parse_limit) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (*parse_limit < 0) {
return {Status::RedisParseErr, errLimitIsNegative};
}
limit_ = *parse_limit;
return Commander::Parse(args);
}

if (arg_sz != numkeys_ + 2) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
std::vector<Slice> keys;
for (size_t i = 2; i < numkeys_ + 2; i++) {
keys.emplace_back(args_[i]);
}

redis::Set set_db(svr->storage, conn->GetNamespace());
uint64_t ret = 0;
auto s = set_db.InterCard(keys, limit_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::Integer(ret);
return Status::OK();
}

static CommandKeyRange Range(const std::vector<std::string> &args) {
int num_key = *ParseInt<int>(args[1], 10);
return {2, 1 + num_key, 1};
}

private:
uint64_t numkeys_ = 0;
uint64_t limit_ = 0;
};

class CommandSDiffStore : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -380,6 +448,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandSAdd>("sadd", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandSDiff>("sdiff", -2, "read-only", 1, -1, 1),
MakeCmdAttr<CommandSUnion>("sunion", -2, "read-only", 1, -1, 1),
MakeCmdAttr<CommandSInter>("sinter", -2, "read-only", 1, -1, 1),
MakeCmdAttr<CommandSInterCard>("sintercard", -3, "read-only", CommandSInterCard::Range),
MakeCmdAttr<CommandSDiffStore>("sdiffstore", -3, "write", 1, -1, 1),
MakeCmdAttr<CommandSUnionStore>("sunionstore", -3, "write", 1, -1, 1),
MakeCmdAttr<CommandSInterStore>("sinterstore", -3, "write", 1, -1, 1),
Expand Down
1 change: 1 addition & 0 deletions src/commands/error_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ inline constexpr const char *errNoSuchKey = "no such key";
inline constexpr const char *errUnbalancedStreamList =
"Unbalanced XREAD list of streams: for each stream key an ID or '$' must be specified.";
inline constexpr const char *errTimeoutIsNegative = "timeout is negative";
inline constexpr const char *errLimitIsNegative = "LIMIT can't be negative";
inline constexpr const char *errLimitOptionNotAllowed =
"syntax error, LIMIT cannot be used without the special ~ option";
inline constexpr const char *errZSetLTGTNX = "GT, LT, and/or NX options at the same time are not compatible";
Expand Down
27 changes: 27 additions & 0 deletions src/common/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,33 @@ Status SockSetTcpKeepalive(int fd, int interval) {
return Status::OK();
}

// Lookup IP addresses by hostname
StatusOr<std::vector<std::string>> LookupHostByName(const std::string &host) {
addrinfo hints = {}, *servinfo = nullptr;

hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;

if (int rv = getaddrinfo(host.c_str(), nullptr, &hints, &servinfo); rv != 0) {
return {Status::NotOK, gai_strerror(rv)};
}

auto exit = MakeScopeExit([servinfo] { freeaddrinfo(servinfo); });

std::vector<std::string> ips;
for (auto p = servinfo; p != nullptr; p = p->ai_next) {
char ip[INET6_ADDRSTRLEN] = {};
if (p->ai_family == AF_INET) {
inet_ntop(p->ai_family, &((struct sockaddr_in *)p->ai_addr)->sin_addr, ip, sizeof(ip));
} else {
inet_ntop(p->ai_family, &((struct sockaddr_in6 *)p->ai_addr)->sin6_addr, ip, sizeof(ip));
}
ips.emplace_back(ip);
}

return ips;
}

StatusOr<int> SockConnect(const std::string &host, uint32_t port, int conn_timeout, int timeout) {
addrinfo hints = {}, *servinfo = nullptr;

Expand Down
1 change: 1 addition & 0 deletions src/common/io_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace util {

sockaddr_in NewSockaddrInet(const std::string &host, uint32_t port);
StatusOr<std::vector<std::string>> LookupHostByName(const std::string &host);
StatusOr<int> SockConnect(const std::string &host, uint32_t port, int conn_timeout = 0, int timeout = 0);
Status SockSetTcpNoDelay(int fd, int val);
Status SockSetTcpKeepalive(int fd, int interval);
Expand Down
3 changes: 2 additions & 1 deletion src/common/task_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ Status TaskRunner::Join() {

for (auto &thread : threads_) {
if (auto s = util::ThreadJoin(thread); !s) {
return s.Prefixed("Task thread operation failed");
LOG(WARNING) << "Failed to join thread: " << s.Msg();
continue;
}
}

Expand Down
19 changes: 16 additions & 3 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,15 @@ void Server::Join() {
worker->Join();
}

if (auto s = task_runner_.Join(); !s) {
LOG(WARNING) << s.Msg();
}
if (auto s = util::ThreadJoin(cron_thread_); !s) {
LOG(WARNING) << "Cron thread operation failed: " << s.Msg();
}
if (auto s = util::ThreadJoin(compaction_checker_thread_); !s) {
LOG(WARNING) << "Compaction checker thread operation failed: " << s.Msg();
}
if (auto s = task_runner_.Join(); !s) {
LOG(WARNING) << s.Msg();
}
}

Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reconnect) {
Expand Down Expand Up @@ -1751,3 +1751,16 @@ void Server::ResetWatchedKeys(redis::Connection *conn) {
watched_key_size_ = watched_key_map_.size();
}
}

std::list<std::pair<std::string, uint32_t>> Server::GetSlaveHostAndPort() {
std::list<std::pair<std::string, uint32_t>> result;
slave_threads_mu_.lock();
for (const auto &slave : slave_threads_) {
if (slave->IsStopped()) continue;
std::pair<std::string, int> host_port_pair = {slave->GetConn()->GetAnnounceIP(),
slave->GetConn()->GetListeningPort()};
result.emplace_back(host_port_pair);
}
slave_threads_mu_.unlock();
return result;
}
1 change: 1 addition & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ class Server {
void WatchKey(redis::Connection *conn, const std::vector<std::string> &keys);
static bool IsWatchedKeysModified(redis::Connection *conn);
void ResetWatchedKeys(redis::Connection *conn);
std::list<std::pair<std::string, uint32_t>> GetSlaveHostAndPort();

#ifdef ENABLE_OPENSSL
UniqueSSLContext ssl_ctx;
Expand Down
46 changes: 46 additions & 0 deletions src/types/redis_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,52 @@ rocksdb::Status Set::Inter(const std::vector<Slice> &keys, std::vector<std::stri
return rocksdb::Status::OK();
}

rocksdb::Status Set::InterCard(const std::vector<Slice> &keys, uint64_t limit, uint64_t *cardinality) {
*cardinality = 0;

std::map<std::string, size_t> member_counters;
std::vector<std::string> target_members;

auto s = Members(keys[0], &target_members);
if (!s.ok() || target_members.empty()) return s;
for (const auto &member : target_members) {
member_counters[member] = 1;
}
if (limit == 0) {
limit = target_members.size();
}

size_t keys_size = keys.size();
if (keys_size == 1) {
*cardinality = std::min(static_cast<uint64_t>(target_members.size()), limit);
return rocksdb::Status::OK();
}

bool limit_reached = false;
for (size_t i = 1; i < keys_size; i++) {
s = Members(keys[i], &target_members);
if (!s.ok() || target_members.empty()) {
return s;
}

for (const auto &member : target_members) {
auto iter = member_counters.find(member);
if (iter == member_counters.end()) continue;
if (++iter->second == keys_size) {
*cardinality += 1;
if (--limit == 0) {
limit_reached = true;
break;
}
}
}

if (limit_reached) break;
}

return rocksdb::Status::OK();
}

rocksdb::Status Set::DiffStore(const Slice &dst, const std::vector<Slice> &keys, uint64_t *saved_cnt) {
*saved_cnt = 0;
std::vector<std::string> members;
Expand Down
1 change: 1 addition & 0 deletions src/types/redis_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Set : public SubKeyScanner {
rocksdb::Status Diff(const std::vector<Slice> &keys, std::vector<std::string> *members);
rocksdb::Status Union(const std::vector<Slice> &keys, std::vector<std::string> *members);
rocksdb::Status Inter(const std::vector<Slice> &keys, std::vector<std::string> *members);
rocksdb::Status InterCard(const std::vector<Slice> &keys, uint64_t limit, uint64_t *cardinality);
rocksdb::Status Overwrite(Slice user_key, const std::vector<std::string> &members);
rocksdb::Status DiffStore(const Slice &dst, const std::vector<Slice> &keys, uint64_t *saved_cnt);
rocksdb::Status UnionStore(const Slice &dst, const std::vector<Slice> &keys, uint64_t *save_cnt);
Expand Down
2 changes: 2 additions & 0 deletions tests/cppunit/task_runner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,6 @@ TEST(TaskRunner, PublishAfterStart) {
std::this_thread::sleep_for(0.1s);

ASSERT_EQ(counter, 20);
tr.Cancel();
_ = tr.Join();
}
45 changes: 44 additions & 1 deletion tests/cppunit/types/set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,62 @@ TEST_F(RedisSetTest, Union) {

TEST_F(RedisSetTest, Inter) {
uint64_t ret = 0;
std::string k1 = "key1", k2 = "key2", k3 = "key3";
std::string k1 = "key1", k2 = "key2", k3 = "key3", k4 = "key4", k5 = "key5";
rocksdb::Status s = set_->Add(k1, {"a", "b", "c", "d"}, &ret);
EXPECT_EQ(ret, 4);
set_->Add(k2, {"c"}, &ret);
EXPECT_EQ(ret, 1);
set_->Add(k3, {"a", "c", "e"}, &ret);
EXPECT_EQ(ret, 3);
set_->Add(k5, {"a"}, &ret);
EXPECT_EQ(ret, 1);
std::vector<std::string> members;
set_->Inter({k1, k2, k3}, &members);
EXPECT_EQ(1, members.size());
members.clear();
set_->Inter({k1, k2, k4}, &members);
EXPECT_EQ(0, members.size());
set_->Inter({k1, k4, k5}, &members);
EXPECT_EQ(0, members.size());
set_->Del(k1);
set_->Del(k2);
set_->Del(k3);
set_->Del(k4);
set_->Del(k5);
}

TEST_F(RedisSetTest, InterCard) {
uint64_t ret = 0;
std::string k1 = "key1", k2 = "key2", k3 = "key3", k4 = "key4";
rocksdb::Status s = set_->Add(k1, {"a", "b", "c", "d"}, &ret);
EXPECT_EQ(ret, 4);
set_->Add(k2, {"c", "d", "e"}, &ret);
EXPECT_EQ(ret, 3);
set_->Add(k3, {"e", "f"}, &ret);
EXPECT_EQ(ret, 2);
set_->InterCard({k1, k2}, 0, &ret);
EXPECT_EQ(ret, 2);
set_->InterCard({k1, k2}, 1, &ret);
EXPECT_EQ(ret, 1);
set_->InterCard({k1, k2}, 3, &ret);
EXPECT_EQ(ret, 2);
set_->InterCard({k2, k3}, 1, &ret);
EXPECT_EQ(ret, 1);
set_->InterCard({k1, k3}, 5, &ret);
EXPECT_EQ(ret, 0);
set_->InterCard({k1, k4}, 5, &ret);
EXPECT_EQ(ret, 0);
set_->InterCard({k1}, 0, &ret);
EXPECT_EQ(ret, 4);
for (uint32_t i = 1; i < 20; i++) {
set_->InterCard({k1}, i, &ret);
uint64_t val = (i >= 4) ? 4 : i;
EXPECT_EQ(ret, val);
}
set_->Del(k1);
set_->Del(k2);
set_->Del(k3);
set_->Del(k4);
}

TEST_F(RedisSetTest, Overwrite) {
Expand Down
Loading

0 comments on commit bbd38d6

Please sign in to comment.