Skip to content

Commit

Permalink
Change the default data storage version to "stable" (e.g. v2.0)
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Sep 4, 2024
1 parent 2a9f8b5 commit 359a686
Show file tree
Hide file tree
Showing 18 changed files with 277 additions and 149 deletions.
2 changes: 1 addition & 1 deletion python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2816,7 +2816,7 @@ def write_dataset(
commit_lock: Optional[CommitLock] = None,
progress: Optional[FragmentWriteProgress] = None,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
) -> LanceDataset:
"""Write a given data_obj to the given uri
Expand Down
4 changes: 2 additions & 2 deletions python/python/lance/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def create(
progress: Optional[FragmentWriteProgress] = None,
mode: str = "append",
*,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> FragmentMetadata:
Expand Down Expand Up @@ -528,7 +528,7 @@ def write_fragments(
max_rows_per_group: int = 1024,
max_bytes_per_file: int = DEFAULT_MAX_BYTES_PER_FILE,
progress: Optional[FragmentWriteProgress] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> List[FragmentMetadata]:
Expand Down
8 changes: 4 additions & 4 deletions python/python/lance/ray/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _write_fragment(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: int = 1024, # Only useful for v1 writer.
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
storage_options: Optional[Dict[str, Any]] = None,
) -> Tuple[FragmentMetadata, pa.Schema]:
from ..dependencies import _PANDAS_AVAILABLE
Expand Down Expand Up @@ -188,7 +188,7 @@ def __init__(
schema: Optional[pa.Schema] = None,
mode: Literal["create", "append", "overwrite"] = "create",
max_rows_per_file: int = 1024 * 1024,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, Any]] = None,
*args,
Expand Down Expand Up @@ -295,7 +295,7 @@ def __init__(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: Optional[int] = None, # Only useful for v1 writer.
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = False,
storage_options: Optional[Dict[str, Any]] = None,
):
Expand Down Expand Up @@ -387,7 +387,7 @@ def write_lance(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
storage_options: Optional[Dict[str, Any]] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
) -> None:
"""Write Ray dataset at scale.
Expand Down
27 changes: 26 additions & 1 deletion python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def test_roundtrip_types(tmp_path: Path):
}
)

dataset = lance.write_dataset(table, tmp_path)
# TODO: V2 does not currently handle large_dict
dataset = lance.write_dataset(table, tmp_path, data_storage_version="legacy")
assert dataset.schema == table.schema
assert dataset.to_table() == table

Expand Down Expand Up @@ -2205,3 +2206,27 @@ def test_late_materialization_batch_size(tmp_path: Path):
columns=["values"], filter="filter % 2 == 0", batch_size=32
):
assert batch.num_rows == 32


# The data storage version that new datasets should pick up by default
# ("stable" currently resolves to the 2.0 format).
EXPECTED_DEFAULT_STORAGE_VERSION = "2.0"
# File-level (major, minor) version pair corresponding to the 2.0 format.
EXPECTED_MAJOR_VERSION = 0
EXPECTED_MINOR_VERSION = 3


def test_default_storage_version(tmp_path: Path):
    """Datasets and fragments written without an explicit version use "stable" (2.0)."""
    table = pa.table({"x": [0]})
    dataset = lance.write_dataset(table, tmp_path)
    assert dataset.data_storage_version == EXPECTED_DEFAULT_STORAGE_VERSION

    def check_file_version(frag_meta):
        # Inspect the first data file recorded in the fragment metadata and
        # confirm it was written with the expected on-disk format version.
        sample_file = frag_meta.to_json()["files"][0]
        assert sample_file["file_major_version"] == EXPECTED_MAJOR_VERSION
        assert sample_file["file_minor_version"] == EXPECTED_MINOR_VERSION

    check_file_version(lance.LanceFragment.create(dataset.uri, table))

    from lance.fragment import write_fragments

    frags = write_fragments(table, dataset.uri)
    check_file_version(frags[0])
68 changes: 47 additions & 21 deletions rust/lance-datagen/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::{iter, marker::PhantomData, sync::Arc};
use arrow::{
array::{ArrayData, AsArray},
buffer::{BooleanBuffer, Buffer, OffsetBuffer, ScalarBuffer},
datatypes::{ArrowPrimitiveType, Int32Type, IntervalDayTime, IntervalMonthDayNano},
datatypes::{ArrowPrimitiveType, Int32Type, Int64Type, IntervalDayTime, IntervalMonthDayNano},
};
use arrow_array::{
make_array,
types::{ArrowDictionaryKeyType, BinaryType, ByteArrayType, Utf8Type},
Array, FixedSizeBinaryArray, FixedSizeListArray, ListArray, NullArray, PrimitiveArray,
RecordBatch, RecordBatchOptions, RecordBatchReader, StringArray, StructArray,
Array, FixedSizeBinaryArray, FixedSizeListArray, LargeListArray, ListArray, NullArray,
PrimitiveArray, RecordBatch, RecordBatchOptions, RecordBatchReader, StringArray, StructArray,
};
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SchemaRef};
use futures::{stream::BoxStream, StreamExt};
Expand Down Expand Up @@ -913,20 +913,32 @@ struct RandomListGenerator {
child_field: Arc<Field>,
items_gen: Box<dyn ArrayGenerator>,
lengths_gen: Box<dyn ArrayGenerator>,
is_large: bool,
}

impl RandomListGenerator {
// Creates a list generator that generates random lists with lengths between 0 and 10 (inclusive)
fn new(items_gen: Box<dyn ArrayGenerator>) -> Self {
fn new(items_gen: Box<dyn ArrayGenerator>, is_large: bool) -> Self {
let child_field = Arc::new(Field::new("item", items_gen.data_type().clone(), true));
let field = Field::new("", DataType::List(child_field.clone()), true);
let lengths_dist = Uniform::new_inclusive(0, 10);
let lengths_gen = rand_with_distribution::<Int32Type, Uniform<i32>>(lengths_dist);
let list_type = if is_large {
DataType::LargeList(child_field.clone())
} else {
DataType::List(child_field.clone())
};
let field = Field::new("", list_type, true);
let lengths_gen = if is_large {
let lengths_dist = Uniform::new_inclusive(0, 10);
rand_with_distribution::<Int64Type, Uniform<i64>>(lengths_dist)
} else {
let lengths_dist = Uniform::new_inclusive(0, 10);
rand_with_distribution::<Int32Type, Uniform<i32>>(lengths_dist)
};
Self {
field: Arc::new(field),
child_field,
items_gen,
lengths_gen,
is_large,
}
}
}
Expand All @@ -938,16 +950,29 @@ impl ArrayGenerator for RandomListGenerator {
rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
) -> Result<Arc<dyn Array>, ArrowError> {
let lengths = self.lengths_gen.generate(length, rng)?;
let lengths = lengths.as_primitive::<Int32Type>();
let total_length = lengths.values().iter().sum::<i32>() as u64;
let offsets = OffsetBuffer::from_lengths(lengths.values().iter().map(|v| *v as usize));
let items = self.items_gen.generate(RowCount::from(total_length), rng)?;
Ok(Arc::new(ListArray::try_new(
self.child_field.clone(),
offsets,
items,
None,
)?))
if self.is_large {
let lengths = lengths.as_primitive::<Int64Type>();
let total_length = lengths.values().iter().sum::<i64>() as u64;
let offsets = OffsetBuffer::from_lengths(lengths.values().iter().map(|v| *v as usize));
let items = self.items_gen.generate(RowCount::from(total_length), rng)?;
Ok(Arc::new(LargeListArray::try_new(
self.child_field.clone(),
offsets,
items,
None,
)?))
} else {
let lengths = lengths.as_primitive::<Int32Type>();
let total_length = lengths.values().iter().sum::<i32>() as u64;
let offsets = OffsetBuffer::from_lengths(lengths.values().iter().map(|v| *v as usize));
let items = self.items_gen.generate(RowCount::from(total_length), rng)?;
Ok(Arc::new(ListArray::try_new(
self.child_field.clone(),
offsets,
items,
None,
)?))
}
}

fn data_type(&self) -> &DataType {
Expand Down Expand Up @@ -1688,9 +1713,9 @@ pub mod array {
Box::<RandomBooleanGenerator>::default()
}

pub fn rand_list(item_type: &DataType) -> Box<dyn ArrayGenerator> {
pub fn rand_list(item_type: &DataType, is_large: bool) -> Box<dyn ArrayGenerator> {
let child_gen = rand_type(item_type);
Box::new(RandomListGenerator::new(child_gen))
Box::new(RandomListGenerator::new(child_gen, is_large))
}

pub fn rand_struct(fields: Fields) -> Box<dyn ArrayGenerator> {
Expand Down Expand Up @@ -1734,7 +1759,8 @@ pub mod array {
Dimension::from(*dimension as u32),
),
DataType::FixedSizeBinary(size) => rand_fsb(*size),
DataType::List(child) => rand_list(child.data_type()),
DataType::List(child) => rand_list(child.data_type(), false),
DataType::LargeList(child) => rand_list(child.data_type(), true),
DataType::Duration(unit) => match unit {
TimeUnit::Second => rand::<DurationSecondType>(),
TimeUnit::Millisecond => rand::<DurationMillisecondType>(),
Expand Down Expand Up @@ -1940,7 +1966,7 @@ mod tests {
fn test_rng_list() {
// Note: these tests are heavily dependent on the default seed.
let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
let mut gen = array::rand_list(&DataType::Int32);
let mut gen = array::rand_list(&DataType::Int32, false);
let arr = gen.generate(RowCount::from(100), &mut rng).unwrap();
// Make sure we can generate empty lists (note, test is dependent on seed)
let arr = arr.as_list::<i32>();
Expand Down
17 changes: 6 additions & 11 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,11 @@ pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {

/// The core array encoding strategy is a set of basic encodings that
/// are generally applicable in most scenarios.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct CoreArrayEncodingStrategy {
pub version: LanceFileVersion,
}

impl Default for CoreArrayEncodingStrategy {
fn default() -> Self {
Self {
version: LanceFileVersion::default_v2(),
}
}
}

fn get_compression_scheme(field_meta: Option<&HashMap<String, String>>) -> CompressionScheme {
field_meta
.map(|metadata| {
Expand Down Expand Up @@ -699,11 +691,14 @@ pub struct CoreFieldEncodingStrategy {
pub version: LanceFileVersion,
}

// For some reason clippy has a false negative and thinks this can be derived but
// it can't because ArrayEncodingStrategy has no default implementation
#[allow(clippy::derivable_impls)]
impl Default for CoreFieldEncodingStrategy {
fn default() -> Self {
Self {
array_encoding_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
version: LanceFileVersion::default_v2(),
version: LanceFileVersion::default(),
}
}
}
Expand Down Expand Up @@ -762,7 +757,7 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
)?))
} else {
match data_type {
DataType::List(_child) => {
DataType::List(_child) | DataType::LargeList(_child) => {
let list_idx = column_index.next_column_index(field.id);
let inner_encoding = encoding_strategy_root.create_field_encoder(
encoding_strategy_root,
Expand Down
31 changes: 30 additions & 1 deletion rust/lance-encoding/src/encodings/logical/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ mod tests {

use std::{collections::HashMap, sync::Arc};

use arrow::array::StringBuilder;
use arrow::array::{LargeListBuilder, StringBuilder};
use arrow_array::{
builder::{Int32Builder, ListBuilder},
Array, ArrayRef, BooleanArray, ListArray, StructArray, UInt64Array,
Expand All @@ -1230,12 +1230,22 @@ mod tests {
DataType::List(Arc::new(Field::new("item", inner_type, true)))
}

/// Builds a `LargeList` (64-bit offsets) data type wrapping `inner_type`,
/// using the conventional nullable "item" child field name.
fn make_large_list_type(inner_type: DataType) -> DataType {
    let item_field = Field::new("item", inner_type, true);
    DataType::LargeList(Arc::new(item_field))
}

#[test_log::test(tokio::test)]
async fn test_list() {
    // Round-trip randomly generated List<Int32> data through the encoder.
    let field = Field::new("", make_list_type(DataType::Int32), true);
    check_round_trip_encoding_random(field, HashMap::new()).await;
}

#[test_log::test(tokio::test)]
async fn test_large_list() {
    // Round-trip randomly generated LargeList<Int32> data through the encoder.
    let field = Field::new("", make_large_list_type(DataType::Int32), true);
    check_round_trip_encoding_random(field, HashMap::new()).await;
}

#[test_log::test(tokio::test)]
async fn test_nested_strings() {
let field = Field::new("", make_list_type(DataType::Utf8), true);
Expand Down Expand Up @@ -1300,6 +1310,25 @@ mod tests {
.await;
}

#[test_log::test(tokio::test)]
async fn test_simple_large_list() {
    // Fixed LargeList<Int32> fixture: [[1, 2, 3], [4, 5], null, [6, 7, 8]].
    // Covers non-empty lists plus a null list entry.
    let rows: [Option<Vec<Option<i32>>>; 4] = [
        Some(vec![Some(1), Some(2), Some(3)]),
        Some(vec![Some(4), Some(5)]),
        None,
        Some(vec![Some(6), Some(7), Some(8)]),
    ];
    let mut list_builder = LargeListBuilder::new(Int32Builder::new());
    for row in rows {
        list_builder.append_option(row);
    }
    let list_array = list_builder.finish();

    // Exercise range scans (including ranges that cross the null entry)
    // as well as take-style access by explicit indices.
    let test_cases = TestCases::default()
        .with_range(0..2)
        .with_range(0..3)
        .with_range(1..3)
        .with_indices(vec![1, 3]);
    check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
        .await;
}

#[test_log::test(tokio::test)]
async fn test_empty_lists() {
// Scenario 1: Some lists are empty
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-encoding/src/encodings/physical/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ pub(crate) mod tests {
let mut buffed_index = 1;
let encoder = ValueEncoder::try_new(Arc::new(CoreBufferEncodingStrategy {
compression_scheme: CompressionScheme::None,
version: LanceFileVersion::default_v2(),
version: LanceFileVersion::default(),
}))
.unwrap();
let result = encoder.encode(&arrs, &mut buffed_index).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-encoding/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ pub async fn check_round_trip_encoding_random(field: Field, metadata: HashMap<St
check_round_trip_encoding_generated(
field,
Box::new(array_generator_provider),
LanceFileVersion::default_v2(),
LanceFileVersion::default(),
)
.await;
}
Expand Down
8 changes: 1 addition & 7 deletions rust/lance-encoding/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub enum LanceFileVersion {
// Note that Stable must come AFTER the stable version and Next must come AFTER the next version
// this way comparisons like x >= V2_0 will work the same if x is Stable or V2_0
/// The legacy (0.1) format
#[default]
Legacy,
#[default]
V2_0,
/// The latest stable release
Stable,
Expand All @@ -36,12 +36,6 @@ impl LanceFileVersion {
}
}

/// Returns the default version if Legacy is not an option
pub fn default_v2() -> Self {
// This will go away soon, but there are a few spots where the Legacy default doesn't make sense
Self::V2_0
}

pub fn try_from_major_minor(major: u32, minor: u32) -> Result<Self> {
match (major, minor) {
(0, 0) => Ok(Self::Legacy),
Expand Down
10 changes: 2 additions & 8 deletions rust/lance-file/src/v2/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,7 @@ impl FileWriter {

let keep_original_array = self.options.keep_original_array.unwrap_or(false);
let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
let version = self
.options
.format_version
.unwrap_or(LanceFileVersion::default_v2());
let version = self.options.format_version.unwrap_or_default();
Arc::new(CoreFieldEncodingStrategy {
array_encoding_strategy: Arc::new(CoreArrayEncodingStrategy { version }),
version,
Expand Down Expand Up @@ -449,10 +446,7 @@ impl FileWriter {
/// Converts self.version (which is a mix of "software version" and
/// "format version" into a format version)
fn version_to_numbers(&self) -> (u16, u16) {
let version = self
.options
.format_version
.unwrap_or(LanceFileVersion::default_v2());
let version = self.options.format_version.unwrap_or_default();
match version.resolve() {
LanceFileVersion::V2_0 => (0, 3),
LanceFileVersion::V2_1 => (2, 1),
Expand Down
Loading

0 comments on commit 359a686

Please sign in to comment.