Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the RESP3 null for the nil string and array #2017

Merged
merged 5 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ void ReplicationThread::run() {
}

ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"AUTH", srv_->GetConfig()->masterauth}));
SendString(bev, redis::ArrayOfBulkStrings({"AUTH", srv_->GetConfig()->masterauth}));
LOG(INFO) << "[replication] Auth request was sent, waiting for response";
repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
return CBState::NEXT;
Expand All @@ -418,7 +418,7 @@ ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) { //
}

ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"_db_name"}));
SendString(bev, redis::ArrayOfBulkStrings({"_db_name"}));
repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
LOG(INFO) << "[replication] Check db name request was sent, waiting for response";
return CBState::NEXT;
Expand Down Expand Up @@ -456,7 +456,7 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev)
data_to_send.emplace_back("ip-address");
data_to_send.emplace_back(config->replica_announce_ip);
}
SendString(bev, redis::MultiBulkString(data_to_send));
SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
repl_state_.store(kReplReplConf, std::memory_order_relaxed);
LOG(INFO) << "[replication] replconf request was sent, waiting for response";
return CBState::NEXT;
Expand Down Expand Up @@ -513,11 +513,11 @@ ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev)
// Also use old PSYNC if replica can't find replication id from WAL and DB.
if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || replid.length() != kReplIdLength) {
next_try_old_psync_ = false; // Reset next_try_old_psync_
SendString(bev, redis::MultiBulkString({"PSYNC", std::to_string(next_seq)}));
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
} else {
// NEW PSYNC "Unique Replication Sequence ID": replication id and sequence id
SendString(bev, redis::MultiBulkString({"PSYNC", replid, std::to_string(next_seq)}));
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid, std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use new psync, current unique replication sequence id: " << replid << ":"
<< cur_seq;
}
Expand Down Expand Up @@ -607,7 +607,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
}

ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"_fetch_meta"}));
SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"}));
repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
LOG(INFO) << "[replication] Start syncing data with fullsync";
return CBState::NEXT;
Expand Down Expand Up @@ -835,7 +835,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) {
std::string auth = srv_->GetConfig()->masterauth;
if (!auth.empty()) {
UniqueEvbuf evbuf;
const auto auth_command = redis::MultiBulkString({"AUTH", auth});
const auto auth_command = redis::ArrayOfBulkStrings({"AUTH", auth});
auto s = util::SockSend(sock_fd, auth_command, ssl);
if (!s.IsOK()) return s.Prefixed("send auth command err");
while (true) {
Expand Down Expand Up @@ -921,7 +921,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const
}
files_str.pop_back();

const auto fetch_command = redis::MultiBulkString({"_fetch_file", files_str});
const auto fetch_command = redis::ArrayOfBulkStrings({"_fetch_file", files_str});
auto s = util::SockSend(sock_fd, fetch_command, ssl);
if (!s.IsOK()) return s.Prefixed("send fetch file command");

Expand Down
25 changes: 12 additions & 13 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ void SlotMigrator::clean() {
}

Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
std::string cmd = redis::MultiBulkString({"auth", password}, false);
std::string cmd = redis::ArrayOfBulkStrings({"auth", password});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send AUTH command");
Expand All @@ -456,7 +456,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};

std::string cmd =
redis::MultiBulkString({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)});
redis::ArrayOfBulkStrings({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send command to the destination node");
Expand Down Expand Up @@ -666,7 +666,7 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
}
*restore_cmds += redis::MultiBulkString(command, false);
*restore_cmds += redis::ArrayOfBulkStrings(command);
current_pipeline_size_++;

// Check whether pipeline needs to be sent
Expand Down Expand Up @@ -747,7 +747,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
if (metadata.Type() != kRedisBitmap) {
item_count++;
if (item_count >= kMaxItemsInCommand) {
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
item_count = 0;
// Have to clear saved items
Expand All @@ -764,13 +764,13 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata

// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand != 0) {
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
}

// Add TTL for complex key
if (metadata.expire > 0) {
*restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}

Expand Down Expand Up @@ -809,7 +809,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
if (!s.IsOK()) {
return s;
}
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;

user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());
Expand All @@ -822,15 +822,14 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad

// commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
// XSETID is used to adjust stream's info on the destination node according to the current values on the source
*restore_cmds += redis::MultiBulkString(
{"XSETID", key.ToString(), metadata.last_generated_id.ToString(), "ENTRIESADDED",
std::to_string(metadata.entries_added), "MAXDELETEDID", metadata.max_deleted_entry_id.ToString()},
false);
*restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(),
"ENTRIESADDED", std::to_string(metadata.entries_added), "MAXDELETEDID",
metadata.max_deleted_entry_id.ToString()});
current_pipeline_size_++;

// Add TTL
if (metadata.expire > 0) {
*restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}

Expand Down Expand Up @@ -862,7 +861,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr<
uint32_t offset = (index * 8) + (byte_idx * 8) + bit_idx;
user_cmd->emplace_back(std::to_string(offset));
user_cmd->emplace_back("1");
*restore_cmds += redis::MultiBulkString(*user_cmd, false);
*restore_cmds += redis::ArrayOfBulkStrings(*user_cmd);
current_pipeline_size_++;
user_cmd->erase(user_cmd->begin() + 2, user_cmd->end());
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/sync_migrate_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) {
void SyncMigrateContext::TimerCB(int, int16_t events) {
auto &&slot_migrator = srv_->slot_migrator;

conn_->Reply(redis::NilString());
conn_->Reply(conn_->NilString());
timer_.reset();

slot_migrator->CancelSyncCtx();
Expand Down
6 changes: 3 additions & 3 deletions src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class BlockingCommander : public Commander,
private EventCallbackBase<BlockingCommander> {
public:
// method to reply when no operation happens
virtual std::string NoopReply() = 0;
virtual std::string NoopReply(const Connection *conn) = 0;

// method to block keys
virtual void BlockKeys() = 0;
Expand All @@ -48,7 +48,7 @@ class BlockingCommander : public Commander,
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
if (conn_->IsInExec()) {
*output = NoopReply();
*output = NoopReply(conn_);
return Status::OK(); // no blocking in multi-exec
}

Expand Down Expand Up @@ -111,7 +111,7 @@ class BlockingCommander : public Commander,
}

void TimerCB(int, int16_t) {
conn_->Reply(NoopReply());
conn_->Reply(NoopReply(conn_));
timer_.reset();
UnblockKeys();
auto bev = conn_->GetBufferEvent();
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ class CommandBitfield : public Commander {
str_rets[i] = redis::Integer(rets[i]->Value());
}
} else {
str_rets[i] = redis::NilString();
str_rets[i] = conn->NilString();
}
}
*output = redis::Array(str_rets);
Expand Down
4 changes: 2 additions & 2 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class CommandBFInfo : public Commander {
*output += redis::SimpleString("Number of items inserted");
*output += redis::Integer(info.size);
*output += redis::SimpleString("Expansion rate");
*output += info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion);
*output += info.expansion == 0 ? conn->NilString() : redis::Integer(info.expansion);
break;
case BloomInfoType::kCapacity:
*output = redis::Integer(info.capacity);
Expand All @@ -360,7 +360,7 @@ class CommandBFInfo : public Commander {
*output = redis::Integer(info.size);
break;
case BloomInfoType::kExpansion:
*output = info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion);
*output = info.expansion == 0 ? conn->NilString() : redis::Integer(info.expansion);
break;
}

Expand Down
28 changes: 14 additions & 14 deletions src/commands/cmd_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class CommandGeoDist : public CommandGeoBase {
}

if (s.IsNotFound()) {
*output = redis::NilString();
*output = conn->NilString();
} else {
*output = redis::BulkString(util::Float2String(GetDistanceByUnit(distance)));
}
Expand Down Expand Up @@ -177,7 +177,7 @@ class CommandGeoHash : public Commander {
hashes.resize(members_.size(), "");
}

*output = redis::MultiBulkString(hashes);
*output = conn->MultiBulkString(hashes);
return Status::OK();
}

Expand Down Expand Up @@ -206,16 +206,16 @@ class CommandGeoPos : public Commander {

if (s.IsNotFound()) {
list.resize(members_.size(), "");
*output = redis::MultiBulkString(list);
*output = conn->MultiBulkString(list);
return Status::OK();
}

for (const auto &member : members_) {
auto iter = geo_points.find(member.ToString());
if (iter == geo_points.end()) {
list.emplace_back(redis::NilString());
list.emplace_back(conn->NilString());
} else {
list.emplace_back(redis::MultiBulkString(
list.emplace_back(conn->MultiBulkString(
{util::Float2String(iter->second.longitude), util::Float2String(iter->second.latitude)}));
}
}
Expand Down Expand Up @@ -314,12 +314,12 @@ class CommandGeoRadius : public CommandGeoBase {
if (store_key_.size() != 0) {
*output = redis::Integer(geo_points.size());
} else {
*output = GenerateOutput(geo_points);
*output = GenerateOutput(conn, geo_points);
}
return Status::OK();
}

std::string GenerateOutput(const std::vector<GeoPoint> &geo_points) {
std::string GenerateOutput(const Connection *conn, const std::vector<GeoPoint> &geo_points) {
int result_length = static_cast<int>(geo_points.size());
int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_;
std::vector<std::string> list;
Expand All @@ -337,8 +337,8 @@ class CommandGeoRadius : public CommandGeoBase {
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
}
if (with_coord_) {
one.emplace_back(redis::MultiBulkString(
{util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
one.emplace_back(
conn->MultiBulkString({util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
}
list.emplace_back(redis::Array(one));
}
Expand Down Expand Up @@ -440,7 +440,7 @@ class CommandGeoSearch : public CommandGeoBase {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = generateOutput(geo_points);
*output = generateOutput(conn, geo_points);

return Status::OK();
}
Expand Down Expand Up @@ -496,7 +496,7 @@ class CommandGeoSearch : public CommandGeoBase {
return Status::OK();
}

std::string generateOutput(const std::vector<GeoPoint> &geo_points) {
std::string generateOutput(const Connection *conn, const std::vector<GeoPoint> &geo_points) {
int result_length = static_cast<int>(geo_points.size());
int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_;
std::vector<std::string> output;
Expand All @@ -515,8 +515,8 @@ class CommandGeoSearch : public CommandGeoBase {
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
}
if (with_coord_) {
one.emplace_back(redis::MultiBulkString(
{util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
one.emplace_back(
conn->MultiBulkString({util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
}
output.emplace_back(redis::Array(one));
}
Expand Down Expand Up @@ -644,7 +644,7 @@ class CommandGeoRadiusByMember : public CommandGeoRadius {
if (store_key_.size() != 0) {
*output = redis::Integer(geo_points.size());
} else {
*output = GenerateOutput(geo_points);
*output = GenerateOutput(conn, geo_points);
}

return Status::OK();
Expand Down
Loading
Loading