From ec829a24a3bdffeb16a4b59f8a78004cc72a261d Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Thu, 5 Jan 2017 08:46:53 -0800 Subject: [PATCH] async client: fail all active requests during destruction Required for dynamic cluster remove. --- source/common/http/async_client_impl.cc | 14 +++++++++++++- source/common/http/async_client_impl.h | 1 + test/common/http/async_client_impl_test.cc | 14 ++++++++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index a0483c43ecd0..d6972f903632 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -19,7 +19,11 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St random, std::move(shadow_writer), true), dispatcher_(dispatcher), local_address_(local_address) {} -AsyncClientImpl::~AsyncClientImpl() { ASSERT(active_requests_.empty()); } +AsyncClientImpl::~AsyncClientImpl() { + while (!active_requests_.empty()) { + active_requests_.front()->failDueToClientDestroy(); + } +} AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks, const Optional& timeout) { @@ -124,4 +128,12 @@ void AsyncRequestImpl::resetStream() { cleanup(); } +void AsyncRequestImpl::failDueToClientDestroy() { + // In this case we are going away because the client is being destroyed. We need to both reset + // the stream as well as raise a failure callback. + reset_callback_(); + callbacks_.onFailure(AsyncClient::FailureReason::Reset); + cleanup(); +} + } // Http diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 0052e9214f80..2f0aff12ef43 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -119,6 +119,7 @@ class AsyncRequestImpl final : public AsyncClient::Request, }; void cleanup(); + void failDueToClientDestroy(); void onComplete(); // Http::StreamDecoderFilterCallbacks diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 7f953e4cbe2c..3355541f0c31 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -252,6 +252,20 @@ TEST_F(AsyncClientImplTest, CancelRequest) { request->cancel(); } +TEST_F(AsyncClientImplTest, DestroyWithActive) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + return nullptr; + })); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true)); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + EXPECT_CALL(callbacks_, onFailure(_)); + client_.send(std::move(message_), callbacks_, Optional()); +} + TEST_F(AsyncClientImplTest, PoolFailure) { EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) .WillOnce(Invoke([&](StreamDecoder&,