Skip to content

Commit

Permalink
Implement the RESP3 null for the nil string and array
Browse files Browse the repository at this point in the history
For the null type in RESP2, the nil string is represented by `$-1\r\n`
and nil array is `*-1\r\n`. But after RESP3, they will be reduced into
an unify string `_\r\n`. So in this PR, most of them are renamed
redis::MultiBulkString/Redis::NilString to connection::MultiBulkString
and connection::NilString.
  • Loading branch information
git-hulk committed Jan 14, 2024
1 parent 9d656f1 commit 3df2070
Show file tree
Hide file tree
Showing 32 changed files with 364 additions and 242 deletions.
16 changes: 8 additions & 8 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ void ReplicationThread::run() {
}

ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"AUTH", srv_->GetConfig()->masterauth}));
SendString(bev, redis::Array2RESP({"AUTH", srv_->GetConfig()->masterauth}));
LOG(INFO) << "[replication] Auth request was sent, waiting for response";
repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
return CBState::NEXT;
Expand All @@ -418,7 +418,7 @@ ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) { //
}

ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"_db_name"}));
SendString(bev, redis::Array2RESP({"_db_name"}));
repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
LOG(INFO) << "[replication] Check db name request was sent, waiting for response";
return CBState::NEXT;
Expand Down Expand Up @@ -456,7 +456,7 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev)
data_to_send.emplace_back("ip-address");
data_to_send.emplace_back(config->replica_announce_ip);
}
SendString(bev, redis::MultiBulkString(data_to_send));
SendString(bev, redis::Array2RESP(data_to_send));
repl_state_.store(kReplReplConf, std::memory_order_relaxed);
LOG(INFO) << "[replication] replconf request was sent, waiting for response";
return CBState::NEXT;
Expand Down Expand Up @@ -513,11 +513,11 @@ ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev)
// Also use old PSYNC if replica can't find replication id from WAL and DB.
if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || replid.length() != kReplIdLength) {
next_try_old_psync_ = false; // Reset next_try_old_psync_
SendString(bev, redis::MultiBulkString({"PSYNC", std::to_string(next_seq)}));
SendString(bev, redis::Array2RESP({"PSYNC", std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
} else {
// NEW PSYNC "Unique Replication Sequence ID": replication id and sequence id
SendString(bev, redis::MultiBulkString({"PSYNC", replid, std::to_string(next_seq)}));
SendString(bev, redis::Array2RESP({"PSYNC", replid, std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use new psync, current unique replication sequence id: " << replid << ":"
<< cur_seq;
}
Expand Down Expand Up @@ -607,7 +607,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
}

ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"_fetch_meta"}));
SendString(bev, redis::Array2RESP({"_fetch_meta"}));
repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
LOG(INFO) << "[replication] Start syncing data with fullsync";
return CBState::NEXT;
Expand Down Expand Up @@ -835,7 +835,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) {
std::string auth = srv_->GetConfig()->masterauth;
if (!auth.empty()) {
UniqueEvbuf evbuf;
const auto auth_command = redis::MultiBulkString({"AUTH", auth});
const auto auth_command = redis::Array2RESP({"AUTH", auth});
auto s = util::SockSend(sock_fd, auth_command, ssl);
if (!s.IsOK()) return s.Prefixed("send auth command err");
while (true) {
Expand Down Expand Up @@ -921,7 +921,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const
}
files_str.pop_back();

const auto fetch_command = redis::MultiBulkString({"_fetch_file", files_str});
const auto fetch_command = redis::Array2RESP({"_fetch_file", files_str});
auto s = util::SockSend(sock_fd, fetch_command, ssl);
if (!s.IsOK()) return s.Prefixed("send fetch file command");

Expand Down
26 changes: 12 additions & 14 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ void SlotMigrator::clean() {
}

Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
std::string cmd = redis::MultiBulkString({"auth", password}, false);
std::string cmd = redis::Array2RESP({"auth", password});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send AUTH command");
Expand All @@ -455,8 +455,7 @@ Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};

std::string cmd =
redis::MultiBulkString({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)});
std::string cmd = redis::Array2RESP({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send command to the destination node");
Expand Down Expand Up @@ -666,7 +665,7 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
}
*restore_cmds += redis::MultiBulkString(command, false);
*restore_cmds += redis::Array2RESP(command);
current_pipeline_size_++;

// Check whether pipeline needs to be sent
Expand Down Expand Up @@ -747,7 +746,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
if (metadata.Type() != kRedisBitmap) {
item_count++;
if (item_count >= kMaxItemsInCommand) {
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::Array2RESP(user_cmd);
current_pipeline_size_++;
item_count = 0;
// Have to clear saved items
Expand All @@ -764,13 +763,13 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata

// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand != 0) {
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::Array2RESP(user_cmd);
current_pipeline_size_++;
}

// Add TTL for complex key
if (metadata.expire > 0) {
*restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += redis::Array2RESP({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}

Expand Down Expand Up @@ -809,7 +808,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
if (!s.IsOK()) {
return s;
}
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::Array2RESP(user_cmd);
current_pipeline_size_++;

user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());
Expand All @@ -822,15 +821,14 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad

// commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
// XSETID is used to adjust stream's info on the destination node according to the current values on the source
*restore_cmds += redis::MultiBulkString(
{"XSETID", key.ToString(), metadata.last_generated_id.ToString(), "ENTRIESADDED",
std::to_string(metadata.entries_added), "MAXDELETEDID", metadata.max_deleted_entry_id.ToString()},
false);
*restore_cmds += redis::Array2RESP({"XSETID", key.ToString(), metadata.last_generated_id.ToString(), "ENTRIESADDED",
std::to_string(metadata.entries_added), "MAXDELETEDID",
metadata.max_deleted_entry_id.ToString()});
current_pipeline_size_++;

// Add TTL
if (metadata.expire > 0) {
*restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += redis::Array2RESP({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}

Expand Down Expand Up @@ -862,7 +860,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr<
uint32_t offset = (index * 8) + (byte_idx * 8) + bit_idx;
user_cmd->emplace_back(std::to_string(offset));
user_cmd->emplace_back("1");
*restore_cmds += redis::MultiBulkString(*user_cmd, false);
*restore_cmds += redis::Array2RESP(*user_cmd);
current_pipeline_size_++;
user_cmd->erase(user_cmd->begin() + 2, user_cmd->end());
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/sync_migrate_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) {
void SyncMigrateContext::TimerCB(int, int16_t events) {
auto &&slot_migrator = srv_->slot_migrator;

conn_->Reply(redis::NilString());
conn_->Reply(conn_->NilString());
timer_.reset();

slot_migrator->CancelSyncCtx();
Expand Down
6 changes: 3 additions & 3 deletions src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class BlockingCommander : public Commander,
private EventCallbackBase<BlockingCommander> {
public:
// method to reply when no operation happens
virtual std::string NoopReply() = 0;
virtual std::string NoopReply(const Connection *conn) = 0;

// method to block keys
virtual void BlockKeys() = 0;
Expand All @@ -48,7 +48,7 @@ class BlockingCommander : public Commander,
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
if (conn_->IsInExec()) {
*output = NoopReply();
*output = NoopReply(conn_);
return Status::OK(); // no blocking in multi-exec
}

Expand Down Expand Up @@ -111,7 +111,7 @@ class BlockingCommander : public Commander,
}

void TimerCB(int, int16_t) {
conn_->Reply(NoopReply());
conn_->Reply(NoopReply(conn_));
timer_.reset();
UnblockKeys();
auto bev = conn_->GetBufferEvent();
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ class CommandBitfield : public Commander {
str_rets[i] = redis::Integer(rets[i]->Value());
}
} else {
str_rets[i] = redis::NilString();
str_rets[i] = conn->NilString();
}
}
*output = redis::Array(str_rets);
Expand Down
4 changes: 2 additions & 2 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class CommandBFInfo : public Commander {
*output += redis::SimpleString("Number of items inserted");
*output += redis::Integer(info.size);
*output += redis::SimpleString("Expansion rate");
*output += info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion);
*output += info.expansion == 0 ? conn->NilString() : redis::Integer(info.expansion);
break;
case BloomInfoType::kCapacity:
*output = redis::Integer(info.capacity);
Expand All @@ -360,7 +360,7 @@ class CommandBFInfo : public Commander {
*output = redis::Integer(info.size);
break;
case BloomInfoType::kExpansion:
*output = info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion);
*output = info.expansion == 0 ? conn->NilString() : redis::Integer(info.expansion);
break;
}

Expand Down
28 changes: 14 additions & 14 deletions src/commands/cmd_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class CommandGeoDist : public CommandGeoBase {
}

if (s.IsNotFound()) {
*output = redis::NilString();
*output = conn->NilString();
} else {
*output = redis::BulkString(util::Float2String(GetDistanceByUnit(distance)));
}
Expand Down Expand Up @@ -177,7 +177,7 @@ class CommandGeoHash : public Commander {
hashes.resize(members_.size(), "");
}

*output = redis::MultiBulkString(hashes);
*output = conn->MultiBulkString(hashes);
return Status::OK();
}

Expand Down Expand Up @@ -206,16 +206,16 @@ class CommandGeoPos : public Commander {

if (s.IsNotFound()) {
list.resize(members_.size(), "");
*output = redis::MultiBulkString(list);
*output = conn->MultiBulkString(list);
return Status::OK();
}

for (const auto &member : members_) {
auto iter = geo_points.find(member.ToString());
if (iter == geo_points.end()) {
list.emplace_back(redis::NilString());
list.emplace_back(conn->NilString());
} else {
list.emplace_back(redis::MultiBulkString(
list.emplace_back(conn->MultiBulkString(
{util::Float2String(iter->second.longitude), util::Float2String(iter->second.latitude)}));
}
}
Expand Down Expand Up @@ -314,12 +314,12 @@ class CommandGeoRadius : public CommandGeoBase {
if (store_key_.size() != 0) {
*output = redis::Integer(geo_points.size());
} else {
*output = GenerateOutput(geo_points);
*output = GenerateOutput(conn, geo_points);
}
return Status::OK();
}

std::string GenerateOutput(const std::vector<GeoPoint> &geo_points) {
std::string GenerateOutput(Connection *conn, const std::vector<GeoPoint> &geo_points) {
int result_length = static_cast<int>(geo_points.size());
int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_;
std::vector<std::string> list;
Expand All @@ -337,8 +337,8 @@ class CommandGeoRadius : public CommandGeoBase {
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
}
if (with_coord_) {
one.emplace_back(redis::MultiBulkString(
{util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
one.emplace_back(
conn->MultiBulkString({util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
}
list.emplace_back(redis::Array(one));
}
Expand Down Expand Up @@ -440,7 +440,7 @@ class CommandGeoSearch : public CommandGeoBase {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = generateOutput(geo_points);
*output = generateOutput(conn, geo_points);

return Status::OK();
}
Expand Down Expand Up @@ -496,7 +496,7 @@ class CommandGeoSearch : public CommandGeoBase {
return Status::OK();
}

std::string generateOutput(const std::vector<GeoPoint> &geo_points) {
std::string generateOutput(Connection *conn, const std::vector<GeoPoint> &geo_points) {
int result_length = static_cast<int>(geo_points.size());
int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_;
std::vector<std::string> output;
Expand All @@ -515,8 +515,8 @@ class CommandGeoSearch : public CommandGeoBase {
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
}
if (with_coord_) {
one.emplace_back(redis::MultiBulkString(
{util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
one.emplace_back(
conn->MultiBulkString({util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
}
output.emplace_back(redis::Array(one));
}
Expand Down Expand Up @@ -644,7 +644,7 @@ class CommandGeoRadiusByMember : public CommandGeoRadius {
if (store_key_.size() != 0) {
*output = redis::Integer(geo_points.size());
} else {
*output = GenerateOutput(geo_points);
*output = GenerateOutput(conn, geo_points);
}

return Status::OK();
Expand Down
Loading

0 comments on commit 3df2070

Please sign in to comment.