Skip to content

Commit

Permalink
Enforce that requests are mapped to connections for each Host: header…
Browse files Browse the repository at this point in the history
… values (#492)


This PR ensures that the mapping of requests to outbound connections is segregated by `Host:` header values. In most cases, the desired behavior is provided by Hyper's connection pooling. However, Hyper does not handle the case where a request had no `Host:` header and the request URI had no authority part, and the request was routed based on the SO_ORIGINAL_DST in the desired manner. We would like these requests to each have their own outbound connection, but Hyper will reuse the same connection for such requests. 

Therefore, I have modified `conduit_proxy_router::Recognize` to allow implementations of `Recognize` to indicate whether the service for a given key can be cached, and to only cache the service when it is marked as cachable. I've also changed the `reconstruct_uri` function, which rewrites HTTP/1 requests, to mark when a request had no authority and no `Host:` header, and the authority was rewritten to be the request's ORIGINAL_DST. When this is the case, the `Recognize` implementations for `Inbound` and `Outbound` will mark these requests as non-cachable.

I've also added unit tests ensuring that A, connections are created per `Host:` header, and B, that requests with no `Host:` header each create a new connection. The first test passes without any additional changes, but the second only passes on this branch. The tests were added in PR #489, but this branch supersedes that branch.

Fixes #415. Closes #489.
  • Loading branch information
hawkw authored Mar 7, 2018
1 parent a065174 commit 569d693
Show file tree
Hide file tree
Showing 14 changed files with 424 additions and 80 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ futures = "0.1"
h2 = "0.1"
http = "0.1"
httparse = "1.2"
hyper = { version = "0.11.20", default-features = false, features = ["compat"] }
hyper = { version = "0.11.21", default-features = false, features = ["compat"] }
ipnet = "1.0"
log = "0.4.1"
indexmap = "0.4.1"
Expand Down
61 changes: 48 additions & 13 deletions proxy/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use indexmap::IndexMap;
use tower::Service;

use std::{error, fmt, mem};
use std::convert::AsRef;
use std::hash::Hash;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -40,7 +41,7 @@ pub trait Recognize {
Error = Self::Error>;

/// Obtains a Key for a request.
fn recognize(&self, req: &Self::Request) -> Option<Self::Key>;
fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>>;

/// Return a `Service` to handle requests from the provided authority.
///
Expand All @@ -51,6 +52,18 @@ pub trait Recognize {

pub struct Single<S>(Option<S>);

/// Whether or not the service to a given key may be cached.
///
/// Some services may, for various reasons, may not be able to
/// be used to serve multiple requests. When this is the case,
/// implementors of `recognize` may use `Reuse::SingleUse` to
/// indicate that the service should not be cached.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Reuse<T> {
Reusable(T),
SingleUse(T),
}

#[derive(Debug)]
pub enum Error<T, U> {
Inner(T),
Expand Down Expand Up @@ -123,28 +136,37 @@ where T: Recognize,
let service;

if let Some(key) = inner.recognize.recognize(&request) {
if let Some(s) = inner.routes.get_mut(&key) {
// The service for the authority is already cached
// Is the bound service for that key reusable? If `recognize`
// returned `SingleUse`, that indicates that the service may
// not be used to serve multiple requests.
let cached = if let Reuse::Reusable(ref key) = key {
// The key is reusable --- look in the cache.
inner.routes.get_mut(key)
} else {
None
};
if let Some(s) = cached {
// The service for the authority is already cached.
service = s;
} else {
// The authority does not match an existing route, try to
// recognize it.
match inner.recognize.bind_service(&key) {
match inner.recognize.bind_service(key.as_ref()) {
Ok(s) => {
// A new service has been matched. Set the outer
// variables and jump out o the loop
// variables and jump out o the loop.
new_key = key.clone();
new_service = s;
break;
}
Err(e) => {
// Route recognition failed
// Route recognition failed.
return ResponseFuture { state: State::RouteError(e) };
}
}
}
} else {
// The request has no authority
// The request has no authority.
return ResponseFuture { state: State::NotRecognized };
}

Expand All @@ -153,13 +175,15 @@ where T: Recognize,
return ResponseFuture { state: State::Inner(response) };
}

// First, route the request to the new service
// First, route the request to the new service.
let response = new_service.call(request);

// Now, cache the new service
inner.routes.insert(new_key, new_service);
// Now, cache the new service.
if let Reuse::Reusable(new_key) = new_key {
inner.routes.insert(new_key, new_service);
}

// And finally, return the response
// And finally, return the response.
ResponseFuture { state: State::Inner(response) }
}
}
Expand Down Expand Up @@ -190,8 +214,8 @@ impl<S: Service> Recognize for Single<S> {
type RouteError = ();
type Service = S;

fn recognize(&self, _: &Self::Request) -> Option<Self::Key> {
Some(())
fn recognize(&self, _: &Self::Request) -> Option<Reuse<Self::Key>> {
Some(Reuse::Reusable(()))
}

fn bind_service(&mut self, _: &Self::Key) -> Result<S, Self::RouteError> {
Expand Down Expand Up @@ -262,3 +286,14 @@ where
}
}
}

// ===== impl Reuse =====

impl<T> AsRef<T> for Reuse<T> {
fn as_ref(&self) -> &T {
match *self {
Reuse::Reusable(ref key) => key,
Reuse::SingleUse(ref key) => key,
}
}
}
164 changes: 154 additions & 10 deletions proxy/src/bind.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use std::error::Error;
use std::fmt;
use std::default::Default;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use http;
use futures::{future, Future, Poll};
use futures::future::{Either, Map};
use http::{self, uri};
use tokio_core::reactor::Handle;
use tower;
use tower_h2;
use tower_reconnect::Reconnect;

use conduit_proxy_controller_grpc;
use conduit_proxy_router::Reuse;
use control;
use ctx;
use telemetry::{self, sensor};
use transparency::{self, HttpBody};
use transparency::{self, HttpBody, h1};
use transport;

/// Binds a `Service` from a `SocketAddr`.
Expand All @@ -39,14 +43,37 @@ pub struct BindProtocol<C, B> {
protocol: Protocol,
}

/// Mark whether to use HTTP/1 or HTTP/2
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
/// Protocol portion of the `Recognize` key for a request.
///
/// This marks whether to use HTTP/2 or HTTP/1.x for a request. In
/// the case of HTTP/1.x requests, it also stores a "host" key to ensure
/// that each host receives its own connection.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Protocol {
Http1,
Http1(Host),
Http2
}

pub type Service<B> = Reconnect<NewHttp<B>>;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Host {
Authority(uri::Authority),
NoAuthority,
}

/// Rewrites HTTP/1.x requests so that their URIs are in a canonical form.
///
/// The following transformations are applied:
/// - If an absolute-form URI is received, it must replace
/// the host header (in accordance with RFC7230#section-5.4)
/// - If the request URI is not in absolute form, it is rewritten to contain
/// the authority given in the `Host:` header, or, failing that, from the
/// request's original destination according to `SO_ORIGINAL_DST`.
#[derive(Copy, Clone, Debug)]
pub struct ReconstructUri<S> {
inner: S
}

pub type Service<B> = Reconnect<ReconstructUri<NewHttp<B>>>;

pub type NewHttp<B> = sensor::NewHttp<Client<B>, B, HttpBody>;

Expand Down Expand Up @@ -83,7 +110,6 @@ impl Error for BufferSpawnError {
fn cause(&self) -> Option<&Error> { None }
}


impl<B> Bind<(), B> {
pub fn new(executor: Handle) -> Self {
Self {
Expand Down Expand Up @@ -150,7 +176,7 @@ impl<B> Bind<Arc<ctx::Proxy>, B>
where
B: tower_h2::Body + 'static,
{
pub fn bind_service(&self, addr: &SocketAddr, protocol: Protocol) -> Service<B> {
pub fn bind_service(&self, addr: &SocketAddr, protocol: &Protocol) -> Service<B> {
trace!("bind_service addr={}, protocol={:?}", addr, protocol);
let client_ctx = ctx::transport::Client::new(
&self.ctx,
Expand All @@ -170,7 +196,15 @@ where
self.executor.clone(),
);

let proxy = self.sensors.http(self.req_ids.clone(), client, &client_ctx);
let sensors = self.sensors.http(
self.req_ids.clone(),
client,
&client_ctx
);

// Rewrite the HTTP/1 URI, if the authorities in the Host header
// and request URI are not in agreement, or are not present.
let proxy = ReconstructUri::new(sensors);

// Automatically perform reconnects if the connection fails.
//
Expand Down Expand Up @@ -202,7 +236,117 @@ where
type BindError = ();

fn bind(&self, addr: &SocketAddr) -> Result<Self::Service, Self::BindError> {
Ok(self.bind.bind_service(addr, self.protocol))
Ok(self.bind.bind_service(addr, &self.protocol))
}
}


// ===== impl ReconstructUri =====


impl<S> ReconstructUri<S> {
fn new (inner: S) -> Self {
Self { inner }
}
}

impl<S, B> tower::NewService for ReconstructUri<S>
where
S: tower::NewService<
Request=http::Request<B>,
Response=HttpResponse,
>,
S::Service: tower::Service<
Request=http::Request<B>,
Response=HttpResponse,
>,
ReconstructUri<S::Service>: tower::Service,
B: tower_h2::Body,
{
type Request = <Self::Service as tower::Service>::Request;
type Response = <Self::Service as tower::Service>::Response;
type Error = <Self::Service as tower::Service>::Error;
type Service = ReconstructUri<S::Service>;
type InitError = S::InitError;
type Future = Map<
S::Future,
fn(S::Service) -> ReconstructUri<S::Service>
>;
fn new_service(&self) -> Self::Future {
self.inner.new_service().map(ReconstructUri::new)
}
}

impl<S, B> tower::Service for ReconstructUri<S>
where
S: tower::Service<
Request=http::Request<B>,
Response=HttpResponse,
>,
B: tower_h2::Body,
{
type Request = S::Request;
type Response = HttpResponse;
type Error = S::Error;
type Future = Either<
S::Future,
future::FutureResult<Self::Response, Self::Error>,
>;

fn poll_ready(&mut self) -> Poll<(), S::Error> {
self.inner.poll_ready()
}

fn call(&mut self, mut request: S::Request) -> Self::Future {
if request.version() == http::Version::HTTP_2 {
// skip `reconstruct_uri` entirely if the request is HTTP/2.
return Either::A(self.inner.call(request));
}

if let Err(_) = h1::reconstruct_uri(&mut request) {
let res = http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body(Default::default())
.unwrap();
return Either::B(future::ok(res));
}
Either::A(self.inner.call(request))
}
}

// ===== impl Protocol =====


impl Protocol {

pub fn detect<B>(req: &http::Request<B>) -> Self {
if req.version() == http::Version::HTTP_2 {
return Protocol::Http2
}

// If the request has an authority part, use that as the host part of
// the key for an HTTP/1.x request.
let host = req.uri().authority_part()
.cloned()
.or_else(|| h1::authority_from_host(req))
.map(Host::Authority)
.unwrap_or_else(|| Host::NoAuthority);

Protocol::Http1(host)
}

pub fn is_cachable(&self) -> bool {
match *self {
Protocol::Http2 | Protocol::Http1(Host::Authority(_)) => true,
_ => false,
}
}

pub fn into_key<T>(self, key: T) -> Reuse<(T, Protocol)> {
if self.is_cachable() {
Reuse::Reusable((key, self))
} else {
Reuse::SingleUse((key, self))
}
}
}
Loading

0 comments on commit 569d693

Please sign in to comment.