diff --git a/src/activity_queue.rs b/src/activity_queue.rs index 6e4ad99..31b433b 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -13,6 +13,7 @@ use crate::{ use anyhow::anyhow; use bytes::Bytes; +use dyn_clone::{DynClone, clone_trait_object}; use futures_core::Future; use http::{header::HeaderName, HeaderMap, HeaderValue}; use httpdate::fmt_http_date; @@ -96,6 +97,7 @@ where private_key: private_key.clone(), http_signature_compat: config.http_signature_compat, }; + let _ = config.outbound_storage.store_task(&message); // Don't use the activity queue if this is in debug mode, send and wait directly if config.debug { @@ -259,6 +261,36 @@ pub(crate) struct ActivityQueue { retry_sender_task: JoinHandle<()>, } +/// A trait for manipulating a cache of SendActivityTask +pub trait StorageInterface: DynClone + Send { + /// Store an SendActivityTask to disk + fn store_task(&self, task: &SendActivityTask) -> Result<(), anyhow::Error>; + + /// read SendActivityTask from disk + fn read_task(&self)->Result>, anyhow::Error>; + + /// Delete the SendActivityTask on disk + fn delete_task(&self, task: &SendActivityTask) -> Result<(), anyhow::Error>; +} + +#[derive(Clone)] +pub struct DefaultStorageHandler(); + +impl StorageInterface for DefaultStorageHandler { + fn store_task(&self, task: &SendActivityTask) -> Result<(), anyhow::Error> { + Ok(()) + } + + fn read_task(&self)->Result>, anyhow::Error> { + Ok(None) + } + + fn delete_task(&self, task: &SendActivityTask) -> Result<(), anyhow::Error> { + Ok(()) + } +} + +clone_trait_object!(StorageInterface); /// Simple stat counter to show where we're up to with sending messages /// This is a lock-free way to share things between tasks /// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning diff --git a/src/config.rs b/src/config.rs index 3500289..9621043 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,7 +16,7 @@ //! ``` use crate::{ - activity_queue::{create_activity_queue, ActivityQueue}, + activity_queue::{create_activity_queue, ActivityQueue, DefaultStorageHandler, StorageInterface}, error::Error, protocol::verification::verify_domains_match, traits::{ActivityHandler, Actor}, @@ -92,6 +92,11 @@ pub struct FederationConfig { /// present once constructed. #[builder(setter(skip))] pub(crate) activity_queue: Option>, + + /// Implements the StorageInterface trait which provides an interface for + /// storing objects and removing objects when appropiate + #[builder(default = "Box::new(DefaultStorageHandler())")] + pub(crate) outbound_storage: Box, } impl FederationConfig {