From 8c118cecc2a43130ea214920a87b48bd5160becb Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 8 Apr 2019 15:21:37 +0800 Subject: [PATCH 01/10] *: clean code for restore table (#9090) --- ddl/ddl.go | 2 +- ddl/ddl_api.go | 6 +-- ddl/ddl_worker.go | 16 +++--- ddl/serial_test.go | 94 ++++++++++++++++++------------------ ddl/table.go | 48 +++++++++--------- executor/builder.go | 12 ----- executor/ddl.go | 68 +++++++------------------- go.mod | 2 +- go.sum | 4 +- infoschema/builder.go | 2 +- planner/core/common_plans.go | 8 --- planner/core/planbuilder.go | 11 ++--- planner/core/preprocess.go | 10 ++-- 13 files changed, 112 insertions(+), 171 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index c21ac119c4def..e2ca983d82902 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -242,7 +242,7 @@ type DDL interface { CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error) - RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) + RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error) CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, unique bool, indexName model.CIStr, columnNames []*ast.IndexColName, indexOption *ast.IndexOption) error diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index faf2953f56ed2..8a83294f627a9 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1207,7 +1207,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e return errors.Trace(err) } -func (d *ddl) RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) { +func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) { is := d.GetInfoSchemaWithInterceptor(ctx) // Check schema exist. schema, ok := is.SchemaByID(schemaID) @@ -1225,9 +1225,9 @@ func (d *ddl) RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, sche job := &model.Job{ SchemaID: schemaID, TableID: tbInfo.ID, - Type: model.ActionRestoreTable, + Type: model.ActionRecoverTable, BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, restoreTableCheckFlagNone}, + Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, recoverTableCheckFlagNone}, } err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 7292c42a0da28..3a0e9c8f74459 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -285,8 +285,8 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { } } switch job.Type { - case model.ActionRestoreTable: - err = finishRestoreTable(w, t, job) + case model.ActionRecoverTable: + err = finishRecoverTable(w, t, job) } if err != nil { return errors.Trace(err) @@ -303,15 +303,15 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { return errors.Trace(err) } -func finishRestoreTable(w *worker, t *meta.Meta, job *model.Job) error { +func finishRecoverTable(w *worker, t *meta.Meta, job *model.Job) error { tbInfo := &model.TableInfo{} - var autoID, dropJobID, restoreTableCheckFlag int64 + var autoID, dropJobID, recoverTableCheckFlag int64 var snapshotTS uint64 - err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &restoreTableCheckFlag) + err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag) if err != nil { return errors.Trace(err) } - if restoreTableCheckFlag == restoreTableCheckFlagEnableGC { + if recoverTableCheckFlag == recoverTableCheckFlagEnableGC { err = enableGC(w) if err != nil { return errors.Trace(err) @@ -532,8 +532,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onAddTablePartition(t, job) case model.ActionModifyTableCharsetAndCollate: ver, err = onModifyTableCharsetAndCollate(t, job) - case model.ActionRestoreTable: - ver, err = w.onRestoreTable(d, t, job) + case model.ActionRecoverTable: + ver, err = w.onRecoverTable(d, t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/serial_test.go b/ddl/serial_test.go index 0dbc9c2b20a00..1b1a50313b357 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -128,10 +128,10 @@ func (s *testSerialSuite) TestCancelAddIndexPanic(c *C) { c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job") } -func (s *testSerialSuite) TestRestoreTableByJobID(c *C) { +func (s *testSerialSuite) TestRecoverTableByJobID(c *C) { tk := testkit.NewTestKit(c, s.store) - tk.MustExec("create database if not exists test_restore") - tk.MustExec("use test_restore") + tk.MustExec("create database if not exists test_recover") + tk.MustExec("use test_recover") tk.MustExec("drop table if exists t_recover") tk.MustExec("create table t_recover (a int);") defer func(originGC bool) { @@ -162,19 +162,19 @@ func (s *testSerialSuite) TestRestoreTableByJobID(c *C) { rows, err := session.GetRows4Test(context.Background(), tk.Se, rs) c.Assert(err, IsNil) row := rows[0] - c.Assert(row.GetString(1), Equals, "test_restore") + c.Assert(row.GetString(1), Equals, "test_recover") c.Assert(row.GetString(3), Equals, "drop table") jobID := row.GetInt64(0) // if GC safe point is not exists in mysql.tidb - _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID)) + _, err = tk.Exec(fmt.Sprintf("recover table by job %d", jobID)) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "can not get 'tikv_gc_safe_point'") // set GC safe point tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) // if GC enable is not exists in mysql.tidb - _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID)) + _, err = tk.Exec(fmt.Sprintf("recover table by job %d", jobID)) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[ddl:-1]can not get 'tikv_gc_enable'") @@ -183,7 +183,7 @@ func (s *testSerialSuite) TestRestoreTableByJobID(c *C) { // recover job is before GC safe point tk.MustExec(fmt.Sprintf(safePointSQL, timeAfterDrop)) - _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID)) + _, err = tk.Exec(fmt.Sprintf("recover table by job %d", jobID)) c.Assert(err, NotNil) c.Assert(strings.Contains(err.Error(), "snapshot is older than GC safe point"), Equals, true) @@ -191,14 +191,14 @@ func (s *testSerialSuite) TestRestoreTableByJobID(c *C) { tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) // if there is a new table with the same name, should return failed. tk.MustExec("create table t_recover (a int);") - _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID)) + _, err = tk.Exec(fmt.Sprintf("recover table by job %d", jobID)) c.Assert(err.Error(), Equals, infoschema.ErrTableExists.GenWithStackByArgs("t_recover").Error()) - // drop the new table with the same name, then restore table. + // drop the new table with the same name, then recover table. tk.MustExec("drop table t_recover") - // do restore table. - tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID)) + // do recover table. + tk.MustExec(fmt.Sprintf("recover table by job %d", jobID)) // check recover table meta and data record. tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3")) @@ -206,8 +206,8 @@ func (s *testSerialSuite) TestRestoreTableByJobID(c *C) { tk.MustExec("insert into t_recover values (4),(5),(6)") tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6")) - // restore table by none exits job. - _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", 10000000)) + // recover table by none exits job. + _, err = tk.Exec(fmt.Sprintf("recover table by job %d", 10000000)) c.Assert(err, NotNil) // Disable GC by manual first, then after recover table, the GC enable status should also be disabled. @@ -221,11 +221,11 @@ func (s *testSerialSuite) TestRestoreTableByJobID(c *C) { rows, err = session.GetRows4Test(context.Background(), tk.Se, rs) c.Assert(err, IsNil) row = rows[0] - c.Assert(row.GetString(1), Equals, "test_restore") + c.Assert(row.GetString(1), Equals, "test_recover") c.Assert(row.GetString(3), Equals, "drop table") jobID = row.GetInt64(0) - tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID)) + tk.MustExec(fmt.Sprintf("recover table by job %d", jobID)) // check recover table meta and data record. tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1")) @@ -238,10 +238,10 @@ func (s *testSerialSuite) TestRestoreTableByJobID(c *C) { c.Assert(gcEnable, Equals, false) } -func (s *testSerialSuite) TestRestoreTableByTableName(c *C) { +func (s *testSerialSuite) TestRecoverTableByTableName(c *C) { tk := testkit.NewTestKit(c, s.store) - tk.MustExec("create database if not exists test_restore") - tk.MustExec("use test_restore") + tk.MustExec("create database if not exists test_recover") + tk.MustExec("use test_recover") tk.MustExec("drop table if exists t_recover, t_recover2") tk.MustExec("create table t_recover (a int);") defer func(originGC bool) { @@ -268,14 +268,14 @@ func (s *testSerialSuite) TestRestoreTableByTableName(c *C) { tk.MustExec("drop table t_recover") // if GC safe point is not exists in mysql.tidb - _, err := tk.Exec("admin restore table t_recover") + _, err := tk.Exec("recover table t_recover") c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "can not get 'tikv_gc_safe_point'") // set GC safe point tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) // if GC enable is not exists in mysql.tidb - _, err = tk.Exec("admin restore table t_recover") + _, err = tk.Exec("recover table t_recover") c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[ddl:-1]can not get 'tikv_gc_enable'") @@ -284,7 +284,7 @@ func (s *testSerialSuite) TestRestoreTableByTableName(c *C) { // recover job is before GC safe point tk.MustExec(fmt.Sprintf(safePointSQL, timeAfterDrop)) - _, err = tk.Exec("admin restore table t_recover") + _, err = tk.Exec("recover table t_recover") c.Assert(err, NotNil) c.Assert(strings.Contains(err.Error(), "snapshot is older than GC safe point"), Equals, true) @@ -292,14 +292,14 @@ func (s *testSerialSuite) TestRestoreTableByTableName(c *C) { tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) // if there is a new table with the same name, should return failed. tk.MustExec("create table t_recover (a int);") - _, err = tk.Exec("admin restore table t_recover") + _, err = tk.Exec("recover table t_recover") c.Assert(err.Error(), Equals, infoschema.ErrTableExists.GenWithStackByArgs("t_recover").Error()) - // drop the new table with the same name, then restore table. + // drop the new table with the same name, then recover table. tk.MustExec("rename table t_recover to t_recover2") - // do restore table. - tk.MustExec("admin restore table t_recover") + // do recover table. + tk.MustExec("recover table t_recover") // check recover table meta and data record. tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3")) @@ -309,8 +309,8 @@ func (s *testSerialSuite) TestRestoreTableByTableName(c *C) { // check rebase auto id. tk.MustQuery("select a,_tidb_rowid from t_recover;").Check(testkit.Rows("1 1", "2 2", "3 3", "4 5001", "5 5002", "6 5003")) - // restore table by none exits job. - _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", 10000000)) + // recover table by none exits job. + _, err = tk.Exec(fmt.Sprintf("recover table by job %d", 10000000)) c.Assert(err, NotNil) // Disable GC by manual first, then after recover table, the GC enable status should also be disabled. @@ -320,7 +320,7 @@ func (s *testSerialSuite) TestRestoreTableByTableName(c *C) { tk.MustExec("delete from t_recover where a > 1") tk.MustExec("drop table t_recover") - tk.MustExec("admin restore table t_recover") + tk.MustExec("recover table t_recover") // check recover table meta and data record. tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1")) @@ -333,10 +333,10 @@ func (s *testSerialSuite) TestRestoreTableByTableName(c *C) { c.Assert(gcEnable, Equals, false) } -func (s *testSerialSuite) TestRestoreTableByJobIDFail(c *C) { +func (s *testSerialSuite) TestRecoverTableByJobIDFail(c *C) { tk := testkit.NewTestKit(c, s.store) - tk.MustExec("create database if not exists test_restore") - tk.MustExec("use test_restore") + tk.MustExec("create database if not exists test_recover") + tk.MustExec("use test_recover") tk.MustExec("drop table if exists t_recover") tk.MustExec("create table t_recover (a int);") defer func(originGC bool) { @@ -364,7 +364,7 @@ func (s *testSerialSuite) TestRestoreTableByJobIDFail(c *C) { rows, err := session.GetRows4Test(context.Background(), tk.Se, rs) c.Assert(err, IsNil) row := rows[0] - c.Assert(row.GetString(1), Equals, "test_restore") + c.Assert(row.GetString(1), Equals, "test_recover") c.Assert(row.GetString(3), Equals, "drop table") jobID := row.GetInt64(0) @@ -376,21 +376,21 @@ func (s *testSerialSuite) TestRestoreTableByJobIDFail(c *C) { // set hook hook := &ddl.TestDDLCallback{} hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.Type == model.ActionRestoreTable { + if job.Type == model.ActionRecoverTable { gofail.Enable("github.com/pingcap/tidb/store/tikv/mockCommitError", `return(true)`) - gofail.Enable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr", `return(true)`) + gofail.Enable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr", `return(true)`) } } origHook := s.dom.DDL().GetHook() defer s.dom.DDL().(ddl.DDLForTest).SetHook(origHook) s.dom.DDL().(ddl.DDLForTest).SetHook(hook) - // do restore table. - tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID)) + // do recover table. + tk.MustExec(fmt.Sprintf("recover table by job %d", jobID)) gofail.Disable("github.com/pingcap/tidb/store/tikv/mockCommitError") - gofail.Disable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr") + gofail.Disable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr") - // make sure enable GC after restore table. + // make sure enable GC after recover table. enable, err := gcutil.CheckGCEnable(tk.Se) c.Assert(err, IsNil) c.Assert(enable, Equals, true) @@ -402,10 +402,10 @@ func (s *testSerialSuite) TestRestoreTableByJobIDFail(c *C) { tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6")) } -func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) { +func (s *testSerialSuite) TestRecoverTableByTableNameFail(c *C) { tk := testkit.NewTestKit(c, s.store) - tk.MustExec("create database if not exists test_restore") - tk.MustExec("use test_restore") + tk.MustExec("create database if not exists test_recover") + tk.MustExec("use test_recover") tk.MustExec("drop table if exists t_recover") tk.MustExec("create table t_recover (a int);") defer func(originGC bool) { @@ -436,21 +436,21 @@ func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) { // set hook hook := &ddl.TestDDLCallback{} hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.Type == model.ActionRestoreTable { + if job.Type == model.ActionRecoverTable { gofail.Enable("github.com/pingcap/tidb/store/tikv/mockCommitError", `return(true)`) - gofail.Enable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr", `return(true)`) + gofail.Enable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr", `return(true)`) } } origHook := s.dom.DDL().GetHook() defer s.dom.DDL().(ddl.DDLForTest).SetHook(origHook) s.dom.DDL().(ddl.DDLForTest).SetHook(hook) - // do restore table. - tk.MustExec("admin restore table t_recover") + // do recover table. + tk.MustExec("recover table t_recover") gofail.Disable("github.com/pingcap/tidb/store/tikv/mockCommitError") - gofail.Disable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr") + gofail.Disable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr") - // make sure enable GC after restore table. + // make sure enable GC after recover table. enable, err := gcutil.CheckGCEnable(tk.Se) c.Assert(err, IsNil) c.Assert(enable, Equals, true) diff --git a/ddl/table.go b/ddl/table.go index 253d2482af2ae..d0dd67816d7db 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -167,17 +167,17 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) { } const ( - restoreTableCheckFlagNone int64 = iota - restoreTableCheckFlagEnableGC - restoreTableCheckFlagDisableGC + recoverTableCheckFlagNone int64 = iota + recoverTableCheckFlagEnableGC + recoverTableCheckFlagDisableGC ) -func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { +func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { schemaID := job.SchemaID tblInfo := &model.TableInfo{} - var autoID, dropJobID, restoreTableCheckFlag int64 + var autoID, dropJobID, recoverTableCheckFlag int64 var snapshotTS uint64 - if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &restoreTableCheckFlag); err != nil { + if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag); err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -195,19 +195,19 @@ func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in return ver, errors.Trace(err) } - // Restore table divide into 2 steps: - // 1. Check GC enable status, to decided whether enable GC after restore table. + // Recover table divide into 2 steps: + // 1. Check GC enable status, to decided whether enable GC after recover table. // a. Why not disable GC before put the job to DDL job queue? - // Think about concurrency problem. If a restore job-1 is doing and already disabled GC, - // then, another restore table job-2 check GC enable will get disable before into the job queue. - // then, after restore table job-2 finished, the GC will be disabled. - // b. Why split into 2 steps? 1 step also can finish this job: check GC -> disable GC -> restore table -> finish job. + // Think about concurrency problem. If a recover job-1 is doing and already disabled GC, + // then, another recover table job-2 check GC enable will get disable before into the job queue. + // then, after recover table job-2 finished, the GC will be disabled. + // b. Why split into 2 steps? 1 step also can finish this job: check GC -> disable GC -> recover table -> finish job. // What if the transaction commit failed? then, the job will retry, but the GC already disabled when first running. // So, after this job retry succeed, the GC will be disabled. - // 2. Do restore table job. + // 2. Do recover table job. // a. Check whether GC enabled, if enabled, disable GC first. - // b. Check GC safe point. If drop table time if after safe point time, then can do restore. - // otherwise, can't restore table, because the records of the table may already delete by gc. + // b. Check GC safe point. If drop table time if after safe point time, then can do recover. + // otherwise, can't recover table, because the records of the table may already delete by gc. // c. Remove GC task of the table from gc_delete_range table. // d. Create table and rebase table auto ID. // e. Finish. @@ -216,9 +216,9 @@ func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in // none -> write only // check GC enable and update flag. if gcEnable { - job.Args[len(job.Args)-1] = restoreTableCheckFlagEnableGC + job.Args[len(job.Args)-1] = recoverTableCheckFlagEnableGC } else { - job.Args[len(job.Args)-1] = restoreTableCheckFlagDisableGC + job.Args[len(job.Args)-1] = recoverTableCheckFlagDisableGC } job.SchemaState = model.StateWriteOnly @@ -229,7 +229,7 @@ func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in } case model.StateWriteOnly: // write only -> public - // do restore table. + // do recover table. if gcEnable { err = disableGC(w) if err != nil { @@ -256,9 +256,9 @@ func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in return ver, errors.Trace(err) } - // gofail: var mockRestoreTableCommitErr bool - // if mockRestoreTableCommitErr && mockRestoreTableCommitErrOnce { - // mockRestoreTableCommitErrOnce = false + // gofail: var mockRecoverTableCommitErr bool + // if mockRecoverTableCommitErr && mockRecoverTableCommitErrOnce { + // mockRecoverTableCommitErrOnce = false // kv.MockCommitErrorEnable() // } @@ -270,13 +270,13 @@ func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) default: - return ver, ErrInvalidTableState.GenWithStack("invalid restore table state %v", tblInfo.State) + return ver, ErrInvalidTableState.GenWithStack("invalid recover table state %v", tblInfo.State) } return ver, nil } -// mockRestoreTableCommitErrOnce uses to make sure `mockRestoreTableCommitErr` only mock error once. -var mockRestoreTableCommitErrOnce = true +// mockRecoverTableCommitErrOnce uses to make sure `mockRecoverTableCommitErr` only mock error once. +var mockRecoverTableCommitErrOnce = true func enableGC(w *worker) error { ctx, err := w.sessPool.get() diff --git a/executor/builder.go b/executor/builder.go index 31ac09a75c354..1960fac2096bb 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -92,8 +92,6 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { return b.buildCheckIndexRange(v) case *plannercore.ChecksumTable: return b.buildChecksumTable(v) - case *plannercore.RestoreTable: - return b.buildRestoreTable(v) case *plannercore.DDL: return b.buildDDL(v) case *plannercore.Deallocate: @@ -357,16 +355,6 @@ func (b *executorBuilder) buildRecoverIndex(v *plannercore.RecoverIndex) Executo return e } -func (b *executorBuilder) buildRestoreTable(v *plannercore.RestoreTable) Executor { - e := &RestoreTableExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), - jobID: v.JobID, - Table: v.Table, - JobNum: v.JobNum, - } - return e -} - func buildCleanupIndexCols(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) []*model.ColumnInfo { columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+1) for _, idxCol := range indexInfo.Columns { diff --git a/executor/ddl.go b/executor/ddl.go index d1e5f582795d2..e157bce138775 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -105,6 +105,8 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error) err = e.executeAlterTable(x) case *ast.RenameTableStmt: err = e.executeRenameTable(x) + case *ast.RecoverTableStmt: + err = e.executeRecoverTable(x) } if err != nil { return e.toErr(err) @@ -296,46 +298,10 @@ func (e *DDLExec) executeAlterTable(s *ast.AlterTableStmt) error { return err } -// RestoreTableExec represents a recover table executor. -// It is built from "admin restore table by job" statement, +// executeRecoverTable represents a recover table executor. +// It is built from "recover table" statement, // is used to recover the table that deleted by mistake. -type RestoreTableExec struct { - baseExecutor - jobID int64 - Table *ast.TableName - JobNum int64 -} - -// Open implements the Executor Open interface. -func (e *RestoreTableExec) Open(ctx context.Context) error { - return e.baseExecutor.Open(ctx) -} - -// Next implements the Executor Open interface. -func (e *RestoreTableExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error) { - // Should commit the previous transaction and create a new transaction. - if err = e.ctx.NewTxn(ctx); err != nil { - return err - } - defer func() { e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false }() - - err = e.executeRestoreTable() - if err != nil { - return err - } - - dom := domain.GetDomain(e.ctx) - // Update InfoSchema in TxnCtx, so it will pass schema check. - is := dom.InfoSchema() - txnCtx := e.ctx.GetSessionVars().TxnCtx - txnCtx.InfoSchema = is - txnCtx.SchemaVersion = is.SchemaMetaVersion() - // DDL will force commit old transaction, after DDL, in transaction status should be false. - e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false) - return nil -} - -func (e *RestoreTableExec) executeRestoreTable() error { +func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error { txn, err := e.ctx.Txn(true) if err != nil { return err @@ -344,10 +310,10 @@ func (e *RestoreTableExec) executeRestoreTable() error { dom := domain.GetDomain(e.ctx) var job *model.Job var tblInfo *model.TableInfo - if e.jobID != 0 { - job, tblInfo, err = getRestoreTableByJobID(e, t, dom) + if s.JobID != 0 { + job, tblInfo, err = e.getRecoverTableByJobID(s, t, dom) } else { - job, tblInfo, err = getRestoreTableByTableName(e, t, dom) + job, tblInfo, err = e.getRecoverTableByTableName(s, t, dom) } if err != nil { return err @@ -361,18 +327,18 @@ func (e *RestoreTableExec) executeRestoreTable() error { if err != nil { return errors.Errorf("recover table_id: %d, get original autoID from snapshot meta err: %s", job.TableID, err.Error()) } - // Call DDL RestoreTable - err = domain.GetDomain(e.ctx).DDL().RestoreTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS) + // Call DDL RecoverTable + err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS) return err } -func getRestoreTableByJobID(e *RestoreTableExec, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) { - job, err := t.GetHistoryDDLJob(e.jobID) +func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) { + job, err := t.GetHistoryDDLJob(s.JobID) if err != nil { return nil, nil, err } if job == nil { - return nil, nil, admin.ErrDDLJobNotFound.GenWithStackByArgs(e.jobID) + return nil, nil, admin.ErrDDLJobNotFound.GenWithStackByArgs(s.JobID) } if job.Type != model.ActionDropTable { return nil, nil, errors.Errorf("Job %v type is %v, not drop table", job.ID, job.Type) @@ -400,7 +366,7 @@ func getRestoreTableByJobID(e *RestoreTableExec, t *meta.Meta, dom *domain.Domai return job, table.Meta(), nil } -func getRestoreTableByTableName(e *RestoreTableExec, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) { +func (e *DDLExec) getRecoverTableByTableName(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) { jobs, err := t.GetAllHistoryDDLJobs() if err != nil { return nil, nil, err @@ -411,7 +377,7 @@ func getRestoreTableByTableName(e *RestoreTableExec, t *meta.Meta, dom *domain.D if err != nil { return nil, nil, err } - schemaName := e.Table.Schema.L + schemaName := s.Table.Schema.L if schemaName == "" { schemaName = e.ctx.GetSessionVars().CurrentDB } @@ -442,7 +408,7 @@ func getRestoreTableByTableName(e *RestoreTableExec, t *meta.Meta, dom *domain.D fmt.Sprintf("(Table ID %d)", job.TableID), ) } - if table.Meta().Name.L == e.Table.Name.L { + if table.Meta().Name.L == s.Table.Name.L { schema, ok := dom.InfoSchema().SchemaByID(job.SchemaID) if !ok { return nil, nil, infoschema.ErrDatabaseNotExists.GenWithStackByArgs( @@ -456,7 +422,7 @@ func getRestoreTableByTableName(e *RestoreTableExec, t *meta.Meta, dom *domain.D } } if tblInfo == nil { - return nil, nil, errors.Errorf("Can't found drop table: %v in ddl history jobs", e.Table.Name) + return nil, nil, errors.Errorf("Can't found drop table: %v in ddl history jobs", s.Table.Name) } return job, tblInfo, nil } diff --git a/go.mod b/go.mod index 0e842c9ea0399..6fa5728227e4d 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 - github.com/pingcap/parser v0.0.0-20190404052804-c29af83ede25 + github.com/pingcap/parser v0.0.0-20190408064140-cdceeb2c5476 github.com/pingcap/pd v2.1.0-rc.4+incompatible github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible github.com/pingcap/tipb v0.0.0-20190107072121-abbec73437b7 diff --git a/go.sum b/go.sum index b5cf095c86cf2..d482caa82c799 100644 --- a/go.sum +++ b/go.sum @@ -119,8 +119,8 @@ github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562 h1:32oF1/8lVnBR2JV github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= -github.com/pingcap/parser v0.0.0-20190404052804-c29af83ede25 h1:K7DB0kOkSETe3/4rpbzF/Iv4IgfkGBNu5EfaXxaiBuc= -github.com/pingcap/parser v0.0.0-20190404052804-c29af83ede25/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= +github.com/pingcap/parser v0.0.0-20190408064140-cdceeb2c5476 h1:qKFG6B26Zfgpb7rUYB8PCGQzWB+USDCTmH+rR7rV+ow= +github.com/pingcap/parser v0.0.0-20190408064140-cdceeb2c5476/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= github.com/pingcap/pd v2.1.0-rc.4+incompatible h1:/buwGk04aHO5odk/+O8ZOXGs4qkUjYTJ2UpCJXna8NE= github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E= github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU= diff --git a/infoschema/builder.go b/infoschema/builder.go index 42046820fa321..65eba8279c1b0 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -53,7 +53,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro var oldTableID, newTableID int64 tblIDs := make([]int64, 0, 2) switch diff.Type { - case model.ActionCreateTable, model.ActionRestoreTable: + case model.ActionCreateTable, model.ActionRecoverTable: newTableID = diff.TableID tblIDs = append(tblIDs, newTableID) case model.ActionDropTable, model.ActionDropView: diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index b761b7ca6cada..a5b2312a57d87 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -84,14 +84,6 @@ type RecoverIndex struct { IndexName string } -// RestoreTable is used for recover deleted files by mistake. -type RestoreTable struct { - baseSchemaProducer - JobID int64 - Table *ast.TableName - JobNum int64 -} - // CleanupIndex is used to delete dangling index data. type CleanupIndex struct { baseSchemaProducer diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 737a2dc67aacb..85ffed504a5db 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -625,14 +625,6 @@ func (b *PlanBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) { p := &ShowSlow{ShowSlow: as.ShowSlow} p.SetSchema(buildShowSlowSchema()) ret = p - case ast.AdminRestoreTable: - if len(as.JobIDs) > 0 { - ret = &RestoreTable{JobID: as.JobIDs[0]} - } else if len(as.Tables) > 0 { - ret = &RestoreTable{Table: as.Tables[0], JobNum: as.JobNumber} - } else { - return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as) - } default: return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as) } @@ -1701,6 +1693,9 @@ func (b *PlanBuilder) buildDDL(node ast.DDLNode) (Plan, error) { } b.visitInfo = appendVisitInfo(b.visitInfo, mysql.InsertPriv, v.NewTable.Schema.L, v.NewTable.Name.L, "", authErr) + case *ast.RecoverTableStmt: + // Recover table command can only be executed by administrator. + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil) } p := &DDL{Statement: node} return p, nil diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index f69d94b3111e7..3ebd9f3eb9de2 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -111,11 +111,11 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) { return in, true case *ast.Join: p.checkNonUniqTableAlias(node) - case *ast.AdminStmt: - // The specified table in admin restore syntax maybe already been dropped. - // So skip check table name here, otherwise, admin restore table [table_name] syntax will return - // table not exists error. But admin restore is use to restore the dropped table. So skip children here. - return in, node.Tp == ast.AdminRestoreTable + case *ast.RecoverTableStmt: + // The specified table in recover table statement maybe already been dropped. + // So skip check table name here, otherwise, recover table [table_name] syntax will return + // table not exists error. But recover table statement is use to recover the dropped table. So skip children here. + return in, true default: p.flag &= ^parentIsJoin } From b0f15ba214e54362d4f79952e1e12b1fcfc218b7 Mon Sep 17 00:00:00 2001 From: Keyi Xie Date: Mon, 8 Apr 2019 17:39:38 +0800 Subject: [PATCH 02/10] docs,config: fix misspell (#10067) * fix doc * fix config --- config/config.toml.example | 2 +- docs/design/2018-07-22-enhance-propagations.md | 2 +- docs/design/2018-09-10-adding-tz-env.md | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/config/config.toml.example b/config/config.toml.example index fa43bc04ef3d5..a8e8a15abd23e 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -274,7 +274,7 @@ enable = false # WriteTimeout specifies how long it will wait for writing binlog to pump. write-timeout = "15s" -# If IgnoreError is true, when writting binlog meets error, TiDB would stop writting binlog, +# If IgnoreError is true, when writing binlog meets error, TiDB would stop writing binlog, # but still provide service. ignore-error = false diff --git a/docs/design/2018-07-22-enhance-propagations.md b/docs/design/2018-07-22-enhance-propagations.md index ab5dc46a5810b..2209c4735230c 100644 --- a/docs/design/2018-07-22-enhance-propagations.md +++ b/docs/design/2018-07-22-enhance-propagations.md @@ -153,7 +153,7 @@ Constraint propagation is commonly used as logical plan optimization in traditio * For the columnar storage format to be supported in the future, we may apply some filters directly when accessing the raw storage. - * Reduce the data transfered from TiKV to TiDB. + * Reduce the data transferred from TiKV to TiDB. ### Disadvantages: diff --git a/docs/design/2018-09-10-adding-tz-env.md b/docs/design/2018-09-10-adding-tz-env.md index b95cfef472f6c..3efd2fe26344f 100644 --- a/docs/design/2018-09-10-adding-tz-env.md +++ b/docs/design/2018-09-10-adding-tz-env.md @@ -6,13 +6,13 @@ ## Abstract -When it comes to time-related calculation, it is hard for the distributed system. This proposal tries to resolve two problems: 1. timezone may be inconsistent across multiple `TiDB` instances, 2. performance degradation casued by pushing `System` down to `TiKV`. The impact of this proposal is changing the way of `TiDB` inferring system's timezone name. Before this proposal, the default timezone name pushed down to TiKV is `System` when session's timezone is not set. After this, TiDB evaluates system's timezone name via `TZ` environment variable and the path of the soft link of `/etc/localtime`. If both of them are failed, `TiDB` then push `UTC` to `TiKV`. +When it comes to time-related calculation, it is hard for the distributed system. This proposal tries to resolve two problems: 1. timezone may be inconsistent across multiple `TiDB` instances, 2. performance degradation caused by pushing `System` down to `TiKV`. The impact of this proposal is changing the way of `TiDB` inferring system's timezone name. Before this proposal, the default timezone name pushed down to TiKV is `System` when session's timezone is not set. After this, TiDB evaluates system's timezone name via `TZ` environment variable and the path of the soft link of `/etc/localtime`. If both of them are failed, `TiDB` then push `UTC` to `TiKV`. ## Background After we solved the daylight saving time issue, we found the performance degradation of TiKV side. Thanks for the investigation done by engineers from TiKV. The root cause of such performance degradation is that TiKV infers `System` timezone name via a third party lib, which calls a syscall and costs a lot. In our internal benchmark system, after [this PR](https://github.com/pingcap/tidb/pull/6823), our codebase is 1000 times slower than before. We have to address this. -Another problem needs also to be addressed is the potentially incosistent timezone name across multiple `TiDB` instances. `TiDB` instances may reside at different timezone which could cause incorrect calculation when it comes to time-related calculation. Just getting `TiDB`'s sytem timezone could be broken. We need find a way to ensure the uniqueness of global timezone name across multiple `TiDB`'s timezone name and also to leverage to resolve the performance degradation. +Another problem needs also to be addressed is the potentially incosistent timezone name across multiple `TiDB` instances. `TiDB` instances may reside at different timezone which could cause incorrect calculation when it comes to time-related calculation. Just getting `TiDB`'s system timezone could be broken. We need find a way to ensure the uniqueness of global timezone name across multiple `TiDB`'s timezone name and also to leverage to resolve the performance degradation. ## Proposal @@ -28,7 +28,7 @@ In our case, which means both `TiDB` and `TiKV`, we need care the first and thir In this proposal, we suggest setting `TZ` to a valid IANA timezone name which can be read from `TiDB` later. If `TiDB` can't get `TZ` or the supply of `TZ` is invalid, `TiDB` just falls back to evaluate the path of the soft link of `/etc/localtime`. In addition, a warning message telling the user you should set `TZ` properly will be printed. Setting `TZ` can be done in our `tidb-ansible` project, it is also can be done at user side by `export TZ="Asia/Shanghai"`. If both of them are failed, `TiDB` will use `UTC` as timezone name. -The positive side of this change is resolving performance degradation issue and ensuring the uniqueness of gloabl timezone name in multiple `TiDB` instances. +The positive side of this change is resolving performance degradation issue and ensuring the uniqueness of global timezone name in multiple `TiDB` instances. The negative side is just adding a config item which is a very small matter and the user probably does not care it if we can take care of it and more importantly guarantee the correctness. @@ -46,9 +46,9 @@ The upgrading process need to be handled in particular. `TZ` environment variabl ## Implementation -The implementation is relatively easy. We just get `TZ` environment from system and check whether it is valid or not. If it is invalid, TiDB evaluates the path of soft link of `/etc/localtime`. In addition, a warning message needs to be printed indicating user has to set `TZ` variable properly. For example, if `/etc/localtime` links to `/usr/share/zoneinfo/Asia/Shanghai`, then timezone name `TiDB` gets should be `Asia/Shangahi`. +The implementation is relatively easy. We just get `TZ` environment from system and check whether it is valid or not. If it is invalid, TiDB evaluates the path of soft link of `/etc/localtime`. In addition, a warning message needs to be printed indicating user has to set `TZ` variable properly. For example, if `/etc/localtime` links to `/usr/share/zoneinfo/Asia/Shanghai`, then timezone name `TiDB` gets should be `Asia/Shanghai`. -In order to ensure the uniqueness of global timezone across multiple `TiDB` instances, we need to write timezone name into `variable_value` with variable name `system_tz` in `mysql.tidb`. This cached value can be read once `TiDB` finishs its bootstrap stage. A method `loadLocalStr` can do this job. +In order to ensure the uniqueness of global timezone across multiple `TiDB` instances, we need to write timezone name into `variable_value` with variable name `system_tz` in `mysql.tidb`. This cached value can be read once `TiDB` finishes its bootstrap stage. A method `loadLocalStr` can do this job. ## Open issues (if applicable) From 1e5f620966370f3bc5c48d78c17bcd3a3234066f Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Mon, 8 Apr 2019 19:12:25 +0800 Subject: [PATCH 03/10] executor: close RecordSet to avoid leak test failure (#10063) --- executor/executor_test.go | 10 +++++----- executor/set_test.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index b6b95217a235e..0c289add4101e 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -3433,9 +3433,9 @@ func (s *testSuite3) TestSelectPartition(c *C) { tk.MustQuery("select b from tr partition (r1,R3) order by a").Check(testkit.Rows("4", "7", "8")) // test select unknown partition error - _, err := tk.Exec("select b from th partition (p0,p4)") + err := tk.ExecToErr("select b from th partition (p0,p4)") c.Assert(err.Error(), Equals, "[table:1735]Unknown partition 'p4' in table 'th'") - _, err = tk.Exec("select b from tr partition (r1,r4)") + err = tk.ExecToErr("select b from tr partition (r1,r4)") c.Assert(err.Error(), Equals, "[table:1735]Unknown partition 'r4' in table 'tr'") } @@ -3452,11 +3452,11 @@ func (s *testSuite) TestSelectView(c *C) { tk.MustQuery("select * from view3;").Check(testkit.Rows("1 2")) tk.MustExec("drop table view_t;") tk.MustExec("create table view_t(c int,d int)") - _, err := tk.Exec("select * from view1") + err := tk.ExecToErr("select * from view1") c.Assert(err.Error(), Equals, plannercore.ErrViewInvalid.GenWithStackByArgs("test", "view1").Error()) - _, err = tk.Exec("select * from view2") + err = tk.ExecToErr("select * from view2") c.Assert(err.Error(), Equals, plannercore.ErrViewInvalid.GenWithStackByArgs("test", "view2").Error()) - _, err = tk.Exec("select * from view3") + err = tk.ExecToErr("select * from view3") c.Assert(err.Error(), Equals, "[planner:1054]Unknown column 'a' in 'field list'") tk.MustExec("drop table view_t;") tk.MustExec("create table view_t(a int,b int,c int)") diff --git a/executor/set_test.go b/executor/set_test.go index 6fdb59fef08ab..d7e28bdd271fe 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -602,8 +602,8 @@ func (s *testSuite2) TestSelectGlobalVar(c *C) { tk.MustExec("set @@global.max_connections=151;") // test for unknown variable. - _, err := tk.Exec("select @@invalid") + err := tk.ExecToErr("select @@invalid") c.Assert(terror.ErrorEqual(err, variable.UnknownSystemVar), IsTrue, Commentf("err %v", err)) - _, err = tk.Exec("select @@global.invalid") + err = tk.ExecToErr("select @@global.invalid") c.Assert(terror.ErrorEqual(err, variable.UnknownSystemVar), IsTrue, Commentf("err %v", err)) } From f6a36e0b3634759b0e8f8afef63c70c06707279c Mon Sep 17 00:00:00 2001 From: bb7133 Date: Tue, 9 Apr 2019 10:17:41 +0800 Subject: [PATCH 04/10] metrics: improve ut coverage to 100% (#10066) --- metrics/metrics_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index 909a338eaa0fb..5173349e42b8c 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -17,6 +17,8 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/parser/terror" ) func TestT(t *testing.T) { @@ -32,3 +34,18 @@ func (s *testSuite) TestMetrics(c *C) { // Make sure it doesn't panic. PanicCounter.WithLabelValues(LabelDomain).Inc() } + +func (s *testSuite) TestRegisterMetrics(c *C) { + // Make sure it doesn't panic. + RegisterMetrics() +} + +func (s *testSuite) TestRetLabel(c *C) { + c.Assert(RetLabel(nil), Equals, opSucc) + c.Assert(RetLabel(errors.New("test error")), Equals, opFailed) +} + +func (s *testSuite) TestExecuteErrorToLabel(c *C) { + c.Assert(ExecuteErrorToLabel(errors.New("test")), Equals, `unknown`) + c.Assert(ExecuteErrorToLabel(terror.ErrResultUndetermined), Equals, `global:2`) +} From 278c3e81b4af035e55d92bf470c49ad2de1e09fb Mon Sep 17 00:00:00 2001 From: Song Guo Date: Tue, 9 Apr 2019 10:45:22 +0800 Subject: [PATCH 05/10] *: support `EXPLAIN FOR CONNECTION` (#10030) --- executor/executor_pkg_test.go | 9 ++++ executor/explainfor_test.go | 81 +++++++++++++++++++++++++++++++++++ infoschema/tables_test.go | 5 +++ planner/core/common_plans.go | 5 ++- planner/core/errors.go | 6 +++ planner/core/planbuilder.go | 57 ++++++++++++++++++------ server/server.go | 12 ++++++ session/session.go | 4 ++ util/processinfo.go | 2 + 9 files changed, 166 insertions(+), 15 deletions(-) create mode 100644 executor/explainfor_test.go diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index 612f9edaf1b28..bff4f3eae7396 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -50,6 +50,15 @@ func (msm *mockSessionManager) ShowProcessList() map[uint64]util.ProcessInfo { return ret } +func (msm *mockSessionManager) GetProcessInfo(id uint64) (util.ProcessInfo, bool) { + for _, item := range msm.PS { + if item.ID == id { + return item, true + } + } + return util.ProcessInfo{}, false +} + // Kill implements the SessionManager.Kill interface. func (msm *mockSessionManager) Kill(cid uint64, query bool) { diff --git a/executor/explainfor_test.go b/executor/explainfor_test.go new file mode 100644 index 0000000000000..04a64ce3ffc2a --- /dev/null +++ b/executor/explainfor_test.go @@ -0,0 +1,81 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "fmt" + + . "github.com/pingcap/check" + "github.com/pingcap/parser/auth" + "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/testkit" +) + +// mockSessionManager is a mocked session manager which is used for test. +type mockSessionManager1 struct { + PS []util.ProcessInfo +} + +// ShowProcessList implements the SessionManager.ShowProcessList interface. +func (msm *mockSessionManager1) ShowProcessList() map[uint64]util.ProcessInfo { + ret := make(map[uint64]util.ProcessInfo) + for _, item := range msm.PS { + ret[item.ID] = item + } + return ret +} + +func (msm *mockSessionManager1) GetProcessInfo(id uint64) (util.ProcessInfo, bool) { + for _, item := range msm.PS { + if item.ID == id { + return item, true + } + } + return util.ProcessInfo{}, false +} + +// Kill implements the SessionManager.Kill interface. +func (msm *mockSessionManager1) Kill(cid uint64, query bool) { + +} + +func (s *testSuite) TestExplainFor(c *C) { + tkRoot := testkit.NewTestKitWithInit(c, s.store) + tkUser := testkit.NewTestKitWithInit(c, s.store) + tkRoot.MustExec("create table t1(c1 int, c2 int)") + tkRoot.MustExec("create table t2(c1 int, c2 int)") + tkRoot.MustExec("create user tu@'%'") + tkRoot.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost", CurrentUser: true, AuthUsername: "root", AuthHostname: "%"}, nil, []byte("012345678901234567890")) + tkUser.Se.Auth(&auth.UserIdentity{Username: "tu", Hostname: "localhost", CurrentUser: true, AuthUsername: "tu", AuthHostname: "%"}, nil, []byte("012345678901234567890")) + + tkRoot.MustQuery("select * from t1;") + tkRootProcess := tkRoot.Se.ShowProcess() + ps := []util.ProcessInfo{tkRootProcess} + tkRoot.Se.SetSessionManager(&mockSessionManager1{PS: ps}) + tkUser.Se.SetSessionManager(&mockSessionManager1{PS: ps}) + tkRoot.MustQuery(fmt.Sprintf("explain for connection %d", tkRootProcess.ID)).Check(testkit.Rows( + "TableReader_5 10000.00 root data:TableScan_4", + "└─TableScan_4 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", + )) + err := tkUser.ExecToErr(fmt.Sprintf("explain for connection %d", tkRootProcess.ID)) + c.Check(core.ErrAccessDenied.Equal(err), IsTrue) + err = tkUser.ExecToErr("explain for connection 42") + c.Check(core.ErrNoSuchThread.Equal(err), IsTrue) + + tkRootProcess.Plan = nil + ps = []util.ProcessInfo{tkRootProcess} + tkRoot.Se.SetSessionManager(&mockSessionManager1{PS: ps}) + tkRoot.MustExec(fmt.Sprintf("explain for connection %d", tkRootProcess.ID)) +} diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 3faed0f2fc609..42811036d515b 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -176,6 +176,11 @@ type mockSessionManager struct { func (sm *mockSessionManager) ShowProcessList() map[uint64]util.ProcessInfo { return sm.processInfoMap } +func (sm *mockSessionManager) GetProcessInfo(id uint64) (util.ProcessInfo, bool) { + rs, ok := sm.processInfoMap[id] + return rs, ok +} + func (sm *mockSessionManager) Kill(connectionID uint64, query bool) {} func (s *testTableSuite) TestSomeTables(c *C) { diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index a5b2312a57d87..3419a7394f82f 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -29,7 +29,7 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/types/parser_driver" + driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/ranger" @@ -484,6 +484,9 @@ func (e *Explain) prepareSchema() error { // RenderResult renders the explain result as specified format. func (e *Explain) RenderResult() error { + if e.StmtPlan == nil { + return nil + } switch strings.ToLower(e.Format) { case ast.ExplainFormatROW: e.explainedPlans = map[int]bool{} diff --git a/planner/core/errors.go b/planner/core/errors.go index d3ff18f10988d..6612bcc0dcd71 100644 --- a/planner/core/errors.go +++ b/planner/core/errors.go @@ -136,6 +136,9 @@ var ( ErrWindowRangeBoundNotConstant = terror.ClassOptimizer.New(codeWindowRangeBoundNotConstant, mysql.MySQLErrName[mysql.ErrWindowRangeBoundNotConstant]) ErrWindowRowsIntervalUse = terror.ClassOptimizer.New(codeWindowRowsIntervalUse, mysql.MySQLErrName[mysql.ErrWindowRowsIntervalUse]) ErrWindowFunctionIgnoresFrame = terror.ClassOptimizer.New(codeWindowFunctionIgnoresFrame, mysql.MySQLErrName[mysql.ErrWindowFunctionIgnoresFrame]) + ErrNoSuchThread = terror.ClassOptimizer.New(mysql.ErrNoSuchThread, mysql.MySQLErrName[mysql.ErrNoSuchThread]) + // Since we cannot know if user loggined with a password, use message of ErrAccessDeniedNoPassword instead + ErrAccessDenied = terror.ClassOptimizer.New(mysql.ErrAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDeniedNoPassword]) ) func init() { @@ -187,6 +190,9 @@ func init() { codeWindowRangeBoundNotConstant: mysql.ErrWindowRangeBoundNotConstant, codeWindowRowsIntervalUse: mysql.ErrWindowRowsIntervalUse, codeWindowFunctionIgnoresFrame: mysql.ErrWindowFunctionIgnoresFrame, + + mysql.ErrNoSuchThread: mysql.ErrNoSuchThread, + mysql.ErrAccessDenied: mysql.ErrAccessDenied, } terror.ErrClassToMySQLCodes[terror.ClassOptimizer] = mysqlErrCodeMap } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 85ffed504a5db..ef0fb653d0702 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -33,7 +33,7 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/types/parser_driver" + driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util/ranger" ) @@ -238,6 +238,8 @@ func (b *PlanBuilder) Build(node ast.Node) (Plan, error) { return b.buildExecute(x) case *ast.ExplainStmt: return b.buildExplain(x) + case *ast.ExplainForStmt: + return b.buildExplainFor(x) case *ast.TraceStmt: return b.buildTrace(x) case *ast.InsertStmt: @@ -1090,8 +1092,7 @@ func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) { // Otherwise, you can kill only your own threads and statements. sm := b.ctx.GetSessionManager() if sm != nil { - processList := sm.ShowProcessList() - if pi, ok := processList[raw.ConnectionID]; ok { + if pi, ok := sm.GetProcessInfo(raw.ConnectionID); ok { loginUser := b.ctx.GetSessionVars().User if pi.User != loginUser.Username { b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil) @@ -1730,15 +1731,7 @@ func (b *PlanBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) { return p, nil } -func (b *PlanBuilder) buildExplain(explain *ast.ExplainStmt) (Plan, error) { - if show, ok := explain.Stmt.(*ast.ShowStmt); ok { - return b.buildShow(show) - } - targetPlan, err := OptimizeAstNode(b.ctx, explain.Stmt, b.is) - if err != nil { - return nil, err - } - +func (b *PlanBuilder) buildExplainPlan(targetPlan Plan, format string, analyze bool, execStmt ast.StmtNode) (Plan, error) { pp, ok := targetPlan.(PhysicalPlan) if !ok { switch x := targetPlan.(type) { @@ -1755,15 +1748,51 @@ func (b *PlanBuilder) buildExplain(explain *ast.ExplainStmt) (Plan, error) { return nil, ErrUnsupportedType.GenWithStackByArgs(targetPlan) } } - p := &Explain{StmtPlan: pp, Analyze: explain.Analyze, Format: explain.Format, ExecStmt: explain.Stmt, ExecPlan: targetPlan} + + p := &Explain{StmtPlan: pp, Analyze: analyze, Format: format, ExecStmt: execStmt, ExecPlan: targetPlan} p.ctx = b.ctx - err = p.prepareSchema() + err := p.prepareSchema() if err != nil { return nil, err } return p, nil } +// buildExplainFor gets *last* (maybe running or finished) query plan from connection #connection id. +// See https://dev.mysql.com/doc/refman/8.0/en/explain-for-connection.html. +func (b *PlanBuilder) buildExplainFor(explainFor *ast.ExplainForStmt) (Plan, error) { + processInfo, ok := b.ctx.GetSessionManager().GetProcessInfo(explainFor.ConnectionID) + if !ok { + return nil, ErrNoSuchThread.GenWithStackByArgs(explainFor.ConnectionID) + } + if b.ctx.GetSessionVars() != nil && b.ctx.GetSessionVars().User != nil { + if b.ctx.GetSessionVars().User.Username != processInfo.User { + err := ErrAccessDenied.GenWithStackByArgs(b.ctx.GetSessionVars().User.Username, b.ctx.GetSessionVars().User.Hostname) + // Different from MySQL's behavior and document. + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", err) + } + } + + targetPlan, ok := processInfo.Plan.(Plan) + if !ok || targetPlan == nil { + return &Explain{Format: explainFor.Format}, nil + } + + return b.buildExplainPlan(targetPlan, explainFor.Format, false, nil) +} + +func (b *PlanBuilder) buildExplain(explain *ast.ExplainStmt) (Plan, error) { + if show, ok := explain.Stmt.(*ast.ShowStmt); ok { + return b.buildShow(show) + } + targetPlan, err := OptimizeAstNode(b.ctx, explain.Stmt, b.is) + if err != nil { + return nil, err + } + + return b.buildExplainPlan(targetPlan, explain.Format, explain.Analyze, explain.Stmt) +} + func buildShowProcedureSchema() *expression.Schema { tblName := "ROUTINES" schema := expression.NewSchema(make([]*expression.Column, 0, 11)...) diff --git a/server/server.go b/server/server.go index 7ef69ffacf168..58705781fef14 100644 --- a/server/server.go +++ b/server/server.go @@ -43,6 +43,7 @@ import ( "sync" "sync/atomic" "time" + // For pprof _ "net/http/pprof" @@ -496,6 +497,17 @@ func (s *Server) ShowProcessList() map[uint64]util.ProcessInfo { return rs } +// GetProcessInfo implements the SessionManager interface. +func (s *Server) GetProcessInfo(id uint64) (util.ProcessInfo, bool) { + s.rwlock.RLock() + conn, ok := s.clients[uint32(id)] + s.rwlock.RUnlock() + if !ok || atomic.LoadInt32(&conn.status) == connStatusWaitShutdown { + return util.ProcessInfo{}, false + } + return conn.ctx.ShowProcess(), ok +} + // Kill implements the SessionManager interface. func (s *Server) Kill(connectionID uint64, query bool) { s.rwlock.Lock() diff --git a/session/session.go b/session/session.go index 145b5f521a8c4..93bf7b0f63e08 100644 --- a/session/session.go +++ b/session/session.go @@ -141,6 +141,8 @@ type session struct { values map[fmt.Stringer]interface{} } + currentPlan plannercore.Plan + store kv.Storage parser *parser.Parser @@ -881,6 +883,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte) { ID: s.sessionVars.ConnectionID, DB: s.sessionVars.CurrentDB, Command: command, + Plan: s.currentPlan, Time: t, State: s.Status(), Info: sql, @@ -972,6 +975,7 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec return nil, err } metrics.SessionExecuteCompileDuration.WithLabelValues(label).Observe(time.Since(startTS).Seconds()) + s.currentPlan = stmt.Plan // Step3: Execute the physical plan. if recordSets, err = s.executeStatement(ctx, connID, stmtNode, stmt, recordSets); err != nil { diff --git a/util/processinfo.go b/util/processinfo.go index fa32a405df84c..d6a992da0e318 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -27,6 +27,7 @@ type ProcessInfo struct { Host string DB string Command byte + Plan interface{} Time time.Time State uint16 Info string @@ -58,5 +59,6 @@ func (pi *ProcessInfo) ToRow(full bool) []interface{} { type SessionManager interface { // ShowProcessList returns map[connectionID]ProcessInfo ShowProcessList() map[uint64]ProcessInfo + GetProcessInfo(id uint64) (ProcessInfo, bool) Kill(connectionID uint64, query bool) } From 915b879aaee9d249da3bc7a41169abf328f1dd48 Mon Sep 17 00:00:00 2001 From: Jun-Seok Heo Date: Tue, 9 Apr 2019 11:57:27 +0900 Subject: [PATCH 06/10] expression: relocate getSystemTimestamp() not to be called unnecessarily (#10073) --- expression/helper.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/expression/helper.go b/expression/helper.go index 6b815f7b5a512..cd150fcb6cc84 100644 --- a/expression/helper.go +++ b/expression/helper.go @@ -49,15 +49,15 @@ func GetTimeValue(ctx sessionctx.Context, v interface{}, tp byte, fsp int) (d ty Fsp: fsp, } - defaultTime, err := getSystemTimestamp(ctx) - if err != nil { - return d, err - } sc := ctx.GetSessionVars().StmtCtx switch x := v.(type) { case string: upperX := strings.ToUpper(x) if upperX == strings.ToUpper(ast.CurrentTimestamp) { + defaultTime, err := getSystemTimestamp(ctx) + if err != nil { + return d, err + } value.Time = types.FromGoTime(defaultTime.Truncate(time.Duration(math.Pow10(9-fsp)) * time.Nanosecond)) if tp == mysql.TypeTimestamp || tp == mysql.TypeDatetime { err = value.ConvertTimeZone(time.Local, ctx.GetSessionVars().Location()) From dd08a0a7b0addc0e0f790de59134fa2cbe236720 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 9 Apr 2019 11:06:05 +0800 Subject: [PATCH 07/10] meta/autoid: remove memoryAllocator (#10061) --- meta/autoid/autoid.go | 60 ------------------------------------------- 1 file changed, 60 deletions(-) diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index bdabe02558e4e..8f52cb1e4adbb 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -290,59 +290,6 @@ func (alloc *allocator) Alloc(tableID int64) (int64, error) { return alloc.alloc4Signed(tableID) } -var ( - memID int64 - memIDLock sync.Mutex -) - -type memoryAllocator struct { - mu sync.Mutex - base int64 - end int64 - dbID int64 -} - -// Base implements autoid.Allocator Base interface. -func (alloc *memoryAllocator) Base() int64 { - return alloc.base -} - -// End implements autoid.Allocator End interface. -func (alloc *memoryAllocator) End() int64 { - return alloc.end -} - -// NextGlobalAutoID implements autoid.Allocator NextGlobalAutoID interface. -func (alloc *memoryAllocator) NextGlobalAutoID(tableID int64) (int64, error) { - memIDLock.Lock() - defer memIDLock.Unlock() - return memID + 1, nil -} - -// Rebase implements autoid.Allocator Rebase interface. -func (alloc *memoryAllocator) Rebase(tableID, newBase int64, allocIDs bool) error { - // TODO: implement it. - return nil -} - -// Alloc implements autoid.Allocator Alloc interface. -func (alloc *memoryAllocator) Alloc(tableID int64) (int64, error) { - if tableID == 0 { - return 0, errInvalidTableID.GenWithStack("Invalid tableID") - } - alloc.mu.Lock() - defer alloc.mu.Unlock() - if alloc.base == alloc.end { // step - memIDLock.Lock() - memID = memID + step - alloc.end = memID - alloc.base = alloc.end - step - memIDLock.Unlock() - } - alloc.base++ - return alloc.base, nil -} - // NewAllocator returns a new auto increment id generator on the store. func NewAllocator(store kv.Storage, dbID int64, isUnsigned bool) Allocator { return &allocator{ @@ -352,13 +299,6 @@ func NewAllocator(store kv.Storage, dbID int64, isUnsigned bool) Allocator { } } -// NewMemoryAllocator returns a new auto increment id generator in memory. -func NewMemoryAllocator(dbID int64) Allocator { - return &memoryAllocator{ - dbID: dbID, - } -} - //autoid error codes. const codeInvalidTableID terror.ErrCode = 1 From 7c510b7652b165b249794a6be15f1024dee1b310 Mon Sep 17 00:00:00 2001 From: kennytm Date: Tue, 9 Apr 2019 11:12:50 +0800 Subject: [PATCH 08/10] planner: ensure `execution info` column in EXPLAIN ANALYZE exists (#10035) --- planner/core/cbo_test.go | 62 ++++++++++++++++++++++++++++++++++++ planner/core/common_plans.go | 2 ++ 2 files changed, 64 insertions(+) diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index a29fb980a07d7..caefa22d9f1eb 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -924,3 +924,65 @@ func (s *testAnalyzeSuite) TestIssue9562(c *C) { " └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", )) } + +func (s *testAnalyzeSuite) TestIssue9805(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, store) + defer func() { + dom.Close() + store.Close() + }() + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec(` + create table t1 ( + id bigint primary key, + a bigint not null, + b varchar(100) not null, + c varchar(10) not null, + d bigint as (a % 30) not null, + key (d, b, c) + ) + `) + tk.MustExec(` + create table t2 ( + id varchar(50) primary key, + a varchar(100) unique, + b datetime, + c varchar(45), + d int not null unique auto_increment + ) + `) + rs := tk.MustQuery("explain analyze select t1.id, t2.a from t1 join t2 on t1.a = t2.d where t1.b = 't2' and t1.d = 4") + + // Expected output is like: + // + // +--------------------------------+----------+------+----------------------------------------------------------------------------------+----------------------------------+ + // | id | count | task | operator info | execution info | + // +--------------------------------+----------+------+----------------------------------------------------------------------------------+----------------------------------+ + // | Projection_9 | 10.00 | root | test.t1.id, test.t2.a | time:203.355µs, loops:1, rows:0 | + // | └─IndexJoin_13 | 10.00 | root | inner join, inner:IndexLookUp_12, outer key:test.t1.a, inner key:test.t2.d | time:199.633µs, loops:1, rows:0 | + // | ├─Projection_16 | 8.00 | root | test.t1.id, test.t1.a, test.t1.b, cast(mod(test.t1.a, 30)) | time:164.587µs, loops:1, rows:0 | + // | │ └─Selection_17 | 8.00 | root | eq(cast(mod(test.t1.a, 30)), 4) | time:157.768µs, loops:1, rows:0 | + // | │ └─TableReader_20 | 10.00 | root | data:Selection_19 | time:154.61µs, loops:1, rows:0 | + // | │ └─Selection_19 | 10.00 | cop | eq(test.t1.b, "t2") | time:28.824µs, loops:1, rows:0 | + // | │ └─TableScan_18 | 10000.00 | cop | table:t1, range:[-inf,+inf], keep order:false, stats:pseudo | time:27.654µs, loops:1, rows:0 | + // | └─IndexLookUp_12 | 10.00 | root | | time:0ns, loops:0, rows:0 | + // | ├─IndexScan_10 | 10.00 | cop | table:t2, index:d, range: decided by [test.t1.a], keep order:false, stats:pseudo | time:0ns, loops:0, rows:0 | + // | └─TableScan_11 | 10.00 | cop | table:t2, keep order:false, stats:pseudo | time:0ns, loops:0, rows:0 | + // +--------------------------------+----------+------+----------------------------------------------------------------------------------+----------------------------------+ + // 10 rows in set (0.00 sec) + // + c.Assert(rs.Rows(), HasLen, 10) + hasIndexLookUp12 := false + for _, row := range rs.Rows() { + c.Assert(row, HasLen, 5) + if strings.HasSuffix(row[0].(string), "IndexLookUp_12") { + hasIndexLookUp12 = true + c.Assert(row[4], Equals, "time:0ns, loops:0, rows:0") + } + } + c.Assert(hasIndexLookUp12, IsTrue) +} diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 3419a7394f82f..9a6234d673ec6 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -538,6 +538,8 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st row = append(row, runtimeStatsColl.GetCopStats(p.ExplainID()).String()) } else if runtimeStatsColl.ExistsRootStats(p.ExplainID()) { row = append(row, runtimeStatsColl.GetRootStats(p.ExplainID()).String()) + } else { + row = append(row, "time:0ns, loops:0, rows:0") } } e.Rows = append(e.Rows, row) From 1c21151b1193a9d1416377dfd6b3743126ac8bfc Mon Sep 17 00:00:00 2001 From: Keyi Xie Date: Tue, 9 Apr 2019 13:29:42 +0800 Subject: [PATCH 09/10] types,types/json: improve test coverage (#9977) --- types/binary_literal_test.go | 12 ++++ types/convert_test.go | 19 ++++++ types/datum_test.go | 32 ++++++++++ types/helper_test.go | 19 ++++++ types/json/binary_functions_test.go | 31 ++++++++++ types/mytime_test.go | 43 ++++++++++++++ types/overflow_test.go | 1 + types/time_test.go | 92 +++++++++++++++++++++++++++++ 8 files changed, 249 insertions(+) create mode 100644 types/json/binary_functions_test.go diff --git a/types/binary_literal_test.go b/types/binary_literal_test.go index a8788ecd422bc..b1547fec5f24f 100644 --- a/types/binary_literal_test.go +++ b/types/binary_literal_test.go @@ -106,6 +106,9 @@ func (s *testBinaryLiteralSuite) TestParseBitStr(c *C) { c.Assert([]byte(b), DeepEquals, t.Expected, Commentf("%#v", t)) } } + b, err := ParseBitStr("") + c.Assert(b, IsNil) + c.Assert(err, ErrorMatches, "invalid empty .*") } func (s *testBinaryLiteralSuite) TestParseHexStr(c *C) { @@ -139,6 +142,9 @@ func (s *testBinaryLiteralSuite) TestParseHexStr(c *C) { c.Assert([]byte(hex), DeepEquals, t.Expected, Commentf("%#v", t)) } } + hex, err := ParseHexStr("") + c.Assert(hex, IsNil) + c.Assert(err, ErrorMatches, "invalid empty .*") } func (s *testBinaryLiteralSuite) TestString(c *C) { @@ -243,6 +249,12 @@ func (s *testBinaryLiteralSuite) TestNewBinaryLiteralFromUint(c *C) { hex := NewBinaryLiteralFromUint(t.Input, t.ByteSize) c.Assert([]byte(hex), DeepEquals, t.Expected, Commentf("%#v", t)) } + + defer func() { + r := recover() + c.Assert(r, NotNil) + }() + NewBinaryLiteralFromUint(0x123, -2) } func (s *testBinaryLiteralSuite) TestCompare(c *C) { diff --git a/types/convert_test.go b/types/convert_test.go index 6398b9b87eeac..262b76862cb9c 100644 --- a/types/convert_test.go +++ b/types/convert_test.go @@ -884,3 +884,22 @@ func (s *testTypeConvertSuite) TestNumberToDuration(c *C) { c.Assert(dur.Duration, Equals, tc.dur) } } + +func (s *testTypeConvertSuite) TestStrToDuration(c *C) { + sc := new(stmtctx.StatementContext) + var tests = []struct { + str string + fsp int + isDuration bool + }{ + {"20190412120000", 4, false}, + {"20190101180000", 6, false}, + {"20190101180000", 1, false}, + {"20190101181234", 3, false}, + } + for _, tt := range tests { + _, _, isDuration, err := StrToDuration(sc, tt.str, tt.fsp) + c.Assert(err, IsNil) + c.Assert(isDuration, Equals, tt.isDuration) + } +} diff --git a/types/datum_test.go b/types/datum_test.go index 3e67866bfe1a5..a5a145ba55e4e 100644 --- a/types/datum_test.go +++ b/types/datum_test.go @@ -40,9 +40,13 @@ func (ts *testDatumSuite) TestDatum(c *C) { } for _, val := range values { var d Datum + d.SetMinNotNull() d.SetValue(val) x := d.GetValue() c.Assert(x, DeepEquals, val) + d.SetCollation(d.Collation()) + c.Assert(d.Collation(), NotNil) + c.Assert(d.Length(), Equals, int(d.length)) } } @@ -194,6 +198,34 @@ func (ts *testTypeConvertSuite) TestToFloat32(c *C) { c.Assert(converted.GetFloat64(), Equals, datum.GetFloat64()) } +func (ts *testTypeConvertSuite) TestToFloat64(c *C) { + testCases := []struct { + d Datum + errMsg string + result float64 + }{ + {NewDatum(float32(3.00)), "", 3.00}, + {NewDatum(float64(12345.678)), "", 12345.678}, + {NewDatum("12345.678"), "", 12345.678}, + {NewDatum([]byte("12345.678")), "", 12345.678}, + {NewDatum(int64(12345)), "", 12345}, + {NewDatum(uint64(123456)), "", 123456}, + {NewDatum(byte(123)), "cannot convert .*", 0}, + } + + sc := new(stmtctx.StatementContext) + sc.IgnoreTruncate = true + for _, t := range testCases { + converted, err := t.d.ToFloat64(sc) + if t.errMsg == "" { + c.Assert(err, IsNil) + } else { + c.Assert(err, ErrorMatches, t.errMsg) + } + c.Assert(converted, Equals, t.result) + } +} + // mustParseTimeIntoDatum is similar to ParseTime but panic if any error occurs. func mustParseTimeIntoDatum(s string, tp byte, fsp int) (d Datum) { t, err := ParseTime(&stmtctx.StatementContext{TimeZone: time.UTC}, s, tp, fsp) diff --git a/types/helper_test.go b/types/helper_test.go index fa39980252901..434a88c7d0e2f 100644 --- a/types/helper_test.go +++ b/types/helper_test.go @@ -26,6 +26,7 @@ type testTypeHelperSuite struct { } func (s *testTypeHelperSuite) TestStrToInt(c *C) { + c.Parallel() tests := []struct { input string output string @@ -44,3 +45,21 @@ func (s *testTypeHelperSuite) TestStrToInt(c *C) { c.Check(strconv.FormatInt(output, 10), Equals, tt.output) } } + +func (s *testTypeHelperSuite) TestTruncate(c *C) { + c.Parallel() + tests := []struct { + f float64 + dec int + expected float64 + }{ + {123.45, 0, 123}, + {123.45, 1, 123.4}, + {123.45, 2, 123.45}, + {123.45, 3, 123.450}, + } + for _, tt := range tests { + res := Truncate(tt.f, tt.dec) + c.Assert(res, Equals, tt.expected) + } +} diff --git a/types/json/binary_functions_test.go b/types/json/binary_functions_test.go new file mode 100644 index 0000000000000..1d63e880e8aae --- /dev/null +++ b/types/json/binary_functions_test.go @@ -0,0 +1,31 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package json + +import ( + . "github.com/pingcap/check" +) + +var _ = Suite(&testJSONFuncSuite{}) + +type testJSONFuncSuite struct{} + +func (s *testJSONFuncSuite) TestdecodeEscapedUnicode(c *C) { + c.Parallel() + in := "597d" + r, size, err := decodeEscapedUnicode([]byte(in)) + c.Assert(string(r[:]), Equals, "好\x00") + c.Assert(size, Equals, 3) + c.Assert(err, IsNil) + +} diff --git a/types/mytime_test.go b/types/mytime_test.go index 8d9d8d016a68f..f2897c878908b 100644 --- a/types/mytime_test.go +++ b/types/mytime_test.go @@ -14,6 +14,8 @@ package types import ( + "time" + . "github.com/pingcap/check" ) @@ -228,3 +230,44 @@ func (s *testMyTimeSuite) TestGetLastDay(c *C) { c.Assert(day, Equals, t.expectedDay) } } + +func (s *testMyTimeSuite) TestgetFixDays(c *C) { + tests := []struct { + year int + month int + day int + ot time.Time + expectedDay int + }{ + {2000, 1, 0, time.Date(2000, 1, 31, 0, 0, 0, 0, time.UTC), -2}, + {2000, 1, 12, time.Date(2000, 1, 31, 0, 0, 0, 0, time.UTC), 0}, + {2000, 1, 12, time.Date(2000, 1, 0, 0, 0, 0, 0, time.UTC), 0}, + {2000, 2, 24, time.Date(2000, 2, 10, 0, 0, 0, 0, time.UTC), 0}, + {2019, 04, 05, time.Date(2019, 04, 01, 1, 2, 3, 4, time.UTC), 0}, + } + + for _, t := range tests { + res := getFixDays(t.year, t.month, t.day, t.ot) + c.Assert(res, Equals, t.expectedDay) + } +} + +func (s *testMyTimeSuite) TestAddDate(c *C) { + tests := []struct { + year int + month int + day int + ot time.Time + }{ + {01, 1, 0, time.Date(2000, 1, 01, 0, 0, 0, 0, time.UTC)}, + {02, 1, 12, time.Date(2000, 1, 01, 0, 0, 0, 0, time.UTC)}, + {03, 1, 12, time.Date(2000, 1, 01, 0, 0, 0, 0, time.UTC)}, + {04, 2, 24, time.Date(2000, 2, 10, 0, 0, 0, 0, time.UTC)}, + {01, 04, 05, time.Date(2019, 04, 01, 1, 2, 3, 4, time.UTC)}, + } + + for _, t := range tests { + res := AddDate(int64(t.year), int64(t.month), int64(t.day), t.ot) + c.Assert(res.Year(), Equals, t.year+t.ot.Year()) + } +} diff --git a/types/overflow_test.go b/types/overflow_test.go index 0f2fb0fc435c0..43d6d84a88c9a 100644 --- a/types/overflow_test.go +++ b/types/overflow_test.go @@ -305,6 +305,7 @@ func (s *testOverflowSuite) TestDiv(c *C) { {1, -1, 0, true}, {math.MaxInt64, math.MinInt64, 0, false}, {math.MaxInt64, -1, 0, true}, + {100, 20, 5, false}, } for _, t := range tblInt { diff --git a/types/time_test.go b/types/time_test.go index e7cf071ea23ee..d5a88c7c25c59 100644 --- a/types/time_test.go +++ b/types/time_test.go @@ -758,6 +758,21 @@ func (s *testTimeSuite) TestRoundFrac(c *C) { c.Assert(err, IsNil) c.Assert(nv.String(), Equals, t.Except) } + + cols := []struct { + input time.Time + fsp int + output time.Time + }{ + {time.Date(2011, 11, 11, 10, 10, 10, 888888, time.UTC), 0, time.Date(2011, 11, 11, 10, 10, 10, 11, time.UTC)}, + {time.Date(2011, 11, 11, 10, 10, 10, 111111, time.UTC), 0, time.Date(2011, 11, 11, 10, 10, 10, 10, time.UTC)}, + } + + for _, col := range cols { + res, err := types.RoundFrac(col.input, col.fsp) + c.Assert(res.Second(), Equals, col.output.Second()) + c.Assert(err, IsNil) + } } func (s *testTimeSuite) TestConvert(c *C) { @@ -832,6 +847,12 @@ func (s *testTimeSuite) TestCompare(c *C) { c.Assert(ret, Equals, t.Ret) } + v1, err := types.ParseTime(nil, "2011-10-10 11:11:11", mysql.TypeDatetime, types.MaxFsp) + c.Assert(err, IsNil) + res, err := v1.CompareString(nil, "Test should error") + c.Assert(err, NotNil) + c.Assert(res, Equals, 0) + tbl = []struct { Arg1 string Arg2 string @@ -1337,3 +1358,74 @@ func (s *testTimeSuite) TestgetFracIndex(c *C) { c.Assert(index, Equals, testCase.expectIndex) } } + +func (s *testTimeSuite) TestTimeOverflow(c *C) { + sc := mock.NewContext().GetSessionVars().StmtCtx + sc.IgnoreZeroInDate = true + defer testleak.AfterTest(c)() + table := []struct { + Input string + Output bool + }{ + {"2012-12-31 11:30:45", false}, + {"12-12-31 11:30:45", false}, + {"2012-12-31", false}, + {"20121231", false}, + {"2012-02-29", false}, + {"2018-01-01 18", false}, + {"18-01-01 18", false}, + {"2018.01.01", false}, + {"2018.01.01 00:00:00", false}, + {"2018/01/01-00:00:00", false}, + } + + for _, test := range table { + t, err := types.ParseDatetime(sc, test.Input) + c.Assert(err, IsNil) + isOverflow, err := types.DateTimeIsOverflow(sc, t) + c.Assert(err, IsNil) + c.Assert(isOverflow, Equals, test.Output) + } +} + +func (s *testTimeSuite) TestTruncateFrac(c *C) { + cols := []struct { + input time.Time + fsp int + output time.Time + }{ + {time.Date(2011, 11, 11, 10, 10, 10, 888888, time.UTC), 0, time.Date(2011, 11, 11, 10, 10, 10, 11, time.UTC)}, + {time.Date(2011, 11, 11, 10, 10, 10, 111111, time.UTC), 0, time.Date(2011, 11, 11, 10, 10, 10, 10, time.UTC)}, + } + + for _, col := range cols { + res, err := types.TruncateFrac(col.input, col.fsp) + c.Assert(res.Second(), Equals, col.output.Second()) + c.Assert(err, IsNil) + } +} +func (s *testTimeSuite) TestTimeSub(c *C) { + tbl := []struct { + Arg1 string + Arg2 string + Ret string + }{ + {"2017-01-18 01:01:01", "2017-01-18 00:00:01", "01:01:00"}, + {"2017-01-18 01:01:01", "2017-01-18 01:01:01", "00:00:00"}, + {"2019-04-12 18:20:00", "2019-04-12 14:00:00", "04:20:00"}, + } + + sc := &stmtctx.StatementContext{ + TimeZone: time.UTC, + } + for _, t := range tbl { + v1, err := types.ParseTime(nil, t.Arg1, mysql.TypeDatetime, types.MaxFsp) + c.Assert(err, IsNil) + v2, err := types.ParseTime(nil, t.Arg2, mysql.TypeDatetime, types.MaxFsp) + c.Assert(err, IsNil) + dur, err := types.ParseDuration(sc, t.Ret, types.MaxFsp) + c.Assert(err, IsNil) + rec := v1.Sub(sc, &v2) + c.Assert(rec, Equals, dur) + } +} From 7acbe520f48f64dadce31e52532574ba059de801 Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Tue, 9 Apr 2019 13:46:15 +0800 Subject: [PATCH 10/10] server: move out some code from http_handler.go (#10071) --- server/http_handler.go | 199 ++++++++++-------------------------- server/http_handler_test.go | 6 +- server/http_status.go | 2 +- store/helper/helper.go | 135 ++++++++++++++++++++++++ store/helper/helper_test.go | 118 +++++++++++++++++++++ 5 files changed, 311 insertions(+), 149 deletions(-) create mode 100644 store/helper/helper.go create mode 100644 store/helper/helper_test.go diff --git a/server/http_handler.go b/server/http_handler.go index 6378d6f4212fb..db8f2428aefdd 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/table" @@ -110,55 +111,30 @@ func writeData(w http.ResponseWriter, data interface{}) { } type tikvHandlerTool struct { - regionCache *tikv.RegionCache - store kvStore + helper.Helper } // newTikvHandlerTool checks and prepares for tikv handler. // It would panic when any error happens. func (s *Server) newTikvHandlerTool() *tikvHandlerTool { - var tikvStore kvStore + var tikvStore tikv.Storage store, ok := s.driver.(*TiDBDriver) if !ok { panic("Invalid KvStore with illegal driver") } - if tikvStore, ok = store.store.(kvStore); !ok { + if tikvStore, ok = store.store.(tikv.Storage); !ok { panic("Invalid KvStore with illegal store") } regionCache := tikvStore.GetRegionCache() return &tikvHandlerTool{ - regionCache: regionCache, - store: tikvStore, - } -} - -func (t *tikvHandlerTool) getMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - keyLocation, err := t.regionCache.LocateKey(tikv.NewBackoffer(context.Background(), 500), encodedKey) - if err != nil { - return nil, errors.Trace(err) - } - - tikvReq := &tikvrpc.Request{ - Type: tikvrpc.CmdMvccGetByKey, - MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{ - Key: encodedKey, + helper.Helper{ + RegionCache: regionCache, + Store: tikvStore, }, } - kvResp, err := t.store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute) - if err != nil { - logutil.Logger(context.Background()).Info("get MVCC by encoded key failed", - zap.Binary("encodeKey", encodedKey), - zap.Reflect("region", keyLocation.Region), - zap.Binary("startKey", keyLocation.StartKey), - zap.Binary("endKey", keyLocation.EndKey), - zap.Reflect("kvResp", kvResp), - zap.Error(err)) - return nil, errors.Trace(err) - } - return kvResp.MvccGetByKey, nil } type mvccKV struct { @@ -168,14 +144,14 @@ type mvccKV struct { func (t *tikvHandlerTool) getMvccByHandle(tableID, handle int64) (*mvccKV, error) { encodedKey := tablecodec.EncodeRowKeyWithHandle(tableID, handle) - data, err := t.getMvccByEncodedKey(encodedKey) - return &mvccKV{strings.ToUpper(hex.EncodeToString(encodedKey)), data}, err + data, err := t.GetMvccByEncodedKey(encodedKey) + return &mvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), Value: data}, err } func (t *tikvHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey []byte) (*kvrpcpb.MvccGetByStartTsResponse, error) { bo := tikv.NewBackoffer(context.Background(), 5000) for { - curRegion, err := t.regionCache.LocateKey(bo, startKey) + curRegion, err := t.RegionCache.LocateKey(bo, startKey) if err != nil { logutil.Logger(context.Background()).Error("get MVCC by startTS failed", zap.Uint64("txnStartTS", startTS), zap.Binary("startKey", startKey), zap.Error(err)) return nil, errors.Trace(err) @@ -188,7 +164,7 @@ func (t *tikvHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey []by }, } tikvReq.Context.Priority = kvrpcpb.CommandPri_Low - kvResp, err := t.store.SendReq(bo, tikvReq, curRegion.Region, time.Hour) + kvResp, err := t.Store.SendReq(bo, tikvReq, curRegion.Region, time.Hour) if err != nil { logutil.Logger(context.Background()).Error("get MVCC by startTS failed", zap.Uint64("txnStartTS", startTS), @@ -257,7 +233,7 @@ func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values, if err != nil { return nil, errors.Trace(err) } - data, err := t.getMvccByEncodedKey(encodedKey) + data, err := t.GetMvccByEncodedKey(encodedKey) return &mvccKV{strings.ToUpper(hex.EncodeToString(encodedKey)), data}, err } @@ -310,7 +286,7 @@ func (t *tikvHandlerTool) getTable(dbName, tableName string) (*model.TableInfo, } func (t *tikvHandlerTool) schema() (infoschema.InfoSchema, error) { - session, err := session.CreateSession(t.store.(kv.Storage)) + session, err := session.CreateSession(t.Store.(kv.Storage)) if err != nil { return nil, errors.Trace(err) } @@ -322,36 +298,11 @@ func (t *tikvHandlerTool) handleMvccGetByHex(params map[string]string) (interfac if err != nil { return nil, errors.Trace(err) } - return t.getMvccByEncodedKey(encodedKey) -} - -func (t *tikvHandlerTool) getAllHistoryDDL() ([]*model.Job, error) { - s, err := session.CreateSession(t.store.(kv.Storage)) - if err != nil { - return nil, errors.Trace(err) - } - - if s != nil { - defer s.Close() - } - - store := domain.GetDomain(s.(sessionctx.Context)).Store() - txn, err := store.Begin() - - if err != nil { - return nil, errors.Trace(err) - } - txnMeta := meta.NewMeta(txn) - - jobs, err := txnMeta.GetAllHistoryDDLJobs() - if err != nil { - return nil, errors.Trace(err) - } - return jobs, nil + return t.GetMvccByEncodedKey(encodedKey) } -func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]regionMetric, error) { - regionMetrics, err := t.fetchHotRegion(rw) +func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]helper.RegionMetric, error) { + regionMetrics, err := t.FetchHotRegion(rw) if err != nil { return nil, err } @@ -363,34 +314,6 @@ func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]regionMetric, e return tblIdx, nil } -// storeHotRegionInfos records all hog region stores. -// it's the response of PD. -type storeHotRegionInfos struct { - AsPeer map[uint64]*hotRegionsStat `json:"as_peer"` - AsLeader map[uint64]*hotRegionsStat `json:"as_leader"` -} - -// hotRegions records echo store's hot region. -// it's the response of PD. -type hotRegionsStat struct { - RegionsStat []regionStat `json:"statistics"` -} - -// regionStat records each hot region's statistics -// it's the response of PD. -type regionStat struct { - RegionID uint64 `json:"region_id"` - FlowBytes uint64 `json:"flow_bytes"` - HotDegree int `json:"hot_degree"` -} - -// regionMetric presents the final metric output entry. -type regionMetric struct { - FlowBytes uint64 `json:"flow_bytes"` - MaxHotDegree int `json:"max_hot_degree"` - Count int `json:"region_count"` -} - // tblIndex presents the aggregate key that combined with db,table,index type tblIndex struct { DbName string `json:"db_name"` @@ -398,54 +321,15 @@ type tblIndex struct { IndexName string `json:"index_name"` } -func (t *tikvHandlerTool) fetchHotRegion(rw string) (map[uint64]regionMetric, error) { - etcd, ok := t.store.(domain.EtcdBackend) - if !ok { - return nil, errors.New("not implemented") - } - pdHosts := etcd.EtcdAddrs() - if len(pdHosts) == 0 { - return nil, errors.New("pd unavailable") - } - req, err := http.NewRequest("GET", protocol+pdHosts[0]+rw, nil) - if err != nil { - return nil, errors.Trace(err) - } - timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Millisecond) - resp, err := http.DefaultClient.Do(req.WithContext(timeout)) - cancelFunc() - if err != nil { - return nil, errors.Trace(err) - } - defer func() { - err = resp.Body.Close() - if err != nil { - logutil.Logger(context.Background()).Error("close body failed", zap.Error(err)) - } - }() - var regionResp storeHotRegionInfos - err = json.NewDecoder(resp.Body).Decode(®ionResp) - if err != nil { - return nil, errors.Trace(err) - } - metric := make(map[uint64]regionMetric) - for _, hotRegions := range regionResp.AsLeader { - for _, region := range hotRegions.RegionsStat { - metric[region.RegionID] = regionMetric{FlowBytes: region.FlowBytes, MaxHotDegree: region.HotDegree} - } - } - return metric, nil -} - -func (t *tikvHandlerTool) fetchRegionTableIndex(metrics map[uint64]regionMetric) (map[tblIndex]regionMetric, error) { +func (t *tikvHandlerTool) fetchRegionTableIndex(metrics map[uint64]helper.RegionMetric) (map[tblIndex]helper.RegionMetric, error) { schema, err := t.schema() if err != nil { return nil, err } - idxMetrics := make(map[tblIndex]regionMetric) + idxMetrics := make(map[tblIndex]helper.RegionMetric) for regionID, regionMetric := range metrics { - region, err := t.regionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID) + region, err := t.RegionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID) if err != nil { logutil.Logger(context.Background()).Error("locate region failed", zap.Error(err)) continue @@ -713,7 +597,7 @@ type RegionFrameRange struct { func (t *tikvHandlerTool) getRegionsMeta(regionIDs []uint64) ([]RegionMeta, error) { regions := make([]RegionMeta, len(regionIDs)) for i, regionID := range regionIDs { - meta, leader, err := t.regionCache.PDClient().GetRegionByID(context.TODO(), regionID) + meta, leader, err := t.RegionCache.PDClient().GetRegionByID(context.TODO(), regionID) if err != nil { return nil, errors.Trace(err) } @@ -925,6 +809,31 @@ func (h ddlHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request writeData(w, jobs) } +func (h ddlHistoryJobHandler) getAllHistoryDDL() ([]*model.Job, error) { + s, err := session.CreateSession(h.Store.(kv.Storage)) + if err != nil { + return nil, errors.Trace(err) + } + + if s != nil { + defer s.Close() + } + + store := domain.GetDomain(s.(sessionctx.Context)).Store() + txn, err := store.Begin() + + if err != nil { + return nil, errors.Trace(err) + } + txnMeta := meta.NewMeta(txn) + + jobs, err := txnMeta.GetAllHistoryDDLJobs() + if err != nil { + return nil, errors.Trace(err) + } + return jobs, nil +} + func (h ddlResignOwnerHandler) resignDDLOwner() error { dom, err := session.GetDomain(h.store) if err != nil { @@ -958,7 +867,7 @@ func (h ddlResignOwnerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques func (h tableHandler) getPDAddr() ([]string, error) { var pdAddrs []string - etcd, ok := h.store.(domain.EtcdBackend) + etcd, ok := h.Store.(domain.EtcdBackend) if !ok { return nil, errors.New("not implemented") } @@ -1069,7 +978,7 @@ func (h tableHandler) handleRegionRequest(schema infoschema.InfoSchema, tbl tabl tableID := tbl.Meta().ID // for record startKey, endKey := tablecodec.GetTableHandleKeyRange(tableID) - recordRegionIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) + recordRegionIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) if err != nil { writeError(w, err) return @@ -1087,7 +996,7 @@ func (h tableHandler) handleRegionRequest(schema infoschema.InfoSchema, tbl tabl indices[i].Name = index.Meta().Name.String() indices[i].ID = indexID startKey, endKey := tablecodec.GetTableIndexKeyRange(tableID, indexID) - rIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) + rIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) if err != nil { writeError(w, err) return @@ -1161,7 +1070,7 @@ func (h tableHandler) handleDiskUsageRequest(schema infoschema.InfoSchema, tbl t type hotRegion struct { tblIndex - regionMetric + helper.RegionMetric } type hotRegions []hotRegion @@ -1187,7 +1096,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { startKey := []byte{'m'} endKey := []byte{'n'} - recordRegionIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) + recordRegionIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) if err != nil { writeError(w, err) return @@ -1212,7 +1121,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { writeError(w, err) return } - asSortedEntry := func(metric map[tblIndex]regionMetric) hotRegions { + asSortedEntry := func(metric map[tblIndex]helper.RegionMetric) hotRegions { hs := make(hotRegions, 0, len(metric)) for key, value := range metric { hs = append(hs, hotRegion{key, value}) @@ -1237,7 +1146,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { regionID := uint64(regionIDInt) // locate region - region, err := h.regionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID) + region, err := h.RegionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID) if err != nil { writeError(w, err) return @@ -1607,7 +1516,7 @@ type serverInfo struct { // ServeHTTP handles request of ddl server info. func (h serverInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - do, err := session.GetDomain(h.store.(kv.Storage)) + do, err := session.GetDomain(h.Store.(kv.Storage)) if err != nil { writeError(w, errors.New("create session error")) log.Error(err) @@ -1630,7 +1539,7 @@ type clusterServerInfo struct { // ServeHTTP handles request of all ddl servers info. func (h allServerInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - do, err := session.GetDomain(h.store.(kv.Storage)) + do, err := session.GetDomain(h.Store.(kv.Storage)) if err != nil { writeError(w, errors.New("create session error")) log.Error(err) diff --git a/server/http_handler_test.go b/server/http_handler_test.go index 1b90271a0aa75..826ce61481776 100644 --- a/server/http_handler_test.go +++ b/server/http_handler_test.go @@ -600,7 +600,7 @@ func (ts *HTTPHandlerTestSuite) TestAllHistory(c *C) { decoder := json.NewDecoder(resp.Body) var jobs []*model.Job - s, _ := session.CreateSession(ts.server.newTikvHandlerTool().store.(kv.Storage)) + s, _ := session.CreateSession(ts.server.newTikvHandlerTool().Store.(kv.Storage)) defer s.Close() store := domain.GetDomain(s.(sessionctx.Context)).Store() txn, _ := store.Begin() @@ -714,7 +714,7 @@ func (ts *HTTPHandlerTestSuite) TestServerInfo(c *C) { c.Assert(info.Version, Equals, mysql.ServerVersion) c.Assert(info.GitHash, Equals, printer.TiDBGitHash) - store := ts.server.newTikvHandlerTool().store.(kv.Storage) + store := ts.server.newTikvHandlerTool().Store.(kv.Storage) do, err := session.GetDomain(store.(kv.Storage)) c.Assert(err, IsNil) ddl := do.DDL() @@ -737,7 +737,7 @@ func (ts *HTTPHandlerTestSuite) TestAllServerInfo(c *C) { c.Assert(clusterInfo.IsAllServerVersionConsistent, IsTrue) c.Assert(clusterInfo.ServersNum, Equals, 1) - store := ts.server.newTikvHandlerTool().store.(kv.Storage) + store := ts.server.newTikvHandlerTool().Store.(kv.Storage) do, err := session.GetDomain(store.(kv.Storage)) c.Assert(err, IsNil) ddl := do.DDL() diff --git a/server/http_status.go b/server/http_status.go index 0a79ba0a95d70..62bd51c548c4f 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -68,7 +68,7 @@ func (s *Server) startHTTPServer() { router.Handle("/schema/{db}/{table}", schemaHandler{tikvHandlerTool}) router.Handle("/tables/{colID}/{colTp}/{colFlag}/{colLen}", valueHandler{}) router.Handle("/ddl/history", ddlHistoryJobHandler{tikvHandlerTool}).Name("DDL_History") - router.Handle("/ddl/owner/resign", ddlResignOwnerHandler{tikvHandlerTool.store.(kv.Storage)}).Name("DDL_Owner_Resign") + router.Handle("/ddl/owner/resign", ddlResignOwnerHandler{tikvHandlerTool.Store.(kv.Storage)}).Name("DDL_Owner_Resign") // HTTP path for get server info. router.Handle("/info", serverInfoHandler{tikvHandlerTool}).Name("Info") diff --git a/store/helper/helper.go b/store/helper/helper.go new file mode 100644 index 0000000000000..ee1ff6284eff9 --- /dev/null +++ b/store/helper/helper.go @@ -0,0 +1,135 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +const ( + protocol = "http://" +) + +// Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table. +type Helper struct { + Store tikv.Storage + RegionCache *tikv.RegionCache +} + +// GetMvccByEncodedKey get the MVCC value by the specific encoded key. +func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { + keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackoffer(context.Background(), 500), encodedKey) + if err != nil { + return nil, errors.Trace(err) + } + + tikvReq := &tikvrpc.Request{ + Type: tikvrpc.CmdMvccGetByKey, + MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{ + Key: encodedKey, + }, + } + kvResp, err := h.Store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute) + if err != nil { + logutil.Logger(context.Background()).Info("get MVCC by encoded key failed", + zap.Binary("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Binary("startKey", keyLocation.StartKey), + zap.Binary("endKey", keyLocation.EndKey), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + return kvResp.MvccGetByKey, nil +} + +// StoreHotRegionInfos records all hog region stores. +// it's the response of PD. +type StoreHotRegionInfos struct { + AsPeer map[uint64]*hotRegionsStat `json:"as_peer"` + AsLeader map[uint64]*hotRegionsStat `json:"as_leader"` +} + +// hotRegions records echo store's hot region. +// it's the response of PD. +type hotRegionsStat struct { + RegionsStat []regionStat `json:"statistics"` +} + +// regionStat records each hot region's statistics +// it's the response of PD. +type regionStat struct { + RegionID uint64 `json:"region_id"` + FlowBytes uint64 `json:"flow_bytes"` + HotDegree int `json:"hot_degree"` +} + +// RegionMetric presents the final metric output entry. +type RegionMetric struct { + FlowBytes uint64 `json:"flow_bytes"` + MaxHotDegree int `json:"max_hot_degree"` + Count int `json:"region_count"` +} + +// FetchHotRegion fetches the hot region information from PD's http api. +func (h *Helper) FetchHotRegion(rw string) (map[uint64]RegionMetric, error) { + etcd, ok := h.Store.(domain.EtcdBackend) + if !ok { + return nil, errors.WithStack(errors.New("not implemented")) + } + pdHosts := etcd.EtcdAddrs() + if len(pdHosts) == 0 { + return nil, errors.New("pd unavailable") + } + req, err := http.NewRequest("GET", protocol+pdHosts[0]+rw, nil) + if err != nil { + return nil, errors.Trace(err) + } + timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Millisecond) + resp, err := http.DefaultClient.Do(req.WithContext(timeout)) + cancelFunc() + if err != nil { + return nil, errors.Trace(err) + } + defer func() { + err = resp.Body.Close() + if err != nil { + logutil.Logger(context.Background()).Error("close body failed", zap.Error(err)) + } + }() + var regionResp StoreHotRegionInfos + err = json.NewDecoder(resp.Body).Decode(®ionResp) + if err != nil { + return nil, errors.Trace(err) + } + metric := make(map[uint64]RegionMetric) + for _, hotRegions := range regionResp.AsLeader { + for _, region := range hotRegions.RegionsStat { + metric[region.RegionID] = RegionMetric{FlowBytes: region.FlowBytes, MaxHotDegree: region.HotDegree} + } + } + return metric, nil +} diff --git a/store/helper/helper_test.go b/store/helper/helper_test.go new file mode 100644 index 0000000000000..697a46c5b5b81 --- /dev/null +++ b/store/helper/helper_test.go @@ -0,0 +1,118 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "testing" + "time" + + "github.com/gorilla/mux" + . "github.com/pingcap/check" + "github.com/pingcap/log" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/store/tikv" + "go.uber.org/zap" +) + +type HelperTestSuite struct { + store tikv.Storage +} + +var _ = Suite(new(HelperTestSuite)) + +func TestT(t *testing.T) { + CustomVerboseFlag = true + TestingT(t) +} + +type mockStore struct { + tikv.Storage + pdAddrs []string +} + +func (s *mockStore) EtcdAddrs() []string { + return s.pdAddrs +} + +func (s *mockStore) StartGCWorker() error { + panic("not implemented") +} + +func (s *mockStore) TLSConfig() *tls.Config { + panic("not implemented") +} + +func (s *HelperTestSuite) SetUpSuite(c *C) { + go s.mockPDHTTPServer(c) + time.Sleep(100 * time.Millisecond) + mvccStore := mocktikv.MustNewMVCCStore() + mockTikvStore, err := mockstore.NewMockTikvStore(mockstore.WithMVCCStore(mvccStore)) + s.store = &mockStore{ + mockTikvStore.(tikv.Storage), + []string{"127.0.0.1:10090/"}, + } + c.Assert(err, IsNil) +} + +func (s *HelperTestSuite) TestHotRegion(c *C) { + helper := Helper{ + Store: s.store, + RegionCache: s.store.GetRegionCache(), + } + regionMetric, err := helper.FetchHotRegion("/pd/api/v1/hotspot/regions/read") + c.Assert(err, IsNil, Commentf("err: %+v", err)) + c.Assert(fmt.Sprintf("%v", regionMetric), Equals, "map[1:{100 1 0}]") +} + +func (s *HelperTestSuite) mockPDHTTPServer(c *C) { + router := mux.NewRouter() + router.HandleFunc("/pd/api/v1/hotspot/regions/read", s.mockHotRegionResponse) + serverMux := http.NewServeMux() + serverMux.Handle("/", router) + server := &http.Server{Addr: "127.0.0.1:10090", Handler: serverMux} + err := server.ListenAndServe() + c.Assert(err, IsNil) +} + +func (s *HelperTestSuite) mockHotRegionResponse(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + regionsStat := hotRegionsStat{ + []regionStat{ + { + FlowBytes: 100, + RegionID: 1, + HotDegree: 1, + }, + }, + } + resp := StoreHotRegionInfos{ + AsLeader: make(map[uint64]*hotRegionsStat), + } + resp.AsLeader[0] = ®ionsStat + data, err := json.MarshalIndent(resp, "", " ") + if err != nil { + log.Panic("json marshal failed", zap.Error(err)) + } + _, err = w.Write(data) + if err != nil { + log.Panic("write http response failed", zap.Error(err)) + } + +}