Skip to content

Commit

Permalink
refactor(stream): introduce Successor to refine SortBuffer (#7550)
Browse files Browse the repository at this point in the history
1. introduce `Successor` trait
2. refactor `SortBuffer` to use `Successor`

Approved-By: st1page
Approved-By: BugenZhao
  • Loading branch information
TennyZhuang authored Jan 28, 2023
1 parent 5e860b0 commit 6a1be40
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 30 deletions.
2 changes: 0 additions & 2 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
// limitations under the License.

#![allow(rustdoc::private_intra_doc_links)]
#![allow(clippy::derive_partial_eq_without_eq)]
#![feature(trait_alias)]
#![feature(binary_heap_drain_sorted)]
#![feature(is_sorted)]
#![feature(fn_traits)]
#![feature(type_alias_impl_trait)]
#![feature(test)]
#![feature(trusted_len)]
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::error::BoxedError;
mod native_type;
mod ops;
mod scalar_impl;
mod successor;

use std::fmt::Debug;
use std::io::Cursor;
Expand All @@ -37,6 +38,7 @@ pub use native_type::*;
use risingwave_pb::data::data_type::IntervalType::*;
use risingwave_pb::data::data_type::{IntervalType, TypeName};
pub use scalar_impl::*;
pub use successor::*;
pub mod chrono_wrapper;
pub mod decimal;
pub mod interval;
Expand Down
84 changes: 84 additions & 0 deletions src/common/src/types/successor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2023 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::Duration;

use super::{NaiveDateTimeWrapper, NaiveDateWrapper, ScalarImpl};

/// A value's *successor* is the smallest value strictly greater than it: for a
/// whole number `n`, the successor is `n + 1` (also described as the value
/// "just after" or "immediately after" `n`).
pub trait Successor {
    /// Returns the successor of the current value, or `None` when no successor
    /// exists (for example, when the value is already the maximum of its type).
    ///
    /// The default implementation reports that no successor is defined.
    fn successor(&self) -> Option<Self>
    where
        Self: Sized,
    {
        None
    }
}

/// Implements [`Successor`] for primitive integer types: the successor of `n`
/// is `n + 1`, and `None` is returned on overflow (i.e. at `MAX`).
macro_rules! impl_integer_successor {
    ($($int:ty),* $(,)?) => {
        $(
            impl Successor for $int {
                fn successor(&self) -> Option<Self> {
                    // `checked_add` yields `None` exactly when `*self` is `MAX`.
                    self.checked_add(1)
                }
            }
        )*
    };
}

impl_integer_successor!(i16, i32, i64);

impl Successor for NaiveDateTimeWrapper {
    /// The successor of a timestamp is the timestamp one nanosecond later —
    /// the finest granularity the underlying chrono value supports. Returns
    /// `None` if adding one nanosecond would overflow the representable range.
    fn successor(&self) -> Option<Self> {
        let next = self.0.checked_add_signed(Duration::nanoseconds(1))?;
        Some(Self(next))
    }
}

impl Successor for NaiveDateWrapper {
    /// The successor of a date is the following calendar day. Returns `None`
    /// if adding one day would overflow the representable range.
    fn successor(&self) -> Option<Self> {
        let next = self.0.checked_add_signed(Duration::days(1))?;
        Some(Self(next))
    }
}

impl ScalarImpl {
/// Returns the successor of the current value if it exists.
///
/// See also [`Successor`].
///
/// The function may return None when:
/// 1. The current value is the maximum value of the type.
/// 2. The successor value of the type is not well-defined.
pub fn successor(&self) -> Option<Self> {
match self {
ScalarImpl::Int16(v) => v.successor().map(ScalarImpl::Int16),
ScalarImpl::Int32(v) => v.successor().map(ScalarImpl::Int32),
ScalarImpl::Int64(v) => v.successor().map(ScalarImpl::Int64),
ScalarImpl::NaiveDateTime(v) => v.successor().map(ScalarImpl::NaiveDateTime),
ScalarImpl::NaiveDate(v) => v.successor().map(ScalarImpl::NaiveDate),
_ => None,
}
}
}
1 change: 0 additions & 1 deletion src/sqlparser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#![cfg_attr(not(feature = "std"), no_std)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![expect(clippy::derive_partial_eq_without_eq)]
#![expect(clippy::doc_markdown)]
#![expect(clippy::upper_case_acronyms)]

Expand Down
43 changes: 16 additions & 27 deletions src/stream/src/executor/sort_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,36 +140,25 @@ impl<S: StateStore> SortBuffer<S> {
// Only records with timestamp greater than the last watermark will be output, so
// records will only be emitted exactly once unless recovery.
let start_bound = if let Some(last_watermark) = last_watermark.clone() {
// TODO: `start_bound` is wrong here, only values with `val.0 > last_watermark`
// should be output, but it's hard to represent `OwnedRow::MAX`. A possible
// implementation is introducing `next_unit` on a subset of `ScalarImpl` variants.
// Currently, we can skip some values explicitly.
Bound::Excluded((last_watermark, OwnedRow::empty().into()))
Bound::Excluded((
// TODO: unsupported type or watermark overflow. Do we have better ways instead
// of unwrap?
last_watermark.successor().unwrap(),
OwnedRow::empty().into(),
))
} else {
Bound::Unbounded
};
// TODO: `end_bound` = `Bound::Inclusive((watermark_value + 1, OwnedRow::empty()))`, but
// it's hard to represent now, so we end the loop by an explicit break.
let end_bound = Bound::Unbounded;

for ((time_col, _), (row, _)) in self.buffer.range((start_bound, end_bound)) {
if let Some(ref last_watermark) = &last_watermark && time_col == last_watermark {
continue;
}
// Only when a record's timestamp is prior to the watermark should it be
// sent to downstream.
if time_col <= watermark_val {
// Add the record to stream chunk data. Note that we retrieve the
// record from a BTreeMap, so data in this chunk should be ordered
// by timestamp and pk.
if let Some(data_chunk) = data_chunk_builder.append_one_row(row) {
// When the chunk size reaches its maximum, we construct a data chunk and
// send it to downstream.
yield data_chunk;
}
} else {
// We have collected all data below watermark.
break;
let end_bound = Bound::Excluded((
(watermark_val.successor().unwrap()),
OwnedRow::empty().into(),
));

for (_, (row, _)) in self.buffer.range((start_bound, end_bound)) {
if let Some(data_chunk) = data_chunk_builder.append_one_row(row) {
// When the chunk size reaches its maximum, we construct a data chunk and
// send it to downstream.
yield data_chunk;
}
}

Expand Down

0 comments on commit 6a1be40

Please sign in to comment.