chore: recluster disable sort spill #15490

Merged 1 commit on May 14, 2024
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -157,6 +157,8 @@ pub trait TableContext: Send + Sync {
fn set_cacheable(&self, cacheable: bool);
fn get_can_scan_from_agg_index(&self) -> bool;
fn set_can_scan_from_agg_index(&self, enable: bool);
fn get_enable_sort_spill(&self) -> bool;
fn set_enable_sort_spill(&self, enable: bool);
fn set_compaction_num_block_hint(&self, hint: u64);
fn get_compaction_num_block_hint(&self) -> u64;

@@ -170,6 +170,7 @@ impl PipelineBuilder {
})
.collect();

self.ctx.set_enable_sort_spill(false);
let sort_pipeline_builder =
SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs))
.with_partial_block_size(partial_block_size)
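For context, here is a self-contained sketch of why the flag is cleared before the sort pipeline builder is created: the builder later reads the flag off the context when it sizes its spill buffers. The mock types and limit values are illustrative assumptions, not Databend's actual builder.

// Minimal mock of the disable-then-build ordering, runnable on its own.
use std::cell::Cell;

struct MockCtx {
    enable_sort_spill: Cell<bool>,
}

impl MockCtx {
    fn set_enable_sort_spill(&self, enable: bool) {
        self.enable_sort_spill.set(enable);
    }
    fn get_enable_sort_spill(&self) -> bool {
        self.enable_sort_spill.get()
    }
}

struct MockSortBuilder {
    spill_limits: (usize, usize),
}

impl MockSortBuilder {
    fn create(ctx: &MockCtx) -> Self {
        // Mirrors get_memory_settings below: disabled spill yields zeroed limits.
        let spill_limits = if ctx.get_enable_sort_spill() {
            (1 << 30, 1 << 28) // arbitrary non-zero limits for the enabled case
        } else {
            (0, 0)
        };
        MockSortBuilder { spill_limits }
    }
}

fn main() {
    let ctx = MockCtx { enable_sort_spill: Cell::new(true) };
    ctx.set_enable_sort_spill(false); // must run before create()
    let builder = MockSortBuilder::create(&ctx);
    assert_eq!(builder.spill_limits, (0, 0)); // the recluster sort never spills
}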
5 changes: 5 additions & 0 deletions src/query/service/src/pipelines/builders/builder_sort.rs
@@ -210,6 +210,11 @@ impl SortPipelineBuilder {
}

fn get_memory_settings(&self, num_threads: usize) -> Result<(usize, usize)> {
let enable_sort_spill = self.ctx.get_enable_sort_spill();
if !enable_sort_spill {
return Ok((0, 0));
}

let settings = self.ctx.get_settings();
let memory_ratio = settings.get_sort_spilling_memory_ratio()?;
let bytes_limit_per_proc = settings.get_sort_spilling_bytes_threshold_per_proc()?;
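The early return above reports a (0, 0) pair of memory limits when spilling is disabled. A hedged sketch of how zeroed limits can serve as a "never spill" signal downstream — the function name and exact checks are illustrative assumptions, not Databend's actual sort processor:

// Zero limits mean "never spill"; otherwise spill once either limit is exceeded.
fn should_spill(bytes_in_memory: usize, max_memory_usage: usize, bytes_limit_per_proc: usize) -> bool {
    if max_memory_usage == 0 && bytes_limit_per_proc == 0 {
        return false; // spilling disabled, e.g. during recluster
    }
    (max_memory_usage != 0 && bytes_in_memory > max_memory_usage)
        || (bytes_limit_per_proc != 0 && bytes_in_memory > bytes_limit_per_proc)
}

fn main() {
    assert!(!should_spill(usize::MAX, 0, 0)); // disabled: never spills
    assert!(should_spill(2_000_000, 1_000_000, 0)); // over the global cap
    assert!(!should_spill(500_000, 1_000_000, 0)); // under the cap
}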
10 changes: 10 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -490,6 +490,16 @@ impl TableContext for QueryContext {
.store(enable, Ordering::Release);
}

fn get_enable_sort_spill(&self) -> bool {
self.shared.enable_sort_spill.load(Ordering::Acquire)
}

fn set_enable_sort_spill(&self, enable: bool) {
self.shared
.enable_sort_spill
.store(enable, Ordering::Release);
}

// get a hint at the number of blocks that need to be compacted.
fn get_compaction_num_block_hint(&self) -> u64 {
self.shared
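The accessors above follow the same Release-store/Acquire-load pattern as the neighboring per-query flags. A standalone sketch of that pattern, with simplified shared state rather than QueryContextShared itself:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

struct Shared {
    enable_sort_spill: AtomicBool,
}

fn main() {
    // Defaults to true, matching the initialization in query_ctx_shared.rs.
    let shared = Arc::new(Shared { enable_sort_spill: AtomicBool::new(true) });

    let s = Arc::clone(&shared);
    // A Release store on one thread is visible to later Acquire loads on others.
    let writer = thread::spawn(move || s.enable_sort_spill.store(false, Ordering::Release));
    writer.join().unwrap();

    assert!(!shared.enable_sort_spill.load(Ordering::Acquire));
}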
2 changes: 2 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -111,6 +111,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) cacheable: Arc<AtomicBool>,
pub(in crate::sessions) can_scan_from_agg_index: Arc<AtomicBool>,
pub(in crate::sessions) num_fragmented_block_hint: Arc<AtomicU64>,
pub(in crate::sessions) enable_sort_spill: Arc<AtomicBool>,
// Status info.
pub(in crate::sessions) status: Arc<RwLock<String>>,

@@ -166,6 +167,7 @@ impl QueryContextShared {
cacheable: Arc::new(AtomicBool::new(true)),
can_scan_from_agg_index: Arc::new(AtomicBool::new(true)),
num_fragmented_block_hint: Arc::new(AtomicU64::new(0)),
enable_sort_spill: Arc::new(AtomicBool::new(true)),
status: Arc::new(RwLock::new("null".to_string())),
user_agent: Arc::new(RwLock::new("null".to_string())),
materialized_cte_tables: Arc::new(Default::default()),
7 changes: 7 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
@@ -548,6 +548,13 @@ impl TableContext for CtxDelegation {
todo!()
}

fn get_enable_sort_spill(&self) -> bool {
todo!()
}
fn set_enable_sort_spill(&self, _enable: bool) {
todo!()
}

fn attach_query_str(&self, _kind: QueryKind, _query: String) {}

fn get_query_str(&self) -> String {
7 changes: 7 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -489,6 +489,13 @@ impl TableContext for CtxDelegation {
todo!()
}

fn get_enable_sort_spill(&self) -> bool {
todo!()
}
fn set_enable_sort_spill(&self, _enable: bool) {
todo!()
}

fn attach_query_str(&self, _kind: QueryKind, _query: String) {}

fn get_query_str(&self) -> String {
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
@@ -808,7 +808,7 @@ impl DefaultSettings {
let max_memory_usage = Self::max_memory_usage()?;
// The sort merge consumes more than twice as much memory,
// so the block size is set relatively conservatively here.
let recluster_block_size = max_memory_usage * 35 / 100;
let recluster_block_size = max_memory_usage * 32 / 100;
Ok(recluster_block_size)
}

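A quick worked example of the default change above, assuming a hypothetical 16 GiB max_memory_usage:

fn main() {
    let max_memory_usage: u64 = 16 * 1024 * 1024 * 1024; // assumed: 16 GiB
    let old = max_memory_usage * 35 / 100; // ~5.60 GiB
    let new = max_memory_usage * 32 / 100; // ~5.12 GiB
    println!("recluster_block_size: {} -> {} bytes", old, new);
}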
@@ -108,7 +108,7 @@ impl ReclusterMutator {

let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?;
let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize;
let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 40 / 100);
let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 35 / 100);

let max_blocks_num = std::cmp::max(
memory_threshold / self.block_thresholds.max_bytes_per_block,
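And a worked example of the tightened threshold above, with assumed numbers (mem_info reports available memory in KiB, hence the * 1024):

fn main() {
    let recluster_block_size: u64 = 5 * 1024 * 1024 * 1024; // assumed setting: 5 GiB
    let avail_kib: u64 = 8 * 1024 * 1024; // assumed: 8 GiB available, reported in KiB
    // 35% of 8 GiB is roughly 2.8 GiB, which undercuts the 5 GiB setting here.
    let memory_threshold = recluster_block_size.min(avail_kib * 1024 * 35 / 100);
    let max_bytes_per_block: u64 = 100 * 1024 * 1024; // assumed block threshold: 100 MiB
    println!("memory_threshold = {} bytes", memory_threshold);
    println!("max_blocks_num candidate = {}", memory_threshold / max_bytes_per_block);
}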
2 changes: 1 addition & 1 deletion tests/suites/1_stateful/02_query/02_0006_set_priority.py
@@ -19,7 +19,7 @@
)

with NativeClient(name="client1>") as client1:
# TODO: Enable this test after enable new queries executor
# TODO: Enable this test after enable new queries executor
client1.expect(prompt)
# client1.expect("")
#