From 7a15c70e35037b683cbe6ca8ae1f48cd8aa7d6d0 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Tue, 19 Jul 2022 11:04:21 +0800 Subject: [PATCH 1/2] feat(window): window op align to the nature time Signed-off-by: Jiyong Huang --- internal/conf/time.go | 13 ++- internal/topo/node/watermark.go | 9 +- internal/topo/node/window_op.go | 103 ++++++++++++++++------ internal/topo/node/window_op_test.go | 50 +++++++++++ internal/topo/topotest/checkpoint_test.go | 21 ++--- 5 files changed, 150 insertions(+), 46 deletions(-) diff --git a/internal/conf/time.go b/internal/conf/time.go index 906db12705..cb95dd6047 100644 --- a/internal/conf/time.go +++ b/internal/conf/time.go @@ -1,4 +1,4 @@ -// Copyright 2021 EMQ Technologies Co., Ltd. +// Copyright 2021-2022 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,11 +16,12 @@ package conf import ( "github.com/benbjohnson/clock" - "github.com/lf-edge/ekuiper/pkg/cast" "time" ) -var Clock clock.Clock +var ( + Clock clock.Clock +) func InitClock() { if IsTesting { @@ -40,8 +41,12 @@ func GetTimer(duration int) *clock.Timer { return Clock.Timer(time.Duration(duration) * time.Millisecond) } +func GetTimerByTime(t time.Time) *clock.Timer { + return Clock.Timer(time.Until(t)) +} + func GetNowInMilli() int64 { - return cast.TimeToUnixMilli(Clock.Now()) + return Clock.Now().UnixMilli() } func GetNow() time.Time { diff --git a/internal/topo/node/watermark.go b/internal/topo/node/watermark.go index fb0d0ba469..97f09821a2 100644 --- a/internal/topo/node/watermark.go +++ b/internal/topo/node/watermark.go @@ -128,10 +128,10 @@ func (w *WatermarkGenerator) getNextWindow(inputs []*xsql.Tuple, current int64, } else { interval := int64(w.interval) nextTs := getEarliestEventTs(inputs, current, watermark) - if nextTs == math.MaxInt64 || nextTs%interval == 0 { + if nextTs == math.MaxInt64 { return nextTs } - return nextTs + (interval 
- nextTs%interval) + return getAlignedWindowEndTime(nextTs, interval).UnixMilli() } case ast.SLIDING_WINDOW: nextTs := getEarliestEventTs(inputs, current, watermark) @@ -148,10 +148,7 @@ func (w *WatermarkGenerator) getNextSessionWindow(inputs []*xsql.Tuple) (int64, return inputs[i].Timestamp < inputs[j].Timestamp }) et := inputs[0].Timestamp - tick := et + (duration - et%duration) - if et%duration == 0 { - tick = et - } + tick := getAlignedWindowEndTime(et, duration).UnixMilli() var p int64 ticked := false for _, tuple := range inputs { diff --git a/internal/topo/node/window_op.go b/internal/topo/node/window_op.go index 8259d1a699..c7faa84137 100644 --- a/internal/topo/node/window_op.go +++ b/internal/topo/node/window_op.go @@ -160,32 +160,51 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) { } } +func getAlignedWindowEndTime(n, interval int64) time.Time { + now := time.UnixMilli(n) + _, offset := now.Local().Zone() + start := now.Truncate(24 * time.Hour).Add(time.Duration(-1*offset) * time.Second) + diff := now.Sub(start).Milliseconds() + return now.Add(time.Duration(interval-(diff%interval)) * time.Millisecond) +} + +func getFirstTimer(ctx api.StreamContext, interval int64) (int64, *clock.Timer) { + next := getAlignedWindowEndTime(conf.GetNowInMilli(), interval) + ctx.GetLogger().Infof("align window timer to %v(%d)", next, next.UnixMilli()) + return next.UnixMilli(), conf.GetTimerByTime(next) +} + func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*xsql.Tuple, errCh chan<- error) { log := ctx.GetLogger() var ( - c <-chan time.Time timeoutTicker *clock.Timer - timeout <-chan time.Time + // The first ticker to align the first window to the nature time + firstTicker *clock.Timer + firstTime int64 + nextTime int64 + firstC <-chan time.Time + timeout <-chan time.Time + c <-chan time.Time ) switch o.window.Type { case ast.NOT_WINDOW: case ast.TUMBLING_WINDOW: - o.ticker = conf.GetTicker(o.window.Length) + 
firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Length)) o.interval = o.window.Length case ast.HOPPING_WINDOW: - o.ticker = conf.GetTicker(o.window.Interval) + firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Interval)) o.interval = o.window.Interval case ast.SLIDING_WINDOW: o.interval = o.window.Length case ast.SESSION_WINDOW: - o.ticker = conf.GetTicker(o.window.Length) + firstTime, firstTicker = getFirstTimer(ctx, int64(o.window.Length)) o.interval = o.window.Interval case ast.COUNT_WINDOW: o.interval = o.window.Interval } - if o.ticker != nil { - c = o.ticker.C + if firstTicker != nil { + firstC = firstTicker.C //resume previous window if len(inputs) > 0 && o.triggerTime > 0 { nextTick := conf.GetNowInMilli() + int64(o.interval) @@ -316,24 +335,31 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x o.Broadcast(fmt.Errorf("run Window error: expect xsql.Tuple type but got %[1]T(%[1]v)", d)) o.statManager.IncTotalExceptions() } - case now := <-c: - n := cast.TimeToUnixMilli(now) + case now := <-firstC: + log.Debugf("First tick at %v(%d), defined at %d", now, now.UnixMilli(), firstTime) + switch o.window.Type { + case ast.TUMBLING_WINDOW: + o.ticker = conf.GetTicker(o.window.Length) + case ast.HOPPING_WINDOW: + o.ticker = conf.GetTicker(o.window.Interval) + case ast.SESSION_WINDOW: + o.ticker = conf.GetTicker(o.window.Length) + } + firstTicker = nil + c = o.ticker.C + inputs = o.tick(ctx, inputs, firstTime, log) if o.window.Type == ast.SESSION_WINDOW { - log.Debugf("session window update trigger time %d with %d inputs", n, len(inputs)) - if len(inputs) == 0 || n-int64(o.window.Length) < inputs[0].Timestamp { - if len(inputs) > 0 { - log.Debugf("session window last trigger time %d < first tuple %d", n-int64(o.window.Length), inputs[0].Timestamp) - } - break - } + nextTime = firstTime + int64(o.window.Length) + } else { + nextTime = firstTime + int64(o.interval) } - if len(inputs) > 0 { - 
o.statManager.ProcessTimeStart() - log.Debugf("triggered by ticker at %d", n) - inputs, _ = o.scan(inputs, n, ctx) - o.statManager.ProcessTimeEnd() - ctx.PutState(WINDOW_INPUTS_KEY, inputs) - ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime) + case now := <-c: + log.Debugf("Successive tick at %v(%d)", now, now.UnixMilli()) + inputs = o.tick(ctx, inputs, nextTime, log) + if o.window.Type == ast.SESSION_WINDOW { + nextTime += int64(o.window.Length) + } else { + nextTime += int64(o.interval) } case now := <-timeout: if len(inputs) > 0 { @@ -358,6 +384,29 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x } } +func (o *WindowOperator) tick(ctx api.StreamContext, inputs []*xsql.Tuple, n int64, log api.Logger) []*xsql.Tuple { + if o.window.Type == ast.SESSION_WINDOW { + log.Debugf("session window update trigger time %d with %d inputs", n, len(inputs)) + if len(inputs) == 0 || n-int64(o.window.Length) < inputs[0].Timestamp { + if len(inputs) > 0 { + log.Debugf("session window last trigger time %d < first tuple %d", n-int64(o.window.Length), inputs[0].Timestamp) + } + return inputs + } + } + if len(inputs) > 0 { + o.statManager.ProcessTimeStart() + log.Debugf("triggered by ticker at %d", n) + inputs, _ = o.scan(inputs, n, ctx) + o.statManager.ProcessTimeEnd() + ctx.PutState(WINDOW_INPUTS_KEY, inputs) + } else { + o.triggerTime = n + } + ctx.PutState(TRIGGER_TIME_KEY, o.triggerTime) + return inputs +} + type TupleList struct { tuples []*xsql.Tuple index int //Current index @@ -456,6 +505,9 @@ func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime int64, ctx api.S case ast.SLIDING_WINDOW: windowStart = triggerTime - int64(o.window.Length) } + if windowStart <= 0 { + windowStart = windowEnd - int64(o.window.Length) + } results.WindowRange = xsql.NewWindowRange(windowStart, windowEnd) log.Debugf("window %s triggered for %d tuples", o.name, len(inputs)) if o.isEventTime { @@ -464,11 +516,10 @@ func (o *WindowOperator) scan(inputs 
[]*xsql.Tuple, triggerTime int64, ctx api.S log.Debugf("Sent: %v", results) o.Broadcast(results) triggered = true - o.triggerTime = triggerTime o.statManager.IncTotalRecordsOut() - log.Debugf("done scan") } - + o.triggerTime = triggerTime + log.Debugf("new trigger time %d", o.triggerTime) return inputs[:i], triggered } diff --git a/internal/topo/node/window_op_test.go b/internal/topo/node/window_op_test.go index be1f60041c..2b1a8ab0f3 100644 --- a/internal/topo/node/window_op_test.go +++ b/internal/topo/node/window_op_test.go @@ -19,6 +19,7 @@ import ( "github.com/lf-edge/ekuiper/internal/xsql" "reflect" "testing" + "time" ) var fivet = []*xsql.Tuple{ @@ -49,6 +50,55 @@ var fivet = []*xsql.Tuple{ }, } +func TestTime(t *testing.T) { + var tests = []struct { + interval int + end time.Time + }{ + { + interval: 10, + end: time.UnixMilli(1658218371340), + }, { + interval: 500, + end: time.UnixMilli(1658218371500), + }, { + interval: 1000, + end: time.UnixMilli(1658218372000), + }, { + interval: 40000, // 4oms + end: time.UnixMilli(1658218400000), + }, { + interval: 60000, + end: time.UnixMilli(1658218380000), + }, { + interval: 180000, + end: time.UnixMilli(1658218500000), + }, { + interval: 3600000, + end: time.UnixMilli(1658221200000), + }, { + interval: 7200000, + end: time.UnixMilli(1658224800000), + }, { + interval: 18000000, // 5 hours + end: time.UnixMilli(1658232000000), + }, { + interval: 3600000 * 24, // 1 day + end: time.UnixMilli(1658246400000), + }, { + interval: 3600000 * 24 * 7, // 1 week + end: time.UnixMilli(1658764800000), + }, + } + fmt.Printf("The test bucket size is %d.\n\n", len(tests)) + for i, tt := range tests { + ae := getAlignedWindowEndTime(1658218371337, int64(tt.interval)) + if tt.end.UnixMilli() != ae.UnixMilli() { + t.Errorf("%d for interval %d. 
error mismatch:\n exp=%s(%d)\n got=%s(%d)\n\n", i, tt.interval, tt.end, tt.end.UnixMilli(), ae, ae.UnixMilli()) + } + } +} + func TestNewTupleList(t *testing.T) { _, e := NewTupleList(nil, 0) es1 := "Window size should not be less than zero." diff --git a/internal/topo/topotest/checkpoint_test.go b/internal/topo/topotest/checkpoint_test.go index b72611d874..6bc00e96d5 100644 --- a/internal/topo/topotest/checkpoint_test.go +++ b/internal/topo/topotest/checkpoint_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 EMQ Technologies Co., Ltd. +// Copyright 2021-2022 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -52,10 +52,6 @@ func TestCheckpoint(t *testing.T) { "color": "blue", "size": float64(2), "ts": float64(1541152487632), - }, { - "color": "yellow", - "size": float64(4), - "ts": float64(1541152488442), }}, {{ "color": "blue", @@ -65,6 +61,11 @@ func TestCheckpoint(t *testing.T) { "color": "yellow", "size": float64(4), "ts": float64(1541152488442), + }}, + {{ + "color": "yellow", + "size": float64(4), + "ts": float64(1541152488442), }, { "color": "red", "size": float64(1), @@ -72,17 +73,17 @@ func TestCheckpoint(t *testing.T) { }}, }, M: map[string]interface{}{ - "op_3_project_0_records_in_total": int64(3), - "op_3_project_0_records_out_total": int64(3), + "op_3_project_0_records_in_total": int64(4), + "op_3_project_0_records_out_total": int64(4), - "sink_mockSink_0_records_in_total": int64(3), - "sink_mockSink_0_records_out_total": int64(3), + "sink_mockSink_0_records_in_total": int64(4), + "sink_mockSink_0_records_out_total": int64(4), "source_demo_0_records_in_total": int64(3), "source_demo_0_records_out_total": int64(3), "op_2_window_0_records_in_total": int64(3), - "op_2_window_0_records_out_total": int64(3), + "op_2_window_0_records_out_total": int64(4), }, }, PauseSize: 3, From b62b0899b679d54f01003fbc846c4693656788e4 Mon Sep 17 00:00:00 2001 
From: Jiyong Huang Date: Tue, 19 Jul 2022 17:32:12 +0800 Subject: [PATCH 2/2] doc(window): align nature time Signed-off-by: Jiyong Huang --- docs/en_US/sqls/windows.md | 2 +- docs/zh_CN/sqls/windows.md | 4 ++-- internal/conf/time.go | 16 +++++++++++++++- internal/topo/node/window_op.go | 2 +- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/docs/en_US/sqls/windows.md b/docs/en_US/sqls/windows.md index cd7febb175..66e3d67a38 100644 --- a/docs/en_US/sqls/windows.md +++ b/docs/en_US/sqls/windows.md @@ -8,7 +8,7 @@ All the windowing operations output results at the end of the window. The output ## Time-units -There are 5 time-units can be used in the windows. For example, `TUMBLINGWINDOW(ss, 10)`, which means group the data with tumbling with with 10 seconds interval. +There are 5 time-units that can be used in the windows. For example, `TUMBLINGWINDOW(ss, 10)` groups the data into tumbling windows with a 10-second interval. The time intervals are aligned to natural (wall-clock) time. For example, a 10-second window always ends on a multiple of 10 seconds (at second 10, 20, 30, and so on), regardless of when the rule starts. A day window always ends at 24:00 local time.
**DD**: day unit diff --git a/docs/zh_CN/sqls/windows.md b/docs/zh_CN/sqls/windows.md index 45066db30f..117f4b3b9d 100755 --- a/docs/zh_CN/sqls/windows.md +++ b/docs/zh_CN/sqls/windows.md @@ -8,7 +8,7 @@ ## 时间单位 -窗口中可以使用5个时间单位。 例如,`TUMBLINGWINDOW(ss,10)`,这意味着以10秒为间隔的滚动将数据分组。 +窗口中可以使用5个时间单位。 例如,`TUMBLINGWINDOW(ss,10)`,这意味着以10秒为间隔的滚动将数据分组。时间间隔会根据自然时间对齐。例如,10秒的窗口,不管规则何时开始运行,窗口结束时间总是10秒的倍数,例如20秒,30秒等。以天为单位的窗口,窗口结束时间总是在当地时间的24:00。 DD:天单位 @@ -16,7 +16,7 @@ HH :小时单位 MI:分钟单位 - SS:秒单位 +SS:秒单位 MS :毫秒单位 diff --git a/internal/conf/time.go b/internal/conf/time.go index cb95dd6047..50ce474ff5 100644 --- a/internal/conf/time.go +++ b/internal/conf/time.go @@ -32,6 +32,16 @@ func InitClock() { } } +func GetLocalZone() int { + if IsTesting { + return 28800 // default to UTC+8 + } else { + _, offset := time.Now().Local().Zone() + return offset + } + +} + //Time related. For Mock func GetTicker(duration int) *clock.Ticker { return Clock.Ticker(time.Duration(duration) * time.Millisecond) @@ -42,7 +52,11 @@ func GetTimer(duration int) *clock.Timer { } func GetTimerByTime(t time.Time) *clock.Timer { - return Clock.Timer(time.Until(t)) + if IsTesting { + return Clock.Timer(time.Duration(t.UnixMilli()-GetNowInMilli()) * time.Millisecond) + } else { + return Clock.Timer(time.Until(t)) + } } func GetNowInMilli() int64 { diff --git a/internal/topo/node/window_op.go b/internal/topo/node/window_op.go index c7faa84137..6995dc4cd8 100644 --- a/internal/topo/node/window_op.go +++ b/internal/topo/node/window_op.go @@ -162,7 +162,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) { func getAlignedWindowEndTime(n, interval int64) time.Time { now := time.UnixMilli(n) - _, offset := now.Local().Zone() + offset := conf.GetLocalZone() start := now.Truncate(24 * time.Hour).Add(time.Duration(-1*offset) * time.Second) diff := now.Sub(start).Milliseconds() return now.Add(time.Duration(interval-(diff%interval)) * time.Millisecond)