Skip to content

Commit

Permalink
feat: add backpressure to v2 I/O scheduler (#2683)
Browse files Browse the repository at this point in the history
Adds backpressure to the I/O scheduler. The general problem is rather
tricky: if all of your I/O threads pause because the I/O buffer is full
while your decode threads are waiting on I/O tasks that are still queued,
then you will have a deadlock.

I/O priority should allow us to avoid this situation but it is something
that will need to be monitored, especially if users want to set really
small I/O buffer sizes. For this reason I haven't made any of the new
settings configurable (except for deadlock detection which we may turn
on for debugging purposes if people seem to have encountered a
deadlock).

One way you can hit this deadlock is to create a file with 10 pages
where each page has 10GB of data. Even a single read will fill up the
I/O buffer. We submit the reads in priority order but we have 8 I/O
threads and so they race to grab the permits.

As a result I've added a much-needed option to split primitive arrays
into multiple pages if we are given huge chunks of data. The splitting
algorithm is not perfect and could also use some work (perhaps we can do
a sort of "binary splitting" where we repeatedly split the largest
chunk in half until all chunks are below a given size).

At the moment I think things are "safe enough" that this PR prevents
more problems (avoids OOM) than it introduces (deadlock in esoteric
cases).
  • Loading branch information
westonpace authored Aug 5, 2024
1 parent eb7b790 commit 508c1a1
Show file tree
Hide file tree
Showing 26 changed files with 753 additions and 177 deletions.
18 changes: 15 additions & 3 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use arrow::pyarrow::PyArrowType;
use arrow_array::{RecordBatch, RecordBatchReader, UInt32Array};
use arrow_schema::Schema as ArrowSchema;
use futures::stream::StreamExt;
use lance::io::{ObjectStore, RecordBatchStream};
use lance::{
io::{ObjectStore, RecordBatchStream},
utils::default_deadlock_prevention_timeout,
};
use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
use lance_file::{
v2::{
Expand All @@ -27,7 +30,10 @@ use lance_file::{
},
version::LanceFileVersion,
};
use lance_io::{scheduler::ScanScheduler, ReadBatchParams};
use lance_io::{
scheduler::{ScanScheduler, SchedulerConfig},
ReadBatchParams,
};
use object_store::path::Path;
use pyo3::{
exceptions::{PyIOError, PyRuntimeError, PyValueError},
Expand Down Expand Up @@ -281,7 +287,13 @@ pub struct LanceFileReader {
impl LanceFileReader {
async fn open(uri_or_path: String) -> PyResult<Self> {
let (object_store, path) = object_store_from_uri_or_path(uri_or_path).await?;
let scheduler = ScanScheduler::new(Arc::new(object_store));
let scheduler = ScanScheduler::new(
Arc::new(object_store),
SchedulerConfig {
io_buffer_size_bytes: 2 * 1024 * 1024 * 1024,
deadlock_prevention_timeout: default_deadlock_prevention_timeout(),
},
);
let file = scheduler.open_file(&path).await.infer_error()?;
let inner = FileReader::try_open(file, None, DecoderMiddlewareChain::default())
.await
Expand Down
22 changes: 7 additions & 15 deletions rust/lance-encoding-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use lance_core::{
};
use lance_encoding::{
decoder::{ColumnInfoIter, DecoderMiddlewareChainCursor, FieldDecoderStrategy, FieldScheduler},
encoder::{ColumnIndexSequence, CoreFieldEncodingStrategy, FieldEncodingStrategy},
encoder::{
ColumnIndexSequence, CoreFieldEncodingStrategy, EncodingOptions, FieldEncodingStrategy,
},
encodings::physical::FileBuffers,
};
use zone::{extract_zone_info, UnloadedPushdown, ZoneMapsFieldEncoder, ZoneMapsFieldScheduler};
Expand Down Expand Up @@ -170,9 +172,7 @@ impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
encoding_strategy_root: &dyn FieldEncodingStrategy,
field: &lance_core::datatypes::Field,
column_index: &mut ColumnIndexSequence,
cache_bytes_per_column: u64,
keep_original_array: bool,
config: &std::collections::HashMap<String, String>,
options: &EncodingOptions,
) -> lance_core::Result<Box<dyn lance_encoding::encoder::FieldEncoder>> {
let data_type = field.data_type();
if data_type.is_primitive()
Expand All @@ -186,24 +186,16 @@ impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
&self.core,
field,
column_index,
cache_bytes_per_column,
keep_original_array,
config,
options,
)?;
Ok(Box::new(ZoneMapsFieldEncoder::try_new(
inner_encoder,
data_type.clone(),
self.rows_per_map,
)?))
} else {
self.core.create_field_encoder(
encoding_strategy_root,
field,
column_index,
cache_bytes_per_column,
keep_original_array,
config,
)
self.core
.create_field_encoder(encoding_strategy_root, field, column_index, options)
}
}
}
15 changes: 12 additions & 3 deletions rust/lance-encoding-datafusion/src/zone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use lance_encoding::{
},
encoder::{
encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodedBuffer, EncodedColumn,
FieldEncoder,
EncodingOptions, FieldEncoder,
},
format::pb,
EncodingsIo,
Expand Down Expand Up @@ -496,8 +496,17 @@ impl ZoneMapsFieldEncoder {
let zone_maps =
RecordBatch::try_new(Arc::new(zone_map_schema), vec![mins, maxes, null_counts])?;
let encoding_strategy = CoreFieldEncodingStrategy::default();
let encoded_zone_maps =
encode_batch(&zone_maps, Arc::new(schema), &encoding_strategy, u64::MAX).await?;
let encoded_zone_maps = encode_batch(
&zone_maps,
Arc::new(schema),
&encoding_strategy,
&EncodingOptions {
cache_bytes_per_column: u64::MAX,
max_page_bytes: u64::MAX,
keep_original_array: true,
},
)
.await?;
let zone_maps_buffer = encoded_zone_maps.try_to_mini_lance()?;

Ok(EncodedBuffer {
Expand Down
16 changes: 11 additions & 5 deletions rust/lance-encoding/benches/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arrow_select::take::take;
use criterion::{criterion_group, criterion_main, Criterion};
use lance_encoding::{
decoder::{DecoderMiddlewareChain, FilterExpression},
encoder::{encode_batch, CoreFieldEncodingStrategy},
encoder::{encode_batch, CoreFieldEncodingStrategy, EncodingOptions},
};

use rand::Rng;
Expand Down Expand Up @@ -54,6 +54,12 @@ const PRIMITIVE_TYPES_FOR_FSL: &[DataType] = &[
DataType::Float64,
];

// Shared encoding options passed to `encode_batch` by the decode benchmarks.
const ENCODING_OPTIONS: EncodingOptions = EncodingOptions {
// 1 MiB
cache_bytes_per_column: 1024 * 1024,
// 32 MiB
max_page_bytes: 32 * 1024 * 1024,
keep_original_array: true,
};

fn bench_decode(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut group = c.benchmark_group("decode_primitive");
Expand All @@ -72,7 +78,7 @@ fn bench_decode(c: &mut Criterion) {
&data,
lance_schema,
&encoding_strategy,
1024 * 1024,
&ENCODING_OPTIONS,
))
.unwrap();
let func_name = format!("{:?}", data_type).to_lowercase();
Expand Down Expand Up @@ -111,7 +117,7 @@ fn bench_decode_fsl(c: &mut Criterion) {
&data,
lance_schema,
&encoding_strategy,
1024 * 1024,
&ENCODING_OPTIONS,
))
.unwrap();
let func_name = format!("{:?}", data_type).to_lowercase();
Expand Down Expand Up @@ -167,7 +173,7 @@ fn bench_decode_str_with_dict_encoding(c: &mut Criterion) {
&data,
lance_schema,
&encoding_strategy,
1024 * 1024,
&ENCODING_OPTIONS,
))
.unwrap();
let func_name = format!("{:?}", data_type).to_lowercase();
Expand Down Expand Up @@ -236,7 +242,7 @@ fn bench_decode_packed_struct(c: &mut Criterion) {
&data,
lance_schema,
&encoding_strategy,
1024 * 1024,
&ENCODING_OPTIONS,
))
.unwrap();

Expand Down
28 changes: 24 additions & 4 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
//! relation to the way the data is stored.

use std::collections::VecDeque;
use std::sync::Once;
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
Expand All @@ -224,7 +225,7 @@ use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::datatypes::{Field, Schema};
use log::trace;
use log::{trace, warn};
use snafu::{location, Location};
use tokio::sync::mpsc::{self, unbounded_channel};

Expand All @@ -240,6 +241,9 @@ use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb;
use crate::{BufferScheduler, EncodingsIo};

// Threshold (10 MiB) on the in-memory size of a single decoded batch.
//
// If users are getting batches larger than this then it's time to reduce the
// batch size, so the decode stream logs a warning (at most once per `Once`
// guard held by the stream) the first time a batch exceeds it.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Metadata describing a page in a file
///
/// This is typically created by reading the metadata section of a Lance file
Expand Down Expand Up @@ -1050,6 +1054,7 @@ pub struct BatchDecodeStream {
rows_scheduled: u64,
rows_drained: u64,
scheduler_exhuasted: bool,
emitted_batch_size_warning: Arc<Once>,
}

impl BatchDecodeStream {
Expand Down Expand Up @@ -1077,6 +1082,7 @@ impl BatchDecodeStream {
rows_scheduled: 0,
rows_drained: 0,
scheduler_exhuasted: false,
emitted_batch_size_warning: Arc::new(Once::new()),
}
}

Expand Down Expand Up @@ -1164,10 +1170,23 @@ impl BatchDecodeStream {
}

#[instrument(level = "debug", skip_all)]
fn task_to_batch(task: NextDecodeTask) -> Result<RecordBatch> {
fn task_to_batch(
task: NextDecodeTask,
emitted_batch_size_warning: Arc<Once>,
) -> Result<RecordBatch> {
let struct_arr = task.task.decode();
match struct_arr {
Ok(struct_arr) => Ok(RecordBatch::from(struct_arr.as_struct())),
Ok(struct_arr) => {
let batch = RecordBatch::from(struct_arr.as_struct());
let size_bytes = batch.get_array_memory_size() as u64;
if size_bytes > BATCH_SIZE_BYTES_WARNING {
emitted_batch_size_warning.call_once(|| {
let size_mb = size_bytes / 1024 / 1024;
warn!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
});
}
Ok(batch)
}
Err(e) => {
let e = Error::Internal {
message: format!("Error decoding batch: {}", e),
Expand All @@ -1183,9 +1202,10 @@ impl BatchDecodeStream {
let next_task = slf.next_batch_task().await;
let next_task = next_task.transpose().map(|next_task| {
let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
let task = tokio::spawn(async move {
let next_task = next_task?;
Self::task_to_batch(next_task)
Self::task_to_batch(next_task, emitted_batch_size_warning)
});
(task, num_rows)
});
Expand Down
Loading

0 comments on commit 508c1a1

Please sign in to comment.