From 2c953139d071169eb650db0494ab2683bfce461b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 29 Jan 2021 09:40:21 -0800 Subject: [PATCH] Simplify opencensus client control flow (#790) * Simplify opencensus client control flow * +trace * Flush traces after idle timeout Adds a 10s idle timeout so that traces are flushed when traffic stops. Increases the span buffer to 1000 spans and minimizes the initial allocation so that the buffer grows with use. Improves opencensus factoring by splitting more methods. * More yakshaving: make arguments explicit, avoid cloning the client * Remove unused IntoService helper * Drive both the response future and export stream together --- Cargo.lock | 1 + linkerd/app/src/oc_collector.rs | 2 +- linkerd/opencensus/Cargo.toml | 3 +- linkerd/opencensus/src/lib.rs | 198 ++++++++++++++++++++------------ 4 files changed, 128 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e5d3a5cb9..cbe30ef3fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1251,6 +1251,7 @@ dependencies = [ name = "linkerd2-opencensus" version = "0.1.0" dependencies = [ + "futures 0.3.5", "http", "http-body", "linkerd2-error", diff --git a/linkerd/app/src/oc_collector.rs b/linkerd/app/src/oc_collector.rs index d9f555adfa..ca0cad7533 100644 --- a/linkerd/app/src/oc_collector.rs +++ b/linkerd/app/src/oc_collector.rs @@ -72,7 +72,7 @@ impl Config { let addr = addr.clone(); Box::pin(async move { - debug!(peer.addr = ?addr, "running"); + debug!(peer.addr = ?addr, "Running"); opencensus::export_spans(svc, node, spans_rx, metrics).await }) }; diff --git a/linkerd/opencensus/Cargo.toml b/linkerd/opencensus/Cargo.toml index b66294f9f4..e6ded4ab11 100644 --- a/linkerd/opencensus/Cargo.toml +++ b/linkerd/opencensus/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" publish = false [dependencies] +futures = "0.3" http = "0.2" http-body = "0.4" linkerd2-error = { path = "../error" } @@ -13,5 +14,5 @@ linkerd2-metrics = { path = "../metrics" } opencensus-proto = { path = "../../opencensus-proto" } tonic = { version = "0.3", default-features = false, features = ["prost", "codegen"] } tower = { version = "0.4", default-features = false } -tokio = { version = "0.3", features = ["macros", "sync", "stream"] } +tokio = { version = "0.3", features = ["macros", "sync", "stream", "time"] } tracing = "0.1.22" diff --git a/linkerd/opencensus/src/lib.rs b/linkerd/opencensus/src/lib.rs index 3dedb8d114..9dc0552a4d 100644 --- a/linkerd/opencensus/src/lib.rs +++ b/linkerd/opencensus/src/lib.rs @@ -1,4 +1,8 @@ #![deny(warnings, rust_2018_idioms)] + +pub mod metrics; + +use futures::FutureExt; use http_body::Body as HttpBody; use linkerd2_error::Error; use metrics::Registry; @@ -8,15 +12,13 @@ use opencensus_proto::agent::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; use opencensus_proto::trace::v1::Span; -use std::convert::TryInto; -use std::task::{Context, Poll}; use tokio::{ stream::{Stream, StreamExt}, sync::mpsc, + time, }; use tonic::{self as grpc, body::BoxBody, client::GrpcService}; -use tracing::trace; -pub mod metrics; +use tracing::{debug, trace}; pub async fn export_spans(client: T, node: Node, spans: S, metrics: Registry) where @@ -37,16 +39,22 @@ struct SpanExporter { metrics: Registry, } -// ===== impl SpanExporter ===== +#[derive(Debug)] +struct SpanRxClosed; + +// === impl SpanExporter === impl SpanExporter where - T: GrpcService + Clone, + T: GrpcService, T::Error: Into, ::Error: Into + Send + Sync, T::ResponseBody: 'static, S: Stream + Unpin, { + const MAX_BATCH_SIZE: usize = 1000; + const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10); + fn new(client: T, node: Node, spans: S, metrics: Registry) -> Self { Self { client, @@ -56,87 +64,129 @@ where } } - async fn run(mut self) { - 'reconnect: loop { - let (request_tx, request_rx) = mpsc::channel(1); - let mut svc = TraceServiceClient::new(self.client.clone()); - let req = grpc::Request::new(request_rx); + async fn run(self) { + let Self { + client, + node, + mut spans, + mut metrics, + } = self; + + // Holds the batch of pending spans. Cleared as the spans are flushed. + // Contains no more than MAX_BATCH_SIZE spans. + let mut accum = Vec::new(); + + let mut svc = TraceServiceClient::new(client); + loop { trace!("Establishing new TraceService::export request"); - self.metrics.start_stream(); - let mut rsp = Box::pin(svc.export(req)); - let mut drive_rsp = true; - - loop { - tokio::select! { - res = &mut rsp, if drive_rsp => match res { - Ok(_) => { - drive_rsp = false; - } - Err(error) => { - tracing::debug!(%error, "response future failed, sending a new request"); - continue 'reconnect; + metrics.start_stream(); + let (tx, rx) = mpsc::channel(1); + + // The node is only transmitted on the first message of each stream. + let mut node = Some(node.clone()); + + // Drive both the response future and the export stream + // simultaneously. + tokio::select! { + res = svc.export(grpc::Request::new(rx)) => match res { + Ok(_rsp) => { + // The response future completed. Continue exporting spans until the + // stream stops accepting them. + if let Err(SpanRxClosed) = Self::export(&tx, &mut spans, &mut accum, &mut node).await { + // No more spans. + return; } - }, - res = self.send_batch(&request_tx) => match res { - Ok(false) => return, // The span strean has ended --- the proxy is shutting down. - Ok(true) => {} // Continue the inner loop and send a new batch of spans. - Err(()) => continue 'reconnect, } - } + Err(error) => { + debug!(%error, "Response future failed; restarting"); + } + }, + res = Self::export(&tx, &mut spans, &mut accum, &mut node) => match res { + // The export stream closed; reconnect. + Ok(()) => {}, + // No more spans. + Err(SpanRxClosed) => return, + }, } } } - async fn send_batch( - &mut self, + /// Accumulate spans and send them on the export stream. + /// + /// Returns an error when the proxy has closed the span stream. + async fn export( tx: &mpsc::Sender, - ) -> Result { - const MAX_BATCH_SIZE: usize = 100; - // If the sender is dead, return an error so we can reconnect. - let send = tx.reserve().await.map_err(|_| ())?; - - let mut spans = Vec::new(); - let mut can_send_more = false; - while let Some(span) = self.spans.next().await { - spans.push(span); - if spans.len() == MAX_BATCH_SIZE { - can_send_more = true; - break; - } - } + spans: &mut S, + accum: &mut Vec, + node: &mut Option, + ) -> Result<(), SpanRxClosed> { + loop { + // Collect spans into a batch. + let collect = Self::collect_batch(spans, accum).await; - if spans.is_empty() { - return Ok(false); - } + // If we collected spans, flush them. + if !accum.is_empty() { + // Once a batch has been accumulated, ensure that the + // request stream is ready to accept the batch. + match tx.reserve().await { + Ok(tx) => { + let msg = ExportTraceServiceRequest { + spans: accum.drain(..).collect(), + node: node.take(), + ..Default::default() + }; + trace!( + node = msg.node.is_some(), + spans = msg.spans.len(), + "Sending batch" + ); + tx.send(msg); + } + Err(error) => { + // If the channel isn't open, start a new stream + // and retry sending the batch. + debug!(%error, "Request stream lost; restarting"); + return Ok(()); + } + } + } - if let Ok(num_spans) = spans.len().try_into() { - self.metrics.send(num_spans); + // If the span source was closed, end the task. + if let Err(closed) = collect { + debug!("Span channel lost"); + return Err(closed); + } } - let req = ExportTraceServiceRequest { - spans, - node: Some(self.node.clone()), - resource: None, - }; - trace!(message = "Transmitting", ?req); - send.send(req); - Ok(can_send_more) } -} - -struct IntoService(T); -impl tower::Service> for IntoService -where - T: GrpcService, -{ - type Response = http::Response; - type Future = T::Future; - type Error = T::Error; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx) - } + /// Collects spans from the proxy into `accum`. + /// + /// Returns an error when the span sream has completed. An error may be + /// returned after accumulating spans. + async fn collect_batch(spans: &mut S, accum: &mut Vec) -> Result<(), SpanRxClosed> { + loop { + if accum.len() == Self::MAX_BATCH_SIZE { + trace!(capacity = Self::MAX_BATCH_SIZE, "Batch capacity reached"); + return Ok(()); + } - fn call(&mut self, req: http::Request) -> Self::Future { - self.0.call(req) + futures::select_biased! { + res = spans.next().fuse() => match res { + Some(span) => { + trace!(?span, "Adding to batch"); + accum.push(span); + } + None => return Err(SpanRxClosed), + }, + // Don't hold spans indefinitely. Return if we hit an idle + // timeout and spans have been collected. + _ = time::sleep(Self::MAX_BATCH_IDLE).fuse() => { + if !accum.is_empty() { + trace!(spans = accum.len(), "Flushing spans due to inactivitiy"); + return Ok(()); + } + } + } + } } }