Skip to content

Commit

Permalink
refactor(xsql): unify row and collection
Browse files Browse the repository at this point in the history
- Each node now only needs to handle two kinds of data Row/Collection. Other data may be error, watermark, barrier

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Mar 13, 2024
1 parent d0a6024 commit d5bb84f
Show file tree
Hide file tree
Showing 37 changed files with 844 additions and 861 deletions.
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,7 @@ github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0=
github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle/v2 v2.1.2 h1:0f7vaaXINONKTsxYDn4otOAiJanX/BMeAtY//BXqzlg=
github.com/jackc/puddle/v2 v2.1.2/go.mod h1:2lpufsF5mRHO6SuZkm0fNYxM6SWHfvyFj62KwNzgels=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
Expand Down Expand Up @@ -1440,6 +1441,7 @@ golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
Expand Down
26 changes: 14 additions & 12 deletions internal/topo/node/join_align_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,19 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
n.alignBatch(ctx, d)
} else { // table window
log.Debugf("JoinAlignNode receive batch source %s", d)
emitter := d.Content[0].GetEmitter()
// Buffer and update batch inputs
_, ok := n.batch[emitter]
if !ok {
e := fmt.Errorf("run JoinAlignNode error: receive batch input from unknown emitter %[1]T(%[1]v)", d)
n.Broadcast(e)
n.statManager.IncTotalExceptions(e.Error())
break
if et, ok := d.Content[0].(xsql.EmittedData); ok {
emitter := et.GetEmitter()
// Buffer and update batch inputs
_, ok := n.batch[emitter]
if !ok {
e := fmt.Errorf("run JoinAlignNode error: receive batch input from unknown emitter %[1]T(%[1]v)", d)
n.Broadcast(e)
n.statManager.IncTotalExceptions(e.Error())
break

Check warning on line 111 in internal/topo/node/join_align_node.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/node/join_align_node.go#L108-L111

Added lines #L108 - L111 were not covered by tests
}
n.batch[emitter] = convertToTupleSlice(d.Content)
_ = ctx.PutState(BatchKey, n.batch)
}
n.batch[emitter] = convertToTupleSlice(d.Content)
_ = ctx.PutState(BatchKey, n.batch)
}
default:
e := fmt.Errorf("run JoinAlignNode error: invalid input type but got %[1]T(%[1]v)", d)
Expand All @@ -129,7 +131,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
}()
}

func convertToTupleSlice(content []xsql.TupleRow) []*xsql.Tuple {
func convertToTupleSlice(content []xsql.Row) []*xsql.Tuple {
tuples := make([]*xsql.Tuple, len(content))
for i, v := range content {
tuples[i] = v.(*xsql.Tuple)
Expand All @@ -143,7 +145,7 @@ func (n *JoinAlignNode) alignBatch(_ api.StreamContext, input any) {
switch t := input.(type) {
case *xsql.Tuple:
w = &xsql.WindowTuples{
Content: make([]xsql.TupleRow, 0),
Content: make([]xsql.Row, 0),
}
w.AddTuple(t)
case *xsql.WindowTuples:
Expand Down
8 changes: 4 additions & 4 deletions internal/topo/node/lookup_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
n.statManager.IncTotalExceptions(d.Error())
case *xsql.WatermarkTuple:
n.Broadcast(d)
case xsql.TupleRow:
case xsql.Row:
log.Debugf("Lookup Node receive tuple input %s", d)
n.statManager.ProcessTimeStart()
sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
Expand All @@ -134,7 +134,7 @@ func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
n.statManager.ProcessTimeStart()
sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0), WindowRange: item.(*xsql.WindowTuples).GetWindowRange()}
err := d.Range(func(i int, r xsql.ReadonlyRow) (bool, error) {
tr, ok := r.(xsql.TupleRow)
tr, ok := r.(xsql.Row)
if !ok {
return false, fmt.Errorf("Invalid window element, must be a tuple row but got %v", r)
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
}

// lookup will lookup the cache firstly, if expires, read the external source
func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.TupleRow, fv *xsql.FunctionValuer, ns api.LookupSource, tuples *xsql.JoinTuples, c *cache.Cache) error {
func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.Row, fv *xsql.FunctionValuer, ns api.LookupSource, tuples *xsql.JoinTuples, c *cache.Cache) error {
ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
cvs := make([]interface{}, len(n.vals))
hasNil := false
Expand Down Expand Up @@ -230,7 +230,7 @@ func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.TupleRow, fv *xsql.Fun
}
}

func (n *LookupNode) merge(ctx api.StreamContext, d xsql.TupleRow, r []map[string]interface{}) {
func (n *LookupNode) merge(ctx api.StreamContext, d xsql.Row, r []map[string]interface{}) {

Check warning on line 233 in internal/topo/node/lookup_node.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/node/lookup_node.go#L233

Added line #L233 was not covered by tests
n.statManager.ProcessTimeStart()
sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}

Expand Down
26 changes: 13 additions & 13 deletions internal/topo/node/lookup_node_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestLookup(t *testing.T) {
output: &xsql.JoinTuples{
Content: []*xsql.JoinTuple{
{
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -162,7 +162,7 @@ func TestLookup(t *testing.T) {
},
},
}, {
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -184,7 +184,7 @@ func TestLookup(t *testing.T) {
},
{
input: &xsql.WindowTuples{
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -205,7 +205,7 @@ func TestLookup(t *testing.T) {
output: &xsql.JoinTuples{
Content: []*xsql.JoinTuple{
{
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -222,7 +222,7 @@ func TestLookup(t *testing.T) {
},
},
}, {
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -239,7 +239,7 @@ func TestLookup(t *testing.T) {
},
},
}, {
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -256,7 +256,7 @@ func TestLookup(t *testing.T) {
},
},
}, {
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -273,7 +273,7 @@ func TestLookup(t *testing.T) {
},
},
}, {
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -290,7 +290,7 @@ func TestLookup(t *testing.T) {
},
},
}, {
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestCachedLookup(t *testing.T) {
outputBefore := &xsql.JoinTuples{
Content: []*xsql.JoinTuple{
{
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -424,7 +424,7 @@ func TestCachedLookup(t *testing.T) {
},
},
}, {
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand All @@ -447,7 +447,7 @@ func TestCachedLookup(t *testing.T) {
outputAfter := &xsql.JoinTuples{
Content: []*xsql.JoinTuple{
{
Tuples: []xsql.TupleRow{
Tuples: []xsql.Row{
&xsql.Tuple{
Emitter: "demo",
Message: map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (o *defaultNode) doBroadcast(val interface{}) {
case xsql.Collection:
val = vt.Clone()
break
case xsql.TupleRow:
case xsql.Row:
val = vt.Clone()
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/node/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
o.statManager.IncTotalMessagesProcessed(1)
o.statManager.IncTotalExceptions(val.Error())
continue
case []xsql.TupleRow:
case []xsql.Row:
o.statManager.ProcessTimeEnd()
for _, v := range val {
o.Broadcast(v)
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/node/sink_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func Test_itemToMap(t *testing.T) {
{
name: "test4",
args: args{
item: xsql.Collection(&xsql.WindowTuples{Content: []xsql.TupleRow{
item: xsql.Collection(&xsql.WindowTuples{Content: []xsql.Row{
&xsql.Tuple{Emitter: "a", Message: map[string]interface{}{"a": 1, "b": "2"}, Timestamp: conf.GetNowInMilli(), Metadata: nil},
}}),
},
Expand Down
15 changes: 11 additions & 4 deletions internal/topo/node/switch_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,20 @@ func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error) {
n.statManager.IncTotalExceptions(d.Error())
case *xsql.WatermarkTuple:
n.Broadcast(d)
case xsql.TupleRow:
case xsql.Row:
ctx.GetLogger().Debugf("SwitchNode receive tuple input %s", d)
ve = &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
case xsql.SingleCollection:
case xsql.Collection:
ctx.GetLogger().Debugf("SwitchNode receive window input %s", d)
afv.SetData(d)
ve = &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(d, fv, d, fv, afv, &xsql.WildcardValuer{Data: d})}
if cr, ok := d.(xsql.CollectionRow); ok {
afv.SetData(cr)
ve = &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(cr, fv, cr, fv, afv, &xsql.WildcardValuer{Data: cr})}
} else {
e := fmt.Errorf("run switch node error: invalid input type but got %[1]T(%[1]v)", d)
n.Broadcast(e)
n.statManager.IncTotalExceptions(e.Error())
break

Check warning on line 112 in internal/topo/node/switch_node.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/node/switch_node.go#L109-L112

Added lines #L109 - L112 were not covered by tests
}
default:
e := fmt.Errorf("run switch node error: invalid input type but got %[1]T(%[1]v)", d)
n.Broadcast(e)
Expand Down
14 changes: 7 additions & 7 deletions internal/topo/node/switch_node_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -209,7 +209,7 @@ outterFor:
func TestCollection(t *testing.T) {
inputs := []*xsql.WindowTuples{
{
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f1": "v1",
Expand All @@ -224,7 +224,7 @@ func TestCollection(t *testing.T) {
},
},
}, {
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f1": "v2",
Expand All @@ -245,7 +245,7 @@ func TestCollection(t *testing.T) {
},
},
}, {
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f1": "v1",
Expand All @@ -270,7 +270,7 @@ func TestCollection(t *testing.T) {
outputs := [][]*xsql.WindowTuples{
{ // avg(f2) > 50
{
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f1": "v1",
Expand All @@ -285,7 +285,7 @@ func TestCollection(t *testing.T) {
},
},
}, {
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f1": "v1",
Expand All @@ -309,7 +309,7 @@ func TestCollection(t *testing.T) {
},
{ // else
{
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f1": "v2",
Expand Down
6 changes: 3 additions & 3 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func (tl *TupleList) count() int {

func (tl *TupleList) nextCountWindow() *xsql.WindowTuples {
results := &xsql.WindowTuples{
Content: make([]xsql.TupleRow, 0),
Content: make([]xsql.Row, 0),
}
var subT []*xsql.Tuple
subT = tl.tuples[len(tl.tuples)-tl.size : len(tl.tuples)]
Expand Down Expand Up @@ -533,15 +533,15 @@ func (o *WindowOperator) isTimeRelatedWindow() bool {
return false
}

func (o *WindowOperator) handleInputs(inputs []*xsql.Tuple, triggerTime int64, ctx api.StreamContext) ([]*xsql.Tuple, []xsql.TupleRow) {
func (o *WindowOperator) handleInputs(inputs []*xsql.Tuple, triggerTime int64, ctx api.StreamContext) ([]*xsql.Tuple, []xsql.Row) {
log := ctx.GetLogger()
log.Debugf("window %s triggered at %s(%d)", o.name, time.Unix(triggerTime/1000, triggerTime%1000), triggerTime)
var delta int64
length := o.window.Length + o.window.Delay
if o.window.Type == ast.HOPPING_WINDOW || o.window.Type == ast.SLIDING_WINDOW {
delta = o.calDelta(triggerTime, log)
}
content := make([]xsql.TupleRow, 0)
content := make([]xsql.Row, 0)
i := 0
// Sync table
for _, tuple := range inputs {
Expand Down
8 changes: 4 additions & 4 deletions internal/topo/node/window_op_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestCountWindow(t *testing.T) {
expWinCount: 1,
winTupleSets: []xsql.WindowTuples{
{
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f1": "v1",
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestCountWindow(t *testing.T) {
expWinCount: 1,
winTupleSets: []xsql.WindowTuples{
{
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f3": "v3",
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestCountWindow(t *testing.T) {
expWinCount: 1,
winTupleSets: []xsql.WindowTuples{
{
Content: []xsql.TupleRow{
Content: []xsql.Row{
&xsql.Tuple{
Message: map[string]interface{}{
"f4": "v4",
Expand Down
Loading

0 comments on commit d5bb84f

Please sign in to comment.