Skip to content

Commit

Permalink
Add ObjectStore::get_ranges (#2293) (#2336)
Browse files Browse the repository at this point in the history
* Add ObjectStore::get_ranges (#2293)

* Review feedback
  • Loading branch information
tustvold authored Aug 8, 2022
1 parent 0c828a9 commit 9a630a1
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 21 deletions.
25 changes: 24 additions & 1 deletion object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ mod multipart;
mod util;

use crate::path::Path;
use crate::util::{collect_bytes, maybe_spawn_blocking};
use crate::util::{
coalesce_ranges, collect_bytes, maybe_spawn_blocking, OBJECT_STORE_COALESCE_DEFAULT,
};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -231,6 +233,21 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes>;

/// Return the bytes that are stored at the specified location
/// in the given byte ranges
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
coalesce_ranges(
ranges,
|range| self.get_range(location, range),
OBJECT_STORE_COALESCE_DEFAULT,
)
.await
}

/// Return the metadata for the specified location
async fn head(&self, location: &Path) -> Result<ObjectMeta>;

Expand Down Expand Up @@ -552,6 +569,12 @@ mod tests {

// Should be a non-fatal error
out_of_range_result.unwrap_err();

let ranges = vec![0..1, 2..3, 0..5];
let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
for (range, bytes) in ranges.iter().zip(bytes) {
assert_eq!(bytes, expected_data.slice(range.clone()))
}
}

let head = storage.head(&location).await.unwrap();
Expand Down
9 changes: 9 additions & 0 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.get_range(location, range).await
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_ranges(location, ranges).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.head(location).await
Expand Down
59 changes: 40 additions & 19 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,26 +322,25 @@ impl ObjectStore for LocalFileSystem {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
let mut file = open_file(&path)?;
let to_read = range.end - range.start;
file.seek(SeekFrom::Start(range.start as u64))
.context(SeekSnafu { path: &path })?;

let mut buf = Vec::with_capacity(to_read);
let read = file
.take(to_read as u64)
.read_to_end(&mut buf)
.context(UnableToReadBytesSnafu { path: &path })?;

ensure!(
read == to_read,
OutOfRangeSnafu {
path: &path,
expected: to_read,
actual: read
}
);
read_range(&mut file, &path, range)
})
.await
}

Ok(buf.into())
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
let path = self.config.path_to_filesystem(location)?;
let ranges = ranges.to_vec();
maybe_spawn_blocking(move || {
// Vectored IO might be faster
let mut file = open_file(&path)?;
ranges
.into_iter()
.map(|r| read_range(&mut file, &path, r))
.collect()
})
.await
}
Expand Down Expand Up @@ -750,6 +749,28 @@ impl AsyncWrite for LocalUpload {
}
}

fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
let to_read = range.end - range.start;
file.seek(SeekFrom::Start(range.start as u64))
.context(SeekSnafu { path })?;

let mut buf = Vec::with_capacity(to_read);
let read = file
.take(to_read as u64)
.read_to_end(&mut buf)
.context(UnableToReadBytesSnafu { path })?;

ensure!(
read == to_read,
OutOfRangeSnafu {
path,
expected: to_read,
actual: read
}
);
Ok(buf.into())
}

fn open_file(path: &PathBuf) -> Result<File> {
let file = File::open(path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
Expand Down
16 changes: 16 additions & 0 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,22 @@ impl ObjectStore for InMemory {
Ok(data.slice(range))
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
let data = self.get_bytes(location).await?;
ranges
.iter()
.map(|range| {
ensure!(range.end <= data.len(), OutOfRangeSnafu);
ensure!(range.start <= range.end, BadRangeSnafu);
Ok(data.slice(range.clone()))
})
.collect()
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let last_modified = Utc::now();
let bytes = self.get_bytes(location).await?;
Expand Down
30 changes: 29 additions & 1 deletion object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,30 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let config = self.config();

let sleep_duration = config.wait_delete_per_call
let sleep_duration = config.wait_get_per_call
+ config.wait_get_per_byte * (range.end - range.start) as u32;

sleep(sleep_duration).await;

self.inner.get_range(location, range).await
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
let config = self.config();

let total_bytes: usize = ranges.iter().map(|range| range.end - range.start).sum();
let sleep_duration =
config.wait_get_per_call + config.wait_get_per_byte * total_bytes as u32;

sleep(sleep_duration).await;

self.inner.get_ranges(location, ranges).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
sleep(self.config().wait_put_per_call).await;
self.inner.head(location).await
Expand Down Expand Up @@ -260,11 +276,23 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.copy(from, to).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
sleep(self.config().wait_put_per_call).await;

self.inner.rename(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
sleep(self.config().wait_put_per_call).await;

self.inner.copy_if_not_exists(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
sleep(self.config().wait_put_per_call).await;

self.inner.rename_if_not_exists(from, to).await
}
}

/// Saturated `usize` to `u32` cast.
Expand Down
96 changes: 96 additions & 0 deletions object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,99 @@ where
Err(_) => f(),
}
}

/// Range requests with a gap less than or equal to this,
/// will be coalesced into a single request by [`coalesce_ranges`]
pub const OBJECT_STORE_COALESCE_DEFAULT: usize = 1024 * 1024;

/// Takes a function to fetch ranges and coalesces adjacent ranges if they are
/// less than `coalesce` bytes apart. Out of order `ranges` are not coalesced
pub async fn coalesce_ranges<F, Fut>(
ranges: &[std::ops::Range<usize>],
mut fetch: F,
coalesce: usize,
) -> Result<Vec<Bytes>>
where
F: FnMut(std::ops::Range<usize>) -> Fut,
Fut: std::future::Future<Output = Result<Bytes>>,
{
let mut ret = Vec::with_capacity(ranges.len());
let mut start_idx = 0;
let mut end_idx = 1;

while start_idx != ranges.len() {
while end_idx != ranges.len()
&& ranges[end_idx]
.start
.checked_sub(ranges[start_idx].end)
.map(|delta| delta <= coalesce)
.unwrap_or(false)
{
end_idx += 1;
}

let start = ranges[start_idx].start;
let end = ranges[end_idx - 1].end;
let bytes = fetch(start..end).await?;
for i in start_idx..end_idx {
let range = ranges[i].clone();
ret.push(bytes.slice(range.start - start..range.end - start))
}
start_idx = end_idx;
end_idx += 1;
}
Ok(ret)
}

#[cfg(test)]
mod tests {
use super::*;
use std::ops::Range;

#[tokio::test]
async fn test_coalesce_ranges() {
let do_fetch = |ranges: Vec<Range<usize>>, coalesce: usize| async move {
let max = ranges.iter().map(|x| x.end).max().unwrap_or(0);
let src: Vec<_> = (0..max).map(|x| x as u8).collect();

let mut fetches = vec![];
let coalesced = coalesce_ranges(
&ranges,
|range| {
fetches.push(range.clone());
futures::future::ready(Ok(Bytes::from(src[range].to_vec())))
},
coalesce,
)
.await
.unwrap();

assert_eq!(ranges.len(), coalesced.len());
for (range, bytes) in ranges.iter().zip(coalesced) {
assert_eq!(bytes.as_ref(), &src[range.clone()]);
}
fetches
};

let fetches = do_fetch(vec![], 0).await;
assert_eq!(fetches, vec![]);

let fetches = do_fetch(vec![0..3], 0).await;
assert_eq!(fetches, vec![0..3]);

let fetches = do_fetch(vec![0..2, 3..5], 0).await;
assert_eq!(fetches, vec![0..2, 3..5]);

let fetches = do_fetch(vec![0..1, 1..2], 0).await;
assert_eq!(fetches, vec![0..2]);

let fetches = do_fetch(vec![0..1, 2..72], 1).await;
assert_eq!(fetches, vec![0..72]);

let fetches = do_fetch(vec![0..1, 56..72, 73..75], 1).await;
assert_eq!(fetches, vec![0..1, 56..75]);

let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
assert_eq!(fetches, vec![0..1, 5..9, 2..6]);
}
}

0 comments on commit 9a630a1

Please sign in to comment.