Skip to content

Commit

Permalink
Merge pull request #9083 from planetscale/semi-join-review-comments
Browse files Browse the repository at this point in the history
Semi join review comments and bug fixes
  • Loading branch information
harshit-gangal authored Oct 26, 2021
2 parents 7ac025b + caeb602 commit a1fcbe4
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 65 deletions.
25 changes: 0 additions & 25 deletions go/sqltypes/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,31 +76,6 @@ func MakeTestResult(fields []*querypb.Field, rows ...string) *Result {
return result
}

// MakeTestResultNoFields builds a *sqltypes.Result for testing that carries
// rows only: the Fields of the returned result are never populated.
//
//	result := sqltypes.MakeTestResultNoFields(
//		fields,
//		" 1|a",
//		"10|abcd",
//	)
//
// Every row string is broken into columns by split, and each column value is
// built with the Type of the field at the same position. Spaces are expected
// to be trimmed from row values by split. A column whose text is "null" is
// left as the zero Value, i.e. treated as NULL. When no rows are passed, Rows
// stays nil.
func MakeTestResultNoFields(fields []*querypb.Field, rows ...string) *Result {
	out := &Result{}
	if len(rows) == 0 {
		// Nothing to build: leave Rows unallocated.
		return out
	}
	out.Rows = make([][]Value, len(rows))
	for r := range rows {
		// One slot per field; slots for "null" columns keep the zero Value.
		values := make([]Value, len(fields))
		for c, text := range split(rows[r]) {
			if text != "null" {
				values[c] = MakeTrusted(fields[c].Type, []byte(text))
			}
		}
		out.Rows[r] = values
	}
	return out
}

// MakeTestStreamingResults builds a list of results for streaming.
// results := sqltypes.MakeTestStreamingResults(
// fields,
Expand Down
30 changes: 10 additions & 20 deletions go/vt/vtgate/engine/semi_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,16 @@ func TestSemiJoinStreamExecute(t *testing.T) {
"col4|col5|col6",
"int64|varchar|varchar",
)
rightPrim := &fakePrimitive{ // we'll return non-empty results for rows 2 and 4
results: []*sqltypes.Result{
// First right query will always be a GetFields.
sqltypes.MakeTestResultNoFields(
rightFields,
),
sqltypes.MakeTestResultNoFields(
rightFields,
"4|d|dd",
),
sqltypes.MakeTestResultNoFields(
rightFields,
),
sqltypes.MakeTestResultNoFields(
rightFields,
"5|e|ee",
"6|f|ff",
"7|g|gg",
),
},
rightPrim := &fakePrimitive{
// we'll return non-empty results for rows 2 and 4
results: sqltypes.MakeTestStreamingResults(rightFields,
"4|d|dd",
"---",
"---",
"5|e|ee",
"6|f|ff",
"7|g|gg",
),
}

jn := &SemiJoin{
Expand Down
6 changes: 4 additions & 2 deletions go/vt/vtgate/planbuilder/concantenatetree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package planbuilder

import (
"fmt"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -64,9 +66,9 @@ func (c *concatenateTree) pushOutputColumns(columns []*sqlparser.ColName, semTab
}

func (c *concatenateTree) pushPredicate(ctx *planningContext, expr sqlparser.Expr) error {
return vterrors.New(vtrpc.Code_UNIMPLEMENTED, "pushPredicate does not work on concatenate trees")
return vterrors.New(vtrpc.Code_INTERNAL, fmt.Sprintf("add '%s' predicate not supported on concatenate trees", sqlparser.String(expr)))
}

func (c *concatenateTree) removePredicate(ctx *planningContext, expr sqlparser.Expr) error {
return vterrors.New(vtrpc.Code_UNIMPLEMENTED, "removePredicate does not work on concatenate trees")
return vterrors.New(vtrpc.Code_INTERNAL, fmt.Sprintf("remove '%s' predicate not supported on concatenate trees", sqlparser.String(expr)))
}
6 changes: 4 additions & 2 deletions go/vt/vtgate/planbuilder/derivedtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package planbuilder

import (
"fmt"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -75,11 +77,11 @@ func (d *derivedTree) pushOutputColumns(names []*sqlparser.ColName, semTable *se
}

func (d *derivedTree) pushPredicate(ctx *planningContext, expr sqlparser.Expr) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "pushPredicate does not work on derivedTrees")
return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("add '%s' predicate not supported on derived trees", sqlparser.String(expr)))
}

func (d *derivedTree) removePredicate(ctx *planningContext, expr sqlparser.Expr) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "removePredicate does not work on derivedTrees")
return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("remove '%s' predicate not supported on derived trees", sqlparser.String(expr)))
}

// findOutputColumn returns the index on which the given name is found in the slice of
Expand Down
42 changes: 33 additions & 9 deletions go/vt/vtgate/planbuilder/jointree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package planbuilder

import (
"fmt"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -81,7 +83,7 @@ func (jp *joinTree) pushOutputColumns(columns []*sqlparser.ColName, semTable *se
outputColumns := make([]int, len(toTheLeft))
var l, r int
for i, isLeft := range toTheLeft {
outputColumns[i] = i
outputColumns[i] = len(jp.columns)
if isLeft {
jp.columns = append(jp.columns, -lhsOffset[l]-1)
l++
Expand All @@ -94,19 +96,41 @@ func (jp *joinTree) pushOutputColumns(columns []*sqlparser.ColName, semTable *se
}

func (jp *joinTree) pushPredicate(ctx *planningContext, expr sqlparser.Expr) error {
isPushed := false
if ctx.semTable.RecursiveDeps(expr).IsSolvedBy(jp.lhs.tableID()) {
return jp.lhs.pushPredicate(ctx, expr)
} else if ctx.semTable.RecursiveDeps(expr).IsSolvedBy(jp.rhs.tableID()) {
return jp.rhs.pushPredicate(ctx, expr)
if err := jp.lhs.pushPredicate(ctx, expr); err != nil {
return err
}
isPushed = true
}
if ctx.semTable.RecursiveDeps(expr).IsSolvedBy(jp.rhs.tableID()) {
if err := jp.rhs.pushPredicate(ctx, expr); err != nil {
return err
}
isPushed = true
}
if isPushed {
return nil
}
return vterrors.New(vtrpc.Code_UNIMPLEMENTED, "pushPredicate does not work on joinTrees with predicates having dependencies from both the sides")
return vterrors.New(vtrpc.Code_UNIMPLEMENTED, fmt.Sprintf("add '%s' predicate not supported on cross-shard join query", sqlparser.String(expr)))
}

func (jp *joinTree) removePredicate(ctx *planningContext, expr sqlparser.Expr) error {
isRemoved := false
if ctx.semTable.RecursiveDeps(expr).IsSolvedBy(jp.lhs.tableID()) {
return jp.lhs.removePredicate(ctx, expr)
} else if ctx.semTable.RecursiveDeps(expr).IsSolvedBy(jp.rhs.tableID()) {
return jp.rhs.removePredicate(ctx, expr)
if err := jp.lhs.removePredicate(ctx, expr); err != nil {
return err
}
isRemoved = true
}
if ctx.semTable.RecursiveDeps(expr).IsSolvedBy(jp.rhs.tableID()) {
if err := jp.rhs.removePredicate(ctx, expr); err != nil {
return err
}
isRemoved = true
}
if isRemoved {
return nil
}
return vterrors.New(vtrpc.Code_UNIMPLEMENTED, "removePredicate does not work on joinTrees with predicates having dependencies from both the sides")
return vterrors.New(vtrpc.Code_UNIMPLEMENTED, fmt.Sprintf("remove '%s' predicate not supported on cross-shard join query", sqlparser.String(expr)))
}
19 changes: 14 additions & 5 deletions go/vt/vtgate/planbuilder/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,29 @@ func createCorrelatedSubqueryTree(ctx *planningContext, innerTree, outerTree que
}

vars := map[string]int{}
bindVars := map[*sqlparser.ColName]string{}
for _, pred := range preds {
var rewriteError error
sqlparser.Rewrite(pred, func(cursor *sqlparser.Cursor) bool {
switch node := cursor.Node().(type) {
case *sqlparser.ColName:
if ctx.semTable.RecursiveDeps(node).IsSolvedBy(outerTree.tableID()) {
// check whether a bindVariable for this column already exists in the map
// we do so by checking that the column names are the same and that their recursive dependencies are the same
// so the column names user.a and a are also treated as equal when the latter references the user table as well
for colName, bindVar := range bindVars {
if node.Name.Equal(colName.Name) && ctx.semTable.RecursiveDeps(node).Equals(ctx.semTable.RecursiveDeps(colName)) {
cursor.Replace(sqlparser.NewArgument(bindVar))
return false
}
}

// get the bindVariable for that column name and replace it in the predicate
bindVar := ctx.reservedVars.ReserveColName(node)
cursor.Replace(sqlparser.NewArgument(bindVar))
// check whether the bindVariable already exists in the map
_, alreadyExists := vars[bindVar]
if alreadyExists {
return false
}
// store it in the map for future comparisons
bindVars[node] = bindVar

// if it does not exist, then push this as an output column in the outerTree and add it to the joinVars
columnIndexes, err := outerTree.pushOutputColumns([]*sqlparser.ColName{node}, ctx.semTable)
if err != nil {
Expand Down
100 changes: 100 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/select_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2580,3 +2580,103 @@ Gen4 plan same as above
]
}
}

# correlated subquery having dependencies on two tables
"select 1 from user u1, user u2 where exists (select 1 from user_extra ue where ue.col = u1.col and ue.col = u2.col)"
"unsupported: cross-shard correlated subquery"
{
"QueryType": "SELECT",
"Original": "select 1 from user u1, user u2 where exists (select 1 from user_extra ue where ue.col = u1.col and ue.col = u2.col)",
"Instructions": {
"OperatorType": "SemiJoin",
"JoinVars": {
"u1_col": 0,
"u2_col": 1
},
"ProjectedIndexes": "-3",
"TableName": "`user`_`user`_user_extra",
"Inputs": [
{
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "-1,1,-2",
"TableName": "`user`_`user`",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select u1.col, 1 from `user` as u1 where 1 != 1",
"Query": "select u1.col, 1 from `user` as u1",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select u2.col from `user` as u2 where 1 != 1",
"Query": "select u2.col from `user` as u2",
"Table": "`user`"
}
]
},
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from user_extra as ue where 1 != 1",
"Query": "select 1 from user_extra as ue where ue.col = :u1_col and ue.col = :u2_col",
"Table": "user_extra"
}
]
}
}

# correlated subquery using a column twice
"select 1 from user u where exists (select 1 from user_extra ue where ue.col = u.col and u.col = ue.col2)"
"unsupported: cross-shard correlated subquery"
{
"QueryType": "SELECT",
"Original": "select 1 from user u where exists (select 1 from user_extra ue where ue.col = u.col and u.col = ue.col2)",
"Instructions": {
"OperatorType": "SemiJoin",
"JoinVars": {
"u_col": 0
},
"ProjectedIndexes": "-2",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select u.col, 1 from `user` as u where 1 != 1",
"Query": "select u.col, 1 from `user` as u",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from user_extra as ue where 1 != 1",
"Query": "select 1 from user_extra as ue where ue.col = :u_col and ue.col2 = :u_col",
"Table": "user_extra"
}
]
}
}
6 changes: 4 additions & 2 deletions go/vt/vtgate/planbuilder/vindextree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package planbuilder

import (
"fmt"

"vitess.io/vitess/go/sqltypes"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -67,9 +69,9 @@ outer:
}

func (v *vindexTree) pushPredicate(ctx *planningContext, expr sqlparser.Expr) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "pushPredicate does not work on vindexTrees")
return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("add '%s' predicate not supported on vindex trees", sqlparser.String(expr)))
}

func (v *vindexTree) removePredicate(ctx *planningContext, expr sqlparser.Expr) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "removePredicate does not work on vindexTrees")
return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("remove '%s' predicate not supported on vindex trees", sqlparser.String(expr)))
}
5 changes: 5 additions & 0 deletions go/vt/vtgate/semantics/tabletset.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ func (ts TableSet) IsSolvedBy(other TableSet) bool {
}
}

// Equals reports whether `ts` and `other` contain the same tables.
// The two sets are considered equal when each one is solved by the other.
func (ts TableSet) Equals(other TableSet) bool {
	if !ts.IsSolvedBy(other) {
		return false
	}
	// ts is solved by other; equality additionally needs the reverse direction.
	return other.IsSolvedBy(ts)
}

// NumberOfTables returns the number of bits set
func (ts TableSet) NumberOfTables() int {
if ts.large == nil {
Expand Down

0 comments on commit a1fcbe4

Please sign in to comment.