From 9b97dd8a984daf6ee4094d144c3241f9a716fe6e Mon Sep 17 00:00:00 2001 From: dinesh-murugiah Date: Wed, 8 May 2024 22:30:54 +0530 Subject: [PATCH 01/10] pubsub framwork change - pair programming iter 1 --- .../filters/network/common/redis/client.h | 29 ++++++- .../network/common/redis/client_impl.cc | 79 +++++++++++++++---- .../network/common/redis/client_impl.h | 9 ++- .../redis_proxy/command_splitter_impl.cc | 68 +++++++++++++--- .../redis_proxy/command_splitter_impl.h | 15 +++- .../network/redis_proxy/conn_pool_impl.cc | 60 +++++++++++++- .../network/redis_proxy/conn_pool_impl.h | 5 ++ .../network/redis_proxy/proxy_filter.cc | 5 ++ 8 files changed, 234 insertions(+), 36 deletions(-) diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index b21a42857d22..d579d3b36163 100644 --- a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -66,6 +66,14 @@ class DirectCallbacks { }; +class PubsubCallbacks { +public: + virtual ~PubsubCallbacks() = default; + virtual void handleChannelMessage(Common::Redis::RespValuePtr&& value) PURE; + virtual void onFailure() PURE; + +}; + /** * DoNothingPoolCallbacks is used for internally generated commands whose response is @@ -111,6 +119,15 @@ class Client : public Event::DeferredDeletable { */ virtual PoolRequest* makeRequest(const RespValue& request, ClientCallbacks& callbacks) PURE; + /** + * Make a pipelined request to the remote redis server. + * @param request supplies the RESP request to make. + * @param callbacks supplies the request callbacks. + * @return PoolRequest* a handle to the active request or nullptr if the request could not be made + * for some reason. + */ + virtual bool makePubSubRequest(const RespValue& request) PURE; + /** * Initialize the connection. Issue the auth command and readonly command as needed. * @param auth password for upstream host. @@ -222,7 +239,7 @@ class ClientFactory { const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope, const std::string& auth_username, - const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr& drcb) PURE; + const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr& pubsubcb) PURE; }; // A MULTI command sent when starting a transaction. @@ -291,6 +308,15 @@ struct Transaction { std::shared_ptr getDownstreamCallback() { return downstream_cb_; } + + void setPubSubCallback(std::shared_ptr callback) { + pubsub_cb_ = callback; + } + + std::shared_ptr getPubSubCallback() { + return pubsub_cb_; + } + bool active_{false}; bool connection_established_{false}; bool should_close_{false}; @@ -305,6 +331,7 @@ struct Transaction { std::vector clients_; Network::ConnectionCallbacks* connection_cb_; std::shared_ptr downstream_cb_=nullptr; + std::shared_ptr pubsub_cb_=nullptr; // This index represents the current client on which traffic is being sent to. // When sending to the main redis server it will be 0, and when sending to one of diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index c96a141deb26..7313a94a7fdd 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -63,10 +63,10 @@ ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatche EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats, - Stats::Scope& scope, bool is_transaction_client, bool is_pubsub_client, bool is_blocking_client, const std::shared_ptr& drcb) { + Stats::Scope& scope, bool is_transaction_client, bool is_pubsub_client, bool is_blocking_client, const std::shared_ptr& pubsubcb) { auto client = std::make_unique(host, dispatcher, std::move(encoder), decoder_factory, config, - redis_command_stats, scope, is_transaction_client,is_pubsub_client,is_blocking_client,drcb); + redis_command_stats, scope, is_transaction_client,is_pubsub_client,is_blocking_client,pubsubcb); client->connection_ = host->createConnection(dispatcher, nullptr, nullptr).connection_; client->connection_->addConnectionCallbacks(*client); client->connection_->addReadFilter(Network::ReadFilterSharedPtr{new UpstreamReadFilter(*client)}); @@ -78,7 +78,7 @@ ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatche ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope, - bool is_transaction_client, bool is_pubsub_client, bool is_blocking_client, const std::shared_ptr& drcb) + bool is_transaction_client, bool is_pubsub_client, bool is_blocking_client, const std::shared_ptr& pubsubcb) : host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)), config_(config), connect_or_op_timer_(dispatcher.createTimer([this]() { onConnectOrOpTimeout(); })), @@ -92,7 +92,7 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis host->stats().cx_active_.inc(); connect_or_op_timer_->enableTimer(host->cluster().connectTimeout()); if (is_pubsub_client_){ - downstream_cb_ = std::move(drcb); + pubsub_cb_ = std::move(pubsubcb); } } @@ -101,12 +101,12 @@ ClientImpl::~ClientImpl() { ASSERT(connection_->state() == Network::Connection::State::Closed); host_->cluster().trafficStats()->upstream_cx_active_.dec(); host_->stats().cx_active_.dec(); - downstream_cb_.reset(); + pubsub_cb_.reset(); } void ClientImpl::close() { - downstream_cb_.reset(); + pubsub_cb_.reset(); if (connection_) { connection_->close(Network::ConnectionCloseType::NoFlush); } @@ -159,6 +159,44 @@ PoolRequest* ClientImpl::makeRequest(const RespValue& request, ClientCallbacks& return &pending_requests_.back(); } + +bool ClientImpl::makePubSubRequest(const RespValue& request) { + ASSERT(connection_->state() == Network::Connection::State::Open); + + const bool empty_buffer = encoder_buffer_.length() == 0; + + Stats::StatName command; + if (config_.enableCommandStats()) { + // Only lowercase command and get StatName if we enable command stats + command = redis_command_stats_->getCommandFromRequest(request); + redis_command_stats_->updateStatsTotal(scope_, command); + } else { + // If disabled, we use a placeholder stat name "unused" that is not used + command = redis_command_stats_->getUnusedStatName(); + } + + encoder_->encode(request, encoder_buffer_); + + // If buffer is full, flush. If the buffer was empty before the request, start the timer. + if (encoder_buffer_.length() >= config_.maxBufferSizeBeforeFlush()) { + flushBufferAndResetTimer(); + } else if (empty_buffer) { + flush_timer_->enableTimer(std::chrono::milliseconds(config_.bufferFlushTimeoutInMs())); + } + + // Only boost the op timeout if: + // - We are already connected. Otherwise, we are governed by the connect timeout and the timer + // will be reset when/if connection occurs. This allows a relatively long connection spin up + // time for example if TLS is being used. + // - Timer is not already armed. Otherwise the timeout would effectively start on + if (connected_ && !connect_or_op_timer_->enabled()){ + connect_or_op_timer_->enableTimer(config_.opTimeout()); + } + + return true; +} + + void ClientImpl::onConnectOrOpTimeout() { putOutlierEvent(Upstream::Outlier::Result::LocalOriginTimeout); if (connected_) { @@ -205,13 +243,13 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { } } // If client is Pubsub handle the upstream close event such that downstream must also be closed. - if (pending_requests_.empty() && is_pubsub_client_) { + if ( is_pubsub_client_) { host_->cluster().trafficStats()->upstream_cx_destroy_with_active_rq_.inc(); ENVOY_LOG(debug,"Pubsub Client Connection close event received:'{}'",eventTypeStr); - if ((downstream_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){ + if ((pubsub_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){ ENVOY_LOG(debug,"Pubsub Client Remote close received on Downstream Notify Upstream and close it"); - downstream_cb_->onFailure(); - downstream_cb_.reset(); + pubsub_cb_->onFailure(); + pubsub_cb_.reset(); } } @@ -229,7 +267,11 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { } else if (event == Network::ConnectionEvent::Connected) { connected_ = true; - ASSERT(!pending_requests_.empty()); + if (!is_pubsub_client_){ + ASSERT(!pending_requests_.empty()); + }else{ + ENVOY_LOG(debug,"Pubsub Client Connection established"); + } if (!is_blocking_client_) { connect_or_op_timer_->enableTimer(config_.opTimeout()); } @@ -243,11 +285,16 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { void ClientImpl::onRespValue(RespValuePtr&& value) { - if (pending_requests_.empty() && is_pubsub_client_) { + if (is_pubsub_client_) { // This is a pubsub client, and we have received a message from the server. // We need to pass this message to the registered callback. - if (downstream_cb_ != nullptr){ - downstream_cb_->sendResponseDownstream(std::move(value)); + if (pubsub_cb_ != nullptr){ + pubsub_cb_->handleChannelMessage(std::move(value)); + if (connect_or_op_timer_->enabled()){ + connect_or_op_timer_->disableTimer(); + } + }else { + ENVOY_LOG(debug,"Pubsub Client Received message from server but no callback registered"); } return; } @@ -352,10 +399,10 @@ ClientPtr ClientFactoryImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope, const std::string& auth_username, - const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client, const std::shared_ptr& drcb) { + const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client, const std::shared_ptr& pubsubcb) { ClientPtr client = ClientImpl::create(host, dispatcher, EncoderPtr{new EncoderImpl()}, decoder_factory_, config, - redis_command_stats, scope, is_transaction_client, is_pubsub_client,is_blocking_client, drcb); + redis_command_stats, scope, is_transaction_client, is_pubsub_client,is_blocking_client, pubsubcb); client->initialize(auth_username, auth_password); return client; } diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index e7a9ffe3343e..c11e070d0217 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -77,12 +77,12 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats, - Stats::Scope& scope, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client, const std::shared_ptr& drcb); + Stats::Scope& scope, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client, const std::shared_ptr& pubsubcb); ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope, - bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr& drcb); + bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr& pubsubcb); ~ClientImpl() override; // Client @@ -91,6 +91,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne } void close() override; PoolRequest* makeRequest(const RespValue& request, ClientCallbacks& callbacks) override; + bool makePubSubRequest(const RespValue& request) override; bool active() override { if (is_pubsub_client_){ return true; @@ -159,7 +160,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne bool is_transaction_client_; bool is_pubsub_client_=false; bool is_blocking_client_=false; - std::shared_ptr downstream_cb_=nullptr; + std::shared_ptr pubsub_cb_=nullptr; }; class ClientFactoryImpl : public ClientFactory { @@ -168,7 +169,7 @@ class ClientFactoryImpl : public ClientFactory { ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope, const std::string& auth_username, - const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr& drcb) override; + const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr& pubsubcb) override; static ClientFactoryImpl instance_; diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 0a3c13b88079..344e0da781b4 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -135,6 +135,14 @@ makeBlockingRequest(const RouteSharedPtr& route, int32_t shard_index, const std: return handler; } +bool +makePubSubRequest(const RouteSharedPtr& route, int32_t shard_index, const std::string& key,Common::Redis::RespValueConstSharedPtr incoming_request,Common::Redis::Client::Transaction& transaction) { + Extensions::NetworkFilters::RedisProxy::ConnPool::InstanceImpl* req_instance = + dynamic_cast(route->upstream(key).get()); + auto isSuccess = req_instance->makePubSubRequest(shard_index,key,ConnPool::RespVariant(incoming_request), transaction); + return isSuccess; +} + /** * Make request and maybe mirror the request based on the mirror policies of the route. * @param route supplies the route matched with the request. @@ -820,8 +828,10 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt Common::Redis::Client::Transaction& transaction = callbacks.transaction(); std::string command_name = absl::AsciiStrToLower(incoming_request->asArray()[0].asString()); - std::unique_ptr request_ptr{ - new PubSubRequest(callbacks, command_stats, time_source, delay_command_latency)}; + //std::unique_ptr request_ptr{ + // new PubSubRequest(callbacks, command_stats, time_source, delay_command_latency)}; + (void)time_source; // Mark time_source as unused + (void)delay_command_latency; // Mark delay_command_latency as unused std::string key = std::string(); int32_t redisShardsCount=0; int32_t shard_index=0; @@ -850,20 +860,23 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt if (transaction.isSubscribedMode() && !Common::Redis::SupportedCommands::subcrStateallowedCommands().contains(command_name)) { callbacks.onResponse( Common::Redis::Utility::makeError(fmt::format("not supported command in subscribe state"))); - return nullptr; + }else if (transaction.isSubscribedMode() && (command_name == "quit")){ transaction.should_close_ = true; transaction.subscribed_client_shard_index_= -1; localResponse(callbacks, "OK"); - return nullptr; } else if (transaction.isTransactionMode()) { // subscription commands are not supported in transaction mode, we will not get here , but just in case callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format("not supported when in transaction mode"))); transaction.should_close_ = true; - return nullptr; } + transaction.setPubSubCallback(nullptr); + return nullptr; }else { if (Common::Redis::SupportedCommands::subcrStateEnterCommands().contains(command_name)){ + auto PubSubMsghandler = std::make_shared(transaction.getDownstreamCallback()); + auto pubsubCallbacksHandler = std::static_pointer_cast(PubSubMsghandler); + transaction.setPubSubCallback(pubsubCallbacksHandler); transaction.clients_.resize(1); transaction.enterSubscribedMode(); if(transaction.subscribed_client_shard_index_ == -1){ @@ -879,10 +892,12 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt shard_index = transaction.subscribed_client_shard_index_; Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); - request_ptr->handle_ = makeBlockingRequest( - route,shard_index,key,base_request, *request_ptr, callbacks.transaction()); + //request_ptr->handle_ = makeBlockingRequest( + // route,shard_index,key,base_request, *request_ptr, callbacks.transaction()); + auto isSuccess = makePubSubRequest( + route,shard_index,key,base_request, callbacks.transaction()); - if (!request_ptr->handle_) { + if (!isSuccess) { command_stats.error_.inc(); transaction.should_close_ = true; transaction.subscribed_client_shard_index_= -1; @@ -892,10 +907,45 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt //Should we set to true irrespective of connpool handler returned transaction.connection_established_=true; transaction.should_close_ = false; - return request_ptr; + Common::Redis::RespValuePtr nullresponse(new Common::Redis::RespValue()); + nullresponse->type(Common::Redis::RespType::Null); + callbacks.onResponse(std::move(nullresponse)); + return nullptr; } +void PubSubMessageHandler::handleChannelMessage(Common::Redis::RespValuePtr&& value) { + if (value->type() == Common::Redis::RespType::Array) { + if (value->asArray().size() == 3) { + if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && + value->asArray()[1].type() == Common::Redis::RespType::BulkString && + value->asArray()[2].type() == Common::Redis::RespType::BulkString) { + std::string message_type = value->asArray()[0].asString(); + std::string channel = value->asArray()[1].asString(); + std::string message = value->asArray()[2].asString(); + if (message_type == "message") { + ENVOY_LOG(debug, "message received on channel '{}': '{}'", channel, message); + downstream_callbacks_->sendResponseDownstream(std::move(value)); + }else { + ENVOY_LOG(debug, "pubsub message type: '{}'", message_type); + downstream_callbacks_->sendResponseDownstream(std::move(value)); + } + } else { + ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + } + } else { + ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + } + } else { + ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + } + +} + +void PubSubMessageHandler::onFailure() { + ENVOY_LOG(debug, "failure in pubsub message handler"); + downstream_callbacks_->onFailure(); +} void MGETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) { pending_requests_[index].handle_ = nullptr; diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h index fc0d22e8e3d7..1a0cad36e034 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h @@ -320,10 +320,19 @@ class PubSubRequest : public SingleServerRequest { SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool delay_command_latency, const StreamInfo::StreamInfo& stream_info); + +}; + +class PubSubMessageHandler : public Common::Redis::Client::PubsubCallbacks,public Logger::Loggable { + private: - PubSubRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, - bool delay_command_latency) - : SingleServerRequest(callbacks, command_stats, time_source, delay_command_latency) {} + std::shared_ptr downstream_callbacks_; + +public: + PubSubMessageHandler(std::shared_ptr downstream_callbacks) : downstream_callbacks_(downstream_callbacks) {} + + void handleChannelMessage(Common::Redis::RespValuePtr&& value) override; + void onFailure() override; }; diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index ed6eb91db077..23256bcef9f0 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -122,6 +122,12 @@ InstanceImpl::makeBlockingClientRequest(int32_t shard_index, const std::string& transaction); } +// This method is always called from a InstanceSharedPtr we don't have to worry about tls_->getTyped +// failing due to InstanceImpl going away. +bool +InstanceImpl::makePubSubRequest(int32_t shard_index, const std::string& key, RespVariant&& request, Common::Redis::Client::Transaction& transaction) { + return tls_->getTyped().makePubSubRequest(shard_index,key,std::move(request), transaction); +} int32_t InstanceImpl::getRedisShardsCount() { @@ -412,14 +418,14 @@ InstanceImpl::ThreadLocalPool::makeBlockingClientRequest(int32_t shard_index, co if (!transaction.connection_established_ && transaction.isSubscribedMode()) { transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), - auth_username_, auth_password_, false,true,false,transaction.getDownstreamCallback()); + auth_username_, auth_password_, false,true,false,transaction.getPubSubCallback()); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } }else if (!transaction.connection_established_ && transaction.isBlockingCommand()) { transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), - auth_username_, auth_password_, false,false,true,transaction.getDownstreamCallback()); + auth_username_, auth_password_, false,false,true,nullptr); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } @@ -442,6 +448,54 @@ InstanceImpl::ThreadLocalPool::makeBlockingClientRequest(int32_t shard_index, co } +bool +InstanceImpl::ThreadLocalPool::makePubSubRequest(int32_t shard_index, const std::string& key,RespVariant&& request,Common::Redis::Client::Transaction& transaction) { + Upstream::HostConstSharedPtr host = nullptr; + bool is_success = false; + + if (cluster_ == nullptr) { + ASSERT(client_map_.empty()); + ASSERT(host_set_member_update_cb_handle_ == nullptr); + return is_success; + } + + if ( shard_index < 0 || !transaction.active_ || !transaction.isSubscribedMode()){ + ENVOY_LOG(debug, "Error in calling makePubSubRequest, shard index is negative or transaction is not active or not in subscribed mode"); + return is_success; + } + + + Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getAllHosts(nullptr); + if (!hosts) { + ENVOY_LOG(error, "Unable to retrive all redis primary shards , possible that we are scaling or upstream error"); + onRequestCompleted(); + return is_success; + } + host = (*hosts)[shard_index]; + + + if (!host) { + ENVOY_LOG(debug, "host not found: '{}'", key); + return is_success; + } + + // For now we create dedicated connection for pubsub commands to be optimised later to use the + // existing connection if available ( Connection Multiplexing forpubsub commands needs to be addded later) + uint32_t client_idx = 0; + client_idx = transaction.current_client_idx_; + if (!transaction.connection_established_ ) { + transaction.clients_[client_idx] = + client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), + auth_username_, auth_password_, false,true,false,transaction.getPubSubCallback()); + if (transaction.connection_cb_) { + transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); + } + } + is_success=transaction.clients_[client_idx]->makePubSubRequest(getRequest(std::move(request))); + return is_success; +} + + Common::Redis::Client::PoolRequest* InstanceImpl::ThreadLocalPool::makeRequestNoKey(int32_t shard_index, RespVariant&& request, PoolCallbacks& callbacks, @@ -476,7 +530,7 @@ InstanceImpl::ThreadLocalPool::makeRequestNoKey(int32_t shard_index, RespVariant if ((!transaction.connection_established_ && transaction.is_subscribed_mode_) || (!transaction.connection_established_ && transaction.is_blocking_command_)) { transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), - auth_username_, auth_password_, false,true,true,transaction.getDownstreamCallback()); + auth_username_, auth_password_, false,true,true,nullptr); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h index 44913069a623..29fe1f3e2034 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h @@ -101,6 +101,9 @@ class InstanceImpl : public Instance, public std::enable_shared_from_thistype() == Common::Redis::RespType::Null){ + ENVOY_LOG(debug,"Null response received from upstream Possible pubsub message processing, ignoring sending response downstream"); + pending_requests_.pop_front(); + + } // The response we got might not be in order, so flush out what we can. (A new response may // unlock several out of order responses). while (!pending_requests_.empty() && pending_requests_.front().pending_response_) { From 368548f21cd940a2d271be12cd1e25bd756b3b1a Mon Sep 17 00:00:00 2001 From: Sasidharan Gopal Date: Fri, 10 May 2024 16:10:52 +0530 Subject: [PATCH 02/10] Fixed handlechannel message bug and merged new code into current framework --- .../redis_proxy/command_splitter_impl.cc | 200 ++++++++++++++---- .../network/redis_proxy/conn_pool_impl.cc | 20 +- .../network/redis_proxy/proxy_filter.cc | 2 +- 3 files changed, 171 insertions(+), 51 deletions(-) diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 344e0da781b4..81b50613a30a 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -821,6 +821,23 @@ SplitRequestPtr BlockingClientRequest::create(Router& router, Common::Redis::Res return request_ptr; } +bool isKeyspaceArgument(const std::string& argument) { + std::string keyspacepattern = "__keyspace@0__"; + std::string keyeventpattern = "__keyevent@0__"; + if (argument.find(keyspacepattern) == 0 || argument.find(keyeventpattern) == 0) { + return true; + } + return false; +} + +// Generic function to build an array with bulkstring +void addBulkString(Common::Redis::RespValue& requestArray, const std::string& value) { + Common::Redis::RespValue element; + element.type(Common::Redis::RespType::BulkString); + element.asString() = value; + requestArray.asArray().emplace_back(std::move(element)); +} + SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request, SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, bool delay_command_latency, @@ -834,9 +851,45 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt (void)delay_command_latency; // Mark delay_command_latency as unused std::string key = std::string(); int32_t redisShardsCount=0; - int32_t shard_index=0; - + //int32_t shard_index=0; + //bool iserror = false; + + // by default setting single shard request true + bool singleShardRequest = false; + bool allShardsRequest = false; + bool allShardwithSingleShardRequest = false; + Common::Redis::RespValueSharedPtr base_request; + Common::Redis::RespValueSharedPtr keyspaceRequest; + Common::Redis::RespValue keyspaceRequestArray; + // Analyze request to get the proper request type + if (command_name == "subscribe" || command_name == "psubscribe") { + bool containsKeyspaceChannel = false; + bool containsNormalChannel = false; + for (size_t i = 1; i < incoming_request->asArray().size(); i++) { + std::string key = absl::AsciiStrToLower(incoming_request->asArray()[i].asString()); + if (isKeyspaceArgument(key)) { + ENVOY_LOG(debug, "keyspace command: '{}'", key); + containsKeyspaceChannel = true; + } else { + containsNormalChannel = true; + } + } + + if (containsKeyspaceChannel) { + if (!containsNormalChannel) { + allShardsRequest = true; //example: subscribe "__keyspace@0__:del" + } else { + allShardwithSingleShardRequest = true; //example: subscribe test "__keyevent@0__:del" + } + } else if (containsNormalChannel) { + singleShardRequest = true; //example: subscribe test + } + + } else if (command_name == "unsubscribe" || command_name == "punsubscribe") { + allShardsRequest = true; + } + const auto& route = router.upstreamPool(key, stream_info); if (route) { Extensions::NetworkFilters::RedisProxy::ConnPool::InstanceImpl* instance = @@ -865,23 +918,23 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt transaction.should_close_ = true; transaction.subscribed_client_shard_index_= -1; localResponse(callbacks, "OK"); + transaction.setPubSubCallback(nullptr); + return nullptr; } else if (transaction.isTransactionMode()) { // subscription commands are not supported in transaction mode, we will not get here , but just in case callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format("not supported when in transaction mode"))); transaction.should_close_ = true; } - transaction.setPubSubCallback(nullptr); - return nullptr; }else { if (Common::Redis::SupportedCommands::subcrStateEnterCommands().contains(command_name)){ auto PubSubMsghandler = std::make_shared(transaction.getDownstreamCallback()); auto pubsubCallbacksHandler = std::static_pointer_cast(PubSubMsghandler); transaction.setPubSubCallback(pubsubCallbacksHandler); - transaction.clients_.resize(1); + transaction.clients_.resize(redisShardsCount); transaction.enterSubscribedMode(); - if(transaction.subscribed_client_shard_index_ == -1){ - transaction.subscribed_client_shard_index_ = getShardIndex(command_name,1,redisShardsCount); - } + // if(transaction.subscribed_client_shard_index_ == -1){ + // transaction.subscribed_client_shard_index_ = getShardIndex(command_name,1,redisShardsCount); + // } transaction.start(); }else{ ENVOY_LOG(debug, "not yet in subscription mode, must be in subscr mode first: '{}'", command_name); @@ -889,57 +942,128 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt return nullptr; } } - - shard_index = transaction.subscribed_client_shard_index_; - Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); - //request_ptr->handle_ = makeBlockingRequest( - // route,shard_index,key,base_request, *request_ptr, callbacks.transaction()); - auto isSuccess = makePubSubRequest( - route,shard_index,key,base_request, callbacks.transaction()); - if (!isSuccess) { - command_stats.error_.inc(); - transaction.should_close_ = true; - transaction.subscribed_client_shard_index_= -1; - callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); - return nullptr; + if (singleShardRequest) { + // construct pending request hererequest_ptr->num_pending_responses_ = keyspaceResponsesCount + plainResponsesCount; + int32_t shard_index = getShardIndex(command_name,1,redisShardsCount); + Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); + //request_ptr->handle_ = makeBlockingRequest( + // route,shard_index,key,base_request, *request_ptr, callbacks.transaction()); + auto isSuccess = makePubSubRequest( + route,shard_index,key,base_request, callbacks.transaction()); + + if (!isSuccess) { + command_stats.error_.inc(); + transaction.should_close_ = true; + transaction.subscribed_client_shard_index_= -1; + callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); + return nullptr; + } + } + + if (allShardsRequest) { + // construct pending request here + Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); + for (int32_t i = 0; i < redisShardsCount; i++) { + transaction.current_client_idx_ = i; + auto isSuccess = makePubSubRequest( + route,i,key,base_request, callbacks.transaction()); + if (!isSuccess) { + command_stats.error_.inc(); + transaction.should_close_ = true; + transaction.subscribed_client_shard_index_= -1; + callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); + return nullptr; + } + } + } + + if (allShardwithSingleShardRequest) { + // Construct keyspace request + keyspaceRequestArray.type(Common::Redis::RespType::Array); + addBulkString(keyspaceRequestArray, command_name); + for (size_t i = 1; i < incoming_request->asArray().size(); i++) { + std::string key = absl::AsciiStrToLower(incoming_request->asArray()[i].asString()); + if (isKeyspaceArgument(key)) { + addBulkString(keyspaceRequestArray, key); + } + } + + base_request = std::move(incoming_request); + keyspaceRequest = std::make_unique(keyspaceRequestArray); + for (int32_t i = 0; i < redisShardsCount; i++) { + transaction.current_client_idx_ = i; + if ( i == 0) { + auto isSuccess = makePubSubRequest( + route,i,key,base_request, callbacks.transaction()); + if (!isSuccess) { + command_stats.error_.inc(); + transaction.should_close_ = true; + transaction.subscribed_client_shard_index_= -1; + callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); + return nullptr; + } + } else { + auto isSuccess = makePubSubRequest( + route,i,key,keyspaceRequest, callbacks.transaction()); + if (!isSuccess) { + command_stats.error_.inc(); + transaction.should_close_ = true; + transaction.subscribed_client_shard_index_= -1; + callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); + return nullptr; + } + } + } } - //Should we set to true irrespective of connpool handler returned + + // //Should we set to true irrespective of connpool handler returned transaction.connection_established_=true; transaction.should_close_ = false; Common::Redis::RespValuePtr nullresponse(new Common::Redis::RespValue()); nullresponse->type(Common::Redis::RespType::Null); callbacks.onResponse(std::move(nullresponse)); - return nullptr; + return nullptr; } void PubSubMessageHandler::handleChannelMessage(Common::Redis::RespValuePtr&& value) { if (value->type() == Common::Redis::RespType::Array) { if (value->asArray().size() == 3) { - if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && + std::string message_type = value->asArray()[0].asString(); + if (message_type == "message" || message_type == "pmessage") { + if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && value->asArray()[1].type() == Common::Redis::RespType::BulkString && value->asArray()[2].type() == Common::Redis::RespType::BulkString) { - std::string message_type = value->asArray()[0].asString(); - std::string channel = value->asArray()[1].asString(); - std::string message = value->asArray()[2].asString(); - if (message_type == "message") { + std::string message_type = value->asArray()[0].asString(); + std::string channel = value->asArray()[1].asString(); + std::string message = value->asArray()[2].asString(); ENVOY_LOG(debug, "message received on channel '{}': '{}'", channel, message); downstream_callbacks_->sendResponseDownstream(std::move(value)); }else { - ENVOY_LOG(debug, "pubsub message type: '{}'", message_type); + ENVOY_LOG(debug, "unexpected message format for message or pmessage: '{}'", value->toString()); + } + } else if (message_type == "subscribe" || message_type == "unsubscribe" || message_type == "psubscribe" || message_type == "punsubscribe") { + if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && + value->asArray()[1].type() == Common::Redis::RespType::BulkString && + value->asArray()[2].type() == Common::Redis::RespType::Integer) { + std::string message_type = value->asArray()[0].asString(); + std::string channel = value->asArray()[1].asString(); + int64_t count = value->asArray()[2].asInteger(); + ENVOY_LOG(debug, "sub/unsub/psub/pubsub message received on channel '{}': '{}'", channel, count); downstream_callbacks_->sendResponseDownstream(std::move(value)); + }else { + ENVOY_LOG(debug, "unexpected sub/unsub/psub/pubsub message format: '{}'", value->toString()); } } else { - ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); - } + ENVOY_LOG(debug, "unexpected message type: '{}'", value->toString()); + } } else { - ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + ENVOY_LOG(debug, "unexpected message format : '{}'", value->toString()); } } else { - ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + ENVOY_LOG(debug, "unexpected message format 4: '{}'", value->toString()); } - } void PubSubMessageHandler::onFailure() { @@ -1075,14 +1199,6 @@ bool requiresValue(const std::string& arg) { return (arg == "count" || arg == "match" || arg == "type"); } -// Generic function to build an array with bulkstring -void addBulkString(Common::Redis::RespValue& requestArray, const std::string& value) { - Common::Redis::RespValue element; - element.type(Common::Redis::RespType::BulkString); - element.asString() = value; - requestArray.asArray().emplace_back(std::move(element)); -} - void ScanRequest::onChildError(Common::Redis::RespValuePtr&& value) { // Setting null pointer to all pending requests for (auto& request : pending_requests_) { diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index 23256bcef9f0..64ca7fb16af1 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -465,13 +465,13 @@ InstanceImpl::ThreadLocalPool::makePubSubRequest(int32_t shard_index, const std: } - Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getAllHosts(nullptr); - if (!hosts) { - ENVOY_LOG(error, "Unable to retrive all redis primary shards , possible that we are scaling or upstream error"); - onRequestCompleted(); - return is_success; - } - host = (*hosts)[shard_index]; + Upstream::HostConstVectorSharedPtr hosts = cluster_->loadBalancer().getAllHosts(nullptr); + if (!hosts) { + ENVOY_LOG(error, "Unable to retrive all redis primary shards , possible that we are scaling or upstream error"); + onRequestCompleted(); + return is_success; + } + host = (*hosts)[shard_index]; if (!host) { @@ -483,13 +483,17 @@ InstanceImpl::ThreadLocalPool::makePubSubRequest(int32_t shard_index, const std: // existing connection if available ( Connection Multiplexing forpubsub commands needs to be addded later) uint32_t client_idx = 0; client_idx = transaction.current_client_idx_; - if (!transaction.connection_established_ ) { + ENVOY_LOG(debug,"Current client index is '{}' '{}' '{}'",client_idx, transaction.clients_.size(), host->address()->asString()); + if (!transaction.connection_established_ || transaction.clients_[client_idx] == nullptr) { + ENVOY_LOG(debug,"Current connection is not established, creating new connection"); transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), auth_username_, auth_password_, false,true,false,transaction.getPubSubCallback()); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } + } else { + ENVOY_LOG(debug,"Current connection is established, using existing connection"); } is_success=transaction.clients_[client_idx]->makePubSubRequest(getRequest(std::move(request))); return is_success; diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc index 9f8adf2b35cd..dc4feebb3ba3 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc @@ -229,7 +229,7 @@ void ProxyFilter::onResponse(PendingRequest& request, Common::Redis::RespValuePt request.pending_response_ = std::move(value); request.request_handle_ = nullptr; - if (request.pending_response_.get()->type() == Common::Redis::RespType::Null){ + if (request.pending_response_.get()->type() == Common::Redis::RespType::Null && transaction_.isSubscribedMode()){ ENVOY_LOG(debug,"Null response received from upstream Possible pubsub message processing, ignoring sending response downstream"); pending_requests_.pop_front(); From ec81e30d2a1b005267f1627c4595e54b0cc76031 Mon Sep 17 00:00:00 2001 From: Sasidharan Gopal Date: Mon, 13 May 2024 12:41:22 +0530 Subject: [PATCH 03/10] enhancement --- .../redis_proxy/command_splitter_impl.cc | 87 +++++++------------ 1 file changed, 32 insertions(+), 55 deletions(-) diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 81b50613a30a..57cce71e7e8d 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -880,7 +880,7 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt if (!containsNormalChannel) { allShardsRequest = true; //example: subscribe "__keyspace@0__:del" } else { - allShardwithSingleShardRequest = true; //example: subscribe test "__keyevent@0__:del" + allShardwithSingleShardRequest = true; //example: subscribe test "__keyevent@0__:del" } } else if (containsNormalChannel) { singleShardRequest = true; //example: subscribe test @@ -943,81 +943,57 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt } } - if (singleShardRequest) { - // construct pending request hererequest_ptr->num_pending_responses_ = keyspaceResponsesCount + plainResponsesCount; - int32_t shard_index = getShardIndex(command_name,1,redisShardsCount); - Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); - //request_ptr->handle_ = makeBlockingRequest( - // route,shard_index,key,base_request, *request_ptr, callbacks.transaction()); - auto isSuccess = makePubSubRequest( - route,shard_index,key,base_request, callbacks.transaction()); - + auto makeRequest = [&](int32_t shard_index, Common::Redis::RespValueSharedPtr& base_request) -> bool { + transaction.current_client_idx_ = shard_index; + auto isSuccess = makePubSubRequest(route, shard_index, key, base_request, callbacks.transaction()); if (!isSuccess) { - command_stats.error_.inc(); - transaction.should_close_ = true; - transaction.subscribed_client_shard_index_= -1; - callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); - return nullptr; + command_stats.error_.inc(); + transaction.should_close_ = true; + transaction.subscribed_client_shard_index_ = -1; + callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); + return false; } + return true; + }; + + if (singleShardRequest) { + Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); + int32_t shard_index = getShardIndex(command_name, 1, redisShardsCount); + if (!makeRequest(shard_index, base_request)) + return nullptr; } if (allShardsRequest) { - // construct pending request here Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); for (int32_t i = 0; i < redisShardsCount; i++) { - transaction.current_client_idx_ = i; - auto isSuccess = makePubSubRequest( - route,i,key,base_request, callbacks.transaction()); - if (!isSuccess) { - command_stats.error_.inc(); - transaction.should_close_ = true; - transaction.subscribed_client_shard_index_= -1; - callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); - return nullptr; - } + if (!makeRequest(i, base_request)) + return nullptr; } } - if (allShardwithSingleShardRequest) { - // Construct keyspace request + if (allShardwithSingleShardRequest) { keyspaceRequestArray.type(Common::Redis::RespType::Array); addBulkString(keyspaceRequestArray, command_name); for (size_t i = 1; i < incoming_request->asArray().size(); i++) { - std::string key = absl::AsciiStrToLower(incoming_request->asArray()[i].asString()); - if (isKeyspaceArgument(key)) { - addBulkString(keyspaceRequestArray, key); - } + std::string key = absl::AsciiStrToLower(incoming_request->asArray()[i].asString()); + if (isKeyspaceArgument(key)) { + addBulkString(keyspaceRequestArray, key); + } } - base_request = std::move(incoming_request); keyspaceRequest = std::make_unique(keyspaceRequestArray); for (int32_t i = 0; i < redisShardsCount; i++) { - transaction.current_client_idx_ = i; - if ( i == 0) { - auto isSuccess = makePubSubRequest( - route,i,key,base_request, callbacks.transaction()); - if (!isSuccess) { - command_stats.error_.inc(); - transaction.should_close_ = true; - transaction.subscribed_client_shard_index_= -1; - callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); - return nullptr; - } - } else { - auto isSuccess = makePubSubRequest( - route,i,key,keyspaceRequest, callbacks.transaction()); - if (!isSuccess) { - command_stats.error_.inc(); - transaction.should_close_ = true; - transaction.subscribed_client_shard_index_= -1; - callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); - return nullptr; - } + if (i== 0){ + if (!makeRequest(i, base_request)) + return nullptr; + }else{ + if (!makeRequest(i, keyspaceRequest)) + return nullptr; } } } - // //Should we set to true irrespective of connpool handler returned + //Should we set to true irrespective of connpool handler returned transaction.connection_established_=true; transaction.should_close_ = false; Common::Redis::RespValuePtr nullresponse(new Common::Redis::RespValue()); @@ -1028,6 +1004,7 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt } void PubSubMessageHandler::handleChannelMessage(Common::Redis::RespValuePtr&& value) { + ENVOY_LOG(debug, "message received on channel '{}'", value->toString()); if (value->type() == Common::Redis::RespType::Array) { if (value->asArray().size() == 3) { std::string message_type = value->asArray()[0].asString(); From 6cc5106e7cf9a101fed67db6d4f50d29f5c36dad Mon Sep 17 00:00:00 2001 From: Sasidharan Gopal Date: Mon, 13 May 2024 12:59:49 +0530 Subject: [PATCH 04/10] further optimization --- .../redis_proxy/command_splitter_impl.cc | 35 ++++++++----------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 57cce71e7e8d..3803ebfa0d13 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -845,16 +845,11 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt Common::Redis::Client::Transaction& transaction = callbacks.transaction(); std::string command_name = absl::AsciiStrToLower(incoming_request->asArray()[0].asString()); - //std::unique_ptr request_ptr{ - // new PubSubRequest(callbacks, command_stats, time_source, delay_command_latency)}; (void)time_source; // Mark time_source as unused (void)delay_command_latency; // Mark delay_command_latency as unused std::string key = std::string(); int32_t redisShardsCount=0; - //int32_t shard_index=0; - //bool iserror = false; - // by default setting single shard request true bool singleShardRequest = false; bool allShardsRequest = false; bool allShardwithSingleShardRequest = false; @@ -908,24 +903,24 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt //request_ptr->onChildResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost),0); } - if (transaction.active_ ){ - // when we are in subscribe command, we cannnot accept all other commands - if (transaction.isSubscribedMode() && !Common::Redis::SupportedCommands::subcrStateallowedCommands().contains(command_name)) { - callbacks.onResponse( - Common::Redis::Utility::makeError(fmt::format("not supported command in subscribe state"))); - - }else if (transaction.isSubscribedMode() && (command_name == "quit")){ - transaction.should_close_ = true; - transaction.subscribed_client_shard_index_= -1; - localResponse(callbacks, "OK"); - transaction.setPubSubCallback(nullptr); - return nullptr; + if (transaction.active_) { + if (transaction.isSubscribedMode()) { + if (!Common::Redis::SupportedCommands::subcrStateallowedCommands().contains(command_name)) { + callbacks.onResponse( + Common::Redis::Utility::makeError("Not supported command in subscribe state")); + } else if (command_name == "quit") { + transaction.should_close_ = true; + transaction.subscribed_client_shard_index_ = -1; + localResponse(callbacks, "OK"); + transaction.setPubSubCallback(nullptr); + return nullptr; + } } else if (transaction.isTransactionMode()) { - // subscription commands are not supported in transaction mode, we will not get here , but just in case - callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format("not supported when in transaction mode"))); + callbacks.onResponse( + Common::Redis::Utility::makeError("Not supported when in transaction mode")); transaction.should_close_ = true; } - }else { +}else { if (Common::Redis::SupportedCommands::subcrStateEnterCommands().contains(command_name)){ auto PubSubMsghandler = std::make_shared(transaction.getDownstreamCallback()); auto pubsubCallbacksHandler = std::static_pointer_cast(PubSubMsghandler); From 98bea4ce4aa402b299f2e89ff18fe6a6fc86de22 Mon Sep 17 00:00:00 2001 From: Sasidharan Gopal Date: Tue, 14 May 2024 14:13:08 +0530 Subject: [PATCH 05/10] Handling duplicate initial message --- .../filters/network/common/redis/client.h | 4 ++- .../network/common/redis/client_impl.cc | 5 +-- .../network/common/redis/client_impl.h | 3 ++ .../redis_proxy/command_splitter_impl.cc | 31 +++++++++++-------- .../redis_proxy/command_splitter_impl.h | 12 ++++++- .../network/redis_proxy/conn_pool_impl.cc | 1 + 6 files changed, 39 insertions(+), 17 deletions(-) diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index d579d3b36163..17f94a5686b0 100644 --- a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -69,7 +69,7 @@ class DirectCallbacks { class PubsubCallbacks { public: virtual ~PubsubCallbacks() = default; - virtual void handleChannelMessage(Common::Redis::RespValuePtr&& value) PURE; + virtual void handleChannelMessage(Common::Redis::RespValuePtr&& value, int32_t clientIndex) PURE; virtual void onFailure() PURE; }; @@ -98,6 +98,8 @@ class Client : public Event::DeferredDeletable { * Adds network connection callbacks to the underlying network connection. */ virtual void addConnectionCallbacks(Network::ConnectionCallbacks& callbacks) PURE; + virtual void setCurrentClientIndex(int32_t shardIndex) PURE; + virtual int32_t getCurrentClientIndex() PURE; /** * Called to determine if the client has pending requests. diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 7313a94a7fdd..a1076995adbe 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -284,12 +284,13 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { } void ClientImpl::onRespValue(RespValuePtr&& value) { - + int32_t clientIndex = getCurrentClientIndex(); + ENVOY_LOG(debug, "ClientImpl::onRespValue() clientIndex: {}", clientIndex); if (is_pubsub_client_) { // This is a pubsub client, and we have received a message from the server. // We need to pass this message to the registered callback. if (pubsub_cb_ != nullptr){ - pubsub_cb_->handleChannelMessage(std::move(value)); + pubsub_cb_->handleChannelMessage(std::move(value), clientIndex); if (connect_or_op_timer_->enabled()){ connect_or_op_timer_->disableTimer(); } diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index c11e070d0217..2d83222bb874 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -89,6 +89,8 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne void addConnectionCallbacks(Network::ConnectionCallbacks& callbacks) override { connection_->addConnectionCallbacks(callbacks); } + void setCurrentClientIndex(int32_t index) override { current_shard_index_ = index; } + int32_t getCurrentClientIndex() override { return current_shard_index_; } void close() override; PoolRequest* makeRequest(const RespValue& request, ClientCallbacks& callbacks) override; bool makePubSubRequest(const RespValue& request) override; @@ -161,6 +163,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne bool is_pubsub_client_=false; bool is_blocking_client_=false; std::shared_ptr pubsub_cb_=nullptr; + int32_t current_shard_index_{-1}; }; class ClientFactoryImpl : public ClientFactory { diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 3803ebfa0d13..cd047e1ac53d 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -902,7 +902,8 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); //request_ptr->onChildResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost),0); } - + std::shared_ptr PubSubMsghandler; + PubSubMsghandler = std::make_shared(transaction.getDownstreamCallback()); if (transaction.active_) { if (transaction.isSubscribedMode()) { if (!Common::Redis::SupportedCommands::subcrStateallowedCommands().contains(command_name)) { @@ -922,7 +923,6 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt } }else { if (Common::Redis::SupportedCommands::subcrStateEnterCommands().contains(command_name)){ - auto PubSubMsghandler = std::make_shared(transaction.getDownstreamCallback()); auto pubsubCallbacksHandler = std::static_pointer_cast(PubSubMsghandler); transaction.setPubSubCallback(pubsubCallbacksHandler); transaction.clients_.resize(redisShardsCount); @@ -954,12 +954,14 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt if (singleShardRequest) { Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); int32_t shard_index = getShardIndex(command_name, 1, redisShardsCount); + PubSubMsghandler->setShardIndex(shard_index); if (!makeRequest(shard_index, base_request)) return nullptr; } if (allShardsRequest) { Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); + PubSubMsghandler->setShardIndex(0); for (int32_t i = 0; i < redisShardsCount; i++) { if (!makeRequest(i, base_request)) return nullptr; @@ -977,6 +979,7 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt } base_request = std::move(incoming_request); keyspaceRequest = std::make_unique(keyspaceRequestArray); + PubSubMsghandler->setShardIndex(0); for (int32_t i = 0; i < redisShardsCount; i++) { if (i== 0){ if (!makeRequest(i, base_request)) @@ -998,8 +1001,8 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt return nullptr; } -void PubSubMessageHandler::handleChannelMessage(Common::Redis::RespValuePtr&& value) { - ENVOY_LOG(debug, "message received on channel '{}'", value->toString()); +void PubSubMessageHandler::handleChannelMessageCustom(Common::Redis::RespValuePtr&& value, int32_t clientIndex, int32_t shardIndex) { + ENVOY_LOG(debug, "message received on channel '{}' '{}' '{}'", value->toString(), shardIndex, clientIndex); if (value->type() == Common::Redis::RespType::Array) { if (value->asArray().size() == 3) { std::string message_type = value->asArray()[0].asString(); @@ -1010,22 +1013,24 @@ void PubSubMessageHandler::handleChannelMessage(Common::Redis::RespValuePtr&& va std::string message_type = value->asArray()[0].asString(); std::string channel = value->asArray()[1].asString(); std::string message = value->asArray()[2].asString(); - ENVOY_LOG(debug, "message received on channel '{}': '{}'", channel, message); downstream_callbacks_->sendResponseDownstream(std::move(value)); }else { - ENVOY_LOG(debug, "unexpected message format for message or pmessage: '{}'", value->toString()); + ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); } } else if (message_type == "subscribe" || message_type == "unsubscribe" || message_type == "psubscribe" || message_type == "punsubscribe") { if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && value->asArray()[1].type() == Common::Redis::RespType::BulkString && value->asArray()[2].type() == Common::Redis::RespType::Integer) { - std::string message_type = value->asArray()[0].asString(); - std::string channel = value->asArray()[1].asString(); - int64_t count = value->asArray()[2].asInteger(); - ENVOY_LOG(debug, "sub/unsub/psub/pubsub message received on channel '{}': '{}'", channel, count); - downstream_callbacks_->sendResponseDownstream(std::move(value)); + if (clientIndex == shardIndex) { + std::string message_type = value->asArray()[0].asString(); + std::string channel = value->asArray()[1].asString(); + int64_t count = value->asArray()[2].asInteger(); + downstream_callbacks_->sendResponseDownstream(std::move(value)); + } else { + ENVOY_LOG(debug, "Duplciate message, ignoring..."); + } }else { - ENVOY_LOG(debug, "unexpected sub/unsub/psub/pubsub message format: '{}'", value->toString()); + ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); } } else { ENVOY_LOG(debug, "unexpected message type: '{}'", value->toString()); @@ -1034,7 +1039,7 @@ void PubSubMessageHandler::handleChannelMessage(Common::Redis::RespValuePtr&& va ENVOY_LOG(debug, "unexpected message format : '{}'", value->toString()); } } else { - ENVOY_LOG(debug, "unexpected message format 4: '{}'", value->toString()); + ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); } } diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h index 1a0cad36e034..8555c5e48bfd 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h @@ -327,11 +327,21 @@ class PubSubMessageHandler : public Common::Redis::Client::PubsubCallbacks,publi private: std::shared_ptr downstream_callbacks_; + int32_t shard_index_; public: PubSubMessageHandler(std::shared_ptr downstream_callbacks) : downstream_callbacks_(downstream_callbacks) {} - void handleChannelMessage(Common::Redis::RespValuePtr&& value) override; + void handleChannelMessageCustom(Common::Redis::RespValuePtr&& value, int32_t clientIndex, int32_t shardIndex); + + void setShardIndex(int32_t shard_index) { + shard_index_ = shard_index; + }; + + void handleChannelMessage(Common::Redis::RespValuePtr&& value, int32_t clientIndex) override { + handleChannelMessageCustom(std::move(value), clientIndex, shard_index_); + }; + void onFailure() override; }; diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index 64ca7fb16af1..428f59aa7698 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -495,6 +495,7 @@ InstanceImpl::ThreadLocalPool::makePubSubRequest(int32_t shard_index, const std: } else { ENVOY_LOG(debug,"Current connection is established, using existing connection"); } + transaction.clients_[client_idx]->setCurrentClientIndex(client_idx); is_success=transaction.clients_[client_idx]->makePubSubRequest(getRequest(std::move(request))); return is_success; } From 12efe03e399dd70bc340a10675c210fe133a011f Mon Sep 17 00:00:00 2001 From: Sasidharan Gopal Date: Tue, 14 May 2024 14:17:53 +0530 Subject: [PATCH 06/10] updating response format --- .../redis_proxy/command_splitter_impl.cc | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index cd047e1ac53d..3d98625f6465 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -1010,35 +1010,29 @@ void PubSubMessageHandler::handleChannelMessageCustom(Common::Redis::RespValuePt if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && value->asArray()[1].type() == Common::Redis::RespType::BulkString && value->asArray()[2].type() == Common::Redis::RespType::BulkString) { - std::string message_type = value->asArray()[0].asString(); - std::string channel = value->asArray()[1].asString(); - std::string message = value->asArray()[2].asString(); - downstream_callbacks_->sendResponseDownstream(std::move(value)); + downstream_callbacks_->sendResponseDownstream(std::move(value)); }else { - ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + ENVOY_LOG(debug, "unexpected message format for message or pmessage: '{}'", value->toString()); } - } else if (message_type == "subscribe" || message_type == "unsubscribe" || message_type == "psubscribe" || message_type == "punsubscribe") { + }else if (message_type == "subscribe" || message_type == "unsubscribe" || message_type == "psubscribe" || message_type == "punsubscribe") { if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && value->asArray()[1].type() == Common::Redis::RespType::BulkString && value->asArray()[2].type() == Common::Redis::RespType::Integer) { if (clientIndex == shardIndex) { - std::string message_type = value->asArray()[0].asString(); - std::string channel = value->asArray()[1].asString(); - int64_t count = value->asArray()[2].asInteger(); downstream_callbacks_->sendResponseDownstream(std::move(value)); - } else { + }else { ENVOY_LOG(debug, "Duplciate message, ignoring..."); } }else { ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); } - } else { + }else { ENVOY_LOG(debug, "unexpected message type: '{}'", value->toString()); } - } else { + }else { ENVOY_LOG(debug, "unexpected message format : '{}'", value->toString()); } - } else { + }else { ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); } } From d1b5b0196cfb579a2353667b89dd8c82b97511c9 Mon Sep 17 00:00:00 2001 From: Sasidharan Gopal Date: Tue, 14 May 2024 14:28:41 +0530 Subject: [PATCH 07/10] fix --- .../filters/network/redis_proxy/command_splitter_impl.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 3d98625f6465..11693d74ee7a 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -927,9 +927,6 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt transaction.setPubSubCallback(pubsubCallbacksHandler); transaction.clients_.resize(redisShardsCount); transaction.enterSubscribedMode(); - // if(transaction.subscribed_client_shard_index_ == -1){ - // transaction.subscribed_client_shard_index_ = getShardIndex(command_name,1,redisShardsCount); - // } transaction.start(); }else{ ENVOY_LOG(debug, "not yet in subscription mode, must be in subscr mode first: '{}'", command_name); From 33c5187d25358905fc4480d298ad3a3d64fe9d3d Mon Sep 17 00:00:00 2001 From: Sasidharan Gopal Date: Tue, 14 May 2024 17:13:00 +0530 Subject: [PATCH 08/10] Optimization --- .../network/common/redis/client_impl.cc | 1 - .../redis_proxy/command_splitter_impl.cc | 55 ++++++++++--------- .../network/redis_proxy/conn_pool_impl.cc | 1 - 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index a1076995adbe..7b89ba7bb2c1 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -285,7 +285,6 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { void ClientImpl::onRespValue(RespValuePtr&& value) { int32_t clientIndex = getCurrentClientIndex(); - ENVOY_LOG(debug, "ClientImpl::onRespValue() clientIndex: {}", clientIndex); if (is_pubsub_client_) { // This is a pubsub client, and we have received a message from the server. // We need to pass this message to the registered callback. diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 11693d74ee7a..70e74767c06a 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -1000,37 +1000,38 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt void PubSubMessageHandler::handleChannelMessageCustom(Common::Redis::RespValuePtr&& value, int32_t clientIndex, int32_t shardIndex) { ENVOY_LOG(debug, "message received on channel '{}' '{}' '{}'", value->toString(), shardIndex, clientIndex); - if (value->type() == Common::Redis::RespType::Array) { - if (value->asArray().size() == 3) { - std::string message_type = value->asArray()[0].asString(); - if (message_type == "message" || message_type == "pmessage") { - if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && - value->asArray()[1].type() == Common::Redis::RespType::BulkString && - value->asArray()[2].type() == Common::Redis::RespType::BulkString) { - downstream_callbacks_->sendResponseDownstream(std::move(value)); - }else { - ENVOY_LOG(debug, "unexpected message format for message or pmessage: '{}'", value->toString()); - } - }else if (message_type == "subscribe" || message_type == "unsubscribe" || message_type == "psubscribe" || message_type == "punsubscribe") { - if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && - value->asArray()[1].type() == Common::Redis::RespType::BulkString && - value->asArray()[2].type() == Common::Redis::RespType::Integer) { - if (clientIndex == shardIndex) { - downstream_callbacks_->sendResponseDownstream(std::move(value)); - }else { - ENVOY_LOG(debug, "Duplciate message, ignoring..."); - } - }else { - ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + + if (value->type() != Common::Redis::RespType::Array || + value->asArray().size() != 3 || + value->asArray()[0].type() != Common::Redis::RespType::BulkString) { + ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + return; + } + + std::string message_type = value->asArray()[0].asString(); + if (message_type == "message" || message_type == "pmessage") { + if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && + value->asArray()[1].type() == Common::Redis::RespType::BulkString && + value->asArray()[2].type() == Common::Redis::RespType::BulkString) { + downstream_callbacks_->sendResponseDownstream(std::move(value)); + }else { + ENVOY_LOG(debug, "unexpected message format for message or pmessage: '{}'", value->toString()); + return; + } + } + + if (message_type == "subscribe" || message_type == "unsubscribe" || message_type == "psubscribe" || message_type == "punsubscribe") { + if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && + value->asArray()[1].type() == Common::Redis::RespType::BulkString && + value->asArray()[2].type() == Common::Redis::RespType::Integer) { + if (clientIndex == shardIndex) { + downstream_callbacks_->sendResponseDownstream(std::move(value)); } - }else { - ENVOY_LOG(debug, "unexpected message type: '{}'", value->toString()); - } }else { - ENVOY_LOG(debug, "unexpected message format : '{}'", value->toString()); + ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); } }else { - ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + ENVOY_LOG(debug, "unexpected message type: '{}'", value->toString()); } } diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index 428f59aa7698..70ed8bd2fcb3 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -483,7 +483,6 @@ InstanceImpl::ThreadLocalPool::makePubSubRequest(int32_t shard_index, const std: // existing connection if available ( Connection Multiplexing forpubsub commands needs to be addded later) uint32_t client_idx = 0; client_idx = transaction.current_client_idx_; - ENVOY_LOG(debug,"Current client index is '{}' '{}' '{}'",client_idx, transaction.clients_.size(), host->address()->asString()); if (!transaction.connection_established_ || transaction.clients_[client_idx] == nullptr) { ENVOY_LOG(debug,"Current connection is not established, creating new connection"); transaction.clients_[client_idx] = From be0c36dcc85fcb568ac7d53d409eb997aae0c9ce Mon Sep 17 00:00:00 2001 From: Sasidharan Gopal Date: Tue, 14 May 2024 22:23:22 +0530 Subject: [PATCH 09/10] optimization on upstream clients --- .../redis_proxy/command_splitter_impl.cc | 27 ++++++++++++++----- .../network/redis_proxy/conn_pool_impl.cc | 6 +---- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 70e74767c06a..0837c0149cf2 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -902,6 +902,7 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); //request_ptr->onChildResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost),0); } + std::shared_ptr PubSubMsghandler; PubSubMsghandler = std::make_shared(transaction.getDownstreamCallback()); if (transaction.active_) { @@ -950,15 +951,20 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt if (singleShardRequest) { Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); - int32_t shard_index = getShardIndex(command_name, 1, redisShardsCount); - PubSubMsghandler->setShardIndex(shard_index); - if (!makeRequest(shard_index, base_request)) + if (transaction.subscribed_client_shard_index_ == -1) { + transaction.subscribed_client_shard_index_ = getShardIndex(command_name, 1, redisShardsCount); + } + PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); + if (!makeRequest(transaction.subscribed_client_shard_index_, base_request)) return nullptr; } if (allShardsRequest) { Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); - PubSubMsghandler->setShardIndex(0); + if (transaction.subscribed_client_shard_index_ == -1) { + transaction.subscribed_client_shard_index_ = getShardIndex(command_name, 1, redisShardsCount); + } + PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); for (int32_t i = 0; i < redisShardsCount; i++) { if (!makeRequest(i, base_request)) return nullptr; @@ -976,9 +982,12 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt } base_request = std::move(incoming_request); keyspaceRequest = std::make_unique(keyspaceRequestArray); - PubSubMsghandler->setShardIndex(0); + if (transaction.subscribed_client_shard_index_ == -1) { + transaction.subscribed_client_shard_index_ = getShardIndex(command_name, 1, redisShardsCount); + } + PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); for (int32_t i = 0; i < redisShardsCount; i++) { - if (i== 0){ + if (i == transaction.subscribed_client_shard_index_){ if (!makeRequest(i, base_request)) return nullptr; }else{ @@ -999,7 +1008,7 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt } void PubSubMessageHandler::handleChannelMessageCustom(Common::Redis::RespValuePtr&& value, int32_t clientIndex, int32_t shardIndex) { - ENVOY_LOG(debug, "message received on channel '{}' '{}' '{}'", value->toString(), shardIndex, clientIndex); + ENVOY_LOG(debug, "message received on channel '{}'", value->toString()); if (value->type() != Common::Redis::RespType::Array || value->asArray().size() != 3 || @@ -1014,6 +1023,7 @@ void PubSubMessageHandler::handleChannelMessageCustom(Common::Redis::RespValuePt value->asArray()[1].type() == Common::Redis::RespType::BulkString && value->asArray()[2].type() == Common::Redis::RespType::BulkString) { downstream_callbacks_->sendResponseDownstream(std::move(value)); + return; }else { ENVOY_LOG(debug, "unexpected message format for message or pmessage: '{}'", value->toString()); return; @@ -1026,6 +1036,9 @@ void PubSubMessageHandler::handleChannelMessageCustom(Common::Redis::RespValuePt value->asArray()[2].type() == Common::Redis::RespType::Integer) { if (clientIndex == shardIndex) { downstream_callbacks_->sendResponseDownstream(std::move(value)); + return; + } else { + ENVOY_LOG(debug, "Duplicate Message, Ignoring!"); } }else { ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index 70ed8bd2fcb3..956acd3f81a0 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -481,18 +481,14 @@ InstanceImpl::ThreadLocalPool::makePubSubRequest(int32_t shard_index, const std: // For now we create dedicated connection for pubsub commands to be optimised later to use the // existing connection if available ( Connection Multiplexing forpubsub commands needs to be addded later) - uint32_t client_idx = 0; - client_idx = transaction.current_client_idx_; + uint32_t client_idx = transaction.current_client_idx_; if (!transaction.connection_established_ || transaction.clients_[client_idx] == nullptr) { - ENVOY_LOG(debug,"Current connection is not established, creating new connection"); transaction.clients_[client_idx] = client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_), auth_username_, auth_password_, false,true,false,transaction.getPubSubCallback()); if (transaction.connection_cb_) { transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_); } - } else { - ENVOY_LOG(debug,"Current connection is established, using existing connection"); } transaction.clients_[client_idx]->setCurrentClientIndex(client_idx); is_success=transaction.clients_[client_idx]->makePubSubRequest(getRequest(std::move(request))); From 54421f330cd770b445e222a08bd66ba515fb47df Mon Sep 17 00:00:00 2001 From: dinesh-murugiah Date: Tue, 28 May 2024 23:20:13 +0530 Subject: [PATCH 10/10] memory leak and other fixes --- .../network/common/redis/client_impl.cc | 15 ++- .../redis_proxy/command_splitter_impl.cc | 106 +++++++++--------- .../redis_proxy/command_splitter_impl.h | 4 + .../network/redis_proxy/proxy_filter.cc | 6 + 4 files changed, 74 insertions(+), 57 deletions(-) diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 7b89ba7bb2c1..bff7e2377e31 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -235,6 +235,7 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { event == Network::ConnectionEvent::LocalClose) { std::string eventTypeStr = (event == Network::ConnectionEvent::RemoteClose ? "RemoteClose" : "LocalClose"); + ENVOY_LOG(debug,"Upstream Client Connection close event received:'{}'",eventTypeStr); Upstream::reportUpstreamCxDestroy(host_, event); if (!pending_requests_.empty()) { Upstream::reportUpstreamCxDestroyActiveRequest(host_, event); @@ -245,12 +246,14 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) { // If client is Pubsub handle the upstream close event such that downstream must also be closed. if ( is_pubsub_client_) { host_->cluster().trafficStats()->upstream_cx_destroy_with_active_rq_.inc(); - ENVOY_LOG(debug,"Pubsub Client Connection close event received:'{}'",eventTypeStr); - if ((pubsub_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){ - ENVOY_LOG(debug,"Pubsub Client Remote close received on Downstream Notify Upstream and close it"); - pubsub_cb_->onFailure(); - pubsub_cb_.reset(); - } + ENVOY_LOG(debug,"Pubsub Client Connection close event received:'{}', clearing pubsub_cb_",eventTypeStr); + //clear pubsub_cb_ be it either local or remote close + pubsub_cb_.reset(); + + if ((pubsub_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){ + ENVOY_LOG(debug,"Pubsub Client Remote close received on Downstream Notify Upstream and close it"); + pubsub_cb_->onFailure(); + } } while (!pending_requests_.empty()) { diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index 0837c0149cf2..456615babd8b 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -404,7 +404,7 @@ SplitRequestPtr mgmtNoKeyRequest::create(Router& router, Common::Redis::RespValu else{ ENVOY_LOG(debug, "route not found: '{}'", incoming_request->toString()); callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); - //request_ptr->onChildResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost),0); + return nullptr; } requestsCount = getRequestCount(command_name,redisShardsCount); request_ptr->num_pending_responses_ = requestsCount; @@ -862,24 +862,21 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt bool containsKeyspaceChannel = false; bool containsNormalChannel = false; for (size_t i = 1; i < incoming_request->asArray().size(); i++) { - std::string key = absl::AsciiStrToLower(incoming_request->asArray()[i].asString()); - if (isKeyspaceArgument(key)) { - ENVOY_LOG(debug, "keyspace command: '{}'", key); + std::string channel = absl::AsciiStrToLower(incoming_request->asArray()[i].asString()); + if (isKeyspaceArgument(channel)) { + ENVOY_LOG(debug, "keyspace command: '{}'", channel); containsKeyspaceChannel = true; } else { containsNormalChannel = true; } } - if (containsKeyspaceChannel) { - if (!containsNormalChannel) { - allShardsRequest = true; //example: subscribe "__keyspace@0__:del" - } else { - allShardwithSingleShardRequest = true; //example: subscribe test "__keyevent@0__:del" - } - } else if (containsNormalChannel) { - singleShardRequest = true; //example: subscribe test - } + allShardsRequest = containsKeyspaceChannel && !containsNormalChannel; + allShardwithSingleShardRequest = containsKeyspaceChannel && containsNormalChannel; + singleShardRequest = !containsKeyspaceChannel && containsNormalChannel; + + ENVOY_LOG(debug,"PubSubRequest Request type ++ allShardsRequest: '{}' -- allShardwithSingleShardRequest: '{}' -- singleShardRequest: '{}'",allShardsRequest,allShardwithSingleShardRequest,singleShardRequest); + } else if (command_name == "unsubscribe" || command_name == "punsubscribe") { allShardsRequest = true; @@ -900,11 +897,10 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt else{ ENVOY_LOG(debug, "route not found: '{}'", incoming_request->toString()); callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); - //request_ptr->onChildResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost),0); + return nullptr; } std::shared_ptr PubSubMsghandler; - PubSubMsghandler = std::make_shared(transaction.getDownstreamCallback()); if (transaction.active_) { if (transaction.isSubscribedMode()) { if (!Common::Redis::SupportedCommands::subcrStateallowedCommands().contains(command_name)) { @@ -921,11 +917,14 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt callbacks.onResponse( Common::Redis::Utility::makeError("Not supported when in transaction mode")); transaction.should_close_ = true; + transaction.setPubSubCallback(nullptr); + return nullptr; } }else { + PubSubMsghandler = std::make_shared(transaction.getDownstreamCallback()); if (Common::Redis::SupportedCommands::subcrStateEnterCommands().contains(command_name)){ auto pubsubCallbacksHandler = std::static_pointer_cast(PubSubMsghandler); - transaction.setPubSubCallback(pubsubCallbacksHandler); + transaction.setPubSubCallback(std::move(pubsubCallbacksHandler)); transaction.clients_.resize(redisShardsCount); transaction.enterSubscribedMode(); transaction.start(); @@ -953,21 +952,29 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); if (transaction.subscribed_client_shard_index_ == -1) { transaction.subscribed_client_shard_index_ = getShardIndex(command_name, 1, redisShardsCount); + PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); } - PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); - if (!makeRequest(transaction.subscribed_client_shard_index_, base_request)) + if (!makeRequest(transaction.subscribed_client_shard_index_, base_request)){ + ENVOY_LOG(debug,"makerequest failed for singleshard request"); + transaction.setPubSubCallback(nullptr); return nullptr; + } } if (allShardsRequest) { Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); if (transaction.subscribed_client_shard_index_ == -1) { transaction.subscribed_client_shard_index_ = getShardIndex(command_name, 1, redisShardsCount); + PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); } - PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); + for (int32_t i = 0; i < redisShardsCount; i++) { - if (!makeRequest(i, base_request)) - return nullptr; + if (!makeRequest(i, base_request)){ + ENVOY_LOG(debug,"makerequest failed for allShardsRequest for shardIndex'{}'",i); + transaction.setPubSubCallback(nullptr); + return nullptr; + } + } } @@ -975,24 +982,32 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt keyspaceRequestArray.type(Common::Redis::RespType::Array); addBulkString(keyspaceRequestArray, command_name); for (size_t i = 1; i < incoming_request->asArray().size(); i++) { - std::string key = absl::AsciiStrToLower(incoming_request->asArray()[i].asString()); - if (isKeyspaceArgument(key)) { - addBulkString(keyspaceRequestArray, key); + std::string channel = absl::AsciiStrToLower(incoming_request->asArray()[i].asString()); + if (isKeyspaceArgument(channel)) { + addBulkString(keyspaceRequestArray, channel); } } base_request = std::move(incoming_request); keyspaceRequest = std::make_unique(keyspaceRequestArray); if (transaction.subscribed_client_shard_index_ == -1) { transaction.subscribed_client_shard_index_ = getShardIndex(command_name, 1, redisShardsCount); + PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); } - PubSubMsghandler->setShardIndex(transaction.subscribed_client_shard_index_); + for (int32_t i = 0; i < redisShardsCount; i++) { if (i == transaction.subscribed_client_shard_index_){ - if (!makeRequest(i, base_request)) - return nullptr; + if (!makeRequest(i, base_request)){ + ENVOY_LOG(debug,"makerequest failed for allShardwithSingleShardRequest for base request at shardIndex'{}'",i); + transaction.setPubSubCallback(nullptr); + return nullptr; + } + }else{ - if (!makeRequest(i, keyspaceRequest)) - return nullptr; + if (!makeRequest(i, keyspaceRequest)){ + ENVOY_LOG(debug,"makerequest failed for allShardwithSingleShardRequest for keyspaceRequest at shardIndex'{}'",i); + transaction.setPubSubCallback(nullptr); + return nullptr; + } } } } @@ -1008,10 +1023,9 @@ SplitRequestPtr PubSubRequest::create(Router& router, Common::Redis::RespValuePt } void PubSubMessageHandler::handleChannelMessageCustom(Common::Redis::RespValuePtr&& value, int32_t clientIndex, int32_t shardIndex) { - ENVOY_LOG(debug, "message received on channel '{}'", value->toString()); + ENVOY_LOG(debug, "message received on channel '{}' on shardIndex : '{}'", value->toString(),shardIndex); - if (value->type() != Common::Redis::RespType::Array || - value->asArray().size() != 3 || + if (value->type() != Common::Redis::RespType::Array || value->asArray()[0].type() != Common::Redis::RespType::BulkString) { ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); return; @@ -1019,29 +1033,17 @@ void PubSubMessageHandler::handleChannelMessageCustom(Common::Redis::RespValuePt std::string message_type = value->asArray()[0].asString(); if (message_type == "message" || message_type == "pmessage") { - if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && - value->asArray()[1].type() == Common::Redis::RespType::BulkString && - value->asArray()[2].type() == Common::Redis::RespType::BulkString) { - downstream_callbacks_->sendResponseDownstream(std::move(value)); - return; - }else { - ENVOY_LOG(debug, "unexpected message format for message or pmessage: '{}'", value->toString()); + ENVOY_LOG(debug,"sending response downstream"); + downstream_callbacks_->sendResponseDownstream(std::move(value)); return; - } } if (message_type == "subscribe" || message_type == "unsubscribe" || message_type == "psubscribe" || message_type == "punsubscribe") { - if (value->asArray()[0].type() == Common::Redis::RespType::BulkString && - value->asArray()[1].type() == Common::Redis::RespType::BulkString && - value->asArray()[2].type() == Common::Redis::RespType::Integer) { - if (clientIndex == shardIndex) { - downstream_callbacks_->sendResponseDownstream(std::move(value)); - return; - } else { - ENVOY_LOG(debug, "Duplicate Message, Ignoring!"); - } - }else { - ENVOY_LOG(debug, "unexpected message format: '{}'", value->toString()); + if (clientIndex == shardIndex) { + downstream_callbacks_->sendResponseDownstream(std::move(value)); + return; + } else { + ENVOY_LOG(debug, "Duplicate Message from shard index '{}', Ignoring!",shardIndex); } }else { ENVOY_LOG(debug, "unexpected message type: '{}'", value->toString()); @@ -1286,10 +1288,12 @@ SplitRequestPtr ScanRequest::create(Router& router, Common::Redis::RespValuePtr& request_ptr->num_of_Shards_ = instance->getRedisShardsCount(); if (request_ptr->num_of_Shards_ == 0 ) { callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); + return nullptr; } } else{ callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); + return nullptr; } // If shard index is some random value, we are setting the shard to 0 to avoid crashing diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h index 8555c5e48bfd..d34c7afa10a3 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h @@ -332,6 +332,10 @@ class PubSubMessageHandler : public Common::Redis::Client::PubsubCallbacks,publi public: PubSubMessageHandler(std::shared_ptr downstream_callbacks) : downstream_callbacks_(downstream_callbacks) {} + ~PubSubMessageHandler(){ + ENVOY_LOG(debug,"calling PubSubMessageHandler destructor"); + } + void handleChannelMessageCustom(Common::Redis::RespValuePtr&& value, int32_t clientIndex, int32_t shardIndex); void setShardIndex(int32_t shard_index) { diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc index dc4feebb3ba3..fe7730aeda94 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc @@ -129,6 +129,11 @@ void ProxyFilter::onEvent(Network::ConnectionEvent event) { downstream_cb->clearParent(); } transaction_.setDownstreamCallback(nullptr); + if(transaction_.isSubscribedMode()){ + ENVOY_LOG(debug,"ProxyFilter::onEvent Clearing pubsubcb"); + transaction_.setPubSubCallback(nullptr); + + } } @@ -217,6 +222,7 @@ void ProxyFilter::onAsyncResponse(Common::Redis::RespValuePtr&& value){ void ProxyFilter::onPubsubConnClose(){ ASSERT(pending_requests_.empty()); //Close the downstream connection on upstream connection close + transaction_.setPubSubCallback(nullptr); callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); //This callback is called only on remote close , so no need to close the client connnection again transaction_.connection_established_=false;