Skip to content

Commit

Permalink
WIP - jaeger exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
jwilm committed Apr 21, 2022
1 parent cc2175d commit 385caa2
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ members = [
"opentelemetry-datadog",
"opentelemetry-dynatrace",
"opentelemetry-http",
# "opentelemetry-jaeger",
"opentelemetry-jaeger",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"opentelemetry-proto",
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"]
async-std = { version = "1.6", optional = true }
async-trait = "0.1"
base64 = { version = "0.13", optional = true }
futures = "0.3"
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true }
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
Expand Down
79 changes: 56 additions & 23 deletions opentelemetry-jaeger/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ use std::convert::TryFrom;

use self::runtime::JaegerTraceRuntime;
use self::thrift::jaeger;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::StreamExt;
use std::convert::TryInto;
use std::future::Future;

#[cfg(feature = "isahc_collector_client")]
#[allow(unused_imports)] // this is actually used to configure authentication
Expand All @@ -45,22 +48,59 @@ const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";
/// Jaeger span exporter
#[derive(Debug)]
pub struct Exporter {
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
tx: mpsc::Sender<(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>)>,
}

impl Exporter {
fn new(
process: jaeger::Process,
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
) -> Exporter {
Exporter {
process,
export_instrumentation_lib,
uploader,
) -> (Exporter, impl Future<Output = ()>) {
let (tx, rx) = futures::channel::mpsc::channel(64);
(
Exporter { tx },
ExporterTask {
rx,
process,
export_instrumentation_lib,
uploader,
}
.run(),
)
}
}

struct ExporterTask {
rx: mpsc::Receiver<(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>)>,
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
}

impl ExporterTask {
async fn run(mut self) {
// TODO jwilm: this might benefit from a ExporterMessage so that we can
// send Shutdown and break the loop.
while let Some((batch, tx)) = self.rx.next().await {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// TODO jwilm: is ignoring the err (fail to send) correct here?
let _ = tx.send(res);
}
}
}
Expand All @@ -74,23 +114,16 @@ pub struct Process {
pub tags: Vec<KeyValue>,
}

#[async_trait]
impl trace::SpanExporter for Exporter {
/// Export spans to Jaeger
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let (tx, rx) = oneshot::channel();

if let Err(err) = self.tx.try_send((batch, tx)) {
return Box::pin(futures::future::ready(Err(Into::into(err))));
}

self.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await
Box::pin(async move { rx.await? })
}
}

Expand Down

0 comments on commit 385caa2

Please sign in to comment.