Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize reading transactions in commit loop #3117

Merged
merged 10 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,14 @@ def commit(
f"commit_lock must be a function, got {type(commit_lock)}"
)

if read_version is None and not isinstance(
operation, (LanceOperation.Overwrite, LanceOperation.Restore)
):
raise ValueError(
"read_version is required for all operations except "
"Overwrite and Restore"
)

new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
Expand Down
2 changes: 1 addition & 1 deletion python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ def test_append_with_commit(tmp_path: Path):
fragment = lance.fragment.LanceFragment.create(base_dir, table)
append = lance.LanceOperation.Append([fragment])

with pytest.raises(OSError):
with pytest.raises(ValueError):
# Must specify read version
dataset = lance.LanceDataset.commit(dataset, append)

Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ fn manifest_needs_migration(dataset: &PyAny) -> PyResult<bool> {
let indices = RT
.block_on(Some(py), dataset_ref.load_indices())?
.map_err(|err| PyIOError::new_err(format!("Could not read dataset metadata: {}", err)))?;
let manifest = RT
let (manifest, _) = RT
.block_on(Some(py), dataset_ref.latest_manifest())?
.map_err(|err| PyIOError::new_err(format!("Could not read dataset metadata: {}", err)))?;
Ok(::lance::io::commit::manifest_needs_migration(
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-core/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;

use deepsize::{Context, DeepSizeOf};
use futures::Future;
use moka::sync::Cache;
use moka::sync::{Cache, ConcurrentCacheExt};
use object_store::path::Path;

use crate::utils::path::LancePathExt;
Expand Down Expand Up @@ -114,6 +114,7 @@ impl FileMetadataCache {

pub fn size(&self) -> usize {
if let Some(cache) = self.cache.as_ref() {
cache.sync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this mainly for testing purposes, or are there non-testing reasons we need this to be accurate? I don't think it's a problem, since we shouldn't be calling `size` in a loop, so it seems fine. I'm just curious.

cache.entry_count() as usize
} else {
0
Expand Down
17 changes: 17 additions & 0 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
&self,
dir_path: impl Into<&Path> + Send,
unmodified_since: Option<DateTime<Utc>>,
) -> Result<BoxStream<Result<ObjectMeta>>>;

Check warning on line 65 in rust/lance-io/src/object_store.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

elided lifetime has a name

Check warning on line 65 in rust/lance-io/src/object_store.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

elided lifetime has a name
}

#[async_trait]
Expand All @@ -71,7 +71,7 @@
&self,
dir_path: impl Into<&Path> + Send,
unmodified_since: Option<DateTime<Utc>>,
) -> Result<BoxStream<Result<ObjectMeta>>> {

Check warning on line 74 in rust/lance-io/src/object_store.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

elided lifetime has a name

Check warning on line 74 in rust/lance-io/src/object_store.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

elided lifetime has a name
let mut output = self.list(Some(dir_path.into()));
if let Some(unmodified_since_val) = unmodified_since {
output = output
Expand Down Expand Up @@ -400,6 +400,23 @@
uri: &str,
params: &ObjectStoreParams,
) -> Result<(Self, Path)> {
if let Some((store, path)) = params.object_store.as_ref() {
let mut inner = store.clone();
if let Some(wrapper) = params.object_store_wrapper.as_ref() {
inner = wrapper.wrap(inner);
}
let store = Self {
inner,
scheme: path.scheme().to_string(),
block_size: params.block_size.unwrap_or(64 * 1024),
use_constant_size_upload_parts: params.use_constant_size_upload_parts,
list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
};
let path = Path::from(path.path());
return Ok((store, path));
}
let (object_store, path) = match Url::parse(uri) {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
// On Windows, the drive is parsed as a scheme
Expand Down Expand Up @@ -635,7 +652,7 @@
pub fn remove_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<Result<Path>> {

Check warning on line 655 in rust/lance-io/src/object_store.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

elided lifetime has a name

Check warning on line 655 in rust/lance-io/src/object_store.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

elided lifetime has a name
self.inner
.delete_stream(locations.err_into::<ObjectStoreError>().boxed())
.err_into::<Error>()
Expand Down
13 changes: 7 additions & 6 deletions rust/lance-table/src/format/fragment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use deepsize::DeepSizeOf;
use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
use lance_file::version::LanceFileVersion;
Expand All @@ -16,7 +17,7 @@ use lance_core::error::Result;
/// Lance Data File
///
/// A data file is one piece of file storing data.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
/// Relative path of the data file to dataset root.
pub path: String,
Expand Down Expand Up @@ -144,7 +145,7 @@ impl TryFrom<pb::DataFile> for DataFile {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
Array,
Expand All @@ -161,7 +162,7 @@ impl DeletionFileType {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
pub read_version: u64,
pub id: u64,
Expand Down Expand Up @@ -199,15 +200,15 @@ impl TryFrom<pb::DeletionFile> for DeletionFile {
}

/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
pub path: String,
pub offset: u64,
pub size: u64,
}

/// Metadata about location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
Inline(Vec<u8>),
External(ExternalFile),
Expand All @@ -234,7 +235,7 @@ impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
///
/// A fragment is a set of files which represent the different columns of the same rows.
/// If column exists in the schema, but the related file does not exist, treat this column as `nulls`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
/// Fragment ID
pub id: u64,
Expand Down
16 changes: 8 additions & 8 deletions rust/lance-table/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ pub trait CommitHandler: Debug + Send + Sync {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError>;
) -> std::result::Result<Path, CommitError>;
}

async fn default_resolve_version(
Expand Down Expand Up @@ -723,7 +723,7 @@ impl CommitHandler for UnsafeCommitHandler {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
// Log a one-time warning
if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -737,7 +737,7 @@ impl CommitHandler for UnsafeCommitHandler {
// Write the manifest naively
manifest_writer(object_store, manifest, indices, &version_path).await?;

Ok(())
Ok(version_path)
}
}

Expand Down Expand Up @@ -783,7 +783,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for T {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
let path = naming_scheme.manifest_path(base_path, manifest.version);
// NOTE: once we have the lease we cannot use ? to return errors, since
// we must release the lease before returning.
Expand Down Expand Up @@ -812,7 +812,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for T {
// Release the lock
lease.release(res.is_ok()).await?;

res.map_err(|err| err.into())
res.map_err(|err| err.into()).map(|_| path)
}
}

Expand All @@ -826,7 +826,7 @@ impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
self.as_ref()
.commit(
manifest,
Expand Down Expand Up @@ -855,7 +855,7 @@ impl CommitHandler for RenameCommitHandler {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
// Create a temporary object, then use `rename_if_not_exists` to commit.
// If failed, clean up the temporary object.

Expand All @@ -870,7 +870,7 @@ impl CommitHandler for RenameCommitHandler {
.rename_if_not_exists(&tmp_path, &path)
.await
{
Ok(_) => Ok(()),
Ok(_) => Ok(path),
Err(ObjectStoreError::AlreadyExists { .. }) => {
// Another transaction has already been committed
// Attempt to clean up temporary object, but ignore errors if we can't
Expand Down
25 changes: 12 additions & 13 deletions rust/lance-table/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl CommitHandler for ExternalManifestCommitHandler {
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
) -> std::result::Result<Path, CommitError> {
// path we get here is the path to the manifest we want to write
// use object_store.base_path.as_ref() for getting the root of the dataset

Expand All @@ -323,27 +323,26 @@ impl CommitHandler for ExternalManifestCommitHandler {
.await
.map_err(|_| CommitError::CommitConflict {});

if res.is_err() {
if let Err(err) = res {
// delete the staging manifest
match object_store.inner.delete(&staging_path).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => {}
Err(e) => return Err(CommitError::OtherError(e.into())),
}
return res;
return Err(err);
}

let scheme = detect_naming_scheme_from_path(&path)?;

self.finalize_manifest(
base_path,
&staging_path,
manifest.version,
&object_store.inner,
scheme,
)
.await?;

Ok(())
Ok(self
.finalize_manifest(
base_path,
&staging_path,
manifest.version,
&object_store.inner,
scheme,
)
.await?)
}
}
14 changes: 11 additions & 3 deletions rust/lance-table/src/io/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC};
///
/// This only reads manifest files. It does not read data files.
#[instrument(level = "debug", skip(object_store))]
pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Manifest> {
let file_size = object_store.inner.head(path).await?.size;
pub async fn read_manifest(
object_store: &ObjectStore,
path: &Path,
known_size: Option<u64>,
) -> Result<Manifest> {
let file_size = if let Some(known_size) = known_size {
known_size as usize
} else {
object_store.inner.head(path).await?.size
};
const PREFETCH_SIZE: usize = 64 * 1024;
let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize;
let range = Range {
Expand Down Expand Up @@ -263,7 +271,7 @@ mod test {
.unwrap();
writer.shutdown().await.unwrap();

let roundtripped_manifest = read_manifest(&store, &path).await.unwrap();
let roundtripped_manifest = read_manifest(&store, &path, None).await.unwrap();

assert_eq!(manifest, roundtripped_manifest);

Expand Down
Loading
Loading