Skip to content

Commit

Permalink
Init telemetry once in spin.rs by performing a crime against humanity…
Browse files Browse the repository at this point in the history
… and parsing runtime config in spin.rs

Signed-off-by: Caleb Schoepp <caleb.schoepp@fermyon.com>
  • Loading branch information
calebschoepp committed Mar 7, 2024
1 parent 6e106fd commit 3146132
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 88 deletions.
83 changes: 21 additions & 62 deletions crates/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,15 @@
use std::io::IsTerminal;

use opentelemetry::global;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Tracer};
use std::sync::OnceLock;
use tracing::level_filters::LevelFilter;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{
filter::Filtered,
fmt,
prelude::*,
registry,
reload::{self, Handle},
EnvFilter, Registry,
};
use tracing_subscriber::{fmt, prelude::*, registry, EnvFilter};
use url::Url;

mod traces;

pub use traces::accept_trace;

type TelemetryLayer = Filtered<OpenTelemetryLayer<Registry, Tracer>, LevelFilter, Registry>;

static GLOBAL_TELEMETRY_LAYER_RELOAD_HANDLE: OnceLock<Handle<Option<TelemetryLayer>, Registry>> =
OnceLock::new();
static GLOBAL_SERVICE_DESCRIPTION: OnceLock<ServiceDescription> = OnceLock::new();

/// Description of the service for which telemetry is being collected
#[derive(Clone)]
pub struct ServiceDescription {
Expand All @@ -43,13 +27,16 @@ impl ServiceDescription {
}

/// TODO
///
/// Sets the open telemetry pipeline as the default tracing subscriber
pub fn init_globally(service: ServiceDescription) -> anyhow::Result<ShutdownGuard> {
// Globally set the service description
let result = GLOBAL_SERVICE_DESCRIPTION.set(service);
if result.is_err() {
return Err(anyhow::anyhow!("failed to set global service description",));
pub fn init(
service: ServiceDescription,
endpoint: Option<Url>,
traces_enabled: bool,
_metrics_enabled: bool,
) -> anyhow::Result<ShutdownGuard> {
if traces_enabled && endpoint.is_none() {
return Err(anyhow::anyhow!(
"Traces are enabled but no endpoint is provided"
));
}

// TODO: comment
Expand All @@ -63,52 +50,24 @@ pub fn init_globally(service: ServiceDescription) -> anyhow::Result<ShutdownGuar
);

// TODO: comment
let (telemetry_layer, reload_handle) = reload::Layer::new(None);

// TODO: comment
let result = GLOBAL_TELEMETRY_LAYER_RELOAD_HANDLE.set(reload_handle);
if result.is_err() {
return Err(anyhow::anyhow!(
"failed to set global telemetry layer reload handle",
));
}
let otlp_layer = if traces_enabled && endpoint.is_some() {
Some(traces::otel_tracing_layer(
service,
endpoint.unwrap().to_string(),
)?)
} else {
None
};

// TODO: comment
registry().with(telemetry_layer).with(fmt_layer).init();
registry().with(otlp_layer).with(fmt_layer).init();

// TODO: comment
global::set_text_map_propagator(TraceContextPropagator::new());

Ok(ShutdownGuard(None))
}

/// TODO
pub fn reload_globally(endpoint: Url, traces: bool, metrics: bool) {
if traces {
if let Err(error) = reload_telemetry_layer(endpoint) {
tracing::error!("failed to load otlp telemetry: {}", error);
}
}
if metrics {
// TODO: Setup metrics
}
}

/// TODO
fn reload_telemetry_layer(endpoint: Url) -> anyhow::Result<()> {
let service = GLOBAL_SERVICE_DESCRIPTION.get().unwrap();
let otel_tracing_layer = Some(traces::otel_tracing_layer(
service.clone(),
endpoint.to_string(),
)?);

GLOBAL_TELEMETRY_LAYER_RELOAD_HANDLE
.get()
.unwrap()
.reload(otel_tracing_layer)
.map_err(|e| anyhow::anyhow!(e))
}

/// An RAII implementation for connection to open telemetry services.
///
/// Shutdown of the open telemetry services will happen on `Drop`.
Expand Down
10 changes: 0 additions & 10 deletions crates/telemetry/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,10 @@ use super::ServiceDescription;

/// Associate the current span with the incoming requests trace context.
pub fn accept_trace<T>(req: &Request<T>) {
tracing::info!("headers map {:?}", &HeaderExtractor(req.headers()).keys());
let parent_context = global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(req.headers()))
});
tracing::info!("parent context {:?}", parent_context);
tracing::info!(
"current::span context {:?}",
tracing::Span::current().context()
);
tracing::Span::current().set_parent(parent_context);
tracing::info!(
"current::span metadata {:?}",
tracing::Span::current().metadata()
);
}

pub(crate) fn otel_tracing_layer(
Expand Down
2 changes: 1 addition & 1 deletion crates/trigger-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use tokio::{
net::TcpListener,
task,
};
use tracing::{instrument, log, Instrument};
use tracing::{instrument, log};
use wasmtime_wasi_http::body::HyperIncomingBody as Body;

use crate::{handler::HttpHandlerExecutor, wagi::WagiHttpExecutor};
Expand Down
12 changes: 0 additions & 12 deletions crates/trigger/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use spin_common::{arg_parser::parse_kv, sloth};
use crate::network::Network;
use crate::runtime_config::llm::LLmOptions;
use crate::runtime_config::sqlite::SqlitePersistenceMessageHook;
use crate::runtime_config::telemetry::TelemetryOpts;
use crate::stdio::StdioLoggingTriggerHooks;
use crate::{
loader::TriggerLoader,
Expand Down Expand Up @@ -162,17 +161,6 @@ where
}

let runtime_config = self.build_runtime_config()?;
if let Some(telemetry_opts) = runtime_config.telemetry() {
match telemetry_opts {
TelemetryOpts::Otlp(otlp_opts) => {
spin_telemetry::reload_globally(
otlp_opts.endpoint.clone(),
otlp_opts.traces,
otlp_opts.metrics,
);
}
}
}

// Required env vars
let working_dir = std::env::var(SPIN_WORKING_DIR).context(SPIN_WORKING_DIR)?;
Expand Down
1 change: 1 addition & 0 deletions crates/trigger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use spin_core::{
OutboundWasiHttpHandler, Store, StoreBuilder, WasiVersion,
};

pub use crate::runtime_config::telemetry::TelemetryOpts;
pub use crate::runtime_config::RuntimeConfig;

pub enum EitherInstancePre<T> {
Expand Down
69 changes: 66 additions & 3 deletions src/bin/spin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ use spin_cli::commands::{
watch::WatchCommand,
};
use spin_cli::{build_info::*, subprocess::ExitStatusError};
use spin_telemetry::ServiceDescription;
use spin_telemetry::{ServiceDescription, ShutdownGuard};
use spin_trigger::cli::help::HelpArgsOnlyTrigger;
use spin_trigger::cli::TriggerExecutorCommand;
use spin_trigger::RuntimeConfig;
use spin_trigger_http::HttpTrigger;
use spin_trigger_redis::RedisTrigger;
use std::path::Path;

#[tokio::main]
async fn main() {
Expand All @@ -42,8 +44,7 @@ async fn main() {
}

async fn _main() -> anyhow::Result<()> {
let _telemetry_guard =
spin_telemetry::init_globally(ServiceDescription::new("spin", VERSION.to_string()))?;
let _telemetry_guard = init_telemetry()?;

let plugin_help_entries = plugin_help_entries();

Expand Down Expand Up @@ -77,6 +78,68 @@ async fn _main() -> anyhow::Result<()> {
SpinApp::from_arg_matches(&matches)?.run(cmd).await
}

/// init_telemetry uses spin_telemetry to initialize the tracing library for the purposes of
/// logging, tracing, and metrics. It returns a ShutdownGuard that should be held onto for the
/// duration of the program to ensure that the telemetry is properly shut down.
///
/// Logs are always emitted to STDERR. Based off configuration in the runtime config, traces and
/// metrics may be optionally emitted to a remote service.
///
/// This function is a crime against humanity because it directly parses the runtime config here
/// instead of down in spin_trigger where it should be done. This turns out to be necessary because
/// a tracing subscriber can only be set once globally in a binary. We need to set the subscribe
/// immediately so that we can start logging and tracing right away. Thus we're forced to load
/// runtime config this early in the program.
///
/// There are technically ways to work around the tracing subscriber being set once globally which
/// would subsequently let us avoid this crime against humanity:
/// 1) You can use a scope default subscriber. This works but requires us to put
/// .with_current_subscriber() on every async call in the trigger crates which is a really bad
/// experience.
/// 2) You can add a reload handle to a layer in a subscriber to reload it with the correct runtime
/// configuration later. This works but it ends up having a bug when used with
/// .set_parent_context() which is necessary to support trace propagation. Therefore this isn't
/// an option until this issue is fixed upstream.
fn init_telemetry() -> anyhow::Result<ShutdownGuard> {
let mut runtime_config = RuntimeConfig::new(None); // TODO: Do I need to pass a path here

let args = std::env::args().collect::<Vec<_>>();
let mut runtime_config_file_path = None;
for i in 1..args.len() {
if args[i] == "--runtime-config-file" {
if i + 1 < args.len() {
runtime_config_file_path = Some(args[i + 1].clone());
break;
} else {
// TODO: Panic or something here
}
}
}
if runtime_config_file_path.is_some() {
runtime_config.merge_config_file(Path::new(runtime_config_file_path.unwrap().as_str()))?;
}

let mut endpoint = None;
let mut traces_enabled = false;
let mut metrics_enabled = false;
if let Some(telemetry_opts) = runtime_config.telemetry() {
match telemetry_opts {
spin_trigger::TelemetryOpts::Otlp(otlp_opts) => {
traces_enabled = otlp_opts.traces;
metrics_enabled = otlp_opts.metrics;
endpoint = Some(otlp_opts.endpoint.clone());
}
}
}

spin_telemetry::init(
ServiceDescription::new("spin", VERSION.to_string()),
endpoint.clone(),
traces_enabled,
metrics_enabled,
)
}

fn print_error_chain(err: anyhow::Error) {
if let Some(cause) = err.source() {
let is_multiple = cause.source().is_some();
Expand Down

0 comments on commit 3146132

Please sign in to comment.