Skip to content

Commit

Permalink
support web client and server's open telemetry tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
4t145 committed Jul 2, 2024
1 parent e402c42 commit 9c06c8e
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 3 deletions.
5 changes: 5 additions & 0 deletions tardis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ tracing = [
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry_sdk",
"opentelemetry-http",
"poem?/opentelemetry"
]
tokio-console = ["console-subscriber"]
tracing-appender = ["dep:tracing-appender"]
web-server-grpc = ["web-server", "dep:poem-grpc"]
cluster = ["web-server", "ws-client", "cache"]
build-info = ["git-version"]
opentelemetry-http = ["dep:opentelemetry-http"]

[dependencies]
# Basic
Expand Down Expand Up @@ -240,6 +243,8 @@ testcontainers-modules = { version = "0.3", features = [

# Debug
git-version = { version = "0.3.9", optional = true }
opentelemetry-http = { version = "0.12.0", features = ["tokio"], optional = true }
http = "1.1.0"

[dev-dependencies]
# Common
Expand Down
36 changes: 36 additions & 0 deletions tardis/src/basic/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,39 @@ impl TardisTracing<LogConfig> {
headers
}
}

#[cfg(feature = "tracing")]
pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);

#[cfg(feature = "tracing")]

impl<'a> opentelemetry::propagation::Injector for HeaderInjector<'a> {
/// Set a key and value in the HeaderMap. Does nothing if the key or value are not valid inputs.
fn set(&mut self, key: &str, value: String) {
if let Ok(name) = http::header::HeaderName::from_bytes(key.as_bytes()) {
if let Ok(val) = http::header::HeaderValue::from_str(&value) {
self.0.insert(name, val);
}
}
}
}

/// Helper for extracting headers from HTTP Requests. This is used for OpenTelemetry context
/// propagation over HTTP.
/// See [this](https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/tracing-http-propagator/README.md)
/// for example usage.
#[cfg(feature = "tracing")]
pub struct HeaderExtractor<'a>(pub &'a http::HeaderMap);

#[cfg(feature = "tracing")]
impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> {
/// Get a value for a key from the HeaderMap. If the value is not valid ASCII, returns None.
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|value| value.to_str().ok())
}

/// Collect all the keys from the HeaderMap.
fn keys(&self) -> Vec<&str> {
self.0.keys().map(|value| value.as_str()).collect::<Vec<_>>()
}
}
4 changes: 4 additions & 0 deletions tardis/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub mod uniform_error_mw;
#[cfg(feature = "web-client")]
#[cfg_attr(docsrs, doc(cfg(feature = "web-client")))]
pub mod web_client;

// #[cfg(feature = "web-client")]
// #[cfg_attr(docsrs, doc(cfg(feature = "web-client")))]
// pub mod web_client_v2;
#[cfg(feature = "web-server")]
#[cfg_attr(docsrs, doc(cfg(feature = "web-server")))]
pub mod web_resp;
Expand Down
11 changes: 8 additions & 3 deletions tardis/src/web/web_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ impl TardisWebClient {
let (code, headers, response) = self.request(Method::PATCH, url, headers, Json(body)).await?;
self.to_json::<T>(code, headers, response).await
}

pub async fn request<K, V>(
&self,
method: Method,
Expand All @@ -289,8 +288,14 @@ impl TardisWebClient {
for (key, value) in headers {
result = result.header(key.into(), value.into());
}
result = body.apply_on(result);
let response = result.send().await?;
let request = body.apply_on(result).build()?;
#[cfg(feature = "tracing")]
{
use opentelemetry::{global, Context};
let ctx = Context::current();
global::get_text_map_propagator(|propagator| propagator.inject_context(&ctx, &mut crate::basic::tracing::HeaderInjector(request.headers_mut())));
}
let response = self.client.execute(request).await?;
let code = response.status().as_u16();
let headers = response
.headers()
Expand Down
5 changes: 5 additions & 0 deletions tardis/src/web/web_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ impl TardisWebServer {
};
let route = route.boxed();
let route = route.with(middleware);
#[cfg(feature = "tracing")]
let route = {
let tracer = opentelemetry::global::tracer("");
route.with(poem::middleware::OpenTelemetryTracing::new(tracer))
};
if module_options.uniform_error || module_config.uniform_error {
self.state.lock().await.add_route(code, route.with(UniformError).with(AddClusterIdHeader).with(cors), data);
} else {
Expand Down

0 comments on commit 9c06c8e

Please sign in to comment.