Skip to content

Commit

Permalink
Env over compling time config (#1323)
Browse files Browse the repository at this point in the history
* feat: favor env vars in jaeger exporters

* feat: collectors

* fix unit tests

* unit tests

* unit tests

* add change logs
  • Loading branch information
TommyCpp authored Nov 6, 2023
1 parent e265d24 commit 4dd54a2
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 176 deletions.
8 changes: 8 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ The Opentelemetry Rust SDK comes with an error type `openetelemetry::Error`. For

For users that want to implement their own exporters. It's RECOMMENDED to wrap all errors from the exporter into a crate-level error type, and implement `ExporterError` trait.

### Priority of configurations
OpenTelemetry supports multiple ways to configure the API, SDK and other components. The priority of configurations is as follows:

- Environment variables
- Compiling time configurations provided in the source code



## Style Guide

* Run `cargo clippy --all` - this will catch common mistakes and improve
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-jaeger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Bump MSRV to 1.65 [#1318](https://github.com/open-telemetry/opentelemetry-rust/pull/1318)
- Bump MSRV to 1.64 [#1203](https://github.com/open-telemetry/opentelemetry-rust/pull/1203)
- Prioritize environment variables over compiling time variables [#1323](https://github.com/open-telemetry/opentelemetry-rust/pull/1323)

## v0.19.0

Expand Down
19 changes: 9 additions & 10 deletions opentelemetry-jaeger/src/exporter/agent.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! # UDP Jaeger Agent Client
use crate::exporter::addrs_and_family;
use crate::exporter::address_family;
use crate::exporter::runtime::JaegerTraceRuntime;
use crate::exporter::thrift::{
agent::{self, TAgentSyncClient},
jaeger,
};
use crate::exporter::transport::{TBufferChannel, TNoopChannel};
use std::fmt;
use std::net::{ToSocketAddrs, UdpSocket};
use std::net::{SocketAddr, UdpSocket};
use thrift::{
protocol::{TCompactInputProtocol, TCompactOutputProtocol},
transport::{ReadHalf, TIoChannel, WriteHalf},
Expand Down Expand Up @@ -43,20 +43,19 @@ pub(crate) struct AgentSyncClientUdp {

impl AgentSyncClientUdp {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
agent_endpoint: T,
pub(crate) fn new(
max_packet_size: usize,
auto_split: bool,
agent_address: Vec<SocketAddr>,
) -> thrift::Result<Self> {
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
TCompactOutputProtocol::new(write),
);

let (addrs, family) = addrs_and_family(&agent_endpoint)?;
let conn = UdpSocket::bind(family)?;
conn.connect(addrs.as_slice())?;
let conn = UdpSocket::bind(address_family(agent_address.as_slice()))?;
conn.connect(agent_address.as_slice())?;

Ok(AgentSyncClientUdp {
conn,
Expand Down Expand Up @@ -102,19 +101,19 @@ pub(crate) struct AgentAsyncClientUdp<R: JaegerTraceRuntime> {

impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
agent_endpoint: T,
pub(crate) fn new(
max_packet_size: usize,
runtime: R,
auto_split: bool,
agent_address: Vec<SocketAddr>,
) -> thrift::Result<Self> {
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
TCompactOutputProtocol::new(write),
);

let conn = runtime.create_socket(agent_endpoint)?;
let conn = runtime.create_socket(agent_address.as_slice())?;

Ok(AgentAsyncClientUdp {
runtime,
Expand Down
91 changes: 50 additions & 41 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use std::borrow::BorrowMut;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::{env, net};

use opentelemetry::trace::TraceError;
use opentelemetry_sdk::trace::{BatchSpanProcessor, Tracer};
use opentelemetry_sdk::{
self,
trace::{BatchConfig, Config, TracerProvider},
};

use crate::exporter::agent::{AgentAsyncClientUdp, AgentSyncClientUdp};
use crate::exporter::config::{
build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig,
TransformationConfig,
};
use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader};
use crate::{Error, Exporter, JaegerTraceRuntime};
use opentelemetry::trace::TraceError;
use opentelemetry_sdk::trace::{BatchSpanProcessor, Tracer};
use opentelemetry_sdk::{
self,
trace::{BatchConfig, Config, TracerProvider},
};
use std::borrow::BorrowMut;
use std::sync::Arc;
use std::{env, net};

/// The max size of UDP packet we want to send, synced with jaeger-agent
const UDP_PACKET_MAX_LENGTH: usize = 65_000;
Expand Down Expand Up @@ -78,38 +81,23 @@ pub struct AgentPipeline {
transformation_config: TransformationConfig,
trace_config: Option<Config>,
batch_config: Option<BatchConfig>,
agent_endpoint: Result<Vec<net::SocketAddr>, crate::Error>,
agent_endpoint: Option<String>,
max_packet_size: usize,
auto_split_batch: bool,
}

impl Default for AgentPipeline {
fn default() -> Self {
let mut pipeline = AgentPipeline {
AgentPipeline {
transformation_config: Default::default(),
trace_config: Default::default(),
batch_config: Some(Default::default()),
agent_endpoint: Ok(vec![format!(
agent_endpoint: Some(format!(
"{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
)
.parse()
.unwrap()]),
)),
max_packet_size: UDP_PACKET_MAX_LENGTH,
auto_split_batch: false,
};

let endpoint = match (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) {
(Ok(host), Ok(port)) => Some(format!("{}:{}", host.trim(), port.trim())),
(Ok(host), _) => Some(format!("{}:{DEFAULT_AGENT_ENDPOINT_PORT}", host.trim())),
(_, Ok(port)) => Some(format!("{DEFAULT_AGENT_ENDPOINT_HOST}:{}", port.trim())),
(_, _) => None,
};

if let Some(endpoint) = endpoint {
pipeline = pipeline.with_endpoint(endpoint);
}

pipeline
}
}

Expand Down Expand Up @@ -147,16 +135,9 @@ impl AgentPipeline {
/// Any valid socket address can be used.
///
/// Default to be `127.0.0.1:6831`.
pub fn with_endpoint<T: net::ToSocketAddrs>(self, agent_endpoint: T) -> Self {
pub fn with_endpoint<T: Into<String>>(self, agent_endpoint: T) -> Self {
AgentPipeline {
agent_endpoint: agent_endpoint
.to_socket_addrs()
.map(|addrs| addrs.collect())
.map_err(|io_err| crate::Error::ConfigError {
pipeline_name: "agent",
config_name: "endpoint",
reason: io_err.to_string(),
}),
agent_endpoint: Some(agent_endpoint.into()),
..self
}
}
Expand Down Expand Up @@ -391,10 +372,10 @@ impl AgentPipeline {
R: JaegerTraceRuntime,
{
let agent = AgentAsyncClientUdp::new(
self.agent_endpoint?.as_slice(),
self.max_packet_size,
runtime,
self.auto_split_batch,
self.resolve_endpoint()?,
)
.map_err::<Error, _>(Into::into)?;
Ok(Arc::new(AsyncUploader::Agent(
Expand All @@ -404,13 +385,38 @@ impl AgentPipeline {

fn build_sync_agent_uploader(self) -> Result<Arc<dyn Uploader>, TraceError> {
let agent = AgentSyncClientUdp::new(
self.agent_endpoint?.as_slice(),
self.max_packet_size,
self.auto_split_batch,
self.resolve_endpoint()?,
)
.map_err::<Error, _>(Into::into)?;
Ok(Arc::new(SyncUploader::Agent(std::sync::Mutex::new(agent))))
}

// resolve the agent endpoint from the environment variables or the builder
// if only one of the environment variables is set, the other one will be set to the default value
// if no environment variable is set, the builder value will be used.
fn resolve_endpoint(self) -> Result<Vec<net::SocketAddr>, TraceError> {
let endpoint_str = match (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) {
(Ok(host), Ok(port)) => format!("{}:{}", host.trim(), port.trim()),
(Ok(host), _) => format!("{}:{DEFAULT_AGENT_ENDPOINT_PORT}", host.trim()),
(_, Ok(port)) => format!("{DEFAULT_AGENT_ENDPOINT_HOST}:{}", port.trim()),
(_, _) => self.agent_endpoint.unwrap_or(format!(
"{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
)),
};
endpoint_str
.to_socket_addrs()
.map(|addrs| addrs.collect())
.map_err(|io_err| {
Error::ConfigError {
pipeline_name: "agent",
config_name: "endpoint",
reason: io_err.to_string(),
}
.into()
})
}
}

#[cfg(test)]
Expand All @@ -429,9 +435,12 @@ mod tests {
("127.0.0.1:1001", true),
];
for (socket_str, is_ok) in test_cases.into_iter() {
let pipeline = AgentPipeline::default().with_endpoint(socket_str);
let resolved_endpoint = AgentPipeline::default()
.with_endpoint(socket_str)
.resolve_endpoint();
assert_eq!(
pipeline.agent_endpoint.is_ok(),
resolved_endpoint.is_ok(),
// if is_ok is true, use socket_str, otherwise use the default endpoint
is_ok,
"endpoint string {}",
socket_str
Expand Down
Loading

0 comments on commit 4dd54a2

Please sign in to comment.