From 3d44c797ebc80aa10f3e8b0ecd30870e24b905cf Mon Sep 17 00:00:00 2001 From: git-hulk Date: Tue, 9 Jan 2024 00:04:19 +0800 Subject: [PATCH 1/5] Implement the RESP3 null for the nil string and array For the null type in RESP2, the nil string is represented by `$-1\r\n` and nil array is `*-1\r\n`. But after RESP3, they will be reduced into an unify string `_\r\n`. So in this PR, most of them are renamed redis::MultiBulkString/Redis::NilString to connection::MultiBulkString and connection::NilString. --- src/cluster/replication.cc | 16 ++-- src/cluster/slot_migrate.cc | 26 +++--- src/cluster/sync_migrate_context.cc | 2 +- src/commands/blocking_commander.h | 6 +- src/commands/cmd_bit.cc | 2 +- src/commands/cmd_bloom_filter.cc | 4 +- src/commands/cmd_geo.cc | 28 +++--- src/commands/cmd_hash.cc | 20 ++--- src/commands/cmd_json.cc | 58 ++++++------ src/commands/cmd_list.cc | 34 +++---- src/commands/cmd_pubsub.cc | 33 +++---- src/commands/cmd_server.cc | 29 +++--- src/commands/cmd_set.cc | 16 ++-- src/commands/cmd_stream.cc | 44 ++++----- src/commands/cmd_string.cc | 18 ++-- src/commands/cmd_txn.cc | 2 +- src/commands/cmd_zset.cc | 22 ++--- src/commands/commander.cc | 2 +- src/commands/scan_base.h | 9 +- src/server/redis_connection.cc | 33 +++++++ src/server/redis_connection.h | 6 ++ src/server/redis_reply.cc | 34 ++----- src/server/redis_reply.h | 13 +-- src/server/server.cc | 4 +- src/stats/log_collector.cc | 2 +- src/storage/batch_extractor.cc | 12 +-- src/storage/scripting.cc | 6 +- tests/cppunit/string_reply_test.cc | 2 +- tests/gocase/unit/debug/debug_test.go | 5 ++ tests/gocase/unit/protocol/protocol_test.go | 98 +++++++++++++++++++++ utils/kvrocks2redis/parser.cc | 18 ++-- utils/kvrocks2redis/redis_writer.cc | 2 +- 32 files changed, 364 insertions(+), 242 deletions(-) diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index c2d17b5a05b..ba3afdf0b1d 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -396,7 +396,7 @@ void ReplicationThread::run() { } ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) { - SendString(bev, redis::MultiBulkString({"AUTH", srv_->GetConfig()->masterauth})); + SendString(bev, redis::Array2RESP({"AUTH", srv_->GetConfig()->masterauth})); LOG(INFO) << "[replication] Auth request was sent, waiting for response"; repl_state_.store(kReplSendAuth, std::memory_order_relaxed); return CBState::NEXT; @@ -418,7 +418,7 @@ ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) { // } ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent *bev) { - SendString(bev, redis::MultiBulkString({"_db_name"})); + SendString(bev, redis::Array2RESP({"_db_name"})); repl_state_.store(kReplCheckDBName, std::memory_order_relaxed); LOG(INFO) << "[replication] Check db name request was sent, waiting for response"; return CBState::NEXT; @@ -456,7 +456,7 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev) data_to_send.emplace_back("ip-address"); data_to_send.emplace_back(config->replica_announce_ip); } - SendString(bev, redis::MultiBulkString(data_to_send)); + SendString(bev, redis::Array2RESP(data_to_send)); repl_state_.store(kReplReplConf, std::memory_order_relaxed); LOG(INFO) << "[replication] replconf request was sent, waiting for response"; return CBState::NEXT; @@ -513,11 +513,11 @@ ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev) // Also use old PSYNC if replica can't find replication id from WAL and DB. if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || replid.length() != kReplIdLength) { next_try_old_psync_ = false; // Reset next_try_old_psync_ - SendString(bev, redis::MultiBulkString({"PSYNC", std::to_string(next_seq)})); + SendString(bev, redis::Array2RESP({"PSYNC", std::to_string(next_seq)})); LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq; } else { // NEW PSYNC "Unique Replication Sequence ID": replication id and sequence id - SendString(bev, redis::MultiBulkString({"PSYNC", replid, std::to_string(next_seq)})); + SendString(bev, redis::Array2RESP({"PSYNC", replid, std::to_string(next_seq)})); LOG(INFO) << "[replication] Try to use new psync, current unique replication sequence id: " << replid << ":" << cur_seq; } @@ -607,7 +607,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent * } ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent *bev) { - SendString(bev, redis::MultiBulkString({"_fetch_meta"})); + SendString(bev, redis::Array2RESP({"_fetch_meta"})); repl_state_.store(kReplFetchMeta, std::memory_order_relaxed); LOG(INFO) << "[replication] Start syncing data with fullsync"; return CBState::NEXT; @@ -835,7 +835,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) { std::string auth = srv_->GetConfig()->masterauth; if (!auth.empty()) { UniqueEvbuf evbuf; - const auto auth_command = redis::MultiBulkString({"AUTH", auth}); + const auto auth_command = redis::Array2RESP({"AUTH", auth}); auto s = util::SockSend(sock_fd, auth_command, ssl); if (!s.IsOK()) return s.Prefixed("send auth command err"); while (true) { @@ -921,7 +921,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const } files_str.pop_back(); - const auto fetch_command = redis::MultiBulkString({"_fetch_file", files_str}); + const auto fetch_command = redis::Array2RESP({"_fetch_file", files_str}); auto s = util::SockSend(sock_fd, fetch_command, ssl); if (!s.IsOK()) return s.Prefixed("send fetch file command"); diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index 09b220ae3c9..bb3334cd391 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -438,7 +438,7 @@ void SlotMigrator::clean() { } Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) { - std::string cmd = redis::MultiBulkString({"auth", password}, false); + std::string cmd = redis::Array2RESP({"auth", password}); auto s = util::SockSend(sock_fd, cmd); if (!s.IsOK()) { return s.Prefixed("failed to send AUTH command"); @@ -455,8 +455,7 @@ Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) { Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) { if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"}; - std::string cmd = - redis::MultiBulkString({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)}); + std::string cmd = redis::Array2RESP({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)}); auto s = util::SockSend(sock_fd, cmd); if (!s.IsOK()) { return s.Prefixed("failed to send command to the destination node"); @@ -666,7 +665,7 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata command.emplace_back("PXAT"); command.emplace_back(std::to_string(metadata.expire)); } - *restore_cmds += redis::MultiBulkString(command, false); + *restore_cmds += redis::Array2RESP(command); current_pipeline_size_++; // Check whether pipeline needs to be sent @@ -747,7 +746,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata if (metadata.Type() != kRedisBitmap) { item_count++; if (item_count >= kMaxItemsInCommand) { - *restore_cmds += redis::MultiBulkString(user_cmd, false); + *restore_cmds += redis::Array2RESP(user_cmd); current_pipeline_size_++; item_count = 0; // Have to clear saved items @@ -764,13 +763,13 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata // Have to check the item count of the last command list if (item_count % kMaxItemsInCommand != 0) { - *restore_cmds += redis::MultiBulkString(user_cmd, false); + *restore_cmds += redis::Array2RESP(user_cmd); current_pipeline_size_++; } // Add TTL for complex key if (metadata.expire > 0) { - *restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false); + *restore_cmds += redis::Array2RESP({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}); current_pipeline_size_++; } @@ -809,7 +808,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad if (!s.IsOK()) { return s; } - *restore_cmds += redis::MultiBulkString(user_cmd, false); + *restore_cmds += redis::Array2RESP(user_cmd); current_pipeline_size_++; user_cmd.erase(user_cmd.begin() + 2, user_cmd.end()); @@ -822,15 +821,14 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad // commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration // XSETID is used to adjust stream's info on the destination node according to the current values on the source - *restore_cmds += redis::MultiBulkString( - {"XSETID", key.ToString(), metadata.last_generated_id.ToString(), "ENTRIESADDED", - std::to_string(metadata.entries_added), "MAXDELETEDID", metadata.max_deleted_entry_id.ToString()}, - false); + *restore_cmds += redis::Array2RESP({"XSETID", key.ToString(), metadata.last_generated_id.ToString(), "ENTRIESADDED", + std::to_string(metadata.entries_added), "MAXDELETEDID", + metadata.max_deleted_entry_id.ToString()}); current_pipeline_size_++; // Add TTL if (metadata.expire > 0) { - *restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false); + *restore_cmds += redis::Array2RESP({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}); current_pipeline_size_++; } @@ -862,7 +860,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr< uint32_t offset = (index * 8) + (byte_idx * 8) + bit_idx; user_cmd->emplace_back(std::to_string(offset)); user_cmd->emplace_back("1"); - *restore_cmds += redis::MultiBulkString(*user_cmd, false); + *restore_cmds += redis::Array2RESP(*user_cmd); current_pipeline_size_++; user_cmd->erase(user_cmd->begin() + 2, user_cmd->end()); } diff --git a/src/cluster/sync_migrate_context.cc b/src/cluster/sync_migrate_context.cc index 0633cecd1de..3ba2806ca45 100644 --- a/src/cluster/sync_migrate_context.cc +++ b/src/cluster/sync_migrate_context.cc @@ -54,7 +54,7 @@ void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) { void SyncMigrateContext::TimerCB(int, int16_t events) { auto &&slot_migrator = srv_->slot_migrator; - conn_->Reply(redis::NilString()); + conn_->Reply(conn_->NilString()); timer_.reset(); slot_migrator->CancelSyncCtx(); diff --git a/src/commands/blocking_commander.h b/src/commands/blocking_commander.h index 537e770fd1b..05883a8ac04 100644 --- a/src/commands/blocking_commander.h +++ b/src/commands/blocking_commander.h @@ -31,7 +31,7 @@ class BlockingCommander : public Commander, private EventCallbackBase { public: // method to reply when no operation happens - virtual std::string NoopReply() = 0; + virtual std::string NoopReply(const Connection *conn) = 0; // method to block keys virtual void BlockKeys() = 0; @@ -48,7 +48,7 @@ class BlockingCommander : public Commander, // usually put to the end of the Execute method Status StartBlocking(int64_t timeout, std::string *output) { if (conn_->IsInExec()) { - *output = NoopReply(); + *output = NoopReply(conn_); return Status::OK(); // no blocking in multi-exec } @@ -111,7 +111,7 @@ class BlockingCommander : public Commander, } void TimerCB(int, int16_t) { - conn_->Reply(NoopReply()); + conn_->Reply(NoopReply(conn_)); timer_.reset(); UnblockKeys(); auto bev = conn_->GetBufferEvent(); diff --git a/src/commands/cmd_bit.cc b/src/commands/cmd_bit.cc index 6e9fe5ba33f..65e90d046d1 100644 --- a/src/commands/cmd_bit.cc +++ b/src/commands/cmd_bit.cc @@ -342,7 +342,7 @@ class CommandBitfield : public Commander { str_rets[i] = redis::Integer(rets[i]->Value()); } } else { - str_rets[i] = redis::NilString(); + str_rets[i] = conn->NilString(); } } *output = redis::Array(str_rets); diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc index e4316a3c05d..f33979e3d50 100644 --- a/src/commands/cmd_bloom_filter.cc +++ b/src/commands/cmd_bloom_filter.cc @@ -345,7 +345,7 @@ class CommandBFInfo : public Commander { *output += redis::SimpleString("Number of items inserted"); *output += redis::Integer(info.size); *output += redis::SimpleString("Expansion rate"); - *output += info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion); + *output += info.expansion == 0 ? conn->NilString() : redis::Integer(info.expansion); break; case BloomInfoType::kCapacity: *output = redis::Integer(info.capacity); @@ -360,7 +360,7 @@ class CommandBFInfo : public Commander { *output = redis::Integer(info.size); break; case BloomInfoType::kExpansion: - *output = info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion); + *output = info.expansion == 0 ? conn->NilString() : redis::Integer(info.expansion); break; } diff --git a/src/commands/cmd_geo.cc b/src/commands/cmd_geo.cc index 39854368e66..991d16c5ed3 100644 --- a/src/commands/cmd_geo.cc +++ b/src/commands/cmd_geo.cc @@ -148,7 +148,7 @@ class CommandGeoDist : public CommandGeoBase { } if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); } else { *output = redis::BulkString(util::Float2String(GetDistanceByUnit(distance))); } @@ -177,7 +177,7 @@ class CommandGeoHash : public Commander { hashes.resize(members_.size(), ""); } - *output = redis::MultiBulkString(hashes); + *output = conn->MultiBulkString(hashes); return Status::OK(); } @@ -206,16 +206,16 @@ class CommandGeoPos : public Commander { if (s.IsNotFound()) { list.resize(members_.size(), ""); - *output = redis::MultiBulkString(list); + *output = conn->MultiBulkString(list); return Status::OK(); } for (const auto &member : members_) { auto iter = geo_points.find(member.ToString()); if (iter == geo_points.end()) { - list.emplace_back(redis::NilString()); + list.emplace_back(conn->NilString()); } else { - list.emplace_back(redis::MultiBulkString( + list.emplace_back(conn->MultiBulkString( {util::Float2String(iter->second.longitude), util::Float2String(iter->second.latitude)})); } } @@ -314,12 +314,12 @@ class CommandGeoRadius : public CommandGeoBase { if (store_key_.size() != 0) { *output = redis::Integer(geo_points.size()); } else { - *output = GenerateOutput(geo_points); + *output = GenerateOutput(conn, geo_points); } return Status::OK(); } - std::string GenerateOutput(const std::vector &geo_points) { + std::string GenerateOutput(Connection *conn, const std::vector &geo_points) { int result_length = static_cast(geo_points.size()); int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_; std::vector list; @@ -337,8 +337,8 @@ class CommandGeoRadius : public CommandGeoBase { one.emplace_back(redis::BulkString(util::Float2String(geo_point.score))); } if (with_coord_) { - one.emplace_back(redis::MultiBulkString( - {util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)})); + one.emplace_back( + conn->MultiBulkString({util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)})); } list.emplace_back(redis::Array(one)); } @@ -440,7 +440,7 @@ class CommandGeoSearch : public CommandGeoBase { if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } - *output = generateOutput(geo_points); + *output = generateOutput(conn, geo_points); return Status::OK(); } @@ -496,7 +496,7 @@ class CommandGeoSearch : public CommandGeoBase { return Status::OK(); } - std::string generateOutput(const std::vector &geo_points) { + std::string generateOutput(Connection *conn, const std::vector &geo_points) { int result_length = static_cast(geo_points.size()); int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_; std::vector output; @@ -515,8 +515,8 @@ class CommandGeoSearch : public CommandGeoBase { one.emplace_back(redis::BulkString(util::Float2String(geo_point.score))); } if (with_coord_) { - one.emplace_back(redis::MultiBulkString( - {util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)})); + one.emplace_back( + conn->MultiBulkString({util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)})); } output.emplace_back(redis::Array(one)); } @@ -644,7 +644,7 @@ class CommandGeoRadiusByMember : public CommandGeoRadius { if (store_key_.size() != 0) { *output = redis::Integer(geo_points.size()); } else { - *output = GenerateOutput(geo_points); + *output = GenerateOutput(conn, geo_points); } return Status::OK(); diff --git a/src/commands/cmd_hash.cc b/src/commands/cmd_hash.cc index 7ae901919cb..8b9526f97fc 100644 --- a/src/commands/cmd_hash.cc +++ b/src/commands/cmd_hash.cc @@ -37,7 +37,7 @@ class CommandHGet : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value); + *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value); return Status::OK(); } }; @@ -208,9 +208,9 @@ class CommandHMGet : public Commander { if (s.IsNotFound()) { values.resize(fields.size(), ""); - *output = redis::MultiBulkString(values); + *output = conn->MultiBulkString(values); } else { - *output = redis::MultiBulkString(values, statuses); + *output = conn->MultiBulkString(values, statuses); } return Status::OK(); } @@ -263,7 +263,7 @@ class CommandHKeys : public Commander { for (const auto &fv : field_values) { keys.emplace_back(fv.field); } - *output = redis::MultiBulkString(keys); + *output = conn->MultiBulkString(keys); return Status::OK(); } @@ -284,7 +284,7 @@ class CommandHVals : public Commander { for (const auto &p : field_values) { values.emplace_back(p.value); } - *output = MultiBulkString(values, false); + *output = conn->MultiBulkString(values, false); return Status::OK(); } @@ -306,7 +306,7 @@ class CommandHGetAll : public Commander { kv_pairs.emplace_back(p.field); kv_pairs.emplace_back(p.value); } - *output = MultiBulkString(kv_pairs, false); + *output = conn->MultiBulkString(kv_pairs, false); return Status::OK(); } @@ -350,7 +350,7 @@ class CommandHRangeByLex : public Commander { kv_pairs.emplace_back(p.field); kv_pairs.emplace_back(p.value); } - *output = MultiBulkString(kv_pairs, false); + *output = conn->MultiBulkString(kv_pairs, false); return Status::OK(); } @@ -372,7 +372,7 @@ class CommandHScan : public CommandSubkeyScanBase { return {Status::RedisExecErr, s.ToString()}; } - *output = GenerateOutput(srv, fields, values, CursorType::kTypeHash); + *output = GenerateOutput(srv, conn, fields, values, CursorType::kTypeHash); return Status::OK(); } }; @@ -415,9 +415,9 @@ class CommandHRandField : public Commander { } if (no_parameters_) - *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(result_entries[0]); + *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(result_entries[0]); else - *output = redis::MultiBulkString(result_entries, false); + *output = conn->MultiBulkString(result_entries, false); return Status::OK(); } diff --git a/src/commands/cmd_json.cc b/src/commands/cmd_json.cc index 737581f81e7..5377ed31654 100644 --- a/src/commands/cmd_json.cc +++ b/src/commands/cmd_json.cc @@ -33,13 +33,13 @@ namespace redis { template , int> = 0> -std::string OptionalsToString(Optionals &opts) { +std::string OptionalsToString(const Connection *conn, Optionals &opts) { std::string str = MultiLen(opts.size()); for (const auto &opt : opts) { if (opt.has_value()) { str += redis::Integer(opt.value()); } else { - str += redis::NilString(); + str += conn->NilString(); } } return str; @@ -100,7 +100,7 @@ class CommandJsonGet : public Commander { JsonValue result; auto s = json.Get(args_[1], paths_, &result); if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; @@ -129,7 +129,7 @@ class CommandJsonInfo : public Commander { auto format_str = storage_format == JsonStorageFormat::JSON ? "json" : storage_format == JsonStorageFormat::CBOR ? "cbor" : "unknown"; - output->append(redis::MultiBulkString({"storage_format", format_str})); + output->append(conn->MultiBulkString({"storage_format", format_str})); return Status::OK(); } }; @@ -144,7 +144,7 @@ class CommandJsonArrAppend : public Commander { auto s = json.ArrAppend(args_[1], args_[2], {args_.begin() + 3, args_.end()}, &results); if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } }; @@ -169,12 +169,12 @@ class CommandJsonArrInsert : public Commander { auto s = json.ArrInsert(args_[1], args_[2], index_, {args_.begin() + 4, args_.end()}, &results); if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } @@ -198,11 +198,11 @@ class CommandJsonType : public Commander { auto s = json.Type(args_[1], path, &types); if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, s.ToString()}; if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } - *output = redis::MultiBulkString(types); + *output = conn->MultiBulkString(types); return Status::OK(); } }; @@ -219,16 +219,16 @@ class CommandJsonObjkeys : public Commander { auto s = json.ObjKeys(args_[1], path, &results); if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, s.ToString()}; if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } *output = redis::MultiLen(results.size()); for (const auto &item : results) { if (item.has_value()) { - *output += redis::MultiBulkString(item.value(), false); + *output += conn->MultiBulkString(item.value(), false); } else { - *output += redis::NilString(); + *output += conn->NilString(); } } @@ -248,7 +248,7 @@ class CommandJsonClear : public Commander { auto s = json.Clear(args_[1], path, &result); if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } @@ -269,11 +269,11 @@ class CommandJsonToggle : public Commander { auto s = json.Toggle(args_[1], path, &results); if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, s.ToString()}; if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } }; @@ -293,12 +293,12 @@ class CommandJsonArrLen : public Commander { Optionals results; auto s = json.ArrLen(args_[1], path, &results); if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } }; @@ -320,7 +320,7 @@ class CommandJsonMerge : public Commander { } if (!result) { - *output = redis::NilString(); + *output = conn->NilString(); } else { *output = redis::SimpleString("OK"); } @@ -356,7 +356,7 @@ class CommandJsonArrPop : public Commander { if (data.has_value()) { *output += redis::BulkString(GET_OR_RET(data->Print())); } else { - *output += redis::NilString(); + *output += conn->NilString(); } } @@ -383,12 +383,12 @@ class CommandJsonObjLen : public Commander { Optionals results; auto s = json.ObjLen(args_[1], path, &results); if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } }; @@ -411,12 +411,12 @@ class CommandJsonArrTrim : public Commander { auto s = json.ArrTrim(args_[1], path_, start_, stop_, &results); if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } @@ -452,13 +452,13 @@ class CommanderJsonArrIndex : public Commander { auto s = json.ArrIndex(args_[1], args_[2], args_[3], start_, end_, &results); if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } @@ -480,7 +480,7 @@ class CommandJsonDel : public Commander { } auto s = json.Del(args_[1], path, &result); if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; @@ -540,7 +540,7 @@ class CommandJsonStrAppend : public Commander { auto s = json.StrAppend(args_[1], path, args_[3], &results); if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } }; @@ -561,7 +561,7 @@ class CommandJsonStrLen : public Commander { auto s = json.StrLen(args_[1], path, &results); if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; - *output = OptionalsToString(results); + *output = OptionalsToString(conn, results); return Status::OK(); } }; @@ -589,7 +589,7 @@ class CommandJsonMGet : public Commander { } } - *output = MultiBulkString(values, statuses); + *output = conn->MultiBulkString(values, statuses); return Status::OK(); } }; diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index 86a0708d51b..5470d444f4f 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -119,9 +119,9 @@ class CommandPop : public Commander { } if (s.IsNotFound()) { - *output = redis::MultiLen(-1); + *output = conn->NilArray(); } else { - *output = redis::MultiBulkString(elems); + *output = conn->MultiBulkString(elems); } } else { std::string elem; @@ -131,7 +131,7 @@ class CommandPop : public Commander { } if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); } else { *output = redis::BulkString(elem); } @@ -209,9 +209,9 @@ class CommandLMPop : public Commander { } if (elems.empty()) { - *output = redis::NilString(); + *output = conn->NilString(); } else { - std::string elems_bulk = redis::MultiBulkString(elems); + std::string elems_bulk = conn->MultiBulkString(elems); *output = redis::Array({redis::BulkString(chosen_key), std::move(elems_bulk)}); } @@ -298,10 +298,10 @@ class CommandBPop : public BlockingCommander { if (s.ok()) { if (!last_key_ptr) { - conn_->Reply(redis::MultiBulkString({"", ""})); + conn_->Reply(conn_->MultiBulkString({"", ""})); } else { conn_->GetServer()->UpdateWatchedKeysManually({*last_key_ptr}); - conn_->Reply(redis::MultiBulkString({*last_key_ptr, std::move(elem)})); + conn_->Reply(conn_->MultiBulkString({*last_key_ptr, std::move(elem)})); } } else if (!s.IsNotFound()) { conn_->Reply(redis::Error("ERR " + s.ToString())); @@ -315,7 +315,7 @@ class CommandBPop : public BlockingCommander { return !s.IsNotFound(); } - std::string NoopReply() override { return redis::NilString(); } + std::string NoopReply(const Connection *conn) override { return conn->NilString(); } private: bool left_ = false; @@ -410,7 +410,7 @@ class CommandBLMPop : public BlockingCommander { if (s.ok()) { if (!elems.empty()) { conn_->GetServer()->UpdateWatchedKeysManually({chosen_key}); - std::string elems_bulk = redis::MultiBulkString(elems); + std::string elems_bulk = conn_->MultiBulkString(elems); conn_->Reply(redis::Array({redis::BulkString(chosen_key), std::move(elems_bulk)})); } } else if (!s.IsNotFound()) { @@ -437,7 +437,7 @@ class CommandBLMPop : public BlockingCommander { return !s.IsNotFound(); } - std::string NoopReply() override { return redis::NilString(); } + std::string NoopReply(const Connection *conn) override { return conn->NilString(); } static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector &args) { CommandKeyRange range; @@ -536,7 +536,7 @@ class CommandLRange : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = redis::MultiBulkString(elems, false); + *output = conn->MultiBulkString(elems, false); return Status::OK(); } @@ -580,7 +580,7 @@ class CommandLIndex : public Commander { } if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); } else { *output = redis::BulkString(elem); } @@ -663,7 +663,7 @@ class CommandRPopLPUSH : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(elem); + *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(elem); return Status::OK(); } }; @@ -694,7 +694,7 @@ class CommandLMove : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(elem); + *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(elem); return Status::OK(); } @@ -769,7 +769,7 @@ class CommandBLMove : public BlockingCommander { return !empty; } - std::string NoopReply() override { return redis::MultiLen(-1); } + std::string NoopReply(const Connection *conn) override { return conn->NilArray(); } private: bool src_left_; @@ -826,7 +826,7 @@ class CommandLPos : public Commander { // We return nil or a single value if `COUNT` option is not given. if (!spec_.count.has_value()) { if (s.IsNotFound() || indexes.empty()) { - *output = redis::NilString(); + *output = conn->NilString(); } else { assert(indexes.size() == 1); *output = redis::Integer(indexes[0]); @@ -839,7 +839,7 @@ class CommandLPos : public Commander { for (const auto &index : indexes) { values.emplace_back(std::to_string(index)); } - *output = redis::MultiBulkString(values, false); + *output = conn->MultiBulkString(values, false); } return Status::OK(); } diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc index 6ec61eea5d3..32065c9842a 100644 --- a/src/commands/cmd_pubsub.cc +++ b/src/commands/cmd_pubsub.cc @@ -73,10 +73,11 @@ class CommandMPublish : public Commander { } }; -void SubscribeCommandReply(std::string *output, const std::string &name, const std::string &sub_name, int num) { +void SubscribeCommandReply(Connection *conn, std::string *output, const std::string &name, const std::string &sub_name, + int num) { output->append(redis::MultiLen(3)); output->append(redis::BulkString(name)); - output->append(sub_name.empty() ? redis::NilString() : redis::BulkString(sub_name)); + output->append(sub_name.empty() ? conn->NilString() : BulkString(sub_name)); output->append(redis::Integer(num)); } @@ -85,7 +86,8 @@ class CommandSubscribe : public Commander { Status Execute(Server *srv, Connection *conn, std::string *output) override { for (unsigned i = 1; i < args_.size(); i++) { conn->SubscribeChannel(args_[i]); - SubscribeCommandReply(output, "subscribe", args_[i], conn->SubscriptionsCount() + conn->PSubscriptionsCount()); + SubscribeCommandReply(conn, output, "subscribe", args_[i], + conn->SubscriptionsCount() + conn->PSubscriptionsCount()); } return Status::OK(); } @@ -95,13 +97,13 @@ class CommandUnSubscribe : public Commander { public: Status Execute(Server *srv, Connection *conn, std::string *output) override { if (args_.size() == 1) { - conn->UnsubscribeAll([output](const std::string &sub_name, int num) { - SubscribeCommandReply(output, "unsubscribe", sub_name, num); + conn->UnsubscribeAll([conn, output](const std::string &sub_name, int num) { + SubscribeCommandReply(conn, output, "unsubscribe", sub_name, num); }); } else { for (size_t i = 1; i < args_.size(); i++) { conn->UnsubscribeChannel(args_[i]); - SubscribeCommandReply(output, "unsubscribe", args_[i], + SubscribeCommandReply(conn, output, "unsubscribe", args_[i], conn->SubscriptionsCount() + conn->PSubscriptionsCount()); } } @@ -114,7 +116,8 @@ class CommandPSubscribe : public Commander { Status Execute(Server *srv, Connection *conn, std::string *output) override { for (size_t i = 1; i < args_.size(); i++) { conn->PSubscribeChannel(args_[i]); - SubscribeCommandReply(output, "psubscribe", args_[i], conn->SubscriptionsCount() + conn->PSubscriptionsCount()); + SubscribeCommandReply(conn, output, "psubscribe", args_[i], + conn->SubscriptionsCount() + conn->PSubscriptionsCount()); } return Status::OK(); } @@ -124,13 +127,13 @@ class CommandPUnSubscribe : public Commander { public: Status Execute(Server *srv, Connection *conn, std::string *output) override { if (args_.size() == 1) { - conn->PUnsubscribeAll([output](const std::string &sub_name, int num) { - SubscribeCommandReply(output, "punsubscribe", sub_name, num); + conn->PUnsubscribeAll([conn, output](const std::string &sub_name, int num) { + SubscribeCommandReply(conn, output, "punsubscribe", sub_name, num); }); } else { for (size_t i = 1; i < args_.size(); i++) { conn->PUnsubscribeChannel(args_[i]); - SubscribeCommandReply(output, "punsubscribe", args_[i], + SubscribeCommandReply(conn, output, "punsubscribe", args_[i], conn->SubscriptionsCount() + conn->PSubscriptionsCount()); } } @@ -153,7 +156,7 @@ class CommandSSubscribe : public Commander { for (unsigned int i = 1; i < args_.size(); i++) { conn->SSubscribeChannel(args_[i], slot); - SubscribeCommandReply(output, "ssubscribe", args_[i], conn->SSubscriptionsCount()); + SubscribeCommandReply(conn, output, "ssubscribe", args_[i], conn->SSubscriptionsCount()); } return Status::OK(); } @@ -163,13 +166,13 @@ class CommandSUnSubscribe : public Commander { public: Status Execute(Server *srv, Connection *conn, std::string *output) override { if (args_.size() == 1) { - conn->SUnsubscribeAll([output](const std::string &sub_name, int num) { - SubscribeCommandReply(output, "sunsubscribe", sub_name, num); + conn->SUnsubscribeAll([conn, output](const std::string &sub_name, int num) { + SubscribeCommandReply(conn, output, "sunsubscribe", sub_name, num); }); } else { for (size_t i = 1; i < args_.size(); i++) { conn->SUnsubscribeChannel(args_[i], srv->GetConfig()->cluster_enabled ? GetSlotIdFromKey(args_[i]) : 0); - SubscribeCommandReply(output, "sunsubscribe", args_[i], conn->SSubscriptionsCount()); + SubscribeCommandReply(conn, output, "sunsubscribe", args_[i], conn->SSubscriptionsCount()); } } return Status::OK(); @@ -231,7 +234,7 @@ class CommandPubSub : public Commander { } else { srv->GetSChannelsByPattern(pattern_, &channels); } - *output = redis::MultiBulkString(channels); + *output = conn->MultiBulkString(channels); return Status::OK(); } diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index ef1f9a17b5b..f4b4c99860a 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -105,11 +105,11 @@ class CommandNamespace : public Commander { } namespaces.emplace_back(kDefaultNamespace); namespaces.emplace_back(config->requirepass); - *output = redis::MultiBulkString(namespaces, false); + *output = conn->MultiBulkString(namespaces, false); } else { auto token = srv->GetNamespace()->Get(args_[2]); if (token.Is()) { - *output = redis::NilString(); + *output = conn->NilString(); } else { *output = redis::BulkString(token.GetValue()); } @@ -155,7 +155,7 @@ class CommandKeys : public Commander { if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } - *output = redis::MultiBulkString(keys); + *output = conn->MultiBulkString(keys); return Status::OK(); } }; @@ -252,7 +252,7 @@ class CommandConfig : public Commander { } else if (args_.size() == 3 && sub_command == "get") { std::vector values; config->Get(args_[2], &values); - *output = redis::MultiBulkString(values); + *output = conn->MultiBulkString(values); } else if (args_.size() == 4 && sub_command == "set") { Status s = config->Set(srv, args_[2], args_[3]); if (!s.IsOK()) { @@ -302,7 +302,7 @@ class CommandDisk : public Commander { if (!s.ok()) { // Redis returns the Nil string when the key does not exist if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } return {Status::RedisExecErr, s.ToString()}; @@ -514,7 +514,7 @@ class CommandClient : public Commander { return Status::OK(); } else if (subcommand_ == "getname") { std::string name = conn->GetName(); - *output = name == "" ? redis::NilString() : redis::BulkString(name); + *output = name == "" ? conn->NilString() : redis::BulkString(name); return Status::OK(); } else if (subcommand_ == "id") { *output = redis::Integer(conn->GetID()); @@ -613,12 +613,14 @@ class CommandDebug : public Commander { *output += redis::Integer(i); } } else if (protocol_type_ == "true") { - *output = redis::Bool(conn->GetProtocolVersion(), true); + *output = conn->Bool(true); } else if (protocol_type_ == "false") { - *output = redis::Bool(conn->GetProtocolVersion(), false); + *output = conn->Bool(false); + } else if (protocol_type_ == "null") { + *output = conn->NilString(); } else { *output = - redis::Error("Wrong protocol type name. Please use one of the following: string|int|array|true|false"); + redis::Error("Wrong protocol type name. Please use one of the following: string|int|array|true|false|null"); } } else { return {Status::RedisInvalidCmd, "Unknown subcommand, should be DEBUG or PROTOCOL"}; @@ -668,7 +670,7 @@ class CommandCommand : public Commander { for (const auto &key_index : keys_indexes) { keys.emplace_back(args_[key_index + 2]); } - *output = redis::MultiBulkString(keys); + *output = conn->MultiBulkString(keys); } else { return {Status::RedisExecErr, "Command subcommand must be one of COUNT, GETKEYS, INFO"}; } @@ -807,7 +809,8 @@ class CommandScan : public CommandScanBase { return Commander::Parse(args); } - static std::string GenerateOutput(Server *srv, const std::vector &keys, const std::string &end_cursor) { + static std::string GenerateOutput(Server *srv, Connection *conn, const std::vector &keys, + const std::string &end_cursor) { std::vector list; if (!end_cursor.empty()) { list.emplace_back( @@ -816,7 +819,7 @@ class CommandScan : public CommandScanBase { list.emplace_back(redis::BulkString("0")); } - list.emplace_back(redis::MultiBulkString(keys, false)); + list.emplace_back(conn->MultiBulkString(keys, false)); return redis::Array(list); } @@ -831,7 +834,7 @@ class CommandScan : public CommandScanBase { if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } - *output = GenerateOutput(srv, keys, end_key); + *output = GenerateOutput(srv, conn, keys, end_key); return Status::OK(); } }; diff --git a/src/commands/cmd_set.cc b/src/commands/cmd_set.cc index f2c60a3ba66..0b5a8bda5a4 100644 --- a/src/commands/cmd_set.cc +++ b/src/commands/cmd_set.cc @@ -93,7 +93,7 @@ class CommandSMembers : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = redis::MultiBulkString(members, false); + *output = conn->MultiBulkString(members, false); return Status::OK(); } }; @@ -171,12 +171,12 @@ class CommandSPop : public Commander { } if (with_count_) { - *output = redis::MultiBulkString(members, false); + *output = conn->MultiBulkString(members, false); } else { if (members.size() > 0) { *output = redis::BulkString(members.front()); } else { - *output = redis::NilString(); + *output = conn->NilString(); } } return Status::OK(); @@ -211,7 +211,7 @@ class CommandSRandMember : public Commander { if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } - *output = redis::MultiBulkString(members, false); + *output = conn->MultiBulkString(members, false); return Status::OK(); } @@ -249,7 +249,7 @@ class CommandSDiff : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = redis::MultiBulkString(members, false); + *output = conn->MultiBulkString(members, false); return Status::OK(); } }; @@ -269,7 +269,7 @@ class CommandSUnion : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = redis::MultiBulkString(members, false); + *output = conn->MultiBulkString(members, false); return Status::OK(); } }; @@ -289,7 +289,7 @@ class CommandSInter : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = redis::MultiBulkString(members, false); + *output = conn->MultiBulkString(members, false); return Status::OK(); } }; @@ -432,7 +432,7 @@ class CommandSScan : public CommandSubkeyScanBase { return {Status::RedisExecErr, s.ToString()}; } - *output = CommandScanBase::GenerateOutput(srv, members, CursorType::kTypeSet); + *output = CommandScanBase::GenerateOutput(srv, conn, members, CursorType::kTypeSet); return Status::OK(); } }; diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 545d438de3f..4150ae513dd 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -153,7 +153,7 @@ class CommandXAdd : public Commander { } if (s.IsNotFound() && nomkstream_) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } @@ -464,17 +464,17 @@ class CommandXInfo : public Commander { if (info.first_entry) { output->append(redis::MultiLen(2)); output->append(redis::BulkString(info.first_entry->key)); - output->append(redis::MultiBulkString(info.first_entry->values)); + output->append(conn->MultiBulkString(info.first_entry->values)); } else { - output->append(redis::NilString()); + output->append(conn->NilString()); } output->append(redis::BulkString("last-entry")); if (info.last_entry) { output->append(redis::MultiLen(2)); output->append(redis::BulkString(info.last_entry->key)); - output->append(redis::MultiBulkString(info.last_entry->values)); + output->append(conn->MultiBulkString(info.last_entry->values)); } else { - output->append(redis::NilString()); + output->append(conn->NilString()); } } else { output->append(redis::BulkString("entries")); @@ -482,7 +482,7 @@ class CommandXInfo : public Commander { for (const auto &e : info.entries) { output->append(redis::MultiLen(2)); output->append(redis::BulkString(e.key)); - output->append(redis::MultiBulkString(e.values)); + output->append(conn->MultiBulkString(e.values)); } } @@ -514,13 +514,13 @@ class CommandXInfo : public Commander { output->append(redis::BulkString(it.second.last_delivered_id.ToString())); output->append(redis::BulkString("entries-read")); if (it.second.entries_read == -1) { - output->append(redis::NilString()); + output->append(conn->NilString()); } else { output->append(redis::Integer(it.second.entries_read)); } output->append(redis::BulkString("lag")); if (it.second.lag == UINT64_MAX) { - output->append(redis::NilString()); + output->append(conn->NilString()); } else { output->append(redis::Integer(it.second.lag)); } @@ -611,7 +611,7 @@ class CommandXRange : public Commander { Status Execute(Server *srv, Connection *conn, std::string *output) override { if (with_count_ && count_ == 0) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } @@ -637,7 +637,7 @@ class CommandXRange : public Commander { for (const auto &e : result) { output->append(redis::MultiLen(2)); output->append(redis::BulkString(e.key)); - output->append(redis::MultiBulkString(e.values)); + output->append(conn->MultiBulkString(e.values)); } return Status::OK(); @@ -704,7 +704,7 @@ class CommandXRevRange : public Commander { Status Execute(Server *srv, Connection *conn, std::string *output) override { if (with_count_ && count_ == 0) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } @@ -730,7 +730,7 @@ class CommandXRevRange : public Commander { for (const auto &e : result) { output->append(redis::MultiLen(2)); output->append(redis::BulkString(e.key)); - output->append(redis::MultiBulkString(e.values)); + output->append(conn->MultiBulkString(e.values)); } return Status::OK(); @@ -862,7 +862,7 @@ class CommandXRead : public Commander, if (block_ && results.empty()) { if (conn->IsInExec()) { - *output = redis::MultiLen(-1); + *output = conn_->NilArray(); return Status::OK(); // No blocking in multi-exec } @@ -870,14 +870,14 @@ class CommandXRead : public Commander, } if (!block_ && results.empty()) { - *output = redis::MultiLen(-1); + *output = conn_->NilArray(); return Status::OK(); } - return SendResults(output, results); + return SendResults(conn, output, results); } - static Status SendResults(std::string *output, const std::vector &results) { + static Status SendResults(Connection *conn, std::string *output, const std::vector &results) { output->append(redis::MultiLen(results.size())); for (const auto &result : results) { @@ -887,7 +887,7 @@ class CommandXRead : public Commander, for (const auto &entry : result.entries) { output->append(redis::MultiLen(2)); output->append(redis::BulkString(entry.key)); - output->append(redis::MultiBulkString(entry.values)); + output->append(conn->MultiBulkString(entry.values)); } } @@ -973,13 +973,13 @@ class CommandXRead : public Commander, } if (results.empty()) { - conn_->Reply(redis::MultiLen(-1)); + conn_->Reply(conn_->NilArray()); } - SendReply(results); + SendReply(conn_, results); } - void SendReply(const std::vector &results) { + void SendReply(Connection *conn, const std::vector &results) { std::string output; output.append(redis::MultiLen(results.size())); @@ -991,7 +991,7 @@ class CommandXRead : public Commander, for (const auto &entry : result.entries) { output.append(redis::MultiLen(2)); output.append(redis::BulkString(entry.key)); - output.append(redis::MultiBulkString(entry.values)); + output.append(conn->MultiBulkString(entry.values)); } } @@ -1009,7 +1009,7 @@ class CommandXRead : public Commander, } void TimerCB(int, int16_t events) { - conn_->Reply(redis::NilString()); + conn_->Reply(conn_->NilString()); timer_.reset(); diff --git a/src/commands/cmd_string.cc b/src/commands/cmd_string.cc index a0e1a690b5c..debf5e3fb7b 100644 --- a/src/commands/cmd_string.cc +++ b/src/commands/cmd_string.cc @@ -54,7 +54,7 @@ class CommandGet : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value); + *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value); return Status::OK(); } }; @@ -101,7 +101,7 @@ class CommandGetEx : public Commander { return {Status::RedisExecErr, s.ToString()}; } - *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(value); + *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(value); return Status::OK(); } @@ -142,7 +142,7 @@ class CommandGetSet : public Commander { if (old_value.has_value()) { *output = redis::BulkString(old_value.value()); } else { - *output = redis::NilString(); + *output = conn->NilString(); } return Status::OK(); } @@ -159,7 +159,7 @@ class CommandGetDel : public Commander { } if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); } else { *output = redis::BulkString(value); } @@ -190,7 +190,7 @@ class CommandGetRange : public Commander { } if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } @@ -199,7 +199,7 @@ class CommandGetRange : public Commander { if (start_ < 0) start_ = 0; if (stop_ > static_cast(value.size())) stop_ = static_cast(value.size()); if (start_ > stop_) { - *output = redis::NilString(); + *output = conn->NilString(); } else { *output = redis::BulkString(value.substr(start_, stop_ - start_ + 1)); } @@ -255,7 +255,7 @@ class CommandMGet : public Commander { std::vector values; // always return OK auto statuses = string_db.MGet(keys, &values); - *output = redis::MultiBulkString(values, statuses); + *output = conn->MultiBulkString(values, statuses); return Status::OK(); } }; @@ -323,13 +323,13 @@ class CommandSet : public Commander { if (ret.has_value()) { *output = redis::BulkString(ret.value()); } else { - *output = redis::NilString(); + *output = conn->NilString(); } } else { if (ret.has_value()) { *output = redis::SimpleString("OK"); } else { - *output = redis::NilString(); + *output = conn->NilString(); } } return Status::OK(); diff --git a/src/commands/cmd_txn.cc b/src/commands/cmd_txn.cc index c99c6ddc80f..fa1a47aadae 100644 --- a/src/commands/cmd_txn.cc +++ b/src/commands/cmd_txn.cc @@ -73,7 +73,7 @@ class CommandExec : public Commander { } if (srv->IsWatchedKeysModified(conn)) { - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index f1160274a6b..17d78bda798 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -83,7 +83,7 @@ class CommandZAdd : public Commander { auto new_score = member_scores_[0].score; if ((flags_.HasNX() || flags_.HasXX() || flags_.HasLT() || flags_.HasGT()) && old_score == new_score && ret == 0) { // not the first time using incr && score not changed - *output = redis::NilString(); + *output = conn->NilString(); return Status::OK(); } @@ -336,7 +336,7 @@ class CommandBZPop : public BlockingCommander { return StartBlocking(timeout_, output); } - std::string NoopReply() override { return redis::MultiLen(-1); } + std::string NoopReply(const Connection *conn) override { return conn->NilArray(); } void BlockKeys() override { for (const auto &key : keys_) { @@ -454,7 +454,7 @@ class CommandZMPop : public Commander { SendMembersWithScoresForZMpop(conn, user_key, member_scores); return Status::OK(); } - *output = redis::MultiLen(-1); + *output = conn->NilArray(); return Status::OK(); } @@ -538,7 +538,7 @@ class CommandBZMPop : public BlockingCommander { } } - std::string NoopReply() override { return redis::NilString(); } + std::string NoopReply(const Connection *conn) override { return conn->NilString(); } bool OnBlockingWrite() override { std::string user_key; @@ -797,14 +797,14 @@ class CommandZRangeGeneric : public Commander { break; case kZRangeScore: if (score_spec_.count == 0) { - *output = redis::MultiBulkString({}); + *output = conn->MultiBulkString({}); return Status::OK(); } s = zset_db.RangeByScore(key_, score_spec_, &member_scores, nullptr); break; case kZRangeLex: if (lex_spec_.count == 0) { - *output = redis::MultiBulkString({}); + *output = conn->MultiBulkString({}); return Status::OK(); } s = zset_db.RangeByLex(key_, lex_spec_, &member_scores, nullptr); @@ -896,9 +896,9 @@ class CommandZRank : public Commander { if (rank == -1) { if (with_score_) { - output->append(redis::MultiLen(-1)); + output->append(conn->NilArray()); } else { - *output = redis::NilString(); + *output = conn->NilString(); } } else { if (with_score_) { @@ -1045,7 +1045,7 @@ class CommandZScore : public Commander { } if (s.IsNotFound()) { - *output = redis::NilString(); + *output = conn->NilString(); } else { *output = redis::BulkString(util::Float2String(score)); } @@ -1080,7 +1080,7 @@ class CommandZMScore : public Commander { } } } - *output = redis::MultiBulkString(values); + *output = conn->MultiBulkString(values); return Status::OK(); } }; @@ -1355,7 +1355,7 @@ class CommandZScan : public CommandSubkeyScanBase { for (const auto &score : scores) { score_strings.emplace_back(util::Float2String(score)); } - *output = GenerateOutput(srv, members, score_strings, CursorType::kTypeZSet); + *output = GenerateOutput(srv, conn, members, score_strings, CursorType::kTypeZSet); return Status::OK(); } }; diff --git a/src/commands/commander.cc b/src/commands/commander.cc index 024db6144ed..6ac10581606 100644 --- a/src/commands/commander.cc +++ b/src/commands/commander.cc @@ -68,7 +68,7 @@ void CommandTable::GetCommandsInfo(std::string *info, const std::vectorappend(redis::NilString()); + info->append(NilString(RESP::v2)); } else { auto command_attribute = cmd_iter->second; auto command_info = GetCommandInfo(command_attribute); diff --git a/src/commands/scan_base.h b/src/commands/scan_base.h index 0bfb188b481..dd0f605f02a 100644 --- a/src/commands/scan_base.h +++ b/src/commands/scan_base.h @@ -64,7 +64,8 @@ class CommandScanBase : public Commander { } } - std::string GenerateOutput(Server *srv, const std::vector &keys, CursorType cursor_type) const { + std::string GenerateOutput(Server *srv, Connection *conn, const std::vector &keys, + CursorType cursor_type) const { std::vector list; if (keys.size() == static_cast(limit_)) { auto end_cursor = srv->GenerateCursorFromKeyName(keys.back(), cursor_type); @@ -73,7 +74,7 @@ class CommandScanBase : public Commander { list.emplace_back(redis::BulkString("0")); } - list.emplace_back(redis::MultiBulkString(keys, false)); + list.emplace_back(conn->MultiBulkString(keys, false)); return redis::Array(list); } @@ -111,7 +112,7 @@ class CommandSubkeyScanBase : public CommandScanBase { return Commander::Parse(args); } - std::string GenerateOutput(Server *srv, const std::vector &fields, + std::string GenerateOutput(Server *srv, Connection *conn, const std::vector &fields, const std::vector &values, CursorType cursor_type) { std::vector list; auto items_count = fields.size(); @@ -128,7 +129,7 @@ class CommandSubkeyScanBase : public CommandScanBase { fvs.emplace_back(values[i]); } } - list.emplace_back(redis::MultiBulkString(fvs, false)); + list.emplace_back(conn->MultiBulkString(fvs, false)); return redis::Array(list); } diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index d6e0b5f6749..87370ffa921 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -130,6 +130,39 @@ void Connection::Reply(const std::string &msg) { redis::Reply(bufferevent_get_output(bev_), msg); } +std::string Connection::Bool(bool b) const { + if (protocol_version_ == RESP::v3) { + return b ? "#t" CRLF : "#f" CRLF; + } + return Integer(b ? 1 : 0); +} + +std::string Connection::MultiBulkString(const std::vector &values, + bool output_nil_for_empty_string) const { + std::string result = "*" + std::to_string(values.size()) + CRLF; + for (const auto &value : values) { + if (value.empty() && output_nil_for_empty_string) { + result += NilString(); + } else { + result += BulkString(value); + } + } + return result; +} + +std::string Connection::MultiBulkString(const std::vector &values, + const std::vector &statuses) const { + std::string result = "*" + std::to_string(values.size()) + CRLF; + for (size_t i = 0; i < values.size(); i++) { + if (i < statuses.size() && !statuses[i].ok()) { + result += NilString(); + } else { + result += BulkString(values[i]); + } + } + return result; +} + void Connection::SendFile(int fd) { // NOTE: we don't need to close the fd, the libevent will do that auto output = bufferevent_get_output(bev_); diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h index 34fbcbae9fa..f35a3889c68 100644 --- a/src/server/redis_connection.h +++ b/src/server/redis_connection.h @@ -64,6 +64,12 @@ class Connection : public EvbufCallbackBase { void Reply(const std::string &msg); RESP GetProtocolVersion() const { return protocol_version_; } void SetProtocolVersion(RESP version) { protocol_version_ = version; } + std::string Bool(bool b) const; + std::string NilString() const { return redis::NilString(protocol_version_); } + std::string NilArray() const { return protocol_version_ == RESP::v3 ? "_" CRLF : "*-1" CRLF; } + std::string MultiBulkString(const std::vector &values, bool output_nil_for_empty_string = true) const; + std::string MultiBulkString(const std::vector &values, + const std::vector &statuses) const; using UnsubscribeCallback = std::function; void SubscribeChannel(const std::string &channel); diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc index 768db4ebf59..a670ce1a27d 100644 --- a/src/server/redis_reply.cc +++ b/src/server/redis_reply.cc @@ -32,32 +32,6 @@ std::string Error(const std::string &err) { return "-" + err + CRLF; } std::string BulkString(const std::string &data) { return "$" + std::to_string(data.length()) + CRLF + data + CRLF; } -std::string NilString() { return "$-1" CRLF; } - -std::string MultiBulkString(const std::vector &values, bool output_nil_for_empty_string) { - std::string result = "*" + std::to_string(values.size()) + CRLF; - for (const auto &value : values) { - if (value.empty() && output_nil_for_empty_string) { - result += NilString(); - } else { - result += BulkString(value); - } - } - return result; -} - -std::string MultiBulkString(const std::vector &values, const std::vector &statuses) { - std::string result = "*" + std::to_string(values.size()) + CRLF; - for (size_t i = 0; i < values.size(); i++) { - if (i < statuses.size() && !statuses[i].ok()) { - result += NilString(); - } else { - result += BulkString(values[i]); - } - } - return result; -} - std::string Array(const std::vector &list) { size_t n = std::accumulate(list.begin(), list.end(), 0, [](size_t n, const std::string &s) { return n + s.size(); }); std::string result = "*" + std::to_string(list.size()) + CRLF; @@ -67,6 +41,12 @@ std::string Array(const std::vector &list) { return result; } -std::string Command2RESP(const std::vector &cmd_args) { return MultiBulkString(cmd_args, false); } +std::string Array2RESP(const std::vector &elems) { + std::string result = "*" + std::to_string(elems.size()) + CRLF; + for (const auto &elem : elems) { + result += BulkString(elem); + } + return result; +} } // namespace redis diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h index e23380ccb01..083ccea2d38 100644 --- a/src/server/redis_reply.h +++ b/src/server/redis_reply.h @@ -21,10 +21,8 @@ #pragma once #include -#include #include -#include #include #define CRLF "\r\n" // NOLINT @@ -42,15 +40,14 @@ std::string Integer(T data) { return ":" + std::to_string(data) + CRLF; } -inline std::string Bool(const RESP ver, const bool b) { +inline std::string NilString(const RESP ver) { if (ver == RESP::v3) { - return b ? "#t" CRLF : "#f" CRLF; + return "_" CRLF; } - return Integer(b ? 1 : 0); + return "$-1" CRLF; } std::string BulkString(const std::string &data); -std::string NilString(); template , int> = 0> std::string MultiLen(T len) { @@ -58,8 +55,6 @@ std::string MultiLen(T len) { } std::string Array(const std::vector &list); -std::string MultiBulkString(const std::vector &values, bool output_nil_for_empty_string = true); -std::string MultiBulkString(const std::vector &values, const std::vector &statuses); -std::string Command2RESP(const std::vector &cmd_args); +std::string Array2RESP(const std::vector &elements); } // namespace redis diff --git a/src/server/server.cc b/src/server/server.cc index efe721b27ba..2e98d6e1512 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -1024,7 +1024,7 @@ void Server::GetRoleInfo(std::string *info) { roles.emplace_back("connecting"); } roles.emplace_back(std::to_string(storage->LatestSeqNumber())); - *info = redis::MultiBulkString(roles); + *info = redis::Array2RESP(roles); } else { std::vector list; @@ -1032,7 +1032,7 @@ void Server::GetRoleInfo(std::string *info) { for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; - list.emplace_back(redis::MultiBulkString({ + list.emplace_back(redis::Array2RESP({ slave->GetConn()->GetAnnounceIP(), std::to_string(slave->GetConn()->GetListeningPort()), std::to_string(slave->GetCurrentReplSeq()), diff --git a/src/stats/log_collector.cc b/src/stats/log_collector.cc index 1d3f9f405a0..6785a04a4c7 100644 --- a/src/stats/log_collector.cc +++ b/src/stats/log_collector.cc @@ -32,7 +32,7 @@ std::string SlowEntry::ToRedisString() const { output.append(redis::Integer(id)); output.append(redis::Integer(time)); output.append(redis::Integer(duration)); - output.append(redis::MultiBulkString(args)); + output.append(redis::Array2RESP(args)); output.append(redis::BulkString(ip + ":" + std::to_string(port))); output.append(redis::BulkString(client_name)); return output; diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index db7174d0839..021527d0446 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -63,10 +63,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic if (metadata.Type() == kRedisString) { command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))}; - resp_commands_[ns].emplace_back(redis::Command2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); if (metadata.expire > 0) { command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)}; - resp_commands_[ns].emplace_back(redis::Command2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); } } else if (metadata.expire > 0) { auto args = log_data_.GetArguments(); @@ -80,7 +80,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic auto cmd = static_cast(*parse_result); if (cmd == kRedisCmdExpire) { command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)}; - resp_commands_[ns].emplace_back(redis::Command2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); } } } @@ -103,7 +103,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic std::to_string(stream_metadata.entries_added), "MAXDELETEDID", stream_metadata.max_deleted_entry_id.ToString()}; - resp_commands_[ns].emplace_back(redis::Command2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); } return rocksdb::Status::OK(); @@ -262,7 +262,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic } if (!command_args.empty()) { - resp_commands_[ns].emplace_back(redis::Command2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); } return rocksdb::Status::OK(); @@ -387,7 +387,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S } if (!command_args.empty()) { - resp_commands_[ns].emplace_back(redis::Command2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); } return rocksdb::Status::OK(); diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc index b8fbc02a228..a6e73fa040d 100644 --- a/src/storage/scripting.cc +++ b/src/storage/scripting.cc @@ -1086,9 +1086,9 @@ std::string ReplyToRedisReply(redis::Connection *conn, lua_State *lua) { break; case LUA_TBOOLEAN: if (conn->GetProtocolVersion() == redis::RESP::v2) { - output = lua_toboolean(lua, -1) ? redis::Integer(1) : redis::NilString(); + output = lua_toboolean(lua, -1) ? redis::Integer(1) : conn->NilString(); } else { - output = redis::Bool(redis::RESP::v3, lua_toboolean(lua, -1)); + output = conn->Bool(lua_toboolean(lua, -1)); } break; case LUA_TNUMBER: @@ -1138,7 +1138,7 @@ std::string ReplyToRedisReply(redis::Connection *conn, lua_State *lua) { } break; default: - output = redis::NilString(); + output = conn->NilString(); } return output; } diff --git a/tests/cppunit/string_reply_test.cc b/tests/cppunit/string_reply_test.cc index 80e74972119..5cef14a38b4 100644 --- a/tests/cppunit/string_reply_test.cc +++ b/tests/cppunit/string_reply_test.cc @@ -39,7 +39,7 @@ class StringReplyTest : public testing::Test { std::vector StringReplyTest::values; TEST_F(StringReplyTest, MultiBulkString) { - std::string result = redis::MultiBulkString(values); + std::string result = redis::Array2RESP(values); ASSERT_EQ(result.length(), 13 * 10 + 14 * 90 + 15 * 900 + 17 * 9000 + 18 * 90000 + 9); } diff --git a/tests/gocase/unit/debug/debug_test.go b/tests/gocase/unit/debug/debug_test.go index 7221d830d3c..6b65ad8c6b3 100644 --- a/tests/gocase/unit/debug/debug_test.go +++ b/tests/gocase/unit/debug/debug_test.go @@ -52,6 +52,9 @@ func TestDebugProtocolV2(t *testing.T) { require.NoError(t, r.Err()) require.EqualValues(t, expectedValue, r.Val()) } + + r := rdb.Do(ctx, "DEBUG", "PROTOCOL", "null") + require.EqualError(t, r.Err(), redis.Nil.Error()) }) t.Run("lua script return value type", func(t *testing.T) { @@ -90,6 +93,8 @@ func TestDebugProtocolV3(t *testing.T) { require.NoError(t, r.Err()) require.EqualValues(t, expectedValue, r.Val()) } + r := rdb.Do(ctx, "DEBUG", "PROTOCOL", "null") + require.EqualError(t, r.Err(), redis.Nil.Error()) }) t.Run("lua script return value type", func(t *testing.T) { diff --git a/tests/gocase/unit/protocol/protocol_test.go b/tests/gocase/unit/protocol/protocol_test.go index 9ba40bec46b..7896cf00548 100644 --- a/tests/gocase/unit/protocol/protocol_test.go +++ b/tests/gocase/unit/protocol/protocol_test.go @@ -137,3 +137,101 @@ func TestProtocolNetwork(t *testing.T) { c.MustRead(t, "+string") }) } + +func TestProtocolRESP2(t *testing.T) { + srv := util.StartServer(t, map[string]string{ + "resp3-enabled": "no", + }) + defer srv.Close() + + c := srv.NewTCPClient() + defer func() { + require.NoError(t, c.Close()) + }() + + t.Run("debug protocol string", func(t *testing.T) { + types := map[string][]string{ + "string": {"$11", "Hello World"}, + "integer": {":12345"}, + "array": {"*3", ":0", ":1", ":2"}, + "true": {":1"}, + "false": {":0"}, + "null": {"$-1"}, + } + for typ, expected := range types { + args := []string{"DEBUG", "PROTOCOL", typ} + require.NoError(t, c.WriteArgs(args...)) + for _, line := range expected { + c.MustRead(t, line) + } + } + }) + + t.Run("multi bulk strings with null string", func(t *testing.T) { + require.NoError(t, c.WriteArgs("HSET", "hash", "f1", "v1")) + c.MustRead(t, ":1") + + require.NoError(t, c.WriteArgs("HMGET", "hash", "f1", "f2")) + c.MustRead(t, "*2") + c.MustRead(t, "$2") + c.MustRead(t, "v1") + c.MustRead(t, "$-1") + }) + + t.Run("null array", func(t *testing.T) { + require.NoError(t, c.WriteArgs("ZRANK", "no-exists-zset", "m0", "WITHSCORE")) + c.MustRead(t, "*-1") + }) +} + +func TestProtocolRESP3(t *testing.T) { + srv := util.StartServer(t, map[string]string{ + "resp3-enabled": "yes", + }) + defer srv.Close() + + c := srv.NewTCPClient() + defer func() { + require.NoError(t, c.Close()) + }() + + t.Run("debug protocol string", func(t *testing.T) { + require.NoError(t, c.WriteArgs("HELLO", "3")) + values := []string{"*6", "$6", "server", "$5", "redis", "$5", "proto", ":3", "$4", "mode", "$10", "standalone"} + for _, line := range values { + c.MustRead(t, line) + } + + types := map[string][]string{ + "string": {"$11", "Hello World"}, + "integer": {":12345"}, + "array": {"*3", ":0", ":1", ":2"}, + "true": {"#t"}, + "false": {"#f"}, + "null": {"_"}, + } + for typ, expected := range types { + args := []string{"DEBUG", "PROTOCOL", typ} + require.NoError(t, c.WriteArgs(args...)) + for _, line := range expected { + c.MustRead(t, line) + } + } + }) + + t.Run("multi bulk strings with null", func(t *testing.T) { + require.NoError(t, c.WriteArgs("HSET", "hash", "f1", "v1")) + c.MustRead(t, ":1") + + require.NoError(t, c.WriteArgs("HMGET", "hash", "f1", "f2")) + c.MustRead(t, "*2") + c.MustRead(t, "$2") + c.MustRead(t, "v1") + c.MustRead(t, "_") + }) + + t.Run("null array", func(t *testing.T) { + require.NoError(t, c.WriteArgs("ZRANK", "no-exists-zset", "m0", "WITHSCORE")) + c.MustRead(t, "_") + }) +} diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc index 86b3e1a5540..0c29cecb599 100644 --- a/utils/kvrocks2redis/parser.cc +++ b/utils/kvrocks2redis/parser.cc @@ -67,12 +67,12 @@ Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t e auto [ns, user_key] = ExtractNamespaceKey(ns_key, slot_id_encoded_); auto command = - redis::Command2RESP({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))}); + redis::Array2RESP({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))}); Status s = writer_->Write(ns, {command}); if (!s.IsOK()) return s; if (expire > 0) { - command = redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire / 1000)}); + command = redis::Array2RESP({"EXPIREAT", user_key, std::to_string(expire / 1000)}); s = writer_->Write(ns, {command}); } @@ -105,17 +105,17 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) { std::string value = iter->value().ToString(); switch (type) { case kRedisHash: - output = redis::Command2RESP({"HSET", user_key, sub_key, value}); + output = redis::Array2RESP({"HSET", user_key, sub_key, value}); break; case kRedisSet: - output = redis::Command2RESP({"SADD", user_key, sub_key}); + output = redis::Array2RESP({"SADD", user_key, sub_key}); break; case kRedisList: - output = redis::Command2RESP({"RPUSH", user_key, value}); + output = redis::Array2RESP({"RPUSH", user_key, value}); break; case kRedisZSet: { double score = DecodeDouble(value.data()); - output = redis::Command2RESP({"ZADD", user_key, util::Float2String(score), sub_key}); + output = redis::Array2RESP({"ZADD", user_key, util::Float2String(score), sub_key}); break; } case kRedisBitmap: { @@ -126,7 +126,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) { } case kRedisSortedint: { std::string val = std::to_string(DecodeFixed64(ikey.GetSubKey().data())); - output = redis::Command2RESP({"ZADD", user_key, val, val}); + output = redis::Array2RESP({"ZADD", user_key, val, val}); break; } default: @@ -140,7 +140,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) { } if (metadata.expire > 0) { - output = redis::Command2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)}); + output = redis::Array2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)}); Status s = writer_->Write(ns, {output}); if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to AOF"); } @@ -158,7 +158,7 @@ Status Parser::parseBitmapSegment(const Slice &ns, const Slice &user_key, int in s = writer_->Write( ns.ToString(), - {redis::Command2RESP({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})}); + {redis::Array2RESP({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})}); if (!s.IsOK()) return s.Prefixed("failed to write SETBIT command to AOF"); } } diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc index 27e7d1d82bf..3cb512bd105 100644 --- a/utils/kvrocks2redis/redis_writer.cc +++ b/utils/kvrocks2redis/redis_writer.cc @@ -72,7 +72,7 @@ Status RedisWriter::FlushDB(const std::string &ns) { return s; } - s = Write(ns, {redis::Command2RESP({"FLUSHDB"})}); + s = Write(ns, {redis::Array2RESP({"FLUSHDB"})}); if (!s.IsOK()) return s; return Status::OK(); From 65ce10fa5528c91375a7a43d013c250a7f882c48 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Mon, 15 Jan 2024 09:08:14 +0800 Subject: [PATCH 2/5] Fix test cases --- src/commands/cmd_stream.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 4150ae513dd..f82497fce4d 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -862,7 +862,7 @@ class CommandXRead : public Commander, if (block_ && results.empty()) { if (conn->IsInExec()) { - *output = conn_->NilArray(); + *output = conn->NilArray(); return Status::OK(); // No blocking in multi-exec } @@ -870,7 +870,7 @@ class CommandXRead : public Commander, } if (!block_ && results.empty()) { - *output = conn_->NilArray(); + *output = conn->NilArray(); return Status::OK(); } @@ -976,10 +976,10 @@ class CommandXRead : public Commander, conn_->Reply(conn_->NilArray()); } - SendReply(conn_, results); + SendReply(results); } - void SendReply(Connection *conn, const std::vector &results) { + void SendReply(const std::vector &results) { std::string output; output.append(redis::MultiLen(results.size())); @@ -991,7 +991,7 @@ class CommandXRead : public Commander, for (const auto &entry : result.entries) { output.append(redis::MultiLen(2)); output.append(redis::BulkString(entry.key)); - output.append(conn->MultiBulkString(entry.values)); + output.append(conn_->MultiBulkString(entry.values)); } } From 4bb5f44386760eff6b58b8e9b7616845a41234ad Mon Sep 17 00:00:00 2001 From: git-hulk Date: Mon, 15 Jan 2024 10:42:53 +0800 Subject: [PATCH 3/5] Add const modifier to the connection parameter --- src/commands/cmd_geo.cc | 4 ++-- src/commands/cmd_pubsub.cc | 4 ++-- src/commands/cmd_server.cc | 2 +- src/commands/scan_base.h | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/commands/cmd_geo.cc b/src/commands/cmd_geo.cc index 991d16c5ed3..0f4d98ebf25 100644 --- a/src/commands/cmd_geo.cc +++ b/src/commands/cmd_geo.cc @@ -319,7 +319,7 @@ class CommandGeoRadius : public CommandGeoBase { return Status::OK(); } - std::string GenerateOutput(Connection *conn, const std::vector &geo_points) { + std::string GenerateOutput(const Connection *conn, const std::vector &geo_points) { int result_length = static_cast(geo_points.size()); int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_; std::vector list; @@ -496,7 +496,7 @@ class CommandGeoSearch : public CommandGeoBase { return Status::OK(); } - std::string generateOutput(Connection *conn, const std::vector &geo_points) { + std::string generateOutput(const Connection *conn, const std::vector &geo_points) { int result_length = static_cast(geo_points.size()); int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_; std::vector output; diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc index 32065c9842a..8f0ddfbd9c0 100644 --- a/src/commands/cmd_pubsub.cc +++ b/src/commands/cmd_pubsub.cc @@ -73,8 +73,8 @@ class CommandMPublish : public Commander { } }; -void SubscribeCommandReply(Connection *conn, std::string *output, const std::string &name, const std::string &sub_name, - int num) { +void SubscribeCommandReply(const Connection *conn, std::string *output, const std::string &name, + const std::string &sub_name, int num) { output->append(redis::MultiLen(3)); output->append(redis::BulkString(name)); output->append(sub_name.empty() ? conn->NilString() : BulkString(sub_name)); diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index f4b4c99860a..71781cd5182 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -809,7 +809,7 @@ class CommandScan : public CommandScanBase { return Commander::Parse(args); } - static std::string GenerateOutput(Server *srv, Connection *conn, const std::vector &keys, + static std::string GenerateOutput(Server *srv, const Connection *conn, const std::vector &keys, const std::string &end_cursor) { std::vector list; if (!end_cursor.empty()) { diff --git a/src/commands/scan_base.h b/src/commands/scan_base.h index dd0f605f02a..bab86e218a7 100644 --- a/src/commands/scan_base.h +++ b/src/commands/scan_base.h @@ -64,7 +64,7 @@ class CommandScanBase : public Commander { } } - std::string GenerateOutput(Server *srv, Connection *conn, const std::vector &keys, + std::string GenerateOutput(Server *srv, const Connection *conn, const std::vector &keys, CursorType cursor_type) const { std::vector list; if (keys.size() == static_cast(limit_)) { @@ -112,7 +112,7 @@ class CommandSubkeyScanBase : public CommandScanBase { return Commander::Parse(args); } - std::string GenerateOutput(Server *srv, Connection *conn, const std::vector &fields, + std::string GenerateOutput(Server *srv, const Connection *conn, const std::vector &fields, const std::vector &values, CursorType cursor_type) { std::vector list; auto items_count = fields.size(); From e239e83d55dd2ece43f8b1cd8cbd2a5e44653b42 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Mon, 15 Jan 2024 16:00:15 +0800 Subject: [PATCH 4/5] Use conn->NilString in the new command --- src/commands/cmd_zset.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index 17d78bda798..acddad82ddf 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -1407,9 +1407,9 @@ class CommandZRandMember : public Commander { } if (no_parameters_) - *output = s.IsNotFound() ? redis::NilString() : redis::BulkString(result_entries[0]); + *output = s.IsNotFound() ? conn->NilString() : redis::BulkString(result_entries[0]); else - *output = redis::MultiBulkString(result_entries, false); + *output = conn->MultiBulkString(result_entries, false); return Status::OK(); } From 5a1e1da2a730259de2d0a9851c95db9fa92bd41f Mon Sep 17 00:00:00 2001 From: git-hulk Date: Mon, 15 Jan 2024 17:45:57 +0800 Subject: [PATCH 5/5] Rename Array2RESP to ArrayOfBulkStrings --- src/cluster/replication.cc | 16 ++++++++-------- src/cluster/slot_migrate.cc | 25 +++++++++++++------------ src/server/redis_reply.cc | 2 +- src/server/redis_reply.h | 2 +- src/server/server.cc | 4 ++-- src/stats/log_collector.cc | 2 +- src/storage/batch_extractor.cc | 12 ++++++------ tests/cppunit/string_reply_test.cc | 2 +- utils/kvrocks2redis/parser.cc | 18 +++++++++--------- utils/kvrocks2redis/redis_writer.cc | 2 +- 10 files changed, 43 insertions(+), 42 deletions(-) diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index ba3afdf0b1d..51e536c781d 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -396,7 +396,7 @@ void ReplicationThread::run() { } ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) { - SendString(bev, redis::Array2RESP({"AUTH", srv_->GetConfig()->masterauth})); + SendString(bev, redis::ArrayOfBulkStrings({"AUTH", srv_->GetConfig()->masterauth})); LOG(INFO) << "[replication] Auth request was sent, waiting for response"; repl_state_.store(kReplSendAuth, std::memory_order_relaxed); return CBState::NEXT; @@ -418,7 +418,7 @@ ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) { // } ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent *bev) { - SendString(bev, redis::Array2RESP({"_db_name"})); + SendString(bev, redis::ArrayOfBulkStrings({"_db_name"})); repl_state_.store(kReplCheckDBName, std::memory_order_relaxed); LOG(INFO) << "[replication] Check db name request was sent, waiting for response"; return CBState::NEXT; @@ -456,7 +456,7 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev) data_to_send.emplace_back("ip-address"); data_to_send.emplace_back(config->replica_announce_ip); } - SendString(bev, redis::Array2RESP(data_to_send)); + SendString(bev, redis::ArrayOfBulkStrings(data_to_send)); repl_state_.store(kReplReplConf, std::memory_order_relaxed); LOG(INFO) << "[replication] replconf request was sent, waiting for response"; return CBState::NEXT; @@ -513,11 +513,11 @@ ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev) // Also use old PSYNC if replica can't find replication id from WAL and DB. if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || replid.length() != kReplIdLength) { next_try_old_psync_ = false; // Reset next_try_old_psync_ - SendString(bev, redis::Array2RESP({"PSYNC", std::to_string(next_seq)})); + SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", std::to_string(next_seq)})); LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq; } else { // NEW PSYNC "Unique Replication Sequence ID": replication id and sequence id - SendString(bev, redis::Array2RESP({"PSYNC", replid, std::to_string(next_seq)})); + SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid, std::to_string(next_seq)})); LOG(INFO) << "[replication] Try to use new psync, current unique replication sequence id: " << replid << ":" << cur_seq; } @@ -607,7 +607,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent * } ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent *bev) { - SendString(bev, redis::Array2RESP({"_fetch_meta"})); + SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"})); repl_state_.store(kReplFetchMeta, std::memory_order_relaxed); LOG(INFO) << "[replication] Start syncing data with fullsync"; return CBState::NEXT; @@ -835,7 +835,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) { std::string auth = srv_->GetConfig()->masterauth; if (!auth.empty()) { UniqueEvbuf evbuf; - const auto auth_command = redis::Array2RESP({"AUTH", auth}); + const auto auth_command = redis::ArrayOfBulkStrings({"AUTH", auth}); auto s = util::SockSend(sock_fd, auth_command, ssl); if (!s.IsOK()) return s.Prefixed("send auth command err"); while (true) { @@ -921,7 +921,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const } files_str.pop_back(); - const auto fetch_command = redis::Array2RESP({"_fetch_file", files_str}); + const auto fetch_command = redis::ArrayOfBulkStrings({"_fetch_file", files_str}); auto s = util::SockSend(sock_fd, fetch_command, ssl); if (!s.IsOK()) return s.Prefixed("send fetch file command"); diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index bb3334cd391..424d0b4ca5a 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -438,7 +438,7 @@ void SlotMigrator::clean() { } Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) { - std::string cmd = redis::Array2RESP({"auth", password}); + std::string cmd = redis::ArrayOfBulkStrings({"auth", password}); auto s = util::SockSend(sock_fd, cmd); if (!s.IsOK()) { return s.Prefixed("failed to send AUTH command"); @@ -455,7 +455,8 @@ Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) { Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) { if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"}; - std::string cmd = redis::Array2RESP({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)}); + std::string cmd = + redis::ArrayOfBulkStrings({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)}); auto s = util::SockSend(sock_fd, cmd); if (!s.IsOK()) { return s.Prefixed("failed to send command to the destination node"); @@ -665,7 +666,7 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata command.emplace_back("PXAT"); command.emplace_back(std::to_string(metadata.expire)); } - *restore_cmds += redis::Array2RESP(command); + *restore_cmds += redis::ArrayOfBulkStrings(command); current_pipeline_size_++; // Check whether pipeline needs to be sent @@ -746,7 +747,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata if (metadata.Type() != kRedisBitmap) { item_count++; if (item_count >= kMaxItemsInCommand) { - *restore_cmds += redis::Array2RESP(user_cmd); + *restore_cmds += redis::ArrayOfBulkStrings(user_cmd); current_pipeline_size_++; item_count = 0; // Have to clear saved items @@ -763,13 +764,13 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata // Have to check the item count of the last command list if (item_count % kMaxItemsInCommand != 0) { - *restore_cmds += redis::Array2RESP(user_cmd); + *restore_cmds += redis::ArrayOfBulkStrings(user_cmd); current_pipeline_size_++; } // Add TTL for complex key if (metadata.expire > 0) { - *restore_cmds += redis::Array2RESP({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}); + *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}); current_pipeline_size_++; } @@ -808,7 +809,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad if (!s.IsOK()) { return s; } - *restore_cmds += redis::Array2RESP(user_cmd); + *restore_cmds += redis::ArrayOfBulkStrings(user_cmd); current_pipeline_size_++; user_cmd.erase(user_cmd.begin() + 2, user_cmd.end()); @@ -821,14 +822,14 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad // commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration // XSETID is used to adjust stream's info on the destination node according to the current values on the source - *restore_cmds += redis::Array2RESP({"XSETID", key.ToString(), metadata.last_generated_id.ToString(), "ENTRIESADDED", - std::to_string(metadata.entries_added), "MAXDELETEDID", - metadata.max_deleted_entry_id.ToString()}); + *restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(), + "ENTRIESADDED", std::to_string(metadata.entries_added), "MAXDELETEDID", + metadata.max_deleted_entry_id.ToString()}); current_pipeline_size_++; // Add TTL if (metadata.expire > 0) { - *restore_cmds += redis::Array2RESP({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}); + *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}); current_pipeline_size_++; } @@ -860,7 +861,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr< uint32_t offset = (index * 8) + (byte_idx * 8) + bit_idx; user_cmd->emplace_back(std::to_string(offset)); user_cmd->emplace_back("1"); - *restore_cmds += redis::Array2RESP(*user_cmd); + *restore_cmds += redis::ArrayOfBulkStrings(*user_cmd); current_pipeline_size_++; user_cmd->erase(user_cmd->begin() + 2, user_cmd->end()); } diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc index a670ce1a27d..95bbf9fde03 100644 --- a/src/server/redis_reply.cc +++ b/src/server/redis_reply.cc @@ -41,7 +41,7 @@ std::string Array(const std::vector &list) { return result; } -std::string Array2RESP(const std::vector &elems) { +std::string ArrayOfBulkStrings(const std::vector &elems) { std::string result = "*" + std::to_string(elems.size()) + CRLF; for (const auto &elem : elems) { result += BulkString(elem); diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h index 083ccea2d38..213a8bc0849 100644 --- a/src/server/redis_reply.h +++ b/src/server/redis_reply.h @@ -55,6 +55,6 @@ std::string MultiLen(T len) { } std::string Array(const std::vector &list); -std::string Array2RESP(const std::vector &elements); +std::string ArrayOfBulkStrings(const std::vector &elements); } // namespace redis diff --git a/src/server/server.cc b/src/server/server.cc index 2e98d6e1512..bc4c29391f8 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -1024,7 +1024,7 @@ void Server::GetRoleInfo(std::string *info) { roles.emplace_back("connecting"); } roles.emplace_back(std::to_string(storage->LatestSeqNumber())); - *info = redis::Array2RESP(roles); + *info = redis::ArrayOfBulkStrings(roles); } else { std::vector list; @@ -1032,7 +1032,7 @@ void Server::GetRoleInfo(std::string *info) { for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; - list.emplace_back(redis::Array2RESP({ + list.emplace_back(redis::ArrayOfBulkStrings({ slave->GetConn()->GetAnnounceIP(), std::to_string(slave->GetConn()->GetListeningPort()), std::to_string(slave->GetCurrentReplSeq()), diff --git a/src/stats/log_collector.cc b/src/stats/log_collector.cc index 6785a04a4c7..842a7e5597b 100644 --- a/src/stats/log_collector.cc +++ b/src/stats/log_collector.cc @@ -32,7 +32,7 @@ std::string SlowEntry::ToRedisString() const { output.append(redis::Integer(id)); output.append(redis::Integer(time)); output.append(redis::Integer(duration)); - output.append(redis::Array2RESP(args)); + output.append(redis::ArrayOfBulkStrings(args)); output.append(redis::BulkString(ip + ":" + std::to_string(port))); output.append(redis::BulkString(client_name)); return output; diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index 021527d0446..56f588746a8 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -63,10 +63,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic if (metadata.Type() == kRedisString) { command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))}; - resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); if (metadata.expire > 0) { command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)}; - resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); } } else if (metadata.expire > 0) { auto args = log_data_.GetArguments(); @@ -80,7 +80,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic auto cmd = static_cast(*parse_result); if (cmd == kRedisCmdExpire) { command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)}; - resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); } } } @@ -103,7 +103,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic std::to_string(stream_metadata.entries_added), "MAXDELETEDID", stream_metadata.max_deleted_entry_id.ToString()}; - resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); } return rocksdb::Status::OK(); @@ -262,7 +262,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic } if (!command_args.empty()) { - resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); } return rocksdb::Status::OK(); @@ -387,7 +387,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S } if (!command_args.empty()) { - resp_commands_[ns].emplace_back(redis::Array2RESP(command_args)); + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); } return rocksdb::Status::OK(); diff --git a/tests/cppunit/string_reply_test.cc b/tests/cppunit/string_reply_test.cc index 5cef14a38b4..8aa93507f74 100644 --- a/tests/cppunit/string_reply_test.cc +++ b/tests/cppunit/string_reply_test.cc @@ -39,7 +39,7 @@ class StringReplyTest : public testing::Test { std::vector StringReplyTest::values; TEST_F(StringReplyTest, MultiBulkString) { - std::string result = redis::Array2RESP(values); + std::string result = redis::ArrayOfBulkStrings(values); ASSERT_EQ(result.length(), 13 * 10 + 14 * 90 + 15 * 900 + 17 * 9000 + 18 * 90000 + 9); } diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc index 0c29cecb599..9d4db5ec698 100644 --- a/utils/kvrocks2redis/parser.cc +++ b/utils/kvrocks2redis/parser.cc @@ -67,12 +67,12 @@ Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t e auto [ns, user_key] = ExtractNamespaceKey(ns_key, slot_id_encoded_); auto command = - redis::Array2RESP({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))}); + redis::ArrayOfBulkStrings({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))}); Status s = writer_->Write(ns, {command}); if (!s.IsOK()) return s; if (expire > 0) { - command = redis::Array2RESP({"EXPIREAT", user_key, std::to_string(expire / 1000)}); + command = redis::ArrayOfBulkStrings({"EXPIREAT", user_key, std::to_string(expire / 1000)}); s = writer_->Write(ns, {command}); } @@ -105,17 +105,17 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) { std::string value = iter->value().ToString(); switch (type) { case kRedisHash: - output = redis::Array2RESP({"HSET", user_key, sub_key, value}); + output = redis::ArrayOfBulkStrings({"HSET", user_key, sub_key, value}); break; case kRedisSet: - output = redis::Array2RESP({"SADD", user_key, sub_key}); + output = redis::ArrayOfBulkStrings({"SADD", user_key, sub_key}); break; case kRedisList: - output = redis::Array2RESP({"RPUSH", user_key, value}); + output = redis::ArrayOfBulkStrings({"RPUSH", user_key, value}); break; case kRedisZSet: { double score = DecodeDouble(value.data()); - output = redis::Array2RESP({"ZADD", user_key, util::Float2String(score), sub_key}); + output = redis::ArrayOfBulkStrings({"ZADD", user_key, util::Float2String(score), sub_key}); break; } case kRedisBitmap: { @@ -126,7 +126,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) { } case kRedisSortedint: { std::string val = std::to_string(DecodeFixed64(ikey.GetSubKey().data())); - output = redis::Array2RESP({"ZADD", user_key, val, val}); + output = redis::ArrayOfBulkStrings({"ZADD", user_key, val, val}); break; } default: @@ -140,7 +140,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) { } if (metadata.expire > 0) { - output = redis::Array2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)}); + output = redis::ArrayOfBulkStrings({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)}); Status s = writer_->Write(ns, {output}); if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to AOF"); } @@ -158,7 +158,7 @@ Status Parser::parseBitmapSegment(const Slice &ns, const Slice &user_key, int in s = writer_->Write( ns.ToString(), - {redis::Array2RESP({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})}); + {redis::ArrayOfBulkStrings({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})}); if (!s.IsOK()) return s.Prefixed("failed to write SETBIT command to AOF"); } } diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc index 3cb512bd105..74410669ed2 100644 --- a/utils/kvrocks2redis/redis_writer.cc +++ b/utils/kvrocks2redis/redis_writer.cc @@ -72,7 +72,7 @@ Status RedisWriter::FlushDB(const std::string &ns) { return s; } - s = Write(ns, {redis::Array2RESP({"FLUSHDB"})}); + s = Write(ns, {redis::ArrayOfBulkStrings({"FLUSHDB"})}); if (!s.IsOK()) return s; return Status::OK();