diff --git a/relay-common/src/retry.rs b/relay-common/src/retry.rs index d7d7b53fdd..bc2530d3db 100644 --- a/relay-common/src/retry.rs +++ b/relay-common/src/retry.rs @@ -11,6 +11,7 @@ const DEFAULT_RANDOMIZATION: f64 = 0.0; const INITIAL_INTERVAL: u64 = 1000; /// A retry interval generator that increases timeouts with exponential backoff. +#[derive(Debug)] pub struct RetryBackoff { backoff: ExponentialBackoff, attempt: usize, diff --git a/relay-server/src/actors/relays.rs b/relay-server/src/actors/relays.rs index 2aeb053694..c67f9cae3b 100644 --- a/relay-server/src/actors/relays.rs +++ b/relay-server/src/actors/relays.rs @@ -1,41 +1,133 @@ -//! This actor caches known public keys. use std::borrow::Cow; use std::collections::HashMap; -use std::mem; use std::sync::Arc; use std::time::{Duration, Instant}; -use ::actix::fut; -use ::actix::prelude::*; -use actix_web::{http::Method, HttpResponse, ResponseError}; -use failure::Fail; -use futures01::{future, future::Shared, sync::oneshot, Future}; +use actix::SystemService; +use actix_web::http::Method; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; use relay_auth::{PublicKey, RelayId}; use relay_common::RetryBackoff; use relay_config::{Config, RelayInfo}; use relay_log::LogError; +use relay_system::{compat, Addr, AsyncResponse, FromMessage, Interface, Sender, Service}; use crate::actors::upstream::{RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay}; -use crate::utils::{self, ApiErrorResponse, Response}; +use crate::service::REGISTRY; +use crate::utils::SleepHandle; -#[derive(Fail, Debug)] -#[fail(display = "failed to fetch keys")] -pub enum KeyError { - #[fail(display = "failed to fetch relay key from upstream")] - FetchFailed, +/// Resolves [`RelayInfo`] by its [identifier](RelayId). +/// +/// This message may fail if the upstream is not reachable repeatedly and Relay information cannot +/// be resolved. +#[derive(Debug)] +pub struct GetRelay { + /// The unique identifier of the Relay deployment. 
+ /// + /// This is part of the Relay credentials file and determined during setup. + pub relay_id: RelayId, +} + +/// Response of a [`GetRelay`] message. +/// +/// This is `Some` if the Relay is known by the upstream or `None` if the Relay is unknown. +pub type GetRelayResult = Option; + +/// Manages authentication information for downstream Relays. +#[derive(Debug)] +pub struct RelayCache(GetRelay, Sender); + +impl RelayCache { + pub fn from_registry() -> Addr { + REGISTRY.get().unwrap().relay_cache.clone() + } +} + +impl Interface for RelayCache {} + +impl FromMessage for RelayCache { + type Response = AsyncResponse; + + fn from_message(message: GetRelay, sender: Sender) -> Self { + Self(message, sender) + } +} + +/// Compatibility format for deserializing [`GetRelaysResponse`] from the legacy endpoint. +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PublicKeysResultCompatibility { + /// DEPRECATED. Legacy format only public key info. + #[serde(default, rename = "public_keys")] + pub public_keys: HashMap>, + + /// A map from Relay's identifier to its information. + /// + /// Missing entries or explicit `None` both indicate that a Relay with this ID is not known by + /// the upstream and should not be authenticated. + #[serde(default)] + pub relays: HashMap>, +} - #[fail(display = "could not schedule key fetching")] - ScheduleFailed, +/// Response of the [`GetRelays`] upstream query. +/// +/// Former versions of the endpoint returned a different response containing only public keys, +/// defined by [`PublicKeysResultCompatibility`]. Relay's own endpoint is allowed to skip this field +/// and return just the new information. +#[derive(Debug, Serialize, Deserialize)] +pub struct GetRelaysResponse { + /// A map from Relay's identifier to its information. + /// + /// Missing entries or explicit `None` both indicate that a Relay with this ID is not known by + /// the upstream and should not be authenticated. 
+ pub relays: HashMap>, } -impl ResponseError for KeyError { - fn error_response(&self) -> HttpResponse { - HttpResponse::BadGateway().json(&ApiErrorResponse::from_fail(self)) +impl From for GetRelaysResponse { + fn from(relays_info: PublicKeysResultCompatibility) -> Self { + let relays = if relays_info.relays.is_empty() && !relays_info.public_keys.is_empty() { + relays_info + .public_keys + .into_iter() + .map(|(id, pk)| (id, pk.map(RelayInfo::new))) + .collect() + } else { + relays_info.relays + }; + Self { relays } } } +/// Upstream batch query to resolve information for Relays by ID. +#[derive(Debug, Deserialize, Serialize)] +pub struct GetRelays { + /// A list of Relay deployment identifiers to fetch. + pub relay_ids: Vec, +} + +impl UpstreamQuery for GetRelays { + type Response = PublicKeysResultCompatibility; + + fn method(&self) -> Method { + Method::POST + } + + fn path(&self) -> Cow<'static, str> { + Cow::Borrowed("/api/0/relays/publickeys/") + } + + fn priority() -> RequestPriority { + RequestPriority::High + } + + fn retry() -> bool { + false + } +} + +/// Cache entry with metadata. #[derive(Debug)] enum RelayState { Exists { @@ -48,6 +140,7 @@ enum RelayState { } impl RelayState { + /// Returns `true` if this cache entry is still valid. fn is_valid_cache(&self, config: &Config) -> bool { match *self { RelayState::Exists { checked_at, .. } => { @@ -59,6 +152,9 @@ impl RelayState { } } + /// Returns `Some` if there is an existing entry. + /// + /// This entry may be expired; use `is_valid_cache` to verify this. fn as_option(&self) -> Option<&RelayInfo> { match *self { RelayState::Exists { ref relay, .. } => Some(relay), @@ -66,6 +162,7 @@ impl RelayState { } } + /// Constructs a cache entry from an upstream response. 
fn from_option(option: Option) -> Self { match option { Some(relay) => RelayState::Exists { @@ -79,49 +176,44 @@ impl RelayState { } } -#[derive(Debug)] -struct RelayInfoChannel { - sender: oneshot::Sender>, - receiver: Shared>>, -} +/// Result type of the background fetch task. +/// +/// - `Ok`: The task succeeded and information from the response should be inserted into the cache. +/// - `Err`: The task failed and the senders should be placed back for the next fetch. +type FetchResult = Result>>>; -impl RelayInfoChannel { - pub fn new() -> Self { - let (sender, receiver) = oneshot::channel(); - RelayInfoChannel { - sender, - receiver: receiver.shared(), - } - } - - pub fn send(self, value: Option) -> Result<(), Option> { - self.sender.send(value) - } - - pub fn receiver(&self) -> Shared>> { - self.receiver.clone() - } -} - -pub struct RelayCache { +/// Service implementing the [`RelayCache`] interface. +#[derive(Debug)] +pub struct RelayCacheService { + static_relays: HashMap, + relays: HashMap, + senders: HashMap>>, + fetch_channel: (mpsc::Sender, mpsc::Receiver), backoff: RetryBackoff, + delay: SleepHandle, config: Arc, - relays: HashMap, - static_relays: HashMap, - relay_channels: HashMap, } -impl RelayCache { +impl RelayCacheService { + /// Creates a new [`RelayCache`] service. pub fn new(config: Arc) -> Self { - RelayCache { - backoff: RetryBackoff::new(config.http_max_retry_interval()), + Self { static_relays: config.static_relays().clone(), - config, relays: HashMap::new(), - relay_channels: HashMap::new(), + senders: HashMap::new(), + fetch_channel: mpsc::channel(1), + backoff: RetryBackoff::new(config.http_max_retry_interval()), + delay: SleepHandle::idle(), + config, } } + /// Returns a clone of the sender for the background fetch task. + fn fetch_tx(&self) -> mpsc::Sender { + let (ref tx, _) = self.fetch_channel; + tx.clone() + } + /// Returns the backoff timeout for a batched upstream query. 
/// /// If previous queries succeeded, this will be the general batch interval. Additionally, an @@ -131,75 +223,97 @@ impl RelayCache { } /// Schedules a batched upstream query with exponential backoff. - fn schedule_fetch(&mut self, context: &mut Context) { - utils::run_later(self.next_backoff(), Self::fetch_relays).spawn(context) + fn schedule_fetch(&mut self) { + let backoff = self.next_backoff(); + self.delay.set(backoff); } /// Executes an upstream request to fetch information on downstream Relays. /// /// This assumes that currently no request is running. If the upstream request fails or new /// channels are pushed in the meanwhile, this will reschedule automatically. - fn fetch_relays(&mut self, context: &mut Context) { - let channels = mem::take(&mut self.relay_channels); + fn fetch_relays(&mut self) { + let channels = std::mem::take(&mut self.senders); relay_log::debug!( "updating public keys for {} relays (attempt {})", channels.len(), self.backoff.attempt(), ); - let request = GetRelays { - relay_ids: channels.keys().cloned().collect(), - }; - - UpstreamRelay::from_registry() - .send(SendQuery(request)) - .map_err(|_| KeyError::ScheduleFailed) - .into_actor(self) - .and_then(|response, slf, ctx| { - match response { - Ok(response) => { - let mut response = GetRelaysResult::from(response); - slf.backoff.reset(); - - for (id, channel) in channels { - let info = response.relays.remove(&id).unwrap_or(None); - slf.relays.insert(id, RelayState::from_option(info.clone())); - relay_log::debug!("relay {} public key updated", id); - channel.send(info).ok(); + let fetch_tx = self.fetch_tx(); + tokio::spawn(async move { + let request = GetRelays { + relay_ids: channels.keys().cloned().collect(), + }; + + let upstream = UpstreamRelay::from_registry(); + let query_result = match compat::send(upstream, SendQuery(request)).await { + Ok(inner) => inner, + // Drop the senders to propagate the SendError up. 
+ Err(_send_error) => return, + }; + + let fetch_result = match query_result { + Ok(response) => { + let response = GetRelaysResponse::from(response); + + for (id, channels) in channels { + relay_log::debug!("relay {} public key updated", id); + let info = response.relays.get(&id).unwrap_or(&None); + for channel in channels { + channel.send(info.clone()); } } - Err(error) => { - relay_log::error!("error fetching public keys: {}", LogError(&error)); - // Put the channels back into the queue, in addition to channels that have - // been pushed in the meanwhile. We will retry again shortly. - slf.relay_channels.extend(channels); - } + Ok(response) } + Err(error) => { + relay_log::error!("error fetching public keys: {}", LogError(&error)); + Err(channels) + } + }; + + fetch_tx.send(fetch_result).await.ok(); + }); + } + + /// Handles results from the background fetch task. + fn handle_fetch_result(&mut self, result: FetchResult) { + match result { + Ok(response) => { + self.backoff.reset(); - if !slf.relay_channels.is_empty() { - slf.schedule_fetch(ctx); + for (id, info) in response.relays { + self.relays.insert(id, RelayState::from_option(info)); } + } + Err(channels) => { + self.senders.extend(channels); + } + } - fut::ok(()) - }) - .drop_err() - .spawn(context); + if !self.senders.is_empty() { + self.schedule_fetch(); + } } - fn get_or_fetch_info( - &mut self, - relay_id: RelayId, - context: &mut Context, - ) -> Response<(RelayId, Option), KeyError> { - //first check the statically configured relays + /// Resolves information for a Relay and passes it to the sender. + /// + /// Sends information immediately if it is available in the cache. Otherwise, this schedules a + /// delayed background fetch and queues the sender. 
+ fn get_or_fetch(&mut self, message: GetRelay, sender: Sender) { + let relay_id = message.relay_id; + + // First check the statically configured relays if let Some(key) = self.static_relays.get(&relay_id) { - return Response::ok((relay_id, Some(key.clone()))); + sender.send(Some(key.clone())); + return; } if let Some(key) = self.relays.get(&relay_id) { if key.is_valid_cache(&self.config) { - return Response::ok((relay_id, key.as_option().cloned())); + sender.send(key.as_option().cloned()); + return; } } @@ -208,165 +322,41 @@ impl RelayCache { "No credentials configured. Relay {} cannot send requests to this relay.", relay_id ); - return Response::ok((relay_id, None)); + sender.send(None); + return; } relay_log::debug!("relay {} public key requested", relay_id); - if !self.backoff.started() { - self.backoff.reset(); - self.schedule_fetch(context); - } - - let receiver = self - .relay_channels + self.senders .entry(relay_id) - .or_insert_with(RelayInfoChannel::new) - .receiver() - .map(move |key| (relay_id, (*key).clone())) - .map_err(|_| KeyError::FetchFailed); - - Response::future(receiver) - } -} - -impl Actor for RelayCache { - type Context = Context; - - fn started(&mut self, _ctx: &mut Self::Context) { - relay_log::info!("key cache started"); - } - - fn stopped(&mut self, _ctx: &mut Self::Context) { - relay_log::info!("key cache stopped"); - } -} - -impl Supervised for RelayCache {} - -impl SystemService for RelayCache {} - -impl Default for RelayCache { - fn default() -> Self { - unimplemented!("register with the SystemRegistry instead") - } -} + .or_insert_with(Vec::new) + .push(sender); -#[derive(Debug)] -pub struct GetRelay { - pub relay_id: RelayId, -} - -#[derive(Debug)] -pub struct GetRelayResult { - pub relay: Option, -} - -impl Message for GetRelay { - type Result = Result; -} - -impl Handler for RelayCache { - type Result = Response; - - fn handle(&mut self, message: GetRelay, context: &mut Self::Context) -> Self::Result { - 
self.get_or_fetch_info(message.relay_id, context) - .map(|(_id, relay)| GetRelayResult { relay }) - } -} - -#[derive(Debug, Deserialize, Serialize)] -pub struct GetRelays { - pub relay_ids: Vec, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct GetRelaysResult { - /// new format public key plus additional parameters - pub relays: HashMap>, -} - -impl From for GetRelaysResult { - fn from(relays_info: PublicKeysResultCompatibility) -> Self { - let relays = if relays_info.relays.is_empty() && !relays_info.public_keys.is_empty() { - relays_info - .public_keys - .into_iter() - .map(|(id, pk)| (id, pk.map(RelayInfo::new))) - .collect() - } else { - relays_info.relays - }; - Self { relays } + if !self.backoff.started() { + self.schedule_fetch(); + } } } -/// Defines a compatibility format for deserializing relays info that supports -/// both the old and the new format for relay info -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PublicKeysResultCompatibility { - /// DEPRECATED. Legacy format only public key info. - #[serde(default, rename = "public_keys")] - pub public_keys: HashMap>, - /// A map from Relay's identifier to its information. - #[serde(default)] - pub relays: HashMap>, -} +impl Service for RelayCacheService { + type Interface = RelayCache; -impl Message for GetRelays { - type Result = Result; -} + fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + tokio::spawn(async move { + relay_log::info!("key cache started"); -impl UpstreamQuery for GetRelays { - type Response = PublicKeysResultCompatibility; + loop { + tokio::select! 
{ + biased; - fn method(&self) -> Method { - Method::POST - } - - fn path(&self) -> Cow<'static, str> { - Cow::Borrowed("/api/0/relays/publickeys/") - } - - fn priority() -> RequestPriority { - RequestPriority::High - } - - fn retry() -> bool { - false - } -} - -impl Handler for RelayCache { - type Result = Response; - - fn handle(&mut self, message: GetRelays, context: &mut Self::Context) -> Self::Result { - let mut relays = HashMap::new(); - let mut futures = Vec::new(); - - for id in message.relay_ids { - match self.get_or_fetch_info(id, context) { - Response::Future(fut) => { - futures.push(fut); - } - Response::Reply(Ok((id, key))) => { - relays.insert(id, key); - } - Response::Reply(Err(_)) => { - // Cannot happen + Some(result) = self.fetch_channel.1.recv() => self.handle_fetch_result(result), + () = &mut self.delay => self.fetch_relays(), + Some(message) = rx.recv() => self.get_or_fetch(message.0, message.1), + else => break, } } - } - if futures.is_empty() { - return Response::reply(Ok(GetRelaysResult { relays })); - } - - let future = future::join_all(futures).map(move |responses| { - relays.extend(responses); - GetRelaysResult { relays } + relay_log::info!("key cache stopped"); }); - - Response::future(future) } } diff --git a/relay-server/src/endpoints/public_keys.rs b/relay-server/src/endpoints/public_keys.rs index f57ed6bebf..ce166a3f18 100644 --- a/relay-server/src/endpoints/public_keys.rs +++ b/relay-server/src/endpoints/public_keys.rs @@ -1,17 +1,32 @@ +use std::collections::HashMap; + use actix_web::{actix::*, Error, Json}; -use futures01::prelude::*; +use futures::{future, FutureExt, TryFutureExt}; -use crate::actors::relays::{GetRelays, GetRelaysResult, RelayCache}; +use crate::actors::relays::{GetRelay, GetRelays, GetRelaysResponse, RelayCache}; use crate::extractors::SignedJson; use crate::service::ServiceApp; -fn get_public_keys(body: SignedJson) -> ResponseFuture, Error> { - let future = RelayCache::from_registry() - .send(body.inner) - 
.map_err(Error::from) - .and_then(|x| x.map_err(Error::from).map(Json)); +fn get_public_keys(body: SignedJson) -> ResponseFuture, Error> { + let future = async move { + let relay_cache = RelayCache::from_registry(); + + let relay_ids = body.inner.relay_ids.into_iter(); + let futures = relay_ids.map(|relay_id| { + let inner = relay_cache.send(GetRelay { relay_id }); + async move { (relay_id, inner.await) } + }); + + let mut relays = HashMap::new(); + for (relay_id, result) in future::join_all(futures).await { + let relay_info = result.map_err(|_| Error::from(MailboxError::Closed))?; + relays.insert(relay_id, relay_info); + } + + Ok(Json(GetRelaysResponse { relays })) + }; - Box::new(future) + Box::new(future.boxed().compat()) } /// Registers the Relay public keys endpoint. diff --git a/relay-server/src/extractors/signed_json.rs b/relay-server/src/extractors/signed_json.rs index 074361b974..822606f4a5 100644 --- a/relay-server/src/extractors/signed_json.rs +++ b/relay-server/src/extractors/signed_json.rs @@ -2,6 +2,7 @@ use actix_web::actix::*; use actix_web::http::StatusCode; use actix_web::{Error, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError}; use failure::Fail; +use futures::{FutureExt, TryFutureExt}; use futures01::prelude::*; use serde::de::DeserializeOwned; @@ -89,12 +90,10 @@ impl FromRequest for SignedJson let future = RelayCache::from_registry() .send(GetRelay { relay_id }) - .map_err(Error::from) - .and_then(|result| { - result? 
- .relay - .ok_or_else(|| Error::from(SignatureError::UnknownRelay)) - }) + .boxed() + .compat() + .map_err(|_| Error::from(MailboxError::Closed)) + .and_then(|result| result.ok_or_else(|| Error::from(SignatureError::UnknownRelay))) .join(RequestBody::new(req, MAX_JSON_SIZE).map_err(Error::from)) .and_then(move |(relay, body)| { relay diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 72568bb824..9765006508 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -19,7 +19,7 @@ use crate::actors::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutco use crate::actors::outcome_aggregator::OutcomeAggregator; use crate::actors::processor::{EnvelopeProcessor, EnvelopeProcessorService}; use crate::actors::project_cache::ProjectCache; -use crate::actors::relays::RelayCache; +use crate::actors::relays::{RelayCache, RelayCacheService}; use crate::actors::test_store::{TestStore, TestStoreService}; use crate::actors::upstream::UpstreamRelay; use crate::middlewares::{ @@ -117,6 +117,7 @@ pub struct Registry { pub processor: Addr, pub envelope_manager: Addr, pub test_store: Addr, + pub relay_cache: Addr, } impl fmt::Debug for Registry { @@ -189,7 +190,7 @@ impl ServiceState { registry.set(project_cache.clone()); let health_check = HealthCheckService::new(config.clone()).start(); - registry.set(RelayCache::new(config.clone()).start()); + let relay_cache = RelayCacheService::new(config.clone()).start(); let aggregator = Aggregator::new(config.aggregator_config(), project_cache.recipient()); registry.set(Arbiter::start(|_| aggregator)); @@ -208,6 +209,7 @@ impl ServiceState { outcome_aggregator, envelope_manager, test_store, + relay_cache, })) .unwrap(); diff --git a/relay-server/src/utils/actix.rs b/relay-server/src/utils/actix.rs index 838cf188da..741ec15c26 100644 --- a/relay-server/src/utils/actix.rs +++ b/relay-server/src/utils/actix.rs @@ -15,10 +15,6 @@ impl Response { Response::Reply(Ok(value)) } - pub fn 
reply(result: Result) -> Self { - Response::Reply(result) - } - pub fn future(future: F) -> Self where F: IntoFuture, @@ -28,18 +24,6 @@ impl Response { } } -impl Response { - pub fn map(self, f: F) -> Response - where - F: FnOnce(T) -> U, - { - match self { - Response::Reply(result) => Response::reply(result.map(f)), - Response::Future(future) => Response::future(future.map(f)), - } - } -} - impl MessageResponse for Response where A: Actor, diff --git a/relay-server/src/utils/sleep_handle.rs b/relay-server/src/utils/sleep_handle.rs index 0e58d2b67e..6d7f0d3c3a 100644 --- a/relay-server/src/utils/sleep_handle.rs +++ b/relay-server/src/utils/sleep_handle.rs @@ -5,8 +5,11 @@ use std::time::Duration; /// A future wrapper around [`tokio::time::Sleep`]. /// -/// This has two internal states, either it is pending indefinite or it wakes up after a certain -/// duration of time has elapsed. +/// When initialized with [`SleepHandle::idle`], this future is pending indefinitely every time it +/// is polled. To initiate a delay, use [`set`](Self::set). After the delay has passed, the future +/// resolves with `()` **exactly once** and resets to idle. To reset the future while it is +/// sleeping, use [`reset`](Self::reset). +#[derive(Debug)] pub struct SleepHandle(Option>>); impl SleepHandle { @@ -35,9 +38,15 @@ impl Future for SleepHandle { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - match &mut self.0 { + let poll = match &mut self.0 { Some(sleep) => Pin::new(sleep).poll(cx), None => Poll::Pending, + }; + + if poll.is_ready() { + self.reset(); } + + poll } }