Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support optional parameter offset in tumble and hop #8490

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
a6ff7d2
update
Eridanus117 Mar 11, 2023
e8b338b
update
Eridanus117 Mar 11, 2023
ea831d7
Merge branch 'risingwavelabs:main' into support_optional_parameter_of…
Eridanus117 Mar 11, 2023
af76e35
update
Eridanus117 Mar 11, 2023
cf943e8
update
Eridanus117 Mar 11, 2023
7b801f9
update
Eridanus117 Mar 11, 2023
57560bc
update
Eridanus117 Mar 11, 2023
6372eae
sync
Eridanus117 Mar 12, 2023
00998ac
sync
Eridanus117 Mar 12, 2023
0136bc6
sync
Eridanus117 Mar 12, 2023
87aa72d
sync
Eridanus117 Mar 12, 2023
c0328fa
sync
Eridanus117 Mar 12, 2023
0bbd69e
sync
Eridanus117 Mar 13, 2023
4bee50b
sync
Eridanus117 Mar 13, 2023
69513c6
sync
Eridanus117 Mar 13, 2023
5349586
sync
Eridanus117 Mar 13, 2023
8556c8c
sync
Eridanus117 Mar 13, 2023
9ce193f
sync
Eridanus117 Mar 13, 2023
94cfbef
sync
Eridanus117 Mar 13, 2023
66970a3
sync
Eridanus117 Mar 13, 2023
cdd6515
sync
Eridanus117 Mar 13, 2023
9fdbee2
sync
Eridanus117 Mar 13, 2023
03f7065
sync
Eridanus117 Mar 13, 2023
1b5af19
sync
Eridanus117 Mar 13, 2023
2f73330
sync
Eridanus117 Mar 13, 2023
5db03bd
sync
Eridanus117 Mar 13, 2023
e9b0826
sync
Eridanus117 Mar 13, 2023
228e0e9
sync
Eridanus117 Mar 13, 2023
f2f315f
sync
Eridanus117 Mar 13, 2023
ff0f21d
sync
Eridanus117 Mar 13, 2023
cbdf975
sync
Eridanus117 Mar 13, 2023
cb700f7
sync
Eridanus117 Mar 13, 2023
6bd78b5
sync
Eridanus117 Mar 13, 2023
2d5b26f
sync
Eridanus117 Mar 13, 2023
de92a9d
sync
Eridanus117 Mar 13, 2023
b4ea3ca
sync
Eridanus117 Mar 13, 2023
4fcc463
sync
Eridanus117 Mar 13, 2023
1707957
sync
Eridanus117 Mar 13, 2023
6fa09e5
sync
Eridanus117 Mar 13, 2023
163af1d
sync
Eridanus117 Mar 13, 2023
dea0bc5
sync
Eridanus117 Mar 13, 2023
28e3998
add tests
Eridanus117 Mar 14, 2023
affa80e
sync
Eridanus117 Mar 14, 2023
75047cd
sync
Eridanus117 Mar 14, 2023
bb3f63e
sync
Eridanus117 Mar 14, 2023
faa9fdc
sync
Eridanus117 Mar 14, 2023
d4741b0
sync
Eridanus117 Mar 14, 2023
272e480
sync
Eridanus117 Mar 14, 2023
58c299d
Merge branch 'main' into support_optional_parameter_offset_in_tumble_…
Eridanus117 Mar 14, 2023
d1af3b2
sync
Eridanus117 Mar 14, 2023
17f82a2
sync
Eridanus117 Mar 14, 2023
d9b2c19
sync
Eridanus117 Mar 15, 2023
02ac0a4
Merge branch 'main' into support_optional_parameter_offset_in_tumble_…
Eridanus117 Mar 15, 2023
919c93f
add tests
Eridanus117 Mar 15, 2023
4b29e0d
fix
Eridanus117 Mar 15, 2023
91ee4da
var naming
Eridanus117 Mar 15, 2023
2ff7209
fix comment
Eridanus117 Mar 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions e2e_test/batch/basic/time_window.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ from tumble(t1, created_at, interval '30' minute) order by row_id, window_start;
7 1 2022-01-01 10:51:00 2022-01-01 10:30:00 2022-01-01 11:00:00
8 3 2022-01-01 11:02:00 2022-01-01 11:00:00 2022-01-01 11:30:00


query IITTT
select row_id, uid, created_at, window_start, window_end
from tumble(t1, created_at, interval '30' minute, interval '13' minute) order by row_id, window_start;
----
1 1 2022-01-01 10:00:00 2022-01-01 09:43:00 2022-01-01 10:13:00
2 3 2022-01-01 10:05:00 2022-01-01 09:43:00 2022-01-01 10:13:00
3 2 2022-01-01 10:14:00 2022-01-01 10:13:00 2022-01-01 10:43:00
4 1 2022-01-01 10:22:00 2022-01-01 10:13:00 2022-01-01 10:43:00
5 3 2022-01-01 10:33:00 2022-01-01 10:13:00 2022-01-01 10:43:00
6 2 2022-01-01 10:42:00 2022-01-01 10:13:00 2022-01-01 10:43:00
7 1 2022-01-01 10:51:00 2022-01-01 10:43:00 2022-01-01 11:13:00
8 3 2022-01-01 11:02:00 2022-01-01 10:43:00 2022-01-01 11:13:00


query IITTT
select row_id, uid, created_at, window_start, window_end
from hop(t1, created_at, interval '15' minute, interval '30' minute) order by row_id, window_start;
Expand All @@ -49,6 +64,27 @@ from hop(t1, created_at, interval '15' minute, interval '30' minute) order by ro
8 3 2022-01-01 11:02:00 2022-01-01 10:45:00 2022-01-01 11:15:00
8 3 2022-01-01 11:02:00 2022-01-01 11:00:00 2022-01-01 11:30:00

query IITTT
select row_id, uid, created_at, window_start, window_end
from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute) order by row_id, window_start;
----
1 1 2022-01-01 10:00:00 2022-01-01 09:43:00 2022-01-01 10:13:00
1 1 2022-01-01 10:00:00 2022-01-01 09:58:00 2022-01-01 10:28:00
2 3 2022-01-01 10:05:00 2022-01-01 09:43:00 2022-01-01 10:13:00
2 3 2022-01-01 10:05:00 2022-01-01 09:58:00 2022-01-01 10:28:00
3 2 2022-01-01 10:14:00 2022-01-01 09:58:00 2022-01-01 10:28:00
3 2 2022-01-01 10:14:00 2022-01-01 10:13:00 2022-01-01 10:43:00
4 1 2022-01-01 10:22:00 2022-01-01 09:58:00 2022-01-01 10:28:00
4 1 2022-01-01 10:22:00 2022-01-01 10:13:00 2022-01-01 10:43:00
5 3 2022-01-01 10:33:00 2022-01-01 10:13:00 2022-01-01 10:43:00
5 3 2022-01-01 10:33:00 2022-01-01 10:28:00 2022-01-01 10:58:00
6 2 2022-01-01 10:42:00 2022-01-01 10:13:00 2022-01-01 10:43:00
6 2 2022-01-01 10:42:00 2022-01-01 10:28:00 2022-01-01 10:58:00
7 1 2022-01-01 10:51:00 2022-01-01 10:28:00 2022-01-01 10:58:00
7 1 2022-01-01 10:51:00 2022-01-01 10:43:00 2022-01-01 11:13:00
8 3 2022-01-01 11:02:00 2022-01-01 10:43:00 2022-01-01 11:13:00
8 3 2022-01-01 11:02:00 2022-01-01 10:58:00 2022-01-01 11:28:00

query IIT rowsort
select row_id, uid, created_at
from hop(t1, created_at, interval '15' minute, interval '30' minute);
Expand All @@ -70,6 +106,29 @@ from hop(t1, created_at, interval '15' minute, interval '30' minute);
8 3 2022-01-01 11:02:00
8 3 2022-01-01 11:02:00


query IIT rowsort
select row_id, uid, created_at
from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute);
----
1 1 2022-01-01 10:00:00
1 1 2022-01-01 10:00:00
2 3 2022-01-01 10:05:00
2 3 2022-01-01 10:05:00
3 2 2022-01-01 10:14:00
3 2 2022-01-01 10:14:00
4 1 2022-01-01 10:22:00
4 1 2022-01-01 10:22:00
5 3 2022-01-01 10:33:00
5 3 2022-01-01 10:33:00
6 2 2022-01-01 10:42:00
6 2 2022-01-01 10:42:00
7 1 2022-01-01 10:51:00
7 1 2022-01-01 10:51:00
8 3 2022-01-01 11:02:00
8 3 2022-01-01 11:02:00


query IT
select sum(v), window_start
from tumble(t1, created_at, interval '30' minute)
Expand All @@ -79,6 +138,15 @@ group by window_start order by window_start;
18 2022-01-01 10:30:00
8 2022-01-01 11:00:00

query IT
select sum(v), window_start
from tumble(t1, created_at, interval '30' minute, interval '13' minute)
group by window_start order by window_start;
----
7 2022-01-01 09:43:00
15 2022-01-01 10:13:00
14 2022-01-01 10:43:00

query IIT
select uid, sum(v), window_start
from tumble(t1, created_at, interval '30' minute)
Expand All @@ -92,6 +160,20 @@ group by window_start, uid order by window_start, uid;
3 5 2022-01-01 10:30:00
3 8 2022-01-01 11:00:00

query IIT
select uid, sum(v), window_start
from tumble(t1, created_at, interval '30' minute, interval '13' minute)
group by window_start, uid order by window_start, uid;
----
1 4 2022-01-01 09:43:00
3 3 2022-01-01 09:43:00
1 1 2022-01-01 10:13:00
2 9 2022-01-01 10:13:00
3 5 2022-01-01 10:13:00
1 6 2022-01-01 10:43:00
3 8 2022-01-01 10:43:00


query IT
select sum(v), window_start
from hop(t1, created_at, interval '15' minute, interval '30' minute)
Expand All @@ -104,6 +186,19 @@ group by window_start order by window_start;
14 2022-01-01 10:45:00
8 2022-01-01 11:00:00


query IT
select sum(v), window_start
from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute)
group by window_start order by window_start;
----
7 2022-01-01 09:43:00
10 2022-01-01 09:58:00
15 2022-01-01 10:13:00
18 2022-01-01 10:28:00
14 2022-01-01 10:43:00
8 2022-01-01 10:58:00

query IIT
select uid, sum(v), window_start
from hop(t1, created_at, interval '15' minute, interval '30' minute)
Expand All @@ -125,6 +220,28 @@ group by window_start, uid order by window_start, uid;
3 8 2022-01-01 10:45:00
3 8 2022-01-01 11:00:00



query IIT
select uid, sum(v), window_start
from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute)
group by window_start, uid order by window_start, uid;
----
1 4 2022-01-01 09:43:00
3 3 2022-01-01 09:43:00
1 5 2022-01-01 09:58:00
2 2 2022-01-01 09:58:00
3 3 2022-01-01 09:58:00
1 1 2022-01-01 10:13:00
2 9 2022-01-01 10:13:00
3 5 2022-01-01 10:13:00
1 6 2022-01-01 10:28:00
2 7 2022-01-01 10:28:00
3 5 2022-01-01 10:28:00
1 6 2022-01-01 10:43:00
3 8 2022-01-01 10:43:00
3 8 2022-01-01 10:58:00

statement error
select * from hop(t1, created_at, interval '0', interval '1');

Expand Down
112 changes: 103 additions & 9 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ impl HopWindowExecutor {
async fn do_execute(self: Box<Self>) {
let Self {
child,

window_slide,
window_size,
output_indices,
Expand Down Expand Up @@ -219,7 +218,12 @@ mod tests {
use super::*;
use crate::executor::test_utils::MockExecutor;

fn create_executor(output_indices: Vec<usize>) -> Box<HopWindowExecutor> {
fn create_executor(
output_indices: Vec<usize>,
window_slide: IntervalUnit,
window_size: IntervalUnit,
window_offset: IntervalUnit,
) -> Box<HopWindowExecutor> {
let field1 = Field::unnamed(DataType::Int64);
let field2 = Field::unnamed(DataType::Int64);
let field3 = Field::with_name(DataType::Timestamp, "created_at");
Expand All @@ -237,14 +241,17 @@ mod tests {
8 3 ^11:02:00"
.replace('^', "2022-2-2T"),
);

let mut mock_executor = MockExecutor::new(schema.clone());
mock_executor.add(chunk);

let window_slide = IntervalUnit::from_minutes(15);
let window_size = IntervalUnit::from_minutes(30);
let (window_start_exprs, window_end_exprs) =
make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap();
let (window_start_exprs, window_end_exprs) = make_hop_window_expression(
DataType::Timestamp,
2,
window_size,
window_slide,
window_offset,
)
.unwrap();

Box::new(HopWindowExecutor::new(
Box::new(mock_executor),
Expand All @@ -259,10 +266,94 @@ mod tests {
))
}

// Checks that the hop-window executor's first output chunk is unchanged when the window
// offset is shifted by whole multiples of the 30-minute window size, and pins the exact
// first chunk for two concrete offsets (-31 and 29 minutes).
#[tokio::test]
async fn test_window_offset() {
// Builds an executor with a 15-minute slide, a 30-minute size and the given offset,
// selecting all five output columns (`0..3 + 2`), and returns the first chunk yielded
// by its stream. Panics (via `unwrap`) if the stream is empty or yields an error.
async fn test_window_offset_helper(window_offset: IntervalUnit) -> DataChunk {
let default_indices = (0..3 + 2).collect_vec();
let window_slide = IntervalUnit::from_minutes(15);
let window_size = IntervalUnit::from_minutes(30);
let executor =
create_executor(default_indices, window_slide, window_size, window_offset);
let mut stream = executor.execute();
stream.next().await.unwrap().unwrap()
}

// Window size in minutes; matches the 30-minute size hard-coded in the helper above.
let window_size = 30;
// Negative side: an offset of `coefficient * window_size + offset` must give the same
// first chunk as the offset one whole window size further down.
for offset in 0..window_size {
for coefficient in -5..0 {
assert_eq!(
test_window_offset_helper(IntervalUnit::from_minutes(
coefficient * window_size + offset
))
.await,
test_window_offset_helper(IntervalUnit::from_minutes(
(coefficient - 1) * window_size + offset
))
.await
);
}
}
// Non-negative side: same check, comparing against the offset one whole window size
// further up.
for offset in 0..window_size {
for coefficient in 0..5 {
assert_eq!(
test_window_offset_helper(IntervalUnit::from_minutes(
coefficient * window_size + offset
))
.await,
test_window_offset_helper(IntervalUnit::from_minutes(
(coefficient + 1) * window_size + offset
))
.await
);
}
}
// Across zero: the two offsets compared here differ by exactly two window sizes and
// must still produce the same first chunk.
for offset in -window_size..window_size {
assert_eq!(
test_window_offset_helper(IntervalUnit::from_minutes(window_size + offset)).await,
test_window_offset_helper(IntervalUnit::from_minutes(-window_size + offset)).await
);
}

// Pinned output for an offset of -31 minutes. The last two `TS` columns are presumably
// the window start and end appended by the executor -- confirm against the executor's
// schema.
assert_eq!(
test_window_offset_helper(IntervalUnit::from_minutes(-31)).await,
DataChunk::from_pretty(
&"I I TS TS TS
1 1 ^10:00:00 ^09:44:00 ^10:14:00
2 3 ^10:05:00 ^09:44:00 ^10:14:00
3 2 ^10:14:00 ^09:59:00 ^10:29:00
4 1 ^10:22:00 ^09:59:00 ^10:29:00
5 3 ^10:33:00 ^10:14:00 ^10:44:00
6 2 ^10:42:00 ^10:14:00 ^10:44:00
7 1 ^10:51:00 ^10:29:00 ^10:59:00
8 3 ^11:02:00 ^10:44:00 ^11:14:00"
.replace('^', "2022-2-2T"),
)
);
// An offset of 29 minutes is 60 minutes (two window sizes) away from -31, so the
// expected chunk is identical to the one above.
assert_eq!(
test_window_offset_helper(IntervalUnit::from_minutes(29)).await,
DataChunk::from_pretty(
&"I I TS TS TS
1 1 ^10:00:00 ^09:44:00 ^10:14:00
2 3 ^10:05:00 ^09:44:00 ^10:14:00
3 2 ^10:14:00 ^09:59:00 ^10:29:00
4 1 ^10:22:00 ^09:59:00 ^10:29:00
5 3 ^10:33:00 ^10:14:00 ^10:44:00
6 2 ^10:42:00 ^10:14:00 ^10:44:00
7 1 ^10:51:00 ^10:29:00 ^10:59:00
8 3 ^11:02:00 ^10:44:00 ^11:14:00"
.replace('^', "2022-2-2T"),
)
);
}

#[tokio::test]
async fn test_execute() {
let default_indices = (0..3 + 2).collect_vec();
let executor = create_executor(default_indices);

let window_slide = IntervalUnit::from_minutes(15);
let window_size = IntervalUnit::from_minutes(30);
let window_offset = IntervalUnit::from_minutes(0);
let executor = create_executor(default_indices, window_slide, window_size, window_offset);

let mut stream = executor.execute();
// TODO: add more test infra to reduce the duplicated codes below.
Expand Down Expand Up @@ -303,7 +394,10 @@ mod tests {
}
#[tokio::test]
async fn test_output_indices() {
let executor = create_executor(vec![1, 3, 4, 2]);
let window_slide = IntervalUnit::from_minutes(15);
let window_size = IntervalUnit::from_minutes(30);
let window_offset = IntervalUnit::from_minutes(0);
let executor = create_executor(vec![1, 3, 4, 2], window_slide, window_size, window_offset);

let mut stream = executor.execute();
// TODO: add more test infra to reduce the duplicated codes below.
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/types/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ pub struct IntervalUnit {
usecs: i64,
}

const USECS_PER_SEC: i64 = 1_000_000;
const USECS_PER_DAY: i64 = 86400 * USECS_PER_SEC;
const USECS_PER_MONTH: i64 = 30 * USECS_PER_DAY;
pub const USECS_PER_SEC: i64 = 1_000_000;
pub const USECS_PER_DAY: i64 = 86400 * USECS_PER_SEC;
pub const USECS_PER_MONTH: i64 = 30 * USECS_PER_DAY;

impl IntervalUnit {
/// Smallest interval value.
Expand Down
24 changes: 21 additions & 3 deletions src/expr/src/expr/build_expr_from_prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use super::expr_unary::{
use super::expr_vnode::VnodeExpression;
use crate::expr::expr_array_distinct::ArrayDistinctExpression;
use crate::expr::expr_array_to_string::ArrayToStringExpression;
use crate::expr::expr_binary_nonnull::new_tumble_start;
use crate::expr::expr_ternary::new_tumble_start_offset;
use crate::expr::{
build_from_prost as expr_build_from_prost, BoxedExpression, Expression, InputRefExpression,
LiteralExpression,
Expand All @@ -69,9 +71,9 @@ pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
build_unary_expr_prost(prost)
}
Equal | NotEqual | LessThan | LessThanOrEqual | GreaterThan | GreaterThanOrEqual | Add
| Subtract | Multiply | Divide | Modulus | Extract | RoundDigit | Pow | TumbleStart
| Position | BitwiseShiftLeft | BitwiseShiftRight | BitwiseAnd | BitwiseOr | BitwiseXor
| ConcatOp | AtTimeZone | CastWithTimeZone | JsonbAccessInner | JsonbAccessStr => {
| Subtract | Multiply | Divide | Modulus | Extract | RoundDigit | Pow | Position
| BitwiseShiftLeft | BitwiseShiftRight | BitwiseAnd | BitwiseOr | BitwiseXor | ConcatOp
| AtTimeZone | CastWithTimeZone | JsonbAccessInner | JsonbAccessStr => {
build_binary_expr_prost(prost)
}
And | Or | IsDistinctFrom | IsNotDistinctFrom | ArrayAccess | FormatType => {
Expand All @@ -87,6 +89,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
Translate => build_translate_expr(prost),

// Variable number of arguments and based on `Unary/Binary/Ternary/...Expression`
TumbleStart => build_tumble_start_expr(prost),
Substr => build_substr_expr(prost),
Overlay => build_overlay_expr(prost),
Trim => build_trim_expr(prost),
Expand Down Expand Up @@ -272,6 +275,21 @@ fn build_date_trunc_expr(prost: &ExprNode) -> Result<BoxedExpression> {
Ok(new_date_trunc_expr(ret_type, field, source, time_zone))
}

/// Builds the expression for a `TumbleStart` node.
///
/// The node must carry either two children (time, window size) or three
/// (time, window size, offset); any other count fails the `ensure!` check.
/// Two children are forwarded to `new_tumble_start`, three to
/// `new_tumble_start_offset`.
fn build_tumble_start_expr(prost: &ExprNode) -> Result<BoxedExpression> {
    let (children, ret_type) = get_children_and_return_type(prost)?;
    ensure!(children.len() == 2 || children.len() == 3);

    // The first two children are present in both accepted forms.
    let time = expr_build_from_prost(&children[0])?;
    let window_size = expr_build_from_prost(&children[1])?;

    if children.len() == 2 {
        return new_tumble_start(time, window_size, ret_type);
    }

    // The `ensure!` above leaves exactly three children on this path.
    let offset = expr_build_from_prost(&children[2])?;
    new_tumble_start_offset(time, window_size, offset, ret_type)
}

fn build_length_expr(prost: &ExprNode) -> Result<BoxedExpression> {
let (children, ret_type) = get_children_and_return_type(prost)?;
// TODO: add encoding length expr
Expand Down
Loading