Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/format.rst
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,25 @@ systems and cloud object stores, with the notable exception of AWS S3. For ones
that lack this functionality, an external locking mechanism can be configured
by the user.

Manifest Naming Schemes
~~~~~~~~~~~~~~~~~~~~~~~

Manifest files must use a consistent naming scheme. The names correspond to the
versions. That way we can open the right version of the dataset without having
to read all the manifests. It also makes it clear which file path is the next
one to be written.

There are two naming schemes that can be used:

1. V1: ``_versions/{version}.manifest``. This is the legacy naming scheme.
2. V2: ``_versions/{u64::MAX - version:020}.manifest``. This is the new naming
scheme. The version is zero-padded (to 20 digits) and subtracted from
``u64::MAX``. This allows the versions to be sorted in descending order,
making it possible to find the latest manifest on object storage using a
single list call.

It is an error for there to be a mixture of these two naming schemes.

.. _conflict_resolution:

Conflict resolution
Expand Down
1 change: 1 addition & 0 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl BlockingDataset {
None,
None,
object_store_registry,
false, // TODO: support enable_v2_manifest_paths
))?;
Ok(Self { inner })
}
Expand Down
32 changes: 32 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,7 @@ def commit(
read_version: Optional[int] = None,
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> LanceDataset:
"""Create a new version of dataset

Expand Down Expand Up @@ -1788,6 +1789,14 @@ def commit(
storage_options : optional, dict
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
enable_v2_manifest_paths : bool, optional
If True, and this is a new dataset, uses the new V2 manifest paths.
These paths provide more efficient opening of datasets with many
versions on object stores. This parameter has no effect if the dataset
already exists. To migrate an existing dataset, instead use the
:meth:`migrate_manifest_paths_v2` method. Default is False. WARNING:
turning this on will make the dataset unreadable for older versions
of Lance (prior to 0.17.0).

Returns
-------
Expand Down Expand Up @@ -1831,6 +1840,7 @@ def commit(
read_version,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
)
return LanceDataset(base_uri, storage_options=storage_options)

Expand All @@ -1843,6 +1853,20 @@ def validate(self):
"""
self._ds.validate()

def migrate_manifest_paths_v2(self):
    """
    Migrate the manifest paths to the new V2 format.

    This will update the manifest to use the new v2 format for paths:
    ``_versions/{u64::MAX - version:020}.manifest``, which sorts in
    descending version order so the latest manifest can be found with a
    single list call on object storage.

    This function is idempotent, and can be run multiple times without
    changing the state of the object store.

    DANGER: this should not be run while other concurrent operations are
    happening, and it should be allowed to run to completion before any
    other operations resume.
    """
    self._ds.migrate_manifest_paths_v2()

@property
def optimize(self) -> "DatasetOptimizer":
return DatasetOptimizer(self)
Expand Down Expand Up @@ -2818,6 +2842,7 @@ def write_dataset(
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: str = "legacy",
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: bool = False,
) -> LanceDataset:
"""Write a given data_obj to the given uri

Expand Down Expand Up @@ -2865,6 +2890,12 @@ def write_dataset(
use_legacy_format : optional, bool, default None
Deprecated method for setting the data storage version. Use the
`data_storage_version` parameter instead.
enable_v2_manifest_paths : bool, optional
If True, and this is a new dataset, uses the new V2 manifest paths.
These paths provide more efficient opening of datasets with many
versions on object stores. This parameter has no effect if the dataset
already exists. To migrate an existing dataset, instead use the
:meth:`LanceDataset.migrate_manifest_paths_v2` method. Default is False.
"""
if use_legacy_format is not None:
warnings.warn(
Expand Down Expand Up @@ -2897,6 +2928,7 @@ def write_dataset(
"progress": progress,
"storage_options": storage_options,
"data_storage_version": data_storage_version,
"enable_v2_manifest_paths": enable_v2_manifest_paths,
}

if commit_lock:
Expand Down
24 changes: 24 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,30 @@ def test_asof_checkout(tmp_path: Path):
assert len(ds.to_table()) == 9


def test_v2_manifest_paths(tmp_path: Path):
    # A fresh dataset written with V2 manifest paths enabled should contain
    # exactly one manifest whose name is a 20-digit zero-padded version.
    lance.write_dataset(
        pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=True
    )
    manifests = os.listdir(tmp_path / "_versions")
    assert len(manifests) == 1
    assert re.match(r"\d{20}\.manifest", manifests[0])


def test_v2_manifest_paths_migration(tmp_path: Path):
    # Start from a dataset using the legacy V1 manifest naming.
    lance.write_dataset(
        pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=False
    )
    assert os.listdir(tmp_path / "_versions") == ["1.manifest"]

    # After migrating, there is still exactly one manifest, now renamed to
    # the zero-padded V2 scheme.
    lance.dataset(tmp_path).migrate_manifest_paths_v2()
    migrated = os.listdir(tmp_path / "_versions")
    assert len(migrated) == 1
    assert re.match(r"\d{20}\.manifest", migrated[0])


def test_tag(tmp_path: Path):
table = pa.Table.from_pydict({"colA": [1, 2, 3], "colB": [4, 5, 6]})
base_dir = tmp_path / "test"
Expand Down
16 changes: 16 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,7 @@ impl Dataset {
read_version: Option<u64>,
commit_lock: Option<&PyAny>,
storage_options: Option<HashMap<String, String>>,
enable_v2_manifest_paths: Option<bool>,
) -> PyResult<Self> {
let object_store_params =
storage_options
Expand Down Expand Up @@ -1255,6 +1256,7 @@ impl Dataset {
object_store_params,
commit_handler,
object_store_registry,
enable_v2_manifest_paths.unwrap_or(false),
)
.await
})?
Expand All @@ -1270,6 +1272,14 @@ impl Dataset {
.map_err(|err| PyIOError::new_err(err.to_string()))
}

/// Migrate this dataset's manifest files to the V2 naming scheme.
///
/// Blocks on the async migration and surfaces any failure as a Python
/// `IOError`. Per the Python-side docs, the operation is idempotent but
/// must not run concurrently with other dataset operations.
fn migrate_manifest_paths_v2(&mut self) -> PyResult<()> {
    // `self.ds` is behind a shared Arc, so mutate a clone and swap the
    // migrated dataset back in only after the migration succeeds.
    let mut new_self = self.ds.as_ref().clone();
    RT.block_on(None, new_self.migrate_manifest_paths_v2())?
        .map_err(|err| PyIOError::new_err(err.to_string()))?;
    self.ds = Arc::new(new_self);
    Ok(())
}

fn drop_columns(&mut self, columns: Vec<&str>) -> PyResult<()> {
let mut new_self = self.ds.as_ref().clone();
RT.block_on(None, new_self.drop_columns(&columns))?
Expand Down Expand Up @@ -1464,6 +1474,12 @@ pub fn get_write_params(options: &PyDict) -> PyResult<Option<WriteParams>> {
});
}

if let Some(enable_v2_manifest_paths) =
get_dict_opt::<bool>(options, "enable_v2_manifest_paths")?
{
p.enable_v2_manifest_paths = enable_v2_manifest_paths;
}

p.commit_handler = get_commit_handler(options);

Some(p)
Expand Down
16 changes: 16 additions & 0 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ pub struct ObjectStore {
pub inner: Arc<dyn OSObjectStore>,
scheme: String,
block_size: usize,
/// Whether to use constant size upload parts for multipart uploads. This
/// is only necessary for Cloudflare R2.
pub use_constant_size_upload_parts: bool,
/// Whether we can assume that the list of files is lexically ordered. This
/// is true for object stores, but not for local filesystems.
pub list_is_lexically_ordered: bool,
io_parallelism: usize,
}

Expand Down Expand Up @@ -338,6 +343,7 @@ pub struct ObjectStoreParams {
/// is false, max upload size is 2.5TB. When this is true, the max size is
/// 50GB.
pub use_constant_size_upload_parts: bool,
pub list_is_lexically_ordered: Option<bool>,
}

impl Default for ObjectStoreParams {
Expand All @@ -350,6 +356,7 @@ impl Default for ObjectStoreParams {
object_store_wrapper: None,
storage_options: None,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: None,
}
}
}
Expand Down Expand Up @@ -431,6 +438,7 @@ impl ObjectStore {
scheme: String::from(scheme),
block_size: 4 * 1024, // 4KB block size
use_constant_size_upload_parts: false,
list_is_lexically_ordered: false,
io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM,
},
Path::from_absolute_path(expanded_path.as_path())?,
Expand All @@ -456,6 +464,7 @@ impl ObjectStore {
scheme: String::from("file"),
block_size: 4 * 1024, // 4KB block size
use_constant_size_upload_parts: false,
list_is_lexically_ordered: false,
io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM,
}
}
Expand All @@ -467,6 +476,7 @@ impl ObjectStore {
scheme: String::from("memory"),
block_size: 64 * 1024,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: get_num_compute_intensive_cpus(),
}
}
Expand Down Expand Up @@ -801,6 +811,7 @@ async fn configure_store(
scheme: String::from(url.scheme()),
block_size: 64 * 1024,
use_constant_size_upload_parts,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
})
}
Expand All @@ -818,6 +829,7 @@ async fn configure_store(
scheme: String::from("gs"),
block_size: 64 * 1024,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
})
}
Expand All @@ -831,6 +843,7 @@ async fn configure_store(
scheme: String::from("az"),
block_size: 64 * 1024,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
})
}
Expand All @@ -847,6 +860,7 @@ async fn configure_store(
scheme: String::from("memory"),
block_size: 64 * 1024,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: get_num_compute_intensive_cpus(),
}),
unknown_scheme => {
Expand All @@ -870,6 +884,7 @@ impl ObjectStore {
block_size: Option<usize>,
wrapper: Option<Arc<dyn WrappingObjectStore>>,
use_constant_size_upload_parts: bool,
list_is_lexically_ordered: bool,
io_parallelism: usize,
) -> Self {
let scheme = location.scheme();
Expand All @@ -885,6 +900,7 @@ impl ObjectStore {
scheme: scheme.into(),
block_size,
use_constant_size_upload_parts,
list_is_lexically_ordered,
io_parallelism,
}
}
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ mod tests {
None,
None,
false,
false,
1,
));

Expand Down Expand Up @@ -828,6 +829,7 @@ mod tests {
None,
None,
false,
false,
1,
));

Expand Down
Loading