From 1598e6b8bdf3235968240968937d4c400fa58d09 Mon Sep 17 00:00:00 2001 From: Roland Rauch Date: Tue, 20 Aug 2024 11:50:07 +0200 Subject: [PATCH 1/7] Fix: On recent nightly (since 2024-06-13) compilation fails with 'error: item does not constrain ... but has it in its signature'. This is caused by changes in TAIT handling. This commit works around that and fixes the compilation error --- src/lib.rs | 99 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 60 insertions(+), 39 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index acd33e1..88ae7d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,8 +36,8 @@ use std::io::{Error as IoError, ErrorKind, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; +use crate::inner::{send_request, RequestStream, RequestStreamFuture}; use bytes::Bytes; -use futures_util::{FutureExt, Stream, StreamExt}; use pin_project::pin_project; use reqwest::{RequestBuilder, StatusCode}; use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; @@ -58,12 +58,65 @@ impl std::fmt::Display for HttpResponseStatusError { impl std::error::Error for HttpResponseStatusError {} -// Unfortunately, `reqwest::Response::bytes_stream()` returns -// an unnamed type. Since we need to store this type, -// we need to be able to name it, thus requiring the TAIT here. -type RequestStream = impl Stream>; -type StreamData = (Option, RequestStream); -type RequestStreamFuture = impl Future>; +// Due to changes in TAIT handling, TAITs cannot remain unconstrained if they appear +// in a signature. In the long run, the `#[defines]` attribute will be used to mark +// defining methods. Until then, the current workaround is to move the TAIT declaration +// as well as all methods that actually constrain its hidden type into its own submodule. +// see: https://github.com/rust-lang/rust/pull/113169 +mod inner { + use crate::{determine_size, to_io_error, HttpResponseStatusError}; + use bytes::Bytes; + use futures_util::{FutureExt, Stream, StreamExt}; + use reqwest::{RequestBuilder, StatusCode}; + use std::future::Future; + use std::io::{Error as IoError, ErrorKind}; + + // Unfortunately, `reqwest::Response::bytes_stream()` returns + // an unnamed type. Since we need to store this type, + // we need to be able to name it, thus requiring the TAIT here. + pub type RequestStream = impl Stream>; + pub type StreamData = (Option, RequestStream); + pub type RequestStreamFuture = + impl Future>; + + /// Send a request with a `Range` header, + /// returning a future for the response stream. + pub fn send_request( + request: &RequestBuilder, + offset: u64, + ) -> RequestStreamFuture { + let request = request + .try_clone() + .expect("request contains streaming body"); + request + .header(reqwest::header::RANGE, format!("bytes={offset}-")) + .send() + .map(move |result| { + result + .and_then(|response| response.error_for_status()) + .map_err(to_io_error) + .and_then(|response| { + if response.status() == StatusCode::OK { + if offset != 0 { + return Err(ErrorKind::NotSeekable.into()); + } + } else if response.status() + != StatusCode::PARTIAL_CONTENT + { + let error = + HttpResponseStatusError(response.status()); + return Err(to_io_error(error)); + } + let size = determine_size(offset, &response); + let stream = response + .bytes_stream() + .map(|result| result.map_err(to_io_error)); + + Ok((size, stream)) + }) + }) + } +} fn parse_content_length(headers: &reqwest::header::HeaderMap) -> Option { headers @@ -102,38 +155,6 @@ fn determine_size(offset: u64, response: &reqwest::Response) -> Option { } } -/// Send a request with a `Range` header, -/// returning a future for the response stream. -fn send_request(request: &RequestBuilder, offset: u64) -> RequestStreamFuture { - let request = request - .try_clone() - .expect("request contains streaming body"); - request - .header(reqwest::header::RANGE, format!("bytes={offset}-")) - .send() - .map(move |result| { - result - .and_then(|response| response.error_for_status()) - .map_err(to_io_error) - .and_then(|response| { - if response.status() == StatusCode::OK { - if offset != 0 { - return Err(ErrorKind::NotSeekable.into()); - } - } else if response.status() != StatusCode::PARTIAL_CONTENT { - let error = HttpResponseStatusError(response.status()); - return Err(to_io_error(error)); - } - let size = determine_size(offset, &response); - let stream = response - .bytes_stream() - .map(|result| result.map_err(to_io_error)); - - Ok((size, stream)) - }) - }) -} - /// Try to read `delta` bytes from a stream, /// returning how many bytes were read and remain to be read. /// From 82b4b4a5456be05dd6d0aa406be1de63427d3215 Mon Sep 17 00:00:00 2001 From: Roland Rauch Date: Tue, 20 Aug 2024 11:53:37 +0200 Subject: [PATCH 2/7] Removed obsolete 'mixed_integer_ops' feature-flag as has been stabilized and doesn't require a flag any more. --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 88ae7d2..f3230e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,7 @@ //! # }) //! ``` -#![feature(type_alias_impl_trait, mixed_integer_ops, io_error_more)] +#![feature(type_alias_impl_trait, io_error_more)] #![forbid(unsafe_code)] use std::future::Future; From 28fcccf92995110f919b8e2a35753ac674679eaf Mon Sep 17 00:00:00 2001 From: Roland Rauch Date: Tue, 20 Aug 2024 13:46:07 +0200 Subject: [PATCH 3/7] Feat: Support for stable Rust added. This is the default configuration. Nightly functions remain intact behind the `nightly` feature flag. --- Cargo.toml | 3 ++ src/lib.rs | 130 +++++++++++++++++++++++++++++++++++++---------------- 2 files changed, 94 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 11cb504..a72e2d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,9 @@ tokio = { version = "1.16.1", default-features = false } tokio-util = { version = "0.7.0", default-features = false, features = ["io"] } reqwest = { version = "0.11.9", default-features = false, features = ["stream"] } +[features] +nightly = [] + [dev-dependencies] paste = "1.0.6" serde = { version = "1.0.136", default-features = false, features = ["derive"] } diff --git a/src/lib.rs b/src/lib.rs index f3230e2..fdfbc60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,10 @@ //! # }) //! ``` -#![feature(type_alias_impl_trait, io_error_more)] +#![cfg_attr( + feature = "nightly", + feature(type_alias_impl_trait, io_error_more) +)] #![forbid(unsafe_code)] use std::future::Future; @@ -38,6 +41,7 @@ use std::task::{Context, Poll}; use crate::inner::{send_request, RequestStream, RequestStreamFuture}; use bytes::Bytes; +use futures_util::{FutureExt, StreamExt}; use pin_project::pin_project; use reqwest::{RequestBuilder, StatusCode}; use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; @@ -63,18 +67,15 @@ impl std::error::Error for HttpResponseStatusError {} // defining methods. Until then, the current workaround is to move the TAIT declaration // as well as all methods that actually constrain its hidden type into its own submodule. // see: https://github.com/rust-lang/rust/pull/113169 +#[cfg(feature = "nightly")] mod inner { - use crate::{determine_size, to_io_error, HttpResponseStatusError}; - use bytes::Bytes; - use futures_util::{FutureExt, Stream, StreamExt}; - use reqwest::{RequestBuilder, StatusCode}; - use std::future::Future; - use std::io::{Error as IoError, ErrorKind}; + use super::*; // Unfortunately, `reqwest::Response::bytes_stream()` returns // an unnamed type. Since we need to store this type, // we need to be able to name it, thus requiring the TAIT here. - pub type RequestStream = impl Stream>; + pub type RequestStream = + impl futures_util::Stream>; pub type StreamData = (Option, RequestStream); pub type RequestStreamFuture = impl Future>; @@ -85,37 +86,81 @@ mod inner { request: &RequestBuilder, offset: u64, ) -> RequestStreamFuture { - let request = request - .try_clone() - .expect("request contains streaming body"); - request - .header(reqwest::header::RANGE, format!("bytes={offset}-")) - .send() - .map(move |result| { - result - .and_then(|response| response.error_for_status()) - .map_err(to_io_error) - .and_then(|response| { - if response.status() == StatusCode::OK { - if offset != 0 { - return Err(ErrorKind::NotSeekable.into()); - } - } else if response.status() - != StatusCode::PARTIAL_CONTENT - { - let error = - HttpResponseStatusError(response.status()); - return Err(to_io_error(error)); - } - let size = determine_size(offset, &response); - let stream = response - .bytes_stream() - .map(|result| result.map_err(to_io_error)); - - Ok((size, stream)) - }) - }) + let request = prepare_request(request, offset); + request.send().map(move |result| { + result + .and_then(|response| response.error_for_status()) + .map_err(to_io_error) + .and_then(|response| { + let size = process_response(&response, offset)?; + let stream = response + .bytes_stream() + .map(|result| result.map_err(to_io_error)); + + Ok((size, stream)) + }) + }) + } +} + +#[cfg(not(feature = "nightly"))] +mod inner { + use super::*; + use futures_util::future::BoxFuture; + use futures_util::stream::BoxStream; + + pub type RequestStream = BoxStream<'static, Result>; + pub type StreamData = (Option, RequestStream); + pub type RequestStreamFuture = + BoxFuture<'static, Result>; + + /// Send a request with a `Range` header, + /// returning a future for the response stream. + pub fn send_request( + request: &RequestBuilder, + offset: u64, + ) -> RequestStreamFuture { + let request = prepare_request(request, offset); + Box::pin(request.send().map(move |result| { + result + .and_then(|response| response.error_for_status()) + .map_err(to_io_error) + .and_then(|response| { + let size = process_response(&response, offset)?; + let stream = response + .bytes_stream() + .map(|result| result.map_err(to_io_error)) + .boxed(); + + Ok((size, stream)) + }) + })) + } +} + +fn prepare_request(request: &RequestBuilder, offset: u64) -> RequestBuilder { + let request = request + .try_clone() + .expect("request contains streaming body"); + request.header(reqwest::header::RANGE, format!("bytes={offset}-")) +} + +fn process_response( + response: &reqwest::Response, + offset: u64, +) -> Result, IoError> { + if response.status() == StatusCode::OK { + if offset != 0 { + #[cfg(feature = "nightly")] + return Err(ErrorKind::NotSeekable.into()); + #[cfg(not(feature = "nightly"))] + return Err(ErrorKind::Unsupported.into()); + } + } else if response.status() != StatusCode::PARTIAL_CONTENT { + let error = HttpResponseStatusError(response.status()); + return Err(to_io_error(error)); } + Ok(determine_size(offset, response)) } fn parse_content_length(headers: &reqwest::header::HeaderMap) -> Option { @@ -1049,7 +1094,14 @@ mod tests { } #[tokio::test] - #[should_panic(expected = "seek error: Kind(NotSeekable)")] + #[cfg_attr( + feature = "nightly", + should_panic(expected = "seek error: Kind(NotSeekable)") + )] + #[cfg_attr( + not(feature = "nightly"), + should_panic(expected = "seek error: Kind(Unsupported)") + )] async fn test_seek_no_range_support() { let url = start_server(); let client = reqwest::Client::new(); From aa01c0f2ee072505370001107576204afd4d868d Mon Sep 17 00:00:00 2001 From: Roland Rauch Date: Tue, 20 Aug 2024 13:56:23 +0200 Subject: [PATCH 4/7] Added explicit end value to request byte range if known to improve compatibility with non-spec conforming servers --- src/lib.rs | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fdfbc60..ad92170 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,10 +28,7 @@ //! # }) //! ``` -#![cfg_attr( - feature = "nightly", - feature(type_alias_impl_trait, io_error_more) -)] +#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait, io_error_more))] #![forbid(unsafe_code)] use std::future::Future; @@ -85,8 +82,9 @@ mod inner { pub fn send_request( request: &RequestBuilder, offset: u64, + size: Option, ) -> RequestStreamFuture { - let request = prepare_request(request, offset); + let request = prepare_request(request, offset, size); request.send().map(move |result| { result .and_then(|response| response.error_for_status()) @@ -119,8 +117,9 @@ mod inner { pub fn send_request( request: &RequestBuilder, offset: u64, + size: Option, ) -> RequestStreamFuture { - let request = prepare_request(request, offset); + let request = prepare_request(request, offset, size); Box::pin(request.send().map(move |result| { result .and_then(|response| response.error_for_status()) @@ -138,11 +137,25 @@ mod inner { } } -fn prepare_request(request: &RequestBuilder, offset: u64) -> RequestBuilder { +fn prepare_request( + request: &RequestBuilder, + offset: u64, + size: Option, +) -> RequestBuilder { + let range = match size { + Some(size) if size > 0 => { + let end = size - 1; + format!("bytes={offset}-{end}") + } + _ => { + format!("bytes={offset}-") + } + }; + let request = request .try_clone() .expect("request contains streaming body"); - request.header(reqwest::header::RANGE, format!("bytes={offset}-")) + request.header(reqwest::header::RANGE, range) } fn process_response( @@ -475,7 +488,11 @@ impl /// Drive the state from `State::Initial` to `State::Pending`. fn drive_initial(&mut self) { if let State::Initial = self.state { - let future = Box::pin(send_request(self.request, *self.position)); + let future = Box::pin(send_request( + self.request, + *self.position, + *self.size_, + )); *self.state = State::Pending(future); } } From 5faed76336ae5d6856e69edef0220118bfc7e9a9 Mon Sep 17 00:00:00 2001 From: Roland Rauch Date: Tue, 20 Aug 2024 17:53:18 +0200 Subject: [PATCH 5/7] Feat: Added support for `futures_util::io::{AsyncRead, AsyncSeek}` in addition to the existing `tokio::io::{AsyncRead, AsyncSeek}` support. The feature flags `tokio_io` and `futures_io` can be used to select which one should be in use. This is either/or. Enabling both or neither leads to a descriptive compilation error. --- Cargo.toml | 6 +- README.md | 1 + build.rs | 7 ++ src/lib.rs | 251 ++++++++++++++++++++++++++++++++++++++++++----------- 4 files changed, 211 insertions(+), 54 deletions(-) create mode 100644 build.rs diff --git a/Cargo.toml b/Cargo.toml index a72e2d1..ba84966 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,14 @@ keywords = ["http", "request", "async"] bytes = "1.1.0" pin-project = "1.0.10" futures-util = "0.3.21" -tokio = { version = "1.16.1", default-features = false } -tokio-util = { version = "0.7.0", default-features = false, features = ["io"] } +tokio = { version = "1.16.1", default-features = false, optional = true } +tokio-util = { version = "0.7.0", default-features = false, features = ["io"], optional = true } reqwest = { version = "0.11.9", default-features = false, features = ["stream"] } [features] nightly = [] +tokio_io = ["dep:tokio", "dep:tokio-util"] +futures_io = ["futures-util/io"] [dev-dependencies] paste = "1.0.6" diff --git a/README.md b/README.md index ea53aeb..086bf97 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Use web resources like regular async files. * No unsafe code (`#[forbid(unsafe_code)]`) * Tested; code coverage: 100% + * Supports either `futures_util::io::{AsyncRead, AsyncSeek}` or `tokio::io::{AsyncRead, AsyncSeek}` (via feature flags) ## Example diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..ee6b2d9 --- /dev/null +++ b/build.rs @@ -0,0 +1,7 @@ +fn main() { + #[cfg(all(feature = "tokio_io", feature = "futures_io"))] + compile_error!("feature \"tokio_io\" and feature \"futures_io\" cannot be enabled at the same time"); + + #[cfg(not(any(feature = "tokio_io", feature = "futures_io")))] + compile_error!("either feature \"tokio_io\" or feature \"futures_io\" needs to be enabled"); +} diff --git a/src/lib.rs b/src/lib.rs index ad92170..a28d0e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ //! //! * No unsafe code (`#[forbid(unsafe_code)]`) //! * Tested; code coverage: 100% +//! * Supports either `futures_util::io::{AsyncRead, AsyncSeek}` or `tokio::io::{AsyncRead, AsyncSeek}` (via feature flags) //! //! # Examples //! @@ -27,7 +28,6 @@ //! assert_eq!(&buffer, b"world"); //! # }) //! ``` - #![cfg_attr(feature = "nightly", feature(type_alias_impl_trait, io_error_more))] #![forbid(unsafe_code)] @@ -38,10 +38,20 @@ use std::task::{Context, Poll}; use crate::inner::{send_request, RequestStream, RequestStreamFuture}; use bytes::Bytes; -use futures_util::{FutureExt, StreamExt}; +use futures_util::{ready, FutureExt, StreamExt}; use pin_project::pin_project; use reqwest::{RequestBuilder, StatusCode}; -use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; + +#[cfg(feature = "futures_io")] +use futures_util::io::{AsyncRead, AsyncSeek, AsyncSeekExt}; +#[cfg(feature = "futures_io")] +use futures_util::stream::IntoAsyncRead; +#[cfg(feature = "futures_io")] +use futures_util::TryStreamExt; + +#[cfg(feature = "tokio_io")] +use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, ReadBuf}; +#[cfg(feature = "tokio_io")] use tokio_util::io::StreamReader; fn to_io_error(e: impl std::error::Error + Send + Sync + 'static) -> IoError { @@ -213,30 +223,81 @@ fn determine_size(offset: u64, response: &reqwest::Response) -> Option { } } +#[cfg(feature = "tokio_io")] +fn fastforward( + reader: Pin<&mut R>, + delta: u64, + context: &mut Context<'_>, +) -> (u64, u64, Poll>) { + fastforward_impl::( + reader, + delta, + context, + |reader, context, buffer| { + let mut array = [std::mem::MaybeUninit::uninit(); BUFFER]; + let buffer_size = buffer.len(); + let mut read_buf = ReadBuf::uninit(&mut array[0..buffer_size]); + match reader.poll_read(context, &mut read_buf) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + }, + ) +} + +#[cfg(feature = "futures_io")] +fn fastforward( + reader: Pin<&mut R>, + delta: u64, + context: &mut Context<'_>, +) -> (u64, u64, Poll>) { + fastforward_impl::( + reader, + delta, + context, + |reader, context, buffer| match reader.poll_read(context, buffer) { + Poll::Ready(Ok(read)) => Poll::Ready(Ok(read)), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + }, + ) +} + /// Try to read `delta` bytes from a stream, /// returning how many bytes were read and remain to be read. /// /// Return `Ok` if the fastforward is complete, /// or if EOF is reached (at the first empty read). -fn fastforward( +fn fastforward_impl( mut reader: Pin<&mut R>, delta: u64, context: &mut Context<'_>, -) -> (u64, u64, Poll>) { - let mut array = [std::mem::MaybeUninit::uninit(); BUFFER]; + poll_read_fn: F, +) -> (u64, u64, Poll>) +where + R: Unpin, + F: FnMut( + Pin<&mut R>, + &mut Context<'_>, + &mut [u8], + ) -> Poll>, +{ + let mut buffer = vec![0u8; BUFFER]; let mut remaining = delta; + let mut poll_read_fn = poll_read_fn; + let poll = loop { assert!(remaining > 0); let buffer_size = (remaining as usize).min(BUFFER); - let mut buffer = ReadBuf::uninit(&mut array[0..buffer_size]); - match reader.as_mut().poll_read(context, &mut buffer) { - Poll::Ready(Ok(())) => { - let read = buffer.filled().len() as u64; + let buffer = &mut buffer[..buffer_size]; + match poll_read_fn(reader.as_mut(), context, buffer) { + Poll::Ready(Ok(read)) => { if read == 0 { // reached EOF break Poll::Ready(Ok(())); } else { - remaining = remaining.checked_sub(read).unwrap(); + remaining = remaining.checked_sub(read as u64).unwrap(); if remaining == 0 { break Poll::Ready(Ok(())); } else { @@ -244,7 +305,8 @@ fn fastforward( } } } - other => break other, + Poll::Ready(Err(e)) => break Poll::Ready(Err(e)), + Poll::Pending => break Poll::Pending, } }; let read = delta.checked_sub(remaining).unwrap(); @@ -283,7 +345,7 @@ enum State { /// /// If the webserver does not support range requests, /// seeking to anything other than the start of the file -/// will return a [`NotSeekable`] error +/// will either return a [`NotSeekable`] or [`Unsupported`] error /// (excluding [fast-forwards](#fast-forward)). /// Note that the `Accept-Ranges` response header is not /// used to check range request support before sending one. @@ -312,7 +374,7 @@ enum State { /// # Reading /// /// Reads are implemented by wrapping the response body -/// stream in [`StreamReader`]. +/// stream in either [`StreamReader`] or [`IntoAsyncRead`]. /// /// # Seeking /// @@ -360,11 +422,17 @@ pub struct RequestFile< /// The request template. request: RequestBuilder, /// The state of the HTTP request. + #[cfg(feature = "tokio_io")] state: State>, + #[cfg(feature = "futures_io")] + state: State>, /// Track the size of the response body. size_: Option, /// Track the current position in the response body. position: u64, + /// Track the current seek operation + #[cfg(feature = "futures_io")] + seek_position: Option, } impl @@ -419,6 +487,8 @@ impl state: State::Initial, size_: size.into(), position: 0, + #[cfg(feature = "futures_io")] + seek_position: None, } } @@ -439,7 +509,6 @@ impl /// Perform the HTTP request so the response is ready to be read. pub async fn prepare(&mut self) -> Result<(), IoError> { - use tokio::io::AsyncSeekExt; self.seek(SeekFrom::Current(0)).await.map(|_| ()) } } @@ -505,7 +574,16 @@ impl if let State::Pending(future) = self.state { future.as_mut().poll(context).map_ok(move |(size, stream)| { *self.size_ = self.size_.or(size); - *self.state = State::Ready(Box::pin(StreamReader::new(stream))); + #[cfg(feature = "tokio_io")] + { + *self.state = + State::Ready(Box::pin(StreamReader::new(stream))); + } + #[cfg(feature = "futures_io")] + { + *self.state = + State::Ready(Box::pin(stream.into_async_read())); + } }) } else { Poll::Ready(Ok(())) @@ -550,8 +628,6 @@ impl &'a mut self, context: &mut Context<'_>, ) -> Poll> { - use futures_util::ready; - self.drive_initial(); assert!(!matches!(self.state, State::Initial)); @@ -567,8 +643,31 @@ impl assert!(matches!(self.state, State::Ready(_))); Poll::Ready(Ok(())) } + + fn prepare_seek(&mut self, final_position: u64) { + let initial_position = *self.position; + if initial_position != final_position { + let delta_forward = final_position.saturating_sub(initial_position); + if 0 < delta_forward && delta_forward <= FF_WINDOW { + // seeking forwards by a small leap + if let State::Ready(reader) = + std::mem::replace(self.state, State::Transient) + { + *self.state = State::Seeking(reader, delta_forward); + } else { + *self.position = final_position; + *self.state = State::Initial; + } + } else { + // seeking backwards or a large leap forwards + *self.position = final_position; + *self.state = State::Initial; + } + } + } } +#[cfg(feature = "tokio_io")] impl AsyncRead for RequestFile { @@ -594,6 +693,7 @@ impl AsyncRead } } +#[cfg(feature = "tokio_io")] impl AsyncSeek for RequestFile { @@ -601,27 +701,9 @@ impl AsyncSeek self: Pin<&mut Self>, position: SeekFrom, ) -> Result<(), IoError> { - let this = self.project(); - let initial_position = *this.position; - let final_position = this.resolve_seek_position(position)?; - if initial_position != final_position { - let delta_forward = final_position.saturating_sub(initial_position); - if 0 < delta_forward && delta_forward <= FF_WINDOW { - // seeking forwards by a small leap - if let State::Ready(reader) = - std::mem::replace(this.state, State::Transient) - { - *this.state = State::Seeking(reader, delta_forward); - } else { - *this.position = final_position; - *this.state = State::Initial; - } - } else { - // seeking backwards or a large leap forwards - *this.position = final_position; - *this.state = State::Initial; - } - } + let mut this = self.project(); + let absolute_position = this.resolve_seek_position(position)?; + this.prepare_seek(absolute_position); Ok(()) } @@ -634,11 +716,72 @@ impl AsyncSeek } } +#[cfg(feature = "futures_io")] +impl AsyncRead + for RequestFile +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let mut this = self.project(); + let reader = match this.poll_drive(cx) { + Poll::Ready(Ok(())) => match this.state { + State::Ready(reader) => reader.as_mut(), + _ => unreachable!(), + }, + Poll::Pending => { + return Poll::Pending; + } + Poll::Ready(Err(err)) => { + return Poll::Ready(Err(err)); + } + }; + reader.poll_read(cx, buf).map_ok(|read| { + *this.position = this.position.saturating_add(read as u64); + read + }) + } +} + +#[cfg(feature = "futures_io")] +impl AsyncSeek + for RequestFile +{ + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + let mut this = self.project(); + if this.seek_position != &Some(pos) { + // Ensure previous seeks have finished before starting a new one + ready!(this.poll_drive(cx))?; + let absolute_position = match this.resolve_seek_position(pos) { + Ok(pos) => pos, + Err(e) => return Poll::Ready(Err(e)), + }; + *this.seek_position = Some(pos); + this.prepare_seek(absolute_position); + } + let result = this.poll_drive(cx).map_ok(|_| *this.position); + if let Poll::Ready(_) = &result { + *this.seek_position = None; + } + result + } +} + #[cfg(test)] mod tests { use super::RequestFile; use std::io::SeekFrom; - use tokio::io::{AsyncReadExt, AsyncSeekExt}; + #[cfg(feature = "tokio_io")] + use tokio::io::{AsyncReadExt, AsyncSeek, AsyncSeekExt}; + + #[cfg(feature = "futures_io")] + use futures_util::io::{AsyncReadExt, AsyncSeekExt}; #[derive(Debug, serde::Deserialize)] pub struct QueryParams { @@ -976,19 +1119,23 @@ mod tests { let client = reqwest::Client::new(); let request = client.get(format!("{url}/?data=abc")); let file: RequestFile = RequestFile::new(request); - - // NOTE: We cannot use AsyncReadExt.seek() here - // since that does a `poll_complete()` before `start_seek()` - // which leaves the file in the `Ready` state. - use futures_util::future::poll_fn; - use tokio::io::AsyncSeek; + let pos = SeekFrom::Start(4); tokio::pin!(file); - file.as_mut() - .start_seek(SeekFrom::Start(4)) - .expect("start seek error"); - let pos = poll_fn(|context| file.as_mut().poll_complete(context)) - .await - .expect("complete seek error"); + #[cfg(feature = "tokio_io")] + let pos = { + // NOTE: We cannot use AsyncReadExt.seek() here + // since that does a `poll_complete()` before `start_seek()` + // which leaves the file in the `Ready` state. + use futures_util::future::poll_fn; + + file.as_mut().start_seek(pos).expect("start seek error"); + poll_fn(|context| file.as_mut().poll_complete(context)) + .await + .expect("complete seek error") + }; + + #[cfg(feature = "futures_io")] + let pos = { file.seek(pos).await.expect("seek error") }; assert_eq!(pos, 4); } From 6b1401413248e5a4eb9403ef6e9a5b95f94a6cc6 Mon Sep 17 00:00:00 2001 From: Roland Rauch Date: Tue, 20 Aug 2024 18:45:46 +0200 Subject: [PATCH 6/7] Chore: Updated dependencies --- Cargo.toml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ba84966..17e96ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,12 +12,12 @@ readme = "README.md" keywords = ["http", "request", "async"] [dependencies] -bytes = "1.1.0" -pin-project = "1.0.10" -futures-util = "0.3.21" -tokio = { version = "1.16.1", default-features = false, optional = true } -tokio-util = { version = "0.7.0", default-features = false, features = ["io"], optional = true } -reqwest = { version = "0.11.9", default-features = false, features = ["stream"] } +bytes = "1.7" +pin-project = "1.1" +futures-util = "0.3" +tokio = { version = "1.39", default-features = false, optional = true } +tokio-util = { version = "0.7", default-features = false, features = ["io", "io-util"], optional = true } +reqwest = { version = "0.12", default-features = false, features = ["stream"] } [features] nightly = [] @@ -25,8 +25,8 @@ tokio_io = ["dep:tokio", "dep:tokio-util"] futures_io = ["futures-util/io"] [dev-dependencies] -paste = "1.0.6" -serde = { version = "1.0.136", default-features = false, features = ["derive"] } -tokio = { version = "1.16.1", features = ["rt", "macros"] } -tokio-test = "0.4.2" -axum = { version = "0.5.16", default-features = false, features = ["headers", "query"] } +paste = "1.0" +serde = { version = "1.0", default-features = false, features = ["derive"] } +tokio = { version = "1.39", features = ["rt", "macros"] } +tokio-test = "0.4" +axum = { version = "0.5", default-features = false, features = ["http1", "headers", "query"] } From 3df2a97267a5461d65f7445ed7d5e93c65cd7f6f Mon Sep 17 00:00:00 2001 From: Roland Rauch Date: Tue, 20 Aug 2024 18:48:50 +0200 Subject: [PATCH 7/7] Bumped version number to 0.3.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 17e96ea..a8c80c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "reqwest-file" -version = "0.2.1" +version = "0.3.0" edition = "2021" license = "MIT OR Apache-2.0" authors = ["Alexander van Ratingen"]