Skip to content

Commit

Permalink
fix(otlp): upgrade tonic to 0.5. (#597)
Browse files Browse the repository at this point in the history
* fix(otlp): upgrade tonic to 0.5.

We no longer have a universal type for both client with interceptors and the client without interceptors. So instead of using a interceptors we just add the headers for each request as needed.

* chore: remove tower as dependency.
  • Loading branch information
TommyCpp authored Jul 14, 2021
1 parent e7d05e7 commit 55d798e
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 50 deletions.
2 changes: 1 addition & 1 deletion examples/external-otlp-tonic-tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio", "metrics
opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["tonic", "tls", "tls-roots"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
tonic = "0.4.0"
tonic = "0.5.0"
url = "2.2.0"
10 changes: 6 additions & 4 deletions opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ async-trait = "0.1"
futures = "0.3"
grpcio = { version = "0.9", optional = true }
opentelemetry = { version = "0.15", default-features = false, features = ["trace"], path = "../opentelemetry" }
prost = { version = "0.7", optional = true }
prost = { version = "0.8", optional = true }
protobuf = { version = "2.18", optional = true }
thiserror = "1.0"
tonic = { version = "0.4", optional = true }
tonic = { version = "0.5", optional = true }
tokio = { version = "1.0", features = ["full"], optional = true }
opentelemetry-http = { version = "0.4", path = "../opentelemetry-http", optional = true }
reqwest = { version = "0.11", optional = true, default-features = false }
Expand All @@ -52,6 +52,8 @@ http = "0.2"
[dev-dependencies]
chrono = "0.4"
tokio-stream = { version = "0.1", features = ["net"] }
# need tokio runtime to run smoke tests.
opentelemetry = { version = "0.15", features = ["trace", "rt-tokio"], path = "../opentelemetry" }
protobuf-codegen = { version = "2.16"}
protoc-grpcio = { version = "3.0"}

Expand All @@ -75,5 +77,5 @@ reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]
surf-client = ["surf", "opentelemetry-http/surf"]

[build-dependencies]
tonic-build = { version = "0.4", optional = true }
prost-build = {version = "0.7", optional = true }
tonic-build = { version = "0.5", optional = true }
prost-build = {version = "0.8", optional = true }
38 changes: 17 additions & 21 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ pub struct MetricsExporter {
#[cfg(feature = "tokio")]
sender: Arc<Mutex<tokio::sync::mpsc::Sender<ExportMsg>>>,
export_kind_selector: Arc<dyn ExportKindFor + Send + Sync>,
metadata: Option<tonic::metadata::MetadataMap>,
}

impl Debug for MetricsExporter {
Expand All @@ -274,7 +275,7 @@ impl MetricsExporter {
#[cfg(feature = "tonic")]
pub fn new<T: ExportKindFor + Send + Sync + 'static>(
config: ExportConfig,
tonic_config: TonicConfig,
mut tonic_config: TonicConfig,
export_selector: T,
) -> Result<MetricsExporter> {
let endpoint =
Expand All @@ -297,25 +298,7 @@ impl MetricsExporter {
.connect_lazy()
.map_err::<crate::Error, _>(Into::into)?;

let client = match tonic_config.metadata.to_owned() {
None => MetricsServiceClient::new(channel),
Some(metadata) => {
MetricsServiceClient::with_interceptor(channel, move |mut req: Request<()>| {
for key_and_value in metadata.iter() {
match key_and_value {
KeyAndValueRef::Ascii(key, value) => {
req.metadata_mut().append(key, value.to_owned())
}
KeyAndValueRef::Binary(key, value) => {
req.metadata_mut().append_bin(key, value.to_owned())
}
};
}

Ok(req)
})
}
};
let client = MetricsServiceClient::new(channel);

let (sender, mut receiver) = tokio::sync::mpsc::channel::<ExportMsg>(2);
tokio::spawn(Box::pin(async move {
Expand All @@ -334,6 +317,7 @@ impl MetricsExporter {
Ok(MetricsExporter {
sender: Arc::new(Mutex::new(sender)),
export_kind_selector: Arc::new(export_selector),
metadata: tonic_config.metadata.take(),
})
}
}
Expand All @@ -359,7 +343,19 @@ impl Exporter for MetricsExporter {
Err(err) => Err(err),
}
})?;
let request = Request::new(sink(resource_metrics));
let mut request = Request::new(sink(resource_metrics));
if let Some(metadata) = &self.metadata {
for key_and_value in metadata.iter() {
match key_and_value {
KeyAndValueRef::Ascii(key, value) => {
request.metadata_mut().append(key, value.to_owned())
}
KeyAndValueRef::Binary(key, value) => {
request.metadata_mut().append_bin(key, value.to_owned())
}
};
}
}
self.sender
.lock()
.map(|sender| {
Expand Down
43 changes: 20 additions & 23 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,30 +352,10 @@ impl SpanExporter {
tonic_config: TonicConfig,
channel: tonic::transport::Channel,
) -> Result<Self, crate::Error> {
let client = match tonic_config.metadata.to_owned() {
None => TonicTraceServiceClient::new(channel),
Some(metadata) => {
TonicTraceServiceClient::with_interceptor(channel, move |mut req: Request<()>| {
for key_and_value in metadata.iter() {
match key_and_value {
KeyAndValueRef::Ascii(key, value) => {
req.metadata_mut().append(key, value.to_owned())
}
KeyAndValueRef::Binary(key, value) => {
req.metadata_mut().append_bin(key, value.to_owned())
}
};
}

Ok(req)
})
}
};

Ok(SpanExporter::Tonic {
timeout: config.timeout,
metadata: tonic_config.metadata,
trace_exporter: client,
trace_exporter: TonicTraceServiceClient::new(channel),
})
}

Expand Down Expand Up @@ -466,11 +446,28 @@ impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter {
}

#[cfg(feature = "tonic")]
SpanExporter::Tonic { trace_exporter, .. } => {
let request = Request::new(TonicRequest {
SpanExporter::Tonic {
trace_exporter,
metadata,
..
} => {
let mut request = Request::new(TonicRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
});

if let Some(metadata) = metadata {
for key_and_value in metadata.iter() {
match key_and_value {
KeyAndValueRef::Ascii(key, value) => {
request.metadata_mut().append(key, value.to_owned())
}
KeyAndValueRef::Binary(key, value) => {
request.metadata_mut().append_bin(key, value.to_owned())
}
};
}
}

trace_exporter
.to_owned()
.export(request)
Expand Down
10 changes: 9 additions & 1 deletion opentelemetry-otlp/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ impl TraceService for MockServer {
request: tonic::Request<ExportTraceServiceRequest>,
) -> Result<tonic::Response<ExportTraceServiceResponse>, tonic::Status> {
println!("Sending request into channel...");
// assert we have required metadata key
assert_eq!(
request.metadata().get("x-header-key"),
Some(&("header-value".parse().unwrap()))
);
self.tx
.lock()
.unwrap()
Expand Down Expand Up @@ -68,12 +73,15 @@ async fn smoke_tracer() {

{
println!("Installing tracer...");
let mut metadata = tonic::metadata::MetadataMap::new();
metadata.insert("x-header-key", "header-value".parse().unwrap());
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(format!("http://{}", addr)),
.with_endpoint(format!("http://{}", addr))
.with_metadata(metadata),
)
.install_batch(opentelemetry::runtime::Tokio)
.expect("failed to install");
Expand Down

0 comments on commit 55d798e

Please sign in to comment.