From 54c8844825061e5f436018f419ff988ec37c6b83 Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Mon, 22 Jul 2024 23:00:16 +0000 Subject: [PATCH 01/10] optimize write perf for random access --- rust/lance-io/src/scheduler.rs | 103 ++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 2 deletions(-) diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index 6e8c62db18..aa29bc9824 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -179,9 +179,11 @@ impl ScanScheduler { let reader = self.object_store.open(path).await?; let mut file_counter = self.file_counter.lock().unwrap(); let file_index = *file_counter; + let block_size = self.object_store.block_size() as u64; *file_counter += 1; Ok(FileScheduler { reader: reader.into(), + block_size, root: self.clone(), file_index, }) @@ -243,9 +245,19 @@ impl ScanScheduler { pub struct FileScheduler { reader: Arc, root: Arc, + block_size: u64, file_index: u32, } +fn is_close_together(range1: &Range, range2: &Range, block_size: u64) -> bool { + debug_assert!(range1.end <= range2.start); + range2.start <= (range1.end + block_size) +} + +fn is_overlapping(range1: &Range, range2: &Range) -> bool { + range1.start < range2.end && range2.start < range1.end +} + impl FileScheduler { /// Submit a batch of I/O requests to the reader /// @@ -258,8 +270,61 @@ impl FileScheduler { ) -> impl Future>> + Send { // The final priority is a combination of the row offset and the file number let priority = ((self.file_index as u128) << 64) + priority as u128; - self.root - .submit_request(self.reader.clone(), request, priority) + + let mut updated_requests = Vec::new(); + + let copy_request = request.clone(); + + if !request.is_empty() { + let mut curr_interval = request[0].clone(); + + for req in request.iter().skip(1) { + if is_close_together(&curr_interval, req, self.block_size) { + curr_interval.end = curr_interval.end.max(req.end); + } else { + 
updated_requests.push(curr_interval); + curr_interval = req.clone(); + } + } + + updated_requests.push(curr_interval); + } + + let copy_updated_requests = updated_requests.clone(); + + let bytes_vec_fut = + self.root + .submit_request(self.reader.clone(), updated_requests, priority); + + let mut updated_index = 0; + let mut final_bytes = Vec::with_capacity(copy_request.len()); + + async move { + let bytes_vec = bytes_vec_fut.await?; + + let mut orig_index = 0; + while (updated_index < copy_updated_requests.len()) && (orig_index < copy_request.len()) + { + let updated_range = &copy_updated_requests[updated_index]; + let orig_range = &copy_request[orig_index]; + let byte_offset = updated_range.start as usize; + + if is_overlapping(updated_range, orig_range) { + // Rescale the ranges since they correspond to the entire set of bytes, while + // But we need to slice into a subset of the bytes in a particular index of bytes_vec + let start = orig_range.start as usize - byte_offset; + let end = orig_range.end as usize - byte_offset; + + let sliced_range = bytes_vec[updated_index].slice(start..end); + final_bytes.push(sliced_range); + orig_index += 1; + } else { + updated_index += 1; + } + } + + Ok(final_bytes) + } } /// Submit a single IOP to the reader @@ -426,4 +491,38 @@ mod tests { semaphore_copy.add_permits(1); assert!(second_fut.await.unwrap().unwrap().len() == 20); } + + // #[tokio::test] + // async fn test_submit_request() { + // let tmpdir = tempdir().unwrap(); + // let tmp_path = tmpdir.path().to_str().unwrap(); + // let tmp_path = Path::parse(tmp_path).unwrap(); + // let tmp_file = tmp_path.child("foo.file"); + + // let obj_store = Arc::new(ObjectStore::local()); + + // // Write 1MiB of data + // // const DATA_SIZE: u64 = 100; + // let some_data = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; + // // let mut some_data = vec![0; DATA_SIZE as usize]; + // // rand::thread_rng().fill_bytes(&mut some_data); + // obj_store.put(&tmp_file, &some_data).await.unwrap(); + + // 
let scheduler = ScanScheduler::new(obj_store); + // let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap(); + + // let test_cases = vec![ + // vec![0..3, 3..4, 7..8, 9..10], + // vec![1..2, 2..3, 3..4, 4..5, 5..6, 6..7, 7..8, 8..9, 9..10, 10..11, 11..12], + // ] + + // for case in test_cases { + // let bytes = file_scheduler + // .submit_request(case, 0) + // .await + // .unwrap(); + + // assert_eq!() + // } + // } } From c63b565d1b3e2617337d45256983d649fa483954 Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Wed, 24 Jul 2024 03:54:04 +0000 Subject: [PATCH 02/10] coalesced all scheduling in the binary decoding workflow --- protos/encodings.proto | 2 +- rust/lance-encoding-datafusion/src/zone.rs | 1 + rust/lance-encoding/src/encoder.rs | 1 + rust/lance-encoding/src/encodings/physical.rs | 10 +- .../src/encodings/physical/binary.rs | 92 ++++++++++--------- .../src/encodings/physical/bitmap.rs | 1 + .../src/encodings/physical/value.rs | 1 + rust/lance-encoding/src/lib.rs | 1 + rust/lance-io/src/scheduler.rs | 7 +- 9 files changed, 64 insertions(+), 52 deletions(-) diff --git a/protos/encodings.proto b/protos/encodings.proto index 453e5866f5..9462832b7d 100644 --- a/protos/encodings.proto +++ b/protos/encodings.proto @@ -181,7 +181,7 @@ message SimpleStruct {} // An array encoding for binary fields message Binary { - ArrayEncoding indices = 1; + Buffer indices = 1; ArrayEncoding bytes = 2; uint64 null_adjustment = 3; } diff --git a/rust/lance-encoding-datafusion/src/zone.rs b/rust/lance-encoding-datafusion/src/zone.rs index ba139aaf5b..ffa9d312ec 100644 --- a/rust/lance-encoding-datafusion/src/zone.rs +++ b/rust/lance-encoding-datafusion/src/zone.rs @@ -210,6 +210,7 @@ impl ZoneMapsFieldScheduler { unloaded_pushdown.position..(unloaded_pushdown.position + unloaded_pushdown.size) }) .collect::>(); + println!("Doing zone submit request"); let zone_maps_fut = io.submit_request(ranges, 0); async move { let zone_map_buffers = zone_maps_fut.await?; diff --git 
a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index e4a09b9851..0091942fe1 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -51,6 +51,7 @@ impl std::fmt::Debug for EncodedBuffer { } } +#[derive(Clone)] pub struct EncodedArrayBuffer { /// The data making up the buffer pub parts: Vec, diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs index 129a5b886c..3cc8fd0ddb 100644 --- a/rust/lance-encoding/src/encodings/physical.rs +++ b/rust/lance-encoding/src/encodings/physical.rs @@ -132,11 +132,12 @@ pub fn decoder_from_array_encoding( decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type) } pb::array_encoding::ArrayEncoding::Binary(binary) => { - let indices_encoding = binary.indices.as_ref().unwrap(); + let indices_buffer_desc = binary.indices.as_ref().unwrap(); + let (buffer_offset, _) = get_buffer(indices_buffer_desc, buffers); let bytes_encoding = binary.bytes.as_ref().unwrap(); - let indices_scheduler = - decoder_from_array_encoding(indices_encoding, buffers, data_type); + // let indices_scheduler = + // decoder_from_array_encoding(indices_buffer, buffers, data_type); let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type); let offset_type = match data_type { @@ -145,9 +146,10 @@ pub fn decoder_from_array_encoding( }; Box::new(BinaryPageScheduler::new( - indices_scheduler.into(), + // indices_scheduler.into(), bytes_scheduler.into(), offset_type, + buffer_offset, binary.null_adjustment, )) } diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index f05fed03e3..0c9ce079a2 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -9,8 +9,7 @@ use arrow_array::types::UInt64Type; use arrow_array::{Array, ArrayRef}; use arrow_buffer::{BooleanBuffer, 
BooleanBufferBuilder, ScalarBuffer}; use bytes::BytesMut; -use futures::stream::StreamExt; -use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt}; +use futures::{future::BoxFuture, FutureExt}; use crate::{ decoder::{PageScheduler, PrimitivePageDecoder}, @@ -19,12 +18,10 @@ use crate::{ EncodingsIo, }; -use crate::decoder::LogicalPageDecoder; -use crate::encodings::logical::primitive::PrimitiveFieldDecoder; - use arrow_array::{PrimitiveArray, UInt64Array, UInt8Array}; use arrow_schema::DataType; use lance_core::Result; +use arrow_buffer::Buffer; struct IndicesNormalizer { indices: Vec, @@ -77,38 +74,31 @@ impl IndicesNormalizer { #[derive(Debug)] pub struct BinaryPageScheduler { - indices_scheduler: Arc, + // indices_scheduler: Arc, bytes_scheduler: Arc, offsets_type: DataType, + buffer_offset: u64, null_adjustment: u64, } impl BinaryPageScheduler { pub fn new( - indices_scheduler: Arc, + // indices_scheduler: Arc, bytes_scheduler: Arc, offsets_type: DataType, + buffer_offset: u64, null_adjustment: u64, ) -> Self { Self { - indices_scheduler, + // indices_scheduler, bytes_scheduler, offsets_type, + buffer_offset, null_adjustment, } } } -impl BinaryPageScheduler { - fn decode_indices(decoder: Arc, num_rows: u64) -> Result { - let mut primitive_wrapper = - PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows); - let drained_task = primitive_wrapper.drain(num_rows)?; - let indices_decode_task = drained_task.task; - indices_decode_task.decode() - } -} - impl PageScheduler for BinaryPageScheduler { fn schedule_ranges( &self, @@ -120,26 +110,26 @@ impl PageScheduler for BinaryPageScheduler { // if user wants row range a..b // Case 1: if a != 0, we need indices a-1..b to decode // Case 2: if a = 0, we need indices 0..b to decode - let indices_ranges = ranges + // To get the byte ranges we then multiply these by 8 + // Then we add the buffer offset to map to the correct buffer in the page, + // since multiple encoding tasks may have been 
used to create the page + let indices_byte_ranges = ranges .iter() .map(|range| { if range.start != 0 { - (range.start - 1)..(range.end) + (self.buffer_offset + ((range.start - 1) * 8))..(self.buffer_offset + (range.end * 8)) } else { - 0..(range.end) + self.buffer_offset..(self.buffer_offset + (range.end * 8)) } }) .collect::>>(); let num_rows = ranges.iter().map(|r| r.end - r.start).sum::(); - let mut futures_ordered = indices_ranges - .iter() - .map(|range| { - self.indices_scheduler - .schedule_ranges(&[range.clone()], scheduler, top_level_row) - }) - .collect::>(); + // We schedule all the indices for decoding together + // This is more efficient compared to scheduling them one by one (reduces speed significantly for random access) + println!("Binary submit request"); + let indices_bytes = scheduler.submit_request(indices_byte_ranges, top_level_row); let ranges = ranges.to_vec(); let copy_scheduler = scheduler.clone(); @@ -159,20 +149,25 @@ impl PageScheduler for BinaryPageScheduler { // Cumulative sum: 0, 4 | 8 | 13 // These are the normalized offsets stored in decoded_indices // Rest of the workflow is continued later in BinaryPageDecoder + let decoded_indices_vec = &indices_bytes.await?; let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment); let mut bytes_ranges = Vec::new(); - let mut curr_range_idx = 0; - while let Some(indices_page_decoder) = futures_ordered.next().await { - let decoder = Arc::from(indices_page_decoder?); - // Build and run decode task for offsets - let curr_indices_range = indices_ranges[curr_range_idx].clone(); - let curr_row_range = ranges[curr_range_idx].clone(); - let indices_num_rows = curr_indices_range.end - curr_indices_range.start; + for (index, curr_row_range) in ranges.iter().enumerate() { - let indices = Self::decode_indices(decoder, indices_num_rows)?; - let indices = indices.as_primitive::(); + let decoded_indices = &decoded_indices_vec[index]; + let decoded_indices = 
UInt64Array::new(Buffer::from(decoded_indices).into(), None); + + let row_start = curr_row_range.start as usize; + let curr_range_len = (curr_row_range.end - curr_row_range.start) as usize; + let indices; + if row_start == 0 { + indices = decoded_indices.slice(0, curr_range_len); + } + else { + indices = decoded_indices.slice(0, curr_range_len+1); + } let first = if curr_row_range.start == 0 { 0 @@ -188,9 +183,7 @@ impl PageScheduler for BinaryPageScheduler { bytes_ranges.push(first..last); } - indices_builder.extend(indices, curr_row_range.start == 0); - - curr_range_idx += 1; + indices_builder.extend(&indices, curr_row_range.start == 0); } let (indices, validity) = indices_builder.into_parts(); @@ -409,7 +402,6 @@ fn get_indices_from_string_arrays(arrays: &[ArrayRef]) -> (ArrayRef, u64) { } indices_offset += array.len(); } - (Arc::new(UInt64Array::from(indices)), null_adjustment) } @@ -454,20 +446,30 @@ fn get_bytes_from_string_arrays(arrays: &[ArrayRef]) -> Vec { impl ArrayEncoder for BinaryEncoder { fn encode(&self, arrays: &[ArrayRef], buffer_index: &mut u32) -> Result { let (index_array, null_adjustment) = get_indices_from_string_arrays(arrays); - let encoded_indices = self.indices_encoder.encode(&[index_array], buffer_index)?; + let encoded_indices = self.indices_encoder.encode(&[index_array], &mut 0)?; let byte_arrays = get_bytes_from_string_arrays(arrays); - let encoded_bytes = self.bytes_encoder.encode(&byte_arrays, buffer_index)?; + let encoded_bytes = self.bytes_encoder.encode(&byte_arrays, &mut 1)?; let mut encoded_buffers = encoded_indices.buffers; encoded_buffers.extend(encoded_bytes.buffers); + for mut buf in encoded_buffers.clone() { + buf.index = *buffer_index + buf.index; + } + + let index = *buffer_index; + *buffer_index += 1; + Ok(EncodedArray { buffers: encoded_buffers, encoding: pb::ArrayEncoding { array_encoding: Some(pb::array_encoding::ArrayEncoding::Binary(Box::new( pb::Binary { - indices: Some(Box::new(encoded_indices.encoding)), + 
indices: Some(pb::Buffer { + buffer_index: index, + buffer_type: pb::buffer::BufferType::Page as i32, + }), bytes: Some(Box::new(encoded_bytes.encoding)), null_adjustment, }, @@ -494,7 +496,7 @@ pub mod tests { use super::get_indices_from_string_arrays; #[test_log::test(tokio::test)] - async fn test_utf8() { + async fn test_utf8_binary() { let field = Field::new("", DataType::Utf8, false); check_round_trip_encoding_random(field).await; } diff --git a/rust/lance-encoding/src/encodings/physical/bitmap.rs b/rust/lance-encoding/src/encodings/physical/bitmap.rs index eaf9dffb84..db11e17f4d 100644 --- a/rust/lance-encoding/src/encodings/physical/bitmap.rs +++ b/rust/lance-encoding/src/encodings/physical/bitmap.rs @@ -63,6 +63,7 @@ impl PageScheduler for DenseBitmapScheduler { min, max ); + println!("Bitmap submit request"); let bytes = scheduler.submit_request(byte_ranges, top_level_row); async move { diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs index fb9aa88c73..ae85bb8c7e 100644 --- a/rust/lance-encoding/src/encodings/physical/value.rs +++ b/rust/lance-encoding/src/encodings/physical/value.rs @@ -115,6 +115,7 @@ impl PageScheduler for ValuePageScheduler { min, max ); + println!("Value submit request"); let bytes = scheduler.submit_request(byte_ranges, top_level_row); let bytes_per_value = self.bytes_per_value; diff --git a/rust/lance-encoding/src/lib.rs b/rust/lance-encoding/src/lib.rs index a102c1bb32..a3e501cdea 100644 --- a/rust/lance-encoding/src/lib.rs +++ b/rust/lance-encoding/src/lib.rs @@ -52,6 +52,7 @@ pub trait EncodingsIo: Send + Sync { range: std::ops::Range, priority: u64, ) -> BoxFuture<'static, lance_core::Result> { + println!("Submitting single request"); self.submit_request(vec![range], priority) .map_ok(|mut v| v.pop().unwrap()) .boxed() diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index aa29bc9824..84d31fa7e3 100644 --- 
a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -250,7 +250,7 @@ pub struct FileScheduler { } fn is_close_together(range1: &Range, range2: &Range, block_size: u64) -> bool { - debug_assert!(range1.end <= range2.start); + // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays) range2.start <= (range1.end + block_size) } @@ -270,8 +270,9 @@ impl FileScheduler { ) -> impl Future>> + Send { // The final priority is a combination of the row offset and the file number let priority = ((self.file_index as u128) << 64) + priority as u128; + println!("Number of requests: {}", request.len()); - let mut updated_requests = Vec::new(); + let mut updated_requests = Vec::with_capacity(request.len()); let copy_request = request.clone(); @@ -290,6 +291,7 @@ impl FileScheduler { updated_requests.push(curr_interval); } + println!("length of updated_requests: {}", updated_requests.len()); let copy_updated_requests = updated_requests.clone(); let bytes_vec_fut = @@ -336,6 +338,7 @@ impl FileScheduler { range: Range, priority: u64, ) -> impl Future> + Send { + println!("Submitting single request"); self.submit_request(vec![range], priority) .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap()) } From d8f58bd004a84e4554b140cfdab0e80f7ec1d87f Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Wed, 24 Jul 2024 04:49:45 +0000 Subject: [PATCH 03/10] clippy --- rust/lance-encoding/src/encodings/physical.rs | 3 -- .../src/encodings/physical/binary.rs | 33 ++++++++----------- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs index 3cc8fd0ddb..4ca06525aa 100644 --- a/rust/lance-encoding/src/encodings/physical.rs +++ b/rust/lance-encoding/src/encodings/physical.rs @@ -136,8 +136,6 @@ pub fn decoder_from_array_encoding( let (buffer_offset, _) = get_buffer(indices_buffer_desc, buffers); let bytes_encoding = 
binary.bytes.as_ref().unwrap(); - // let indices_scheduler = - // decoder_from_array_encoding(indices_buffer, buffers, data_type); let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type); let offset_type = match data_type { @@ -146,7 +144,6 @@ pub fn decoder_from_array_encoding( }; Box::new(BinaryPageScheduler::new( - // indices_scheduler.into(), bytes_scheduler.into(), offset_type, buffer_offset, diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index 0c9ce079a2..648e74025a 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -19,9 +19,9 @@ use crate::{ }; use arrow_array::{PrimitiveArray, UInt64Array, UInt8Array}; +use arrow_buffer::Buffer; use arrow_schema::DataType; use lance_core::Result; -use arrow_buffer::Buffer; struct IndicesNormalizer { indices: Vec, @@ -74,7 +74,6 @@ impl IndicesNormalizer { #[derive(Debug)] pub struct BinaryPageScheduler { - // indices_scheduler: Arc, bytes_scheduler: Arc, offsets_type: DataType, buffer_offset: u64, @@ -83,14 +82,12 @@ pub struct BinaryPageScheduler { impl BinaryPageScheduler { pub fn new( - // indices_scheduler: Arc, bytes_scheduler: Arc, offsets_type: DataType, buffer_offset: u64, null_adjustment: u64, ) -> Self { Self { - // indices_scheduler, bytes_scheduler, offsets_type, buffer_offset, @@ -111,13 +108,14 @@ impl PageScheduler for BinaryPageScheduler { // Case 1: if a != 0, we need indices a-1..b to decode // Case 2: if a = 0, we need indices 0..b to decode // To get the byte ranges we then multiply these by 8 - // Then we add the buffer offset to map to the correct buffer in the page, + // Then we add the buffer offset to map to the correct buffer in the page, // since multiple encoding tasks may have been used to create the page let indices_byte_ranges = ranges .iter() .map(|range| { if range.start != 0 { - (self.buffer_offset + 
((range.start - 1) * 8))..(self.buffer_offset + (range.end * 8)) + (self.buffer_offset + ((range.start - 1) * 8)) + ..(self.buffer_offset + (range.end * 8)) } else { self.buffer_offset..(self.buffer_offset + (range.end * 8)) } @@ -155,21 +153,18 @@ impl PageScheduler for BinaryPageScheduler { let mut bytes_ranges = Vec::new(); for (index, curr_row_range) in ranges.iter().enumerate() { - let decoded_indices = &decoded_indices_vec[index]; let decoded_indices = UInt64Array::new(Buffer::from(decoded_indices).into(), None); - let row_start = curr_row_range.start as usize; - let curr_range_len = (curr_row_range.end - curr_row_range.start) as usize; - let indices; - if row_start == 0 { - indices = decoded_indices.slice(0, curr_range_len); - } - else { - indices = decoded_indices.slice(0, curr_range_len+1); - } + let row_start = curr_row_range.start; + let curr_range_len = (curr_row_range.end - row_start) as usize; + let indices = if row_start == 0 { + decoded_indices.slice(0, curr_range_len) + } else { + decoded_indices.slice(0, curr_range_len + 1) + }; - let first = if curr_row_range.start == 0 { + let first = if row_start == 0 { 0 } else { indices_builder @@ -183,7 +178,7 @@ impl PageScheduler for BinaryPageScheduler { bytes_ranges.push(first..last); } - indices_builder.extend(&indices, curr_row_range.start == 0); + indices_builder.extend(&indices, row_start == 0); } let (indices, validity) = indices_builder.into_parts(); @@ -455,7 +450,7 @@ impl ArrayEncoder for BinaryEncoder { encoded_buffers.extend(encoded_bytes.buffers); for mut buf in encoded_buffers.clone() { - buf.index = *buffer_index + buf.index; + buf.index += *buffer_index; } let index = *buffer_index; From 3cdecf222917ddcd5889842098784cf5c79ac24b Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Wed, 24 Jul 2024 16:26:38 +0000 Subject: [PATCH 04/10] removed extra test --- rust/lance-io/src/scheduler.rs | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git 
a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index 84d31fa7e3..6f3c6d29a3 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -494,38 +494,4 @@ mod tests { semaphore_copy.add_permits(1); assert!(second_fut.await.unwrap().unwrap().len() == 20); } - - // #[tokio::test] - // async fn test_submit_request() { - // let tmpdir = tempdir().unwrap(); - // let tmp_path = tmpdir.path().to_str().unwrap(); - // let tmp_path = Path::parse(tmp_path).unwrap(); - // let tmp_file = tmp_path.child("foo.file"); - - // let obj_store = Arc::new(ObjectStore::local()); - - // // Write 1MiB of data - // // const DATA_SIZE: u64 = 100; - // let some_data = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; - // // let mut some_data = vec![0; DATA_SIZE as usize]; - // // rand::thread_rng().fill_bytes(&mut some_data); - // obj_store.put(&tmp_file, &some_data).await.unwrap(); - - // let scheduler = ScanScheduler::new(obj_store); - // let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap(); - - // let test_cases = vec![ - // vec![0..3, 3..4, 7..8, 9..10], - // vec![1..2, 2..3, 3..4, 4..5, 5..6, 6..7, 7..8, 8..9, 9..10, 10..11, 11..12], - // ] - - // for case in test_cases { - // let bytes = file_scheduler - // .submit_request(case, 0) - // .await - // .unwrap(); - - // assert_eq!() - // } - // } } From ad4153e132887a2545a62d5843db438bcf2cb894 Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Wed, 24 Jul 2024 16:43:23 +0000 Subject: [PATCH 05/10] added a benchmark --- .../python/benchmarks/test_random_access.py | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 python/python/benchmarks/test_random_access.py diff --git a/python/python/benchmarks/test_random_access.py b/python/python/benchmarks/test_random_access.py new file mode 100644 index 0000000000..1327083de3 --- /dev/null +++ b/python/python/benchmarks/test_random_access.py @@ -0,0 +1,37 @@ +from datetime import datetime + +import lance +import 
pyarrow.parquet as pq +from lance.tracing import trace_to_chrome + +trace_to_chrome(file="/tmp/foo.trace", level="debug") + +# This file compares the performance of lance v1 and v2 on the lineitem dataset, +# specifically for random access scans + +tab = pq.read_table("~/lineitemsf1.snappy.parquet") +dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", use_legacy_format=True) +dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", use_legacy_format=False) + +dsv1 = lance.dataset("/tmp/lineitem.lancev1") +dsv2 = lance.dataset("/tmp/lineitem.lancev2") + +start = datetime.now() +dsv1.to_table(filter="l_shipmode = 'FOB'", limit=10000) +duration = (datetime.now() - start).total_seconds() +print(f"V1 query time: {duration}s") + +start = datetime.now() +dsv2.to_table(filter="l_shipmode = 'FOB'", limit=10000) +duration = (datetime.now() - start).total_seconds() +print(f"V2 query time: {duration}s") + +start = datetime.now() +dsv1.take([1, 40, 100, 130, 200]) +duration = (datetime.now() - start).total_seconds() +print(f"V1 query time: {duration}s") + +start = datetime.now() +dsv2.take([1, 40, 100, 130, 200]) +duration = (datetime.now() - start).total_seconds() +print(f"V2 query time: {duration}s") From f38e0b473e0015bb87cb9590f1fda6f602eec048 Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Wed, 24 Jul 2024 16:44:44 +0000 Subject: [PATCH 06/10] license header --- python/python/benchmarks/test_random_access.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/python/benchmarks/test_random_access.py b/python/python/benchmarks/test_random_access.py index 1327083de3..2f988121aa 100644 --- a/python/python/benchmarks/test_random_access.py +++ b/python/python/benchmarks/test_random_access.py @@ -1,3 +1,6 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + from datetime import datetime import lance From ef9d4ba46510b0686091359880cdfe291024f025 Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Wed, 24 Jul 2024 16:49:34 +0000 
Subject: [PATCH 07/10] removed print statements --- rust/lance-io/src/scheduler.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index 6f3c6d29a3..d359d8ebf7 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -270,7 +270,6 @@ impl FileScheduler { ) -> impl Future>> + Send { // The final priority is a combination of the row offset and the file number let priority = ((self.file_index as u128) << 64) + priority as u128; - println!("Number of requests: {}", request.len()); let mut updated_requests = Vec::with_capacity(request.len()); @@ -291,7 +290,6 @@ impl FileScheduler { updated_requests.push(curr_interval); } - println!("length of updated_requests: {}", updated_requests.len()); let copy_updated_requests = updated_requests.clone(); let bytes_vec_fut = @@ -338,7 +336,6 @@ impl FileScheduler { range: Range, priority: u64, ) -> impl Future> + Send { - println!("Submitting single request"); self.submit_request(vec![range], priority) .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap()) } From cea66d16365a14d666f5055aa39a4757bb2581ee Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Wed, 24 Jul 2024 16:53:49 +0000 Subject: [PATCH 08/10] removed more print statements --- rust/lance-encoding-datafusion/src/zone.rs | 1 - rust/lance-encoding/src/encodings/physical/binary.rs | 1 - rust/lance-encoding/src/encodings/physical/bitmap.rs | 1 - rust/lance-encoding/src/encodings/physical/value.rs | 1 - rust/lance-encoding/src/lib.rs | 1 - 5 files changed, 5 deletions(-) diff --git a/rust/lance-encoding-datafusion/src/zone.rs b/rust/lance-encoding-datafusion/src/zone.rs index f8fbc71433..4bac25f73e 100644 --- a/rust/lance-encoding-datafusion/src/zone.rs +++ b/rust/lance-encoding-datafusion/src/zone.rs @@ -212,7 +212,6 @@ impl ZoneMapsFieldScheduler { unloaded_pushdown.position..(unloaded_pushdown.position + unloaded_pushdown.size) }) .collect::>(); - println!("Doing zone submit 
request"); let zone_maps_fut = io.submit_request(ranges, 0); async move { let zone_map_buffers = zone_maps_fut.await?; diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index 8367d016dd..2b095a900d 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -126,7 +126,6 @@ impl PageScheduler for BinaryPageScheduler { // We schedule all the indices for decoding together // This is more efficient compared to scheduling them one by one (reduces speed significantly for random access) - println!("Binary submit request"); let indices_bytes = scheduler.submit_request(indices_byte_ranges, top_level_row); let ranges = ranges.to_vec(); diff --git a/rust/lance-encoding/src/encodings/physical/bitmap.rs b/rust/lance-encoding/src/encodings/physical/bitmap.rs index 2944e45dbc..80ea8b479a 100644 --- a/rust/lance-encoding/src/encodings/physical/bitmap.rs +++ b/rust/lance-encoding/src/encodings/physical/bitmap.rs @@ -63,7 +63,6 @@ impl PageScheduler for DenseBitmapScheduler { min, max ); - println!("Bitmap submit request"); let bytes = scheduler.submit_request(byte_ranges, top_level_row); async move { diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs index 5d1e908d7b..1ac7d051c0 100644 --- a/rust/lance-encoding/src/encodings/physical/value.rs +++ b/rust/lance-encoding/src/encodings/physical/value.rs @@ -112,7 +112,6 @@ impl PageScheduler for ValuePageScheduler { min, max ); - println!("Value submit request"); let bytes = scheduler.submit_request(byte_ranges, top_level_row); let bytes_per_value = self.bytes_per_value; diff --git a/rust/lance-encoding/src/lib.rs b/rust/lance-encoding/src/lib.rs index a3e501cdea..a102c1bb32 100644 --- a/rust/lance-encoding/src/lib.rs +++ b/rust/lance-encoding/src/lib.rs @@ -52,7 +52,6 @@ pub trait EncodingsIo: Send + Sync { range: std::ops::Range, 
priority: u64, ) -> BoxFuture<'static, lance_core::Result> { - println!("Submitting single request"); self.submit_request(vec![range], priority) .map_ok(|mut v| v.pop().unwrap()) .boxed() From e219c57024aee48a926ed433417322da81166017 Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Mon, 29 Jul 2024 05:11:17 +0000 Subject: [PATCH 09/10] addressed comments --- protos/encodings.proto | 2 +- rust/lance-encoding/src/encodings/physical.rs | 7 +- .../src/encodings/physical/binary.rs | 84 ++++++++++--------- rust/lance-io/src/scheduler.rs | 15 ++-- 4 files changed, 55 insertions(+), 53 deletions(-) diff --git a/protos/encodings.proto b/protos/encodings.proto index 0a4ab81601..d884706f40 100644 --- a/protos/encodings.proto +++ b/protos/encodings.proto @@ -193,7 +193,7 @@ message SimpleStruct {} // An array encoding for binary fields message Binary { - Buffer indices = 1; + ArrayEncoding indices = 1; ArrayEncoding bytes = 2; uint64 null_adjustment = 3; } diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs index b5a747bbb6..5eb35298e7 100644 --- a/rust/lance-encoding/src/encodings/physical.rs +++ b/rust/lance-encoding/src/encodings/physical.rs @@ -151,10 +151,11 @@ pub fn decoder_from_array_encoding( decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type) } pb::array_encoding::ArrayEncoding::Binary(binary) => { - let indices_buffer_desc = binary.indices.as_ref().unwrap(); - let (buffer_offset, _) = get_buffer(indices_buffer_desc, buffers); + let indices_encoding = binary.indices.as_ref().unwrap(); let bytes_encoding = binary.bytes.as_ref().unwrap(); + let indices_scheduler = + decoder_from_array_encoding(indices_encoding, buffers, data_type); let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type); let offset_type = match data_type { @@ -163,9 +164,9 @@ pub fn decoder_from_array_encoding( }; Box::new(BinaryPageScheduler::new( + indices_scheduler.into(), 
bytes_scheduler.into(), offset_type, - buffer_offset, binary.null_adjustment, )) } diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index 2b095a900d..e7bbddcfdc 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -11,6 +11,8 @@ use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, ScalarBuffer}; use bytes::BytesMut; use futures::{future::BoxFuture, FutureExt}; +use crate::decoder::LogicalPageDecoder; +use crate::encodings::logical::primitive::PrimitiveFieldDecoder; use crate::{ decoder::{PageScheduler, PrimitivePageDecoder}, encoder::{ArrayEncoder, EncodedArray}, @@ -19,7 +21,6 @@ use crate::{ }; use arrow_array::{PrimitiveArray, UInt64Array, UInt8Array}; -use arrow_buffer::Buffer; use arrow_schema::DataType; use lance_core::Result; @@ -74,26 +75,34 @@ impl IndicesNormalizer { #[derive(Debug)] pub struct BinaryPageScheduler { + indices_scheduler: Arc, bytes_scheduler: Arc, offsets_type: DataType, - buffer_offset: u64, null_adjustment: u64, } impl BinaryPageScheduler { pub fn new( + indices_scheduler: Arc, bytes_scheduler: Arc, offsets_type: DataType, - buffer_offset: u64, null_adjustment: u64, ) -> Self { Self { + indices_scheduler, bytes_scheduler, offsets_type, - buffer_offset, null_adjustment, } } + + fn decode_indices(decoder: Arc, num_rows: u64) -> Result { + let mut primitive_wrapper = + PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows); + let drained_task = primitive_wrapper.drain(num_rows)?; + let indices_decode_task = drained_task.task; + indices_decode_task.decode() + } } impl PageScheduler for BinaryPageScheduler { @@ -110,23 +119,25 @@ impl PageScheduler for BinaryPageScheduler { // To get the byte ranges we then multiply these by 8 // Then we add the buffer offset to map to the correct buffer in the page, // since multiple encoding tasks may have been used to create the page - let 
indices_byte_ranges = ranges + let indices_ranges = ranges .iter() .map(|range| { if range.start != 0 { - (self.buffer_offset + ((range.start - 1) * 8)) - ..(self.buffer_offset + (range.end * 8)) + (range.start - 1)..range.end } else { - self.buffer_offset..(self.buffer_offset + (range.end * 8)) + 0..range.end } }) .collect::>>(); - let num_rows = ranges.iter().map(|r| r.end - r.start).sum::(); - // We schedule all the indices for decoding together // This is more efficient compared to scheduling them one by one (reduces speed significantly for random access) - let indices_bytes = scheduler.submit_request(indices_byte_ranges, top_level_row); + let indices_page_decoder = + self.indices_scheduler + .schedule_ranges(&indices_ranges, scheduler, top_level_row); + + let num_rows = ranges.iter().map(|r| r.end - r.start).sum::(); + let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::(); let ranges = ranges.to_vec(); let copy_scheduler = scheduler.clone(); @@ -146,38 +157,43 @@ impl PageScheduler for BinaryPageScheduler { // Cumulative sum: 0, 4 | 8 | 13 // These are the normalized offsets stored in decoded_indices // Rest of the workflow is continued later in BinaryPageDecoder - let decoded_indices_vec = &indices_bytes.await?; + let indices_decoder = Arc::from(indices_page_decoder.await?); + let indices = Self::decode_indices(indices_decoder, indices_num_rows)?; + let decoded_indices = indices.as_primitive::(); let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment); let mut bytes_ranges = Vec::new(); + let mut curr_offset_index = 0; - for (index, curr_row_range) in ranges.iter().enumerate() { - let decoded_indices = &decoded_indices_vec[index]; - let decoded_indices = UInt64Array::new(Buffer::from(decoded_indices).into(), None); - + for curr_row_range in ranges.iter() { let row_start = curr_row_range.start; let curr_range_len = (curr_row_range.end - row_start) as usize; - let indices = if row_start == 0 { - 
decoded_indices.slice(0, curr_range_len) + + let curr_indices; + + if row_start == 0 { + curr_indices = decoded_indices.slice(0, curr_range_len); + curr_offset_index = curr_range_len; } else { - decoded_indices.slice(0, curr_range_len + 1) - }; + curr_indices = decoded_indices.slice(curr_offset_index, curr_range_len + 1); + curr_offset_index += curr_range_len + 1; + } let first = if row_start == 0 { 0 } else { indices_builder - .normalize(*indices.values().first().unwrap()) + .normalize(*curr_indices.values().first().unwrap()) .1 }; let last = indices_builder - .normalize(*indices.values().last().unwrap()) + .normalize(*curr_indices.values().last().unwrap()) .1; if first != last { bytes_ranges.push(first..last); } - indices_builder.extend(&indices, row_start == 0); + indices_builder.extend(&curr_indices, row_start == 0); } let (indices, validity) = indices_builder.into_parts(); @@ -440,30 +456,20 @@ fn get_bytes_from_string_arrays(arrays: &[ArrayRef]) -> Vec { impl ArrayEncoder for BinaryEncoder { fn encode(&self, arrays: &[ArrayRef], buffer_index: &mut u32) -> Result { let (index_array, null_adjustment) = get_indices_from_string_arrays(arrays); - let encoded_indices = self.indices_encoder.encode(&[index_array], &mut 0)?; + let encoded_indices = self.indices_encoder.encode(&[index_array], buffer_index)?; let byte_arrays = get_bytes_from_string_arrays(arrays); - let encoded_bytes = self.bytes_encoder.encode(&byte_arrays, &mut 1)?; + let encoded_bytes = self.bytes_encoder.encode(&byte_arrays, buffer_index)?; let mut encoded_buffers = encoded_indices.buffers; encoded_buffers.extend(encoded_bytes.buffers); - for mut buf in encoded_buffers.clone() { - buf.index += *buffer_index; - } - - let index = *buffer_index; - *buffer_index += 1; - Ok(EncodedArray { buffers: encoded_buffers, encoding: pb::ArrayEncoding { array_encoding: Some(pb::array_encoding::ArrayEncoding::Binary(Box::new( pb::Binary { - indices: Some(pb::Buffer { - buffer_index: index, - buffer_type: 
pb::buffer::BufferType::Page as i32, - }), + indices: Some(Box::new(encoded_indices.encoding)), bytes: Some(Box::new(encoded_bytes.encoding)), null_adjustment, }, @@ -564,14 +570,14 @@ pub mod tests { } #[test_log::test(tokio::test)] - async fn test_simple_utf8() { - let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]); + async fn test_simple_utf8_binary() { + let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]); let test_cases = TestCases::default() .with_range(0..2) .with_range(0..3) .with_range(1..3) - .with_indices(vec![1, 3]); + .with_indices(vec![0, 1, 3, 4]); check_round_trip_encoding_of_data( vec![Arc::new(string_array)], &test_cases, diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index d359d8ebf7..8eb5eb8075 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -273,8 +273,6 @@ impl FileScheduler { let mut updated_requests = Vec::with_capacity(request.len()); - let copy_request = request.clone(); - if !request.is_empty() { let mut curr_interval = request[0].clone(); @@ -290,23 +288,20 @@ impl FileScheduler { updated_requests.push(curr_interval); } - let copy_updated_requests = updated_requests.clone(); - let bytes_vec_fut = self.root - .submit_request(self.reader.clone(), updated_requests, priority); + .submit_request(self.reader.clone(), updated_requests.clone(), priority); let mut updated_index = 0; - let mut final_bytes = Vec::with_capacity(copy_request.len()); + let mut final_bytes = Vec::with_capacity(request.len()); async move { let bytes_vec = bytes_vec_fut.await?; let mut orig_index = 0; - while (updated_index < copy_updated_requests.len()) && (orig_index < copy_request.len()) - { - let updated_range = ©_updated_requests[updated_index]; - let orig_range = ©_request[orig_index]; + while (updated_index < updated_requests.len()) && (orig_index < request.len()) { + let updated_range = &updated_requests[updated_index]; + 
let orig_range = &request[orig_index]; let byte_offset = updated_range.start as usize; if is_overlapping(updated_range, orig_range) { From 2a1951d059efd6af492294cbbb367e8c091a097f Mon Sep 17 00:00:00 2001 From: raunaks13 Date: Mon, 29 Jul 2024 23:54:06 +0000 Subject: [PATCH 10/10] fix --- rust/lance-encoding/src/encodings/physical/binary.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index 4a4ab60e19..6bb90ff59a 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -120,9 +120,6 @@ impl PageScheduler for BinaryPageScheduler { // if user wants row range a..b // Case 1: if a != 0, we need indices a-1..b to decode // Case 2: if a = 0, we need indices 0..b to decode - // To get the byte ranges we then multiply these by 8 - // Then we add the buffer offset to map to the correct buffer in the page, - // since multiple encoding tasks may have been used to create the page let indices_ranges = ranges .iter() .map(|range| {