diff --git a/Cargo.lock b/Cargo.lock
index 951fa9e3..e29d235a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1012,6 +1012,7 @@ dependencies = [
  "ed25519-dalek",
  "futures",
  "kitsune2_api",
+ "kitsune2_memory",
  "prost",
  "rand",
  "serde",
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 3c8c7d3d..17391d49 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -29,5 +29,6 @@ axum = { workspace = true, default-features = false, features = [
     "tokio",
 ] }
 ed25519-dalek = { workspace = true, features = ["rand_core"] }
+kitsune2_memory = { workspace = true }
 rand = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
diff --git a/crates/core/src/factories/core_fetch.rs b/crates/core/src/factories/core_fetch.rs
index 584e805a..7de55025 100644
--- a/crates/core/src/factories/core_fetch.rs
+++ b/crates/core/src/factories/core_fetch.rs
@@ -19,7 +19,9 @@
 //!
 //! ### Fetch tasks
 //!
-//! A channel acts as the queue structure for the fetch tasks. Requests to send are passed
+//! #### Requests
+//!
+//! A channel acts as the queue structure for the fetch request tasks. Ops to fetch are sent
 //! one by one through the channel to the receiving tasks running in parallel. The flow
 //! of sending fetch requests is as follows:
 //!
@@ -36,24 +38,38 @@
 //! from the set of requests if it is received in the meantime, and thus prevent a redundant
 //! fetch request.
 //!
+//! #### Responses
+//!
+//! As with fetch requests, a channel serves as a queue for responses to fetch requests.
+//! The queue has the following properties:
+//! - It is a simple queue that processes items in the order of the incoming requests.
+//! - A request consists of a list of requested op ids and the requesting agent id.
+//! - A single attempt is made to look up the requested ops in the data store and send a response.
+//! - Requests for data that the responding agent does not hold should be logged.
+//! - If none of the requested ops could be read from the store, no response is sent.
+//! - If sending or receiving fails, it is the requester's responsibility to send the request again.
+//!
+//! The queue wiring common to the request and response tasks is sketched at the end of
+//! this module documentation.
+//!
 //! ### Incoming op task
 //!
 //! - Incoming op is written to the data store.
 //! - Once persisted successfully, op is removed from the set of ops to fetch.
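+//!
+//! ### Queue wiring sketch
+//!
+//! The following is a minimal, illustrative sketch of the channel-as-queue pattern
+//! described above; it is not the actual implementation. A bounded channel acts as
+//! the queue, and its receiving end is shared by the parallel tasks behind a
+//! `tokio::sync::Mutex`. The names mirror this module's types and config.
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use tokio::sync::{mpsc, Mutex};
+//!
+//! // FetchRequest is (OpId, AgentId), as defined in this module.
+//! let (tx, rx) = mpsc::channel::<FetchRequest>(16_384);
+//! let rx = Arc::new(Mutex::new(rx));
+//! for _ in 0..config.parallel_request_count {
+//!     let rx = rx.clone();
+//!     tokio::task::spawn(async move {
+//!         // Each task repeatedly takes the next request off the shared queue.
+//!         while let Some((op_id, agent_id)) = rx.lock().await.recv().await {
+//!             // Look up the agent's URL, send a fetch request over the
+//!             // transport, and re-queue the op id for a retry on failure.
+//!         }
+//!     });
+//! }
+//! ```
+//!
+//! Sharing a single receiver this way distributes the queued requests across the
+//! parallel tasks without any additional coordination.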
+use kitsune2_api::{
+    builder,
+    fetch::{
+        serialize_op_ids, serialize_ops, DynFetch, DynFetchFactory, Fetch,
+        FetchFactory,
+    },
+    op_store, peer_store,
+    transport::DynTransport,
+    AgentId, BoxFut, K2Result, OpId, SpaceId, Url,
+};
 use std::{
     collections::HashSet,
     sync::{Arc, Mutex},
 };
 
 use back_off::BackOffList;
-use kitsune2_api::{
-    builder,
-    fetch::{serialize_op_ids, DynFetch, DynFetchFactory, Fetch, FetchFactory},
-    peer_store,
-    transport::DynTransport,
-    AgentId, BoxFut, K2Result, OpId, SpaceId, Url,
-};
 use tokio::{
     sync::mpsc::{channel, Receiver, Sender},
     task::JoinHandle,
 };
@@ -129,6 +145,7 @@ impl FetchFactory for CoreFetchFactory {
         builder: Arc<builder::Builder>,
         space_id: SpaceId,
         peer_store: peer_store::DynPeerStore,
+        op_store: op_store::DynOpStore,
         transport: DynTransport,
     ) -> BoxFut<'static, K2Result<DynFetch>> {
         Box::pin(async move {
@@ -138,6 +155,7 @@ impl FetchFactory for CoreFetchFactory {
                 config.core_fetch,
                 space_id,
                 peer_store,
+                op_store,
                 transport,
             ));
             Ok(out)
@@ -146,6 +164,7 @@ }
 
 type FetchRequest = (OpId, AgentId);
+type FetchResponse = (Vec<OpId>, AgentId);
 
 #[derive(Debug)]
 struct State {
@@ -156,8 +175,9 @@ struct State {
 
 #[derive(Debug)]
 struct CoreFetch {
     state: Arc<Mutex<State>>,
-    fetch_queue_tx: Sender<FetchRequest>,
-    fetch_tasks: Vec<JoinHandle<()>>,
+    request_tx: Sender<FetchRequest>,
+    response_tx: Sender<FetchResponse>,
+    tasks: Vec<JoinHandle<()>>,
 }
 
 impl CoreFetch {
@@ -165,9 +185,12 @@ impl CoreFetch {
         config: CoreFetchConfig,
         space_id: SpaceId,
         peer_store: peer_store::DynPeerStore,
+        op_store: op_store::DynOpStore,
         transport: DynTransport,
     ) -> Self {
-        Self::spawn_fetch_tasks(config, space_id, peer_store, transport)
+        Self::spawn_fetch_tasks(
+            config, space_id, peer_store, op_store, transport,
+        )
     }
 }
 
@@ -192,10 +215,10 @@ impl Fetch for CoreFetch {
             // Pass requests to fetch tasks.
             for op_id in op_list {
                 if let Err(err) =
-                    self.fetch_queue_tx.send((op_id, source.clone())).await
+                    self.request_tx.send((op_id, source.clone())).await
                 {
                     tracing::warn!(
-                        "could not pass fetch request to fetch task: {err}"
+                        "could not pass fetch request to request task: {err}"
                     );
                 }
             }
@@ -203,6 +226,21 @@
             Ok(())
         })
     }
+
+    fn respond_with_ops(
+        &self,
+        ops: Vec<OpId>,
+        to: AgentId,
+    ) -> BoxFut<'_, K2Result<()>> {
+        Box::pin(async move {
+            if let Err(err) = self.response_tx.send((ops, to)).await {
+                tracing::warn!(
+                    "could not pass fetch response to response task: {err}"
+                );
+            }
+            Ok(())
+        })
+    }
 }
 
 impl CoreFetch {
@@ -210,11 +248,16 @@
     fn spawn_fetch_tasks(
         config: CoreFetchConfig,
         space_id: SpaceId,
         peer_store: peer_store::DynPeerStore,
+        op_store: op_store::DynOpStore,
         transport: DynTransport,
     ) -> Self {
-        // Create a channel to send new requests to fetch to the tasks. This is in effect the fetch queue.
-        let (fetch_queue_tx, fetch_queue_rx) = channel::<FetchRequest>(16_384);
-        let fetch_queue_rx = Arc::new(tokio::sync::Mutex::new(fetch_queue_rx));
+        // Create a channel to send op requests to request tasks. This is the fetch queue.
+        let (request_tx, request_rx) = channel::<FetchRequest>(16_384);
+        let request_rx = Arc::new(tokio::sync::Mutex::new(request_rx));
+
+        // Create another channel to send incoming requests to the response task. This is the fetch response queue.
+        let (response_tx, response_rx) = channel::<FetchResponse>(16_384);
+        let response_rx = Arc::new(tokio::sync::Mutex::new(response_rx));
 
         let state = Arc::new(Mutex::new(State {
             requests: HashSet::new(),
@@ -225,28 +268,41 @@
             ),
         }));
 
-        let mut fetch_tasks =
+        let mut tasks =
             Vec::with_capacity(config.parallel_request_count as usize);
 
+        // Spawn request tasks.
         for _ in 0..config.parallel_request_count {
-            let task = tokio::task::spawn(CoreFetch::fetch_task(
-                state.clone(),
-                fetch_queue_tx.clone(),
-                fetch_queue_rx.clone(),
-                peer_store.clone(),
-                space_id.clone(),
-                transport.clone(),
-            ));
-            fetch_tasks.push(task);
+            let request_task =
+                tokio::task::spawn(CoreFetch::spawn_request_task(
+                    state.clone(),
+                    request_tx.clone(),
+                    request_rx.clone(),
+                    peer_store.clone(),
+                    space_id.clone(),
+                    transport.clone(),
+                ));
+            tasks.push(request_task);
         }
 
+        // Spawn response task.
+        let response_task = tokio::task::spawn(CoreFetch::spawn_response_task(
+            response_rx,
+            peer_store,
+            op_store,
+            transport.clone(),
+            space_id.clone(),
+        ));
+        tasks.push(response_task);
+
         Self {
             state,
-            fetch_queue_tx,
-            fetch_tasks,
+            request_tx,
+            response_tx,
+            tasks,
         }
     }
 
-    async fn fetch_task(
+    async fn spawn_request_task(
         state: Arc<Mutex<State>>,
         fetch_request_tx: Sender<FetchRequest>,
         fetch_request_rx: Arc<tokio::sync::Mutex<Receiver<FetchRequest>>>,
@@ -335,6 +391,54 @@
         }
     }
 
+    async fn spawn_response_task(
+        response_rx: Arc<tokio::sync::Mutex<Receiver<FetchResponse>>>,
+        peer_store: peer_store::DynPeerStore,
+        op_store: op_store::DynOpStore,
+        transport: DynTransport,
+        space_id: SpaceId,
+    ) {
+        while let Some((op_ids, agent_id)) =
+            response_rx.lock().await.recv().await
+        {
+            tracing::debug!(?op_ids, ?agent_id, "incoming request");
+            let peer = match CoreFetch::get_peer_url_from_store(
+                &agent_id,
+                peer_store.clone(),
+            )
+            .await
+            {
+                None => continue,
+                Some(url) => url,
+            };
+
+            // Fetch ops to send from store.
+            let ops = match op_store.read_ops(op_ids.clone()).await {
+                Err(err) => {
+                    tracing::error!("could not read ops from store: {err}");
+                    continue;
+                }
+                Ok(ops) => ops,
+            };
+            if ops.is_empty() {
+                // Do not send a response when no ops could be read.
+                continue;
+            }
+            let data = serialize_ops(ops);
+
+            if let Err(err) = transport
+                .send_module(peer, space_id.clone(), MOD_NAME.to_string(), data)
+                .await
+            {
+                tracing::warn!(
+                    ?op_ids,
+                    ?agent_id,
+                    "could not send ops to requesting agent: {err}"
+                );
+            }
+        }
+    }
+
     async fn get_peer_url_from_store(
         agent_id: &AgentId,
         peer_store: peer_store::DynPeerStore,
@@ -365,7 +469,7 @@ impl CoreFetch {
 
 impl Drop for CoreFetch {
     fn drop(&mut self) {
-        for t in self.fetch_tasks.iter() {
+        for t in self.tasks.iter() {
             t.abort();
         }
     }
diff --git a/crates/core/src/factories/core_fetch/back_off.rs b/crates/core/src/factories/core_fetch/back_off.rs
index b5ee538a..35f7f1d9 100644
--- a/crates/core/src/factories/core_fetch/back_off.rs
+++ b/crates/core/src/factories/core_fetch/back_off.rs
@@ -8,9 +8,9 @@ use kitsune2_api::AgentId;
 
 #[derive(Debug)]
 pub struct BackOffList {
-    state: HashMap<AgentId, BackOff>,
-    first_back_off_interval: Duration,
-    last_back_off_interval: Duration,
+    pub state: HashMap<AgentId, BackOff>,
+    pub first_back_off_interval: Duration,
+    pub last_back_off_interval: Duration,
     num_back_off_intervals: usize,
 }
 
@@ -63,7 +63,7 @@ impl BackOffList {
 }
 
 #[derive(Debug)]
-struct BackOff {
+pub(crate) struct BackOff {
     back_off: backon::ExponentialBackoff,
     current_interval: Duration,
     interval_start: Instant,
 }
 
@@ -115,20 +115,10 @@ impl BackOff {
 
 #[cfg(test)]
 mod test {
-    use std::{sync::Arc, time::Duration};
+    use std::time::Duration;
 
-    use kitsune2_api::{fetch::Fetch, SpaceId, Url};
-
-    use crate::{
-        default_builder,
-        factories::{
-            core_fetch::{
-                back_off::BackOffList,
-                test::{create_op_list, random_agent_id, MockTransport},
-                CoreFetch, CoreFetchConfig,
-            },
-            test_utils::AgentBuilder,
-        },
+    use crate::factories::core_fetch::{
+        back_off::BackOffList, test::utils::random_agent_id,
     };
 
     #[test]
@@ -157,191 +147,4 @@ mod test {
         assert!(!back_off_list.is_agent_on_back_off(&agent_id));
     }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn agent_on_back_off_is_removed_from_list_after_successful_send() {
-        let builder =
-            Arc::new(default_builder().with_default_config().unwrap());
-        let peer_store =
-            builder.peer_store.create(builder.clone()).await.unwrap();
-        let config = CoreFetchConfig {
-            first_back_off_interval: Duration::from_millis(10),
-            ..Default::default()
-        };
-        let mock_transport = MockTransport::new(false);
-
-        let op_list = create_op_list(1);
-        let agent_id = random_agent_id();
-        let agent_info = AgentBuilder {
-            agent: Some(agent_id.clone()),
-            url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
-            ..Default::default()
-        }
-        .build();
-        peer_store.insert(vec![agent_info.clone()]).await.unwrap();
-
-        let fetch = CoreFetch::new(
-            config.clone(),
-            agent_info.space.clone(),
-            peer_store,
-            mock_transport.clone(),
-        );
-
-        let first_back_off_interval = {
-            let mut lock = fetch.state.lock().unwrap();
-            lock.back_off_list.back_off_agent(&agent_id);
-
-            assert!(lock.back_off_list.is_agent_on_back_off(&agent_id));
-
-            lock.back_off_list.first_back_off_interval
-        };
-
-        tokio::time::sleep(first_back_off_interval).await;
-
-        fetch.add_ops(op_list, agent_id.clone()).await.unwrap();
-
-        tokio::time::timeout(Duration::from_millis(10), async {
-            loop {
-                tokio::time::sleep(Duration::from_millis(1)).await;
-                if !mock_transport.requests_sent.lock().unwrap().is_empty() {
-                    break;
-                }
-            }
-        })
-        .await
-        .unwrap();
-
-        assert!(fetch.state.lock().unwrap().back_off_list.state.is_empty());
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn requests_are_dropped_when_max_back_off_expired() {
-        let builder =
-            Arc::new(default_builder().with_default_config().unwrap());
-        let peer_store =
-            builder.peer_store.create(builder.clone()).await.unwrap();
-        let config = CoreFetchConfig {
-            first_back_off_interval: Duration::from_millis(10),
-            last_back_off_interval: Duration::from_millis(10),
-            ..Default::default()
-        };
-        let mock_transport = MockTransport::new(true);
-        let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1"));
-
-        let op_list_1 = create_op_list(2);
-        let agent_id_1 = random_agent_id();
-        let agent_info_1 = AgentBuilder {
-            agent: Some(agent_id_1.clone()),
-            url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
-            ..Default::default()
-        }
-        .build();
-        let agent_url_1 = agent_info_1.url.clone().unwrap();
-        peer_store.insert(vec![agent_info_1.clone()]).await.unwrap();
-
-        // Create a second agent to later check that their ops have not been removed.
-        let op_list_2 = create_op_list(2);
-        let agent_id_2 = random_agent_id();
-        let agent_info_2 = AgentBuilder {
-            agent: Some(agent_id_2.clone()),
-            url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
-            ..Default::default()
-        }
-        .build();
-        peer_store.insert(vec![agent_info_2.clone()]).await.unwrap();
-
-        let fetch = CoreFetch::new(
-            config.clone(),
-            space_id.clone(),
-            peer_store,
-            mock_transport.clone(),
-        );
-
-        fetch
-            .add_ops(op_list_1.clone(), agent_id_1.clone())
-            .await
-            .unwrap();
-
-        // Wait for one request to fail, so agent is put on back off list.
-        tokio::time::timeout(Duration::from_millis(10), async {
-            loop {
-                tokio::time::sleep(Duration::from_millis(1)).await;
-                if !mock_transport
-                    .requests_sent
-                    .lock()
-                    .unwrap()
-                    .clone()
-                    .is_empty()
-                {
-                    break;
-                }
-            }
-        })
-        .await
-        .unwrap();
-
-        let current_number_of_requests_to_agent_1 = mock_transport
-            .requests_sent
-            .lock()
-            .unwrap()
-            .iter()
-            .filter(|(_, a)| *a != agent_url_1)
-            .count();
-
-        // Back off agent the maximum possible number of times.
-        let last_back_off_interval = {
-            let mut lock = fetch.state.lock().unwrap();
-            assert!(op_list_1.iter().all(|op_id| lock
-                .requests
-                .contains(&(op_id.clone(), agent_id_1.clone())))));
-            for _ in 0..config.num_back_off_intervals {
-                lock.back_off_list.back_off_agent(&agent_id_1);
-            }
-
-            lock.back_off_list.last_back_off_interval
-        };
-
-        // Wait for back off interval to expire. Afterwards the request should fail again and all
-        // of the agent's requests should be removed from the set.
-        tokio::time::sleep(last_back_off_interval).await;
-
-        assert!(fetch
-            .state
-            .lock()
-            .unwrap()
-            .back_off_list
-            .has_last_back_off_expired(&agent_id_1));
-
-        // Add control agent's ops to set.
-        fetch.add_ops(op_list_2, agent_id_2.clone()).await.unwrap();
-
-        // Wait for another request attempt to agent 1, which should remove all of their requests
-        // from the set.
-        tokio::time::timeout(Duration::from_millis(10), async {
-            loop {
-                if mock_transport
-                    .requests_sent
-                    .lock()
-                    .unwrap()
-                    .iter()
-                    .filter(|(_, a)| *a != agent_url_1)
-                    .count()
-                    > current_number_of_requests_to_agent_1
-                {
-                    break;
-                }
-                tokio::time::sleep(Duration::from_millis(1)).await;
-            }
-        })
-        .await
-        .unwrap();
-
-        assert!(fetch
-            .state
-            .lock()
-            .unwrap()
-            .requests
-            .iter()
-            .all(|(_, agent_id)| *agent_id != agent_id_1),);
-    }
 }
diff --git a/crates/core/src/factories/core_fetch/test.rs b/crates/core/src/factories/core_fetch/test.rs
index b1a1ef8f..ddf65c6c 100644
--- a/crates/core/src/factories/core_fetch/test.rs
+++ b/crates/core/src/factories/core_fetch/test.rs
@@ -1,424 +1,34 @@
-use std::{
-    sync::{Arc, Mutex},
-    time::Duration,
-};
+mod request_queue;
+mod response_queue;
 
-use bytes::Bytes;
-use kitsune2_api::{
-    fetch::{deserialize_op_ids, Fetch},
-    id::Id,
-    transport::Transport,
-    AgentId, K2Error, OpId, SpaceId, Url,
-};
-use rand::Rng;
+#[cfg(test)]
+pub(crate) mod utils {
+    use bytes::Bytes;
+    use kitsune2_api::{id::Id, AgentId, OpId};
+    use rand::Rng;
 
-use crate::{default_builder, factories::test_utils::AgentBuilder};
-
-use super::{CoreFetch, CoreFetchConfig};
-
-#[derive(Debug)]
-pub struct MockTransport {
-    pub requests_sent: Arc<Mutex<Vec<(OpId, Url)>>>,
-    pub fail: bool,
-}
-
-impl MockTransport {
-    pub fn new(fail: bool) -> Arc<Self> {
-        Arc::new(Self {
-            requests_sent: Arc::new(Mutex::new(Vec::new())),
-            fail,
-        })
-    }
-}
-
-impl Transport for MockTransport {
-    fn send_module(
-        &self,
-        peer: kitsune2_api::Url,
-        _space: kitsune2_api::SpaceId,
-        _module: String,
-        data: bytes::Bytes,
-    ) -> kitsune2_api::BoxFut<'_, kitsune2_api::K2Result<()>> {
-        Box::pin(async move {
-            let op_ids = deserialize_op_ids(data).unwrap();
-            let mut lock = self.requests_sent.lock().unwrap();
-            op_ids.into_iter().for_each(|op_id| {
-                lock.push((op_id, peer.clone()));
-            });
-
-            if self.fail {
-                Err(K2Error::other("connection timed out"))
-            } else {
-                Ok(())
-            }
-        })
-    }
-
-    fn disconnect(
-        &self,
-        _peer: Url,
-        _reason: Option<String>,
-    ) -> kitsune2_api::BoxFut<'_, ()> {
-        unimplemented!()
-    }
-
-    fn register_module_handler(
-        &self,
-        _space: SpaceId,
-        _module: String,
-        _handler: kitsune2_api::transport::DynTxModuleHandler,
-    ) {
-        unimplemented!()
-    }
-
-    fn register_space_handler(
-        &self,
-        _space: SpaceId,
-        _handler: kitsune2_api::transport::DynTxSpaceHandler,
-    ) {
-        unimplemented!()
-    }
-
-    fn send_space_notify(
-        &self,
-        _peer: Url,
-        _space: SpaceId,
-        _data: bytes::Bytes,
-    ) -> kitsune2_api::BoxFut<'_, kitsune2_api::K2Result<()>> {
-        unimplemented!()
-    }
-}
-
-#[tokio::test(flavor = "multi_thread")]
-async fn fetch_queue() {
-    let builder = Arc::new(default_builder().with_default_config().unwrap());
-    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
-    let mock_transport = MockTransport::new(false);
-    let config = CoreFetchConfig::default();
-
-    let op_id = random_op_id();
-    let op_list = vec![op_id.clone()];
-    let agent_id = random_agent_id();
-    let agent_info = AgentBuilder {
-        agent: Some(agent_id.clone()),
-        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
-        ..Default::default()
-    }
-    .build();
-    let agent_url = agent_info.url.clone().unwrap();
-    peer_store.insert(vec![agent_info.clone()]).await.unwrap();
-
-    let fetch = CoreFetch::new(
-        config.clone(),
-        agent_info.space.clone(),
-        peer_store.clone(),
-        mock_transport.clone(),
-    );
-
-    let requests_sent = mock_transport.requests_sent.lock().unwrap().clone();
-    assert!(requests_sent.is_empty());
-
-    // Add 1 op.
-    fetch.add_ops(op_list, agent_id.clone()).await.unwrap();
-
-    // Let the fetch request be sent multiple times. As only 1 op was added to the queue,
-    // this proves that it is being re-added to the queue after sending a request for it.
-    tokio::time::timeout(Duration::from_millis(10), async {
-        loop {
-            tokio::task::yield_now().await;
-            if mock_transport.requests_sent.lock().unwrap().len() >= 3 {
-                break;
-            }
-        }
-    })
-    .await
-    .unwrap();
-
-    // Clear set of ops to fetch to stop sending requests.
-    fetch.state.lock().unwrap().requests.clear();
-
-    let mut num_requests_sent =
-        mock_transport.requests_sent.lock().unwrap().len();
-
-    // Wait for tasks to settle all requests.
-    tokio::time::timeout(Duration::from_millis(20), async {
-        loop {
-            tokio::time::sleep(Duration::from_millis(1)).await;
-            let current_num_requests_sent =
-                mock_transport.requests_sent.lock().unwrap().len();
-            if current_num_requests_sent == num_requests_sent {
-                break;
-            } else {
-                num_requests_sent = current_num_requests_sent;
-            }
-        }
-    })
-    .await
-    .unwrap();
-
-    // Check that all requests have been made for the 1 op to the agent.
-    assert!(mock_transport
-        .requests_sent
-        .lock()
-        .unwrap()
-        .iter()
-        .all(|request| request == &(op_id.clone(), agent_url.clone())));
-
-    // Give time for more requests to be sent, which shouldn't happen now that the set of
-    // ops to fetch is cleared.
-    tokio::time::sleep(Duration::from_millis(10)).await;
-
-    // No more requests should have been sent.
-    // Ideally it were possible to check that no more fetch request have been passed back into
-    // the internal channel, but that would require a custom wrapper around the channel.
-    let requests_sent = mock_transport.requests_sent.lock().unwrap().clone();
-    assert_eq!(requests_sent.len(), num_requests_sent);
-}
-
-#[tokio::test(flavor = "multi_thread")]
-async fn happy_op_fetch_from_multiple_agents() {
-    let builder = Arc::new(default_builder().with_default_config().unwrap());
-    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
-    let config = CoreFetchConfig {
-        parallel_request_count: 5,
-        ..Default::default()
-    };
-    let mock_transport = MockTransport::new(false);
-    let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1"));
-
-    let op_list_1 = create_op_list(10);
-    let agent_1 = random_agent_id();
-    let op_list_2 = create_op_list(20);
-    let agent_2 = random_agent_id();
-    let op_list_3 = create_op_list(30);
-    let agent_3 = random_agent_id();
-    let total_ops = op_list_1.len() + op_list_2.len() + op_list_3.len();
-
-    let agent_info_1 = AgentBuilder {
-        agent: Some(agent_1.clone()),
-        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
-        space: Some(space_id.clone()),
-        ..Default::default()
-    }
-    .build();
-    let agent_info_2 = AgentBuilder {
-        agent: Some(agent_2.clone()),
-        url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
-        space: Some(space_id.clone()),
-        ..Default::default()
+    pub fn random_id() -> Id {
+        let mut rng = rand::thread_rng();
+        let mut bytes = [0u8; 32];
+        rng.fill(&mut bytes);
+        let bytes = Bytes::from(bytes.to_vec());
+        Id(bytes)
     }
-    .build();
-    let agent_info_3 = AgentBuilder {
-        agent: Some(agent_3.clone()),
-        url: Some(Some(Url::from_str("wss://127.0.0.1:3").unwrap())),
-        space: Some(space_id.clone()),
-        ..Default::default()
-    }
-    .build();
-    let agent_url_1 = agent_info_1.url.clone().unwrap();
-    let agent_url_2 = agent_info_2.url.clone().unwrap();
-    let agent_url_3 = agent_info_3.url.clone().unwrap();
-    peer_store
-        .insert(vec![agent_info_1, agent_info_2, agent_info_3])
-        .await
-        .unwrap();
-    let fetch = CoreFetch::new(
-        config.clone(),
-        space_id.clone(),
-        peer_store.clone(),
-        mock_transport.clone(),
-    );
-
-    let mut expected_requests = Vec::new();
-    op_list_1
-        .clone()
-        .into_iter()
-        .for_each(|op_id| expected_requests.push((op_id, agent_url_1.clone())));
-    op_list_2
-        .clone()
-        .into_iter()
-        .for_each(|op_id| expected_requests.push((op_id, agent_url_2.clone())));
-    op_list_3
-        .clone()
-        .into_iter()
-        .for_each(|op_id| expected_requests.push((op_id, agent_url_3.clone())));
-    let mut expected_ops = Vec::new();
-    op_list_1
-        .clone()
-        .into_iter()
-        .for_each(|op_id| expected_ops.push((op_id, agent_1.clone())));
-    op_list_2
-        .clone()
-        .into_iter()
-        .for_each(|op_id| expected_ops.push((op_id, agent_2.clone())));
-    op_list_3
-        .clone()
-        .into_iter()
-        .for_each(|op_id| expected_ops.push((op_id, agent_3.clone())));
-    futures::future::join_all([
-        fetch.add_ops(op_list_1.clone(), agent_1.clone()),
-        fetch.add_ops(op_list_2.clone(), agent_2.clone()),
-        fetch.add_ops(op_list_3.clone(), agent_3.clone()),
-    ])
-    .await;
-
-    // Check that at least one request was sent for each op.
-    tokio::time::timeout(Duration::from_millis(20), async {
-        loop {
-            tokio::task::yield_now().await;
-            let requests_sent =
-                mock_transport.requests_sent.lock().unwrap().clone();
-            if requests_sent.len() >= total_ops
-                && expected_requests
-                    .iter()
-                    .all(|expected_op| requests_sent.contains(expected_op))
-            {
-                break;
-            }
-        }
-    })
-    .await
-    .unwrap();
-
-    // Check that op ids are still part of ops to fetch.
-    let lock = fetch.state.lock().unwrap();
-    assert!(expected_ops.iter().all(|v| lock.requests.contains(v)));
-}
-
-#[tokio::test(flavor = "multi_thread")]
-async fn ops_are_cleared_when_agent_not_in_peer_store() {
-    let builder = Arc::new(default_builder().with_default_config().unwrap());
-    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
-    let config = CoreFetchConfig::default();
-    let mock_transport = MockTransport::new(false);
-
-    let op_list = create_op_list(2);
-    let agent_id = random_agent_id();
-    let agent_info = AgentBuilder {
-        agent: Some(agent_id.clone()),
-        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
-        ..Default::default()
+    pub fn random_op_id() -> OpId {
+        OpId(random_id())
     }
-    .build();
-
-    let fetch = CoreFetch::new(
-        config.clone(),
-        agent_info.space.clone(),
-        peer_store.clone(),
-        mock_transport.clone(),
-    );
-
-    fetch.add_ops(op_list, agent_id).await.unwrap();
-    // Wait for agent to be looked up in peer store.
-    tokio::time::sleep(Duration::from_millis(10)).await;
-
-    // Check that all op ids for agent have been removed from ops set.
-    assert!(fetch.state.lock().unwrap().requests.is_empty());
-}
-
-#[tokio::test(flavor = "multi_thread")]
-async fn unresponsive_agents_are_put_on_back_off_list() {
-    let builder = Arc::new(default_builder().with_default_config().unwrap());
-    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
-    let config = CoreFetchConfig::default();
-    let mock_transport = MockTransport::new(true);
-    let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1"));
-
-    let op_list_1 = create_op_list(5);
-    let agent_1 = random_agent_id();
-    let op_list_2 = create_op_list(5);
-    let agent_2 = random_agent_id();
-
-    let agent_info_1 = AgentBuilder {
-        agent: Some(agent_1.clone()),
-        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
-        space: Some(space_id.clone()),
-        ..Default::default()
+    pub fn random_agent_id() -> AgentId {
+        AgentId(random_id())
     }
-    .build();
-    let agent_info_2 = AgentBuilder {
-        agent: Some(agent_2.clone()),
-        url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
-        space: Some(space_id.clone()),
-        ..Default::default()
-    }
-    .build();
-    let agent_url_1 = agent_info_1.url.clone().unwrap();
-    let agent_url_2 = agent_info_2.url.clone().unwrap();
-    peer_store
-        .insert(vec![agent_info_1, agent_info_2])
-        .await
-        .unwrap();
-
-    let fetch = CoreFetch::new(
-        config.clone(),
-        space_id.clone(),
-        peer_store.clone(),
-        mock_transport.clone(),
-    );
-
-    // Add all ops to the queue.
-    futures::future::join_all([
-        fetch.add_ops(op_list_1.clone(), agent_1.clone()),
-        fetch.add_ops(op_list_2.clone(), agent_2.clone()),
-    ])
-    .await;
-    // Wait for one request for each agent.
-    let expected_agent_url = [agent_url_1, agent_url_2];
-    let expected_agents = [agent_1, agent_2];
-    tokio::time::timeout(Duration::from_millis(100), async {
-        loop {
-            tokio::time::sleep(Duration::from_millis(1)).await;
-            let requests_sent =
-                mock_transport.requests_sent.lock().unwrap().clone();
-            let request_destinations = requests_sent
-                .iter()
-                .map(|(_, agent_id)| agent_id)
-                .collect::<Vec<_>>();
-            if expected_agent_url
-                .iter()
-                .all(|agent| request_destinations.contains(&agent))
-            {
-                // Check all agents are on back off list.
-                let back_off_list =
-                    &mut fetch.state.lock().unwrap().back_off_list;
-                if expected_agents
-                    .iter()
-                    .all(|agent| back_off_list.is_agent_on_back_off(agent))
-                {
-                    break;
-                }
-            }
+    pub fn create_op_list(num_ops: u16) -> Vec<OpId> {
+        let mut ops = Vec::new();
+        for _ in 0..num_ops {
+            let op = random_op_id();
+            ops.push(op.clone());
         }
-    })
-    .await
-    .unwrap();
-}
-
-fn random_id() -> Id {
-    let mut rng = rand::thread_rng();
-    let mut bytes = [0u8; 32];
-    rng.fill(&mut bytes);
-    let bytes = Bytes::from(bytes.to_vec());
-    Id(bytes)
-}
-
-pub(super) fn random_op_id() -> OpId {
-    OpId(random_id())
-}
-
-pub(super) fn random_agent_id() -> AgentId {
-    AgentId(random_id())
-}
-
-pub(super) fn create_op_list(num_ops: u16) -> Vec<OpId> {
-    let mut ops = Vec::new();
-    for _ in 0..num_ops {
-        let op = random_op_id();
-        ops.push(op.clone());
+        ops
     }
-    }
-    ops
 }
diff --git a/crates/core/src/factories/core_fetch/test/request_queue.rs b/crates/core/src/factories/core_fetch/test/request_queue.rs
new file mode 100644
index 00000000..b4a60902
--- /dev/null
+++ b/crates/core/src/factories/core_fetch/test/request_queue.rs
@@ -0,0 +1,596 @@
+use crate::{
+    default_builder,
+    factories::{
+        core_fetch::{CoreFetch, CoreFetchConfig},
+        test_utils::AgentBuilder,
+    },
+};
+use kitsune2_api::{
+    fetch::{deserialize_op_ids, Fetch},
+    transport::Transport,
+    K2Error, OpId, SpaceId, Url,
+};
+use kitsune2_memory::Kitsune2MemoryOpStore;
+use std::{
+    sync::{Arc, Mutex},
+    time::Duration,
+};
+
+use super::utils::{create_op_list, random_agent_id, random_op_id};
+
+#[derive(Debug)]
+pub struct MockTransport {
+    pub requests_sent: Arc<Mutex<Vec<(OpId, Url)>>>,
+    pub fail: bool,
+}
+
+impl MockTransport {
+    pub fn new(fail: bool) -> Arc<Self> {
+        Arc::new(Self {
+            requests_sent: Arc::new(Mutex::new(Vec::new())),
+            fail,
+        })
+    }
+}
+
+impl Transport for MockTransport {
+    fn send_module(
+        &self,
+        peer: kitsune2_api::Url,
+        _space: kitsune2_api::SpaceId,
+        _module: String,
+        data: bytes::Bytes,
+    ) -> kitsune2_api::BoxFut<'_, kitsune2_api::K2Result<()>> {
+        Box::pin(async move {
+            let op_ids = deserialize_op_ids(data).unwrap();
+            let mut lock = self.requests_sent.lock().unwrap();
+            op_ids.into_iter().for_each(|op_id| {
+                lock.push((op_id, peer.clone()));
+            });
+
+            if self.fail {
+                Err(K2Error::other("connection timed out"))
+            } else {
+                Ok(())
+            }
+        })
+    }
+
+    fn disconnect(
+        &self,
+        _peer: Url,
+        _reason: Option<String>,
+    ) -> kitsune2_api::BoxFut<'_, ()> {
+        unimplemented!()
+    }
+
+    fn register_module_handler(
+        &self,
+        _space: SpaceId,
+        _module: String,
+        _handler: kitsune2_api::transport::DynTxModuleHandler,
+    ) {
+        unimplemented!()
+    }
+
+    fn register_space_handler(
+        &self,
+        _space: SpaceId,
+        _handler: kitsune2_api::transport::DynTxSpaceHandler,
+    ) {
+        unimplemented!()
+    }
+
+    fn send_space_notify(
+        &self,
+        _peer: Url,
+        _space: SpaceId,
+        _data: bytes::Bytes,
+    ) -> kitsune2_api::BoxFut<'_, kitsune2_api::K2Result<()>> {
+        unimplemented!()
+    }
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn fetch_queue() {
+    let builder = Arc::new(default_builder().with_default_config().unwrap());
+    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
+    let op_store = Arc::new(Kitsune2MemoryOpStore::default());
+    let mock_transport = MockTransport::new(false);
+    let config = CoreFetchConfig::default();
+
+    let op_id = random_op_id();
+    let op_list = vec![op_id.clone()];
+    let agent_id = random_agent_id();
+    let agent_info = AgentBuilder {
+        agent: Some(agent_id.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
+        ..Default::default()
+    }
+    .build();
+    let agent_url = agent_info.url.clone().unwrap();
+    peer_store.insert(vec![agent_info.clone()]).await.unwrap();
+
+    let fetch = CoreFetch::new(
+        config.clone(),
+        agent_info.space.clone(),
+        peer_store.clone(),
+        op_store,
+        mock_transport.clone(),
+    );
+
+    let requests_sent = mock_transport.requests_sent.lock().unwrap().clone();
+    assert!(requests_sent.is_empty());
+
+    // Add 1 op.
+    fetch.add_ops(op_list, agent_id.clone()).await.unwrap();
+
+    // Let the fetch request be sent multiple times. As only 1 op was added to the queue,
+    // this proves that it is being re-added to the queue after sending a request for it.
+    tokio::time::timeout(Duration::from_millis(10), async {
+        loop {
+            tokio::task::yield_now().await;
+            if mock_transport.requests_sent.lock().unwrap().len() >= 3 {
+                break;
+            }
+        }
+    })
+    .await
+    .unwrap();
+
+    // Clear set of ops to fetch to stop sending requests.
+    fetch.state.lock().unwrap().requests.clear();
+
+    let mut num_requests_sent =
+        mock_transport.requests_sent.lock().unwrap().len();
+
+    // Wait for tasks to settle all requests.
+    tokio::time::timeout(Duration::from_millis(20), async {
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            let current_num_requests_sent =
+                mock_transport.requests_sent.lock().unwrap().len();
+            if current_num_requests_sent == num_requests_sent {
+                break;
+            } else {
+                num_requests_sent = current_num_requests_sent;
+            }
+        }
+    })
+    .await
+    .unwrap();
+
+    // Check that all requests have been made for the 1 op to the agent.
+    assert!(mock_transport
+        .requests_sent
+        .lock()
+        .unwrap()
+        .iter()
+        .all(|request| request == &(op_id.clone(), agent_url.clone())));
+
+    // Give time for more requests to be sent, which shouldn't happen now that the set of
+    // ops to fetch is cleared.
+    tokio::time::sleep(Duration::from_millis(10)).await;
+
+    // No more requests should have been sent.
+    // Ideally it would be possible to check that no more fetch requests have been passed back
+    // into the internal channel, but that would require a custom wrapper around the channel.
+    let requests_sent = mock_transport.requests_sent.lock().unwrap().clone();
+    assert_eq!(requests_sent.len(), num_requests_sent);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn happy_op_fetch_from_multiple_agents() {
+    let builder = Arc::new(default_builder().with_default_config().unwrap());
+    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
+    let mock_transport = MockTransport::new(false);
+    let op_store = Arc::new(Kitsune2MemoryOpStore::default());
+    let config = CoreFetchConfig {
+        parallel_request_count: 5,
+        ..Default::default()
+    };
+    let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1"));
+
+    let op_list_1 = create_op_list(10);
+    let agent_1 = random_agent_id();
+    let op_list_2 = create_op_list(20);
+    let agent_2 = random_agent_id();
+    let op_list_3 = create_op_list(30);
+    let agent_3 = random_agent_id();
+    let total_ops = op_list_1.len() + op_list_2.len() + op_list_3.len();
+
+    let agent_info_1 = AgentBuilder {
+        agent: Some(agent_1.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
+        space: Some(space_id.clone()),
+        ..Default::default()
+    }
+    .build();
+    let agent_info_2 = AgentBuilder {
+        agent: Some(agent_2.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
+        space: Some(space_id.clone()),
+        ..Default::default()
+    }
+    .build();
+    let agent_info_3 = AgentBuilder {
+        agent: Some(agent_3.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:3").unwrap())),
+        space: Some(space_id.clone()),
+        ..Default::default()
+    }
+    .build();
+    let agent_url_1 = agent_info_1.url.clone().unwrap();
+    let agent_url_2 = agent_info_2.url.clone().unwrap();
+    let agent_url_3 = agent_info_3.url.clone().unwrap();
+    peer_store
+        .insert(vec![agent_info_1, agent_info_2, agent_info_3])
+        .await
+        .unwrap();
+    let fetch = CoreFetch::new(
+        config.clone(),
+        space_id.clone(),
+        peer_store.clone(),
+        op_store,
+        mock_transport.clone(),
+    );
+
+    let mut expected_requests = Vec::new();
+    op_list_1
+        .clone()
+        .into_iter()
+        .for_each(|op_id| expected_requests.push((op_id, agent_url_1.clone())));
+    op_list_2
+        .clone()
+        .into_iter()
+        .for_each(|op_id| expected_requests.push((op_id, agent_url_2.clone())));
+    op_list_3
+        .clone()
+        .into_iter()
+        .for_each(|op_id| expected_requests.push((op_id, agent_url_3.clone())));
+    let mut expected_ops = Vec::new();
+    op_list_1
+        .clone()
+        .into_iter()
+        .for_each(|op_id| expected_ops.push((op_id, agent_1.clone())));
+    op_list_2
+        .clone()
+        .into_iter()
+        .for_each(|op_id| expected_ops.push((op_id, agent_2.clone())));
+    op_list_3
+        .clone()
+        .into_iter()
+        .for_each(|op_id| expected_ops.push((op_id, agent_3.clone())));
+
+    futures::future::join_all([
+        fetch.add_ops(op_list_1.clone(), agent_1.clone()),
+        fetch.add_ops(op_list_2.clone(), agent_2.clone()),
+        fetch.add_ops(op_list_3.clone(), agent_3.clone()),
+    ])
+    .await;
+
+    // Check that at least one request was sent for each op.
+    tokio::time::timeout(Duration::from_millis(20), async {
+        loop {
+            tokio::task::yield_now().await;
+            let requests_sent =
+                mock_transport.requests_sent.lock().unwrap().clone();
+            if requests_sent.len() >= total_ops
+                && expected_requests
+                    .iter()
+                    .all(|expected_op| requests_sent.contains(expected_op))
+            {
+                break;
+            }
+        }
+    })
+    .await
+    .unwrap();
+
+    // Check that op ids are still part of ops to fetch.
+    let lock = fetch.state.lock().unwrap();
+    assert!(expected_ops.iter().all(|v| lock.requests.contains(v)));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn ops_are_cleared_when_agent_not_in_peer_store() {
+    let builder = Arc::new(default_builder().with_default_config().unwrap());
+    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
+    let op_store = Arc::new(Kitsune2MemoryOpStore::default());
+    let mock_transport = MockTransport::new(false);
+    let config = CoreFetchConfig::default();
+
+    let op_list = create_op_list(2);
+    let agent_id = random_agent_id();
+    let agent_info = AgentBuilder {
+        agent: Some(agent_id.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
+        ..Default::default()
+    }
+    .build();
+
+    let fetch = CoreFetch::new(
+        config.clone(),
+        agent_info.space.clone(),
+        peer_store.clone(),
+        op_store,
+        mock_transport.clone(),
+    );
+
+    fetch.add_ops(op_list, agent_id).await.unwrap();
+
+    // Wait for agent to be looked up in peer store.
+    tokio::time::sleep(Duration::from_millis(10)).await;
+
+    // Check that all op ids for agent have been removed from ops set.
+    assert!(fetch.state.lock().unwrap().requests.is_empty());
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn unresponsive_agents_are_put_on_back_off_list() {
+    let builder = Arc::new(default_builder().with_default_config().unwrap());
+    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
+    let op_store = Arc::new(Kitsune2MemoryOpStore::default());
+    let mock_transport = MockTransport::new(true);
+    let config = CoreFetchConfig::default();
+    let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1"));
+
+    let op_list_1 = create_op_list(5);
+    let agent_1 = random_agent_id();
+    let op_list_2 = create_op_list(5);
+    let agent_2 = random_agent_id();
+
+    let agent_info_1 = AgentBuilder {
+        agent: Some(agent_1.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
+        space: Some(space_id.clone()),
+        ..Default::default()
+    }
+    .build();
+    let agent_info_2 = AgentBuilder {
+        agent: Some(agent_2.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
+        space: Some(space_id.clone()),
+        ..Default::default()
+    }
+    .build();
+    let agent_url_1 = agent_info_1.url.clone().unwrap();
+    let agent_url_2 = agent_info_2.url.clone().unwrap();
+    peer_store
+        .insert(vec![agent_info_1, agent_info_2])
+        .await
+        .unwrap();
+
+    let fetch = CoreFetch::new(
+        config.clone(),
+        space_id.clone(),
+        peer_store,
+        op_store,
+        mock_transport.clone(),
+    );
+
+    // Add all ops to the queue.
+    futures::future::join_all([
+        fetch.add_ops(op_list_1.clone(), agent_1.clone()),
+        fetch.add_ops(op_list_2.clone(), agent_2.clone()),
+    ])
+    .await;
+
+    // Wait for one request for each agent.
+    let expected_agent_url = [agent_url_1, agent_url_2];
+    let expected_agents = [agent_1, agent_2];
+    tokio::time::timeout(Duration::from_millis(100), async {
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            let requests_sent =
+                mock_transport.requests_sent.lock().unwrap().clone();
+            let request_destinations = requests_sent
+                .iter()
+                .map(|(_, agent_id)| agent_id)
+                .collect::<Vec<_>>();
+            if expected_agent_url
+                .iter()
+                .all(|agent| request_destinations.contains(&agent))
+            {
+                // Check all agents are on back off list.
+                let back_off_list =
+                    &mut fetch.state.lock().unwrap().back_off_list;
+                if expected_agents
+                    .iter()
+                    .all(|agent| back_off_list.is_agent_on_back_off(agent))
+                {
+                    break;
+                }
+            }
+        }
+    })
+    .await
+    .unwrap();
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn agent_on_back_off_is_removed_from_list_after_successful_send() {
+    let builder = Arc::new(default_builder().with_default_config().unwrap());
+    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
+    let op_store = Arc::new(Kitsune2MemoryOpStore::default());
+    let mock_transport = MockTransport::new(false);
+    let config = CoreFetchConfig {
+        first_back_off_interval: Duration::from_millis(10),
+        ..Default::default()
+    };
+
+    let op_list = create_op_list(1);
+    let agent_id = random_agent_id();
+    let agent_info = AgentBuilder {
+        agent: Some(agent_id.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
+        ..Default::default()
+    }
+    .build();
+    peer_store.insert(vec![agent_info.clone()]).await.unwrap();
+
+    let fetch = CoreFetch::new(
+        config.clone(),
+        agent_info.space.clone(),
+        peer_store,
+        op_store,
+        mock_transport.clone(),
+    );
+
+    let first_back_off_interval = {
+        let mut lock = fetch.state.lock().unwrap();
+        lock.back_off_list.back_off_agent(&agent_id);
+
+        assert!(lock.back_off_list.is_agent_on_back_off(&agent_id));
+
+        lock.back_off_list.first_back_off_interval
+    };
+
+    tokio::time::sleep(first_back_off_interval).await;
+
+    fetch.add_ops(op_list, agent_id.clone()).await.unwrap();
+
+    tokio::time::timeout(Duration::from_millis(10), async {
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            if !mock_transport.requests_sent.lock().unwrap().is_empty() {
+                break;
+            }
+        }
+    })
+    .await
+    .unwrap();
+
+    assert!(fetch.state.lock().unwrap().back_off_list.state.is_empty());
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn requests_are_dropped_when_max_back_off_expired() {
+    let builder = Arc::new(default_builder().with_default_config().unwrap());
+    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
+    let op_store = Arc::new(Kitsune2MemoryOpStore::default());
+    let mock_transport = MockTransport::new(true);
+    let config = CoreFetchConfig {
+        first_back_off_interval: Duration::from_millis(10),
+        last_back_off_interval: Duration::from_millis(10),
+        ..Default::default()
+    };
+    let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1"));
+
+    let op_list_1 = create_op_list(2);
+    let agent_id_1 = random_agent_id();
+    let agent_info_1 = AgentBuilder {
+        agent: Some(agent_id_1.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
+        ..Default::default()
+    }
+    .build();
+    let agent_url_1 = agent_info_1.url.clone().unwrap();
+    peer_store.insert(vec![agent_info_1.clone()]).await.unwrap();
+
+    // Create a second agent to later check that their ops have not been removed.
+    let op_list_2 = create_op_list(2);
+    let agent_id_2 = random_agent_id();
+    let agent_info_2 = AgentBuilder {
+        agent: Some(agent_id_2.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
+        ..Default::default()
+    }
+    .build();
+    peer_store.insert(vec![agent_info_2.clone()]).await.unwrap();
+
+    let fetch = CoreFetch::new(
+        config.clone(),
+        space_id.clone(),
+        peer_store,
+        op_store,
+        mock_transport.clone(),
+    );
+
+    fetch
+        .add_ops(op_list_1.clone(), agent_id_1.clone())
+        .await
+        .unwrap();
+
+    // Wait for one request to fail, so agent is put on back off list.
+    tokio::time::timeout(Duration::from_millis(10), async {
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            if !mock_transport
+                .requests_sent
+                .lock()
+                .unwrap()
+                .clone()
+                .is_empty()
+            {
+                break;
+            }
+        }
+    })
+    .await
+    .unwrap();
+
+    let current_number_of_requests_to_agent_1 = mock_transport
+        .requests_sent
+        .lock()
+        .unwrap()
+        .iter()
+        .filter(|(_, a)| *a != agent_url_1)
+        .count();
+
+    // Back off agent the maximum possible number of times.
+    let last_back_off_interval = {
+        let mut lock = fetch.state.lock().unwrap();
+        assert!(op_list_1.iter().all(|op_id| lock
+            .requests
+            .contains(&(op_id.clone(), agent_id_1.clone())))));
+        for _ in 0..config.num_back_off_intervals {
+            lock.back_off_list.back_off_agent(&agent_id_1);
+        }
+
+        lock.back_off_list.last_back_off_interval
+    };
+
+    // Wait for back off interval to expire. Afterwards the request should fail again and all
+    // of the agent's requests should be removed from the set.
+    tokio::time::sleep(last_back_off_interval).await;
+
+    assert!(fetch
+        .state
+        .lock()
+        .unwrap()
+        .back_off_list
+        .has_last_back_off_expired(&agent_id_1));
+
+    // Add control agent's ops to set.
+    fetch.add_ops(op_list_2, agent_id_2.clone()).await.unwrap();
+
+    // Wait for another request attempt to agent 1, which should remove all of their requests
+    // from the set.
+    tokio::time::timeout(Duration::from_millis(10), async {
+        loop {
+            if mock_transport
+                .requests_sent
+                .lock()
+                .unwrap()
+                .iter()
+                .filter(|(_, a)| *a != agent_url_1)
+                .count()
+                > current_number_of_requests_to_agent_1
+            {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(1)).await;
+        }
+    })
+    .await
+    .unwrap();
+
+    assert!(fetch
+        .state
+        .lock()
+        .unwrap()
+        .requests
+        .iter()
+        .all(|(_, agent_id)| *agent_id != agent_id_1),);
+}
diff --git a/crates/core/src/factories/core_fetch/test/response_queue.rs b/crates/core/src/factories/core_fetch/test/response_queue.rs
new file mode 100644
index 00000000..6665dec4
--- /dev/null
+++ b/crates/core/src/factories/core_fetch/test/response_queue.rs
@@ -0,0 +1,228 @@
+use super::utils::{random_agent_id, random_id, random_op_id};
+use crate::{
+    default_builder,
+    factories::{
+        core_fetch::{CoreFetch, CoreFetchConfig},
+        test_utils::AgentBuilder,
+    },
+};
+use kitsune2_api::{
+    fetch::{deserialize_ops, Fetch},
+    transport::Transport,
+    MetaOp, OpStore, SpaceId, Timestamp, Url,
+};
+use kitsune2_memory::{Kitsune2MemoryOp, Kitsune2MemoryOpStore};
+use std::{
+    sync::{Arc, Mutex},
+    time::Duration,
+};
+
+#[derive(Debug)]
+pub struct MockTransport {
+    responses_sent: Arc<Mutex<Vec<(Vec<MetaOp>, Url)>>>,
+}
+
+impl MockTransport {
+    fn new() -> Arc<Self> {
+        Arc::new(Self {
+            responses_sent: Arc::new(Mutex::new(Vec::new())),
+        })
+    }
+}
+
+impl Transport for MockTransport {
+    fn send_module(
+        &self,
+        peer: kitsune2_api::Url,
+        _space: kitsune2_api::SpaceId,
+        _module: String,
+        data: bytes::Bytes,
+    ) -> kitsune2_api::BoxFut<'_, kitsune2_api::K2Result<()>> {
+        Box::pin(async move {
+            let ops = deserialize_ops(data).unwrap();
+            self.responses_sent.lock().unwrap().push((ops, peer));
+
+            Ok(())
+        })
+    }
+
+    fn disconnect(
+        &self,
+        _peer: Url,
+        _reason: Option<String>,
+    ) -> kitsune2_api::BoxFut<'_, ()> {
+        unimplemented!()
+    }
+
+    fn register_module_handler(
+        &self,
+        _space: SpaceId,
+        _module: String,
+        _handler: kitsune2_api::transport::DynTxModuleHandler,
+    ) {
+        unimplemented!()
+    }
+
+    fn register_space_handler(
+        &self,
+        _space: SpaceId,
+        _handler: kitsune2_api::transport::DynTxSpaceHandler,
+    ) {
+        unimplemented!()
+    }
+
+    fn send_space_notify(
+        &self,
+        _peer: Url,
+        _space: SpaceId,
+        _data: bytes::Bytes,
+    ) -> kitsune2_api::BoxFut<'_, kitsune2_api::K2Result<()>> {
+        unimplemented!()
+    }
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn respond_to_multiple_requests() {
+    let builder = Arc::new(default_builder().with_default_config().unwrap());
+    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
+    let op_store = Arc::new(Kitsune2MemoryOpStore::default());
+    let mock_transport = MockTransport::new();
+    let config = CoreFetchConfig::default();
+    let space_id = SpaceId::from(random_id());
+
+    let op_id_1 = random_op_id();
+    let op_id_2 = random_op_id();
+    let agent_id_1 = random_agent_id();
+    let agent_info_1 = AgentBuilder {
+        agent: Some(agent_id_1.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
+        ..Default::default()
+    }
+    .build();
+    let agent_url_1 = agent_info_1.url.clone().unwrap();
+    peer_store.insert(vec![agent_info_1.clone()]).await.unwrap();
+
+    let op_id_3 = random_op_id();
+    let op_id_4 = random_op_id();
+    let agent_id_2 = random_agent_id();
+    let agent_info_2 = AgentBuilder {
+        agent: Some(agent_id_2.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
+        ..Default::default()
+    }
+    .build();
+    let agent_url_2 = agent_info_2.url.clone().unwrap();
+    peer_store.insert(vec![agent_info_2.clone()]).await.unwrap();
+
+    // Insert ops to be read and sent into op store.
+    let op_1 = MetaOp {
+        op_id: op_id_1.clone(),
+        op_data: serde_json::to_vec(&Kitsune2MemoryOp::new(
+            op_id_1.clone(),
+            Timestamp::now(),
+            vec![1; 128],
+        ))
+        .unwrap(),
+    };
+    let op_2 = MetaOp {
+        op_id: op_id_2.clone(),
+        op_data: serde_json::to_vec(&Kitsune2MemoryOp::new(
+            op_id_2.clone(),
+            Timestamp::now(),
+            vec![2; 128],
+        ))
+        .unwrap(),
+    };
+    let op_3 = MetaOp {
+        op_id: op_id_3.clone(),
+        op_data: serde_json::to_vec(&Kitsune2MemoryOp::new(
+            op_id_3.clone(),
+            Timestamp::now(),
+            vec![3; 128],
+        ))
+        .unwrap(),
+    };
+    // Insert ops 1, 2 and 3 into the op store. Op 4 will not be found and therefore
+    // not returned in the response.
+    let ops_to_store = vec![op_1.clone(), op_2.clone(), op_3.clone()];
+    op_store.process_incoming_ops(ops_to_store).await.unwrap();
+
+    let fetch = CoreFetch::new(
+        config.clone(),
+        space_id.clone(),
+        peer_store.clone(),
+        op_store,
+        mock_transport.clone(),
+    );
+
+    // Receive 2 op requests.
+    let requested_ops_1 = vec![op_id_1.clone(), op_id_2.clone()];
+    let requested_ops_2 = vec![op_id_3.clone(), op_id_4.clone()];
+    futures::future::join_all([
+        fetch.respond_with_ops(requested_ops_1, agent_id_1.clone()),
+        fetch.respond_with_ops(requested_ops_2, agent_id_2.clone()),
+    ])
+    .await;
+
+    tokio::time::timeout(Duration::from_millis(10), async {
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            if mock_transport.responses_sent.lock().unwrap().len() == 2 {
+                break;
+            }
+        }
+    })
+    .await
+    .unwrap();
+
+    assert!(mock_transport
+        .responses_sent
+        .lock()
+        .unwrap()
+        .contains(&(vec![op_1, op_2], agent_url_1)));
+    // Only op 3 of the second request is in the op store.
+    assert!(mock_transport
+        .responses_sent
+        .lock()
+        .unwrap()
+        .contains(&(vec![op_3], agent_url_2)));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn no_response_sent_when_no_ops_found() {
+    let builder = Arc::new(default_builder().with_default_config().unwrap());
+    let peer_store = builder.peer_store.create(builder.clone()).await.unwrap();
+    let op_store = Arc::new(Kitsune2MemoryOpStore::default());
+    let mock_transport = MockTransport::new();
+    let config = CoreFetchConfig::default();
+    let space_id = SpaceId::from(random_id());
+
+    let op_id_1 = random_op_id();
+    let op_id_2 = random_op_id();
+    let agent_id = random_agent_id();
+    let agent_info = AgentBuilder {
+        agent: Some(agent_id.clone()),
+        url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
+        ..Default::default()
+    }
+    .build();
+    peer_store.insert(vec![agent_info.clone()]).await.unwrap();
+
+    let fetch = CoreFetch::new(
+        config.clone(),
+        space_id.clone(),
+        peer_store.clone(),
+        op_store,
+        mock_transport.clone(),
+    );
+
+    // Receive op request.
+    let requested_ops = vec![op_id_1.clone(), op_id_2.clone()];
+    fetch
+        .respond_with_ops(requested_ops, agent_id.clone())
+        .await
+        .unwrap();
+
+    tokio::time::sleep(Duration::from_millis(20)).await;
+
+    assert!(mock_transport.responses_sent.lock().unwrap().is_empty());
+}
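
For reference, the wire-level round trip that the request and response queues implement can be sketched with the serializer helpers this diff imports from `kitsune2_api::fetch`. This is an illustration rather than code from the change; the function name `fetch_round_trip`, the direct store access, and the elided transport hop are assumptions made for brevity:

use kitsune2_api::{
    fetch::{deserialize_op_ids, deserialize_ops, serialize_op_ids, serialize_ops},
    op_store::DynOpStore,
    K2Result, MetaOp, OpId,
};

/// Illustrative only: the round trip between a requesting and a responding
/// agent, with the transport layer elided.
async fn fetch_round_trip(
    op_ids: Vec<OpId>,
    op_store: DynOpStore,
) -> K2Result<Vec<MetaOp>> {
    // Requesting side: a request task serializes the op ids it wants and
    // sends them over the transport as a module message.
    let request = serialize_op_ids(op_ids);

    // Responding side: the response task deserializes the requested op ids...
    let requested = deserialize_op_ids(request)?;
    // ...reads whatever it holds from the op store (missing ops are simply
    // not included)...
    let ops = op_store.read_ops(requested).await?;
    // ...and serializes the found ops for the response.
    let response = serialize_ops(ops);

    // Requesting side again: the received ops are deserialized and handed to
    // the incoming-op task for persisting.
    deserialize_ops(response)
}

In the actual implementation the responder additionally skips sending when nothing was found, which is what the `no_response_sent_when_no_ops_found` test above exercises.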