Skip to content

Commit

Permalink
Merge branch 'release-4.0' into release-4.0-82bfdcab376e
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 authored May 8, 2021
2 parents b29820c + b8fb03c commit d68e2ce
Show file tree
Hide file tree
Showing 37 changed files with 516 additions and 94 deletions.
20 changes: 20 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5039,3 +5039,23 @@ func (s *testSerialDBSuite) TestDDLExitWhenCancelMeetPanic(c *C) {
c.Assert(job.ErrorCount, Equals, int64(4))
c.Assert(job.Error.Error(), Equals, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled")
}

// Close issue #23321.
// See https://github.com/pingcap/tidb/issues/23321
func (s *testSerialDBSuite) TestJsonUnmarshalErrWhenPanicInCancellingPath(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec("drop table if exists test_add_index_after_add_col")
tk.MustExec("create table test_add_index_after_add_col(a int, b int not null default '0');")
tk.MustExec("insert into test_add_index_after_add_col values(1, 2),(2,2);")
tk.MustExec("alter table test_add_index_after_add_col add column c int not null default '0';")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit"), IsNil)
}()

_, err := tk.Exec("alter table test_add_index_after_add_col add unique index cc(c);")
c.Assert(err.Error(), Equals, "[kv:1062]DDL job cancelled by panic in rollingback, error msg: Duplicate entry '0' for key 'cc'")
}
7 changes: 7 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,13 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {
// countForPanic records the error count for DDL job.
func (w *worker) countForPanic(job *model.Job) {
// If run DDL job panic, just cancel the DDL jobs.
if job.State == model.JobStateRollingback {
job.State = model.JobStateCancelled
msg := fmt.Sprintf("DDL job cancelled by panic in rollingback, error msg: %s", terror.ToSQLError(job.Error).Message)
job.Error = terror.GetErrClass(job.Error).Synthesize(terror.ErrCode(job.Error.Code()), msg)
logutil.Logger(w.logCtx).Warn(msg)
return
}
job.State = model.JobStateCancelling
job.ErrorCount++

Expand Down
6 changes: 5 additions & 1 deletion executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,13 @@ func buildApproxPercentile(sctx sessionctx.Context, aggFuncDesc *aggregation.Agg

base := basePercentile{percent: int(percent), baseAggFunc: baseAggFunc{args: aggFuncDesc.Args, ordinal: ordinal}}

evalType := aggFuncDesc.Args[0].GetType().EvalType()
if aggFuncDesc.Args[0].GetType().Tp == mysql.TypeBit {
evalType = types.ETString // same as other aggregate function
}
switch aggFuncDesc.Mode {
case aggregation.CompleteMode, aggregation.Partial1Mode, aggregation.FinalMode:
switch aggFuncDesc.Args[0].GetType().EvalType() {
switch evalType {
case types.ETInt:
return &percentileOriginal4Int{base}
case types.ETReal:
Expand Down
5 changes: 5 additions & 0 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range) (hists []*statis
if err != nil {
return nil, nil, err
}
// When collation is enabled, we store the Key representation of the sampling data. So we set it to kind `Bytes` here
// to avoid to convert it to its Key representation once more.
if collectors[i].Samples[j].Value.Kind() == types.KindString {
collectors[i].Samples[j].Value.SetBytes(collectors[i].Samples[j].Value.GetBytes())
}
}
hg, err := statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -646,3 +647,26 @@ func (s *testSuite1) TestDefaultValForAnalyze(c *C) {
tk.MustQuery("explain select * from t where a = 1").Check(testkit.Rows("IndexReader_6 1.00 root index:IndexRangeScan_5",
"└─IndexRangeScan_5 1.00 cop[tikv] table:t, index:a(a) range:[1,1], keep order:false"))
}

func (s *testSerialSuite2) TestIssue20874(c *C) {
collate.SetNewCollationEnabledForTest(true)
defer collate.SetNewCollationEnabledForTest(false)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a char(10) collate utf8mb4_unicode_ci not null, b char(20) collate utf8mb4_general_ci not null)")
tk.MustExec("insert into t values ('#', 'C'), ('$', 'c'), ('a', 'a')")
tk.MustExec("analyze table t")
tk.MustQuery("show stats_buckets where db_name = 'test' and table_name = 't'").Sort().Check(testkit.Rows(
"test t a 0 0 1 1 \x02\xd2 \x02\xd2",
"test t a 0 1 2 1 \x0e\x0f \x0e\x0f",
"test t a 0 2 3 1 \x0e3 \x0e3",
"test t b 0 0 1 1 \x00A \x00A",
"test t b 0 1 3 2 \x00C \x00C",
))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a char(10) collate utf8mb4_general_ci not null)")
tk.MustExec("insert into t values ('汉字'), ('中文'), ('汉字'), ('中文'), ('汉字'), ('中文'), ('汉字'), ('中文'), ('汉字'), ('中文'), ('汉字'), ('中文'), ('汉字'), ('中文')")
tk.MustExec("analyze table t")
}
3 changes: 3 additions & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return err1
}
if idxKey == nil {
continue
}
s := hack.String(idxKey)
if _, found := dedup[s]; found {
continue
Expand Down
3 changes: 3 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2610,6 +2610,9 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
}
collectTable := false
e.tableRequest.CollectRangeCounts = &collectTable
if v.ExtraHandleCol != nil {
e.extraHandleIdx = v.ExtraHandleCol.Index
}
return e, nil
}

Expand Down
14 changes: 10 additions & 4 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ var _ = Suite(&testSuite{&baseTestSuite{}})
var _ = Suite(&testSuiteP1{&baseTestSuite{}})
var _ = Suite(&testSuiteP2{&baseTestSuite{}})
var _ = Suite(&testSuite1{})
var _ = SerialSuites(&testSerialSuite2{})
var _ = Suite(&testSuite2{&baseTestSuite{}})
var _ = Suite(&testSuite3{&baseTestSuite{}})
var _ = Suite(&testSuite4{&baseTestSuite{}})
Expand Down Expand Up @@ -3033,6 +3034,10 @@ type testSuite1 struct {
testSuiteWithCliBase
}

type testSerialSuite2 struct {
testSuiteWithCliBase
}

func (s *testSuiteWithCliBase) SetUpSuite(c *C) {
cli := &checkRequestClient{}
hijackClient := func(c tikv.Client) tikv.Client {
Expand Down Expand Up @@ -6710,17 +6715,18 @@ func (s *testSuiteP1) TestIssue22941(c *C) {
PRIMARY KEY (mid),
KEY ind_bm_parent (ParentId,mid)
)`)

// mp should have more columns than m
tk.MustExec(`CREATE TABLE mp (
mpid bigint(20) unsigned NOT NULL DEFAULT '0',
mid varchar(50) DEFAULT NULL COMMENT '模块主键',
sid int,
PRIMARY KEY (mpid)
);`)

tk.MustExec(`insert into mp values("1","1");`)
tk.MustExec(`insert into mp values("1","1","0");`)
tk.MustExec(`insert into m values("0", "0");`)
rs := tk.MustQuery(`SELECT ( SELECT COUNT(1) FROM m WHERE ParentId = c.mid ) expand, bmp.mpid, bmp.mpid IS NULL,bmp.mpid IS NOT NULL FROM m c LEFT JOIN mp bmp ON c.mid = bmp.mid WHERE c.ParentId = '0'`)
rs.Check(testkit.Rows("1 <nil> 1 0"))
rs := tk.MustQuery(`SELECT ( SELECT COUNT(1) FROM m WHERE ParentId = c.mid ) expand, bmp.mpid, bmp.mpid IS NULL,bmp.mpid IS NOT NULL, sid FROM m c LEFT JOIN mp bmp ON c.mid = bmp.mid WHERE c.ParentId = '0'`)
rs.Check(testkit.Rows("1 <nil> 1 0 <nil>"))

rs = tk.MustQuery(`SELECT bmp.mpid, bmp.mpid IS NULL,bmp.mpid IS NOT NULL FROM m c LEFT JOIN mp bmp ON c.mid = bmp.mid WHERE c.ParentId = '0'`)
rs.Check(testkit.Rows("<nil> 1 0"))
Expand Down
1 change: 0 additions & 1 deletion executor/index_lookup_merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

// TODO: reopen the index merge join in future.

//func (s *testSuite9) TestIndexLookupMergeJoinHang(c *C) {
// c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/IndexMergeJoinMockOOM", `return(true)`), IsNil)
// defer func() {
Expand Down
10 changes: 7 additions & 3 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type IndexMergeReaderExecutor struct {
corColInAccess bool
idxCols [][]*expression.Column
colLens [][]int

// extraHandleIdx indicates the index of extraHandleCol when the partial
// reader is TableReader.
extraHandleIdx int
}

// Open implements the Executor Open interface
Expand Down Expand Up @@ -280,7 +284,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
var err error
util.WithRecovery(
func() {
_, err = worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished)
_, err = worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished, e.extraHandleIdx)
},
e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialTableWorker"),
)
Expand All @@ -305,7 +309,7 @@ type partialTableWorker struct {
}

func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask,
finished <-chan struct{}) (count int64, err error) {
finished <-chan struct{}, extraHandleIdx int) (count int64, err error) {
var chk *chunk.Chunk
handleOffset := -1
if w.tableInfo.PKIsHandle {
Expand All @@ -318,7 +322,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str
}
}
} else {
return 0, errors.Errorf("cannot find the column for handle")
handleOffset = extraHandleIdx
}

chk = chunk.NewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize)
Expand Down
37 changes: 37 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package executor_test

import (
"fmt"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testkit"
"strings"
)

func (s *testSuite1) TestSingleTableRead(c *C) {
Expand Down Expand Up @@ -78,3 +80,38 @@ func (s *testSuite1) TestIssue16910(c *C) {
tk.MustExec("insert into t2 values (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14), (15, 15), (16, 16), (17, 17), (18, 18), (19, 19), (20, 20), (21, 21), (22, 22), (23, 23);")
tk.MustQuery("select /*+ USE_INDEX_MERGE(t1, a, b) */ * from t1 partition (p0) join t2 partition (p1) on t1.a = t2.a where t1.a < 40 or t1.b < 30;").Check(testkit.Rows("1 1 1 1"))
}

func (s *testSuite1) TestIssue23569(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists tt;")
tk.MustExec(`create table tt(id bigint(20) NOT NULL,create_time bigint(20) NOT NULL DEFAULT '0' ,driver varchar(64), PRIMARY KEY (id,create_time))
PARTITION BY RANGE ( create_time ) (
PARTITION p201901 VALUES LESS THAN (1577808000),
PARTITION p202001 VALUES LESS THAN (1585670400),
PARTITION p202002 VALUES LESS THAN (1593532800),
PARTITION p202003 VALUES LESS THAN (1601481600),
PARTITION p202004 VALUES LESS THAN (1609430400),
PARTITION p202101 VALUES LESS THAN (1617206400),
PARTITION p202102 VALUES LESS THAN (1625068800),
PARTITION p202103 VALUES LESS THAN (1633017600),
PARTITION p202104 VALUES LESS THAN (1640966400),
PARTITION p202201 VALUES LESS THAN (1648742400),
PARTITION p202202 VALUES LESS THAN (1656604800),
PARTITION p202203 VALUES LESS THAN (1664553600),
PARTITION p202204 VALUES LESS THAN (1672502400),
PARTITION p202301 VALUES LESS THAN (1680278400)
);`)
tk.MustExec("insert tt value(1, 1577807000, 'jack'), (2, 1577809000, 'mike'), (3, 1585670500, 'right'), (4, 1601481500, 'hello');")
tk.MustExec("set @@tidb_enable_index_merge=true;")
rows := tk.MustQuery("explain select count(*) from tt partition(p202003) where _tidb_rowid is null or (_tidb_rowid>=1 and _tidb_rowid<100);").Rows()
containsIndexMerge := false
for _, r := range rows {
if strings.Contains(fmt.Sprintf("%s", r[0]), "IndexMerge") {
containsIndexMerge = true
break
}
}
c.Assert(containsIndexMerge, IsTrue)
tk.MustQuery("select count(*) from tt partition(p202003) where _tidb_rowid is null or (_tidb_rowid>=1 and _tidb_rowid<100);").Check(testkit.Rows("1"))
}
3 changes: 3 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ func (e *PointGetExecutor) getAndLock(ctx context.Context, key kv.Key) (val []by
}

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if len(key) == 0 {
return nil
}
if e.lock {
seVars := e.ctx.GetSessionVars()
lockCtx := newLockCtx(seVars, e.lockWaitTime)
Expand Down
2 changes: 1 addition & 1 deletion executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (e *ShowExec) fetchShowTableStatus() error {
data_free, auto_increment, create_time, update_time, check_time,
table_collation, IFNULL(checksum,''), create_options, table_comment
FROM information_schema.tables
WHERE table_schema=%? ORDER BY table_name`, e.DBName.L)
WHERE lower(table_schema)=%? ORDER BY table_name`, e.DBName.L)
if err != nil {
return errors.Trace(err)
}
Expand Down
36 changes: 30 additions & 6 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -491,12 +490,13 @@ func (s *testSuite5) TestShowTableStatus(c *C) {
// It's not easy to test the result contents because every time the test runs, "Create_time" changed.
tk.MustExec("show table status;")
rs, err := tk.Exec("show table status;")
c.Assert(errors.ErrorStack(err), Equals, "")
c.Assert(err, IsNil)
c.Assert(rs, NotNil)
rows, err := session.GetRows4Test(context.Background(), tk.Se, rs)
c.Assert(errors.ErrorStack(err), Equals, "")
c.Assert(err, IsNil)
err = rs.Close()
c.Assert(errors.ErrorStack(err), Equals, "")
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, 1)

for i := range rows {
row := rows[i]
Expand All @@ -513,10 +513,34 @@ func (s *testSuite5) TestShowTableStatus(c *C) {
partition p2 values less than (maxvalue)
);`)
rs, err = tk.Exec("show table status from test like 'tp';")
c.Assert(errors.ErrorStack(err), Equals, "")
c.Assert(err, IsNil)
rows, err = session.GetRows4Test(context.Background(), tk.Se, rs)
c.Assert(errors.ErrorStack(err), Equals, "")
c.Assert(err, IsNil)
c.Assert(rows[0].GetString(16), Equals, "partitioned")

tk.MustExec("create database UPPER_CASE")
tk.MustExec("use UPPER_CASE")
tk.MustExec("create table t (i int)")
rs, err = tk.Exec("show table status")
c.Assert(err, IsNil)
c.Assert(rs, NotNil)
rows, err = session.GetRows4Test(context.Background(), tk.Se, rs)
c.Assert(err, IsNil)
err = rs.Close()
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, 1)

tk.MustExec("use upper_case")
rs, err = tk.Exec("show table status")
c.Assert(err, IsNil)
c.Assert(rs, NotNil)
rows, err = session.GetRows4Test(context.Background(), tk.Se, rs)
c.Assert(err, IsNil)
err = rs.Close()
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, 1)

tk.MustExec("drop database UPPER_CASE")
}

func (s *testSuite5) TestShowSlow(c *C) {
Expand Down
15 changes: 10 additions & 5 deletions executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (e *ShuffleExec) Open(ctx context.Context) error {

// Close implements the Executor Close interface.
func (e *ShuffleExec) Close() error {
var firstErr error
if !e.prepared {
for _, w := range e.workers {
close(w.inputHolderCh)
Expand All @@ -139,6 +140,9 @@ func (e *ShuffleExec) Close() error {
for _, w := range e.workers {
for range w.inputCh {
}
if err := w.childExec.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
for range e.outputCh { // workers exit before `e.outputCh` is closed.
}
Expand All @@ -150,12 +154,13 @@ func (e *ShuffleExec) Close() error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}

err := e.dataSource.Close()
err1 := e.baseExecutor.Close()
if err != nil {
return errors.Trace(err)
if err := e.dataSource.Close(); err != nil && firstErr == nil {
firstErr = err
}
if err := e.baseExecutor.Close(); err != nil && firstErr == nil {
firstErr = err
}
return errors.Trace(err1)
return errors.Trace(firstErr)
}

func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) {
Expand Down
6 changes: 5 additions & 1 deletion expression/builtin_cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1899,7 +1899,11 @@ func WrapWithCastAsString(ctx sessionctx.Context, expr Expression) Expression {
argLen = -1
}
tp := types.NewFieldType(mysql.TypeVarString)
tp.Charset, tp.Collate = expr.CharsetAndCollation(ctx)
if expr.Coercibility() == CoercibilityExplicit {
tp.Charset, tp.Collate = expr.CharsetAndCollation(ctx)
} else {
tp.Charset, tp.Collate = ctx.GetSessionVars().GetCharsetInfo()
}
tp.Flen, tp.Decimal = argLen, types.UnspecifiedLength
return BuildCastFunction(ctx, expr, tp)
}
Expand Down
Loading

0 comments on commit d68e2ce

Please sign in to comment.