Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

info command initial commit #17

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/extensions/filters/network/common/redis/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ envoy_cc_library(
name = "client_interface",
hdrs = ["client.h"],
deps = [
":utility_interface",
":codec_lib",
":redis_command_stats_lib",
"//envoy/upstream:cluster_manager_interface",
Expand Down
15 changes: 9 additions & 6 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "source/extensions/filters/network/common/redis/codec_impl.h"
#include "source/extensions/filters/network/common/redis/redis_command_stats.h"
#include "source/extensions/filters/network/common/redis/utility.h"


namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -58,8 +60,9 @@ class ClientCallbacks {
class DirectCallbacks {
public:
virtual ~DirectCallbacks() = default;
virtual void onDirectResponse(Common::Redis::RespValuePtr&& value) PURE;
virtual void sendResponseDownstream(Common::Redis::RespValuePtr&& value) PURE;
virtual void onFailure() PURE;
virtual std::unique_ptr<Envoy::Extensions::NetworkFilters::Common::Redis::Utility::DownStreamMetrics> getDownStreamMetrics() PURE;

};

Expand Down Expand Up @@ -278,12 +281,12 @@ struct Transaction {
}
should_close_ = false;
}
void setPubsubCallback(std::shared_ptr<DirectCallbacks> callback) {
pubsub_cb_ = callback;
void setDownstreamCallback(std::shared_ptr<DirectCallbacks> callback) {
downstream_cb_ = callback;
}

std::shared_ptr<DirectCallbacks> getPubsubCallback() {
return pubsub_cb_;
std::shared_ptr<DirectCallbacks> getDownstreamCallback() {
return downstream_cb_;
}
bool active_{false};
bool connection_established_{false};
Expand All @@ -298,7 +301,7 @@ struct Transaction {
// the mirroring policies.
std::vector<ClientPtr> clients_;
Network::ConnectionCallbacks* connection_cb_;
std::shared_ptr<DirectCallbacks> pubsub_cb_=nullptr;
std::shared_ptr<DirectCallbacks> downstream_cb_=nullptr;

// This index represents the current client on which traffic is being sent to.
// When sending to the main redis server it will be 0, and when sending to one of
Expand Down
16 changes: 8 additions & 8 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis
host->stats().cx_active_.inc();
connect_or_op_timer_->enableTimer(host->cluster().connectTimeout());
if (is_pubsub_client_){
pubsub_cb_ = std::move(drcb);
downstream_cb_ = std::move(drcb);
}
}

Expand All @@ -101,12 +101,12 @@ ClientImpl::~ClientImpl() {
ASSERT(connection_->state() == Network::Connection::State::Closed);
host_->cluster().trafficStats()->upstream_cx_active_.dec();
host_->stats().cx_active_.dec();
pubsub_cb_.reset();
downstream_cb_.reset();

}

void ClientImpl::close() {
pubsub_cb_.reset();
downstream_cb_.reset();
if (connection_) {
connection_->close(Network::ConnectionCloseType::NoFlush);
}
Expand Down Expand Up @@ -208,10 +208,10 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
if (pending_requests_.empty() && is_pubsub_client_) {
host_->cluster().trafficStats()->upstream_cx_destroy_with_active_rq_.inc();
ENVOY_LOG(debug,"Pubsub Client Connection close event received:'{}'",eventTypeStr);
if ((pubsub_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){
if ((downstream_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){
ENVOY_LOG(debug,"Pubsub Client Remote close received on Downstream Notify Upstream and close it");
pubsub_cb_->onFailure();
pubsub_cb_.reset();
downstream_cb_->onFailure();
downstream_cb_.reset();
}
}

Expand Down Expand Up @@ -246,8 +246,8 @@ void ClientImpl::onRespValue(RespValuePtr&& value) {
if (pending_requests_.empty() && is_pubsub_client_) {
// This is a pubsub client, and we have received a message from the server.
// We need to pass this message to the registered callback.
if (pubsub_cb_ != nullptr){
pubsub_cb_->onDirectResponse(std::move(value));
if (downstream_cb_ != nullptr){
downstream_cb_->sendResponseDownstream(std::move(value));
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
bool is_transaction_client_;
bool is_pubsub_client_=false;
bool is_blocking_client_=false;
std::shared_ptr<Extensions::NetworkFilters::Common::Redis::Client::DirectCallbacks> pubsub_cb_=nullptr;
std::shared_ptr<Extensions::NetworkFilters::Common::Redis::Client::DirectCallbacks> downstream_cb_=nullptr;
};

class ClientFactoryImpl : public ClientFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ struct SupportedCommands {
* @return commands which handle Redis commands without keys.
*/
static const absl::flat_hash_set<std::string>& adminNokeyCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall","publish","pubsub", "keys", "slowlog", "config","client");
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall","publish","pubsub", "keys", "slowlog", "config","client","info");
}
/**
* @return commands which handle Redis commands without keys.
*/
static const absl::flat_hash_set<std::string>& allShardCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall", "pubsub", "keys", "slowlog", "config","client");
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall", "pubsub", "keys", "slowlog", "config","client","info");
}

/**
Expand Down Expand Up @@ -137,6 +137,11 @@ struct SupportedCommands {
*/
static const std::string& exit() { CONSTRUCT_ON_FIRST_USE(std::string, "exit"); }

/**
* @return info command
*/
static const std::string& info() { CONSTRUCT_ON_FIRST_USE(std::string, "info"); }

/**
* @return commands which alters the state of redis
*/
Expand Down
171 changes: 171 additions & 0 deletions source/extensions/filters/network/common/redis/utility.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "source/extensions/filters/network/common/redis/utility.h"

#include "source/common/common/utility.h"
#include "absl/strings/str_split.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -83,6 +84,176 @@ const SetRequest& SetRequest::instance() {
static const SetRequest* instance = new SetRequest{};
return *instance;
}

void CommonInfoCmdResponseAggregator(const std::string& infokey,const std::string& infovalue,InfoCmdResponseProcessor::infoCmdResponseDecoder& infoCmdObj){

// Handle unused variable warning
(void)infokey;
switch (infoCmdObj.aggregate_type) {
case staticvalue:
infoCmdObj.strvalue = infovalue;
break;
case sumvalues:
infoCmdObj.intvalue += std::stoull(infovalue);
infoCmdObj.strvalue = std::to_string(infoCmdObj.intvalue);
break;
case highestvalue:
if (std::stoull(infovalue) > infoCmdObj.intvalue) {
infoCmdObj.intvalue = std::stoull(infovalue);
infoCmdObj.strvalue = infovalue;
}
break;
default:
break;
}

return;
}

void getkeyspacestats(const std::string& value, std::uint64_t& keys, std::uint64_t& expires, std::uint64_t& avg_ttl) {
std::vector<std::string> keyvaluepairs = absl::StrSplit(value, ',');

for (const auto& keyvaluepair : keyvaluepairs) {
std::vector<std::string> keyvalue = absl::StrSplit(keyvaluepair, '=');
if (keyvalue.size() == 2) {
if (keyvalue[0] == "keys") {
keys = std::stoull(keyvalue[1]);
} else if (keyvalue[0] == "expires") {
expires = std::stoull(keyvalue[1]);
} else if (keyvalue[0] == "avg_ttl") {
avg_ttl = std::stoull(keyvalue[1]);
}
}
}
}

void InfoResponseAggrKeyspace(const std::string& infokey, const std::string& infovalue,InfoCmdResponseProcessor::infoCmdResponseDecoder& infoCmdObj){

std::uint64_t currkeys,oldkeys = 0;
std::uint64_t currexpires,oldexpires = 0;
std::uint64_t curravg_ttl,oldavg_ttl = 0;

if (infoCmdObj.strvalue != ""){
getkeyspacestats(infoCmdObj.strvalue, oldkeys, oldexpires, oldavg_ttl);
}

if (infokey == "db0") {
getkeyspacestats(infovalue, currkeys, currexpires, curravg_ttl);
infoCmdObj.strvalue = "keys=" + std::to_string(currkeys + oldkeys) + ",expires=" + std::to_string(currexpires + oldexpires) + ",avg_ttl=" + std::to_string((curravg_ttl + oldavg_ttl) / 2);
}

return;

}
void InfoCmdResponseProcessor::updateInfoCmdResponseString(const std::string& infokey, const std::string& infovalue) {
processInfoCmdResponse(infokey, infovalue);
}

std::string InfoCmdResponseProcessor::getInfoCmdResponseString(){

//generate info response string from the inforesponsetemplate_ vector
std::string prevInfoCategory="";
std::string infoResponseString;
for (const auto& entry : inforesponsetemplate_) {

if (prevInfoCategory != entry.infocategory) {
infoResponseString += entry.infocategory + "\n";
prevInfoCategory = entry.infocategory;
}
if (entry.strvalue != ""){
infoResponseString += entry.infokey + ":" + entry.strvalue + "\n";
}
}
return infoResponseString;

}

InfoCmdResponseProcessor::InfoCmdResponseProcessor() {

//Initialize the info response template
inforesponsetemplate_ = {
{"# Server","redis_version", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","redis_git_sha1", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","redis_git_dirty", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","redis_build_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","redis_mode", hardcodedvalue, "standalone", 0, nullptr},
{"# Server","os", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","arch_bits", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","multiplexing_api", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","process_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","run_id", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
//{"# Server","tcp_port", staticvalue, "", 0, CommonInfoCmdResponseAggregator}, //Need to find a way to get the TCP port from listener , currently its not exposed
{"# Server","uptime_in_seconds", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","uptime_in_days", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","hz", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","lru_clock", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Server","config_file", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Memory","used_memory", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Memory","used_memory_peak", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Memory","used_memory_rss", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Memory","used_memory_lua", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Memory","maxmemory", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Memory","maxmemory_policy", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Memory","mem_fragmentation_ratio", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Memory","mem_allocator", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Persistence","loading", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Persistence","rdb_changes_since_last_save", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Persistence","rdb_bgsave_in_progress", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Persistence","rdb_last_save_time", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Persistence","rdb_last_bgsave_status", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Persistence","rdb_last_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Persistence","rdb_current_bgsave_time_sec", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","total_connections_received", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","total_commands_processed", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","instantaneous_ops_per_sec", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","total_net_input_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","total_net_output_bytes", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","instantaneous_input_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","instantaneous_output_kbps", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","rejected_connections", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","sync_full", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","sync_partial_ok", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","sync_partial_err", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","expired_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","evicted_keys", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","keyspace_hits", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","keyspace_misses", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","pubsub_channels", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","pubsub_patterns", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Stats","latest_fork_usec", highestvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Replication","role", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# Replication","connected_slaves", staticvalue, "", 0, CommonInfoCmdResponseAggregator},
{"# CPU","used_cpu_sys", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# CPU","used_cpu_user", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# CPU","used_cpu_sys_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# CPU","used_cpu_user_children", sumvalues, "", 0, CommonInfoCmdResponseAggregator},
{"# Cluster","cluster_enabled", hardcodedvalue, "0", 0, nullptr},
{"# Keyspace","db0",customizevalue, "", 0, InfoResponseAggrKeyspace}
};
// Populate the converters_ hash map with indices of the inforesponsetemplate_ vector
for (size_t i = 0; i < inforesponsetemplate_.size(); ++i) {
converters_[inforesponsetemplate_[i].infokey] = i;
}
}

InfoCmdResponseProcessor::~InfoCmdResponseProcessor() {
}

void InfoCmdResponseProcessor::processInfoCmdResponse(const std::string& key, const std::string& value) {
auto it = converters_.find(key);
if (it != converters_.end()) {
// Use the found index to access the corresponding entry in inforesponsetemplate_
infoCmdResponseDecoder& entry = inforesponsetemplate_[it->second];

if (entry.customizer != nullptr) {
entry.customizer(key, value, entry);
//ENVOY_LOG(debug, "infokey: '{}' infovalue: '{}'", inforesponsetemplate_[it->second].infokey, inforesponsetemplate_[it->second].strvalue);
}

}
return;
}

} // namespace Utility
} // namespace Redis
} // namespace Common
Expand Down
48 changes: 48 additions & 0 deletions source/extensions/filters/network/common/redis/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <string>

#include "source/extensions/filters/network/common/redis/codec.h"
#include "source/common/common/logger.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -11,6 +12,53 @@ namespace Common {
namespace Redis {
namespace Utility {

struct DownStreamMetrics {
uint64_t downstream_rq_total_{};
uint64_t downstream_cx_drain_close_{};
uint64_t downstream_cx_protocol_error_{};
uint64_t downstream_cx_rx_bytes_total_{};
uint64_t downstream_cx_total_{};
uint64_t downstream_cx_tx_bytes_total_{};
uint64_t downstream_cx_active_{};
uint64_t downstream_cx_rx_bytes_buffered_{};
uint64_t downstream_cx_tx_bytes_buffered_{};
uint64_t downstream_rq_active_{};
};


enum InforesponseAggregatorType {
staticvalue,
sumvalues,
highestvalue,
customizevalue,
hardcodedvalue
};


class InfoCmdResponseProcessor:public Logger::Loggable<Logger::Id::redis> {
public:

InfoCmdResponseProcessor();
~InfoCmdResponseProcessor();

void processInfoCmdResponse(const std::string& infokey, const std::string& infovalue);
std::string getInfoCmdResponseString();
void updateInfoCmdResponseString(const std::string& infokey, const std::string& infovalue);

struct infoCmdResponseDecoder {
std::string infocategory;
std::string infokey;
enum InforesponseAggregatorType aggregate_type;
std::string strvalue;
std::uint64_t intvalue;
void (*customizer)(const std::string& infokey, const std::string& infovalue,infoCmdResponseDecoder& infoObj);
};
private:

std::vector<infoCmdResponseDecoder> inforesponsetemplate_;
std::unordered_map<std::string, size_t> converters_;
};

class AuthRequest : public Redis::RespValue {
public:
AuthRequest(const std::string& username, const std::string& password);
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/network/redis_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ envoy_cc_library(
"//source/common/common:assert_lib",
"//source/common/config:datasource_lib",
"//source/common/config:utility_lib",
"//source/extensions/filters/network/common/redis:utility_lib",
"//source/extensions/common/dynamic_forward_proxy:dns_cache_interface",
"//source/extensions/filters/network/common/redis:codec_interface",
"@envoy_api//envoy/extensions/filters/network/redis_proxy/v3:pkg_cc_proto",
Expand Down
Loading