Skip to content

Commit

Permalink
Merge branch 'release-6.5' into cherry-pick-41003-to-release-6.5
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Feb 7, 2023
2 parents 9454c51 + 45f2d06 commit 1472b02
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 52 deletions.
39 changes: 35 additions & 4 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -96,6 +95,16 @@ func recoverPDSchedule(pdScheduleParam map[string]interface{}) error {
return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam)
}

func getStoreGlobalMinSafeTS(s kv.Storage) time.Time {
minSafeTS := s.GetMinSafeTS(kv.GlobalTxnScope)
// Inject mocked SafeTS for test.
failpoint.Inject("injectSafeTS", func(val failpoint.Value) {
injectTS := val.(int)
minSafeTS = uint64(injectTS)
})
return oracle.GetTimeFromTS(minSafeTS)
}

// ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS).
func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
Expand All @@ -108,12 +117,34 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) {
oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS)
if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) {
return errors.Errorf("cannot set flashback timestamp to future time")
}
if oracle.GetTimeFromTS(flashBackTS).After(expression.GetMinSafeTime(sctx)) {
return errors.Errorf("cannot set flashback timestamp to too close to present time")

flashbackGetMinSafeTimeTimeout := time.Minute
failpoint.Inject("changeFlashbackGetMinSafeTimeTimeout", func(val failpoint.Value) {
t := val.(int)
flashbackGetMinSafeTimeTimeout = time.Duration(t)
})

start := time.Now()
minSafeTime := getStoreGlobalMinSafeTS(sctx.GetStore())
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for oracleFlashbackTS.After(minSafeTime) {
if time.Since(start) >= flashbackGetMinSafeTimeTimeout {
return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime)
}
select {
case <-ticker.C:
minSafeTime = getStoreGlobalMinSafeTS(sctx.GetStore())
break
case <-ctx.Done():
return ctx.Err()
}
}

gcSafePoint, err := gcutil.GetGCSafePoint(sctx)
if err != nil {
return err
Expand Down
28 changes: 8 additions & 20 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(time.Now().Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

oldValue := map[string]interface{}{
Expand Down Expand Up @@ -131,8 +129,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
require.EqualValues(t, finishValue["merge-schedule-limit"], 1)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestAddDDLDuringFlashback(t *testing.T) {
Expand All @@ -147,9 +144,7 @@ func TestAddDDLDuringFlashback(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand All @@ -169,8 +164,7 @@ func TestAddDDLDuringFlashback(t *testing.T) {

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestGlobalVariablesOnFlashback(t *testing.T) {
Expand All @@ -185,9 +179,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand Down Expand Up @@ -239,8 +231,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestCancelFlashbackCluster(t *testing.T) {
Expand All @@ -252,9 +243,7 @@ func TestCancelFlashbackCluster(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand All @@ -280,6 +269,5 @@ func TestCancelFlashbackCluster(t *testing.T) {
dom.DDL().SetHook(originHook)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
24 changes: 16 additions & 8 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,13 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co
defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End()
var task *indexMergeTableTask
util.WithRecovery(
func() { task = worker.pickAndExecTask(ctx1) },
worker.handlePickAndExecTaskPanic(ctx1, task),
// Note we use the address of `task` as the argument of both `pickAndExecTask` and `handlePickAndExecTaskPanic`
// because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible
// in `handlePickAndExecTaskPanic` since it will get `doneCh` from `task`. Golang always pass argument by value,
// so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is
// not visible in `handlePickAndExecTaskPanic`
func() { worker.pickAndExecTask(ctx1, &task) },
worker.handlePickAndExecTaskPanic(ctx1, &task),
)
cancel()
e.tblWorkerWg.Done()
Expand Down Expand Up @@ -1107,38 +1112,41 @@ type indexMergeTableScanWorker struct {
memTracker *memory.Tracker
}

func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task *indexMergeTableTask) {
func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **indexMergeTableTask) {
var ok bool
for {
waitStart := time.Now()
select {
case task, ok = <-w.workCh:
case *task, ok = <-w.workCh:
if !ok {
return
}
case <-w.finished:
return
}
execStart := time.Now()
err := w.executeTask(ctx, task)
err := w.executeTask(ctx, *task)
if w.stats != nil {
atomic.AddInt64(&w.stats.WaitTime, int64(execStart.Sub(waitStart)))
atomic.AddInt64(&w.stats.FetchRow, int64(time.Since(execStart)))
atomic.AddInt64(&w.stats.TableTaskNum, 1)
}
task.doneCh <- err
failpoint.Inject("testIndexMergePickAndExecTaskPanic", nil)
(*task).doneCh <- err
}
}

func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task *indexMergeTableTask) func(r interface{}) {
func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task **indexMergeTableTask) func(r interface{}) {
return func(r interface{}) {
if r == nil {
return
}

err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r)
logutil.Logger(ctx).Error(err4Panic.Error())
task.doneCh <- err4Panic
if *task != nil {
(*task).doneCh <- err4Panic
}
}
}

Expand Down
19 changes: 19 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,25 @@ func TestSingleTableRead(t *testing.T) {
tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(a) from t1 where a < 2 or b > 4").Check(testkit.Rows("6"))
}

func TestIndexMergePickAndExecTaskPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(id int primary key, a int, b int, c int, d int)")
tk.MustExec("create index t1a on t1(a)")
tk.MustExec("create index t1b on t1(b)")
tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)")
tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1",
"5 5 5 5 5"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic", "panic(\"pickAndExecTaskPanic\")"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic"))
}()
err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id")
require.Contains(t, err.Error(), "pickAndExecTaskPanic")
}

func TestJoin(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
61 changes: 47 additions & 14 deletions executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,7 @@ func TestRecoverClusterMeetError(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

// Get GC safe point error.
Expand Down Expand Up @@ -337,8 +335,7 @@ func TestRecoverClusterMeetError(t *testing.T) {
tk.MustExec("create table t(a int);")
tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Detected schema change due to another DDL job during \\[.*, now\\), can't do flashback")

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
}

Expand All @@ -347,6 +344,7 @@ func TestFlashbackWithSafeTs(t *testing.T) {
tk := testkit.NewTestKit(t, store)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout", `return(0)`))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()
Expand All @@ -371,9 +369,8 @@ func TestFlashbackWithSafeTs(t *testing.T) {
compareWithSafeTS: 0,
},
{
name: "10 seconds ago to now, safeTS 5 secs ago",
// Add flashbackTs.Add(-500*time.Millisecond) to avoid flashback time range overlapped.
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)),
name: "10 seconds ago to now, safeTS 5 secs ago",
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)),
compareWithSafeTS: -1,
},
Expand All @@ -386,19 +383,55 @@ func TestFlashbackWithSafeTs(t *testing.T) {
}
for _, testcase := range testcases {
t.Log(testcase.name)
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)))
if testcase.compareWithSafeTS == 1 {
start := time.Now()
tk.MustContainErrMsg(testcase.sql,
"cannot set flashback timestamp to too close to present time")
"cannot set flashback timestamp after min-resolved-ts")
// When set `flashbackGetMinSafeTimeTimeout` = 0, no retry for `getStoreGlobalMinSafeTS`.
require.Less(t, time.Since(start), time.Second)
} else {
tk.MustExec(testcase.sql)
}
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout"))
}

func TestFlashbackRetryGetMinSafeTime(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()

// Set GC safe point.
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

time.Sleep(time.Second)
ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
flashbackTs := oracle.GetTimeFromTS(ts)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", oracle.GoTimeToTS(flashbackTs.Add(-10*time.Minute)))))

go func() {
time.Sleep(2 * time.Second)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", oracle.GoTimeToTS(flashbackTs.Add(10*time.Minute)))))
}()

start := time.Now()
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs))
duration := time.Since(start)
require.Greater(t, duration, 2*time.Second)
require.Less(t, duration, 5*time.Second)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
}

Expand Down
7 changes: 2 additions & 5 deletions tests/realtikvtest/brietest/flashback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ func TestFlashback(t *testing.T) {
require.NoError(t, err)

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

tk.MustExec("insert t values (4), (5), (6)")
Expand All @@ -86,7 +84,6 @@ func TestFlashback(t *testing.T) {
require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3")
require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3")

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
}
4 changes: 3 additions & 1 deletion util/chunk/alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ func (a *allocator) Reset() {

//column objects and put them to the column allocator for reuse.
for id, pool := range a.columnAlloc.pool {
for _, col := range pool.allocColumns {
for i, col := range pool.allocColumns {
if (len(pool.freeColumns) < a.columnAlloc.freeColumnsPerType) && checkColumnType(id, col) {
col.reset()
pool.freeColumns = append(pool.freeColumns, col)
}
pool.allocColumns[i] = nil
}
pool.allocColumns = pool.allocColumns[:0]
}
Expand Down Expand Up @@ -196,6 +197,7 @@ func (cList *columnList) pop() *Column {
return nil
}
col := cList.freeColumns[len(cList.freeColumns)-1]
cList.freeColumns[len(cList.freeColumns)-1] = nil
cList.freeColumns = cList.freeColumns[:len(cList.freeColumns)-1]
return col
}
Expand Down

0 comments on commit 1472b02

Please sign in to comment.