Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for ordering on character fields for sharded keyspace queries #7678

Merged
merged 20 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
74e2943
adding weight strings is now the default unless the order by column t…
GuptaManan100 Mar 14, 2021
e30e5ba
added a testcase for integer column
GuptaManan100 Mar 14, 2021
610e4d4
added an end to end test
GuptaManan100 Mar 14, 2021
1e5762e
added both weighted string and normal column to ordering
GuptaManan100 Mar 15, 2021
7305da9
fixed vindex_func test
GuptaManan100 Mar 16, 2021
47989fc
added a function to get the return type of a column
GuptaManan100 Mar 17, 2021
dc4f008
fixed a test
GuptaManan100 Mar 17, 2021
58b7850
Merge remote-tracking branch 'upstream/master' into order-by
GuptaManan100 Mar 17, 2021
08091fa
added comments and fixed tests
GuptaManan100 Mar 17, 2021
f8e5471
removed weight_string col from planner string output
GuptaManan100 Mar 17, 2021
8667c1e
move GetRouteType to symtab and augmented it
GuptaManan100 Mar 17, 2021
005dc63
ran make sizegen
GuptaManan100 Mar 17, 2021
c6924d7
extract fallback comparisons into a type
systay Mar 22, 2021
ed25dcc
moved comparer to its own file and added tests for it
GuptaManan100 Mar 22, 2021
4336156
added a test for group by with order by
GuptaManan100 Mar 22, 2021
6e56257
added tests for streamExecute
GuptaManan100 Mar 22, 2021
316be68
decouple planner and executor by returning errors from supply weight …
GuptaManan100 Mar 22, 2021
5e43cff
refactor code
GuptaManan100 Mar 23, 2021
5397668
added a test for error in return type as well
GuptaManan100 Mar 23, 2021
1982137
use vterrors instead of fmt.error
systay Mar 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions go/test/endtoend/vtgate/aggr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,22 @@ func TestAggregateTypes(t *testing.T) {
assertMatches(t, conn, "select val1, count(distinct val2) k, count(*) from aggr_test group by val1 order by k desc, val1 limit 4", `[[VARCHAR("c") INT64(2) INT64(2)] [VARCHAR("a") INT64(1) INT64(2)] [VARCHAR("b") INT64(1) INT64(1)] [VARCHAR("e") INT64(1) INT64(2)]]`)
exec(t, conn, "delete from aggr_test")
}

GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
// TestGroupBy checks that grouping on integer columns combined with ordering
// on the aggregated count returns the rows in the expected order, first with
// the default workload and then again in streaming mode (workload = olap).
func TestGroupBy(t *testing.T) {
	defer cluster.PanicHandler(t)
	ctx := context.Background()
	conn, err := mysql.Connect(ctx, &vtParams)
	require.NoError(t, err)
	defer conn.Close()

	// Register the cleanup before any row is written: deferred calls only run
	// if they were registered before the test aborts, so doing this first
	// guarantees t3 is emptied and the workload is reset even when the insert
	// or the first assertion stops the test early.
	defer func() {
		exec(t, conn, "set workload = oltp")
		exec(t, conn, "delete from t3")
	}()

	exec(t, conn, "insert into t3(id5, id6, id7) values(1,1,2), (2,2,4), (3,2,4), (4,1,2), (5,1,2), (6,3,6)")

	// The same query and expectation are used for both workloads.
	const (
		query    = "select id6, id7, count(*) k from t3 group by id6, id7 order by k"
		expected = `[[INT64(3) INT64(6) INT64(1)] [INT64(2) INT64(4) INT64(2)] [INT64(1) INT64(2) INT64(3)]]`
	)

	// test ordering and group by int column
	assertMatches(t, conn, query, expected)

	// Test the same query in streaming mode
	exec(t, conn, "set workload = olap")
	assertMatches(t, conn, query, expected)
}
23 changes: 23 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,29 @@ func TestShowVariables(t *testing.T) {
require.True(t, found, "Expected a row for version in show query")
}

// TestOrderBy checks descending ORDER BY on a varchar column and on an int
// column of t4, first with the default workload and then again in streaming
// mode (workload = olap).
func TestOrderBy(t *testing.T) {
	defer cluster.PanicHandler(t)
	ctx := context.Background()
	conn, err := mysql.Connect(ctx, &vtParams)
	require.NoError(t, err)
	defer conn.Close()

	// Register the cleanup before any row is written: deferred calls only run
	// if they were registered before the test aborts, so doing this first
	// guarantees t4 is emptied and the workload is reset even when an insert
	// or an early assertion stops the test.
	defer func() {
		exec(t, conn, "set workload = oltp")
		exec(t, conn, "delete from t4")
	}()

	exec(t, conn, "insert into t4(id1, id2) values(1,'a'), (2,'Abc'), (3,'b'), (4,'c'), (5,'test')")
	exec(t, conn, "insert into t4(id1, id2) values(6,'d'), (7,'e'), (8,'F')")

	// The same queries and expectations are used for both workloads.
	const (
		varcharQuery    = "select id1, id2 from t4 order by id2 desc"
		varcharExpected = `[[INT64(5) VARCHAR("test")] [INT64(8) VARCHAR("F")] [INT64(7) VARCHAR("e")] [INT64(6) VARCHAR("d")] [INT64(4) VARCHAR("c")] [INT64(3) VARCHAR("b")] [INT64(2) VARCHAR("Abc")] [INT64(1) VARCHAR("a")]]`
		intQuery        = "select id1, id2 from t4 order by id1 desc"
		intExpected     = `[[INT64(8) VARCHAR("F")] [INT64(7) VARCHAR("e")] [INT64(6) VARCHAR("d")] [INT64(5) VARCHAR("test")] [INT64(4) VARCHAR("c")] [INT64(3) VARCHAR("b")] [INT64(2) VARCHAR("Abc")] [INT64(1) VARCHAR("a")]]`
	)

	// test ordering of varchar column
	assertMatches(t, conn, varcharQuery, varcharExpected)
	// test ordering of int column
	assertMatches(t, conn, intQuery, intExpected)

	// Test the same queries in streaming mode
	exec(t, conn, "set workload = olap")
	assertMatches(t, conn, varcharQuery, varcharExpected)
	assertMatches(t, conn, intQuery, intExpected)
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions go/vt/vtgate/engine/comparer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vtgate/evalengine"
)

// comparer holds what is needed to order two result rows on a single
// ORDER BY column.
type comparer struct {
	// orderBy is the index, within a row, of the column that is compared.
	orderBy int
	// weightString is the index of the column to fall back to when the
	// orderBy column cannot be compared directly; -1 means there is no
	// fallback column.
	weightString int
	// desc inverts the comparison result so rows sort in descending order.
	desc bool
}

// compare decides which of the two rows should come first in the result set.
// It returns:
//
//	-1 if the first row should be earlier
//	 1 if the second row should be earlier
//	 0 if both rows have equal ordering
//
// The rows are compared on the orderBy column. If that comparison fails with
// an evalengine.UnsupportedComparisonError and a weight string column is
// configured, the comparer switches to the weight string column. The switch
// is stored on the comparer itself, so every later call compares the weight
// string column directly. Any other error is returned with a result of 0.
func (c *comparer) compare(r1, r2 []sqltypes.Value) (int, error) {
	result, err := evalengine.NullsafeCompare(r1[c.orderBy], r2[c.orderBy])
	if err != nil {
		// Only an unsupported-comparison error with a fallback column
		// available can be recovered from; everything else is reported.
		if _, unsupported := err.(evalengine.UnsupportedComparisonError); !unsupported || c.weightString == -1 {
			return 0, err
		}
		// Order on the weight string column from now on, and clear the
		// fallback so a second failure is not retried.
		c.orderBy, c.weightString = c.weightString, -1
		if result, err = evalengine.NullsafeCompare(r1[c.orderBy], r2[c.orderBy]); err != nil {
			return 0, err
		}
	}
	// Descending ordering is the mirror image of the ascending result.
	if c.desc {
		return -result, nil
	}
	return result, nil
}

// extractSlices builds one comparer per OrderbyParams entry, copying the
// entry's Col, WeightStringCol and Desc fields. The comparers are returned in
// the same order as input; the result is nil when input is empty.
func extractSlices(input []OrderbyParams) []*comparer {
	if len(input) == 0 {
		return nil
	}
	// The final length is known up front, so allocate the slice once instead
	// of letting append grow it repeatedly.
	result := make([]*comparer, 0, len(input))
	for _, order := range input {
		result = append(result, &comparer{
			orderBy:      order.Col,
			weightString: order.WeightStringCol,
			desc:         order.Desc,
		})
	}
	return result
}
114 changes: 114 additions & 0 deletions go/vt/vtgate/engine/comparer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"strconv"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
)

func TestComparer(t *testing.T) {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
tests := []struct {
comparer comparer
row1 []sqltypes.Value
row2 []sqltypes.Value
output int
}{
{
comparer: comparer{
orderBy: 0,
weightString: -1,
desc: true,
},
row1: []sqltypes.Value{
sqltypes.NewInt64(23),
},
row2: []sqltypes.Value{
sqltypes.NewInt64(34),
},
output: 1,
}, {
comparer: comparer{
orderBy: 0,
weightString: -1,
desc: false,
},
row1: []sqltypes.Value{
sqltypes.NewInt64(23),
},
row2: []sqltypes.Value{
sqltypes.NewInt64(23),
},
output: 0,
}, {
comparer: comparer{
orderBy: 0,
weightString: -1,
desc: false,
},
row1: []sqltypes.Value{
sqltypes.NewInt64(23),
},
row2: []sqltypes.Value{
sqltypes.NewInt64(12),
},
output: 1,
}, {
comparer: comparer{
orderBy: 1,
weightString: 0,
desc: false,
},
row1: []sqltypes.Value{
sqltypes.NewInt64(23),
sqltypes.NewVarChar("b"),
},
row2: []sqltypes.Value{
sqltypes.NewInt64(34),
sqltypes.NewVarChar("a"),
},
output: -1,
}, {
comparer: comparer{
orderBy: 1,
weightString: 0,
desc: true,
},
row1: []sqltypes.Value{
sqltypes.NewInt64(23),
sqltypes.NewVarChar("A"),
},
row2: []sqltypes.Value{
sqltypes.NewInt64(23),
sqltypes.NewVarChar("a"),
},
output: 0,
},
}

for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
got, err := test.comparer.compare(test.row1, test.row2)
require.NoError(t, err)
require.Equal(t, test.output, got)
})
}
}
40 changes: 16 additions & 24 deletions go/vt/vtgate/engine/memory_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (ms *MemorySort) Execute(vcursor VCursor, bindVars map[string]*querypb.Bind
return nil, err
}
sh := &sortHeap{
rows: result.Rows,
orderBy: ms.OrderBy,
rows: result.Rows,
comparers: extractSlices(ms.OrderBy),
}
sort.Sort(sh)
if sh.err != nil {
Expand All @@ -104,8 +104,8 @@ func (ms *MemorySort) StreamExecute(vcursor VCursor, bindVars map[string]*queryp
// You have to reverse the ordering because the highest values
// must be dropped once the upper limit is reached.
sh := &sortHeap{
orderBy: ms.OrderBy,
reverse: true,
comparers: extractSlices(ms.OrderBy),
reverse: true,
}
err = ms.Input.StreamExecute(vcursor, bindVars, wantfields, func(qr *sqltypes.Result) error {
if len(qr.Fields) != 0 {
Expand All @@ -115,9 +115,11 @@ func (ms *MemorySort) StreamExecute(vcursor VCursor, bindVars map[string]*queryp
}
for _, row := range qr.Rows {
heap.Push(sh, row)
}
for len(sh.rows) > count {
_ = heap.Pop(sh)
// Remove the highest element from the heap if the size is more than the count
// This optimization means that the maximum size of the heap is going to be (count + 1)
for len(sh.rows) > count {
_ = heap.Pop(sh)
}
}
if vcursor.ExceedsMaxMemoryRows(len(sh.rows)) {
return fmt.Errorf("in-memory row count exceeded allowed limit of %d", vcursor.MaxMemoryRows())
Expand Down Expand Up @@ -215,10 +217,10 @@ func GenericJoin(input interface{}, f func(interface{}) string) string {
// sortHeap is sorted based on the orderBy params.
// Implementation is similar to scatterHeap
type sortHeap struct {
rows [][]sqltypes.Value
orderBy []OrderbyParams
reverse bool
err error
rows [][]sqltypes.Value
comparers []*comparer
reverse bool
err error
}

// Len satisfies sort.Interface and heap.Interface.
Expand All @@ -228,29 +230,19 @@ func (sh *sortHeap) Len() int {

// Less satisfies sort.Interface and heap.Interface.
func (sh *sortHeap) Less(i, j int) bool {
for _, order := range sh.orderBy {
for _, c := range sh.comparers {
if sh.err != nil {
return true
}
cmp, err := evalengine.NullsafeCompare(sh.rows[i][order.Col], sh.rows[j][order.Col])
cmp, err := c.compare(sh.rows[i], sh.rows[j])
if err != nil {
sh.err = err
return true
}
if cmp == 0 {
continue
}
// This is equivalent to:
//if !sh.reverse {
// if order.Desc {
// cmp = -cmp
// }
//} else {
// if !order.Desc {
// cmp = -cmp
// }
//}
if sh.reverse != order.Desc {
if sh.reverse {
cmp = -cmp
}
return cmp < 0
Expand Down
Loading