Skip to content

Commit

Permalink
Fixes for stream support , memeory leaks , crash fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dinesh-murugiah committed Aug 15, 2024
1 parent 630672b commit 646275b
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 135 deletions.
28 changes: 24 additions & 4 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis
time_source_(dispatcher.timeSource()), redis_command_stats_(redis_command_stats),
scope_(scope), is_transaction_client_(is_transaction_client), is_pubsub_client_(is_pubsub_client), is_blocking_client_(is_blocking_client) {

ENVOY_LOG(debug,"ClientImpl Constructor creating client of type: is_transaction_client: {}, is_pubsub_client: {}, is_blocking_client: {}", is_transaction_client_, is_pubsub_client_, is_blocking_client_);
Upstream::ClusterTrafficStats& traffic_stats = *host->cluster().trafficStats();
traffic_stats.upstream_cx_total_.inc();
host->stats().cx_total_.inc();
Expand All @@ -96,6 +95,17 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis
if (is_pubsub_client_){
pubsub_cb_ = std::move(pubsubcb);
}
if (!is_transaction_client_ && !is_pubsub_client_ && !is_blocking_client_){
ENVOY_LOG(debug, "Upstream Client created of type ThreadLocal Active Client");
}else if (is_transaction_client_){
ENVOY_LOG(debug, "Upstream Client created of type Transaction Client");
}else if (is_pubsub_client_){
ENVOY_LOG(debug, "Upstream Client created of type Pubsub Client");
}else if (is_blocking_client_){
ENVOY_LOG(debug, "Upstream Client created of type Blocking Client");
}else{
ENVOY_LOG(error, "Upstream Client created of type Unknown Client");
}
}

ClientImpl::~ClientImpl() {
Expand All @@ -104,7 +114,7 @@ ClientImpl::~ClientImpl() {
host_->cluster().trafficStats()->upstream_cx_active_.dec();
host_->stats().cx_active_.dec();
pubsub_cb_.reset();

ENVOY_LOG(debug, "Upstream Client destroyed");
}

void ClientImpl::close() {
Expand Down Expand Up @@ -202,7 +212,6 @@ bool ClientImpl::makePubSubRequest(const RespValue& request) {

void ClientImpl::onConnectOrOpTimeout() {

ENVOY_LOG(debug, "Upstream Client Connection or Operation timeout occurred, is blocking client: {}", is_blocking_client_);
putOutlierEvent(Upstream::Outlier::Result::LocalOriginTimeout);
if (connected_) {
host_->cluster().trafficStats()->upstream_rq_timeout_.inc();
Expand All @@ -217,6 +226,17 @@ void ClientImpl::onConnectOrOpTimeout() {
} else {
ENVOY_LOG(debug, "Ignoring timeout for connection close for Blocking clients!");
}
if (!is_transaction_client_ && !is_pubsub_client_ && !is_blocking_client_){
ENVOY_LOG(debug, "Upstream Client onConnectOrOpTimeout for ThreadLocal Active Client");
}else if (is_transaction_client_){
ENVOY_LOG(debug, "Upstream Client onConnectOrOpTimeout for Transaction Client");
}else if (is_pubsub_client_){
ENVOY_LOG(debug, "Upstream Client onConnectOrOpTimeout for Pubsub Client");
}else if (is_blocking_client_){
ENVOY_LOG(debug, "Upstream Client onConnectOrOpTimeout for Blocking Client");
}else{
ENVOY_LOG(error, "Upstream Client onConnectOrOpTimeout for Unknown Client");
}
}

void ClientImpl::onData(Buffer::Instance& data) {
Expand Down Expand Up @@ -332,7 +352,7 @@ void ClientImpl::onRespValue(RespValuePtr&& value) {
pending_requests_.pop_front();
if (canceled) {
host_->cluster().trafficStats()->upstream_rq_cancelled_.inc();
} else if (config_.enableRedirection() && (!is_blocking_client_ || !is_transaction_client_) &&
} else if (config_.enableRedirection() && (!is_blocking_client_ && !is_transaction_client_ && !is_pubsub_client_) &&
(value->type() == Common::Redis::RespType::Error)) {
std::vector<absl::string_view> err = StringUtil::splitToken(value->asString(), " ", false);
if (err.size() == 3 &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ struct SupportedCommands {
"zrangebylex", "zrangebyscore", "zrank", "zrem", "zremrangebylex", "zremrangebyrank",
"zremrangebyscore", "zrevrange", "zrevrangebylex", "zrevrangebyscore", "zrevrank", "zscan",
"zscore", "rpoplpush", "smove", "sunion", "sdiff", "sinter", "sinterstore", "zunionstore",
"zinterstore", "pfmerge", "georadius", "georadiusbymember", "xadd", "xlen", "xdel", "xtrim",
"xrange", "xrevrange", "rename", "getex", "sort", "zmscore", "sdiffstore", "msetnx", "substr",
"zrangestore", "zunion", "echo", "zdiff", "xautoclaim", "xinfo", "sunionstore", "smismember",
"zinterstore", "pfmerge", "georadius", "georadiusbymember", "rename", "getex", "sort", "zmscore", "sdiffstore", "msetnx", "substr",
"zrangestore", "zunion", "echo", "zdiff", "sunionstore", "smismember",
"hrandfield", "geosearchstore", "zdiffstore", "geosearch", "randomkey", "zinter", "zrandmember",
"bitop", "xclaim", "lpos", "renamenx", "xgroup","xreadnonblock");
"bitop", "lpos", "renamenx","xread_simple_command");
}

/**
Expand Down Expand Up @@ -93,7 +92,7 @@ struct SupportedCommands {
* @return commands that are called blocking commands but not pubsub commands.
*/
static const absl::flat_hash_set<std::string>& blockingCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "blpop", "brpop", "brpoplpush", "bzpopmax", "bzpopmin", "xreadblock", "xreadgroup", "blmove");
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "blpop", "brpop", "brpoplpush", "bzpopmax", "bzpopmin", "xread_blocking_command", "blmove");
}

/**
Expand Down Expand Up @@ -124,6 +123,20 @@ struct SupportedCommands {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall", "flushdb", "pubsub", "keys", "slowlog", "config", "client", "info", "select", "unwatch");
}

/**
* @return commands which handle Redis Streams.
*/
static const absl::flat_hash_set<std::string>& streamCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "xack", "xadd", "xautoclaim", "xclaim", "xdel", "xgroup", "xinfo", "xlen", "xpending", "xrange", "xread","xreadgroup","xrevrange","xtrim");
}

/**
* @return List of stream commands which can be configured in blocking mode.
*/
static const absl::flat_hash_set<std::string>& streamBlockingCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "xread","xreadgroup");
}

/**
* @return scan command
*/
Expand Down Expand Up @@ -174,10 +187,6 @@ struct SupportedCommands {
*/
static const std::string& info() { CONSTRUCT_ON_FIRST_USE(std::string, "info"); }

/**
* @return special stream commands
*/
static const std::string& spl_strm_commands() { CONSTRUCT_ON_FIRST_USE(std::string, "xread"); }
/**
* @return commands which alters the state of redis
*/
Expand Down
Loading

0 comments on commit 646275b

Please sign in to comment.