Skip to content

Commit

Permalink
async client: fail all active requests during destruction (#321)
Browse files Browse the repository at this point in the history
Required for dynamic cluster remove.
  • Loading branch information
mattklein123 authored Jan 5, 2017
1 parent e6f20f6 commit 23d7b06
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 1 deletion.
14 changes: 13 additions & 1 deletion source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St
random, std::move(shadow_writer), true),
dispatcher_(dispatcher), local_address_(local_address) {}

AsyncClientImpl::~AsyncClientImpl() { ASSERT(active_requests_.empty()); }
AsyncClientImpl::~AsyncClientImpl() {
while (!active_requests_.empty()) {
active_requests_.front()->failDueToClientDestroy();
}
}

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
Expand Down Expand Up @@ -124,4 +128,12 @@ void AsyncRequestImpl::resetStream() {
cleanup();
}

void AsyncRequestImpl::failDueToClientDestroy() {
// In this case we are going away because the client is being destroyed. We need to both reset
// the stream as well as raise a failure callback.
reset_callback_();
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
cleanup();
}

} // Http
1 change: 1 addition & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class AsyncRequestImpl final : public AsyncClient::Request,
};

void cleanup();
void failDueToClientDestroy();
void onComplete();

// Http::StreamDecoderFilterCallbacks
Expand Down
14 changes: 14 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,20 @@ TEST_F(AsyncClientImplTest, CancelRequest) {
request->cancel();
}

TEST_F(AsyncClientImplTest, DestroyWithActive) {
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks)
-> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_);
return nullptr;
}));

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true));
EXPECT_CALL(stream_encoder_.stream_, resetStream(_));
EXPECT_CALL(callbacks_, onFailure(_));
client_.send(std::move(message_), callbacks_, Optional<std::chrono::milliseconds>());
}

TEST_F(AsyncClientImplTest, PoolFailure) {
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder&,
Expand Down

0 comments on commit 23d7b06

Please sign in to comment.