diff --git a/crates/core/src/factories/core_fetch.rs b/crates/core/src/factories/core_fetch.rs index aebbd5da..b0ae2fef 100644 --- a/crates/core/src/factories/core_fetch.rs +++ b/crates/core/src/factories/core_fetch.rs @@ -238,6 +238,9 @@ impl CoreFetch { lock.cool_down_list.is_agent_cooling_down(&agent_id) }; + tracing::trace!( + "is agent {agent_id} cooling down {is_agent_cooling_down}" + ); // Send request if agent is not on cool-down list. if !is_agent_cooling_down { @@ -248,7 +251,16 @@ impl CoreFetch { .await { Some(url) => url, - None => continue, + None => { + let mut lock = state.lock().unwrap(); + lock.ops = lock + .ops + .clone() + .into_iter() + .filter(|(_, a)| *a != agent_id) + .collect(); + continue; + } }; let data = serialize_op_ids(vec![op_id.clone()]); @@ -268,7 +280,7 @@ impl CoreFetch { if let Err(err) = fetch_request_tx .try_send((op_id.clone(), agent_id.clone())) { - tracing::warn!("could not re-insert fetch request for op {op_id} to agent {agent_id} in queue: {err}"); + tracing::warn!("could not re-insert fetch request for op {op_id} to agent {agent_id} into queue: {err}"); // Remove op id/agent id from set to prevent build-up of state. state .lock() @@ -284,6 +296,15 @@ impl CoreFetch { .unwrap() .cool_down_list .add_agent(agent_id.clone()); + // Agent is unresponsive. + // Remove associated op ids from set to prevent build-up of state. 
+ let mut lock = state.lock().unwrap(); + lock.ops = lock + .ops + .clone() + .into_iter() + .filter(|(_, a)| *a != agent_id) + .collect(); } } } diff --git a/crates/core/src/factories/core_fetch/test.rs b/crates/core/src/factories/core_fetch/test.rs index 784ecaa2..c413f99d 100644 --- a/crates/core/src/factories/core_fetch/test.rs +++ b/crates/core/src/factories/core_fetch/test.rs @@ -101,7 +101,7 @@ async fn fetch_queue() { let agent_id = random_agent_id(); let agent_info = AgentBuild { agent: Some(agent_id.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.1:8888").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), ..Default::default() } .build(); @@ -187,7 +187,7 @@ async fn happy_multi_op_fetch_from_single_agent() { let agent_id = random_agent_id(); let agent_info = AgentBuild { agent: Some(agent_id.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.1:8888").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), ..Default::default() } .build(); @@ -228,6 +228,10 @@ async fn happy_multi_op_fetch_from_single_agent() { }) .await .unwrap(); + + // Check that op ids are still part of ops to fetch. 
+ let lock = fetch.state.lock().unwrap(); + assert!(expected_ops.iter().all(|v| lock.ops.contains(v))); } #[tokio::test(flavor = "multi_thread")] @@ -251,28 +255,28 @@ async fn happy_multi_op_fetch_from_multiple_agents() { let agent_info_1 = AgentBuild { agent: Some(agent_1.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.1:8888").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), space: Some(space_id.clone()), ..Default::default() } .build(); let agent_info_2 = AgentBuild { agent: Some(agent_2.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.1:8888").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())), space: Some(space_id.clone()), ..Default::default() } .build(); let agent_info_3 = AgentBuild { agent: Some(agent_3.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.1:8888").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:3").unwrap())), space: Some(space_id.clone()), ..Default::default() } .build(); let agent_url_1 = agent_info_1.url.clone().unwrap(); - let agent_url_2 = agent_info_1.url.clone().unwrap(); - let agent_url_3 = agent_info_1.url.clone().unwrap(); + let agent_url_2 = agent_info_2.url.clone().unwrap(); + let agent_url_3 = agent_info_3.url.clone().unwrap(); peer_store .insert(vec![agent_info_1, agent_info_2, agent_info_3]) .await @@ -284,41 +288,48 @@ async fn happy_multi_op_fetch_from_multiple_agents() { mock_transport.clone(), ); + let mut expected_requests = Vec::new(); + op_list_1 + .clone() + .into_iter() + .for_each(|op_id| expected_requests.push((op_id, agent_url_1.clone()))); + op_list_2 + .clone() + .into_iter() + .for_each(|op_id| expected_requests.push((op_id, agent_url_2.clone()))); + op_list_3 + .clone() + .into_iter() + .for_each(|op_id| expected_requests.push((op_id, agent_url_3.clone()))); let mut expected_ops = Vec::new(); op_list_1 .clone() .into_iter() - .for_each(|op_id| expected_ops.push((op_id, agent_url_1.clone()))); + .for_each(|op_id| 
expected_ops.push((op_id, agent_1.clone()))); op_list_2 .clone() .into_iter() - .for_each(|op_id| expected_ops.push((op_id, agent_url_2.clone()))); + .for_each(|op_id| expected_ops.push((op_id, agent_2.clone()))); op_list_3 .clone() .into_iter() - .for_each(|op_id| expected_ops.push((op_id, agent_url_3.clone()))); + .for_each(|op_id| expected_ops.push((op_id, agent_3.clone()))); - fetch - .add_ops(op_list_1.clone(), agent_1.clone()) - .await - .unwrap(); - fetch - .add_ops(op_list_2.clone(), agent_2.clone()) - .await - .unwrap(); - fetch - .add_ops(op_list_3.clone(), agent_3.clone()) - .await - .unwrap(); + futures::future::join_all([ + fetch.add_ops(op_list_1.clone(), agent_1.clone()), + fetch.add_ops(op_list_2.clone(), agent_2.clone()), + fetch.add_ops(op_list_3.clone(), agent_3.clone()), + ]) + .await; // Check that at least one request was sent for each op. - tokio::time::timeout(Duration::from_millis(100), async { + tokio::time::timeout(Duration::from_millis(10), async { loop { tokio::task::yield_now().await; let requests_sent = mock_transport.requests_sent.lock().unwrap().clone(); if requests_sent.len() >= total_ops - && expected_ops + && expected_requests .iter() .all(|expected_op| requests_sent.contains(expected_op)) { @@ -328,6 +339,42 @@ async fn happy_multi_op_fetch_from_multiple_agents() { }) .await .unwrap(); + + // Check that op ids are still part of ops to fetch. 
+ let lock = fetch.state.lock().unwrap(); + assert!(expected_ops.iter().all(|v| lock.ops.contains(v))); +} + +#[tokio::test(flavor = "multi_thread")] +async fn ops_are_cleared_when_agent_not_in_peer_store() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let config = CoreFetchConfig::default(); + let mock_transport = MockTransport::new(false); + + let op_list = create_op_list(2); + let agent_id = random_agent_id(); + let agent_info = AgentBuild { + agent: Some(agent_id.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), + ..Default::default() + } + .build(); + + let fetch = CoreFetch::new( + config.clone(), + agent_info.space.clone(), + peer_store.clone(), + mock_transport.clone(), + ); + + fetch.add_ops(op_list, agent_id).await.unwrap(); + + // Wait for agent to be looked up in peer store. + tokio::time::sleep(Duration::from_millis(10)).await; + + // Check that all op ids for agent have been removed from ops set. 
+ assert!(fetch.state.lock().unwrap().ops.is_empty()); } #[tokio::test(flavor = "multi_thread")] @@ -341,7 +388,7 @@ async fn unresponsive_agents_are_put_on_cool_down_list() { let agent_id = random_agent_id(); let agent_info = AgentBuild { agent: Some(agent_id.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.1:8888").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), ..Default::default() } .build(); @@ -373,85 +420,46 @@ async fn unresponsive_agents_are_put_on_cool_down_list() { }) .await .unwrap(); -} - -#[tokio::test(flavor = "multi_thread")] -async fn agent_cooling_down_is_removed_from_list() { - let builder = Arc::new(default_builder()); - let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); - let config = CoreFetchConfig { - cool_down_interval_ms: 10, - ..Default::default() - }; - let mock_transport = MockTransport::new(false); - let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1")); - - let fetch = CoreFetch::new( - config.clone(), - space_id, - peer_store, - mock_transport.clone(), - ); - let agent_id = random_agent_id(); - - fetch - .state - .lock() - .unwrap() - .cool_down_list - .add_agent(agent_id.clone()); - assert!(fetch - .state - .lock() - .unwrap() - .cool_down_list - .is_agent_cooling_down(&agent_id)); - - // Wait for the cool-down interval + 1 ms to avoid flakiness. - tokio::time::sleep(Duration::from_millis(config.cool_down_interval_ms + 1)) - .await; + // Give time to remove op id from set. + tokio::time::sleep(Duration::from_millis(1)).await; - assert!(!fetch - .state - .lock() - .unwrap() - .cool_down_list - .is_agent_cooling_down(&agent_id)); + // Op should have been removed from ops to fetch. 
+ assert!(fetch.state.lock().unwrap().ops.is_empty()); } #[tokio::test(flavor = "multi_thread")] -async fn multi_op_fetch_from_multiple_unresponsive_agents() { +async fn add_ops_for_multiple_unresponsive_agents() { let builder = Arc::new(default_builder()); let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); let config = CoreFetchConfig::default(); let mock_transport = MockTransport::new(true); let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1")); - let op_list_1 = create_op_list(10); + let op_list_1 = create_op_list(2); let agent_1 = random_agent_id(); - let op_list_2 = create_op_list(20); + let op_list_2 = create_op_list(1); let agent_2 = random_agent_id(); - let op_list_3 = create_op_list(30); + let op_list_3 = create_op_list(1); let agent_3 = random_agent_id(); let agent_info_1 = AgentBuild { agent: Some(agent_1.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.1:0").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), space: Some(space_id.clone()), ..Default::default() } .build(); let agent_info_2 = AgentBuild { agent: Some(agent_2.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.2:0").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())), space: Some(space_id.clone()), ..Default::default() } .build(); let agent_info_3 = AgentBuild { agent: Some(agent_3.clone()), - url: Some(Some(Url::from_str("wss://127.0.0.3:0").unwrap())), + url: Some(Some(Url::from_str("wss://127.0.0.1:3").unwrap())), space: Some(space_id.clone()), ..Default::default() } @@ -472,18 +480,12 @@ async fn multi_op_fetch_from_multiple_unresponsive_agents() { ); // Add all ops to the queue. 
- fetch - .add_ops(op_list_1.clone(), agent_1.clone()) - .await - .unwrap(); - fetch - .add_ops(op_list_2.clone(), agent_2.clone()) - .await - .unwrap(); - fetch - .add_ops(op_list_3.clone(), agent_3.clone()) - .await - .unwrap(); + futures::future::join_all([ + fetch.add_ops(op_list_1.clone(), agent_1.clone()), + fetch.add_ops(op_list_2.clone(), agent_2.clone()), + fetch.add_ops(op_list_3.clone(), agent_3.clone()), + ]) + .await; // Wait for one request for each agent. let expected_agent_url = [agent_url_1, agent_url_2, agent_url_3]; @@ -501,7 +503,7 @@ async fn multi_op_fetch_from_multiple_unresponsive_agents() { .iter() .all(|agent| request_destinations.contains(&agent)) { - // Check all agents are on cool_down_list. + // Check all agents are on the cool-down list. let cool_down_list = &mut fetch.state.lock().unwrap().cool_down_list; if expected_agents @@ -515,6 +517,61 @@ async fn multi_op_fetch_from_multiple_unresponsive_agents() { }) .await .unwrap(); + + // Give time to remove op id from set. + tokio::time::sleep(Duration::from_millis(1)).await; + + // Op should have been removed from ops to fetch. 
+ assert!( + fetch.state.lock().unwrap().ops.is_empty(), + "ops are {:?}", + fetch.state.lock().unwrap().ops.len() + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn agent_cooling_down_is_removed_from_list() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let config = CoreFetchConfig { + cool_down_interval_ms: 10, + ..Default::default() + }; + let mock_transport = MockTransport::new(false); + let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1")); + + let fetch = CoreFetch::new( + config.clone(), + space_id, + peer_store, + mock_transport.clone(), + ); + let agent_id = random_agent_id(); + + fetch + .state + .lock() + .unwrap() + .cool_down_list + .add_agent(agent_id.clone()); + + assert!(fetch + .state + .lock() + .unwrap() + .cool_down_list + .is_agent_cooling_down(&agent_id)); + + // Wait for the cool-down interval + 1 ms to avoid flakiness. + tokio::time::sleep(Duration::from_millis(config.cool_down_interval_ms + 1)) + .await; + + assert!(!fetch + .state + .lock() + .unwrap() + .cool_down_list + .is_agent_cooling_down(&agent_id)); } fn random_id() -> Id {