From c9f76e571e5570d2c4194feee03bb260b24c378f Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Tue, 30 Jul 2024 11:49:10 +0400 Subject: [PATCH] feat(vlog): Implement otlp guard with force flush on drop (#2536) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Improves the `ObservabilityGuard` so that it flushes both sentry & otlp events on drop. ## Why ❔ Without it, some events (at least for otlp) may be missed if the application exits right after events were produced. ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --- core/lib/vlog/src/lib.rs | 63 ++++++++++++++++++++++++-- core/lib/vlog/src/opentelemetry/mod.rs | 8 ++-- 2 files changed, 64 insertions(+), 7 deletions(-) diff --git a/core/lib/vlog/src/lib.rs b/core/lib/vlog/src/lib.rs index 5633f20f5882..aebd413b749d 100644 --- a/core/lib/vlog/src/lib.rs +++ b/core/lib/vlog/src/lib.rs @@ -1,6 +1,8 @@ //! This crate contains the observability subsystem. //! It is responsible for providing a centralized interface for consistent observability configuration. +use std::time::Duration; + use ::sentry::ClientInitGuard; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -23,7 +25,53 @@ pub struct ObservabilityBuilder { /// Guard for the observability subsystem. /// Releases configured integrations upon being dropped. pub struct ObservabilityGuard { - _sentry_guard: Option<ClientInitGuard>, + /// Opentelemetry provider. Can be used to force flush spans. + otlp_provider: Option<opentelemetry_sdk::trace::TracerProvider>, + sentry_guard: Option<ClientInitGuard>, +} + +impl ObservabilityGuard { + /// Forces flushing of pending events. + /// This method is blocking. + pub fn force_flush(&self) { + // We don't want to wait for too long. 
+ const FLUSH_TIMEOUT: Duration = Duration::from_secs(1); + + if let Some(sentry_guard) = &self.sentry_guard { + sentry_guard.flush(Some(FLUSH_TIMEOUT)); + } + + if let Some(provider) = &self.otlp_provider { + for result in provider.force_flush() { + if let Err(err) = result { + tracing::warn!("Flushing the spans failed: {err:?}"); + } + } + } + } + + /// Shutdown the observability subsystem. + /// It will stop the background tasks like collecting and exporting events. + pub fn shutdown(&self) { + // We don't want to wait for too long. + const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1); + + if let Some(sentry_guard) = &self.sentry_guard { + sentry_guard.close(Some(SHUTDOWN_TIMEOUT)); + } + if let Some(provider) = &self.otlp_provider { + if let Err(err) = provider.shutdown() { + tracing::warn!("Shutting down the provider failed: {err:?}"); + } + } + } +} + +impl Drop for ObservabilityGuard { + fn drop(&mut self) { + self.force_flush(); + self.shutdown(); + } } impl std::fmt::Debug for ObservabilityGuard { @@ -62,16 +110,23 @@ impl ObservabilityBuilder { // Later we may want to enforce each layer to have its own filter. 
let global_filter = logs.build_filter(); + let logs_layer = logs.into_layer(); + let (otlp_provider, otlp_layer) = self + .opentelemetry_layer + .map(|layer| layer.into_layer()) + .unzip(); + tracing_subscriber::registry() .with(global_filter) - .with(logs.into_layer()) - .with(self.opentelemetry_layer.map(|layer| layer.into_layer())) + .with(logs_layer) + .with(otlp_layer) .init(); let sentry_guard = self.sentry.map(|sentry| sentry.install()); ObservabilityGuard { - _sentry_guard: sentry_guard, + otlp_provider, + sentry_guard, } } } diff --git a/core/lib/vlog/src/opentelemetry/mod.rs b/core/lib/vlog/src/opentelemetry/mod.rs index 64049df8ce9b..1085f6c6db06 100644 --- a/core/lib/vlog/src/opentelemetry/mod.rs +++ b/core/lib/vlog/src/opentelemetry/mod.rs @@ -108,7 +108,7 @@ impl OpenTelemetry { self } - pub(super) fn into_layer<S>(self) -> impl Layer<S> + pub(super) fn into_layer<S>(self) -> (opentelemetry_sdk::trace::TracerProvider, impl Layer<S>) where S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, { @@ -151,9 +151,11 @@ impl OpenTelemetry { let tracer = provider.tracer_builder(service_name).build(); opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); - tracing_opentelemetry::layer() + let layer = tracing_opentelemetry::layer() .with_tracer(tracer) - .with_filter(filter) + .with_filter(filter); + + (provider, layer) } }