Skip to content

Commit

Permalink
test: test that ops are removed from set when agent is not in peer store
Browse files Browse the repository at this point in the history
  • Loading branch information
jost-s committed Dec 14, 2024
1 parent 3c3e60c commit 43f2111
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 90 deletions.
25 changes: 23 additions & 2 deletions crates/core/src/factories/core_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ impl CoreFetch {

lock.cool_down_list.is_agent_cooling_down(&agent_id)
};
tracing::trace!(
"is agent {agent_id} cooling down {is_agent_cooling_down}"
);

// Send request if agent is not on cool-down list.
if !is_agent_cooling_down {
Expand All @@ -248,7 +251,16 @@ impl CoreFetch {
.await
{
Some(url) => url,
None => continue,
None => {
let mut lock = state.lock().unwrap();
lock.ops = lock
.ops
.clone()
.into_iter()
.filter(|(_, a)| *a != agent_id)
.collect();
continue;
}
};

let data = serialize_op_ids(vec![op_id.clone()]);
Expand All @@ -268,7 +280,7 @@ impl CoreFetch {
if let Err(err) = fetch_request_tx
.try_send((op_id.clone(), agent_id.clone()))
{
tracing::warn!("could not re-insert fetch request for op {op_id} to agent {agent_id} in queue: {err}");
tracing::warn!("could not re-insert fetch request for op {op_id} to agent {agent_id} into queue: {err}");
// Remove op id/agent id from set to prevent build-up of state.
state
.lock()
Expand All @@ -284,6 +296,15 @@ impl CoreFetch {
.unwrap()
.cool_down_list
.add_agent(agent_id.clone());
// Agent is unresponsive.
// Remove associated op ids from set to prevent build-up of state.
let mut lock = state.lock().unwrap();
lock.ops = lock
.ops
.clone()
.into_iter()
.filter(|(_, a)| *a != agent_id)
.collect();
}
}
}
Expand Down
Loading

0 comments on commit 43f2111

Please sign in to comment.