diff --git a/.changesets/fix_geal_context_mutex.md b/.changesets/fix_geal_context_mutex.md new file mode 100644 index 0000000000..be40b646a7 --- /dev/null +++ b/.changesets/fix_geal_context_mutex.md @@ -0,0 +1,5 @@ +### use a parking-lot mutex in Context ([Issue #2751](https://github.com/apollographql/router/issues/2751)) + +The context requires synchronized access to the busy timer, and precedently we used a futures aware mutex for that, but those are susceptible to contention. This replaces that mutex with a parking-lot synchronous mutex that is much faster. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2885 diff --git a/.changesets/fix_geal_filter_spans.md b/.changesets/fix_geal_filter_spans.md new file mode 100644 index 0000000000..b664484860 --- /dev/null +++ b/.changesets/fix_geal_filter_spans.md @@ -0,0 +1,5 @@ +### Filter spans before sending them to the opentelemetry layer + +the sampling configuration in the opentelemetry layer only applies when the span closes, so in the meantime a lot of data is created just to be dropped. This adds a filter than can sample spans before the opentelemetry layer. The sampling decision is done at the root span, and then derived from the parent span in the rest of the trace. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2894 \ No newline at end of file diff --git a/.changesets/fix_geal_null_field_formatter.md b/.changesets/fix_geal_null_field_formatter.md new file mode 100644 index 0000000000..33c36c79dc --- /dev/null +++ b/.changesets/fix_geal_null_field_formatter.md @@ -0,0 +1,5 @@ +### prevent span attributes from being formatted to write logs + +we do not show span attributes in our logs, but the log formatter still spends some time formatting them to a string, even when there will be no logs written for the trace. This adds the `NullFieldFormatter` that entirely avoids formatting the attributes + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2890 \ No newline at end of file diff --git a/.changesets/maint_garypen_jemalloc.md b/.changesets/maint_garypen_jemalloc.md new file mode 100644 index 0000000000..d166fb95ea --- /dev/null +++ b/.changesets/maint_garypen_jemalloc.md @@ -0,0 +1,9 @@ +### use jemalloc on linux + +Detailed memory investigations of the router in use have revealed that there is a significant amount of memory fragmentation when using the default allocator, glibc, on linux. Performance testing and flamegraph analysis suggests that jemalloc on linux can yield significant performance improvements. In our tests, this figure shows performance to be about 35% faster than the default allocator. The improvement in performance being due to less time spent managing memory fragmentation. + +Not everyone will see a 35% performance improvement in this release of the router. Depending on your usage pattern, you may see more or less than this. If you see a regression, please file an issue with details. + +We have no reason to believe that there are allocation problems on other platforms, so this change is confined to linux. + +By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/2882 diff --git a/.changesets/maint_geal_path_manipulation.md b/.changesets/maint_geal_path_manipulation.md new file mode 100644 index 0000000000..1c25d82355 --- /dev/null +++ b/.changesets/maint_geal_path_manipulation.md @@ -0,0 +1,5 @@ +### lighter path manipulation in response formatting + +Response formatting generates a lot of temporary allocations to create response paths that end up unused. By making a reference based type to hold these paths, we can prevent those allocations and improve performance. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2854 \ No newline at end of file diff --git a/.changesets/maint_geal_private_context.md b/.changesets/maint_geal_private_context.md new file mode 100644 index 0000000000..467b376182 --- /dev/null +++ b/.changesets/maint_geal_private_context.md @@ -0,0 +1,7 @@ +### Add a private part to the Context structure ([Issue #2800](https://github.com/apollographql/router/issues/2800)) + +There's a cost in using the `Context` structure throughout a request's lifecycle, due to the JSON serialization and deserialization, so it should be reserved from inter plugin communication between rhai, coprocessor and Rust. But for internal router usage, we can have a more efficient structure that avoids serialization costs, and does not expose data that should not be modified by plugins. + +That structure is based on a map indexed by type id, which means that if some part of the code can see that type, then it can access it in the map. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2802 \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 118655c528..a324a2c72d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -345,6 +345,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry-zipkin", "p256 0.12.0", + "parking_lot 0.12.1", "paste", "pin-project-lite", "prometheus", @@ -378,6 +379,7 @@ dependencies = [ "test-log", "test-span", "thiserror", + "tikv-jemallocator", "tokio", "tokio-rustls", "tokio-stream", @@ -5883,6 +5885,26 @@ dependencies = [ "tower", ] +[[package]] +name = "tikv-jemalloc-sys" +version = "0.5.3+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a678df20055b43e57ef8cddde41cdfda9a3c1a060b67f4c5836dfb1d78543ba8" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20612db8a13a6c06d57ec83953694185a367e16945f66565e8028d2c0bd76979" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.20" diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index 403af5096b..4aba8561f1 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -203,6 +203,7 @@ yaml-rust = "0.4.5" wsl = "0.1.0" tokio-rustls = "0.23.4" http-serde = "1.1.2" +parking_lot = "0.12.1" memchr = "2.5.0" brotli = "3.3.4" zstd = "0.12.3" @@ -214,6 +215,9 @@ uname = "0.1.1" [target.'cfg(unix)'.dependencies] uname = "0.1.1" +[target.'cfg(target_os = "linux")'.dependencies] +tikv-jemallocator = "0.5" + [dev-dependencies] ecdsa = { version = "0.15.1", features = ["signing", "pem", "pkcs8"] } fred = "6.0.0-beta.2" @@ -260,3 +264,4 @@ tonic-build = "0.8.4" [[test]] name = "integration_tests" path = "tests/integration_tests.rs" + diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index 398f686a4a..1dd5ffa652 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -436,7 +436,7 @@ async fn handle_graphql( .cloned(); let res = service.oneshot(request).await; - let dur = context.busy_time().await; + let dur = context.busy_time(); let processing_seconds = dur.as_secs_f64(); tracing::info!(histogram.apollo_router_processing_time = processing_seconds,); diff --git a/apollo-router/src/context/extensions.rs b/apollo-router/src/context/extensions.rs new file mode 100644 index 0000000000..c868d4f73f --- /dev/null +++ b/apollo-router/src/context/extensions.rs @@ -0,0 +1,153 @@ +// NOTE: this module is taken from tokio's tracing span's extensions +// which is taken from https://github.com/hyperium/http/blob/master/src/extensions.rs + +use std::any::Any; +use std::any::TypeId; +use std::collections::HashMap; +use std::fmt; +use std::hash::BuildHasherDefault; +use std::hash::Hasher; + +type AnyMap = HashMap, BuildHasherDefault>; + +// With TypeIds as keys, there's no need to hash them. They are already hashes +// themselves, coming from the compiler. The IdHasher just holds the u64 of +// the TypeId, and then returns it, instead of doing any bit fiddling. +#[derive(Default)] +struct IdHasher(u64); + +impl Hasher for IdHasher { + fn write(&mut self, _: &[u8]) { + unreachable!("TypeId calls write_u64"); + } + + #[inline] + fn write_u64(&mut self, id: u64) { + self.0 = id; + } + + #[inline] + fn finish(&self) -> u64 { + self.0 + } +} + +/// A type map of protocol extensions. +/// +/// `Extensions` can be used by `Request` and `Response` to store +/// extra data derived from the underlying protocol. +#[derive(Default)] +pub(crate) struct Extensions { + // If extensions are never used, no need to carry around an empty HashMap. + // That's 3 words. Instead, this is only 1 word. + map: Option>, +} + +#[allow(unused)] +impl Extensions { + /// Create an empty `Extensions`. + #[inline] + pub(crate) fn new() -> Extensions { + Extensions { map: None } + } + + /// Insert a type into this `Extensions`. + /// + /// If a extension of this type already existed, it will + /// be returned. + pub(crate) fn insert(&mut self, val: T) -> Option { + self.map + .get_or_insert_with(Box::default) + .insert(TypeId::of::(), Box::new(val)) + .and_then(|boxed| { + (boxed as Box) + .downcast() + .ok() + .map(|boxed| *boxed) + }) + } + + /// Get a reference to a type previously inserted on this `Extensions`. + pub(crate) fn get(&self) -> Option<&T> { + self.map + .as_ref() + .and_then(|map| map.get(&TypeId::of::())) + .and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref()) + } + + /// Get a mutable reference to a type previously inserted on this `Extensions`. + pub(crate) fn get_mut(&mut self) -> Option<&mut T> { + self.map + .as_mut() + .and_then(|map| map.get_mut(&TypeId::of::())) + .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut()) + } + + pub(crate) fn contains_key(&self) -> bool { + self.map + .as_ref() + .map(|map| map.contains_key(&TypeId::of::())) + .unwrap_or_default() + } + + /// Remove a type from this `Extensions`. + /// + /// If a extension of this type existed, it will be returned. + pub(crate) fn remove(&mut self) -> Option { + self.map + .as_mut() + .and_then(|map| map.remove(&TypeId::of::())) + .and_then(|boxed| { + (boxed as Box) + .downcast() + .ok() + .map(|boxed| *boxed) + }) + } + + /// Clear the `Extensions` of all inserted extensions. + #[inline] + pub(crate) fn clear(&mut self) { + if let Some(ref mut map) = self.map { + map.clear(); + } + } + + /// Check whether the extension set is empty or not. + #[inline] + pub(crate) fn is_empty(&self) -> bool { + self.map.as_ref().map_or(true, |map| map.is_empty()) + } + + /// Get the numer of extensions available. + #[inline] + pub(crate) fn len(&self) -> usize { + self.map.as_ref().map_or(0, |map| map.len()) + } +} + +impl fmt::Debug for Extensions { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Extensions").finish() + } +} + +#[test] +fn test_extensions() { + #[derive(Debug, PartialEq)] + struct MyType(i32); + + let mut extensions = Extensions::new(); + + extensions.insert(5i32); + extensions.insert(MyType(10)); + + assert_eq!(extensions.get(), Some(&5i32)); + assert_eq!(extensions.get_mut(), Some(&mut 5i32)); + + assert_eq!(extensions.remove::(), Some(5i32)); + assert!(extensions.get::().is_none()); + + assert_eq!(extensions.get::(), None); + assert_eq!(extensions.get(), Some(&MyType(10))); +} diff --git a/apollo-router/src/context.rs b/apollo-router/src/context/mod.rs similarity index 93% rename from apollo-router/src/context.rs rename to apollo-router/src/context/mod.rs index 00d5f7bf82..a05e913149 100644 --- a/apollo-router/src/context.rs +++ b/apollo-router/src/context/mod.rs @@ -10,13 +10,17 @@ use std::time::Instant; use dashmap::mapref::multiple::RefMulti; use dashmap::mapref::multiple::RefMutMulti; use dashmap::DashMap; -use futures::lock::Mutex; +use derivative::Derivative; +use parking_lot::Mutex; use serde::Deserialize; use serde::Serialize; use tower::BoxError; +use self::extensions::Extensions; use crate::json_ext::Value; +pub(crate) mod extensions; + /// Holds [`Context`] entries. pub(crate) type Entries = Arc>; @@ -31,17 +35,22 @@ pub(crate) type Entries = Arc>; /// [`crate::services::SubgraphResponse`] processing. At such times, /// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`] /// functions to minimise the possibility of mis-sequenced updates. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Deserialize, Serialize, Derivative)] +#[derivative(Debug)] pub struct Context { // Allows adding custom entries to the context. entries: Entries, + #[serde(skip, default)] + pub(crate) private_entries: Arc>, + /// Creation time #[serde(skip)] #[serde(default = "Instant::now")] pub(crate) created_at: Instant, #[serde(skip)] + #[derivative(Debug = "ignore")] busy_timer: Arc>, } @@ -50,6 +59,7 @@ impl Context { pub fn new() -> Self { Context { entries: Default::default(), + private_entries: Arc::new(parking_lot::Mutex::new(Extensions::default())), created_at: Instant::now(), busy_timer: Arc::new(Mutex::new(BusyTimer::new())), } @@ -195,18 +205,18 @@ impl Context { } /// Notify the busy timer that we're waiting on a network request - pub(crate) async fn enter_active_request(&self) { - self.busy_timer.lock().await.increment_active_requests() + pub(crate) fn enter_active_request(&self) { + self.busy_timer.lock().increment_active_requests() } /// Notify the busy timer that we stopped waiting on a network request - pub(crate) async fn leave_active_request(&self) { - self.busy_timer.lock().await.decrement_active_requests() + pub(crate) fn leave_active_request(&self) { + self.busy_timer.lock().decrement_active_requests() } /// How much time was spent working on the request - pub(crate) async fn busy_time(&self) -> Duration { - self.busy_timer.lock().await.current() + pub(crate) fn busy_time(&self) -> Duration { + self.busy_timer.lock().current() } } diff --git a/apollo-router/src/json_ext.rs b/apollo-router/src/json_ext.rs index 66135d2a30..e05b010550 100644 --- a/apollo-router/src/json_ext.rs +++ b/apollo-router/src/json_ext.rs @@ -577,6 +577,15 @@ pub enum PathElement { Key(String), } +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ResponsePathElement<'a> { + /// An index path element. + Index(usize), + + /// A key path element. + Key(&'a str), +} + fn deserialize_flatten<'de, D>(deserializer: D) -> Result<(), D::Error> where D: serde::Deserializer<'de>, @@ -676,6 +685,17 @@ impl Path { ) } + pub fn from_response_slice(s: &[ResponsePathElement]) -> Self { + Self( + s.iter() + .map(|x| match x { + ResponsePathElement::Index(index) => PathElement::Index(*index), + ResponsePathElement::Key(s) => PathElement::Key(s.to_string()), + }) + .collect(), + ) + } + pub fn iter(&self) -> impl Iterator { self.0.iter() } diff --git a/apollo-router/src/main.rs b/apollo-router/src/main.rs index 9e5ad32e50..413189a753 100644 --- a/apollo-router/src/main.rs +++ b/apollo-router/src/main.rs @@ -1,4 +1,14 @@ //! Main entry point for CLI command to start server. +// Note: We want to use jemalloc on linux, but we don't enable it if dhat-heap is in use because we +// can only have one global allocator +#[cfg(target_os = "linux")] +#[cfg(not(feature = "dhat-heap"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(target_os = "linux")] +#[cfg(not(feature = "dhat-heap"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; fn main() { match apollo_router::main() { diff --git a/apollo-router/src/plugins/coprocessor.rs b/apollo-router/src/plugins/coprocessor.rs index 060745f051..2227136584 100644 --- a/apollo-router/src/plugins/coprocessor.rs +++ b/apollo-router/src/plugins/coprocessor.rs @@ -516,9 +516,9 @@ where }; tracing::debug!(?payload, "externalized output"); - request.context.enter_active_request().await; + request.context.enter_active_request(); let co_processor_result = payload.call(http_client, &coprocessor_url).await; - request.context.leave_active_request().await; + request.context.leave_active_request(); tracing::debug!(?co_processor_result, "co-processor returned"); let co_processor_output = co_processor_result?; @@ -648,9 +648,9 @@ where // Second, call our co-processor and get a reply. tracing::debug!(?payload, "externalized output"); - response.context.enter_active_request().await; + response.context.enter_active_request(); let co_processor_result = payload.call(http_client, &coprocessor_url).await; - response.context.leave_active_request().await; + response.context.leave_active_request(); tracing::debug!(?co_processor_result, "co-processor returned"); let co_processor_output = co_processor_result?; @@ -738,9 +738,9 @@ where }; tracing::debug!(?payload, "externalized output"); - request.context.enter_active_request().await; + request.context.enter_active_request(); let co_processor_result = payload.call(http_client, &coprocessor_url).await; - request.context.leave_active_request().await; + request.context.leave_active_request(); tracing::debug!(?co_processor_result, "co-processor returned"); let co_processor_output = co_processor_result?; validate_coprocessor_output(&co_processor_output, PipelineStep::SubgraphRequest)?; @@ -876,9 +876,9 @@ where }; tracing::debug!(?payload, "externalized output"); - response.context.enter_active_request().await; + response.context.enter_active_request(); let co_processor_result = payload.call(http_client, &coprocessor_url).await; - response.context.leave_active_request().await; + response.context.leave_active_request(); tracing::debug!(?co_processor_result, "co-processor returned"); let co_processor_output = co_processor_result?; diff --git a/apollo-router/src/plugins/telemetry/config.rs b/apollo-router/src/plugins/telemetry/config.rs index 17b0a0405a..21bb70387f 100644 --- a/apollo-router/src/plugins/telemetry/config.rs +++ b/apollo-router/src/plugins/telemetry/config.rs @@ -611,6 +611,15 @@ impl Conf { (_, SamplerOption::TraceIdRatioBased(ratio)) if ratio == 0.0 => 0.0, (SamplerOption::TraceIdRatioBased(ratio), _) if ratio == 0.0 => 0.0, (_, SamplerOption::Always(Sampler::AlwaysOn)) => 1.0, + // the `field_ratio` should be a ratio of the entire set of requests. But FTV1 would only be reported + // if a trace was generated with the Apollo exporter, which has its own sampling `global_ratio`. + // in telemetry::request_ftv1, we activate FTV1 if the current trace is sampled and depending on + // the ratio returned by this function. + // This means that: + // - field_ratio cannot be larger than global_ratio (see above, we return an error in that case) + // - we have to divide field_ratio by global_ratio + // Example: we want to measure FTV1 on 30% of total requests, but we the Apollo tracer samples at 50%. + // If we measure FTV1 on 60% (0.3 / 0.5) of these sampled requests, that amounts to 30% of the total traffic ( SamplerOption::TraceIdRatioBased(global_ratio), SamplerOption::TraceIdRatioBased(field_ratio), diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 209ae3b0e8..5f86af0a46 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -3,6 +3,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::fmt; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread; use std::time::Duration; @@ -47,7 +48,6 @@ use tokio::runtime::Handle; use tower::BoxError; use tower::ServiceBuilder; use tower::ServiceExt; -use tracing_opentelemetry::OpenTelemetryLayer; use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::fmt::format::JsonFields; use tracing_subscriber::Layer; @@ -59,14 +59,17 @@ use self::apollo::SingleReport; use self::apollo_exporter::proto; use self::apollo_exporter::Sender; use self::config::Conf; +use self::config::Sampler; use self::formatters::text::TextFormatter; use self::metrics::apollo::studio::SingleTypeStat; use self::metrics::AttributesForwardConf; use self::metrics::MetricsAttributesConf; use self::reload::reload_fmt; use self::reload::reload_metrics; +use self::reload::LayeredTracer; +use self::reload::NullFieldFormatter; use self::reload::OPENTELEMETRY_TRACER_HANDLE; -use self::tracing::reload::ReloadTracer; +use self::reload::SPAN_SAMPLING_RATE; use crate::layers::ServiceBuilderExt; use crate::plugin::Plugin; use crate::plugin::PluginInit; @@ -93,7 +96,6 @@ use crate::plugins::telemetry::tracing::apollo_telemetry::decode_ftv1_trace; use crate::plugins::telemetry::tracing::apollo_telemetry::APOLLO_PRIVATE_OPERATION_SIGNATURE; use crate::plugins::telemetry::tracing::TracingConfigurator; use crate::query_planner::OperationKind; -use crate::query_planner::USAGE_REPORTING; use crate::register_plugin; use crate::router_factory::Endpoint; use crate::services::execution; @@ -126,9 +128,6 @@ pub(crate) const ROUTER_SPAN_NAME: &str = "router"; pub(crate) const EXECUTION_SPAN_NAME: &str = "execution"; const CLIENT_NAME: &str = "apollo_telemetry::client_name"; const CLIENT_VERSION: &str = "apollo_telemetry::client_version"; -const ATTRIBUTES: &str = "apollo_telemetry::metrics_attributes"; -const SUBGRAPH_ATTRIBUTES: &str = "apollo_telemetry::subgraph_metrics_attributes"; -const ENABLE_SUBGRAPH_FTV1: &str = "apollo_telemetry::enable_subgraph_ftv1"; const SUBGRAPH_FTV1: &str = "apollo_telemetry::subgraph_ftv1"; const OPERATION_KIND: &str = "apollo_telemetry::operation_kind"; pub(crate) const STUDIO_EXCLUDE: &str = "apollo_telemetry::studio::exclude"; @@ -281,16 +280,16 @@ impl Plugin for Telemetry { let expose_trace_id = config.tracing.as_ref().cloned().unwrap_or_default().response_trace_id; if let Ok(response) = &response { + let mut headers: HashMap> = HashMap::new(); if expose_trace_id.enabled { if let Some(header_name) = &expose_trace_id.header_name { - let mut headers: HashMap> = HashMap::new(); if let Some(value) = response.response.headers().get(header_name) { headers.insert(header_name.to_string(), vec![value.to_str().unwrap_or_default().to_string()]); - let response_headers = serde_json::to_string(&headers).unwrap_or_default(); - span.record("apollo_private.http.response_headers",&response_headers); } } } + let response_headers = serde_json::to_string(&headers).unwrap_or_default(); + span.record("apollo_private.http.response_headers",&response_headers); if response.response.status() >= StatusCode::BAD_REQUEST { span.record("otel.status_code", "Error"); @@ -320,8 +319,8 @@ impl Plugin for Telemetry { )) .map_response(move |mut resp: SupergraphResponse| { let config = config_map_res_first.clone(); - if let Ok(Some(usage_reporting)) = - resp.context.get::<_, UsageReporting>(USAGE_REPORTING) + if let Some(usage_reporting) = + resp.context.private_entries.lock().get::() { // Record the operation signature on the router span Span::current().record( @@ -564,15 +563,39 @@ impl Telemetry { config: &config::Conf, ) -> Result { let tracing_config = config.tracing.clone().unwrap_or_default(); - let trace_config = &tracing_config.trace_config.unwrap_or_default(); - let mut builder = - opentelemetry::sdk::trace::TracerProvider::builder().with_config(trace_config.into()); - - builder = setup_tracing(builder, &tracing_config.jaeger, trace_config)?; - builder = setup_tracing(builder, &tracing_config.zipkin, trace_config)?; - builder = setup_tracing(builder, &tracing_config.datadog, trace_config)?; - builder = setup_tracing(builder, &tracing_config.otlp, trace_config)?; - builder = setup_tracing(builder, &config.apollo, trace_config)?; + + let mut trace_config = tracing_config.trace_config.unwrap_or_default(); + + let sampling_rate = if tracing_config.jaeger.is_some() + || tracing_config.zipkin.is_some() + || tracing_config.datadog.is_some() + || tracing_config.otlp.is_some() + || config + .apollo + .as_ref() + .map(|c| c.apollo_key.is_some() && c.apollo_graph_ref.is_some()) + .unwrap_or(false) + { + match trace_config.sampler { + config::SamplerOption::TraceIdRatioBased(rate) => rate, + config::SamplerOption::Always(Sampler::AlwaysOn) => 1.0, + config::SamplerOption::Always(Sampler::AlwaysOff) => 0.0, + } + } else { + 0.0 + }; + + trace_config.sampler = config::SamplerOption::Always(Sampler::AlwaysOn); + SPAN_SAMPLING_RATE.store(f64::to_bits(sampling_rate), Ordering::Relaxed); + + let mut builder = opentelemetry::sdk::trace::TracerProvider::builder() + .with_config((&trace_config).into()); + + builder = setup_tracing(builder, &tracing_config.jaeger, &trace_config)?; + builder = setup_tracing(builder, &tracing_config.zipkin, &trace_config)?; + builder = setup_tracing(builder, &tracing_config.datadog, &trace_config)?; + builder = setup_tracing(builder, &tracing_config.otlp, &trace_config)?; + builder = setup_tracing(builder, &config.apollo, &trace_config)?; // For metrics builder = builder.with_simple_exporter(metrics::span_metrics_exporter::Exporter::default()); @@ -616,21 +639,7 @@ impl Telemetry { Ok(builder) } - #[allow(clippy::type_complexity)] - fn create_fmt_layer( - config: &config::Conf, - ) -> Box< - dyn Layer< - ::tracing_subscriber::layer::Layered< - OpenTelemetryLayer< - ::tracing_subscriber::Registry, - ReloadTracer<::opentelemetry::sdk::trace::Tracer>, - >, - ::tracing_subscriber::Registry, - >, - > + Send - + Sync, - > { + fn create_fmt_layer(config: &config::Conf) -> Box + Send + Sync> { let logging = &config.logging; let fmt = match logging.format { config::LoggingFormat::Pretty => tracing_subscriber::fmt::layer() @@ -641,6 +650,7 @@ impl Telemetry { .with_target(logging.display_target), filter_metric_events, )) + .fmt_fields(NullFieldFormatter) .boxed(), config::LoggingFormat::Json => tracing_subscriber::fmt::layer() .json() @@ -656,6 +666,7 @@ impl Telemetry { filter_metric_events, ) }) + .fmt_fields(NullFieldFormatter) .map_fmt_fields(|_f| JsonFields::default()) .boxed(), }; @@ -739,17 +750,21 @@ impl Telemetry { result: Result, request_duration: Duration, ) -> Result { - let mut metric_attrs = context - .get::<_, HashMap>(ATTRIBUTES) - .ok() - .flatten() - .map(|attrs| { - attrs - .into_iter() - .map(|(attr_name, attr_value)| KeyValue::new(attr_name, attr_value)) - .collect::>() - }) - .unwrap_or_default(); + let mut metric_attrs = { + context + .private_entries + .lock() + .get::() + .cloned() + } + .map(|attrs| { + attrs + .0 + .into_iter() + .map(|(attr_name, attr_value)| KeyValue::new(attr_name, attr_value)) + .collect::>() + }) + .unwrap_or_default(); let res = match result { Ok(response) => { metric_attrs.push(KeyValue::new( @@ -878,10 +893,13 @@ impl Telemetry { attributes.extend(router_attributes_conf.get_attributes_from_context(context)); } - let _ = context.insert(ATTRIBUTES, attributes); + let _ = context + .private_entries + .lock() + .insert(MetricsAttributes(attributes)); } if rand::thread_rng().gen_bool(field_level_instrumentation_ratio) { - context.insert_json_value(ENABLE_SUBGRAPH_FTV1, json!(true)); + context.private_entries.lock().insert(EnableSubgraphFtv1); } } @@ -967,8 +985,9 @@ impl Telemetry { } sub_request .context - .insert(SUBGRAPH_ATTRIBUTES, attributes) - .unwrap(); + .private_entries + .lock() + .insert(SubgraphMetricsAttributes(attributes)); //.unwrap(); } fn store_subgraph_response_attributes( @@ -979,17 +998,21 @@ impl Telemetry { now: Instant, result: &Result, ) { - let mut metric_attrs = context - .get::<_, HashMap>(SUBGRAPH_ATTRIBUTES) - .ok() - .flatten() - .map(|attrs| { - attrs - .into_iter() - .map(|(attr_name, attr_value)| KeyValue::new(attr_name, attr_value)) - .collect::>() - }) - .unwrap_or_default(); + let mut metric_attrs = { + context + .private_entries + .lock() + .get::() + .cloned() + } + .map(|attrs| { + attrs + .0 + .into_iter() + .map(|(attr_name, attr_value)| KeyValue::new(attr_name, attr_value)) + .collect::>() + }) + .unwrap_or_default(); metric_attrs.push(subgraph_attribute); // Fill attributes from context if let Some(subgraph_attributes_conf) = &*attribute_forward_config { @@ -1149,8 +1172,10 @@ impl Telemetry { operation_subtype: Option, ) { let metrics = if let Some(usage_reporting) = context - .get::<_, UsageReporting>(USAGE_REPORTING) - .unwrap_or_default() + .private_entries + .lock() + .get::() + .cloned() { let operation_count = operation_count(&usage_reporting.stats_report_key); let persisted_query_hit = context @@ -1480,8 +1505,12 @@ fn handle_error>(err: T) { register_plugin!("apollo", "telemetry", Telemetry); fn request_ftv1(mut req: SubgraphRequest) -> SubgraphRequest { - if req.context.contains_key(ENABLE_SUBGRAPH_FTV1) - && Span::current().context().span().span_context().is_sampled() + if req + .context + .private_entries + .lock() + .contains_key::() + && !Span::current().is_disabled() { req.subgraph_request.headers_mut().insert( "apollo-federation-include-trace", @@ -1493,7 +1522,12 @@ fn request_ftv1(mut req: SubgraphRequest) -> SubgraphRequest { fn store_ftv1(subgraph_name: &ByteString, resp: SubgraphResponse) -> SubgraphResponse { // Stash the FTV1 data - if resp.context.contains_key(ENABLE_SUBGRAPH_FTV1) { + if resp + .context + .private_entries + .lock() + .contains_key::() + { if let Some(serde_json_bytes::Value::String(ftv1)) = resp.response.body().extensions.get("ftv1") { @@ -1580,6 +1614,13 @@ impl TextMapPropagator for CustomTraceIdPropagator { } } +#[derive(Clone)] +struct MetricsAttributes(HashMap); + +#[derive(Clone)] +struct SubgraphMetricsAttributes(HashMap); + +struct EnableSubgraphFtv1; // // Please ensure that any tests added to the tests module use the tokio multi-threaded test executor. // diff --git a/apollo-router/src/plugins/telemetry/reload.rs b/apollo-router/src/plugins/telemetry/reload.rs index 8db539a173..631b91c28a 100644 --- a/apollo-router/src/plugins/telemetry/reload.rs +++ b/apollo-router/src/plugins/telemetry/reload.rs @@ -1,14 +1,24 @@ +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + use anyhow::anyhow; use anyhow::Result; use once_cell::sync::OnceCell; use opentelemetry::metrics::noop::NoopMeterProvider; use opentelemetry::sdk::trace::Tracer; use opentelemetry::trace::TracerProvider; +use rand::thread_rng; +use rand::Rng; use tower::BoxError; +use tracing::Subscriber; use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::filter::Filtered; +use tracing_subscriber::fmt::FormatFields; +use tracing_subscriber::layer::Filter; use tracing_subscriber::layer::Layer; use tracing_subscriber::layer::Layered; use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::registry::LookupSpan; use tracing_subscriber::reload::Handle; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; @@ -21,7 +31,10 @@ use crate::plugins::telemetry::metrics; use crate::plugins::telemetry::metrics::layer::MetricsLayer; use crate::plugins::telemetry::tracing::reload::ReloadTracer; -type LayeredTracer = Layered>, Registry>; +pub(super) type LayeredTracer = Layered< + Filtered>, SamplingFilter, Registry>, + Registry, +>; // These handles allow hot tracing of layers. They have complex type definitions because tracing has // generic types in the layer definition. @@ -29,6 +42,12 @@ pub(super) static OPENTELEMETRY_TRACER_HANDLE: OnceCell< ReloadTracer, > = OnceCell::new(); +static FMT_LAYER_HANDLE: OnceCell< + Handle + Send + Sync>, LayeredTracer>, +> = OnceCell::new(); + +pub(super) static SPAN_SAMPLING_RATE: AtomicU64 = AtomicU64::new(0); + #[allow(clippy::type_complexity)] static METRICS_LAYER_HANDLE: OnceCell< Handle< @@ -43,15 +62,13 @@ static METRICS_LAYER_HANDLE: OnceCell< >, > = OnceCell::new(); -static FMT_LAYER_HANDLE: OnceCell< - Handle + Send + Sync>, LayeredTracer>, -> = OnceCell::new(); - pub(crate) fn init_telemetry(log_level: &str) -> Result<()> { let hot_tracer = ReloadTracer::new( opentelemetry::sdk::trace::TracerProvider::default().versioned_tracer("noop", None, None), ); - let opentelemetry_layer = tracing_opentelemetry::layer().with_tracer(hot_tracer.clone()); + let opentelemetry_layer = tracing_opentelemetry::layer() + .with_tracer(hot_tracer.clone()) + .with_filter(SamplingFilter::new()); // We choose json or plain based on tty let fmt = if atty::is(atty::Stream::Stdout) { @@ -63,6 +80,7 @@ pub(crate) fn init_telemetry(log_level: &str) -> Result<()> { .with_target(false), filter_metric_events, )) + .fmt_fields(NullFieldFormatter) .boxed() } else { tracing_subscriber::fmt::Layer::new() @@ -76,6 +94,7 @@ pub(crate) fn init_telemetry(log_level: &str) -> Result<()> { filter_metric_events, ) }) + .fmt_fields(NullFieldFormatter) .boxed() }; @@ -122,15 +141,57 @@ pub(super) fn reload_metrics(layer: MetricsLayer) { } } -#[allow(clippy::type_complexity)] -pub(super) fn reload_fmt( - layer: Box< - dyn Layer>, Registry>> - + Send - + Sync, - >, -) { +pub(super) fn reload_fmt(layer: Box + Send + Sync>) { if let Some(handle) = FMT_LAYER_HANDLE.get() { handle.reload(layer).expect("fmt layer reload must succeed"); } } + +pub(crate) struct SamplingFilter {} + +impl SamplingFilter { + pub(crate) fn new() -> Self { + Self {} + } + + fn sample(&self) -> bool { + let s: f64 = thread_rng().gen_range(0.0..=1.0); + s <= f64::from_bits(SPAN_SAMPLING_RATE.load(Ordering::Relaxed)) + } +} + +impl Filter for SamplingFilter +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + fn enabled( + &self, + meta: &tracing::Metadata<'_>, + cx: &tracing_subscriber::layer::Context<'_, S>, + ) -> bool { + let current_span = cx.current_span(); + + // + !meta.is_span() + // this span is enabled if: + || current_span + .id() + // - there's a parent span and it was enabled + .map(|id| cx.span(id).is_some()) + // - there's no parent span (it's the root), so we make the sampling decision + .unwrap_or_else(|| self.sample()) + } +} + +/// prevents span fields from being formatted to a string when writing logs +pub(crate) struct NullFieldFormatter; + +impl<'writer> FormatFields<'writer> for NullFieldFormatter { + fn format_fields( + &self, + _writer: tracing_subscriber::fmt::format::Writer<'writer>, + _fields: R, + ) -> std::fmt::Result { + Ok(()) + } +} diff --git a/apollo-router/src/query_planner/bridge_query_planner.rs b/apollo-router/src/query_planner/bridge_query_planner.rs index 1f7f66ec16..e7d36281ac 100644 --- a/apollo-router/src/query_planner/bridge_query_planner.rs +++ b/apollo-router/src/query_planner/bridge_query_planner.rs @@ -30,8 +30,6 @@ use crate::spec::Query; use crate::spec::Schema; use crate::Configuration; -pub(crate) static USAGE_REPORTING: &str = "apollo_telemetry::usage_reporting"; - #[derive(Clone)] /// A query planner that calls out to the nodejs router-bridge query planner. /// @@ -243,29 +241,16 @@ impl Service for BridgeQueryPlanner { Err(e) => { match &e { QueryPlannerError::PlanningErrors(pe) => { - if let Err(inner_e) = req - .context - .insert(USAGE_REPORTING, pe.usage_reporting.clone()) - { - tracing::error!( - "usage reporting was not serializable to context, {}", - inner_e - ); - } + req.context + .private_entries + .lock() + .insert(pe.usage_reporting.clone()); } QueryPlannerError::SpecError(e) => { - if let Err(inner_e) = req.context.insert( - USAGE_REPORTING, - UsageReporting { - stats_report_key: e.get_error_key().to_string(), - referenced_fields_by_type: HashMap::new(), - }, - ) { - tracing::error!( - "usage reporting was not serializable to context, {}", - inner_e - ); - } + req.context.private_entries.lock().insert(UsageReporting { + stats_report_key: e.get_error_key().to_string(), + referenced_fields_by_type: HashMap::new(), + }); } _ => (), } diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index c0e4a00fc5..a1d6392021 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -8,12 +8,9 @@ use std::task; use futures::future::BoxFuture; use router_bridge::planner::Planner; use router_bridge::planner::UsageReporting; -use serde::Serialize; -use serde_json_bytes::value::Serializer; use tower::ServiceExt; use tracing::Instrument; -use super::USAGE_REPORTING; use crate::cache::DeduplicatingCache; use crate::error::CacheResolverError; use crate::error::QueryPlannerError; @@ -169,17 +166,10 @@ where } if let Some(QueryPlannerContent::Plan { plan, .. }) = &content { - match (plan.usage_reporting).serialize(Serializer) { - Ok(v) => { - context.insert_json_value(USAGE_REPORTING, v); - } - Err(e) => { - tracing::error!( - "usage reporting was not serializable to context, {}", - e - ); - } - } + context + .private_entries + .lock() + .insert(plan.usage_reporting.clone()); } Ok(QueryPlannerResponse { content, @@ -211,17 +201,10 @@ where match res { Ok(content) => { if let QueryPlannerContent::Plan { plan, .. } = &content { - match (plan.usage_reporting).serialize(Serializer) { - Ok(v) => { - context.insert_json_value(USAGE_REPORTING, v); - } - Err(e) => { - tracing::error!( - "usage reporting was not serializable to context, {}", - e - ); - } - } + context + .private_entries + .lock() + .insert(plan.usage_reporting.clone()); } Ok(QueryPlannerResponse::builder() @@ -232,29 +215,21 @@ where Err(error) => { match error.deref() { QueryPlannerError::PlanningErrors(pe) => { - if let Err(inner_e) = request + request .context - .insert(USAGE_REPORTING, pe.usage_reporting.clone()) - { - tracing::error!( - "usage reporting was not serializable to context, {}", - inner_e - ); - } + .private_entries + .lock() + .insert(pe.usage_reporting.clone()); } QueryPlannerError::SpecError(e) => { - if let Err(inner_e) = request.context.insert( - USAGE_REPORTING, - UsageReporting { + request + .context + .private_entries + .lock() + .insert(UsageReporting { stats_report_key: e.get_error_key().to_string(), referenced_fields_by_type: HashMap::new(), - }, - ) { - tracing::error!( - "usage reporting was not serializable to context, {}", - inner_e - ); - } + }); } _ => {} } @@ -427,10 +402,9 @@ mod tests { .await .unwrap() .context - .get::<_, UsageReporting>(USAGE_REPORTING) - .ok() - .flatten() - .is_some()); + .private_entries + .lock() + .contains_key::()); } } } diff --git a/apollo-router/src/query_planner/execution.rs b/apollo-router/src/query_planner/execution.rs index 61e8c4f1f6..85fba06537 100644 --- a/apollo-router/src/query_planner/execution.rs +++ b/apollo-router/src/query_planner/execution.rs @@ -167,7 +167,11 @@ impl PlanNode { parent_value, sender, ) - .instrument(tracing::info_span!(FLATTEN_SPAN_NAME, "graphql.path" = %current_dir, "otel.kind" = "INTERNAL")) + .instrument(tracing::info_span!( + FLATTEN_SPAN_NAME, + "graphql.path" = %current_dir, + "otel.kind" = "INTERNAL" + )) .await; value = v; diff --git a/apollo-router/src/services/layers/apq.rs b/apollo-router/src/services/layers/apq.rs index 44ed17a71d..b9c61c6227 100644 --- a/apollo-router/src/services/layers/apq.rs +++ b/apollo-router/src/services/layers/apq.rs @@ -192,7 +192,7 @@ mod apq_tests { use crate::configuration::Apq; use crate::error::Error; use crate::graphql::Response; - use crate::services::layers::content_negociation::ACCEPTS_JSON_CONTEXT_KEY; + use crate::services::router::ClientRequestAccepts; use crate::services::router_service::from_supergraph_mock_callback; use crate::services::router_service::from_supergraph_mock_callback_and_configuration; use crate::Configuration; @@ -521,7 +521,11 @@ mod apq_tests { fn new_context() -> Context { let context = Context::new(); - context.insert(ACCEPTS_JSON_CONTEXT_KEY, true).unwrap(); + context.private_entries.lock().insert(ClientRequestAccepts { + json: true, + ..Default::default() + }); + context } } diff --git a/apollo-router/src/services/layers/content_negociation.rs b/apollo-router/src/services/layers/content_negociation.rs index 3a651a3b28..bd3c418d8e 100644 --- a/apollo-router/src/services/layers/content_negociation.rs +++ b/apollo-router/src/services/layers/content_negociation.rs @@ -23,15 +23,13 @@ use crate::graphql; use crate::layers::sync_checkpoint::CheckpointService; use crate::layers::ServiceExt as _; use crate::services::router; +use crate::services::router::ClientRequestAccepts; use crate::services::supergraph; use crate::services::MULTIPART_DEFER_CONTENT_TYPE; use crate::services::MULTIPART_DEFER_SPEC_PARAMETER; use crate::services::MULTIPART_DEFER_SPEC_VALUE; pub(crate) const GRAPHQL_JSON_RESPONSE_HEADER_VALUE: &str = "application/graphql-response+json"; -pub(crate) const ACCEPTS_WILDCARD_CONTEXT_KEY: &str = "content-negociation:accepts-wildcard"; -pub(crate) const ACCEPTS_MULTIPART_CONTEXT_KEY: &str = "content-negociation:accepts-multipart"; -pub(crate) const ACCEPTS_JSON_CONTEXT_KEY: &str = "content-negociation:accepts-json"; /// [`Layer`] for Content-Type checks implementation. #[derive(Clone, Default)] @@ -72,20 +70,10 @@ where return Ok(ControlFlow::Break(response.into())); } - let accepts_multipart = accepts_multipart(req.router_request.headers()); - let accepts_json = accepts_json(req.router_request.headers()); - let accepts_wildcard = accepts_wildcard(req.router_request.headers()); + let accepts = parse_accept(req.router_request.headers()); - if accepts_wildcard || accepts_multipart || accepts_json { - req.context - .insert(ACCEPTS_WILDCARD_CONTEXT_KEY, accepts_wildcard) - .unwrap(); - req.context - .insert(ACCEPTS_MULTIPART_CONTEXT_KEY, accepts_multipart) - .unwrap(); - req.context - .insert(ACCEPTS_JSON_CONTEXT_KEY, accepts_json) - .unwrap(); + if accepts.wildcard || accepts.multipart || accepts.json { + req.context.private_entries.lock().insert(accepts); Ok(ControlFlow::Continue(req)) } else { @@ -129,17 +117,15 @@ where fn layer(&self, service: S) -> Self::Service { service .map_first_graphql_response(|context, mut parts, res| { - let accepts_wildcard: bool = context - .get(ACCEPTS_WILDCARD_CONTEXT_KEY) - .unwrap_or_default() - .unwrap_or_default(); - let accepts_json: bool = context - .get(ACCEPTS_JSON_CONTEXT_KEY) - .unwrap_or_default() - .unwrap_or_default(); - let accepts_multipart: bool = context - .get(ACCEPTS_MULTIPART_CONTEXT_KEY) - .unwrap_or_default() + let ClientRequestAccepts { + wildcard: accepts_wildcard, + json: accepts_json, + multipart: accepts_multipart, + } = context + .private_entries + .lock() + .get() + .cloned() .unwrap_or_default(); if !res.has_next.unwrap_or_default() && (accepts_json || accepts_wildcard) { @@ -182,75 +168,46 @@ fn content_type_is_json(headers: &HeaderMap) -> bool { }) } -/// Returns true if the headers contain `accept: application/json` or `accept: application/graphql-response+json`, -/// or if there is no `accept` header -fn accepts_json(headers: &HeaderMap) -> bool { - !headers.contains_key(ACCEPT) - || headers.get_all(ACCEPT).iter().any(|value| { - value - .to_str() - .map(|accept_str| { - let mut list = MediaTypeList::new(accept_str); - - list.any(|mime| { - mime.as_ref() - .map(|mime| { - (mime.ty == APPLICATION && mime.subty == JSON) - || (mime.ty == APPLICATION - && mime.subty.as_str() == "graphql-response" - && mime.suffix == Some(JSON)) - }) - .unwrap_or(false) - }) - }) - .unwrap_or(false) - }) -} - -/// Returns true if the headers contain header `accept: */*` -fn accepts_wildcard(headers: &HeaderMap) -> bool { - headers.get_all(ACCEPT).iter().any(|value| { - value - .to_str() - .map(|accept_str| { - let mut list = MediaTypeList::new(accept_str); - - list.any(|mime| { - mime.as_ref() - .map(|mime| (mime.ty == _STAR && mime.subty == _STAR)) - .unwrap_or(false) - }) - }) - .unwrap_or(false) - }) -} - -/// Returns true if the headers contain accept header to enable defer -fn accepts_multipart(headers: &HeaderMap) -> bool { - headers.get_all(ACCEPT).iter().any(|value| { - value - .to_str() - .map(|accept_str| { - let mut list = MediaTypeList::new(accept_str); - - list.any(|mime| { - mime.as_ref() - .map(|mime| { - mime.ty == MULTIPART - && mime.subty == MIXED - && mime.get_param( - mediatype::Name::new(MULTIPART_DEFER_SPEC_PARAMETER) - .expect("valid name"), - ) == Some( - mediatype::Value::new(MULTIPART_DEFER_SPEC_VALUE) - .expect("valid value"), - ) - }) - .unwrap_or(false) - }) - }) - .unwrap_or(false) - }) +// Clippy suggests `for mime in MediaTypeList::new(str).flatten()` but less indentation +// does not seem worth making it invisible that Result is involved. +#[allow(clippy::manual_flatten)] +/// Returns (accepts_json, accepts_wildcard, accepts_multipart) +fn parse_accept(headers: &HeaderMap) -> ClientRequestAccepts { + let mut header_present = false; + let mut accepts = ClientRequestAccepts::default(); + for value in headers.get_all(ACCEPT) { + header_present = true; + if let Ok(str) = value.to_str() { + for result in MediaTypeList::new(str) { + if let Ok(mime) = result { + if !accepts.json + && ((mime.ty == APPLICATION && mime.subty == JSON) + || (mime.ty == APPLICATION + && mime.subty.as_str() == "graphql-response" + && mime.suffix == Some(JSON))) + { + accepts.json = true + } + if !accepts.wildcard && (mime.ty == _STAR && mime.subty == _STAR) { + accepts.wildcard = true + } + if !accepts.multipart && (mime.ty == MULTIPART && mime.subty == MIXED) { + let parameter = mediatype::Name::new(MULTIPART_DEFER_SPEC_PARAMETER) + .expect("valid name"); + let value = + mediatype::Value::new(MULTIPART_DEFER_SPEC_VALUE).expect("valid value"); + if mime.get_param(parameter) == Some(value) { + accepts.multipart = true + } + } + } + } + } + } + if !header_present { + accepts.json = true + } + accepts } #[cfg(test)] @@ -265,17 +222,20 @@ mod tests { HeaderValue::from_static(APPLICATION_JSON.essence_str()), ); default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); - assert!(accepts_json(&default_headers)); + let accepts = parse_accept(&default_headers); + assert!(accepts.json); let mut default_headers = HeaderMap::new(); default_headers.insert(ACCEPT, HeaderValue::from_static("*/*")); default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); - assert!(accepts_wildcard(&default_headers)); + let accepts = parse_accept(&default_headers); + assert!(accepts.wildcard); let mut default_headers = HeaderMap::new(); // real life browser example default_headers.insert(ACCEPT, HeaderValue::from_static("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8")); - assert!(accepts_wildcard(&default_headers)); + let accepts = parse_accept(&default_headers); + assert!(accepts.wildcard); let mut default_headers = HeaderMap::new(); default_headers.insert( @@ -283,7 +243,8 @@ mod tests { HeaderValue::from_static(GRAPHQL_JSON_RESPONSE_HEADER_VALUE), ); default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); - assert!(accepts_json(&default_headers)); + let accepts = parse_accept(&default_headers); + assert!(accepts.json); let mut default_headers = HeaderMap::new(); default_headers.insert( @@ -294,6 +255,7 @@ mod tests { ACCEPT, HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE), ); - assert!(accepts_multipart(&default_headers)); + let accepts = parse_accept(&default_headers); + assert!(accepts.multipart); } } diff --git a/apollo-router/src/services/router.rs b/apollo-router/src/services/router.rs index 486d1b329b..ec0b123aa4 100644 --- a/apollo-router/src/services/router.rs +++ b/apollo-router/src/services/router.rs @@ -246,3 +246,10 @@ impl Response { ) } } + +#[derive(Clone, Default)] +pub(crate) struct ClientRequestAccepts { + pub(crate) multipart: bool, + pub(crate) json: bool, + pub(crate) wildcard: bool, +} diff --git a/apollo-router/src/services/router_service.rs b/apollo-router/src/services/router_service.rs index 305b6702bb..b79310e413 100644 --- a/apollo-router/src/services/router_service.rs +++ b/apollo-router/src/services/router_service.rs @@ -34,12 +34,10 @@ use tracing::Instrument; use super::layers::apq::APQLayer; use super::layers::content_negociation; -use super::layers::content_negociation::ACCEPTS_JSON_CONTEXT_KEY; -use super::layers::content_negociation::ACCEPTS_MULTIPART_CONTEXT_KEY; -use super::layers::content_negociation::ACCEPTS_WILDCARD_CONTEXT_KEY; use super::layers::static_page::StaticPageLayer; use super::new_service::ServiceFactory; use super::router; +use super::router::ClientRequestAccepts; use super::supergraph; use super::HasPlugins; #[cfg(test)] @@ -257,17 +255,15 @@ where Ok(request) => supergraph_creator.create().oneshot(request).await?, }; - let accepts_wildcard: bool = context - .get(ACCEPTS_WILDCARD_CONTEXT_KEY) - .unwrap_or_default() - .unwrap_or_default(); - let accepts_json: bool = context - .get(ACCEPTS_JSON_CONTEXT_KEY) - .unwrap_or_default() - .unwrap_or_default(); - let accepts_multipart: bool = context - .get(ACCEPTS_MULTIPART_CONTEXT_KEY) - .unwrap_or_default() + let ClientRequestAccepts { + wildcard: accepts_wildcard, + json: accepts_json, + multipart: accepts_multipart, + } = context + .private_entries + .lock() + .get() + .cloned() .unwrap_or_default(); let (mut parts, mut body) = response.into_parts(); diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 234b31d6e4..d0c35e84b2 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -329,13 +329,13 @@ async fn call_http( let cloned_service_name = service_name.clone(); let cloned_context = context.clone(); let (parts, body) = async move { - cloned_context.enter_active_request().await; + cloned_context.enter_active_request(); let response = match client .call(request) .await { Err(err) => { tracing::error!(fetch_error = format!("{err:?}").as_str()); - cloned_context.leave_active_request().await; + cloned_context.leave_active_request(); return Err(FetchError::SubrequestHttpError { status_code: None, @@ -359,7 +359,7 @@ async fn call_http( if !content_type_str.contains(APPLICATION_JSON.essence_str()) && !content_type_str.contains(GRAPHQL_JSON_RESPONSE_HEADER_VALUE) { - cloned_context.leave_active_request().await; + cloned_context.leave_active_request(); return if !parts.status.is_success() { @@ -387,7 +387,7 @@ async fn call_http( .instrument(tracing::debug_span!("aggregate_response_data")) .await { Err(err) => { - cloned_context.leave_active_request().await; + cloned_context.leave_active_request(); tracing::error!(fetch_error = format!("{err:?}").as_str()); @@ -400,7 +400,7 @@ async fn call_http( }, Ok(body) => body, }; - cloned_context.leave_active_request().await; + cloned_context.leave_active_request(); Ok((parts, body)) }.instrument(subgraph_req_span).await?; diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs index bf0469973c..d33f69afeb 100644 --- a/apollo-router/src/services/supergraph_service.rs +++ b/apollo-router/src/services/supergraph_service.rs @@ -20,8 +20,8 @@ use tower_service::Service; use tracing_futures::Instrument; use super::layers::content_negociation; -use super::layers::content_negociation::ACCEPTS_MULTIPART_CONTEXT_KEY; use super::new_service::ServiceFactory; +use super::router::ClientRequestAccepts; use super::subgraph_service::MakeSubgraphService; use super::subgraph_service::SubgraphServiceFactory; use super::ExecutionServiceFactory; @@ -189,9 +189,14 @@ where let operation_name = body.operation_name.clone(); let is_deferred = plan.is_deferred(operation_name.as_deref(), &variables); - let accepts_multipart: bool = context - .get(ACCEPTS_MULTIPART_CONTEXT_KEY) - .unwrap_or_default() + let ClientRequestAccepts { + multipart: accepts_multipart, + .. + } = context + .private_entries + .lock() + .get() + .cloned() .unwrap_or_default(); if is_deferred && !accepts_multipart { @@ -1535,7 +1540,11 @@ mod tests { fn defer_context() -> Context { let context = Context::new(); - context.insert(ACCEPTS_MULTIPART_CONTEXT_KEY, true).unwrap(); + context.private_entries.lock().insert(ClientRequestAccepts { + multipart: true, + ..Default::default() + }); + context } diff --git a/apollo-router/src/spec/query.rs b/apollo-router/src/spec/query.rs index ccbfb9da36..d7f6975414 100644 --- a/apollo-router/src/spec/query.rs +++ b/apollo-router/src/spec/query.rs @@ -26,7 +26,7 @@ use crate::graphql::Request; use crate::graphql::Response; use crate::json_ext::Object; use crate::json_ext::Path; -use crate::json_ext::PathElement; +use crate::json_ext::ResponsePathElement; use crate::json_ext::Value; use crate::query_planner::fetch::OperationKind; use crate::spec::FieldType; @@ -158,7 +158,7 @@ impl Query { &mut parameters, &mut input, &mut output, - &mut Path::default(), + &mut Vec::new(), ) { Ok(()) => output.into(), Err(InvalidValue) => Value::Null, @@ -214,7 +214,7 @@ impl Query { &mut parameters, &mut input, &mut output, - &mut Path::default(), + &mut Vec::new(), ) { Ok(()) => output.into(), Err(InvalidValue) => Value::Null, @@ -328,15 +328,15 @@ impl Query { } #[allow(clippy::too_many_arguments)] - fn format_value( - &self, + fn format_value<'a: 'b, 'b>( + &'a self, parameters: &mut FormatParameters, field_type: &FieldType, input: &mut Value, output: &mut Value, - path: &mut Path, + path: &mut Vec>, parent_type: &FieldType, - selection_set: &[Selection], + selection_set: &'a [Selection], ) -> Result<(), InvalidValue> { // for every type, if we have an invalid value, we will replace it with null // and return Ok(()), because values are optional by default @@ -358,17 +358,17 @@ impl Query { Ok(_) => { if output.is_null() { let message = match path.last() { - Some(PathElement::Key(k)) => format!( + Some(ResponsePathElement::Key(k)) => format!( "Cannot return null for non-nullable field {parent_type}.{k}" ), - Some(PathElement::Index(i)) => format!( + Some(ResponsePathElement::Index(i)) => format!( "Cannot return null for non-nullable array element of type {inner_type} at index {i}" ), _ => todo!(), }; parameters.errors.push(Error { message, - path: Some(path.clone()), + path: Some(Path::from_response_slice(path)), ..Error::default() }); @@ -398,7 +398,7 @@ impl Query { .iter_mut() .enumerate() .try_for_each(|(i, element)| { - path.push(PathElement::Index(i)); + path.push(ResponsePathElement::Index(i)); let res = self.format_value( parameters, inner_type, @@ -412,7 +412,7 @@ impl Query { res }) { Err(InvalidValue) => { - parameters.nullified.push(path.clone()); + parameters.nullified.push(Path::from_response_slice(path)); *output = Value::Null; Ok(()) } @@ -458,7 +458,7 @@ impl Query { if !parameters.schema.object_types.contains_key(input_type) && !parameters.schema.interfaces.contains_key(input_type) { - parameters.nullified.push(path.clone()); + parameters.nullified.push(Path::from_response_slice(path)); *output = Value::Null; return Ok(()); } @@ -480,14 +480,14 @@ impl Query { ) .is_err() { - parameters.nullified.push(path.clone()); + parameters.nullified.push(Path::from_response_slice(path)); *output = Value::Null; } Ok(()) } _ => { - parameters.nullified.push(path.clone()); + parameters.nullified.push(Path::from_response_slice(path)); *output = Value::Null; Ok(()) } @@ -548,13 +548,13 @@ impl Query { } } - fn apply_selection_set( - &self, - selection_set: &[Selection], + fn apply_selection_set<'a: 'b, 'b>( + &'a self, + selection_set: &'a [Selection], parameters: &mut FormatParameters, input: &mut Object, output: &mut Object, - path: &mut Path, + path: &mut Vec>, parent_type: &FieldType, ) -> Result<(), InvalidValue> { // For skip and include, using .unwrap_or is legit here because @@ -597,7 +597,7 @@ impl Query { } } } else { - path.push(PathElement::Key(field_name.as_str().to_string())); + path.push(ResponsePathElement::Key(field_name.as_str())); let res = self.format_value( parameters, field_type, @@ -620,7 +620,7 @@ impl Query { "Cannot return null for non-nullable field {parent_type}.{}", field_name.as_str() ), - path: Some(path.clone()), + path: Some(Path::from_response_slice(path)), ..Error::default() }); @@ -722,13 +722,13 @@ impl Query { Ok(()) } - fn apply_root_selection_set( - &self, - operation: &Operation, + fn apply_root_selection_set<'a: 'b, 'b>( + &'a self, + operation: &'a Operation, parameters: &mut FormatParameters, input: &mut Object, output: &mut Object, - path: &mut Path, + path: &mut Vec>, ) -> Result<(), InvalidValue> { for selection in &operation.selection_set { match selection { @@ -759,7 +759,7 @@ impl Query { let selection_set = selection_set.as_deref().unwrap_or_default(); let output_value = output.entry((*field_name).clone()).or_insert(Value::Null); - path.push(PathElement::Key(field_name_str.to_string())); + path.push(ResponsePathElement::Key(field_name_str)); let res = self.format_value( parameters, field_type, @@ -784,7 +784,7 @@ impl Query { "Cannot return null for non-nullable field {}.{field_name_str}", operation.kind ), - path: Some(path.clone()), + path: Some(Path::from_response_slice(path)), ..Error::default() }); return Err(InvalidValue); diff --git a/apollo-router/tests/snapshots/tracing_tests__traced_basic_composition.snap b/apollo-router/tests/snapshots/tracing_tests__traced_basic_composition.snap index 51da4c49cc..d76289f1fa 100644 --- a/apollo-router/tests/snapshots/tracing_tests__traced_basic_composition.snap +++ b/apollo-router/tests/snapshots/tracing_tests__traced_basic_composition.snap @@ -94,6 +94,10 @@ expression: get_spans() "apollo_private.duration_ns", 0 ], + [ + "apollo_private.http.response_headers", + "{}" + ], [ "otel.status_code", "Ok" diff --git a/apollo-router/tests/snapshots/tracing_tests__traced_basic_request.snap b/apollo-router/tests/snapshots/tracing_tests__traced_basic_request.snap index cd7792d6bc..97b89bd0fe 100644 --- a/apollo-router/tests/snapshots/tracing_tests__traced_basic_request.snap +++ b/apollo-router/tests/snapshots/tracing_tests__traced_basic_request.snap @@ -94,6 +94,10 @@ expression: get_spans() "apollo_private.duration_ns", 0 ], + [ + "apollo_private.http.response_headers", + "{}" + ], [ "otel.status_code", "Ok" diff --git a/apollo-router/tests/snapshots/tracing_tests__variables.snap b/apollo-router/tests/snapshots/tracing_tests__variables.snap index ded86a220e..d147daa040 100644 --- a/apollo-router/tests/snapshots/tracing_tests__variables.snap +++ b/apollo-router/tests/snapshots/tracing_tests__variables.snap @@ -94,6 +94,10 @@ expression: get_spans() "apollo_private.duration_ns", 0 ], + [ + "apollo_private.http.response_headers", + "{}" + ], [ "otel.status_code", "Error" diff --git a/licenses.html b/licenses.html index cd3cc30b39..9f673ea53b 100644 --- a/licenses.html +++ b/licenses.html @@ -6464,6 +6464,7 @@

Used by:

  • text-size
  • thread_local
  • threadpool
  • +
  • tikv-jemalloc-sys
  • toml
  • toml_datetime
  • toml_edit
  • @@ -10323,6 +10324,7 @@

    Used by:

  • num-cmp
  • rhai_codegen
  • thrift
  • +
  • tikv-jemallocator
  • try_match_inner
  • unic-char-property
  • unic-char-range