From d7bcc55ab461b14093e18a69faf9cb65b7cf057e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 27 Aug 2024 12:54:47 -0700 Subject: [PATCH 01/11] wip: faster manifest lookup --- rust/lance-table/src/io/commit.rs | 76 +++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 19 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 5d4c8ab00df..0921a039d72 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -57,9 +57,38 @@ use { use crate::format::{Index, Manifest}; -const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; +#[derive(Clone, Copy, Debug)] +pub enum ManifestNamingScheme { + /// `_versions/{version}.manifest` + V1, + /// `_manifests/{99999999999999999999 - version}.manifest` + /// + /// Zero-padded and reversed for O(1) lookup of latest version on object stores. + V2, +} + +impl ManifestNamingScheme { + pub fn manifests_dir(&self, base: &Path) -> Path { + match self { + Self::V1 => base.child("_versions"), + Self::V2 => base.child("_manifests"), + } + } + + pub fn manifest_path(&self, base: &Path, version: u64) -> Path { + let directory = self.manifests_dir(base); + match self { + Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")), + Self::V2 => { + let inverted_version = 99999_99999_99999_99999 - version; + directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}")) + } + } + } +} + /// Function that writes the manifest to the object store. pub type ManifestWriter = for<'a> fn( object_store: &'a ObjectStore, @@ -68,12 +97,6 @@ pub type ManifestWriter = for<'a> fn( path: &'a Path, ) -> BoxFuture<'a, Result<()>>; -/// Get the manifest file path for a version. -pub fn manifest_path(base: &Path, version: u64) -> Path { - base.child(VERSIONS_DIR) - .child(format!("{version}.{MANIFEST_EXTENSION}")) -} - #[derive(Debug)] pub struct ManifestLocation { /// The version the manifest corresponds to. 
@@ -88,17 +111,21 @@ pub struct ManifestLocation { async fn current_manifest_path( object_store: &ObjectStore, base: &Path, + scheme: ManifestNamingScheme, ) -> Result { if object_store.is_local() { - if let Ok(Some(location)) = current_manifest_local(base) { + if let Ok(Some(location)) = current_manifest_local(base, scheme) { return Ok(location); } } + // TODO: Move to method of ManifestNamingScheme and dispatch to optimized + // method for V2. + // We use `list_with_delimiter` to avoid listing the contents of child directories. let manifest_files = object_store .inner - .list_with_delimiter(Some(&base.child(VERSIONS_DIR))) + .list_with_delimiter(Some(&scheme.manifests_dir(base))) .await?; let current = manifest_files @@ -131,7 +158,7 @@ async fn current_manifest_path( }) } else { Err(Error::NotFound { - uri: manifest_path(base, 1).to_string(), + uri: scheme.manifest_path(base, 1).to_string(), location: location!(), }) } @@ -140,8 +167,11 @@ async fn current_manifest_path( // This is an optimized function that searches for the latest manifest. In // object_store, list operations lookup metadata for each file listed. This // method only gets the metadata for the found latest manifest. 
-fn current_manifest_local(base: &Path) -> std::io::Result> { - let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR)); +fn current_manifest_local( + base: &Path, + scheme: ManifestNamingScheme, +) -> std::io::Result> { + let path = lance_io::local::to_local_path(&scheme.manifests_dir(base)); let entries = std::fs::read_dir(path)?; let mut latest_entry: Option<(u64, DirEntry)> = None; @@ -155,6 +185,7 @@ fn current_manifest_local(base: &Path) -> std::io::Result().ok()) @@ -187,10 +218,10 @@ fn current_manifest_local(base: &Path) -> std::io::Result( base_path: &Path, object_store: &'a dyn OSObjectStore, + scheme: ManifestNamingScheme, ) -> Result>> { - let base_path = base_path.clone(); Ok(object_store - .read_dir_all(&base_path.child(VERSIONS_DIR), None) + .read_dir_all(&scheme.manifests_dir(base_path), None) .await? .try_filter_map(|obj_meta| { if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) { @@ -238,8 +269,9 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, object_store: &ObjectStore, + scheme: ManifestNamingScheme, ) -> Result { - Ok(current_manifest_path(object_store, base_path).await?) + Ok(current_manifest_path(object_store, base_path, scheme).await?) } /// Get the path to the latest version manifest of a dataset at the base_path @@ -247,9 +279,12 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, object_store: &ObjectStore, + scheme: ManifestNamingScheme, ) -> std::result::Result { // TODO: we need to pade 0's to the version number on the manifest file path - Ok(current_manifest_path(object_store, base_path).await?.path) + Ok(current_manifest_path(object_store, base_path, scheme) + .await? 
+ .path) } // for default implementation, parse the version from the path @@ -257,8 +292,9 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, object_store: &ObjectStore, + scheme: ManifestNamingScheme, ) -> Result { - Ok(current_manifest_path(object_store, base_path) + Ok(current_manifest_path(object_store, base_path, scheme) .await? .version) } @@ -269,8 +305,9 @@ pub trait CommitHandler: Debug + Send + Sync { base_path: &Path, version: u64, _object_store: &dyn OSObjectStore, + scheme: ManifestNamingScheme, ) -> std::result::Result { - Ok(manifest_path(base_path, version)) + Ok(scheme.manifest_path(base_path, version)) } /// List manifests that are available for a dataset at the base_path @@ -278,8 +315,9 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, object_store: &'a dyn OSObjectStore, + scheme: ManifestNamingScheme, ) -> Result>> { - list_manifests(base_path, object_store).await + list_manifests(base_path, object_store, scheme).await } /// Commit a manifest. 
From a85f546f876734fc2f032469989b760003ee889f Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 28 Aug 2024 16:01:04 -0700 Subject: [PATCH 02/11] more progress towards a new naming scheme --- rust/lance-table/src/format/manifest.rs | 10 ++++ rust/lance-table/src/io/commit.rs | 43 ++++++++++++---- .../src/io/commit/external_manifest.rs | 51 ++++++++++++++----- rust/lance/src/dataset/builder.rs | 2 + 4 files changed, 81 insertions(+), 25 deletions(-) diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 9fd92c37032..f1113dc0997 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -17,6 +17,7 @@ use prost_types::Timestamp; use super::Fragment; use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS}; use crate::format::pb; +use crate::io::commit::ManifestNamingScheme; use lance_core::cache::FileMetadataCache; use lance_core::datatypes::Schema; use lance_core::{Error, Result}; @@ -279,6 +280,15 @@ impl Manifest { pub fn should_use_legacy_format(&self) -> bool { self.data_storage_format.version == LEGACY_FORMAT_VERSION } + + pub fn manifest_naming_scheme(&self) -> ManifestNamingScheme { + // TODO: Make this user configurable + if std::env::var("LANCE_USE_V2_VERSION_NAMES").is_ok() { + ManifestNamingScheme::V2 + } else { + ManifestNamingScheme::V1 + } + } } #[derive(Debug, Clone, PartialEq)] diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 0921a039d72..56cfbc942a9 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -63,9 +63,11 @@ const MANIFEST_EXTENSION: &str = "manifest"; pub enum ManifestNamingScheme { /// `_versions/{version}.manifest` V1, - /// `_manifests/{99999999999999999999 - version}.manifest` + /// `_manifests/{u64::MAX - version}.manifest` /// /// Zero-padded and reversed for O(1) lookup of latest version on object stores. 
+ /// Lexicographically, the first file in the directory should always be the + /// latest manifest.. V2, } @@ -82,11 +84,21 @@ impl ManifestNamingScheme { match self { Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")), Self::V2 => { - let inverted_version = 99999_99999_99999_99999 - version; + let inverted_version = std::u64::MAX - version; directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}")) } } } + + pub fn parse_version(&self, filename: &str) -> Option { + let file_number = filename + .split_once('.') + .and_then(|(version_str, _)| version_str.parse::().ok()); + match self { + Self::V1 => file_number, + Self::V2 => file_number.map(|v| std::u64::MAX - v), + } + } } /// Function that writes the manifest to the object store. @@ -185,11 +197,8 @@ fn current_manifest_local( // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d continue; } - // TODO: make parse_version a method on the file naming scheme. - let Some(version) = filename - .split_once('.') - .and_then(|(version_str, _)| version_str.parse::().ok()) - else { + + let Some(version) = scheme.parse_version(&filename) else { continue; }; @@ -331,6 +340,7 @@ pub trait CommitHandler: Debug + Send + Sync { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError>; } @@ -540,6 +550,7 @@ impl CommitHandler for UnsafeCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // Log a one-time warning if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) { @@ -551,7 +562,7 @@ impl CommitHandler for UnsafeCommitHandler { } let version_path = self - .resolve_version(base_path, manifest.version, &object_store.inner) + .resolve_version(base_path, manifest.version, &object_store.inner, scheme) .await?; // Write the manifest naively manifest_writer(object_store, 
manifest, indices, &version_path).await?; @@ -601,9 +612,10 @@ impl CommitHandler for T { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { let path = self - .resolve_version(base_path, manifest.version, &object_store.inner) + .resolve_version(base_path, manifest.version, &object_store.inner, scheme) .await?; // NOTE: once we have the lease we cannot use ? to return errors, since // we must release the lease before returning. @@ -645,9 +657,17 @@ impl CommitHandler for Arc { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { self.as_ref() - .commit(manifest, indices, base_path, object_store, manifest_writer) + .commit( + manifest, + indices, + base_path, + object_store, + manifest_writer, + scheme, + ) .await } } @@ -666,12 +686,13 @@ impl CommitHandler for RenameCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // Create a temporary object, then use `rename_if_not_exists` to commit. // If failed, clean up the temporary object. 
let path = self - .resolve_version(base_path, manifest.version, &object_store.inner) + .resolve_version(base_path, manifest.version, &object_store.inner, scheme) .await?; // Add .tmp_ prefix to the path diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index d205f99c1c8..b829b0dee9e 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -15,7 +15,7 @@ use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjec use snafu::{location, Location}; use super::{ - current_manifest_path, make_staging_manifest_path, manifest_path, ManifestLocation, + current_manifest_path, make_staging_manifest_path, ManifestLocation, ManifestNamingScheme, MANIFEST_EXTENSION, }; use crate::format::{Index, Manifest}; @@ -77,7 +77,7 @@ pub struct ExternalManifestCommitHandler { } impl ExternalManifestCommitHandler { - /// The manifest is considered committed once the staging manifest is writen + /// The manifest is considered committed once the staging manifest is written /// to object store and that path is committed to the external store. 
/// /// However, to fully complete this, the staging manifest should be materialized @@ -92,9 +92,10 @@ impl ExternalManifestCommitHandler { staging_manifest_path: &Path, version: u64, store: &dyn OSObjectStore, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result { // step 1: copy the manifest to the final location - let final_manifest_path = manifest_path(base_path, version); + let final_manifest_path = naming_scheme.manifest_path(base_path, version); match store .copy(staging_manifest_path, &final_manifest_path) .await @@ -126,11 +127,14 @@ impl CommitHandler for ExternalManifestCommitHandler { &self, base_path: &Path, object_store: &ObjectStore, + scheme: ManifestNamingScheme, ) -> std::result::Result { - let path = self.resolve_latest_version(base_path, object_store).await?; + let path = self + .resolve_latest_version(base_path, object_store, scheme) + .await?; Ok(ManifestLocation { version: self - .resolve_latest_version_id(base_path, object_store) + .resolve_latest_version_id(base_path, object_store, scheme) .await?, path, size: None, @@ -142,6 +146,7 @@ impl CommitHandler for ExternalManifestCommitHandler { &self, base_path: &Path, object_store: &ObjectStore, + scheme: ManifestNamingScheme, ) -> std::result::Result { let version = self .external_manifest_store @@ -156,12 +161,20 @@ impl CommitHandler for ExternalManifestCommitHandler { } let staged_path = Path::parse(&path)?; - self.finalize_manifest(base_path, &staged_path, version, &object_store.inner) - .await + self.finalize_manifest( + base_path, + &staged_path, + version, + &object_store.inner, + scheme, + ) + .await } // Dataset not found in the external store, this could be because the dataset did not // use external store for commit before. In this case, we search for the latest manifest - None => Ok(current_manifest_path(object_store, base_path).await?.path), + None => Ok(current_manifest_path(object_store, base_path, scheme) + .await? 
+ .path), } } @@ -169,6 +182,7 @@ impl CommitHandler for ExternalManifestCommitHandler { &self, base_path: &Path, object_store: &ObjectStore, + scheme: ManifestNamingScheme, ) -> std::result::Result { let version = self .external_manifest_store @@ -177,7 +191,7 @@ impl CommitHandler for ExternalManifestCommitHandler { match version { Some((version, _)) => Ok(version), - None => Ok(current_manifest_path(object_store, base_path) + None => Ok(current_manifest_path(object_store, base_path, scheme) .await? .version), } @@ -188,6 +202,7 @@ impl CommitHandler for ExternalManifestCommitHandler { base_path: &Path, version: u64, object_store: &dyn OSObjectStore, + scheme: ManifestNamingScheme, ) -> std::result::Result { let path_res = self .external_manifest_store @@ -198,7 +213,7 @@ impl CommitHandler for ExternalManifestCommitHandler { Ok(p) => p, // not board external manifest yet, direct to object store Err(Error::NotFound { .. }) => { - let path = manifest_path(base_path, version); + let path = scheme.manifest_path(base_path, version); // if exist update external manifest store if object_store.exists(&path).await? 
{ // best effort put, if it fails, it's okay @@ -218,7 +233,7 @@ impl CommitHandler for ExternalManifestCommitHandler { location: location!(), }); } - return Ok(manifest_path(base_path, version)); + return Ok(scheme.manifest_path(base_path, version)); } Err(e) => return Err(e), }; @@ -228,8 +243,14 @@ impl CommitHandler for ExternalManifestCommitHandler { return Ok(Path::parse(path)?); } - self.finalize_manifest(base_path, &Path::parse(&path)?, version, object_store) - .await + self.finalize_manifest( + base_path, + &Path::parse(&path)?, + version, + object_store, + scheme, + ) + .await } async fn commit( @@ -239,12 +260,13 @@ impl CommitHandler for ExternalManifestCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, + scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // path we get here is the path to the manifest we want to write // use object_store.base_path.as_ref() for getting the root of the dataset // step 1: Write the manifest we want to commit to object store with a temporary name - let path = manifest_path(base_path, manifest.version); + let path = scheme.manifest_path(base_path, manifest.version); let staging_path = make_staging_manifest_path(&path)?; manifest_writer(object_store, manifest, indices, &staging_path).await?; @@ -270,6 +292,7 @@ impl CommitHandler for ExternalManifestCommitHandler { &staging_path, manifest.version, &object_store.inner, + scheme, ) .await?; diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 5523b71b25c..3eeba061692 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -261,6 +261,8 @@ impl DatasetBuilder { let cloned_ref = self.version.clone(); let table_uri = self.table_uri.clone(); + // How do we detect which version scheme is in use? 
+ let manifest = self.manifest.take(); let (object_store, base_path, commit_handler) = self.build_object_store().await?; From 386217b8a5bcea6c11f57579982845509d9f8808 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 29 Aug 2024 17:07:01 -0700 Subject: [PATCH 03/11] naming scheme --- rust/lance-table/src/io/commit.rs | 262 +++++++++++------- .../src/io/commit/external_manifest.rs | 134 ++++++--- rust/lance/src/dataset.rs | 24 +- rust/lance/src/dataset/builder.rs | 13 +- rust/lance/src/io/commit/dynamodb.rs | 5 +- rust/lance/src/io/commit/external_manifest.rs | 7 +- 6 files changed, 274 insertions(+), 171 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 56cfbc942a9..9892647e1f8 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -57,9 +57,10 @@ use { use crate::format::{Index, Manifest}; +const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum ManifestNamingScheme { /// `_versions/{version}.manifest` V1, @@ -72,19 +73,12 @@ pub enum ManifestNamingScheme { } impl ManifestNamingScheme { - pub fn manifests_dir(&self, base: &Path) -> Path { - match self { - Self::V1 => base.child("_versions"), - Self::V2 => base.child("_manifests"), - } - } - pub fn manifest_path(&self, base: &Path, version: u64) -> Path { - let directory = self.manifests_dir(base); + let directory = base.child(VERSIONS_DIR); match self { Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")), Self::V2 => { - let inverted_version = std::u64::MAX - version; + let inverted_version = u64::MAX - version; directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}")) } } @@ -96,7 +90,20 @@ impl ManifestNamingScheme { .and_then(|(version_str, _)| version_str.parse::().ok()); match self { Self::V1 => file_number, - Self::V2 => file_number.map(|v| std::u64::MAX - v), + Self::V2 => 
file_number.map(|v| u64::MAX - v), + } + } + + pub fn detect_scheme(filename: &str) -> Option { + if filename.ends_with(MANIFEST_EXTENSION) { + const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len(); + if filename.len() == V2_LEN { + Some(Self::V2) + } else { + Some(Self::V1) + } + } else { + None } } } @@ -117,88 +124,122 @@ pub struct ManifestLocation { pub path: Path, /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`. pub size: Option, + /// Naming scheme of the manifest file. + pub naming_scheme: ManifestNamingScheme, } /// Get the latest manifest path async fn current_manifest_path( object_store: &ObjectStore, base: &Path, - scheme: ManifestNamingScheme, ) -> Result { if object_store.is_local() { - if let Ok(Some(location)) = current_manifest_local(base, scheme) { + if let Ok(Some(location)) = current_manifest_local(base) { return Ok(location); } } - // TODO: Move to method of ManifestNamingScheme and dispatch to optimized - // method for V2. - - // We use `list_with_delimiter` to avoid listing the contents of child directories. 
- let manifest_files = object_store - .inner - .list_with_delimiter(Some(&scheme.manifests_dir(base))) - .await?; - - let current = manifest_files - .objects - .into_iter() - .filter(|meta| { - meta.location.filename().is_some() - && meta - .location - .filename() - .unwrap() - .ends_with(MANIFEST_EXTENSION) - }) - .filter_map(|meta| { - let version = meta - .location - .filename() - .unwrap() - .split_once('.') - .and_then(|(version_str, _)| version_str.parse::().ok())?; - Some((version, meta)) - }) - .max_by_key(|(version, _)| *version); + let manifest_files = object_store.inner.list(Some(&base.child(VERSIONS_DIR))); - if let Some((version, meta)) = current { - Ok(ManifestLocation { - version, - path: meta.location, - size: Some(meta.size as u64), - }) - } else { - Err(Error::NotFound { - uri: scheme.manifest_path(base, 1).to_string(), + let mut valid_manifests = manifest_files.try_filter_map(|res| { + if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap()) + { + future::ready(Ok(Some((scheme, res)))) + } else { + future::ready(Ok(None)) + } + }); + + let first = valid_manifests.next().await.transpose()?; + match first { + // If the first valid manifest we see is V2, we can assume that we are using + // V2 naming scheme for all manifests. + Some((scheme @ ManifestNamingScheme::V2, meta)) => { + let version = scheme + .parse_version(meta.location.filename().unwrap()) + .unwrap(); + Ok(ManifestLocation { + version, + path: meta.location, + size: Some(meta.size as u64), + naming_scheme: scheme, + }) + } + // If the first valid manifest we see if V1, assume for now that we are + // using V1 naming scheme for all manifests. Since we are listing the + // directory anyways, we will assert there aren't any V2 manifests. 
+ Some((scheme @ ManifestNamingScheme::V1, meta)) => { + let mut current_version = scheme + .parse_version(meta.location.filename().unwrap()) + .unwrap(); + let mut current_meta = meta; + + while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? { + if matches!(scheme, ManifestNamingScheme::V2) { + return Err(Error::Internal { + message: "Found V2 manifest in a V1 manifest directory".to_string(), + location: location!(), + }); + } + let version = scheme + .parse_version(meta.location.filename().unwrap()) + .unwrap(); + if version > current_version { + current_version = version; + current_meta = meta; + } + } + Ok(ManifestLocation { + version: current_version, + path: current_meta.location, + size: Some(current_meta.size as u64), + naming_scheme: scheme, + }) + } + None => Err(Error::NotFound { + uri: base.child(VERSIONS_DIR).to_string(), location: location!(), - }) + }), } } // This is an optimized function that searches for the latest manifest. In // object_store, list operations lookup metadata for each file listed. This // method only gets the metadata for the found latest manifest. 
-fn current_manifest_local( - base: &Path, - scheme: ManifestNamingScheme, -) -> std::io::Result> { - let path = lance_io::local::to_local_path(&scheme.manifests_dir(base)); +fn current_manifest_local(base: &Path) -> std::io::Result> { + let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR)); let entries = std::fs::read_dir(path)?; let mut latest_entry: Option<(u64, DirEntry)> = None; + let mut scheme: Option = None; + for entry in entries { let entry = entry?; let filename_raw = entry.file_name(); let filename = filename_raw.to_string_lossy(); - if !filename.ends_with(MANIFEST_EXTENSION) { + + let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else { // Need to ignore temporary files, such as // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d continue; + }; + + if let Some(scheme) = scheme { + if scheme != entry_scheme { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Found multiple manifest naming schemes in the same directory: {:?} and {:?}", + scheme, entry_scheme + ), + )); + } + } else { + scheme = Some(entry_scheme); } - let Some(version) = scheme.parse_version(&filename) else { + let Some(version) = entry_scheme.parse_version(&filename) else { continue; }; @@ -218,6 +259,7 @@ fn current_manifest_local( version, path, size: Some(entry.metadata()?.len()), + naming_scheme: scheme.unwrap(), })) } else { Ok(None) @@ -227,10 +269,9 @@ fn current_manifest_local( async fn list_manifests<'a>( base_path: &Path, object_store: &'a dyn OSObjectStore, - scheme: ManifestNamingScheme, ) -> Result>> { Ok(object_store - .read_dir_all(&scheme.manifests_dir(base_path), None) + .read_dir_all(&base_path.child(VERSIONS_DIR), None) .await? 
.try_filter_map(|obj_meta| { if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) { @@ -278,9 +319,8 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, object_store: &ObjectStore, - scheme: ManifestNamingScheme, ) -> Result { - Ok(current_manifest_path(object_store, base_path, scheme).await?) + Ok(current_manifest_path(object_store, base_path).await?) } /// Get the path to the latest version manifest of a dataset at the base_path @@ -288,12 +328,9 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, object_store: &ObjectStore, - scheme: ManifestNamingScheme, ) -> std::result::Result { // TODO: we need to pade 0's to the version number on the manifest file path - Ok(current_manifest_path(object_store, base_path, scheme) - .await? - .path) + Ok(current_manifest_path(object_store, base_path).await?.path) } // for default implementation, parse the version from the path @@ -301,22 +338,33 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, object_store: &ObjectStore, - scheme: ManifestNamingScheme, ) -> Result { - Ok(current_manifest_path(object_store, base_path, scheme) + Ok(current_manifest_path(object_store, base_path) .await? .version) } /// Get the path to a specific versioned manifest of a dataset at the base_path + /// + /// The version must already exist. async fn resolve_version( &self, base_path: &Path, version: u64, - _object_store: &dyn OSObjectStore, - scheme: ManifestNamingScheme, + object_store: &dyn OSObjectStore, ) -> std::result::Result { - Ok(scheme.manifest_path(base_path, version)) + Ok(default_resolve_version(base_path, version, object_store) + .await? 
+ .path) + } + + async fn resolve_version_location( + &self, + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, + ) -> Result { + default_resolve_version(base_path, version, object_store).await } /// List manifests that are available for a dataset at the base_path @@ -324,9 +372,8 @@ pub trait CommitHandler: Debug + Send + Sync { &self, base_path: &Path, object_store: &'a dyn OSObjectStore, - scheme: ManifestNamingScheme, ) -> Result>> { - list_manifests(base_path, object_store, scheme).await + list_manifests(base_path, object_store).await } /// Commit a manifest. @@ -340,10 +387,38 @@ pub trait CommitHandler: Debug + Send + Sync { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - scheme: ManifestNamingScheme, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError>; } +async fn default_resolve_version( + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, +) -> Result { + // try V2, fallback to V1. + let scheme = ManifestNamingScheme::V2; + let path = scheme.manifest_path(base_path, version); + match object_store.head(&path).await { + Ok(meta) => Ok(ManifestLocation { + version, + path, + size: Some(meta.size as u64), + naming_scheme: scheme, + }), + Err(ObjectStoreError::NotFound { .. 
}) => { + // fallback to V1 + let scheme = ManifestNamingScheme::V1; + Ok(ManifestLocation { + version, + path: scheme.manifest_path(base_path, version), + size: None, + naming_scheme: scheme, + }) + } + Err(e) => Err(e.into()), + } +} /// Adapt an object_store credentials into AWS SDK creds #[cfg(feature = "dynamodb")] #[derive(Debug)] @@ -550,7 +625,7 @@ impl CommitHandler for UnsafeCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - scheme: ManifestNamingScheme, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // Log a one-time warning if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) { @@ -561,9 +636,7 @@ impl CommitHandler for UnsafeCommitHandler { ); } - let version_path = self - .resolve_version(base_path, manifest.version, &object_store.inner, scheme) - .await?; + let version_path = naming_scheme.manifest_path(base_path, manifest.version); // Write the manifest naively manifest_writer(object_store, manifest, indices, &version_path).await?; @@ -612,11 +685,9 @@ impl CommitHandler for T { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - scheme: ManifestNamingScheme, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { - let path = self - .resolve_version(base_path, manifest.version, &object_store.inner, scheme) - .await?; + let path = naming_scheme.manifest_path(base_path, manifest.version); // NOTE: once we have the lease we cannot use ? to return errors, since // we must release the lease before returning. 
let lease = self.lock(manifest.version).await?; @@ -657,7 +728,7 @@ impl CommitHandler for Arc { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - scheme: ManifestNamingScheme, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { self.as_ref() .commit( @@ -666,7 +737,7 @@ impl CommitHandler for Arc { base_path, object_store, manifest_writer, - scheme, + naming_scheme, ) .await } @@ -686,26 +757,13 @@ impl CommitHandler for RenameCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - scheme: ManifestNamingScheme, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // Create a temporary object, then use `rename_if_not_exists` to commit. // If failed, clean up the temporary object. - let path = self - .resolve_version(base_path, manifest.version, &object_store.inner, scheme) - .await?; - - // Add .tmp_ prefix to the path - let mut parts: Vec<_> = path.parts().collect(); - // Add a UUID to the end of the filename to avoid conflicts - let uuid = uuid::Uuid::new_v4(); - let new_name = format!( - ".tmp_{}_{}", - parts.last().unwrap().as_ref(), - uuid.as_hyphenated() - ); - let _ = std::mem::replace(parts.last_mut().unwrap(), new_name.into()); - let tmp_path: Path = parts.into_iter().collect(); + let path = naming_scheme.manifest_path(base_path, manifest.version); + let tmp_path = make_staging_manifest_path(&path)?; // Write the manifest to the temporary path manifest_writer(object_store, manifest, indices, &tmp_path).await?; diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index b829b0dee9e..5cad514bdf2 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -9,14 +9,14 @@ use std::sync::Arc; use async_trait::async_trait; use lance_core::{Error, Result}; -use lance_io::object_store::{ObjectStore, 
ObjectStoreExt}; +use lance_io::object_store::ObjectStore; use log::warn; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use super::{ - current_manifest_path, make_staging_manifest_path, ManifestLocation, ManifestNamingScheme, - MANIFEST_EXTENSION, + current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation, + ManifestNamingScheme, MANIFEST_EXTENSION, }; use crate::format::{Index, Manifest}; use crate::io::commit::{CommitError, CommitHandler, ManifestWriter}; @@ -52,12 +52,18 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { &self, base_uri: &str, ) -> Result> { - self.get_latest_version(base_uri).await.map(|res| { - res.map(|(version, uri)| ManifestLocation { - version, - path: Path::from(uri), - size: None, + self.get_latest_version(base_uri).await.and_then(|res| { + res.map(|(version, uri)| { + let path = Path::from(uri); + let naming_scheme = detect_naming_scheme_from_path(&path)?; + Ok(ManifestLocation { + version, + path, + size: None, + naming_scheme, + }) }) + .transpose() }) } @@ -68,6 +74,18 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { async fn put_if_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>; } +fn detect_naming_scheme_from_path(path: &Path) -> Result { + path.filename() + .and_then(ManifestNamingScheme::detect_scheme) + .ok_or_else(|| { + Error::corrupt_file( + path.clone(), + "Path does not follow known manifest naming convention.", + location!(), + ) + }) +} + /// External manifest commit handler /// This handler is used to commit a manifest to an external store /// for detailed design, see https://github.com/lancedb/lance/issues/1183 @@ -127,17 +145,16 @@ impl CommitHandler for ExternalManifestCommitHandler { &self, base_path: &Path, object_store: &ObjectStore, - scheme: ManifestNamingScheme, ) -> std::result::Result { - let path = self - .resolve_latest_version(base_path, 
object_store, scheme) - .await?; + let path = self.resolve_latest_version(base_path, object_store).await?; + let naming_scheme = detect_naming_scheme_from_path(&path)?; Ok(ManifestLocation { version: self - .resolve_latest_version_id(base_path, object_store, scheme) + .resolve_latest_version_id(base_path, object_store) .await?, path, size: None, + naming_scheme, }) } @@ -146,7 +163,6 @@ impl CommitHandler for ExternalManifestCommitHandler { &self, base_path: &Path, object_store: &ObjectStore, - scheme: ManifestNamingScheme, ) -> std::result::Result { let version = self .external_manifest_store @@ -160,21 +176,26 @@ impl CommitHandler for ExternalManifestCommitHandler { return Ok(Path::parse(path)?); } + // Detect naming scheme based on presence of zero padding. + let naming_scheme = if path.chars().nth(20) == Some('.') { + ManifestNamingScheme::V2 + } else { + ManifestNamingScheme::V1 + }; + let staged_path = Path::parse(&path)?; self.finalize_manifest( base_path, &staged_path, version, &object_store.inner, - scheme, + naming_scheme, ) .await } // Dataset not found in the external store, this could be because the dataset did not // use external store for commit before. In this case, we search for the latest manifest - None => Ok(current_manifest_path(object_store, base_path, scheme) - .await? - .path), + None => Ok(current_manifest_path(object_store, base_path).await?.path), } } @@ -182,7 +203,6 @@ impl CommitHandler for ExternalManifestCommitHandler { &self, base_path: &Path, object_store: &ObjectStore, - scheme: ManifestNamingScheme, ) -> std::result::Result { let version = self .external_manifest_store @@ -191,7 +211,7 @@ impl CommitHandler for ExternalManifestCommitHandler { match version { Some((version, _)) => Ok(version), - None => Ok(current_manifest_path(object_store, base_path, scheme) + None => Ok(current_manifest_path(object_store, base_path) .await? 
.version), } @@ -202,7 +222,6 @@ impl CommitHandler for ExternalManifestCommitHandler { base_path: &Path, version: u64, object_store: &dyn OSObjectStore, - scheme: ManifestNamingScheme, ) -> std::result::Result { let path_res = self .external_manifest_store @@ -213,39 +232,44 @@ impl CommitHandler for ExternalManifestCommitHandler { Ok(p) => p, // not board external manifest yet, direct to object store Err(Error::NotFound { .. }) => { - let path = scheme.manifest_path(base_path, version); - // if exist update external manifest store - if object_store.exists(&path).await? { - // best effort put, if it fails, it's okay - match self - .external_manifest_store - .put_if_not_exists(base_path.as_ref(), version, path.as_ref()) - .await - { - Ok(_) => {} - Err(e) => { - warn!("could up update external manifest store during load, with error: {}", e); - } - } - } else { - return Err(Error::NotFound { - uri: path.to_string(), + let path = default_resolve_version(base_path, version, object_store) + .await + .map_err(|_| Error::NotFound { + uri: format!("{}@{}", base_path, version), location: location!(), - }); + })? 
+ .path; + + // best effort put, if it fails, it's okay + match self + .external_manifest_store + .put_if_not_exists(base_path.as_ref(), version, path.as_ref()) + .await + { + Ok(_) => {} + Err(e) => { + warn!( + "could up update external manifest store during load, with error: {}", + e + ); + } } - return Ok(scheme.manifest_path(base_path, version)); + return Ok(path); } Err(e) => return Err(e), }; // finalized path, just return - if path.ends_with(&format!(".{MANIFEST_EXTENSION}")) { - return Ok(Path::parse(path)?); + let manifest_path = Path::parse(path)?; + if manifest_path.extension() == Some(MANIFEST_EXTENSION) { + return Ok(manifest_path); } + let scheme = detect_naming_scheme_from_path(&manifest_path)?; + self.finalize_manifest( base_path, - &Path::parse(&path)?, + &Path::parse(&manifest_path)?, version, object_store, scheme, @@ -253,6 +277,24 @@ impl CommitHandler for ExternalManifestCommitHandler { .await } + async fn resolve_version_location( + &self, + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, + ) -> std::result::Result { + let path = self + .resolve_version(base_path, version, object_store) + .await?; + let naming_scheme = detect_naming_scheme_from_path(&path)?; + Ok(ManifestLocation { + version, + path, + size: None, + naming_scheme, + }) + } + async fn commit( &self, manifest: &mut Manifest, @@ -260,13 +302,13 @@ impl CommitHandler for ExternalManifestCommitHandler { base_path: &Path, object_store: &ObjectStore, manifest_writer: ManifestWriter, - scheme: ManifestNamingScheme, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { // path we get here is the path to the manifest we want to write // use object_store.base_path.as_ref() for getting the root of the dataset // step 1: Write the manifest we want to commit to object store with a temporary name - let path = scheme.manifest_path(base_path, manifest.version); + let path = naming_scheme.manifest_path(base_path, manifest.version); let 
staging_path = make_staging_manifest_path(&path)?; manifest_writer(object_store, manifest, indices, &staging_path).await?; @@ -287,6 +329,8 @@ impl CommitHandler for ExternalManifestCommitHandler { return res; } + let scheme = detect_naming_scheme_from_path(&path)?; + self.finalize_manifest( base_path, &staging_path, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 20ebcdc0880..6db4d0edf2a 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -26,6 +26,7 @@ use lance_table::format::{ }; use lance_table::io::commit::{ commit_handler_from_url, CommitError, CommitHandler, CommitLock, ManifestLocation, + ManifestNamingScheme, }; use lance_table::io::manifest::{read_manifest, write_manifest}; use log::warn; @@ -332,15 +333,10 @@ impl Dataset { async fn checkout_by_version_number(&self, version: u64) -> Result { let base_path = self.base.clone(); - let manifest_file = self + let manifest_location = self .commit_handler - .resolve_version(&base_path, version, &self.object_store.inner) + .resolve_version_location(&base_path, version, &self.object_store.inner) .await?; - let manifest_location = ManifestLocation { - version, - path: manifest_file, - size: None, - }; let manifest = Self::load_manifest(self.object_store.as_ref(), &manifest_location).await?; Self::checkout_manifest( self.object_store.clone(), @@ -1428,11 +1424,12 @@ impl DatasetTakeRows for Dataset { #[derive(Debug)] pub(crate) struct ManifestWriteConfig { - auto_set_feature_flags: bool, // default true - timestamp: Option, // default None - use_move_stable_row_ids: bool, // default false - use_legacy_format: Option, // default None - storage_format: Option, // default None + auto_set_feature_flags: bool, // default true + timestamp: Option, // default None + use_move_stable_row_ids: bool, // default false + use_legacy_format: Option, // default None + storage_format: Option, // default None + manifest_naming_scheme: ManifestNamingScheme, // default V1. 
} impl Default for ManifestWriteConfig { @@ -1443,6 +1440,7 @@ impl Default for ManifestWriteConfig { use_move_stable_row_ids: false, use_legacy_format: None, storage_format: None, + manifest_naming_scheme: ManifestNamingScheme::V1, } } } @@ -1471,6 +1469,7 @@ pub(crate) async fn write_manifest_file( base_path, object_store, write_manifest_file_to_path, + config.manifest_naming_scheme, ) .await?; @@ -1962,6 +1961,7 @@ mod tests { use_move_stable_row_ids: false, use_legacy_format: None, storage_format: None, + manifest_naming_scheme: ManifestNamingScheme::V1, }, ) .await diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 3eeba061692..08a05e7c6f9 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -8,7 +8,7 @@ use lance_io::object_store::{ }; use lance_table::{ format::Manifest, - io::commit::{commit_handler_from_url, CommitHandler, ManifestLocation}, + io::commit::{commit_handler_from_url, CommitHandler}, }; use object_store::{aws::AwsCredentialProvider, path::Path, DynObjectStore}; use prost::Message; @@ -294,14 +294,9 @@ impl DatasetBuilder { } else { let manifest_location = match version { Some(version) => { - let path = commit_handler - .resolve_version(&base_path, version, &object_store.inner) - .await?; - ManifestLocation { - version, - path, - size: None, - } + commit_handler + .resolve_version_location(&base_path, version, &object_store.inner) + .await? 
} None => commit_handler .resolve_latest_location(&base_path, &object_store) diff --git a/rust/lance/src/io/commit/dynamodb.rs b/rust/lance/src/io/commit/dynamodb.rs index ab393010027..d54497c1a6d 100644 --- a/rust/lance/src/io/commit/dynamodb.rs +++ b/rust/lance/src/io/commit/dynamodb.rs @@ -313,7 +313,10 @@ mod test { let version_six_staging_location = base_path.child(format!("6.manifest-{}", uuid::Uuid::new_v4())); localfs - .rename(&manifest_path(&ds.base, 6), &version_six_staging_location) + .rename( + &ManifestNamingScheme::V1.manifest_path(&ds.base, 6), + &version_six_staging_location, + ) .await .unwrap(); store diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index adabbd33c6a..14bd842f373 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -13,7 +13,7 @@ mod test { use lance_table::io::commit::external_manifest::{ ExternalManifestCommitHandler, ExternalManifestStore, }; - use lance_table::io::commit::{manifest_path, CommitHandler}; + use lance_table::io::commit::{CommitHandler, ManifestNamingScheme}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use object_store::local::LocalFileSystem; use object_store::path::Path; @@ -303,7 +303,10 @@ mod test { let version_six_staging_location = base_path.child(format!("6.manifest-{}", uuid::Uuid::new_v4())); localfs - .rename(&manifest_path(&ds.base, 6), &version_six_staging_location) + .rename( + &ManifestNamingScheme::V1.manifest_path(&ds.base, 6), + &version_six_staging_location, + ) .await .unwrap(); { From 0114eebc839aed79d708ca06687d24a802ecba83 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 29 Aug 2024 18:52:07 -0700 Subject: [PATCH 04/11] fix test --- rust/lance-table/src/format/manifest.rs | 10 ---- rust/lance-table/src/io/commit.rs | 10 +++- .../src/io/commit/external_manifest.rs | 52 ++++++++++--------- 3 files changed, 37 insertions(+), 35 deletions(-) diff --git 
a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index f1113dc0997..9fd92c37032 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -17,7 +17,6 @@ use prost_types::Timestamp; use super::Fragment; use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS}; use crate::format::pb; -use crate::io::commit::ManifestNamingScheme; use lance_core::cache::FileMetadataCache; use lance_core::datatypes::Schema; use lance_core::{Error, Result}; @@ -280,15 +279,6 @@ impl Manifest { pub fn should_use_legacy_format(&self) -> bool { self.data_storage_format.version == LEGACY_FORMAT_VERSION } - - pub fn manifest_naming_scheme(&self) -> ManifestNamingScheme { - // TODO: Make this user configurable - if std::env::var("LANCE_USE_V2_VERSION_NAMES").is_ok() { - ManifestNamingScheme::V2 - } else { - ManifestNamingScheme::V1 - } - } } #[derive(Debug, Clone, PartialEq)] diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 9892647e1f8..cebaa923e4e 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -68,7 +68,7 @@ pub enum ManifestNamingScheme { /// /// Zero-padded and reversed for O(1) lookup of latest version on object stores. /// Lexicographically, the first file in the directory should always be the - /// latest manifest.. + /// latest manifest. V2, } @@ -106,6 +106,14 @@ impl ManifestNamingScheme { None } } + + pub fn detect_scheme_staging(filename: &str) -> Self { + if filename.chars().nth(20) == Some('.') { + Self::V2 + } else { + Self::V1 + } + } } /// Function that writes the manifest to the object store. 
diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 5cad514bdf2..3c1017d11a8 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use async_trait::async_trait; use lance_core::{Error, Result}; -use lance_io::object_store::ObjectStore; +use lance_io::object_store::{ObjectStore, ObjectStoreExt}; use log::warn; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::{location, Location}; @@ -177,13 +177,10 @@ impl CommitHandler for ExternalManifestCommitHandler { } // Detect naming scheme based on presence of zero padding. - let naming_scheme = if path.chars().nth(20) == Some('.') { - ManifestNamingScheme::V2 - } else { - ManifestNamingScheme::V1 - }; - let staged_path = Path::parse(&path)?; + let naming_scheme = + ManifestNamingScheme::detect_scheme_staging(staged_path.filename().unwrap()); + self.finalize_manifest( base_path, &staged_path, @@ -239,40 +236,47 @@ impl CommitHandler for ExternalManifestCommitHandler { location: location!(), })? .path; - - // best effort put, if it fails, it's okay - match self - .external_manifest_store - .put_if_not_exists(base_path.as_ref(), version, path.as_ref()) - .await - { - Ok(_) => {} - Err(e) => { - warn!( + if object_store.exists(&path).await? 
{ + // best effort put, if it fails, it's okay + match self + .external_manifest_store + .put_if_not_exists(base_path.as_ref(), version, path.as_ref()) + .await + { + Ok(_) => {} + Err(e) => { + warn!( "could up update external manifest store during load, with error: {}", e ); + } } + return Ok(path); + } else { + return Err(Error::NotFound { + uri: path.to_string(), + location: location!(), + }); } - return Ok(path); } Err(e) => return Err(e), }; // finalized path, just return - let manifest_path = Path::parse(path)?; - if manifest_path.extension() == Some(MANIFEST_EXTENSION) { - return Ok(manifest_path); + let current_path = Path::parse(path)?; + if current_path.extension() == Some(MANIFEST_EXTENSION) { + return Ok(current_path); } - let scheme = detect_naming_scheme_from_path(&manifest_path)?; + let naming_scheme = + ManifestNamingScheme::detect_scheme_staging(current_path.filename().unwrap()); self.finalize_manifest( base_path, - &Path::parse(&manifest_path)?, + &Path::parse(¤t_path)?, version, object_store, - scheme, + naming_scheme, ) .await } From 67e5c75e3128678a76e09aed37f39c0a83a84453 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 30 Aug 2024 09:40:15 -0700 Subject: [PATCH 05/11] parameters to enable new ids --- python/python/lance/dataset.py | 4 + python/python/tests/test_dataset.py | 9 ++ python/src/dataset.rs | 8 ++ rust/lance/src/dataset.rs | 115 +++++++++++++++++-- rust/lance/src/dataset/builder.rs | 17 +-- rust/lance/src/dataset/fragment.rs | 5 +- rust/lance/src/dataset/optimize.rs | 2 + rust/lance/src/dataset/schema_evolution.rs | 3 + rust/lance/src/dataset/write.rs | 5 + rust/lance/src/dataset/write/merge_insert.rs | 1 + rust/lance/src/dataset/write/update.rs | 1 + rust/lance/src/index.rs | 3 + rust/lance/src/io/commit.rs | 6 +- rust/lance/src/utils/test.rs | 14 ++- 14 files changed, 170 insertions(+), 23 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 1bd9a630318..f89d3910f05 100644 --- 
a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -1751,6 +1751,7 @@ def commit( read_version: Optional[int] = None, commit_lock: Optional[CommitLock] = None, storage_options: Optional[Dict[str, str]] = None, + enable_v2_manifest_paths: Optional[bool] = None, ) -> LanceDataset: """Create a new version of dataset @@ -1831,6 +1832,7 @@ def commit( read_version, commit_lock, storage_options=storage_options, + enable_v2_manifest_paths=enable_v2_manifest_paths, ) return LanceDataset(base_uri, storage_options=storage_options) @@ -2818,6 +2820,7 @@ def write_dataset( storage_options: Optional[Dict[str, str]] = None, data_storage_version: str = "legacy", use_legacy_format: Optional[bool] = None, + enable_v2_manifest_paths: bool = False, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -2897,6 +2900,7 @@ def write_dataset( "progress": progress, "storage_options": storage_options, "data_storage_version": data_storage_version, + "enable_v2_manifest_paths": enable_v2_manifest_paths, } if commit_lock: diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index a4d0341137a..7b04ff1f4f1 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -251,6 +251,15 @@ def test_asof_checkout(tmp_path: Path): assert len(ds.to_table()) == 9 +def test_v2_manifest_paths(tmp_path: Path): + lance.write_dataset( + pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=True + ) + manifest_path = os.listdir(tmp_path / "_versions") + assert len(manifest_path) == 1 + assert re.match(r"\d{20}\.manifest", manifest_path[0]) + + def test_tag(tmp_path: Path): table = pa.Table.from_pydict({"colA": [1, 2, 3], "colB": [4, 5, 6]}) base_dir = tmp_path / "test" diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 822a2b7989f..929b92d7f62 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1218,6 +1218,7 @@ impl Dataset { read_version: Option, commit_lock: 
Option<&PyAny>, storage_options: Option>, + enable_v2_manifest_paths: Option, ) -> PyResult { let object_store_params = storage_options @@ -1255,6 +1256,7 @@ impl Dataset { object_store_params, commit_handler, object_store_registry, + enable_v2_manifest_paths.unwrap_or(false), ) .await })? @@ -1464,6 +1466,12 @@ pub fn get_write_params(options: &PyDict) -> PyResult> { }); } + if let Some(enable_v2_manifest_paths) = + get_dict_opt::(options, "enable_v2_manifest_paths")? + { + p.enable_v2_manifest_paths = enable_v2_manifest_paths; + } + p.commit_handler = get_commit_handler(options); Some(p) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6db4d0edf2a..bf80cb2dc46 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -101,6 +101,7 @@ pub struct Dataset { pub(crate) manifest: Arc, pub(crate) session: Arc, pub tags: Tags, + pub manifest_naming_scheme: ManifestNamingScheme, } /// Dataset Version @@ -345,6 +346,7 @@ impl Dataset { manifest, self.session.clone(), self.commit_handler.clone(), + manifest_location.naming_scheme, ) .await } @@ -427,6 +429,7 @@ impl Dataset { manifest: Manifest, session: Arc, commit_handler: Arc, + manifest_naming_scheme: ManifestNamingScheme, ) -> Result { let tags = Tags::new( object_store.clone(), @@ -441,6 +444,7 @@ impl Dataset { commit_handler, session, tags, + manifest_naming_scheme, }) } @@ -528,6 +532,14 @@ impl Dataset { } } + let manifest_naming_scheme = if let Some(d) = dataset.as_ref() { + d.manifest_naming_scheme + } else if params.enable_v2_manifest_paths { + ManifestNamingScheme::V2 + } else { + ManifestNamingScheme::V1 + }; + let params = params; // discard mut if let Some(d) = dataset.as_ref() { @@ -579,6 +591,7 @@ impl Dataset { &transaction, &manifest_config, &Default::default(), + manifest_naming_scheme, ) .await? } else { @@ -588,6 +601,7 @@ impl Dataset { &base, &transaction, &manifest_config, + manifest_naming_scheme, ) .await? 
}; @@ -602,6 +616,7 @@ impl Dataset { session: Arc::new(Session::default()), commit_handler, tags, + manifest_naming_scheme, }) } @@ -672,6 +687,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -758,6 +774,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?, ); @@ -836,6 +853,7 @@ impl Dataset { store_params: Option, commit_handler: Option>, object_store_registry: Arc, + enable_v2_manifest_paths: bool, ) -> Result { let read_version = read_version.map_or_else( || match operation { @@ -889,6 +907,14 @@ impl Dataset { None }; + let manifest_naming_scheme = if let Some(ds) = &dataset { + ds.manifest_naming_scheme + } else if enable_v2_manifest_paths { + ManifestNamingScheme::V2 + } else { + ManifestNamingScheme::V1 + }; + let transaction = Transaction::new(read_version, operation, None); let manifest = if let Some(dataset) = &dataset { @@ -899,6 +925,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + manifest_naming_scheme, ) .await? } else { @@ -908,6 +935,7 @@ impl Dataset { &base, &transaction, &Default::default(), + manifest_naming_scheme, ) .await? 
}; @@ -923,6 +951,7 @@ impl Dataset { session: Arc::new(Session::default()), commit_handler, tags, + manifest_naming_scheme, }) } @@ -1082,6 +1111,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -1381,6 +1411,7 @@ impl Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -1424,12 +1455,11 @@ impl DatasetTakeRows for Dataset { #[derive(Debug)] pub(crate) struct ManifestWriteConfig { - auto_set_feature_flags: bool, // default true - timestamp: Option, // default None - use_move_stable_row_ids: bool, // default false - use_legacy_format: Option, // default None - storage_format: Option, // default None - manifest_naming_scheme: ManifestNamingScheme, // default V1. + auto_set_feature_flags: bool, // default true + timestamp: Option, // default None + use_move_stable_row_ids: bool, // default false + use_legacy_format: Option, // default None + storage_format: Option, // default None } impl Default for ManifestWriteConfig { @@ -1440,7 +1470,6 @@ impl Default for ManifestWriteConfig { use_move_stable_row_ids: false, use_legacy_format: None, storage_format: None, - manifest_naming_scheme: ManifestNamingScheme::V1, } } } @@ -1453,6 +1482,7 @@ pub(crate) async fn write_manifest_file( manifest: &mut Manifest, indices: Option>, config: &ManifestWriteConfig, + naming_scheme: ManifestNamingScheme, ) -> std::result::Result<(), CommitError> { if config.auto_set_feature_flags { apply_feature_flags(manifest, config.use_move_stable_row_ids)?; @@ -1469,7 +1499,7 @@ pub(crate) async fn write_manifest_file( base_path, object_store, write_manifest_file_to_path, - config.manifest_naming_scheme, + naming_scheme, ) .await?; @@ -1961,8 +1991,8 @@ mod tests { use_move_stable_row_ids: false, use_legacy_format: None, storage_format: None, - manifest_naming_scheme: ManifestNamingScheme::V1, }, + dataset.manifest_naming_scheme, ) .await .unwrap(); @@ -2629,6 +2659,73 
@@ mod tests { assert!(matches!(result.unwrap_err(), Error::DatasetNotFound { .. })); } + fn assert_all_manifests_use_scheme(test_dir: &TempDir, scheme: ManifestNamingScheme) { + let entries_names = test_dir + .path() + .join("_versions") + .read_dir() + .unwrap() + .map(|entry| entry.unwrap().file_name().into_string().unwrap()) + .collect::>(); + assert!( + entries_names + .iter() + .all(|name| ManifestNamingScheme::detect_scheme(name) == Some(scheme)), + "Entries: {:?}", + entries_names + ); + } + + #[tokio::test] + async fn test_v2_manifest_path_create() { + // Can create a dataset, using V2 paths + let data = lance_datagen::gen() + .col("key", array::step::()) + .into_batch_rows(RowCount::from(10)) + .unwrap(); + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + Dataset::write( + RecordBatchIterator::new([Ok(data.clone())], data.schema().clone()), + test_uri, + Some(WriteParams { + enable_v2_manifest_paths: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); + + // Appending to it will continue to use those paths + let dataset = Dataset::write( + RecordBatchIterator::new([Ok(data.clone())], data.schema().clone()), + test_uri, + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); + + UpdateBuilder::new(Arc::new(dataset)) + .update_where("key = 5") + .unwrap() + .set("key", "200") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + + assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); + } + #[rstest] #[tokio::test] async fn test_merge( diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 08a05e7c6f9..13ac1ac3d8c 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -281,16 +281,15 @@ impl DatasetBuilder { } } - let 
manifest = if manifest.is_some() { - let mut manifest = manifest.unwrap(); + let (manifest, manifest_naming_scheme) = if let Some(mut manifest) = manifest { + let location = commit_handler + .resolve_version_location(&base_path, manifest.version, &object_store.inner) + .await?; if manifest.schema.has_dictionary_types() { - let path = commit_handler - .resolve_version(&base_path, manifest.version, &object_store.inner) - .await?; - let reader = object_store.open(&path).await?; + let reader = object_store.open(&location.path).await?; populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?; } - manifest + (manifest, location.naming_scheme) } else { let manifest_location = match version { Some(version) => { @@ -308,7 +307,8 @@ impl DatasetBuilder { })?, }; - Dataset::load_manifest(&object_store, &manifest_location).await? + let manifest = Dataset::load_manifest(&object_store, &manifest_location).await?; + (manifest, manifest_location.naming_scheme) }; Dataset::checkout_manifest( @@ -318,6 +318,7 @@ impl DatasetBuilder { manifest, session, commit_handler, + manifest_naming_scheme, ) .await } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 9dc3cfaa12b..f328a87a9fa 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2246,7 +2246,7 @@ mod tests { }; let registry = Arc::new(ObjectStoreRegistry::default()); - let new_dataset = Dataset::commit(test_uri, op, None, None, None, registry) + let new_dataset = Dataset::commit(test_uri, op, None, None, None, registry, false) .await .unwrap(); @@ -2346,7 +2346,7 @@ mod tests { }; let registry = Arc::new(ObjectStoreRegistry::default()); - let dataset = Dataset::commit(test_uri, op, None, None, None, registry) + let dataset = Dataset::commit(test_uri, op, None, None, None, registry, false) .await .unwrap(); @@ -2572,6 +2572,7 @@ mod tests { None, None, registry, + false, ) .await?; diff --git a/rust/lance/src/dataset/optimize.rs 
b/rust/lance/src/dataset/optimize.rs index bbac0aac2c3..cfe4bf67ba8 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -603,6 +603,7 @@ async fn reserve_fragment_ids( &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; @@ -896,6 +897,7 @@ pub async fn commit_compaction( &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index 05d8f8caf40..9a5865159ea 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -228,6 +228,7 @@ pub(super) async fn add_columns( &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; @@ -469,6 +470,7 @@ pub(super) async fn alter_columns( &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; @@ -517,6 +519,7 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Res &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index a1a5f677504..69cc6a5e8a1 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -111,6 +111,10 @@ pub struct WriteParams { /// secondary indices need to be updated to point to new row ids. pub enable_move_stable_row_ids: bool, + /// If set to true, the writer will use v2 manifest paths. These allow + /// O(1) lookups for the latest manifest on object storage. 
+ pub enable_v2_manifest_paths: bool, + pub object_store_registry: Arc, } @@ -128,6 +132,7 @@ impl Default for WriteParams { commit_handler: None, data_storage_version: None, enable_move_stable_row_ids: false, + enable_v2_manifest_paths: false, object_store_registry: Arc::new(ObjectStoreRegistry::default()), } } diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 352db503adc..37feaacf6f7 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -991,6 +991,7 @@ impl MergeInsertJob { &transaction, &Default::default(), &Default::default(), + dataset.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index 6b970dea50d..6fbe47d4303 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -335,6 +335,7 @@ impl UpdateJob { &transaction, &Default::default(), &Default::default(), + self.dataset.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 2b1962ab436..969a719376a 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -324,6 +324,7 @@ impl DatasetIndexExt for Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -393,6 +394,7 @@ impl DatasetIndexExt for Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; @@ -479,6 +481,7 @@ impl DatasetIndexExt for Dataset { &transaction, &Default::default(), &Default::default(), + self.manifest_naming_scheme, ) .await?; diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 83c18e93443..be2c75723ab 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -29,7 +29,7 @@ use lance_file::version::LanceFileVersion; use lance_table::format::{ pb, DataStorageFormat, DeletionFile, Fragment, 
Index, Manifest, WriterVersion, }; -use lance_table::io::commit::{CommitConfig, CommitError, CommitHandler}; +use lance_table::io::commit::{CommitConfig, CommitError, CommitHandler, ManifestNamingScheme}; use lance_table::io::deletion::read_deletion_file; use rand::Rng; use snafu::{location, Location}; @@ -121,6 +121,7 @@ pub(crate) async fn commit_new_dataset( base_path: &Path, transaction: &Transaction, write_config: &ManifestWriteConfig, + manifest_naming_scheme: ManifestNamingScheme, ) -> Result { let transaction_file = write_transaction_file(object_store, base_path, transaction).await?; @@ -138,6 +139,7 @@ pub(crate) async fn commit_new_dataset( Some(indices.clone()) }, write_config, + manifest_naming_scheme, ) .await?; @@ -415,6 +417,7 @@ pub(crate) async fn commit_transaction( transaction: &Transaction, write_config: &ManifestWriteConfig, commit_config: &CommitConfig, + manifest_naming_scheme: ManifestNamingScheme, ) -> Result { // Note: object_store has been configured with WriteParams, but dataset.object_store() // has not necessarily. So for anything involving writing, use `object_store`. 
@@ -504,6 +507,7 @@ pub(crate) async fn commit_transaction( Some(indices.clone()) }, write_config, + manifest_naming_scheme, ) .await; diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 16b2ef87c61..170d20d0a6f 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -113,9 +113,17 @@ impl TestDatasetGenerator { let operation = Operation::Overwrite { fragments, schema }; let registry = Arc::new(ObjectStoreRegistry::default()); - Dataset::commit(uri, operation, None, Default::default(), None, registry) - .await - .unwrap() + Dataset::commit( + uri, + operation, + None, + Default::default(), + None, + registry, + false, + ) + .await + .unwrap() } fn make_schema(&self, rng: &mut impl Rng) -> Schema { From bc5de79b12344498511868f250a578ca79831b16 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 30 Aug 2024 13:31:06 -0700 Subject: [PATCH 06/11] add migration method --- python/python/lance/dataset.py | 8 ++ python/python/tests/test_dataset.py | 15 ++++ python/src/dataset.rs | 8 ++ rust/lance-table/src/io/commit.rs | 116 ++++++++++++++++++++++++++++ rust/lance/src/dataset.rs | 24 +++++- 5 files changed, 169 insertions(+), 2 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index f89d3910f05..da0d3d9bad2 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -1845,6 +1845,14 @@ def validate(self): """ self._ds.validate() + def migrate_manifest_paths_v2(self): + """ + Migrate the manifest paths to the new format. + + This will update the manifest to use the new v2 format for paths. 
+ """ + self._ds.migrate_manifest_paths_v2() + @property def optimize(self) -> "DatasetOptimizer": return DatasetOptimizer(self) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 7b04ff1f4f1..b851cb83247 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -260,6 +260,21 @@ def test_v2_manifest_paths(tmp_path: Path): assert re.match(r"\d{20}\.manifest", manifest_path[0]) +def test_v2_manifest_paths_migration(tmp_path: Path): + # Create a dataset with v1 manifest paths + lance.write_dataset( + pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=False + ) + manifest_path = os.listdir(tmp_path / "_versions") + assert manifest_path == ["1.manifest"] + + # Migrate to v2 manifest paths + lance.dataset(tmp_path).migrate_manifest_paths_v2() + manifest_path = os.listdir(tmp_path / "_versions") + assert len(manifest_path) == 1 + assert re.match(r"\d{20}\.manifest", manifest_path[0]) + + def test_tag(tmp_path: Path): table = pa.Table.from_pydict({"colA": [1, 2, 3], "colB": [4, 5, 6]}) base_dir = tmp_path / "test" diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 929b92d7f62..8e843b318d0 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1272,6 +1272,14 @@ impl Dataset { .map_err(|err| PyIOError::new_err(err.to_string())) } + fn migrate_manifest_paths_v2(&mut self) -> PyResult<()> { + let mut new_self = self.ds.as_ref().clone(); + RT.block_on(None, new_self.migrate_manifest_paths_v2())? + .map_err(|err| PyIOError::new_err(err.to_string()))?; + self.ds = Arc::new(new_self); + Ok(()) + } + fn drop_columns(&mut self, columns: Vec<&str>) -> PyResult<()> { let mut new_self = self.ds.as_ref().clone(); RT.block_on(None, new_self.drop_columns(&columns))? 
diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index cebaa923e4e..89a2f9cc8c8 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -116,6 +116,39 @@ impl ManifestNamingScheme { } } +/// Migrate all V1 manifests to V2 naming scheme. +/// +/// This function will rename all V1 manifests to V2 naming scheme. +/// +/// This function is idempotent, and can be run multiple times without +/// changing the state of the object store. +/// +/// However, it should not be run while other concurrent operations are happening. +/// And it should also run until completion before resuming other operations. +pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> { + object_store + .inner + .list(Some(&dataset_base.child(VERSIONS_DIR))) + .try_filter(|res| { + let res = if let Some(filename) = res.location.filename() { + ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1) + } else { + false + }; + future::ready(res) + }) + .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move { + let filename = meta.location.filename().unwrap(); + let version = ManifestNamingScheme::V1.parse_version(filename).unwrap(); + let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version); + object_store.inner.rename(&meta.location, &path).await?; + Ok(()) + }) + .await?; + + Ok(()) +} + /// Function that writes the manifest to the object store. 
pub type ManifestWriter = for<'a> fn( object_store: &'a ObjectStore, @@ -814,3 +847,86 @@ impl Default for CommitConfig { Self { num_retries: 20 } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_manifest_naming_scheme() { + let v1 = ManifestNamingScheme::V1; + let v2 = ManifestNamingScheme::V2; + + assert_eq!( + v1.manifest_path(&Path::from("base"), 0), + Path::from("base/_versions/0.manifest") + ); + assert_eq!( + v1.manifest_path(&Path::from("base"), 42), + Path::from("base/_versions/42.manifest") + ); + + assert_eq!( + v2.manifest_path(&Path::from("base"), 0), + Path::from("base/_versions/18446744073709551615.manifest") + ); + assert_eq!( + v2.manifest_path(&Path::from("base"), 42), + Path::from("base/_versions/18446744073709551573.manifest") + ); + + assert_eq!(v1.parse_version("0.manifest"), Some(0)); + assert_eq!(v1.parse_version("42.manifest"), Some(42)); + assert_eq!( + v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"), + Some(42) + ); + + assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0)); + assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42)); + assert_eq!( + v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"), + Some(42) + ); + + assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1)); + assert_eq!( + ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"), + Some(v2) + ); + assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None); + } + + #[tokio::test] + async fn test_manifest_naming_migration() { + let object_store = ObjectStore::memory(); + let base = Path::from("base"); + let versions_dir = base.child(VERSIONS_DIR); + + // Write one unrelated file, one v1 manifest, and one v2 manifest + let original_files = vec![ + versions_dir.child("irrelevant"), + ManifestNamingScheme::V1.manifest_path(&base, 0), + ManifestNamingScheme::V2.manifest_path(&base, 1), + ]; + for path in original_files { + object_store.put(&path, 
b"".as_slice()).await.unwrap(); + } + + migrate_scheme_to_v2(&object_store, &base).await.unwrap(); + + let expected_files = vec![ + ManifestNamingScheme::V2.manifest_path(&base, 1), + ManifestNamingScheme::V2.manifest_path(&base, 0), + versions_dir.child("irrelevant"), + ]; + let actual_files = object_store + .inner + .list(Some(&versions_dir)) + .map_ok(|res| res.location) + .try_collect::>() + .await + .unwrap(); + assert_eq!(actual_files, expected_files); + } +} diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index bf80cb2dc46..6512f7bb311 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -25,8 +25,8 @@ use lance_table::format::{ DataStorageFormat, Fragment, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION, }; use lance_table::io::commit::{ - commit_handler_from_url, CommitError, CommitHandler, CommitLock, ManifestLocation, - ManifestNamingScheme, + commit_handler_from_url, migrate_scheme_to_v2, CommitError, CommitHandler, CommitLock, + ManifestLocation, ManifestNamingScheme, }; use lance_table::io::manifest::{read_manifest, write_manifest}; use log::warn; @@ -1278,6 +1278,14 @@ impl Dataset { Ok(()) } + + pub async fn migrate_manifest_paths_v2(&mut self) -> Result<()> { + migrate_scheme_to_v2(self.object_store(), &self.base).await?; + // We need to re-open. 
+ let latest_version = self.latest_version_id().await?; + *self = self.checkout_version(latest_version).await?; + Ok(()) + } } /// # Schema Evolution @@ -2726,6 +2734,18 @@ mod tests { assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); } + #[tokio::test] + async fn test_v2_manifest_path_migration() { + let data = lance_datagen::gen() + .col("key", array::step::()) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + let mut dataset = Dataset::write(data, "memory://test", None).await.unwrap(); + assert_eq!(dataset.manifest_naming_scheme, ManifestNamingScheme::V1); + + dataset.migrate_manifest_paths_v2().await.unwrap(); + assert_eq!(dataset.manifest_naming_scheme, ManifestNamingScheme::V2); + } + #[rstest] #[tokio::test] async fn test_merge( From 89c73b8009fd52fb1335a7e7bf8e8b067888ca38 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 30 Aug 2024 14:03:48 -0700 Subject: [PATCH 07/11] document --- docs/format.rst | 19 +++++++++ java/core/lance-jni/src/blocking_dataset.rs | 1 + python/python/lance/dataset.py | 18 ++++++++ rust/lance-table/src/io/commit.rs | 3 +- rust/lance/src/dataset.rs | 46 +++++++++++++++------ rust/lance/src/dataset/fragment.rs | 1 + rust/lance/src/dataset/write.rs | 7 +++- 7 files changed, 79 insertions(+), 16 deletions(-) diff --git a/docs/format.rst b/docs/format.rst index 421b73374d5..612006a3524 100644 --- a/docs/format.rst +++ b/docs/format.rst @@ -338,6 +338,25 @@ systems and cloud object stores, with the notable except of AWS S3. For ones that lack this functionality, an external locking mechanism can be configured by the user. +Manifest Naming Schemes +~~~~~~~~~~~~~~~~~~~~~~~ + +Manifest files must use a consistent naming scheme. The names correspond to the +versions. That way we can open the right version of the dataset without having +to read all the manifests. It also makes it clear which file path is the next +one to be written. + +There are two naming schemes that can be used: + +1. 
V1: ``_versions/{version}.manifest``. This is the legacy naming scheme. +2. V2: ``_versions/{u64::MAX - version:020}.manifest``. This is the new naming + scheme. The version is zero-padded (to 20 digits) and subtracted from + ``u64::MAX``. This allows the versions to be sorted in descending order, + making it possible to find the latest manifest on object storage using a + single list call. + +It is an error for there to be a mixture of these two naming schemes. + .. _conflict_resolution: Conflict resolution diff --git a/java/core/lance-jni/src/blocking_dataset.rs b/java/core/lance-jni/src/blocking_dataset.rs index 47515b5e336..75ec6e30d18 100644 --- a/java/core/lance-jni/src/blocking_dataset.rs +++ b/java/core/lance-jni/src/blocking_dataset.rs @@ -92,6 +92,7 @@ impl BlockingDataset { None, None, object_store_registry, + false, // TODO: support enable_v2_manifest_paths ))?; Ok(Self { inner }) } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index da0d3d9bad2..b580534aeb9 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -1789,6 +1789,12 @@ def commit( storage_options : optional, dict Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. + enable_v2_manifest_paths : bool, optional + If True, and this is a new dataset, uses the new V2 manifest paths. + These paths provide more efficient opening of datasets with many + versions on object stores. This parameter has no effect if the dataset + already exists. To migrate an existing dataset, instead use the + :meth:`migrate_manifest_paths_v2` method. Default is False. Returns ------- @@ -1850,6 +1856,12 @@ def migrate_manifest_paths_v2(self): Migrate the manifest paths to the new format. This will update the manifest to use the new v2 format for paths. + + This function is idempotent, and can be run multiple times without + changing the state of the object store. 
+ + However, it should not be run while other concurrent operations are happening. + And it should also run until completion before resuming other operations. """ self._ds.migrate_manifest_paths_v2() @@ -2876,6 +2888,12 @@ def write_dataset( use_legacy_format : optional, bool, default None Deprecated method for setting the data storage version. Use the `data_storage_version` parameter instead. + enable_v2_manifest_paths : bool, optional + If True, and this is a new dataset, uses the new V2 manifest paths. + These paths provide more efficient opening of datasets with many + versions on object stores. This parameter has no effect if the dataset + already exists. To migrate an existing dataset, instead use the + :meth:`LanceDataset.migrate_manifest_paths_v2` method. Default is False. """ if use_legacy_format is not None: warnings.warn( diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 89a2f9cc8c8..0fabad85411 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -60,6 +60,7 @@ use crate::format::{Index, Manifest}; const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; +/// How manifest files should be named. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum ManifestNamingScheme { /// `_versions/{version}.manifest` @@ -67,8 +68,6 @@ pub enum ManifestNamingScheme { /// `_manifests/{u64::MAX - version}.manifest` /// /// Zero-padded and reversed for O(1) lookup of latest version on object stores. - /// Lexicographically, the first file in the directory should always be the - /// latest manifest. 
V2, } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6512f7bb311..eaeac2821c6 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -846,6 +846,11 @@ impl Dataset { /// * `operation` - A description of the change to commit /// * `read_version` - The version of the dataset that this change is based on /// * `store_params` Parameters controlling object store access to the manifest + /// * `enable_v2_manifest_paths`: If set to true, and this is a new dataset, uses the new v2 manifest + /// paths. These allow constant-time lookups for the latest manifest on object storage. + /// This parameter has no effect on existing datasets. To migrate an existing + /// dataset, use the [`Self::migrate_manifest_paths_v2`] method. + /// Default is False. pub async fn commit( base_uri: &str, operation: Operation, @@ -1279,6 +1284,35 @@ impl Dataset { Ok(()) } + /// Migrate the dataset to use the new manifest path scheme. + /// + /// This function will rename all V1 manifests to [ManifestNamingScheme::V2]. + /// These paths provide more efficient opening of datasets with many versions + /// on object stores. + /// + /// This function is idempotent, and can be run multiple times without + /// changing the state of the object store. + /// + /// However, it should not be run while other concurrent operations are happening. + /// And it should also run until completion before resuming other operations. 
+ /// + /// ```rust + /// # use lance::dataset::Dataset; + /// # use lance_table::io::commit::ManifestNamingScheme; + /// # use lance_datagen::{array, RowCount, BatchCount}; + /// # use arrow_array::types::Int32Type; + /// # let data = lance_datagen::gen() + /// # .col("key", array::step::()) + /// # .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + /// # let fut = async { + /// let mut dataset = Dataset::write(data, "memory://test", None).await.unwrap(); + /// assert_eq!(dataset.manifest_naming_scheme, ManifestNamingScheme::V1); + /// + /// dataset.migrate_manifest_paths_v2().await.unwrap(); + /// assert_eq!(dataset.manifest_naming_scheme, ManifestNamingScheme::V2); + /// # }; + /// # tokio::runtime::Runtime::new().unwrap().block_on(fut); + /// ``` pub async fn migrate_manifest_paths_v2(&mut self) -> Result<()> { migrate_scheme_to_v2(self.object_store(), &self.base).await?; // We need to re-open. @@ -2734,18 +2768,6 @@ mod tests { assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); } - #[tokio::test] - async fn test_v2_manifest_path_migration() { - let data = lance_datagen::gen() - .col("key", array::step::()) - .into_reader_rows(RowCount::from(10), BatchCount::from(1)); - let mut dataset = Dataset::write(data, "memory://test", None).await.unwrap(); - assert_eq!(dataset.manifest_naming_scheme, ManifestNamingScheme::V1); - - dataset.migrate_manifest_paths_v2().await.unwrap(); - assert_eq!(dataset.manifest_naming_scheme, ManifestNamingScheme::V2); - } - #[rstest] #[tokio::test] async fn test_merge( diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index f328a87a9fa..4d2ce5a562e 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2700,6 +2700,7 @@ mod tests { None, None, object_store_registry, + false, ) .await .unwrap(); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 69cc6a5e8a1..e2b32b64979 100644 --- 
a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -111,8 +111,11 @@ pub struct WriteParams { /// secondary indices need to be updated to point to new row ids. pub enable_move_stable_row_ids: bool, - /// If set to true, the writer will use v2 manifest paths. These allow - /// O(1) lookups for the latest manifest on object storage. + /// If set to true, and this is a new dataset, uses the new v2 manifest paths. + /// These allow constant-time lookups for the latest manifest on object storage. + /// This parameter has no effect on existing datasets. To migrate an existing + /// dataset, use the [`super::Dataset::migrate_manifest_paths_v2`] method. + /// Default is False. pub enable_v2_manifest_paths: bool, pub object_store_registry: Arc, From 3957e0538d8f76b92dba40baf1288b90513b4d11 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 30 Aug 2024 15:12:43 -0700 Subject: [PATCH 08/11] fix dynamo test --- rust/lance/src/io/commit/dynamodb.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/io/commit/dynamodb.rs b/rust/lance/src/io/commit/dynamodb.rs index d54497c1a6d..94502004926 100644 --- a/rust/lance/src/io/commit/dynamodb.rs +++ b/rust/lance/src/io/commit/dynamodb.rs @@ -47,7 +47,7 @@ mod test { use lance_table::io::commit::{ dynamodb::DynamoDBExternalManifestStore, external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore}, - manifest_path, CommitHandler, + CommitHandler, ManifestNamingScheme, }; fn read_params(handler: Arc) -> ReadParams { From e6a8217f7f43e3f14bfa73c8ac7d1762b071f346 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 4 Sep 2024 10:41:16 -0700 Subject: [PATCH 09/11] pr feedback --- python/python/lance/dataset.py | 6 ++++-- rust/lance-table/src/io/commit/external_manifest.rs | 2 +- rust/lance/src/dataset.rs | 5 +++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index b580534aeb9..c7edb1598a8 
100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -1794,7 +1794,9 @@ def commit( These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the - :meth:`migrate_manifest_paths_v2` method. Default is False. + :meth:`migrate_manifest_paths_v2` method. Default is False. WARNING: + turning this on will make the dataset unreadable for older versions + of Lance (prior to 0.17.0). Returns ------- @@ -1860,7 +1862,7 @@ def migrate_manifest_paths_v2(self): This function is idempotent, and can be run multiple times without changing the state of the object store. - However, it should not be run while other concurrent operations are happening. + DANGER: this should not be run while other concurrent operations are happening. And it should also run until completion before resuming other operations. """ self._ds.migrate_manifest_paths_v2() diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 3c1017d11a8..0ddd9d4396c 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -246,7 +246,7 @@ impl CommitHandler for ExternalManifestCommitHandler { Ok(_) => {} Err(e) => { warn!( - "could up update external manifest store during load, with error: {}", + "could not update external manifest store during load, with error: {}", e ); } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index eaeac2821c6..c9d37b92abb 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -849,8 +849,9 @@ impl Dataset { /// * `enable_v2_manifest_paths`: If set to true, and this is a new dataset, uses the new v2 manifest /// paths. These allow constant-time lookups for the latest manifest on object storage. /// This parameter has no effect on existing datasets. 
To migrate an existing - /// dataset, use the [`Self::migrate_manifest_paths_v2`] method. - /// Default is False. + /// dataset, use the [`Self::migrate_manifest_paths_v2`] method. WARNING: turning + /// this on will make the dataset unreadable for older versions of Lance + /// (prior to 0.17.0). Default is False. pub async fn commit( base_uri: &str, operation: Operation, From 9ba0fcaf7bc700d2731081a0f0e15c3131617567 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 4 Sep 2024 12:36:25 -0700 Subject: [PATCH 10/11] sanity checks --- rust/lance-io/src/object_store.rs | 16 +++++++++++++++ rust/lance-io/src/scheduler.rs | 2 ++ rust/lance-table/src/io/commit.rs | 34 +++++++++++++++++++++++++++---- rust/lance/src/dataset/builder.rs | 1 + 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index b4fd2ed2007..67e82df8ba7 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -93,7 +93,12 @@ pub struct ObjectStore { pub inner: Arc, scheme: String, block_size: usize, + /// Whether to use constant size upload parts for multipart uploads. This + /// is only necessary for Cloudflare R2. pub use_constant_size_upload_parts: bool, + /// Whether we can assume that the list of files is lexically ordered. This + /// is true for object stores, but not for local filesystems. + pub list_is_lexically_ordered: bool, io_parallelism: usize, } @@ -338,6 +343,7 @@ pub struct ObjectStoreParams { /// is false, max upload size is 2.5TB. When this is true, the max size is /// 50GB. 
pub use_constant_size_upload_parts: bool, + pub list_is_lexically_ordered: Option, } impl Default for ObjectStoreParams { @@ -350,6 +356,7 @@ impl Default for ObjectStoreParams { object_store_wrapper: None, storage_options: None, use_constant_size_upload_parts: false, + list_is_lexically_ordered: None, } } } @@ -431,6 +438,7 @@ impl ObjectStore { scheme: String::from(scheme), block_size: 4 * 1024, // 4KB block size use_constant_size_upload_parts: false, + list_is_lexically_ordered: false, io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM, }, Path::from_absolute_path(expanded_path.as_path())?, @@ -456,6 +464,7 @@ impl ObjectStore { scheme: String::from("file"), block_size: 4 * 1024, // 4KB block size use_constant_size_upload_parts: false, + list_is_lexically_ordered: false, io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM, } } @@ -467,6 +476,7 @@ impl ObjectStore { scheme: String::from("memory"), block_size: 64 * 1024, use_constant_size_upload_parts: false, + list_is_lexically_ordered: true, io_parallelism: get_num_compute_intensive_cpus(), } } @@ -801,6 +811,7 @@ async fn configure_store( scheme: String::from(url.scheme()), block_size: 64 * 1024, use_constant_size_upload_parts, + list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, }) } @@ -818,6 +829,7 @@ async fn configure_store( scheme: String::from("gs"), block_size: 64 * 1024, use_constant_size_upload_parts: false, + list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, }) } @@ -831,6 +843,7 @@ async fn configure_store( scheme: String::from("az"), block_size: 64 * 1024, use_constant_size_upload_parts: false, + list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, }) } @@ -847,6 +860,7 @@ async fn configure_store( scheme: String::from("memory"), block_size: 64 * 1024, use_constant_size_upload_parts: false, + list_is_lexically_ordered: true, io_parallelism: get_num_compute_intensive_cpus(), }), unknown_scheme => { @@ -870,6 +884,7 @@ impl 
ObjectStore { block_size: Option, wrapper: Option>, use_constant_size_upload_parts: bool, + list_is_lexically_ordered: bool, io_parallelism: usize, ) -> Self { let scheme = location.scheme(); @@ -885,6 +900,7 @@ impl ObjectStore { scheme: scheme.into(), block_size, use_constant_size_upload_parts, + list_is_lexically_ordered, io_parallelism, } } diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index 07e5e76751b..747e684e549 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -741,6 +741,7 @@ mod tests { None, None, false, + false, 1, )); @@ -828,6 +829,7 @@ mod tests { None, None, false, + false, 1, )); diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 0fabad85411..e7e3b230210 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -32,6 +32,7 @@ use futures::{ stream::BoxStream, StreamExt, TryStreamExt, }; +use log::warn; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use url::Url; @@ -191,13 +192,38 @@ async fn current_manifest_path( }); let first = valid_manifests.next().await.transpose()?; - match first { + match (first, object_store.list_is_lexically_ordered) { // If the first valid manifest we see is V2, we can assume that we are using // V2 naming scheme for all manifests. - Some((scheme @ ManifestNamingScheme::V2, meta)) => { + (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => { let version = scheme .parse_version(meta.location.filename().unwrap()) .unwrap(); + + // Sanity check: verify at least for the first 1k files that they are all V2 + // and that the version numbers are decreasing. We use the first 1k because + // this is the typical size of an object store list endpoint response page. + for (scheme, meta) in valid_manifests.take(999).try_collect::>().await? 
{ + if scheme != ManifestNamingScheme::V2 { + warn!( + "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \ + to migrate the directory." + ); + break; + } + let next_version = scheme + .parse_version(meta.location.filename().unwrap()) + .unwrap(); + if next_version >= version { + warn!( + "List operation was expected to be lexically ordered, but was not. This \ + could mean a corrupt read. Please make a bug report on the lancedb/lance \ + GitHub repository." + ); + break; + } + } + Ok(ManifestLocation { version, path: meta.location, @@ -208,7 +234,7 @@ async fn current_manifest_path( // If the first valid manifest we see if V1, assume for now that we are // using V1 naming scheme for all manifests. Since we are listing the // directory anyways, we will assert there aren't any V2 manifests. - Some((scheme @ ManifestNamingScheme::V1, meta)) => { + (Some((scheme, meta)), _) => { let mut current_version = scheme .parse_version(meta.location.filename().unwrap()) .unwrap(); @@ -236,7 +262,7 @@ async fn current_manifest_path( naming_scheme: scheme, }) } - None => Err(Error::NotFound { + (None, _) => Err(Error::NotFound { uri: base.child(VERSIONS_DIR).to_string(), location: location!(), }), diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 13ac1ac3d8c..5d7037b6012 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -228,6 +228,7 @@ impl DatasetBuilder { self.options.block_size, self.options.object_store_wrapper, self.options.use_constant_size_upload_parts, + store.1.scheme() != "file", // If user supplied an object store then we just assume it's probably // cloud-like DEFAULT_CLOUD_IO_PARALLELISM, From c033028b1e54a11ae36f3d56b27f97f93ae48d94 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 4 Sep 2024 16:27:22 -0700 Subject: [PATCH 11/11] add test --- rust/lance/src/dataset.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git 
a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c9d37b92abb..36e074387c9 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2769,6 +2769,37 @@ mod tests { assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); } + #[tokio::test] + async fn test_v2_manifest_path_commit() { + let schema = Schema::try_from(&ArrowSchema::new(vec![ArrowField::new( + "x", + DataType::Int32, + false, + )])) + .unwrap(); + let operation = Operation::Overwrite { + fragments: vec![], + schema, + }; + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + let dataset = Dataset::commit( + test_uri, + operation, + None, + None, + None, + Arc::new(ObjectStoreRegistry::default()), + true, // enable_v2_manifest_paths + ) + .await + .unwrap(); + + assert!(dataset.manifest_naming_scheme == ManifestNamingScheme::V2); + + assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); + } + #[rstest] #[tokio::test] async fn test_merge(