diff --git a/Cargo.lock b/Cargo.lock index f2227608..0cbb6d24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -670,6 +670,7 @@ dependencies = [ "rand 0.8.4", "regex", "reqwest", + "scopeguard", "sentry", "sentry-backtrace", "serde 1.0.133", diff --git a/Cargo.toml b/Cargo.toml index 99000f4a..140d46bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ serde = "1.0" sentry = "0.19" sentry-backtrace = "0.19" serde_json = "1.0" +scopeguard = "1.1.0" slog = { version = "2.7", features = ["max_level_trace", "release_max_level_info", "dynamic-keys"] } slog-async = "2.6" slog-envlogger = "2.2.0" diff --git a/src/adm/tiles.rs b/src/adm/tiles.rs index fc5bd9dc..0d9b35ad 100644 --- a/src/adm/tiles.rs +++ b/src/adm/tiles.rs @@ -135,7 +135,7 @@ pub async fn get_tiles( tags: &mut Tags, metrics: &Metrics, headers: Option<&HeaderMap>, -) -> Result { +) -> HandlerResult { let settings = &state.settings; let image_store = &state.img_store; let adm_url = Url::parse_with_params( diff --git a/src/server/cache.rs b/src/server/cache.rs index 5820356c..4796bf6b 100644 --- a/src/server/cache.rs +++ b/src/server/cache.rs @@ -1,7 +1,6 @@ //! Tile cache manager use std::{ fmt::Debug, - ops::Deref, sync::Arc, time::{Duration, SystemTime}, }; @@ -56,13 +55,91 @@ impl TilesCache { } }); } + + /// Get an immutable reference to an entry in the cache + pub fn get( + &self, + audience_key: &AudienceKey, + ) -> Option> { + self.inner.get(audience_key) + } + + /// Prepare to write to the cache. + /// + /// Sets the cache entry to the Refreshing/Populating states. + /// `WriteHandle` resets those states when it goes out of scope if no + /// `insert` call was issued (due to errors or panics). + pub fn prepare_write<'a>( + &'a self, + audience_key: &'a AudienceKey, + expired: bool, + ) -> WriteHandle<'a, impl FnOnce(()) + '_> { + if expired { + // The cache entry's expired and we're about to refresh it + trace!("prepare_write: Fresh now expired, Refreshing"); + self.inner + .alter(audience_key, |_, tiles_state| match tiles_state { + TilesState::Fresh { tiles } if tiles.expired() => { + TilesState::Refreshing { tiles } + } + _ => tiles_state, + }); + } else { + // We'll populate this cache entry for probably the first time + trace!("prepare_write: Populating"); + self.inner + .insert(audience_key.clone(), TilesState::Populating); + }; + + let guard = scopeguard::guard((), move |_| { + trace!("prepare_write (ScopeGuard cleanup): Resetting state"); + if expired { + // Back to Fresh (though the tiles are expired): so a later request + // will retry refreshing again + self.inner + .alter(audience_key, |_, tiles_state| match tiles_state { + TilesState::Refreshing { tiles } => TilesState::Fresh { tiles }, + _ => tiles_state, + }); + } else { + // Clear the entry: a later request will retry populating again + self.inner.remove_if(audience_key, |_, tiles_state| { + matches!(tiles_state, TilesState::Populating) + }); + } + }); + WriteHandle { + cache: self, + audience_key, + guard, + } + } } -impl Deref for TilesCache { - type Target = Arc>; +/// Manages a write to a specific `TilesCache` entry. +/// +/// This will reset the temporary state set by `prepare_write` when it's gone +/// out of scope and no `insert` was issued (e.g. in the case of errors or +/// panics). +pub struct WriteHandle<'a, F> +where + F: FnOnce(()), +{ + cache: &'a TilesCache, + audience_key: &'a AudienceKey, + guard: scopeguard::ScopeGuard<(), F>, +} - fn deref(&self) -> &Self::Target { - &self.inner +impl WriteHandle<'_, F> +where + F: FnOnce(()), +{ + /// Insert a value into the cache for our audience_key + pub fn insert(self, tiles: TilesState) { + self.cache.inner.insert(self.audience_key.clone(), tiles); + // With the write completed cancel scopeguard's cleanup + scopeguard::ScopeGuard::into_inner(self.guard); + trace!("WriteHandle: ScopeGuard defused (cancelled)"); } } @@ -140,7 +217,7 @@ async fn tiles_cache_garbage_collect(cache: &TilesCache, metrics: &Metrics) { // calculate the size and GC (for seldomly used Tiles) while we're at it let mut cache_count = 0; let mut cache_size = 0; - for refm in cache.iter() { + for refm in cache.inner.iter() { cache_count += 1; cache_size += refm.value().size(); } diff --git a/src/web/handlers.rs b/src/web/handlers.rs index 8bd05cc3..e28738f7 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -6,7 +6,7 @@ use rand::{thread_rng, Rng}; use crate::{ adm, - error::{HandlerError, HandlerErrorKind}, + error::{HandlerErrorKind, HandlerResult}, metrics::Metrics, server::{ cache::{self, Tiles, TilesState}, @@ -43,7 +43,7 @@ pub async fn get_tiles( metrics: Metrics, state: web::Data, request: HttpRequest, -) -> Result { +) -> HandlerResult { trace!("get_tiles"); metrics.incr("tiles.get"); @@ -127,22 +127,13 @@ pub async fn get_tiles( // Alter the cache separately from the read above: writes are more // expensive and these alterations occur infrequently - if expired { - // The cache entry's expired and we're about to refresh it - trace!("get_tiles: Fresh now expired, Refreshing"); - state - .tiles_cache - .alter(&audience_key, |_, tiles_state| match tiles_state { - TilesState::Fresh { tiles } if tiles.expired() => TilesState::Refreshing { tiles }, - _ => tiles_state, - }); - } else { - // We'll populate this cache entry for probably the first time - trace!("get_tiles: Populating"); - state - .tiles_cache - .insert(audience_key.clone(), TilesState::Populating); - }; + + // Prepare to write: temporarily set the cache entry to + // Refreshing/Populating until we've completed our write, notifying other + // requests in flight during this time to return stale data/204 No Content + // instead of making duplicate/redundant writes. The handle will reset the + // temporary state if no write occurs (due to errors/panics) + let handle = state.tiles_cache.prepare_write(&audience_key, expired); let result = adm::get_tiles( &state, @@ -159,75 +150,43 @@ pub async fn get_tiles( ) .await; - let handle_result = || { - match result { - Ok(response) => { - let tiles = cache::Tiles::new(response, add_jitter(&state.settings))?; - trace!( - "get_tiles: cache miss{}: {:?}", - if expired { " (expired)" } else { "" }, - &audience_key - ); - metrics.incr("tiles_cache.miss"); - state.tiles_cache.insert( - audience_key.clone(), - TilesState::Fresh { - tiles: tiles.clone(), - }, - ); - Ok(content_response(&tiles.content)) - } - Err(e) => { - // Add some kind of stats to Retrieving or RetrievingFirst? - // do we need a kill switch if we're restricting like this already? - match e.kind() { - HandlerErrorKind::BadAdmResponse(es) => { - warn!("Bad response from ADM: {:?}", e); - metrics.incr_with_tags("tiles.invalid", Some(&tags)); - state.tiles_cache.insert( - audience_key.clone(), - TilesState::Fresh { - tiles: Tiles::empty(add_jitter(&state.settings)), - }, - ); - // Report directly to sentry - // (This is starting to become a pattern. 🤔) - let mut tags = Tags::from_head(request.head(), settings); - tags.add_extra("err", es); - tags.add_tag("level", "warning"); - l_sentry::report(&tags, sentry::event_from_error(&e)); - warn!("ADM Server error: {:?}", e); - Ok(HttpResponse::NoContent().finish()) - } - _ => Err(e), + match result { + Ok(response) => { + let tiles = cache::Tiles::new(response, add_jitter(&state.settings))?; + trace!( + "get_tiles: cache miss{}: {:?}", + if expired { " (expired)" } else { "" }, + &audience_key + ); + metrics.incr("tiles_cache.miss"); + handle.insert(TilesState::Fresh { + tiles: tiles.clone(), + }); + Ok(content_response(&tiles.content)) + } + Err(e) => { + // Add some kind of stats to Retrieving or RetrievingFirst? + // do we need a kill switch if we're restricting like this already? + match e.kind() { + HandlerErrorKind::BadAdmResponse(es) => { + warn!("Bad response from ADM: {:?}", e); + metrics.incr_with_tags("tiles.invalid", Some(&tags)); + handle.insert(TilesState::Fresh { + tiles: Tiles::empty(add_jitter(&state.settings)), + }); + // Report directly to sentry + // (This is starting to become a pattern. 🤔) + let mut tags = Tags::from_head(request.head(), settings); + tags.add_extra("err", es); + tags.add_tag("level", "warning"); + l_sentry::report(&tags, sentry::event_from_error(&e)); + warn!("ADM Server error: {:?}", e); + Ok(HttpResponse::NoContent().finish()) } + _ => Err(e), } } - }; - - let result = handle_result(); - // Cleanup the TilesState on errors - // TODO: potential panics are not currently cleaned up - if result.is_err() { - if expired { - // Back to Fresh (though the tiles are expired): so a later request - // will retry refreshing again - state - .tiles_cache - .alter(&audience_key, |_, tiles_state| match tiles_state { - TilesState::Refreshing { tiles } => TilesState::Fresh { tiles }, - _ => tiles_state, - }); - } else { - // Clear the entry: a later request will retry populating again - state - .tiles_cache - .remove_if(&audience_key, |_, tiles_state| { - matches!(tiles_state, TilesState::Populating) - }); - } } - result } fn content_response(content: &cache::TilesContent) -> HttpResponse {