Skip to content

Commit

Permalink
http: allow decoding/encoding a header only request/response (envoypr…
Browse files Browse the repository at this point in the history
…oxy#4885)

This adds a new return value to encodeHeaders that indicates that
filter iteration should immediately stop, dropping any pending data/trailers.
This results in a header only request/response, which can be used to
terminate stream processing early, eg in case of invalid
request or response.

Signed-off-by: Snow Pettersen <snowp@squareup.com>
Signed-off-by: Fred Douglas <fredlas@google.com>
  • Loading branch information
snowp authored and fredlas committed Mar 5, 2019
1 parent d999605 commit 867214b
Show file tree
Hide file tree
Showing 13 changed files with 511 additions and 13 deletions.
5 changes: 4 additions & 1 deletion include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ enum class FilterHeadersStatus {
// Do not iterate to any of the remaining filters in the chain. Returning
// FilterDataStatus::Continue from decodeData()/encodeData() or calling
// continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired.
StopIteration
StopIteration,
// Continue iteration to remaining filters, but ignore any subsequent data or trailers. This
// results in creating a header only request/response.
ContinueAndEndStream
};

/**
Expand Down
71 changes: 61 additions & 10 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,9 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
*request_headers_);

// We end the decode here only if the request is header only. If we convert the request to a
// header only, the stream will be marked as done once a subsequent decodeData/decodeTrailers is
// called with end_stream=true.
maybeEndDecode(end_stream);

// Drop new requests when overloaded as soon as we have decoded the headers.
Expand Down Expand Up @@ -751,12 +754,16 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte
for (; entry != decoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
state_.filter_call_state_ |= FilterCallState::DecodeHeaders;
FilterHeadersStatus status = (*entry)->decodeHeaders(
headers, end_stream && continue_data_entry == decoder_filters_.end());
const auto current_filter_end_stream =
decoding_headers_only_ || (end_stream && continue_data_entry == decoder_filters_.end());
FilterHeadersStatus status = (*entry)->decodeHeaders(headers, current_filter_end_stream);

ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && current_filter_end_stream));
state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterHeadersCallback(status) &&

if (!(*entry)->commonHandleAfterHeadersCallback(status, decoding_headers_only_) &&
std::next(entry) != decoder_filters_.end()) {
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with
// processing since we need to handle the case where a terminal filter wants to buffer, but
Expand Down Expand Up @@ -795,6 +802,11 @@ void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter*
Buffer::Instance& data, bool end_stream) {
resetIdleTimer();

// If we previously decided to decode only the headers, do nothing here.
if (decoding_headers_only_) {
return;
}

// If a response is complete or a reset has been sent, filters do not care about further body
// data. Just drop it.
if (state_.local_complete_) {
Expand Down Expand Up @@ -894,6 +906,11 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(HeaderMapPtr&& trailers

void ConnectionManagerImpl::ActiveStream::decodeTrailers(ActiveStreamDecoderFilter* filter,
HeaderMap& trailers) {
// If we previously decided to decode only the headers, do nothing here.
if (decoding_headers_only_) {
return;
}

// See decodeData() above for why we check local_complete_ here.
if (state_.local_complete_) {
return;
Expand Down Expand Up @@ -1051,11 +1068,22 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeHeaders));
state_.filter_call_state_ |= FilterCallState::EncodeHeaders;
FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(
headers, end_stream && continue_data_entry == encoder_filters_.end());
headers,
encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end()));
state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders;
ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterHeadersCallback(status)) {

const auto continue_iteration =
(*entry)->commonHandleAfterHeadersCallback(status, encoding_headers_only_);

// If we're encoding a headers only response, then mark the local as complete. This ensures
// that we don't attempt to reset the downstream request in doEndStream.
if (encoding_headers_only_) {
state_.local_complete_ = true;
}

if (!continue_iteration) {
return;
}

Expand Down Expand Up @@ -1147,12 +1175,15 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
chargeStats(headers);

ENVOY_STREAM_LOG(debug, "encoding headers via codec (end_stream={}):\n{}", *this,
end_stream && continue_data_entry == encoder_filters_.end(), headers);
encoding_headers_only_ ||
(end_stream && continue_data_entry == encoder_filters_.end()),
headers);

// Now actually encode via the codec.
stream_info_.onFirstDownstreamTxByteSent();
response_encoder_->encodeHeaders(headers,
end_stream && continue_data_entry == encoder_filters_.end());
response_encoder_->encodeHeaders(
headers,
encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end()));

if (continue_data_entry != encoder_filters_.end()) {
// We use the continueEncoding() code since it will correctly handle not calling
Expand All @@ -1161,7 +1192,9 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
(*continue_data_entry)->stopped_ = true;
(*continue_data_entry)->continueEncoding();
} else {
maybeEndEncode(end_stream);
// End encoding if this is a header only response, either due to a filter converting it to one
// or due to the upstream returning headers only.
maybeEndEncode(encoding_headers_only_ || end_stream);
}
}

Expand Down Expand Up @@ -1200,6 +1233,12 @@ void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilt
void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter* filter,
Buffer::Instance& data, bool end_stream) {
resetIdleTimer();

// If we previously decided to encode only the headers, do nothing here.
if (encoding_headers_only_) {
return;
}

std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, end_stream);
auto trailers_added_entry = encoder_filters_.end();

Expand Down Expand Up @@ -1252,6 +1291,12 @@ void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter*
void ConnectionManagerImpl::ActiveStream::encodeTrailers(ActiveStreamEncoderFilter* filter,
HeaderMap& trailers) {
resetIdleTimer();

// If we previously decided to encode only the headers, do nothing here.
if (encoding_headers_only_) {
return;
}

std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, true);
for (; entry != encoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeTrailers));
Expand Down Expand Up @@ -1418,13 +1463,19 @@ bool ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleAfter100Continue
}

bool ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleAfterHeadersCallback(
FilterHeadersStatus status) {
FilterHeadersStatus status, bool& headers_only) {
ASSERT(!headers_continued_);
ASSERT(!stopped_);

if (status == FilterHeadersStatus::StopIteration) {
stopped_ = true;
return false;
} else if (status == FilterHeadersStatus::ContinueAndEndStream) {
// Set headers_only to true so we know to end early if necessary,
// but continue filter iteration so we actually write the headers/run the cleanup code.
headers_only = true;
ENVOY_STREAM_LOG(debug, "converting to headers only", parent_);
return true;
} else {
ASSERT(status == FilterHeadersStatus::Continue);
headers_continued_ = true;
Expand Down
9 changes: 7 additions & 2 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
stopped_(false), dual_filter_(dual_filter) {}

bool commonHandleAfter100ContinueHeadersCallback(FilterHeadersStatus status);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& headers_only);
void commonHandleBufferData(Buffer::Instance& provided_data);
bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data,
bool& buffer_was_streaming);
Expand Down Expand Up @@ -417,7 +417,12 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// By default, we will assume there are no 100-Continue headers. If encode100ContinueHeaders
// is ever called, this is set to true so commonContinue resumes processing the 100-Continue.
bool has_continue_headers_{};
bool is_head_request_{false};
bool is_head_request_{};
// Whether a filter has indicated that the request should be treated as a headers only request.
bool decoding_headers_only_{};
// Whether a filter has indicated that the response should be treated as a headers only
// response.
bool encoding_headers_only_{};
};

typedef std::unique_ptr<ActiveStream> ActiveStreamPtr;
Expand Down
119 changes: 119 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2899,6 +2899,125 @@ TEST_F(HttpConnectionManagerImplTest, FilterHeadReply) {
conn_manager_->onData(fake_input, false);
}

TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
auto headers = std::make_unique<TestHeaderMapImpl>(
std::initializer_list<std::pair<std::string, std::string>>(
{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}));
decoder->decodeHeaders(std::move(headers), false);
}));

setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, true);

EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, true));

expectOnDestroy();

decoder_filters_[1]->callbacks_->encodeHeaders(makeHeaderMap({{":status", "200"}}), true);

Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, true);
}

TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamData) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
auto headers = makeHeaderMap({{":authority", "host"}, {":path", "/"}, {":method", "GET"}});
decoder->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data("hello");
decoder->decodeData(fake_data, true);
}));

setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, true));

expectOnDestroy();

decoder_filters_[1]->callbacks_->encodeHeaders(makeHeaderMap({{":status", "200"}}), false);

Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, true);
}

TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
auto headers = makeHeaderMap({{":authority", "host"}, {":path", "/"}, {":method", "GET"}});
decoder->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data("hello");
decoder->decodeData(fake_data, false);

auto trailers = makeHeaderMap({{"foo", "bar"}});
decoder->decodeTrailers(std::move(trailers));
}));

setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, true));

expectOnDestroy();

decoder_filters_[1]->callbacks_->encodeHeaders(makeHeaderMap({{":status", "200"}}), false);

Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, false);

auto response_trailers = makeHeaderMap({{"x-trailer", "1"}});
decoder_filters_[1]->callbacks_->encodeTrailers(std::move(response_trailers));
}

TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) {
InSequence s;
setup(false, "");
Expand Down
43 changes: 43 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,36 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "passthrough_filter_config_lib",
srcs = [
"passthrough_filter.cc",
],
deps = [
":common_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:empty_http_filter_config_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
],
)

envoy_cc_test_library(
name = "headers_only_filter_config_lib",
srcs = [
"headers_only_filter.cc",
],
deps = [
":common_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:empty_http_filter_config_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
],
)

envoy_cc_test_library(
name = "pause_filter_lib",
srcs = [
Expand All @@ -36,6 +66,19 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "common_lib",
hdrs = [
"common.h",
],
deps = [
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//source/extensions/filters/http/common:empty_http_filter_config_lib",
"//test/test_common:utility_lib",
],
)

envoy_cc_test_library(
name = "random_pause_filter_lib",
srcs = [
Expand Down
25 changes: 25 additions & 0 deletions test/integration/filters/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <string>

#include "envoy/http/filter.h"
#include "envoy/server/filter_config.h"

#include "extensions/filters/http/common/empty_http_filter_config.h"

namespace Envoy {

// DRYs up the creation of a simple filter config for a filter that requires no config.
template <class T>
class SimpleFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig {
public:
SimpleFilterConfig() : EmptyHttpFilterConfig(T::name) {}

Http::FilterFactoryCb createFilter(const std::string&, Server::Configuration::FactoryContext&) {
return [](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<T>());
};
}
};

} // namespace Envoy
Loading

0 comments on commit 867214b

Please sign in to comment.