Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql): support wildcard with other selects #1436

Merged
merged 1 commit into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions internal/topo/operator/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,29 @@ func TestProjectPlan_Apply1(t *testing.T) {
"b": "true",
}},
},
//35
{
sql: `SELECT a->b AS ab, *, abs(f1) FROM test`,
data: &xsql.Tuple{
Emitter: "test",
Message: xsql.Message{
"a": map[string]interface{}{
"b": "test",
},
"b": "b",
"f1": -12,
},
},
result: []map[string]interface{}{{
"a": map[string]interface{}{
"b": "test",
},
"ab": "test",
"abs": 12,
"b": "b",
"f1": -12,
}},
},
}

fmt.Printf("The test bucket size is %d.\n\n", len(tests))
Expand Down
68 changes: 65 additions & 3 deletions internal/topo/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func Test_createLogicalPlan(t *testing.T) {
fields: []ast.Field{
{
Expr: &ast.Wildcard{Token: ast.ASTERISK},
Name: "",
Name: "*",
AName: ""},
},
isAggregate: false,
Expand Down Expand Up @@ -1182,6 +1182,68 @@ func Test_createLogicalPlan(t *testing.T) {
sendMeta: false,
}.Init(),
},
{ // 14
sql: `SELECT name, *, meta(device) FROM src1`,
p: ProjectPlan{
baseLogicalPlan: baseLogicalPlan{
children: []LogicalPlan{
DataSourcePlan{
baseLogicalPlan: baseLogicalPlan{},
name: "src1",
streamFields: []interface{}{
&ast.StreamField{
Name: "id1",
FieldType: &ast.BasicType{Type: ast.BIGINT},
},
&ast.StreamField{
Name: "temp",
FieldType: &ast.BasicType{Type: ast.BIGINT},
},
&ast.StreamField{
Name: "name",
FieldType: &ast.BasicType{Type: ast.STRINGS},
},
&ast.StreamField{
Name: "myarray",
FieldType: &ast.ArrayType{Type: ast.STRINGS},
},
},
streamStmt: streams["src1"],
metaFields: []string{"device"},
isWildCard: true,
}.Init(),
},
},
fields: []ast.Field{
{
Expr: &ast.FieldRef{Name: "name", StreamName: "src1"},
Name: "name",
AName: "",
},
{
Name: "*",
Expr: &ast.Wildcard{
Token: ast.ASTERISK,
},
},
{
Name: "meta",
Expr: &ast.Call{
Name: "meta",
Args: []ast.Expr{
&ast.MetaRef{
StreamName: ast.DefaultStream,
Name: "device",
},
},
},
},
},
isAggregate: false,
allWildcard: true,
sendMeta: false,
}.Init(),
},
}
fmt.Printf("The test bucket size is %d.\n\n", len(tests))

Expand Down Expand Up @@ -1520,7 +1582,7 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
fields: []ast.Field{
{
Expr: &ast.Wildcard{Token: ast.ASTERISK},
Name: "",
Name: "*",
AName: ""},
},
isAggregate: false,
Expand Down Expand Up @@ -2600,7 +2662,7 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
fields: []ast.Field{
{
Expr: &ast.Wildcard{Token: ast.ASTERISK},
Name: "",
Name: "*",
AName: "",
},
},
Expand Down
7 changes: 6 additions & 1 deletion internal/topo/topotest/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,37 @@ func TestSingleSQL(t *testing.T) {
var tests = []RuleTest{
{
Name: `TestSingleSQLRule1`,
Sql: `SELECT * FROM demo`,
Sql: `SELECT *, upper(color) FROM demo`,
R: [][]map[string]interface{}{
{{
"color": "red",
"size": float64(3),
"ts": float64(1541152486013),
"upper": "RED",
}},
{{
"color": "blue",
"size": float64(6),
"ts": float64(1541152486822),
"upper": "BLUE",
}},
{{
"color": "blue",
"size": float64(2),
"ts": float64(1541152487632),
"upper": "BLUE",
}},
{{
"color": "yellow",
"size": float64(4),
"ts": float64(1541152488442),
"upper": "YELLOW",
}},
{{
"color": "red",
"size": float64(1),
"ts": float64(1541152489252),
"upper": "RED",
}},
},
M: map[string]interface{}{
Expand Down
16 changes: 6 additions & 10 deletions internal/xsql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,6 @@ func (p *Parser) parseSorts() (ast.SortFields, error) {
func (p *Parser) parseFields() (ast.Fields, error) {
var fields ast.Fields

tok, _ := p.scanIgnoreWhitespace()
if tok == ast.ASTERISK {
fields = append(fields, ast.Field{AName: "", Expr: &ast.Wildcard{Token: tok}})
return fields, nil
}
p.unscan()

for {
field, err := p.parseField()

Expand All @@ -427,7 +420,7 @@ func (p *Parser) parseFields() (ast.Fields, error) {
fields = append(fields, *field)
}

tok, _ = p.scanIgnoreWhitespace()
tok, _ := p.scanIgnoreWhitespace()
if tok != ast.COMMA {
p.unscan()
break
Expand All @@ -449,6 +442,9 @@ func (p *Parser) parseField() (*ast.Field, error) {
return nil, err
} else {
if alias != "" {
if field.Name == "*" {
return nil, fmt.Errorf("alias is not supported for *")
}
field.AName = alias
}
}
Expand All @@ -466,6 +462,8 @@ func nameExpr(exp ast.Expr) string {
return e.Name
case *ast.Call:
return e.Name
case *ast.Wildcard:
return ast.Tokens[ast.ASTERISK]
default:
return ""
}
Expand Down Expand Up @@ -1487,8 +1485,6 @@ func (p *Parser) parseAsterisk() (ast.Expr, error) {
switch p.inFunc {
case "mqtt", "meta":
return &ast.MetaRef{StreamName: ast.DefaultStream, Name: "*"}, nil
case "":
return nil, fmt.Errorf("unsupported * expression, it must be used inside fields or function parameters.")
default:
return &ast.Wildcard{Token: ast.ASTERISK}, nil
}
Expand Down
Loading