Skip to content

Commit

Permalink
Merge pull request #17 from freshworks/info_command_support
Browse files Browse the repository at this point in the history
info command support added
  • Loading branch information
dinesh-murugiah authored Mar 19, 2024
2 parents 8876f60 + 26c05f2 commit 2bc63a3
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 42 deletions.
1 change: 1 addition & 0 deletions source/extensions/filters/network/common/redis/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ envoy_cc_library(
name = "client_interface",
hdrs = ["client.h"],
deps = [
":utility_interface",
":codec_lib",
":redis_command_stats_lib",
"//envoy/upstream:cluster_manager_interface",
Expand Down
15 changes: 9 additions & 6 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "source/extensions/filters/network/common/redis/codec_impl.h"
#include "source/extensions/filters/network/common/redis/redis_command_stats.h"
#include "source/extensions/filters/network/common/redis/utility.h"


namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -58,8 +60,9 @@ class ClientCallbacks {
class DirectCallbacks {
public:
virtual ~DirectCallbacks() = default;
virtual void onDirectResponse(Common::Redis::RespValuePtr&& value) PURE;
virtual void sendResponseDownstream(Common::Redis::RespValuePtr&& value) PURE;
virtual void onFailure() PURE;
virtual std::unique_ptr<Envoy::Extensions::NetworkFilters::Common::Redis::Utility::DownStreamMetrics> getDownStreamMetrics() PURE;

};

Expand Down Expand Up @@ -278,12 +281,12 @@ struct Transaction {
}
should_close_ = false;
}
void setPubsubCallback(std::shared_ptr<DirectCallbacks> callback) {
pubsub_cb_ = callback;
void setDownstreamCallback(std::shared_ptr<DirectCallbacks> callback) {
downstream_cb_ = callback;
}

std::shared_ptr<DirectCallbacks> getPubsubCallback() {
return pubsub_cb_;
std::shared_ptr<DirectCallbacks> getDownstreamCallback() {
return downstream_cb_;
}
bool active_{false};
bool connection_established_{false};
Expand All @@ -298,7 +301,7 @@ struct Transaction {
// the mirroring policies.
std::vector<ClientPtr> clients_;
Network::ConnectionCallbacks* connection_cb_;
std::shared_ptr<DirectCallbacks> pubsub_cb_=nullptr;
std::shared_ptr<DirectCallbacks> downstream_cb_=nullptr;

// This index represents the current client on which traffic is being sent to.
// When sending to the main redis server it will be 0, and when sending to one of
Expand Down
16 changes: 8 additions & 8 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis
host->stats().cx_active_.inc();
connect_or_op_timer_->enableTimer(host->cluster().connectTimeout());
if (is_pubsub_client_){
pubsub_cb_ = std::move(drcb);
downstream_cb_ = std::move(drcb);
}
}

Expand All @@ -101,12 +101,12 @@ ClientImpl::~ClientImpl() {
ASSERT(connection_->state() == Network::Connection::State::Closed);
host_->cluster().trafficStats()->upstream_cx_active_.dec();
host_->stats().cx_active_.dec();
pubsub_cb_.reset();
downstream_cb_.reset();

}

void ClientImpl::close() {
pubsub_cb_.reset();
downstream_cb_.reset();
if (connection_) {
connection_->close(Network::ConnectionCloseType::NoFlush);
}
Expand Down Expand Up @@ -208,10 +208,10 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
if (pending_requests_.empty() && is_pubsub_client_) {
host_->cluster().trafficStats()->upstream_cx_destroy_with_active_rq_.inc();
ENVOY_LOG(debug,"Pubsub Client Connection close event received:'{}'",eventTypeStr);
if ((pubsub_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){
if ((downstream_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){
ENVOY_LOG(debug,"Pubsub Client Remote close received on Downstream Notify Upstream and close it");
pubsub_cb_->onFailure();
pubsub_cb_.reset();
downstream_cb_->onFailure();
downstream_cb_.reset();
}
}

Expand Down Expand Up @@ -246,8 +246,8 @@ void ClientImpl::onRespValue(RespValuePtr&& value) {
if (pending_requests_.empty() && is_pubsub_client_) {
// This is a pubsub client, and we have received a message from the server.
// We need to pass this message to the registered callback.
if (pubsub_cb_ != nullptr){
pubsub_cb_->onDirectResponse(std::move(value));
if (downstream_cb_ != nullptr){
downstream_cb_->sendResponseDownstream(std::move(value));
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
bool is_transaction_client_;
bool is_pubsub_client_=false;
bool is_blocking_client_=false;
std::shared_ptr<Extensions::NetworkFilters::Common::Redis::Client::DirectCallbacks> pubsub_cb_=nullptr;
std::shared_ptr<Extensions::NetworkFilters::Common::Redis::Client::DirectCallbacks> downstream_cb_=nullptr;
};

class ClientFactoryImpl : public ClientFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ struct SupportedCommands {
* @return commands which handle Redis commands without keys.
*/
static const absl::flat_hash_set<std::string>& adminNokeyCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall","publish","pubsub", "keys", "slowlog", "config","client");
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall","publish","pubsub", "keys", "slowlog", "config","client","info");
}
/**
* @return commands which handle Redis commands without keys.
*/
static const absl::flat_hash_set<std::string>& allShardCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall", "pubsub", "keys", "slowlog", "config","client");
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall", "pubsub", "keys", "slowlog", "config","client","info");
}

/**
Expand Down Expand Up @@ -142,6 +142,11 @@ struct SupportedCommands {
*/
static const std::string& exit() { CONSTRUCT_ON_FIRST_USE(std::string, "exit"); }

/**
* @return info command
*/
static const std::string& info() { CONSTRUCT_ON_FIRST_USE(std::string, "info"); }

/**
* @return commands which alters the state of redis
*/
Expand Down
182 changes: 182 additions & 0 deletions source/extensions/filters/network/common/redis/utility.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "source/extensions/filters/network/common/redis/utility.h"

#include "source/common/common/utility.h"
#include "absl/strings/str_split.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -83,6 +84,187 @@ const SetRequest& SetRequest::instance() {
static const SetRequest* instance = new SetRequest{};
return *instance;
}

void CommonInfoCmdResponseAggregator(const std::string& infokey,const std::string& infovalue,InfoCmdResponseProcessor::infoCmdResponseDecoder& infoCmdObj){

// Handle unused variable warning
(void)infokey;
switch (infoCmdObj.aggregate_type) {
case staticvalue:
infoCmdObj.strvalue = infovalue;
break;
case sumvalues:
infoCmdObj.intvalue += std::stoull(infovalue);
infoCmdObj.strvalue = std::to_string(infoCmdObj.intvalue);
break;
case highestvalue:
if (std::stoull(infovalue) > infoCmdObj.intvalue) {
infoCmdObj.intvalue = std::stoull(infovalue);
infoCmdObj.strvalue = infovalue;
}
break;
case proxymetrics:
infoCmdObj.processor->updateInfoResponseWithProxyMetrics(infokey,infoCmdObj);
break;
default:
break;
}

return;
}

void getkeyspacestats(const std::string& value, std::uint64_t& keys, std::uint64_t& expires, std::uint64_t& avg_ttl) {
std::vector<std::string> keyvaluepairs = absl::StrSplit(value, ',');

for (const auto& keyvaluepair : keyvaluepairs) {
std::vector<std::string> keyvalue = absl::StrSplit(keyvaluepair, '=');
if (keyvalue.size() == 2) {
if (keyvalue[0] == "keys") {
keys = std::stoull(keyvalue[1]);
} else if (keyvalue[0] == "expires") {
expires = std::stoull(keyvalue[1]);
} else if (keyvalue[0] == "avg_ttl") {
avg_ttl = std::stoull(keyvalue[1]);
}
}
}
}

void InfoResponseAggrKeyspace(const std::string& infokey, const std::string& infovalue,InfoCmdResponseProcessor::infoCmdResponseDecoder& infoCmdObj){

std::uint64_t currkeys,oldkeys = 0;
std::uint64_t currexpires,oldexpires = 0;
std::uint64_t curravg_ttl,oldavg_ttl = 0;

if (infoCmdObj.strvalue != ""){
getkeyspacestats(infoCmdObj.strvalue, oldkeys, oldexpires, oldavg_ttl);
}

if (infokey == "db0") {
getkeyspacestats(infovalue, currkeys, currexpires, curravg_ttl);
infoCmdObj.strvalue = "keys=" + std::to_string(currkeys + oldkeys) + ",expires=" + std::to_string(currexpires + oldexpires) + ",avg_ttl=" + std::to_string((curravg_ttl + oldavg_ttl) / 2);
}

return;

}
void InfoCmdResponseProcessor::updateInfoResponseWithProxyMetrics(const std::string& infokey,InfoCmdResponseProcessor::infoCmdResponseDecoder& infoCmdObj){

if (infokey == "total_connections_received"){
auto cxtotal = infoCmdObj.processor->getDownstreamMetrics().downstream_cx_total_;
infoCmdObj.strvalue = std::to_string(cxtotal);
}
return;

}


std::string InfoCmdResponseProcessor::getInfoCmdResponseString(){

//generate info response string from the inforesponsetemplate_ vector
std::string prevInfoCategory="";
std::string infoResponseString;
for (const auto& entry : inforesponsetemplate_) {

if (prevInfoCategory != entry.infocategory) {
infoResponseString += entry.infocategory + "\n";
prevInfoCategory = entry.infocategory;
}
if (entry.strvalue != ""){
infoResponseString += entry.infokey + ":" + entry.strvalue + "\n";
}
}
return infoResponseString;

}

InfoCmdResponseProcessor::InfoCmdResponseProcessor(DownStreamMetrics& downstream_metrics) : downstream_metrics_(downstream_metrics){

//Initialize the info response template
inforesponsetemplate_ = {
{"# Server","redis_version", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","redis_git_sha1", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","redis_git_dirty", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","redis_build_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","redis_mode", hardcodedvalue, "standalone", 0, nullptr,nullptr},
{"# Server","os", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","arch_bits", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","multiplexing_api", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","process_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","run_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
//{"# Server","tcp_port", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, //Need to find a way to get the TCP port from listener , currently its not exposed
{"# Server","uptime_in_seconds", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","uptime_in_days", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","hz", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","lru_clock", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Server","config_file", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Memory","used_memory", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Memory","used_memory_peak", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Memory","used_memory_rss", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Memory","used_memory_lua", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Memory","maxmemory", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Memory","maxmemory_policy", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Memory","mem_fragmentation_ratio", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Memory","mem_allocator", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Persistence","loading", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Persistence","rdb_changes_since_last_save", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Persistence","rdb_bgsave_in_progress", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Persistence","rdb_last_save_time", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Persistence","rdb_last_bgsave_status", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Persistence","rdb_last_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Persistence","rdb_current_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","total_connections_received", proxymetrics, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","total_commands_processed", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","instantaneous_ops_per_sec", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","total_net_input_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","total_net_output_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","instantaneous_input_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","instantaneous_output_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","rejected_connections", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","sync_full", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","sync_partial_ok", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","sync_partial_err", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","expired_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","evicted_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","keyspace_hits", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","keyspace_misses", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","pubsub_channels", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","pubsub_patterns", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Stats","latest_fork_usec", highestvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Replication","role", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Replication","connected_slaves", staticvalue, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# CPU","used_cpu_sys", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# CPU","used_cpu_user", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# CPU","used_cpu_sys_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# CPU","used_cpu_user_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator,nullptr},
{"# Cluster","cluster_enabled", hardcodedvalue, "0", 0, nullptr,nullptr},
{"# Keyspace","db0",customizevalue, "", 0, InfoResponseAggrKeyspace,nullptr}
};
// Populate the converters_ hash map with indices of the inforesponsetemplate_ vector
for (size_t i = 0; i < inforesponsetemplate_.size(); ++i) {
converters_[inforesponsetemplate_[i].infokey] = i;
inforesponsetemplate_[i].processor = this;
}
}

InfoCmdResponseProcessor::~InfoCmdResponseProcessor() {
}

void InfoCmdResponseProcessor::processInfoCmdResponse(const std::string& key, const std::string& value) {
auto it = converters_.find(key);
if (it != converters_.end()) {
// Use the found index to access the corresponding entry in inforesponsetemplate_
infoCmdResponseDecoder& entry = inforesponsetemplate_[it->second];

if (entry.customizer != nullptr) {
entry.customizer(key, value, entry);
//ENVOY_LOG(debug, "infokey: '{}' infovalue: '{}'", inforesponsetemplate_[it->second].infokey, inforesponsetemplate_[it->second].strvalue);
}

}
return;
}

} // namespace Utility
} // namespace Redis
} // namespace Common
Expand Down
Loading

0 comments on commit 2bc63a3

Please sign in to comment.