From 644b6bb1627a042a0a20113a6e74f6b8be3ddcb3 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 25 May 2022 12:18:39 -0700 Subject: [PATCH] opentelemetry: add support for thread names/ids (#2134) OpenTelemetry has [semantic conventions][1] for reporting thread names and IDs on spans. This branch adds support for recording thread names and IDs according to these conventions. [1]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#source-code-attributes Signed-off-by: Eliza Weisman --- tracing-opentelemetry/Cargo.toml | 1 + tracing-opentelemetry/src/subscriber.rs | 230 +++++++++++++++--------- 2 files changed, 150 insertions(+), 81 deletions(-) diff --git a/tracing-opentelemetry/Cargo.toml b/tracing-opentelemetry/Cargo.toml index 9f3af470e3..f9eae98d36 100644 --- a/tracing-opentelemetry/Cargo.toml +++ b/tracing-opentelemetry/Cargo.toml @@ -28,6 +28,7 @@ tracing = { path = "../tracing", version = "0.2", default-features = false, feat tracing-core = { path = "../tracing-core", version = "0.2" } tracing-subscriber = { path = "../tracing-subscriber", version = "0.3", default-features = false, features = ["registry", "std"] } tracing-log = { path = "../tracing-log", version = "0.2", default-features = false, optional = true } +once_cell = "1" [dev-dependencies] async-trait = "0.1" diff --git a/tracing-opentelemetry/src/subscriber.rs b/tracing-opentelemetry/src/subscriber.rs index 0ec254aee3..9a781aa9bf 100644 --- a/tracing-opentelemetry/src/subscriber.rs +++ b/tracing-opentelemetry/src/subscriber.rs @@ -1,10 +1,12 @@ use crate::{OtelData, PreSampledTracer}; +use once_cell::unsync; use opentelemetry::{ trace::{self as otel, noop, TraceContextExt}, Context as OtelContext, Key, KeyValue, Value, }; use std::fmt; use std::marker; +use std::thread; use std::time::{Instant, SystemTime}; use std::{any::TypeId, ptr::NonNull}; use tracing_core::span::{self, Attributes, Id, Record}; @@ -29,6 +31,7 @@ pub struct OpenTelemetrySubscriber { tracer: T, location: bool, tracked_inactivity: bool, + with_threads: bool, get_context: WithContext, _registry: marker::PhantomData, } @@ -292,6 +295,7 @@ where tracer, location: true, tracked_inactivity: true, + with_threads: true, get_context: WithContext(Self::get_context), _registry: marker::PhantomData, } @@ -331,23 +335,34 @@ where tracer, location: self.location, tracked_inactivity: self.tracked_inactivity, + with_threads: self.with_threads, get_context: WithContext(OpenTelemetrySubscriber::::get_context), _registry: self._registry, } } - /// Sets whether or not span and event metadata should include detailed - /// location information, such as the file, module and line number. + /// Sets whether or not span and event metadata should include OpenTelemetry + /// attributes with location information, such as the file, module and line number. + /// + /// These attributes follow the [OpenTelemetry semantic conventions for + /// source locations][conv]. /// /// By default, locations are enabled. + /// + /// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#source-code-attributes pub fn with_location(self, location: bool) -> Self { Self { location, ..self } } - /// Sets whether or not event span's metadata should include detailed location - /// information, such as the file, module and line number. + /// Sets whether or not span and event metadata should include OpenTelemetry + /// attributes with location information, such as the file, module and line number. /// - /// By default, event locations are enabled. + /// These attributes follow the [OpenTelemetry semantic conventions for + /// source locations][conv]. + /// + /// By default, locations are enabled. + /// + /// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#source-code-attributes #[deprecated( since = "0.17.3", note = "renamed to `OpenTelemetrySubscriber::with_location`" @@ -369,6 +384,20 @@ where } } + /// Sets whether or not spans record additional attributes for the thread + /// name and thread ID of the thread they were created on, following the + /// [OpenTelemetry semantic conventions for threads][conv]. + /// + /// By default, thread attributes are enabled. + /// + /// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#general-thread-attributes + pub fn with_threads(self, threads: bool) -> Self { + Self { + with_threads: threads, + ..self + } + } + /// Retrieve the parent OpenTelemetry [`Context`] from the current tracing /// [`span`] through the [`Registry`]. This [`Context`] links spans to their /// parent for proper hierarchical visualization. @@ -421,6 +450,30 @@ where f(builder, &subscriber.tracer); } } + + fn extra_span_attrs(&self) -> usize { + let mut extra_attrs = 0; + if self.location { + extra_attrs += 3; + } + if self.with_threads { + extra_attrs += 2; + } + extra_attrs + } +} + +thread_local! { + static THREAD_ID: unsync::Lazy = unsync::Lazy::new(|| { + // OpenTelemetry's semantic conventions require the thread ID to be + // recorded as an integer, but `std::thread::ThreadId` does not expose + // the integer value on stable, so we have to convert it to a `usize` by + // parsing it. Since this requires allocating a `String`, store it in a + // thread local so we only have to do this once. + // TODO(eliza): once `std::thread::ThreadId::as_u64` is stabilized + // (https://github.com/rust-lang/rust/issues/67939), just use that. + thread_id_integer(thread::current().id()) + }); } impl Subscribe for OpenTelemetrySubscriber @@ -453,9 +506,9 @@ where builder.trace_id = Some(self.tracer.new_trace_id()); } - let builder_attrs = builder - .attributes - .get_or_insert(Vec::with_capacity(attrs.fields().len() + 3)); + let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity( + attrs.fields().len() + self.extra_span_attrs(), + )); if self.location { let meta = attrs.metadata(); @@ -473,6 +526,17 @@ where } } + if self.with_threads { + THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64))); + if let Some(name) = std::thread::current().name() { + // TODO(eliza): it's a bummer that we have to allocate here, but + // we can't easily get the string as a `static`. it would be + // nice if `opentelemetry` could also take `Arc`s as + // `String` values... + builder_attrs.push(KeyValue::new("thread.name", name.to_owned())); + } + } + attrs.record(&mut SpanAttributeVisitor(&mut builder)); extensions.insert(OtelData { builder, parent_cx }); } @@ -696,14 +760,27 @@ impl Timings { } } +fn thread_id_integer(id: thread::ThreadId) -> u64 { + let thread_id = format!("{:?}", id); + thread_id + .trim_start_matches("ThreadId(") + .trim_end_matches(')') + .parse::() + .expect("thread ID should parse as an integer") +} + #[cfg(test)] mod tests { use super::*; use crate::OtelData; use opentelemetry::trace::{noop, SpanKind, TraceFlags}; - use std::borrow::Cow; - use std::sync::{Arc, Mutex}; - use std::time::SystemTime; + use std::{ + borrow::Cow, + collections::HashMap, + sync::{Arc, Mutex}, + thread, + time::SystemTime, + }; use tracing_subscriber::prelude::*; #[derive(Debug, Clone)] @@ -747,6 +824,14 @@ mod tests { } } + impl TestTracer { + fn with_data(&self, f: impl FnOnce(&OtelData) -> T) -> T { + let lock = self.0.lock().unwrap(); + let data = lock.as_ref().expect("no span data has been recorded yet"); + f(data) + } + } + #[derive(Debug, Clone)] struct TestSpan(otel::SpanContext); impl otel::Span for TestSpan { @@ -799,15 +884,7 @@ mod tests { tracing::debug_span!("request", otel.kind = %SpanKind::Server); }); - let recorded_kind = tracer - .0 - .lock() - .unwrap() - .as_ref() - .unwrap() - .builder - .span_kind - .clone(); + let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone()); assert_eq!(recorded_kind, Some(otel::SpanKind::Server)) } @@ -820,14 +897,8 @@ mod tests { tracing::collect::with_default(subscriber, || { tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok); }); - let recorded_status_code = tracer - .0 - .lock() - .unwrap() - .as_ref() - .unwrap() - .builder - .status_code; + + let recorded_status_code = tracer.with_data(|data| data.builder.status_code); assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok)) } @@ -843,16 +914,7 @@ mod tests { tracing::debug_span!("request", otel.status_message = message); }); - let recorded_status_message = tracer - .0 - .lock() - .unwrap() - .as_ref() - .unwrap() - .builder - .status_message - .clone(); - + let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone()); assert_eq!(recorded_status_message, Some(message.into())) } @@ -875,16 +937,8 @@ mod tests { tracing::debug_span!("request", otel.kind = %SpanKind::Server); }); - let recorded_trace_id = tracer - .0 - .lock() - .unwrap() - .as_ref() - .unwrap() - .parent_cx - .span() - .span_context() - .trace_id(); + let recorded_trace_id = + tracer.with_data(|data| data.parent_cx.span().span_context().trace_id()); assert_eq!(recorded_trace_id, trace_id) } @@ -901,17 +955,7 @@ mod tests { tracing::debug_span!("request"); }); - let attributes = tracer - .0 - .lock() - .unwrap() - .as_ref() - .unwrap() - .builder - .attributes - .as_ref() - .unwrap() - .clone(); + let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() .map(|attr| attr.key.as_str()) @@ -930,17 +974,7 @@ mod tests { tracing::debug_span!("request"); }); - let attributes = tracer - .0 - .lock() - .unwrap() - .as_ref() - .unwrap() - .builder - .attributes - .as_ref() - .unwrap() - .clone(); + let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() .map(|attr| attr.key.as_str()) @@ -963,17 +997,7 @@ mod tests { tracing::debug_span!("request"); }); - let attributes = tracer - .0 - .lock() - .unwrap() - .as_ref() - .unwrap() - .builder - .attributes - .as_ref() - .unwrap() - .clone(); + let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() .map(|attr| attr.key.as_str()) @@ -982,4 +1006,48 @@ mod tests { assert!(!keys.contains(&"code.namespace")); assert!(!keys.contains(&"code.lineno")); } + + #[test] + fn includes_thread() { + let thread = thread::current(); + let expected_name = thread + .name() + .map(|name| Value::String(Cow::Owned(name.to_owned()))); + let expected_id = Value::I64(thread_id_integer(thread.id()) as i64); + + let tracer = TestTracer(Arc::new(Mutex::new(None))); + let subscriber = tracing_subscriber::registry() + .with(subscriber().with_tracer(tracer.clone()).with_threads(true)); + + tracing::collect::with_default(subscriber, || { + tracing::debug_span!("request"); + }); + + let attributes = tracer + .with_data(|data| data.builder.attributes.as_ref().unwrap().clone()) + .drain(..) + .map(|keyval| (keyval.key.as_str().to_string(), keyval.value)) + .collect::>(); + assert_eq!(attributes.get("thread.name"), expected_name.as_ref()); + assert_eq!(attributes.get("thread.id"), Some(&expected_id)); + } + + #[test] + fn excludes_thread() { + let tracer = TestTracer(Arc::new(Mutex::new(None))); + let subscriber = tracing_subscriber::registry() + .with(subscriber().with_tracer(tracer.clone()).with_threads(false)); + + tracing::collect::with_default(subscriber, || { + tracing::debug_span!("request"); + }); + + let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); + let keys = attributes + .iter() + .map(|attr| attr.key.as_str()) + .collect::>(); + assert!(!keys.contains(&"thread.name")); + assert!(!keys.contains(&"thread.id")); + } }