diff --git a/docs/read_and_write.rst b/docs/read_and_write.rst
index 9ced48b83d..85eca5668b 100644
--- a/docs/read_and_write.rst
+++ b/docs/read_and_write.rst
@@ -700,6 +700,10 @@ These options apply to all object stores.
      - Description
    * - ``allow_http``
      - Allow non-TLS, i.e. non-HTTPS connections. Default, ``False``.
+   * - ``download_retry_count``
+     - Number of times to retry a download. Default, ``3``. This limit is applied when
+       the HTTP request succeeds but the response is not fully downloaded, typically due
+       to a violation of ``request_timeout``.
    * - ``allow_invalid_certificates``
      - Skip certificate validation on https connections. Default, ``False``.
        Warning: This is insecure and should only be used for testing.
diff --git a/rust/lance-io/src/object_reader.rs b/rust/lance-io/src/object_reader.rs
index e943d791a6..58a6385096 100644
--- a/rust/lance-io/src/object_reader.rs
+++ b/rust/lance-io/src/object_reader.rs
@@ -9,7 +9,7 @@ use bytes::Bytes;
 use deepsize::DeepSizeOf;
 use futures::future::BoxFuture;
 use lance_core::Result;
-use object_store::{path::Path, ObjectStore};
+use object_store::{path::Path, GetOptions, ObjectStore};
 use tokio::sync::OnceCell;
 use tracing::instrument;
 
@@ -28,6 +28,7 @@ pub struct CloudObjectReader {
     size: OnceCell<usize>,
     block_size: usize,
+    download_retry_count: usize,
 }
 
 impl DeepSizeOf for CloudObjectReader {
@@ -44,12 +45,14 @@ impl CloudObjectReader {
         path: Path,
         block_size: usize,
         known_size: Option<usize>,
+        download_retry_count: usize,
     ) -> Result<Self> {
         Ok(Self {
             object_store,
             path,
             size: OnceCell::new_with(known_size),
             block_size,
+            download_retry_count,
         })
     }
 
@@ -104,7 +107,40 @@ impl Reader for CloudObjectReader {
     #[instrument(level = "debug", skip(self))]
     async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
-        self.do_with_retry(|| self.object_store.get_range(&self.path, range.clone()))
-            .await
+        // We have a separate retry loop here. This is because object_store does not
+        // attempt retries on downloads that fail during streaming of the response body.
+        //
+        // However, this failure is pretty common (e.g. timeout) and we want to retry in these
+        // situations. In addition, we provide additional logging information in these
+        // failure cases.
+        let mut retries = self.download_retry_count;
+        loop {
+            let get_result = self
+                .do_with_retry(|| {
+                    let options = GetOptions {
+                        range: Some(range.clone().into()),
+                        ..Default::default()
+                    };
+                    self.object_store.get_opts(&self.path, options)
+                })
+                .await?;
+            match get_result.bytes().await {
+                Ok(bytes) => return Ok(bytes),
+                Err(err) => {
+                    if retries == 0 {
+                        log::warn!("Failed to download range {:?} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", range, self.path, self.download_retry_count, err);
+                        return Err(err);
+                    }
+                    log::debug!(
+                        "Retrying range {:?} from {} (remaining retries: {}). Error details: {:?}",
+                        range,
+                        self.path,
+                        retries,
+                        err
+                    );
+                    retries -= 1;
+                }
+            }
+        }
     }
 }
diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs
index 67e82df8ba..dc50e2dc12 100644
--- a/rust/lance-io/src/object_store.rs
+++ b/rust/lance-io/src/object_store.rs
@@ -46,6 +46,8 @@ pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
 // Cloud disks often need many many threads to saturate the network
 pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;
 
+pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
+
 #[async_trait]
 pub trait ObjectStoreExt {
     /// Returns true if the file exists.
@@ -100,6 +102,8 @@ pub struct ObjectStore {
     /// is true for object stores, but not for local filesystems.
     pub list_is_lexically_ordered: bool,
     io_parallelism: usize,
+    /// Number of times to retry a failed download
+    download_retry_count: usize,
 }
 
 impl DeepSizeOf for ObjectStore {
@@ -440,6 +444,7 @@ impl ObjectStore {
                 use_constant_size_upload_parts: false,
                 list_is_lexically_ordered: false,
                 io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM,
+                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
             },
             Path::from_absolute_path(expanded_path.as_path())?,
         ))
@@ -466,6 +471,7 @@ impl ObjectStore {
             use_constant_size_upload_parts: false,
             list_is_lexically_ordered: false,
             io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM,
+            download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
         }
     }
 
@@ -478,6 +484,7 @@ impl ObjectStore {
             use_constant_size_upload_parts: false,
             list_is_lexically_ordered: true,
             io_parallelism: get_num_compute_intensive_cpus(),
+            download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
         }
     }
 
@@ -516,6 +523,7 @@ impl ObjectStore {
                 path.clone(),
                 self.block_size,
                 None,
+                self.download_retry_count,
             )?)),
         }
     }
@@ -533,6 +541,7 @@ impl ObjectStore {
                 path.clone(),
                 self.block_size,
                 Some(known_size),
+                self.download_retry_count,
             )?)),
         }
     }
@@ -641,6 +650,28 @@ impl ObjectStore {
         Ok(self.inner.head(path).await?.size)
     }
 }
+
+/// Options that can be set for multiple object stores
+#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
+pub enum LanceConfigKey {
+    /// Number of times to retry a download that fails
+    DownloadRetryCount,
+}
+
+impl FromStr for LanceConfigKey {
+    type Err = Error;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "download_retry_count" => Ok(Self::DownloadRetryCount),
+            _ => Err(Error::InvalidInput {
+                source: format!("Invalid LanceConfigKey: {}", s).into(),
+                location: location!(),
+            }),
+        }
+    }
+}
+
 #[derive(Clone, Debug, Default)]
 pub struct StorageOptions(pub HashMap<String, String>);
 
@@ -709,6 +740,15 @@ impl StorageOptions {
         })
     }
 
+    /// Number of times to retry a download that fails
+    pub fn download_retry_count(&self) -> usize {
+        self.0
+            .iter()
+            .find(|(key, _)| key.to_ascii_lowercase() == "download_retry_count")
+            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
+            .unwrap_or(3)
+    }
+
     /// Subset of options relevant for azure storage
     pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
         self.0
@@ -755,6 +795,7 @@ async fn configure_store(
     options: ObjectStoreParams,
 ) -> Result<ObjectStore> {
     let mut storage_options = StorageOptions(options.storage_options.clone().unwrap_or_default());
+    let download_retry_count = storage_options.download_retry_count();
     let mut url = ensure_table_uri(url)?;
     // Block size: On local file systems, we use 4KB block size. On cloud
    // object stores, we use 64KB block size. This is generally the largest
@@ -813,6 +854,7 @@ async fn configure_store(
                 use_constant_size_upload_parts,
                 list_is_lexically_ordered: true,
                 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
+                download_retry_count,
             })
         }
         "gs" => {
@@ -831,6 +873,7 @@ async fn configure_store(
                 use_constant_size_upload_parts: false,
                 list_is_lexically_ordered: true,
                 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
+                download_retry_count,
             })
         }
         "az" => {
@@ -845,6 +888,7 @@ async fn configure_store(
                 use_constant_size_upload_parts: false,
                 list_is_lexically_ordered: true,
                 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
+                download_retry_count,
             })
         }
         // we have a bypass logic to use `tokio::fs` directly to lower overhead
@@ -862,6 +906,7 @@ async fn configure_store(
             use_constant_size_upload_parts: false,
             list_is_lexically_ordered: true,
             io_parallelism: get_num_compute_intensive_cpus(),
+            download_retry_count,
         }),
         unknown_scheme => {
             if let Some(provider) = registry.providers.get(unknown_scheme) {
@@ -878,6 +923,7 @@ async fn configure_store(
 }
 
 impl ObjectStore {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         store: Arc<DynObjectStore>,
         location: Url,
@@ -886,6 +932,7 @@ impl ObjectStore {
         use_constant_size_upload_parts: bool,
         list_is_lexically_ordered: bool,
         io_parallelism: usize,
+        download_retry_count: usize,
     ) -> Self {
         let scheme = location.scheme();
         let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
@@ -902,6 +949,7 @@ impl ObjectStore {
             use_constant_size_upload_parts,
             list_is_lexically_ordered,
             io_parallelism,
+            download_retry_count,
         }
     }
 }
diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs
index 747e684e54..206383fd2a 100644
--- a/rust/lance-io/src/scheduler.rs
+++ b/rust/lance-io/src/scheduler.rs
@@ -660,7 +660,7 @@ mod tests {
     use tokio::{runtime::Handle, time::timeout};
     use url::Url;
 
-    use crate::testing::MockObjectStore;
+    use crate::{object_store::DEFAULT_DOWNLOAD_RETRY_COUNT, testing::MockObjectStore};
 
     use super::*;
 
@@ -743,6 +743,7 @@ mod tests {
             false,
             false,
             1,
+            DEFAULT_DOWNLOAD_RETRY_COUNT,
         ));
 
         let config = SchedulerConfig {
@@ -831,6 +832,7 @@ mod tests {
             false,
             false,
             1,
+            DEFAULT_DOWNLOAD_RETRY_COUNT,
         ));
 
         let config = SchedulerConfig {
diff --git a/rust/lance-io/src/utils.rs b/rust/lance-io/src/utils.rs
index 2c4cb0900a..37253339a5 100644
--- a/rust/lance-io/src/utils.rs
+++ b/rust/lance-io/src/utils.rs
@@ -183,7 +183,7 @@ mod tests {
     use crate::{
         object_reader::CloudObjectReader,
-        object_store::ObjectStore,
+        object_store::{ObjectStore, DEFAULT_DOWNLOAD_RETRY_COUNT},
         object_writer::ObjectWriter,
         traits::{ProtoStruct, WriteExt, Writer},
         utils::read_struct,
@@ -226,7 +226,9 @@ mod tests {
         assert_eq!(pos, 0);
         object_writer.shutdown().await.unwrap();
 
-        let object_reader = CloudObjectReader::new(store.inner, path, 1024, None).unwrap();
+        let object_reader =
+            CloudObjectReader::new(store.inner, path, 1024, None, DEFAULT_DOWNLOAD_RETRY_COUNT)
+                .unwrap();
 
         let actual: BytesWrapper = read_struct(&object_reader, pos).await.unwrap();
         assert_eq!(some_message, actual);
     }
diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs
index 5d7037b601..005ea89372 100644
--- a/rust/lance/src/dataset/builder.rs
+++ b/rust/lance/src/dataset/builder.rs
@@ -4,7 +4,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
 use lance_file::datatypes::populate_schema_dictionary;
 use lance_io::object_store::{
-    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, DEFAULT_CLOUD_IO_PARALLELISM,
+    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptions,
+    DEFAULT_CLOUD_IO_PARALLELISM,
 };
 use lance_table::{
     format::Manifest,
@@ -220,6 +221,14 @@ impl DatasetBuilder {
             None => commit_handler_from_url(&self.table_uri, &Some(self.options.clone())).await,
         }?;
 
+        let storage_options = self
+            .options
+            .storage_options
+            .clone()
+            .map(StorageOptions::new)
+            .unwrap_or_default();
+        let download_retry_count = storage_options.download_retry_count();
+
         match &self.options.object_store {
             Some(store) => Ok((
                 ObjectStore::new(
                     // If user supplied an object store then we just assume it's probably
                     // cloud-like
                     DEFAULT_CLOUD_IO_PARALLELISM,
+                    download_retry_count,
                 ),
                 Path::from(store.1.path()),
                 commit_handler,