Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
feat: upgrade to actix-web 4 (#387)
Browse files Browse the repository at this point in the history
* feat: upgrade to actix-web 4

- move to the latest cloud-storage (w/ a new API)
- upgrade cadence, requiring wrapping of its Client in Arc
- fix the 404 response to use the standard json payload
- kill hostname (we already use gethostname)
- update to new config API
- kill actix-web features to save on compile times
- update rust

Closes #386
  • Loading branch information
pjenvey authored May 13, 2022
1 parent f6876cf commit bd52656
Show file tree
Hide file tree
Showing 20 changed files with 1,012 additions and 1,574 deletions.
1,954 changes: 685 additions & 1,269 deletions Cargo.lock

Large diffs are not rendered by default.

46 changes: 20 additions & 26 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,42 @@ edition = "2021"
debug = 1

[dependencies]
actix-cors = "0.5"
actix-http = "2"
actix-rt = "1" # 2+ breaks testing, May need actix-web 4+?
actix-web = "3"
actix-web-location = { version = "0.5", features = ["actix-web-v3", "maxmind", "cadence"] }
actix-cors = "0.6"
actix-web = { version = "4", default_features = false, features = ["macros"] }
actix-web-location = { version = "0.7", features = ["actix-web-v4", "maxmind", "cadence"] }
async-trait = "0.1"
backtrace = "0.3"
base64 = "0.13"
blake3 = "1.0"
bytes = "1.0"
cadence = "0.26"
blake3 = "1"
bytes = "1"
cadence = "0.29"
chrono = "0.4"
docopt = "1.1"
cloud-storage = { git = "https://github.com/mozilla-services/cloud-storage-rs", branch = "release/0.6.2-create_with_params" } # 0.7+ includes request 0.11, tokio 1.4
config = "0.11"
dashmap = "4.0.2"
cloud-storage = { git = "https://github.com/mozilla-services/cloud-storage-rs", branch = "release/0.11.1-client-builder-and-params" }
config = "0.13"
dashmap = "5.3"
futures = "0.3"
gethostname = "0.2.1"
hex = "0.4"
hostname = "0.3"
image = "0.24"
lazy_static = "1.4"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_info"] }
rand ="0.8"
regex = "1.4"
reqwest = { version = "0.10", features = ["json"] } # 0.11+ conflicts with actix & tokio. Block until actix-web 4+?
serde = "1.0"
# pin to 0.19 (until our onpremise is upgraded):
# https://github.com/getsentry/sentry-rust/issues/277
sentry = "0.19"
sentry-backtrace = "0.19"
serde_json = "1.0"
scopeguard = "1.1.0"
regex = "1"
reqwest = { version = "0.11", features = ["json"] }
serde = "1"
sentry = "0.25"
sentry-backtrace = "0.25"
serde_json = "1"
scopeguard = "1.1"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_info", "dynamic-keys"] }
slog-async = "2.6"
slog-async = "2.7"
slog-envlogger = "2.2.0"
slog-mozlog-json = "0.1"
slog-scope = "4.4"
slog-stdlog = "4.1"
slog-term = "2.7"
thiserror = "1.0"
# pinning to 0.2.4 due to dependencies (actix, etc.)
tokio = { version = "0.2.4", features = ["macros", "sync"] }
slog-term = "2"
thiserror = "1"
tokio = { version = "1", features = ["macros", "sync"] }
url = "2"
woothee = "0.13"
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
ARG APPNAME=contile

# make sure that the build and run environments are the same version
FROM rust:1.58-slim-buster as builder
FROM rust:1.60-slim-buster as builder
ARG APPNAME
ADD . /app
WORKDIR /app
Expand Down
54 changes: 28 additions & 26 deletions src/adm/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
time::Duration,
};

use actix_http::http::Uri;
use actix_web::{http::Uri, rt};
use actix_web_location::Location;
use lazy_static::lazy_static;
use url::Url;
Expand Down Expand Up @@ -60,8 +60,6 @@ pub struct AdmFilter {
pub source_url: Option<url::Url>,
pub last_updated: Option<chrono::DateTime<chrono::Utc>>,
pub refresh_rate: Duration,
pub connect_timeout: Duration,
pub request_timeout: Duration,
}

/// Parse &str into a `Url`
Expand Down Expand Up @@ -104,27 +102,31 @@ fn check_url(url: Url, species: &'static str, filter: &[Vec<String>]) -> Handler
Err(HandlerErrorKind::UnexpectedHost(species, host).into())
}

pub fn spawn_updater(filter: &Arc<RwLock<AdmFilter>>, req: reqwest::Client) {
pub fn spawn_updater(
filter: &Arc<RwLock<AdmFilter>>,
storage_client: cloud_storage::Client,
) -> HandlerResult<()> {
if !filter.read().unwrap().is_cloud() {
return;
return Ok(());
}
let mfilter = filter.clone();
actix_rt::spawn(async move {
rt::spawn(async move {
let tags = crate::tags::Tags::default();
loop {
let mut filter = mfilter.write().unwrap();
match filter.requires_update(&req).await {
Ok(true) => filter.update().await.unwrap_or_else(|e| {
match filter.requires_update(&storage_client).await {
Ok(true) => filter.update(&storage_client).await.unwrap_or_else(|e| {
filter.report(&e, &tags);
}),
Ok(false) => {}
Err(e) => {
filter.report(&e, &tags);
}
}
actix_rt::time::delay_for(filter.refresh_rate).await;
rt::time::sleep(filter.refresh_rate).await;
}
});
Ok(())
}

/// Filter a given tile data set provided by ADM and validate the various elements
Expand All @@ -147,7 +149,10 @@ impl AdmFilter {
}

/// check to see if the bucket has been modified since the last time we updated.
pub async fn requires_update(&self, req: &reqwest::Client) -> HandlerResult<bool> {
pub async fn requires_update(
&self,
storage_client: &cloud_storage::Client,
) -> HandlerResult<bool> {
// don't update non-bucket versions (for now)
if !self.is_cloud() {
return Ok(false);
Expand All @@ -159,9 +164,10 @@ impl AdmFilter {
HandlerError::internal(&format!("Missing bucket Host {:?}", self.source))
})?
.to_string();
let obj =
cloud_storage::Object::read_with(&host, bucket.path().trim_start_matches('/'), req)
.await?;
let obj = storage_client
.object()
.read(&host, bucket.path().trim_start_matches('/'))
.await?;
if let Some(updated) = self.last_updated {
// if the bucket is older than when we last checked, do nothing.
return Ok(updated <= obj.updated);
Expand All @@ -172,20 +178,16 @@ impl AdmFilter {
}

/// Try to update the ADM filter data from the remote bucket.
pub async fn update(&mut self) -> HandlerResult<()> {
pub async fn update(&mut self, storage_client: &cloud_storage::Client) -> HandlerResult<()> {
if let Some(bucket) = &self.source_url {
let adm_settings = AdmFilterSettings::from_settings_bucket(
bucket,
self.connect_timeout,
self.request_timeout,
)
.await
.map_err(|e| {
HandlerError::internal(&format!(
"Invalid bucket data in {:?}: {:?}",
self.source, e
))
})?;
let adm_settings = AdmFilterSettings::from_settings_bucket(storage_client, bucket)
.await
.map_err(|e| {
HandlerError::internal(&format!(
"Invalid bucket data in {:?}: {:?}",
self.source, e
))
})?;
for (adv, setting) in adm_settings.advertisers {
if setting.delete {
trace!("Removing advertiser {:?}", &adv);
Expand Down
16 changes: 4 additions & 12 deletions src/adm/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,8 @@ impl TryFrom<String> for AdmFilterSettings {
impl AdmFilterSettings {
/// Try to fetch the ADM settings from a Google Storage bucket url.
pub async fn from_settings_bucket(
cloud_storage: &cloud_storage::Client,
settings_bucket: &url::Url,
connection_timeout: std::time::Duration,
request_timeout: std::time::Duration,
) -> Result<AdmFilterSettings, ConfigError> {
let settings_str = settings_bucket.as_str();
if settings_bucket.scheme() != "gs" {
Expand All @@ -315,12 +314,9 @@ impl AdmFilterSettings {
})?
.to_string();
let path = settings_bucket.path().trim_start_matches('/');
let req = reqwest::Client::builder()
.connect_timeout(connection_timeout)
.timeout(request_timeout)
.build()
.map_err(|e| ConfigError::Message(e.to_string()))?;
let contents = cloud_storage::Object::download_with(&bucket_name, path, &req)
let contents = cloud_storage
.object()
.download(&bucket_name, path)
.await
.map_err(|e| ConfigError::Message(format!("Could not download settings: {:?}", e)))?;
let mut reply =
Expand Down Expand Up @@ -414,8 +410,6 @@ impl From<&mut Settings> for HandlerResult<AdmFilter> {
.to_lowercase();
let mut all_include_regions = HashSet::new();
let source = settings.adm_settings.clone();
let connect_timeout = settings.connect_timeout;
let request_timeout = settings.request_timeout;
let source_url = match source.parse::<url::Url>() {
Ok(v) => Some(v),
Err(e) => {
Expand Down Expand Up @@ -453,8 +447,6 @@ impl From<&mut Settings> for HandlerResult<AdmFilter> {
source,
source_url,
refresh_rate: std::time::Duration::from_secs(refresh_rate),
connect_timeout: std::time::Duration::from_secs(connect_timeout),
request_timeout: std::time::Duration::from_secs(request_timeout),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/adm/tiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
time::{Duration, Instant},
};

use actix_http::http::header::{HeaderMap, HeaderValue};
use actix_web::http::header::{HeaderMap, HeaderValue};
use actix_web_location::Location;
use serde::{Deserialize, Serialize};
use url::Url;
Expand Down
23 changes: 12 additions & 11 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::result;
use actix_web::http::uri::InvalidUri;

use actix_web::{
dev::{HttpResponseBuilder, ServiceResponse},
error::ResponseError,
http::StatusCode,
middleware::errhandlers::ErrorHandlerResponse,
dev::ServiceResponse, error::ResponseError, http::StatusCode, middleware::ErrorHandlerResponse,
HttpResponse, Result,
};
use serde_json::json;
Expand Down Expand Up @@ -201,11 +198,16 @@ impl Error for HandlerError {
impl HandlerError {
pub fn render_404<B>(res: ServiceResponse<B>) -> Result<ErrorHandlerResponse<B>> {
// Replace the outbound error message with our own.
let resp = HttpResponseBuilder::new(StatusCode::NOT_FOUND).json(0);
Ok(ErrorHandlerResponse::Response(ServiceResponse::new(
res.request().clone(),
resp.into_body(),
)))
let status = StatusCode::NOT_FOUND;
let resp = HttpResponse::build(status).json(json!({
"code": status.as_u16(),
"errno": status.as_u16(),
"error": status.to_string(),
}));

let (req, _) = res.into_parts();
let resp = ServiceResponse::new(req, resp).map_into_right_body();
Ok(ErrorHandlerResponse::Response(resp))
}
}

Expand Down Expand Up @@ -236,8 +238,7 @@ impl fmt::Display for HandlerError {

impl ResponseError for HandlerError {
fn error_response(&self) -> HttpResponse {
let mut resp = HttpResponse::build(self.status_code());
resp.json(json!({
HttpResponse::build(self.status_code()).json(json!({
"code": self.kind().http_status().as_u16(),
"errno": self.kind().errno(),
"error": self.kind().as_response_string(),
Expand Down
3 changes: 1 addition & 2 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use slog_mozlog_json::MozLogJson;
/// the `human_logs` setting (see [crate::settings::Settings])
pub fn init_logging(json: bool) -> HandlerResult<()> {
let logger = if json {
let hostname = hostname::get()
.expect("Couldn't get hostname")
let hostname = gethostname::gethostname()
.into_string()
.expect("Couldn't get hostname");

Expand Down
33 changes: 16 additions & 17 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Metric collection and reporting via the `cadence` library.
use std::net::UdpSocket;
use std::time::Instant;
use std::{net::UdpSocket, sync::Arc, time::Instant};

use actix_web::{error::ErrorInternalServerError, web::Data, Error, HttpRequest};
use actix_web::{error::ErrorInternalServerError, web::Data, Error, HttpMessage, HttpRequest};
use cadence::{
BufferedUdpMetricSink, Counted, CountedExt, Metric, NopMetricSink, QueuingMetricSink,
StatsdClient, Timed,
Expand All @@ -25,7 +24,7 @@ pub struct MetricTimer {
/// The metric wrapper
#[derive(Debug, Clone)]
pub struct Metrics {
client: Option<StatsdClient>,
client: Option<Arc<StatsdClient>>,
tags: Option<Tags>,
timer: Option<MetricTimer>,
}
Expand Down Expand Up @@ -68,7 +67,7 @@ impl From<&HttpRequest> for Metrics {
let tags = exts.get::<Tags>().unwrap_or(&def_tags);
Metrics {
client: match req.app_data::<Data<ServerState>>() {
Some(v) => Some(*v.metrics.clone()),
Some(v) => Some(Arc::clone(&v.metrics)),
None => {
warn!("⚠️ metric error: No App State");
None
Expand All @@ -80,10 +79,10 @@ impl From<&HttpRequest> for Metrics {
}
}

impl From<&StatsdClient> for Metrics {
fn from(client: &StatsdClient) -> Self {
impl From<Arc<StatsdClient>> for Metrics {
fn from(client: Arc<StatsdClient>) -> Self {
Metrics {
client: Some(client.clone()),
client: Some(client),
tags: None,
timer: None,
}
Expand All @@ -93,7 +92,7 @@ impl From<&StatsdClient> for Metrics {
impl From<&ServerState> for Metrics {
fn from(state: &ServerState) -> Self {
Metrics {
client: Some(*state.metrics.clone()),
client: Some(Arc::clone(&state.metrics)),
tags: None,
timer: None,
}
Expand All @@ -108,7 +107,7 @@ impl Metrics {

pub fn noop() -> Self {
Self {
client: Some(Self::sink()),
client: Some(Arc::new(Self::sink())),
timer: None,
tags: None,
}
Expand Down Expand Up @@ -199,13 +198,13 @@ impl Metrics {
}

/// Fetch the metric information from the current [HttpRequest]
pub fn metrics_from_req(req: &HttpRequest) -> Result<Box<StatsdClient>, Error> {
Ok(req
.app_data::<Data<ServerState>>()
.ok_or_else(|| ErrorInternalServerError("Could not get state"))
.expect("Could not get state in metrics_from_req")
.metrics
.clone())
pub fn metrics_from_req(req: &HttpRequest) -> Result<Arc<StatsdClient>, Error> {
Ok(Arc::clone(
&req.app_data::<Data<ServerState>>()
.ok_or_else(|| ErrorInternalServerError("Could not get state"))
.expect("Could not get state in metrics_from_req")
.metrics,
))
}

/// Create a cadence StatsdClient from the given options
Expand Down
Loading

0 comments on commit bd52656

Please sign in to comment.