Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thrift filter: support skip decoding data after metadata in the thrift message #13592

Merged
merged 15 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ enum ProtocolType {
TWITTER = 4;
}

// [#next-free-field: 6]
// [#next-free-field: 7]
message ThriftProxy {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.network.thrift_proxy.v2alpha1.ThriftProxy";
Expand All @@ -82,6 +82,12 @@ message ThriftProxy {
// compatibility, if no thrift_filters are specified, a default Thrift router filter
// (`envoy.filters.thrift.router`) is used.
repeated ThriftFilter thrift_filters = 5;

// If set to true, Envoy will try to skip decode data after metadata in the Thrift message.
// This mode will only work if the upstream and downstream protocols are the same and the transport
// is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will
// fallback to decode the data.
htuch marked this conversation as resolved.
Show resolved Hide resolved
bool payload_passthrough = 6;
}

// ThriftFilter configures a Thrift filter.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ New Features
:ref:`CertificateValidationContext <envoy_v3_api_field_extensions.transport_sockets.tls.v3.CertificateValidationContext.watched_directory>`.
* signal: added an extension point for custom actions to run on the thread that has encountered a fatal error. Actions are configurable via :ref:`fatal_actions <envoy_v3_api_field_config.bootstrap.v3.Bootstrap.fatal_actions>`.
* tcp: added a new :ref:`envoy.overload_actions.reject_incoming_connections <config_overload_manager_overload_actions>` action to reject incoming TCP connections.
* thrift_proxy: added a new :ref: `payload_passthrough <envoy_v3_api_field_extensions.filters.network.thrift_proxy.v3.ThriftProxy.payload_passthrough>` option to skip decoding body in the Thrift message.
* tls: added support for RSA certificates with 4096-bit keys in FIPS mode.
* tracing: added SkyWalking tracer.
* xds: added support for resource TTLs. A TTL is specified on the :ref:`Resource <envoy_api_msg_Resource>`. For SotW, a :ref:`Resource <envoy_api_msg_Resource>` can be embedded
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion source/extensions/filters/network/thrift_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ ConfigImpl::ConfigImpl(
: context_(context), stats_prefix_(fmt::format("thrift.{}.", config.stat_prefix())),
stats_(ThriftFilterStats::generateStats(stats_prefix_, context_.scope())),
transport_(lookupTransport(config.transport())), proto_(lookupProtocol(config.protocol())),
route_matcher_(new Router::RouteMatcher(config.route_config())) {
route_matcher_(new Router::RouteMatcher(config.route_config())),
payload_passthrough_(config.payload_passthrough()) {

if (config.thrift_filters().empty()) {
ENVOY_LOG(debug, "using default router filter");
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/network/thrift_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class ConfigImpl : public Config,
TransportPtr createTransport() override;
ProtocolPtr createProtocol() override;
Router::Config& routerConfig() override { return *this; }
bool payloadPassthrough() const override { return payload_passthrough_; }

private:
void processFilter(
Expand All @@ -94,6 +95,7 @@ class ConfigImpl : public Config,
std::unique_ptr<Router::RouteMatcher> route_matcher_;

std::list<ThriftFilters::FilterFactoryCb> filter_factories_;
const bool payload_passthrough_;
};

} // namespace ThriftProxy
Expand Down
34 changes: 34 additions & 0 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ DecoderEventHandler& ConnectionManager::newDecoderEventHandler() {
return **rpcs_.begin();
}

bool ConnectionManager::passthroughEnabled() const {
if (!config_.payloadPassthrough()) {
return false;
}

// This is called right after the metadata has been parsed, and the ActiveRpc being processed must
// be in the rpcs_ list.
ASSERT(!rpcs_.empty());
htuch marked this conversation as resolved.
Show resolved Hide resolved
return (*rpcs_.begin())->passthroughSupported();
}

bool ConnectionManager::ResponseDecoder::onData(Buffer::Instance& data) {
upstream_buffer_.move(data);

Expand Down Expand Up @@ -274,6 +285,10 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() {
return FilterStatus::Continue;
}

bool ConnectionManager::ResponseDecoder::passthroughEnabled() const {
return parent_.parent_.passthroughEnabled();
}

void ConnectionManager::ActiveRpcDecoderFilter::continueDecoding() {
const FilterStatus status = parent_.applyDecoderFilters(this);
if (status == FilterStatus::Continue) {
Expand Down Expand Up @@ -398,6 +413,25 @@ void ConnectionManager::ActiveRpc::finalizeRequest() {
}
}

bool ConnectionManager::ActiveRpc::passthroughSupported() const {
for (auto& entry : decoder_filters_) {
if (!entry->handle_->passthroughSupported()) {
return false;
}
}
return true;
htuch marked this conversation as resolved.
Show resolved Hide resolved
}

FilterStatus ConnectionManager::ActiveRpc::passthroughData(Buffer::Instance& data) {
filter_context_ = &data;
filter_action_ = [this](DecoderEventHandler* filter) -> FilterStatus {
Buffer::Instance* data = absl::any_cast<Buffer::Instance*>(filter_context_);
return filter->passthroughData(*data);
};

return applyDecoderFilters(nullptr);
}

FilterStatus ConnectionManager::ActiveRpc::messageBegin(MessageMetadataSharedPtr metadata) {
ASSERT(metadata->hasSequenceId());
ASSERT(metadata->hasMessageType());
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/network/thrift_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Config {
virtual TransportPtr createTransport() PURE;
virtual ProtocolPtr createProtocol() PURE;
virtual Router::Config& routerConfig() PURE;
virtual bool payloadPassthrough() const PURE;
};

/**
Expand Down Expand Up @@ -76,6 +77,7 @@ class ConnectionManager : public Network::ReadFilter,

// DecoderCallbacks
DecoderEventHandler& newDecoderEventHandler() override;
bool passthroughEnabled() const override;

private:
struct ActiveRpc;
Expand All @@ -102,6 +104,7 @@ class ConnectionManager : public Network::ReadFilter,

// DecoderCallbacks
DecoderEventHandler& newDecoderEventHandler() override { return *this; }
bool passthroughEnabled() const override;

ActiveRpc& parent_;
DecoderPtr decoder_;
Expand Down Expand Up @@ -180,6 +183,7 @@ class ConnectionManager : public Network::ReadFilter,
// DecoderEventHandler
FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus transportEnd() override;
FilterStatus passthroughData(Buffer::Instance& data) override;
FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus messageEnd() override;
FilterStatus structBegin(absl::string_view name) override;
Expand Down Expand Up @@ -225,6 +229,7 @@ class ConnectionManager : public Network::ReadFilter,
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_);
}

bool passthroughSupported() const;
FilterStatus applyDecoderFilters(ActiveRpcDecoderFilter* filter);
void finalizeRequest();

Expand Down
28 changes: 26 additions & 2 deletions source/extensions/filters/network/thrift_proxy/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/common/exception.h"

#include "common/buffer/buffer_impl.h"
#include "common/common/assert.h"
#include "common/common/macros.h"

Expand All @@ -12,16 +13,37 @@ namespace Extensions {
namespace NetworkFilters {
namespace ThriftProxy {

// PassthroughData -> PassthroughData
// PassthroughData -> MessageEnd (all body bytes received)
DecoderStateMachine::DecoderStatus DecoderStateMachine::passthroughData(Buffer::Instance& buffer) {
if (body_bytes_ > buffer.length()) {
return {ProtocolState::WaitForData};
}

Buffer::OwnedImpl body;
body.move(buffer, body_bytes_);

return {ProtocolState::MessageEnd, handler_.passthroughData(body)};
}

// MessageBegin -> StructBegin
DecoderStateMachine::DecoderStatus DecoderStateMachine::messageBegin(Buffer::Instance& buffer) {
const auto total = buffer.length();
if (!proto_.readMessageBegin(buffer, *metadata_)) {
return {ProtocolState::WaitForData};
}

stack_.clear();
stack_.emplace_back(Frame(ProtocolState::MessageEnd));

return {ProtocolState::StructBegin, handler_.messageBegin(metadata_)};
const auto status = handler_.messageBegin(metadata_);

if (callbacks_.passthroughEnabled()) {
body_bytes_ = metadata_->frameSize() - (total - buffer.length());
return {ProtocolState::PassthroughData, status};
}

return {ProtocolState::StructBegin, status};
}

// MessageEnd -> Done
Expand Down Expand Up @@ -293,6 +315,8 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::handleValue(Buffer::Inst

DecoderStateMachine::DecoderStatus DecoderStateMachine::handleState(Buffer::Instance& buffer) {
switch (state_) {
case ProtocolState::PassthroughData:
return passthroughData(buffer);
case ProtocolState::MessageBegin:
return messageBegin(buffer);
case ProtocolState::StructBegin:
Expand Down Expand Up @@ -416,7 +440,7 @@ FilterStatus Decoder::onData(Buffer::Instance& data, bool& buffer_underflow) {
request_ = std::make_unique<ActiveRequest>(callbacks_.newDecoderEventHandler());
frame_started_ = true;
state_machine_ =
std::make_unique<DecoderStateMachine>(protocol_, metadata_, request_->handler_);
std::make_unique<DecoderStateMachine>(protocol_, metadata_, request_->handler_, callbacks_);

if (request_->handler_.transportBegin(metadata_) == FilterStatus::StopIteration) {
return FilterStatus::StopIteration;
Expand Down
17 changes: 14 additions & 3 deletions source/extensions/filters/network/thrift_proxy/decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace ThriftProxy {
#define ALL_PROTOCOL_STATES(FUNCTION) \
FUNCTION(StopIteration) \
FUNCTION(WaitForData) \
FUNCTION(PassthroughData) \
FUNCTION(MessageBegin) \
FUNCTION(MessageEnd) \
FUNCTION(StructBegin) \
Expand Down Expand Up @@ -56,16 +57,18 @@ class ProtocolStateNameValues {
}
};

class DecoderCallbacks;

/**
* DecoderStateMachine is the Thrift message state machine as described in
* source/extensions/filters/network/thrift_proxy/docs.
*/
class DecoderStateMachine : public Logger::Loggable<Logger::Id::thrift> {
public:
DecoderStateMachine(Protocol& proto, MessageMetadataSharedPtr& metadata,
DecoderEventHandler& handler)
: proto_(proto), metadata_(metadata), handler_(handler), state_(ProtocolState::MessageBegin) {
}
DecoderEventHandler& handler, DecoderCallbacks& callbacks)
: proto_(proto), metadata_(metadata), handler_(handler), callbacks_(callbacks),
state_(ProtocolState::MessageBegin) {}

/**
* Consumes as much data from the configured Buffer as possible and executes the decoding state
Expand Down Expand Up @@ -129,6 +132,7 @@ class DecoderStateMachine : public Logger::Loggable<Logger::Id::thrift> {

// These functions map directly to the matching ProtocolState values. Each returns the next state
// or ProtocolState::WaitForData if more data is required.
DecoderStatus passthroughData(Buffer::Instance& buffer);
DecoderStatus messageBegin(Buffer::Instance& buffer);
DecoderStatus messageEnd(Buffer::Instance& buffer);
DecoderStatus structBegin(Buffer::Instance& buffer);
Expand Down Expand Up @@ -165,8 +169,10 @@ class DecoderStateMachine : public Logger::Loggable<Logger::Id::thrift> {
Protocol& proto_;
MessageMetadataSharedPtr metadata_;
DecoderEventHandler& handler_;
DecoderCallbacks& callbacks_;
ProtocolState state_;
std::vector<Frame> stack_;
uint32_t body_bytes_{};
};

using DecoderStateMachinePtr = std::unique_ptr<DecoderStateMachine>;
Expand All @@ -179,6 +185,11 @@ class DecoderCallbacks {
* @return DecoderEventHandler& a new DecoderEventHandler for a message.
*/
virtual DecoderEventHandler& newDecoderEventHandler() PURE;

/**
* @return True if payload passthrough is enabled and is supported by filter chain.
*/
virtual bool passthroughEnabled() const PURE;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class DecoderEventHandler {
*/
virtual FilterStatus transportEnd() PURE;

/**
* Indicates raw bytes after metadata in a Thrift transport frame was detected.
* Filters should not modify data except for the router.
* @param data data to send as passthrough
* @return FilterStatus to indicate if filter chain iteration should continue
*/
virtual FilterStatus passthroughData(Buffer::Instance& data) PURE;

/**
* Indicates that the start of a Thrift protocol message was detected.
* @param metadata MessageMetadataSharedPtr describing the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ class DecoderFilter : public virtual DecoderEventHandler {
* filter should use. Callbacks will not be invoked by the filter after onDestroy() is called.
*/
virtual void setDecoderFilterCallbacks(DecoderFilterCallbacks& callbacks) PURE;

/**
* @return True if payload passthrough is supported. Called by the connection manager once after
* messageBegin.
*/
virtual bool passthroughSupported() const PURE;
};

using DecoderFilterSharedPtr = std::shared_ptr<DecoderFilter>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class PassThroughDecoderFilter : public DecoderFilter {

ThriftProxy::FilterStatus transportEnd() override { return ThriftProxy::FilterStatus::Continue; }

bool passthroughSupported() const override { return true; }

ThriftProxy::FilterStatus passthroughData(Buffer::Instance&) override {
return ThriftProxy::FilterStatus::Continue;
}

ThriftProxy::FilterStatus messageBegin(ThriftProxy::MessageMetadataSharedPtr) override {
return ThriftProxy::FilterStatus::Continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ class ProtocolConverter : public virtual DecoderEventHandler {
}

// DecoderEventHandler
FilterStatus passthroughData(Buffer::Instance& data) override {
buffer_->move(data);
return FilterStatus::Continue;
}

FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override {
proto_->writeMessageBegin(*buffer_, *metadata);
return FilterStatus::Continue;
Expand Down
Loading