Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

detect: Make protocol detection more robust #744

Merged
merged 66 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
f5e272e
wip: Add a more robust HTTP detection scheme
olix0r Nov 14, 2020
4444518
Decouple HTTP server from detection
olix0r Nov 14, 2020
352e013
remove defunct test
olix0r Nov 14, 2020
05ec556
fmt
olix0r Nov 14, 2020
24f8c55
cache: Make the cache cloneable with RwLock
olix0r Nov 14, 2020
75b355c
Merge branch 'ver/cache-clone' into ver/detect
olix0r Nov 15, 2020
5ac38a6
Wire up new detection logic
olix0r Nov 15, 2020
6c9192e
touchup
olix0r Nov 15, 2020
21b5719
Fix HTTP/2 detection
olix0r Nov 15, 2020
003dc2c
Add HTTP/1 test case
olix0r Nov 15, 2020
6b0d6a6
Remove the unused Peek types
olix0r Nov 16, 2020
1a9f571
Merge branch 'main' into ver/detect
olix0r Nov 19, 2020
1e53db8
touchup
olix0r Nov 19, 2020
45aa9f2
Merge branch 'main' into ver/detect
olix0r Dec 3, 2020
87befc2
Merge branch 'main' into ver/detect
olix0r Dec 4, 2020
d92caf5
Robustify parser tests
olix0r Dec 5, 2020
9ebfde5
Cleanup: commentary + clarity
olix0r Dec 5, 2020
b3a0fc2
Rename DetectHttp to NewServeHttp
olix0r Dec 5, 2020
26ec212
Merge branch 'ver/http-server' into ver/detect
olix0r Dec 5, 2020
a14ebf1
Remove unneeded special-case
olix0r Dec 5, 2020
f6bd87a
nit
olix0r Dec 5, 2020
b243b0c
More testing
olix0r Dec 5, 2020
06424fb
Indexing touchup for clarity
olix0r Dec 5, 2020
9c49868
Logging touchup
olix0r Dec 5, 2020
4b938b8
More robust h2 testing
olix0r Dec 5, 2020
bc8ce98
Merge branch 'main' into ver/detect
olix0r Dec 5, 2020
e7d061e
Merge branch 'main' into ver/detect
olix0r Dec 5, 2020
ac67bf3
Add protocol to stack metrics; add TCP scopes
olix0r Dec 10, 2020
9ca96e9
cache: Ensure that actively held services are not evicted
olix0r Dec 11, 2020
0eaffb9
fixup test
olix0r Dec 11, 2020
d9e0adf
Merge branch 'ver/cache-retain' into ver/detect
olix0r Dec 11, 2020
18b7123
Remove unnecessary buffers
olix0r Dec 11, 2020
959fa53
Remove unneeded buffer
olix0r Dec 11, 2020
f734da9
Comment out cache around http server
olix0r Dec 11, 2020
5823cce
detect: Retain service until connection is complete
olix0r Dec 11, 2020
1c721a0
Merge branch 'main' into ver/detect
olix0r Dec 12, 2020
c7ac588
cache: Only spawn a single task per cache entry
olix0r Dec 12, 2020
77a3d59
touchup
olix0r Dec 12, 2020
92c85b6
Split eviction task out into function
olix0r Dec 12, 2020
47ce0fd
Merge branch 'ver/cache-tasks' into ver/detect
olix0r Dec 12, 2020
df4dd94
More debug logging
olix0r Dec 12, 2020
7d685cf
Restore detection timeouts
olix0r Dec 12, 2020
726a0c5
Increase default dispatch timeouts to 10s
olix0r Dec 12, 2020
b28f592
typo
olix0r Dec 12, 2020
4b99db1
Include the number of bytes read in protocol detection error message
olix0r Dec 12, 2020
3515eee
Even greater dispatch timeout for debugging protocol detection timeouts
olix0r Dec 12, 2020
7b6c3a8
Add diagnostic logs for detection time
olix0r Dec 12, 2020
0c823a6
Revert timeout changes
olix0r Dec 12, 2020
daa0025
Make `Option` part of the Detect interface
olix0r Dec 12, 2020
55309d0
Merge branch 'main' into ver/detect
olix0r Dec 14, 2020
9c6fd01
fixup tests
olix0r Dec 14, 2020
4f420b5
fixup test dependency
olix0r Dec 14, 2020
f649eed
Make HTTP detection more robust
olix0r Dec 14, 2020
ca6b7a4
Disable detection timeout errors for now
olix0r Dec 14, 2020
b374e81
Decouple detect timeout from detect impls
olix0r Dec 14, 2020
d61a0bd
Use windows rather than manual buffer indices
olix0r Dec 15, 2020
b9b2ea7
Fixup tracing in detect tests
olix0r Dec 15, 2020
179dbce
Merge branch 'main' into ver/detect
olix0r Dec 15, 2020
39f69a4
Increase default dispatch timeout
olix0r Dec 15, 2020
fde2dbe
Fix tests by adding newline to TCP client message
olix0r Dec 15, 2020
bd1e3bf
Lessen detection buffer capacity
olix0r Dec 15, 2020
560dba2
detect: Make protocol detection more robust
olix0r Dec 15, 2020
41152e8
Add a protocol label to stack metrics
olix0r Dec 15, 2020
b090162
Merge branch 'ver/stack-protocol' into ver/detect
olix0r Dec 15, 2020
6e220d6
fix test tracing
olix0r Dec 15, 2020
ffd96ac
Don't set maybe_h1 to false based on partial h2 preface
olix0r Dec 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,7 @@ dependencies = [
name = "linkerd2-proxy-http"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes 0.6.0",
"futures 0.3.5",
"h2 0.3.0",
Expand All @@ -1375,14 +1376,17 @@ dependencies = [
"linkerd2-http-box",
"linkerd2-identity",
"linkerd2-io",
"linkerd2-proxy-transport",
"linkerd2-stack",
"linkerd2-timeout",
"pin-project 1.0.2",
"rand 0.7.2",
"tokio 0.3.5",
"tokio-test 0.3.0",
"tower",
"tracing",
"tracing-futures",
"tracing-subscriber",
"try-lock",
]

Expand Down Expand Up @@ -1459,6 +1463,7 @@ name = "linkerd2-proxy-transport"
version = "0.1.0"
dependencies = [
"async-stream 0.2.1",
"async-trait",
"bytes 0.6.0",
"futures 0.3.5",
"indexmap",
Expand Down
13 changes: 8 additions & 5 deletions linkerd/app/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct EndpointLabels {
/// Label set attached to per-stack metrics.
///
/// Values are built through `StackLabels::inbound` / `StackLabels::outbound`,
/// which fix `direction` and take `protocol` and `name` from the caller.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StackLabels {
// Traffic direction of the stack (`Direction::In` or `Direction::Out`).
pub direction: Direction,
// Protocol the stack serves; call sites in this change pass "http" or "tcp".
// Emitted as the `protocol="…"` label by `fmt_labels`.
pub protocol: &'static str,
// Name of the stack layer being measured (e.g. "logical", "server").
// Emitted as the `name="…"` label by `fmt_labels`.
pub name: &'static str,
}

Expand Down Expand Up @@ -297,24 +298,26 @@ impl FmtLabels for TlsId {
// === impl StackLabels ===

impl StackLabels {
pub fn inbound(name: &'static str) -> Self {
pub fn inbound(protocol: &'static str, name: &'static str) -> Self {
Self {
direction: Direction::In,
name,
protocol,
direction: Direction::In,
}
}

pub fn outbound(name: &'static str) -> Self {
pub fn outbound(protocol: &'static str, name: &'static str) -> Self {
Self {
direction: Direction::Out,
name,
protocol,
direction: Direction::Out,
}
}
}

impl FmtLabels for StackLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.direction.fmt_labels(f)?;
write!(f, ",name=\"{}\"", self.name)
write!(f, ",protocol=\"{}\",name=\"{}\"", self.protocol, self.name)
}
}
22 changes: 12 additions & 10 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl Config {
svc::layers()
.push_failfast(dispatch_timeout)
.push_spawn_buffer(buffer_capacity)
.push(metrics.stack.layer(stack_labels("logical"))),
.push(metrics.stack.layer(stack_labels("http", "logical"))),
)
.push_cache(cache_max_idle_age)
.push_on_response(
Expand Down Expand Up @@ -317,10 +317,10 @@ impl Config {
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Unpin + Send + 'static,
F: svc::NewService<TcpEndpoint, Service = A> + Unpin + Clone + Send + 'static,
A: tower::Service<io::PrefixedIo<I>, Response = ()> + Clone + Send + 'static,
A: tower::Service<io::PrefixedIo<I>, Response = ()> + Clone + Send + Sync + 'static,
A::Error: Into<Error>,
A::Future: Send,
H: svc::NewService<Target, Service = S> + Unpin + Clone + Send + 'static,
H: svc::NewService<Target, Service = S> + Unpin + Clone + Send + Sync + 'static,
S: tower::Service<
http::Request<http::boxed::BoxBody>,
Response = http::Response<http::boxed::BoxBody>,
Expand All @@ -335,7 +335,7 @@ impl Config {
dispatch_timeout,
max_in_flight_requests,
detect_protocol_timeout,
buffer_capacity,
cache_max_idle_age,
..
} = self.proxy.clone();

Expand Down Expand Up @@ -371,7 +371,7 @@ impl Config {
.push(TraceContext::layer(span_sink.map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
.push(metrics.stack.layer(stack_labels("source")))
.push(metrics.stack.layer(stack_labels("http", "server")))
.box_http_request()
.box_http_response(),
)
Expand All @@ -389,10 +389,12 @@ impl Config {
.into_inner(),
drain.clone(),
))
.push_on_response(svc::layers().push_spawn_buffer(buffer_capacity).push(
transport::Prefix::layer(
http::Version::DETECT_BUFFER_CAPACITY,
.check_new_clone::<(Option<http::Version>, TcpAccept)>()
.push_cache(cache_max_idle_age)
.push(transport::NewDetectService::layer(
transport::detect::DetectTimeout::new(
detect_protocol_timeout,
http::DetectHttp::default(),
),
))
.into_inner()
Expand Down Expand Up @@ -458,8 +460,8 @@ pub fn trace_labels() -> HashMap<String, String> {
l
}

fn stack_labels(name: &'static str) -> metrics::StackLabels {
metrics::StackLabels::inbound(name)
fn stack_labels(proto: &'static str, name: &'static str) -> metrics::StackLabels {
metrics::StackLabels::inbound(proto, name)
}

// === impl SkipByPort ===
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn nonblocking_identity_detection() {
.await;
let proxy = proxy::new().identity(id_svc);

let msg1 = "custom tcp hello";
let msg1 = "custom tcp hello\n";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/TIOLI: do you think it's worth also having some tests with \r\ns in them, just to make sure we're handling that correctly? the logic isn't too complex so i'm not too concerned about it, just a thought...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the http detect tests have specific tests for this sort of thing. i think the integration tests should be as simple as possible to test whatever we expect them to test.

let msg2 = "custom tcp bye";
let srv = server::tcp()
.accept(move |read| {
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async fn tcp_waits_for_proxies_to_close() {
let _trace = trace_init();

let (shdn, rx) = shutdown_signal();
let msg1 = "custom tcp hello";
let msg1 = "custom tcp hello\n";
let msg2 = "custom tcp bye";

let srv = server::tcp()
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Fixture {
}

impl TcpFixture {
const HELLO_MSG: &'static str = "custom tcp hello";
const HELLO_MSG: &'static str = "custom tcp hello\n";
const BYE_MSG: &'static str = "custom tcp bye";

async fn server() -> server::Listening {
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/integration/src/tests/transparency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn inbound_http1() {
async fn outbound_tcp() {
let _trace = trace_init();

let msg1 = "custom tcp hello";
let msg1 = "custom tcp hello\n";
let msg2 = "custom tcp bye";

let srv = server::tcp()
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn outbound_tcp() {
async fn outbound_tcp_external() {
let _trace = trace_init();

let msg1 = "custom tcp hello";
let msg1 = "custom tcp hello\n";
let msg2 = "custom tcp bye";

let srv = server::tcp()
Expand Down Expand Up @@ -127,7 +127,7 @@ async fn outbound_tcp_external() {
async fn inbound_tcp() {
let _trace = trace_init();

let msg1 = "custom tcp hello";
let msg1 = "custom tcp hello\n";
let msg2 = "custom tcp bye";

let srv = server::tcp()
Expand Down Expand Up @@ -296,7 +296,7 @@ async fn tcp_server_first_tls() {
async fn tcp_connections_close_if_client_closes() {
let _trace = trace_init();

let msg1 = "custom tcp hello";
let msg1 = "custom tcp hello\n";
let msg2 = "custom tcp bye";

let (mut tx, mut rx) = mpsc::channel(1);
Expand Down
8 changes: 6 additions & 2 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ where
.push_on_response(
svc::layers()
.push(svc::layer::mk(svc::SpawnReady::new))
.push(metrics.stack.layer(stack_labels("balance.endpoint")))
.push(
metrics
.stack
.layer(stack_labels("http", "balance.endpoint")),
)
.box_http_request(),
)
.check_new_service::<Endpoint, http::Request<_>>()
Expand All @@ -71,7 +75,7 @@ where
// If the balancer has been empty/unavailable for 10s, eagerly fail
// requests.
.push_failfast(dispatch_timeout)
.push(metrics.stack.layer(stack_labels("concrete"))),
.push(metrics.stack.layer(stack_labels("http", "concrete"))),
)
.into_new_service()
.check_new_service::<Concrete, http::Request<_>>()
Expand Down
13 changes: 8 additions & 5 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ where
Error = Error,
> + Clone
+ Send
+ Sync
+ 'static,
TSvc::Future: Send,
H: svc::NewService<http::Logical, Service = HSvc> + Unpin + Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -110,7 +111,7 @@ where
.push(TraceContext::layer(span_sink.clone().map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
.push(metrics.stack.layer(stack_labels("source")))
.push(metrics.stack.layer(stack_labels("http", "server")))
.push_failfast(dispatch_timeout)
.push_spawn_buffer(buffer_capacity)
.box_http_response(),
Expand All @@ -130,11 +131,13 @@ where
.into_inner();

svc::stack(http::NewServeHttp::new(h2_settings, http, tcp, drain))
.check_new_service::<tcp::Accept, io::PrefixedIo<transport::metrics::SensorIo<I>>>()
.push_on_response(svc::layers().push_spawn_buffer(buffer_capacity).push(
transport::Prefix::layer(
http::Version::DETECT_BUFFER_CAPACITY,
.check_new_service::<(Option<http::Version>, tcp::Accept), io::PrefixedIo<transport::metrics::SensorIo<I>>>()
.check_new_clone::<(Option<http::Version>, tcp::Accept)>()
olix0r marked this conversation as resolved.
Show resolved Hide resolved
.push_cache(cache_max_idle_age)
.push(transport::NewDetectService::layer(
transport::detect::DetectTimeout::new(
detect_protocol_timeout,
http::DetectHttp::default(),
),
))
.check_new_service::<tcp::Accept, transport::metrics::SensorIo<I>>()
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub struct Config {
pub allow_discovery: AddrMatch,
}

fn stack_labels(name: &'static str) -> metrics::StackLabels {
metrics::StackLabels::outbound(name)
fn stack_labels(proto: &'static str, name: &'static str) -> metrics::StackLabels {
metrics::StackLabels::outbound(proto, name)
}

pub fn trace_labels() -> HashMap<String, String> {
Expand Down
15 changes: 10 additions & 5 deletions linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ where
max_in_flight_requests,
detect_protocol_timeout,
buffer_capacity,
cache_max_idle_age,
..
} = config.proxy.clone();

Expand All @@ -176,7 +177,7 @@ where
.push(TraceContext::layer(span_sink.clone().map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
.push(metrics.stack.layer(stack_labels("source")))
.push(metrics.stack.layer(stack_labels("http", "server")))
.push_failfast(dispatch_timeout)
.push_spawn_buffer(buffer_capacity)
.box_http_response(),
Expand All @@ -193,6 +194,7 @@ where
.check_make_service::<tcp::Endpoint, ()>()
.push_on_response(svc::layer::mk(tcp::Forward::new))
.into_new_service()
.push_on_response(metrics.stack.layer(stack_labels("tcp", "forward")))
.check_new_service::<tcp::Endpoint, transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()
.push_map_target(tcp::Endpoint::from_logical(
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery,
Expand Down Expand Up @@ -221,11 +223,13 @@ where
tcp_balance,
drain.clone(),
))
.check_new_service::<tcp::Logical, transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()
.push_on_response(svc::layers().push_spawn_buffer(buffer_capacity).push(
transport::Prefix::layer(
http::Version::DETECT_BUFFER_CAPACITY,
.check_new_clone::<(Option<http::Version>, tcp::Logical)>()
.check_new_service::<(Option<http::Version>, tcp::Logical), transport::io::PrefixedIo<transport::metrics::SensorIo<I>>>()
.push_cache(cache_max_idle_age)
.push(transport::NewDetectService::layer(
transport::detect::DetectTimeout::new(
detect_protocol_timeout,
http::DetectHttp::default(),
),
))
.check_new_service::<tcp::Logical, transport::metrics::SensorIo<I>>()
Expand All @@ -235,6 +239,7 @@ where
.push_map_target(tcp::Endpoint::from_logical(
tls::ReasonForNoPeerName::PortSkipped,
))
.push_on_response(metrics.stack.layer(stack_labels("tcp", "opaque")))
.check_new_service::<tcp::Logical, transport::metrics::SensorIo<I>>()
.into_inner();

Expand Down
15 changes: 15 additions & 0 deletions linkerd/app/outbound/src/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ impl<P> Logical<P> {
}
}

/// Equality for `Logical` targets.
///
/// Only `orig_dst` and `protocol` take part in the comparison; any other
/// fields the type may carry are not consulted here.
impl<P: PartialEq> PartialEq<Logical<P>> for Logical<P> {
    fn eq(&self, rhs: &Self) -> bool {
        // Compare the destination first; the protocol is only inspected
        // when the destinations already agree.
        let same_dst = self.orig_dst == rhs.orig_dst;
        same_dst && self.protocol == rhs.protocol
    }
}

/// Marks `Logical<P>` as having total equality whenever `P` does.
impl<P> Eq for Logical<P> where P: Eq {}

/// Hashing for `Logical` targets.
///
/// Feeds `orig_dst` and then `protocol` into the hasher — the same two
/// fields used by the `PartialEq` impl — so equal values hash identically.
impl<P: std::hash::Hash> std::hash::Hash for Logical<P> {
    fn hash<H: std::hash::Hasher>(&self, hasher: &mut H) {
        // Field order matters for the resulting hash; keep dst before protocol.
        std::hash::Hash::hash(&self.orig_dst, hasher);
        std::hash::Hash::hash(&self.protocol, hasher);
    }
}

impl<P: std::fmt::Debug> std::fmt::Debug for Logical<P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Logical")
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/tcp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ where
svc
};
async move {
let io = support::io().read(b"hello\r\n").write(b"world").build();
let io = support::io().read(b"hello\n").write(b"world").build();
let res = svc.oneshot(io).err_into::<Error>().await;
tracing::trace!(?res);
if let Err(err) = res {
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ pub const DEFAULT_INBOUND_LISTEN_ADDR: &str = "0.0.0.0:4143";
pub const DEFAULT_CONTROL_LISTEN_ADDR: &str = "0.0.0.0:4190";
const DEFAULT_ADMIN_LISTEN_ADDR: &str = "127.0.0.1:4191";
const DEFAULT_METRICS_RETAIN_IDLE: Duration = Duration::from_secs(10 * 60);
const DEFAULT_INBOUND_DISPATCH_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_INBOUND_DISPATCH_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_INBOUND_CONNECT_TIMEOUT: Duration = Duration::from_millis(100);
const DEFAULT_INBOUND_CONNECT_BACKOFF: ExponentialBackoff = ExponentialBackoff {
min: Duration::from_millis(100),
max: Duration::from_millis(500),
jitter: 0.1,
};
const DEFAULT_OUTBOUND_DISPATCH_TIMEOUT: Duration = Duration::from_secs(3);
const DEFAULT_OUTBOUND_DISPATCH_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_OUTBOUND_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_OUTBOUND_CONNECT_BACKOFF: ExponentialBackoff = ExponentialBackoff {
min: Duration::from_millis(100),
Expand Down
2 changes: 1 addition & 1 deletion linkerd/concurrency-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct ConcurrencyLimit<T> {
}

enum State {
Waiting(Pin<Box<dyn Future<Output = OwnedSemaphorePermit> + Send + 'static>>),
Waiting(Pin<Box<dyn Future<Output = OwnedSemaphorePermit> + Send + Sync + 'static>>),
Ready(OwnedSemaphorePermit),
Empty,
}
Expand Down
2 changes: 0 additions & 2 deletions linkerd/io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
mod boxed;
mod peek;
mod prefixed;
mod sensor;

pub use self::{
boxed::BoxedIo,
peek::{Peek, Peekable},
prefixed::PrefixedIo,
sensor::{Sensor, SensorIo},
};
Expand Down
Loading