Skip to content

Commit

Permalink
conn_pool: making TCP upstreams pluggable (#13548)
Browse files Browse the repository at this point in the history
Risk Level: Medium (some refactory)
Testing: existing tests pass
Docs Changes: n/a
Release Notes: n/a
Fixes #13185

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk authored Nov 5, 2020
1 parent 9a205da commit 6a044a3
Show file tree
Hide file tree
Showing 24 changed files with 564 additions and 145 deletions.
4 changes: 1 addition & 3 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,7 @@ extensions/filters/common/original_src @snowp @klarose
/*/extensions/watchdog/profile_action @kbaichoo @antoniovicente
# Core upstream code
extensions/upstreams/http @alyssawilk @snowp @mattklein123
extensions/upstreams/http/http @alyssawilk @snowp @mattklein123
extensions/upstreams/http/tcp @alyssawilk @mattklein123
extensions/upstreams/http/default @alyssawilk @snowp @mattklein123
extensions/upstreams/tcp @alyssawilk @ggreenway @mattklein123
# OAuth2
extensions/filters/http/oauth2 @rgs1 @derekargueta @snowp
# HTTP Local Rate Limit
Expand Down
1 change: 1 addition & 0 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ proto_library(
"//envoy/extensions/upstreams/http/generic/v3:pkg",
"//envoy/extensions/upstreams/http/http/v3:pkg",
"//envoy/extensions/upstreams/http/tcp/v3:pkg",
"//envoy/extensions/upstreams/tcp/generic/v3:pkg",
"//envoy/extensions/wasm/v3:pkg",
"//envoy/extensions/watchdog/profile_action/v3alpha:pkg",
"//envoy/service/accesslog/v3:pkg",
Expand Down
9 changes: 9 additions & 0 deletions api/envoy/extensions/upstreams/tcp/generic/v3/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package envoy.extensions.upstreams.tcp.generic.v3;

import "udpa/annotations/status.proto";

option java_package = "io.envoyproxy.envoy.extensions.upstreams.tcp.generic.v3";
option java_outer_classname = "GenericConnectionPoolProtoOuterClass";
option java_multiple_files = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;

// [#protodoc-title: Generic Connection Pool]

// A connection pool which forwards downstream TCP as TCP or HTTP to upstream,
// based on CONNECT configuration.
// [#extension: envoy.upstreams.tcp.generic]
message GenericConnectionPoolProto {
}
1 change: 1 addition & 0 deletions api/versioning/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ proto_library(
"//envoy/extensions/upstreams/http/generic/v3:pkg",
"//envoy/extensions/upstreams/http/http/v3:pkg",
"//envoy/extensions/upstreams/http/tcp/v3:pkg",
"//envoy/extensions/upstreams/tcp/generic/v3:pkg",
"//envoy/extensions/wasm/v3:pkg",
"//envoy/extensions/watchdog/profile_action/v3alpha:pkg",
"//envoy/service/accesslog/v3:pkg",
Expand Down
2 changes: 2 additions & 0 deletions docs/generate_extension_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,7 @@ def GetExtensionMetadata(target):
'//source/extensions/transport_sockets/tls:config')
extension_db['envoy.upstreams.http.generic'] = GetExtensionMetadata(
'//source/extensions/upstreams/http/generic:config')
extension_db['envoy.upstreams.tcp.generic'] = GetExtensionMetadata(
'//source/extensions/upstreams/tcp/generic:config')

pathlib.Path(output_path).write_text(json.dumps(extension_db))
1 change: 1 addition & 0 deletions docs/root/api-v3/config/upstream/upstream.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ Upstream Configuration
:maxdepth: 3

../../extensions/upstreams/http/*/v3/**
../../extensions/upstreams/tcp/*/v3/**

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions include/envoy/tcp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ envoy_cc_library(
"//include/envoy/upstream:upstream_interface",
],
)

envoy_cc_library(
name = "upstream_interface",
hdrs = ["upstream.h"],
deps = [
"//include/envoy/tcp:conn_pool_interface",
"//include/envoy/upstream:upstream_interface",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
],
)
136 changes: 136 additions & 0 deletions include/envoy/tcp/upstream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#pragma once

#include "envoy/buffer/buffer.h"
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
#include "envoy/stream_info/stream_info.h"
#include "envoy/tcp/conn_pool.h"
#include "envoy/upstream/upstream.h"

namespace Envoy {

namespace Upstream {
class LoadBalancerContext;
} // namespace Upstream

namespace TcpProxy {

class GenericConnectionPoolCallbacks;
class GenericUpstream;

// An API for wrapping either a TCP or an HTTP connection pool.
class GenericConnPool : public Logger::Loggable<Logger::Id::router> {
public:
virtual ~GenericConnPool() = default;

/**
* Called to create a TCP connection or HTTP stream for "CONNECT" streams.
*
* The implementation is then responsible for calling either onGenericPoolReady or
* onGenericPoolFailure on the supplied GenericConnectionPoolCallbacks.
*
* @param callbacks callbacks to communicate stream failure or creation on.
*/
virtual void newStream(GenericConnectionPoolCallbacks& callbacks) PURE;
};

// An API for the UpstreamRequest to get callbacks from either an HTTP or TCP
// connection pool.
class GenericConnectionPoolCallbacks {
public:
virtual ~GenericConnectionPoolCallbacks() = default;

/**
* Called when GenericConnPool::newStream has established a new stream.
*
* @param info supplies the stream info object associated with the upstream connection.
* @param upstream supplies the generic upstream for the stream.
* @param host supplies the description of the host that will carry the request.
* @param upstream_local_address supplies the local address of the upstream connection.
* @param ssl_info supplies the ssl information of the upstream connection.
*/
virtual void
onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info) PURE;

/**
* Called to indicate a failure for GenericConnPool::newStream to establish a stream.
*
* @param reason supplies the failure reason.
* @param host supplies the description of the host that caused the failure. This may be nullptr
* if no host was involved in the failure (for example overflow).
*/
virtual void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) PURE;
};

// Interface for a generic Upstream, which can communicate with a TCP or HTTP
// upstream.
class GenericUpstream {
public:
virtual ~GenericUpstream() = default;

/**
* Enable/disable further data from this stream.
*
* @param disable true if the stream should be read disabled, false otherwise.
* @return returns true if the disable is performed, false otherwise
* (e.g. if the connection is closed)
*/
virtual bool readDisable(bool disable) PURE;

/**
* Encodes data upstream.
* @param data supplies the data to encode. The data may be moved by the encoder.
* @param end_stream supplies whether this is the last data to encode.
*/
virtual void encodeData(Buffer::Instance& data, bool end_stream) PURE;

/**
* Adds a callback to be called when the data is sent to the kernel.
* @param cb supplies the callback to be called
*/
virtual void addBytesSentCallback(Network::Connection::BytesSentCb cb) PURE;

/**
* Called when an event is received on the downstream connection
* @param event supplies the event which occurred.
* @return the underlying ConnectionData if the event is not "Connected" and draining
is supported for this upstream.
*/
virtual Tcp::ConnectionPool::ConnectionData*
onDownstreamEvent(Network::ConnectionEvent event) PURE;
};

using GenericConnPoolPtr = std::unique_ptr<GenericConnPool>;

/*
* A factory for creating generic connection pools.
*/
class GenericConnPoolFactory : public Envoy::Config::TypedFactory {
public:
~GenericConnPoolFactory() override = default;

using TunnelingConfig =
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;

/*
* @param cluster_name the name of the cluster to use
* @param cm the cluster manager to get the connection pool from
* @param config the tunneling config, if doing connect tunneling.
* @param context the load balancing context for this connection.
* @param upstream_callbacks the callbacks to provide to the connection if successfully created.
* @return may be null if there is no cluster with the given name.
*/
virtual GenericConnPoolPtr
createGenericConnPool(const std::string& cluster_name, Upstream::ClusterManager& cm,
const absl::optional<TunnelingConfig>& config,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) const PURE;
};

using GenericConnPoolFactoryPtr = std::unique_ptr<GenericConnPoolFactory>;

} // namespace TcpProxy
} // namespace Envoy
30 changes: 28 additions & 2 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ class Utility {
/**
* Get a Factory from the registry with a particular name (and templated type) with error checking
* to ensure the name and factory are valid.
* @param name string identifier for the particular implementation. Note: this is a proto string
* because it is assumed that this value will be pulled directly from the configuration proto.
* @param name string identifier for the particular implementation.
* @return factory the factory requested or nullptr if it does not exist.
*/
template <class Factory> static Factory& getAndCheckFactoryByName(const std::string& name) {
if (name.empty()) {
Expand All @@ -230,6 +230,32 @@ class Utility {
return *factory;
}

/**
* Get a Factory from the registry with a particular name or return nullptr.
* @param name string identifier for the particular implementation.
*/
template <class Factory> static Factory* getFactoryByName(const std::string& name) {
if (name.empty()) {
return nullptr;
}

return Registry::FactoryRegistry<Factory>::getFactory(name);
}

/**
* Get a Factory from the registry or return nullptr.
* @param message proto that contains fields 'name' and 'typed_config'.
*/
template <class Factory, class ProtoMessage>
static Factory* getFactory(const ProtoMessage& message) {
Factory* factory = Utility::getFactoryByType<Factory>(message.typed_config());
if (factory != nullptr) {
return factory;
}

return Utility::getFactoryByName<Factory>(message.name());
}

/**
* Get a Factory from the registry with error checking to ensure the name and the factory are
* valid.
Expand Down
25 changes: 22 additions & 3 deletions source/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,35 @@ licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_library(
name = "upstream_lib",
srcs = [
"upstream.cc",
],
hdrs = [
"upstream.h",
],
deps = [
"//include/envoy/tcp:conn_pool_interface",
"//include/envoy/tcp:upstream_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:load_balancer_interface",
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
"//source/common/http:utility_lib",
],
)

envoy_cc_library(
name = "tcp_proxy",
srcs = [
"tcp_proxy.cc",
"upstream.cc",
],
hdrs = [
"tcp_proxy.h",
"upstream.h",
],
deps = [
":upstream_lib",
"//include/envoy/access_log:access_log_interface",
"//include/envoy/buffer:buffer_interface",
"//include/envoy/common:time_interface",
Expand All @@ -32,14 +50,14 @@ envoy_cc_library(
"//include/envoy/stats:timespan_interface",
"//include/envoy/stream_info:filter_state_interface",
"//include/envoy/tcp:conn_pool_interface",
"//include/envoy/tcp:upstream_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/access_log:access_log_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:macros",
"//source/common/common:minimal_logger_lib",
"//source/common/http:headers_lib",
"//source/common/network:application_protocol_lib",
"//source/common/network:cidr_range_lib",
"//source/common/network:filter_lib",
Expand All @@ -51,6 +69,7 @@ envoy_cc_library(
"//source/common/router:metadatamatchcriteria_lib",
"//source/common/stream_info:stream_info_lib",
"//source/common/upstream:load_balancer_lib",
"//source/extensions/upstreams/tcp/generic:config",
"@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
],
Expand Down
Loading

0 comments on commit 6a044a3

Please sign in to comment.