Refactor async client #12

Merged 1 commit on Aug 15, 2016
13 changes: 6 additions & 7 deletions include/envoy/http/async_client.h
@@ -52,19 +52,18 @@ class AsyncClient {
     virtual void cancel() PURE;
   };
 
-  typedef std::unique_ptr<Request> RequestPtr;
-
   virtual ~AsyncClient() {}
 
   /**
    * Send an HTTP request asynchronously
-   * @param request the request to send
-   * @param callbacks the callbacks to be notified of request status
+   * @param request the request to send.
+   * @param callbacks the callbacks to be notified of request status.
    * @return a request handle or nullptr if no request could be created. NOTE: In this case
-   *         onFailure() has already been called inline.
+   *         onFailure() has already been called inline. The client owns the request and the
+   *         handle should just be used to cancel.
    */
-  virtual RequestPtr send(MessagePtr&& request, Callbacks& callbacks,
-                          const Optional<std::chrono::milliseconds>& timeout) PURE;
+  virtual Request* send(MessagePtr&& request, Callbacks& callbacks,
+                        const Optional<std::chrono::milliseconds>& timeout) PURE;
 };
 
 typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
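For context, a hedged sketch of how a caller drives the refactored interface: the async client is now obtained by reference from the cluster manager, and the returned Request* is owned by the client, so the caller keeps it only to cancel. The helper name, cluster name, and include paths below are illustrative, not part of this change.

#include "envoy/http/async_client.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/http/message_impl.h" // approximate path for RequestMessageImpl

// Hypothetical helper; "example_cluster" and the callbacks object are assumed to exist elsewhere.
Http::AsyncClient::Request* sendExample(Upstream::ClusterManager& cm,
                                        Http::AsyncClient::Callbacks& callbacks) {
  Http::MessagePtr message(new Http::RequestMessageImpl());
  message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
  message->headers().addViaMoveValue(Http::Headers::get().Method, "GET");
  message->headers().addViaMoveValue(Http::Headers::get().Path, "/");
  message->headers().addViaCopy(Http::Headers::get().Host, "example_cluster");

  // The cluster manager owns the async client; callers only borrow a reference.
  Http::AsyncClient::Request* request =
      cm.httpAsyncClientForCluster("example_cluster")
          .send(std::move(message), callbacks, Optional<std::chrono::milliseconds>());

  // nullptr means the request failed inline and onFailure() has already been invoked.
  // Otherwise the client owns the request; keep the handle only to call cancel().
  return request;
}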
17 changes: 6 additions & 11 deletions include/envoy/upstream/cluster_manager.h
@@ -31,18 +31,12 @@ class ClusterManager {
    */
   virtual const Cluster* get(const std::string& cluster) PURE;
 
-  /**
-   * @return whether the cluster manager knows about a particular cluster by name.
-   */
-  virtual bool has(const std::string& cluster) PURE;
-
   /**
    * Allocate a load balanced HTTP connection pool for a cluster. This is *per-thread* so that
    * callers do not need to worry about per thread synchronization. The load balancing policy that
    * is used is the one defined on the cluster when it was created.
    *
-   * Can return nullptr if there is no host available in the cluster or the cluster name is not
-   * valid.
+   * Can return nullptr if there is no host available in the cluster.
    */
   virtual Http::ConnectionPool::Instance* httpConnPoolForCluster(const std::string& cluster) PURE;
 
@@ -52,15 +46,16 @@ class ClusterManager {
    * load balancing policy that is used is the one defined on the cluster when it was created.
    *
    * Returns both a connection and the host that backs the connection. Both can be nullptr if there
-   * is no host available in the cluster or the cluster name is not valid.
+   * is no host available in the cluster.
    */
   virtual Host::CreateConnectionData tcpConnForCluster(const std::string& cluster) PURE;
 
   /**
-   * Returns a client that can be used to make async HTTP calls against the given cluster. The
-   * client may be backed by a connection pool or by a multiplexed connection.
+   * Returns a client that can be used to make async HTTP calls against the given cluster. The
+   * client may be backed by a connection pool or by a multiplexed connection. The cluster manager
+   * owns the client.
    */
-  virtual Http::AsyncClientPtr httpAsyncClientForCluster(const std::string& cluster) PURE;
+  virtual Http::AsyncClient& httpAsyncClientForCluster(const std::string& cluster) PURE;
 
   /**
    * Shutdown the cluster prior to destroying connection pools and other thread local data.
3 changes: 1 addition & 2 deletions include/envoy/upstream/upstream.h
@@ -238,8 +238,7 @@ class Cluster : public virtual HostSet {
   virtual ResourceManager& resourceManager() const PURE;
 
   /**
-   * Shutdown the cluster manager prior to destroying connection pools and other thread local
-   * data.
+   * Shutdown the cluster prior to destroying connection pools and other thread local data.
    */
   virtual void shutdown() PURE;
 
5 changes: 5 additions & 0 deletions source/common/common/linked_object.h
@@ -18,6 +18,11 @@ template <class T> class LinkedObject {
     return entry_;
   }
 
+  /**
+   * @return whether the object is currently inserted into a list.
+   */
+  bool inserted() { return inserted_; }
+
   /**
    * Move a linked item between 2 lists.
    * @param list1 supplies the first list.
16 changes: 3 additions & 13 deletions source/common/filter/auth/client_ssl.cc
@@ -21,7 +21,7 @@ Config::Config(const Json::Object& config, ThreadLocal::Instance& tls, Upstream:
       ip_white_list_(config), stats_(generateStats(stats_store, config.getString("stat_prefix"))),
       runtime_(runtime), local_address_(local_address) {
 
-  if (!cm_.has(auth_api_cluster_)) {
+  if (!cm_.get(auth_api_cluster_)) {
     throw EnvoyException(
         fmt::format("unknown cluster '{}' in client ssl auth config", auth_api_cluster_));
   }
@@ -83,29 +83,19 @@ void Config::onFailure(Http::AsyncClient::FailureReason) {
 }
 
 void Config::refreshPrincipals() {
-  ASSERT(!active_request_);
-  active_request_.reset(new ActiveRequest());
-  active_request_->client_ = cm_.httpAsyncClientForCluster(auth_api_cluster_);
-  if (!active_request_->client_) {
-    onFailure(Http::AsyncClient::FailureReason::Reset);
-    return;
-  }
-
   Http::MessagePtr message(new Http::RequestMessageImpl());
   message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
   message->headers().addViaMoveValue(Http::Headers::get().Method, "GET");
   message->headers().addViaMoveValue(Http::Headers::get().Path, "/v1/certs/list/approved");
   message->headers().addViaCopy(Http::Headers::get().Host, auth_api_cluster_);
   message->headers().addViaCopy(Http::Headers::get().ForwardedFor, local_address_);
-  active_request_->request_ = active_request_->client_->send(std::move(message), *this,
-                                                             Optional<std::chrono::milliseconds>());
+  cm_.httpAsyncClientForCluster(auth_api_cluster_)
+      .send(std::move(message), *this, Optional<std::chrono::milliseconds>());
 }
 
 void Config::requestComplete() {
   std::chrono::milliseconds interval(
       runtime_.snapshot().getInteger("auth.clientssl.refresh_interval_ms", 60000));
-
-  active_request_.reset();
   interval_timer_->enableTimer(interval);
 }
 
8 changes: 0 additions & 8 deletions source/common/filter/auth/client_ssl.h
@@ -78,13 +78,6 @@ class Config : public Http::AsyncClient::Callbacks {
   void onFailure(Http::AsyncClient::FailureReason reason) override;
 
 private:
-  struct ActiveRequest {
-    Http::AsyncClientPtr client_;
-    Http::AsyncClient::RequestPtr request_;
-  };
-
-  typedef std::unique_ptr<ActiveRequest> ActiveRequestPtr;
-
   static GlobalStats generateStats(Stats::Store& store, const std::string& prefix);
   AllowedPrincipalsPtr parseAuthResponse(Http::Message& message);
   void refreshPrincipals();
@@ -94,7 +87,6 @@ class Config : public Http::AsyncClient::Callbacks {
   uint32_t tls_slot_;
   Upstream::ClusterManager& cm_;
   const std::string auth_api_cluster_;
-  ActiveRequestPtr active_request_;
   Event::TimerPtr interval_timer_;
   Network::IpWhiteList ip_white_list_;
   GlobalStats stats_;
2 changes: 1 addition & 1 deletion source/common/filter/tcp_proxy.cc
@@ -16,7 +16,7 @@ TcpProxyConfig::TcpProxyConfig(const Json::Object& config,
                                Upstream::ClusterManager& cluster_manager, Stats::Store& stats_store)
     : cluster_name_(config.getString("cluster")),
       stats_(generateStats(config.getString("stat_prefix"), stats_store)) {
-  if (!cluster_manager.has(cluster_name_)) {
+  if (!cluster_manager.get(cluster_name_)) {
     throw EnvoyException(fmt::format("tcp proxy: unknown cluster '{}'", cluster_name_));
   }
 }
11 changes: 1 addition & 10 deletions source/common/grpc/rpc_channel_impl.cc
@@ -30,12 +30,6 @@ void RpcChannelImpl::CallMethod(const proto::MethodDescriptor* method, proto::Rp
   // here for clarity.
   ASSERT(cm_.get(cluster_)->features() & Upstream::Cluster::Features::HTTP2);
 
-  client_ = cm_.httpAsyncClientForCluster(cluster_);
-  if (!client_) {
-    onFailureWorker(Optional<uint64_t>(), "http request failure");
-    return;
-  }
-
   Http::MessagePtr message(new Http::RequestMessageImpl());
   message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
   message->headers().addViaMoveValue(Http::Headers::get().Method, "POST");
@@ -46,10 +40,7 @@ void RpcChannelImpl::CallMethod(const proto::MethodDescriptor* method, proto::Rp
   message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE);
   message->body(serializeBody(*grpc_request));
 
-  http_request_ = client_->send(std::move(message), *this, timeout_);
-  if (!http_request_) {
-    onFailureWorker(Optional<uint64_t>(), "http request failure");
-  }
+  http_request_ = cm_.httpAsyncClientForCluster(cluster_).send(std::move(message), *this, timeout_);
 }
 
 void RpcChannelImpl::incStat(bool success) {
3 changes: 1 addition & 2 deletions source/common/grpc/rpc_channel_impl.h
@@ -62,8 +62,7 @@ class RpcChannelImpl : public RpcChannel, public Http::AsyncClient::Callbacks {
 
   Upstream::ClusterManager& cm_;
   const std::string cluster_;
-  Http::AsyncClientPtr client_;
-  Http::AsyncClient::RequestPtr http_request_;
+  Http::AsyncClient::Request* http_request_{};
   const proto::MethodDescriptor* grpc_method_{};
   proto::Message* grpc_response_{};
   RpcChannelCallbacks& callbacks_;
81 changes: 48 additions & 33 deletions source/common/http/async_client_impl.cc
@@ -7,37 +7,47 @@
 
 namespace Http {
 
-const Http::HeaderMapImpl AsyncRequestImpl::SERVICE_UNAVAILABLE_HEADER{
-    {Http::Headers::get().Status, std::to_string(enumToInt(Http::Code::ServiceUnavailable))}};
+const HeaderMapImpl AsyncRequestImpl::SERVICE_UNAVAILABLE_HEADER{
+    {Headers::get().Status, std::to_string(enumToInt(Code::ServiceUnavailable))}};
 
-const Http::HeaderMapImpl AsyncRequestImpl::REQUEST_TIMEOUT_HEADER{
-    {Http::Headers::get().Status, std::to_string(enumToInt(Http::Code::GatewayTimeout))}};
+const HeaderMapImpl AsyncRequestImpl::REQUEST_TIMEOUT_HEADER{
+    {Headers::get().Status, std::to_string(enumToInt(Code::GatewayTimeout))}};
 
-AsyncClientImpl::AsyncClientImpl(ConnectionPool::Instance& conn_pool, const std::string& cluster,
-                                 Stats::Store& stats_store, Event::Dispatcher& dispatcher)
-    : conn_pool_(conn_pool), stat_prefix_(fmt::format("cluster.{}.", cluster)),
-      stats_store_(stats_store), dispatcher_(dispatcher) {}
+AsyncClientImpl::AsyncClientImpl(const Upstream::Cluster& cluster,
+                                 AsyncClientConnPoolFactory& factory, Stats::Store& stats_store,
+                                 Event::Dispatcher& dispatcher)
+    : cluster_(cluster), factory_(factory), stats_store_(stats_store), dispatcher_(dispatcher),
+      stat_prefix_(fmt::format("cluster.{}.", cluster.name())) {}
 
+AsyncClientImpl::~AsyncClientImpl() { ASSERT(active_requests_.empty()); }
+
+AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
+                                            const Optional<std::chrono::milliseconds>& timeout) {
+  ConnectionPool::Instance* conn_pool = factory_.connPool();
+  if (!conn_pool) {
+    callbacks.onFailure(AsyncClient::FailureReason::Reset);
+    return nullptr;
+  }
+
-AsyncClient::RequestPtr AsyncClientImpl::send(MessagePtr&& request,
-                                              AsyncClient::Callbacks& callbacks,
-                                              const Optional<std::chrono::milliseconds>& timeout) {
   std::unique_ptr<AsyncRequestImpl> new_request{
-      new AsyncRequestImpl(std::move(request), *this, callbacks, dispatcher_, timeout)};
+      new AsyncRequestImpl(std::move(request), *this, callbacks, dispatcher_, *conn_pool, timeout)};
 
   // The request may get immediately failed. If so, we will return nullptr.
   if (new_request->stream_encoder_) {
-    return std::move(new_request);
+    new_request->moveIntoList(std::move(new_request), active_requests_);
+    return active_requests_.front().get();
   } else {
     return nullptr;
   }
 }
 
 AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
                                    AsyncClient::Callbacks& callbacks, Event::Dispatcher& dispatcher,
+                                   ConnectionPool::Instance& conn_pool,
                                    const Optional<std::chrono::milliseconds>& timeout)
     : request_(std::move(request)), parent_(parent), callbacks_(callbacks) {
 
-  stream_encoder_.reset(new PooledStreamEncoder(parent_.conn_pool_, *this, *this, 0, 0, *this));
+  stream_encoder_.reset(new PooledStreamEncoder(conn_pool, *this, *this, 0, 0, *this));
   stream_encoder_->encodeHeaders(request_->headers(), !request_->body());
 
   // We might have been immediately failed.
@@ -66,9 +76,9 @@ void AsyncRequestImpl::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
                   -> void { log_debug("  '{}':'{}'", key.get(), value); });
 #endif
 
-  Http::CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
-                                           response_->headers(), true, EMPTY_STRING, EMPTY_STRING};
-  Http::CodeUtility::chargeResponseStat(info);
+  CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
+                                     response_->headers(), true, EMPTY_STRING, EMPTY_STRING};
+  CodeUtility::chargeResponseStat(info);
 
   if (end_stream) {
     onComplete();
@@ -102,40 +112,45 @@ void AsyncRequestImpl::decodeTrailers(HeaderMapPtr&& trailers) {
 
 void AsyncRequestImpl::onComplete() {
   // TODO: Check host's canary status in addition to canary header.
-  Http::CodeUtility::ResponseTimingInfo info{
+  CodeUtility::ResponseTimingInfo info{
       parent_.stats_store_, parent_.stat_prefix_, stream_encoder_->requestCompleteTime(),
-      response_->headers().get(Http::Headers::get().EnvoyUpstreamCanary) == "true", true,
-      EMPTY_STRING, EMPTY_STRING};
-  Http::CodeUtility::chargeResponseTiming(info);
+      response_->headers().get(Headers::get().EnvoyUpstreamCanary) == "true", true, EMPTY_STRING,
+      EMPTY_STRING};
+  CodeUtility::chargeResponseTiming(info);
+
-  cleanup();
   callbacks_.onSuccess(std::move(response_));
+  cleanup();
 }
 
 void AsyncRequestImpl::onResetStream(StreamResetReason) {
-  Http::CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
-                                           SERVICE_UNAVAILABLE_HEADER, true, EMPTY_STRING,
-                                           EMPTY_STRING};
-  Http::CodeUtility::chargeResponseStat(info);
+  CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
+                                     SERVICE_UNAVAILABLE_HEADER, true, EMPTY_STRING, EMPTY_STRING};
+  CodeUtility::chargeResponseStat(info);
+  callbacks_.onFailure(AsyncClient::FailureReason::Reset);
   cleanup();
-  callbacks_.onFailure(Http::AsyncClient::FailureReason::Reset);
 }
 
 void AsyncRequestImpl::onRequestTimeout() {
-  Http::CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
-                                           REQUEST_TIMEOUT_HEADER, true, EMPTY_STRING,
-                                           EMPTY_STRING};
-  Http::CodeUtility::chargeResponseStat(info);
-  parent_.stats_store_.counter(fmt::format("{}upstream_rq_timeout", parent_.stat_prefix_)).inc();
+  CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
+                                     REQUEST_TIMEOUT_HEADER, true, EMPTY_STRING, EMPTY_STRING};
+  CodeUtility::chargeResponseStat(info);
+  parent_.cluster_.stats().upstream_rq_timeout_.inc();
   stream_encoder_->resetStream();
+  callbacks_.onFailure(AsyncClient::FailureReason::RequestTimemout);
   cleanup();
-  callbacks_.onFailure(Http::AsyncClient::FailureReason::RequestTimemout);
 }
 
 void AsyncRequestImpl::cleanup() {
   stream_encoder_.reset();
   if (request_timeout_) {
     request_timeout_->disableTimer();
   }
+
+  // This will destroy us, but only do so if we are actually in a list. This does not happen in
+  // the immediate failure case.
+  if (inserted()) {
+    removeFromList(parent_.active_requests_);
+  }
 }
 
 } // Http
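The lifetime model above leans on the LinkedObject helper: send() splices each AsyncRequestImpl into active_requests_, and cleanup() removes it again, which destroys the request, with the new inserted() accessor guarding the immediate-failure case where the request was never added. Below is a minimal standalone sketch of that idiom with simplified stand-ins (LinkedObjectSketch, RequestSketch); it is not the Envoy implementation itself.

#include <cassert>
#include <list>
#include <memory>

// Simplified stand-in for the intrusive-list ownership idiom; the member names mirror the
// real LinkedObject interface (inserted/moveIntoList/removeFromList) but the internals differ.
template <class T> class LinkedObjectSketch {
public:
  bool inserted() { return inserted_; }

  // Hand ownership of "item" (which is *this) to "list".
  void moveIntoList(std::unique_ptr<T>&& item, std::list<std::unique_ptr<T>>& list) {
    inserted_ = true;
    list.push_front(std::move(item));
  }

  // Erase *this from "list"; the erase destroys the object, so nothing may touch it afterwards.
  void removeFromList(std::list<std::unique_ptr<T>>& list) {
    assert(inserted());
    for (auto it = list.begin(); it != list.end(); ++it) {
      if (it->get() == this) {
        list.erase(it);
        return;
      }
    }
  }

private:
  bool inserted_{};
};

struct RequestSketch : public LinkedObjectSketch<RequestSketch> {
  void finish(std::list<std::unique_ptr<RequestSketch>>& owner) {
    // Mirror of cleanup(): only remove (and thereby destroy) the request if it was
    // actually inserted; an immediately failed request never was.
    if (inserted()) {
      removeFromList(owner);
    }
  }
};

int main() {
  std::list<std::unique_ptr<RequestSketch>> active_requests;
  std::unique_ptr<RequestSketch> request{new RequestSketch()};
  RequestSketch* handle = request.get();

  handle->moveIntoList(std::move(request), active_requests); // the "client" now owns the request
  handle->finish(active_requests);                           // destroys the request

  return active_requests.empty() ? 0 : 1;
}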