Skip to content

Commit

Permalink
feat: add alias reference calculate swtich (#3477) (#3483)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Co-authored-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
ngjaying and Yisaer authored Dec 26, 2024
1 parent 374bd7d commit 09ba4e4
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 47 deletions.
44 changes: 28 additions & 16 deletions internal/pkg/def/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,34 @@ import (
)

type RuleOption struct {
Debug bool `json:"debug" yaml:"debug"`
LogFilename string `json:"logFilename,omitempty" yaml:"logFilename,omitempty"`
IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
LateTol cast.DurationConf `json:"lateTolerance,omitempty" yaml:"lateTolerance,omitempty"`
Concurrency int `json:"concurrency" yaml:"concurrency"`
BufferLength int `json:"bufferLength" yaml:"bufferLength"`
SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
SendNil bool `json:"sendNilField" yaml:"sendNilField"`
SendError bool `json:"sendError" yaml:"sendError"`
Qos Qos `json:"qos,omitempty" yaml:"qos,omitempty"`
CheckpointInterval cast.DurationConf `json:"checkpointInterval,omitempty" yaml:"checkpointInterval,omitempty"`
RestartStrategy *RestartStrategy `json:"restartStrategy,omitempty" yaml:"restartStrategy,omitempty"`
Cron string `json:"cron,omitempty" yaml:"cron,omitempty"`
Duration string `json:"duration,omitempty" yaml:"duration,omitempty"`
CronDatetimeRange []schedule.DatetimeRange `json:"cronDatetimeRange,omitempty" yaml:"cronDatetimeRange,omitempty"`
NotifySub bool `json:"notifySub,omitempty" yaml:"notifySub,omitempty"`
Debug bool `json:"debug" yaml:"debug"`
LogFilename string `json:"logFilename,omitempty" yaml:"logFilename,omitempty"`
IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
LateTol cast.DurationConf `json:"lateTolerance,omitempty" yaml:"lateTolerance,omitempty"`
Concurrency int `json:"concurrency" yaml:"concurrency"`
BufferLength int `json:"bufferLength" yaml:"bufferLength"`
SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
SendNil bool `json:"sendNilField" yaml:"sendNilField"`
SendError bool `json:"sendError" yaml:"sendError"`
Qos Qos `json:"qos,omitempty" yaml:"qos,omitempty"`
CheckpointInterval cast.DurationConf `json:"checkpointInterval,omitempty" yaml:"checkpointInterval,omitempty"`
RestartStrategy *RestartStrategy `json:"restartStrategy,omitempty" yaml:"restartStrategy,omitempty"`
Cron string `json:"cron,omitempty" yaml:"cron,omitempty"`
Duration string `json:"duration,omitempty" yaml:"duration,omitempty"`
CronDatetimeRange []schedule.DatetimeRange `json:"cronDatetimeRange,omitempty" yaml:"cronDatetimeRange,omitempty"`
PlanOptimizeStrategy *PlanOptimizeStrategy `json:"planOptimizeStrategy,omitempty" yaml:"planOptimizeStrategy,omitempty"`
NotifySub bool `json:"notifySub,omitempty" yaml:"notifySub,omitempty"`
}

type PlanOptimizeStrategy struct {
DisableAliasRefCal bool `json:"disableAliasRefCal,omitempty" yaml:"disableAliasRefCal,omitempty"`
}

func (p *PlanOptimizeStrategy) IsAliasRefCalEnable() bool {
if p == nil {
return true
}
return !p.DisableAliasRefCal
}

type RestartStrategy struct {
Expand Down
36 changes: 19 additions & 17 deletions internal/topo/planner/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/lf-edge/ekuiper/v2/internal/binder/function"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/schema"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
Expand All @@ -33,7 +34,7 @@ type streamInfo struct {

// Analyze the select statement by decorating the info from stream statement.
// Typically, set the correct stream name for fieldRefs
func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*ast.Call, []*ast.Call, error) {
func decorateStmt(s *ast.SelectStatement, store kv.KeyValue, opt *def.RuleOption) ([]*streamInfo, []*ast.Call, []*ast.Call, error) {
streamsFromStmt := xsql.GetStreams(s)
streamStmts := make([]*streamInfo, len(streamsFromStmt))
isSchemaless := false
Expand All @@ -51,10 +52,10 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
isSchemaless = true
}
}
if checkAliasReferenceCycle(s) {
if opt.PlanOptimizeStrategy.IsAliasRefCalEnable() && checkAliasReferenceCycle(s) {
return nil, nil, nil, fmt.Errorf("select fields have cycled alias")
}
if !isSchemaless {
if !isSchemaless && opt.PlanOptimizeStrategy.IsAliasRefCalEnable() {
if err := aliasFieldTopoSort(s, streamStmts); err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -120,21 +121,22 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
AliasRef: ar,
}
walkErr = fieldsMap.save(f.AName, ast.AliasStream, ar)
for _, subF := range s.Fields {
if f.AName == subF.AName {
continue
}
ast.WalkFunc(&subF, func(node ast.Node) bool {
switch fr := node.(type) {
case *ast.FieldRef:
if fr.Name == f.AName && fr.StreamName == streamName {
fr.StreamName = ast.AliasStream
fr.AliasRef = ar
}
return false
if opt.PlanOptimizeStrategy.IsAliasRefCalEnable() {
for _, subF := range s.Fields {
if f.AName == subF.AName {
continue
}
return true
})
ast.WalkFunc(&subF, func(node ast.Node) bool {
switch fr := node.(type) {
case *ast.FieldRef:
if fr.Name == f.AName && fr.StreamName == streamName {
fr.StreamName = ast.AliasStream
fr.AliasRef = ar
}
}
return true
})
}
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions internal/topo/planner/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ var tests = []struct {
r: newErrorStruct(""),
},
{ // 10
sql: `SELECT sum(temp) as temp1, count(temp) as temp FROM src1`,
sql: `SELECT count(temp) as temp, sum(temp) as temp1 FROM src1`,
r: newErrorStruct("invalid argument for func sum: aggregate argument is not allowed"),
},
{ // 11
Expand Down Expand Up @@ -198,7 +198,6 @@ func TestCheckTopoSort(t *testing.T) {
}

func Test_validation(t *testing.T) {
tests[10].r = newErrorStruct("invalid argument for func sum: aggregate argument is not allowed")
store, err := store.GetKV("stream")
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -311,9 +310,6 @@ func Test_validationSchemaless(t *testing.T) {
SendError: true,
}, store)
serr := tt.r.Serr()
if tt.sql == "SELECT sum(temp) as temp1, count(temp) as temp FROM src1" {
serr = ""
}
require.Equal(t, serr, testx.Errstring(err))
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv.
ds ast.Dimensions
)

streamStmts, analyticFuncs, analyticFieldFuncs, err := decorateStmt(stmt, store)
streamStmts, analyticFuncs, analyticFieldFuncs, err := decorateStmt(stmt, store, opt)
if err != nil {
return nil, err
}
Expand Down
17 changes: 9 additions & 8 deletions internal/topo/planner/planner_alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,15 @@ func TestPlannerAlias(t *testing.T) {
continue
}
p, _ := createLogicalPlan(stmt, &def.RuleOption{
IsEventTime: false,
LateTol: 0,
Concurrency: 0,
BufferLength: 0,
SendMetaToSink: false,
Qos: 0,
CheckpointInterval: 0,
SendError: true,
IsEventTime: false,
LateTol: 0,
Concurrency: 0,
BufferLength: 0,
SendMetaToSink: false,
Qos: 0,
CheckpointInterval: 0,
SendError: true,
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{},
}, kv)
if !reflect.DeepEqual(tt.p, p) {
t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
Expand Down

0 comments on commit 09ba4e4

Please sign in to comment.