Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nature time processing window #1323

Merged
merged 2 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/en_US/sqls/windows.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ All the windowing operations output results at the end of the window. The output

## Time-units

There are 5 time-units can be used in the windows. For example, `TUMBLINGWINDOW(ss, 10)`, which means group the data with tumbling with with 10 seconds interval.
There are 5 time-units that can be used in the windows. For example, `TUMBLINGWINDOW(ss, 10)` groups the data into tumbling windows with a 10-second interval. The time intervals are aligned to natural time. For example, a 10-second window always ends on a multiple of 10 seconds, such as second 10, 20 or 30, regardless of the rule start time. A day window always ends at 24:00 local time.

**DD**: day unit

Expand Down
4 changes: 2 additions & 2 deletions docs/zh_CN/sqls/windows.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

## 时间单位

窗口中可以使用5个时间单位。 例如,`TUMBLINGWINDOW(ss,10)`,这意味着以10秒为间隔的滚动将数据分组。
窗口中可以使用5个时间单位。 例如,`TUMBLINGWINDOW(ss,10)`,这意味着以10秒为间隔的滚动将数据分组。时间间隔会根据自然时间对齐。例如,10秒的窗口,不管规则何时开始运行,窗口结束时间总是10秒的倍数,例如20秒,30秒等。以天为单位的窗口,窗口结束时间总是在当地时间的24:00。

DD:天单位

HH :小时单位

MI:分钟单位

SS:秒单位
SS:秒单位

MS :毫秒单位

Expand Down
27 changes: 23 additions & 4 deletions internal/conf/time.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 EMQ Technologies Co., Ltd.
// Copyright 2021-2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,12 @@ package conf

import (
"github.com/benbjohnson/clock"
"github.com/lf-edge/ekuiper/pkg/cast"
"time"
)

var Clock clock.Clock
var (
Clock clock.Clock
)

func InitClock() {
if IsTesting {
Expand All @@ -31,6 +32,16 @@ func InitClock() {
}
}

// GetLocalZone returns the local time zone offset in seconds east of UTC.
// When IsTesting is set, a fixed offset of 28800 seconds (UTC+8) is returned
// instead of querying the system zone.
func GetLocalZone() int {
	if !IsTesting {
		_, offsetSeconds := time.Now().Local().Zone()
		return offsetSeconds
	}
	// default to UTC+8
	return 28800
}

//Time related. For Mock
func GetTicker(duration int) *clock.Ticker {
return Clock.Ticker(time.Duration(duration) * time.Millisecond)
Expand All @@ -40,8 +51,16 @@ func GetTimer(duration int) *clock.Timer {
return Clock.Timer(time.Duration(duration) * time.Millisecond)
}

// GetTimerByTime creates a timer on Clock that is due at the instant t.
// Under IsTesting the delay is measured against GetNowInMilli (the Clock's
// notion of now, in whole milliseconds); otherwise it is measured against
// the wall clock through time.Until.
func GetTimerByTime(t time.Time) *clock.Timer {
	if !IsTesting {
		return Clock.Timer(time.Until(t))
	}
	remainingMs := t.UnixMilli() - GetNowInMilli()
	return Clock.Timer(time.Duration(remainingMs) * time.Millisecond)
}

// GetNowInMilli returns the current time reported by Clock as an int64,
// converted by cast.TimeToUnixMilli (presumably Unix milliseconds — confirm
// against the cast package).
// NOTE(review): the body contains two return statements; only the first one
// executes and the second is unreachable. This looks like both sides of a
// diff kept together — confirm which one should remain.
func GetNowInMilli() int64 {
return cast.TimeToUnixMilli(Clock.Now())
return Clock.Now().UnixMilli()
}

func GetNow() time.Time {
Expand Down
9 changes: 3 additions & 6 deletions internal/topo/node/watermark.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64,
} else {
interval := int64(w.interval)
nextTs := getEarliestEventTs(inputs, current, watermark)
if nextTs == math.MaxInt64 || nextTs%interval == 0 {
if nextTs == math.MaxInt64 {
return nextTs
}
return nextTs + (interval - nextTs%interval)
return getAlignedWindowEndTime(nextTs, interval).UnixMilli()
}
case ast.SLIDING_WINDOW:
nextTs := getEarliestEventTs(inputs, current, watermark)
Expand All @@ -148,10 +148,7 @@ func (w *WatermarkGenerator) getNextSessionWindow(inputs []*xsql.Tuple) (int64,
return inputs[i].Timestamp < inputs[j].Timestamp
})
et := inputs[0].Timestamp
tick := et + (duration - et%duration)
if et%duration == 0 {
tick = et
}
tick := getAlignedWindowEndTime(et, duration).UnixMilli()
var p int64
ticked := false
for _, tuple := range inputs {
Expand Down
103 changes: 77 additions & 26 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,32 +160,51 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
}
}

// getAlignedWindowEndTime returns the end time of the natural-time aligned
// window that contains n.
//
// n is a Unix timestamp in milliseconds and interval is the window length in
// milliseconds. Window boundaries are counted from local midnight of the day
// that contains n, where "local" is defined by the offset (seconds east of
// UTC) returned by conf.GetLocalZone. The result is always strictly after n:
// when n sits exactly on a boundary, the following boundary is returned.
func getAlignedWindowEndTime(n, interval int64) time.Time {
	now := time.UnixMilli(n)
	offset := time.Duration(conf.GetLocalZone()) * time.Second
	// Time.Truncate rounds relative to the zero time, i.e. to UTC midnight.
	// Shift into local wall-clock time before truncating and shift back
	// afterwards so that dayStart is the most recent local midnight.
	// Truncating first and shifting afterwards can produce a "midnight" that
	// lies after now for zones west of UTC (negative remainder, window end
	// more than one interval away) or more than a day before now for zones
	// east of UTC (wrong anchor for intervals that do not divide 24h).
	dayStart := now.Add(offset).Truncate(24 * time.Hour).Add(-offset)
	elapsed := now.Sub(dayStart).Milliseconds() // always within [0, 24h)
	return now.Add(time.Duration(interval-elapsed%interval) * time.Millisecond)
}

// getFirstTimer computes the first aligned window end after the current
// time (conf.GetNowInMilli) for the given interval in milliseconds, logs it,
// and returns that end as Unix milliseconds together with a timer that is
// due at that instant.
func getFirstTimer(ctx api.StreamContext, interval int64) (int64, *clock.Timer) {
	alignedEnd := getAlignedWindowEndTime(conf.GetNowInMilli(), interval)
	endMs := alignedEnd.UnixMilli()
	ctx.GetLogger().Infof("align window timer to %v(%d)", alignedEnd, endMs)
	return endMs, conf.GetTimerByTime(alignedEnd)
}

func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) {
log := ctx.GetLogger()
var (
c <-chan time.Time
timeoutTicker *clock.Timer
timeout <-chan time.Time
// The first ticker to align the first window to the nature time
firstTicker *clock.Timer
firstTime int64
nextTime int64
firstC <-chan time.Time
timeout <-chan time.Time
c <-chan time.Time
)
switch o.window.Type {
case ast.NOT_WINDOW:
case ast.TUMBLING_WINDOW:
o.ticker = conf.GetTicker(o.window.Length)
firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Length))
o.interval = o.window.Length
case ast.HOPPING_WINDOW:
o.ticker = conf.GetTicker(o.window.Interval)
firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Interval))
o.interval = o.window.Interval
case ast.SLIDING_WINDOW:
o.interval = o.window.Length
case ast.SESSION_WINDOW:
o.ticker = conf.GetTicker(o.window.Length)
firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Length))
o.interval = o.window.Interval
case ast.COUNT_WINDOW:
o.interval = o.window.Interval
}

if o.ticker != nil {
c = o.ticker.C
if firstTicker != nil {
firstC = firstTicker.C
//resume previous window
if len(inputs) > 0 && o.triggerTime > 0 {
nextTick := conf.GetNowInMilli() + int64(o.interval)
Expand Down Expand Up @@ -316,24 +335,31 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
o.Broadcast(fmt.Errorf("run Window error: expect xsql.Tuple type but got %[1]T(%[1]v)", d))
o.statManager.IncTotalExceptions()
}
case now := <-c:
n := cast.TimeToUnixMilli(now)
case now := <-firstC:
log.Debugf("First tick at %v(%d), defined at %d", now, now.UnixMilli(), firstTime)
switch o.window.Type {
case ast.TUMBLING_WINDOW:
o.ticker = conf.GetTicker(o.window.Length)
case ast.HOPPING_WINDOW:
o.ticker = conf.GetTicker(o.window.Interval)
case ast.SESSION_WINDOW:
o.ticker = conf.GetTicker(o.window.Length)
}
firstTicker = nil
c = o.ticker.C
inputs = o.tick(ctx, inputs, firstTime, log)
if o.window.Type == ast.SESSION_WINDOW {
log.Debugf("session window update trigger time %d with %d inputs", n, len(inputs))
if len(inputs) == 0 || n-int64(o.window.Length) < inputs[0].Timestamp {
if len(inputs) > 0 {
log.Debugf("session window last trigger time %d < first tuple %d", n-int64(o.window.Length), inputs[0].Timestamp)
}
break
}
nextTime = firstTime + int64(o.window.Length)
} else {
nextTime = firstTime + int64(o.interval)
}
if len(inputs) > 0 {
o.statManager.ProcessTimeStart()
log.Debugf("triggered by ticker at %d", n)
inputs, _ = o.scan(inputs, n, ctx)
o.statManager.ProcessTimeEnd()
ctx.PutState(WINDOW_INPUTS_KEY, inputs)
ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
case now := <-c:
log.Debugf("Successive tick at %v(%d)", now, now.UnixMilli())
inputs = o.tick(ctx, inputs, nextTime, log)
if o.window.Type == ast.SESSION_WINDOW {
nextTime += int64(o.window.Length)
} else {
nextTime += int64(o.interval)
}
case now := <-timeout:
if len(inputs) > 0 {
Expand All @@ -358,6 +384,29 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
}
}

// tick handles one ticker event at trigger time n and returns the tuples
// that remain buffered afterwards.
//
// For a session window the tick is ignored (inputs returned unchanged, no
// state written) when there are no inputs or when n minus the window length
// is still earlier than the first buffered tuple's timestamp.
// Otherwise, buffered inputs are scanned with n as the trigger time and the
// remaining inputs are saved under WINDOW_INPUTS_KEY; with no inputs only
// o.triggerTime is moved to n. In both cases o.triggerTime is saved under
// TRIGGER_TIME_KEY.
func (o *WindowOperator) tick(ctx api.StreamContext, inputs []*xsql.Tuple, n int64, log api.Logger) []*xsql.Tuple {
	if o.window.Type == ast.SESSION_WINDOW {
		log.Debugf("session window update trigger time %d with %d inputs", n, len(inputs))
		if len(inputs) == 0 {
			return inputs
		}
		if lastTrigger := n - int64(o.window.Length); lastTrigger < inputs[0].Timestamp {
			log.Debugf("session window last trigger time %d < first tuple %d", lastTrigger, inputs[0].Timestamp)
			return inputs
		}
	}
	if len(inputs) == 0 {
		// Nothing to emit; just advance the trigger time.
		o.triggerTime = n
	} else {
		o.statManager.ProcessTimeStart()
		log.Debugf("triggered by ticker at %d", n)
		inputs, _ = o.scan(inputs, n, ctx)
		o.statManager.ProcessTimeEnd()
		ctx.PutState(WINDOW_INPUTS_KEY, inputs)
	}
	ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime)
	return inputs
}

type TupleList struct {
tuples []*xsql.Tuple
index int //Current index
Expand Down Expand Up @@ -456,6 +505,9 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
case ast.SLIDING_WINDOW:
windowStart = triggerTime - int64(o.window.Length)
}
if windowStart <= 0 {
windowStart = windowEnd - int64(o.window.Length)
}
results.WindowRange = xsql.NewWindowRange(windowStart, windowEnd)
log.Debugf("window %s triggered for %d tuples", o.name, len(inputs))
if o.isEventTime {
Expand All @@ -464,11 +516,10 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S
log.Debugf("Sent: %v", results)
o.Broadcast(results)
triggered = true
o.triggerTime = triggerTime
o.statManager.IncTotalRecordsOut()
log.Debugf("done scan")
}

o.triggerTime = triggerTime
log.Debugf("new trigger time %d", o.triggerTime)
return inputs[:i], triggered
}

Expand Down
50 changes: 50 additions & 0 deletions internal/topo/node/window_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lf-edge/ekuiper/internal/xsql"
"reflect"
"testing"
"time"
)

var fivet = []*xsql.Tuple{
Expand Down Expand Up @@ -49,6 +50,55 @@ var fivet = []*xsql.Tuple{
},
}

// TestTime checks getAlignedWindowEndTime for a range of intervals (in
// milliseconds) against one fixed input timestamp. The expected values
// correspond to a UTC+8 local zone: the one-day case ends at 16:00 UTC.
func TestTime(t *testing.T) {
	// Fixed "current" time in Unix milliseconds shared by every case.
	const nowMs int64 = 1658218371337
	type alignCase struct {
		interval int
		end      time.Time
	}
	cases := []alignCase{
		{interval: 10, end: time.UnixMilli(1658218371340)},
		{interval: 500, end: time.UnixMilli(1658218371500)},
		{interval: 1000, end: time.UnixMilli(1658218372000)},
		{interval: 40000, end: time.UnixMilli(1658218400000)},                // 40 seconds
		{interval: 60000, end: time.UnixMilli(1658218380000)},                // 1 minute
		{interval: 180000, end: time.UnixMilli(1658218500000)},               // 3 minutes
		{interval: 3600000, end: time.UnixMilli(1658221200000)},              // 1 hour
		{interval: 7200000, end: time.UnixMilli(1658224800000)},              // 2 hours
		{interval: 18000000, end: time.UnixMilli(1658232000000)},             // 5 hours
		{interval: 3600000 * 24, end: time.UnixMilli(1658246400000)},         // 1 day
		{interval: 3600000 * 24 * 7, end: time.UnixMilli(1658764800000)},     // 1 week
	}
	fmt.Printf("The test bucket size is %d.\n\n", len(cases))
	for idx := range cases {
		c := cases[idx]
		got := getAlignedWindowEndTime(nowMs, int64(c.interval))
		if got.UnixMilli() == c.end.UnixMilli() {
			continue
		}
		t.Errorf("%d for interval %d. error mismatch:\n exp=%s(%d)\n got=%s(%d)\n\n", idx, c.interval, c.end, c.end.UnixMilli(), got, got.UnixMilli())
	}
}

func TestNewTupleList(t *testing.T) {
_, e := NewTupleList(nil, 0)
es1 := "Window size should not be less than zero."
Expand Down
21 changes: 11 additions & 10 deletions internal/topo/topotest/checkpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 EMQ Technologies Co., Ltd.
// Copyright 2021-2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,10 +52,6 @@ func TestCheckpoint(t *testing.T) {
"color": "blue",
"size": float64(2),
"ts": float64(1541152487632),
}, {
"color": "yellow",
"size": float64(4),
"ts": float64(1541152488442),
}},
{{
"color": "blue",
Expand All @@ -65,24 +61,29 @@ func TestCheckpoint(t *testing.T) {
"color": "yellow",
"size": float64(4),
"ts": float64(1541152488442),
}},
{{
"color": "yellow",
"size": float64(4),
"ts": float64(1541152488442),
}, {
"color": "red",
"size": float64(1),
"ts": float64(1541152489252),
}},
},
M: map[string]interface{}{
"op_3_project_0_records_in_total": int64(3),
"op_3_project_0_records_out_total": int64(3),
"op_3_project_0_records_in_total": int64(4),
"op_3_project_0_records_out_total": int64(4),

"sink_mockSink_0_records_in_total": int64(3),
"sink_mockSink_0_records_out_total": int64(3),
"sink_mockSink_0_records_in_total": int64(4),
"sink_mockSink_0_records_out_total": int64(4),

"source_demo_0_records_in_total": int64(3),
"source_demo_0_records_out_total": int64(3),

"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(3),
"op_2_window_0_records_out_total": int64(4),
},
},
PauseSize: 3,
Expand Down