Skip to content

Commit

Permalink
opentelemetry-http: move HTTP-related code into a separate crate
Browse files Browse the repository at this point in the history
  • Loading branch information
djc committed Jan 5, 2021
1 parent 89424cd commit 8fd5e99
Show file tree
Hide file tree
Showing 22 changed files with 268 additions and 153 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
override: true
- name: Run tests
run: cargo --version &&
cargo test --verbose --manifest-path=opentelemetry/Cargo.toml --features trace,metrics,serialize,tokio,serde,http,tonic,reqwest,testing &&
cargo test --verbose --manifest-path=opentelemetry/Cargo.toml --features trace,metrics,serialize,tokio,serde,tonic,testing &&
cargo test --manifest-path=opentelemetry-jaeger/Cargo.toml &&
cargo test --manifest-path=opentelemetry-zipkin/Cargo.toml
meta:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"opentelemetry",
"opentelemetry-contrib",
"opentelemetry-http",
"opentelemetry-jaeger",
"opentelemetry-otlp",
"opentelemetry-prometheus",
Expand Down
3 changes: 2 additions & 1 deletion examples/aws-xray/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ path = "src/client.rs"
[dependencies]
hyper = "0.13"
tokio = { version = "0.2", features = ["full"] }
opentelemetry = { path = "../../opentelemetry", features = ["http"] }
opentelemetry = { path = "../../opentelemetry" }
opentelemetry-contrib = { path = "../../opentelemetry-contrib", features = ["aws-xray"] }
opentelemetry-http = { path = "../../opentelemetry-http" }
3 changes: 2 additions & 1 deletion examples/aws-xray/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use opentelemetry::{
Context, KeyValue,
};
use opentelemetry_contrib::trace::propagator::XrayPropagator;
use opentelemetry_http::HeaderInjector;

fn init_tracer() -> (sdktrace::Tracer, stdout::Uninstall) {
global::set_text_map_propagator(XrayPropagator::new());
Expand Down Expand Up @@ -34,7 +35,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sy
let mut req = hyper::Request::builder().uri("http://127.0.0.1:3000");

global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, req.headers_mut().unwrap());
propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap()));

println!("Headers: {:?}", req.headers_ref());
});
Expand Down
6 changes: 4 additions & 2 deletions examples/aws-xray/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ use opentelemetry::{
trace::{Span, Tracer},
};
use opentelemetry_contrib::trace::propagator::XrayPropagator;
use opentelemetry_http::HeaderExtractor;
use std::{convert::Infallible, net::SocketAddr};

async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
let parent_context =
global::get_text_map_propagator(|propagator| propagator.extract(req.headers()));
let parent_context = global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(req.headers()))
});

let x_amzn_trace_id = req
.headers()
Expand Down
3 changes: 2 additions & 1 deletion examples/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ path = "src/client.rs"
[dependencies]
hyper = "0.13"
tokio = { version = "0.2", features = ["full"] }
opentelemetry = { path = "../../opentelemetry", features = ["http"] }
opentelemetry = { path = "../../opentelemetry" }
opentelemetry-http = { path = "../../opentelemetry-http" }
3 changes: 2 additions & 1 deletion examples/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use opentelemetry::{
trace::{TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_http::HeaderInjector;

fn init_tracer() -> (impl Tracer, stdout::Uninstall) {
global::set_text_map_propagator(TraceContextPropagator::new());
Expand All @@ -33,7 +34,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sy

let mut req = hyper::Request::builder().uri("http://127.0.0.1:3000");
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, req.headers_mut().unwrap())
propagator.inject_context(&cx, &mut HeaderInjector(&mut req.headers_mut().unwrap()))
});
let res = client.request(req.body(Body::from("Hallo!"))?).await?;

Expand Down
5 changes: 4 additions & 1 deletion examples/http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ use opentelemetry::{
},
trace::{Span, Tracer},
};
use opentelemetry_http::HeaderExtractor;
use std::{convert::Infallible, net::SocketAddr};

async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
let parent_cx = global::get_text_map_propagator(|propagator| propagator.extract(req.headers()));
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(req.headers()))
});
let span = global::tracer("example/server").start_with_context("hello", parent_cx);
span.add_event("handling this...".to_string(), Vec::new());

Expand Down
13 changes: 7 additions & 6 deletions opentelemetry-contrib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ rustdoc-args = ["--cfg", "docsrs"]
default = []
base64_format = ["base64", "binary_propagator"]
binary_propagator = []
datadog = ["indexmap", "rmp", "async-trait", "thiserror"]
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry/reqwest"]
reqwest-client = ["reqwest", "opentelemetry/reqwest"]
surf-client = ["surf", "opentelemetry/surf"]
datadog = ["indexmap", "rmp", "async-trait", "thiserror", "opentelemetry-http"]
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"]
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
surf-client = ["surf", "opentelemetry-http/surf"]
aws-xray = []

[dependencies]
async-trait = { version = "0.1", optional = true }
indexmap = { version = "1.6", optional = true }
opentelemetry = { version = "0.11", path = "../opentelemetry", features = ["trace", "http"] }
opentelemetry = { version = "0.11", path = "../opentelemetry", features = ["trace"] }
opentelemetry-http = { version = "0.1", path = "../opentelemetry-http", optional = true }
rmp = { version = "0.8", optional = true }
lazy_static = "1.4"
reqwest = { version = "0.10", optional = true }
Expand All @@ -43,4 +44,4 @@ thiserror = { version = "1.0", optional = true }
[dev-dependencies]
base64 = "0.13"
isahc = "0.9"
opentelemetry = { path = "../opentelemetry", features = ["trace", "http", "testing"] }
opentelemetry = { path = "../opentelemetry", features = ["trace", "testing"] }
5 changes: 3 additions & 2 deletions opentelemetry-contrib/src/trace/exporter/datadog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
//! use opentelemetry::{KeyValue, trace::Tracer};
//! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource};
//! use opentelemetry::sdk::export::trace::ExportResult;
//! use opentelemetry::sdk::export::trace::HttpClient;
//! use opentelemetry_contrib::trace::exporter::datadog::{new_pipeline, ApiVersion};
//! use opentelemetry_http::HttpClient;
//! use async_trait::async_trait;
//! use opentelemetry_contrib::trace::exporter::datadog::Error;
//!
Expand Down Expand Up @@ -128,9 +128,10 @@ pub use model::Error;
use async_trait::async_trait;
use http::{Method, Request, Uri};
use opentelemetry::sdk::export::trace;
use opentelemetry::sdk::export::trace::{HttpClient, SpanData};
use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk, trace::TracerProvider};
use opentelemetry_http::HttpClient;

/// Default Datadog collector endpoint
const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126";
Expand Down
17 changes: 17 additions & 0 deletions opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "opentelemetry-http"
version = "0.1.0"
authors = ["OpenTelemetry Authors <cncf-opentelemetry-contributors@lists.cncf.io>"]
description = "Helper implementations for exchange of traces and metrics over HTTP"
homepage = "https://github.com/open-telemetry/opentelemetry-rust"
repository = "https://github.com/open-telemetry/opentelemetry-rust"
keywords = ["opentelemetry", "tracing", "metrics"]
license = "Apache-2.0"
edition = "2018"

[dependencies]
async-trait = "0.1.42"
http = "0.2.2"
opentelemetry = { version = "0.11.2", path = "../opentelemetry", features = ["trace"] }
reqwest = { version = "0.10", default-features = false, features = ["blocking"], optional = true }
surf = { version = "2.0", default-features = false, optional = true }
192 changes: 192 additions & 0 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#[cfg(feature = "reqwest")]
use std::convert::TryInto;
#[cfg(any(feature = "surf", feature = "reqwest"))]
use std::fmt::{Debug, Display, Formatter};

use async_trait::async_trait;
use http::Request;
use opentelemetry::propagation::{Extractor, Injector};
#[cfg(any(feature = "surf", feature = "reqwest"))]
use opentelemetry::sdk::export::ExportError;
use opentelemetry::trace::TraceError;

pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);

impl<'a> Injector for HeaderInjector<'a> {
/// Set a key and value in the HeaderMap. Does nothing if the key or value are not valid inputs.
fn set(&mut self, key: &str, value: String) {
if let Ok(name) = http::header::HeaderName::from_bytes(key.as_bytes()) {
if let Ok(val) = http::header::HeaderValue::from_str(&value) {
self.0.insert(name, val);
}
}
}
}

pub struct HeaderExtractor<'a>(pub &'a http::HeaderMap);

impl<'a> Extractor for HeaderExtractor<'a> {
/// Get a value for a key from the HeaderMap. If the value is not valid ASCII, returns None.
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|value| value.to_str().ok())
}

/// Collect all the keys from the HeaderMap.
fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(|value| value.as_str())
.collect::<Vec<_>>()
}
}

/// A minimal interface necessary for export spans over HTTP.
///
/// Users sometime choose http clients that relay on certain runtime. This trait
/// allows users to bring their choice of http clients.
#[async_trait]
pub trait HttpClient: Debug + Send + Sync {
/// Send a batch of spans to collectors
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError>;
}

#[cfg(feature = "reqwest")]
#[derive(Debug)]
struct ReqwestError(reqwest::Error);

#[cfg(feature = "reqwest")]
impl Display for ReqwestError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
}
}

#[cfg(feature = "reqwest")]
impl std::error::Error for ReqwestError {}

#[cfg(feature = "reqwest")]
impl From<reqwest::Error> for ReqwestError {
fn from(err: reqwest::Error) -> Self {
ReqwestError(err)
}
}

#[cfg(feature = "reqwest")]
impl ExportError for ReqwestError {
fn exporter_name(&self) -> &'static str {
"reqwest"
}
}

#[cfg(feature = "surf")]
impl ExportError for SurfError {
fn exporter_name(&self) -> &'static str {
"surf"
}
}

#[cfg(feature = "surf")]
#[derive(Debug)]
struct SurfError(surf::Error);

#[cfg(feature = "surf")]
impl Display for SurfError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
}
}

#[cfg(feature = "surf")]
impl std::error::Error for SurfError {}

#[cfg(feature = "surf")]
impl From<surf::Error> for SurfError {
fn from(err: surf::Error) -> Self {
SurfError(err)
}
}

#[cfg(feature = "reqwest")]
#[async_trait]
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let request = request.try_into().map_err(|e| ReqwestError::from(e))?;
let _ = self
.execute(request)
.await
.and_then(|rsp| rsp.error_for_status())
.map_err(|e| ReqwestError::from(e))?;
Ok(())
}
}

#[cfg(feature = "reqwest")]
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let _ = request
.try_into()
.and_then(|req| self.execute(req))
.and_then(|rsp| rsp.error_for_status())
.map_err(|e| ReqwestError::from(e))?;
Ok(())
}
}

#[cfg(feature = "surf")]
#[async_trait]
impl HttpClient for surf::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let (parts, body) = request.into_parts();
let uri = parts
.uri
.to_string()
.parse()
.map_err(|_err: surf::http::url::ParseError| TraceError::from("error parse url"))?;

let req = surf::Request::builder(surf::http::Method::Post, uri)
.content_type("application/json")
.body(body);
let result = self.send(req).await.map_err::<SurfError, _>(Into::into)?;

if result.status().is_success() {
Ok(())
} else {
Err(SurfError(surf::Error::from_str(
result.status(),
result.status().canonical_reason(),
))
.into())
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn http_headers_get() {
let mut carrier = http::HeaderMap::new();
HeaderInjector(&mut carrier).set("headerName", "value".to_string());

assert_eq!(
HeaderExtractor(&carrier).get("HEADERNAME"),
Some("value"),
"case insensitive extraction"
)
}

#[test]
fn http_headers_keys() {
let mut carrier = http::HeaderMap::new();
HeaderInjector(&mut carrier).set("headerName1", "value1".to_string());
HeaderInjector(&mut carrier).set("headerName2", "value2".to_string());

let extractor = HeaderExtractor(&carrier);
let got = extractor.keys();
assert_eq!(got.len(), 2);
assert!(got.contains(&"headername1"));
assert!(got.contains(&"headername2"));
}
}
9 changes: 5 additions & 4 deletions opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ http = { version = "0.2", optional = true }
isahc = { version = "0.9", default-features = false, optional = true }
js-sys = { version = "0.3", optional = true }
opentelemetry = { version = "0.11", default-features = false, features = ["trace"], path = "../opentelemetry" }
opentelemetry-http = { version = "0.1", path = "../opentelemetry-http", optional = true }
pin-project = { version = "1.0", optional = true }
thrift = "0.13"
tokio = { version = "0.2", features = ["udp", "sync"], optional = true }
Expand Down Expand Up @@ -58,11 +59,11 @@ optional = true

[features]
default = []
collector_client = ["http", "opentelemetry/http"]
collector_client = ["http", "opentelemetry-http"]
isahc_collector_client = ["isahc", "collector_client"]
reqwest_blocking_collector_client = ["reqwest/blocking", "collector_client", "headers", "opentelemetry/reqwest"]
reqwest_collector_client = ["reqwest", "collector_client", "headers", "opentelemetry/reqwest"]
surf_collector_client = ["surf", "collector_client", "opentelemetry/surf"]
reqwest_blocking_collector_client = ["reqwest/blocking", "collector_client", "headers", "opentelemetry-http/reqwest"]
reqwest_collector_client = ["reqwest", "collector_client", "headers", "opentelemetry-http/reqwest"]
surf_collector_client = ["surf", "collector_client", "opentelemetry-http/surf"]
wasm_collector_client = [
"base64",
"futures-util",
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-jaeger/src/exporter/collector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! # HTTP Jaeger Collector Client
use http::Uri;
#[cfg(feature = "collector_client")]
use opentelemetry::sdk::export::trace::HttpClient;
use opentelemetry_http::HttpClient;
use std::sync::atomic::AtomicUsize;

/// `CollectorAsyncClientHttp` implements an async version of the
Expand Down
Loading

0 comments on commit 8fd5e99

Please sign in to comment.