Skip to content

Commit

Permalink
fixes for xread and memleak on quit
Browse files Browse the repository at this point in the history
  • Loading branch information
dinesh-murugiah committed Aug 11, 2024
1 parent f5cfa34 commit 630672b
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 25 deletions.
8 changes: 8 additions & 0 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis
flush_timer_(dispatcher.createTimer([this]() { flushBufferAndResetTimer(); })),
time_source_(dispatcher.timeSource()), redis_command_stats_(redis_command_stats),
scope_(scope), is_transaction_client_(is_transaction_client), is_pubsub_client_(is_pubsub_client), is_blocking_client_(is_blocking_client) {

ENVOY_LOG(debug,"ClientImpl Constructor creating client of type: is_transaction_client: {}, is_pubsub_client: {}, is_blocking_client: {}", is_transaction_client_, is_pubsub_client_, is_blocking_client_);
Upstream::ClusterTrafficStats& traffic_stats = *host->cluster().trafficStats();
traffic_stats.upstream_cx_total_.inc();
host->stats().cx_total_.inc();
Expand All @@ -107,6 +109,7 @@ ClientImpl::~ClientImpl() {

void ClientImpl::close() {
pubsub_cb_.reset();
ENVOY_LOG(debug, "Upstream Client Connection close requested");
if (connection_) {
connection_->close(Network::ConnectionCloseType::NoFlush);
}
Expand Down Expand Up @@ -198,6 +201,8 @@ bool ClientImpl::makePubSubRequest(const RespValue& request) {


void ClientImpl::onConnectOrOpTimeout() {

ENVOY_LOG(debug, "Upstream Client Connection or Operation timeout occurred, is blocking client: {}", is_blocking_client_);
putOutlierEvent(Upstream::Outlier::Result::LocalOriginTimeout);
if (connected_) {
host_->cluster().trafficStats()->upstream_rq_timeout_.inc();
Expand All @@ -220,6 +225,7 @@ void ClientImpl::onData(Buffer::Instance& data) {
putOutlierEvent(Upstream::Outlier::Result::ExtOriginRequestFailed);
host_->cluster().trafficStats()->upstream_cx_protocol_error_.inc();
host_->stats().rq_error_.inc();
ENVOY_LOG(debug, "Upstream Client Protocol error occurred");
connection_->close(Network::ConnectionCloseType::NoFlush);
}
}
Expand Down Expand Up @@ -263,7 +269,9 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
//handle non blocking and non transaction requests
while (!pending_requests_.empty()) {
PendingRequest& request = pending_requests_.front();
ENVOY_LOG(debug,"Upstream Client Connection close ");
if (!request.canceled_) {
ENVOY_LOG(debug,"Upstream Client Connection close calling onFailure");
request.callbacks_.onFailure();
} else {
host_->cluster().trafficStats()->upstream_rq_cancelled_.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct SupportedCommands {
"xrange", "xrevrange", "rename", "getex", "sort", "zmscore", "sdiffstore", "msetnx", "substr",
"zrangestore", "zunion", "echo", "zdiff", "xautoclaim", "xinfo", "sunionstore", "smismember",
"hrandfield", "geosearchstore", "zdiffstore", "geosearch", "randomkey", "zinter", "zrandmember",
"bitop", "xclaim", "lpos", "renamenx", "xgroup");
"bitop", "xclaim", "lpos", "renamenx", "xgroup","xreadnonblock");
}

/**
Expand Down Expand Up @@ -93,7 +93,7 @@ struct SupportedCommands {
* @return commands that are called blocking commands but not pubsub commands.
*/
static const absl::flat_hash_set<std::string>& blockingCommands() {
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "blpop", "brpop", "brpoplpush", "bzpopmax", "bzpopmin", "xread", "xreadgroup", "blmove");
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "blpop", "brpop", "brpoplpush", "bzpopmax", "bzpopmin", "xreadblock", "xreadgroup", "blmove");
}

/**
Expand Down Expand Up @@ -174,6 +174,10 @@ struct SupportedCommands {
*/
static const std::string& info() { CONSTRUCT_ON_FIRST_USE(std::string, "info"); }

/**
* @return special stream commands
*/
static const std::string& spl_strm_commands() { CONSTRUCT_ON_FIRST_USE(std::string, "xread"); }
/**
* @return commands which alters the state of redis
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ int32_t getShardIndex(const std::string command, int32_t requestsCount,int32_t r

bool isBlockingCommand = Common::Redis::SupportedCommands::blockingCommands().contains(command);
bool isAllShardCommand = Common::Redis::SupportedCommands::allShardCommands().contains(command);
bool isXreadBlockingCommand = (command == "xread" || command == "xreadgroup");

if (!isBlockingCommand && !isAllShardCommand && requestsCount == 1 ){
if (!isBlockingCommand && !isAllShardCommand && requestsCount == 1 && !isXreadBlockingCommand){
// Send request to a random shard so that we donot allways send to the same shard
shard_index = rand() % redisShardsCount;
}
Expand Down Expand Up @@ -220,7 +221,9 @@ void SingleServerRequest::onFailure() { onFailure(Response::get().UpstreamFailur
void SingleServerRequest::onFailure(std::string error_msg) {
handle_ = nullptr;
updateStats(false);
ENVOY_LOG(debug,"mode of clients is Transaction : '{}', PubSub: '{}', Blocking: '{}'",callbacks_.transaction().isTransactionMode(),callbacks_.transaction().isSubscribedMode(),callbacks_.transaction().isBlockingCommand());
callbacks_.transaction().should_close_ = true;
ENVOY_LOG(debug, "onFailure error: {},closing transaction also", error_msg);
callbacks_.onResponse(Common::Redis::Utility::makeError(error_msg));
}

Expand Down Expand Up @@ -266,13 +269,38 @@ SplitRequestPtr SimpleRequest::create(Router& router,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool delay_command_latency,
const StreamInfo::StreamInfo& stream_info) {
std::string command_name = absl::AsciiStrToLower(incoming_request->asArray()[0].asString());
std::string key ="";
std::unique_ptr<SimpleRequest> request_ptr{
new SimpleRequest(callbacks, command_stats, time_source, delay_command_latency)};

const auto route = router.upstreamPool(incoming_request->asArray()[1].asString(), stream_info);
if (route) {
Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);
if (command_name == "xread"){
int32_t index = 0;
int32_t count = base_request->asArray().size();
while (count > 0) {
if (absl::AsciiStrToLower(base_request->asArray()[index].asString())== "streams") {
index++;
key = base_request->asArray()[index].asString();
break;
}
index++;
count--;
}
if (key.empty()) {
ENVOY_LOG(debug, "unexpected command : '{}'", base_request->toString());
callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format("unexpected command format")));
return nullptr;
}

}else {
key = base_request->asArray()[1].asString();
}

request_ptr->handle_ = makeSingleServerRequest(
route, base_request->asArray()[0].asString(), base_request->asArray()[1].asString(),
route, base_request->asArray()[0].asString(), key,
base_request, *request_ptr, callbacks.transaction());
} else {
ENVOY_LOG(debug, "route not found: '{}'", incoming_request->toString());
Expand Down Expand Up @@ -782,12 +810,34 @@ SplitRequestPtr BlockingClientRequest::create(Router& router, Common::Redis::Res
// For blocking requests which operate on a single key, we can hash the key to a single
//must send shard index as negative to indicate that it is a blocking request that acts on key
std::string command_name = absl::AsciiStrToLower(incoming_request->asArray()[0].asString());
std::string key ="";
int32_t shard_index=getShardIndex(command_name,1,1);
Common::Redis::Client::Transaction& transaction = callbacks.transaction();

std::unique_ptr<BlockingClientRequest> request_ptr{
new BlockingClientRequest(callbacks, command_stats, time_source, delay_command_latency)};
std::string key = absl::AsciiStrToLower(incoming_request->asArray()[1].asString());
if (command_name == "xread"){
int32_t index = 0;
int32_t count = incoming_request->asArray().size();
while (count > 0) {
if (absl::AsciiStrToLower(incoming_request->asArray()[index].asString())== "streams") {
index++;
key = incoming_request->asArray()[index].asString();
break;
}
index++;
count--;
}
if (key.empty()) {
ENVOY_LOG(debug, "unexpected command : '{}'", incoming_request->toString());
callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format("unexpected command format")));
return nullptr;
}

}else {
key = incoming_request->asArray()[1].asString();
}


if (transaction.active_ ){
// when we are in blocking command, we cannnot accept any other commands
Expand All @@ -802,7 +852,7 @@ SplitRequestPtr BlockingClientRequest::create(Router& router, Common::Redis::Res
return nullptr;
}
}else {
if (Common::Redis::SupportedCommands::blockingCommands().contains(command_name)){
if (Common::Redis::SupportedCommands::blockingCommands().contains(command_name) || command_name == "xread") {
transaction.clients_.resize(1);
transaction.setBlockingCommand();
transaction.start();
Expand All @@ -814,6 +864,7 @@ SplitRequestPtr BlockingClientRequest::create(Router& router, Common::Redis::Res
}
const auto route = router.upstreamPool(incoming_request->asArray()[1].asString(), stream_info);
if (route) {
ENVOY_LOG(debug, "key: for sharding '{}'", key);
Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);
request_ptr->handle_ = makeBlockingRequest(
route,shard_index,key,base_request, *request_ptr, callbacks.transaction());
Expand Down Expand Up @@ -1826,7 +1877,7 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,

// Get the handler for the downstream request
auto handler = handler_lookup_table_.find(command_name.c_str());
if (handler == nullptr && !callbacks.transaction().isSubscribedMode()) {
if (handler == nullptr && !callbacks.transaction().isSubscribedMode() && command_name!=Common::Redis::SupportedCommands::spl_strm_commands()) {
stats_.unsupported_command_.inc();
ENVOY_LOG(debug, "unsupported command '{}'", request->asArray()[0].asString());
callbacks.onResponse(Common::Redis::Utility::makeError(
Expand All @@ -1845,6 +1896,18 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,
handler = handler_lookup_table_.find("subscribe");
}

//If the command is xread we need to check if its a blocking command or not
if (command_name == "xread") {
if (((request->asArray().size() > 1) && (absl::AsciiStrToLower(request->asArray()[1].asString()) == "block")) ||
((request->asArray().size() > 3) && (absl::AsciiStrToLower(request->asArray()[3].asString()) == "block"))) {
handler = handler_lookup_table_.find("xreadblock");
} else {
handler = handler_lookup_table_.find("xreadnonblock");
}

}


// Fault Injection Check
const Common::Redis::Fault* fault_ptr = fault_manager_->getFaultForCommand(command_name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ void InstanceImpl::ThreadLocalPool::onHostsRemoved(
}
} else {
// There are no pending requests so close the connection.
ENVOY_LOG(debug," onHostsRemoved Closing redis client for host:'{}'",host->address()->asString());
it->second->redis_client_->close();
}
}
Expand Down Expand Up @@ -415,20 +416,27 @@ InstanceImpl::ThreadLocalPool::makeBlockingClientRequest(int32_t shard_index, co
uint32_t client_idx = 0;
if (transaction.active_) {
client_idx = transaction.current_client_idx_;
ENVOY_LOG(debug, " client creation in makeBlockingClientRequest client_idx:{}",client_idx);
if (!transaction.connection_established_ && transaction.isSubscribedMode()) {
ENVOY_LOG(debug, "PubSub command client creation in makeBlockingClientRequest");
transaction.clients_[client_idx] =
client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_),
auth_username_, auth_password_, false,true,false,transaction.getPubSubCallback());
if (transaction.connection_cb_) {
transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_);
}
}else if (!transaction.connection_established_ && transaction.isBlockingCommand()) {
ENVOY_LOG(debug, "Blocking command client creation in makeBlockingClientRequest for host:'{}'",host->address()->asString());
transaction.clients_[client_idx] =
client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_),
auth_username_, auth_password_, false,false,true,nullptr);
if (transaction.connection_cb_) {
transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_);
}
}else{
ENVOY_LOG(debug, "Error in calling makeBlockingClientRequest, Neither in subscribed mode nor blocking mode");
onRequestCompleted();
return nullptr;
}

pending_request.request_handler_ = transaction.clients_[client_idx]->makeRequest(
Expand Down
46 changes: 30 additions & 16 deletions source/extensions/filters/network/redis_proxy/proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void ProxyFilter::onEvent(Network::ConnectionEvent event) {
}

}

ENVOY_LOG(debug,"closing downstream connection with transaction_.close()");
transaction_.close();
}
}
Expand Down Expand Up @@ -215,19 +215,33 @@ void ProxyFilter::onAsyncResponse(Common::Redis::RespValuePtr&& value){
if(config_->drain_decision_.drainClose() &&
config_->runtime_.snapshot().featureEnabled(config_->redis_drain_close_runtime_key_, 100)) {
config_->stats_.downstream_cx_drain_close_.inc();
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
//callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
this->closeDownstreamConnection();
}

}
void ProxyFilter::closeDownstreamConnection() {

ENVOY_LOG(debug,"dereferencing pubsub callback and transaction on exit from proxy filter");
// As downstreamcallbaks is created in proxy filter irerespecive of its a pubsub command or not this needs to be cleared on exit from proxy filter
// decrement the reference to proxy filter
auto downstream_cb = dynamic_cast<DownStreamCallbacks*>(transaction_.getDownstreamCallback().get());
if (downstream_cb != nullptr){
downstream_cb->clearParent();
}
transaction_.setDownstreamCallback(nullptr);
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);

}
void ProxyFilter::onPubsubConnClose(){
ASSERT(pending_requests_.empty());
//Close the downstream connection on upstream connection close
transaction_.setPubSubCallback(nullptr);
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
//callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
//This callback is called only on remote close , so no need to close the client connnection again
transaction_.connection_established_=false;
connection_quit_ = false;
this->closeDownstreamConnection();
return;
}

Expand All @@ -253,15 +267,8 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt
}
if (pending_requests_.empty() && connection_quit_) {
ENVOY_LOG(debug,"closing downstream connection as no pending requests and connection quit");
ENVOY_LOG(debug,"dereferencing pubsub callback and transaction on exit from proxy filter");
// As downstreamcallbaks is created in proxy filter irerespecive of its a pubsub command or not this needs to be cleared on exit from proxy filter
// decrement the reference to proxy filter
auto downstream_cb = dynamic_cast<DownStreamCallbacks*>(transaction_.getDownstreamCallback().get());
if (downstream_cb != nullptr){
downstream_cb->clearParent();
}
transaction_.setDownstreamCallback(nullptr);
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
//callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
this->closeDownstreamConnection();
connection_quit_ = false;
return;
}
Expand All @@ -270,21 +277,28 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt
if (pending_requests_.empty() && config_->drain_decision_.drainClose() &&
config_->runtime_.snapshot().featureEnabled(config_->redis_drain_close_runtime_key_, 100)) {
config_->stats_.downstream_cx_drain_close_.inc();
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
//callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
ENVOY_LOG(debug,"closing downstream connection as no pending requests and drain close");
this->closeDownstreamConnection();
}

// Check if there is an active transaction that needs to be closed.
if ((transaction_.should_close_ && pending_requests_.empty()) ||
(transaction_.isBlockingCommand() && pending_requests_.empty()) ||
(transaction_.isSubscribedMode() && transaction_.should_close_)) {
if (transaction_.should_close_ == true && transaction_.is_blocking_command_) {
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
if (transaction_.should_close_ == true && transaction_.isBlockingCommand()) {
//callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
ENVOY_LOG(debug,"closing downstream connection as blocking command and transaction close");
this->closeDownstreamConnection();
}
ENVOY_LOG(debug,"closing transaction as no pending requests and transaction close");
transaction_.close();
//Not sure if for transaction mode also we need to close the connection in downstream
if (transaction_.isSubscribedMode()){
transaction_.subscribed_client_shard_index_ = -1;
callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
//callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
ENVOY_LOG(debug,"closing downstream connection as pubsub mode and transaction close");
this->closeDownstreamConnection();
}
connection_quit_ = false;
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class ProxyFilter : public Network::ReadFilter,
std::unique_ptr<Common::Redis::Utility::DownStreamMetrics> getDownStreamInfo();
void setclientname(std::string clientname) { clientname_ = clientname; }
std::string getclientname() { return clientname_; }
void closeDownstreamConnection();

private:
friend class RedisProxyFilterTest;
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/health_checkers/redis/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct RedisHealthCheckerStats {
/**
* Redis health checker implementation. Sends PING and expects PONG.
*/
class RedisHealthChecker : public Upstream::HealthCheckerImplBase {
class RedisHealthChecker : public Upstream::HealthCheckerImplBase, public Logger::Loggable<Logger::Id::redis>{
public:
RedisHealthChecker(
const Upstream::Cluster& cluster, const envoy::config::core::v3::HealthCheck& config,
Expand Down Expand Up @@ -67,7 +67,7 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase {
: public ActiveHealthCheckSession,
public Extensions::NetworkFilters::Common::Redis::Client::Config,
public Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks,
public Network::ConnectionCallbacks {
public Network::ConnectionCallbacks, public Logger::Loggable<Logger::Id::redis> {
RedisActiveHealthCheckSession(RedisHealthChecker& parent, const Upstream::HostSharedPtr& host);
~RedisActiveHealthCheckSession() override;

Expand Down

0 comments on commit 630672b

Please sign in to comment.