From b86008584a9c1a04a0c870c44c45bcf3b3d4b1d6 Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 23 Apr 2020 21:17:24 +0800 Subject: [PATCH] *: Support LOCK/UNLOCK TABLES feature (#10343) --- config/config.go | 9 + config/config.toml.example | 3 + config/config_test.go | 2 + ddl/db_test.go | 333 +++++++++++++++++++++++++++++++++ ddl/ddl.go | 5 + ddl/ddl_api.go | 146 ++++++++++++++- ddl/ddl_worker.go | 4 + ddl/table.go | 14 +- ddl/table_lock.go | 229 +++++++++++++++++++++++ ddl/table_test.go | 49 ++++- executor/ddl.go | 19 ++ go.mod | 2 + go.sum | 2 + infoschema/infoschema.go | 58 ++++-- infoschema/perfschema/const.go | 3 +- lock/lock.go | 126 +++++++++++++ planner/core/optimizer.go | 17 ++ planner/core/planbuilder.go | 2 + planner/optimize.go | 4 + session/session.go | 65 +++++++ sessionctx/context.go | 17 +- table/table.go | 5 +- util/mock/context.go | 36 ++++ 23 files changed, 1122 insertions(+), 28 deletions(-) create mode 100644 ddl/table_lock.go create mode 100644 lock/lock.go diff --git a/config/config.go b/config/config.go index bcd007f1b3fae..0549be42cb539 100644 --- a/config/config.go +++ b/config/config.go @@ -103,6 +103,9 @@ type Config struct { TreatOldVersionUTF8AsUTF8MB4 bool `toml:"treat-old-version-utf8-as-utf8mb4" json:"treat-old-version-utf8-as-utf8mb4"` SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"` StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"` + // EnableTableLock indicate whether enable table lock. + // TODO: remove this after table lock features stable. + EnableTableLock bool `toml:"enable-table-lock" json:"enable-table-lock"` } // Log is the log section of config. @@ -376,6 +379,7 @@ var defaultConf = Config{ AlterPrimaryKey: false, TreatOldVersionUTF8AsUTF8MB4: true, SplitRegionMaxNum: 1000, + EnableTableLock: false, TxnLocalLatches: TxnLocalLatches{ Enabled: false, Capacity: 2048000, @@ -652,6 +656,11 @@ func hasRootPrivilege() bool { return os.Geteuid() == 0 } +// TableLockEnabled uses to check whether enabled the table lock feature. +func TableLockEnabled() bool { + return GetGlobalConfig().EnableTableLock +} + // ToLogConfig converts *Log to *logutil.LogConfig. func (l *Log) ToLogConfig() *logutil.LogConfig { return logutil.NewLogConfig(l.Level, l.Format, l.SlowQueryFile, l.File, l.DisableTimestamp, func(config *zaplog.Config) { config.DisableErrorVerbose = l.DisableErrorStack }) diff --git a/config/config.toml.example b/config/config.toml.example index 41185ebdf62f3..594d1886672e9 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -74,6 +74,9 @@ alter-primary-key = false # if server-version = "", the default value(original TiDB version string) is used. server-version = "" +# enable-table-lock is used to control table lock feature. Default is false, indicate the table lock feature is disabled. +enable-table-lock = false + [log] # Log level: debug, info, warn, error, fatal. level = "info" diff --git a/config/config_test.go b/config/config_test.go index bd35ad13f3de5..9a864c1c3c74c 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -69,6 +69,7 @@ alter-primary-key = true split-region-max-num=10000 server-version = "test_version" max-index-length = 3080 +enable-table-lock = true [performance] txn-entry-count-limit=2000 txn-total-size-limit=2000 @@ -115,6 +116,7 @@ history-size=100 c.Assert(conf.StmtSummary.RefreshInterval, Equals, 100) c.Assert(conf.StmtSummary.HistorySize, Equals, 100) c.Assert(conf.MaxIndexLength, Equals, 3080) + c.Assert(conf.EnableTableLock, IsTrue) c.Assert(f.Close(), IsNil) c.Assert(os.Remove(configFile), IsNil) diff --git a/ddl/db_test.go b/ddl/db_test.go index 5fc5cd74c93f0..1d729ca4b854b 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/parser/mysql" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" testddlutil "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/domain" @@ -3222,6 +3223,338 @@ func (s *testDBSuite4) TestAlterShardRowIDBits(c *C) { c.Assert(err.Error(), Equals, "[autoid:1467]Failed to read auto-increment value from storage engine") } +func (s *testDBSuite2) TestLockTables(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + tk := s.tk + tk.MustExec("use test") + tk.MustExec("drop table if exists t1,t2") + defer tk.MustExec("drop table if exists t1,t2") + tk.MustExec("create table t1 (a int)") + tk.MustExec("create table t2 (a int)") + + // recover table lock config. + originValue := config.GetGlobalConfig().EnableTableLock + defer func() { + config.GetGlobalConfig().EnableTableLock = originValue + }() + + // Test for enable table lock config. + config.GetGlobalConfig().EnableTableLock = false + tk.MustExec("lock tables t1 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone) + config.GetGlobalConfig().EnableTableLock = true + + // Test lock 1 table. + tk.MustExec("lock tables t1 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockWrite) + tk.MustExec("lock tables t1 read") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockRead) + tk.MustExec("lock tables t1 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockWrite) + + // Test lock multi tables. + tk.MustExec("lock tables t1 write, t2 read") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockWrite) + checkTableLock(c, tk.Se, "test", "t2", model.TableLockRead) + tk.MustExec("lock tables t1 read, t2 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockRead) + checkTableLock(c, tk.Se, "test", "t2", model.TableLockWrite) + tk.MustExec("lock tables t2 write") + checkTableLock(c, tk.Se, "test", "t2", model.TableLockWrite) + checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone) + tk.MustExec("lock tables t1 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockWrite) + checkTableLock(c, tk.Se, "test", "t2", model.TableLockNone) + + tk2 := testkit.NewTestKit(c, s.store) + tk2.MustExec("use test") + + // Test read lock. + tk.MustExec("lock tables t1 read") + tk.MustQuery("select * from t1") + tk2.MustQuery("select * from t1") + _, err := tk.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLockedForWrite), IsTrue) + _, err = tk.Exec("update t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLockedForWrite), IsTrue) + _, err = tk.Exec("delete from t1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLockedForWrite), IsTrue) + + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("update t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("delete from t1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + tk2.MustExec("lock tables t1 read") + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLockedForWrite), IsTrue) + + // Test write lock. + _, err = tk.Exec("lock tables t1 write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + tk2.MustExec("unlock tables") + tk.MustExec("lock tables t1 write") + tk.MustQuery("select * from t1") + tk.MustExec("delete from t1") + tk.MustExec("insert into t1 set a=1") + + _, err = tk2.Exec("select * from t1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("lock tables t1 write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + + // Test write local lock. + tk.MustExec("lock tables t1 write local") + tk.MustQuery("select * from t1") + tk.MustExec("delete from t1") + tk.MustExec("insert into t1 set a=1") + + tk2.MustQuery("select * from t1") + _, err = tk2.Exec("delete from t1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("lock tables t1 write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("lock tables t1 read") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + + // Test none unique table. + _, err = tk.Exec("lock tables t1 read, t1 write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrNonuniqTable), IsTrue) + + // Test lock table by other session in transaction and commit without retry. + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + tk.MustExec("set @@session.tidb_disable_txn_auto_retry=1") + tk.MustExec("begin") + tk.MustExec("insert into t1 set a=1") + tk2.MustExec("lock tables t1 write") + _, err = tk.Exec("commit") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "previous statement: insert into t1 set a=1: [domain:2]Information schema is changed. [try again later]") + + // Test lock table by other session in transaction and commit with retry. + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + tk.MustExec("begin") + tk.MustExec("insert into t1 set a=1") + tk.MustExec("set @@session.tidb_disable_txn_auto_retry=0") + tk2.MustExec("lock tables t1 write") + _, err = tk.Exec("commit") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue, Commentf("err: %v\n", err)) + + // Test for lock the same table multiple times. + tk2.MustExec("lock tables t1 write") + tk2.MustExec("lock tables t1 write, t2 read") + + // Test lock tables and drop tables + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + tk.MustExec("lock tables t1 write, t2 write") + tk.MustExec("drop table t1") + tk2.MustExec("create table t1 (a int)") + tk.MustExec("lock tables t1 write, t2 read") + + // Test lock tables and drop database. + tk.MustExec("unlock tables") + tk.MustExec("create database test_lock") + tk.MustExec("create table test_lock.t3 (a int)") + tk.MustExec("lock tables t1 write, test_lock.t3 write") + tk2.MustExec("create table t3 (a int)") + tk.MustExec("lock tables t1 write, t3 write") + tk.MustExec("drop table t3") + + // Test lock tables and truncate tables. + tk.MustExec("unlock tables") + tk.MustExec("lock tables t1 write, t2 read") + tk.MustExec("truncate table t1") + tk.MustExec("insert into t1 set a=1") + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + + // Test for lock unsupported schema tables. + _, err = tk2.Exec("lock tables performance_schema.global_status write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrAccessDenied), IsTrue) + _, err = tk2.Exec("lock tables information_schema.tables write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrAccessDenied), IsTrue) + _, err = tk2.Exec("lock tables mysql.db write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrAccessDenied), IsTrue) + + // Test create table/view when session is holding the table locks. + tk.MustExec("unlock tables") + tk.MustExec("lock tables t1 write, t2 read") + _, err = tk.Exec("create table t3 (a int)") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLocked), IsTrue) + _, err = tk.Exec("create view v1 as select * from t1;") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLocked), IsTrue) + + // Test for lock view was not supported. + tk.MustExec("unlock tables") + tk.MustExec("create view v1 as select * from t1;") + _, err = tk.Exec("lock tables v1 read") + c.Assert(terror.ErrorEqual(err, table.ErrUnsupportedOp), IsTrue) + + // Test for create/drop/alter database when session is holding the table locks. + tk.MustExec("unlock tables") + tk.MustExec("lock table t1 write") + _, err = tk.Exec("drop database test") + c.Assert(terror.ErrorEqual(err, table.ErrLockOrActiveTransaction), IsTrue) + _, err = tk.Exec("create database test_lock") + c.Assert(terror.ErrorEqual(err, table.ErrLockOrActiveTransaction), IsTrue) + _, err = tk.Exec("alter database test charset='utf8mb4'") + c.Assert(terror.ErrorEqual(err, table.ErrLockOrActiveTransaction), IsTrue) + // Test alter/drop database when other session is holding the table locks of the database. + tk2.MustExec("create database test_lock2") + _, err = tk2.Exec("drop database test") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("alter database test charset='utf8mb4'") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") +} + +// TestConcurrentLockTables test concurrent lock/unlock tables. +func (s *testDBSuite2) TestConcurrentLockTables(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + tk2 := testkit.NewTestKit(c, s.store) + tk := s.tk + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + defer tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int)") + tk2.MustExec("use test") + + // recover table lock config. + originValue := config.GetGlobalConfig().EnableTableLock + defer func() { + config.GetGlobalConfig().EnableTableLock = originValue + }() + + // Test for enable table lock config. + config.GetGlobalConfig().EnableTableLock = true + + // Test concurrent lock tables read. + sql1 := "lock tables t1 read" + sql2 := "lock tables t1 read" + s.testParallelExecSQL(c, sql1, sql2, tk.Se, tk2.Se, func(c *C, err1, err2 error) { + c.Assert(err1, IsNil) + c.Assert(err2, IsNil) + }) + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + + // Test concurrent lock tables write. + sql1 = "lock tables t1 write" + sql2 = "lock tables t1 write" + s.testParallelExecSQL(c, sql1, sql2, tk.Se, tk2.Se, func(c *C, err1, err2 error) { + c.Assert(err1, IsNil) + c.Assert(terror.ErrorEqual(err2, infoschema.ErrTableLocked), IsTrue) + }) + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + + // Test concurrent lock tables write local. + sql1 = "lock tables t1 write local" + sql2 = "lock tables t1 write local" + s.testParallelExecSQL(c, sql1, sql2, tk.Se, tk2.Se, func(c *C, err1, err2 error) { + c.Assert(err1, IsNil) + c.Assert(terror.ErrorEqual(err2, infoschema.ErrTableLocked), IsTrue) + }) + + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") +} + +func (s *testDBSuite2) testParallelExecSQL(c *C, sql1, sql2 string, se1, se2 session.Session, f checkRet) { + callback := &ddl.TestDDLCallback{} + times := 0 + callback.OnJobRunBeforeExported = func(job *model.Job) { + if times != 0 { + return + } + var qLen int + for { + err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + jobs, err1 := admin.GetDDLJobs(txn) + if err1 != nil { + return err1 + } + qLen = len(jobs) + return nil + }) + c.Assert(err, IsNil) + if qLen == 2 { + break + } + time.Sleep(5 * time.Millisecond) + } + times++ + } + d := s.dom.DDL() + originalCallback := d.GetHook() + defer d.(ddl.DDLForTest).SetHook(originalCallback) + d.(ddl.DDLForTest).SetHook(callback) + + wg := sync.WaitGroup{} + var err1 error + var err2 error + wg.Add(2) + ch := make(chan struct{}) + // Make sure the sql1 is put into the DDLJobQueue. + go func() { + var qLen int + for { + err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + jobs, err3 := admin.GetDDLJobs(txn) + if err3 != nil { + return err3 + } + qLen = len(jobs) + return nil + }) + c.Assert(err, IsNil) + if qLen == 1 { + // Make sure sql2 is executed after the sql1. + close(ch) + break + } + time.Sleep(5 * time.Millisecond) + } + }() + go func() { + defer wg.Done() + _, err1 = se1.Execute(context.Background(), sql1) + }() + go func() { + defer wg.Done() + <-ch + _, err2 = se2.Execute(context.Background(), sql2) + }() + + wg.Wait() + f(c, err1, err2) +} + +func checkTableLock(c *C, se session.Session, dbName, tableName string, lockTp model.TableLockType) { + tb := testGetTableByName(c, se, dbName, tableName) + dom := domain.GetDomain(se) + if lockTp != model.TableLockNone { + c.Assert(tb.Meta().Lock, NotNil) + c.Assert(tb.Meta().Lock.Tp, Equals, lockTp) + c.Assert(tb.Meta().Lock.State, Equals, model.TableLockStatePublic) + c.Assert(len(tb.Meta().Lock.Sessions) == 1, IsTrue) + c.Assert(tb.Meta().Lock.Sessions[0].ServerID, Equals, dom.DDL().GetID()) + c.Assert(tb.Meta().Lock.Sessions[0].SessionID, Equals, se.GetSessionVars().ConnectionID) + } else { + c.Assert(tb.Meta().Lock, IsNil) + } +} + func (s *testDBSuite2) TestDDLWithInvalidTableInfo(c *C) { s.tk = testkit.NewTestKit(c, s.store) tk := s.tk diff --git a/ddl/ddl.go b/ddl/ddl.go index de228e2fe4d62..2c02b65faea22 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -159,6 +159,8 @@ var ( ErrInvalidIndexState = terror.ClassDDL.New(codeInvalidIndexState, "invalid index state") // ErrInvalidForeignKeyState returns for invalid foreign key state. ErrInvalidForeignKeyState = terror.ClassDDL.New(codeInvalidForeignKeyState, "invalid foreign key state") + // ErrInvalidTableLockState returns for invalid table state. + ErrInvalidTableLockState = terror.ClassDDL.New(codeInvalidTableLockState, "invalid table lock state") // ErrUnsupportedModifyPrimaryKey returns an error when add or drop the primary key. // It's exported for testing. ErrUnsupportedModifyPrimaryKey = terror.ClassDDL.New(codeUnsupportedModifyPrimaryKey, "unsupported %s primary key") @@ -254,6 +256,8 @@ type DDL interface { AlterTable(ctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident, isAlterTable bool) error + LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error + UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error // GetLease returns current schema lease time. GetLease() time.Duration @@ -700,6 +704,7 @@ const ( codeInvalidColumnState = 102 codeInvalidIndexState = 103 codeInvalidForeignKeyState = 104 + codeInvalidTableLockState = 105 codeCantDropColWithIndex = 201 codeUnsupportedAddColumn = 202 diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 45320bce44fb0..935beba6c8cfa 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -169,7 +169,22 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - return errors.Trace(err) + if err != nil { + return errors.Trace(err) + } + if !config.TableLockEnabled() { + return nil + } + // Clear table locks hold by the session. + tbs := is.SchemaTables(schema) + lockTableIDs := make([]int64, 0) + for _, tb := range tbs { + if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok { + lockTableIDs = append(lockTableIDs, tb.Meta().ID) + } + } + ctx.ReleaseTableLockByTableIDs(lockTableIDs) + return nil } func checkTooLongSchema(schema model.CIStr) error { @@ -3029,7 +3044,16 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) { err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - return errors.Trace(err) + if err != nil { + return errors.Trace(err) + } + if !config.TableLockEnabled() { + return nil + } + if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok { + ctx.ReleaseTableLockByTableIDs([]int64{tb.Meta().ID}) + } + return nil } // DropView will proceed even if some view in the list does not exists. @@ -3074,9 +3098,28 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error { BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{newTableID}, } + if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok && config.TableLockEnabled() { + // AddTableLock here to avoid this ddl job was executed successfully but the session was been kill before return. + // The session will release all table locks it holds, if we don't add the new locking table id here, + // the session may forget to release the new locked table id when this ddl job was executed successfully + // but the session was killed before return. + ctx.AddTableLock(([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: newTableID, Tp: tb.Meta().Lock.Tp}})) + } err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - return errors.Trace(err) + if err != nil { + if config.TableLockEnabled() { + ctx.ReleaseTableLockByTableIDs([]int64{newTableID}) + } + return errors.Trace(err) + } + if !config.TableLockEnabled() { + return nil + } + if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok { + ctx.ReleaseTableLockByTableIDs([]int64{tb.Meta().ID}) + } + return nil } func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, isAlterTable bool) error { @@ -3553,3 +3596,100 @@ func extractCollateFromOption(def *ast.ColumnDef) []string { } return specifiedCollates } + +// LockTables uses to execute lock tables statement. +func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error { + lockTables := make([]model.TableLockTpInfo, 0, len(stmt.TableLocks)) + sessionInfo := model.SessionInfo{ + ServerID: d.GetID(), + SessionID: ctx.GetSessionVars().ConnectionID, + } + uniqueTableID := make(map[int64]struct{}) + // Check whether the table was already locked by other. + for _, tl := range stmt.TableLocks { + tb := tl.Table + // TODO: replace const string "performance_schema" with xxx.LowerName. + // Currently use perfschema.LowerName will have import cycle problem. + if tb.Schema.L == infoschema.LowerName || tb.Schema.L == "performance_schema" || tb.Schema.L == mysql.SystemDB { + if ctx.GetSessionVars().User != nil { + return infoschema.ErrAccessDenied.GenWithStackByArgs(ctx.GetSessionVars().User.Username, ctx.GetSessionVars().User.Hostname) + } + return infoschema.ErrAccessDenied + } + schema, t, err := d.getSchemaAndTableByIdent(ctx, ast.Ident{Schema: tb.Schema, Name: tb.Name}) + if err != nil { + return errors.Trace(err) + } + if t.Meta().IsView() { + return table.ErrUnsupportedOp.GenWithStackByArgs() + } + err = checkTableLocked(t.Meta(), tl.Type, sessionInfo) + if err != nil { + return err + } + if _, ok := uniqueTableID[t.Meta().ID]; ok { + return infoschema.ErrNonuniqTable.GenWithStackByArgs(t.Meta().Name) + } + uniqueTableID[t.Meta().ID] = struct{}{} + lockTables = append(lockTables, model.TableLockTpInfo{SchemaID: schema.ID, TableID: t.Meta().ID, Tp: tl.Type}) + } + + unlockTables := ctx.GetAllTableLocks() + arg := &lockTablesArg{ + LockTables: lockTables, + UnlockTables: unlockTables, + SessionInfo: sessionInfo, + } + job := &model.Job{ + SchemaID: lockTables[0].SchemaID, + TableID: lockTables[0].TableID, + Type: model.ActionLockTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{arg}, + } + // AddTableLock here is avoiding this job was executed successfully but the session was killed before return. + ctx.AddTableLock(lockTables) + err := d.doDDLJob(ctx, job) + if err == nil { + ctx.ReleaseTableLocks(unlockTables) + ctx.AddTableLock(lockTables) + } + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + +// UnlockTables uses to execute unlock tables statement. +func (d *ddl) UnlockTables(ctx sessionctx.Context, unlockTables []model.TableLockTpInfo) error { + if len(unlockTables) == 0 { + return nil + } + arg := &lockTablesArg{ + UnlockTables: unlockTables, + SessionInfo: model.SessionInfo{ + ServerID: d.GetID(), + SessionID: ctx.GetSessionVars().ConnectionID, + }, + } + job := &model.Job{ + SchemaID: unlockTables[0].SchemaID, + TableID: unlockTables[0].TableID, + Type: model.ActionUnlockTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{arg}, + } + + err := d.doDDLJob(ctx, job) + if err == nil { + ctx.ReleaseAllTableLocks() + } + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + +type lockTablesArg struct { + LockTables []model.TableLockTpInfo + IndexOfLock int + UnlockTables []model.TableLockTpInfo + IndexOfUnlock int + SessionInfo model.SessionInfo +} diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 5ef40f319c6ad..8cbd9439fbbbe 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -603,6 +603,10 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onModifyTableCharsetAndCollate(t, job) case model.ActionRecoverTable: ver, err = w.onRecoverTable(d, t, job) + case model.ActionLockTable: + ver, err = onLockTables(t, job) + case model.ActionUnlockTable: + ver, err = onUnlockTables(t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/table.go b/ddl/table.go index 651eb28f5950b..a44368195fb9e 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -359,12 +359,21 @@ func getTableInfoAndCancelFaultJob(t *meta.Meta, job *model.Job, schemaID int64) } func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) { - tableID := job.TableID + tblInfo, err := getTableInfo(t, job.TableID, schemaID) + if err == nil { + return tblInfo, nil + } + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { + job.State = model.JobStateCancelled + } + return nil, err +} + +func getTableInfo(t *meta.Meta, tableID, schemaID int64) (*model.TableInfo, error) { // Check this table's database. tblInfo, err := t.GetTable(schemaID, tableID) if err != nil { if meta.ErrDBNotExists.Equal(err) { - job.State = model.JobStateCancelled return nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs( fmt.Sprintf("(Schema ID %d)", schemaID), )) @@ -374,7 +383,6 @@ func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID // Check the table. if tblInfo == nil { - job.State = model.JobStateCancelled return nil, errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs( fmt.Sprintf("(Schema ID %d)", schemaID), fmt.Sprintf("(Table ID %d)", tableID), diff --git a/ddl/table_lock.go b/ddl/table_lock.go new file mode 100644 index 0000000000000..03d798a3911ab --- /dev/null +++ b/ddl/table_lock.go @@ -0,0 +1,229 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/meta" +) + +func onLockTables(t *meta.Meta, job *model.Job) (ver int64, err error) { + arg := &lockTablesArg{} + if err := job.DecodeArgs(arg); err != nil { + // Invalid arguments, cancel this job. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + // Unlock table first. + if arg.IndexOfUnlock < len(arg.UnlockTables) { + return unlockTables(t, job, arg) + } + + // Check table locked by other, this can be only checked at the first time. + if arg.IndexOfLock == 0 { + for i, tl := range arg.LockTables { + job.SchemaID = tl.SchemaID + job.TableID = tl.TableID + tbInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, err + } + err = checkTableLocked(tbInfo, arg.LockTables[i].Tp, arg.SessionInfo) + if err != nil { + // If any request table was locked by other session, just cancel this job. + // No need to rolling back the unlocked tables, MySQL will release the lock first + // and block if the request table was locked by other. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } + } + + // Lock tables. + if arg.IndexOfLock < len(arg.LockTables) { + job.SchemaID = arg.LockTables[arg.IndexOfLock].SchemaID + job.TableID = arg.LockTables[arg.IndexOfLock].TableID + var tbInfo *model.TableInfo + tbInfo, err = getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, err + } + err = lockTable(tbInfo, arg.IndexOfLock, arg) + if err != nil { + job.State = model.JobStateCancelled + return ver, err + } + + switch tbInfo.Lock.State { + case model.TableLockStateNone: + // none -> pre_lock + tbInfo.Lock.State = model.TableLockStatePreLock + tbInfo.Lock.TS = t.StartTS + ver, err = updateVersionAndTableInfo(t, job, tbInfo, true) + // If the state of the lock is public, it means the lock is a read lock and already locked by other session, + // so this request of lock table doesn't need pre-lock state, just update the TS and table info is ok. + case model.TableLockStatePreLock, model.TableLockStatePublic: + tbInfo.Lock.State = model.TableLockStatePublic + tbInfo.Lock.TS = t.StartTS + ver, err = updateVersionAndTableInfo(t, job, tbInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + arg.IndexOfLock++ + job.Args = []interface{}{arg} + if arg.IndexOfLock == len(arg.LockTables) { + // Finish this job. + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, nil) + } + default: + job.State = model.JobStateCancelled + return ver, ErrInvalidTableLockState.GenWithStack("invalid table lock state %v", tbInfo.Lock.State) + } + } + + return ver, err +} + +// findSessionInfoIndex gets the index of sessionInfo in the sessions. return -1 if sessions doesn't contain the sessionInfo. +func findSessionInfoIndex(sessions []model.SessionInfo, sessionInfo model.SessionInfo) int { + for i := range sessions { + if sessions[i].ServerID == sessionInfo.ServerID && sessions[i].SessionID == sessionInfo.SessionID { + return i + } + } + return -1 +} + +// lockTable uses to check table locked and acquire the table lock for the request session. +func lockTable(tbInfo *model.TableInfo, idx int, arg *lockTablesArg) error { + if !tbInfo.IsLocked() { + tbInfo.Lock = &model.TableLockInfo{ + Tp: arg.LockTables[idx].Tp, + } + tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, arg.SessionInfo) + return nil + } + // If the state of the lock is in pre-lock, then the lock must be locked by the current request. So we can just return here. + // Because the lock/unlock job must be serial execution in DDL owner now. + if tbInfo.Lock.State == model.TableLockStatePreLock { + return nil + } + if tbInfo.Lock.Tp == model.TableLockRead && arg.LockTables[idx].Tp == model.TableLockRead { + sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo) + // repeat lock. + if sessionIndex >= 0 { + return nil + } + tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, arg.SessionInfo) + return nil + } + + // Unlock tables should execute before lock tables. + // Normally execute to here is impossible. + return infoschema.ErrTableLocked.GenWithStackByArgs(tbInfo.Name.L, tbInfo.Lock.Tp, tbInfo.Lock.Sessions[0]) +} + +// checkTableLocked uses to check whether table was locked. +func checkTableLocked(tbInfo *model.TableInfo, lockTp model.TableLockType, sessionInfo model.SessionInfo) error { + if !tbInfo.IsLocked() { + return nil + } + if tbInfo.Lock.State == model.TableLockStatePreLock { + return nil + } + if tbInfo.Lock.Tp == model.TableLockRead && lockTp == model.TableLockRead { + return nil + } + sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, sessionInfo) + // If the request session already locked the table before, In other words, repeat lock. + if sessionIndex >= 0 { + if tbInfo.Lock.Tp == lockTp { + return nil + } + // If no other session locked this table. + if len(tbInfo.Lock.Sessions) == 1 { + return nil + } + } + return infoschema.ErrTableLocked.GenWithStackByArgs(tbInfo.Name.L, tbInfo.Lock.Tp, tbInfo.Lock.Sessions[0]) +} + +// unlockTables uses unlock a batch of table lock one by one. +func unlockTables(t *meta.Meta, job *model.Job, arg *lockTablesArg) (ver int64, err error) { + if arg.IndexOfUnlock >= len(arg.UnlockTables) { + return ver, nil + } + job.SchemaID = arg.UnlockTables[arg.IndexOfUnlock].SchemaID + job.TableID = arg.UnlockTables[arg.IndexOfUnlock].TableID + tbInfo, err := getTableInfo(t, job.TableID, job.SchemaID) + if err != nil { + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { + // The table maybe has been dropped. just ignore this err and go on. + arg.IndexOfUnlock++ + job.Args = []interface{}{arg} + return ver, nil + } + return ver, err + } + + needUpdateTableInfo := unlockTable(tbInfo, arg) + if needUpdateTableInfo { + ver, err = updateVersionAndTableInfo(t, job, tbInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + } + + arg.IndexOfUnlock++ + job.Args = []interface{}{arg} + return ver, nil +} + +// unlockTable uses to unlock table lock that hold by the session. +func unlockTable(tbInfo *model.TableInfo, arg *lockTablesArg) (needUpdateTableInfo bool) { + if !tbInfo.IsLocked() { + return false + } + sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo) + if sessionIndex < 0 { + // When session clean table lock, session maybe send unlock table even the table lock maybe not hold by the session. + // so just ignore and return here. + return false + } + oldSessionInfo := tbInfo.Lock.Sessions + tbInfo.Lock.Sessions = oldSessionInfo[:sessionIndex] + tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, oldSessionInfo[sessionIndex+1:]...) + if len(tbInfo.Lock.Sessions) == 0 { + tbInfo.Lock = nil + } + return true +} + +func onUnlockTables(t *meta.Meta, job *model.Job) (ver int64, err error) { + arg := &lockTablesArg{} + if err := job.DecodeArgs(arg); err != nil { + // Invalid arguments, cancel this job. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + ver, err = unlockTables(t, job, arg) + if arg.IndexOfUnlock == len(arg.UnlockTables) { + job.FinishTableJob(model.JobStateDone, model.StateNone, ver, nil) + } + return ver, err +} diff --git a/ddl/table_test.go b/ddl/table_test.go index ca189139b7d9d..335893cbe0c9b 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -181,6 +181,47 @@ func testRenameTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID, oldSchem return job } +func testLockTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo, lockTp model.TableLockType) *model.Job { + arg := &lockTablesArg{ + LockTables: []model.TableLockTpInfo{{SchemaID: newSchemaID, TableID: tblInfo.ID, Tp: lockTp}}, + SessionInfo: model.SessionInfo{ + ServerID: d.GetID(), + SessionID: ctx.GetSessionVars().ConnectionID, + }, + } + job := &model.Job{ + SchemaID: newSchemaID, + TableID: tblInfo.ID, + Type: model.ActionLockTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{arg}, + } + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + + v := getSchemaVer(c, ctx) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + return job +} + +func checkTableLockedTest(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, serverID string, sessionID uint64, lockTp model.TableLockType) { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + info, err := t.GetTable(dbInfo.ID, tblInfo.ID) + c.Assert(err, IsNil) + + c.Assert(info, NotNil) + c.Assert(info.Lock, NotNil) + c.Assert(len(info.Lock.Sessions) == 1, IsTrue) + c.Assert(info.Lock.Sessions[0].ServerID, Equals, serverID) + c.Assert(info.Lock.Sessions[0].SessionID, Equals, sessionID) + c.Assert(info.Lock.Tp, Equals, lockTp) + c.Assert(info.Lock.State, Equals, model.TableLockStatePublic) + return nil + }) + c.Assert(err, IsNil) +} + func testDropTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { job := &model.Job{ SchemaID: dbInfo.ID, @@ -217,7 +258,7 @@ func testTruncateTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInf } func testCheckTableState(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, state model.SchemaState) { - kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) info, err := t.GetTable(dbInfo.ID, tblInfo.ID) c.Assert(err, IsNil) @@ -231,6 +272,7 @@ func testCheckTableState(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.Tabl c.Assert(info.State, Equals, state) return nil }) + c.Assert(err, IsNil) } func testGetTable(c *C, d *ddl, schemaID int64, tableID int64) table.Table { @@ -317,6 +359,11 @@ func (s *testTableSuite) TestTable(c *C) { job = testRenameTable(c, ctx, d, dbInfo1.ID, s.dbInfo.ID, tblInfo) testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic) testCheckJobDone(c, d, job, true) + + job = testLockTable(c, ctx, d, dbInfo1.ID, tblInfo, model.TableLockWrite) + testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDone(c, d, job, true) + checkTableLockedTest(c, d, dbInfo1, tblInfo, d.GetID(), ctx.GetSessionVars().ConnectionID, model.TableLockWrite) } func (s *testTableSuite) TestTableResume(c *C) { diff --git a/executor/ddl.go b/executor/ddl.go index ce52a704b3cf4..71ff1cdaf009c 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -109,6 +109,10 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { err = e.executeRenameTable(x) case *ast.TruncateTableStmt: err = e.executeTruncateTable(x) + case *ast.LockTablesStmt: + err = e.executeLockTables(x) + case *ast.UnlockTablesStmt: + err = e.executeUnlockTables(x) } if err != nil { return e.toErr(err) @@ -433,3 +437,18 @@ func (e *DDLExec) getRecoverTableByTableName(s *ast.RecoverTableStmt, t *meta.Me } return job, tblInfo, nil } + +func (e *DDLExec) executeLockTables(s *ast.LockTablesStmt) error { + if !config.TableLockEnabled() { + return nil + } + return domain.GetDomain(e.ctx).DDL().LockTables(e.ctx, s) +} + +func (e *DDLExec) executeUnlockTables(s *ast.UnlockTablesStmt) error { + if !config.TableLockEnabled() { + return nil + } + lockedTables := e.ctx.GetAllTableLocks() + return domain.GetDomain(e.ctx).DDL().UnlockTables(e.ctx, lockedTables) +} diff --git a/go.mod b/go.mod index 6be181e7d4e9e..e256e1c9253e9 100644 --- a/go.mod +++ b/go.mod @@ -75,3 +75,5 @@ require ( ) go 1.13 + +replace github.com/pingcap/parser => github.com/crazycs520/parser v0.0.0-20200423125022-56ac4c2800b5 diff --git a/go.sum b/go.sum index d571a525bd092..9bd9317025d5b 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/crazycs520/parser v0.0.0-20200423125022-56ac4c2800b5 h1:laZfKQtlsxTe939jlgjyrdjveuSD5R1d/e7xV6mbROc= +github.com/crazycs520/parser v0.0.0-20200423125022-56ac4c2800b5/go.mod h1:xLjI+gnWYexq011WPMEvCNS8rFM9qe1vdojIEzSKPuc= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65 h1:hxuZop6tSoOi0sxFzoGGYdRqNrPubyaIf9KoBG9tPiE= diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index 46884395c2f7e..8c0a145285051 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -64,6 +64,16 @@ var ( ErrMultiplePriKey = terror.ClassSchema.New(codeMultiplePriKey, "Multiple primary key defined") // ErrTooManyKeyParts returns for too many key parts. ErrTooManyKeyParts = terror.ClassSchema.New(codeTooManyKeyParts, "Too many key parts specified; max %d parts allowed") + // ErrTableNotLockedForWrite returns for write tables when only hold the table read lock. + ErrTableNotLockedForWrite = terror.ClassSchema.New(codeErrTableNotLockedForWrite, mysql.MySQLErrName[mysql.ErrTableNotLockedForWrite]) + // ErrTableNotLocked returns when session has explicitly lock tables, then visit unlocked table will return this error. + ErrTableNotLocked = terror.ClassSchema.New(codeErrTableNotLocked, mysql.MySQLErrName[mysql.ErrTableNotLocked]) + // ErrNonuniqTable returns when none unique tables errors. + ErrNonuniqTable = terror.ClassSchema.New(codeErrTableNotLocked, mysql.MySQLErrName[mysql.ErrNonuniqTable]) + // ErrTableLocked returns when the table was locked by other session. + ErrTableLocked = terror.ClassSchema.New(codeTableLocked, mysql.MySQLErrName[mysql.ErrTableLocked]) + // ErrAccessDenied return when the user doesn't have the permission to access the table. + ErrAccessDenied = terror.ClassSchema.New(codeErrAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDenied]) ) // InfoSchema is the interface used to retrieve the schema information. @@ -90,7 +100,8 @@ type InfoSchema interface { // Information Schema Name. const ( - Name = "INFORMATION_SCHEMA" + Name = "INFORMATION_SCHEMA" + LowerName = "information_schema" ) type sortedTables []table.Table @@ -330,27 +341,38 @@ const ( codeTooManyKeyParts = 1070 codeKeyNameDuplicate = 1061 codeKeyNotExists = 1176 + + codeErrTableNotLockedForWrite = mysql.ErrTableNotLockedForWrite + codeErrTableNotLocked = mysql.ErrTableNotLocked + codeErrNonuniqTable = mysql.ErrNonuniqTable + codeErrAccessDenied = mysql.ErrAccessDenied + codeTableLocked = mysql.ErrTableLocked ) func init() { schemaMySQLErrCodes := map[terror.ErrCode]uint16{ - codeDBDropExists: mysql.ErrDBDropExists, - codeDatabaseNotExists: mysql.ErrBadDB, - codeTableNotExists: mysql.ErrNoSuchTable, - codeColumnNotExists: mysql.ErrBadField, - codeCannotAddForeign: mysql.ErrCannotAddForeign, - codeWrongFkDef: mysql.ErrWrongFkDef, - codeForeignKeyNotExists: mysql.ErrCantDropFieldOrKey, - codeDatabaseExists: mysql.ErrDBCreateExists, - codeTableExists: mysql.ErrTableExists, - codeBadTable: mysql.ErrBadTable, - codeBadUser: mysql.ErrBadUser, - codeColumnExists: mysql.ErrDupFieldName, - codeIndexExists: mysql.ErrDupIndex, - codeMultiplePriKey: mysql.ErrMultiplePriKey, - codeTooManyKeyParts: mysql.ErrTooManyKeyParts, - codeKeyNameDuplicate: mysql.ErrDupKeyName, - codeKeyNotExists: mysql.ErrKeyDoesNotExist, + codeDBDropExists: mysql.ErrDBDropExists, + codeDatabaseNotExists: mysql.ErrBadDB, + codeTableNotExists: mysql.ErrNoSuchTable, + codeColumnNotExists: mysql.ErrBadField, + codeCannotAddForeign: mysql.ErrCannotAddForeign, + codeWrongFkDef: mysql.ErrWrongFkDef, + codeForeignKeyNotExists: mysql.ErrCantDropFieldOrKey, + codeDatabaseExists: mysql.ErrDBCreateExists, + codeTableExists: mysql.ErrTableExists, + codeBadTable: mysql.ErrBadTable, + codeBadUser: mysql.ErrBadUser, + codeColumnExists: mysql.ErrDupFieldName, + codeIndexExists: mysql.ErrDupIndex, + codeMultiplePriKey: mysql.ErrMultiplePriKey, + codeTooManyKeyParts: mysql.ErrTooManyKeyParts, + codeKeyNameDuplicate: mysql.ErrDupKeyName, + codeKeyNotExists: mysql.ErrKeyDoesNotExist, + codeErrTableNotLockedForWrite: mysql.ErrTableNotLockedForWrite, + codeErrTableNotLocked: mysql.ErrTableNotLocked, + codeErrNonuniqTable: mysql.ErrNonuniqTable, + mysql.ErrAccessDenied: mysql.ErrAccessDenied, + codeTableLocked: mysql.ErrTableLocked, } terror.ErrClassToMySQLCodes[terror.ClassSchema] = schemaMySQLErrCodes initInfoSchemaDB() diff --git a/infoschema/perfschema/const.go b/infoschema/perfschema/const.go index 47307d216d24c..84a0b837da478 100644 --- a/infoschema/perfschema/const.go +++ b/infoschema/perfschema/const.go @@ -15,7 +15,8 @@ package perfschema // Performance Schema Name. const ( - Name = "PERFORMANCE_SCHEMA" + Name = "PERFORMANCE_SCHEMA" + LowerName = "performance_schema" ) // perfSchemaTables is a shortcut to involve all table names. diff --git a/lock/lock.go b/lock/lock.go new file mode 100644 index 0000000000000..f96879b2a0a5d --- /dev/null +++ b/lock/lock.go @@ -0,0 +1,126 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package lock + +import ( + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/infoschema/perfschema" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/table" +) + +// Checker uses to check tables lock. +type Checker struct { + ctx sessionctx.Context + is infoschema.InfoSchema +} + +// NewChecker return new lock Checker. +func NewChecker(ctx sessionctx.Context, is infoschema.InfoSchema) *Checker { + return &Checker{ctx: ctx, is: is} +} + +// CheckTableLock uses to check table lock. +func (c *Checker) CheckTableLock(db, table string, privilege mysql.PrivilegeType) error { + if db == "" && table == "" { + return nil + } + // Below database are not support table lock. + if db == infoschema.LowerName || db == perfschema.LowerName || db == mysql.SystemDB { + return nil + } + // check operation on database. + if table == "" { + return c.CheckLockInDB(db, privilege) + } + switch privilege { + case mysql.ShowDBPriv, mysql.AllPrivMask: + // AllPrivMask only used in show create table statement now. + return nil + case mysql.CreatePriv, mysql.CreateViewPriv: + if c.ctx.HasLockedTables() { + // TODO: For `create table t_exists ...` statement, mysql will check out `t_exists` first, but in TiDB now, + // will return below error first. + return infoschema.ErrTableNotLocked.GenWithStackByArgs(table) + } + return nil + } + // TODO: try to remove this get for speed up. + tb, err := c.is.TableByName(model.NewCIStr(db), model.NewCIStr(table)) + // Ignore this error for "drop table if not exists t1" when t1 doesn't exists. + if infoschema.ErrTableNotExists.Equal(err) { + return nil + } + if err != nil { + return err + } + if c.ctx.HasLockedTables() { + if locked, tp := c.ctx.CheckTableLocked(tb.Meta().ID); locked { + if checkLockTpMeetPrivilege(tp, privilege) { + return nil + } + return infoschema.ErrTableNotLockedForWrite.GenWithStackByArgs(tb.Meta().Name) + } + return infoschema.ErrTableNotLocked.GenWithStackByArgs(tb.Meta().Name) + } + + if tb.Meta().Lock == nil { + return nil + } + + if privilege == mysql.SelectPriv { + switch tb.Meta().Lock.Tp { + case model.TableLockRead, model.TableLockWriteLocal: + return nil + } + } + return infoschema.ErrTableLocked.GenWithStackByArgs(tb.Meta().Name.L, tb.Meta().Lock.Tp, tb.Meta().Lock.Sessions[0]) +} + +func checkLockTpMeetPrivilege(tp model.TableLockType, privilege mysql.PrivilegeType) bool { + switch tp { + case model.TableLockWrite, model.TableLockWriteLocal: + return true + case model.TableLockRead: + // ShowDBPriv, AllPrivMask,CreatePriv, CreateViewPriv already checked before. + // The other privilege in read lock was not allowed. + if privilege == mysql.SelectPriv { + return true + } + } + return false +} + +// CheckLockInDB uses to check operation on database. +func (c *Checker) CheckLockInDB(db string, privilege mysql.PrivilegeType) error { + if c.ctx.HasLockedTables() { + switch privilege { + case mysql.CreatePriv, mysql.DropPriv, mysql.AlterPriv: + return table.ErrLockOrActiveTransaction.GenWithStackByArgs() + } + } + if privilege == mysql.CreatePriv { + return nil + } + tables := c.is.SchemaTables(model.NewCIStr(db)) + for _, tbl := range tables { + err := c.CheckTableLock(db, tbl.Meta().Name.L, privilege) + if err != nil { + return err + } + } + return nil +} diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 04ac09e2be074..805219595b58a 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -20,8 +20,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/auth" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/lock" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" @@ -100,6 +102,21 @@ func CheckPrivilege(activeRoles []*auth.RoleIdentity, pm privilege.Manager, vs [ return nil } +// CheckTableLock checks the table lock. +func CheckTableLock(ctx sessionctx.Context, is infoschema.InfoSchema, vs []visitInfo) error { + if !config.TableLockEnabled() { + return nil + } + checker := lock.NewChecker(ctx, is) + for i := range vs { + err := checker.CheckTableLock(vs[i].db, vs[i].table, vs[i].privilege) + if err != nil { + return err + } + } + return nil +} + // DoOptimize optimizes a logical plan to a physical plan. func DoOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, error) { logic, err := logicalOptimize(ctx, flag, logic) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index d65f121be0bce..41aca6d6d5487 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -2301,6 +2301,8 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err case *ast.RecoverTableStmt: // Recover table command can only be executed by administrator. b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil) + case *ast.LockTablesStmt, *ast.UnlockTablesStmt: + // TODO: add Lock Table privilege check. } p := &DDL{Statement: node} return p, nil diff --git a/planner/optimize.go b/planner/optimize.go index 7302b1f9819af..5b71f0708afb1 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -57,6 +57,10 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in } } + if err := plannercore.CheckTableLock(sctx, is, builder.GetVisitInfo()); err != nil { + return nil, err + } + // Handle the execute statement. if execPlan, ok := p.(*plannercore.Execute); ok { err := execPlan.OptimizePreparedPlan(ctx, sctx, is) diff --git a/session/session.go b/session/session.go index 0d7b7c763a4a7..9fcccf32981aa 100644 --- a/session/session.go +++ b/session/session.go @@ -186,6 +186,59 @@ type session struct { statsCollector *handle.SessionStatsCollector // ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement; ddlOwnerChecker owner.DDLOwnerChecker + // lockedTables use to record the table locks hold by the session. + lockedTables map[int64]model.TableLockTpInfo +} + +// AddTableLock adds table lock to the session lock map. +func (s *session) AddTableLock(locks []model.TableLockTpInfo) { + for _, l := range locks { + s.lockedTables[l.TableID] = l + } +} + +// ReleaseTableLocks releases table lock in the session lock map. +func (s *session) ReleaseTableLocks(locks []model.TableLockTpInfo) { + for _, l := range locks { + delete(s.lockedTables, l.TableID) + } +} + +// ReleaseTableLockByTableIDs releases table lock in the session lock map by table ID. +func (s *session) ReleaseTableLockByTableIDs(tableIDs []int64) { + for _, tblID := range tableIDs { + delete(s.lockedTables, tblID) + } +} + +// CheckTableLocked checks the table lock. +func (s *session) CheckTableLocked(tblID int64) (bool, model.TableLockType) { + lt, ok := s.lockedTables[tblID] + if !ok { + return false, model.TableLockNone + } + return true, lt.Tp +} + +// GetAllTableLocks gets all table locks table id and db id hold by the session. +func (s *session) GetAllTableLocks() []model.TableLockTpInfo { + lockTpInfo := make([]model.TableLockTpInfo, 0, len(s.lockedTables)) + for _, tl := range s.lockedTables { + lockTpInfo = append(lockTpInfo, tl) + } + return lockTpInfo +} + +// HasLockedTables uses to check whether this session locked any tables. +// If so, the session can only visit the table which locked by self. +func (s *session) HasLockedTables() bool { + b := len(s.lockedTables) > 0 + return b +} + +// ReleaseAllTableLocks releases all table locks hold by the session. +func (s *session) ReleaseAllTableLocks() { + s.lockedTables = make(map[int64]model.TableLockTpInfo) } // DDLOwnerChecker returns s.ddlOwnerChecker. @@ -1347,7 +1400,17 @@ func (s *session) ClearValue(key fmt.Stringer) { type inCloseSession struct{} // Close function does some clean work when session end. +// Close should release the table locks which hold by the session. func (s *session) Close() { + // TODO: do clean table locks when session exited without execute Close. + // TODO: do clean table locks when tidb-server was `kill -9`. + if s.HasLockedTables() && config.TableLockEnabled() { + lockedTables := s.GetAllTableLocks() + err := domain.GetDomain(s).DDL().UnlockTables(s, lockedTables) + if err != nil { + logutil.Logger(context.Background()).Error("release table lock failed", zap.Uint64("conn", s.sessionVars.ConnectionID)) + } + } if s.statsCollector != nil { s.statsCollector.Delete() } @@ -1596,6 +1659,7 @@ func createSession(store kv.Storage) (*session, error) { plannercore.PreparedPlanCacheMemoryGuardRatio, plannercore.PreparedPlanCacheMaxMemory.Load()) } s.mu.values = make(map[fmt.Stringer]interface{}) + s.lockedTables = make(map[int64]model.TableLockTpInfo) domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s @@ -1619,6 +1683,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er plannercore.PreparedPlanCacheMemoryGuardRatio, plannercore.PreparedPlanCacheMaxMemory.Load()) } s.mu.values = make(map[fmt.Stringer]interface{}) + s.lockedTables = make(map[int64]model.TableLockTpInfo) domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s diff --git a/sessionctx/context.go b/sessionctx/context.go index 0c7c834369406..80d8b94c3823b 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -17,12 +17,13 @@ import ( "context" "fmt" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" - binlog "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tipb/go-binlog" ) // Context is an interface for transaction and executive args environment. @@ -87,6 +88,20 @@ type Context interface { DDLOwnerChecker() owner.DDLOwnerChecker // PrepareTxnFuture uses to prepare txn by future. PrepareTxnFuture(ctx context.Context) + // AddTableLock adds table lock to the session lock map. + AddTableLock([]model.TableLockTpInfo) + // ReleaseTableLocks releases table locks in the session lock map. + ReleaseTableLocks(locks []model.TableLockTpInfo) + // ReleaseTableLockByTableID releases table locks in the session lock map by table ID. + ReleaseTableLockByTableIDs(tableIDs []int64) + // CheckTableLocked checks the table lock. + CheckTableLocked(tblID int64) (bool, model.TableLockType) + // GetAllTableLocks gets all table locks table id and db id hold by the session. + GetAllTableLocks() []model.TableLockTpInfo + // ReleaseAllTableLocks releases all table locks hold by the session. + ReleaseAllTableLocks() + // HasLockedTables uses to check whether this session locked any tables. + HasLockedTables() bool } type basicCtxType int diff --git a/table/table.go b/table/table.go index 5931a7879270b..018c51b64b139 100644 --- a/table/table.go +++ b/table/table.go @@ -80,11 +80,12 @@ var ( ErrTruncateWrongValue = terror.ClassTable.New(codeTruncateWrongValue, "incorrect value") // ErrTruncatedWrongValueForField returns for truncate wrong value for field. ErrTruncatedWrongValueForField = terror.ClassTable.New(codeTruncateWrongValue, mysql.MySQLErrName[mysql.ErrTruncatedWrongValueForField]) - // ErrUnknownPartition returns unknown partition error. ErrUnknownPartition = terror.ClassTable.New(codeUnknownPartition, mysql.MySQLErrName[mysql.ErrUnknownPartition]) // ErrNoPartitionForGivenValue returns table has no partition for value. ErrNoPartitionForGivenValue = terror.ClassTable.New(codeNoPartitionForGivenValue, mysql.MySQLErrName[mysql.ErrNoPartitionForGivenValue]) + // ErrLockOrActiveTransaction returns when execute unsupported statement in a lock session or an active transaction. + ErrLockOrActiveTransaction = terror.ClassTable.New(codeLockOrActiveTransaction, mysql.MySQLErrName[mysql.ErrLockOrActiveTransaction]) ) // RecordIterFunc is used for low-level record iteration. @@ -244,6 +245,7 @@ const ( codeUnknownPartition = mysql.ErrUnknownPartition codeNoPartitionForGivenValue = mysql.ErrNoPartitionForGivenValue + codeLockOrActiveTransaction = mysql.ErrLockOrActiveTransaction ) // Slice is used for table sorting. @@ -266,6 +268,7 @@ func init() { codeTruncateWrongValue: mysql.ErrTruncatedWrongValueForField, codeUnknownPartition: mysql.ErrUnknownPartition, codeNoPartitionForGivenValue: mysql.ErrNoPartitionForGivenValue, + codeLockOrActiveTransaction: mysql.ErrLockOrActiveTransaction, } terror.ErrClassToMySQLCodes[terror.ClassTable] = tableMySQLErrCodes } diff --git a/util/mock/context.go b/util/mock/context.go index 3c69a48bbad55..6e0521871d9c2 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" @@ -223,6 +224,41 @@ func (c *Context) StmtAddDirtyTableOP(op int, tid int64, handle int64) { func (c *Context) PrepareTxnFuture(ctx context.Context) { } +// AddTableLock implements the sessionctx.Context interface. +func (c *Context) AddTableLock(_ []model.TableLockTpInfo) { +} + +// ReleaseTableLocks implements the sessionctx.Context interface. +func (c *Context) ReleaseTableLocks(locks []model.TableLockTpInfo) { +} + +// ReleaseTableLockByTableIDs implements the sessionctx.Context interface. +func (c *Context) ReleaseTableLockByTableIDs(tableIDs []int64) { +} + +// CheckTableLocked implements the sessionctx.Context interface. +func (c *Context) CheckTableLocked(_ int64) (bool, model.TableLockType) { + return false, model.TableLockNone +} + +// GetAllTableLocks implements the sessionctx.Context interface. +func (c *Context) GetAllTableLocks() []model.TableLockTpInfo { + return nil +} + +// ReleaseAllTableLocks implements the sessionctx.Context interface. +func (c *Context) ReleaseAllTableLocks() { +} + +// HasLockedTables implements the sessionctx.Context interface. +func (c *Context) HasLockedTables() bool { + return false +} + +// Close implements the sessionctx.Context interface. +func (c *Context) Close() { +} + // NewContext creates a new mocked sessionctx.Context. func NewContext() *Context { ctx, cancel := context.WithCancel(context.Background())