From d42efa8e76f1b925e6a659f197466cc9a3fe1cdf Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 11 Nov 2024 13:55:03 -0800 Subject: [PATCH 01/10] perf: optimize reading transactions in commit loop --- rust/lance-table/src/format/fragment.rs | 13 ++- rust/lance/src/dataset/transaction.rs | 13 ++- rust/lance/src/io/commit.rs | 149 +++++++++++++++--------- 3 files changed, 108 insertions(+), 67 deletions(-) diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs index 76125c3c94..475b2fb23e 100644 --- a/rust/lance-table/src/format/fragment.rs +++ b/rust/lance-table/src/format/fragment.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use deepsize::DeepSizeOf; use lance_core::Error; use lance_file::format::{MAJOR_VERSION, MINOR_VERSION}; use lance_file::version::LanceFileVersion; @@ -16,7 +17,7 @@ use lance_core::error::Result; /// Lance Data File /// /// A data file is one piece of file storing data. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] pub struct DataFile { /// Relative path of the data file to dataset root. pub path: String, @@ -144,7 +145,7 @@ impl TryFrom for DataFile { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] #[serde(rename_all = "lowercase")] pub enum DeletionFileType { Array, @@ -161,7 +162,7 @@ impl DeletionFileType { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] pub struct DeletionFile { pub read_version: u64, pub id: u64, @@ -199,7 +200,7 @@ impl TryFrom for DeletionFile { } /// A reference to a part of a file. 
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] pub struct ExternalFile { pub path: String, pub offset: u64, @@ -207,7 +208,7 @@ pub struct ExternalFile { } /// Metadata about location of the row id sequence. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] pub enum RowIdMeta { Inline(Vec), External(ExternalFile), @@ -234,7 +235,7 @@ impl TryFrom for RowIdMeta { /// /// A fragment is a set of files which represent the different columns of the same rows. /// If column exists in the schema, but the related file does not exist, treat this column as `nulls`. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] pub struct Fragment { /// Fragment ID pub id: u64, diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 9c9ba7a44a..9600844718 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -43,6 +43,7 @@ use std::{ sync::Arc, }; +use deepsize::DeepSizeOf; use lance_core::{datatypes::Schema, Error, Result}; use lance_file::{datatypes::Fields, version::LanceFileVersion}; use lance_io::object_store::ObjectStore; @@ -70,7 +71,7 @@ use lance_table::feature_flags::{apply_feature_flags, FLAG_MOVE_STABLE_ROW_IDS}; /// /// This contains enough information to be able to build the next manifest, /// given the current manifest. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, DeepSizeOf)] pub struct Transaction { /// The version of the table this transaction is based off of. If this is /// the first transaction, this should be 0. @@ -94,7 +95,7 @@ pub enum BlobsOperation { } /// An operation on a dataset. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, DeepSizeOf)] pub enum Operation { /// Adding new fragments to the dataset. 
The fragments contained within /// haven't yet been assigned a final ID. @@ -173,7 +174,13 @@ pub struct RewrittenIndex { pub new_id: Uuid, } -#[derive(Debug, Clone)] +impl DeepSizeOf for RewrittenIndex { + fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize { + 0 + } +} + +#[derive(Debug, Clone, DeepSizeOf)] pub struct RewriteGroup { pub old_fragments: Vec, pub new_fragments: Vec, diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 794536b84f..26b20e8a41 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -36,7 +36,7 @@ use rand::{thread_rng, Rng}; use snafu::{location, Location}; use futures::future::Either; -use futures::{StreamExt, TryStreamExt}; +use futures::{FutureExt, StreamExt, TryStreamExt}; use lance_core::{Error, Result}; use lance_index::DatasetIndexExt; use object_store::path::Path; @@ -67,6 +67,39 @@ async fn read_transaction_file( transaction.try_into() } +async fn read_dataset_transaction_file( + dataset: &Dataset, + version: u64, +) -> Result> { + let cache_path = dataset + .base + .child("_transactions") + .child(format!("{}.txn", version)); + dataset + .session + .file_metadata_cache + .get_or_insert(&cache_path, |_| async move { + let dataset_version = dataset.checkout_version(version).await?; + let object_store = dataset_version.object_store(); + let path = dataset_version + .manifest + .transaction_file + .as_ref() + .ok_or_else(|| Error::Internal { + message: format!( + "Dataset version {} does not have a transaction file", + version + ), + location: location!(), + })?; + let transaction = read_transaction_file(object_store, &dataset.base, path) + .await + .unwrap(); + Ok(transaction) + }) + .await +} + /// Write a transaction to a file and return the relative path. 
async fn write_transaction_file( object_store: &ObjectStore, @@ -86,7 +119,7 @@ async fn write_transaction_file( fn check_transaction( transaction: &Transaction, other_version: u64, - other_transaction: &Option, + other_transaction: Option<&Transaction>, ) -> Result<()> { if other_transaction.is_none() { return Err(crate::Error::Internal { @@ -637,32 +670,25 @@ pub(crate) async fn commit_transaction( // has not necessarily. So for anything involving writing, use `object_store`. let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?; - let mut dataset = dataset.clone(); // First, get all transactions since read_version - let mut other_transactions = Vec::new(); let mut version = transaction.read_version; - loop { + let other_transactions = std::iter::from_fn(|| { version += 1; - match dataset.checkout_version(version).await { - Ok(next_dataset) => { - let other_txn = if let Some(txn_file) = &next_dataset.manifest.transaction_file { - Some(read_transaction_file(object_store, &next_dataset.base, txn_file).await?) - } else { - None - }; - other_transactions.push(other_txn); - dataset = next_dataset; - } - Err(crate::Error::NotFound { .. }) | Err(crate::Error::DatasetNotFound { .. }) => { - break; - } - Err(e) => { - return Err(e); - } - } - } + Some(read_dataset_transaction_file(dataset, version)) + }); + let other_transactions = futures::stream::iter(other_transactions) + .buffered(10) + .take_while(|res| { + futures::future::ready(!matches!( + res, + Err(crate::Error::NotFound { .. }) | Err(crate::Error::DatasetNotFound { .. 
}) + )) + }) + .try_collect::>() + .await?; - let mut target_version = version; + let mut target_version = transaction.read_version + other_transactions.len() as u64 + 1; + let mut dataset = dataset.checkout_version(target_version - 1).await?; if is_detached_version(target_version) { return Err(Error::Internal { message: "more than 2^65 versions have been created and so regular version numbers are appearing as 'detached' versions.".into(), location: location!() }); @@ -671,7 +697,7 @@ pub(crate) async fn commit_transaction( // If any of them conflict with the transaction, return an error for (version_offset, other_transaction) in other_transactions.iter().enumerate() { let other_version = transaction.read_version + version_offset as u64 + 1; - check_transaction(transaction, other_version, other_transaction)?; + check_transaction(transaction, other_version, Some(other_transaction.as_ref()))?; } for attempt_i in 0..commit_config.num_retries { @@ -743,18 +769,24 @@ pub(crate) async fn commit_transaction( let backoff_time = backoff_time(attempt_i); tokio::time::sleep(backoff_time).await; - let latest_version = dataset.latest_version_id().await?; - for version in target_version..=latest_version { - dataset = dataset.checkout_version(version).await?; - let other_transaction = if let Some(txn_file) = - dataset.manifest.transaction_file.as_ref() - { - Some(read_transaction_file(object_store, &dataset.base, txn_file).await?) 
- } else { - None - }; - check_transaction(transaction, version, &other_transaction)?; - } + dataset.checkout_latest().await?; + let latest_version = dataset.manifest.version; + futures::stream::iter(target_version..=latest_version) + .map(|version| { + read_dataset_transaction_file(&dataset, version) + .map(move |res| res.map(|tx| (version, tx))) + }) + .buffer_unordered(10) + .and_then(|(version, other_transaction)| { + let res = check_transaction( + transaction, + version, + Some(other_transaction.as_ref()), + ); + futures::future::ready(res) + }) + .try_all(|_| futures::future::ready(true)) + .await?; target_version = latest_version + 1; } Err(CommitError::OtherError(err)) => { @@ -1127,7 +1159,7 @@ mod tests { } } - async fn get_empty_dataset() -> Dataset { + async fn get_empty_dataset() -> (tempfile::TempDir, Dataset) { let test_dir = tempfile::tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -1137,18 +1169,19 @@ mod tests { false, )])); - Dataset::write( + let ds = Dataset::write( RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()), test_uri, None, ) .await - .unwrap() + .unwrap(); + (test_dir, ds) } #[tokio::test] async fn test_good_concurrent_config_writes() { - let dataset = get_empty_dataset().await; + let (_tmpdir, dataset) = get_empty_dataset().await; // Test successful concurrent insert config operations let futures: Vec<_> = ["key1", "key2", "key3", "key4", "key5"] @@ -1202,7 +1235,7 @@ mod tests { async fn test_bad_concurrent_config_writes() { // If two concurrent insert config operations occur for the same key, a // `CommitConflict` should be returned - let dataset = get_empty_dataset().await; + let (_tmpdir, dataset) = get_empty_dataset().await; let futures: Vec<_> = ["key1", "key1", "key2", "key3", "key4"] .iter() @@ -1220,30 +1253,30 @@ mod tests { // Assert that either the first or the second operation fails let mut first_operation_failed = false; - let error_fragment = "Commit conflict for version"; for 
(i, result) in results.into_iter().enumerate() { + let result = result.unwrap(); match i { 0 => { - if !matches!(result, Ok(Ok(_))) { + if result.is_err() { first_operation_failed = true; - assert!(result - .unwrap() - .err() - .unwrap() - .to_string() - .contains(error_fragment)); + assert!( + matches!(&result, &Err(Error::CommitConflict { .. })), + "{:?}", + result, + ); } } 1 => match first_operation_failed { - true => assert!(matches!(result, Ok(Ok(_))), "{:?}", result), - false => assert!(result - .unwrap() - .err() - .unwrap() - .to_string() - .contains(error_fragment)), + true => assert!(result.is_ok(), "{:?}", result), + false => { + assert!( + matches!(&result, &Err(Error::CommitConflict { .. })), + "{:?}", + result, + ); + } }, - _ => assert!(matches!(result, Ok(Ok(_))), "{:?}", result), + _ => assert!(result.is_ok(), "{:?}", result), } } } From 398a25d71731939c30d017917b5da17ea9af8bb7 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 14 Nov 2024 09:30:56 -0800 Subject: [PATCH 02/10] wip: debug --- rust/lance/src/io/commit.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 26b20e8a41..7d4883cb86 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -75,6 +75,7 @@ async fn read_dataset_transaction_file( .base .child("_transactions") .child(format!("{}.txn", version)); + dbg!(&dataset.session.file_metadata_cache); dataset .session .file_metadata_cache @@ -758,6 +759,8 @@ pub(crate) async fn commit_transaction( match result { Ok(()) => { + // TODO: insert in cache here. 
+ return Ok(manifest); } Err(CommitError::CommitConflict) => { From d2465933b330b49aefdbcb7f6c1d95122410c3ac Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 18 Nov 2024 14:00:35 -0800 Subject: [PATCH 03/10] get IOPS test passing --- rust/lance-core/src/cache.rs | 3 +- rust/lance-io/src/object_store.rs | 17 ++ rust/lance-table/src/io/commit.rs | 16 +- .../src/io/commit/external_manifest.rs | 25 ++- rust/lance-table/src/io/manifest.rs | 14 +- rust/lance/src/dataset.rs | 81 +++++---- rust/lance/src/dataset/builder.rs | 9 +- rust/lance/src/dataset/cleanup.rs | 2 +- rust/lance/src/dataset/optimize.rs | 5 +- rust/lance/src/dataset/schema_evolution.rs | 9 +- rust/lance/src/dataset/transaction.rs | 8 +- rust/lance/src/dataset/write/commit.rs | 167 ++++++++++++++++-- rust/lance/src/dataset/write/merge_insert.rs | 3 +- rust/lance/src/dataset/write/update.rs | 3 +- rust/lance/src/index.rs | 11 +- rust/lance/src/io/commit.rs | 71 +++++--- rust/lance/src/utils/test.rs | 47 ++++- 17 files changed, 364 insertions(+), 127 deletions(-) diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs index 133b780fa3..8f357bda5d 100644 --- a/rust/lance-core/src/cache.rs +++ b/rust/lance-core/src/cache.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use deepsize::{Context, DeepSizeOf}; use futures::Future; -use moka::sync::Cache; +use moka::sync::{Cache, ConcurrentCacheExt}; use object_store::path::Path; use crate::utils::path::LancePathExt; @@ -114,6 +114,7 @@ impl FileMetadataCache { pub fn size(&self) -> usize { if let Some(cache) = self.cache.as_ref() { + cache.sync(); cache.entry_count() as usize } else { 0 diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index b3deb41599..f668cdfaae 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -400,6 +400,23 @@ impl ObjectStore { uri: &str, params: &ObjectStoreParams, ) -> Result<(Self, Path)> { + if let Some((store, path)) = params.object_store.as_ref() { + let 
mut inner = store.clone(); + if let Some(wrapper) = params.object_store_wrapper.as_ref() { + inner = wrapper.wrap(inner); + } + let store = Self { + inner, + scheme: path.scheme().to_string(), + block_size: params.block_size.unwrap_or(64 * 1024), + use_constant_size_upload_parts: params.use_constant_size_upload_parts, + list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(), + io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, + download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT, + }; + let path = Path::from(path.path()); + return Ok((store, path)); + } let (object_store, path) = match Url::parse(uri) { Ok(url) if url.scheme().len() == 1 && cfg!(windows) => { // On Windows, the drive is parsed as a scheme diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 4b7eba92d5..6a2b558707 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -473,7 +473,7 @@ pub trait CommitHandler: Debug + Send + Sync { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, - ) -> std::result::Result<(), CommitError>; + ) -> std::result::Result; } async fn default_resolve_version( @@ -723,7 +723,7 @@ impl CommitHandler for UnsafeCommitHandler { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, - ) -> std::result::Result<(), CommitError> { + ) -> std::result::Result { // Log a one-time warning if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) { WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed); @@ -737,7 +737,7 @@ impl CommitHandler for UnsafeCommitHandler { // Write the manifest naively manifest_writer(object_store, manifest, indices, &version_path).await?; - Ok(()) + Ok(version_path) } } @@ -783,7 +783,7 @@ impl CommitHandler for T { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, - ) -> std::result::Result<(), 
CommitError> { + ) -> std::result::Result { let path = naming_scheme.manifest_path(base_path, manifest.version); // NOTE: once we have the lease we cannot use ? to return errors, since // we must release the lease before returning. @@ -812,7 +812,7 @@ impl CommitHandler for T { // Release the lock lease.release(res.is_ok()).await?; - res.map_err(|err| err.into()) + res.map_err(|err| err.into()).map(|_| path) } } @@ -826,7 +826,7 @@ impl CommitHandler for Arc { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, - ) -> std::result::Result<(), CommitError> { + ) -> std::result::Result { self.as_ref() .commit( manifest, @@ -855,7 +855,7 @@ impl CommitHandler for RenameCommitHandler { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, - ) -> std::result::Result<(), CommitError> { + ) -> std::result::Result { // Create a temporary object, then use `rename_if_not_exists` to commit. // If failed, clean up the temporary object. @@ -870,7 +870,7 @@ impl CommitHandler for RenameCommitHandler { .rename_if_not_exists(&tmp_path, &path) .await { - Ok(_) => Ok(()), + Ok(_) => Ok(path), Err(ObjectStoreError::AlreadyExists { .. 
}) => { // Another transaction has already been committed // Attempt to clean up temporary object, but ignore errors if we can't diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 75cd0003fd..bace96ea3e 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -307,7 +307,7 @@ impl CommitHandler for ExternalManifestCommitHandler { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, - ) -> std::result::Result<(), CommitError> { + ) -> std::result::Result { // path we get here is the path to the manifest we want to write // use object_store.base_path.as_ref() for getting the root of the dataset @@ -323,27 +323,26 @@ impl CommitHandler for ExternalManifestCommitHandler { .await .map_err(|_| CommitError::CommitConflict {}); - if res.is_err() { + if let Err(err) = res { // delete the staging manifest match object_store.inner.delete(&staging_path).await { Ok(_) => {} Err(ObjectStoreError::NotFound { .. }) => {} Err(e) => return Err(CommitError::OtherError(e.into())), } - return res; + return Err(err); } let scheme = detect_naming_scheme_from_path(&path)?; - self.finalize_manifest( - base_path, - &staging_path, - manifest.version, - &object_store.inner, - scheme, - ) - .await?; - - Ok(()) + Ok(self + .finalize_manifest( + base_path, + &staging_path, + manifest.version, + &object_store.inner, + scheme, + ) + .await?) } } diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index 9d3c55e109..766b665a33 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -28,8 +28,16 @@ use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC}; /// /// This only reads manifest files. It does not read data files. 
#[instrument(level = "debug", skip(object_store))] -pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result { - let file_size = object_store.inner.head(path).await?.size; +pub async fn read_manifest( + object_store: &ObjectStore, + path: &Path, + known_size: Option, +) -> Result { + let file_size = if let Some(known_size) = known_size { + known_size as usize + } else { + object_store.inner.head(path).await?.size + }; const PREFETCH_SIZE: usize = 64 * 1024; let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize; let range = Range { @@ -263,7 +271,7 @@ mod test { .unwrap(); writer.shutdown().await.unwrap(); - let roundtripped_manifest = read_manifest(&store, &path).await.unwrap(); + let roundtripped_manifest = read_manifest(&store, &path, None).await.unwrap(); assert_eq!(manifest, roundtripped_manifest); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index b95900fa53..f29fee4800 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -108,6 +108,9 @@ pub struct Dataset { uri: String, pub(crate) base: Path, pub(crate) manifest: Arc, + // Path for the manifest that is loaded. Used to get additional information, + // such as the index metadata. 
+ pub(crate) manifest_file: Path, pub(crate) session: Arc, pub tags: Tags, pub manifest_naming_scheme: ManifestNamingScheme, @@ -322,6 +325,7 @@ impl Dataset { base_path, self.uri.clone(), manifest, + manifest_location.path, self.session.clone(), self.commit_handler.clone(), manifest_location.naming_scheme, @@ -400,11 +404,13 @@ impl Dataset { Ok(manifest) } + #[allow(clippy::too_many_arguments)] async fn checkout_manifest( object_store: Arc, base_path: Path, uri: String, manifest: Manifest, + manifest_file: Path, session: Arc, commit_handler: Arc, manifest_naming_scheme: ManifestNamingScheme, @@ -419,6 +425,7 @@ impl Dataset { base: base_path, uri, manifest: Arc::new(manifest), + manifest_file, commit_handler, session, tags, @@ -486,12 +493,18 @@ impl Dataset { .commit_handler .resolve_version_location(&blobs_path, blobs_version, &self.object_store.inner) .await?; - let manifest = read_manifest(&self.object_store, &blob_manifest_location.path).await?; + let manifest = read_manifest( + &self.object_store, + &blob_manifest_location.path, + blob_manifest_location.size, + ) + .await?; let blobs_dataset = Self::checkout_manifest( self.object_store.clone(), blobs_path, format!("{}/{}", self.uri, BLOB_DIR), manifest, + blob_manifest_location.path, self.session.clone(), self.commit_handler.clone(), ManifestNamingScheme::V2, @@ -512,14 +525,11 @@ impl Dataset { } pub async fn latest_manifest(&self) -> Result { - read_manifest( - &self.object_store, - &self - .commit_handler - .resolve_latest_version(&self.base, &self.object_store) - .await?, - ) - .await + let location = self + .commit_handler + .resolve_latest_location(&self.base, &self.object_store) + .await?; + read_manifest(&self.object_store, &location.path, location.size).await } /// Read the transaction file for this version of the dataset. 
@@ -550,18 +560,19 @@ impl Dataset { None, ); - self.manifest = Arc::new( - commit_transaction( - self, - &self.object_store, - self.commit_handler.as_ref(), - &transaction, - &Default::default(), - &Default::default(), - self.manifest_naming_scheme, - ) - .await?, - ); + let (restored_manifest, path) = commit_transaction( + self, + &self.object_store, + self.commit_handler.as_ref(), + &transaction, + &Default::default(), + &Default::default(), + self.manifest_naming_scheme, + ) + .await?; + + self.manifest = Arc::new(restored_manifest); + self.manifest_file = path; Ok(()) } @@ -897,7 +908,7 @@ impl Dataset { None, ); - let manifest = commit_transaction( + let (manifest, path) = commit_transaction( self, &self.object_store, self.commit_handler.as_ref(), @@ -909,6 +920,7 @@ impl Dataset { .await?; self.manifest = Arc::new(manifest); + self.manifest_file = path; Ok(()) } @@ -925,10 +937,8 @@ impl Dataset { &self.object_store } - pub(crate) async fn manifest_file(&self, version: u64) -> Result { - self.commit_handler - .resolve_version(&self.base, version, &self.object_store.inner) - .await + pub(crate) async fn manifest_file(&self) -> Result { + Ok(self.manifest_file.clone()) } pub(crate) fn data_dir(&self) -> Path { @@ -968,7 +978,7 @@ impl Dataset { .list_manifests(&self.base, &self.object_store.inner) .await? 
.try_filter_map(|path| async move { - match read_manifest(&self.object_store, &path).await { + match read_manifest(&self.object_store, &path, None).await { Ok(manifest) => Ok(Some(Version::from(&manifest))), Err(e) => Err(e), } @@ -1427,7 +1437,7 @@ impl Dataset { None, ); - let manifest = commit_transaction( + let (manifest, manifest_path) = commit_transaction( self, &self.object_store, self.commit_handler.as_ref(), @@ -1439,6 +1449,7 @@ impl Dataset { .await?; self.manifest = Arc::new(manifest); + self.manifest_file = manifest_path; Ok(()) } @@ -1479,7 +1490,7 @@ impl Dataset { None, ); - let manifest = commit_transaction( + let (manifest, manifest_path) = commit_transaction( self, &self.object_store, self.commit_handler.as_ref(), @@ -1491,6 +1502,7 @@ impl Dataset { .await?; self.manifest = Arc::new(manifest); + self.manifest_file = manifest_path; Ok(()) } @@ -1507,7 +1519,7 @@ impl Dataset { None, ); - let manifest = commit_transaction( + let (manifest, manifest_path) = commit_transaction( self, &self.object_store, self.commit_handler.as_ref(), @@ -1519,6 +1531,7 @@ impl Dataset { .await?; self.manifest = Arc::new(manifest); + self.manifest_file = manifest_path; Ok(()) } @@ -1565,7 +1578,7 @@ pub(crate) async fn write_manifest_file( indices: Option>, config: &ManifestWriteConfig, naming_scheme: ManifestNamingScheme, -) -> std::result::Result<(), CommitError> { +) -> std::result::Result { if config.auto_set_feature_flags { apply_feature_flags(manifest, config.use_move_stable_row_ids)?; } @@ -1583,9 +1596,7 @@ pub(crate) async fn write_manifest_file( write_manifest_file_to_path, naming_scheme, ) - .await?; - - Ok(()) + .await } fn write_manifest_file_to_path<'a>( @@ -2030,6 +2041,7 @@ mod tests { .resolve_latest_version(&dataset.base, dataset.object_store()) .await .unwrap(), + None, ) .await .unwrap(); @@ -2052,6 +2064,7 @@ mod tests { .resolve_latest_version(&dataset.base, dataset.object_store()) .await .unwrap(), + None, ) .await .unwrap(); diff --git 
a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 005ea89372..342965852a 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -292,7 +292,7 @@ impl DatasetBuilder { } } - let (manifest, manifest_naming_scheme) = if let Some(mut manifest) = manifest { + let (manifest, location) = if let Some(mut manifest) = manifest { let location = commit_handler .resolve_version_location(&base_path, manifest.version, &object_store.inner) .await?; @@ -300,7 +300,7 @@ impl DatasetBuilder { let reader = object_store.open(&location.path).await?; populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?; } - (manifest, location.naming_scheme) + (manifest, location) } else { let manifest_location = match version { Some(version) => { @@ -319,7 +319,7 @@ impl DatasetBuilder { }; let manifest = Dataset::load_manifest(&object_store, &manifest_location).await?; - (manifest, manifest_location.naming_scheme) + (manifest, manifest_location) }; Dataset::checkout_manifest( @@ -327,9 +327,10 @@ impl DatasetBuilder { base_path, table_uri, manifest, + location.path, session, commit_handler, - manifest_naming_scheme, + location.naming_scheme, ) .await } diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 7cc75541e1..16dd432c86 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -171,7 +171,7 @@ impl<'a> CleanupTask<'a> { // ignore it then we might delete valid data files thinking they are not // referenced. - let manifest = read_manifest(&self.dataset.object_store, &path).await?; + let manifest = read_manifest(&self.dataset.object_store, &path, None).await?; let dataset_version = self.dataset.version().version; // Don't delete the latest version, even if it is old. 
Don't delete tagged versions, diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 3e17727bb6..034168c26c 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -597,7 +597,7 @@ async fn reserve_fragment_ids( None, ); - let manifest = commit_transaction( + let (manifest, _) = commit_transaction( dataset, dataset.object_store(), dataset.commit_handler.as_ref(), @@ -902,7 +902,7 @@ pub async fn commit_compaction( None, ); - let manifest = commit_transaction( + let (manifest, manifest_path) = commit_transaction( dataset, dataset.object_store(), dataset.commit_handler.as_ref(), @@ -914,6 +914,7 @@ pub async fn commit_compaction( .await?; dataset.manifest = Arc::new(manifest); + dataset.manifest_file = manifest_path; Ok(metrics) } diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index b42aaaaa32..106c0570bb 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -269,7 +269,7 @@ pub(super) async fn add_columns( /*blob_op= */ None, None, ); - let new_manifest = commit_transaction( + let (new_manifest, new_path) = commit_transaction( dataset, &dataset.object_store, dataset.commit_handler.as_ref(), @@ -281,6 +281,7 @@ pub(super) async fn add_columns( .await?; dataset.manifest = Arc::new(new_manifest); + dataset.manifest_file = new_path; Ok(()) } @@ -590,7 +591,7 @@ pub(super) async fn alter_columns( // TODO: adjust the indices here for the new schema - let manifest = commit_transaction( + let (manifest, manifest_path) = commit_transaction( dataset, &dataset.object_store, dataset.commit_handler.as_ref(), @@ -602,6 +603,7 @@ pub(super) async fn alter_columns( .await?; dataset.manifest = Arc::new(manifest); + dataset.manifest_file = manifest_path; Ok(()) } @@ -651,7 +653,7 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Res None, ); - let manifest = commit_transaction( 
+ let (manifest, manifest_path) = commit_transaction( dataset, &dataset.object_store, dataset.commit_handler.as_ref(), @@ -663,6 +665,7 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Res .await?; dataset.manifest = Arc::new(manifest); + dataset.manifest_file = manifest_path; Ok(()) } diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 9600844718..558c0e9ba3 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -463,13 +463,13 @@ impl Transaction { config: &ManifestWriteConfig, tx_path: &str, ) -> Result<(Manifest, Vec)> { - let path = commit_handler - .resolve_version(base_path, version, &object_store.inner) + let location = commit_handler + .resolve_version_location(base_path, version, &object_store.inner) .await?; - let mut manifest = read_manifest(object_store, &path).await?; + let mut manifest = read_manifest(object_store, &location.path, location.size).await?; manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); manifest.transaction_file = Some(tx_path.to_string()); - let indices = read_manifest_indexes(object_store, &path, &manifest).await?; + let indices = read_manifest_indexes(object_store, &location.path, &manifest).await?; Ok((manifest, indices)) } diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index df75533d32..3015e1fdbc 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -190,22 +190,23 @@ impl<'a> CommitBuilder<'a> { } }; + let session = self + .session + .or_else(|| self.dest.dataset().map(|ds| ds.session.clone())) + .unwrap_or_default(); + let dest = match &self.dest { WriteDestination::Dataset(dataset) => WriteDestination::Dataset(dataset.clone()), WriteDestination::Uri(uri) => { // Check if it already exists. 
- let mut builder = DatasetBuilder::from_uri(uri).with_read_params(ReadParams { - store_options: self.store_params.clone(), - commit_handler: self.commit_handler.clone(), - object_store_registry: self.object_store_registry.clone(), - ..Default::default() - }); - - // If read_version is zero, then it might not have originally been - // passed. We can assume the latest version. - if transaction.read_version > 0 { - builder = builder.with_version(transaction.read_version) - } + let builder = DatasetBuilder::from_uri(uri) + .with_read_params(ReadParams { + store_options: self.store_params.clone(), + commit_handler: self.commit_handler.clone(), + object_store_registry: self.object_store_registry.clone(), + ..Default::default() + }) + .with_session(session.clone()); match builder.load().await { Ok(dataset) => WriteDestination::Dataset(Arc::new(dataset)), @@ -265,7 +266,7 @@ impl<'a> CommitBuilder<'a> { ..Default::default() }; - let manifest = if let Some(dataset) = dest.dataset() { + let (manifest, manifest_file) = if let Some(dataset) = dest.dataset() { if self.detached { if matches!(manifest_naming_scheme, ManifestNamingScheme::V1) { return Err(Error::NotSupported { @@ -308,6 +309,7 @@ impl<'a> CommitBuilder<'a> { &transaction, &manifest_config, manifest_naming_scheme, + &session, ) .await? 
}; @@ -321,7 +323,8 @@ impl<'a> CommitBuilder<'a> { match &self.dest { WriteDestination::Dataset(dataset) => Ok(Dataset { manifest: Arc::new(manifest), - session: self.session.unwrap_or(dataset.session.clone()), + manifest_file, + session, ..dataset.as_ref().clone() }), WriteDestination::Uri(uri) => Ok(Dataset { @@ -329,7 +332,8 @@ impl<'a> CommitBuilder<'a> { base: base_path, uri: uri.to_string(), manifest: Arc::new(manifest), - session: self.session.unwrap_or_default(), + manifest_file, + session, commit_handler, tags, manifest_naming_scheme, @@ -337,3 +341,136 @@ impl<'a> CommitBuilder<'a> { } } } + +#[cfg(test)] +mod tests { + use arrow::array::{Int32Array, RecordBatch}; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use lance_table::{ + format::{DataFile, Fragment}, + io::commit::RenameCommitHandler, + }; + use url::Url; + + use crate::dataset::{InsertBuilder, WriteParams}; + + use super::*; + + fn sample_transaction(read_version: u64) -> Transaction { + Transaction { + uuid: uuid::Uuid::new_v4().hyphenated().to_string(), + operation: Operation::Append { + fragments: vec![Fragment { + id: 0, + files: vec![DataFile { + path: "file.lance".to_string(), + fields: vec![0], + column_indices: vec![0], + file_major_version: 2, + file_minor_version: 0, + }], + deletion_file: None, + row_id_meta: None, + physical_rows: Some(10), + }], + }, + read_version, + blobs_op: None, + tag: None, + } + } + + #[tokio::test] + async fn test_reuse_session() { + // Need to use in-memory for accurate IOPS tracking. 
+ use crate::utils::test::IoTrackingStore; + + // Create new dataset + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..10_i32))], + ) + .unwrap(); + let memory_store = Arc::new(object_store::memory::InMemory::new()); + let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper(); + let store_params = ObjectStoreParams { + object_store_wrapper: Some(io_stats_wrapper), + object_store: Some((memory_store.clone(), Url::parse("memory://test").unwrap())), + ..Default::default() + }; + let dataset = InsertBuilder::new("memory://test") + .with_params(&WriteParams { + store_params: Some(store_params.clone()), + commit_handler: Some(Arc::new(RenameCommitHandler)), + ..Default::default() + }) + .execute(vec![batch]) + .await + .unwrap(); + let mut dataset = Arc::new(dataset); + + let reset_iops = || { + io_stats.lock().unwrap().read_iops = 0; + io_stats.lock().unwrap().write_iops = 0; + }; + let get_new_iops = || { + let read_iops = io_stats.lock().unwrap().read_iops; + let write_iops = io_stats.lock().unwrap().write_iops; + reset_iops(); + (read_iops, write_iops) + }; + + let (initial_reads, initial_writes) = get_new_iops(); + assert!(initial_reads > 0); + assert!(initial_writes > 0); + + // Commit transaction 5 times + for i in 0..5 { + let new_ds = CommitBuilder::new(dataset.clone()) + .execute(sample_transaction(1)) + .await + .unwrap(); + dataset = Arc::new(new_ds); + assert_eq!(dataset.manifest().version, i + 2); + + // Because we are writing transactions sequentially, and caching them, + // we shouldn't need to read anything from disk. + // We have the following read IOPs: + // 1. Find the latest version + // 2. Open that manifest + // 3. (If any indices exist,) read the indices off of that manifest + // TODO: can we cache the last two? 
+ assert_eq!(get_new_iops().0, 2, "i = {}", i); + } + + // Commit transaction with URI and session + let new_ds = CommitBuilder::new("memory://test") + .with_store_params(store_params.clone()) + .with_commit_handler(Arc::new(RenameCommitHandler)) + .with_session(dataset.session.clone()) + .execute(sample_transaction(1)) + .await + .unwrap(); + assert_eq!(new_ds.manifest().version, 7); + // Session should still be re-used + // However, the dataset needs to be loaded, so an additional two IOPs + // are needed. + assert_eq!(get_new_iops().0, 4); + + // Commit transaction with URI and no session + let new_ds = CommitBuilder::new("memory://test") + .with_store_params(store_params) + .with_commit_handler(Arc::new(RenameCommitHandler)) + .execute(sample_transaction(1)) + .await + .unwrap(); + assert_eq!(new_ds.manifest().version, 8); + // Now we have to load all previous transactions. + assert!(get_new_iops().0 > 20); + } +} diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 471dfc8744..909014d62a 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1000,7 +1000,7 @@ impl MergeInsertJob { None, ); - let manifest = commit_transaction( + let (manifest, manifest_path) = commit_transaction( dataset.as_ref(), dataset.object_store(), dataset.commit_handler.as_ref(), @@ -1013,6 +1013,7 @@ impl MergeInsertJob { let mut dataset = dataset.as_ref().clone(); dataset.manifest = Arc::new(manifest); + dataset.manifest_file = manifest_path; Ok(Arc::new(dataset)) } diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index a2a37cdfcc..b15a328967 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -373,7 +373,7 @@ impl UpdateJob { None, ); - let manifest = commit_transaction( + let (manifest, manifest_path) = commit_transaction( self.dataset.as_ref(), self.dataset.object_store(), 
self.dataset.commit_handler.as_ref(), @@ -386,6 +386,7 @@ impl UpdateJob { let mut dataset = self.dataset.as_ref().clone(); dataset.manifest = Arc::new(manifest); + dataset.manifest_file = manifest_path; Ok(Arc::new(dataset)) } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 7edb2771d1..deef5ac13e 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -328,7 +328,7 @@ impl DatasetIndexExt for Dataset { None, ); - let new_manifest = commit_transaction( + let (new_manifest, manifest_path) = commit_transaction( self, self.object_store(), self.commit_handler.as_ref(), @@ -340,6 +340,7 @@ impl DatasetIndexExt for Dataset { .await?; self.manifest = Arc::new(new_manifest); + self.manifest_file = manifest_path; Ok(()) } @@ -354,7 +355,7 @@ impl DatasetIndexExt for Dataset { return Ok(indices); } - let manifest_file = self.manifest_file(self.version().version).await?; + let manifest_file = self.manifest_file().await?; let loaded_indices: Arc> = read_manifest_indexes(&self.object_store, &manifest_file, &self.manifest) .await? 
@@ -403,7 +404,7 @@ impl DatasetIndexExt for Dataset { None, ); - let new_manifest = commit_transaction( + let (new_manifest, new_path) = commit_transaction( self, self.object_store(), self.commit_handler.as_ref(), @@ -415,6 +416,7 @@ impl DatasetIndexExt for Dataset { .await?; self.manifest = Arc::new(new_manifest); + self.manifest_file = new_path; Ok(()) } @@ -501,7 +503,7 @@ impl DatasetIndexExt for Dataset { None, ); - let new_manifest = commit_transaction( + let (new_manifest, manifest_path) = commit_transaction( self, self.object_store(), self.commit_handler.as_ref(), @@ -513,6 +515,7 @@ impl DatasetIndexExt for Dataset { .await?; self.manifest = Arc::new(new_manifest); + self.manifest_file = manifest_path; Ok(()) } diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 7d4883cb86..412d037820 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -47,6 +47,7 @@ use crate::dataset::fragment::FileFragment; use crate::dataset::transaction::{Operation, Transaction}; use crate::dataset::{write_manifest_file, ManifestWriteConfig, BLOB_DIR}; use crate::index::DatasetIndexInternalExt; +use crate::session::Session; use crate::Dataset; #[cfg(all(feature = "dynamodb", test))] @@ -67,15 +68,17 @@ async fn read_transaction_file( transaction.try_into() } +fn transaction_file_cache_path(base_path: &Path, version: u64) -> Path { + base_path + .child("_transactions") + .child(format!("{}.txn", version)) +} + async fn read_dataset_transaction_file( dataset: &Dataset, version: u64, ) -> Result> { - let cache_path = dataset - .base - .child("_transactions") - .child(format!("{}.txn", version)); - dbg!(&dataset.session.file_metadata_cache); + let cache_path = transaction_file_cache_path(&dataset.base, version); dataset .session .file_metadata_cache @@ -150,6 +153,7 @@ fn check_transaction( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn do_commit_new_dataset( object_store: &ObjectStore, commit_handler: &dyn CommitHandler, @@ 
-158,7 +162,8 @@ async fn do_commit_new_dataset( write_config: &ManifestWriteConfig, manifest_naming_scheme: ManifestNamingScheme, blob_version: Option, -) -> Result { + session: &Session, +) -> Result<(Manifest, Path)> { let transaction_file = write_transaction_file(object_store, base_path, transaction).await?; let (mut manifest, indices) = @@ -184,7 +189,13 @@ async fn do_commit_new_dataset( // TODO: Allow Append or Overwrite mode to retry using `commit_transaction` // if there is a conflict. match result { - Ok(()) => Ok(manifest), + Ok(manifest_path) => { + session.file_metadata_cache.insert( + transaction_file_cache_path(base_path, manifest.version), + Arc::new(transaction.clone()), + ); + Ok((manifest, manifest_path)) + } Err(CommitError::CommitConflict) => Err(crate::Error::DatasetAlreadyExists { uri: base_path.to_string(), location: location!(), @@ -200,11 +211,12 @@ pub(crate) async fn commit_new_dataset( transaction: &Transaction, write_config: &ManifestWriteConfig, manifest_naming_scheme: ManifestNamingScheme, -) -> Result { + session: &Session, +) -> Result<(Manifest, Path)> { let blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() { let blob_path = base_path.child(BLOB_DIR); let blob_tx = Transaction::new(0, blob_op.clone(), None, None); - let blob_manifest = do_commit_new_dataset( + let (blob_manifest, _) = do_commit_new_dataset( object_store, commit_handler, &blob_path, @@ -212,6 +224,7 @@ pub(crate) async fn commit_new_dataset( write_config, manifest_naming_scheme, None, + session, ) .await?; Some(blob_manifest.version) @@ -227,6 +240,7 @@ pub(crate) async fn commit_new_dataset( write_config, manifest_naming_scheme, blob_version, + session, ) .await } @@ -511,7 +525,7 @@ pub(crate) async fn do_commit_detached_transaction( write_config: &ManifestWriteConfig, commit_config: &CommitConfig, new_blob_version: Option, -) -> Result { +) -> Result<(Manifest, Path)> { // We don't strictly need a transaction file but we go ahead and create one 
for // record-keeping if nothing else. let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?; @@ -569,8 +583,8 @@ pub(crate) async fn do_commit_detached_transaction( .await; match result { - Ok(()) => { - return Ok(manifest); + Ok(path) => { + return Ok((manifest, path)); } Err(CommitError::CommitConflict) => { // We pick a random u64 for the version, so it's possible (though extremely unlikely) @@ -606,12 +620,12 @@ pub(crate) async fn commit_detached_transaction( transaction: &Transaction, write_config: &ManifestWriteConfig, commit_config: &CommitConfig, -) -> Result { +) -> Result<(Manifest, Path)> { let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() { let blobs_dataset = dataset.blobs_dataset().await?.unwrap(); let blobs_tx = Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None); - let blobs_manifest = do_commit_detached_transaction( + let (blobs_manifest, _) = do_commit_detached_transaction( blobs_dataset.as_ref(), object_store, commit_handler, @@ -647,12 +661,12 @@ pub(crate) async fn commit_transaction( write_config: &ManifestWriteConfig, commit_config: &CommitConfig, manifest_naming_scheme: ManifestNamingScheme, -) -> Result { +) -> Result<(Manifest, Path)> { let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() { let blobs_dataset = dataset.blobs_dataset().await?.unwrap(); let blobs_tx = Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None); - let blobs_manifest = do_commit_detached_transaction( + let (blobs_manifest, _) = do_commit_detached_transaction( blobs_dataset.as_ref(), object_store, commit_handler, @@ -672,12 +686,12 @@ pub(crate) async fn commit_transaction( let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?; // First, get all transactions since read_version - let mut version = transaction.read_version; - let other_transactions = std::iter::from_fn(|| { - version += 
1; - Some(read_dataset_transaction_file(dataset, version)) - }); - let other_transactions = futures::stream::iter(other_transactions) + let read_version = transaction.read_version; + let mut dataset = dataset.clone(); + dataset.checkout_latest().await?; + let latest_version = dataset.manifest.version; + let other_transactions = futures::stream::iter((read_version + 1)..latest_version) + .map(|version| read_dataset_transaction_file(&dataset, version)) .buffered(10) .take_while(|res| { futures::future::ready(!matches!( @@ -688,8 +702,7 @@ pub(crate) async fn commit_transaction( .try_collect::>() .await?; - let mut target_version = transaction.read_version + other_transactions.len() as u64 + 1; - let mut dataset = dataset.checkout_version(target_version - 1).await?; + let mut target_version = latest_version + 1; if is_detached_version(target_version) { return Err(Error::Internal { message: "more than 2^65 versions have been created and so regular version numbers are appearing as 'detached' versions.".into(), location: location!() }); @@ -758,10 +771,14 @@ pub(crate) async fn commit_transaction( .await; match result { - Ok(()) => { - // TODO: insert in cache here. + Ok(manifest_path) => { + let cache_path = transaction_file_cache_path(&dataset.base, target_version); + dataset + .session() + .file_metadata_cache + .insert(cache_path, Arc::new(transaction.clone())); - return Ok(manifest); + return Ok((manifest, manifest_path)); } Err(CommitError::CommitConflict) => { // See if we can retry the commit. 
Try to account for all diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 6621798ec0..6cd93471bc 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -17,7 +17,7 @@ use lance_table::format::Fragment; use object_store::path::Path; use object_store::{ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, - PutOptions, PutPayload, PutResult, Result as OSResult, + PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart, }; use rand::prelude::SliceRandom; use rand::{Rng, SeedableRng}; @@ -270,6 +270,8 @@ fn field_structure(fragment: &Fragment) -> Vec> { pub struct IoStats { pub read_iops: u64, pub read_bytes: u64, + pub write_iops: u64, + pub write_bytes: u64, } impl Display for IoStats { @@ -313,20 +315,23 @@ impl IoTrackingStore { stats.read_iops += 1; stats.read_bytes += num_bytes; } + + fn record_write(&self, num_bytes: u64) { + let mut stats = self.stats.lock().unwrap(); + stats.write_iops += 1; + stats.write_bytes += num_bytes; + } } #[async_trait::async_trait] impl ObjectStore for IoTrackingStore { - async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult { - self.target.put(location, bytes).await - } - async fn put_opts( &self, location: &Path, bytes: PutPayload, opts: PutOptions, ) -> OSResult { + self.record_write(bytes.content_length() as u64); self.target.put_opts(location, bytes, opts).await } @@ -335,7 +340,11 @@ impl ObjectStore for IoTrackingStore { location: &Path, opts: PutMultipartOpts, ) -> OSResult> { - self.target.put_multipart_opts(location, opts).await + let target = self.target.put_multipart_opts(location, opts).await?; + Ok(Box::new(IoTrackingMultipartUpload { + target, + stats: self.stats.clone(), + })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult { @@ -402,6 +411,32 @@ impl ObjectStore for IoTrackingStore { } } +#[derive(Debug)] +struct IoTrackingMultipartUpload { + target: Box, + stats: Arc>, 
+} + +#[async_trait::async_trait] +impl MultipartUpload for IoTrackingMultipartUpload { + async fn abort(&mut self) -> OSResult<()> { + self.target.abort().await + } + + async fn complete(&mut self) -> OSResult { + self.target.complete().await + } + + fn put_part(&mut self, payload: PutPayload) -> UploadPart { + { + let mut stats = self.stats.lock().unwrap(); + stats.write_iops += 1; + stats.write_bytes += payload.content_length() as u64; + } + self.target.put_part(payload) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; From 5937da951fc06bffd746acebf4db03a8b8becf80 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 18 Nov 2024 14:32:34 -0800 Subject: [PATCH 04/10] minor schedule optimization --- rust/lance/src/io/commit.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 412d037820..c36b9abf16 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -691,8 +691,11 @@ pub(crate) async fn commit_transaction( dataset.checkout_latest().await?; let latest_version = dataset.manifest.version; let other_transactions = futures::stream::iter((read_version + 1)..latest_version) - .map(|version| read_dataset_transaction_file(&dataset, version)) - .buffered(10) + .map(|version| { + read_dataset_transaction_file(&dataset, version) + .map(move |res| res.map(|tx| (version, tx))) + }) + .buffer_unordered(10) .take_while(|res| { futures::future::ready(!matches!( res, @@ -709,9 +712,12 @@ pub(crate) async fn commit_transaction( } // If any of them conflict with the transaction, return an error - for (version_offset, other_transaction) in other_transactions.iter().enumerate() { - let other_version = transaction.read_version + version_offset as u64 + 1; - check_transaction(transaction, other_version, Some(other_transaction.as_ref()))?; + for (other_version, other_transaction) in other_transactions.iter() { + check_transaction( + transaction, + *other_version, 
+ Some(other_transaction.as_ref()), + )?; } for attempt_i in 0..commit_config.num_retries { From 57c3ee3063d2d4135e7e3244ca478fdad846a033 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 19 Nov 2024 12:17:43 -0800 Subject: [PATCH 05/10] test write iops --- rust/lance/src/dataset/write/commit.rs | 16 +++++++++++++--- rust/lance/src/utils/test.rs | 3 +++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 3015e1fdbc..25f88a8382 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -445,7 +445,13 @@ mod tests { // 2. Open that manifest // 3. (If any indices exist,) read the indices off of that manifest // TODO: can we cache the last two? - assert_eq!(get_new_iops().0, 2, "i = {}", i); + let (reads, writes) = get_new_iops(); + assert_eq!(reads, 2, "i = {}", i); + // Should see 3 IOPs: + // 1. Write the transaction files + // 2. Write the manifest + // 3. Atomically rename the manifest + assert_eq!(writes, 3, "i = {}", i); } // Commit transaction with URI and session @@ -460,7 +466,9 @@ mod tests { // Session should still be re-used // However, the dataset needs to be loaded, so an additional two IOPs // are needed. - assert_eq!(get_new_iops().0, 4); + let (reads, writes) = get_new_iops(); + assert_eq!(reads, 4); + assert_eq!(writes, 3); // Commit transaction with URI and no session let new_ds = CommitBuilder::new("memory://test") @@ -471,6 +479,8 @@ mod tests { .unwrap(); assert_eq!(new_ds.manifest().version, 8); // Now we have to load all previous transactions. 
- assert!(get_new_iops().0 > 20); + let (reads, writes) = get_new_iops(); + assert!(reads > 20); + assert_eq!(writes, 3); } } diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 6cd93471bc..93bc9ee2ac 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -399,14 +399,17 @@ impl ObjectStore for IoTrackingStore { } async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> { + self.record_write(0); self.target.copy(from, to).await } async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> { + self.record_write(0); self.target.rename(from, to).await } async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { + self.record_write(0); self.target.copy_if_not_exists(from, to).await } } From 80166a0627a5e1d9b1fdf411f7084a9b719b63c7 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 20 Nov 2024 12:01:14 -0800 Subject: [PATCH 06/10] fix: make sure to load dictionaries --- rust/lance/src/dataset.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index f29fee4800..f6c1233c7e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -529,7 +529,18 @@ impl Dataset { .commit_handler .resolve_latest_location(&self.base, &self.object_store) .await?; - read_manifest(&self.object_store, &location.path, location.size).await + let mut manifest = read_manifest(&self.object_store, &location.path, location.size).await?; + if manifest.schema.has_dictionary_types() { + let reader = if let Some(size) = location.size { + self.object_store + .open_with_size(&location.path, size as usize) + .await? + } else { + self.object_store.open(&location.path).await? + }; + populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?; + } + Ok(manifest) } /// Read the transaction file for this version of the dataset. 
From 7d7e0d1cdc7d0629d0e58b6d58863bebf78f5224 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 20 Nov 2024 13:36:40 -0800 Subject: [PATCH 07/10] fix other tests --- rust/lance/src/dataset.rs | 10 ++++++---- rust/lance/src/io/commit.rs | 6 +++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index f6c1233c7e..3701f48d09 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -309,7 +309,9 @@ impl Dataset { /// Check out the latest version of the dataset pub async fn checkout_latest(&mut self) -> Result<()> { - self.manifest = Arc::new(self.latest_manifest().await?); + let (manifest, path) = self.latest_manifest().await?; + self.manifest = Arc::new(manifest); + self.manifest_file = path; Ok(()) } @@ -524,7 +526,7 @@ impl Dataset { == LanceFileVersion::Legacy } - pub async fn latest_manifest(&self) -> Result { + pub async fn latest_manifest(&self) -> Result<(Manifest, Path)> { let location = self .commit_handler .resolve_latest_location(&self.base, &self.object_store) @@ -540,7 +542,7 @@ impl Dataset { }; populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?; } - Ok(manifest) + Ok((manifest, location.path)) } /// Read the transaction file for this version of the dataset. @@ -559,7 +561,7 @@ impl Dataset { /// Restore the currently checked out version of the dataset as the latest version. 
pub async fn restore(&mut self) -> Result<()> { - let latest_manifest = self.latest_manifest().await?; + let (latest_manifest, _) = self.latest_manifest().await?; let latest_version = latest_manifest.version; let transaction = Transaction::new( diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index c36b9abf16..c8916bf7db 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -688,7 +688,11 @@ pub(crate) async fn commit_transaction( // First, get all transactions since read_version let read_version = transaction.read_version; let mut dataset = dataset.clone(); - dataset.checkout_latest().await?; + if let Operation::Restore { version } = transaction.operation { + dataset.checkout_version(version).await?; + } else { + dataset.checkout_latest().await?; + } let latest_version = dataset.manifest.version; let other_transactions = futures::stream::iter((read_version + 1)..latest_version) .map(|version| { From c189f8cbd28ea5ad10b16e07cf0d3fe97c73c0e6 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 20 Nov 2024 15:33:49 -0800 Subject: [PATCH 08/10] fixes --- python/python/lance/dataset.py | 8 ++++++++ python/python/tests/test_dataset.py | 2 +- python/src/lib.rs | 2 +- rust/lance/src/dataset/write/commit.rs | 20 +++++++++++--------- rust/lance/src/io/commit.rs | 9 ++------- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 2c30d0e840..ae0593a6e4 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2187,6 +2187,14 @@ def commit( f"commit_lock must be a function, got {type(commit_lock)}" ) + if read_version is None and not isinstance( + operation, (LanceOperation.Overwrite, LanceOperation.Restore) + ): + raise ValueError( + "read_version is required for all operations except " + "Overwrite and Restore" + ) + new_ds = _Dataset.commit( base_uri, operation._to_inner(), diff --git a/python/python/tests/test_dataset.py 
b/python/python/tests/test_dataset.py index 65b84eb121..af1e2974bb 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -847,7 +847,7 @@ def test_append_with_commit(tmp_path: Path): fragment = lance.fragment.LanceFragment.create(base_dir, table) append = lance.LanceOperation.Append([fragment]) - with pytest.raises(OSError): + with pytest.raises(ValueError): # Must specify read version dataset = lance.LanceDataset.commit(dataset, append) diff --git a/python/src/lib.rs b/python/src/lib.rs index 67c00b7692..ec39c834fd 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -304,7 +304,7 @@ fn manifest_needs_migration(dataset: &PyAny) -> PyResult { let indices = RT .block_on(Some(py), dataset_ref.load_indices())? .map_err(|err| PyIOError::new_err(format!("Could not read dataset metadata: {}", err)))?; - let manifest = RT + let (manifest, _) = RT .block_on(Some(py), dataset_ref.latest_manifest())? .map_err(|err| PyIOError::new_err(format!("Could not read dataset metadata: {}", err)))?; Ok(::lance::io::commit::manifest_needs_migration( diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 25f88a8382..831201f82f 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use lance_file::version::LanceFileVersion; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use lance_table::{ - format::DataStorageFormat, + format::{is_detached_version, DataStorageFormat}, io::commit::{CommitConfig, CommitHandler, ManifestNamingScheme}, }; use snafu::{location, Location}; @@ -199,7 +199,7 @@ impl<'a> CommitBuilder<'a> { WriteDestination::Dataset(dataset) => WriteDestination::Dataset(dataset.clone()), WriteDestination::Uri(uri) => { // Check if it already exists. 
- let builder = DatasetBuilder::from_uri(uri) + let mut builder = DatasetBuilder::from_uri(uri) .with_read_params(ReadParams { store_options: self.store_params.clone(), commit_handler: self.commit_handler.clone(), @@ -208,6 +208,13 @@ impl<'a> CommitBuilder<'a> { }) .with_session(session.clone()); + // If we are using a detached version, we need to load the dataset. + // Otherwise, we are writing to the main history, and need to check + // out the latest version. + if is_detached_version(transaction.read_version) { + builder = builder.with_version(transaction.read_version) + } + match builder.load().await { Ok(dataset) => WriteDestination::Dataset(Arc::new(dataset)), Err(Error::DatasetNotFound { .. } | Error::NotFound { .. }) => { @@ -440,13 +447,8 @@ mod tests { // Because we are writing transactions sequentially, and caching them, // we shouldn't need to read anything from disk. - // We have the following read IOPs: - // 1. Find the latest version - // 2. Open that manifest - // 3. (If any indices exist,) read the indices off of that manifest - // TODO: can we cache the last two? let (reads, writes) = get_new_iops(); - assert_eq!(reads, 2, "i = {}", i); + assert_eq!(reads, 0, "i = {}", i); // Should see 3 IOPs: // 1. Write the transaction files // 2. Write the manifest @@ -467,7 +469,7 @@ mod tests { // However, the dataset needs to be loaded, so an additional two IOPs // are needed. 
let (reads, writes) = get_new_iops(); - assert_eq!(reads, 4); + assert_eq!(reads, 2); assert_eq!(writes, 3); // Commit transaction with URI and no session diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index c8916bf7db..69aad919ea 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -688,18 +688,13 @@ pub(crate) async fn commit_transaction( // First, get all transactions since read_version let read_version = transaction.read_version; let mut dataset = dataset.clone(); - if let Operation::Restore { version } = transaction.operation { - dataset.checkout_version(version).await?; - } else { - dataset.checkout_latest().await?; - } let latest_version = dataset.manifest.version; let other_transactions = futures::stream::iter((read_version + 1)..latest_version) .map(|version| { read_dataset_transaction_file(&dataset, version) .map(move |res| res.map(|tx| (version, tx))) }) - .buffer_unordered(10) + .buffer_unordered(dataset.object_store().io_parallelism()) .take_while(|res| { futures::future::ready(!matches!( res, @@ -806,7 +801,7 @@ pub(crate) async fn commit_transaction( read_dataset_transaction_file(&dataset, version) .map(move |res| res.map(|tx| (version, tx))) }) - .buffer_unordered(10) + .buffer_unordered(dataset.object_store().io_parallelism()) .and_then(|(version, other_transaction)| { let res = check_transaction( transaction, From 1792fc3dd8f5bbb5925d36ed6168b30a535f37d4 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 20 Nov 2024 16:16:40 -0800 Subject: [PATCH 09/10] fix conflict resolution --- rust/lance/src/dataset/write/commit.rs | 8 +++++--- rust/lance/src/io/commit.rs | 6 +++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 831201f82f..626863a8c3 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -446,9 +446,11 @@ mod tests { 
assert_eq!(dataset.manifest().version, i + 2); // Because we are writing transactions sequentially, and caching them, - // we shouldn't need to read anything from disk. + // we shouldn't need to read anything from disk. Except we do need + // to check for the latest version to see if we need to do conflict + // resolution. let (reads, writes) = get_new_iops(); - assert_eq!(reads, 0, "i = {}", i); + assert_eq!(reads, 1, "i = {}", i); // Should see 3 IOPs: // 1. Write the transaction files // 2. Write the manifest @@ -469,7 +471,7 @@ mod tests { // However, the dataset needs to be loaded, so an additional two IOPs // are needed. let (reads, writes) = get_new_iops(); - assert_eq!(reads, 2); + assert_eq!(reads, 3); assert_eq!(writes, 3); // Commit transaction with URI and no session diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 69aad919ea..c302ac50ea 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -688,8 +688,8 @@ pub(crate) async fn commit_transaction( // First, get all transactions since read_version let read_version = transaction.read_version; let mut dataset = dataset.clone(); - let latest_version = dataset.manifest.version; - let other_transactions = futures::stream::iter((read_version + 1)..latest_version) + let latest_version = dataset.latest_version_id().await?; + let other_transactions = futures::stream::iter((read_version + 1)..=latest_version) .map(|version| { read_dataset_transaction_file(&dataset, version) .map(move |res| res.map(|tx| (version, tx))) @@ -711,7 +711,7 @@ pub(crate) async fn commit_transaction( } // If any of them conflict with the transaction, return an error - for (other_version, other_transaction) in other_transactions.iter() { + for (other_version, other_transaction) in dbg!(other_transactions).iter() { check_transaction( transaction, *other_version, From 4203b717dd49121ef927695ca7b8f8f0fdf4b608 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 20 Nov 2024 17:32:19 -0800 
Subject: [PATCH 10/10] last few fixes --- rust/lance/src/dataset.rs | 3 +++ rust/lance/src/io/commit.rs | 11 +++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3701f48d09..b6adb78e21 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -531,6 +531,9 @@ impl Dataset { .commit_handler .resolve_latest_location(&self.base, &self.object_store) .await?; + if location.version == self.manifest.version { + return Ok((self.manifest.as_ref().clone(), self.manifest_file.clone())); + } let mut manifest = read_manifest(&self.object_store, &location.path, location.size).await?; if manifest.schema.has_dictionary_types() { let reader = if let Some(size) = location.size { diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index c302ac50ea..e8e77e4b41 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -688,7 +688,11 @@ pub(crate) async fn commit_transaction( // First, get all transactions since read_version let read_version = transaction.read_version; let mut dataset = dataset.clone(); - let latest_version = dataset.latest_version_id().await?; + // We need to check out the latest version, because any fixes we apply + // (like computing the new row ids) need to be done based on the most + // recent manifest.
+ dataset.checkout_latest().await?; + let latest_version = dataset.manifest.version; let other_transactions = futures::stream::iter((read_version + 1)..=latest_version) .map(|version| { read_dataset_transaction_file(&dataset, version) @@ -711,7 +715,7 @@ pub(crate) async fn commit_transaction( } // If any of them conflict with the transaction, return an error - for (other_version, other_transaction) in dbg!(other_transactions).iter() { + for (other_version, other_transaction) in other_transactions.iter() { check_transaction( transaction, *other_version, @@ -802,7 +806,7 @@ pub(crate) async fn commit_transaction( .map(move |res| res.map(|tx| (version, tx))) }) .buffer_unordered(dataset.object_store().io_parallelism()) - .and_then(|(version, other_transaction)| { + .try_for_each(|(version, other_transaction)| { let res = check_transaction( transaction, version, @@ -810,7 +814,6 @@ pub(crate) async fn commit_transaction( ); futures::future::ready(res) }) - .try_all(|_| futures::future::ready(true)) .await?; target_version = latest_version + 1; }