
Commit

Merge remote-tracking branch 'upstream/release-5.2' into HEAD
tangenta committed Apr 12, 2022
2 parents 553c2d3 + 31b7654 commit 41360e2
Showing 30 changed files with 407 additions and 39 deletions.
7 changes: 1 addition & 6 deletions br/pkg/storage/gcs.go
@@ -180,11 +180,6 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
opt = &WalkOption{}
}

maxKeys := int64(1000)
if opt.ListCount > 0 {
maxKeys = opt.ListCount
}

prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
@@ -194,7 +189,7 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
// only need each object's name and size
query.SetAttrSelection([]string{"Name", "Size"})
iter := s.bucket.Objects(ctx, query)
for i := int64(0); i != maxKeys; i++ {
for {
attrs, err := iter.Next()
if err == iterator.Done {
break
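
Note: the diff above drops the maxKeys cap, so WalkDir now loops until the GCS object iterator reports iterator.Done; the iterator pages through results itself, so no manual count is needed. Below is a minimal, self-contained sketch of that full-iteration pattern using the public cloud.google.com/go/storage and google.golang.org/api/iterator packages; the bucket name, prefix, and the walkAll helper are illustrative, not TiDB code.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
)

// walkAll visits every object under prefix. iterator.Done marks exhaustion, and the
// iterator fetches further pages transparently, so no page-size bookkeeping is needed.
func walkAll(ctx context.Context, bucket *storage.BucketHandle, prefix string) error {
	query := &storage.Query{Prefix: prefix}
	// Ask only for the attributes the caller needs, as WalkDir does.
	if err := query.SetAttrSelection([]string{"Name", "Size"}); err != nil {
		return err
	}
	it := bucket.Objects(ctx, query)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			return nil // all objects, across all pages, have been visited
		}
		if err != nil {
			return err
		}
		fmt.Println(attrs.Name, attrs.Size)
	}
}

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx) // uses Application Default Credentials
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	if err := walkAll(ctx, client.Bucket("my-bucket"), "backup/"); err != nil {
		log.Fatal(err)
	}
}
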
26 changes: 26 additions & 0 deletions br/pkg/storage/gcs_test.go
@@ -4,6 +4,7 @@ package storage

import (
"context"
"fmt"
"io"
"os"

@@ -95,6 +96,31 @@ func (r *testStorageSuite) TestGCS(c *C) {
c.Assert(list, Equals, "keykey1key2")
c.Assert(totalSize, Equals, int64(42))

// test 1003 files
totalSize = 0
for i := 0; i < 1000; i += 1 {
err = stg.WriteFile(ctx, fmt.Sprintf("f%d", i), []byte("data"))
c.Assert(err, IsNil)
}
filesSet := make(map[string]struct{}, 1003)
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
filesSet[name] = struct{}{}
totalSize += size
return nil
})
c.Assert(err, IsNil)
c.Assert(totalSize, Equals, int64(42+4000))
_, ok := filesSet["key"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key1"]
c.Assert(ok, IsTrue)
_, ok = filesSet["key2"]
c.Assert(ok, IsTrue)
for i := 0; i < 1000; i += 1 {
_, ok = filesSet[fmt.Sprintf("f%d", i)]
c.Assert(ok, IsTrue)
}

efr, err := stg.Open(ctx, "key2")
c.Assert(err, IsNil)

10 changes: 7 additions & 3 deletions ddl/column.go
@@ -845,7 +845,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
if job.IsRollingback() {
// For those column-type-change jobs which don't reorg the data.
if !needChangeColumnData(oldCol, jobParam.newCol) {
return rollbackModifyColumnJob(t, tblInfo, job, oldCol, jobParam.modifyColumnTp)
return rollbackModifyColumnJob(t, tblInfo, job, jobParam.newCol, oldCol, jobParam.modifyColumnTp)
}
// For those column-type-change jobs which reorg the data.
return rollbackModifyColumnJobWithData(t, tblInfo, job, oldCol, jobParam)
@@ -1459,6 +1459,10 @@ func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.Ind
func (w *worker) doModifyColumn(
d *ddlCtx, t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) {
if oldCol.ID != newCol.ID {
job.State = model.JobStateRollingback
return ver, errKeyColumnDoesNotExits.GenWithStack("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", oldCol.Name, newCol.ID)
}
// Column from null to not null.
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag)
@@ -1767,9 +1771,9 @@ func checkAddColumnTooManyColumns(colNum int) error {
}

// rollbackModifyColumnJob rollbacks the job when an error occurs.
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, newCol, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
var err error
if modifyColumnTp == mysql.TypeNull {
if oldCol.ID == newCol.ID && modifyColumnTp == mysql.TypeNull {
// field NotNullFlag flag reset.
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.NotNullFlag
// field PreventNullInsertFlag flag reset.
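
Note: the ddl/column.go change threads newCol into rollbackModifyColumnJob and adds an oldCol.ID != newCol.ID guard in doModifyColumn, so a MODIFY COLUMN job that raced with another DDL (which rebuilt the column under a new ID) is rolled back rather than applied to a stale column. A toy illustration of that ID check, using simplified stand-in types rather than the real model.ColumnInfo and DDL job machinery:

package main

import (
	"errors"
	"fmt"
)

type columnInfo struct {
	ID   int64  // reallocated when a data-reorg type change rebuilds the column
	Name string // unchanged, so a name match alone is not enough
}

var errColumnChanged = errors.New("column may have been updated by other DDL running in parallel")

// applyModify refuses to proceed when the column captured at planning time (oldCol)
// no longer matches the column recorded in the job (newCol): same name but a
// different ID means a concurrent DDL rebuilt the column in between.
func applyModify(oldCol, newCol *columnInfo) error {
	if oldCol.ID != newCol.ID {
		// Another DDL won the race; roll this job back instead of corrupting the column.
		return fmt.Errorf("column %s: %w", oldCol.Name, errColumnChanged)
	}
	// ...safe to continue with the type change...
	return nil
}

func main() {
	before := &columnInfo{ID: 3, Name: "c"}
	after := &columnInfo{ID: 7, Name: "c"} // rebuilt by a parallel MODIFY COLUMN
	fmt.Println(applyModify(before, after))
}
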
53 changes: 52 additions & 1 deletion ddl/db_change_test.go
@@ -1059,6 +1059,54 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAlterModifyColumnWithData(c *C) {
sql := "ALTER TABLE t MODIFY COLUMN c int;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[0][2], Equals, "3")
c.Assert(rs[0].Close(), IsNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)")
c.Assert(err, IsNil)
rs, err = s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[1][2], Equals, "33")
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAlterModifyColumnToNotNullWithData(c *C) {
sql := "ALTER TABLE t MODIFY COLUMN c int not null;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[0][2], Equals, "3")
c.Assert(rs[0].Close(), IsNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, null, 44, 55)")
c.Assert(err, NotNil)
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)")
c.Assert(err, IsNil)
rs, err = s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
c.Assert(err, IsNil)
c.Assert(sRows[1][2], Equals, "33")
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColumn(c *C) {
sql1 := "ALTER TABLE t ADD COLUMN f INT GENERATED ALWAYS AS(a+1);"
sql2 := "ALTER TABLE t MODIFY COLUMN a tinyint;"
@@ -1334,12 +1382,15 @@ func (s *testStateChangeSuiteBase) prepareTestControlParallelExecSQL(c *C) (sess
func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c double default null, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
c.Assert(err, IsNil)
if len(s.preSQL) != 0 {
_, err := s.se.Execute(context.Background(), s.preSQL)
c.Assert(err, IsNil)
}
_, err = s.se.Execute(context.Background(), "insert into t values(1, 2, 3.1234, 4, 5)")
c.Assert(err, IsNil)

defer func() {
_, err := s.se.Execute(context.Background(), "drop table t")
c.Assert(err, IsNil)
3 changes: 3 additions & 0 deletions ddl/serial_test.go
@@ -1044,7 +1044,10 @@ func (s *testSerialSuite) TestTableLocksEnable(c *C) {
})

tk.MustExec("lock tables t1 write")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1235 LOCK TABLES is not supported. To enable this experimental feature, set 'enable-table-lock' in the configuration file."))
checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone)
tk.MustExec("unlock tables")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1235 UNLOCK TABLES is not supported. To enable this experimental feature, set 'enable-table-lock' in the configuration file."))
}

func (s *testSerialDBSuite) TestAutoRandomOnTemporaryTable(c *C) {
5 changes: 5 additions & 0 deletions errors.toml
@@ -741,6 +741,11 @@ error = '''
Incorrect usage of %s and %s
'''

["executor:1235"]
error = '''
%-.32s is not supported. To enable this experimental feature, set '%-.32s' in the configuration file.
'''

["executor:1242"]
error = '''
Subquery returns more than 1 row
9 changes: 4 additions & 5 deletions executor/adapter.go
@@ -1014,13 +1014,12 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
if _, ok := a.StmtNode.(*ast.CommitStmt); ok {
slowItems.PrevStmt = sessVars.PrevStmt.String()
}
slowLog := sessVars.SlowLogFormat(slowItems)
if trace.IsEnabled() {
trace.Log(a.GoCtx, "details", sessVars.SlowLogFormat(slowItems))
trace.Log(a.GoCtx, "details", slowLog)
}
if costTime < threshold {
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(slowItems))
} else {
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(slowItems))
logutil.SlowQueryLogger.Warn(slowLog)
if costTime >= threshold {
if sessVars.InRestrictedSQL {
totalQueryProcHistogramInternal.Observe(costTime.Seconds())
totalCopProcHistogramInternal.Observe(execDetail.TimeDetail.ProcessTime.Seconds())
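
Note: the executor/adapter.go change hoists the repeated sessVars.SlowLogFormat(slowItems) call into a single slowLog variable, so the (potentially large) slow-log text is built once and reused by both the trace and the slow-query logger. A tiny sketch of that compute-once pattern; the formatter and sinks below are stand-ins, not TiDB's real APIs.

package main

import "fmt"

var formatCalls int

// formatSlowLog stands in for SlowLogFormat: building the text is comparatively
// expensive, so it should run once per statement.
func formatSlowLog() string {
	formatCalls++
	return "# Query_time: 1.2 ..."
}

func emitTrace(s string) { fmt.Println("trace:", s) }
func emitWarn(s string)  { fmt.Println("warn :", s) }

func main() {
	slowLog := formatSlowLog() // hoisted: one call feeds every sink
	emitTrace(slowLog)
	emitWarn(slowLog)
	fmt.Println("formatter invoked", formatCalls, "time(s)") // 1
}
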
9 changes: 6 additions & 3 deletions executor/ddl.go
@@ -862,20 +862,23 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
}

func (e *DDLExec) executeLockTables(s *ast.LockTablesStmt) error {
if !config.TableLockEnabled() {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(ErrFuncNotEnabled.GenWithStackByArgs("LOCK TABLES", "enable-table-lock"))
return nil
}

for _, tb := range s.TableLocks {
if _, ok := e.getLocalTemporaryTable(tb.Table.Schema, tb.Table.Name); ok {
return ddl.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("LOCK TABLES")
}
}

if !config.TableLockEnabled() {
return nil
}
return domain.GetDomain(e.ctx).DDL().LockTables(e.ctx, s)
}

func (e *DDLExec) executeUnlockTables(_ *ast.UnlockTablesStmt) error {
if !config.TableLockEnabled() {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(ErrFuncNotEnabled.GenWithStackByArgs("UNLOCK TABLES", "enable-table-lock"))
return nil
}
lockedTables := e.ctx.GetAllTableLocks()
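
Note: in executor/ddl.go the config.TableLockEnabled() check moves to the top of executeLockTables/executeUnlockTables and now appends an ErrFuncNotEnabled warning, so LOCK TABLES / UNLOCK TABLES with the feature disabled become explicit no-ops that surface in SHOW WARNINGS (as asserted in ddl/serial_test.go above). A minimal sketch of that warn-and-no-op pattern, with simplified stand-ins for the config flag and the session's statement context:

package main

import "fmt"

type stmtCtx struct{ warnings []string }

func (s *stmtCtx) AppendWarning(w string) { s.warnings = append(s.warnings, w) }

var tableLockEnabled = false // e.g. the enable-table-lock config item

func executeLockTables(ctx *stmtCtx) error {
	if !tableLockEnabled {
		ctx.AppendWarning("LOCK TABLES is not supported. To enable this experimental feature, set 'enable-table-lock' in the configuration file.")
		return nil // no-op, but the client can see why via SHOW WARNINGS
	}
	// ...acquire the table locks...
	return nil
}

func main() {
	ctx := &stmtCtx{}
	_ = executeLockTables(ctx)
	fmt.Println(ctx.warnings) // what SHOW WARNINGS would surface
}
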
1 change: 1 addition & 0 deletions executor/errors.go
@@ -60,6 +60,7 @@ var (
ErrCTEMaxRecursionDepth = dbterror.ClassExecutor.NewStd(mysql.ErrCTEMaxRecursionDepth)
ErrDataInConsistentExtraIndex = dbterror.ClassExecutor.NewStd(mysql.ErrDataInConsistentExtraIndex)
ErrDataInConsistentMisMatchIndex = dbterror.ClassExecutor.NewStd(mysql.ErrDataInConsistentMisMatchIndex)
ErrFuncNotEnabled = dbterror.ClassExecutor.NewStdErr(mysql.ErrNotSupportedYet, parser_mysql.Message("%-.32s is not supported. To enable this experimental feature, set '%-.32s' in the configuration file.", nil))

errUnsupportedFlashbackTmpTable = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("Recover/flashback table is not supported on temporary tables", nil))
errTruncateWrongInsertValue = dbterror.ClassTable.NewStdErr(mysql.ErrTruncatedWrongValue, parser_mysql.Message("Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", nil))
21 changes: 21 additions & 0 deletions executor/explain_test.go
@@ -351,3 +351,24 @@ func (s *testSuite2) TestExplainAnalyzeCTEMemoryAndDiskInfo(c *C) {
c.Assert(rows[4][7].(string), Not(Equals), "N/A")
c.Assert(rows[4][8].(string), Not(Equals), "N/A")
}

func (s *testSuite) TestFix29401(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists tt123;")
tk.MustExec(`CREATE TABLE tt123 (
id int(11) NOT NULL,
a bigint(20) DEFAULT NULL,
b char(20) DEFAULT NULL,
c datetime DEFAULT NULL,
d double DEFAULT NULL,
e json DEFAULT NULL,
f decimal(40,6) DEFAULT NULL,
PRIMARY KEY (id) /*T![clustered_index] CLUSTERED */,
KEY a (a),
KEY b (b),
KEY c (c),
KEY d (d),
KEY f (f)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`)
tk.MustExec(" explain select /*+ inl_hash_join(t1) */ * from tt123 t1 join tt123 t2 on t1.b=t2.e;")
}
4 changes: 2 additions & 2 deletions executor/insert.go
@@ -355,7 +355,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
}
if extraLen > 0 {
evalBufferTypes = append(evalBufferTypes, e.SelectExec.base().retFieldTypes[numWritableCols:]...)
evalBufferTypes = append(evalBufferTypes, e.SelectExec.base().retFieldTypes[e.rowLen:]...)
}
for _, col := range e.Table.Cols() {
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
@@ -364,7 +364,7 @@
evalBufferTypes = append(evalBufferTypes, types.NewFieldType(mysql.TypeLonglong))
}
e.evalBuffer4Dup = chunk.MutRowFromTypes(evalBufferTypes)
e.curInsertVals = chunk.MutRowFromTypes(evalBufferTypes[numWritableCols:])
e.curInsertVals = chunk.MutRowFromTypes(evalBufferTypes[numWritableCols+extraLen:])
e.row4Update = make([]types.Datum, 0, len(evalBufferTypes))
}

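
Note: the executor/insert.go fix corrects the slice offsets used when assembling the eval buffer for INSERT ... ON DUPLICATE KEY UPDATE. As I read the change, the buffer is laid out as [ old row (writable columns) | extra columns carried by the SELECT | new row (table columns) ]; the extra section must be taken from the SELECT's output after e.rowLen (the width of the row actually being inserted), and curInsertVals must skip old+extra, not just old. A toy illustration of that offset bookkeeping, with plain strings standing in for the executor's field types:

package main

import "fmt"

func main() {
	writableCols := 4 // width of the "old row" section (the table's writable columns)
	rowLen := 1       // width of the row built for insertion, e.g. INSERT INTO a(id) SELECT id FROM b
	// The SELECT feeding the insert returns the inserted row plus extra internal columns.
	selectOutput := []string{"id", "extra_internal_col"}

	extra := selectOutput[rowLen:] // correct: everything past the inserted row
	// Buggy version: selectOutput[writableCols:] mis-slices (here it would even be out of range)
	// whenever the SELECT's width differs from the table's writable-column count.
	extraLen := len(extra)

	// Eval buffer layout: [ old row | extra | new row ].
	buffer := make([]string, 0, writableCols+extraLen+writableCols)
	for i := 0; i < writableCols; i++ {
		buffer = append(buffer, fmt.Sprintf("old%d", i))
	}
	buffer = append(buffer, extra...)
	for i := 0; i < writableCols; i++ {
		buffer = append(buffer, fmt.Sprintf("new%d", i))
	}

	newRowView := buffer[writableCols+extraLen:] // correct skip: old + extra, not just old
	fmt.Println(newRowView)                      // [new0 new1 new2 new3]
}
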
15 changes: 12 additions & 3 deletions executor/insert_common.go
@@ -785,7 +785,10 @@ func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Dat
if retryInfo.Retrying {
id, ok := retryInfo.GetCurrAutoIncrementID()
if ok {
d.SetAutoID(id, c.Flag)
err := setDatumAutoIDAndCast(e.ctx, &d, id, c)
if err != nil {
return types.Datum{}, err
}
return d, nil
}
}
@@ -858,7 +861,10 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
if retryInfo.Retrying {
autoRandomID, ok := retryInfo.GetCurrAutoRandomID()
if ok {
d.SetAutoID(autoRandomID, c.Flag)
err := setDatumAutoIDAndCast(e.ctx, &d, autoRandomID, c)
if err != nil {
return types.Datum{}, err
}
return d, nil
}
}
@@ -884,7 +890,10 @@
return types.Datum{}, err
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
d.SetAutoID(recordID, c.Flag)
err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c)
if err != nil {
return types.Datum{}, err
}
retryInfo.AddAutoRandomID(recordID)
return d, nil
}
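
Note: the changed call sites in executor/insert_common.go now go through setDatumAutoIDAndCast instead of d.SetAutoID, so an allocated or cached auto id is cast to the column's declared type before being stored — which matters when the auto_increment column is not an integer (the DOUBLE column in TestInsertIssue29892 below). A simplified sketch of the idea, using stand-in types rather than types.Datum and model.ColumnInfo:

package main

import "fmt"

type colType int

const (
	typeLonglong colType = iota
	typeDouble
)

type column struct {
	name string
	tp   colType
}

// datum is a tiny stand-in for types.Datum: it holds exactly one typed value.
type datum struct{ val interface{} }

// setAutoIDAndCast stores the allocated id and then casts it to the column type,
// mirroring what setDatumAutoIDAndCast does with the real cast machinery.
func setAutoIDAndCast(d *datum, id int64, c column) {
	switch c.tp {
	case typeDouble:
		d.val = float64(id) // without the cast, a DOUBLE column would carry an int64 datum
	default:
		d.val = id
	}
}

func main() {
	var d datum
	setAutoIDAndCast(&d, 146576795, column{name: "a", tp: typeDouble})
	fmt.Printf("%T %v\n", d.val, d.val) // float64 1.46576795e+08
}
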
60 changes: 60 additions & 0 deletions executor/insert_test.go
@@ -208,6 +208,36 @@ func (s *testSuite8) TestInsertOnDuplicateKey(c *C) {
c.Assert(tk.Se.AffectedRows(), Equals, uint64(2))
tk.MustQuery("select * from a").Check(testkit.Rows("2"))

// Test issue 28078.
// Use different types of columns so that an error is likely if the types are mismatched.
tk.MustExec("drop table if exists a, b")
tk.MustExec("create table a(id int, a1 timestamp, a2 varchar(10), a3 float, unique(id))")
tk.MustExec("create table b(id int, b1 time, b2 varchar(10), b3 int)")
tk.MustExec("insert into a values (1, '2022-01-04 07:02:04', 'a', 1.1), (2, '2022-01-04 07:02:05', 'b', 2.2)")
tk.MustExec("insert into b values (2, '12:34:56', 'c', 10), (3, '01:23:45', 'd', 20)")
tk.MustExec("insert into a (id) select id from b on duplicate key update a.a2 = b.b2, a.a3 = 3.3")
c.Assert(tk.Se.AffectedRows(), Equals, uint64(3))
tk.MustQuery("select * from a").Check(testutil.RowsWithSep("/",
"1/2022-01-04 07:02:04/a/1.1",
"2/2022-01-04 07:02:05/c/3.3",
"3/<nil>/<nil>/<nil>"))
tk.MustExec("insert into a (id) select 4 from b where b3 = 20 on duplicate key update a.a3 = b.b3")
c.Assert(tk.Se.AffectedRows(), Equals, uint64(1))
tk.MustQuery("select * from a").Check(testutil.RowsWithSep("/",
"1/2022-01-04 07:02:04/a/1.1",
"2/2022-01-04 07:02:05/c/3.3",
"3/<nil>/<nil>/<nil>",
"4/<nil>/<nil>/<nil>"))
tk.MustExec("insert into a (a2, a3) select 'x', 1.2 from b on duplicate key update a.a2 = b.b3")
c.Assert(tk.Se.AffectedRows(), Equals, uint64(2))
tk.MustQuery("select * from a").Check(testutil.RowsWithSep("/",
"1/2022-01-04 07:02:04/a/1.1",
"2/2022-01-04 07:02:05/c/3.3",
"3/<nil>/<nil>/<nil>",
"4/<nil>/<nil>/<nil>",
"<nil>/<nil>/x/1.2",
"<nil>/<nil>/x/1.2"))

// reproduce insert on duplicate key update bug under new row format.
tk.MustExec(`drop table if exists t1`)
tk.MustExec(`create table t1(c1 decimal(6,4), primary key(c1))`)
@@ -1756,6 +1786,36 @@ func (s *testSuite13) TestIssue26762(c *C) {
c.Assert(err.Error(), Equals, `[table:1292]Incorrect date value: '2020-02-31' for column 'c1' at row 1`)
}

// TestInsertIssue29892 tests the auto_increment problem with the double type; it just leverages the serial test suite.
func (s *testAutoRandomSuite) TestInsertIssue29892(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)

tk.MustExec("set global tidb_txn_mode='optimistic';")
tk.MustExec("set global tidb_disable_txn_auto_retry=false;")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a double auto_increment key, b int)")
tk.MustExec("insert into t values (146576794, 1)")

tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec(`use test`)
tk1.MustExec("begin")
tk1.MustExec("insert into t(b) select 1")

tk2 := testkit.NewTestKit(c, s.store)
tk2.MustExec(`use test`)
tk2.MustExec("begin")
tk2.MustExec("insert into t values (146576795, 1)")
tk2.MustExec("insert into t values (146576796, 1)")
tk2.MustExec("commit")

// Since the original auto-id (146576795) is cached in retryInfo, it is fetched again during the
// retry, which duplicates a row that has already been committed.
_, err := tk1.Exec("commit")
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "Duplicate entry"), Equals, true)
}

// https://github.com/pingcap/tidb/issues/29483.
func (s *testSuite13) TestReplaceAllocatingAutoID(c *C) {
tk := testkit.NewTestKit(c, s.store)
(The remaining changed files are not shown.)
