Skip to content

Commit

Permalink
Lossy reader
Browse files Browse the repository at this point in the history
  • Loading branch information
algesten committed Jul 27, 2024
1 parent ec15496 commit 68cd7bc
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 55 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ log = "0.4.22"
thiserror = "1.0.61"
once_cell = "1.19.0"
smallvec = "1.13.2"
utf-8 = "0.7.6"

# These are used regardless of TLS implementation.
rustls-pemfile = { version = "2.1.2", optional = true, default-features = false, features = ["std"] }
Expand Down
2 changes: 1 addition & 1 deletion examples/cureq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn run(opt: &Opt) -> Result<(), ureq::Error> {

const MAX_BODY_SIZE: u64 = 5 * 1024 * 1024;

let reader = BufReader::new(res.body_mut().as_reader(MAX_BODY_SIZE));
let reader = BufReader::new(res.body_mut().as_reader_with_config(MAX_BODY_SIZE, true));
let mut lines = reader.lines();

while let Some(r) = lines.next() {
Expand Down
2 changes: 1 addition & 1 deletion src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{WithBody, WithoutBody};
/// .call()
/// .unwrap()
/// .body_mut()
/// .read_to_string(1000)
/// .read_to_string(1000, false)
/// .unwrap();
///
/// println!("Secret is: {}", secret);
Expand Down
4 changes: 2 additions & 2 deletions src/body/charset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ where

impl<R: io::Read> io::Read for CharCodec<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.reached_end && self.buf.is_empty() {
if self.reached_end && self.buf.unconsumed().is_empty() {
return Ok(0);
}

let input = 'read: {
if self.buf.len() > MAX_OUTPUT / 4 {
if self.buf.unconsumed().len() > MAX_OUTPUT / 4 {
// Do not keep filling if we have unused output.
break 'read self.reader.buffer();
}
Expand Down
2 changes: 1 addition & 1 deletion src/body/gzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod test {
);

let mut res = agent.get("https://example.test/gz_body").call().unwrap();
res.body_mut().read_to_string(1000).unwrap();
res.body_mut().read_to_string(1000, false).unwrap();

assert_eq!(agent.pool_count(), 1);
}
Expand Down
4 changes: 2 additions & 2 deletions src/body/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ mod test {
init_test_log();
set_handler("/get", 200, &[("content-length", "10")], b"hello");
let mut res = crate::get("https://my.test/get").call().unwrap();
let err = res.body_mut().read_to_string(1000).unwrap_err();
let err = res.body_mut().read_to_string(1000, false).unwrap_err();
let ioe = err.into_io();
assert_eq!(ioe.kind(), io::ErrorKind::UnexpectedEof);
}
Expand All @@ -58,7 +58,7 @@ mod test {
init_test_log();
set_handler("/get", 200, &[("content-length", "5")], b"hello");
let mut res = crate::get("https://my.test/get").call().unwrap();
let err = res.body_mut().read_to_string(3).unwrap_err();
let err = res.body_mut().read_to_string(3, false).unwrap_err();
println!("{:?}", err);
assert!(matches!(err, Error::BodyExceedsLimit(3)));
}
Expand Down
172 changes: 172 additions & 0 deletions src/body/lossy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use std::io;

use utf8::DecodeError;

use crate::util::ConsumeBuf;

const REPLACEMENT_CHAR: u8 = b'?';
const MIN_BUF: usize = 8;

pub struct LossyUtf8Reader<R> {
reader: R,
ended: bool,
input: ConsumeBuf,
}
impl<R> LossyUtf8Reader<R> {
pub(crate) fn new(reader: R) -> Self {
Self {
reader,
ended: false,
input: ConsumeBuf::new(8),
}
}
}

impl<R: io::Read> io::Read for LossyUtf8Reader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// Match the input buffer size
if !self.ended {
let total_len = self.input.unconsumed().len() + self.input.free_mut().len();
let wanted_len = buf.len().max(MIN_BUF);
if wanted_len < total_len {
self.input.add_space(total_len - wanted_len);
}
}

// Fill up to a point where we definitely will make progress.
while !self.ended && self.input.unconsumed().len() < MIN_BUF {
let amount = self.reader.read(self.input.free_mut())?;
self.input.add_filled(amount);

if amount == 0 {
self.ended = true;
}
}

if self.ended && self.input.unconsumed().is_empty() {
return Ok(0);
}

let valid_len = match utf8::decode(self.input.unconsumed()) {
Ok(_) => {
// Entire input is valid
self.input.unconsumed().len()
}
Err(e) => match e {
DecodeError::Invalid {
valid_prefix,
invalid_sequence,
..
} => {
let valid_len = valid_prefix.as_bytes().len();
let invalid_len = invalid_sequence.len();

// Switch out the problem input chars
let replace_in = self.input.unconsumed_mut();
for i in 0..invalid_len {
replace_in[valid_len + i] = REPLACEMENT_CHAR;
}

valid_len + invalid_len
}
DecodeError::Incomplete { valid_prefix, .. } => {
let valid_len = valid_prefix.len();

if self.ended {
// blank the rest
let replace_in = self.input.unconsumed_mut();
let invalid_len = replace_in.len() - valid_len;
for i in 0..invalid_len {
replace_in[valid_len + i] = REPLACEMENT_CHAR;
}
valid_len + invalid_len
} else {
valid_len
}
}
},
};
assert!(valid_len > 0);

let src = &self.input.unconsumed()[..valid_len];
let max = src.len().min(buf.len());
buf[..max].copy_from_slice(&src[..max]);
self.input.consume(max);

Ok(max)
}
}

#[cfg(test)]
mod test {
use std::io::Read;

use super::*;

fn do_reader<'a>(bytes: &'a mut [&'a [u8]]) -> String {
let mut r = LossyUtf8Reader::new(TestReader(bytes));
let mut buf = String::new();
r.read_to_string(&mut buf).unwrap();
buf
}

#[test]
fn ascii() {
assert_eq!(do_reader(&mut [b"abc123"]), "abc123");
}

#[test]
fn utf8_one_read() {
assert_eq!(do_reader(&mut ["åiåaäeö".as_bytes()]), "åiåaäeö");
}

#[test]
fn utf8_chopped_single_char() {
assert_eq!(do_reader(&mut [&[195], &[165]]), "å");
}

#[test]
fn utf8_chopped_prefix_ascii() {
assert_eq!(do_reader(&mut [&[97, 97, 97, 195], &[165]]), "aaaå");
}

#[test]
fn utf8_chopped_suffix_ascii() {
assert_eq!(do_reader(&mut [&[195], &[165, 97, 97, 97]]), "åaaa");
}

#[test]
fn utf8_broken_single() {
assert_eq!(do_reader(&mut [&[195]]), "?");
}

#[test]
fn utf8_broken_suffix_ascii() {
assert_eq!(do_reader(&mut [&[195, 97, 97, 97]]), "?aaa");
}

#[test]
fn utf8_broken_prefix_ascii() {
assert_eq!(do_reader(&mut [&[97, 97, 97, 195]]), "aaa?");
}

struct TestReader<'a>(&'a mut [&'a [u8]]);

impl<'a> io::Read for TestReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.0.iter().all(|c| c.is_empty()) {
return Ok(0);
}

let pos = self.0.iter().position(|c| !c.is_empty()).unwrap();
let cur = &self.0[pos];

let max = cur.len().min(buf.len());
buf[..max].copy_from_slice(&cur[..max]);

self.0[pos] = &cur[max..];

Ok(max)
}
}
}
Loading

0 comments on commit 68cd7bc

Please sign in to comment.