
feat: coalesce scheduling of reads to speed up random access #2636

Merged (12 commits, Jul 29, 2024)
2 changes: 1 addition & 1 deletion protos/encodings.proto
@@ -193,7 +193,7 @@ message SimpleStruct {}

// An array encoding for binary fields
message Binary {
ArrayEncoding indices = 1;
Buffer indices = 1;
Contributor:

Is this a breaking change to the format? What does that mean for users?

Contributor Author:

I don't think it's a breaking change as such. It won't affect users, since the encoding and decoding workflows are still the same for binary types and there is no change from the user's point of view.
However, we may have to revise this again once we use bitpacking for the indices encoding.

Contributor:

Aren't these protobuf messages serialized in the data files? I'm worried readers with this change won't be able to read Lance V2 files written with older versions. This seems to go against the protobuf advice on backwards compatibility:

Almost never change the type of a field; it’ll mess up deserialization, same as re-using a tag number. The protobuf docs outline a small number of cases that are okay (for example, going between int32, uint32, int64 and bool). However, changing a field’s message type will break unless the new message is a superset of the old one.

https://protobuf.dev/programming-guides/dos-donts/#change-type

Contributor Author:

I see... yes, in that case it's a breaking change. But with the current structure, we do need access to the buffer index in the page if we want to coalesce requests, which makes changing the protobuf necessary (otherwise string scheduling ends up taking too long during random access).
Is there any recommended alternative in such a situation?

Contributor:

I would create a new field and deprecate the old field. Or maybe create a BinaryV2 message and deprecate Binary, something like that. You can either raise an error when a file has only the old field, or handle both fields/messages in different code paths. cc @westonpace

Contributor:

I agree with @wjones127 that, at this point, we want to start avoiding these breaking changes.

Also, this approach won't work. We want indices to be bitpacked, which means we need this to be ArrayEncoding.

Contributor Author:

I have updated the workflow to use ArrayEncoding but still schedule the requests together.

ArrayEncoding bytes = 2;
uint64 null_adjustment = 3;
}
40 changes: 40 additions & 0 deletions python/python/benchmarks/test_random_access.py
@@ -0,0 +1,40 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import os
from datetime import datetime

import lance
import pyarrow.parquet as pq
from lance.tracing import trace_to_chrome

trace_to_chrome(file="/tmp/foo.trace", level="debug")

# This file compares the performance of Lance v1 and v2 on the lineitem dataset,
# specifically for random access scans

# pyarrow does not expand "~", so resolve the home directory explicitly
tab = pq.read_table(os.path.expanduser("~/lineitemsf1.snappy.parquet"))
dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", use_legacy_format=True)
dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", use_legacy_format=False)

dsv1 = lance.dataset("/tmp/lineitem.lancev1")
dsv2 = lance.dataset("/tmp/lineitem.lancev2")

start = datetime.now()
dsv1.to_table(filter="l_shipmode = 'FOB'", limit=10000)
duration = (datetime.now() - start).total_seconds()
print(f"V1 query time: {duration}s")

start = datetime.now()
dsv2.to_table(filter="l_shipmode = 'FOB'", limit=10000)
duration = (datetime.now() - start).total_seconds()
print(f"V2 query time: {duration}s")

start = datetime.now()
dsv1.take([1, 40, 100, 130, 200])
duration = (datetime.now() - start).total_seconds()
print(f"V1 take time: {duration}s")

start = datetime.now()
dsv2.take([1, 40, 100, 130, 200])
duration = (datetime.now() - start).total_seconds()
print(f"V2 take time: {duration}s")
7 changes: 3 additions & 4 deletions rust/lance-encoding/src/encodings/physical.rs
@@ -151,11 +151,10 @@ pub fn decoder_from_array_encoding(
decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
}
pb::array_encoding::ArrayEncoding::Binary(binary) => {
let indices_encoding = binary.indices.as_ref().unwrap();
let indices_buffer_desc = binary.indices.as_ref().unwrap();
let (buffer_offset, _) = get_buffer(indices_buffer_desc, buffers);
let bytes_encoding = binary.bytes.as_ref().unwrap();

let indices_scheduler =
decoder_from_array_encoding(indices_encoding, buffers, data_type);
let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);

let offset_type = match data_type {
@@ -164,9 +163,9 @@
};

Box::new(BinaryPageScheduler::new(
indices_scheduler.into(),
bytes_scheduler.into(),
offset_type,
buffer_offset,
binary.null_adjustment,
))
}
88 changes: 42 additions & 46 deletions rust/lance-encoding/src/encodings/physical/binary.rs
@@ -9,8 +9,7 @@ use arrow_array::types::UInt64Type;
use arrow_array::{Array, ArrayRef};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, ScalarBuffer};
use bytes::BytesMut;
use futures::stream::StreamExt;
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt};
use futures::{future::BoxFuture, FutureExt};

use crate::{
decoder::{PageScheduler, PrimitivePageDecoder},
@@ -19,10 +18,8 @@ use crate::{
EncodingsIo,
};

use crate::decoder::LogicalPageDecoder;
use crate::encodings::logical::primitive::PrimitiveFieldDecoder;

use arrow_array::{PrimitiveArray, UInt64Array, UInt8Array};
use arrow_buffer::Buffer;
use arrow_schema::DataType;
use lance_core::Result;

@@ -77,38 +74,28 @@ impl IndicesNormalizer {

#[derive(Debug)]
pub struct BinaryPageScheduler {
indices_scheduler: Arc<dyn PageScheduler>,
bytes_scheduler: Arc<dyn PageScheduler>,
offsets_type: DataType,
buffer_offset: u64,
null_adjustment: u64,
}

impl BinaryPageScheduler {
pub fn new(
indices_scheduler: Arc<dyn PageScheduler>,
bytes_scheduler: Arc<dyn PageScheduler>,
offsets_type: DataType,
buffer_offset: u64,
null_adjustment: u64,
) -> Self {
Self {
indices_scheduler,
bytes_scheduler,
offsets_type,
buffer_offset,
null_adjustment,
}
}
}

impl BinaryPageScheduler {
fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
let mut primitive_wrapper =
PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows);
let drained_task = primitive_wrapper.drain(num_rows)?;
let indices_decode_task = drained_task.task;
indices_decode_task.decode()
}
}

impl PageScheduler for BinaryPageScheduler {
fn schedule_ranges(
&self,
@@ -120,26 +107,26 @@ impl PageScheduler for BinaryPageScheduler {
// if user wants row range a..b
// Case 1: if a != 0, we need indices a-1..b to decode
// Case 2: if a = 0, we need indices 0..b to decode
let indices_ranges = ranges
// To get the byte ranges we then multiply these by 8
// Then we add the buffer offset to map to the correct buffer in the page,
// since multiple encoding tasks may have been used to create the page
let indices_byte_ranges = ranges
.iter()
.map(|range| {
if range.start != 0 {
(range.start - 1)..(range.end)
(self.buffer_offset + ((range.start - 1) * 8))
..(self.buffer_offset + (range.end * 8))
} else {
0..(range.end)
self.buffer_offset..(self.buffer_offset + (range.end * 8))
}
})
.collect::<Vec<std::ops::Range<u64>>>();

let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();

let mut futures_ordered = indices_ranges
.iter()
.map(|range| {
self.indices_scheduler
.schedule_ranges(&[range.clone()], scheduler, top_level_row)
})
.collect::<FuturesOrdered<_>>();
// We schedule all the indices for decoding together.
// This is more efficient than scheduling them one by one, which significantly slows down random access.
let indices_bytes = scheduler.submit_request(indices_byte_ranges, top_level_row);
Contributor:

I think you can combine these into one call and keep indices_scheduler. Just make a single call to indices_scheduler.schedule_ranges (passing in many ranges) instead of many calls to schedule_ranges (each passing in one range).
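Stepping back, the row-range to index-byte-range mapping computed a few lines above (rows `a..b` need offsets `(a-1)..b`, or `0..b` when `a == 0`; each offset is a u64, so 8 bytes; everything is shifted by the page's buffer offset) can be sketched as a standalone function. This is a simplified illustration, not the actual Lance code:

```rust
use std::ops::Range;

/// Sketch of BinaryPageScheduler's mapping: for rows a..b we need offsets
/// (a-1)..b (or 0..b when a == 0), each 8 bytes, shifted by `buffer_offset`.
fn indices_byte_ranges(ranges: &[Range<u64>], buffer_offset: u64) -> Vec<Range<u64>> {
    ranges
        .iter()
        .map(|range| {
            let start = if range.start == 0 { 0 } else { range.start - 1 };
            (buffer_offset + start * 8)..(buffer_offset + range.end * 8)
        })
        .collect()
}

fn main() {
    // Rows 0..4 need offsets 0..4 -> bytes 100..132 (with buffer_offset = 100);
    // rows 5..7 need offsets 4..7 -> bytes 132..156.
    let out = indices_byte_ranges(&[0..4, 5..7], 100);
    assert_eq!(out, vec![100..132, 132..156]);
    println!("{out:?}");
}
```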


let ranges = ranges.to_vec();
let copy_scheduler = scheduler.clone();
@@ -159,22 +146,24 @@
// Cumulative sum: 0, 4 | 8 | 13
// These are the normalized offsets stored in decoded_indices
// Rest of the workflow is continued later in BinaryPageDecoder
let decoded_indices_vec = &indices_bytes.await?;

let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
let mut bytes_ranges = Vec::new();
let mut curr_range_idx = 0;
while let Some(indices_page_decoder) = futures_ordered.next().await {
let decoder = Arc::from(indices_page_decoder?);

// Build and run decode task for offsets
let curr_indices_range = indices_ranges[curr_range_idx].clone();
let curr_row_range = ranges[curr_range_idx].clone();
let indices_num_rows = curr_indices_range.end - curr_indices_range.start;
for (index, curr_row_range) in ranges.iter().enumerate() {
let decoded_indices = &decoded_indices_vec[index];
let decoded_indices = UInt64Array::new(Buffer::from(decoded_indices).into(), None);

let indices = Self::decode_indices(decoder, indices_num_rows)?;
let indices = indices.as_primitive::<UInt64Type>();
let row_start = curr_row_range.start;
let curr_range_len = (curr_row_range.end - row_start) as usize;
let indices = if row_start == 0 {
decoded_indices.slice(0, curr_range_len)
} else {
decoded_indices.slice(0, curr_range_len + 1)
};

let first = if curr_row_range.start == 0 {
let first = if row_start == 0 {
0
} else {
indices_builder
Expand All @@ -188,9 +177,7 @@ impl PageScheduler for BinaryPageScheduler {
bytes_ranges.push(first..last);
}

indices_builder.extend(indices, curr_row_range.start == 0);

curr_range_idx += 1;
indices_builder.extend(&indices, row_start == 0);
}

let (indices, validity) = indices_builder.into_parts();
@@ -409,7 +396,6 @@ fn get_indices_from_string_arrays(arrays: &[ArrayRef]) -> (ArrayRef, u64) {
}
indices_offset += array.len();
}

(Arc::new(UInt64Array::from(indices)), null_adjustment)
}

@@ -454,20 +440,30 @@ fn get_bytes_from_string_arrays(arrays: &[ArrayRef]) -> Vec<ArrayRef> {
impl ArrayEncoder for BinaryEncoder {
fn encode(&self, arrays: &[ArrayRef], buffer_index: &mut u32) -> Result<EncodedArray> {
let (index_array, null_adjustment) = get_indices_from_string_arrays(arrays);
let encoded_indices = self.indices_encoder.encode(&[index_array], buffer_index)?;
let encoded_indices = self.indices_encoder.encode(&[index_array], &mut 0)?;

let byte_arrays = get_bytes_from_string_arrays(arrays);
let encoded_bytes = self.bytes_encoder.encode(&byte_arrays, buffer_index)?;
let encoded_bytes = self.bytes_encoder.encode(&byte_arrays, &mut 1)?;

let mut encoded_buffers = encoded_indices.buffers;
encoded_buffers.extend(encoded_bytes.buffers);

// Mutate the buffers in place; iterating over a clone would discard the updates
for buf in encoded_buffers.iter_mut() {
    buf.index += *buffer_index;
}

let index = *buffer_index;
*buffer_index += 1;

Ok(EncodedArray {
buffers: encoded_buffers,
encoding: pb::ArrayEncoding {
array_encoding: Some(pb::array_encoding::ArrayEncoding::Binary(Box::new(
pb::Binary {
indices: Some(Box::new(encoded_indices.encoding)),
indices: Some(pb::Buffer {
buffer_index: index,
buffer_type: pb::buffer::BufferType::Page as i32,
}),
bytes: Some(Box::new(encoded_bytes.encoding)),
null_adjustment,
},
@@ -494,7 +490,7 @@ pub mod tests {
use super::get_indices_from_string_arrays;

#[test_log::test(tokio::test)]
async fn test_utf8() {
async fn test_utf8_binary() {
let field = Field::new("", DataType::Utf8, false);
check_round_trip_encoding_random(field, HashMap::new()).await;
}
69 changes: 67 additions & 2 deletions rust/lance-io/src/scheduler.rs
@@ -179,9 +179,11 @@ impl ScanScheduler {
let reader = self.object_store.open(path).await?;
let mut file_counter = self.file_counter.lock().unwrap();
let file_index = *file_counter;
let block_size = self.object_store.block_size() as u64;
*file_counter += 1;
Ok(FileScheduler {
reader: reader.into(),
block_size,
root: self.clone(),
file_index,
})
@@ -243,9 +245,19 @@ impl ScanScheduler {
pub struct FileScheduler {
reader: Arc<dyn Reader>,
root: Arc<ScanScheduler>,
block_size: u64,
file_index: u32,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
// Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
/// Submit a batch of I/O requests to the reader
///
@@ -258,8 +270,61 @@
) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
// The final priority is a combination of the row offset and the file number
let priority = ((self.file_index as u128) << 64) + priority as u128;
self.root
.submit_request(self.reader.clone(), request, priority)

let mut updated_requests = Vec::with_capacity(request.len());

let copy_request = request.clone();

if !request.is_empty() {
let mut curr_interval = request[0].clone();

for req in request.iter().skip(1) {
if is_close_together(&curr_interval, req, self.block_size) {
curr_interval.end = curr_interval.end.max(req.end);
} else {
Contributor (on lines +279 to +282):

Doesn't this assume the requests are already kind of monotonic? Is that handled anywhere? Should we sort the requests by request.start before doing this?

Contributor Author (@raunaks13, Jul 25, 2024):

Yes, I believe the requests (ranges) are assumed to be sorted by a few encodings at this point, though I don't think this is enforced anywhere. We should probably note wherever this sorting is assumed.
It may be better to sort the requests in the encoding itself (inside schedule_ranges()), before they are passed to submit_request(), since then we won't have to call sort multiple times. In that case it may be better to leave this for another PR.
We could add a debug_assert!(), though.
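A `debug_assert!` along the lines suggested here could look something like this (an illustrative sketch, not code from the PR):

```rust
use std::ops::Range;

/// True when ranges are sorted by start offset, as the coalescing loop assumes.
fn ranges_sorted(ranges: &[Range<u64>]) -> bool {
    ranges.windows(2).all(|w| w[0].start <= w[1].start)
}

fn main() {
    let sorted: Vec<Range<u64>> = vec![0..4, 4..8, 16..20];
    let unsorted: Vec<Range<u64>> = vec![16..20, 0..4];
    // Inside submit_request this would be: debug_assert!(ranges_sorted(&request));
    assert!(ranges_sorted(&sorted));
    assert!(!ranges_sorted(&unsorted));
    println!("sortedness checks passed");
}
```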

Contributor:

I see. Looks like you are coalescing on the request side, so the ranges are all from the same array. If we did this on a different side, where requests might be sourced from multiple decoders, it would be different. Seems fine for now, but I wanted to note that this is an implicit assumption.

Contributor:

Yes, I broke coalescing up into two github issues: "in batch coalescing" which is cheap (we can assume all requests are for a single page of data and they arrive in sorted order) and "out of batch coalescing". I'm not sure we ever want to bother with "out of batch coalescing". You'd need some kind of sorted heap of requests and then you'd have to unsort them after the request is made. You'd also get weird things like what happens if you can coalesce part of a low priority request that's way back in the queue with a high priority request that's almost ready to run. You'd also need to make sure you aren't blocking the I/O threads at any point while you sort your queue. 💀

updated_requests.push(curr_interval);
curr_interval = req.clone();
}
}

updated_requests.push(curr_interval);
}

let copy_updated_requests = updated_requests.clone();

let bytes_vec_fut =
self.root
.submit_request(self.reader.clone(), updated_requests, priority);

let mut updated_index = 0;
let mut final_bytes = Vec::with_capacity(copy_request.len());

async move {
let bytes_vec = bytes_vec_fut.await?;

let mut orig_index = 0;
while (updated_index < copy_updated_requests.len()) && (orig_index < copy_request.len())
{
let updated_range = &copy_updated_requests[updated_index];
let orig_range = &copy_request[orig_index];
let byte_offset = updated_range.start as usize;

if is_overlapping(updated_range, orig_range) {
// Rescale the ranges: the original ranges are absolute file offsets,
// but we need to slice into the subset of bytes returned for this
// particular coalesced request in bytes_vec
let start = orig_range.start as usize - byte_offset;
let end = orig_range.end as usize - byte_offset;

let sliced_range = bytes_vec[updated_index].slice(start..end);
final_bytes.push(sliced_range);
orig_index += 1;
} else {
updated_index += 1;
}
}

Ok(final_bytes)
}
}

/// Submit a single IOP to the reader
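The coalesce-and-reslice flow added to `submit_request` can be exercised in isolation. The sketch below is a simplified, synchronous rendition (assuming sorted input ranges, per the review thread, and not the actual Lance implementation): nearby ranges are merged into single reads, and each original range is then sliced back out of its merged read.

```rust
use std::ops::Range;

fn is_close_together(r1: &Range<u64>, r2: &Range<u64>, block_size: u64) -> bool {
    r2.start <= r1.end + block_size
}

fn is_overlapping(r1: &Range<u64>, r2: &Range<u64>) -> bool {
    r1.start < r2.end && r2.start < r1.end
}

/// Merge sorted ranges whose gap is at most `block_size` into single reads.
fn coalesce(ranges: &[Range<u64>], block_size: u64) -> Vec<Range<u64>> {
    let mut out = Vec::with_capacity(ranges.len());
    if ranges.is_empty() {
        return out;
    }
    let mut curr = ranges[0].clone();
    for r in &ranges[1..] {
        if is_close_together(&curr, r, block_size) {
            curr.end = curr.end.max(r.end);
        } else {
            out.push(curr);
            curr = r.clone();
        }
    }
    out.push(curr);
    out
}

/// For each original range, find the coalesced read containing it and the
/// (start, end) slice into that read's bytes, mirroring the re-slicing loop.
/// Assumes `merged` covers every original range.
fn slice_back(orig: &[Range<u64>], merged: &[Range<u64>]) -> Vec<(usize, usize, usize)> {
    let mut result = Vec::with_capacity(orig.len());
    let mut m = 0;
    for o in orig {
        while m < merged.len() && !is_overlapping(&merged[m], o) {
            m += 1;
        }
        let base = merged[m].start as usize;
        result.push((m, o.start as usize - base, o.end as usize - base));
    }
    result
}

fn main() {
    // Two reads 8 bytes apart coalesce with block_size 16; the third stays separate.
    let ranges = vec![0..10, 18..30, 100..110];
    let merged = coalesce(&ranges, 16);
    assert_eq!(merged, vec![0..30, 100..110]);
    // Original ranges map to (merged index, slice start, slice end).
    assert_eq!(
        slice_back(&ranges, &merged),
        vec![(0, 0, 10), (0, 18, 30), (1, 0, 10)]
    );
    println!("merged: {merged:?}");
}
```

Note the trade-off this makes explicit: a larger block size issues fewer I/O requests but reads more unneeded bytes in the gaps, which is exactly why the threshold comes from `object_store.block_size()`.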