diff --git a/src/adm/filter.rs b/src/adm/filter.rs index 655418b4..05112d4a 100644 --- a/src/adm/filter.rs +++ b/src/adm/filter.rs @@ -3,13 +3,14 @@ use std::{ collections::{HashMap, HashSet}, fmt::Debug, iter::FromIterator, - sync::{Arc, RwLock}, + sync::Arc, time::Duration, }; use actix_web::{http::Uri, rt}; use actix_web_location::Location; use lazy_static::lazy_static; +use tokio::sync::RwLock; use url::Url; use super::{ @@ -102,25 +103,25 @@ fn check_url(url: Url, species: &'static str, filter: &[Vec]) -> Handler Err(HandlerErrorKind::UnexpectedHost(species, host).into()) } -pub fn spawn_updater( +pub async fn spawn_updater( filter: &Arc>, storage_client: cloud_storage::Client, ) -> HandlerResult<()> { - if !filter.read().unwrap().is_cloud() { + if !filter.read().await.is_cloud() { return Ok(()); } let mfilter = filter.clone(); rt::spawn(async move { - let tags = crate::tags::Tags::default(); + let mut tags = crate::tags::Tags::default(); loop { - let mut filter = mfilter.write().unwrap(); + let mut filter = mfilter.write().await; match filter.requires_update(&storage_client).await { Ok(true) => filter.update(&storage_client).await.unwrap_or_else(|e| { - filter.report(&e, &tags); + filter.report(&e, &mut tags); }), Ok(false) => {} Err(e) => { - filter.report(&e, &tags); + filter.report(&e, &mut tags); } } rt::time::sleep(filter.refresh_rate).await; @@ -140,7 +141,7 @@ impl AdmFilter { } /// Report the error directly to sentry - fn report(&self, error: &HandlerError, tags: &Tags) { + fn report(&self, error: &HandlerError, tags: &mut Tags) { // trace!(&error, &tags); // TODO: if not error.is_reportable, just add to metrics. let mut merged_tags = error.tags.clone(); diff --git a/src/adm/tiles.rs b/src/adm/tiles.rs index 5a2fcc8b..fb61ba32 100644 --- a/src/adm/tiles.rs +++ b/src/adm/tiles.rs @@ -248,20 +248,17 @@ pub async fn get_tiles( metrics.incr_with_tags("filter.adm.empty_response", Some(tags)); } - let filtered: Vec = response - .tiles - .into_iter() - .filter_map(|tile| { - state.filter.read().unwrap().filter_and_process( - tile, - location, - &device_info, - tags, - metrics, - ) - }) - .take(settings.adm_max_tiles as usize) - .collect(); + let filtered: Vec = { + let filter = state.filter.read().await; + response + .tiles + .into_iter() + .filter_map(|tile| { + filter.filter_and_process(tile, location, &device_info, tags, metrics) + }) + .take(settings.adm_max_tiles as usize) + .collect() + }; let mut tiles: Vec = Vec::new(); for mut tile in filtered { diff --git a/src/server/mod.rs b/src/server/mod.rs index cd219d37..003f3664 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,6 +1,7 @@ //! Main application server -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::{Duration, Instant}; +use tokio::sync::RwLock; use actix_cors::Cors; use actix_web::{ @@ -121,7 +122,7 @@ impl Server { raw_filter.update(&storage_client).await? } let filter = Arc::new(RwLock::new(raw_filter)); - spawn_updater(&filter, storage_client)?; + spawn_updater(&filter, storage_client).await?; let tiles_cache = cache::TilesCache::new(TILES_CACHE_INITIAL_CAPACITY); let img_store = ImageStore::create(&settings, Arc::clone(&metrics), &req).await?; let excluded_dmas = if let Some(exclude_dmas) = &settings.exclude_dma { diff --git a/src/web/handlers.rs b/src/web/handlers.rs index 7d487acd..55f1b512 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -51,7 +51,7 @@ pub async fn get_tiles( if !state .filter .read() - .unwrap() + .await .all_include_regions .contains(&location.country()) { diff --git a/src/web/test.rs b/src/web/test.rs index 9cc1c50f..572be77b 100644 --- a/src/web/test.rs +++ b/src/web/test.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::convert::TryFrom; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use actix_cors::Cors; @@ -15,6 +15,7 @@ use actix_web::{ use cadence::{SpyMetricSink, StatsdClient}; use futures::{channel::mpsc, StreamExt}; use serde_json::{json, Value}; +use tokio::sync::RwLock; use url::Url; use crate::{