Skip to content

Commit

Permalink
doc(window): align nature time
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Jul 21, 2022
1 parent 21fa3fa commit b01383f
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/en_US/sqls/windows.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ All the windowing operations output results at the end of the window. The output

## Time-units

There are 5 time-units can be used in the windows. For example, `TUMBLINGWINDOW(ss, 10)`, which means group the data with tumbling with with 10 seconds interval.
There are 5 time-units can be used in the windows. For example, `TUMBLINGWINDOW(ss, 10)`, which means group the data with tumbling with 10 seconds interval. The time intervals will align to the nature time. For example, a 10 second time window will always end at each 10s second such as 10, 20 or 30 regardless of the rule start time. A day window will always end in 24:00 local time.

**DD**: day unit

Expand Down
4 changes: 2 additions & 2 deletions docs/zh_CN/sqls/windows.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

## 时间单位

窗口中可以使用5个时间单位。 例如,`TUMBLINGWINDOW(ss,10)`,这意味着以10秒为间隔的滚动将数据分组。
窗口中可以使用5个时间单位。 例如,`TUMBLINGWINDOW(ss,10)`,这意味着以10秒为间隔的滚动将数据分组。时间间隔会根据自然时间对齐。例如,10秒的窗口,不管规则何时开始运行,窗口结束时间总是10秒的倍数,例如20秒,30秒等。以天为单位的窗口,窗口结束时间总是在当地时间的24:00。

DD:天单位

HH :小时单位

MI:分钟单位

SS:秒单位
SS:秒单位

MS :毫秒单位

Expand Down
16 changes: 15 additions & 1 deletion internal/conf/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ func InitClock() {
}
}

func GetLocalZone() int {
if IsTesting {
return 28800 // default to UTC+8
} else {
_, offset := time.Now().Local().Zone()
return offset
}

}

//Time related. For Mock
func GetTicker(duration int) *clock.Ticker {
return Clock.Ticker(time.Duration(duration) * time.Millisecond)
Expand All @@ -42,7 +52,11 @@ func GetTimer(duration int) *clock.Timer {
}

func GetTimerByTime(t time.Time) *clock.Timer {
return Clock.Timer(time.Until(t))
if IsTesting {
return Clock.Timer(time.Duration(t.UnixMilli()-GetNowInMilli()) * time.Millisecond)
} else {
return Clock.Timer(time.Until(t))
}
}

func GetNowInMilli() int64 {
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {

func getAlignedWindowEndTime(n, interval int64) time.Time {
now := time.UnixMilli(n)
_, offset := now.Local().Zone()
offset := conf.GetLocalZone()
start := now.Truncate(24 * time.Hour).Add(time.Duration(-1*offset) * time.Second)
diff := now.Sub(start).Milliseconds()
return now.Add(time.Duration(interval-(diff%interval)) * time.Millisecond)
Expand Down

0 comments on commit b01383f

Please sign in to comment.