Skip to content

Commit

Permalink
fix(offchain): Avoid starvation in the offchain monitor
Browse files Browse the repository at this point in the history
This addresses both sides of the issue: it makes sure the task
holding the `CallAll` doesn't hang, and it removes the concurrency
control done by `Buffer`, which may be the reason why PR #4570 didn't
fully work.
  • Loading branch information
leoyvens committed May 26, 2023
1 parent 15941da commit 3b21f8d
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
5 changes: 3 additions & 2 deletions core/src/polling_monitor/ipfs_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ pub fn ipfs_service(
.service_fn(move |req| ipfs.cheap_clone().call_inner(req))
.boxed();

// The `Buffer` makes it so the rate and concurrency limit are shared among clones.
Buffer::new(svc, 1)
// The `Buffer` makes it so the rate limit is shared among clones.
// Make its capacity effectively unbounded to avoid any risk of starvation.
Buffer::new(svc, u32::MAX as usize)
}

#[derive(Clone)]
Expand Down
13 changes: 8 additions & 5 deletions core/src/polling_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<T> Queue<T> {
/// `Option`, to represent the object not being found.
pub fn spawn_monitor<ID, S, E, Res: Send + 'static>(
service: S,
response_sender: mpsc::Sender<(ID, Res)>,
response_sender: mpsc::UnboundedSender<(ID, Res)>,
logger: Logger,
metrics: PollingMonitorMetrics,
) -> PollingMonitor<ID>
Expand Down Expand Up @@ -149,10 +149,13 @@ where
let mut backoffs = Backoffs::new();
let mut responses = service.call_all(queue_to_stream).unordered().boxed();
while let Some(response) = responses.next().await {
// Note: Be careful not to `await` within this loop, as that could block requests in
// the `CallAll` from being polled. This can cause starvation as those requests may
// be holding on to resources such as slots for concurrent calls.
match response {
Ok((id, Some(response))) => {
backoffs.remove(&id);
let send_result = response_sender.send((id, response)).await;
let send_result = response_sender.send((id, response));
if send_result.is_err() {
// The receiver has been dropped, cancel this task.
break;
Expand Down Expand Up @@ -250,10 +253,10 @@ mod tests {
fn setup() -> (
mock::Handle<&'static str, Option<&'static str>>,
PollingMonitor<&'static str>,
mpsc::Receiver<(&'static str, &'static str)>,
mpsc::UnboundedReceiver<(&'static str, &'static str)>,
) {
let (svc, handle) = mock::pair();
let (tx, rx) = mpsc::channel(10);
let (tx, rx) = mpsc::unbounded_channel();
let monitor = spawn_monitor(svc, tx, log::discard(), PollingMonitorMetrics::mock());
(handle, monitor, rx)
}
Expand All @@ -263,7 +266,7 @@ mod tests {
let (svc, mut handle) = mock::pair();
let shared_svc = tower::buffer::Buffer::new(tower::limit::ConcurrencyLimit::new(svc, 1), 1);
let make_monitor = |svc| {
let (tx, rx) = mpsc::channel(10);
let (tx, rx) = mpsc::unbounded_channel();
let metrics = PollingMonitorMetrics::mock();
let monitor = spawn_monitor(svc, tx, log::discard(), metrics);
(monitor, rx)
Expand Down
6 changes: 4 additions & 2 deletions core/src/subgraph/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {

pub struct OffchainMonitor {
ipfs_monitor: PollingMonitor<CidFile>,
ipfs_monitor_rx: mpsc::Receiver<(CidFile, Bytes)>,
ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>,
}

impl OffchainMonitor {
Expand All @@ -195,7 +195,9 @@ impl OffchainMonitor {
subgraph_hash: &DeploymentHash,
ipfs_service: IpfsService,
) -> Self {
let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::channel(10);
// The channel is unbounded, as it is expected that `fn ready_offchain_events` is called
// frequently, or at least with the same frequency that requests are sent.
let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::unbounded_channel();
let ipfs_monitor = spawn_monitor(
ipfs_service,
ipfs_monitor_tx,
Expand Down

0 comments on commit 3b21f8d

Please sign in to comment.