feat: allow multiple incremental commits in optimize
Currently "optimize" executes the whole plan in one commit, which might
fail. The larger the table, the more likely it is to fail and the more
expensive the failure is.

Add an option in OptimizeBuilder that allows specifying a commit
interval. If that is provided, the plan executor will periodically
commit the accumulated actions.
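
For illustration, a minimal usage sketch, assuming the existing `DeltaOps` entry point and a hypothetical helper name; the only API added by this commit is `with_min_commit_interval`:

use std::time::Duration;

use deltalake::{DeltaOps, DeltaTable, DeltaTableError};

// Hedged sketch: ask optimize to flush the accumulated actions roughly every
// ten minutes instead of writing one large commit at the very end.
async fn compact_incrementally(table: DeltaTable) -> Result<DeltaTable, DeltaTableError> {
    let (table, _metrics) = DeltaOps(table)
        .optimize()
        .with_min_commit_interval(Duration::from_secs(600))
        .await?;
    Ok(table)
}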
kvap authored and rtyler committed Sep 19, 2023
1 parent 9579b04 commit fae39b1
Showing 2 changed files with 190 additions and 104 deletions.
231 changes: 136 additions & 95 deletions rust/src/operations/optimize.rs
@@ -22,7 +22,7 @@

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow_array::RecordBatch;
@@ -172,6 +172,7 @@ pub struct OptimizeBuilder<'a> {
max_spill_size: usize,
/// Optimize type
optimize_type: OptimizeType,
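/// Minimum interval before committing the actions accumulated so far; `None` means a single commit at the end of the plan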
min_commit_interval: Option<Duration>,
}

impl<'a> OptimizeBuilder<'a> {
@@ -188,6 +189,7 @@ impl<'a> OptimizeBuilder<'a> {
max_concurrent_tasks: num_cpus::get(),
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
optimize_type: OptimizeType::Compact,
min_commit_interval: None,
}
}

@@ -241,6 +243,12 @@ impl<'a> OptimizeBuilder<'a> {
self.max_spill_size = max_spill_size;
self
}

/// Minimum interval between incremental commits made while executing the optimize plan
pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
self.min_commit_interval = Some(min_commit_interval);
self
}
}

impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
@@ -263,10 +271,16 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.filters,
this.target_size.to_owned(),
writer_properties,
this.max_concurrent_tasks,
this.max_spill_size,
)?;
let metrics = plan.execute(this.store.clone(), &this.snapshot).await?;
let metrics = plan
.execute(
this.store.clone(),
&this.snapshot,
this.max_concurrent_tasks,
this.max_spill_size,
this.min_commit_interval,
)
.await?;
let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
table.update().await?;
Ok((table, metrics))
@@ -341,12 +355,6 @@ pub struct MergePlan {
task_parameters: Arc<MergeTaskParameters>,
/// Version of the table at beginning of optimization. Used for conflict resolution.
read_table_version: i64,
/// Whether to preserve insertion order within files
/// Max number of concurrent tasks
max_concurrent_tasks: usize,
#[allow(dead_code)]
/// Maximum number of bytes that are allowed to spill to disk
max_spill_size: usize,
}

/// Parameters passed to individual merge tasks
@@ -576,54 +584,51 @@ impl MergePlan {
mut self,
object_store: ObjectStoreRef,
snapshot: &DeltaTableState,
max_concurrent_tasks: usize,
#[allow(unused_variables)] // used behind a feature flag
max_spill_size: usize,
min_commit_interval: Option<Duration>,
) -> Result<Metrics, DeltaTableError> {
let mut actions = vec![];

// Need to move metrics and operations out of self, so we can use self in the stream
let mut metrics = std::mem::take(&mut self.metrics);

let operations = std::mem::take(&mut self.operations);

match operations {
OptimizeOperations::Compact(bins) => {
futures::stream::iter(bins)
.flat_map(|(partition, bins)| {
futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
})
.map(|(partition, files)| {
let object_store_ref = object_store.clone();
let batch_stream = futures::stream::iter(files.clone())
.then(move |file| {
let object_store_ref = object_store_ref.clone();
async move {
let file_reader =
ParquetObjectReader::new(object_store_ref, file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
}
})
.try_flatten()
.boxed();

let rewrite_result = tokio::task::spawn(Self::rewrite_files(
self.task_parameters.clone(),
partition,
files,
object_store.clone(),
futures::future::ready(Ok(batch_stream)),
));
util::flatten_join_error(rewrite_result)
})
.buffer_unordered(self.max_concurrent_tasks)
.try_for_each(|(partial_actions, partial_metrics)| {
debug!("Recording metrics for a completed partition");
actions.extend(partial_actions);
metrics.add(&partial_metrics);
async { Ok(()) }
})
.await?;
}
let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
.flat_map(|(partition, bins)| {
futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
})
.map(|(partition, files)| {
debug!(
"merging a group of {} files in partition {:?}",
files.len(),
partition,
);
for file in files.iter() {
debug!(" file {}", file.location);
}
let object_store_ref = object_store.clone();
let batch_stream = futures::stream::iter(files.clone())
.then(move |file| {
let object_store_ref = object_store_ref.clone();
async move {
let file_reader = ParquetObjectReader::new(object_store_ref, file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
}
})
.try_flatten()
.boxed();

let rewrite_result = tokio::task::spawn(Self::rewrite_files(
self.task_parameters.clone(),
partition,
files,
object_store.clone(),
futures::future::ready(Ok(batch_stream)),
));
util::flatten_join_error(rewrite_result)
})
.boxed(),
OptimizeOperations::ZOrder(zorder_columns, bins) => {
#[cfg(not(feature = "datafusion"))]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
@@ -639,63 +644,103 @@
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
object_store.clone(),
self.max_spill_size,
max_spill_size,
)?);
let task_parameters = self.task_parameters.clone();
let object_store = object_store.clone();
futures::stream::iter(bins)
.map(|(partition, files)| {
.map(move |(partition, files)| {
let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());

let object_store = object_store.clone();

let rewrite_result = tokio::task::spawn(Self::rewrite_files(
self.task_parameters.clone(),
task_parameters.clone(),
partition,
files,
object_store,
batch_stream,
));
util::flatten_join_error(rewrite_result)
})
.buffer_unordered(self.max_concurrent_tasks)
.try_for_each(|(partial_actions, partial_metrics)| {
debug!("Recording metrics for a completed partition");
actions.extend(partial_actions);
metrics.add(&partial_metrics);
async { Ok(()) }
})
.await?;
.boxed()
}
}
};

metrics.preserve_insertion_order = true;
if metrics.num_files_added == 0 {
metrics.files_added.min = 0;
}
if metrics.num_files_removed == 0 {
metrics.files_removed.min = 0;
}
let mut stream = stream.buffer_unordered(max_concurrent_tasks);
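// The rewrite tasks still run concurrently; the loop below drains their
// results one at a time so that the resulting actions can be batched
// between commits.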

let mut table = DeltaTable::new_with_state(object_store.clone(), snapshot.clone());

// TODO: Check for remove actions on optimized partitions. If a
// optimized partition was updated then abort the commit. Requires (#593).
if !actions.is_empty() {
let mut metadata = Map::new();
metadata.insert("readVersion".to_owned(), self.read_table_version.into());
let maybe_map_metrics = serde_json::to_value(metrics.clone());
if let Ok(map) = maybe_map_metrics {
metadata.insert("operationMetrics".to_owned(), map);
// Actions buffered so far. These will be flushed either at the end
// or when we reach the commit interval.
let mut actions = vec![];

// Each time we commit, we'll reset buffered_metrics to orig_metrics.
let orig_metrics = std::mem::take(&mut self.metrics);
let mut buffered_metrics = orig_metrics.clone();
let mut total_metrics = orig_metrics.clone();

let mut last_commit = Instant::now();
loop {
let next = stream.next().await.transpose()?;

let end = next.is_none();

if let Some((partial_actions, partial_metrics)) = next {
debug!("Recording metrics for a completed partition");
actions.extend(partial_actions);
buffered_metrics.add(&partial_metrics);
total_metrics.add(&partial_metrics);
}

commit(
object_store.as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
snapshot,
Some(metadata),
)
.await?;
let now = Instant::now();
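// When no minimum interval is configured, `mature` stays false and the
// buffered actions are flushed only once at the end, matching the previous
// single-commit behavior.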
let mature = match min_commit_interval {
None => false,
Some(i) => now.duration_since(last_commit) > i,
};
if !actions.is_empty() && (mature || end) {
let actions = std::mem::take(&mut actions);
last_commit = now;

buffered_metrics.preserve_insertion_order = true;
let mut metadata = Map::new();
metadata.insert("readVersion".to_owned(), self.read_table_version.into());
let maybe_map_metrics = serde_json::to_value(std::mem::replace(
&mut buffered_metrics,
orig_metrics.clone(),
));
if let Ok(map) = maybe_map_metrics {
metadata.insert("operationMetrics".to_owned(), map);
}

table.update_incremental(None).await?;
debug!("committing {} actions", actions.len());
// TODO: Check for remove actions on optimized partitions. If an
// optimized partition was updated then abort the commit. Requires (#593).
commit(
table.object_store().as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
table.get_state(),
Some(metadata),
)
.await?;
}

if end {
break;
}
}

total_metrics.preserve_insertion_order = true;
if total_metrics.num_files_added == 0 {
total_metrics.files_added.min = 0;
}
if total_metrics.num_files_removed == 0 {
total_metrics.files_removed.min = 0;
}

Ok(metrics)
Ok(total_metrics)
}
}

@@ -727,8 +772,6 @@ pub fn create_merge_plan(
filters: &[PartitionFilter<'_, &str>],
target_size: Option<i64>,
writer_properties: WriterProperties,
max_concurrent_tasks: usize,
max_spill_size: usize,
) -> Result<MergePlan, DeltaTableError> {
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());

Expand Down Expand Up @@ -767,8 +810,6 @@ pub fn create_merge_plan(
writer_properties,
}),
read_table_version: snapshot.version(),
max_concurrent_tasks,
max_spill_size,
})
}
