Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimizer rule for windowed sort #4874

Merged
merged 37 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4338ecc
basic impl
waynexia Oct 16, 2024
1f95472
Merge branch 'main' into windowed-sort-rule
waynexia Oct 17, 2024
a9de8ea
Merge branch 'main' into windowed-sort-rule
waynexia Oct 17, 2024
97cb043
implement physical rule
waynexia Oct 17, 2024
27c2dbc
feat: install windowed sort physical rule and optimize partition ranges
waynexia Oct 17, 2024
115efff
Merge branch 'main' into windowed-sort-rule
waynexia Oct 17, 2024
1a24bc1
add logs and sqlness test
waynexia Oct 21, 2024
4d33779
Merge branch 'main' into windowed-sort-rule
waynexia Oct 23, 2024
3618cc2
feat: introduce PartSortExec for partitioned sorting
waynexia Oct 23, 2024
682dced
tune exec nodes' properties and metrics
waynexia Oct 23, 2024
b7e67e8
clean up
waynexia Oct 24, 2024
753be5c
fix typo
waynexia Oct 24, 2024
ff144e6
debug: add more info on very wrong
discord9 Oct 24, 2024
d47d392
debug: also print overlap ranges
discord9 Oct 24, 2024
e1f679b
feat: add check when emit PartSort Stream
discord9 Oct 24, 2024
68007f0
dbg: info on overlap working range
discord9 Oct 24, 2024
8204e0c
feat: check batch range is inside part range
evenyag Oct 24, 2024
f04c722
set distinguish partition range param
waynexia Oct 24, 2024
640e373
chore: more logs
evenyag Oct 24, 2024
d71a7c2
update sqlness
waynexia Oct 24, 2024
159fce7
tune optimizer
waynexia Oct 24, 2024
c7b0405
clean up
waynexia Oct 24, 2024
7452f2a
fix lints
waynexia Oct 24, 2024
c45bcad
fix windowed sort rule
waynexia Oct 25, 2024
7ca9df9
fix: early terminate sort stream
waynexia Oct 25, 2024
529fd74
chore: remove min/max check
evenyag Oct 25, 2024
d5e94cc
chore: remove unused windowed_sort module, uuid feature and refactor …
waynexia Oct 28, 2024
5ae48cc
Merge branch 'main' into windowed-sort-rule
waynexia Oct 28, 2024
c29fedb
chore: print more fuzz log
evenyag Oct 28, 2024
f79103c
chore: more log
evenyag Oct 28, 2024
7baf4ad
fix: part sort should skip empty part
evenyag Oct 28, 2024
303b758
chore: remove insert logs
evenyag Oct 28, 2024
8a0891c
tests: empty PartitionRange
discord9 Oct 28, 2024
410bd2d
refactor: testcase
discord9 Oct 29, 2024
f16c562
Merge branch 'main' into windowed-sort-rule
evenyag Oct 29, 2024
4ce5c86
docs: update comment&tests: all empty
discord9 Oct 29, 2024
2e2ae98
ci: enlarge etcd cpu limit
evenyag Oct 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/setup-etcd-cluster/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ runs:
--set replicaCount=${{ inputs.etcd-replicas }} \
--set resources.requests.cpu=50m \
--set resources.requests.memory=128Mi \
--set resources.limits.cpu=1000m \
--set resources.limits.cpu=1500m \
--set resources.limits.memory=2Gi \
--set auth.rbac.create=false \
--set auth.rbac.token.enabled=false \
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ impl MitoEngine {
}

/// Returns a region scanner to scan the region for `request`.
async fn region_scanner(
fn region_scanner(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef> {
let scanner = self.scanner(region_id, request)?;
scanner.region_scanner().await
scanner.region_scanner()
waynexia marked this conversation as resolved.
Show resolved Hide resolved
}

/// Scans a region.
Expand Down Expand Up @@ -527,7 +527,6 @@ impl RegionEngine for MitoEngine {
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
self.region_scanner(region_id, request)
.await
.map_err(BoxedError::new)
}

Expand Down
25 changes: 25 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,17 +532,42 @@ impl Batch {
#[derive(Default)]
pub(crate) struct BatchChecker {
last_batch: Option<Batch>,
start: Option<Timestamp>,
end: Option<Timestamp>,
}

#[cfg(debug_assertions)]
impl BatchChecker {
/// Attaches the given start timestamp to the checker.
pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
self.start = start;
self
}

/// Attaches the given end timestamp to the checker.
pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
self.end = end;
self
}

/// Returns true if the given batch is monotonic and behind
/// the last batch.
pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> bool {
if !batch.check_monotonic() {
return false;
}

if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
if start > first {
return false;
}
}
if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
if end <= last {
return false;
}
}

// Checks the batch is behind the last batch.
// Then Updates the last batch.
let is_behind = self
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Scanner {

/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)),
Expand Down
12 changes: 10 additions & 2 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ impl SeqScan {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default();
let mut checker = crate::read::BatchChecker::default()
.with_start(Some(part_range.start))
.with_end(Some(part_range.end));

while let Some(batch) = reader
.next_batch()
.await
Expand Down Expand Up @@ -304,8 +307,13 @@ impl RegionScanner for SeqScan {
self.scan_partition_impl(partition)
}

fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
Ok(())
}

Expand Down
11 changes: 9 additions & 2 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ impl UnorderedScan {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default();
let mut checker = crate::read::BatchChecker::default()
.with_start(Some(part_range.start))
.with_end(Some(part_range.end));

let stream = Self::scan_partition_range(
stream_ctx.clone(),
Expand Down Expand Up @@ -209,8 +211,13 @@ impl RegionScanner for UnorderedScan {
self.stream_ctx.input.mapper.output_schema()
}

fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
uuid.workspace = true

[dev-dependencies]
approx_eq = "0.1"
Expand Down
1 change: 1 addition & 0 deletions src/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![feature(let_chains)]
#![feature(int_roundings)]
#![feature(trait_upcasting)]
#![feature(try_blocks)]

mod analyze;
pub mod dataframe;
Expand Down
1 change: 1 addition & 0 deletions src/query/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod string_normalization;
#[cfg(test)]
pub(crate) mod test_util;
pub mod type_conversion;
pub mod windowed_sort;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
Expand Down
13 changes: 12 additions & 1 deletion src/query/src/optimizer/parallelize_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,27 @@ impl ParallelizeScan {
let result = plan
.transform_down(|plan| {
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
if region_scan_exec.is_partition_set() {
return Ok(Transformed::no(plan));
}

let ranges = region_scan_exec.get_partition_ranges();
let total_range_num = ranges.len();
let expected_partition_num = config.execution.target_partitions;

// assign ranges to each partition
let partition_ranges =
let mut partition_ranges =
Self::assign_partition_range(ranges, expected_partition_num);
debug!(
"Assign {total_range_num} ranges to {expected_partition_num} partitions"
);

// sort the ranges in each partition
// TODO(ruihang): smart sort!
for ranges in partition_ranges.iter_mut() {
ranges.sort_by(|a, b| a.start.cmp(&b.start));
}

// update the partition ranges
let new_exec = region_scan_exec
.with_new_partitions(partition_ranges)
Expand Down
158 changes: 158 additions & 0 deletions src/query/src/optimizer/windowed_sort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result as DataFusionResult;
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
use store_api::region_engine::PartitionRange;
use table::table::scan::RegionScanExec;

use crate::part_sort::PartSortExec;
use crate::window_sort::WindowedSortExec;

/// Optimize rule for windowed sort.
///
/// This is expected to run after [`ScanHint`] and [`ParallelizeScan`].
/// It would change the original sort to a custom plan. To make sure
/// other rules are applied correctly, this rule can be run as later as
/// possible.
///
/// [`ScanHint`]: crate::optimizer::scan_hint::ScanHintRule
/// [`ParallelizeScan`]: crate::optimizer::parallelize_scan::ParallelizeScan
pub struct WindowedSortPhysicalRule;

impl PhysicalOptimizerRule for WindowedSortPhysicalRule {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &datafusion::config::ConfigOptions,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
Self::do_optimize(plan, config)
}

fn name(&self) -> &str {
"WindowedSortRule"
}

fn schema_check(&self) -> bool {
false
}
}

impl WindowedSortPhysicalRule {
fn do_optimize(
plan: Arc<dyn ExecutionPlan>,
_config: &datafusion::config::ConfigOptions,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let result = plan
.transform_down(|plan| {
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
// TODO: support multiple expr in windowed sort
if !sort_exec.preserve_partitioning() || sort_exec.expr().len() != 1 {
return Ok(Transformed::no(plan));
}

let Some(scanner_info) = fetch_partition_range(sort_exec.input().clone())?
else {
return Ok(Transformed::no(plan));
};

if let Some(first_sort_expr) = sort_exec.expr().first()
&& !first_sort_expr.options.descending
&& let Some(column_expr) = first_sort_expr
.expr
.as_any()
.downcast_ref::<PhysicalColumn>()
&& column_expr.name() == scanner_info.time_index
{
} else {
return Ok(Transformed::no(plan));
}

let first_sort_expr = sort_exec.expr().first().unwrap().clone();
let part_sort_exec = Arc::new(PartSortExec::new(
first_sort_expr.clone(),
scanner_info.partition_ranges.clone(),
sort_exec.input().clone(),
));
let windowed_sort_exec = WindowedSortExec::try_new(
first_sort_expr,
sort_exec.fetch(),
scanner_info.partition_ranges,
part_sort_exec,
)?;

return Ok(Transformed {
data: Arc::new(windowed_sort_exec),
transformed: true,
tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
});
}

Ok(Transformed::no(plan))
})?
.data;

Ok(result)
}
}

struct ScannerInfo {
partition_ranges: Vec<Vec<PartitionRange>>,
time_index: String,
}

fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
let mut partition_ranges = None;
let mut time_index = None;

input.transform_up(|plan| {
// Unappliable case, reset the state.
if plan.as_any().is::<RepartitionExec>()
|| plan.as_any().is::<CoalesceBatchesExec>()
|| plan.as_any().is::<CoalescePartitionsExec>()
|| plan.as_any().is::<SortExec>()
|| plan.as_any().is::<WindowedSortExec>()
{
partition_ranges = None;
}

if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
time_index = region_scan_exec.time_index();

// set distinguish_partition_ranges to true, this is an incorrect workaround
region_scan_exec.with_distinguish_partition_range(true);
waynexia marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(Transformed::no(plan))
})?;

let result = try {
ScannerInfo {
partition_ranges: partition_ranges?,
time_index: time_index?,
}
};

Ok(result)
}
Loading