Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc: Implement response path for headers only #14713

Merged
merged 6 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// The External Processing filter allows an external service to act on HTTP traffic in a flexible way.

// **Current Implementation Status:**
// At this time, the filter will send a "request_headers" message to the server when the
// filter is invoked from the downstream, and apply any header mutations returned by the
// server. No other part of the protocol is implemented yet.
// At this time, the filter will send the "request_headers" and "response_headers" messages
// to the server when the filter is invoked, and apply any header mutations returned by the
// server, and respond to "immediate_response" messages. No other parts of the protocol are implemented yet.

// As designed, the filter supports up to six different processing steps, which are in the
// process of being implemented:
// * Request headers: IMPLEMENTED
// * Request body: NOT IMPLEMENTED
// * Request trailers: NOT IMPLEMENTED
// * Response headers: NOT IMPLEMENTED
// * Response headers: IMPLEMENTED
// * Response body: NOT IMPLEMENTED
// * Response trailers: NOT IMPLEMENTED

Expand Down Expand Up @@ -78,7 +78,6 @@ message ExternalProcessor {
// The filter supports both the "Envoy" and "Google" gRPC clients.
config.core.v3.GrpcService grpc_service = 1;

// [#not-implemented-hide:]
// By default, if the gRPC stream cannot be established, or if it is closed
// prematurely with an error, the filter will fail. Specifically, if the
// response headers have not yet been delivered, then it will return a 500
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

166 changes: 114 additions & 52 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

using envoy::service::ext_proc::v3alpha::HeadersResponse;
using envoy::service::ext_proc::v3alpha::ImmediateResponse;
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;
using Http::ResponseHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

Expand Down Expand Up @@ -48,66 +50,121 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_of_stream) {
if (stream_closed_) {
return FilterHeadersStatus::Continue;
htuch marked this conversation as resolved.
Show resolved Hide resolved
}

response_headers_ = &headers;
ProcessingRequest req;
auto* headers_req = req.mutable_response_headers();
MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
response_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
bool message_handled = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto& headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
} else if (response->has_immediate_response()) {
ENVOY_LOG(debug, "Returning immediate response from processor");
sendImmediateResponse(response->immediate_response());
message_valid = true;
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
if (response->has_request_headers()) {
message_handled = handleRequestHeadersResponse(response->request_headers());
} else if (response->has_response_headers()) {
message_handled = handleResponseHeadersResponse(response->response_headers());
} else if (response->has_immediate_response()) {
message_handled = handleImmediateResponse(response->immediate_response());
}

if (message_valid) {
if (message_handled) {
stats_.stream_msgs_received_.inc();
} else {
stats_.spurious_msgs_received_.inc();
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
cleanupState();
closeStream();
}
}

bool Filter::handleRequestHeadersResponse(const HeadersResponse& response) {
if (request_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "applying request_headers response");
if (response.has_response()) {
const auto& common_response = response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
return true;
}
return false;
gbrail marked this conversation as resolved.
Show resolved Hide resolved
}

bool Filter::handleResponseHeadersResponse(const HeadersResponse& response) {
if (response_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "applying response_headers response");
if (response.has_response()) {
const auto& common_response = response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *response_headers_);
}
}
response_state_ = FilterState::IDLE;
encoder_callbacks_->continueEncoding();
return true;
}
return false;
gbrail marked this conversation as resolved.
Show resolved Hide resolved
}

bool Filter::handleImmediateResponse(const ImmediateResponse& response) {
if (response_state_ == FilterState::HEADERS) {
// Waiting for a response headers response, so return immediately now.
// Do this first in case both are in progress.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an observation, the decoupling of request/response state from server reply message type makes it a bit confusing to reason about the state machine here. If we had distinct message for "send immediate response" for request/reply, it would be a bit more intuitive in the code at least; OTOH, I think the behaviors are identical so it's technically not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure how to improve this area. For this PR I switched things up to have one function that handles the incoming message and switches on the message type, and others that decide whether it's an appropriate time to handle each one. Or are you suggesting changing the proto so that "immediate_response" is part of a larger message type rather than a separate message?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess tracking the containing context (request/response) for the immediate response would be clearer than relying on request/response state variables to me, but I can see either working.

// We don't want to process any more stream messages after this.
// Close the stream before sending because "sendLocalResponse" triggers
// additional calls to this filter.
response_state_ = FilterState::IDLE;
closeStream();

ENVOY_LOG(debug, "Returning immediate response from processor on encoding path");
htuch marked this conversation as resolved.
Show resolved Hide resolved
sendImmediateResponse(response, false);

return true;

} else if (request_state_ == FilterState::HEADERS) {
ENVOY_LOG(debug, "Returning immediate response from processor on decoding path");
request_state_ = FilterState::IDLE;
closeStream();
sendImmediateResponse(response, true);
return true;
}

return false;
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
stats_.streams_failed_.inc();

if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
stats_.failure_mode_allowed_.inc();

} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
stream_closed_ = true;
ImmediateResponse error_response;
error_response.mutable_status()->set_code(envoy::type::v3::StatusCode::InternalServerError);
error_response.set_details(absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
handleImmediateResponse(error_response);
gbrail marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -117,34 +174,39 @@ void Filter::onGrpcClose() {
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
cleanupState();
}

void Filter::cleanupState() {
if (request_state_ != FilterState::IDLE) {
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
if (response_state_ != FilterState::IDLE) {
response_state_ = FilterState::IDLE;
encoder_callbacks_->continueEncoding();
}
}

void Filter::sendImmediateResponse(const ImmediateResponse& response) {
void Filter::sendImmediateResponse(const ImmediateResponse& response, bool on_decoding) {
const auto status_code = response.has_status() ? response.status().code() : 200;
const auto grpc_status =
response.has_grpc_status()
? absl::optional<Grpc::Status::GrpcStatus>(response.grpc_status().status())
: absl::nullopt;
const auto mutate_headers = [&response](Http::ResponseHeaderMap& headers) {
if (response.has_headers()) {
MutationUtils::applyHeaderMutations(response.headers(), headers);
}
};

decoder_callbacks_->sendLocalReply(
static_cast<Http::Code>(status_code), response.body(),
[&response](Http::ResponseHeaderMap& headers) {
if (response.has_headers()) {
MutationUtils::applyHeaderMutations(response.headers(), headers);
}
},
grpc_status, response.details());
if (on_decoding) {
decoder_callbacks_->sendLocalReply(static_cast<Http::Code>(status_code), response.body(),
gbrail marked this conversation as resolved.
Show resolved Hide resolved
mutate_headers, grpc_status, response.details());
} else {
encoder_callbacks_->sendLocalReply(static_cast<Http::Code>(status_code), response.body(),
mutate_headers, grpc_status, response.details());
}
}

} // namespace ExternalProcessing
Expand Down
23 changes: 16 additions & 7 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,10 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

void onDestroy() override;

void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) override;

// ExternalProcessorCallbacks

Expand All @@ -101,23 +99,34 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

private:
void closeStream();
void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);
void cleanupState();
void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response,
bool on_decoding);

bool
handleRequestHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response);
bool
handleResponseHeadersResponse(const envoy::service::ext_proc::v3alpha::HeadersResponse& response);
bool
handleImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);

const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
ExtProcFilterStats stats_;

Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr;

// The state of the request-processing, or "decoding" side of the filter.
// We maintain separate states for encoding and decoding since they may
// be interleaved.
FilterState request_state_ = FilterState::IDLE;

// The state of the response-processing side
FilterState response_state_ = FilterState::IDLE;

ExternalProcessorStreamPtr stream_;
bool stream_closed_ = false;

Http::HeaderMap* request_headers_ = nullptr;
Http::HeaderMap* response_headers_ = nullptr;
};

} // namespace ExternalProcessing
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ envoy_extension_cc_test_library(
deps = [
"//include/envoy/http:header_map_interface",
"//test/test_common:utility_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
Loading