diff --git a/Cargo.lock b/Cargo.lock index a57e405f67..f0b36c6b15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "cloudabi" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467" +dependencies = [ + "bitflags", +] + [[package]] name = "colored" version = "1.9.3" @@ -1799,6 +1808,12 @@ dependencies = [ "serde_yaml", ] +[[package]] +name = "instant" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7777a24a1ce5de49fcdde84ec46efa487c3af49d5b6e6e0a50367cc5c1096182" + [[package]] name = "iovec" version = "0.1.4" @@ -1862,9 +1877,9 @@ dependencies = [ [[package]] name = "lapin" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8cea80315b4bf8f362dd1866242e51889fd78e0ac8ed954f8c718553779ab28" +checksum = "cf258f012529f87ac3bfc144951f024ff3f87217b6737f10cc6d53f190f6a07e" dependencies = [ "amq-protocol", "async-task", @@ -1872,7 +1887,7 @@ dependencies = [ "futures-core", "log 0.4.8", "mio 0.7.0", - "parking_lot", + "parking_lot 0.11.0", "pinky-swear", ] @@ -1955,6 +1970,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lock_api" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de302ce1fe7482db13738fbaf2e21cfb06a986b89c0bf38d88abf16681aada4e" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.3.9" @@ -2351,8 +2375,19 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" dependencies = [ - "lock_api", - "parking_lot_core", + "lock_api 0.3.4", + "parking_lot_core 0.7.2", +] + +[[package]] +name = "parking_lot" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4893845fa2ca272e647da5d0e46660a314ead9c2fdd9a883aabc32e481a8733" +dependencies = [ + "instant", + "lock_api 0.4.0", + "parking_lot_core 0.8.0", ] [[package]] @@ -2362,7 +2397,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" dependencies = [ "cfg-if", - "cloudabi", + "cloudabi 0.0.3", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.8", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b" +dependencies = [ + "cfg-if", + "cloudabi 0.1.0", + "instant", "libc", "redox_syscall", "smallvec", @@ -2483,13 +2533,13 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pinky-swear" -version = "4.0.1" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1e26b89412ebb77b27ff1c04ab91568d572b0780e86800945a07df42eddbdf" +checksum = "69d77f4b87d15fa8a91f6397ed103bed133e26701ccea39a3736c18c0e822129" dependencies = [ "doc-comment", "log 0.4.8", - "parking_lot", + "parking_lot 0.11.0", ] [[package]] @@ -2615,7 +2665,7 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa" dependencies = [ - "unicode-xid 0.2.0", + "unicode-xid 0.2.1", ] [[package]] @@ -2665,7 +2715,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1497e40855348e4a8a40767d8e55174bce1e445a3ac9254ad44ad468ee0485af" dependencies = [ "log 0.4.8", - "parking_lot", + "parking_lot 0.10.2", "scheduled-thread-pool", ] @@ -2802,7 +2852,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" dependencies = [ - "cloudabi", + "cloudabi 0.0.3", "fuchsia-cprng", "libc", "rand_core 0.4.2", @@ -3000,7 +3050,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0988d7fdf88d5e5fcf5923a0f1e8ab345f3e98ab4bc6bc45a2d5ff7f7458fbf6" dependencies = [ - "parking_lot", + "parking_lot 0.10.2", ] [[package]] @@ -3381,7 +3431,7 @@ checksum = "e8d5d96e8cbb005d6959f119f773bfaebb5684296108fb32600c00cde305b2cd" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", - "unicode-xid 0.2.0", + "unicode-xid 0.2.1", ] [[package]] @@ -3406,9 +3456,9 @@ dependencies = [ [[package]] name = "tcp-stream" -version = "0.19.4" +version = "0.19.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9be1176f6da7ecc64c24ee636ebad264eba7051ada9c5103aa3e99d49d88694c" +checksum = "5d7c73e237527c79ff01f993848361c73996dcc1a9aef5c302536218861b8535" dependencies = [ "cfg-if", "mio 0.7.0", @@ -3612,7 +3662,7 @@ dependencies = [ "fallible-iterator", "futures", "log 0.4.8", - "parking_lot", + "parking_lot 0.10.2", "percent-encoding", "phf 0.8.0", "pin-project-lite", @@ -3860,9 +3910,9 @@ checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" [[package]] name = "unicode-xid" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" [[package]] name = "url" diff --git a/iml-warp-drive/README.md b/iml-warp-drive/README.md new file mode 100644 index 0000000000..7920b2f18b --- /dev/null +++ b/iml-warp-drive/README.md @@ -0,0 +1,9 @@ +# IML Warp Drive + +## Overview + +The `iml-warp-drive` crate is responsible for providing a [Server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) endpoint. It is accessible at `/messaging` on a running manager instance. + +The endpoint provides near-realtime data to consumers based off of changes to the IML database via [LISTEN / NOTIFY](https://www.postgresql.org/docs/9.6/sql-notify.html). + +Internally, the crate uses [warp](https://github.com/seanmonstar/warp) to handle SSE connections. diff --git a/iml-warp-drive/src/users.rs b/iml-warp-drive/src/users.rs index 2a2d37e8c3..fedfb34e10 100644 --- a/iml-warp-drive/src/users.rs +++ b/iml-warp-drive/src/users.rs @@ -3,12 +3,7 @@ // license that can be found in the LICENSE file. use crate::locks::Locks; -use futures::{ - channel::{mpsc, oneshot}, - future::poll_fn, - lock::Mutex, - Stream, StreamExt, -}; +use futures::{channel::mpsc, lock::Mutex, Stream, StreamExt}; use im::HashMap; use iml_wire_types::warp_drive::{Cache, Message}; use std::sync::{ @@ -42,40 +37,26 @@ pub async fn user_connected( // Save the sender in our list of connected users. state.lock().await.insert(id, tx); - // Make an extra clone of users list to give to our disconnection handler... - let state2 = Arc::clone(&state); - - // Create channel to track disconnecting the receiver side of events. - // This is little bit tricky. - let (mut dtx, mut drx) = oneshot::channel::<()>(); - - // When `drx` is dropped then `dtx` will be canceled. - // We can track it to make sure when the user disconnects. - tokio::spawn(async move { - poll_fn(move |cx| dtx.poll_canceled(cx)).await; - drx.close(); - user_disconnected(id, &state2).await; - }); - // Convert messages into Server-Sent Events and return resulting stream. rx.map(|msg| Ok(warp::sse::data(serde_json::to_string(&msg).unwrap()))) } +/// Sends a message to each connected user +/// Any users for whom `unbounded_send` returns an error +/// will be dropped. pub async fn send_message(msg: Message, state: SharedUsers) { tracing::debug!("Sending message {:?} to users {:?}", msg, state); - let lock = state.lock().await; + let mut lock = state.lock().await; - for (_, tx) in lock.iter() { - match tx.unbounded_send(msg.clone()) { - Ok(()) => (), - Err(_disconnected) => { - // The tx is disconnected, our `user_disconnected` code - // should be happening in another task, nothing more to - // do here. - } + lock.retain(|id, tx| match tx.unbounded_send(msg.clone()) { + Ok(()) => true, + Err(_disconnected) => { + tracing::debug!("user {} disconnected", id); + + false } - } + }); } pub async fn disconnect_all_users(state: SharedUsers) { @@ -83,10 +64,3 @@ pub async fn disconnect_all_users(state: SharedUsers) { state.lock().await.clear(); } - -pub async fn user_disconnected(id: usize, state: &SharedUsers) { - tracing::debug!("user {} disconnected", id); - - // Stream ended, so remove from the user list - state.lock().await.remove(&id); -}