diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c9ea9639cd..7d192d92c6 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -111,6 +111,14 @@ The Opentelemetry Rust SDK comes with an error type `openetelemetry::Error`. For For users that want to implement their own exporters. It's RECOMMENDED to wrap all errors from the exporter into a crate-level error type, and implement `ExporterError` trait. +### Priority of configurations +OpenTelemetry supports multiple ways to configure the API, SDK and other components. The priority of configurations is as follows: + +- Environment variables +- Compiling time configurations provided in the source code + + + ## Style Guide * Run `cargo clippy --all` - this will catch common mistakes and improve diff --git a/opentelemetry-jaeger/CHANGELOG.md b/opentelemetry-jaeger/CHANGELOG.md index acf49a24cb..1f28e055ca 100644 --- a/opentelemetry-jaeger/CHANGELOG.md +++ b/opentelemetry-jaeger/CHANGELOG.md @@ -6,6 +6,7 @@ - Bump MSRV to 1.65 [#1318](https://github.com/open-telemetry/opentelemetry-rust/pull/1318) - Bump MSRV to 1.64 [#1203](https://github.com/open-telemetry/opentelemetry-rust/pull/1203) +- Prioritize environment variables over compiling time variables [#1323](https://github.com/open-telemetry/opentelemetry-rust/pull/1323) ## v0.19.0 diff --git a/opentelemetry-jaeger/src/exporter/agent.rs b/opentelemetry-jaeger/src/exporter/agent.rs index dfbc0b706f..8982d526c8 100644 --- a/opentelemetry-jaeger/src/exporter/agent.rs +++ b/opentelemetry-jaeger/src/exporter/agent.rs @@ -1,5 +1,5 @@ //! # UDP Jaeger Agent Client -use crate::exporter::addrs_and_family; +use crate::exporter::address_family; use crate::exporter::runtime::JaegerTraceRuntime; use crate::exporter::thrift::{ agent::{self, TAgentSyncClient}, @@ -7,7 +7,7 @@ use crate::exporter::thrift::{ }; use crate::exporter::transport::{TBufferChannel, TNoopChannel}; use std::fmt; -use std::net::{ToSocketAddrs, UdpSocket}; +use std::net::{SocketAddr, UdpSocket}; use thrift::{ protocol::{TCompactInputProtocol, TCompactOutputProtocol}, transport::{ReadHalf, TIoChannel, WriteHalf}, @@ -43,10 +43,10 @@ pub(crate) struct AgentSyncClientUdp { impl AgentSyncClientUdp { /// Create a new UDP agent client - pub(crate) fn new( - agent_endpoint: T, + pub(crate) fn new( max_packet_size: usize, auto_split: bool, + agent_address: Vec, ) -> thrift::Result { let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?; let client = agent::AgentSyncClient::new( @@ -54,9 +54,8 @@ impl AgentSyncClientUdp { TCompactOutputProtocol::new(write), ); - let (addrs, family) = addrs_and_family(&agent_endpoint)?; - let conn = UdpSocket::bind(family)?; - conn.connect(addrs.as_slice())?; + let conn = UdpSocket::bind(address_family(agent_address.as_slice()))?; + conn.connect(agent_address.as_slice())?; Ok(AgentSyncClientUdp { conn, @@ -102,11 +101,11 @@ pub(crate) struct AgentAsyncClientUdp { impl AgentAsyncClientUdp { /// Create a new UDP agent client - pub(crate) fn new( - agent_endpoint: T, + pub(crate) fn new( max_packet_size: usize, runtime: R, auto_split: bool, + agent_address: Vec, ) -> thrift::Result { let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?; let client = agent::AgentSyncClient::new( @@ -114,7 +113,7 @@ impl AgentAsyncClientUdp { TCompactOutputProtocol::new(write), ); - let conn = runtime.create_socket(agent_endpoint)?; + let conn = runtime.create_socket(agent_address.as_slice())?; Ok(AgentAsyncClientUdp { runtime, diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs index 36d928e30b..8bc3945c71 100644 --- a/opentelemetry-jaeger/src/exporter/config/agent.rs +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -1,3 +1,15 @@ +use std::borrow::BorrowMut; +use std::net::ToSocketAddrs; +use std::sync::Arc; +use std::{env, net}; + +use opentelemetry::trace::TraceError; +use opentelemetry_sdk::trace::{BatchSpanProcessor, Tracer}; +use opentelemetry_sdk::{ + self, + trace::{BatchConfig, Config, TracerProvider}, +}; + use crate::exporter::agent::{AgentAsyncClientUdp, AgentSyncClientUdp}; use crate::exporter::config::{ build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig, @@ -5,15 +17,6 @@ use crate::exporter::config::{ }; use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader}; use crate::{Error, Exporter, JaegerTraceRuntime}; -use opentelemetry::trace::TraceError; -use opentelemetry_sdk::trace::{BatchSpanProcessor, Tracer}; -use opentelemetry_sdk::{ - self, - trace::{BatchConfig, Config, TracerProvider}, -}; -use std::borrow::BorrowMut; -use std::sync::Arc; -use std::{env, net}; /// The max size of UDP packet we want to send, synced with jaeger-agent const UDP_PACKET_MAX_LENGTH: usize = 65_000; @@ -78,38 +81,23 @@ pub struct AgentPipeline { transformation_config: TransformationConfig, trace_config: Option, batch_config: Option, - agent_endpoint: Result, crate::Error>, + agent_endpoint: Option, max_packet_size: usize, auto_split_batch: bool, } impl Default for AgentPipeline { fn default() -> Self { - let mut pipeline = AgentPipeline { + AgentPipeline { transformation_config: Default::default(), trace_config: Default::default(), batch_config: Some(Default::default()), - agent_endpoint: Ok(vec![format!( + agent_endpoint: Some(format!( "{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}" - ) - .parse() - .unwrap()]), + )), max_packet_size: UDP_PACKET_MAX_LENGTH, auto_split_batch: false, - }; - - let endpoint = match (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) { - (Ok(host), Ok(port)) => Some(format!("{}:{}", host.trim(), port.trim())), - (Ok(host), _) => Some(format!("{}:{DEFAULT_AGENT_ENDPOINT_PORT}", host.trim())), - (_, Ok(port)) => Some(format!("{DEFAULT_AGENT_ENDPOINT_HOST}:{}", port.trim())), - (_, _) => None, - }; - - if let Some(endpoint) = endpoint { - pipeline = pipeline.with_endpoint(endpoint); } - - pipeline } } @@ -147,16 +135,9 @@ impl AgentPipeline { /// Any valid socket address can be used. /// /// Default to be `127.0.0.1:6831`. - pub fn with_endpoint(self, agent_endpoint: T) -> Self { + pub fn with_endpoint>(self, agent_endpoint: T) -> Self { AgentPipeline { - agent_endpoint: agent_endpoint - .to_socket_addrs() - .map(|addrs| addrs.collect()) - .map_err(|io_err| crate::Error::ConfigError { - pipeline_name: "agent", - config_name: "endpoint", - reason: io_err.to_string(), - }), + agent_endpoint: Some(agent_endpoint.into()), ..self } } @@ -391,10 +372,10 @@ impl AgentPipeline { R: JaegerTraceRuntime, { let agent = AgentAsyncClientUdp::new( - self.agent_endpoint?.as_slice(), self.max_packet_size, runtime, self.auto_split_batch, + self.resolve_endpoint()?, ) .map_err::(Into::into)?; Ok(Arc::new(AsyncUploader::Agent( @@ -404,13 +385,38 @@ impl AgentPipeline { fn build_sync_agent_uploader(self) -> Result, TraceError> { let agent = AgentSyncClientUdp::new( - self.agent_endpoint?.as_slice(), self.max_packet_size, self.auto_split_batch, + self.resolve_endpoint()?, ) .map_err::(Into::into)?; Ok(Arc::new(SyncUploader::Agent(std::sync::Mutex::new(agent)))) } + + // resolve the agent endpoint from the environment variables or the builder + // if only one of the environment variables is set, the other one will be set to the default value + // if no environment variable is set, the builder value will be used. + fn resolve_endpoint(self) -> Result, TraceError> { + let endpoint_str = match (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) { + (Ok(host), Ok(port)) => format!("{}:{}", host.trim(), port.trim()), + (Ok(host), _) => format!("{}:{DEFAULT_AGENT_ENDPOINT_PORT}", host.trim()), + (_, Ok(port)) => format!("{DEFAULT_AGENT_ENDPOINT_HOST}:{}", port.trim()), + (_, _) => self.agent_endpoint.unwrap_or(format!( + "{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}" + )), + }; + endpoint_str + .to_socket_addrs() + .map(|addrs| addrs.collect()) + .map_err(|io_err| { + Error::ConfigError { + pipeline_name: "agent", + config_name: "endpoint", + reason: io_err.to_string(), + } + .into() + }) + } } #[cfg(test)] @@ -429,9 +435,12 @@ mod tests { ("127.0.0.1:1001", true), ]; for (socket_str, is_ok) in test_cases.into_iter() { - let pipeline = AgentPipeline::default().with_endpoint(socket_str); + let resolved_endpoint = AgentPipeline::default() + .with_endpoint(socket_str) + .resolve_endpoint(); assert_eq!( - pipeline.agent_endpoint.is_ok(), + resolved_endpoint.is_ok(), + // if is_ok is true, use socket_str, otherwise use the default endpoint is_ok, "endpoint string {}", socket_str diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs index 7777d3c4bd..0b6794ffff 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -1,12 +1,3 @@ -use crate::exporter::config::{ - build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig, - TransformationConfig, -}; -use crate::exporter::uploader::{AsyncUploader, Uploader}; -use crate::{Exporter, JaegerTraceRuntime}; -use http::Uri; -use opentelemetry::trace::TraceError; -use opentelemetry_sdk::trace::{BatchConfig, BatchSpanProcessor, Config, Tracer, TracerProvider}; use std::borrow::BorrowMut; use std::convert::TryFrom; use std::env; @@ -14,16 +5,25 @@ use std::sync::Arc; #[cfg(feature = "collector_client")] use std::time::Duration; +use http::Uri; + +use opentelemetry::trace::TraceError; #[cfg(feature = "collector_client")] use opentelemetry_http::HttpClient; +use opentelemetry_sdk::trace::{BatchConfig, BatchSpanProcessor, Config, Tracer, TracerProvider}; #[cfg(feature = "collector_client")] use crate::config::collector::http_client::CollectorHttpClient; - #[cfg(feature = "collector_client")] use crate::exporter::collector::AsyncHttpClient; #[cfg(feature = "wasm_collector_client")] use crate::exporter::collector::WasmCollector; +use crate::exporter::config::{ + build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig, + TransformationConfig, +}; +use crate::exporter::uploader::{AsyncUploader, Uploader}; +use crate::{Exporter, JaegerTraceRuntime}; #[cfg(feature = "collector_client")] mod http_client; @@ -43,7 +43,7 @@ const ENV_TIMEOUT: &str = "OTEL_EXPORTER_JAEGER_TIMEOUT"; const DEFAULT_COLLECTOR_TIMEOUT: Duration = Duration::from_secs(10); /// Username to send as part of "Basic" authentication to the collector endpoint. -const ENV_USER: &str = "OTEL_EXPORTER_JAEGER_USER"; +const ENV_USERNAME: &str = "OTEL_EXPORTER_JAEGER_USER"; /// Password to send as part of "Basic" authentication to the collector endpoint. const ENV_PASSWORD: &str = "OTEL_EXPORTER_JAEGER_PASSWORD"; @@ -97,7 +97,7 @@ pub struct CollectorPipeline { #[cfg(feature = "collector_client")] collector_timeout: Duration, // only used by builtin http clients. - collector_endpoint: Option>, + collector_endpoint: Option, collector_username: Option, collector_password: Option, @@ -106,7 +106,7 @@ pub struct CollectorPipeline { impl Default for CollectorPipeline { fn default() -> Self { - let mut pipeline = Self { + Self { #[cfg(feature = "collector_client")] collector_timeout: DEFAULT_COLLECTOR_TIMEOUT, collector_endpoint: None, @@ -116,33 +116,7 @@ impl Default for CollectorPipeline { transformation_config: Default::default(), trace_config: Default::default(), batch_config: Some(Default::default()), - }; - - #[cfg(feature = "collector_client")] - if let Some(timeout) = env::var(ENV_TIMEOUT).ok().filter(|var| !var.is_empty()) { - let timeout = match timeout.parse() { - Ok(timeout) => Duration::from_millis(timeout), - Err(e) => { - eprintln!("{} malformed defaulting to 10000: {}", ENV_TIMEOUT, e); - DEFAULT_COLLECTOR_TIMEOUT - } - }; - pipeline = pipeline.with_timeout(timeout); } - - if let Some(endpoint) = env::var(ENV_ENDPOINT).ok().filter(|var| !var.is_empty()) { - pipeline = pipeline.with_endpoint(endpoint); - } - - if let Some(user) = env::var(ENV_USER).ok().filter(|var| !var.is_empty()) { - pipeline = pipeline.with_username(user); - } - - if let Some(password) = env::var(ENV_PASSWORD).ok().filter(|var| !var.is_empty()) { - pipeline = pipeline.with_password(password); - } - - pipeline } } @@ -224,15 +198,9 @@ impl CollectorPipeline { /// Set the collector endpoint. /// /// E.g. "http://localhost:14268/api/traces" - pub fn with_endpoint(self, collector_endpoint: T) -> Self - where - http::Uri: core::convert::TryFrom, - >::Error: Into, - { + pub fn with_endpoint>(self, collector_endpoint: T) -> Self { Self { - collector_endpoint: Some( - core::convert::TryFrom::try_from(collector_endpoint).map_err(Into::into), - ), + collector_endpoint: Some(collector_endpoint.into()), ..self } } @@ -491,64 +459,95 @@ impl CollectorPipeline { where R: JaegerTraceRuntime, { - let endpoint = self - .collector_endpoint - .transpose() - .map_err::(|err| crate::Error::ConfigError { - pipeline_name: "collector", - config_name: "collector_endpoint", - reason: format!("invalid uri, {}", err), - })? - .unwrap_or_else(|| { - Uri::try_from(DEFAULT_ENDPOINT).unwrap() // default endpoint should always valid - }); + let endpoint = self.resolve_endpoint()?; + let username = self.resolve_username(); + let password = self.resolve_password(); + #[cfg(feature = "collector_client")] + let timeout = self.resolve_timeout(); match self.client_config { #[cfg(feature = "collector_client")] ClientConfig::Http { client_type } => { - let client = client_type.build_client( - self.collector_username, - self.collector_password, - self.collector_timeout, - )?; + let client = client_type.build_client(username, password, timeout)?; let collector = AsyncHttpClient::new(endpoint, client); Ok(Arc::new(AsyncUploader::::Collector(collector))) } #[cfg(feature = "wasm_collector_client")] ClientConfig::Wasm => { - let collector = - WasmCollector::new(endpoint, self.collector_username, self.collector_password) - .map_err::(Into::into)?; + let collector = WasmCollector::new(endpoint, username, password) + .map_err::(Into::into)?; Ok(Arc::new(AsyncUploader::::WasmCollector(collector))) } } } + + fn resolve_env_var(env_var: &'static str) -> Option { + env::var(env_var).ok().filter(|var| !var.is_empty()) + } + + // if provided value from environment variable or the builder is invalid, return error + fn resolve_endpoint(&self) -> Result { + let endpoint_from_env = Self::resolve_env_var(ENV_ENDPOINT) + .map(|endpoint| { + Uri::try_from(endpoint.as_str()).map_err::(|err| { + crate::Error::ConfigError { + pipeline_name: "collector", + config_name: "collector_endpoint", + reason: format!("invalid uri from environment variable, {}", err), + } + }) + }) + .transpose()?; + + Ok(match endpoint_from_env { + Some(endpoint) => endpoint, + None => { + if let Some(endpoint) = &self.collector_endpoint { + Uri::try_from(endpoint.as_str()).map_err::(|err| { + crate::Error::ConfigError { + pipeline_name: "collector", + config_name: "collector_endpoint", + reason: format!("invalid uri from the builder, {}", err), + } + })? + } else { + Uri::try_from(DEFAULT_ENDPOINT).unwrap() // default endpoint should always valid + } + } + }) + } + + #[cfg(feature = "collector_client")] + fn resolve_timeout(&self) -> Duration { + match Self::resolve_env_var(ENV_TIMEOUT) { + Some(timeout) => match timeout.parse() { + Ok(timeout) => Duration::from_millis(timeout), + Err(e) => { + eprintln!("{} malformed default to 10s: {}", ENV_TIMEOUT, e); + self.collector_timeout + } + }, + None => self.collector_timeout, + } + } + + fn resolve_username(&self) -> Option { + Self::resolve_env_var(ENV_USERNAME).or_else(|| self.collector_username.clone()) + } + + fn resolve_password(&self) -> Option { + Self::resolve_env_var(ENV_PASSWORD).or_else(|| self.collector_password.clone()) + } } #[cfg(test)] #[cfg(feature = "rt-tokio")] mod tests { - use super::*; - use crate::config::collector::http_client::test_http_client; use opentelemetry_sdk::runtime::Tokio; - #[test] - fn test_collector_defaults() { - // No Env Variable - std::env::remove_var(ENV_TIMEOUT); - let builder = CollectorPipeline::default(); - assert_eq!(DEFAULT_COLLECTOR_TIMEOUT, builder.collector_timeout); - - // Bad Env Variable - std::env::set_var(ENV_TIMEOUT, "a"); - let builder = CollectorPipeline::default(); - assert_eq!(DEFAULT_COLLECTOR_TIMEOUT, builder.collector_timeout); + use crate::config::collector::http_client::test_http_client; - // Good Env Variable - std::env::set_var(ENV_TIMEOUT, "777"); - let builder = CollectorPipeline::default(); - assert_eq!(Duration::from_millis(777), builder.collector_timeout); - } + use super::*; #[test] fn test_set_collector_endpoint() { @@ -559,7 +558,7 @@ mod tests { assert!(invalid_uri.is_err()); assert_eq!( format!("{:?}", invalid_uri.err().unwrap()), - "ConfigError { pipeline_name: \"collector\", config_name: \"collector_endpoint\", reason: \"invalid uri, invalid format\" }", + "ConfigError { pipeline_name: \"collector\", config_name: \"collector_endpoint\", reason: \"invalid uri from the builder, invalid format\" }", ); let valid_uri = new_collector_pipeline() @@ -578,4 +577,148 @@ mod tests { .build_collector_exporter::(); assert!(exporter.is_ok()); } + + #[test] + fn test_resolve_endpoint() { + struct TestCase<'a> { + description: &'a str, + env_var: &'a str, + builder_endpoint: Option<&'a str>, + expected_result: Result, + } + let test_cases = vec![ + TestCase { + description: "Positive: Endpoint from environment variable exists", + env_var: "http://example.com", + builder_endpoint: None, + expected_result: Ok(Uri::try_from("http://example.com").unwrap()), + }, + TestCase { + description: "Positive: Endpoint from builder", + env_var: "", + builder_endpoint: Some("http://example.com"), + expected_result: Ok(Uri::try_from("http://example.com").unwrap()), + }, + TestCase { + description: "Negative: Invalid URI from environment variable", + env_var: "invalid random uri", + builder_endpoint: None, + expected_result: Err(crate::Error::ConfigError { + pipeline_name: "collector", + config_name: "collector_endpoint", + reason: "invalid uri from environment variable, invalid uri character" + .to_string(), + }), + }, + TestCase { + description: "Negative: Invalid URI from builder", + env_var: "", + builder_endpoint: Some("invalid random uri"), + expected_result: Err(crate::Error::ConfigError { + pipeline_name: "collector", + config_name: "collector_endpoint", + reason: "invalid uri from the builder, invalid uri character".to_string(), + }), + }, + TestCase { + description: "Positive: Default endpoint (no environment variable set)", + env_var: "", + builder_endpoint: None, + expected_result: Ok(Uri::try_from(DEFAULT_ENDPOINT).unwrap()), + }, + ]; + for test_case in test_cases { + env::set_var(ENV_ENDPOINT, test_case.env_var); + let builder = CollectorPipeline { + collector_endpoint: test_case.builder_endpoint.map(|s| s.to_string()), + ..Default::default() + }; + let result = builder.resolve_endpoint(); + match test_case.expected_result { + Ok(expected) => { + assert_eq!(result.unwrap(), expected, "{}", test_case.description); + } + Err(expected_err) => { + assert!( + result.is_err(), + "{}, expected error, get {}", + test_case.description, + result.unwrap() + ); + match (result.unwrap_err(), expected_err) { + ( + crate::Error::ConfigError { + pipeline_name: result_pipeline_name, + config_name: result_config_name, + reason: result_reason, + }, + crate::Error::ConfigError { + pipeline_name: expected_pipeline_name, + config_name: expected_config_name, + reason: expected_reason, + }, + ) => { + assert_eq!( + result_pipeline_name, expected_pipeline_name, + "{}", + test_case.description + ); + assert_eq!( + result_config_name, expected_config_name, + "{}", + test_case.description + ); + assert_eq!(result_reason, expected_reason, "{}", test_case.description); + } + _ => panic!("we don't expect collector to return other error"), + } + } + } + env::remove_var(ENV_ENDPOINT); + } + } + + #[test] + fn test_resolve_timeout() { + struct TestCase<'a> { + description: &'a str, + env_var: &'a str, + builder_var: Option, + expected_duration: Duration, + } + let test_cases = vec![ + TestCase { + description: "Valid environment variable", + env_var: "5000", + builder_var: None, + expected_duration: Duration::from_millis(5000), + }, + TestCase { + description: "Invalid environment variable", + env_var: "invalid", + builder_var: None, + expected_duration: DEFAULT_COLLECTOR_TIMEOUT, + }, + TestCase { + description: "Missing environment variable", + env_var: "", + builder_var: Some(Duration::from_millis(5000)), + expected_duration: Duration::from_millis(5000), + }, + ]; + for test_case in test_cases { + env::set_var(ENV_TIMEOUT, test_case.env_var); + let mut builder = CollectorPipeline::default(); + if let Some(timeout) = test_case.builder_var { + builder = builder.with_timeout(timeout); + } + let result = builder.resolve_timeout(); + assert_eq!( + result, test_case.expected_duration, + "{}", + test_case.description + ); + env::remove_var(ENV_TIMEOUT); + } + } } diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 698cc0d646..be3ebe4c32 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -1,25 +1,20 @@ //! # Jaeger Exporter //! -mod agent; -#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] -mod collector; -pub(crate) mod runtime; -#[allow(clippy::all, unreachable_pub, dead_code)] -#[rustfmt::skip] // don't format generated files -mod thrift; -pub mod config; -pub(crate) mod transport; -mod uploader; - // Linting isn't detecting that it's used seems like linting bug. #[allow(unused_imports)] #[cfg(feature = "surf_collector_client")] use std::convert::TryFrom; +use std::convert::TryInto; +use std::fmt::Display; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; -use self::runtime::JaegerTraceRuntime; -use self::thrift::jaeger; -use crate::exporter::uploader::Uploader; use futures_core::future::BoxFuture; +#[cfg(feature = "isahc_collector_client")] +#[allow(unused_imports)] // this is actually used to configure authentication +use isahc::prelude::Configurable; + use opentelemetry::{ trace::{Event, Link, SpanKind, Status}, InstrumentationLibrary, Key, KeyValue, @@ -31,16 +26,22 @@ use opentelemetry_sdk::{ }, trace::EvictedQueue, }; -use std::convert::TryInto; -use std::fmt::Display; -use std::io; -use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}; -use std::sync::Arc; -use std::time::{Duration, SystemTime}; -#[cfg(feature = "isahc_collector_client")] -#[allow(unused_imports)] // this is actually used to configure authentication -use isahc::prelude::Configurable; +use crate::exporter::uploader::Uploader; + +use self::runtime::JaegerTraceRuntime; +use self::thrift::jaeger; + +mod agent; +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] +mod collector; +pub(crate) mod runtime; +#[allow(clippy::all, unreachable_pub, dead_code)] +#[rustfmt::skip] // don't format generated files +mod thrift; +pub mod config; +pub(crate) mod transport; +mod uploader; /// Instrument Library name MUST be reported in Jaeger Span tags with the following key const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; @@ -341,27 +342,25 @@ impl ExportError for Error { /// Sample the first address provided to designate which IP family to bind the socket to. /// IP families returned be INADDR_ANY as [`Ipv4Addr::UNSPECIFIED`] or /// IN6ADDR_ANY as [`Ipv6Addr::UNSPECIFIED`]. -fn addrs_and_family( - host_port: &impl ToSocketAddrs, -) -> Result<(Vec, SocketAddr), io::Error> { - let addrs = host_port.to_socket_addrs()?.collect::>(); - let family = match addrs.first() { +fn address_family(addrs: &[SocketAddr]) -> SocketAddr { + match addrs.first() { Some(SocketAddr::V4(_)) | None => SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)), Some(SocketAddr::V6(_)) => SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0)), - }; - Ok((addrs, family)) + } } #[cfg(test)] mod tests { - use super::SPAN_KIND; - use crate::exporter::thrift::jaeger::Tag; - use crate::exporter::{build_span_tags, OTEL_STATUS_CODE, OTEL_STATUS_DESCRIPTION}; use opentelemetry::{ trace::{SpanKind, Status}, KeyValue, }; + use crate::exporter::thrift::jaeger::Tag; + use crate::exporter::{build_span_tags, OTEL_STATUS_CODE, OTEL_STATUS_DESCRIPTION}; + + use super::SPAN_KIND; + fn assert_tag_contains(tags: Vec, key: &'static str, expect_val: &'static str) { assert_eq!( tags.into_iter() diff --git a/opentelemetry-jaeger/src/exporter/runtime.rs b/opentelemetry-jaeger/src/exporter/runtime.rs index 5348eefbcf..2c3f29dc41 100644 --- a/opentelemetry-jaeger/src/exporter/runtime.rs +++ b/opentelemetry-jaeger/src/exporter/runtime.rs @@ -3,7 +3,7 @@ feature = "rt-tokio", feature = "rt-tokio-current-thread" ))] -use crate::exporter::addrs_and_family; +use crate::exporter::address_family; use async_trait::async_trait; use opentelemetry_sdk::runtime::RuntimeChannel; use std::net::ToSocketAddrs; @@ -29,8 +29,8 @@ impl JaegerTraceRuntime for opentelemetry_sdk::runtime::Tokio { type Socket = tokio::net::UdpSocket; fn create_socket(&self, endpoint: T) -> thrift::Result { - let (addrs, family) = addrs_and_family(&endpoint)?; - let conn = std::net::UdpSocket::bind(family)?; + let addrs = endpoint.to_socket_addrs()?.collect::>(); + let conn = std::net::UdpSocket::bind(address_family(addrs.as_slice()))?; conn.connect(addrs.as_slice())?; Ok(tokio::net::UdpSocket::from_std(conn)?) } @@ -48,8 +48,8 @@ impl JaegerTraceRuntime for opentelemetry_sdk::runtime::TokioCurrentThread { type Socket = tokio::net::UdpSocket; fn create_socket(&self, endpoint: T) -> thrift::Result { - let (addrs, family) = addrs_and_family(&endpoint)?; - let conn = std::net::UdpSocket::bind(family)?; + let addrs = endpoint.to_socket_addrs()?.collect::>(); + let conn = std::net::UdpSocket::bind(address_family(addrs.as_slice()))?; conn.connect(addrs.as_slice())?; Ok(tokio::net::UdpSocket::from_std(conn)?) } @@ -67,8 +67,8 @@ impl JaegerTraceRuntime for opentelemetry_sdk::runtime::AsyncStd { type Socket = async_std::net::UdpSocket; fn create_socket(&self, endpoint: T) -> thrift::Result { - let (addrs, family) = addrs_and_family(&endpoint)?; - let conn = std::net::UdpSocket::bind(family)?; + let addrs = endpoint.to_socket_addrs()?.collect::>(); + let conn = std::net::UdpSocket::bind(address_family(addrs.as_slice()))?; conn.connect(addrs.as_slice())?; Ok(async_std::net::UdpSocket::from(conn)) }