Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Fix user disconnects
Browse files Browse the repository at this point in the history
In: https://github.com/whamcloud/integrated-manager-for-lustre/blob/83d065653a55ee4c1a6068c1d024f02a02c73172/iml-warp-drive/src/users.rs#L54-L58

We spawn a task to disconnect users. However, we move both the tx and rx
handles into the task and block on the tx being cancelled.

Due to this approach, we will not disconnect users and instead buffer
their handles.

Instead, we should use .retain when sending to users and remove the ones
that error.

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund committed Jun 24, 2020
1 parent 7917fb0 commit b0e0358
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 57 deletions.
88 changes: 69 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions iml-warp-drive/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# IML Warp Drive

## Overview

The `iml-warp-drive` crate is responsible for providing a [Server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) endpoint. It is accessible at `/messaging` on a running manager instance.

The endpoint provides near-realtime data to consumers based off of changes to the IML database via [LISTEN / NOTIFY](https://www.postgresql.org/docs/9.6/sql-notify.html).

Internally, the crate uses [warp](https://github.com/seanmonstar/warp) to handle SSE connections.
50 changes: 12 additions & 38 deletions iml-warp-drive/src/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,7 @@
// license that can be found in the LICENSE file.

use crate::locks::Locks;
use futures::{
channel::{mpsc, oneshot},
future::poll_fn,
lock::Mutex,
Stream, StreamExt,
};
use futures::{channel::mpsc, lock::Mutex, Stream, StreamExt};
use im::HashMap;
use iml_wire_types::warp_drive::{Cache, Message};
use std::sync::{
Expand Down Expand Up @@ -42,51 +37,30 @@ pub async fn user_connected(
// Save the sender in our list of connected users.
state.lock().await.insert(id, tx);

// Make an extra clone of users list to give to our disconnection handler...
let state2 = Arc::clone(&state);

// Create channel to track disconnecting the receiver side of events.
// This is little bit tricky.
let (mut dtx, mut drx) = oneshot::channel::<()>();

// When `drx` is dropped then `dtx` will be canceled.
// We can track it to make sure when the user disconnects.
tokio::spawn(async move {
poll_fn(move |cx| dtx.poll_canceled(cx)).await;
drx.close();
user_disconnected(id, &state2).await;
});

// Convert messages into Server-Sent Events and return resulting stream.
rx.map(|msg| Ok(warp::sse::data(serde_json::to_string(&msg).unwrap())))
}

/// Sends a message to each connected user
/// Any users for whom `unbounded_send` returns an error
/// will be dropped.
pub async fn send_message(msg: Message, state: SharedUsers) {
tracing::debug!("Sending message {:?} to users {:?}", msg, state);

let lock = state.lock().await;
let mut lock = state.lock().await;

for (_, tx) in lock.iter() {
match tx.unbounded_send(msg.clone()) {
Ok(()) => (),
Err(_disconnected) => {
// The tx is disconnected, our `user_disconnected` code
// should be happening in another task, nothing more to
// do here.
}
lock.retain(|id, tx| match tx.unbounded_send(msg.clone()) {
Ok(()) => true,
Err(_disconnected) => {
tracing::debug!("user {} disconnected", id);

false
}
}
});
}

pub async fn disconnect_all_users(state: SharedUsers) {
tracing::info!("Flushing all users");

state.lock().await.clear();
}

pub async fn user_disconnected(id: usize, state: &SharedUsers) {
tracing::debug!("user {} disconnected", id);

// Stream ended, so remove from the user list
state.lock().await.remove(&id);
}

0 comments on commit b0e0358

Please sign in to comment.