Skip to content

Commit

Permalink
fix: use existing storage version on overwrite, if none specified (#2924
Browse files Browse the repository at this point in the history
)

This was supposed to be the logic but was not tested and was not being
used correctly.
  • Loading branch information
westonpace authored Sep 23, 2024
1 parent ea78168 commit f48e970
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 29 deletions.
9 changes: 4 additions & 5 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2868,7 +2868,7 @@ def write_dataset(
commit_lock: Optional[CommitLock] = None,
progress: Optional[FragmentWriteProgress] = None,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: str = "stable",
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: bool = False,
) -> LanceDataset:
Expand Down Expand Up @@ -2910,11 +2910,10 @@ def write_dataset(
storage_options : optional, dict
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
data_storage_version: optional, str, default "legacy"
data_storage_version: optional, str, default None
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
for more details.
efficient but require newer versions of lance to read. The default (None)
will use the latest stable version. See the user guide for more details.
use_legacy_format : optional, bool, default None
Deprecated method for setting the data storage version. Use the
`data_storage_version` parameter instead.
Expand Down
18 changes: 8 additions & 10 deletions python/python/lance/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def create(
progress: Optional[FragmentWriteProgress] = None,
mode: str = "append",
*,
data_storage_version: str = "stable",
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> FragmentMetadata:
Expand Down Expand Up @@ -180,11 +180,10 @@ def create(
The write mode. If "append" is specified, the data will be checked
against the existing dataset's schema. Otherwise, pass "create" or
"overwrite" to assign new field ids to the schema.
data_storage_version: optional, str, default "legacy"
data_storage_version: optional, str, default None
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
for more details.
efficient but require newer versions of lance to read. The default (None)
will use the latest stable version. See the user guide for more details.
use_legacy_format: bool, default None
Deprecated parameter. Use data_storage_version instead.
storage_options : optional, dict
Expand Down Expand Up @@ -529,7 +528,7 @@ def write_fragments(
max_rows_per_group: int = 1024,
max_bytes_per_file: int = DEFAULT_MAX_BYTES_PER_FILE,
progress: Optional[FragmentWriteProgress] = None,
data_storage_version: str = "stable",
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> List[FragmentMetadata]:
Expand Down Expand Up @@ -568,11 +567,10 @@ def write_fragments(
*Experimental API*. Progress tracking for writing the fragment. Pass
a custom class that defines hooks to be called when each fragment is
starting to write and finishing writing.
data_storage_version: optional, str, default "legacy"
data_storage_version: optional, str, default None
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
for more details.
efficient but require newer versions of lance to read. The default (None)
will use the latest stable version. See the user guide for more details.
use_legacy_format : optional, bool, default None
Deprecated method for setting the data storage version. Use the
`data_storage_version` parameter instead.
Expand Down
24 changes: 11 additions & 13 deletions python/python/lance/ray/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _write_fragment(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: int = 1024, # Only useful for v1 writer.
data_storage_version: str = "stable",
data_storage_version: Optional[str] = None,
storage_options: Optional[Dict[str, Any]] = None,
) -> Tuple[FragmentMetadata, pa.Schema]:
from ..dependencies import _PANDAS_AVAILABLE
Expand Down Expand Up @@ -168,7 +168,7 @@ class LanceDatasink(_BaseLanceDatasink):
Choices are 'append', 'create', 'overwrite'.
max_rows_per_file : int, optional
The maximum number of rows per file. Default is 1024 * 1024.
data_storage_version: optional, str, default "legacy"
data_storage_version: optional, str, default None
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default
(None) will use the latest stable version. See the user guide
Expand All @@ -188,7 +188,7 @@ def __init__(
schema: Optional[pa.Schema] = None,
mode: Literal["create", "append", "overwrite"] = "create",
max_rows_per_file: int = 1024 * 1024,
data_storage_version: str = "stable",
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, Any]] = None,
*args,
Expand Down Expand Up @@ -273,11 +273,10 @@ class LanceFragmentWriter:
max_rows_per_group : int, optional
The maximum number of rows per group. Default is 1024.
Only useful for v1 writer.
data_storage_version: optional, str, default "legacy"
data_storage_version: optional, str, default None
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
for more details.
efficient but require newer versions of lance to read. The default
(None) will use the latest stable version. See the user guide for more details.
use_legacy_format : optional, bool, default None
Deprecated method for setting the data storage version. Use the
`data_storage_version` parameter instead.
Expand All @@ -295,7 +294,7 @@ def __init__(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: Optional[int] = None, # Only useful for v1 writer.
data_storage_version: str = "stable",
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = False,
storage_options: Optional[Dict[str, Any]] = None,
):
Expand Down Expand Up @@ -387,7 +386,7 @@ def write_lance(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
storage_options: Optional[Dict[str, Any]] = None,
data_storage_version: str = "stable",
data_storage_version: Optional[str] = None,
) -> None:
"""Write Ray dataset at scale.
Expand All @@ -410,11 +409,10 @@ def write_lance(
The maximum number of bytes per file. Default is 90GB.
storage_options : Dict[str, Any], optional
The storage options for the writer. Default is None.
data_storage_version: optional, str, default "legacy"
data_storage_version: optional, str, default None
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
for more details.
efficient but require newer versions of lance to read. The default
(None) will use the latest stable version. See the user guide for more details.
"""
data.map_batches(
LanceFragmentWriter(
Expand Down
11 changes: 11 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2075,6 +2075,17 @@ def test_dataset_restore(tmp_path: Path):
assert dataset.count_rows() == 100


def test_mixed_mode_overwrite(tmp_path: Path):
    """Overwriting without an explicit data_storage_version keeps the
    version already used by the existing dataset (legacy -> "0.1")."""
    table = pa.table({"a": range(100)})

    # First write explicitly requests the legacy storage format.
    dataset = lance.write_dataset(table, tmp_path, data_storage_version="legacy")
    assert dataset.data_storage_version == "0.1"

    # Overwrite with no version given: the existing legacy version is retained.
    dataset = lance.write_dataset(table, tmp_path, mode="overwrite")
    assert dataset.data_storage_version == "0.1"


def test_roundtrip_reader(tmp_path: Path):
# Can roundtrip a reader
data = pa.table({"a": range(100)})
Expand Down
76 changes: 75 additions & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,23 @@ impl Dataset {
)
};

let mut storage_version = params.storage_version_or_default();
let mut storage_version = match (params.mode, dataset.as_ref()) {
(WriteMode::Append, Some(dataset)) => {
// If appending to an existing dataset, always use the dataset version
let m = dataset.manifest.as_ref();
m.data_storage_format.lance_file_version()?
}
(WriteMode::Overwrite, Some(dataset)) => {
// If overwriting an existing dataset, allow the user to specify but use
// the existing version if they don't
params.data_storage_version.map(Ok).unwrap_or_else(|| {
let m = dataset.manifest.as_ref();
m.data_storage_format.lance_file_version()
})?
}
// Otherwise (no existing dataset) fallback to the default if the user didn't specify
_ => params.storage_version_or_default(),
};

// append + input schema different from existing schema = error
if matches!(params.mode, WriteMode::Append) {
Expand Down Expand Up @@ -3970,6 +3986,64 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_overwrite_mixed_version() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();

let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
DataType::Int32,
false,
)]));
let arr = Arc::new(Int32Array::from(vec![1, 2, 3]));

let data = RecordBatch::try_new(schema.clone(), vec![arr]).unwrap();
let reader =
RecordBatchIterator::new(vec![data.clone()].into_iter().map(Ok), schema.clone());

let dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
data_storage_version: Some(LanceFileVersion::Legacy),
..Default::default()
}),
)
.await
.unwrap();

assert_eq!(
dataset
.manifest
.data_storage_format
.lance_file_version()
.unwrap(),
LanceFileVersion::Legacy
);

let reader = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema);
let dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
mode: WriteMode::Overwrite,
..Default::default()
}),
)
.await
.unwrap();

assert_eq!(
dataset
.manifest
.data_storage_format
.lance_file_version()
.unwrap(),
LanceFileVersion::Legacy
);
}

// Bug: https://github.com/lancedb/lancedb/issues/1223
#[tokio::test]
async fn test_open_nonexisting_dataset() {
Expand Down

0 comments on commit f48e970

Please sign in to comment.