Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a separate retry loop on object contents download #2874

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/read_and_write.rst
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,10 @@ These options apply to all object stores.
- Description
* - ``allow_http``
- Allow non-TLS, i.e. non-HTTPS connections. Default, ``False``.
* - ``download_retry_count``
- Number of times to retry a download. Default, ``3``. This limit is applied when
the HTTP request succeeds but the response is not fully downloaded, typically due
to a violation of ``request_timeout``.
* - ``allow_invalid_certificates``
- Skip certificate validation on https connections. Default, ``False``.
Warning: This is insecure and should only be used for testing.
Expand Down
42 changes: 39 additions & 3 deletions rust/lance-io/src/object_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bytes::Bytes;
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, ObjectStore};
use object_store::{path::Path, GetOptions, ObjectStore};
use tokio::sync::OnceCell;
use tracing::instrument;

Expand All @@ -28,6 +28,7 @@ pub struct CloudObjectReader {
size: OnceCell<usize>,

block_size: usize,
download_retry_count: usize,
}

impl DeepSizeOf for CloudObjectReader {
Expand All @@ -44,12 +45,14 @@ impl CloudObjectReader {
path: Path,
block_size: usize,
known_size: Option<usize>,
download_retry_count: usize,
) -> Result<Self> {
Ok(Self {
object_store,
path,
size: OnceCell::new_with(known_size),
block_size,
download_retry_count,
})
}

Expand Down Expand Up @@ -104,7 +107,40 @@ impl Reader for CloudObjectReader {

#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
    // We have a separate retry loop here. This is because object_store does not
    // attempt retries on downloads that fail during streaming of the response body.
    //
    // However, this failure is pretty common (e.g. timeout) and we want to retry in these
    // situations. In addition, we provide additional logging information in these
    // failure cases.
    let mut retries = self.download_retry_count;
    loop {
        // `do_with_retry` covers failures of the request itself; the loop below
        // covers failures while streaming the response body.
        let get_result = self
            .do_with_retry(|| {
                let options = GetOptions {
                    range: Some(range.clone().into()),
                    ..Default::default()
                };
                self.object_store.get_opts(&self.path, options)
            })
            .await?;
        match get_result.bytes().await {
            Ok(bytes) => return Ok(bytes),
            Err(err) => {
                if retries == 0 {
                    // Out of retries: surface the error, with a hint that the
                    // likely causes are storage overload or a tight request_timeout.
                    log::warn!("Failed to download range {:?} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", range, self.path, self.download_retry_count, err);
                    return Err(err);
                }
                log::debug!(
                    "Retrying range {:?} from {} (remaining retries: {}). Error details: {:?}",
                    range,
                    self.path,
                    retries,
                    err
                );
                retries -= 1;
            }
        }
    }
}
}
48 changes: 48 additions & 0 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
// Cloud disks often need many many threads to saturate the network
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;

#[async_trait]
pub trait ObjectStoreExt {
/// Returns true if the file exists.
Expand Down Expand Up @@ -100,6 +102,8 @@
/// is true for object stores, but not for local filesystems.
pub list_is_lexically_ordered: bool,
io_parallelism: usize,
/// Number of times to retry a failed download
download_retry_count: usize,
}

impl DeepSizeOf for ObjectStore {
Expand Down Expand Up @@ -440,6 +444,7 @@
use_constant_size_upload_parts: false,
list_is_lexically_ordered: false,
io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM,
download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
},
Path::from_absolute_path(expanded_path.as_path())?,
))
Expand All @@ -466,6 +471,7 @@
use_constant_size_upload_parts: false,
list_is_lexically_ordered: false,
io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM,
download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
}
}

Expand All @@ -478,6 +484,7 @@
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: get_num_compute_intensive_cpus(),
download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
}
}

Expand Down Expand Up @@ -516,6 +523,7 @@
path.clone(),
self.block_size,
None,
self.download_retry_count,
)?)),
}
}
Expand All @@ -533,6 +541,7 @@
path.clone(),
self.block_size,
Some(known_size),
self.download_retry_count,
)?)),
}
}
Expand Down Expand Up @@ -620,7 +629,7 @@
pub fn remove_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<Result<Path>> {

Check warning on line 632 in rust/lance-io/src/object_store.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

elided lifetime has a name
self.inner
.delete_stream(locations.err_into::<ObjectStoreError>().boxed())
.err_into::<Error>()
Expand All @@ -641,6 +650,28 @@
Ok(self.inner.head(path).await?.size)
}
}

/// Options that can be set for multiple object stores
///
/// Key names are matched case-insensitively when parsed via `FromStr`.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    DownloadRetryCount,
}

impl FromStr for LanceConfigKey {
type Err = Error;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"download_retry_count" => Ok(Self::DownloadRetryCount),
_ => Err(Error::InvalidInput {
source: format!("Invalid LanceConfigKey: {}", s).into(),
location: location!(),
}),
}
}
}

/// Raw string key/value configuration options for object stores.
///
/// Keys are looked up case-insensitively; subsets of the map are extracted
/// for specific backends (e.g. the Azure options accessor below).
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);

Expand Down Expand Up @@ -709,6 +740,15 @@
})
}

/// Number of times to retry a download that fails
pub fn download_retry_count(&self) -> usize {
self.0
.iter()
.find(|(key, _)| key.to_ascii_lowercase() == "download_retry_count")
.map(|(_, value)| value.parse::<usize>().unwrap_or(3))
.unwrap_or(3)
}

/// Subset of options relevant for azure storage
pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
self.0
Expand Down Expand Up @@ -755,6 +795,7 @@
options: ObjectStoreParams,
) -> Result<ObjectStore> {
let mut storage_options = StorageOptions(options.storage_options.clone().unwrap_or_default());
let download_retry_count = storage_options.download_retry_count();
let mut url = ensure_table_uri(url)?;
// Block size: On local file systems, we use 4KB block size. On cloud
// object stores, we use 64KB block size. This is generally the largest
Expand Down Expand Up @@ -813,6 +854,7 @@
use_constant_size_upload_parts,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
})
}
"gs" => {
Expand All @@ -831,6 +873,7 @@
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
})
}
"az" => {
Expand All @@ -845,6 +888,7 @@
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
})
}
// we have a bypass logic to use `tokio::fs` directly to lower overhead
Expand All @@ -862,6 +906,7 @@
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: get_num_compute_intensive_cpus(),
download_retry_count,
}),
unknown_scheme => {
if let Some(provider) = registry.providers.get(unknown_scheme) {
Expand All @@ -878,6 +923,7 @@
}

impl ObjectStore {
#[allow(clippy::too_many_arguments)]
pub fn new(
store: Arc<DynObjectStore>,
location: Url,
Expand All @@ -886,6 +932,7 @@
use_constant_size_upload_parts: bool,
list_is_lexically_ordered: bool,
io_parallelism: usize,
download_retry_count: usize,
) -> Self {
let scheme = location.scheme();
let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
Expand All @@ -902,6 +949,7 @@
use_constant_size_upload_parts,
list_is_lexically_ordered,
io_parallelism,
download_retry_count,
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ mod tests {
use tokio::{runtime::Handle, time::timeout};
use url::Url;

use crate::testing::MockObjectStore;
use crate::{object_store::DEFAULT_DOWNLOAD_RETRY_COUNT, testing::MockObjectStore};

use super::*;

Expand Down Expand Up @@ -743,6 +743,7 @@ mod tests {
false,
false,
1,
DEFAULT_DOWNLOAD_RETRY_COUNT,
));

let config = SchedulerConfig {
Expand Down Expand Up @@ -831,6 +832,7 @@ mod tests {
false,
false,
1,
DEFAULT_DOWNLOAD_RETRY_COUNT,
));

let config = SchedulerConfig {
Expand Down
6 changes: 4 additions & 2 deletions rust/lance-io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ mod tests {

use crate::{
object_reader::CloudObjectReader,
object_store::ObjectStore,
object_store::{ObjectStore, DEFAULT_DOWNLOAD_RETRY_COUNT},
object_writer::ObjectWriter,
traits::{ProtoStruct, WriteExt, Writer},
utils::read_struct,
Expand Down Expand Up @@ -226,7 +226,9 @@ mod tests {
assert_eq!(pos, 0);
object_writer.shutdown().await.unwrap();

let object_reader = CloudObjectReader::new(store.inner, path, 1024, None).unwrap();
let object_reader =
CloudObjectReader::new(store.inner, path, 1024, None, DEFAULT_DOWNLOAD_RETRY_COUNT)
.unwrap();
let actual: BytesWrapper = read_struct(&object_reader, pos).await.unwrap();
assert_eq!(some_message, actual);
}
Expand Down
12 changes: 11 additions & 1 deletion rust/lance/src/dataset/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration};

use lance_file::datatypes::populate_schema_dictionary;
use lance_io::object_store::{
ObjectStore, ObjectStoreParams, ObjectStoreRegistry, DEFAULT_CLOUD_IO_PARALLELISM,
ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptions,
DEFAULT_CLOUD_IO_PARALLELISM,
};
use lance_table::{
format::Manifest,
Expand Down Expand Up @@ -220,6 +221,14 @@ impl DatasetBuilder {
None => commit_handler_from_url(&self.table_uri, &Some(self.options.clone())).await,
}?;

let storage_options = self
.options
.storage_options
.clone()
.map(StorageOptions::new)
.unwrap_or_default();
let download_retry_count = storage_options.download_retry_count();

match &self.options.object_store {
Some(store) => Ok((
ObjectStore::new(
Expand All @@ -232,6 +241,7 @@ impl DatasetBuilder {
// If user supplied an object store then we just assume it's probably
// cloud-like
DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
),
Path::from(store.1.path()),
commit_handler,
Expand Down
Loading