attempt to get otel tracing to work with tempo #10

Merged 16 commits on Mar 31, 2021
601 changes: 390 additions & 211 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 15 additions & 7 deletions Cargo.toml
@@ -24,21 +24,29 @@ actix-rt = "2.1.0"
actix-web = "4.0.0-beta.1"
futures = "0.3.13"
tokio = { version = "1.1.0", features = ["macros"] }
prometheus = "0.12.0"
kube = { version = "0.51.0", features = ["derive"] }
kube-runtime = "0.51.0"
#prometheus = "0.12.0"
kube = { version = "0.52.0", features = ["derive"] }
kube-runtime = "0.52.0"
k8s-openapi = { version = "0.11.0", features = ["v1_19"], default-features=false }
serde = { version = "1.0.123", features = ["derive"] }
serde_json = "1.0.64"
chrono = { version = "0.4.19", features = ["serde"] }
color-eyre = "0.5.10"
snafu = "0.6.10"
thiserror = "1.0.24"
schemars = { version = "0.8.0", features = ["chrono"] }
serde_yaml = "0.8.17"
tracing = "0.1.25"
tracing-subscriber = { version = "0.2.16", features = ["json"] }
tracing-futures = "0.2.5"
schemars = { version = "0.8.0", features = ["chrono"] }
serde_yaml = "0.8.17"
tracing-opentelemetry = "0.11.0"
opentelemetry = { version = "0.12.0", features = ["trace", "tokio-support"] }
# NB: in 0.13 s/tokio-support/rt-tokio/
opentelemetry-otlp = { version = "0.5.0", features = ["tokio"] }
# NB: otlp 0.6 and otel 0.13 blocked by https://github.com/tokio-rs/tracing/pull/1322

# local dev
#kube = { path = "../kube-rs/kube", features = ["derive"] }
#kube-runtime = { path = "../kube-rs/kube-runtime" }
#kube = { git = "https://github.com/clux/kube-rs.git", rev = "698f421652032aec5302eefa1593a4bee0d28b77", features = ["derive"] }
#kube-runtime = { git = "https://github.com/clux/kube-rs.git", rev = "698f421652032aec5302eefa1593a4bee0d28b77" }
prometheus = { git = "https://github.com/clux/rust-prometheus.git", branch = "master" }
#prometheus = { path = "../rust-prometheus" }
15 changes: 12 additions & 3 deletions Makefile
@@ -9,8 +9,11 @@ install:
cargo run --bin crdgen > yaml/foo-crd.yaml
kubectl apply -f yaml/foo-crd.yaml

forward-tempo:
kubectl port-forward -n monitoring service/grafana-agent-traces 55680:55680

run:
cargo run
OPENTELEMETRY_ENDPOINT_URL=https://0.0.0.0:55680 RUST_LOG=info,kube=trace,controller=debug cargo run

compile:
docker run --rm \
@@ -23,8 +26,6 @@ compile:
mv target/x86_64-unknown-linux-musl/release/controller .

build:
@echo "Reusing built binary in current directory from make compile"
@ls -lah ./controller
docker build -t $(REPO)/$(NAME):$(VERSION) .

tag-latest: build
@@ -39,3 +40,11 @@ tag-semver: build
docker tag $(REPO)/$(NAME):$(VERSION) $(REPO)/$(NAME):$(SEMVER_VERSION) ; \
docker push $(REPO)/$(NAME):$(SEMVER_VERSION) ; \
fi

# Helpers for debugging with tempo as an otel collector
forward-tempo-metrics:
kubectl port-forward -n monitoring service/grafana-agent-traces 8080:8080
check-tempo-metrics:
curl http://0.0.0.0:8080/metrics -s |grep -E "^tempo_receiver_accepted_span"
# can verify that spans are received from metrics on the grafana-agent-traces
# tempo_receiver_accepted_spans{receiver="otlp",tempo_config="default",transport="grpc"} 4
2 changes: 1 addition & 1 deletion rustfmt.toml
@@ -1,6 +1,6 @@
overflow_delimited_expr = true
newline_style = "Unix"
merge_imports = true
imports_granularity = "Crate"
reorder_impl_items = true
fn_single_line = false
blank_lines_upper_bound = 2
20 changes: 9 additions & 11 deletions src/lib.rs
@@ -1,19 +1,14 @@
#![warn(rust_2018_idioms)]
#![allow(unused_imports)]
use thiserror::Error;

use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
#[derive(Debug, Snafu)]
#[derive(Error, Debug)]
pub enum Error {
#[snafu(display("Failed to patch Foo: {}", source))]
FooPatchFailed {
source: kube::Error,
backtrace: Backtrace,
},
#[error("Kube Api Error: {0}")]
KubeError(#[source] kube::Error),

SerializationFailed {
source: serde_json::Error,
backtrace: Backtrace,
},
#[error("SerializationError: {0}")]
SerializationError(#[source] serde_json::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;

@@ -23,3 +18,6 @@ pub use manager::Manager;

/// Generated type, for crdgen
pub use manager::Foo;

/// Log and trace integrations
pub mod telemetry;
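
For context on how the switch from snafu to thiserror plays out at call sites, here is a minimal sketch (the helper and its name are illustrative, not part of this diff) of converting a `kube::Error` into the new `KubeError` variant, the same `.map_err(Error::KubeError)` pattern used in src/manager.rs below:

```rust
use controller::{Error, Foo, Result};
use kube::api::{Api, ListParams};

// Hypothetical helper: list Foos and convert the kube error into the crate's
// thiserror variant, as reconcile() does for patch_status further down.
async fn list_foos(foos: &Api<Foo>) -> Result<Vec<Foo>> {
    let list = foos
        .list(&ListParams::default())
        .await
        .map_err(Error::KubeError)?;
    Ok(list.items)
}
```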
41 changes: 35 additions & 6 deletions src/main.rs
@@ -2,6 +2,7 @@
pub use controller::*;
use prometheus::{Encoder, TextEncoder};
use tracing::{debug, error, info, trace, warn};
use tracing_subscriber::{prelude::*, EnvFilter, Registry};

use actix_web::{
get, middleware,
@@ -31,13 +32,41 @@ async fn index(c: Data<Manager>, _req: HttpRequest) -> impl Responder {

#[actix_rt::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.json()
.init();
let client = kube::Client::try_default().await.expect("create client");
let (manager, drainer) = Manager::new(client).await;
let otlp_endpoint =
std::env::var("OPENTELEMETRY_ENDPOINT_URL").expect("Need an otel tracing collector configured");

let (tracer, _uninstall) = opentelemetry_otlp::new_pipeline()
.with_endpoint(&otlp_endpoint)
// TODO: opentelemetry_otlp::new_pipeline().with_tonic().install_batch() in 0.6
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
opentelemetry::sdk::Resource::new(vec![opentelemetry::KeyValue::new(
"service.name",
"foo-controller",
)]),
))
.install()
.unwrap();

// Finish layers
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let logger = tracing_subscriber::fmt::layer().json();

let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("info"))
.unwrap();

// Register all subscribers
let collector = Registry::default()
.with(telemetry)
.with(logger)
.with(filter_layer);

tracing::subscriber::set_global_default(collector).unwrap();

// Start kubernetes controller
let (manager, drainer) = Manager::new().await;

// Start web server
let server = HttpServer::new(move || {
App::new()
.data(manager.clone())
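
One detail in the hunk above that is easy to miss: `.install()` returns a `(tracer, guard)` pair, and the second element uninstalls the pipeline when dropped, so it must stay bound for the whole lifetime of `main`. A minimal sketch of the pitfall, assuming the same opentelemetry-otlp 0.5 pipeline as the diff (the fallback endpoint is only illustrative):

```rust
#[actix_rt::main]
async fn main() {
    let otlp_endpoint = std::env::var("OPENTELEMETRY_ENDPOINT_URL")
        .unwrap_or_else(|_| "https://0.0.0.0:55680".to_string());

    // Binding the guard to `_uninstall` keeps it alive until the end of main,
    // so spans keep being exported.
    let (_tracer, _uninstall) = opentelemetry_otlp::new_pipeline()
        .with_endpoint(&otlp_endpoint)
        .install()
        .expect("otlp pipeline");

    // By contrast, `let (_tracer, _) = ...` would drop the guard immediately
    // and quietly uninstall the exporter again.

    // ... build the tracing-opentelemetry + fmt layers and call
    // tracing::subscriber::set_global_default, as in main.rs above ...
}
```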
80 changes: 57 additions & 23 deletions src/manager.rs
@@ -1,20 +1,25 @@
use crate::{Error, FooPatchFailed, Result, SerializationFailed};
use crate::{telemetry, Error, Result};
use chrono::prelude::*;
use futures::{future::BoxFuture, FutureExt, StreamExt};
use kube::{
api::{Api, ListParams, Meta, Patch, PatchParams},
api::{Api, ListParams, Patch, PatchParams, Resource},
client::Client,
CustomResource,
};
use kube_runtime::controller::{Context, Controller, ReconcilerAction};
use prometheus::{default_registry, proto::MetricFamily, register_int_counter, IntCounter};
use prometheus::{
default_registry, proto::MetricFamily, register_histogram_vec, register_int_counter, Exemplar,
HistogramOpts, HistogramVec, IntCounter,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::{debug, error, info, instrument, trace, warn};
use std::{collections::HashMap, sync::Arc};
use tokio::{
sync::RwLock,
time::{Duration, Instant},
};
use tracing::{debug, error, event, field, info, instrument, trace, warn, Level, Span};

/// Our Foo custom resource spec
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
@@ -42,13 +47,16 @@ struct Data {
metrics: Metrics,
}

#[instrument(skip(ctx))]
#[instrument(skip(ctx), fields(traceID))]
async fn reconcile(foo: Foo, ctx: Context<Data>) -> Result<ReconcilerAction, Error> {
let trace_id = telemetry::get_trace_id();
Span::current().record("traceID", &field::display(&trace_id));
let start = Instant::now();

let client = ctx.get_ref().client.clone();
ctx.get_ref().state.write().await.last_event = Utc::now();
let name = Meta::name(&foo);
let ns = Meta::namespace(&foo).expect("foo is namespaced");
debug!("Reconcile Foo {}: {:?}", name, foo);
let name = Resource::name(&foo);
let ns = Resource::namespace(&foo).expect("foo is namespaced");
let foos: Api<Foo> = Api::namespaced(client, &ns);

let new_status = Patch::Apply(json!({
@@ -60,17 +68,33 @@ async fn reconcile(foo: Foo, ctx: Context<Data>) -> Result<ReconcilerAction, Err
}
}));
let ps = PatchParams::apply("cntrlr").force();
let _o = foos.patch_status(&name, &ps, &new_status).await.context(FooPatchFailed)?;

let _o = foos
.patch_status(&name, &ps, &new_status)
.await
.map_err(Error::KubeError)?;

let duration = start.elapsed().as_secs_f64(); // histogram buckets below are in seconds
println!("duration= {}", duration);

let mut exemplar_labels = HashMap::new();
exemplar_labels.insert("traceID".into(), trace_id);
let ex = Exemplar::new_with_labels(duration, exemplar_labels);
println!("exemplar: {:?}", ex);
ctx.get_ref()
.metrics
.reconcile_duration
.with_label_values(&[])
.observe_with_exemplar(duration, ex);
ctx.get_ref().metrics.handled_events.inc();
info!("Reconciled Foo \"{}\" in {}", name, ns);

// If no events were received, check back every 30 minutes
Ok(ReconcilerAction {
requeue_after: Some(Duration::from_secs(3600 / 2)),
})
}
fn error_policy(error: &Error, _ctx: Context<Data>) -> ReconcilerAction {
warn!("reconcile failed: {}", error);
warn!("reconcile failed: {:?}", error);
ReconcilerAction {
requeue_after: Some(Duration::from_secs(360)),
}
@@ -80,11 +104,21 @@ fn error_policy(error: &Error, _ctx: Context<Data>) -> ReconcilerAction {
#[derive(Clone)]
pub struct Metrics {
pub handled_events: IntCounter,
pub reconcile_duration: HistogramVec,
}
impl Metrics {
fn new() -> Self {
let reconcile_histogram = register_histogram_vec!(
"foo_controller_reconcile_duration_seconds",
"The duration of reconcile to complete in seconds",
&[],
vec![0.01, 0.1, 0.25, 0.5, 1., 5., 15., 60.]
)
.unwrap();

Metrics {
handled_events: register_int_counter!("handled_events", "handled events").unwrap(),
handled_events: register_int_counter!("foo_controller_handled_events", "handled events").unwrap(),
reconcile_duration: reconcile_histogram,
}
}
}
@@ -118,7 +152,8 @@ impl Manager {
///
/// This returns a `Manager` that drives a `Controller` + a future to be awaited
/// It is up to `main` to wait for the controller stream.
pub async fn new(client: Client) -> (Self, BoxFuture<'static, ()>) {
pub async fn new() -> (Self, BoxFuture<'static, ()>) {
let client = Client::try_default().await.expect("create client");
let metrics = Metrics::new();
let state = Arc::new(RwLock::new(State::new()));
let context = Context::new(Data {
@@ -128,18 +163,17 @@ impl Manager {
});

let foos = Api::<Foo>::all(client);
//foos.get("testfoo").await.expect("please run: cargo run --bin crdgen | kubectl apply -f -");
// Ensure CRD is installed before loop-watching
let _r = foos.list(&ListParams::default().limit(1))
.await
.expect("is the crd installed? please run: cargo run --bin crdgen | kubectl apply -f -");

// All good. Start controller and return its future.
let drainer = Controller::new(foos, ListParams::default())
.run(reconcile, error_policy, context)
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|o| {
info!("Reconciled {:?}", o);
futures::future::ready(())
})
.for_each(|_| futures::future::ready(()))
.boxed();
// what we do with the controller stream from .run() ^^ does not matter
// but we do need to consume it, hence the for_each + returned drainer future

(Self { state, metrics }, drainer)
}
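
Since the exemplar API is the least familiar piece of this hunk, here is the pattern from reconcile() in isolation. Note that `Exemplar` and `observe_with_exemplar` come from the forked rust-prometheus branch pinned in Cargo.toml, not from the released 0.12 crate, so this is a sketch against that fork (the helper name is illustrative):

```rust
use std::collections::HashMap;
use prometheus::{Exemplar, HistogramVec};

// Record a reconcile duration and attach the current trace id as an exemplar
// label, so Grafana can jump from a histogram bucket straight to the trace in
// Tempo. `histogram` is the reconcile_duration HistogramVec registered in
// Metrics::new() above; `duration_secs` must match the bucket unit (seconds).
fn observe_reconcile(histogram: &HistogramVec, duration_secs: f64, trace_id: String) {
    let mut labels = HashMap::new();
    labels.insert("traceID".into(), trace_id);
    let exemplar = Exemplar::new_with_labels(duration_secs, labels);
    histogram
        .with_label_values(&[])
        .observe_with_exemplar(duration_secs, exemplar);
}
```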
13 changes: 13 additions & 0 deletions src/telemetry.rs
@@ -0,0 +1,13 @@
use crate::{Error, Result};

/// Fetch an opentelemetry::trace::TraceId as hex through the full tracing stack
pub fn get_trace_id() -> String {
use opentelemetry::trace::TraceContextExt; // opentelemetry::Context -> opentelemetry::trace::Span
use tracing_opentelemetry::OpenTelemetrySpanExt; // tracing::Span to opentelemetry::Context
tracing::Span::current()
.context()
.span()
.span_context()
.trace_id()
.to_hex()
}
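
A usage sketch for this helper (the function below is illustrative): declare an empty `traceID` field on an instrumented span and fill it in from the active opentelemetry context, the same pattern reconcile() uses above, so the JSON log lines for that span can be correlated with the exported trace:

```rust
use controller::telemetry;
use tracing::{field, info, instrument, Span};

#[instrument(fields(traceID))]
async fn do_work() {
    // Fill the placeholder field with the hex trace id of the current span.
    let trace_id = telemetry::get_trace_id();
    Span::current().record("traceID", &field::display(&trace_id));
    info!("doing work"); // this JSON log line now carries the traceID field
}
```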
5 changes: 4 additions & 1 deletion yaml/deployment.yaml
@@ -78,7 +78,7 @@ spec:
serviceAccountName: foo-controller
containers:
- name: foo-controller
image: "clux/controller:latest"
image: "clux/controller:demo-otel"
imagePullPolicy: Always
resources:
limits:
@@ -91,6 +91,9 @@ spec:
- name: http
containerPort: 8080
protocol: TCP
env:
- name: OPENTELEMETRY_ENDPOINT_URL
value: "grafana-agent-traces.monitoring.svc.cluster.local:55680"
readinessProbe:
httpGet:
path: /health