Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
refactor: Use async friendly RwLocks in async code paths (#394)
Browse files Browse the repository at this point in the history
  • Loading branch information
ncloudioj authored Jun 1, 2022
1 parent bd52656 commit 64a401b
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 26 deletions.
17 changes: 9 additions & 8 deletions src/adm/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::{
collections::{HashMap, HashSet},
fmt::Debug,
iter::FromIterator,
sync::{Arc, RwLock},
sync::Arc,
time::Duration,
};

use actix_web::{http::Uri, rt};
use actix_web_location::Location;
use lazy_static::lazy_static;
use tokio::sync::RwLock;
use url::Url;

use super::{
Expand Down Expand Up @@ -102,25 +103,25 @@ fn check_url(url: Url, species: &'static str, filter: &[Vec<String>]) -> Handler
Err(HandlerErrorKind::UnexpectedHost(species, host).into())
}

pub fn spawn_updater(
pub async fn spawn_updater(
filter: &Arc<RwLock<AdmFilter>>,
storage_client: cloud_storage::Client,
) -> HandlerResult<()> {
if !filter.read().unwrap().is_cloud() {
if !filter.read().await.is_cloud() {
return Ok(());
}
let mfilter = filter.clone();
rt::spawn(async move {
let tags = crate::tags::Tags::default();
let mut tags = crate::tags::Tags::default();
loop {
let mut filter = mfilter.write().unwrap();
let mut filter = mfilter.write().await;
match filter.requires_update(&storage_client).await {
Ok(true) => filter.update(&storage_client).await.unwrap_or_else(|e| {
filter.report(&e, &tags);
filter.report(&e, &mut tags);
}),
Ok(false) => {}
Err(e) => {
filter.report(&e, &tags);
filter.report(&e, &mut tags);
}
}
rt::time::sleep(filter.refresh_rate).await;
Expand All @@ -140,7 +141,7 @@ impl AdmFilter {
}

/// Report the error directly to sentry
fn report(&self, error: &HandlerError, tags: &Tags) {
fn report(&self, error: &HandlerError, tags: &mut Tags) {
// trace!(&error, &tags);
// TODO: if not error.is_reportable, just add to metrics.
let mut merged_tags = error.tags.clone();
Expand Down
25 changes: 11 additions & 14 deletions src/adm/tiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,20 +248,17 @@ pub async fn get_tiles(
metrics.incr_with_tags("filter.adm.empty_response", Some(tags));
}

let filtered: Vec<Tile> = response
.tiles
.into_iter()
.filter_map(|tile| {
state.filter.read().unwrap().filter_and_process(
tile,
location,
&device_info,
tags,
metrics,
)
})
.take(settings.adm_max_tiles as usize)
.collect();
let filtered: Vec<Tile> = {
let filter = state.filter.read().await;
response
.tiles
.into_iter()
.filter_map(|tile| {
filter.filter_and_process(tile, location, &device_info, tags, metrics)
})
.take(settings.adm_max_tiles as usize)
.collect()
};

let mut tiles: Vec<Tile> = Vec::new();
for mut tile in filtered {
Expand Down
5 changes: 3 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Main application server
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

use actix_cors::Cors;
use actix_web::{
Expand Down Expand Up @@ -121,7 +122,7 @@ impl Server {
raw_filter.update(&storage_client).await?
}
let filter = Arc::new(RwLock::new(raw_filter));
spawn_updater(&filter, storage_client)?;
spawn_updater(&filter, storage_client).await?;
let tiles_cache = cache::TilesCache::new(TILES_CACHE_INITIAL_CAPACITY);
let img_store = ImageStore::create(&settings, Arc::clone(&metrics), &req).await?;
let excluded_dmas = if let Some(exclude_dmas) = &settings.exclude_dma {
Expand Down
2 changes: 1 addition & 1 deletion src/web/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub async fn get_tiles(
if !state
.filter
.read()
.unwrap()
.await
.all_include_regions
.contains(&location.country())
{
Expand Down
3 changes: 2 additions & 1 deletion src/web/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;

use actix_cors::Cors;
Expand All @@ -15,6 +15,7 @@ use actix_web::{
use cadence::{SpyMetricSink, StatsdClient};
use futures::{channel::mpsc, StreamExt};
use serde_json::{json, Value};
use tokio::sync::RwLock;
use url::Url;

use crate::{
Expand Down

0 comments on commit 64a401b

Please sign in to comment.