feat: coalesce scheduling of reads to speed up random access (#2636)
Should fix #2629, addresses #1959 
1. Previously, when randomly accessing rows, each row request was scheduled
separately, which added overhead, especially on large datasets. This PR
coalesces take scheduling when requests are within `block_size` bytes of
each other; the block size is determined by the object store. A simplified
sketch of the coalescing is included after the timing numbers below.
2. The binary scheduler was also scheduling the decode of each range of
offsets individually. This PR updates the binary scheduler so that it
schedules all offsets at once; the decoded offsets are then processed to
determine which bytes to decode, as before.
3. A script we can use to compare v1 vs v2 performance is added as
`test_random_access.py`.

Specifically, on the lineitem dataset (same file from the issue above):
- v1 query time: `0.12s`
- v2 query time (before): `2.8s`
- v2 query time (after 1): `0.54s`
- v2 query time (after 1 and 2): `0.02s`
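
To make (1) concrete, here is a minimal, self-contained sketch of the coalescing idea. The function name, the fixed 4 KiB block size, and the example ranges are illustrative only; the actual implementation in `rust/lance-io/src/scheduler.rs` takes the block size from the object store and is shown in the diff below. The sketch assumes the incoming requests are sorted by start offset:

```rust
use std::ops::Range;

/// Merge read ranges that are within `block_size` bytes of each other into a
/// single larger read. Assumes `requests` is sorted by `start`.
fn coalesce_reads(requests: &[Range<u64>], block_size: u64) -> Vec<Range<u64>> {
    let mut merged = Vec::with_capacity(requests.len());
    let Some(first) = requests.first() else {
        return merged;
    };
    let mut curr = first.clone();
    for req in &requests[1..] {
        if req.start <= curr.end + block_size {
            // Close enough to the previous read: extend it instead of issuing a new IOP.
            curr.end = curr.end.max(req.end);
        } else {
            merged.push(curr);
            curr = req.clone();
        }
    }
    merged.push(curr);
    merged
}

fn main() {
    // Five small reads that sit near each other collapse into two IOPs with an
    // (illustrative) 4 KiB block size.
    let requests = vec![0..100, 150..300, 4_000..4_100, 100_000..100_050, 100_060..100_200];
    let coalesced = coalesce_reads(&requests, 4096);
    assert_eq!(coalesced, vec![0..4_100, 100_000..100_200]);
    println!("{coalesced:?}");
}
```

The real change additionally keeps the original requests around so that, once the coalesced read completes, each caller still receives exactly the bytes it asked for; the offsets change in (2) follows the same schedule-once-then-slice pattern and is visible directly in the `binary.rs` diff.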
raunaks13 authored Jul 29, 2024
1 parent aa92730 commit 93c81d2
Showing 3 changed files with 139 additions and 39 deletions.
40 changes: 40 additions & 0 deletions python/python/benchmarks/test_random_access.py
@@ -0,0 +1,40 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import os
from datetime import datetime

import lance
import pyarrow.parquet as pq
from lance.tracing import trace_to_chrome

trace_to_chrome(file="/tmp/foo.trace", level="debug")

# This file compares the performance of lance v1 and v2 on the lineitem dataset,
# specifically for random access scans

# Expand "~" explicitly since pyarrow does not expand it
tab = pq.read_table(os.path.expanduser("~/lineitemsf1.snappy.parquet"))
dsv1 = lance.write_dataset(tab, "/tmp/lineitem.lancev1", use_legacy_format=True)
dsv2 = lance.write_dataset(tab, "/tmp/lineitem.lancev2", use_legacy_format=False)

dsv1 = lance.dataset("/tmp/lineitem.lancev1")
dsv2 = lance.dataset("/tmp/lineitem.lancev2")

start = datetime.now()
dsv1.to_table(filter="l_shipmode = 'FOB'", limit=10000)
duration = (datetime.now() - start).total_seconds()
print(f"V1 query time: {duration}s")

start = datetime.now()
dsv2.to_table(filter="l_shipmode = 'FOB'", limit=10000)
duration = (datetime.now() - start).total_seconds()
print(f"V2 query time: {duration}s")

start = datetime.now()
dsv1.take([1, 40, 100, 130, 200])
duration = (datetime.now() - start).total_seconds()
print(f"V1 query time: {duration}s")

start = datetime.now()
dsv2.take([1, 40, 100, 130, 200])
duration = (datetime.now() - start).total_seconds()
print(f"V2 query time: {duration}s")
74 changes: 37 additions & 37 deletions rust/lance-encoding/src/encodings/physical/binary.rs
@@ -8,8 +8,10 @@ use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::{Array, ArrayRef};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, ScalarBuffer};
use futures::stream::StreamExt;
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt};
use futures::{future::BoxFuture, FutureExt};

use crate::decoder::LogicalPageDecoder;
use crate::encodings::logical::primitive::PrimitiveFieldDecoder;

use crate::buffer::LanceBuffer;
use crate::data::{
@@ -22,9 +24,6 @@ use crate::{
EncodingsIo,
};

use crate::decoder::LogicalPageDecoder;
use crate::encodings::logical::primitive::PrimitiveFieldDecoder;

use arrow_array::{PrimitiveArray, UInt64Array, UInt8Array};
use arrow_schema::DataType;
use lance_core::Result;
@@ -100,9 +99,7 @@ impl BinaryPageScheduler {
null_adjustment,
}
}
}

impl BinaryPageScheduler {
fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
let mut primitive_wrapper =
PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false);
@@ -127,22 +124,21 @@ impl PageScheduler for BinaryPageScheduler {
.iter()
.map(|range| {
if range.start != 0 {
(range.start - 1)..(range.end)
(range.start - 1)..range.end
} else {
0..(range.end)
0..range.end
}
})
.collect::<Vec<std::ops::Range<u64>>>();

let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
// We schedule all the indices for decoding together.
// This is more efficient than scheduling them one by one, which slows down random access significantly.
let indices_page_decoder =
self.indices_scheduler
.schedule_ranges(&indices_ranges, scheduler, top_level_row);

let mut futures_ordered = indices_ranges
.iter()
.map(|range| {
self.indices_scheduler
.schedule_ranges(&[range.clone()], scheduler, top_level_row)
})
.collect::<FuturesOrdered<_>>();
let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

let ranges = ranges.to_vec();
let copy_scheduler = scheduler.clone();
@@ -162,38 +158,43 @@ impl PageScheduler for BinaryPageScheduler {
// Cumulative sum: 0, 4 | 8 | 13
// These are the normalized offsets stored in decoded_indices
// Rest of the workflow is continued later in BinaryPageDecoder
let indices_decoder = Arc::from(indices_page_decoder.await?);
let indices = Self::decode_indices(indices_decoder, indices_num_rows)?;
let decoded_indices = indices.as_primitive::<UInt64Type>();

let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
let mut bytes_ranges = Vec::new();
let mut curr_range_idx = 0;
while let Some(indices_page_decoder) = futures_ordered.next().await {
let decoder = Arc::from(indices_page_decoder?);
let mut curr_offset_index = 0;

// Build and run decode task for offsets
let curr_indices_range = indices_ranges[curr_range_idx].clone();
let curr_row_range = ranges[curr_range_idx].clone();
let indices_num_rows = curr_indices_range.end - curr_indices_range.start;
for curr_row_range in ranges.iter() {
let row_start = curr_row_range.start;
let curr_range_len = (curr_row_range.end - row_start) as usize;

let indices = Self::decode_indices(decoder, indices_num_rows)?;
let indices = indices.as_primitive::<UInt64Type>();
let curr_indices;

let first = if curr_row_range.start == 0 {
if row_start == 0 {
curr_indices = decoded_indices.slice(0, curr_range_len);
curr_offset_index = curr_range_len;
} else {
curr_indices = decoded_indices.slice(curr_offset_index, curr_range_len + 1);
curr_offset_index += curr_range_len + 1;
}

let first = if row_start == 0 {
0
} else {
indices_builder
.normalize(*indices.values().first().unwrap())
.normalize(*curr_indices.values().first().unwrap())
.1
};
let last = indices_builder
.normalize(*indices.values().last().unwrap())
.normalize(*curr_indices.values().last().unwrap())
.1;
if first != last {
bytes_ranges.push(first..last);
}

indices_builder.extend(indices, curr_row_range.start == 0);

curr_range_idx += 1;
indices_builder.extend(&curr_indices, row_start == 0);
}

let (indices, validity) = indices_builder.into_parts();
@@ -411,7 +412,6 @@ fn get_indices_from_string_arrays(arrays: &[ArrayRef]) -> (ArrayRef, u64) {
}
indices_offset += array.len();
}

(Arc::new(UInt64Array::from(indices)), null_adjustment)
}

@@ -496,7 +496,7 @@ pub mod tests {
use super::get_indices_from_string_arrays;

#[test_log::test(tokio::test)]
async fn test_utf8() {
async fn test_utf8_binary() {
let field = Field::new("", DataType::Utf8, false);
check_round_trip_encoding_random(field, HashMap::new()).await;
}
@@ -570,14 +570,14 @@ pub mod tests {
}

#[test_log::test(tokio::test)]
async fn test_simple_utf8() {
let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
async fn test_simple_utf8_binary() {
let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);

let test_cases = TestCases::default()
.with_range(0..2)
.with_range(0..3)
.with_range(1..3)
.with_indices(vec![1, 3]);
.with_indices(vec![0, 1, 3, 4]);
check_round_trip_encoding_of_data(
vec![Arc::new(string_array)],
&test_cases,
64 changes: 62 additions & 2 deletions rust/lance-io/src/scheduler.rs
@@ -179,9 +179,11 @@ impl ScanScheduler {
let reader = self.object_store.open(path).await?;
let mut file_counter = self.file_counter.lock().unwrap();
let file_index = *file_counter;
let block_size = self.object_store.block_size() as u64;
*file_counter += 1;
Ok(FileScheduler {
reader: reader.into(),
block_size,
root: self.clone(),
file_index,
})
@@ -243,9 +245,19 @@ impl ScanScheduler {
pub struct FileScheduler {
reader: Arc<dyn Reader>,
root: Arc<ScanScheduler>,
block_size: u64,
file_index: u32,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
// Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
/// Submit a batch of I/O requests to the reader
///
@@ -258,8 +270,56 @@
) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
// The final priority is a combination of the row offset and the file number
let priority = ((self.file_index as u128) << 64) + priority as u128;
self.root
.submit_request(self.reader.clone(), request, priority)

let mut updated_requests = Vec::with_capacity(request.len());

if !request.is_empty() {
let mut curr_interval = request[0].clone();

for req in request.iter().skip(1) {
if is_close_together(&curr_interval, req, self.block_size) {
curr_interval.end = curr_interval.end.max(req.end);
} else {
updated_requests.push(curr_interval);
curr_interval = req.clone();
}
}

updated_requests.push(curr_interval);
}

let bytes_vec_fut =
self.root
.submit_request(self.reader.clone(), updated_requests.clone(), priority);

let mut updated_index = 0;
let mut final_bytes = Vec::with_capacity(request.len());

async move {
let bytes_vec = bytes_vec_fut.await?;

let mut orig_index = 0;
while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
let updated_range = &updated_requests[updated_index];
let orig_range = &request[orig_index];
let byte_offset = updated_range.start as usize;

if is_overlapping(updated_range, orig_range) {
// Rescale the original range relative to the start of the coalesced request,
// since we need to slice into a subset of the bytes at this index of bytes_vec
let start = orig_range.start as usize - byte_offset;
let end = orig_range.end as usize - byte_offset;

let sliced_range = bytes_vec[updated_index].slice(start..end);
final_bytes.push(sliced_range);
orig_index += 1;
} else {
updated_index += 1;
}
}

Ok(final_bytes)
}
}

/// Submit a single IOP to the reader
