Skip to content

Commit

Permalink
opentelemetry: add support for thread names/ids (#2134)
Browse files Browse the repository at this point in the history
OpenTelemetry has [semantic conventions][1] for reporting thread names
and IDs on spans. This branch adds support for recording thread names
and IDs according to these conventions.

[1]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#source-code-attributes

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Jun 7, 2022
1 parent 3eb3a1e commit 86e0bf1
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 82 deletions.
1 change: 1 addition & 0 deletions tracing-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing = { path = "../tracing", version = "0.1", default-features = false, feat
tracing-core = { path = "../tracing-core", version = "0.1" }
tracing-subscriber = { path = "../tracing-subscriber", version = "0.3", default-features = false, features = ["registry", "std"] }
tracing-log = { path = "../tracing-log", version = "0.1", default-features = false, optional = true }
once_cell = "1"

[dev-dependencies]
async-trait = "0.1"
Expand Down
231 changes: 149 additions & 82 deletions tracing-opentelemetry/src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{OtelData, PreSampledTracer};
use once_cell::unsync;
use opentelemetry::{
trace::{self as otel, noop, TraceContextExt},
Context as OtelContext, Key, KeyValue, Value,
Expand All @@ -7,6 +8,7 @@ use std::any::TypeId;
use std::borrow::Cow;
use std::fmt;
use std::marker;
use std::thread;
use std::time::{Instant, SystemTime};
use tracing_core::span::{self, Attributes, Id, Record};
use tracing_core::{field, Event, Subscriber};
Expand All @@ -30,6 +32,7 @@ pub struct OpenTelemetryLayer<S, T> {
tracer: T,
location: bool,
tracked_inactivity: bool,
with_threads: bool,
get_context: WithContext,
_registry: marker::PhantomData<S>,
}
Expand Down Expand Up @@ -314,6 +317,7 @@ where
tracer,
location: true,
tracked_inactivity: true,
with_threads: true,
get_context: WithContext(Self::get_context),
_registry: marker::PhantomData,
}
Expand Down Expand Up @@ -353,23 +357,34 @@ where
tracer,
location: self.location,
tracked_inactivity: self.tracked_inactivity,
with_threads: self.with_threads,
get_context: WithContext(OpenTelemetryLayer::<S, Tracer>::get_context),
_registry: self._registry,
}
}

/// Sets whether or not span and event metadata should include detailed
/// location information, such as the file, module and line number.
/// Sets whether or not span and event metadata should include OpenTelemetry
/// attributes with location information, such as the file, module and line number.
///
/// These attributes follow the [OpenTelemetry semantic conventions for
/// source locations][conv].
///
/// By default, locations are enabled.
///
/// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#source-code-attributes
pub fn with_location(self, location: bool) -> Self {
Self { location, ..self }
}

/// Sets whether or not event span's metadata should include detailed location
/// information, such as the file, module and line number.
/// Sets whether or not span and event metadata should include OpenTelemetry
/// attributes with location information, such as the file, module and line number.
///
/// These attributes follow the [OpenTelemetry semantic conventions for
/// source locations][conv].
///
/// By default, locations are enabled.
///
/// By default, event locations are enabled.
/// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#source-code-attributes
#[deprecated(
since = "0.17.3",
note = "renamed to `OpenTelemetrySubscriber::with_location`"
Expand All @@ -391,6 +406,20 @@ where
}
}

/// Sets whether or not spans record additional attributes for the thread
/// name and thread ID of the thread they were created on, following the
/// [OpenTelemetry semantic conventions for threads][conv].
///
/// By default, thread attributes are enabled.
///
/// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#general-thread-attributes
pub fn with_threads(self, threads: bool) -> Self {
Self {
with_threads: threads,
..self
}
}

/// Retrieve the parent OpenTelemetry [`Context`] from the current tracing
/// [`span`] through the [`Registry`]. This [`Context`] links spans to their
/// parent for proper hierarchical visualization.
Expand Down Expand Up @@ -443,6 +472,30 @@ where
f(builder, &layer.tracer);
}
}

fn extra_span_attrs(&self) -> usize {
let mut extra_attrs = 0;
if self.location {
extra_attrs += 3;
}
if self.with_threads {
extra_attrs += 2;
}
extra_attrs
}
}

thread_local! {
static THREAD_ID: unsync::Lazy<u64> = unsync::Lazy::new(|| {
// OpenTelemetry's semantic conventions require the thread ID to be
// recorded as an integer, but `std::thread::ThreadId` does not expose
// the integer value on stable, so we have to convert it to a `usize` by
// parsing it. Since this requires allocating a `String`, store it in a
// thread local so we only have to do this once.
// TODO(eliza): once `std::thread::ThreadId::as_u64` is stabilized
// (https://github.com/rust-lang/rust/issues/67939), just use that.
thread_id_integer(thread::current().id())
});
}

impl<S, T> Layer<S> for OpenTelemetryLayer<S, T>
Expand Down Expand Up @@ -475,9 +528,9 @@ where
builder.trace_id = Some(self.tracer.new_trace_id());
}

let builder_attrs = builder
.attributes
.get_or_insert(Vec::with_capacity(attrs.fields().len() + 3));
let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity(
attrs.fields().len() + self.extra_span_attrs(),
));

if self.location {
let meta = attrs.metadata();
Expand All @@ -495,6 +548,17 @@ where
}
}

if self.with_threads {
THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64)));
if let Some(name) = std::thread::current().name() {
// TODO(eliza): it's a bummer that we have to allocate here, but
// we can't easily get the string as a `static`. it would be
// nice if `opentelemetry` could also take `Arc<str>`s as
// `String` values...
builder_attrs.push(KeyValue::new("thread.name", name.to_owned()));
}
}

attrs.record(&mut SpanAttributeVisitor(&mut builder));
extensions.insert(OtelData { builder, parent_cx });
}
Expand Down Expand Up @@ -718,15 +782,27 @@ impl Timings {
}
}

fn thread_id_integer(id: thread::ThreadId) -> u64 {
let thread_id = format!("{:?}", id);
thread_id
.trim_start_matches("ThreadId(")
.trim_end_matches(')')
.parse::<u64>()
.expect("thread ID should parse as an integer")
}

#[cfg(test)]
mod tests {
use super::*;
use crate::OtelData;
use opentelemetry::trace::{noop, SpanKind, TraceFlags};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use std::{
borrow::Cow,
collections::HashMap,
sync::{Arc, Mutex},
thread,
time::SystemTime,
};
use tracing_subscriber::prelude::*;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -770,6 +846,14 @@ mod tests {
}
}

impl TestTracer {
fn with_data<T>(&self, f: impl FnOnce(&OtelData) -> T) -> T {
let lock = self.0.lock().unwrap();
let data = lock.as_ref().expect("no span data has been recorded yet");
f(data)
}
}

#[derive(Debug, Clone)]
struct TestSpan(otel::SpanContext);
impl otel::Span for TestSpan {
Expand Down Expand Up @@ -820,15 +904,7 @@ mod tests {
tracing::debug_span!("request", otel.kind = %SpanKind::Server);
});

let recorded_kind = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.span_kind
.clone();
let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone());
assert_eq!(recorded_kind, Some(otel::SpanKind::Server))
}

Expand All @@ -840,14 +916,8 @@ mod tests {
tracing::subscriber::with_default(subscriber, || {
tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok);
});
let recorded_status_code = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.status_code;

let recorded_status_code = tracer.with_data(|data| data.builder.status_code);
assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok))
}

Expand All @@ -862,16 +932,7 @@ mod tests {
tracing::debug_span!("request", otel.status_message = message);
});

let recorded_status_message = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.status_message
.clone();

let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone());
assert_eq!(recorded_status_message, Some(message.into()))
}

Expand All @@ -893,16 +954,8 @@ mod tests {
tracing::debug_span!("request", otel.kind = %SpanKind::Server);
});

let recorded_trace_id = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.parent_cx
.span()
.span_context()
.trace_id();
let recorded_trace_id =
tracer.with_data(|data| data.parent_cx.span().span_context().trace_id());
assert_eq!(recorded_trace_id, trace_id)
}

Expand All @@ -919,17 +972,7 @@ mod tests {
tracing::debug_span!("request");
});

let attributes = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.attributes
.as_ref()
.unwrap()
.clone();
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
Expand Down Expand Up @@ -1024,17 +1067,7 @@ mod tests {
tracing::debug_span!("request");
});

let attributes = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.attributes
.as_ref()
.unwrap()
.clone();
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
Expand All @@ -1054,17 +1087,7 @@ mod tests {
tracing::debug_span!("request");
});

let attributes = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.attributes
.as_ref()
.unwrap()
.clone();
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
Expand All @@ -1073,4 +1096,48 @@ mod tests {
assert!(!keys.contains(&"code.namespace"));
assert!(!keys.contains(&"code.lineno"));
}

#[test]
fn includes_thread() {
let thread = thread::current();
let expected_name = thread
.name()
.map(|name| Value::String(Cow::Owned(name.to_owned())));
let expected_id = Value::I64(thread_id_integer(thread.id()) as i64);

let tracer = TestTracer(Arc::new(Mutex::new(None)));
let subscriber = tracing_subscriber::registry()
.with(layer().with_tracer(tracer.clone()).with_threads(true));

tracing::subscriber::with_default(subscriber, || {
tracing::debug_span!("request");
});

let attributes = tracer
.with_data(|data| data.builder.attributes.as_ref().unwrap().clone())
.drain(..)
.map(|keyval| (keyval.key.as_str().to_string(), keyval.value))
.collect::<HashMap<_, _>>();
assert_eq!(attributes.get("thread.name"), expected_name.as_ref());
assert_eq!(attributes.get("thread.id"), Some(&expected_id));
}

#[test]
fn excludes_thread() {
let tracer = TestTracer(Arc::new(Mutex::new(None)));
let subscriber = tracing_subscriber::registry()
.with(layer().with_tracer(tracer.clone()).with_threads(false));

tracing::subscriber::with_default(subscriber, || {
tracing::debug_span!("request");
});

let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
.collect::<Vec<&str>>();
assert!(!keys.contains(&"thread.name"));
assert!(!keys.contains(&"thread.id"));
}
}

0 comments on commit 86e0bf1

Please sign in to comment.