fix: use retry loop when loading deletion files (#2899)
When loading deletion files we were directly accessing
`ObjectStore::inner`, which bypasses the retry loop. This made loading
deletion files somewhat brittle and was causing issues in scans (which
can load many deletion files at the same time as they are loading many
other ranges).

This PR changes the deletion file load to use the `ObjectStore` and adds
convenience methods to make this easier in the future.

In the future we should also move this load to use a `FileScheduler`
instead of `ObjectStore` so that these deletion file loads do not
violate our max-iops-per-scan, but that can be done later.
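
As a rough illustration (not part of this commit), the new convenience methods could be used roughly as follows. The surrounding function and the path string are hypothetical; a real deletion-file path would be built with `deletion_file_path` as in `lance-table`:

use lance_io::object_store::ObjectStore;
use object_store::path::Path;

// Sketch only: read a deletion file through the retrying ObjectStore wrapper
// instead of going through ObjectStore::inner directly.
async fn read_deletion_bytes_example() {
    let store = ObjectStore::local();
    // Illustrative path; callers in lance-table build this with deletion_file_path().
    let path = Path::parse("dataset/_deletions/1-42-abcd.bin").unwrap();

    // One request that downloads the whole object, going through the retry loop.
    let all = store.read_one_all(&path).await.unwrap();

    // One request for just a byte range; for repeated reads of the same path it
    // is cheaper to call store.open(&path) once and reuse the returned Reader.
    let head = store.read_one_range(&path, 0..all.len().min(16)).await.unwrap();
    assert!(head.len() <= all.len());
}
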
westonpace authored Sep 18, 2024
1 parent 31b76fd commit 067c0ae
Showing 5 changed files with 127 additions and 44 deletions.
18 changes: 17 additions & 1 deletion rust/lance-io/src/local.rs
@@ -4,7 +4,7 @@
//! Optimized local I/Os

use std::fs::File;
use std::io::{ErrorKind, SeekFrom};
use std::io::{ErrorKind, Read, SeekFrom};
use std::ops::Range;
use std::sync::Arc;

@@ -167,6 +167,22 @@ impl Reader for LocalObjectReader {
source: err.into(),
})
}

/// Reads the entire file.
#[instrument(level = "debug", skip(self))]
async fn get_all(&self) -> object_store::Result<Bytes> {
let mut file = self.file.clone();
tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
file.read_to_end(buf.as_mut())?;
Ok(Bytes::from(buf))
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
}
}

#[cfg(windows)]
100 changes: 61 additions & 39 deletions rust/lance-io/src/object_reader.rs
@@ -9,7 +9,7 @@ use bytes::Bytes;
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, GetOptions, ObjectStore};
use object_store::{path::Path, GetOptions, GetResult, ObjectStore, Result as OSResult};
use tokio::sync::OnceCell;
use tracing::instrument;

@@ -61,8 +61,8 @@ impl CloudObjectReader {
// of the response body. Thus we add an outer retry loop here.
async fn do_with_retry<'a, O>(
&self,
f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
) -> object_store::Result<O> {
f: impl Fn() -> BoxFuture<'a, OSResult<O>>,
) -> OSResult<O> {
let mut retries = 3;
loop {
match f().await {
@@ -76,6 +76,40 @@
}
}
}

// We have a separate retry loop here. This is because object_store does not
// attempt retries on downloads that fail during streaming of the response body.
//
// However, this failure is pretty common (e.g. timeout) and we want to retry in these
// situations. In addition, we provide additional logging information in these
    // failure cases.
async fn do_get_with_outer_retry<'a>(
&self,
f: impl Fn() -> BoxFuture<'a, OSResult<GetResult>> + Copy,
desc: impl Fn() -> String,
) -> OSResult<Bytes> {
let mut retries = self.download_retry_count;
loop {
let get_result = self.do_with_retry(f).await?;
match get_result.bytes().await {
Ok(bytes) => return Ok(bytes),
Err(err) => {
if retries == 0 {
log::warn!("Failed to download {} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", desc(), self.path, self.download_retry_count, err);
return Err(err);
}
log::debug!(
"Retrying {} from {} (remaining retries: {}). Error details: {:?}",
desc(),
self.path,
retries,
err
);
retries -= 1;
}
}
}
}
}

#[async_trait]
@@ -106,41 +140,29 @@ impl Reader for CloudObjectReader {
}

#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
// We have a separate retry loop here. This is because object_store does not
// attempt retries on downloads that fail during streaming of the response body.
//
// However, this failure is pretty common (e.g. timeout) and we want to retry in these
// situations. In addition, we provide additional logging information in these
// failures cases.
let mut retries = self.download_retry_count;
loop {
let get_result = self
.do_with_retry(|| {
let options = GetOptions {
range: Some(range.clone().into()),
..Default::default()
};
self.object_store.get_opts(&self.path, options)
})
.await?;
match get_result.bytes().await {
Ok(bytes) => return Ok(bytes),
Err(err) => {
if retries == 0 {
log::warn!("Failed to download range {:?} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", range, self.path, self.download_retry_count, err);
return Err(err);
}
log::debug!(
"Retrying range {:?} from {} (remaining retries: {}). Error details: {:?}",
range,
self.path,
retries,
err
);
retries -= 1;
}
}
}
async fn get_range(&self, range: Range<usize>) -> OSResult<Bytes> {
self.do_get_with_outer_retry(
|| {
let options = GetOptions {
range: Some(range.clone().into()),
..Default::default()
};
self.object_store.get_opts(&self.path, options)
},
|| format!("range {:?}", range),
)
.await
}

#[instrument(level = "debug", skip_all)]
async fn get_all(&self) -> OSResult<Bytes> {
self.do_get_with_outer_retry(
|| {
self.object_store
.get_opts(&self.path, GetOptions::default())
},
|| "read_all".to_string(),
)
.await
}
}
37 changes: 37 additions & 0 deletions rust/lance-io/src/object_store.rs
@@ -4,6 +4,7 @@
//! Extend [object_store::ObjectStore] functionalities

use std::collections::HashMap;
use std::ops::Range;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
@@ -12,6 +13,7 @@ use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_credential_types::provider::ProvideCredentials;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
@@ -649,6 +651,21 @@ impl ObjectStore {
pub async fn size(&self, path: &Path) -> Result<usize> {
Ok(self.inner.head(path).await?.size)
}

/// Convenience function to open a reader and read all the bytes
pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
let reader = self.open(path).await?;
Ok(reader.get_all().await?)
}

    /// Convenience function to open a reader and make a single request
///
/// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
/// and then call [`Reader::get_range`] multiple times.
pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
let reader = self.open(path).await?;
Ok(reader.get_range(range).await?)
}
}

/// Options that can be set for multiple object stores
@@ -1301,6 +1318,26 @@ mod tests {
assert_eq!(buf.as_bytes(), b"LOCAL");
}

#[tokio::test]
async fn test_read_one() {
let temp_dir = tempfile::tempdir().unwrap();

let file_path = temp_dir.path().join("test_file");
let mut writer = ObjectStore::create_local_writer(file_path.as_path())
.await
.unwrap();
writer.write_all(b"LOCAL").await.unwrap();
writer.shutdown().await.unwrap();

let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
let obj_store = ObjectStore::local();
let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
assert_eq!(buf.as_bytes(), b"LOCAL");

let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
assert_eq!(buf.as_bytes(), b"LOCAL");
}

#[tokio::test]
#[cfg(windows)]
async fn test_windows_paths() {
6 changes: 6 additions & 0 deletions rust/lance-io/src/traits.rs
@@ -96,4 +96,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
///
/// TODO: change to read_at()?
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes>;

/// Read all bytes from the object.
///
    /// By default this reads the size in a separate IOP, but some implementations
    /// may not need the size beforehand.
async fn get_all(&self) -> object_store::Result<Bytes>;
}
10 changes: 6 additions & 4 deletions rust/lance-table/src/io/deletion.rs
@@ -17,6 +17,7 @@ use object_store::path::Path;
use rand::Rng;
use roaring::bitmap::RoaringBitmap;
use snafu::{location, Location, ResultExt};
use tracing::instrument;

use crate::format::{DeletionFile, DeletionFileType, Fragment};

@@ -87,7 +88,7 @@ pub async fn write_deletion_file(
// Drop writer so out is no longer borrowed.
}

object_store.inner.put(&path, out.into()).await?;
object_store.put(&path, &out).await?;

Ok(Some(deletion_file))
}
@@ -104,7 +105,7 @@ pub async fn write_deletion_file(
let mut out: Vec<u8> = Vec::new();
bitmap.serialize_into(&mut out)?;

object_store.inner.put(&path, out.into()).await?;
object_store.put(&path, &out).await?;

Ok(Some(deletion_file))
}
@@ -116,6 +117,7 @@ pub async fn write_deletion_file(
/// Returns the deletion vector if one was present. Otherwise returns `Ok(None)`.
///
/// Will return an error if the file is present but invalid.
#[instrument(level = "debug", skip_all)]
pub async fn read_deletion_file(
base: &Path,
fragment: &Fragment,
@@ -129,7 +131,7 @@
DeletionFileType::Array => {
let path = deletion_file_path(base, fragment.id, deletion_file);

let data = object_store.inner.get(&path).await?.bytes().await?;
let data = object_store.read_one_all(&path).await?;
let data = std::io::Cursor::new(data);
let mut batches: Vec<RecordBatch> = ArrowFileReader::try_new(data, None)?
.collect::<std::result::Result<_, ArrowError>>()
@@ -183,7 +185,7 @@ pub async fn read_deletion_file(
DeletionFileType::Bitmap => {
let path = deletion_file_path(base, fragment.id, deletion_file);

let data = object_store.inner.get(&path).await?.bytes().await?;
let data = object_store.read_one_all(&path).await?;
let reader = data.reader();
let bitmap = RoaringBitmap::deserialize_from(reader)
.map_err(box_error)
