Skip to content

Commit

Permalink
ranger: fix prefix index when charset is UTF-8 (#7194) (#7231)
Browse files Browse the repository at this point in the history
  • Loading branch information
birdstorm authored and coocood committed Aug 2, 2018
1 parent 50fdc54 commit 7ccecdd
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 19 deletions.
16 changes: 16 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3365,6 +3365,22 @@ func (s *testIntegrationSuite) TestPrefixIndex(c *C) {
tk.MustExec("insert into t1 values('借款策略集_网页');")
res := tk.MustQuery("select * from t1 where name = '借款策略集_网页';")
res.Check(testkit.Rows("借款策略集_网页"))

tk.MustExec(`CREATE TABLE prefix (
a int(11) NOT NULL,
b varchar(55) DEFAULT NULL,
c int(11) DEFAULT NULL,
PRIMARY KEY (a),
KEY prefix_index (b(2)),
KEY prefix_complex (a,b(2))
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;`)

tk.MustExec("INSERT INTO prefix VALUES(0, 'b', 2), (1, 'bbb', 3), (2, 'bbc', 4), (3, 'bbb', 5), (4, 'abc', 6), (5, 'abc', 7), (6, 'abc', 7), (7, 'ÿÿ', 8), (8, 'ÿÿ0', 9), (9, 'ÿÿÿ', 10);")
res = tk.MustQuery("select c, b from prefix where b > 'ÿ' and b < 'ÿÿc'")
res.Check(testkit.Rows("8 ÿÿ", "9 ÿÿ0"))

res = tk.MustQuery("select a, b from prefix where b LIKE 'ÿÿ%'")
res.Check(testkit.Rows("7 ÿÿ", "8 ÿÿ0", "9 ÿÿÿ"))
}

func (s *testIntegrationSuite) TestDecimalMul(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion plan/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
// Test index filter condition push down.
{
sql: "select * from t use index(e_d_c_str_prefix) where t.c_str = 'abcdefghijk' and t.d_str = 'd' and t.e_str = 'e'",
best: "IndexLookUp(Index(t.e_d_c_str_prefix)[[e d [97 98 99 100 101 102 103 104 105 106],e d [97 98 99 100 101 102 103 104 105 106]]], Table(t)->Sel([eq(test.t.c_str, abcdefghijk)]))",
best: "IndexLookUp(Index(t.e_d_c_str_prefix)[[e d abcdefghij,e d abcdefghij]], Table(t)->Sel([eq(test.t.c_str, abcdefghijk)]))",
},
{
sql: "select * from t use index(e_d_c_str_prefix) where t.e_str = b'1110000'",
Expand Down
17 changes: 8 additions & 9 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,18 @@ func (c *index) truncateIndexValuesIfNeeded(indexedValues []types.Datum) []types
if v.Kind() == types.KindString || v.Kind() == types.KindBytes {
ic := c.idxInfo.Columns[i]
colCharset := c.tblInfo.Columns[ic.Offset].Charset
if colCharset == charset.CharsetUTF8 || colCharset == charset.CharsetUTF8MB4 {
val := v.GetBytes()
if ic.Length != types.UnspecifiedLength && utf8.RuneCount(val) > ic.Length {
rs := bytes.Runes(val)
colValue := v.GetBytes()
isUTF8Charset := colCharset == charset.CharsetUTF8 || colCharset == charset.CharsetUTF8MB4
if isUTF8Charset {
if ic.Length != types.UnspecifiedLength && utf8.RuneCount(colValue) > ic.Length {
rs := bytes.Runes(colValue)
truncateStr := string(rs[:ic.Length])
// truncate value and limit its length
v.SetString(truncateStr)
}
} else {
if ic.Length != types.UnspecifiedLength && len(v.GetBytes()) > ic.Length {
// truncate value and limit its length
v.SetBytes(v.GetBytes()[:ic.Length])
}
} else if ic.Length != types.UnspecifiedLength && len(colValue) > ic.Length {
// truncate value and limit its length
v.SetBytes(colValue[:ic.Length])
}
}
}
Expand Down
35 changes: 27 additions & 8 deletions util/ranger/ranger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"math"
"sort"
"unicode/utf8"

"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/charset"
"github.com/pingcap/tidb/util/codec"
)

Expand Down Expand Up @@ -326,7 +328,7 @@ func buildCNFIndexRange(sc *stmtctx.StatementContext, cols []*expression.Column,

// Take prefix index into consideration.
if hasPrefix(lengths) {
fixPrefixColRange(ranges, lengths)
fixPrefixColRange(ranges, lengths, newTp)
}

if len(ranges) > 0 && len(ranges[0].LowVal) < len(cols) {
Expand Down Expand Up @@ -409,23 +411,37 @@ func hasPrefix(lengths []int) bool {
return false
}

func fixPrefixColRange(ranges []*NewRange, lengths []int) {
func fixPrefixColRange(ranges []*NewRange, lengths []int, tp []*types.FieldType) {
for _, ran := range ranges {
for i := 0; i < len(ran.LowVal); i++ {
fixRangeDatum(&ran.LowVal[i], lengths[i])
fixRangeDatum(&ran.LowVal[i], lengths[i], tp[i])
}
ran.LowExclude = false
for i := 0; i < len(ran.HighVal); i++ {
fixRangeDatum(&ran.HighVal[i], lengths[i])
fixRangeDatum(&ran.HighVal[i], lengths[i], tp[i])
}
ran.HighExclude = false
}
}

func fixRangeDatum(v *types.Datum, length int) {
func fixRangeDatum(v *types.Datum, length int, tp *types.FieldType) {
// If this column is prefix and the prefix length is smaller than the range, cut it.
if length != types.UnspecifiedLength && length < len(v.GetBytes()) {
v.SetBytes(v.GetBytes()[:length])
// In case of UTF8, prefix should be cut by characters rather than bytes
if v.Kind() == types.KindString || v.Kind() == types.KindBytes {
colCharset := tp.Charset
colValue := v.GetBytes()
isUTF8Charset := colCharset == charset.CharsetUTF8 || colCharset == charset.CharsetUTF8MB4
if isUTF8Charset {
if length != types.UnspecifiedLength && utf8.RuneCount(colValue) > length {
rs := bytes.Runes(colValue)
truncateStr := string(rs[:length])
// truncate value and limit its length
v.SetString(truncateStr)
}
} else if length != types.UnspecifiedLength && len(colValue) > length {
// truncate value and limit its length
v.SetBytes(colValue[:length])
}
}
}

Expand All @@ -437,11 +453,14 @@ func newFieldType(tp *types.FieldType) *types.FieldType {
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong:
newTp := types.NewFieldType(mysql.TypeLonglong)
newTp.Flag = tp.Flag
newTp.Charset = tp.Charset
return newTp
// To avoid data truncate error.
case mysql.TypeFloat, mysql.TypeDouble, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob,
mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString:
return types.NewFieldType(tp.Tp)
newTp := types.NewFieldType(tp.Tp)
newTp.Charset = tp.Charset
return newTp
default:
return tp
}
Expand Down
16 changes: 15 additions & 1 deletion util/ranger/ranger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (s *testRangerSuite) TestIndexRange(c *C) {
testKit := testkit.NewTestKit(c, store)
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
testKit.MustExec("create table t(a varchar(50), b int, c double, index idx_ab(a(50), b), index idx_cb(c, a))")
testKit.MustExec("create table t(a varchar(50), b int, c double, d varchar(10), e binary(10), index idx_ab(a(50), b), index idx_cb(c, a), index idx_d(d(2)), index idx_e(e(2)))")

tests := []struct {
indexPos int
Expand Down Expand Up @@ -510,6 +510,20 @@ func (s *testRangerSuite) TestIndexRange(c *C) {
filterConds: "[or(gt(test.t.a, a), gt(test.t.c, 1))]",
resultStr: "[[<nil>,+inf]]",
},
{
indexPos: 2,
exprStr: `d = "你好啊"`,
accessConds: "[eq(test.t.d, 你好啊)]",
filterConds: "[eq(test.t.d, 你好啊)]",
resultStr: "[[你好,你好]]",
},
{
indexPos: 3,
exprStr: `e = "你好啊"`,
accessConds: "[eq(test.t.e, 你好啊)]",
filterConds: "[eq(test.t.e, 你好啊)]",
resultStr: "[[[228 189],[228 189]]]",
},
}

for _, tt := range tests {
Expand Down

0 comments on commit 7ccecdd

Please sign in to comment.