From f48e9706a13aae402c7965eee371e72f5b01a559 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 23 Sep 2024 12:22:23 -0700 Subject: [PATCH] fix: use existing storage version on overwrite, if none specified (#2924) This was supposed to be the logic but was not tested and was not being used correctly. --- python/python/lance/dataset.py | 9 ++-- python/python/lance/fragment.py | 18 +++---- python/python/lance/ray/sink.py | 24 +++++---- python/python/tests/test_dataset.py | 11 +++++ rust/lance/src/dataset.rs | 76 ++++++++++++++++++++++++++++- 5 files changed, 109 insertions(+), 29 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index bafda27057..2e6f71f2a1 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2868,7 +2868,7 @@ def write_dataset( commit_lock: Optional[CommitLock] = None, progress: Optional[FragmentWriteProgress] = None, storage_options: Optional[Dict[str, str]] = None, - data_storage_version: str = "stable", + data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, enable_v2_manifest_paths: bool = False, ) -> LanceDataset: @@ -2910,11 +2910,10 @@ def write_dataset( storage_options : optional, dict Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. - data_storage_version: optional, str, default "legacy" + data_storage_version: optional, str, default None The version of the data storage format to use. Newer versions are more - efficient but require newer versions of lance to read. The default is - "legacy" which will use the legacy v1 version. See the user guide - for more details. + efficient but require newer versions of lance to read. The default (None) + will use the latest stable version. See the user guide for more details. use_legacy_format : optional, bool, default None Deprecated method for setting the data storage version. 
Use the `data_storage_version` parameter instead. diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index 5659786fb8..808d25c822 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -147,7 +147,7 @@ def create( progress: Optional[FragmentWriteProgress] = None, mode: str = "append", *, - data_storage_version: str = "stable", + data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, ) -> FragmentMetadata: @@ -180,11 +180,10 @@ def create( The write mode. If "append" is specified, the data will be checked against the existing dataset's schema. Otherwise, pass "create" or "overwrite" to assign new field ids to the schema. - data_storage_version: optional, str, default "legacy" + data_storage_version: optional, str, default None The version of the data storage format to use. Newer versions are more - efficient but require newer versions of lance to read. The default is - "legacy" which will use the legacy v1 version. See the user guide - for more details. + efficient but require newer versions of lance to read. The default (None) + will use the latest stable version. See the user guide for more details. use_legacy_format: bool, default None Deprecated parameter. Use data_storage_version instead. storage_options : optional, dict @@ -529,7 +528,7 @@ def write_fragments( max_rows_per_group: int = 1024, max_bytes_per_file: int = DEFAULT_MAX_BYTES_PER_FILE, progress: Optional[FragmentWriteProgress] = None, - data_storage_version: str = "stable", + data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, ) -> List[FragmentMetadata]: @@ -568,11 +567,10 @@ def write_fragments( *Experimental API*. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing. 
- data_storage_version: optional, str, default "legacy" + data_storage_version: optional, str, default None The version of the data storage format to use. Newer versions are more - efficient but require newer versions of lance to read. The default is - "legacy" which will use the legacy v1 version. See the user guide - for more details. + efficient but require newer versions of lance to read. The default (None) + will use the latest stable version. See the user guide for more details. use_legacy_format : optional, bool, default None Deprecated method for setting the data storage version. Use the `data_storage_version` parameter instead. diff --git a/python/python/lance/ray/sink.py b/python/python/lance/ray/sink.py index b2c42231f3..debf956ee9 100644 --- a/python/python/lance/ray/sink.py +++ b/python/python/lance/ray/sink.py @@ -54,7 +54,7 @@ def _write_fragment( max_rows_per_file: int = 1024 * 1024, max_bytes_per_file: Optional[int] = None, max_rows_per_group: int = 1024, # Only useful for v1 writer. - data_storage_version: str = "stable", + data_storage_version: Optional[str] = None, storage_options: Optional[Dict[str, Any]] = None, ) -> Tuple[FragmentMetadata, pa.Schema]: from ..dependencies import _PANDAS_AVAILABLE @@ -168,7 +168,7 @@ class LanceDatasink(_BaseLanceDatasink): Choices are 'append', 'create', 'overwrite'. max_rows_per_file : int, optional The maximum number of rows per file. Default is 1024 * 1024. - data_storage_version: optional, str, default "legacy" + data_storage_version: optional, str, default None The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default is "legacy" which will use the legacy v1 version. 
See the user guide @@ -188,7 +188,7 @@ def __init__( schema: Optional[pa.Schema] = None, mode: Literal["create", "append", "overwrite"] = "create", max_rows_per_file: int = 1024 * 1024, - data_storage_version: str = "stable", + data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, Any]] = None, *args, @@ -273,11 +273,10 @@ class LanceFragmentWriter: max_rows_per_group : int, optional The maximum number of rows per group. Default is 1024. Only useful for v1 writer. - data_storage_version: optional, str, default "legacy" + data_storage_version: optional, str, default None The version of the data storage format to use. Newer versions are more - efficient but require newer versions of lance to read. The default is - "legacy" which will use the legacy v1 version. See the user guide - for more details. + efficient but require newer versions of lance to read. The default + (None) will use the latest stable version. See the user guide for more details. use_legacy_format : optional, bool, default None Deprecated method for setting the data storage version. Use the `data_storage_version` parameter instead. @@ -295,7 +294,7 @@ def __init__( max_rows_per_file: int = 1024 * 1024, max_bytes_per_file: Optional[int] = None, max_rows_per_group: Optional[int] = None, # Only useful for v1 writer. - data_storage_version: str = "stable", + data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = False, storage_options: Optional[Dict[str, Any]] = None, ): @@ -387,7 +386,7 @@ def write_lance( max_rows_per_file: int = 1024 * 1024, max_bytes_per_file: Optional[int] = None, storage_options: Optional[Dict[str, Any]] = None, - data_storage_version: str = "stable", + data_storage_version: Optional[str] = None, ) -> None: """Write Ray dataset at scale. @@ -410,11 +409,10 @@ def write_lance( The maximum number of bytes per file. Default is 90GB. 
storage_options : Dict[str, Any], optional The storage options for the writer. Default is None. - data_storage_version: optional, str, default "legacy" + data_storage_version: optional, str, default None The version of the data storage format to use. Newer versions are more - efficient but require newer versions of lance to read. The default is - "legacy" which will use the legacy v1 version. See the user guide - for more details. + efficient but require newer versions of lance to read. The default + (None) will use the latest stable version. See the user guide for more details. """ data.map_batches( LanceFragmentWriter( diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 5c865195ce..e651b270ce 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -2075,6 +2075,17 @@ def test_dataset_restore(tmp_path: Path): assert dataset.count_rows() == 100 +def test_mixed_mode_overwrite(tmp_path: Path): + data = pa.table({"a": range(100)}) + dataset = lance.write_dataset(data, tmp_path, data_storage_version="legacy") + + assert dataset.data_storage_version == "0.1" + + dataset = lance.write_dataset(data, tmp_path, mode="overwrite") + + assert dataset.data_storage_version == "0.1" + + def test_roundtrip_reader(tmp_path: Path): # Can roundtrip a reader data = pa.table({"a": range(100)}) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 36e074387c..e593228c03 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -514,7 +514,23 @@ impl Dataset { ) }; - let mut storage_version = params.storage_version_or_default(); + let mut storage_version = match (params.mode, dataset.as_ref()) { + (WriteMode::Append, Some(dataset)) => { + // If appending to an existing dataset, always use the dataset version + let m = dataset.manifest.as_ref(); + m.data_storage_format.lance_file_version()? 
+ } + (WriteMode::Overwrite, Some(dataset)) => { + // If overwriting an existing dataset, allow the user to specify but use + // the existing version if they don't + params.data_storage_version.map(Ok).unwrap_or_else(|| { + let m = dataset.manifest.as_ref(); + m.data_storage_format.lance_file_version() + })? + } + // Otherwise (no existing dataset) fallback to the default if the user didn't specify + _ => params.storage_version_or_default(), + }; // append + input schema different from existing schema = error if matches!(params.mode, WriteMode::Append) { @@ -3970,6 +3986,64 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_overwrite_mixed_version() { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "a", + DataType::Int32, + false, + )])); + let arr = Arc::new(Int32Array::from(vec![1, 2, 3])); + + let data = RecordBatch::try_new(schema.clone(), vec![arr]).unwrap(); + let reader = + RecordBatchIterator::new(vec![data.clone()].into_iter().map(Ok), schema.clone()); + + let dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::Legacy), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!( + dataset + .manifest + .data_storage_format + .lance_file_version() + .unwrap(), + LanceFileVersion::Legacy + ); + + let reader = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema); + let dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!( + dataset + .manifest + .data_storage_format + .lance_file_version() + .unwrap(), + LanceFileVersion::Legacy + ); + } + // Bug: https://github.com/lancedb/lancedb/issues/1223 #[tokio::test] async fn test_open_nonexisting_dataset() {