Skip to content

Commit

Permalink
refactor(node): generalize embed node creation
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Jan 25, 2024
1 parent d281deb commit a687968
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 111 deletions.
9 changes: 1 addition & 8 deletions internal/topo/node/join_align_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,7 @@ func NewJoinAlignNode(name string, emitters []string, options *api.RuleOption) (
n := &JoinAlignNode{
batch: batch,
}
n.defaultSinkNode = &defaultSinkNode{
input: make(chan interface{}, options.BufferLength),
defaultNode: &defaultNode{
outputs: make(map[string]chan<- interface{}),
name: name,
sendError: options.SendError,
},
}
n.defaultSinkNode = newDefaultSinkNode(name, options)
return n, nil
}

Expand Down
9 changes: 1 addition & 8 deletions internal/topo/node/lookup_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ func NewLookupNode(name string, fields []string, keys []string, joinType ast.Joi
joinType: joinType,
vals: vals,
}
n.defaultSinkNode = &defaultSinkNode{
input: make(chan interface{}, options.BufferLength),
defaultNode: &defaultNode{
outputs: make(map[string]chan<- interface{}),
name: name,
sendError: options.SendError,
},
}
n.defaultSinkNode = newDefaultSinkNode(name, options)
return n, nil
}

Expand Down
35 changes: 33 additions & 2 deletions internal/topo/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"fmt"

"github.com/lf-edge/ekuiper/internal/binder/io"
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
"github.com/lf-edge/ekuiper/internal/topo/node/metric"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/cast"
)

type OperatorNode interface {
Expand All @@ -48,14 +50,23 @@ type DataSourceNode interface {

type defaultNode struct {
name string
outputs map[string]chan<- interface{}
outputs map[string]chan<- any
concurrency int
sendError bool
statManager metric.StatManager
ctx api.StreamContext
qos api.Qos
}

func newDefaultNode(name string, options *api.RuleOption) *defaultNode {
return &defaultNode{
name: name,
outputs: make(map[string]chan<- any),
concurrency: 1,
sendError: options.SendError,
}
}

func (o *defaultNode) AddOutput(output chan<- interface{}, name string) error {
if _, ok := o.outputs[name]; !ok {
o.outputs[name] = output
Expand Down Expand Up @@ -137,11 +148,18 @@ func (o *defaultNode) GetStreamContext() api.StreamContext {

type defaultSinkNode struct {
*defaultNode
input chan interface{}
input chan any
barrierHandler checkpoint.BarrierHandler
inputCount int
}

func newDefaultSinkNode(name string, options *api.RuleOption) *defaultSinkNode {
return &defaultSinkNode{
defaultNode: newDefaultNode(name, options),
input: make(chan any, options.BufferLength),
}
}

func (o *defaultSinkNode) GetInput() (chan<- interface{}, string) {
return o.input, o.name
}
Expand Down Expand Up @@ -203,3 +221,16 @@ func SinkPing(sinkType string, config map[string]interface{}) error {
}
return fmt.Errorf("sink %v doesnt't support ping connection", sinkType)
}

func propsToNodeOption(props map[string]any) *api.RuleOption {
options := &api.RuleOption{
BufferLength: 1024,
SendError: true,
Qos: api.AtLeastOnce,
}
err := cast.MapToStruct(props, options)
if err != nil {
conf.Log.Warnf("fail to parse rule option %v from props", err)
}
return options
}
10 changes: 1 addition & 9 deletions internal/topo/node/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,7 @@ type UnaryOperator struct {
// New NewUnary creates *UnaryOperator value
func New(name string, options *api.RuleOption) *UnaryOperator {
return &UnaryOperator{
defaultSinkNode: &defaultSinkNode{
input: make(chan interface{}, options.BufferLength),
defaultNode: &defaultNode{
name: name,
outputs: make(map[string]chan<- interface{}),
concurrency: 1,
sendError: options.SendError,
},
},
defaultSinkNode: newDefaultSinkNode(name, options),
}
}

Expand Down
36 changes: 7 additions & 29 deletions internal/topo/node/sink_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,42 +69,20 @@ type SinkNode struct {
}

func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode {
bufferLength := 1024
if c, ok := props["bufferLength"]; ok {
if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 {
// invalid property bufferLength
} else {
bufferLength = t
}
}
return &SinkNode{
defaultSinkNode: &defaultSinkNode{
input: make(chan interface{}, bufferLength),
defaultNode: &defaultNode{
name: name,
concurrency: 1,
ctx: nil,
},
},
sinkType: sinkType,
options: props,
defaultSinkNode: newDefaultSinkNode(name, propsToNodeOption(props)),
sinkType: sinkType,
options: props,
}
}

// NewSinkNodeWithSink Only for mock source, do not use it in production
func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode {
return &SinkNode{
defaultSinkNode: &defaultSinkNode{
input: make(chan interface{}, 1024),
defaultNode: &defaultNode{
name: name,
concurrency: 1,
ctx: nil,
},
},
options: props,
isMock: true,
sink: sink,
defaultSinkNode: newDefaultSinkNode(name, propsToNodeOption(props)),
options: props,
isMock: true,
sink: sink,
}
}

Expand Down
13 changes: 4 additions & 9 deletions internal/topo/node/source_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type SourceNode struct {
IsSchemaless bool
}

func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.Options, sendError, isWildcard, isSchemaless bool, schema map[string]*ast.JsonStreamField) *SourceNode {
func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.Options, rOptions *api.RuleOption, isWildcard, isSchemaless bool, schema map[string]*ast.JsonStreamField) *SourceNode {
t := options.TYPE
if t == "" {
if st == ast.TypeStream {
Expand All @@ -52,14 +52,9 @@ func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.
}
}
return &SourceNode{
streamType: st,
sourceType: t,
defaultNode: &defaultNode{
name: name,
outputs: make(map[string]chan<- interface{}),
concurrency: 1,
sendError: sendError,
},
streamType: st,
sourceType: t,
defaultNode: newDefaultNode(name, rOptions),
preprocessOp: op,
options: options,
schema: schema,
Expand Down
7 changes: 4 additions & 3 deletions internal/topo/node/source_node_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"

nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/cast"
)
Expand All @@ -44,7 +45,7 @@ func TestGetConf_Apply(t *testing.T) {
n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
DATASOURCE: "/feed",
TYPE: "httppull",
}, false, false, false, nil)
}, &api.RuleOption{SendError: false}, false, false, nil)
conf := nodeConf.GetSourceConf(n.sourceType, n.options)
if !reflect.DeepEqual(result, conf) {
t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf)
Expand All @@ -70,7 +71,7 @@ func TestGetConfAndConvert_Apply(t *testing.T) {
n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
DATASOURCE: "/feed",
TYPE: "httppull",
}, false, false, false, nil)
}, &api.RuleOption{SendError: false}, false, false, nil)
conf := nodeConf.GetSourceConf(n.sourceType, n.options)
assert.Equal(t, result, conf)

Expand Down
8 changes: 4 additions & 4 deletions internal/topo/node/source_pool_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,7 @@ func TestSourcePool(t *testing.T) {
DATASOURCE: "demo",
TYPE: "mock",
SHARED: true,
}, false, false, false, nil)
}, &api.RuleOption{SendError: false}, false, false, nil)
n.concurrency = 2
contextLogger := conf.Log.WithField("rule", "mockRule0")
ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
Expand All @@ -39,7 +39,7 @@ func TestSourcePool(t *testing.T) {
DATASOURCE: "demo1",
TYPE: "mock",
SHARED: true,
}, false, false, false, nil)
}, &api.RuleOption{SendError: false}, false, false, nil)

contextLogger = conf.Log.WithField("rule", "mockRule1")
ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
Expand All @@ -48,7 +48,7 @@ func TestSourcePool(t *testing.T) {
n2 := NewSourceNode("test2", ast.TypeStream, nil, &ast.Options{
DATASOURCE: "demo1",
TYPE: "mock",
}, false, false, false, nil)
}, &api.RuleOption{SendError: false}, false, false, nil)
contextLogger = conf.Log.WithField("rule", "mockRule2")
ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
tempStore, _ = state.CreateStore("mockRule2", api.AtMostOnce)
Expand Down
15 changes: 2 additions & 13 deletions internal/topo/node/switch_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,10 @@ func NewSwitchNode(name string, conf *SwitchConfig, options *api.RuleOption) (*S
sn := &SwitchNode{
conf: conf,
}
sn.defaultSinkNode = &defaultSinkNode{
input: make(chan interface{}, options.BufferLength),
defaultNode: &defaultNode{
outputs: nil,
name: name,
sendError: options.SendError,
},
}
sn.defaultSinkNode = newDefaultSinkNode(name, options)
outputs := make([]defaultNode, len(conf.Cases))
for i := range conf.Cases {
outputs[i] = defaultNode{
outputs: make(map[string]chan<- interface{}),
name: name + fmt.Sprintf("_%d", i),
sendError: options.SendError,
}
outputs[i] = *newDefaultNode(fmt.Sprintf("name_%d", i), options)
}
sn.outputNodes = outputs
return sn, nil
Expand Down
15 changes: 4 additions & 11 deletions internal/topo/node/watermark_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,10 @@ func NewWatermarkOp(name string, sendWatermark bool, streams []string, options *
wms[s] = options.LateTol
}
return &WatermarkOp{
defaultSinkNode: &defaultSinkNode{
input: make(chan interface{}, options.BufferLength),
defaultNode: &defaultNode{
outputs: make(map[string]chan<- interface{}),
name: name,
sendError: options.SendError,
},
},
lateTolerance: options.LateTol,
sendWatermark: sendWatermark,
streamWMs: wms,
defaultSinkNode: newDefaultSinkNode(name, options),
lateTolerance: options.LateTol,
sendWatermark: sendWatermark,
streamWMs: wms,
}
}

Expand Down
9 changes: 1 addition & 8 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,7 @@ func init() {
func NewWindowOp(name string, w WindowConfig, options *api.RuleOption) (*WindowOperator, error) {
o := new(WindowOperator)

o.defaultSinkNode = &defaultSinkNode{
input: make(chan interface{}, options.BufferLength),
defaultNode: &defaultNode{
outputs: make(map[string]chan<- interface{}),
name: name,
sendError: options.SendError,
},
}
o.defaultSinkNode = newDefaultSinkNode(name, options)
o.isEventTime = options.IsEventTime
o.window = &w
if o.window.Interval == 0 && o.window.Type == ast.COUNT_WINDOW {
Expand Down
6 changes: 3 additions & 3 deletions internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -289,7 +289,7 @@ func transformSourceNode(t *DataSourcePlan, mockSourcesProp map[string]map[strin
return nil, err
}
}
srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError, t.isWildCard, t.isSchemaless, t.streamFields)
srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options, t.isWildCard, t.isSchemaless, t.streamFields)
if isMock {
srcNode.SetProps(mockSourceConf)
}
Expand All @@ -304,7 +304,7 @@ func transformSourceNode(t *DataSourcePlan, mockSourcesProp map[string]map[strin
if t.isSchemaless {
schema = nil
}
srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError, t.isWildCard, t.isSchemaless, schema)
srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options, t.isWildCard, t.isSchemaless, schema)
if isMock {
srcNode.SetProps(mockSourceConf)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/planner/planner_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.Ke
if err != nil {
return nil, ILLEGAL, "", err
}
srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError, false, false, nil)
srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options, false, false, nil)
return srcNode, STREAM, nodeName, nil
case "table":
return nil, ILLEGAL, "", fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")
Expand Down
6 changes: 3 additions & 3 deletions internal/topo/planner/planner_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -4749,7 +4749,7 @@ func TestTransformSourceNode(t *testing.T) {
},
node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
TYPE: "file",
}, false, false, false, nil),
}, &api.RuleOption{SendError: false}, false, false, nil),
},
{
name: "schema source node",
Expand All @@ -4769,7 +4769,7 @@ func TestTransformSourceNode(t *testing.T) {
},
node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
TYPE: "file",
}, false, false, false, schema),
}, &api.RuleOption{SendError: false}, false, false, schema),
},
}
for _, tc := range testCases {
Expand Down

0 comments on commit a687968

Please sign in to comment.