Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub framework change - pair programming iter 1 #25

Merged
merged 10 commits into from
May 29, 2024
31 changes: 30 additions & 1 deletion source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ class DirectCallbacks {

};

class PubsubCallbacks {
public:
virtual ~PubsubCallbacks() = default;
virtual void handleChannelMessage(Common::Redis::RespValuePtr&& value, int32_t clientIndex) PURE;
virtual void onFailure() PURE;

};


/**
* DoNothingPoolCallbacks is used for internally generated commands whose response is
Expand All @@ -90,6 +98,8 @@ class Client : public Event::DeferredDeletable {
* Adds network connection callbacks to the underlying network connection.
*/
virtual void addConnectionCallbacks(Network::ConnectionCallbacks& callbacks) PURE;
virtual void setCurrentClientIndex(int32_t shardIndex) PURE;
virtual int32_t getCurrentClientIndex() PURE;

/**
* Called to determine if the client has pending requests.
Expand All @@ -111,6 +121,15 @@ class Client : public Event::DeferredDeletable {
*/
virtual PoolRequest* makeRequest(const RespValue& request, ClientCallbacks& callbacks) PURE;

/**
* Make a pipelined request to the remote redis server.
* @param request supplies the RESP request to make.
* @param callbacks supplies the request callbacks.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
virtual bool makePubSubRequest(const RespValue& request) PURE;

/**
* Initialize the connection. Issue the auth command and readonly command as needed.
* @param auth password for upstream host.
Expand Down Expand Up @@ -222,7 +241,7 @@ class ClientFactory {
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope, const std::string& auth_username,
const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr<DirectCallbacks>& drcb) PURE;
const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr<PubsubCallbacks>& pubsubcb) PURE;
};

// A MULTI command sent when starting a transaction.
Expand Down Expand Up @@ -291,6 +310,15 @@ struct Transaction {
std::shared_ptr<DirectCallbacks> getDownstreamCallback() {
return downstream_cb_;
}

void setPubSubCallback(std::shared_ptr<PubsubCallbacks> callback) {
pubsub_cb_ = callback;
}

std::shared_ptr<PubsubCallbacks> getPubSubCallback() {
return pubsub_cb_;
}

bool active_{false};
bool connection_established_{false};
bool should_close_{false};
Expand All @@ -305,6 +333,7 @@ struct Transaction {
std::vector<ClientPtr> clients_;
Network::ConnectionCallbacks* connection_cb_;
std::shared_ptr<DirectCallbacks> downstream_cb_=nullptr;
std::shared_ptr<PubsubCallbacks> pubsub_cb_=nullptr;

// This index represents the current client on which traffic is being sent to.
// When sending to the main redis server it will be 0, and when sending to one of
Expand Down
90 changes: 70 additions & 20 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatche
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope, bool is_transaction_client, bool is_pubsub_client, bool is_blocking_client, const std::shared_ptr<DirectCallbacks>& drcb) {
Stats::Scope& scope, bool is_transaction_client, bool is_pubsub_client, bool is_blocking_client, const std::shared_ptr<PubsubCallbacks>& pubsubcb) {
auto client =
std::make_unique<ClientImpl>(host, dispatcher, std::move(encoder), decoder_factory, config,
redis_command_stats, scope, is_transaction_client,is_pubsub_client,is_blocking_client,drcb);
redis_command_stats, scope, is_transaction_client,is_pubsub_client,is_blocking_client,pubsubcb);
client->connection_ = host->createConnection(dispatcher, nullptr, nullptr).connection_;
client->connection_->addConnectionCallbacks(*client);
client->connection_->addReadFilter(Network::ReadFilterSharedPtr{new UpstreamReadFilter(*client)});
Expand All @@ -78,7 +78,7 @@ ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatche
ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope,
bool is_transaction_client, bool is_pubsub_client, bool is_blocking_client, const std::shared_ptr<DirectCallbacks>& drcb)
bool is_transaction_client, bool is_pubsub_client, bool is_blocking_client, const std::shared_ptr<PubsubCallbacks>& pubsubcb)
: host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)),
config_(config),
connect_or_op_timer_(dispatcher.createTimer([this]() { onConnectOrOpTimeout(); })),
Expand All @@ -92,7 +92,7 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis
host->stats().cx_active_.inc();
connect_or_op_timer_->enableTimer(host->cluster().connectTimeout());
if (is_pubsub_client_){
downstream_cb_ = std::move(drcb);
pubsub_cb_ = std::move(pubsubcb);
}
}

Expand All @@ -101,12 +101,12 @@ ClientImpl::~ClientImpl() {
ASSERT(connection_->state() == Network::Connection::State::Closed);
host_->cluster().trafficStats()->upstream_cx_active_.dec();
host_->stats().cx_active_.dec();
downstream_cb_.reset();
pubsub_cb_.reset();

}

void ClientImpl::close() {
downstream_cb_.reset();
pubsub_cb_.reset();
if (connection_) {
connection_->close(Network::ConnectionCloseType::NoFlush);
}
Expand Down Expand Up @@ -159,6 +159,44 @@ PoolRequest* ClientImpl::makeRequest(const RespValue& request, ClientCallbacks&
return &pending_requests_.back();
}


bool ClientImpl::makePubSubRequest(const RespValue& request) {
ASSERT(connection_->state() == Network::Connection::State::Open);

const bool empty_buffer = encoder_buffer_.length() == 0;

Stats::StatName command;
if (config_.enableCommandStats()) {
// Only lowercase command and get StatName if we enable command stats
command = redis_command_stats_->getCommandFromRequest(request);
redis_command_stats_->updateStatsTotal(scope_, command);
} else {
// If disabled, we use a placeholder stat name "unused" that is not used
command = redis_command_stats_->getUnusedStatName();
}

encoder_->encode(request, encoder_buffer_);

// If buffer is full, flush. If the buffer was empty before the request, start the timer.
if (encoder_buffer_.length() >= config_.maxBufferSizeBeforeFlush()) {
flushBufferAndResetTimer();
} else if (empty_buffer) {
flush_timer_->enableTimer(std::chrono::milliseconds(config_.bufferFlushTimeoutInMs()));
}

// Only boost the op timeout if:
// - We are already connected. Otherwise, we are governed by the connect timeout and the timer
// will be reset when/if connection occurs. This allows a relatively long connection spin up
// time for example if TLS is being used.
// - Timer is not already armed. Otherwise the timeout would effectively start on
if (connected_ && !connect_or_op_timer_->enabled()){
connect_or_op_timer_->enableTimer(config_.opTimeout());
}

return true;
}


void ClientImpl::onConnectOrOpTimeout() {
putOutlierEvent(Upstream::Outlier::Result::LocalOriginTimeout);
if (connected_) {
Expand Down Expand Up @@ -197,6 +235,7 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
event == Network::ConnectionEvent::LocalClose) {

std::string eventTypeStr = (event == Network::ConnectionEvent::RemoteClose ? "RemoteClose" : "LocalClose");
ENVOY_LOG(debug,"Upstream Client Connection close event received:'{}'",eventTypeStr);
Upstream::reportUpstreamCxDestroy(host_, event);
if (!pending_requests_.empty()) {
Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
Expand All @@ -205,14 +244,16 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
}
}
// If client is Pubsub handle the upstream close event such that downstream must also be closed.
if (pending_requests_.empty() && is_pubsub_client_) {
if ( is_pubsub_client_) {
host_->cluster().trafficStats()->upstream_cx_destroy_with_active_rq_.inc();
ENVOY_LOG(debug,"Pubsub Client Connection close event received:'{}'",eventTypeStr);
if ((downstream_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){
ENVOY_LOG(debug,"Pubsub Client Remote close received on Downstream Notify Upstream and close it");
downstream_cb_->onFailure();
downstream_cb_.reset();
}
ENVOY_LOG(debug,"Pubsub Client Connection close event received:'{}', clearing pubsub_cb_",eventTypeStr);
//clear pubsub_cb_ be it either local or remote close
pubsub_cb_.reset();

if ((pubsub_cb_ != nullptr)&&(event == Network::ConnectionEvent::RemoteClose)){
ENVOY_LOG(debug,"Pubsub Client Remote close received on Downstream Notify Upstream and close it");
pubsub_cb_->onFailure();
}
}

while (!pending_requests_.empty()) {
Expand All @@ -229,7 +270,11 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {

} else if (event == Network::ConnectionEvent::Connected) {
connected_ = true;
ASSERT(!pending_requests_.empty());
if (!is_pubsub_client_){
ASSERT(!pending_requests_.empty());
}else{
ENVOY_LOG(debug,"Pubsub Client Connection established");
}
if (!is_blocking_client_) {
connect_or_op_timer_->enableTimer(config_.opTimeout());
}
Expand All @@ -242,12 +287,17 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
}

void ClientImpl::onRespValue(RespValuePtr&& value) {

if (pending_requests_.empty() && is_pubsub_client_) {
int32_t clientIndex = getCurrentClientIndex();
if (is_pubsub_client_) {
// This is a pubsub client, and we have received a message from the server.
// We need to pass this message to the registered callback.
if (downstream_cb_ != nullptr){
downstream_cb_->sendResponseDownstream(std::move(value));
if (pubsub_cb_ != nullptr){
pubsub_cb_->handleChannelMessage(std::move(value), clientIndex);
if (connect_or_op_timer_->enabled()){
connect_or_op_timer_->disableTimer();
}
}else {
ENVOY_LOG(debug,"Pubsub Client Received message from server but no callback registered");
}
return;
}
Expand Down Expand Up @@ -352,10 +402,10 @@ ClientPtr ClientFactoryImpl::create(Upstream::HostConstSharedPtr host,
Event::Dispatcher& dispatcher, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope, const std::string& auth_username,
const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client, const std::shared_ptr<DirectCallbacks>& drcb) {
const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client, const std::shared_ptr<PubsubCallbacks>& pubsubcb) {
ClientPtr client =
ClientImpl::create(host, dispatcher, EncoderPtr{new EncoderImpl()}, decoder_factory_, config,
redis_command_stats, scope, is_transaction_client, is_pubsub_client,is_blocking_client, drcb);
redis_command_stats, scope, is_transaction_client, is_pubsub_client,is_blocking_client, pubsubcb);
client->initialize(auth_username, auth_password);
return client;
}
Expand Down
12 changes: 8 additions & 4 deletions source/extensions/filters/network/common/redis/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,23 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client, const std::shared_ptr<DirectCallbacks>& drcb);
Stats::Scope& scope, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client, const std::shared_ptr<PubsubCallbacks>& pubsubcb);

ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder,
DecoderFactory& decoder_factory, const Config& config,
const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope,
bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr<DirectCallbacks>& drcb);
bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr<PubsubCallbacks>& pubsubcb);
~ClientImpl() override;

// Client
void addConnectionCallbacks(Network::ConnectionCallbacks& callbacks) override {
connection_->addConnectionCallbacks(callbacks);
}
void setCurrentClientIndex(int32_t index) override { current_shard_index_ = index; }
int32_t getCurrentClientIndex() override { return current_shard_index_; }
void close() override;
PoolRequest* makeRequest(const RespValue& request, ClientCallbacks& callbacks) override;
bool makePubSubRequest(const RespValue& request) override;
bool active() override {
if (is_pubsub_client_){
return true;
Expand Down Expand Up @@ -159,7 +162,8 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
bool is_transaction_client_;
bool is_pubsub_client_=false;
bool is_blocking_client_=false;
std::shared_ptr<Extensions::NetworkFilters::Common::Redis::Client::DirectCallbacks> downstream_cb_=nullptr;
std::shared_ptr<Extensions::NetworkFilters::Common::Redis::Client::PubsubCallbacks> pubsub_cb_=nullptr;
int32_t current_shard_index_{-1};
};

class ClientFactoryImpl : public ClientFactory {
Expand All @@ -168,7 +172,7 @@ class ClientFactoryImpl : public ClientFactory {
ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats,
Stats::Scope& scope, const std::string& auth_username,
const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr<DirectCallbacks>& drcb) override;
const std::string& auth_password, bool is_transaction_client, bool is_pubsub_client,bool is_blocking_client,const std::shared_ptr<PubsubCallbacks>& pubsubcb) override;

static ClientFactoryImpl instance_;

Expand Down
Loading