Skip to content

Commit

Permalink
Simplify opencensus client control flow (#790)
Browse files Browse the repository at this point in the history
* Simplify opencensus client control flow

* +trace

* Flush traces after idle timeout

Adds a 10s idle timeout so that traces are flushed when traffic stops.

Increases the span buffer to 1000 spans and minimizes the initial
allocation so that the buffer grows with use.

Improves opencensus factoring by splitting more methods.

* More yakshaving: make arguments explicit, avoid cloning the client

* Remove unused IntoService helper

* Drive both the response future and export stream together
  • Loading branch information
olix0r authored Jan 29, 2021
1 parent fef3f1f commit 2c95313
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 76 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,7 @@ dependencies = [
name = "linkerd2-opencensus"
version = "0.1.0"
dependencies = [
"futures 0.3.5",
"http",
"http-body",
"linkerd2-error",
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/src/oc_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Config {

let addr = addr.clone();
Box::pin(async move {
debug!(peer.addr = ?addr, "running");
debug!(peer.addr = ?addr, "Running");
opencensus::export_spans(svc, node, spans_rx, metrics).await
})
};
Expand Down
3 changes: 2 additions & 1 deletion linkerd/opencensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ edition = "2018"
publish = false

[dependencies]
futures = "0.3"
http = "0.2"
http-body = "0.4"
linkerd2-error = { path = "../error" }
linkerd2-metrics = { path = "../metrics" }
opencensus-proto = { path = "../../opencensus-proto" }
tonic = { version = "0.3", default-features = false, features = ["prost", "codegen"] }
tower = { version = "0.4", default-features = false }
tokio = { version = "0.3", features = ["macros", "sync", "stream"] }
tokio = { version = "0.3", features = ["macros", "sync", "stream", "time"] }
tracing = "0.1.22"
198 changes: 124 additions & 74 deletions linkerd/opencensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#![deny(warnings, rust_2018_idioms)]

pub mod metrics;

use futures::FutureExt;
use http_body::Body as HttpBody;
use linkerd2_error::Error;
use metrics::Registry;
Expand All @@ -8,15 +12,13 @@ use opencensus_proto::agent::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opencensus_proto::trace::v1::Span;
use std::convert::TryInto;
use std::task::{Context, Poll};
use tokio::{
stream::{Stream, StreamExt},
sync::mpsc,
time,
};
use tonic::{self as grpc, body::BoxBody, client::GrpcService};
use tracing::trace;
pub mod metrics;
use tracing::{debug, trace};

pub async fn export_spans<T, S>(client: T, node: Node, spans: S, metrics: Registry)
where
Expand All @@ -37,16 +39,22 @@ struct SpanExporter<T, S> {
metrics: Registry,
}

// ===== impl SpanExporter =====
#[derive(Debug)]
struct SpanRxClosed;

// === impl SpanExporter ===

impl<T, S> SpanExporter<T, S>
where
T: GrpcService<BoxBody> + Clone,
T: GrpcService<BoxBody>,
T::Error: Into<Error>,
<T::ResponseBody as HttpBody>::Error: Into<Error> + Send + Sync,
T::ResponseBody: 'static,
S: Stream<Item = Span> + Unpin,
{
const MAX_BATCH_SIZE: usize = 1000;
const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10);

fn new(client: T, node: Node, spans: S, metrics: Registry) -> Self {
Self {
client,
Expand All @@ -56,87 +64,129 @@ where
}
}

async fn run(mut self) {
'reconnect: loop {
let (request_tx, request_rx) = mpsc::channel(1);
let mut svc = TraceServiceClient::new(self.client.clone());
let req = grpc::Request::new(request_rx);
async fn run(self) {
let Self {
client,
node,
mut spans,
mut metrics,
} = self;

// Holds the batch of pending spans. Cleared as the spans are flushed.
// Contains no more than MAX_BATCH_SIZE spans.
let mut accum = Vec::new();

let mut svc = TraceServiceClient::new(client);
loop {
trace!("Establishing new TraceService::export request");
self.metrics.start_stream();
let mut rsp = Box::pin(svc.export(req));
let mut drive_rsp = true;

loop {
tokio::select! {
res = &mut rsp, if drive_rsp => match res {
Ok(_) => {
drive_rsp = false;
}
Err(error) => {
tracing::debug!(%error, "response future failed, sending a new request");
continue 'reconnect;
metrics.start_stream();
let (tx, rx) = mpsc::channel(1);

// The node is only transmitted on the first message of each stream.
let mut node = Some(node.clone());

// Drive both the response future and the export stream
// simultaneously.
tokio::select! {
res = svc.export(grpc::Request::new(rx)) => match res {
Ok(_rsp) => {
// The response future completed. Continue exporting spans until the
// stream stops accepting them.
if let Err(SpanRxClosed) = Self::export(&tx, &mut spans, &mut accum, &mut node).await {
// No more spans.
return;
}
},
res = self.send_batch(&request_tx) => match res {
Ok(false) => return, // The span strean has ended --- the proxy is shutting down.
Ok(true) => {} // Continue the inner loop and send a new batch of spans.
Err(()) => continue 'reconnect,
}
}
Err(error) => {
debug!(%error, "Response future failed; restarting");
}
},
res = Self::export(&tx, &mut spans, &mut accum, &mut node) => match res {
// The export stream closed; reconnect.
Ok(()) => {},
// No more spans.
Err(SpanRxClosed) => return,
},
}
}
}

async fn send_batch(
&mut self,
/// Accumulate spans and send them on the export stream.
///
/// Returns an error when the proxy has closed the span stream.
async fn export(
tx: &mpsc::Sender<ExportTraceServiceRequest>,
) -> Result<bool, ()> {
const MAX_BATCH_SIZE: usize = 100;
// If the sender is dead, return an error so we can reconnect.
let send = tx.reserve().await.map_err(|_| ())?;

let mut spans = Vec::new();
let mut can_send_more = false;
while let Some(span) = self.spans.next().await {
spans.push(span);
if spans.len() == MAX_BATCH_SIZE {
can_send_more = true;
break;
}
}
spans: &mut S,
accum: &mut Vec<Span>,
node: &mut Option<Node>,
) -> Result<(), SpanRxClosed> {
loop {
// Collect spans into a batch.
let collect = Self::collect_batch(spans, accum).await;

if spans.is_empty() {
return Ok(false);
}
// If we collected spans, flush them.
if !accum.is_empty() {
// Once a batch has been accumulated, ensure that the
// request stream is ready to accept the batch.
match tx.reserve().await {
Ok(tx) => {
let msg = ExportTraceServiceRequest {
spans: accum.drain(..).collect(),
node: node.take(),
..Default::default()
};
trace!(
node = msg.node.is_some(),
spans = msg.spans.len(),
"Sending batch"
);
tx.send(msg);
}
Err(error) => {
// If the channel isn't open, start a new stream
// and retry sending the batch.
debug!(%error, "Request stream lost; restarting");
return Ok(());
}
}
}

if let Ok(num_spans) = spans.len().try_into() {
self.metrics.send(num_spans);
// If the span source was closed, end the task.
if let Err(closed) = collect {
debug!("Span channel lost");
return Err(closed);
}
}
let req = ExportTraceServiceRequest {
spans,
node: Some(self.node.clone()),
resource: None,
};
trace!(message = "Transmitting", ?req);
send.send(req);
Ok(can_send_more)
}
}

struct IntoService<T>(T);

impl<T, R> tower::Service<http::Request<R>> for IntoService<T>
where
T: GrpcService<R>,
{
type Response = http::Response<T::ResponseBody>;
type Future = T::Future;
type Error = T::Error;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
/// Collects spans from the proxy into `accum`.
///
/// Returns an error when the span sream has completed. An error may be
/// returned after accumulating spans.
async fn collect_batch(spans: &mut S, accum: &mut Vec<Span>) -> Result<(), SpanRxClosed> {
loop {
if accum.len() == Self::MAX_BATCH_SIZE {
trace!(capacity = Self::MAX_BATCH_SIZE, "Batch capacity reached");
return Ok(());
}

fn call(&mut self, req: http::Request<R>) -> Self::Future {
self.0.call(req)
futures::select_biased! {
res = spans.next().fuse() => match res {
Some(span) => {
trace!(?span, "Adding to batch");
accum.push(span);
}
None => return Err(SpanRxClosed),
},
// Don't hold spans indefinitely. Return if we hit an idle
// timeout and spans have been collected.
_ = time::sleep(Self::MAX_BATCH_IDLE).fuse() => {
if !accum.is_empty() {
trace!(spans = accum.len(), "Flushing spans due to inactivitiy");
return Ok(());
}
}
}
}
}
}

0 comments on commit 2c95313

Please sign in to comment.