diff --git a/envoy/upstream/load_balancer.h b/envoy/upstream/load_balancer.h
index a225b16e8d36..edbbee86d7ce 100644
--- a/envoy/upstream/load_balancer.h
+++ b/envoy/upstream/load_balancer.h
@@ -134,7 +134,10 @@ class LoadBalancer {
    * context information. Load balancers should be written to assume that context information
    * is missing and use sensible defaults.
    */
-  virtual HostConstVectorSharedPtr getallHosts(LoadBalancerContext* context) PURE;
+  virtual HostConstVectorSharedPtr getAllHosts(LoadBalancerContext* context) {
+    (void)context; // Mark the context parameter as used to avoid compiler warnings.
+    return nullptr; // Default to nullptr so that only load balancers that support this override it.
+  }

   /**
    * Returns a best effort prediction of the next host to be picked, or nullptr if not predictable.
diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc
index 110114913920..c58ef7fa00bf 100644
--- a/source/common/upstream/cluster_manager_impl.cc
+++ b/source/common/upstream/cluster_manager_impl.cc
@@ -2131,13 +2131,6 @@ HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEnt
   return lb_->peekAnotherHost(context);
 }

-// Make the ThreadLocalClusterManagerImpl class accessible by changing its access specifier or adding a friend declaration.
-HostConstVectorSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::getallHosts(
-    LoadBalancerContext* context) {
-
-  return lb_->getallHosts(context);
-}
-
 Tcp::ConnectionPool::Instance*
 ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolImpl(
     ResourcePriority priority, LoadBalancerContext* context, bool peek) {
diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h
index 4962179cfadf..33074b2b1244 100644
--- a/source/common/upstream/cluster_manager_impl.h
+++ b/source/common/upstream/cluster_manager_impl.h
@@ -628,7 +628,6 @@ class ClusterManagerImpl : public ClusterManager,
       HostConstSharedPtr chooseHost(LoadBalancerContext* context);
       HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context);
-      HostConstVectorSharedPtr getallHosts(LoadBalancerContext* context);

       ThreadLocalClusterManagerImpl& parent_;
       PrioritySetImpl priority_set_;
diff --git a/source/common/upstream/load_balancer_impl.h b/source/common/upstream/load_balancer_impl.h
index 619a58033bdf..614541057798 100644
--- a/source/common/upstream/load_balancer_impl.h
+++ b/source/common/upstream/load_balancer_impl.h
@@ -518,7 +518,6 @@ class EdfLoadBalancerBase : public ZoneAwareLoadBalancerBase {
   // Upstream::ZoneAwareLoadBalancerBase
   HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) override;
-  HostConstVectorSharedPtr getallHosts(LoadBalancerContext*) override { return nullptr; }
   HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) override;

 protected:
@@ -775,7 +774,6 @@ class RandomLoadBalancer : public ZoneAwareLoadBalancerBase {
   // Upstream::ZoneAwareLoadBalancerBase
   HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) override;
   HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) override;
-  HostConstVectorSharedPtr getallHosts(LoadBalancerContext*) override { return nullptr; }

 protected:
   HostConstSharedPtr peekOrChoose(LoadBalancerContext* context, bool peek);
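For readers skimming the interface change: converting `getAllHosts` from `PURE` to a virtual with a safe default is what lets every load balancer in the hunks below simply drop its boilerplate `return nullptr;` override. A minimal, self-contained sketch of the pattern with toy stand-in types (not Envoy's real headers):

```cpp
#include <memory>
#include <vector>

// Stand-ins for Envoy's real types, just to make the sketch compile on its own.
struct Host {};
using HostConstSharedPtr = std::shared_ptr<const Host>;
using HostConstVectorSharedPtr = std::shared_ptr<std::vector<HostConstSharedPtr>>;
struct LoadBalancerContext {};

class LoadBalancer {
public:
  virtual ~LoadBalancer() = default;
  // Default implementation: callers must treat nullptr as "not supported".
  virtual HostConstVectorSharedPtr getAllHosts(LoadBalancerContext* /*context*/) {
    return nullptr;
  }
};

// Only the one load balancer that can actually enumerate hosts overrides it.
class RedisClusterLoadBalancer : public LoadBalancer {
public:
  HostConstVectorSharedPtr getAllHosts(LoadBalancerContext*) override {
    return std::make_shared<std::vector<HostConstSharedPtr>>();
  }
};
```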
diff --git a/source/common/upstream/thread_aware_lb_impl.h b/source/common/upstream/thread_aware_lb_impl.h
index 101ae33b0836..816cca9d991e 100644
--- a/source/common/upstream/thread_aware_lb_impl.h
+++ b/source/common/upstream/thread_aware_lb_impl.h
@@ -94,7 +94,6 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
     // Preconnect not implemented for hash based load balancing
     HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
-    HostConstVectorSharedPtr getallHosts(LoadBalancerContext*) override { return nullptr; }

     // Pool selection not implemented.
     absl::optional<Upstream::SelectedPoolAndConnection>
     selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
@@ -130,7 +129,6 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
     HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
     // Preconnect not implemented for hash based load balancing
     HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
-    HostConstVectorSharedPtr getallHosts(LoadBalancerContext*) override { return nullptr; }

     absl::optional<Upstream::SelectedPoolAndConnection>
     selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
                              const Upstream::Host& /*host*/,
diff --git a/source/extensions/clusters/aggregate/cluster.cc b/source/extensions/clusters/aggregate/cluster.cc
index b58c8f85feea..c8e0bc8b5dee 100644
--- a/source/extensions/clusters/aggregate/cluster.cc
+++ b/source/extensions/clusters/aggregate/cluster.cc
@@ -188,13 +188,6 @@ AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* con
   return nullptr;
 }

-Upstream::HostConstVectorSharedPtr
-AggregateClusterLoadBalancer::getallHosts(Upstream::LoadBalancerContext* context) {
-  if (load_balancer_) {
-    return load_balancer_->getallHosts(context);
-  }
-  return nullptr;
-}

 absl::optional<Upstream::SelectedPoolAndConnection>
 AggregateClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context,
                                                        const Upstream::Host& host,
diff --git a/source/extensions/clusters/aggregate/cluster.h b/source/extensions/clusters/aggregate/cluster.h
index e286bd90c71e..a650a1c98d14 100644
--- a/source/extensions/clusters/aggregate/cluster.h
+++ b/source/extensions/clusters/aggregate/cluster.h
@@ -77,7 +77,6 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
   // Upstream::LoadBalancer
   Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
   Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override;
-  Upstream::HostConstVectorSharedPtr getallHosts(Upstream::LoadBalancerContext*) override;

   absl::optional<Upstream::SelectedPoolAndConnection>
   selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
@@ -104,9 +103,6 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
     Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
       return nullptr;
     }
-    Upstream::HostConstVectorSharedPtr getallHosts(Upstream::LoadBalancerContext*) override {
-      return nullptr;
-    }

     absl::optional<Upstream::SelectedPoolAndConnection>
     selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
diff --git a/source/extensions/clusters/dynamic_forward_proxy/cluster.h b/source/extensions/clusters/dynamic_forward_proxy/cluster.h
index 0afdfa74e1e7..8f32bfd7d215 100644
--- a/source/extensions/clusters/dynamic_forward_proxy/cluster.h
+++ b/source/extensions/clusters/dynamic_forward_proxy/cluster.h
@@ -95,7 +95,6 @@ class Cluster : public Upstream::BaseDynamicClusterImpl,
     Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
       return nullptr;
     }
-    Upstream::HostConstVectorSharedPtr getallHosts(Upstream::LoadBalancerContext*) override { return nullptr; }

     absl::optional<Upstream::SelectedPoolAndConnection>
    selectExistingConnection(Upstream::LoadBalancerContext* context, const Upstream::Host& host,
diff --git a/source/extensions/clusters/original_dst/original_dst_cluster.h b/source/extensions/clusters/original_dst/original_dst_cluster.h
index a9c2621aeb74..fa4c7f6ed759 100644
--- a/source/extensions/clusters/original_dst/original_dst_cluster.h
+++ b/source/extensions/clusters/original_dst/original_dst_cluster.h
@@ -98,7 +98,6 @@ class OriginalDstCluster : public ClusterImplBase {
     HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
     // Preconnecting is not implemented for OriginalDstCluster
     HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
-    HostConstVectorSharedPtr getallHosts(LoadBalancerContext*) override { return nullptr; }

     // Pool selection not implemented for OriginalDstCluster
     absl::optional<Upstream::SelectedPoolAndConnection>
diff --git a/source/extensions/clusters/redis/redis_cluster_lb.h b/source/extensions/clusters/redis/redis_cluster_lb.h
index 21333c7d1501..7eb13c0c23ad 100644
--- a/source/extensions/clusters/redis/redis_cluster_lb.h
+++ b/source/extensions/clusters/redis/redis_cluster_lb.h
@@ -203,27 +203,27 @@ class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack,
     Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
       return nullptr;
     }
-    Upstream::HostConstVectorSharedPtr getallHosts(Upstream::LoadBalancerContext*) override {
-      size_t count = 0;
-      if (!shard_vector_) {
-        return nullptr;
-      }else{
-        count = shard_vector_->size();
-      }
-
-      Envoy::Upstream::HostConstVectorSharedPtr hosts =
-          std::make_shared<std::vector<Upstream::HostConstSharedPtr>>();
-      for (auto const& shard : *shard_vector_) {
-        auto host = shard->primary();
-        if (host) {
-          hosts->emplace_back(std::move(host));
-        }
-      }
-      if (count == hosts->size()) {
-        return hosts;
-      } else {
-        return nullptr;
-      }
-    }
+    Upstream::HostConstVectorSharedPtr getAllHosts(Upstream::LoadBalancerContext*) override {
+      if (!shard_vector_) {
+        return nullptr;
+      }
+
+      size_t count = shard_vector_->size();
+
+      Envoy::Upstream::HostConstVectorSharedPtr hosts =
+          std::make_shared<std::vector<Upstream::HostConstSharedPtr>>();
+      for (auto const& shard : *shard_vector_) {
+        auto host = shard->primary();
+        if (host) {
+          hosts->emplace_back(std::move(host));
+        }
+      }
+      if (count == hosts->size()) {
+        return hosts;
+      } else {
+        return nullptr;
+      }
+    }

     // Pool selection not implemented.
     absl::optional<Upstream::SelectedPoolAndConnection>
     selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
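The new `getAllHosts` implementation above is deliberately all-or-nothing: it returns the vector only when every shard contributed a primary, so callers can tell a complete topology snapshot from a partial one (e.g. mid-scaling or during failover). A sketch of the same invariant with stand-in types; `collectPrimaries` and the early-return shape are illustrative, not the patch's actual code:

```cpp
#include <memory>
#include <vector>

// Stand-ins for the shard/host types used above; names are illustrative only.
struct Host {};
using HostConstSharedPtr = std::shared_ptr<const Host>;

struct Shard {
  HostConstSharedPtr primary() const { return primary_; }
  HostConstSharedPtr primary_;
};

// All-or-nothing snapshot: return every shard's primary, or nullptr if any
// shard currently has no primary (equivalent to the count comparison above).
std::shared_ptr<std::vector<HostConstSharedPtr>>
collectPrimaries(const std::vector<std::shared_ptr<Shard>>& shards) {
  auto hosts = std::make_shared<std::vector<HostConstSharedPtr>>();
  hosts->reserve(shards.size());
  for (const auto& shard : shards) {
    auto host = shard->primary();
    if (!host) {
      return nullptr; // Partial view: better to fail than silently skip a shard.
    }
    hosts->push_back(std::move(host));
  }
  return hosts;
}
```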
diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc
index 9436d877c10a..64ef2c65e025 100644
--- a/source/extensions/filters/network/common/redis/client_impl.cc
+++ b/source/extensions/filters/network/common/redis/client_impl.cc
@@ -196,6 +196,7 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
   if (event == Network::ConnectionEvent::RemoteClose ||
       event == Network::ConnectionEvent::LocalClose) {
+    std::string eventTypeStr = (event == Network::ConnectionEvent::RemoteClose ? "RemoteClose" : "LocalClose");
     Upstream::reportUpstreamCxDestroy(host_, event);
     if (!pending_requests_.empty()) {
       Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
       putOutlierEvent(Upstream::Outlier::Result::LocalOriginConnectFailed);
     }
   }
@@ -203,10 +204,14 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
         putOutlierEvent(Upstream::Outlier::Result::LocalOriginConnectFailed);
       }
     }
+    // If the client is a pubsub client, handle the upstream close event so that the downstream connection is closed as well.
     if (pending_requests_.empty() && is_pubsub_client_) {
       host_->cluster().trafficStats()->upstream_cx_destroy_with_active_rq_.inc();
-      if (pubsub_cb_ != nullptr){
+      ENVOY_LOG(debug, "pubsub client connection close event received: '{}'", eventTypeStr);
+      if ((pubsub_cb_ != nullptr) && (event == Network::ConnectionEvent::RemoteClose)) {
+        ENVOY_LOG(debug, "upstream remote close received; notifying downstream and closing it");
         pubsub_cb_->onFailure();
+        pubsub_cb_.reset();
       }
     }
@@ -221,7 +226,7 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
     }

     connect_or_op_timer_->disableTimer();
-    // If client is Pubsub handle the upstream close event such that downstream must also be closed.
+
   } else if (event == Network::ConnectionEvent::Connected) {
     connected_ = true;
     ASSERT(!pending_requests_.empty());
diff --git a/source/extensions/filters/network/common/redis/supported_commands.h b/source/extensions/filters/network/common/redis/supported_commands.h
index 3ceffddc55cc..ae6a0947d91b 100644
--- a/source/extensions/filters/network/common/redis/supported_commands.h
+++ b/source/extensions/filters/network/common/redis/supported_commands.h
@@ -87,13 +87,13 @@ struct SupportedCommands {
    * @return commands which handle Redis commands without keys.
    */
   static const absl::flat_hash_set<std::string>& adminNokeyCommands() {
-    CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall","publish","pubsub", "keys", "slowlog", "config");
+    CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall", "publish", "pubsub", "keys", "slowlog", "config", "client");
   }

   /**
    * @return commands which handle Redis commands without keys.
    */
   static const absl::flat_hash_set<std::string>& allShardCommands() {
-    CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall", "pubsub", "keys", "slowlog", "config");
+    CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "script", "flushall", "pubsub", "keys", "slowlog", "config", "client");
   }

   /**
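The `onEvent` change above makes the pubsub failure notification one-shot: `onFailure()` fires only for `RemoteClose`, and `pubsub_cb_.reset()` guarantees a later close event cannot invoke it twice. A hedged sketch of that idiom in isolation (the names here are illustrative, not the patch's actual types):

```cpp
#include <functional>
#include <memory>
#include <utility>

// One-shot failure notification: invoke the callback at most once, then drop
// the reference so a subsequent close event cannot fire it again.
class UpstreamWatcher {
public:
  void setOnFailure(std::shared_ptr<std::function<void()>> cb) { on_failure_ = std::move(cb); }

  void handleRemoteClose() {
    if (on_failure_ != nullptr) {
      (*on_failure_)();
      on_failure_.reset(); // Prevent double notification.
    }
  }

private:
  std::shared_ptr<std::function<void()>> on_failure_;
};
```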
diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc
index 63672e9cf647..4be024d3a69a 100644
--- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc
+++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc
@@ -48,7 +48,7 @@ Common::Redis::Client::PoolRequest* makeSingleServerRequest(
 AdminRespHandlerType getresponseHandlerType(const std::string command_name) {
   AdminRespHandlerType responseHandlerType = AdminRespHandlerType::response_handler_none;
   if (Common::Redis::SupportedCommands::allShardCommands().contains(command_name)) {
-    if (command_name == "pubsub" || command_name == "keys" || command_name == "slowlog") {
+    if (command_name == "pubsub" || command_name == "keys" || command_name == "slowlog" || command_name == "client") {
       responseHandlerType = AdminRespHandlerType::aggregate_all_responses;
     } else if (command_name == "script" || command_name == "flushall" || command_name == "config"){
       responseHandlerType = AdminRespHandlerType::allresponses_mustbe_same;
@@ -58,28 +58,35 @@ AdminRespHandlerType getresponseHandlerType(const std::string command_name) {
   }
   return responseHandlerType;
 }
-int32_t getShardIndex(const std::string command, int32_t numofRequests,int32_t numofRedisShards) {
+int32_t getShardIndex(const std::string command, int32_t requestsCount, int32_t redisShardsCount) {
   int32_t shard_index = -1;
   //This need optimisation , generate random seed only once per thread
   srand(time(nullptr));
-  if (Common::Redis::SupportedCommands::blockingCommands().contains(command) && numofRequests == 1) {
+  if (Common::Redis::SupportedCommands::blockingCommands().contains(command) && requestsCount == 1) {
     return shard_index;
-  }else if (!Common::Redis::SupportedCommands::allShardCommands().contains(command) && numofRequests == 1 ){
+  } else if (!Common::Redis::SupportedCommands::allShardCommands().contains(command) && requestsCount == 1) {
     // Send request to a random shard so that we donot allways send to the same shard
-    shard_index = rand() % numofRedisShards;
+    shard_index = rand() % redisShardsCount;
   }
   return shard_index;
 }
+
+bool checkIfAdminCommandSupported(const std::string command, const std::string subcommand) {
+  if (command == "client" && subcommand != "list") {
+    return false;
+  }
+  return true;
+}

-int32_t getNumberofRequests(const std::string command_name, int32_t numofRedisShards) {
-  int32_t numofRequests = 1;
+int32_t getRequestCount(const std::string command_name, int32_t redisShardsCount) {
+  int32_t requestsCount = 1;
   if (Common::Redis::SupportedCommands::allShardCommands().contains(command_name)) {
-    numofRequests = numofRedisShards;
+    requestsCount = redisShardsCount;
   }
-  return numofRequests;
+  return requestsCount;
 }

 Common::Redis::Client::PoolRequest* makeNoKeyRequest(
@@ -330,13 +337,18 @@ SplitRequestPtr mgmtNoKeyRequest::create(Router& router, Common::Redis::RespValu
   std::unique_ptr<mgmtNoKeyRequest> request_ptr{
       new mgmtNoKeyRequest(callbacks, command_stats, time_source, delay_command_latency)};
   std::string key = std::string();
-  int32_t numofRedisShards=0;
-  int32_t numofRequests=1;
+  int32_t redisShardsCount = 0;
+  int32_t requestsCount = 1;
   int32_t shard_index=0;
   bool iserror = false;

   std::string command_name = absl::AsciiStrToLower(incoming_request->asArray()[0].asString());
   std::string firstarg = absl::AsciiStrToLower(incoming_request->asArray()[1].asString());
+  if (!checkIfAdminCommandSupported(command_name, firstarg)) {
+    ENVOY_LOG(debug, "this admin command is not supported: '{}'", incoming_request->toString());
+    callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().InvalidRequest));
+    return nullptr;
+  }

   const auto& route = router.upstreamPool(key, stream_info);
@@ -344,9 +356,9 @@ SplitRequestPtr mgmtNoKeyRequest::create(Router& router, Common::Redis::RespValu
     Extensions::NetworkFilters::RedisProxy::ConnPool::InstanceImpl* instance =
         dynamic_cast<Extensions::NetworkFilters::RedisProxy::ConnPool::InstanceImpl*>(
             route->upstream(key).get());
-    numofRedisShards = instance->getNumofRedisShards();
-    if (numofRedisShards <= 0){
-      ENVOY_LOG(debug, "numofRedisShards not found: '{}'", incoming_request->toString());
+    redisShardsCount = instance->getRedisShardsCount();
+    if (redisShardsCount <= 0) {
+      ENVOY_LOG(debug, "redisShardsCount not found: '{}'", incoming_request->toString());
       callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
       return nullptr;
     }
@@ -356,8 +368,8 @@ SplitRequestPtr mgmtNoKeyRequest::create(Router& router, Common::Redis::RespValu
     callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
     //request_ptr->onChildResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost),0);
   }
-  numofRequests = getNumberofRequests(command_name,numofRedisShards);
-  request_ptr->num_pending_responses_ = numofRequests;
+  requestsCount = getRequestCount(command_name, redisShardsCount);
+  request_ptr->num_pending_responses_ = requestsCount;
   request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_);

   // Identify the response type based on the subcommand , needs to be written very clean
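The TODO comment in `getShardIndex` is right that calling `srand(time(nullptr))` on every request is wasteful; worse, `time()` has one-second granularity, so a burst of requests within the same second all reseed identically and pick the same "random" shard. One possible fix, assuming C++11 `<random>` is acceptable in this codebase, is a thread-local generator seeded once:

```cpp
#include <cstdint>
#include <random>

// Seed once per thread instead of on every call; avoids identical picks when
// many requests arrive within the same second and removes the srand() cost.
int32_t pickRandomShard(int32_t shard_count) {
  static thread_local std::mt19937 rng{std::random_device{}()};
  std::uniform_int_distribution<int32_t> dist(0, shard_count - 1);
  return dist(rng);
}
```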
@@ -365,12 +377,12 @@ SplitRequestPtr mgmtNoKeyRequest::create(Router& router, Common::Redis::RespValu
   // For now keeping it ugly like this and assuming we only add support for script comand.
   Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);
   //reserve memory for list of responses if we have more than 1 request
-  if(numofRequests >1){
+  if (requestsCount > 1) {
     request_ptr->pending_responses_.reserve(request_ptr->num_pending_responses_);
   }

   for (int32_t i = 0; i < request_ptr->num_pending_responses_; i++) {
-    shard_index=getShardIndex(command_name,numofRequests,numofRedisShards);
+    shard_index = getShardIndex(command_name, requestsCount, redisShardsCount);
     if(shard_index < 0){
       shard_index = i;
     }
@@ -480,7 +492,7 @@ void mgmtNoKeyRequest::onallChildRespAgrregate(Common::Redis::RespValuePtr&& val
   bool positiveresponse = true;
   updateStats(error_count_ == 0);
   if (!pending_responses_.empty()) {
-    if ( rediscommand == "pubsub" || rediscommand == "keys" || rediscommand == "slowlog") {
+    if (rediscommand == "pubsub" || rediscommand == "keys" || rediscommand == "slowlog" || rediscommand == "client") {
       if ((redisarg == "numpat" || redisarg == "len") && (rediscommand == "pubsub" || rediscommand == "slowlog")) {
         int sum = 0;
         Common::Redis::RespValuePtr response = std::make_unique<Common::Redis::RespValue>();
@@ -518,7 +530,7 @@ void mgmtNoKeyRequest::onallChildRespAgrregate(Common::Redis::RespValuePtr&& val
         response->type(Common::Redis::RespType::Array);

         // Iterate through pending_responses_ and append non-empty responses to the array
-        if ( redisarg == "channels" || rediscommand == "keys" || redisarg == "get") {
+        if (redisarg == "channels" || rediscommand == "keys" || redisarg == "get") {
           for (auto& resp : pending_responses_) {
             if (resp->type() == Common::Redis::RespType::Array ) {
               if (resp->asArray().empty()) {
@@ -530,7 +542,14 @@ void mgmtNoKeyRequest::onallChildRespAgrregate(Common::Redis::RespValuePtr&& val
               }
             }
           }
-        } else if (redisarg == "numsub"){
+        } else if (rediscommand == "client" && redisarg == "list") {
+          for (auto& resp : pending_responses_) {
+            if (resp->type() == Common::Redis::RespType::BulkString) {
+              innerResponse = *resp;
+              response->asArray().emplace_back(std::move(innerResponse));
+            }
+          }
+        } else if (redisarg == "numsub") {
           std::unordered_map<std::string, int64_t> subscriberCounts;
           for (auto& resp : pending_responses_) {
             if (resp->type() == Common::Redis::RespType::Array) {
@@ -717,7 +736,7 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt
   std::unique_ptr<PubSubRequest> request_ptr{
       new PubSubRequest(callbacks, command_stats, time_source, delay_command_latency)};
   std::string key = std::string();
-  int32_t numofRedisShards=0;
+  int32_t redisShardsCount = 0;
   int32_t shard_index=0;

@@ -726,9 +745,9 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt
     Extensions::NetworkFilters::RedisProxy::ConnPool::InstanceImpl* instance =
         dynamic_cast<Extensions::NetworkFilters::RedisProxy::ConnPool::InstanceImpl*>(
             route->upstream(key).get());
-    numofRedisShards = instance->getNumofRedisShards();
-    if (numofRedisShards <= 0){
-      ENVOY_LOG(debug, "numofRedisShards not found: '{}'", incoming_request->toString());
+    redisShardsCount = instance->getRedisShardsCount();
+    if (redisShardsCount <= 0) {
+      ENVOY_LOG(debug, "redisShardsCount not found: '{}'", incoming_request->toString());
       callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
       return nullptr;
     }
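In the new `client list` branch above, each shard answers with a single bulk string, and the aggregated reply is an array containing every shard's payload. The fan-in, sketched with plain strings in place of `RespValue` (illustrative only):

```cpp
#include <string>
#include <vector>

// Fan-in for "client list": each shard replies with one bulk string, and the
// aggregated response is an array holding every shard's (non-empty) payload.
std::vector<std::string> aggregateClientList(const std::vector<std::string>& per_shard_replies) {
  std::vector<std::string> merged;
  merged.reserve(per_shard_replies.size());
  for (const auto& reply : per_shard_replies) {
    if (!reply.empty()) {
      merged.push_back(reply);
    }
  }
  return merged;
}
```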
@@ -761,7 +780,7 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt
       transaction.clients_.resize(1);
       transaction.enterSubscribedMode();
       if(transaction.subscribed_client_shard_index_ == -1){
-        transaction.subscribed_client_shard_index_ = getShardIndex(command_name,1,numofRedisShards);
+        transaction.subscribed_client_shard_index_ = getShardIndex(command_name, 1, redisShardsCount);
       }
       transaction.start();
     }else{
@@ -783,6 +802,7 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt
     callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
     return nullptr;
   }
+  // TODO: should this be set to true regardless of which connpool handler was returned?
   transaction.connection_established_=true;
   transaction.should_close_ = false;
   return request_ptr;
diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
index 6d0caf0defa0..857f41f76104 100644
--- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
+++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
@@ -123,9 +123,9 @@ InstanceImpl::makeBlockingClientRequest(int32_t shard_index, const std::string&
 }

-int32_t InstanceImpl::getNumofRedisShards() {
-  return tls_->getTyped<ThreadLocalPool>().getNumofRedisShards();
+int32_t InstanceImpl::getRedisShardsCount() {
+  return tls_->getTyped<ThreadLocalPool>().getRedisShardsCount();
 }

 InstanceImpl::ThreadLocalPool::ThreadLocalPool(
@@ -300,6 +300,7 @@ InstanceImpl::ThreadLocalPool::threadLocalActiveClient(Upstream::HostConstShared
   } else {
     client = std::make_unique<ThreadLocalActiveClient>(*this);
     client->host_ = host;
+    ENVOY_LOG(debug, "new thread-local active client created for: '{}'", host->address()->asString());
     client->redis_client_ =
         client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_),
                                auth_username_, auth_password_, false, false, false, nullptr);
@@ -387,9 +388,9 @@ InstanceImpl::ThreadLocalPool::makeBlockingClientRequest(int32_t shard_index, co
   }else {
-    Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getallHosts(nullptr);
+    Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getAllHosts(nullptr);
     if (!hosts) {
-      ENVOY_LOG(debug, "host not found:");
+      ENVOY_LOG(error, "unable to retrieve all Redis primary shards; scaling may be in progress or an upstream error occurred");
       onRequestCompleted();
       return nullptr;
     }
@@ -457,9 +458,9 @@ InstanceImpl::ThreadLocalPool::makeRequestNoKey(int32_t shard_index, RespVariant
     return nullptr;
   }

-  Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getallHosts(nullptr);
+  Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getAllHosts(nullptr);
   if (!hosts) {
-    ENVOY_LOG(debug, "host not found:");
+    ENVOY_LOG(error, "unable to retrieve all Redis primary shards; scaling may be in progress or an upstream error occurred");
     onRequestCompleted();
     return nullptr;
   }
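Both call sites above lean on the same contract: `getAllHosts(nullptr)` returns exactly one primary per shard, or `nullptr` when the topology is not fully known. That is also how `getRedisShardsCount()` (next hunk) derives the shard count. A stand-in sketch of the derivation, with toy types rather than Envoy's:

```cpp
#include <cstdint>
#include <memory>
#include <vector>

struct Host {};
using HostConstSharedPtr = std::shared_ptr<const Host>;
using HostConstVectorSharedPtr = std::shared_ptr<std::vector<HostConstSharedPtr>>;

// Shard count falls out of the getAllHosts() contract: one primary per shard,
// or nullptr when the topology is not fully known, so 0 means "unknown".
int32_t shardCountFrom(const HostConstVectorSharedPtr& hosts) {
  if (!hosts) {
    return 0; // Callers treat a non-positive count as "no upstream available".
  }
  return static_cast<int32_t>(hosts->size());
}
```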
@@ -470,6 +471,7 @@ InstanceImpl::ThreadLocalPool::makeRequestNoKey(int32_t shard_index, RespVariant
   uint32_t client_idx = 0;
   // If there is an active transaction, establish a new connection if necessary.
   if (transaction.active_) {
+    ENVOY_LOG(error, "transaction client should not be used for admin commands");
     client_idx = transaction.current_client_idx_;
     if ((!transaction.connection_established_ && transaction.is_subscribed_mode_) ||
         (!transaction.connection_established_ && transaction.is_blocking_command_)) {
       transaction.clients_[client_idx] =
@@ -504,12 +506,12 @@ InstanceImpl::ThreadLocalPool::makeRequestNoKey(int32_t shard_index, RespVariant
   }
 }

-int32_t InstanceImpl::ThreadLocalPool::getNumofRedisShards() {
+int32_t InstanceImpl::ThreadLocalPool::getRedisShardsCount() {
   int32_t numofRedisShards =0;
-  Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getallHosts(nullptr);
+  Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getAllHosts(nullptr);
   if (!hosts) {
-    ENVOY_LOG(debug, "host not found:");
+    ENVOY_LOG(error, "unable to retrieve all Redis primary shards; scaling may be in progress or an upstream error occurred");
     return numofRedisShards;
   }
   for (const auto& host : *hosts) {
diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
index c0604caa68cd..44913069a623 100644
--- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
+++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
@@ -101,7 +101,7 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl
-  int32_t getNumofRedisShards() override;
+  int32_t getRedisShardsCount() override;
diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc
--- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc
+++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc
@@ ... @@
-  auto* pubsub_cb = dynamic_cast<…>(transaction_.getPubsubCallback().get());
-  pubsub_cb->clearParent();
-
-  transaction_.setPubsubCallback(nullptr);
-
-  //Close all upsteam clients and ref to pubsub callbacks if any
-  transaction_.close();
-
+  ASSERT(pending_requests_.empty());
+  // Close the downstream connection on upstream connection close.
   callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
-
-
+  // This callback is invoked only on remote close, so there is no need to close the client connection again.
+  transaction_.connection_established_ = false;
   connection_quit_ = false;
   return;
-}

 void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePtr&& value) {
@@ -263,7 +253,9 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt
   }

   // Check if there is an active transaction that needs to be closed.
-  if ((transaction_.should_close_ && pending_requests_.empty()) || (transaction_.is_blocking_command_ && pending_requests_.empty())) {
+  if ((transaction_.should_close_ && pending_requests_.empty()) ||
+      (transaction_.is_blocking_command_ && pending_requests_.empty()) ||
+      (transaction_.isSubscribedMode() && transaction_.should_close_)) {
     transaction_.close();

     //Not sure if for transaction mode also we need to close the connection in downstream
diff --git a/source/extensions/load_balancing_policies/subset/subset_lb.h b/source/extensions/load_balancing_policies/subset/subset_lb.h
index 0f0f749e901a..ddf228a2c0e7 100644
--- a/source/extensions/load_balancing_policies/subset/subset_lb.h
+++ b/source/extensions/load_balancing_policies/subset/subset_lb.h
@@ -144,7 +144,6 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::Loggable<Logger::Id::ups
-  HostConstVectorSharedPtr getallHosts(LoadBalancerContext*) override { return nullptr; }

   absl::optional<Upstream::SelectedPoolAndConnection>
   selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,