From 630672beebb5f05ddcc658f68b205b5818af435a Mon Sep 17 00:00:00 2001 From: dinesh-murugiah Date: Sun, 11 Aug 2024 22:32:06 +0530 Subject: [PATCH] fixes for xread and memleak on quit --- .../network/common/redis/client_impl.cc | 8 ++ .../network/common/redis/supported_commands.h | 8 +- .../redis_proxy/command_splitter_impl.cc | 73 +++++++++++++++++-- .../network/redis_proxy/conn_pool_impl.cc | 8 ++ .../network/redis_proxy/proxy_filter.cc | 46 ++++++++---- .../network/redis_proxy/proxy_filter.h | 1 + .../extensions/health_checkers/redis/redis.h | 4 +- 7 files changed, 123 insertions(+), 25 deletions(-) diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 7e255a80b6b1..35710d1d7b46 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -85,6 +85,8 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis flush_timer_(dispatcher.createTimer([this]() { flushBufferAndResetTimer(); })), time_source_(dispatcher.timeSource()), redis_command_stats_(redis_command_stats), scope_(scope), is_transaction_client_(is_transaction_client), is_pubsub_client_(is_pubsub_client), is_blocking_client_(is_blocking_client) { + + ENVOY_LOG(debug,"ClientImpl Constructor creating client of type: is_transaction_client: {}, is_pubsub_client: {}, is_blocking_client: {}", is_transaction_client_, is_pubsub_client_, is_blocking_client_); Upstream::ClusterTrafficStats& traffic_stats = *host->cluster().trafficStats(); traffic_stats.upstream_cx_total_.inc(); host->stats().cx_total_.inc(); @@ -107,6 +109,7 @@ ClientImpl::~ClientImpl() { void ClientImpl::close() { pubsub_cb_.reset(); + ENVOY_LOG(debug, "Upstream Client Connection close requested"); if (connection_) { connection_->close(Network::ConnectionCloseType::NoFlush); } @@ -198,6 +201,8 @@ bool ClientImpl::makePubSubRequest(const RespValue& request) { void ClientImpl::onConnectOrOpTimeout() { + + ENVOY_LOG(debug, "Upstream Client Connection or Operation timeout occurred, is blocking client: {}", is_blocking_client_); putOutlierEvent(Upstream::Outlier::Result::LocalOriginTimeout); if (connected_) { host_->cluster().trafficStats()->upstream_rq_timeout_.inc(); @@ -220,6 +225,7 @@ void ClientImpl::onData(Buffer::Instance& data) { putOutlierEvent(Upstream::Outlier::Result::ExtOriginRequestFailed); host_->cluster().trafficStats()->upstream_cx_protocol_error_.inc(); host_->stats().rq_error_.inc(); + ENVOY_LOG(debug, "Upstream Client Protocol error occurred"); connection_->close(Network::ConnectionCloseType::NoFlush); } } @@ -263,7 +269,9 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { //handle non blocking and non transaction requests while (!pending_requests_.empty()) { PendingRequest& request = pending_requests_.front(); + ENVOY_LOG(debug,"Upstream Client Connection close "); if (!request.canceled_) { + ENVOY_LOG(debug,"Upstream Client Connection close calling onFailure"); request.callbacks_.onFailure(); } else { host_->cluster().trafficStats()->upstream_rq_cancelled_.inc(); diff --git a/source/extensions/filters/network/common/redis/supported_commands.h b/source/extensions/filters/network/common/redis/supported_commands.h index fc76963d011a..21fa226aa7fb 100644 --- a/source/extensions/filters/network/common/redis/supported_commands.h +++ b/source/extensions/filters/network/common/redis/supported_commands.h @@ -37,7 +37,7 @@ struct SupportedCommands { "xrange", "xrevrange", "rename", "getex", "sort", "zmscore", "sdiffstore", "msetnx", "substr", "zrangestore", "zunion", "echo", "zdiff", "xautoclaim", "xinfo", "sunionstore", "smismember", "hrandfield", "geosearchstore", "zdiffstore", "geosearch", "randomkey", "zinter", "zrandmember", - "bitop", "xclaim", "lpos", "renamenx", "xgroup"); + "bitop", "xclaim", "lpos", "renamenx", "xgroup","xreadnonblock"); } /** @@ -93,7 +93,7 @@ struct SupportedCommands { * @return commands that are called blocking commands but not pubsub commands. */ static const absl::flat_hash_set& blockingCommands() { - CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set, "blpop", "brpop", "brpoplpush", "bzpopmax", "bzpopmin", "xread", "xreadgroup", "blmove"); + CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set, "blpop", "brpop", "brpoplpush", "bzpopmax", "bzpopmin", "xreadblock", "xreadgroup", "blmove"); } /** @@ -174,6 +174,10 @@ struct SupportedCommands { */ static const std::string& info() { CONSTRUCT_ON_FIRST_USE(std::string, "info"); } + /** + * @return special stream commands + */ + static const std::string& spl_strm_commands() { CONSTRUCT_ON_FIRST_USE(std::string, "xread"); } /** * @return commands which alters the state of redis */ diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index ebdf03d5e1db..d93233a33433 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -95,8 +95,9 @@ int32_t getShardIndex(const std::string command, int32_t requestsCount,int32_t r bool isBlockingCommand = Common::Redis::SupportedCommands::blockingCommands().contains(command); bool isAllShardCommand = Common::Redis::SupportedCommands::allShardCommands().contains(command); + bool isXreadBlockingCommand = (command == "xread" || command == "xreadgroup"); - if (!isBlockingCommand && !isAllShardCommand && requestsCount == 1 ){ + if (!isBlockingCommand && !isAllShardCommand && requestsCount == 1 && !isXreadBlockingCommand){ // Send request to a random shard so that we donot allways send to the same shard shard_index = rand() % redisShardsCount; } @@ -220,7 +221,9 @@ void SingleServerRequest::onFailure() { onFailure(Response::get().UpstreamFailur void SingleServerRequest::onFailure(std::string error_msg) { handle_ = nullptr; updateStats(false); + ENVOY_LOG(debug,"mode of clients is Transaction : '{}', PubSub: '{}', Blocking: '{}'",callbacks_.transaction().isTransactionMode(),callbacks_.transaction().isSubscribedMode(),callbacks_.transaction().isBlockingCommand()); callbacks_.transaction().should_close_ = true; + ENVOY_LOG(debug, "onFailure error: {},closing transaction also", error_msg); callbacks_.onResponse(Common::Redis::Utility::makeError(error_msg)); } @@ -266,13 +269,38 @@ SplitRequestPtr SimpleRequest::create(Router& router, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool delay_command_latency, const StreamInfo::StreamInfo& stream_info) { + std::string command_name = absl::AsciiStrToLower(incoming_request->asArray()[0].asString()); + std::string key =""; std::unique_ptr request_ptr{ new SimpleRequest(callbacks, command_stats, time_source, delay_command_latency)}; + const auto route = router.upstreamPool(incoming_request->asArray()[1].asString(), stream_info); if (route) { Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); + if (command_name == "xread"){ + int32_t index = 0; + int32_t count = base_request->asArray().size(); + while (count > 0) { + if (absl::AsciiStrToLower(base_request->asArray()[index].asString())== "streams") { + index++; + key = base_request->asArray()[index].asString(); + break; + } + index++; + count--; + } + if (key.empty()) { + ENVOY_LOG(debug, "unexpected command : '{}'", base_request->toString()); + callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format("unexpected command format"))); + return nullptr; + } + + }else { + key = base_request->asArray()[1].asString(); + } + request_ptr->handle_ = makeSingleServerRequest( - route, base_request->asArray()[0].asString(), base_request->asArray()[1].asString(), + route, base_request->asArray()[0].asString(), key, base_request, *request_ptr, callbacks.transaction()); } else { ENVOY_LOG(debug, "route not found: '{}'", incoming_request->toString()); @@ -782,12 +810,34 @@ SplitRequestPtr BlockingClientRequest::create(Router& router, Common::Redis::Res // For blocking requests which operate on a single key, we can hash the key to a single //must send shard index as negative to indicate that it is a blocking request that acts on key std::string command_name = absl::AsciiStrToLower(incoming_request->asArray()[0].asString()); + std::string key =""; int32_t shard_index=getShardIndex(command_name,1,1); Common::Redis::Client::Transaction& transaction = callbacks.transaction(); std::unique_ptr request_ptr{ new BlockingClientRequest(callbacks, command_stats, time_source, delay_command_latency)}; - std::string key = absl::AsciiStrToLower(incoming_request->asArray()[1].asString()); + if (command_name == "xread"){ + int32_t index = 0; + int32_t count = incoming_request->asArray().size(); + while (count > 0) { + if (absl::AsciiStrToLower(incoming_request->asArray()[index].asString())== "streams") { + index++; + key = incoming_request->asArray()[index].asString(); + break; + } + index++; + count--; + } + if (key.empty()) { + ENVOY_LOG(debug, "unexpected command : '{}'", incoming_request->toString()); + callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format("unexpected command format"))); + return nullptr; + } + + }else { + key = incoming_request->asArray()[1].asString(); + } + if (transaction.active_ ){ // when we are in blocking command, we cannnot accept any other commands @@ -802,7 +852,7 @@ SplitRequestPtr BlockingClientRequest::create(Router& router, Common::Redis::Res return nullptr; } }else { - if (Common::Redis::SupportedCommands::blockingCommands().contains(command_name)){ + if (Common::Redis::SupportedCommands::blockingCommands().contains(command_name) || command_name == "xread") { transaction.clients_.resize(1); transaction.setBlockingCommand(); transaction.start(); @@ -814,6 +864,7 @@ SplitRequestPtr BlockingClientRequest::create(Router& router, Common::Redis::Res } const auto route = router.upstreamPool(incoming_request->asArray()[1].asString(), stream_info); if (route) { + ENVOY_LOG(debug, "key: for sharding '{}'", key); Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); request_ptr->handle_ = makeBlockingRequest( route,shard_index,key,base_request, *request_ptr, callbacks.transaction()); @@ -1826,7 +1877,7 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request, // Get the handler for the downstream request auto handler = handler_lookup_table_.find(command_name.c_str()); - if (handler == nullptr && !callbacks.transaction().isSubscribedMode()) { + if (handler == nullptr && !callbacks.transaction().isSubscribedMode() && command_name!=Common::Redis::SupportedCommands::spl_strm_commands()) { stats_.unsupported_command_.inc(); ENVOY_LOG(debug, "unsupported command '{}'", request->asArray()[0].asString()); callbacks.onResponse(Common::Redis::Utility::makeError( @@ -1845,6 +1896,18 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request, handler = handler_lookup_table_.find("subscribe"); } + //If the command is xread we need to check if its a blocking command or not + if (command_name == "xread") { + if (((request->asArray().size() > 1) && (absl::AsciiStrToLower(request->asArray()[1].asString()) == "block")) || + ((request->asArray().size() > 3) && (absl::AsciiStrToLower(request->asArray()[3].asString()) == "block"))) { + handler = handler_lookup_table_.find("xreadblock"); + } else { + handler = handler_lookup_table_.find("xreadnonblock"); + } + + } + + // Fault Injection Check const Common::Redis::Fault* fault_ptr = fault_manager_->getFaultForCommand(command_name); diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index 956acd3f81a0..71989c80034e 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -269,6 +269,7 @@ void InstanceImpl::ThreadLocalPool::onHostsRemoved( } } else { // There are no pending requests so close the connection. + ENVOY_LOG(debug," onHostsRemoved Closing redis client for host:'{}'",host->address()->asString()); it->second->redis_client_->close(); } } @@ -415,7 +416,9 @@ InstanceImpl::ThreadLocalPool::makeBlockingClientRequest(int32_t shard_index, co uint32_t client_idx = 0; if (transaction.active_) { client_idx = transaction.current_client_idx_; + ENVOY_LOG(debug, " client creation in makeBlockingClientRequest client_idx:{}",client_idx); if (!transaction.connection_established_ && transaction.isSubscribedMode()) { + ENVOY_LOG(debug, "PubSub command client creation in makeBlockingClientRequest"); transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), auth_username_, auth_password_, false,true,false,transaction.getPubSubCallback()); @@ -423,12 +426,17 @@ InstanceImpl::ThreadLocalPool::makeBlockingClientRequest(int32_t shard_index, co transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } }else if (!transaction.connection_established_ && transaction.isBlockingCommand()) { + ENVOY_LOG(debug, "Blocking command client creation in makeBlockingClientRequest for host:'{}'",host->address()->asString()); transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), auth_username_, auth_password_, false,false,true,nullptr); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } + }else{ + ENVOY_LOG(debug, "Error in calling makeBlockingClientRequest, Neither in subscribed mode nor blocking mode"); + onRequestCompleted(); + return nullptr; } pending_request.request_handler_ = transaction.clients_[client_idx]->makeRequest( diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc index 2a3dc670897c..7ca03742fed8 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc @@ -137,7 +137,7 @@ void ProxyFilter::onEvent(Network::ConnectionEvent event) { } } - + ENVOY_LOG(debug,"closing downstream connection with transaction_.close()"); transaction_.close(); } } @@ -215,19 +215,33 @@ void ProxyFilter::onAsyncResponse(Common::Redis::RespValuePtr&& value){ if(config_->drain_decision_.drainClose() && config_->runtime_.snapshot().featureEnabled(config_->redis_drain_close_runtime_key_, 100)) { config_->stats_.downstream_cx_drain_close_.inc(); - callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + //callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + this->closeDownstreamConnection(); } } +void ProxyFilter::closeDownstreamConnection() { + ENVOY_LOG(debug,"dereferencing pubsub callback and transaction on exit from proxy filter"); + // As downstreamcallbaks is created in proxy filter irerespecive of its a pubsub command or not this needs to be cleared on exit from proxy filter + // decrement the reference to proxy filter + auto downstream_cb = dynamic_cast(transaction_.getDownstreamCallback().get()); + if (downstream_cb != nullptr){ + downstream_cb->clearParent(); + } + transaction_.setDownstreamCallback(nullptr); + callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + +} void ProxyFilter::onPubsubConnClose(){ ASSERT(pending_requests_.empty()); //Close the downstream connection on upstream connection close transaction_.setPubSubCallback(nullptr); - callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + //callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); //This callback is called only on remote close , so no need to close the client connnection again transaction_.connection_established_=false; connection_quit_ = false; + this->closeDownstreamConnection(); return; } @@ -253,15 +267,8 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt } if (pending_requests_.empty() && connection_quit_) { ENVOY_LOG(debug,"closing downstream connection as no pending requests and connection quit"); - ENVOY_LOG(debug,"dereferencing pubsub callback and transaction on exit from proxy filter"); - // As downstreamcallbaks is created in proxy filter irerespecive of its a pubsub command or not this needs to be cleared on exit from proxy filter - // decrement the reference to proxy filter - auto downstream_cb = dynamic_cast(transaction_.getDownstreamCallback().get()); - if (downstream_cb != nullptr){ - downstream_cb->clearParent(); - } - transaction_.setDownstreamCallback(nullptr); - callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + //callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + this->closeDownstreamConnection(); connection_quit_ = false; return; } @@ -270,21 +277,28 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt if (pending_requests_.empty() && config_->drain_decision_.drainClose() && config_->runtime_.snapshot().featureEnabled(config_->redis_drain_close_runtime_key_, 100)) { config_->stats_.downstream_cx_drain_close_.inc(); - callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + //callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + ENVOY_LOG(debug,"closing downstream connection as no pending requests and drain close"); + this->closeDownstreamConnection(); } // Check if there is an active transaction that needs to be closed. if ((transaction_.should_close_ && pending_requests_.empty()) || (transaction_.isBlockingCommand() && pending_requests_.empty()) || (transaction_.isSubscribedMode() && transaction_.should_close_)) { - if (transaction_.should_close_ == true && transaction_.is_blocking_command_) { - callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + if (transaction_.should_close_ == true && transaction_.isBlockingCommand()) { + //callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + ENVOY_LOG(debug,"closing downstream connection as blocking command and transaction close"); + this->closeDownstreamConnection(); } + ENVOY_LOG(debug,"closing transaction as no pending requests and transaction close"); transaction_.close(); //Not sure if for transaction mode also we need to close the connection in downstream if (transaction_.isSubscribedMode()){ transaction_.subscribed_client_shard_index_ = -1; - callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + //callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + ENVOY_LOG(debug,"closing downstream connection as pubsub mode and transaction close"); + this->closeDownstreamConnection(); } connection_quit_ = false; return; diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.h b/source/extensions/filters/network/redis_proxy/proxy_filter.h index 1dc5c7557386..37fcefa12388 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.h +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.h @@ -113,6 +113,7 @@ class ProxyFilter : public Network::ReadFilter, std::unique_ptr getDownStreamInfo(); void setclientname(std::string clientname) { clientname_ = clientname; } std::string getclientname() { return clientname_; } + void closeDownstreamConnection(); private: friend class RedisProxyFilterTest; diff --git a/source/extensions/health_checkers/redis/redis.h b/source/extensions/health_checkers/redis/redis.h index d87dc880b4bb..43a1f49b5be9 100644 --- a/source/extensions/health_checkers/redis/redis.h +++ b/source/extensions/health_checkers/redis/redis.h @@ -34,7 +34,7 @@ struct RedisHealthCheckerStats { /** * Redis health checker implementation. Sends PING and expects PONG. */ -class RedisHealthChecker : public Upstream::HealthCheckerImplBase { +class RedisHealthChecker : public Upstream::HealthCheckerImplBase, public Logger::Loggable{ public: RedisHealthChecker( const Upstream::Cluster& cluster, const envoy::config::core::v3::HealthCheck& config, @@ -67,7 +67,7 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase { : public ActiveHealthCheckSession, public Extensions::NetworkFilters::Common::Redis::Client::Config, public Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks, - public Network::ConnectionCallbacks { + public Network::ConnectionCallbacks, public Logger::Loggable { RedisActiveHealthCheckSession(RedisHealthChecker& parent, const Upstream::HostSharedPtr& host); ~RedisActiveHealthCheckSession() override;