diff --git a/internal/topo/node/join_align_node.go b/internal/topo/node/join_align_node.go index de1a1e5f04..67665ca6c7 100644 --- a/internal/topo/node/join_align_node.go +++ b/internal/topo/node/join_align_node.go @@ -41,14 +41,7 @@ func NewJoinAlignNode(name string, emitters []string, options *api.RuleOption) ( n := &JoinAlignNode{ batch: batch, } - n.defaultSinkNode = &defaultSinkNode{ - input: make(chan interface{}, options.BufferLength), - defaultNode: &defaultNode{ - outputs: make(map[string]chan<- interface{}), - name: name, - sendError: options.SendError, - }, - } + n.defaultSinkNode = newDefaultSinkNode(name, options) return n, nil } diff --git a/internal/topo/node/lookup_node.go b/internal/topo/node/lookup_node.go index 4c9acc711b..5ad9fec24b 100644 --- a/internal/topo/node/lookup_node.go +++ b/internal/topo/node/lookup_node.go @@ -70,14 +70,7 @@ func NewLookupNode(name string, fields []string, keys []string, joinType ast.Joi joinType: joinType, vals: vals, } - n.defaultSinkNode = &defaultSinkNode{ - input: make(chan interface{}, options.BufferLength), - defaultNode: &defaultNode{ - outputs: make(map[string]chan<- interface{}), - name: name, - sendError: options.SendError, - }, - } + n.defaultSinkNode = newDefaultSinkNode(name, options) return n, nil } diff --git a/internal/topo/node/node.go b/internal/topo/node/node.go index 8f8abb6ae4..adf1911681 100644 --- a/internal/topo/node/node.go +++ b/internal/topo/node/node.go @@ -18,10 +18,12 @@ import ( "fmt" "github.com/lf-edge/ekuiper/internal/binder/io" + "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/internal/topo/checkpoint" "github.com/lf-edge/ekuiper/internal/topo/node/metric" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/cast" ) type OperatorNode interface { @@ -48,7 +50,7 @@ type DataSourceNode interface { type defaultNode struct { name string - outputs map[string]chan<- interface{} + outputs map[string]chan<- any concurrency int sendError bool statManager metric.StatManager @@ -56,6 +58,15 @@ type defaultNode struct { qos api.Qos } +func newDefaultNode(name string, options *api.RuleOption) *defaultNode { + return &defaultNode{ + name: name, + outputs: make(map[string]chan<- any), + concurrency: 1, + sendError: options.SendError, + } +} + func (o *defaultNode) AddOutput(output chan<- interface{}, name string) error { if _, ok := o.outputs[name]; !ok { o.outputs[name] = output @@ -137,11 +148,18 @@ func (o *defaultNode) GetStreamContext() api.StreamContext { type defaultSinkNode struct { *defaultNode - input chan interface{} + input chan any barrierHandler checkpoint.BarrierHandler inputCount int } +func newDefaultSinkNode(name string, options *api.RuleOption) *defaultSinkNode { + return &defaultSinkNode{ + defaultNode: newDefaultNode(name, options), + input: make(chan any, options.BufferLength), + } +} + func (o *defaultSinkNode) GetInput() (chan<- interface{}, string) { return o.input, o.name } @@ -203,3 +221,16 @@ func SinkPing(sinkType string, config map[string]interface{}) error { } return fmt.Errorf("sink %v doesnt't support ping connection", sinkType) } + +func propsToNodeOption(props map[string]any) *api.RuleOption { + options := &api.RuleOption{ + BufferLength: 1024, + SendError: true, + Qos: api.AtLeastOnce, + } + err := cast.MapToStruct(props, options) + if err != nil { + conf.Log.Warnf("fail to parse rule option %v from props", err) + } + return options +} diff --git a/internal/topo/node/operations.go b/internal/topo/node/operations.go index 2cb683eeee..9c794acd78 100644 --- a/internal/topo/node/operations.go +++ b/internal/topo/node/operations.go @@ -45,15 +45,7 @@ type UnaryOperator struct { // New NewUnary creates *UnaryOperator value func New(name string, options *api.RuleOption) *UnaryOperator { return &UnaryOperator{ - defaultSinkNode: &defaultSinkNode{ - input: make(chan interface{}, options.BufferLength), - defaultNode: &defaultNode{ - name: name, - outputs: make(map[string]chan<- interface{}), - concurrency: 1, - sendError: options.SendError, - }, - }, + defaultSinkNode: newDefaultSinkNode(name, options), } } diff --git a/internal/topo/node/sink_node.go b/internal/topo/node/sink_node.go index b04b4215e3..ebda7fe931 100644 --- a/internal/topo/node/sink_node.go +++ b/internal/topo/node/sink_node.go @@ -69,42 +69,20 @@ type SinkNode struct { } func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode { - bufferLength := 1024 - if c, ok := props["bufferLength"]; ok { - if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 { - // invalid property bufferLength - } else { - bufferLength = t - } - } return &SinkNode{ - defaultSinkNode: &defaultSinkNode{ - input: make(chan interface{}, bufferLength), - defaultNode: &defaultNode{ - name: name, - concurrency: 1, - ctx: nil, - }, - }, - sinkType: sinkType, - options: props, + defaultSinkNode: newDefaultSinkNode(name, propsToNodeOption(props)), + sinkType: sinkType, + options: props, } } // NewSinkNodeWithSink Only for mock source, do not use it in production func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode { return &SinkNode{ - defaultSinkNode: &defaultSinkNode{ - input: make(chan interface{}, 1024), - defaultNode: &defaultNode{ - name: name, - concurrency: 1, - ctx: nil, - }, - }, - options: props, - isMock: true, - sink: sink, + defaultSinkNode: newDefaultSinkNode(name, propsToNodeOption(props)), + options: props, + isMock: true, + sink: sink, } } diff --git a/internal/topo/node/source_node.go b/internal/topo/node/source_node.go index 550bc273cc..a059c648b1 100644 --- a/internal/topo/node/source_node.go +++ b/internal/topo/node/source_node.go @@ -42,7 +42,7 @@ type SourceNode struct { IsSchemaless bool } -func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.Options, sendError, isWildcard, isSchemaless bool, schema map[string]*ast.JsonStreamField) *SourceNode { +func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.Options, rOptions *api.RuleOption, isWildcard, isSchemaless bool, schema map[string]*ast.JsonStreamField) *SourceNode { t := options.TYPE if t == "" { if st == ast.TypeStream { @@ -52,14 +52,9 @@ func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast. } } return &SourceNode{ - streamType: st, - sourceType: t, - defaultNode: &defaultNode{ - name: name, - outputs: make(map[string]chan<- interface{}), - concurrency: 1, - sendError: sendError, - }, + streamType: st, + sourceType: t, + defaultNode: newDefaultNode(name, rOptions), preprocessOp: op, options: options, schema: schema, diff --git a/internal/topo/node/source_node_test.go b/internal/topo/node/source_node_test.go index aa2c7de3f3..9ec193754c 100644 --- a/internal/topo/node/source_node_test.go +++ b/internal/topo/node/source_node_test.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf" + "github.com/lf-edge/ekuiper/pkg/api" "github.com/lf-edge/ekuiper/pkg/ast" "github.com/lf-edge/ekuiper/pkg/cast" ) @@ -44,7 +45,7 @@ func TestGetConf_Apply(t *testing.T) { n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{ DATASOURCE: "/feed", TYPE: "httppull", - }, false, false, false, nil) + }, &api.RuleOption{SendError: false}, false, false, nil) conf := nodeConf.GetSourceConf(n.sourceType, n.options) if !reflect.DeepEqual(result, conf) { t.Errorf("result mismatch:\n\nexp=%s\n\ngot=%s\n\n", result, conf) @@ -70,7 +71,7 @@ func TestGetConfAndConvert_Apply(t *testing.T) { n := NewSourceNode("test", ast.TypeStream, nil, &ast.Options{ DATASOURCE: "/feed", TYPE: "httppull", - }, false, false, false, nil) + }, &api.RuleOption{SendError: false}, false, false, nil) conf := nodeConf.GetSourceConf(n.sourceType, n.options) assert.Equal(t, result, conf) diff --git a/internal/topo/node/source_pool_test.go b/internal/topo/node/source_pool_test.go index 1bf7a48399..2f24d34fdb 100644 --- a/internal/topo/node/source_pool_test.go +++ b/internal/topo/node/source_pool_test.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ func TestSourcePool(t *testing.T) { DATASOURCE: "demo", TYPE: "mock", SHARED: true, - }, false, false, false, nil) + }, &api.RuleOption{SendError: false}, false, false, nil) n.concurrency = 2 contextLogger := conf.Log.WithField("rule", "mockRule0") ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger) @@ -39,7 +39,7 @@ func TestSourcePool(t *testing.T) { DATASOURCE: "demo1", TYPE: "mock", SHARED: true, - }, false, false, false, nil) + }, &api.RuleOption{SendError: false}, false, false, nil) contextLogger = conf.Log.WithField("rule", "mockRule1") ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger) @@ -48,7 +48,7 @@ func TestSourcePool(t *testing.T) { n2 := NewSourceNode("test2", ast.TypeStream, nil, &ast.Options{ DATASOURCE: "demo1", TYPE: "mock", - }, false, false, false, nil) + }, &api.RuleOption{SendError: false}, false, false, nil) contextLogger = conf.Log.WithField("rule", "mockRule2") ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger) tempStore, _ = state.CreateStore("mockRule2", api.AtMostOnce) diff --git a/internal/topo/node/switch_node.go b/internal/topo/node/switch_node.go index c3f1d17cfe..bc3fba7390 100644 --- a/internal/topo/node/switch_node.go +++ b/internal/topo/node/switch_node.go @@ -56,21 +56,10 @@ func NewSwitchNode(name string, conf *SwitchConfig, options *api.RuleOption) (*S sn := &SwitchNode{ conf: conf, } - sn.defaultSinkNode = &defaultSinkNode{ - input: make(chan interface{}, options.BufferLength), - defaultNode: &defaultNode{ - outputs: nil, - name: name, - sendError: options.SendError, - }, - } + sn.defaultSinkNode = newDefaultSinkNode(name, options) outputs := make([]defaultNode, len(conf.Cases)) for i := range conf.Cases { - outputs[i] = defaultNode{ - outputs: make(map[string]chan<- interface{}), - name: name + fmt.Sprintf("_%d", i), - sendError: options.SendError, - } + outputs[i] = *newDefaultNode(fmt.Sprintf("name_%d", i), options) } sn.outputNodes = outputs return sn, nil diff --git a/internal/topo/node/watermark_op.go b/internal/topo/node/watermark_op.go index b2ce552379..43a0a79539 100644 --- a/internal/topo/node/watermark_op.go +++ b/internal/topo/node/watermark_op.go @@ -53,17 +53,10 @@ func NewWatermarkOp(name string, sendWatermark bool, streams []string, options * wms[s] = options.LateTol } return &WatermarkOp{ - defaultSinkNode: &defaultSinkNode{ - input: make(chan interface{}, options.BufferLength), - defaultNode: &defaultNode{ - outputs: make(map[string]chan<- interface{}), - name: name, - sendError: options.SendError, - }, - }, - lateTolerance: options.LateTol, - sendWatermark: sendWatermark, - streamWMs: wms, + defaultSinkNode: newDefaultSinkNode(name, options), + lateTolerance: options.LateTol, + sendWatermark: sendWatermark, + streamWMs: wms, } } diff --git a/internal/topo/node/window_op.go b/internal/topo/node/window_op.go index 71bde2cd52..b428cc5db0 100644 --- a/internal/topo/node/window_op.go +++ b/internal/topo/node/window_op.go @@ -74,14 +74,7 @@ func init() { func NewWindowOp(name string, w WindowConfig, options *api.RuleOption) (*WindowOperator, error) { o := new(WindowOperator) - o.defaultSinkNode = &defaultSinkNode{ - input: make(chan interface{}, options.BufferLength), - defaultNode: &defaultNode{ - outputs: make(map[string]chan<- interface{}), - name: name, - sendError: options.SendError, - }, - } + o.defaultSinkNode = newDefaultSinkNode(name, options) o.isEventTime = options.IsEventTime o.window = &w if o.window.Interval == 0 && o.window.Type == ast.COUNT_WINDOW { diff --git a/internal/topo/planner/planner.go b/internal/topo/planner/planner.go index 050dcea95c..5eed2a6bee 100644 --- a/internal/topo/planner/planner.go +++ b/internal/topo/planner/planner.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -289,7 +289,7 @@ func transformSourceNode(t *DataSourcePlan, mockSourcesProp map[string]map[strin return nil, err } } - srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError, t.isWildCard, t.isSchemaless, t.streamFields) + srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options, t.isWildCard, t.isSchemaless, t.streamFields) if isMock { srcNode.SetProps(mockSourceConf) } @@ -304,7 +304,7 @@ func transformSourceNode(t *DataSourcePlan, mockSourcesProp map[string]map[strin if t.isSchemaless { schema = nil } - srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError, t.isWildCard, t.isSchemaless, schema) + srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options, t.isWildCard, t.isSchemaless, schema) if isMock { srcNode.SetProps(mockSourceConf) } diff --git a/internal/topo/planner/planner_graph.go b/internal/topo/planner/planner_graph.go index f8614b95b8..c44dc75460 100644 --- a/internal/topo/planner/planner_graph.go +++ b/internal/topo/planner/planner_graph.go @@ -511,7 +511,7 @@ func parseSource(nodeName string, gn *api.GraphNode, rule *api.Rule, store kv.Ke if err != nil { return nil, ILLEGAL, "", err } - srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError, false, false, nil) + srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options, false, false, nil) return srcNode, STREAM, nodeName, nil case "table": return nil, ILLEGAL, "", fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule") diff --git a/internal/topo/planner/planner_test.go b/internal/topo/planner/planner_test.go index 17ebfb1b21..948fe70a45 100644 --- a/internal/topo/planner/planner_test.go +++ b/internal/topo/planner/planner_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -4749,7 +4749,7 @@ func TestTransformSourceNode(t *testing.T) { }, node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{ TYPE: "file", - }, false, false, false, nil), + }, &api.RuleOption{SendError: false}, false, false, nil), }, { name: "schema source node", @@ -4769,7 +4769,7 @@ func TestTransformSourceNode(t *testing.T) { }, node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{ TYPE: "file", - }, false, false, false, schema), + }, &api.RuleOption{SendError: false}, false, false, schema), }, } for _, tc := range testCases {