Skip to content

Commit

Permalink
Coalesce ids before executing take when using TakeExec. This reduces …
Browse files Browse the repository at this point in the history
…the number of times we call take which has a number of performance benefits.
  • Loading branch information
westonpace committed Aug 5, 2024
1 parent 712405e commit b39a05b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 4 deletions.
30 changes: 28 additions & 2 deletions python/python/benchmarks/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def create_table(num_rows, offset) -> pa.Table:
"vector": vectors,
"filterable": filterable,
"category": categories,
"category_no_index": categories,
"genres": genres,
}
)
Expand All @@ -77,9 +78,11 @@ def create_base_dataset(data_dir: Path) -> lance.LanceDataset:
rows_remaining -= next_batch_length
table = create_table(next_batch_length, offset)
if offset == 0:
dataset = lance.write_dataset(table, tmp_path)
dataset = lance.write_dataset(table, tmp_path, use_legacy_format=False)
else:
dataset = lance.write_dataset(table, tmp_path, mode="append")
dataset = lance.write_dataset(
table, tmp_path, mode="append", use_legacy_format=False
)
offset += next_batch_length

dataset.create_index(
Expand Down Expand Up @@ -479,3 +482,26 @@ def test_label_list_index_prefilter(test_dataset, benchmark, filter: str):
prefilter=True,
filter=filter,
)


@pytest.mark.benchmark(group="late_materialization")
@pytest.mark.parametrize(
"use_index",
(False, True),
ids=["no_index", "with_index"],
)
def test_late_materialization(test_dataset, benchmark, use_index):
column = "category" if use_index else "category_no_index"
print(
test_dataset.scanner(
columns=["vector"],
filter=f"{column} = 0",
batch_size=32,
).explain_plan(True)
)
benchmark(
test_dataset.to_table,
columns=["vector"],
filter=f"{column} = 0",
batch_size=32,
)
12 changes: 12 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import lance
import lance.fragment
import numpy as np
import pandas as pd
import pandas.testing as tm
import polars as pl
Expand Down Expand Up @@ -1974,3 +1975,14 @@ def test_v2_dataset(tmp_path: Path):

fragment = list(dataset.get_fragments())[0]
assert "minor_version: 3" not in format_fragment(fragment.metadata, dataset)


def test_late_materialization_batch_size(tmp_path: Path):
table = pa.table({"filter": np.arange(32 * 32), "values": np.arange(32 * 32)})
dataset = lance.write_dataset(
table, tmp_path, use_legacy_format=False, max_rows_per_file=10000
)
for batch in dataset.to_batches(
columns=["values"], filter="filter % 2 == 0", batch_size=32
):
assert batch.num_rows == 32
8 changes: 7 additions & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,13 @@ impl DecodeBatchScheduler {
mut schedule_action: impl FnMut(Result<DecoderMessage>),
) {
let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
trace!("Scheduling ranges {:?} ({} rows)", ranges, rows_requested);
trace!(
"Scheduling {} ranges across {}..{} ({} rows)",
ranges.len(),
ranges.first().unwrap().start,
ranges.last().unwrap().end,
rows_requested
);

let mut context = SchedulerContext::new(io);
let maybe_root_job = self.root_scheduler.schedule_ranges(ranges, filter);
Expand Down
4 changes: 3 additions & 1 deletion rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use async_recursion::async_recursion;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::logical_expr::{lit, Expr};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::expressions;
use datafusion::physical_plan::projection::ProjectionExec as DFProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
Expand Down Expand Up @@ -1584,9 +1585,10 @@ impl Scanner {
projection: &Schema,
batch_readahead: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
let coalesced = Arc::new(CoalesceBatchesExec::new(input, self.get_batch_size()));
Ok(Arc::new(TakeExec::try_new(
self.dataset.clone(),
input,
coalesced,
Arc::new(projection.clone()),
batch_readahead,
)?))
Expand Down

0 comments on commit b39a05b

Please sign in to comment.