Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Implement read and read_into for Reader #4467

Merged
merged 8 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bin/ofs/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,9 @@ impl PathFilesystem for Fuse {

self.set_opened_file_offset(FileKey::try_from(fh)?, path, offset + data.len() as u64)?;

Ok(ReplyData { data: data.into() })
Ok(ReplyData {
data: data.to_bytes(),
})
}

async fn write(
Expand Down
3 changes: 2 additions & 1 deletion bindings/c/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use ::opendal as core;
use opendal::Buffer;

use crate::types::opendal_bytes;

Expand Down Expand Up @@ -96,7 +97,7 @@ impl opendal_error {
/// free this error.
pub fn new(err: core::Error) -> *mut opendal_error {
let code = opendal_code::from(err.kind());
let message = opendal_bytes::new(err.to_string().into_bytes());
let message = opendal_bytes::new(Buffer::from(err.to_string()));

Box::into_raw(Box::new(opendal_error { code, message }))
}
Expand Down
4 changes: 3 additions & 1 deletion bindings/c/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use opendal::Buffer;
use std::collections::HashMap;
use std::os::raw::c_char;

Expand All @@ -35,7 +36,8 @@ pub struct opendal_bytes {

impl opendal_bytes {
/// Construct a [`opendal_bytes`] from the Rust [`Vec`] of bytes
pub(crate) fn new(vec: Vec<u8>) -> Self {
pub(crate) fn new(buf: Buffer) -> Self {
let vec = buf.to_vec();
let data = vec.as_ptr();
let len = vec.len();
std::mem::forget(vec);
Expand Down
4 changes: 2 additions & 2 deletions bindings/java/src/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_read(

fn intern_read(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) -> Result<jbyteArray> {
let path = jstring_to_string(env, &path)?;
let content = op.read(&path)?;
let result = env.byte_array_from_slice(content.as_slice())?;
let content = op.read(&path)?.to_bytes();
let result = env.byte_array_from_slice(&content)?;
Ok(result.into_raw())
}

Expand Down
4 changes: 2 additions & 2 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ fn intern_read(
}

async fn do_read<'local>(op: &mut Operator, path: String) -> Result<JObject<'local>> {
let content = op.read(&path).await?;
let content = op.read(&path).await?.to_bytes();

let env = unsafe { get_current_env() };
let result = env.byte_array_from_slice(content.as_slice())?;
let result = env.byte_array_from_slice(&content)?;
Ok(result.into())
}

Expand Down
18 changes: 14 additions & 4 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ impl Operator {
/// ```
#[napi]
pub async fn read(&self, path: String) -> Result<Buffer> {
let res = self.0.read(&path).await.map_err(format_napi_error)?;
let res = self
.0
.read(&path)
.await
.map_err(format_napi_error)?
.to_vec();
Ok(res.into())
}

Expand All @@ -198,7 +203,12 @@ impl Operator {
/// ```
#[napi]
pub fn read_sync(&self, path: String) -> Result<Buffer> {
let res = self.0.blocking().read(&path).map_err(format_napi_error)?;
let res = self
.0
.blocking()
.read(&path)
.map_err(format_napi_error)?
.to_vec();
Ok(res.into())
}

Expand Down Expand Up @@ -679,8 +689,8 @@ impl Reader {
/// Read bytes from this reader into given buffer.
#[napi]
pub async unsafe fn read(&mut self, mut buf: Buffer) -> Result<usize> {
let mut buf = buf.as_mut();
let n = self.inner.read(&mut buf).await.map_err(format_napi_error)?;
let buf = buf.as_mut();
let n = self.inner.read(buf).await.map_err(format_napi_error)?;
Ok(n)
}
}
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Operator {

/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p PyAny> {
let buffer = self.0.read(path).map_err(format_pyerr)?;
let buffer = self.0.read(path).map_err(format_pyerr)?.to_vec();
Buffer::new(buffer).into_bytes_ref(py)
}

Expand Down Expand Up @@ -262,7 +262,7 @@ impl AsyncOperator {
pub fn read<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?;
let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?.to_vec();
Python::with_gil(|py| Buffer::new(res).into_bytes(py))
})
}
Expand Down
5 changes: 1 addition & 4 deletions core/benches/ops/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,8 @@ fn bench_read_parallel(c: &mut Criterion, name: &str, op: Operator) {
b.to_async(&*TEST_RUNTIME).iter(|| async {
let futures = (0..parallel)
.map(|_| async {
let mut buf = Vec::with_capacity(*buf_size);
let r = op.reader_with(path).await.unwrap();
r.read_range(&mut buf, offset..=offset + size.bytes() as u64)
.await
.unwrap();
let _ = r.read(offset..offset + *buf_size as u64).await.unwrap();

let mut d = 0;
// mock same little cpu work
Expand Down
6 changes: 2 additions & 4 deletions core/benches/vs_s3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu
group.bench_function("opendal_s3_reader", |b| {
b.to_async(&*TEST_RUNTIME).iter(|| async {
let r = op.reader("file").await.unwrap();
let mut bs = Vec::new();
let _ = r.read_to_end(&mut bs).await.unwrap();
let _ = r.read(..).await.unwrap();
});
});
group.bench_function("aws_s3_sdk_into_async_read", |b| {
Expand All @@ -97,8 +96,7 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu
group.bench_function("opendal_s3_reader_with_capacity", |b| {
b.to_async(&*TEST_RUNTIME).iter(|| async {
let r = op.reader("file").await.unwrap();
let mut bs = Vec::with_capacity(16 * 1024 * 1024);
let _ = r.read_to_end(&mut bs).await.unwrap();
let _ = r.read(..16 * 1024 * 1024).await.unwrap();
});
});
group.bench_function("aws_s3_sdk_into_async_read_with_capacity", |b| {
Expand Down
2 changes: 1 addition & 1 deletion core/edge/s3_read_on_wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn hello_world() -> String {
)
.await
.unwrap();
let bs = op.read("test").await.unwrap();
let bs = op.read("test").await.unwrap().to_bytes();
String::from_utf8_lossy(&bs).to_string()
}

Expand Down
2 changes: 1 addition & 1 deletion core/fuzz/fuzz_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) -> Result<()> {

writer.close().await?;

let result = op.read(&path).await?;
let result = op.read(&path).await?.to_bytes();

checker.check(&result);

Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ mod tests {
let r = op.reader("retryable_error").await.unwrap();
let mut content = Vec::new();
let size = r
.read_to_end(&mut content)
.read_into(&mut content, ..)
.await
.expect("read must succeed");
assert_eq!(size, 13);
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ mod tests {

let reader = op.reader("test").await.unwrap();

let res = reader.read(&mut Vec::default(), 0, 4).await;
let res = reader.read(0..4).await;
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err.kind(), ErrorKind::Unexpected);
Expand Down
14 changes: 6 additions & 8 deletions core/src/raw/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,11 @@ impl ReadChecker {
for action in actions {
match *action {
ReadAction::Read(offset, size) => {
let mut bs = Vec::with_capacity(size);
let n = r
.read(&mut bs, offset as u64, size)
let bs = r
.read(offset as u64..(offset + size) as u64)
.await
.expect("read must success");
self.check_read(offset, size, &bs[..n]);
self.check_read(offset, size, bs.to_bytes().as_ref());
}
}
}
Expand All @@ -122,11 +121,10 @@ impl ReadChecker {
for action in actions {
match *action {
ReadAction::Read(offset, size) => {
let mut bs = Vec::with_capacity(size);
let n = r
.read(&mut bs, offset as u64, size)
let bs = r
.read(offset as u64..(offset + size) as u64)
.expect("read must success");
self.check_read(offset, size, &bs[..n]);
self.check_read(offset, size, bs.to_bytes().as_ref());
}
}
}
Expand Down
95 changes: 62 additions & 33 deletions core/src/types/blocking_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,62 @@ impl BlockingReader {
Ok(BlockingReader { inner: r })
}

/// Read from underlying storage and write data into the specified buffer, starting at
/// the given offset and up to the limit.
/// Read give range from reader into [`Buffer`].
///
/// A return value of `n` signifies that `n` bytes of data have been read into `buf`.
/// If `n < limit`, it indicates that the reader has reached EOF (End of File).
#[inline]
pub fn read(&self, buf: &mut impl BufMut, offset: u64, limit: usize) -> Result<usize> {
let bs = self.inner.read_at(offset, limit)?;
let n = bs.remaining();
buf.put(bs);
Ok(n)
/// This operation is zero-copy, which means it keeps the [`Bytes`] returned by underlying
/// storage services without any extra copy or intensive memory allocations.
///
/// # Notes
///
/// - Buffer length smaller than range means we have reached the end of file.
pub fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
let start = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start + 1,
Bound::Unbounded => 0,
};

let end = match range.end_bound().cloned() {
Bound::Included(end) => Some(end + 1),
Bound::Excluded(end) => Some(end),
Bound::Unbounded => None,
};

// If range is empty, return Ok(0) directly.
if let Some(end) = end {
if end <= start {
return Ok(Buffer::new());
}
}

let mut bufs = Vec::new();
let mut offset = start;

loop {
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at(offset, limit)?;
let n = bs.remaining();
bufs.push(bs);
if n < limit {
return Ok(bufs.into_iter().flatten().collect());
}

offset += n as u64;
if Some(offset) == end {
return Ok(bufs.into_iter().flatten().collect());
}
}
}

/// Read given range bytes of data from reader.
pub fn read_range(&self, buf: &mut impl BufMut, range: impl RangeBounds<u64>) -> Result<usize> {
///
/// This operation will copy and write bytes into given [`BufMut`]. Allocation happens while
/// [`BufMut`] doesn't have enough space.
///
/// # Notes
///
/// - Returning length smaller than range means we have reached the end of file.
pub fn read_into(&self, buf: &mut impl BufMut, range: impl RangeBounds<u64>) -> Result<usize> {
let start = match range.start_bound().cloned() {
Bound::Included(start) => start,
Bound::Excluded(start) => start + 1,
Expand All @@ -81,38 +122,26 @@ impl BlockingReader {
}

let mut offset = start;
let mut size = end.map(|end| end - start);

let mut read = 0;

loop {
let bs = self
.inner
// TODO: use service preferred io size instead.
.read_at(offset, size.unwrap_or(4 * 1024 * 1024) as usize)?;
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at(offset, limit)?;
let n = bs.remaining();
read += n;
buf.put(bs);
if n == 0 {
return Ok(read);
read += n as u64;
if n < limit {
return Ok(read as _);
}

offset += n as u64;

size = size.map(|v| v - n as u64);
if size == Some(0) {
return Ok(read);
if Some(offset) == end {
return Ok(read as _);
}
}
}

/// Read all data from reader.
///
/// This API is exactly the same with `BlockingReader::read_range(buf, ..)`.
#[inline]
pub fn read_to_end(&self, buf: &mut impl BufMut) -> Result<usize> {
self.read_range(buf, ..)
}

/// Convert reader into [`FuturesIoAsyncReader`] which implements [`futures::AsyncRead`],
/// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
#[inline]
Expand Down
Loading
Loading