From 6f7588777926046c235af3061293fc5d0ee645c5 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Thu, 21 Apr 2022 10:49:30 -0700 Subject: [PATCH] [WIP] Implement new SpanExporter API for Jaeger Key points - decouple exporter from uploaders via channel and spawned task - some uploaders are a shared I/O resource and cannot be multiplexed - necessitates a task queue - eg, HttpClient will spawn many I/O tasks internally, AgentUploader is a single I/O resource. Different level of abstraction. - Synchronous API not supported without a Runtime argument (can we thread one through?) --- Cargo.toml | 2 +- opentelemetry-jaeger/Cargo.toml | 2 + .../src/exporter/config/agent.rs | 88 ++++++++++--------- .../src/exporter/config/mod.rs | 1 + opentelemetry-jaeger/src/exporter/mod.rs | 79 ++++++++++++----- 5 files changed, 105 insertions(+), 67 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a595369927..95f0788102 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ "opentelemetry-datadog", "opentelemetry-dynatrace", "opentelemetry-http", -# "opentelemetry-jaeger", + "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-prometheus", "opentelemetry-proto", diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index f020cb87ca..f37308ac35 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"] async-std = { version = "1.6", optional = true } async-trait = "0.1" base64 = { version = "0.13", optional = true } +futures = "0.3" +futures-channel = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true } headers = { version = "0.3.2", optional = true } http = { version = "0.2", optional = true } diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs index efefc17a9b..0b70dcc51c 100644 --- a/opentelemetry-jaeger/src/exporter/config/agent.rs +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -228,25 +228,25 @@ impl AgentPipeline { /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline. /// /// The exporter will send each span to the agent upon the span ends. - pub fn build_simple(mut self) -> Result { - let mut builder = sdk::trace::TracerProvider::builder(); - - let (config, process) = build_config_and_process( - builder.sdk_provided_resource(), - self.trace_config.take(), - self.transformation_config.service_name.take(), - ); - let exporter = Exporter::new( - process.into(), - self.transformation_config.export_instrument_library, - self.build_sync_agent_uploader()?, - ); - - builder = builder.with_simple_exporter(exporter); - builder = builder.with_config(config); - - Ok(builder.build()) - } + // pub fn build_simple(mut self) -> Result { + // let mut builder = sdk::trace::TracerProvider::builder(); + + // let (config, process) = build_config_and_process( + // builder.sdk_provided_resource(), + // self.trace_config.take(), + // self.transformation_config.service_name.take(), + // ); + // let exporter = Exporter::new( + // process.into(), + // self.transformation_config.export_instrument_library, + // self.build_sync_agent_uploader()?, + // ); + + // builder = builder.with_simple_exporter(exporter); + // builder = builder.with_config(config); + + // Ok(builder.build()) + // } /// Build a `TracerProvider` using a async exporter and configurations from the pipeline. /// @@ -275,7 +275,9 @@ impl AgentPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_async_agent_uploader(runtime.clone())?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); @@ -287,10 +289,10 @@ impl AgentPipeline { /// tracer provider. /// /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. - pub fn install_simple(self) -> Result { - let tracer_provider = self.build_simple()?; - install_tracer_provider_and_get_tracer(tracer_provider) - } + // pub fn install_simple(self) -> Result { + // let tracer_provider = self.build_simple()?; + // install_tracer_provider_and_get_tracer(tracer_provider) + // } /// Similar to [`build_batch`][AgentPipeline::build_batch] but also returns a tracer from the /// tracer provider. @@ -321,28 +323,27 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let uploader = self.build_async_agent_uploader(runtime)?; - Ok(Exporter::new( - process.into(), - export_instrument_library, - uploader, - )) + let uploader = self.build_async_agent_uploader(runtime.clone())?; + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); + Ok(exporter) } /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime. - pub fn build_sync_agent_exporter(mut self) -> Result { - let builder = sdk::trace::TracerProvider::builder(); - let (_, process) = build_config_and_process( - builder.sdk_provided_resource(), - self.trace_config.take(), - self.transformation_config.service_name.take(), - ); - Ok(Exporter::new( - process.into(), - self.transformation_config.export_instrument_library, - self.build_sync_agent_uploader()?, - )) - } + // pub fn build_sync_agent_exporter(mut self) -> Result { + // let builder = sdk::trace::TracerProvider::builder(); + // let (_, process) = build_config_and_process( + // builder.sdk_provided_resource(), + // self.trace_config.take(), + // self.transformation_config.service_name.take(), + // ); + // Ok(Exporter::new( + // process.into(), + // self.transformation_config.export_instrument_library, + // self.build_sync_agent_uploader()?, + // )) + // } fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> where @@ -358,6 +359,7 @@ impl AgentPipeline { Ok(Box::new(AsyncUploader::Agent(agent))) } + #[allow(dead_code)] // TODO jwilm fn build_sync_agent_uploader(self) -> Result, TraceError> { let agent = AgentSyncClientUdp::new( self.agent_endpoint?.as_slice(), diff --git a/opentelemetry-jaeger/src/exporter/config/mod.rs b/opentelemetry-jaeger/src/exporter/config/mod.rs index a363c0270a..039dcc1a07 100644 --- a/opentelemetry-jaeger/src/exporter/config/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/mod.rs @@ -153,6 +153,7 @@ mod tests { // OTEL_SERVICE_NAME env var also works env::set_var("OTEL_SERVICE_NAME", "test service"); let builder = new_agent_pipeline(); + // TODO jwilm let exporter = builder.build_sync_agent_exporter().unwrap(); assert_eq!(exporter.process.service_name, "test service"); env::set_var("OTEL_SERVICE_NAME", "") diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 0d64a739ea..5500505df6 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -18,8 +18,11 @@ use std::convert::TryFrom; use self::runtime::JaegerTraceRuntime; use self::thrift::jaeger; -use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; +use futures::future::BoxFuture; +use futures::StreamExt; use std::convert::TryInto; +use std::future::Future; #[cfg(feature = "isahc_collector_client")] #[allow(unused_imports)] // this is actually used to configure authentication @@ -45,10 +48,7 @@ const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; /// Jaeger span exporter #[derive(Debug)] pub struct Exporter { - process: jaeger::Process, - /// Whether or not to export instrumentation information. - export_instrumentation_lib: bool, - uploader: Box, + tx: mpsc::Sender<(Vec, oneshot::Sender)>, } impl Exporter { @@ -56,11 +56,51 @@ impl Exporter { process: jaeger::Process, export_instrumentation_lib: bool, uploader: Box, - ) -> Exporter { - Exporter { - process, - export_instrumentation_lib, - uploader, + ) -> (Exporter, impl Future) { + let (tx, rx) = futures::channel::mpsc::channel(64); + ( + Exporter { tx }, + ExporterTask { + rx, + process, + export_instrumentation_lib, + uploader, + } + .run(), + ) + } +} + +struct ExporterTask { + rx: mpsc::Receiver<(Vec, oneshot::Sender)>, + process: jaeger::Process, + /// Whether or not to export instrumentation information. + export_instrumentation_lib: bool, + uploader: Box, +} + +impl ExporterTask { + async fn run(mut self) { + // TODO jwilm: this might benefit from a ExporterMessage so that we can + // send Shutdown and break the loop. + while let Some((batch, tx)) = self.rx.next().await { + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let process = self.process.clone(); + + for span in batch.into_iter() { + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); + } + + let res = self + .uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await; + + // TODO jwilm: is ignoring the err (fail to send) correct here? + let _ = tx.send(res); } } } @@ -74,23 +114,16 @@ pub struct Process { pub tags: Vec, } -#[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Jaeger - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let process = self.process.clone(); - - for span in batch.into_iter() { - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); + fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { + let (tx, rx) = oneshot::channel(); + + if let Err(err) = self.tx.try_send((batch, tx)) { + return Box::pin(futures::future::ready(Err(Into::into(err)))); } - self.uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await + Box::pin(async move { rx.await? }) } }