From 1510729739a1eec19d189bc7e0d41884515dbdce Mon Sep 17 00:00:00 2001 From: Threated Date: Wed, 16 Oct 2024 19:25:25 +0200 Subject: [PATCH] fix(sse): skip sse incompatible chars of `serde_json::RawValue` --- axum/Cargo.toml | 2 +- axum/src/response/sse.rs | 32 +++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/axum/Cargo.toml b/axum/Cargo.toml index e9e6c646f7..adf45db775 100644 --- a/axum/Cargo.toml +++ b/axum/Cargo.toml @@ -117,7 +117,7 @@ quickcheck = "1.0" quickcheck_macros = "1.0" reqwest = { version = "0.12", default-features = false, features = ["json", "stream", "multipart"] } serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json = { version = "1.0", features = ["raw_value"]} time = { version = "0.3", features = ["serde-human-readable"] } tokio = { package = "tokio", version = "1.25.0", features = ["macros", "rt", "rt-multi-thread", "net", "test-util"] } tokio-stream = "0.1" diff --git a/axum/src/response/sse.rs b/axum/src/response/sse.rs index e77b8c78a8..48e8c78420 100644 --- a/axum/src/response/sse.rs +++ b/axum/src/response/sse.rs @@ -213,7 +213,24 @@ impl Event { } self.buffer.extend_from_slice(b"data: "); - serde_json::to_writer((&mut self.buffer).writer(), &data).map_err(axum_core::Error::new)?; + struct IgnoreNewLines<'a>(bytes::buf::Writer<&'a mut BytesMut>); + impl std::io::Write for IgnoreNewLines<'_> { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut last_split = 0; + for delimiter in memchr::memchr2_iter(b'\n', b'\r', buf) { + self.0.write(&buf[last_split..delimiter])?; + last_split = delimiter + 1; + } + self.0.write(&buf[last_split..])?; + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } + } + serde_json::to_writer(IgnoreNewLines((&mut self.buffer).writer()), &data) + .map_err(axum_core::Error::new)?; self.buffer.put_u8(b'\n'); self.flags.insert(EventFlags::HAS_DATA); @@ -515,6 +532,7 @@ mod tests { use super::*; use crate::{routing::get, test_helpers::*, Router}; use futures_util::stream; + use serde_json::value::RawValue; use std::{collections::HashMap, convert::Infallible}; use tokio_stream::StreamExt as _; @@ -527,6 +545,18 @@ mod tests { assert_eq!(&*leading_space.finalize(), b"data: foobar\n\n"); } + #[test] + fn valid_json_raw_value_chars_stripped() { + let json_string = "{\r\"foo\": \n\r\r \"bar\\n\"\n}"; + let json_raw_value_event = Event::default() + .json_data(serde_json::from_str::<&RawValue>(&json_string).unwrap()) + .unwrap(); + assert_eq!( + &*json_raw_value_event.finalize(), + format!("data: {}\n\n", json_string.replace(['\n', '\r'], "")).as_bytes() + ); + } + #[crate::test] async fn basic() { let app = Router::new().route(