diff --git a/docs/format.rst b/docs/format.rst index 421b73374d5..612006a3524 100644 --- a/docs/format.rst +++ b/docs/format.rst @@ -338,6 +338,25 @@ systems and cloud object stores, with the notable except of AWS S3. For ones that lack this functionality, an external locking mechanism can be configured by the user. +Manifest Naming Schemes +~~~~~~~~~~~~~~~~~~~~~~~ + +Manifest files must use a consistent naming scheme. The names correspond to the +versions. That way we can open the right version of the dataset without having +to read all the manifests. It also makes it clear which file path is the next +one to be written. + +There are two naming schemes that can be used: + +1. V1: ``_versions/{version}.manifest``. This is the legacy naming scheme. +2. V2: ``_versions/{u64::MAX - version:020}.manifest``. This is the new naming + scheme. The version is zero-padded (to 20 digits) and subtracted from + ``u64::MAX``. This allows the versions to be sorted in descending order, + making it possible to find the latest manifest on object storage using a + single list call. + +It is an error for there to be a mixture of these two naming schemes. + .. 
_conflict_resolution: Conflict resolution diff --git a/java/core/lance-jni/src/blocking_dataset.rs b/java/core/lance-jni/src/blocking_dataset.rs index 47515b5e336..75ec6e30d18 100644 --- a/java/core/lance-jni/src/blocking_dataset.rs +++ b/java/core/lance-jni/src/blocking_dataset.rs @@ -92,6 +92,7 @@ impl BlockingDataset { None, None, object_store_registry, + false, // TODO: support enable_v2_manifest_paths ))?; Ok(Self { inner }) } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 1bd9a630318..c7edb1598a8 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -1751,6 +1751,7 @@ def commit( read_version: Optional[int] = None, commit_lock: Optional[CommitLock] = None, storage_options: Optional[Dict[str, str]] = None, + enable_v2_manifest_paths: Optional[bool] = None, ) -> LanceDataset: """Create a new version of dataset @@ -1788,6 +1789,14 @@ def commit( storage_options : optional, dict Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. + enable_v2_manifest_paths : bool, optional + If True, and this is a new dataset, uses the new V2 manifest paths. + These paths provide more efficient opening of datasets with many + versions on object stores. This parameter has no effect if the dataset + already exists. To migrate an existing dataset, instead use the + :meth:`migrate_manifest_paths_v2` method. Default is False. WARNING: + turning this on will make the dataset unreadable for older versions + of Lance (prior to 0.17.0). Returns ------- @@ -1831,6 +1840,7 @@ def commit( read_version, commit_lock, storage_options=storage_options, + enable_v2_manifest_paths=enable_v2_manifest_paths, ) return LanceDataset(base_uri, storage_options=storage_options) @@ -1843,6 +1853,20 @@ def validate(self): """ self._ds.validate() + def migrate_manifest_paths_v2(self): + """ + Migrate the manifest paths to the new format. 
+ + This will update the manifest to use the new v2 format for paths. + + This function is idempotent, and can be run multiple times without + changing the state of the object store. + + DANGER: this should not be run while other concurrent operations are happening. + And it should also run until completion before resuming other operations. + """ + self._ds.migrate_manifest_paths_v2() + @property def optimize(self) -> "DatasetOptimizer": return DatasetOptimizer(self) @@ -2818,6 +2842,7 @@ def write_dataset( storage_options: Optional[Dict[str, str]] = None, data_storage_version: str = "legacy", use_legacy_format: Optional[bool] = None, + enable_v2_manifest_paths: bool = False, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -2865,6 +2890,12 @@ def write_dataset( use_legacy_format : optional, bool, default None Deprecated method for setting the data storage version. Use the `data_storage_version` parameter instead. + enable_v2_manifest_paths : bool, optional + If True, and this is a new dataset, uses the new V2 manifest paths. + These paths provide more efficient opening of datasets with many + versions on object stores. This parameter has no effect if the dataset + already exists. To migrate an existing dataset, instead use the + :meth:`LanceDataset.migrate_manifest_paths_v2` method. Default is False. 
""" if use_legacy_format is not None: warnings.warn( @@ -2897,6 +2928,7 @@ def write_dataset( "progress": progress, "storage_options": storage_options, "data_storage_version": data_storage_version, + "enable_v2_manifest_paths": enable_v2_manifest_paths, } if commit_lock: diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index a4d0341137a..b851cb83247 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -251,6 +251,30 @@ def test_asof_checkout(tmp_path: Path): assert len(ds.to_table()) == 9 +def test_v2_manifest_paths(tmp_path: Path): + lance.write_dataset( + pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=True + ) + manifest_path = os.listdir(tmp_path / "_versions") + assert len(manifest_path) == 1 + assert re.match(r"\d{20}\.manifest", manifest_path[0]) + + +def test_v2_manifest_paths_migration(tmp_path: Path): + # Create a dataset with v1 manifest paths + lance.write_dataset( + pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=False + ) + manifest_path = os.listdir(tmp_path / "_versions") + assert manifest_path == ["1.manifest"] + + # Migrate to v2 manifest paths + lance.dataset(tmp_path).migrate_manifest_paths_v2() + manifest_path = os.listdir(tmp_path / "_versions") + assert len(manifest_path) == 1 + assert re.match(r"\d{20}\.manifest", manifest_path[0]) + + def test_tag(tmp_path: Path): table = pa.Table.from_pydict({"colA": [1, 2, 3], "colB": [4, 5, 6]}) base_dir = tmp_path / "test" diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 822a2b7989f..8e843b318d0 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1218,6 +1218,7 @@ impl Dataset { read_version: Option, commit_lock: Option<&PyAny>, storage_options: Option>, + enable_v2_manifest_paths: Option, ) -> PyResult { let object_store_params = storage_options @@ -1255,6 +1256,7 @@ impl Dataset { object_store_params, commit_handler, object_store_registry, + 
enable_v2_manifest_paths.unwrap_or(false), ) .await })? @@ -1270,6 +1272,14 @@ impl Dataset { .map_err(|err| PyIOError::new_err(err.to_string())) } + fn migrate_manifest_paths_v2(&mut self) -> PyResult<()> { + let mut new_self = self.ds.as_ref().clone(); + RT.block_on(None, new_self.migrate_manifest_paths_v2())? + .map_err(|err| PyIOError::new_err(err.to_string()))?; + self.ds = Arc::new(new_self); + Ok(()) + } + fn drop_columns(&mut self, columns: Vec<&str>) -> PyResult<()> { let mut new_self = self.ds.as_ref().clone(); RT.block_on(None, new_self.drop_columns(&columns))? @@ -1464,6 +1474,12 @@ pub fn get_write_params(options: &PyDict) -> PyResult> { }); } + if let Some(enable_v2_manifest_paths) = + get_dict_opt::(options, "enable_v2_manifest_paths")? + { + p.enable_v2_manifest_paths = enable_v2_manifest_paths; + } + p.commit_handler = get_commit_handler(options); Some(p) diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index b4fd2ed2007..67e82df8ba7 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -93,7 +93,12 @@ pub struct ObjectStore { pub inner: Arc, scheme: String, block_size: usize, + /// Whether to use constant size upload parts for multipart uploads. This + /// is only necessary for Cloudflare R2. pub use_constant_size_upload_parts: bool, + /// Whether we can assume that the list of files is lexically ordered. This + /// is true for object stores, but not for local filesystems. + pub list_is_lexically_ordered: bool, io_parallelism: usize, } @@ -338,6 +343,7 @@ pub struct ObjectStoreParams { /// is false, max upload size is 2.5TB. When this is true, the max size is /// 50GB. 
pub use_constant_size_upload_parts: bool, + pub list_is_lexically_ordered: Option, } impl Default for ObjectStoreParams { @@ -350,6 +356,7 @@ impl Default for ObjectStoreParams { object_store_wrapper: None, storage_options: None, use_constant_size_upload_parts: false, + list_is_lexically_ordered: None, } } } @@ -431,6 +438,7 @@ impl ObjectStore { scheme: String::from(scheme), block_size: 4 * 1024, // 4KB block size use_constant_size_upload_parts: false, + list_is_lexically_ordered: false, io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM, }, Path::from_absolute_path(expanded_path.as_path())?, @@ -456,6 +464,7 @@ impl ObjectStore { scheme: String::from("file"), block_size: 4 * 1024, // 4KB block size use_constant_size_upload_parts: false, + list_is_lexically_ordered: false, io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM, } } @@ -467,6 +476,7 @@ impl ObjectStore { scheme: String::from("memory"), block_size: 64 * 1024, use_constant_size_upload_parts: false, + list_is_lexically_ordered: true, io_parallelism: get_num_compute_intensive_cpus(), } } @@ -801,6 +811,7 @@ async fn configure_store( scheme: String::from(url.scheme()), block_size: 64 * 1024, use_constant_size_upload_parts, + list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, }) } @@ -818,6 +829,7 @@ async fn configure_store( scheme: String::from("gs"), block_size: 64 * 1024, use_constant_size_upload_parts: false, + list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, }) } @@ -831,6 +843,7 @@ async fn configure_store( scheme: String::from("az"), block_size: 64 * 1024, use_constant_size_upload_parts: false, + list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, }) } @@ -847,6 +860,7 @@ async fn configure_store( scheme: String::from("memory"), block_size: 64 * 1024, use_constant_size_upload_parts: false, + list_is_lexically_ordered: true, io_parallelism: get_num_compute_intensive_cpus(), }), unknown_scheme => { @@ -870,6 +884,7 @@ impl 
ObjectStore { block_size: Option, wrapper: Option>, use_constant_size_upload_parts: bool, + list_is_lexically_ordered: bool, io_parallelism: usize, ) -> Self { let scheme = location.scheme(); @@ -885,6 +900,7 @@ impl ObjectStore { scheme: scheme.into(), block_size, use_constant_size_upload_parts, + list_is_lexically_ordered, io_parallelism, } } diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index 07e5e76751b..747e684e549 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -741,6 +741,7 @@ mod tests { None, None, false, + false, 1, )); @@ -828,6 +829,7 @@ mod tests { None, None, false, + false, 1, )); diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 5d4c8ab00df..e7e3b230210 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -32,6 +32,7 @@ use futures::{ stream::BoxStream, StreamExt, TryStreamExt, }; +use log::warn; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use url::Url; @@ -60,6 +61,94 @@ use crate::format::{Index, Manifest}; const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; +/// How manifest files should be named. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ManifestNamingScheme { + /// `_versions/{version}.manifest` + V1, + /// `_versions/{u64::MAX - version:020}.manifest` + /// + /// Zero-padded and reversed for O(1) lookup of latest version on object stores. 
+ V2, +} + +impl ManifestNamingScheme { + pub fn manifest_path(&self, base: &Path, version: u64) -> Path { + let directory = base.child(VERSIONS_DIR); + match self { + Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")), + Self::V2 => { + let inverted_version = u64::MAX - version; + directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}")) + } + } + } + + pub fn parse_version(&self, filename: &str) -> Option { + let file_number = filename + .split_once('.') + .and_then(|(version_str, _)| version_str.parse::().ok()); + match self { + Self::V1 => file_number, + Self::V2 => file_number.map(|v| u64::MAX - v), + } + } + + pub fn detect_scheme(filename: &str) -> Option { + if filename.ends_with(MANIFEST_EXTENSION) { + const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len(); + if filename.len() == V2_LEN { + Some(Self::V2) + } else { + Some(Self::V1) + } + } else { + None + } + } + + pub fn detect_scheme_staging(filename: &str) -> Self { + if filename.chars().nth(20) == Some('.') { + Self::V2 + } else { + Self::V1 + } + } +} + +/// Migrate all V1 manifests to V2 naming scheme. +/// +/// This function will rename all V1 manifests to V2 naming scheme. +/// +/// This function is idempotent, and can be run multiple times without +/// changing the state of the object store. +/// +/// However, it should not be run while other concurrent operations are happening. +/// And it should also run until completion before resuming other operations. 
+pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> { + object_store + .inner + .list(Some(&dataset_base.child(VERSIONS_DIR))) + .try_filter(|res| { + let res = if let Some(filename) = res.location.filename() { + ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1) + } else { + false + }; + future::ready(res) + }) + .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move { + let filename = meta.location.filename().unwrap(); + let version = ManifestNamingScheme::V1.parse_version(filename).unwrap(); + let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version); + object_store.inner.rename(&meta.location, &path).await?; + Ok(()) + }) + .await?; + + Ok(()) +} + /// Function that writes the manifest to the object store. pub type ManifestWriter = for<'a> fn( object_store: &'a ObjectStore, @@ -68,12 +157,6 @@ pub type ManifestWriter = for<'a> fn( path: &'a Path, ) -> BoxFuture<'a, Result<()>>; -/// Get the manifest file path for a version. -pub fn manifest_path(base: &Path, version: u64) -> Path { - base.child(VERSIONS_DIR) - .child(format!("{version}.{MANIFEST_EXTENSION}")) -} - #[derive(Debug)] pub struct ManifestLocation { /// The version the manifest corresponds to. @@ -82,6 +165,8 @@ pub struct ManifestLocation { pub path: Path, /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`. pub size: Option, + /// Naming scheme of the manifest file. + pub naming_scheme: ManifestNamingScheme, } /// Get the latest manifest path @@ -95,45 +180,92 @@ async fn current_manifest_path( } } - // We use `list_with_delimiter` to avoid listing the contents of child directories. 
- let manifest_files = object_store - .inner - .list_with_delimiter(Some(&base.child(VERSIONS_DIR))) - .await?; + let manifest_files = object_store.inner.list(Some(&base.child(VERSIONS_DIR))); - let current = manifest_files - .objects - .into_iter() - .filter(|meta| { - meta.location.filename().is_some() - && meta - .location - .filename() - .unwrap() - .ends_with(MANIFEST_EXTENSION) - }) - .filter_map(|meta| { - let version = meta - .location - .filename() - .unwrap() - .split_once('.') - .and_then(|(version_str, _)| version_str.parse::().ok())?; - Some((version, meta)) - }) - .max_by_key(|(version, _)| *version); + let mut valid_manifests = manifest_files.try_filter_map(|res| { + if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap()) + { + future::ready(Ok(Some((scheme, res)))) + } else { + future::ready(Ok(None)) + } + }); + + let first = valid_manifests.next().await.transpose()?; + match (first, object_store.list_is_lexically_ordered) { + // If the first valid manifest we see is V2, we can assume that we are using + // V2 naming scheme for all manifests. + (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => { + let version = scheme + .parse_version(meta.location.filename().unwrap()) + .unwrap(); + + // Sanity check: verify at least for the first 1k files that they are all V2 + // and that the version numbers are decreasing. We use the first 1k because + // this is the typical size of an object store list endpoint response page. + for (scheme, meta) in valid_manifests.take(999).try_collect::>().await? { + if scheme != ManifestNamingScheme::V2 { + warn!( + "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \ + to migrate the directory." + ); + break; + } + let next_version = scheme + .parse_version(meta.location.filename().unwrap()) + .unwrap(); + if next_version >= version { + warn!( + "List operation was expected to be lexically ordered, but was not. This \ + could mean a corrupt read. 
Please make a bug report on the lancedb/lance \ + GitHub repository." + ); + break; + } + } - if let Some((version, meta)) = current { - Ok(ManifestLocation { - version, - path: meta.location, - size: Some(meta.size as u64), - }) - } else { - Err(Error::NotFound { - uri: manifest_path(base, 1).to_string(), + Ok(ManifestLocation { + version, + path: meta.location, + size: Some(meta.size as u64), + naming_scheme: scheme, + }) + } + // If the first valid manifest we see is V1, assume for now that we are + // using V1 naming scheme for all manifests. Since we are listing the + // directory anyways, we will assert there aren't any V2 manifests. + (Some((scheme, meta)), _) => { + let mut current_version = scheme + .parse_version(meta.location.filename().unwrap()) + .unwrap(); + let mut current_meta = meta; + + while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? { + if matches!(scheme, ManifestNamingScheme::V2) { + return Err(Error::Internal { + message: "Found V2 manifest in a V1 manifest directory".to_string(), + location: location!(), + }); + } + let version = scheme + .parse_version(meta.location.filename().unwrap()) + .unwrap(); + if version > current_version { + current_version = version; + current_meta = meta; + } + } + Ok(ManifestLocation { + version: current_version, + path: current_meta.location, + size: Some(current_meta.size as u64), + naming_scheme: scheme, + }) + } + (None, _) => Err(Error::NotFound { + uri: base.child(VERSIONS_DIR).to_string(), location: location!(), - }) + }), } } @@ -146,19 +278,34 @@ fn current_manifest_local(base: &Path) -> std::io::Result = None; + let mut scheme: Option = None; + for entry in entries { let entry = entry?; let filename_raw = entry.file_name(); let filename = filename_raw.to_string_lossy(); - if !filename.ends_with(MANIFEST_EXTENSION) { + + let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else { // Need to ignore temporary files, such as // 
.tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d continue; + }; + + if let Some(scheme) = scheme { + if scheme != entry_scheme { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Found multiple manifest naming schemes in the same directory: {:?} and {:?}", + scheme, entry_scheme + ), + )); + } + } else { + scheme = Some(entry_scheme); } - let Some(version) = filename - .split_once('.') - .and_then(|(version_str, _)| version_str.parse::().ok()) - else { + + let Some(version) = entry_scheme.parse_version(&filename) else { continue; }; @@ -178,6 +325,7 @@ fn current_manifest_local(base: &Path) -> std::io::Result( base_path: &Path, object_store: &'a dyn OSObjectStore, ) -> Result>> { - let base_path = base_path.clone(); Ok(object_store .read_dir_all(&base_path.child(VERSIONS_DIR), None) .await? @@ -264,13 +411,26 @@ pub trait CommitHandler: Debug + Send + Sync { } /// Get the path to a specific versioned manifest of a dataset at the base_path + /// + /// The version must already exist. async fn resolve_version( &self, base_path: &Path, version: u64, - _object_store: &dyn OSObjectStore, + object_store: &dyn OSObjectStore, ) -> std::result::Result { - Ok(manifest_path(base_path, version)) + Ok(default_resolve_version(base_path, version, object_store) + .await? + .path) + } + + async fn resolve_version_location( + &self, + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, + ) -> Result { + default_resolve_version(base_path, version, object_store).await } /// List manifests that are available for a dataset at the base_path @@ -293,9 +453,38 @@ pub trait CommitHandler: Debug + Send + Sync { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError>; } +async fn default_resolve_version( + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, +) -> Result { + // try V2, fallback to V1. 
+ let scheme = ManifestNamingScheme::V2; + let path = scheme.manifest_path(base_path, version); + match object_store.head(&path).await { + Ok(meta) => Ok(ManifestLocation { + version, + path, + size: Some(meta.size as u64), + naming_scheme: scheme, + }), + Err(ObjectStoreError::NotFound { .. }) => { + // fallback to V1 + let scheme = ManifestNamingScheme::V1; + Ok(ManifestLocation { + version, + path: scheme.manifest_path(base_path, version), + size: None, + naming_scheme: scheme, + }) + } + Err(e) => Err(e.into()), + } +} /// Adapt an object_store credentials into AWS SDK creds #[cfg(feature = "dynamodb")] #[derive(Debug)] @@ -502,6 +691,7 @@ impl CommitHandler for UnsafeCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // Log a one-time warning if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) { @@ -512,9 +702,7 @@ impl CommitHandler for UnsafeCommitHandler { ); } - let version_path = self - .resolve_version(base_path, manifest.version, &object_store.inner) - .await?; + let version_path = naming_scheme.manifest_path(base_path, manifest.version); // Write the manifest naively manifest_writer(object_store, manifest, indices, &version_path).await?; @@ -563,10 +751,9 @@ impl CommitHandler for T { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { - let path = self - .resolve_version(base_path, manifest.version, &object_store.inner) - .await?; + let path = naming_scheme.manifest_path(base_path, manifest.version); // NOTE: once we have the lease we cannot use ? to return errors, since // we must release the lease before returning. 
let lease = self.lock(manifest.version).await?; @@ -607,9 +794,17 @@ impl CommitHandler for Arc { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { self.as_ref() - .commit(manifest, indices, base_path, object_store, manifest_writer) + .commit( + manifest, + indices, + base_path, + object_store, + manifest_writer, + naming_scheme, + ) .await } } @@ -628,25 +823,13 @@ impl CommitHandler for RenameCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // Create a temporary object, then use `rename_if_not_exists` to commit. // If failed, clean up the temporary object. - let path = self - .resolve_version(base_path, manifest.version, &object_store.inner) - .await?; - - // Add .tmp_ prefix to the path - let mut parts: Vec<_> = path.parts().collect(); - // Add a UUID to the end of the filename to avoid conflicts - let uuid = uuid::Uuid::new_v4(); - let new_name = format!( - ".tmp_{}_{}", - parts.last().unwrap().as_ref(), - uuid.as_hyphenated() - ); - let _ = std::mem::replace(parts.last_mut().unwrap(), new_name.into()); - let tmp_path: Path = parts.into_iter().collect(); + let path = naming_scheme.manifest_path(base_path, manifest.version); + let tmp_path = make_staging_manifest_path(&path)?; // Write the manifest to the temporary path manifest_writer(object_store, manifest, indices, &tmp_path).await?; @@ -689,3 +872,86 @@ impl Default for CommitConfig { Self { num_retries: 20 } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_manifest_naming_scheme() { + let v1 = ManifestNamingScheme::V1; + let v2 = ManifestNamingScheme::V2; + + assert_eq!( + v1.manifest_path(&Path::from("base"), 0), + Path::from("base/_versions/0.manifest") + ); + assert_eq!( + v1.manifest_path(&Path::from("base"), 42), + 
Path::from("base/_versions/42.manifest") + ); + + assert_eq!( + v2.manifest_path(&Path::from("base"), 0), + Path::from("base/_versions/18446744073709551615.manifest") + ); + assert_eq!( + v2.manifest_path(&Path::from("base"), 42), + Path::from("base/_versions/18446744073709551573.manifest") + ); + + assert_eq!(v1.parse_version("0.manifest"), Some(0)); + assert_eq!(v1.parse_version("42.manifest"), Some(42)); + assert_eq!( + v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"), + Some(42) + ); + + assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0)); + assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42)); + assert_eq!( + v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"), + Some(42) + ); + + assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1)); + assert_eq!( + ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"), + Some(v2) + ); + assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None); + } + + #[tokio::test] + async fn test_manifest_naming_migration() { + let object_store = ObjectStore::memory(); + let base = Path::from("base"); + let versions_dir = base.child(VERSIONS_DIR); + + // Write one unrelated file, one V1 manifest, and one V2 manifest + let original_files = vec![ + versions_dir.child("irrelevant"), + ManifestNamingScheme::V1.manifest_path(&base, 0), + ManifestNamingScheme::V2.manifest_path(&base, 1), + ]; + for path in original_files { + object_store.put(&path, b"".as_slice()).await.unwrap(); + } + + migrate_scheme_to_v2(&object_store, &base).await.unwrap(); + + let expected_files = vec![ + ManifestNamingScheme::V2.manifest_path(&base, 1), + ManifestNamingScheme::V2.manifest_path(&base, 0), + versions_dir.child("irrelevant"), + ]; + let actual_files = object_store + .inner + .list(Some(&versions_dir)) + .map_ok(|res| res.location) + .try_collect::>() + .await + .unwrap(); + assert_eq!(actual_files, expected_files); + } +} diff --git 
a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index d205f99c1c8..0ddd9d4396c 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -15,8 +15,8 @@ use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjec use snafu::{location, Location}; use super::{ - current_manifest_path, make_staging_manifest_path, manifest_path, ManifestLocation, - MANIFEST_EXTENSION, + current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation, + ManifestNamingScheme, MANIFEST_EXTENSION, }; use crate::format::{Index, Manifest}; use crate::io::commit::{CommitError, CommitHandler, ManifestWriter}; @@ -52,12 +52,18 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { &self, base_uri: &str, ) -> Result> { - self.get_latest_version(base_uri).await.map(|res| { - res.map(|(version, uri)| ManifestLocation { - version, - path: Path::from(uri), - size: None, + self.get_latest_version(base_uri).await.and_then(|res| { + res.map(|(version, uri)| { + let path = Path::from(uri); + let naming_scheme = detect_naming_scheme_from_path(&path)?; + Ok(ManifestLocation { + version, + path, + size: None, + naming_scheme, + }) }) + .transpose() }) } @@ -68,6 +74,18 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { async fn put_if_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>; } +fn detect_naming_scheme_from_path(path: &Path) -> Result { + path.filename() + .and_then(ManifestNamingScheme::detect_scheme) + .ok_or_else(|| { + Error::corrupt_file( + path.clone(), + "Path does not follow known manifest naming convention.", + location!(), + ) + }) +} + /// External manifest commit handler /// This handler is used to commit a manifest to an external store /// for detailed design, see https://github.com/lancedb/lance/issues/1183 @@ -77,7 +95,7 @@ pub struct 
ExternalManifestCommitHandler { } impl ExternalManifestCommitHandler { - /// The manifest is considered committed once the staging manifest is writen + /// The manifest is considered committed once the staging manifest is written /// to object store and that path is committed to the external store. /// /// However, to fully complete this, the staging manifest should be materialized @@ -92,9 +110,10 @@ impl ExternalManifestCommitHandler { staging_manifest_path: &Path, version: u64, store: &dyn OSObjectStore, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result { // step 1: copy the manifest to the final location - let final_manifest_path = manifest_path(base_path, version); + let final_manifest_path = naming_scheme.manifest_path(base_path, version); match store .copy(staging_manifest_path, &final_manifest_path) .await @@ -128,12 +147,14 @@ impl CommitHandler for ExternalManifestCommitHandler { object_store: &ObjectStore, ) -> std::result::Result { let path = self.resolve_latest_version(base_path, object_store).await?; + let naming_scheme = detect_naming_scheme_from_path(&path)?; Ok(ManifestLocation { version: self .resolve_latest_version_id(base_path, object_store) .await?, path, size: None, + naming_scheme, }) } @@ -155,9 +176,19 @@ impl CommitHandler for ExternalManifestCommitHandler { return Ok(Path::parse(path)?); } + // Detect naming scheme based on presence of zero padding. let staged_path = Path::parse(&path)?; - self.finalize_manifest(base_path, &staged_path, version, &object_store.inner) - .await + let naming_scheme = + ManifestNamingScheme::detect_scheme_staging(staged_path.filename().unwrap()); + + self.finalize_manifest( + base_path, + &staged_path, + version, + &object_store.inner, + naming_scheme, + ) + .await } // Dataset not found in the external store, this could be because the dataset did not // use external store for commit before. 
In this case, we search for the latest manifest @@ -198,8 +229,13 @@ impl CommitHandler for ExternalManifestCommitHandler { Ok(p) => p, // not board external manifest yet, direct to object store Err(Error::NotFound { .. }) => { - let path = manifest_path(base_path, version); - // if exist update external manifest store + let path = default_resolve_version(base_path, version, object_store) + .await + .map_err(|_| Error::NotFound { + uri: format!("{}@{}", base_path, version), + location: location!(), + })? + .path; if object_store.exists(&path).await? { // best effort put, if it fails, it's okay match self @@ -209,27 +245,58 @@ impl CommitHandler for ExternalManifestCommitHandler { { Ok(_) => {} Err(e) => { - warn!("could up update external manifest store during load, with error: {}", e); + warn!( + "could not update external manifest store during load, with error: {}", + e + ); } } + return Ok(path); } else { return Err(Error::NotFound { uri: path.to_string(), location: location!(), }); } - return Ok(manifest_path(base_path, version)); } Err(e) => return Err(e), }; // finalized path, just return - if path.ends_with(&format!(".{MANIFEST_EXTENSION}")) { - return Ok(Path::parse(path)?); + let current_path = Path::parse(path)?; + if current_path.extension() == Some(MANIFEST_EXTENSION) { + return Ok(current_path); } - self.finalize_manifest(base_path, &Path::parse(&path)?, version, object_store) - .await + let naming_scheme = + ManifestNamingScheme::detect_scheme_staging(current_path.filename().unwrap()); + + self.finalize_manifest( + base_path, + &Path::parse(&current_path)?, + version, + object_store, + naming_scheme, + ) + .await + } + + async fn resolve_version_location( + &self, + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, + ) -> std::result::Result { + let path = self + .resolve_version(base_path, version, object_store) + .await?; + let naming_scheme = detect_naming_scheme_from_path(&path)?; + Ok(ManifestLocation { + version, + path, + size: 
None, + naming_scheme, + }) } async fn commit( @@ -239,12 +306,13 @@ impl CommitHandler for ExternalManifestCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // path we get here is the path to the manifest we want to write // use object_store.base_path.as_ref() for getting the root of the dataset // step 1: Write the manifest we want to commit to object store with a temporary name - let path = manifest_path(base_path, manifest.version); + let path = naming_scheme.manifest_path(base_path, manifest.version); let staging_path = make_staging_manifest_path(&path)?; manifest_writer(object_store, manifest, indices, &staging_path).await?; @@ -265,11 +333,14 @@ impl CommitHandler for ExternalManifestCommitHandler { return res; } + let scheme = detect_naming_scheme_from_path(&path)?; + self.finalize_manifest( base_path, &staging_path, manifest.version, &object_store.inner, + scheme, ) .await?; diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 20ebcdc0880..36e074387c9 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -25,7 +25,8 @@ use lance_table::format::{ DataStorageFormat, Fragment, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION, }; use lance_table::io::commit::{ - commit_handler_from_url, CommitError, CommitHandler, CommitLock, ManifestLocation, + commit_handler_from_url, migrate_scheme_to_v2, CommitError, CommitHandler, CommitLock, + ManifestLocation, ManifestNamingScheme, }; use lance_table::io::manifest::{read_manifest, write_manifest}; use log::warn; @@ -100,6 +101,7 @@ pub struct Dataset { pub(crate) manifest: Arc, pub(crate) session: Arc, pub tags: Tags, + pub manifest_naming_scheme: ManifestNamingScheme, } /// Dataset Version @@ -332,15 +334,10 @@ impl Dataset { async fn checkout_by_version_number(&self, version: u64) -> Result { let base_path = self.base.clone(); - let manifest_file = 
self + let manifest_location = self .commit_handler - .resolve_version(&base_path, version, &self.object_store.inner) + .resolve_version_location(&base_path, version, &self.object_store.inner) .await?; - let manifest_location = ManifestLocation { - version, - path: manifest_file, - size: None, - }; let manifest = Self::load_manifest(self.object_store.as_ref(), &manifest_location).await?; Self::checkout_manifest( self.object_store.clone(), @@ -349,6 +346,7 @@ impl Dataset { manifest, self.session.clone(), self.commit_handler.clone(), + manifest_location.naming_scheme, ) .await } @@ -431,6 +429,7 @@ impl Dataset { manifest: Manifest, session: Arc, commit_handler: Arc, + manifest_naming_scheme: ManifestNamingScheme, ) -> Result { let tags = Tags::new( object_store.clone(), @@ -445,6 +444,7 @@ impl Dataset { commit_handler, session, tags, + manifest_naming_scheme, }) } @@ -532,6 +532,14 @@ impl Dataset { } } + let manifest_naming_scheme = if let Some(d) = dataset.as_ref() { + d.manifest_naming_scheme + } else if params.enable_v2_manifest_paths { + ManifestNamingScheme::V2 + } else { + ManifestNamingScheme::V1 + }; + let params = params; // discard mut if let Some(d) = dataset.as_ref() { @@ -583,6 +591,7 @@ impl Dataset { &transaction, &manifest_config, &Default::default(), + manifest_naming_scheme, ) .await? } else { @@ -592,6 +601,7 @@ impl Dataset { &base, &transaction, &manifest_config, + manifest_naming_scheme, ) .await? 
}; @@ -606,6 +616,7 @@ impl Dataset { session: Arc::new(Session::default()), commit_handler, tags, + manifest_naming_scheme, }) } @@ -676,6 +687,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -762,6 +774,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?, ); @@ -833,6 +846,12 @@ impl Dataset { /// * `operation` - A description of the change to commit /// * `read_version` - The version of the dataset that this change is based on /// * `store_params` Parameters controlling object store access to the manifest + /// * `enable_v2_manifest_paths`: If set to true, and this is a new dataset, uses the new v2 manifest + /// paths. These allow constant-time lookups for the latest manifest on object storage. + /// This parameter has no effect on existing datasets. To migrate an existing + /// dataset, use the [`Self::migrate_manifest_paths_v2`] method. WARNING: turning + /// this on will make the dataset unreadable for older versions of Lance + /// (prior to 0.17.0). Default is False. pub async fn commit( base_uri: &str, operation: Operation, @@ -840,6 +859,7 @@ impl Dataset { store_params: Option, commit_handler: Option>, object_store_registry: Arc, + enable_v2_manifest_paths: bool, ) -> Result { let read_version = read_version.map_or_else( || match operation { @@ -893,6 +913,14 @@ impl Dataset { None }; + let manifest_naming_scheme = if let Some(ds) = &dataset { + ds.manifest_naming_scheme + } else if enable_v2_manifest_paths { + ManifestNamingScheme::V2 + } else { + ManifestNamingScheme::V1 + }; + let transaction = Transaction::new(read_version, operation, None); let manifest = if let Some(dataset) = &dataset { @@ -903,6 +931,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + manifest_naming_scheme, ) .await? 
} else { @@ -912,6 +941,7 @@ impl Dataset { &base, &transaction, &Default::default(), + manifest_naming_scheme, ) .await? }; @@ -927,6 +957,7 @@ impl Dataset { session: Arc::new(Session::default()), commit_handler, tags, + manifest_naming_scheme, }) } @@ -1086,6 +1117,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -1252,6 +1284,43 @@ impl Dataset { Ok(()) } + + /// Migrate the dataset to use the new manifest path scheme. + /// + /// This function will rename all V1 manifests to [ManifestNamingScheme::V2]. + /// These paths provide more efficient opening of datasets with many versions + /// on object stores. + /// + /// This function is idempotent, and can be run multiple times without + /// changing the state of the object store. + /// + /// However, it should not be run while other concurrent operations are happening. + /// And it should also run until completion before resuming other operations. + /// + /// ```rust + /// # use lance::dataset::Dataset; + /// # use lance_table::io::commit::ManifestNamingScheme; + /// # use lance_datagen::{array, RowCount, BatchCount}; + /// # use arrow_array::types::Int32Type; + /// # let data = lance_datagen::gen() + /// # .col("key", array::step::<Int32Type>()) + /// # .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + /// # let fut = async { + /// let mut dataset = Dataset::write(data, "memory://test", None).await.unwrap(); + /// assert_eq!(dataset.manifest_naming_scheme, ManifestNamingScheme::V1); + /// + /// dataset.migrate_manifest_paths_v2().await.unwrap(); + /// assert_eq!(dataset.manifest_naming_scheme, ManifestNamingScheme::V2); + /// # }; + /// # tokio::runtime::Runtime::new().unwrap().block_on(fut); + /// ``` + pub async fn migrate_manifest_paths_v2(&mut self) -> Result<()> { + migrate_scheme_to_v2(self.object_store(), &self.base).await?; + // We need to re-open.
+ let latest_version = self.latest_version_id().await?; + *self = self.checkout_version(latest_version).await?; + Ok(()) + } } /// # Schema Evolution @@ -1385,6 +1454,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -1455,6 +1525,7 @@ pub(crate) async fn write_manifest_file( manifest: &mut Manifest, indices: Option>, config: &ManifestWriteConfig, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { if config.auto_set_feature_flags { apply_feature_flags(manifest, config.use_move_stable_row_ids)?; @@ -1471,6 +1542,7 @@ pub(crate) async fn write_manifest_file( base_path, object_store, write_manifest_file_to_path, + naming_scheme, ) .await?; @@ -1963,6 +2035,7 @@ mod tests { use_legacy_format: None, storage_format: None, }, + dataset.manifest_naming_scheme, ) .await .unwrap(); @@ -2629,6 +2702,104 @@ mod tests { assert!(matches!(result.unwrap_err(), Error::DatasetNotFound { .. })); } + fn assert_all_manifests_use_scheme(test_dir: &TempDir, scheme: ManifestNamingScheme) { + let entries_names = test_dir + .path() + .join("_versions") + .read_dir() + .unwrap() + .map(|entry| entry.unwrap().file_name().into_string().unwrap()) + .collect::>(); + assert!( + entries_names + .iter() + .all(|name| ManifestNamingScheme::detect_scheme(name) == Some(scheme)), + "Entries: {:?}", + entries_names + ); + } + + #[tokio::test] + async fn test_v2_manifest_path_create() { + // Can create a dataset, using V2 paths + let data = lance_datagen::gen() + .col("key", array::step::()) + .into_batch_rows(RowCount::from(10)) + .unwrap(); + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + Dataset::write( + RecordBatchIterator::new([Ok(data.clone())], data.schema().clone()), + test_uri, + Some(WriteParams { + enable_v2_manifest_paths: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_all_manifests_use_scheme(&test_dir, 
ManifestNamingScheme::V2); + + // Appending to it will continue to use those paths + let dataset = Dataset::write( + RecordBatchIterator::new([Ok(data.clone())], data.schema().clone()), + test_uri, + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); + + UpdateBuilder::new(Arc::new(dataset)) + .update_where("key = 5") + .unwrap() + .set("key", "200") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + + assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); + } + + #[tokio::test] + async fn test_v2_manifest_path_commit() { + let schema = Schema::try_from(&ArrowSchema::new(vec![ArrowField::new( + "x", + DataType::Int32, + false, + )])) + .unwrap(); + let operation = Operation::Overwrite { + fragments: vec![], + schema, + }; + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + let dataset = Dataset::commit( + test_uri, + operation, + None, + None, + None, + Arc::new(ObjectStoreRegistry::default()), + true, // enable_v2_manifest_paths + ) + .await + .unwrap(); + + assert!(dataset.manifest_naming_scheme == ManifestNamingScheme::V2); + + assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); + } + #[rstest] #[tokio::test] async fn test_merge( diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 5523b71b25c..5d7037b6012 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -8,7 +8,7 @@ use lance_io::object_store::{ }; use lance_table::{ format::Manifest, - io::commit::{commit_handler_from_url, CommitHandler, ManifestLocation}, + io::commit::{commit_handler_from_url, CommitHandler}, }; use object_store::{aws::AwsCredentialProvider, path::Path, DynObjectStore}; use prost::Message; @@ -228,6 +228,7 @@ impl DatasetBuilder { self.options.block_size, self.options.object_store_wrapper, 
self.options.use_constant_size_upload_parts, + store.1.scheme() != "file", // If user supplied an object store then we just assume it's probably // cloud-like DEFAULT_CLOUD_IO_PARALLELISM, @@ -261,6 +262,8 @@ impl DatasetBuilder { let cloned_ref = self.version.clone(); let table_uri = self.table_uri.clone(); + // How do we detect which version scheme is in use? + let manifest = self.manifest.take(); let (object_store, base_path, commit_handler) = self.build_object_store().await?; @@ -279,27 +282,21 @@ impl DatasetBuilder { } } - let manifest = if manifest.is_some() { - let mut manifest = manifest.unwrap(); + let (manifest, manifest_naming_scheme) = if let Some(mut manifest) = manifest { + let location = commit_handler + .resolve_version_location(&base_path, manifest.version, &object_store.inner) + .await?; if manifest.schema.has_dictionary_types() { - let path = commit_handler - .resolve_version(&base_path, manifest.version, &object_store.inner) - .await?; - let reader = object_store.open(&path).await?; + let reader = object_store.open(&location.path).await?; populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?; } - manifest + (manifest, location.naming_scheme) } else { let manifest_location = match version { Some(version) => { - let path = commit_handler - .resolve_version(&base_path, version, &object_store.inner) - .await?; - ManifestLocation { - version, - path, - size: None, - } + commit_handler + .resolve_version_location(&base_path, version, &object_store.inner) + .await? } None => commit_handler .resolve_latest_location(&base_path, &object_store) @@ -311,7 +308,8 @@ impl DatasetBuilder { })?, }; - Dataset::load_manifest(&object_store, &manifest_location).await? 
+ let manifest = Dataset::load_manifest(&object_store, &manifest_location).await?; + (manifest, manifest_location.naming_scheme) }; Dataset::checkout_manifest( @@ -321,6 +319,7 @@ impl DatasetBuilder { manifest, session, commit_handler, + manifest_naming_scheme, ) .await } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 9dc3cfaa12b..4d2ce5a562e 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2246,7 +2246,7 @@ mod tests { }; let registry = Arc::new(ObjectStoreRegistry::default()); - let new_dataset = Dataset::commit(test_uri, op, None, None, None, registry) + let new_dataset = Dataset::commit(test_uri, op, None, None, None, registry, false) .await .unwrap(); @@ -2346,7 +2346,7 @@ mod tests { }; let registry = Arc::new(ObjectStoreRegistry::default()); - let dataset = Dataset::commit(test_uri, op, None, None, None, registry) + let dataset = Dataset::commit(test_uri, op, None, None, None, registry, false) .await .unwrap(); @@ -2572,6 +2572,7 @@ mod tests { None, None, registry, + false, ) .await?; @@ -2699,6 +2700,7 @@ mod tests { None, None, object_store_registry, + false, ) .await .unwrap(); diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index bbac0aac2c3..cfe4bf67ba8 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -603,6 +603,7 @@ async fn reserve_fragment_ids( &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; @@ -896,6 +897,7 @@ pub async fn commit_compaction( &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index 05d8f8caf40..9a5865159ea 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -228,6 +228,7 @@ pub(super) async fn add_columns( 
&transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; @@ -469,6 +470,7 @@ pub(super) async fn alter_columns( &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; @@ -517,6 +519,7 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Res &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index a1a5f677504..e2b32b64979 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -111,6 +111,13 @@ pub struct WriteParams { /// secondary indices need to be updated to point to new row ids. pub enable_move_stable_row_ids: bool, + /// If set to true, and this is a new dataset, uses the new v2 manifest paths. + /// These allow constant-time lookups for the latest manifest on object storage. + /// This parameter has no effect on existing datasets. To migrate an existing + /// dataset, use the [`super::Dataset::migrate_manifest_paths_v2`] method. + /// Default is False. 
+ pub enable_v2_manifest_paths: bool, + pub object_store_registry: Arc, } @@ -128,6 +135,7 @@ impl Default for WriteParams { commit_handler: None, data_storage_version: None, enable_move_stable_row_ids: false, + enable_v2_manifest_paths: false, object_store_registry: Arc::new(ObjectStoreRegistry::default()), } } diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 352db503adc..37feaacf6f7 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -991,6 +991,7 @@ impl MergeInsertJob { &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index 6b970dea50d..6fbe47d4303 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -335,6 +335,7 @@ impl UpdateJob { &transaction, &Default::default(), &Default::default(), + self.dataset.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 2b1962ab436..969a719376a 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -324,6 +324,7 @@ impl DatasetIndexExt for Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -393,6 +394,7 @@ impl DatasetIndexExt for Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -479,6 +481,7 @@ impl DatasetIndexExt for Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 83c18e93443..be2c75723ab 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -29,7 +29,7 @@ use lance_file::version::LanceFileVersion; use lance_table::format::{ pb, DataStorageFormat, DeletionFile, Fragment, 
Index, Manifest, WriterVersion, }; -use lance_table::io::commit::{CommitConfig, CommitError, CommitHandler}; +use lance_table::io::commit::{CommitConfig, CommitError, CommitHandler, ManifestNamingScheme}; use lance_table::io::deletion::read_deletion_file; use rand::Rng; use snafu::{location, Location}; @@ -121,6 +121,7 @@ pub(crate) async fn commit_new_dataset( base_path: &Path, transaction: &Transaction, write_config: &ManifestWriteConfig, + manifest_naming_scheme: ManifestNamingScheme, ) -> Result { let transaction_file = write_transaction_file(object_store, base_path, transaction).await?; @@ -138,6 +139,7 @@ pub(crate) async fn commit_new_dataset( Some(indices.clone()) }, write_config, + manifest_naming_scheme, ) .await?; @@ -415,6 +417,7 @@ pub(crate) async fn commit_transaction( transaction: &Transaction, write_config: &ManifestWriteConfig, commit_config: &CommitConfig, + manifest_naming_scheme: ManifestNamingScheme, ) -> Result { // Note: object_store has been configured with WriteParams, but dataset.object_store() // has not necessarily. So for anything involving writing, use `object_store`. 
@@ -504,6 +507,7 @@ pub(crate) async fn commit_transaction( Some(indices.clone()) }, write_config, + manifest_naming_scheme, ) .await; diff --git a/rust/lance/src/io/commit/dynamodb.rs b/rust/lance/src/io/commit/dynamodb.rs index ab393010027..94502004926 100644 --- a/rust/lance/src/io/commit/dynamodb.rs +++ b/rust/lance/src/io/commit/dynamodb.rs @@ -47,7 +47,7 @@ mod test { use lance_table::io::commit::{ dynamodb::DynamoDBExternalManifestStore, external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore}, - manifest_path, CommitHandler, + CommitHandler, ManifestNamingScheme, }; fn read_params(handler: Arc) -> ReadParams { @@ -313,7 +313,10 @@ mod test { let version_six_staging_location = base_path.child(format!("6.manifest-{}", uuid::Uuid::new_v4())); localfs - .rename(&manifest_path(&ds.base, 6), &version_six_staging_location) + .rename( + &ManifestNamingScheme::V1.manifest_path(&ds.base, 6), + &version_six_staging_location, + ) .await .unwrap(); store diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index adabbd33c6a..14bd842f373 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -13,7 +13,7 @@ mod test { use lance_table::io::commit::external_manifest::{ ExternalManifestCommitHandler, ExternalManifestStore, }; - use lance_table::io::commit::{manifest_path, CommitHandler}; + use lance_table::io::commit::{CommitHandler, ManifestNamingScheme}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use object_store::local::LocalFileSystem; use object_store::path::Path; @@ -303,7 +303,10 @@ mod test { let version_six_staging_location = base_path.child(format!("6.manifest-{}", uuid::Uuid::new_v4())); localfs - .rename(&manifest_path(&ds.base, 6), &version_six_staging_location) + .rename( + &ManifestNamingScheme::V1.manifest_path(&ds.base, 6), + &version_six_staging_location, + ) .await .unwrap(); { diff --git 
a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 16b2ef87c61..170d20d0a6f 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -113,9 +113,17 @@ impl TestDatasetGenerator { let operation = Operation::Overwrite { fragments, schema }; let registry = Arc::new(ObjectStoreRegistry::default()); - Dataset::commit(uri, operation, None, Default::default(), None, registry) - .await - .unwrap() + Dataset::commit( + uri, + operation, + None, + Default::default(), + None, + registry, + false, + ) + .await + .unwrap() } fn make_schema(&self, rng: &mut impl Rng) -> Schema {