From 1153b2fa32662053453f6a28fa0f5f19766eea47 Mon Sep 17 00:00:00 2001 From: dinesh-murugiah Date: Tue, 19 Mar 2024 00:25:07 +0530 Subject: [PATCH 1/2] info command initial commit --- .../filters/network/common/redis/BUILD | 1 + .../filters/network/common/redis/client.h | 15 +- .../network/common/redis/client_impl.cc | 16 +- .../network/common/redis/client_impl.h | 2 +- .../network/common/redis/supported_commands.h | 9 +- .../filters/network/common/redis/utility.cc | 171 ++++++++++++++++++ .../filters/network/common/redis/utility.h | 48 +++++ .../filters/network/redis_proxy/BUILD | 1 + .../redis_proxy/command_splitter_impl.cc | 66 ++++++- .../filters/network/redis_proxy/config.cc | 4 +- .../network/redis_proxy/conn_pool_impl.cc | 6 +- .../network/redis_proxy/proxy_filter.cc | 14 +- .../network/redis_proxy/proxy_filter.h | 34 +++- 13 files changed, 345 insertions(+), 42 deletions(-) diff --git a/source/extensions/filters/network/common/redis/BUILD b/source/extensions/filters/network/common/redis/BUILD index 4561a35a52e5..727cab8b8143 100644 --- a/source/extensions/filters/network/common/redis/BUILD +++ b/source/extensions/filters/network/common/redis/BUILD @@ -47,6 +47,7 @@ envoy_cc_library( name = "client_interface", hdrs = ["client.h"], deps = [ + ":utility_interface", ":codec_lib", ":redis_command_stats_lib", "//envoy/upstream:cluster_manager_interface", diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index 914fa7dac3ee..695596b37773 100644 --- a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -6,6 +6,8 @@ #include "source/extensions/filters/network/common/redis/codec_impl.h" #include "source/extensions/filters/network/common/redis/redis_command_stats.h" +#include "source/extensions/filters/network/common/redis/utility.h" + namespace Envoy { namespace Extensions { @@ -58,8 +60,9 @@ class ClientCallbacks { class DirectCallbacks { public: virtual ~DirectCallbacks() = default; - virtual void onDirectResponse(Common::Redis::RespValuePtr&& value) PURE; + virtual void sendResponseDownstream(Common::Redis::RespValuePtr&& value) PURE; virtual void onFailure() PURE; + virtual std::unique_ptr getDownStreamMetrics() PURE; }; @@ -278,12 +281,12 @@ struct Transaction { } should_close_ = false; } - void setPubsubCallback(std::shared_ptr callback) { - pubsub_cb_ = callback; + void setDownstreamCallback(std::shared_ptr callback) { + downstream_cb_ = callback; } - std::shared_ptr getPubsubCallback() { - return pubsub_cb_; + std::shared_ptr getDownstreamCallback() { + return downstream_cb_; } bool active_{false}; bool connection_established_{false}; @@ -298,7 +301,7 @@ struct Transaction { // the mirroring policies. std::vector clients_; Network::ConnectionCallbacks* connection_cb_; - std::shared_ptr pubsub_cb_=nullptr; + std::shared_ptr downstream_cb_=nullptr; // This index represents the current client on which traffic is being sent to. // When sending to the main redis server it will be 0, and when sending to one of diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 64ef2c65e025..c96a141deb26 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -92,7 +92,7 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis host->stats().cx_active_.inc(); connect_or_op_timer_->enableTimer(host->cluster().connectTimeout()); if (is_pubsub_client_){ - pubsub_cb_ = std::move(drcb); + downstream_cb_ = std::move(drcb); } } @@ -101,12 +101,12 @@ ClientImpl::~ClientImpl() { ASSERT(connection_->state() == Network::Connection::State::Closed); host_->cluster().trafficStats()->upstream_cx_active_.dec(); host_->stats().cx_active_.dec(); - pubsub_cb_.reset(); + downstream_cb_.reset(); } void ClientImpl::close() { - pubsub_cb_.reset(); + downstream_cb_.reset(); if (connection_) { connection_->close(Network::ConnectionCloseType::NoFlush); } @@ -208,10 +208,10 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { if (pending_requests_.empty() && is_pubsub_client_) { host_->cluster().trafficStats()->upstream_cx_destroy_with_active_rq_.inc(); ENVOY_LOG(debug,"Pubsub Client Connection close event received:'{}'",eventTypeStr); - if ((pubsub_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){ + if ((downstream_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){ ENVOY_LOG(debug,"Pubsub Client Remote close received on Downstream Notify Upstream and close it"); - pubsub_cb_->onFailure(); - pubsub_cb_.reset(); + downstream_cb_->onFailure(); + downstream_cb_.reset(); } } @@ -246,8 +246,8 @@ void ClientImpl::onRespValue(RespValuePtr&& value) { if (pending_requests_.empty() && is_pubsub_client_) { // This is a pubsub client, and we have received a message from the server. // We need to pass this message to the registered callback. - if (pubsub_cb_ != nullptr){ - pubsub_cb_->onDirectResponse(std::move(value)); + if (downstream_cb_ != nullptr){ + downstream_cb_->sendResponseDownstream(std::move(value)); } return; } diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index 37daaee44bed..e7a9ffe3343e 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -159,7 +159,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne bool is_transaction_client_; bool is_pubsub_client_=false; bool is_blocking_client_=false; - std::shared_ptr pubsub_cb_=nullptr; + std::shared_ptr downstream_cb_=nullptr; }; class ClientFactoryImpl : public ClientFactory { diff --git a/source/extensions/filters/network/common/redis/supported_commands.h b/source/extensions/filters/network/common/redis/supported_commands.h index 389f7dfffbec..b7d1d08c60e0 100644 --- a/source/extensions/filters/network/common/redis/supported_commands.h +++ b/source/extensions/filters/network/common/redis/supported_commands.h @@ -88,13 +88,13 @@ struct SupportedCommands { * @return commands which handle Redis commands without keys. */ static const absl::flat_hash_set& adminNokeyCommands() { - CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set, "script", "flushall","publish","pubsub", "keys", "slowlog", "config","client"); + CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set, "script", "flushall","publish","pubsub", "keys", "slowlog", "config","client","info"); } /** * @return commands which handle Redis commands without keys. */ static const absl::flat_hash_set& allShardCommands() { - CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set, "script", "flushall", "pubsub", "keys", "slowlog", "config","client"); + CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set, "script", "flushall", "pubsub", "keys", "slowlog", "config","client","info"); } /** @@ -137,6 +137,11 @@ struct SupportedCommands { */ static const std::string& exit() { CONSTRUCT_ON_FIRST_USE(std::string, "exit"); } + /** + * @return info command + */ + static const std::string& info() { CONSTRUCT_ON_FIRST_USE(std::string, "info"); } + /** * @return commands which alters the state of redis */ diff --git a/source/extensions/filters/network/common/redis/utility.cc b/source/extensions/filters/network/common/redis/utility.cc index 3eb644f3285a..1d32df9c73f9 100644 --- a/source/extensions/filters/network/common/redis/utility.cc +++ b/source/extensions/filters/network/common/redis/utility.cc @@ -1,6 +1,7 @@ #include "source/extensions/filters/network/common/redis/utility.h" #include "source/common/common/utility.h" +#include "absl/strings/str_split.h" namespace Envoy { namespace Extensions { @@ -83,6 +84,176 @@ const SetRequest& SetRequest::instance() { static const SetRequest* instance = new SetRequest{}; return *instance; } + +void CommonInfoCmdResponseAggregator(const std::string& infokey,const std::string& infovalue,InfoCmdResponseProcessor::infoCmdResponseDecoder& infoCmdObj){ + + // Handle unused variable warning + (void)infokey; + switch (infoCmdObj.aggregate_type) { + case staticvalue: + infoCmdObj.strvalue = infovalue; + break; + case sumvalues: + infoCmdObj.intvalue += std::stoull(infovalue); + infoCmdObj.strvalue = std::to_string(infoCmdObj.intvalue); + break; + case highestvalue: + if (std::stoull(infovalue) > infoCmdObj.intvalue) { + infoCmdObj.intvalue = std::stoull(infovalue); + infoCmdObj.strvalue = infovalue; + } + break; + default: + break; + } + + return; +} + +void getkeyspacestats(const std::string& value, std::uint64_t& keys, std::uint64_t& expires, std::uint64_t& avg_ttl) { + std::vector keyvaluepairs = absl::StrSplit(value, ','); + + for (const auto& keyvaluepair : keyvaluepairs) { + std::vector keyvalue = absl::StrSplit(keyvaluepair, '='); + if (keyvalue.size() == 2) { + if (keyvalue[0] == "keys") { + keys = std::stoull(keyvalue[1]); + } else if (keyvalue[0] == "expires") { + expires = std::stoull(keyvalue[1]); + } else if (keyvalue[0] == "avg_ttl") { + avg_ttl = std::stoull(keyvalue[1]); + } + } + } +} + +void InfoResponseAggrKeyspace(const std::string& infokey, const std::string& infovalue,InfoCmdResponseProcessor::infoCmdResponseDecoder& infoCmdObj){ + + std::uint64_t currkeys,oldkeys = 0; + std::uint64_t currexpires,oldexpires = 0; + std::uint64_t curravg_ttl,oldavg_ttl = 0; + + if (infoCmdObj.strvalue != ""){ + getkeyspacestats(infoCmdObj.strvalue, oldkeys, oldexpires, oldavg_ttl); + } + + if (infokey == "db0") { + getkeyspacestats(infovalue, currkeys, currexpires, curravg_ttl); + infoCmdObj.strvalue = "keys=" + std::to_string(currkeys + oldkeys) + ",expires=" + std::to_string(currexpires + oldexpires) + ",avg_ttl=" + std::to_string((curravg_ttl + oldavg_ttl) / 2); + } + + return; + +} +void InfoCmdResponseProcessor::updateInfoCmdResponseString(const std::string& infokey, const std::string& infovalue) { + processInfoCmdResponse(infokey, infovalue); +} + +std::string InfoCmdResponseProcessor::getInfoCmdResponseString(){ + + //generate info response string from the inforesponsetemplate_ vector + std::string prevInfoCategory=""; + std::string infoResponseString; + for (const auto& entry : inforesponsetemplate_) { + + if (prevInfoCategory != entry.infocategory) { + infoResponseString += entry.infocategory + "\n"; + prevInfoCategory = entry.infocategory; + } + if (entry.strvalue != ""){ + infoResponseString += entry.infokey + ":" + entry.strvalue + "\n"; + } + } + return infoResponseString; + +} + +InfoCmdResponseProcessor::InfoCmdResponseProcessor() { + + //Initialize the info response template + inforesponsetemplate_ = { + {"# Server","redis_version", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","redis_git_sha1", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","redis_git_dirty", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","redis_build_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","redis_mode", hardcodedvalue, "standalone", 0, nullptr}, + {"# Server","os", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","arch_bits", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","multiplexing_api", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","process_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","run_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + //{"# Server","tcp_port", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, //Need to find a way to get the TCP port from listener , currently its not exposed + {"# Server","uptime_in_seconds", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","uptime_in_days", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","hz", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","lru_clock", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","config_file", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Memory","used_memory", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Memory","used_memory_peak", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Memory","used_memory_rss", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Memory","used_memory_lua", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Memory","maxmemory", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Memory","maxmemory_policy", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Memory","mem_fragmentation_ratio", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Memory","mem_allocator", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Persistence","loading", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Persistence","rdb_changes_since_last_save", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Persistence","rdb_bgsave_in_progress", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Persistence","rdb_last_save_time", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Persistence","rdb_last_bgsave_status", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Persistence","rdb_last_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Persistence","rdb_current_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","total_connections_received", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","total_commands_processed", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","instantaneous_ops_per_sec", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","total_net_input_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","total_net_output_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","instantaneous_input_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","instantaneous_output_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","rejected_connections", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","sync_full", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","sync_partial_ok", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","sync_partial_err", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","expired_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","evicted_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","keyspace_hits", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","keyspace_misses", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","pubsub_channels", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","pubsub_patterns", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Stats","latest_fork_usec", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Replication","role", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Replication","connected_slaves", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# CPU","used_cpu_sys", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# CPU","used_cpu_user", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# CPU","used_cpu_sys_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# CPU","used_cpu_user_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, + {"# Cluster","cluster_enabled", hardcodedvalue, "0", 0, nullptr}, + {"# Keyspace","db0",customizevalue, "", 0, InfoResponseAggrKeyspace} + }; + // Populate the converters_ hash map with indices of the inforesponsetemplate_ vector + for (size_t i = 0; i < inforesponsetemplate_.size(); ++i) { + converters_[inforesponsetemplate_[i].infokey] = i; + } +} + +InfoCmdResponseProcessor::~InfoCmdResponseProcessor() { +} + +void InfoCmdResponseProcessor::processInfoCmdResponse(const std::string& key, const std::string& value) { + auto it = converters_.find(key); + if (it != converters_.end()) { + // Use the found index to access the corresponding entry in inforesponsetemplate_ + infoCmdResponseDecoder& entry = inforesponsetemplate_[it->second]; + + if (entry.customizer != nullptr) { + entry.customizer(key, value, entry); + //ENVOY_LOG(debug, "infokey: '{}' infovalue: '{}'", inforesponsetemplate_[it->second].infokey, inforesponsetemplate_[it->second].strvalue); + } + + } + return; +} + } // namespace Utility } // namespace Redis } // namespace Common diff --git a/source/extensions/filters/network/common/redis/utility.h b/source/extensions/filters/network/common/redis/utility.h index 7f98bfbb444f..7ce16fbe2691 100644 --- a/source/extensions/filters/network/common/redis/utility.h +++ b/source/extensions/filters/network/common/redis/utility.h @@ -3,6 +3,7 @@ #include #include "source/extensions/filters/network/common/redis/codec.h" +#include "source/common/common/logger.h" namespace Envoy { namespace Extensions { @@ -11,6 +12,53 @@ namespace Common { namespace Redis { namespace Utility { +struct DownStreamMetrics { + uint64_t downstream_rq_total_{}; + uint64_t downstream_cx_drain_close_{}; + uint64_t downstream_cx_protocol_error_{}; + uint64_t downstream_cx_rx_bytes_total_{}; + uint64_t downstream_cx_total_{}; + uint64_t downstream_cx_tx_bytes_total_{}; + uint64_t downstream_cx_active_{}; + uint64_t downstream_cx_rx_bytes_buffered_{}; + uint64_t downstream_cx_tx_bytes_buffered_{}; + uint64_t downstream_rq_active_{}; +}; + + +enum InforesponseAggregatorType { + staticvalue, + sumvalues, + highestvalue, + customizevalue, + hardcodedvalue +}; + + +class InfoCmdResponseProcessor:public Logger::Loggable { +public: + + InfoCmdResponseProcessor(); + ~InfoCmdResponseProcessor(); + + void processInfoCmdResponse(const std::string& infokey, const std::string& infovalue); + std::string getInfoCmdResponseString(); + void updateInfoCmdResponseString(const std::string& infokey, const std::string& infovalue); + + struct infoCmdResponseDecoder { + std::string infocategory; + std::string infokey; + enum InforesponseAggregatorType aggregate_type; + std::string strvalue; + std::uint64_t intvalue; + void (*customizer)(const std::string& infokey, const std::string& infovalue,infoCmdResponseDecoder& infoObj); + }; +private: + + std::vector inforesponsetemplate_; + std::unordered_map converters_; +}; + class AuthRequest : public Redis::RespValue { public: AuthRequest(const std::string& username, const std::string& password); diff --git a/source/extensions/filters/network/redis_proxy/BUILD b/source/extensions/filters/network/redis_proxy/BUILD index 506ec5d2b484..a11aafd2f88d 100644 --- a/source/extensions/filters/network/redis_proxy/BUILD +++ b/source/extensions/filters/network/redis_proxy/BUILD @@ -118,6 +118,7 @@ envoy_cc_library( "//source/common/common:assert_lib", "//source/common/config:datasource_lib", "//source/common/config:utility_lib", + "//source/extensions/filters/network/common/redis:utility_lib", "//source/extensions/common/dynamic_forward_proxy:dns_cache_interface", "//source/extensions/filters/network/common/redis:codec_interface", "@envoy_api//envoy/extensions/filters/network/redis_proxy/v3:pkg_cc_proto", diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 3870cbe9dd96..5f156fb41e94 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -48,7 +48,7 @@ Common::Redis::Client::PoolRequest* makeSingleServerRequest( AdminRespHandlerType getresponseHandlerType(const std::string command_name) { AdminRespHandlerType responseHandlerType = AdminRespHandlerType::response_handler_none; if (Common::Redis::SupportedCommands::allShardCommands().contains(command_name)) { - if (command_name == "pubsub" || command_name == "keys" || command_name == "slowlog" || command_name == "client") { + if (command_name == "pubsub" || command_name == "keys" || command_name == "slowlog" || command_name == "client" || command_name == "info") { responseHandlerType = AdminRespHandlerType::aggregate_all_responses; } else if (command_name == "script" || command_name == "flushall" || command_name == "config"){ responseHandlerType = AdminRespHandlerType::allresponses_mustbe_same; @@ -341,9 +341,15 @@ SplitRequestPtr mgmtNoKeyRequest::create(Router& router, Common::Redis::RespValu int32_t requestsCount=1; int32_t shard_index=0; bool iserror = false; + std::string firstarg = std::string(); std::string command_name = absl::AsciiStrToLower(incoming_request->asArray()[0].asString()); - std::string firstarg = absl::AsciiStrToLower(incoming_request->asArray()[1].asString()); + if (incoming_request->asArray().size() > 1) { + firstarg = absl::AsciiStrToLower(incoming_request->asArray()[1].asString()); + }else{ + firstarg = ""; + } + if (!checkIfAdminCommandSupported(command_name,firstarg)){ ENVOY_LOG(debug, "this admin command is not supported: '{}'", incoming_request->toString()); callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().InvalidRequest)); @@ -463,6 +469,33 @@ bool areAllResponsesSame(const std::vector& respons return true; } +void parseInfoResponse(const std::string& inforesponse,Common::Redis::Utility::InfoCmdResponseProcessor& infoProcessor) { + std::istringstream inputStream(inforesponse); + std::string line; + + while (std::getline(inputStream, line)) { + // Ignore lines that start with '#' + if (!line.empty() && line[0] == '#') { + continue; + } + + std::size_t pos = line.find(':'); + if (pos != std::string::npos) { + std::string key = line.substr(0, pos); + std::string value = line.substr(pos + 1); + + // Trim whitespace from both ends + key.erase(key.find_last_not_of(" \n\r\t") + 1); + value.erase(0, value.find_first_not_of(" \n\r\t")); + infoProcessor.processInfoCmdResponse(key, value); + + } + } + + return; +} + + void mgmtNoKeyRequest::onallChildRespAgrregate(Common::Redis::RespValuePtr&& value, int32_t reqindex, int32_t shardindex) { pending_requests_[reqindex].handle_ = nullptr; const auto& redisarg = pending_requests_[reqindex].redisarg_; @@ -492,6 +525,32 @@ void mgmtNoKeyRequest::onallChildRespAgrregate(Common::Redis::RespValuePtr&& val bool positiveresponse = true; updateStats(error_count_ == 0); if (!pending_responses_.empty()) { + if ( rediscommand == "info"){ + Common::Redis::RespValuePtr response = std::make_unique(); + response->type(Common::Redis::RespType::BulkString); + Envoy::Extensions::NetworkFilters::Common::Redis::Utility::InfoCmdResponseProcessor infoProcessor; + for (auto& resp : pending_responses_) { + if (resp->type() == Common::Redis::RespType::BulkString) { + parseInfoResponse(resp->asString(),infoProcessor); + //Add proxy filters connection stats to the response + auto downstreamstats = callbacks_.transaction().getDownstreamCallback()->getDownStreamMetrics(); + + infoProcessor.updateInfoCmdResponseString("total_connections_received",std::to_string(downstreamstats->downstream_cx_total_)); + + } else { + positiveresponse = false; + ENVOY_LOG(debug, "Error: Unexpected response Type , expected bulkstring"); + } + } + if (!positiveresponse) { + callbacks_.onResponse(Common::Redis::Utility::makeError( + fmt::format("unexpected response type received from upstream"))); + }else { + response->asString() += infoProcessor.getInfoCmdResponseString(); + callbacks_.onResponse(std::move(response)); + } + pending_responses_.clear(); + } if ( rediscommand == "pubsub" || rediscommand == "keys" || rediscommand == "slowlog"|| rediscommand == "client") { if ((redisarg == "numpat" || redisarg == "len") && (rediscommand == "pubsub" || rediscommand == "slowlog")) { int sum = 0; @@ -1215,8 +1274,7 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request, return nullptr; } - if (request->asArray().size() < 2 &&(Common::Redis::SupportedCommands::transactionCommands().count(command_name) == 0) - && (Common::Redis::SupportedCommands::subcrStateallowedCommands().count(command_name) == 0)){ + if (request->asArray().size() < 2 &&(Common::Redis::SupportedCommands::transactionCommands().count(command_name) == 0)&& (Common::Redis::SupportedCommands::subcrStateallowedCommands().count(command_name) == 0) && (command_name != Common::Redis::SupportedCommands::info())){ // Commands other than PING, TIME and transaction commands all have at least two arguments. ENVOY_LOG(debug,"invalid request - not enough arguments for command: '{}'", command_name); onInvalidRequest(callbacks); diff --git a/source/extensions/filters/network/redis_proxy/config.cc b/source/extensions/filters/network/redis_proxy/config.cc index acadcd44b172..dc9015bae83c 100644 --- a/source/extensions/filters/network/redis_proxy/config.cc +++ b/source/extensions/filters/network/redis_proxy/config.cc @@ -99,8 +99,8 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP auto proxy_filter_shared = std::make_shared( factory, Common::Redis::EncoderPtr{new Common::Redis::EncoderImpl()}, *splitter, filter_config); - auto pubsub_cb_ptr = std::make_shared(proxy_filter_shared); - proxy_filter_shared->setTransactionPubsubCallback(std::move(pubsub_cb_ptr)); + auto downstream_cb_ptr = std::make_shared(proxy_filter_shared); + proxy_filter_shared->setDownStreamCallbacks(std::move(downstream_cb_ptr)); ENVOY_LOG(debug, "redis: new proxy filter instance and creating pubsub callback"); filter_manager.addReadFilter(proxy_filter_shared); }; diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index 857f41f76104..ed6eb91db077 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -412,14 +412,14 @@ InstanceImpl::ThreadLocalPool::makeBlockingClientRequest(int32_t shard_index, co if (!transaction.connection_established_ && transaction.isSubscribedMode()) { transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), - auth_username_, auth_password_, false,true,false,transaction.getPubsubCallback()); + auth_username_, auth_password_, false,true,false,transaction.getDownstreamCallback()); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } }else if (!transaction.connection_established_ && transaction.isBlockingCommand()) { transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), - auth_username_, auth_password_, false,false,true,transaction.getPubsubCallback()); + auth_username_, auth_password_, false,false,true,transaction.getDownstreamCallback()); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } @@ -476,7 +476,7 @@ InstanceImpl::ThreadLocalPool::makeRequestNoKey(int32_t shard_index, RespVariant if ((!transaction.connection_established_ && transaction.is_subscribed_mode_) || (!transaction.connection_established_ && transaction.is_blocking_command_)) { transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), - auth_username_, auth_password_, false,true,true,transaction.getPubsubCallback()); + auth_username_, auth_password_, false,true,true,transaction.getDownstreamCallback()); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc index 3ff625a4d156..b986004edcb9 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc @@ -121,13 +121,13 @@ void ProxyFilter::onEvent(Network::ConnectionEvent event) { pending_requests_.pop_front(); } ENVOY_LOG(debug,"dereferencing pubsub callback and transaction on exit from proxy filter"); - // As pubsubcallbaks is created in proxy filter irerespecive of its a pubsub command or not this needs to be cleared on exit from proxy filter + // As downstreamcallbaks is created in proxy filter irerespecive of its a pubsub command or not this needs to be cleared on exit from proxy filter // decrement the reference to proxy filter - auto pubsub_cb = dynamic_cast(transaction_.getPubsubCallback().get()); - if (pubsub_cb != nullptr){ - pubsub_cb->clearParent(); + auto downstream_cb = dynamic_cast(transaction_.getDownstreamCallback().get()); + if (downstream_cb != nullptr){ + downstream_cb->clearParent(); } - transaction_.setPubsubCallback(nullptr); + transaction_.setDownstreamCallback(nullptr); transaction_.close(); } } @@ -149,7 +149,6 @@ void ProxyFilter::onAuth(PendingRequest& request, const std::string& password) { response->type(Common::Redis::RespType::SimpleString); response->asString() = "OK"; connection_allowed_ = true; - //PubsubCallbacks::getInstance(this); } else { response->type(Common::Redis::RespType::Error); response->asString() = "ERR invalid password"; @@ -170,12 +169,10 @@ void ProxyFilter::onAuth(PendingRequest& request, const std::string& username, response->type(Common::Redis::RespType::SimpleString); response->asString() = "OK"; connection_allowed_ = true; - //PubsubCallbacks::getInstance(this); } else if (username == config_->downstream_auth_username_ && checkPassword(password)) { response->type(Common::Redis::RespType::SimpleString); response->asString() = "OK"; connection_allowed_ = true; - //PubsubCallbacks::getInstance(this); } else { response->type(Common::Redis::RespType::Error); response->asString() = "WRONGPASS invalid username-password pair"; @@ -270,7 +267,6 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt Network::FilterStatus ProxyFilter::onData(Buffer::Instance& data, bool) { TRY_NEEDS_AUDIT { - //PubsubCallbacks::getInstance(this); decoder_->decode(data); return Network::FilterStatus::Continue; } diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.h b/source/extensions/filters/network/redis_proxy/proxy_filter.h index 59d4d5bc2c63..c50a470c3275 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.h +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.h @@ -15,6 +15,7 @@ #include "source/extensions/common/dynamic_forward_proxy/dns_cache.h" #include "source/extensions/filters/network/common/redis/codec.h" #include "source/extensions/filters/network/redis_proxy/command_splitter.h" +#include "source/extensions/filters/network/common/redis/utility.h" namespace Envoy { namespace Extensions { @@ -106,8 +107,22 @@ class ProxyFilter : public Network::ReadFilter, Common::Redis::Client::Transaction& transaction() { return transaction_; } void onAsyncResponse(Common::Redis::RespValuePtr&& value); void onPubsubConnClose(); - void setTransactionPubsubCallback(std::shared_ptr callback) { - transaction_.setPubsubCallback(std::move(callback)); + void setDownStreamCallbacks(std::shared_ptr callback) { + transaction_.setDownstreamCallback(std::move(callback)); + } + std::unique_ptr getDownStreamInfo() { + std::unique_ptr downstream_metrics = std::make_unique(); + downstream_metrics->downstream_rq_total_ = config_->stats_.downstream_rq_total_.value(); + downstream_metrics->downstream_cx_drain_close_ = config_->stats_.downstream_cx_drain_close_.value(); + downstream_metrics->downstream_cx_protocol_error_ = config_->stats_.downstream_cx_protocol_error_.value(); + downstream_metrics->downstream_cx_rx_bytes_total_ = config_->stats_.downstream_cx_rx_bytes_total_.value(); + downstream_metrics->downstream_cx_total_ = config_->stats_.downstream_cx_total_.value(); + downstream_metrics->downstream_cx_tx_bytes_total_ = config_->stats_.downstream_cx_tx_bytes_total_.value(); + downstream_metrics->downstream_cx_active_ = config_->stats_.downstream_cx_active_.value(); + downstream_metrics->downstream_cx_rx_bytes_buffered_ = config_->stats_.downstream_cx_rx_bytes_buffered_.value(); + downstream_metrics->downstream_cx_tx_bytes_buffered_ = config_->stats_.downstream_cx_tx_bytes_buffered_.value(); + downstream_metrics->downstream_rq_active_ = config_->stats_.downstream_rq_active_.value(); + return downstream_metrics; } private: @@ -153,33 +168,38 @@ class ProxyFilter : public Network::ReadFilter, bool connection_quit_; }; -class PubsubCallbacks : public Common::Redis::Client::DirectCallbacks{ +class DownStreamCallbacks : public Common::Redis::Client::DirectCallbacks{ private: std::shared_ptr parent_; public: - PubsubCallbacks( std::shared_ptr parent) : parent_(parent) {} + DownStreamCallbacks( std::shared_ptr parent) : parent_(parent) {} void clearParent() { parent_.reset(); // This releases the shared_ptr's ownership and decrements the reference count. } /* - ~PubsubCallbacks() { + ~DownStreamCallbacks() { if (parent_) { - parent_->setTransactionPubsubCallback(nullptr); // Deregister the callback + parent_->setDownstreamCallback(nullptr); // Deregister the callback } parent_=nullptr; } */ - void onDirectResponse(Common::Redis::RespValuePtr&& value) override { + void sendResponseDownstream(Common::Redis::RespValuePtr&& value) override { parent_->onAsyncResponse(std::move(value)); } void onFailure() override { parent_->onPubsubConnClose(); } + + virtual std::unique_ptr getDownStreamMetrics() override { + return parent_->getDownStreamInfo(); + + } }; } // namespace RedisProxy From 26c05f262683ce7d95a3256bdbcafb71b882cea4 Mon Sep 17 00:00:00 2001 From: dinesh-murugiah Date: Tue, 19 Mar 2024 17:53:22 +0530 Subject: [PATCH 2/2] some cleanups and fixes --- .../filters/network/common/redis/utility.cc | 129 ++++++++++-------- .../filters/network/common/redis/utility.h | 11 +- .../redis_proxy/command_splitter_impl.cc | 9 +- 3 files changed, 81 insertions(+), 68 deletions(-) diff --git a/source/extensions/filters/network/common/redis/utility.cc b/source/extensions/filters/network/common/redis/utility.cc index 1d32df9c73f9..9d64570cbd28 100644 --- a/source/extensions/filters/network/common/redis/utility.cc +++ b/source/extensions/filters/network/common/redis/utility.cc @@ -103,6 +103,9 @@ void CommonInfoCmdResponseAggregator(const std::string& infokey,const std::strin infoCmdObj.strvalue = infovalue; } break; + case proxymetrics: + infoCmdObj.processor->updateInfoResponseWithProxyMetrics(infokey,infoCmdObj); + break; default: break; } @@ -145,9 +148,16 @@ void InfoResponseAggrKeyspace(const std::string& infokey, const std::string& inf return; } -void InfoCmdResponseProcessor::updateInfoCmdResponseString(const std::string& infokey, const std::string& infovalue) { - processInfoCmdResponse(infokey, infovalue); +void InfoCmdResponseProcessor::updateInfoResponseWithProxyMetrics(const std::string& infokey,InfoCmdResponseProcessor::infoCmdResponseDecoder& infoCmdObj){ + +if (infokey == "total_connections_received"){ + auto cxtotal = infoCmdObj.processor->getDownstreamMetrics().downstream_cx_total_; + infoCmdObj.strvalue = std::to_string(cxtotal); } +return; + +} + std::string InfoCmdResponseProcessor::getInfoCmdResponseString(){ @@ -168,71 +178,72 @@ std::string InfoCmdResponseProcessor::getInfoCmdResponseString(){ } -InfoCmdResponseProcessor::InfoCmdResponseProcessor() { +InfoCmdResponseProcessor::InfoCmdResponseProcessor(DownStreamMetrics& downstream_metrics) : downstream_metrics_(downstream_metrics){ //Initialize the info response template inforesponsetemplate_ = { - {"# Server","redis_version", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","redis_git_sha1", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","redis_git_dirty", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","redis_build_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","redis_mode", hardcodedvalue, "standalone", 0, nullptr}, - {"# Server","os", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","arch_bits", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","multiplexing_api", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","process_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","run_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, + {"# Server","redis_version", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","redis_git_sha1", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","redis_git_dirty", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","redis_build_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","redis_mode", hardcodedvalue, "standalone", 0, nullptr,nullptr}, + {"# Server","os", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","arch_bits", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","multiplexing_api", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","process_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","run_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, //{"# Server","tcp_port", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, //Need to find a way to get the TCP port from listener , currently its not exposed - {"# Server","uptime_in_seconds", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","uptime_in_days", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","hz", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","lru_clock", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Server","config_file", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Memory","used_memory", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Memory","used_memory_peak", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Memory","used_memory_rss", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Memory","used_memory_lua", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Memory","maxmemory", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Memory","maxmemory_policy", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Memory","mem_fragmentation_ratio", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Memory","mem_allocator", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Persistence","loading", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Persistence","rdb_changes_since_last_save", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Persistence","rdb_bgsave_in_progress", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Persistence","rdb_last_save_time", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Persistence","rdb_last_bgsave_status", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Persistence","rdb_last_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Persistence","rdb_current_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","total_connections_received", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","total_commands_processed", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","instantaneous_ops_per_sec", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","total_net_input_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","total_net_output_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","instantaneous_input_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","instantaneous_output_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","rejected_connections", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","sync_full", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","sync_partial_ok", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","sync_partial_err", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","expired_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","evicted_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","keyspace_hits", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","keyspace_misses", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","pubsub_channels", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","pubsub_patterns", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Stats","latest_fork_usec", highestvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Replication","role", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# Replication","connected_slaves", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, - {"# CPU","used_cpu_sys", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# CPU","used_cpu_user", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# CPU","used_cpu_sys_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# CPU","used_cpu_user_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator}, - {"# Cluster","cluster_enabled", hardcodedvalue, "0", 0, nullptr}, - {"# Keyspace","db0",customizevalue, "", 0, InfoResponseAggrKeyspace} + {"# Server","uptime_in_seconds", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","uptime_in_days", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","hz", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","lru_clock", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Server","config_file", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Memory","used_memory", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Memory","used_memory_peak", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Memory","used_memory_rss", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Memory","used_memory_lua", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Memory","maxmemory", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Memory","maxmemory_policy", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Memory","mem_fragmentation_ratio", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Memory","mem_allocator", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Persistence","loading", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Persistence","rdb_changes_since_last_save", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Persistence","rdb_bgsave_in_progress", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Persistence","rdb_last_save_time", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Persistence","rdb_last_bgsave_status", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Persistence","rdb_last_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Persistence","rdb_current_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","total_connections_received", proxymetrics, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","total_commands_processed", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","instantaneous_ops_per_sec", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","total_net_input_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","total_net_output_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","instantaneous_input_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","instantaneous_output_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","rejected_connections", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","sync_full", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","sync_partial_ok", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","sync_partial_err", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","expired_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","evicted_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","keyspace_hits", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","keyspace_misses", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","pubsub_channels", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","pubsub_patterns", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Stats","latest_fork_usec", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Replication","role", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Replication","connected_slaves", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# CPU","used_cpu_sys", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# CPU","used_cpu_user", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# CPU","used_cpu_sys_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# CPU","used_cpu_user_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr}, + {"# Cluster","cluster_enabled", hardcodedvalue, "0", 0, nullptr,nullptr}, + {"# Keyspace","db0",customizevalue, "", 0, InfoResponseAggrKeyspace,nullptr} }; // Populate the converters_ hash map with indices of the inforesponsetemplate_ vector for (size_t i = 0; i < inforesponsetemplate_.size(); ++i) { converters_[inforesponsetemplate_[i].infokey] = i; + inforesponsetemplate_[i].processor = this; } } diff --git a/source/extensions/filters/network/common/redis/utility.h b/source/extensions/filters/network/common/redis/utility.h index 7ce16fbe2691..37361d0a5dc1 100644 --- a/source/extensions/filters/network/common/redis/utility.h +++ b/source/extensions/filters/network/common/redis/utility.h @@ -31,19 +31,19 @@ enum InforesponseAggregatorType { sumvalues, highestvalue, customizevalue, - hardcodedvalue + hardcodedvalue, + proxymetrics }; class InfoCmdResponseProcessor:public Logger::Loggable { public: - InfoCmdResponseProcessor(); + InfoCmdResponseProcessor(DownStreamMetrics& downstream_metrics); ~InfoCmdResponseProcessor(); void processInfoCmdResponse(const std::string& infokey, const std::string& infovalue); std::string getInfoCmdResponseString(); - void updateInfoCmdResponseString(const std::string& infokey, const std::string& infovalue); struct infoCmdResponseDecoder { std::string infocategory; @@ -52,11 +52,16 @@ class InfoCmdResponseProcessor:public Logger::Loggable { std::string strvalue; std::uint64_t intvalue; void (*customizer)(const std::string& infokey, const std::string& infovalue,infoCmdResponseDecoder& infoObj); + InfoCmdResponseProcessor* processor; }; + + void updateInfoResponseWithProxyMetrics(const std::string& infokey,infoCmdResponseDecoder& infoObj); + DownStreamMetrics& getDownstreamMetrics() { return downstream_metrics_; } private: std::vector inforesponsetemplate_; std::unordered_map converters_; + DownStreamMetrics& downstream_metrics_; }; class AuthRequest : public Redis::RespValue { diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 5f156fb41e94..aec6943ce92b 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -528,18 +528,15 @@ void mgmtNoKeyRequest::onallChildRespAgrregate(Common::Redis::RespValuePtr&& val if ( rediscommand == "info"){ Common::Redis::RespValuePtr response = std::make_unique(); response->type(Common::Redis::RespType::BulkString); - Envoy::Extensions::NetworkFilters::Common::Redis::Utility::InfoCmdResponseProcessor infoProcessor; + auto downstreamstats = callbacks_.transaction().getDownstreamCallback()->getDownStreamMetrics(); + Envoy::Extensions::NetworkFilters::Common::Redis::Utility::InfoCmdResponseProcessor infoProcessor(*downstreamstats); for (auto& resp : pending_responses_) { if (resp->type() == Common::Redis::RespType::BulkString) { parseInfoResponse(resp->asString(),infoProcessor); - //Add proxy filters connection stats to the response - auto downstreamstats = callbacks_.transaction().getDownstreamCallback()->getDownStreamMetrics(); - - infoProcessor.updateInfoCmdResponseString("total_connections_received",std::to_string(downstreamstats->downstream_cx_total_)); - } else { positiveresponse = false; ENVOY_LOG(debug, "Error: Unexpected response Type , expected bulkstring"); + break; } } if (!positiveresponse) {