From c35e41bbfd99129568097eccb31ac3e5f1f64c48 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Tue, 3 Dec 2024 11:01:02 +0800 Subject: [PATCH 1/6] support envet Signed-off-by: Song Gao --- internal/topo/node/window_inc_agg_event_op.go | 135 +++++++++++++++ .../topo/node/window_inc_agg_event_op_test.go | 158 ++++++++++++++++++ internal/topo/node/window_inc_agg_op.go | 39 ++++- 3 files changed, 329 insertions(+), 3 deletions(-) create mode 100644 internal/topo/node/window_inc_agg_event_op.go create mode 100644 internal/topo/node/window_inc_agg_event_op_test.go diff --git a/internal/topo/node/window_inc_agg_event_op.go b/internal/topo/node/window_inc_agg_event_op.go new file mode 100644 index 0000000000..be63906910 --- /dev/null +++ b/internal/topo/node/window_inc_agg_event_op.go @@ -0,0 +1,135 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package node + +import ( + "time" + + "github.com/lf-edge/ekuiper/contract/v2/api" + + "github.com/lf-edge/ekuiper/v2/internal/xsql" +) + +type SlidingWindowIncAggEventOp struct { + *SlidingWindowIncAggOp + EmitList []*IncAggWindow +} + +func NewSlidingWindowIncAggEventOp(o *WindowIncAggOperator) *SlidingWindowIncAggEventOp { + op := &SlidingWindowIncAggEventOp{} + op.SlidingWindowIncAggOp = NewSlidingWindowIncAggOp(o) + op.EmitList = make([]*IncAggWindow, 0) + return op +} + +func (so *SlidingWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- error) { + fv, _ := xsql.NewFunctionValuersForOp(ctx) + for { + select { + case <-ctx.Done(): + return + case input := <-so.input: + data, processed := so.ingest(ctx, input) + if processed { + break + } + switch tuple := data.(type) { + case *xsql.WatermarkTuple: + now := tuple.GetTimestamp() + so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) + so.emitList(ctx, errCh, now) + case *xsql.Tuple: + now := tuple.GetTimestamp() + so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) + if so.Delay > 0 { + so.appendDelayIncAggWindowInEvent(ctx, errCh, fv, tuple) + continue + } + so.appendIncAggWindowInEvent(ctx, errCh, fv, tuple) + } + } + } +} + +func (so *SlidingWindowIncAggEventOp) emitList(ctx api.StreamContext, errCh chan<- error, triggerTS time.Time) { + if len(so.EmitList) > 0 { + triggerIndex := -1 + for index, window := range so.EmitList { + if window.StartTime.Before(triggerTS) || window.StartTime.Equal(triggerTS) { + triggerIndex = index + so.emit(ctx, errCh, window, triggerTS) + } else { + break + } + } + // emit nothing + if triggerIndex == -1 { + return + } + // emit all windows + if triggerIndex >= len(so.EmitList)-1 { + so.EmitList = make([]*IncAggWindow, 0) + return + } + // emit part of windows + so.EmitList = so.EmitList[triggerIndex+1:] + } +} + +func (so *SlidingWindowIncAggEventOp) appendIncAggWindowInEvent(ctx api.StreamContext, errCh chan<- error, fv *xsql.FunctionValuer, row *xsql.Tuple) { + so.appendIncAggWindow(ctx, errCh, fv, row, row.GetTimestamp()) + if so.isMatchCondition(ctx, fv, row) { + emitWindow := so.CurrWindowList[0].Clone(ctx) + emitWindow.StartTime = row.GetTimestamp() + so.EmitList = append(so.EmitList, emitWindow) + } + return +} + +func (so *SlidingWindowIncAggEventOp) appendDelayIncAggWindowInEvent(ctx api.StreamContext, errCh chan<- error, fv *xsql.FunctionValuer, row *xsql.Tuple) { + name := calDimension(fv, so.Dimensions, row) + so.CurrWindowList = append(so.CurrWindowList, newIncAggWindow(ctx, row.GetTimestamp())) + for _, incWindow := range so.CurrWindowList { + incAggCal(ctx, name, row, incWindow, so.aggFields) + } + for _, incWindow := range so.EmitList { + incAggCal(ctx, name, row, incWindow, so.aggFields) + } + if so.isMatchCondition(ctx, fv, row) { + emitWindow := so.CurrWindowList[0].Clone(ctx) + emitWindow.StartTime = row.GetTimestamp().Add(so.Delay) + so.EmitList = append(so.EmitList, emitWindow) + } +} + +func (o *WindowIncAggOperator) ingest(ctx api.StreamContext, item any) (any, bool) { + ctx.GetLogger().Debugf("receive %v", item) + item, processed := o.preprocess(ctx, item) + if processed { + return item, processed + } + switch d := item.(type) { + case error: + if o.sendError { + o.Broadcast(d) + } + return nil, true + case xsql.EOFTuple: + o.Broadcast(d) + return nil, true + } + // watermark tuple should return + return item, false +} diff --git a/internal/topo/node/window_inc_agg_event_op_test.go b/internal/topo/node/window_inc_agg_event_op_test.go new file mode 100644 index 0000000000..de214c96c8 --- /dev/null +++ b/internal/topo/node/window_inc_agg_event_op_test.go @@ -0,0 +1,158 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package node_test + +import ( + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/lf-edge/ekuiper/v2/internal/conf" + "github.com/lf-edge/ekuiper/v2/internal/pkg/def" + "github.com/lf-edge/ekuiper/v2/internal/pkg/store" + "github.com/lf-edge/ekuiper/v2/internal/topo/node" + "github.com/lf-edge/ekuiper/v2/internal/topo/planner" + "github.com/lf-edge/ekuiper/v2/internal/xsql" + mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" +) + +func TestIncEventSlidingWindow(t *testing.T) { + conf.IsTesting = true + o := &def.RuleOption{ + BufferLength: 10, + IsEventTime: true, + } + kv, err := store.GetKV("stream") + require.NoError(t, err) + require.NoError(t, prepareStream()) + sql := "select count(*) from stream group by slidingWindow(ss,10)" + stmt, err := xsql.NewParser(strings.NewReader(sql)).Parse() + require.NoError(t, err) + p, err := planner.CreateLogicalPlan(stmt, &def.RuleOption{ + PlanOptimizeStrategy: &def.PlanOptimizeStrategy{ + EnableIncrementalWindow: true, + }, + Qos: 0, + }, kv) + require.NoError(t, err) + require.NotNil(t, p) + incPlan := extractIncWindowPlan(p) + require.NotNil(t, incPlan) + op, err := node.NewWindowIncAggOp("1", &node.WindowConfig{ + Type: incPlan.WType, + Length: time.Second, + }, incPlan.Dimensions, incPlan.IncAggFuncs, o) + require.NoError(t, err) + require.NotNil(t, op) + input, _ := op.GetInput() + output := make(chan any, 10) + op.AddOutput(output, "output") + errCh := make(chan error, 10) + ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel() + op.Exec(ctx, errCh) + time.Sleep(10 * time.Millisecond) + tuple1Ts := time.Now() + tuple2Ts := tuple1Ts.Add(500 * time.Millisecond) + waterMark1Ts := tuple1Ts.Add(1200 * time.Millisecond) + input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}, Timestamp: tuple1Ts} + input <- &xsql.Tuple{Message: map[string]any{"a": int64(2)}, Timestamp: tuple2Ts} + input <- &xsql.WatermarkTuple{Timestamp: waterMark1Ts} + got1 := <-output + wt, ok := got1.(*xsql.WindowTuples) + require.True(t, ok) + require.NotNil(t, wt) + d := wt.ToMaps() + require.Equal(t, []map[string]any{ + { + "a": int64(1), + "inc_agg_col_1": int64(1), + }, + }, d) + got2 := <-output + wt, ok = got2.(*xsql.WindowTuples) + require.True(t, ok) + require.NotNil(t, wt) + d = wt.ToMaps() + require.Equal(t, []map[string]any{ + { + "a": int64(2), + "inc_agg_col_1": int64(2), + }, + }, d) + cancel() + time.Sleep(10 * time.Millisecond) + op.Close() +} + +func TestIncEventDelaySlidingWindow(t *testing.T) { + conf.IsTesting = true + o := &def.RuleOption{ + BufferLength: 10, + IsEventTime: true, + } + kv, err := store.GetKV("stream") + require.NoError(t, err) + require.NoError(t, prepareStream()) + sql := "select count(*) from stream group by slidingWindow(ss,1,1)" + stmt, err := xsql.NewParser(strings.NewReader(sql)).Parse() + require.NoError(t, err) + p, err := planner.CreateLogicalPlan(stmt, &def.RuleOption{ + PlanOptimizeStrategy: &def.PlanOptimizeStrategy{ + EnableIncrementalWindow: true, + }, + Qos: 0, + }, kv) + require.NoError(t, err) + require.NotNil(t, p) + incPlan := extractIncWindowPlan(p) + require.NotNil(t, incPlan) + op, err := node.NewWindowIncAggOp("1", &node.WindowConfig{ + Type: incPlan.WType, + Length: time.Second, + Delay: time.Second, + }, incPlan.Dimensions, incPlan.IncAggFuncs, o) + require.NoError(t, err) + require.NotNil(t, op) + input, _ := op.GetInput() + output := make(chan any, 10) + op.AddOutput(output, "output") + errCh := make(chan error, 10) + ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel() + op.Exec(ctx, errCh) + time.Sleep(10 * time.Millisecond) + + tuple1Ts := time.Now() + tuple2Ts := tuple1Ts.Add(500 * time.Millisecond) + waterMark1Ts := tuple1Ts.Add(1100 * time.Millisecond) + input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}, Timestamp: tuple1Ts} + input <- &xsql.Tuple{Message: map[string]any{"a": int64(2)}, Timestamp: tuple2Ts} + input <- &xsql.WatermarkTuple{Timestamp: waterMark1Ts} + got1 := <-output + wt, ok := got1.(*xsql.WindowTuples) + require.True(t, ok) + require.NotNil(t, wt) + d := wt.ToMaps() + require.Equal(t, []map[string]any{ + { + "a": int64(2), + "inc_agg_col_1": int64(2), + }, + }, d) + cancel() + time.Sleep(10 * time.Millisecond) + op.Close() +} diff --git a/internal/topo/node/window_inc_agg_op.go b/internal/topo/node/window_inc_agg_op.go index 52c0ca8790..baaadd3f4c 100644 --- a/internal/topo/node/window_inc_agg_op.go +++ b/internal/topo/node/window_inc_agg_op.go @@ -72,8 +72,13 @@ func NewWindowIncAggOp(name string, w *WindowConfig, dimensions ast.Dimensions, wExec := NewTumblingWindowIncAggOp(o) o.WindowExec = wExec case ast.SLIDING_WINDOW: - wExec := NewSlidingWindowIncAggOp(o) - o.WindowExec = wExec + if options.IsEventTime { + wExec := NewSlidingWindowIncAggEventOp(o) + o.WindowExec = wExec + } else { + wExec := NewSlidingWindowIncAggOp(o) + o.WindowExec = wExec + } case ast.HOPPING_WINDOW: o.WindowExec = NewHoppingWindowIncAggOp(o) } @@ -123,6 +128,14 @@ type IncAggWindow struct { DimensionsIncAggRange map[string]*IncAggRange } +func (w *IncAggWindow) Clone(ctx api.StreamContext) *IncAggWindow { + c := &IncAggWindow{StartTime: w.StartTime, DimensionsIncAggRange: map[string]*IncAggRange{}} + for k, v := range w.DimensionsIncAggRange { + c.DimensionsIncAggRange[k] = v.Clone(ctx) + } + return c +} + func (w *IncAggWindow) GenerateAllFunctionState() { if w == nil { return @@ -150,8 +163,28 @@ type IncAggRange struct { Fields map[string]interface{} } -func (r *IncAggRange) generateFunctionState() { +func (r *IncAggRange) Clone(ctx api.StreamContext) *IncAggRange { + fstore, _ := state.CreateStore("incAggWindow", 0) + fctx := topoContext.Background().WithMeta(ctx.GetRuleId(), ctx.GetOpId(), fstore) + for k, v := range r.generateFunctionState() { + fctx.PutState(k, v) + } + fv, _ := xsql.NewFunctionValuersForOp(fctx) + c := &IncAggRange{ + fctx: fctx.(*topoContext.DefaultContext), + fv: fv, + LastRow: r.LastRow.Clone().(*xsql.Tuple), + Fields: make(map[string]interface{}), + } + for k, v := range r.Fields { + c.Fields[k] = v + } + return c +} + +func (r *IncAggRange) generateFunctionState() map[string]interface{} { r.FunctionState = r.fctx.GetAllState() + return r.FunctionState } func (r *IncAggRange) restoreState(ctx api.StreamContext) { From b627ec8c8f27634fe73b6dcb5a2a9a463794d1ff Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 4 Dec 2024 14:39:09 +0800 Subject: [PATCH 2/6] fix Signed-off-by: Song Gao --- internal/topo/node/window_inc_agg_event_op.go | 7 ++++--- internal/topo/node/window_inc_agg_op.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/topo/node/window_inc_agg_event_op.go b/internal/topo/node/window_inc_agg_event_op.go index be63906910..560a47ef86 100644 --- a/internal/topo/node/window_inc_agg_event_op.go +++ b/internal/topo/node/window_inc_agg_event_op.go @@ -48,16 +48,17 @@ func (so *SlidingWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- e switch tuple := data.(type) { case *xsql.WatermarkTuple: now := tuple.GetTimestamp() - so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) so.emitList(ctx, errCh, now) + so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) case *xsql.Tuple: now := tuple.GetTimestamp() - so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) if so.Delay > 0 { so.appendDelayIncAggWindowInEvent(ctx, errCh, fv, tuple) + so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) continue } so.appendIncAggWindowInEvent(ctx, errCh, fv, tuple) + so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) } } } @@ -67,7 +68,7 @@ func (so *SlidingWindowIncAggEventOp) emitList(ctx api.StreamContext, errCh chan if len(so.EmitList) > 0 { triggerIndex := -1 for index, window := range so.EmitList { - if window.StartTime.Before(triggerTS) || window.StartTime.Equal(triggerTS) { + if window.StartTime.Compare(triggerTS) <= 0 { triggerIndex = index so.emit(ctx, errCh, window, triggerTS) } else { diff --git a/internal/topo/node/window_inc_agg_op.go b/internal/topo/node/window_inc_agg_op.go index baaadd3f4c..2235b9282f 100644 --- a/internal/topo/node/window_inc_agg_op.go +++ b/internal/topo/node/window_inc_agg_op.go @@ -796,7 +796,7 @@ func calDimension(fv *xsql.FunctionValuer, dimensions ast.Dimensions, row *xsql. func gcIncAggWindow(currWindowList []*IncAggWindow, windowLength time.Duration, now time.Time) []*IncAggWindow { index := 0 for i, incAggWindow := range currWindowList { - if now.Sub(incAggWindow.StartTime) > windowLength { + if now.Sub(incAggWindow.StartTime) >= windowLength { index = i + 1 continue } From 1b3b256438517ff379859b75befe9cac6d5294c4 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 5 Dec 2024 12:27:22 +0800 Subject: [PATCH 3/6] revise the code Signed-off-by: Song Gao --- internal/topo/node/window_inc_agg_event_op.go | 16 +++++++++------- internal/topo/node/window_inc_agg_op.go | 5 ++++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/internal/topo/node/window_inc_agg_event_op.go b/internal/topo/node/window_inc_agg_event_op.go index 560a47ef86..2765ff4e9e 100644 --- a/internal/topo/node/window_inc_agg_event_op.go +++ b/internal/topo/node/window_inc_agg_event_op.go @@ -51,14 +51,11 @@ func (so *SlidingWindowIncAggEventOp) exec(ctx api.StreamContext, errCh chan<- e so.emitList(ctx, errCh, now) so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) case *xsql.Tuple: - now := tuple.GetTimestamp() if so.Delay > 0 { so.appendDelayIncAggWindowInEvent(ctx, errCh, fv, tuple) - so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) continue } so.appendIncAggWindowInEvent(ctx, errCh, fv, tuple) - so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length, now) } } } @@ -68,7 +65,7 @@ func (so *SlidingWindowIncAggEventOp) emitList(ctx api.StreamContext, errCh chan if len(so.EmitList) > 0 { triggerIndex := -1 for index, window := range so.EmitList { - if window.StartTime.Compare(triggerTS) <= 0 { + if window.EventTime.Add(so.Delay).Compare(triggerTS) <= 0 { triggerIndex = index so.emit(ctx, errCh, window, triggerTS) } else { @@ -100,17 +97,22 @@ func (so *SlidingWindowIncAggEventOp) appendIncAggWindowInEvent(ctx api.StreamCo } func (so *SlidingWindowIncAggEventOp) appendDelayIncAggWindowInEvent(ctx api.StreamContext, errCh chan<- error, fv *xsql.FunctionValuer, row *xsql.Tuple) { + now := row.GetTimestamp() name := calDimension(fv, so.Dimensions, row) so.CurrWindowList = append(so.CurrWindowList, newIncAggWindow(ctx, row.GetTimestamp())) for _, incWindow := range so.CurrWindowList { - incAggCal(ctx, name, row, incWindow, so.aggFields) + if incWindow.StartTime.Compare(now) <= 0 && incWindow.StartTime.Add(so.Length).After(now) { + incAggCal(ctx, name, row, incWindow, so.aggFields) + } } for _, incWindow := range so.EmitList { - incAggCal(ctx, name, row, incWindow, so.aggFields) + if incWindow.EventTime.Compare(now) <= 0 && incWindow.EventTime.Add(so.Delay).After(now) { + incAggCal(ctx, name, row, incWindow, so.aggFields) + } } if so.isMatchCondition(ctx, fv, row) { emitWindow := so.CurrWindowList[0].Clone(ctx) - emitWindow.StartTime = row.GetTimestamp().Add(so.Delay) + emitWindow.EventTime = row.GetTimestamp() so.EmitList = append(so.EmitList, emitWindow) } } diff --git a/internal/topo/node/window_inc_agg_op.go b/internal/topo/node/window_inc_agg_op.go index 2235b9282f..ccfb1fb0f0 100644 --- a/internal/topo/node/window_inc_agg_op.go +++ b/internal/topo/node/window_inc_agg_op.go @@ -125,6 +125,7 @@ type CountWindowIncAggOpState struct { type IncAggWindow struct { StartTime time.Time + EventTime time.Time DimensionsIncAggRange map[string]*IncAggRange } @@ -534,7 +535,9 @@ func (so *SlidingWindowIncAggOp) appendIncAggWindow(ctx api.StreamContext, errCh name := calDimension(fv, so.Dimensions, row) so.CurrWindowList = append(so.CurrWindowList, newIncAggWindow(ctx, now)) for _, incWindow := range so.CurrWindowList { - incAggCal(ctx, name, row, incWindow, so.aggFields) + if incWindow.StartTime.Compare(now) <= 0 && incWindow.StartTime.Add(so.Length).After(now) { + incAggCal(ctx, name, row, incWindow, so.aggFields) + } } } From 0124f41ee9176e966b55fdc0e17a74c1ca0c1e04 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 5 Dec 2024 13:09:50 +0800 Subject: [PATCH 4/6] fix Signed-off-by: Song Gao --- internal/topo/node/window_inc_agg_event_op.go | 11 ++++++++++- internal/topo/node/window_inc_agg_op.go | 6 ++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/internal/topo/node/window_inc_agg_event_op.go b/internal/topo/node/window_inc_agg_event_op.go index 2765ff4e9e..e6ee5b47ea 100644 --- a/internal/topo/node/window_inc_agg_event_op.go +++ b/internal/topo/node/window_inc_agg_event_op.go @@ -87,7 +87,16 @@ func (so *SlidingWindowIncAggEventOp) emitList(ctx api.StreamContext, errCh chan } func (so *SlidingWindowIncAggEventOp) appendIncAggWindowInEvent(ctx api.StreamContext, errCh chan<- error, fv *xsql.FunctionValuer, row *xsql.Tuple) { - so.appendIncAggWindow(ctx, errCh, fv, row, row.GetTimestamp()) + now := row.GetTimestamp() + name := calDimension(fv, so.Dimensions, row) + if so.isMatchCondition(ctx, fv, row) { + so.CurrWindowList = append(so.CurrWindowList, newIncAggWindow(ctx, now)) + } + for _, incWindow := range so.CurrWindowList { + if incWindow.StartTime.Compare(now) <= 0 && incWindow.StartTime.Add(so.Length).After(now) { + incAggCal(ctx, name, row, incWindow, so.aggFields) + } + } if so.isMatchCondition(ctx, fv, row) { emitWindow := so.CurrWindowList[0].Clone(ctx) emitWindow.StartTime = row.GetTimestamp() diff --git a/internal/topo/node/window_inc_agg_op.go b/internal/topo/node/window_inc_agg_op.go index ccfb1fb0f0..a5cfdaf90a 100644 --- a/internal/topo/node/window_inc_agg_op.go +++ b/internal/topo/node/window_inc_agg_op.go @@ -533,9 +533,11 @@ func (so *SlidingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error) func (so *SlidingWindowIncAggOp) appendIncAggWindow(ctx api.StreamContext, errCh chan<- error, fv *xsql.FunctionValuer, row *xsql.Tuple, now time.Time) { name := calDimension(fv, so.Dimensions, row) - so.CurrWindowList = append(so.CurrWindowList, newIncAggWindow(ctx, now)) + if so.isMatchCondition(ctx, fv, row) { + so.CurrWindowList = append(so.CurrWindowList, newIncAggWindow(ctx, now)) + } for _, incWindow := range so.CurrWindowList { - if incWindow.StartTime.Compare(now) <= 0 && incWindow.StartTime.Add(so.Length).After(now) { + if incWindow.StartTime.Compare(now) <= 0 && incWindow.StartTime.Add(so.Length+so.Delay).After(now) { incAggCal(ctx, name, row, incWindow, so.aggFields) } } From d10653bdc9fb9ef92e6bb050b5dd50b609846c7f Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 5 Dec 2024 13:32:45 +0800 Subject: [PATCH 5/6] fix Signed-off-by: Song Gao --- internal/topo/node/window_inc_agg_op.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/internal/topo/node/window_inc_agg_op.go b/internal/topo/node/window_inc_agg_op.go index a5cfdaf90a..87fd173e80 100644 --- a/internal/topo/node/window_inc_agg_op.go +++ b/internal/topo/node/window_inc_agg_op.go @@ -504,7 +504,7 @@ func (so *SlidingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error) case *xsql.Tuple: so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length+so.Delay, now) so.appendIncAggWindow(ctx, errCh, fv, row, now) - if len(so.CurrWindowList) > 0 && so.isMatchCondition(ctx, fv, row) { + if so.isMatchCondition(ctx, fv, row) { if so.Delay > 0 { t := &IncAggOpTask{} go func(task *IncAggOpTask) { @@ -523,19 +523,17 @@ func (so *SlidingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error) } case <-so.taskCh: now := timex.GetNow() - so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length+so.Delay, now) if len(so.CurrWindowList) > 0 { so.emit(ctx, errCh, so.CurrWindowList[0], now) } + so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length+so.Delay, now) } } } func (so *SlidingWindowIncAggOp) appendIncAggWindow(ctx api.StreamContext, errCh chan<- error, fv *xsql.FunctionValuer, row *xsql.Tuple, now time.Time) { name := calDimension(fv, so.Dimensions, row) - if so.isMatchCondition(ctx, fv, row) { - so.CurrWindowList = append(so.CurrWindowList, newIncAggWindow(ctx, now)) - } + so.CurrWindowList = append(so.CurrWindowList, newIncAggWindow(ctx, now)) for _, incWindow := range so.CurrWindowList { if incWindow.StartTime.Compare(now) <= 0 && incWindow.StartTime.Add(so.Length+so.Delay).After(now) { incAggCal(ctx, name, row, incWindow, so.aggFields) From 5f99938ff8fcc1b8990b97609535c8b8ef213536 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 5 Dec 2024 13:45:48 +0800 Subject: [PATCH 6/6] fix Signed-off-by: Song Gao --- internal/topo/node/window_inc_agg_op.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/topo/node/window_inc_agg_op.go b/internal/topo/node/window_inc_agg_op.go index 87fd173e80..c8b9614047 100644 --- a/internal/topo/node/window_inc_agg_op.go +++ b/internal/topo/node/window_inc_agg_op.go @@ -523,10 +523,10 @@ func (so *SlidingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error) } case <-so.taskCh: now := timex.GetNow() + so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length+so.Delay, now) if len(so.CurrWindowList) > 0 { so.emit(ctx, errCh, so.CurrWindowList[0], now) } - so.CurrWindowList = gcIncAggWindow(so.CurrWindowList, so.Length+so.Delay, now) } } }