diff --git a/Cargo.toml b/Cargo.toml index 3af5843fe4..2ed327e7b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "examples/testing", "examples/request_local_state", "examples/request_guard", + "examples/sse", "examples/stream", "examples/json", "examples/msgpack", diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index 6249468ec5..282e87b734 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -20,6 +20,7 @@ all-features = true [features] default = ["private-cookies"] tls = ["rocket_http/tls"] +sse = [] private-cookies = ["rocket_http/private-cookies"] [dependencies] diff --git a/core/lib/src/ext.rs b/core/lib/src/ext.rs index 8813b74177..8124224665 100644 --- a/core/lib/src/ext.rs +++ b/core/lib/src/ext.rs @@ -1,18 +1,35 @@ use std::io; -pub trait ReadExt: io::Read { - fn read_max(&mut self, mut buf: &mut [u8]) -> io::Result { - let start_len = buf.len(); - while !buf.is_empty() { - match self.read(buf) { - Ok(0) => break, - Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; } - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } +fn read_max_internal(reader: &mut T, mut buf: &mut [u8], + wouldblock_flush_signalling: bool) + -> io::Result<(usize, bool)> { + let start_len = buf.len(); + let need_flush = loop { + if buf.is_empty() { break false } + match reader.read(buf) { + Ok(0) => { break true } + Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(ref e) if (e.kind() == io::ErrorKind::WouldBlock && + wouldblock_flush_signalling) => { break true } + Err(e) => return Err(e), } + }; - Ok(start_len - buf.len()) + Ok((start_len - buf.len(), need_flush)) +} + +pub trait ReadExt: io::Read + Sized { + fn read_max(&mut self, buf: &mut [u8]) -> io::Result { + Ok(read_max_internal(self, buf, false)?.0) + } + + /// Tries to fill buf with data. Short reads can occur for EOF or + /// flush requests. With SSE enabled, a flush request occurs if + /// the underlying reader returns ErrorKind::Wouldblock + fn read_max_wfs(&mut self, buf: &mut [u8]) + -> io::Result<(usize, bool)> { + read_max_internal(self, buf, cfg!(feature="sse")) } } diff --git a/core/lib/src/response/response.rs b/core/lib/src/response/response.rs index 2c94342b9f..4638983cfc 100644 --- a/core/lib/src/response/response.rs +++ b/core/lib/src/response/response.rs @@ -1018,6 +1018,12 @@ impl<'r> Response<'r> { /// [DEFAULT_CHUNK_SIZE](::response::DEFAULT_CHUNK_SIZE). Use /// [set_chunked_body](#method.set_chunked_body) for custom chunk sizes. /// + /// Normally, data will be buffered and sent only in complete + /// chunks. If you need timely transmission of available data, + /// rather than buffering, enable the `sse` feature and use the + /// `WouldBlock` technique described in + /// [Stream](::response::Stream). + /// /// # Example /// /// ```rust diff --git a/core/lib/src/response/stream.rs b/core/lib/src/response/stream.rs index f91d5beaff..e744660622 100644 --- a/core/lib/src/response/stream.rs +++ b/core/lib/src/response/stream.rs @@ -29,6 +29,30 @@ impl Stream { /// # #[allow(unused_variables)] /// let response = Stream::chunked(io::stdin(), 10); /// ``` + /// + /// # Buffering and blocking + /// + /// Normally, data will be buffered and sent only in complete + /// `chunk_size` chunks. + /// + /// With the feature `sse` enabled, the `Read`er may signal that + /// data sent so far should be transmitted in a timely fashion + /// (e.g. it is responding to a Server-Side Events (JavaScript + /// `EventSource`) request. To do this it should return an + /// [io::Error](std::io::Error) of kind `WouldBlock` (which should + /// not normally occur), after returning a collection of data. + /// This will cause a flush of data seen so far, rather than being + /// treated as an error. + /// + /// Note that long-running responses may easily exhaust Rocket's + /// thread pool, so consider increasing the number of threads. + /// If doing SSE, also note the 'maximum open connections' browser + /// limitation which is described in the + /// [EventSource documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) + /// on the Mozilla Developer Network. + /// + /// Without the `sse` feature, a `WouldBlock` error is treated + /// as an actual error. pub fn chunked(reader: T, chunk_size: u64) -> Stream { Stream(reader, chunk_size) } diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index 9a90f8bad3..d5d65e2a93 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -159,9 +159,12 @@ impl Rocket { let mut buffer = vec![0; chunk_size as usize]; let mut stream = hyp_res.start()?; loop { - match body.read_max(&mut buffer)? { - 0 => break, - n => stream.write_all(&buffer[..n])?, + match body.read_max_wfs(&mut buffer)? { + (0, _) => break, + (n, f) => { + stream.write_all(&buffer[..n])?; + if f { stream.flush()? } + }, } } diff --git a/examples/sse/Cargo.toml b/examples/sse/Cargo.toml new file mode 100644 index 0000000000..56d93f9b0f --- /dev/null +++ b/examples/sse/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "sse" +version = "0.0.0" +workspace = "../../" +publish = false + +[dependencies] +rocket = { path = "../../core/lib", features = ["sse"] } diff --git a/examples/sse/src/main.rs b/examples/sse/src/main.rs new file mode 100644 index 0000000000..c7bb4ae92b --- /dev/null +++ b/examples/sse/src/main.rs @@ -0,0 +1,79 @@ +#![feature(proc_macro_hygiene, decl_macro)] +#[macro_use] +extern crate rocket; + +use rocket::http::ContentType; +use rocket::response::Content; +use rocket::response::Responder; +use std::io::BufReader; +use std::io::Read; +use std::thread::sleep; +use std::time::Duration; + +#[get("/")] +fn index<'r>() -> impl Responder<'r> { + Content( + ContentType::HTML, + r##" + +

Hi!

+ +
nothing yet
+ + + +"##, + ) +} + +#[get("/script.js")] +fn script<'r>() -> impl Responder<'r> { + Content( + ContentType::JavaScript, + r##" +status_node = document.getElementById('spong'); +status_node.innerHTML = 'js-done' + +es = new EventSource("updates"); +es.onmessage = function(event) { + status_node.innerHTML = event.data; +} +"##, + ) +} + +const BUF_SIZE : usize = 4096; + +type TestCounter = BufReader; +#[derive(Debug)] +struct TestCounterInner { + next: usize, +} +impl Read for TestCounterInner { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + sleep(Duration::from_millis(500)); + let data = format!("data: {}\n\n", self.next); + self.next += 1; + // `BufReader` won't call us unless its buffer is empty, and + // then buf will be the whole of the buffer, ie of size + // BUF_SIZE (due to the `with_capacity` call). So `data` is + // definitely going to fit. + buf[0..data.len()].copy_from_slice(data.as_bytes()); + Ok(buf.len()) + } +} + +#[get("/updates")] +fn updates<'x>() -> impl Responder<'x> { + let tc = TestCounterInner { next: 0 }; + let tc = BufReader::with_capacity(BUF_SIZE, tc); + let ch = rocket::response::Stream::from(tc); + let ct = ContentType::parse_flexible("text/event-stream; charset=utf-8").unwrap(); + Content(ct, ch) +} + +fn main() { + rocket::ignite() + .mount("/", routes![index, script, updates,]) + .launch(); +}