Skip to content

Commit

Permalink
feat(planner): plan decompressor for source
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Jan 31, 2024
1 parent bc4af8f commit 671e9ee
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 28 deletions.
16 changes: 4 additions & 12 deletions internal/topo/node/source_connector_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ package node
import (
"fmt"

nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
"github.com/lf-edge/ekuiper/internal/topo/node/metric"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/infra"
)

Expand All @@ -36,25 +34,19 @@ type SourceConnectorNode struct {
}

// NewSourceConnectorNode creates a SourceConnectorNode
func NewSourceConnectorNode(name string, ss api.SourceConnector, options *ast.Options, rOpt *api.RuleOption) (*SourceConnectorNode, error) {
func NewSourceConnectorNode(name string, ss api.SourceConnector, dataSource string, props map[string]any, rOpt *api.RuleOption) (*SourceConnectorNode, error) {
m := &SourceConnectorNode{
defaultNode: newDefaultNode(name, rOpt),
s: ss,
buffLen: rOpt.BufferLength,
}
return m, m.setup(options)
return m, m.setup(dataSource, props)
}

// Setup read configuration and validate and initialize the sourceConnector
func (m *SourceConnectorNode) setup(options *ast.Options) error {
t := options.TYPE
if t == "" {
t = "mqtt"
}
// Get configurations
props := nodeConf.GetSourceConf(t, options)
func (m *SourceConnectorNode) setup(dataSource string, props map[string]any) error {
// Initialize sourceConnector
err := m.s.Configure(options.DATASOURCE, props)
err := m.s.Configure(dataSource, props)
if err != nil {
return err
}
Expand Down
17 changes: 4 additions & 13 deletions internal/topo/node/source_connector_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
)

func TestSCNLC(t *testing.T) {
Expand Down Expand Up @@ -58,9 +57,7 @@ func TestSCNLC(t *testing.T) {
[]byte("world"),
},
}
scn, err := NewSourceConnectorNode("mock_connector", sc, &ast.Options{
DATASOURCE: "demo",
}, &api.RuleOption{
scn, err := NewSourceConnectorNode("mock_connector", sc, "demo", map[string]any{}, &api.RuleOption{
BufferLength: 1024,
SendError: true,
})
Expand Down Expand Up @@ -113,9 +110,7 @@ func TestNewError(t *testing.T) {
[]byte("world"),
},
}
_, err := NewSourceConnectorNode("mock_connector", sc, &ast.Options{
DATASOURCE: "",
}, &api.RuleOption{
_, err := NewSourceConnectorNode("mock_connector", sc, "", map[string]any{}, &api.RuleOption{
BufferLength: 1024,
SendError: true,
})
Expand All @@ -127,9 +122,7 @@ func TestConnError(t *testing.T) {
var sc api.SourceConnector = &MockSourceConnector{
data: nil, // nil data to produce mock connect error
}
scn, err := NewSourceConnectorNode("mock_connector", sc, &ast.Options{
DATASOURCE: "demo2",
}, &api.RuleOption{
scn, err := NewSourceConnectorNode("mock_connector", sc, "demo2", map[string]any{}, &api.RuleOption{
BufferLength: 1024,
SendError: true,
})
Expand Down Expand Up @@ -172,9 +165,7 @@ func TestSubError(t *testing.T) {
[]byte("world"),
},
}
scn, err := NewSourceConnectorNode("mock_connector", sc, &ast.Options{
DATASOURCE: "demo2",
}, &api.RuleOption{
scn, err := NewSourceConnectorNode("mock_connector", sc, "demo2", map[string]any{}, &api.RuleOption{
BufferLength: 1024,
SendError: true,
})
Expand Down
27 changes: 25 additions & 2 deletions internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
"github.com/lf-edge/ekuiper/internal/topo"
"github.com/lf-edge/ekuiper/internal/topo/node"
nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
"github.com/lf-edge/ekuiper/internal/topo/operator"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/cast"
"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/kv"
)
Expand Down Expand Up @@ -363,20 +365,41 @@ func transformSourceNode(t *DataSourcePlan, mockSourcesProp map[string]map[strin
return nil, nil, fmt.Errorf("unknown stream type %d", t.streamStmt.StreamType)
}

type SourcePropsForSplit struct {
Decompression string `json:"decompression"`
}

func splitSource(t *DataSourcePlan, ss api.SourceConnector, options *api.RuleOption, index int, ruleId string, pp node.UnOperation) (*node.SourceConnectorNode, []node.OperatorNode, error) {
// Get all props
props := nodeConf.GetSourceConf(t.streamStmt.Options.TYPE, t.streamStmt.Options)
sp := &SourcePropsForSplit{}
_ = cast.MapToStruct(props, sp)

// Create the connector node as source node
srcConnNode, err := node.NewSourceConnectorNode(string(t.name), ss, t.streamStmt.Options, options)
srcConnNode, err := node.NewSourceConnectorNode(string(t.name), ss, t.streamStmt.Options.DATASOURCE, props, options)
if err != nil {
return nil, nil, err
}
index++
var ops []node.OperatorNode

if sp.Decompression != "" {
dco, err := node.NewDecompressOp(fmt.Sprintf("%d_decompress", index), options, sp.Decompression)
if err != nil {
return nil, nil, err
}

Check warning on line 390 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L389-L390

Added lines #L389 - L390 were not covered by tests
index++
ops = append(ops, dco)
}

// Create the decode node
decodeNode, err := node.NewDecodeOp(fmt.Sprintf("%d_decoder", index), ruleId, options, t.streamStmt.Options, t.isWildCard, t.isSchemaless, t.streamFields)
if err != nil {
return nil, nil, err
}
index++
ops := []node.OperatorNode{decodeNode}
ops = append(ops, decodeNode)

// Create the preprocessor node if needed
if pp != nil {
ops = append(ops, Transform(pp, fmt.Sprintf("%d_preprocessor", index), options))
Expand Down
56 changes: 55 additions & 1 deletion internal/topo/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@ package planner
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"testing"

"github.com/gdexlab/go-render/render"
"github.com/stretchr/testify/assert"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/io/mqtt"
"github.com/lf-edge/ekuiper/internal/meta"
"github.com/lf-edge/ekuiper/internal/pkg/store"
"github.com/lf-edge/ekuiper/internal/testx"
"github.com/lf-edge/ekuiper/internal/topo/node"
nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
Expand Down Expand Up @@ -4722,15 +4727,41 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
}

func TestTransformSourceNode(t *testing.T) {
// add decompression for meta
a1 := map[string]interface{}{
"decompression": "gzip",
}
bs, err := json.Marshal(a1)
assert.NoError(t, err)
meta.InitYamlConfigManager()
dataDir, _ := conf.GetDataLoc()
err = os.MkdirAll(filepath.Join(dataDir, "sources"), 0o755)
assert.NoError(t, err)
err = meta.AddSourceConfKey("mqtt", "testCom", "", bs)
assert.NoError(t, err)
defer func() {
err = meta.DelSourceConfKey("mqtt", "testCom", "")
assert.NoError(t, err)
}()
// create expected nodes
schema := map[string]*ast.JsonStreamField{
"a": {
Type: "bigint",
},
}
srcNode, err := node.NewSourceConnectorNode("test", &mqtt.SourceConnector{}, &ast.Options{TYPE: "mqtt"}, &api.RuleOption{SendError: false})
props := nodeConf.GetSourceConf("mqtt", &ast.Options{TYPE: "mqtt"})
srcNode, err := node.NewSourceConnectorNode("test", &mqtt.SourceConnector{}, "", props, &api.RuleOption{SendError: false})
assert.NoError(t, err)
decodeNode, err := node.NewDecodeOp("2_decoder", "test", &api.RuleOption{SendError: false}, &ast.Options{TYPE: "mqtt"}, false, false, schema)
assert.NoError(t, err)
decomNode, err := node.NewDecompressOp("2_decompressor", &api.RuleOption{SendError: false}, "gzip")
assert.NoError(t, err)
decodeNode2, err := node.NewDecodeOp("3_decoder", "test", &api.RuleOption{SendError: false}, &ast.Options{TYPE: "mqtt"}, false, false, schema)
assert.NoError(t, err)
props2 := nodeConf.GetSourceConf("mqtt", &ast.Options{TYPE: "mqtt", CONF_KEY: "testCom"})
srcNode2, err := node.NewSourceConnectorNode("test", &mqtt.SourceConnector{}, "", props2, &api.RuleOption{SendError: false})
assert.NoError(t, err)

testCases := []struct {
name string
plan *DataSourcePlan
Expand Down Expand Up @@ -4798,6 +4829,29 @@ func TestTransformSourceNode(t *testing.T) {
decodeNode,
},
},
{
name: "split source node with decompression",
plan: &DataSourcePlan{
name: "test",
streamStmt: &ast.StreamStmt{
StreamType: ast.TypeStream,
Options: &ast.Options{
TYPE: "mqtt",
CONF_KEY: "testCom",
},
},
streamFields: schema,
allMeta: false,
metaFields: []string{},
iet: false,
isBinary: false,
},
node: srcNode2,
ops: []node.OperatorNode{
decomNode,
decodeNode2,
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down

0 comments on commit 671e9ee

Please sign in to comment.