diff --git a/config/config.go b/config/config.go index 13288f7c7c950..b9e29b70ad1c1 100644 --- a/config/config.go +++ b/config/config.go @@ -87,6 +87,9 @@ type Config struct { // TreatOldVersionUTF8AsUTF8MB4 is use to treat old version table/column UTF8 charset as UTF8MB4. This is for compatibility. // Currently not support dynamic modify, because this need to reload all old version schema. TreatOldVersionUTF8AsUTF8MB4 bool `toml:"treat-old-version-utf8-as-utf8mb4" json:"treat-old-version-utf8-as-utf8mb4"` + // EnableTableLock indicate whether enable table lock. + // TODO: remove this after table lock features stable. + EnableTableLock bool `toml:"enable-table-lock" json:"enable-table-lock"` } // Log is the log section of config. @@ -321,6 +324,7 @@ var defaultConf = Config{ EnableStreaming: false, CheckMb4ValueInUTF8: true, TreatOldVersionUTF8AsUTF8MB4: true, + EnableTableLock: false, TxnLocalLatches: TxnLocalLatches{ Enabled: true, Capacity: 2048000, @@ -574,6 +578,11 @@ func hasRootPrivilege() bool { return os.Geteuid() == 0 } +// TableLockEnabled uses to check whether enabled the table lock feature. +func TableLockEnabled() bool { + return GetGlobalConfig().EnableTableLock +} + // ToLogConfig converts *Log to *logutil.LogConfig. func (l *Log) ToLogConfig() *logutil.LogConfig { return logutil.NewLogConfig(l.Level, l.Format, l.SlowQueryFile, l.File, l.DisableTimestamp) diff --git a/config/config.toml.example b/config/config.toml.example index 0e788b089d689..d7bba079cd6d8 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -54,6 +54,9 @@ check-mb4-value-in-utf8 = true # treat-old-version-utf8-as-utf8mb4 use for upgrade compatibility. Set to true will treat old version table/column UTF8 charset as UTF8MB4. treat-old-version-utf8-as-utf8mb4 = true +# enable-table-lock is used to control table lock feature. Default is false, indicate the table lock feature is disabled. +enable-table-lock = false + [log] # Log level: debug, info, warn, error, fatal. level = "info" diff --git a/config/config_test.go b/config/config_test.go index 173878c62c0ed..299cbeaeb81b0 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -60,6 +60,7 @@ unrecognized-option-test = true _, err = f.WriteString(` token-limit = 0 +enable-table-lock = true [performance] [tikv-client] commit-timeout="41s" @@ -78,6 +79,7 @@ max-batch-size=128 c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s") c.Assert(conf.TiKVClient.MaxBatchSize, Equals, uint(128)) c.Assert(conf.TokenLimit, Equals, uint(1000)) + c.Assert(conf.EnableTableLock, IsTrue) c.Assert(f.Close(), IsNil) c.Assert(os.Remove(configFile), IsNil) diff --git a/ddl/db_test.go b/ddl/db_test.go index 0b4537c725cc3..d176d60130f0b 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/parser/mysql" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" testddlutil "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/domain" @@ -2762,6 +2763,338 @@ func (s *testDBSuite4) TestAlterShardRowIDBits(c *C) { c.Assert(err.Error(), Equals, "[autoid:1467]Failed to read auto-increment value from storage engine") } +func (s *testDBSuite2) TestLockTables(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + tk := s.tk + tk.MustExec("use test") + tk.MustExec("drop table if exists t1,t2") + defer tk.MustExec("drop table if exists t1,t2") + tk.MustExec("create table t1 (a int)") + tk.MustExec("create table t2 (a int)") + + // recover table lock config. + originValue := config.GetGlobalConfig().EnableTableLock + defer func() { + config.GetGlobalConfig().EnableTableLock = originValue + }() + + // Test for enable table lock config. + config.GetGlobalConfig().EnableTableLock = false + tk.MustExec("lock tables t1 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone) + config.GetGlobalConfig().EnableTableLock = true + + // Test lock 1 table. + tk.MustExec("lock tables t1 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockWrite) + tk.MustExec("lock tables t1 read") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockRead) + tk.MustExec("lock tables t1 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockWrite) + + // Test lock multi tables. + tk.MustExec("lock tables t1 write, t2 read") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockWrite) + checkTableLock(c, tk.Se, "test", "t2", model.TableLockRead) + tk.MustExec("lock tables t1 read, t2 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockRead) + checkTableLock(c, tk.Se, "test", "t2", model.TableLockWrite) + tk.MustExec("lock tables t2 write") + checkTableLock(c, tk.Se, "test", "t2", model.TableLockWrite) + checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone) + tk.MustExec("lock tables t1 write") + checkTableLock(c, tk.Se, "test", "t1", model.TableLockWrite) + checkTableLock(c, tk.Se, "test", "t2", model.TableLockNone) + + tk2 := testkit.NewTestKit(c, s.store) + tk2.MustExec("use test") + + // Test read lock. + tk.MustExec("lock tables t1 read") + tk.MustQuery("select * from t1") + tk2.MustQuery("select * from t1") + _, err := tk.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLockedForWrite), IsTrue) + _, err = tk.Exec("update t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLockedForWrite), IsTrue) + _, err = tk.Exec("delete from t1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLockedForWrite), IsTrue) + + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("update t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("delete from t1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + tk2.MustExec("lock tables t1 read") + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLockedForWrite), IsTrue) + + // Test write lock. + _, err = tk.Exec("lock tables t1 write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + tk2.MustExec("unlock tables") + tk.MustExec("lock tables t1 write") + tk.MustQuery("select * from t1") + tk.MustExec("delete from t1") + tk.MustExec("insert into t1 set a=1") + + _, err = tk2.Exec("select * from t1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("lock tables t1 write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + + // Test write local lock. + tk.MustExec("lock tables t1 write local") + tk.MustQuery("select * from t1") + tk.MustExec("delete from t1") + tk.MustExec("insert into t1 set a=1") + + tk2.MustQuery("select * from t1") + _, err = tk2.Exec("delete from t1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("lock tables t1 write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("lock tables t1 read") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + + // Test none unique table. + _, err = tk.Exec("lock tables t1 read, t1 write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrNonuniqTable), IsTrue) + + // Test lock table by other session in transaction and commit without retry. + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + tk.MustExec("set @@session.tidb_disable_txn_auto_retry=1") + tk.MustExec("begin") + tk.MustExec("insert into t1 set a=1") + tk2.MustExec("lock tables t1 write") + _, err = tk.Exec("commit") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[domain:2]Information schema is changed. [try again later]") + + // Test lock table by other session in transaction and commit with retry. + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + tk.MustExec("begin") + tk.MustExec("insert into t1 set a=1") + tk.MustExec("set @@session.tidb_disable_txn_auto_retry=0") + tk2.MustExec("lock tables t1 write") + _, err = tk.Exec("commit") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue, Commentf("err: %v\n", err)) + + // Test for lock the same table multiple times. + tk2.MustExec("lock tables t1 write") + tk2.MustExec("lock tables t1 write, t2 read") + + // Test lock tables and drop tables + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + tk.MustExec("lock tables t1 write, t2 write") + tk.MustExec("drop table t1") + tk2.MustExec("create table t1 (a int)") + tk.MustExec("lock tables t1 write, t2 read") + + // Test lock tables and drop database. + tk.MustExec("unlock tables") + tk.MustExec("create database test_lock") + tk.MustExec("create table test_lock.t3 (a int)") + tk.MustExec("lock tables t1 write, test_lock.t3 write") + tk2.MustExec("create table t3 (a int)") + tk.MustExec("lock tables t1 write, t3 write") + tk.MustExec("drop table t3") + + // Test lock tables and truncate tables. + tk.MustExec("unlock tables") + tk.MustExec("lock tables t1 write, t2 read") + tk.MustExec("truncate table t1") + tk.MustExec("insert into t1 set a=1") + _, err = tk2.Exec("insert into t1 set a=1") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + + // Test for lock unsupported schema tables. + _, err = tk2.Exec("lock tables performance_schema.global_status write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrAccessDenied), IsTrue) + _, err = tk2.Exec("lock tables information_schema.tables write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrAccessDenied), IsTrue) + _, err = tk2.Exec("lock tables mysql.db write") + c.Assert(terror.ErrorEqual(err, infoschema.ErrAccessDenied), IsTrue) + + // Test create table/view when session is holding the table locks. + tk.MustExec("unlock tables") + tk.MustExec("lock tables t1 write, t2 read") + _, err = tk.Exec("create table t3 (a int)") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLocked), IsTrue) + _, err = tk.Exec("create view v1 as select * from t1;") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableNotLocked), IsTrue) + + // Test for lock view was not supported. + tk.MustExec("unlock tables") + tk.MustExec("create view v1 as select * from t1;") + _, err = tk.Exec("lock tables v1 read") + c.Assert(terror.ErrorEqual(err, table.ErrUnsupportedOp), IsTrue) + + // Test for create/drop/alter database when session is holding the table locks. + tk.MustExec("unlock tables") + tk.MustExec("lock table t1 write") + _, err = tk.Exec("drop database test") + c.Assert(terror.ErrorEqual(err, table.ErrLockOrActiveTransaction), IsTrue) + _, err = tk.Exec("create database test_lock") + c.Assert(terror.ErrorEqual(err, table.ErrLockOrActiveTransaction), IsTrue) + _, err = tk.Exec("alter database test charset='utf8mb4'") + c.Assert(terror.ErrorEqual(err, table.ErrLockOrActiveTransaction), IsTrue) + // Test alter/drop database when other session is holding the table locks of the database. + tk2.MustExec("create database test_lock2") + _, err = tk2.Exec("drop database test") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + _, err = tk2.Exec("alter database test charset='utf8mb4'") + c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue) + + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") +} + +// TestConcurrentLockTables test concurrent lock/unlock tables. +func (s *testDBSuite2) TestConcurrentLockTables(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + tk2 := testkit.NewTestKit(c, s.store) + tk := s.tk + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + defer tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int)") + tk2.MustExec("use test") + + // recover table lock config. + originValue := config.GetGlobalConfig().EnableTableLock + defer func() { + config.GetGlobalConfig().EnableTableLock = originValue + }() + + // Test for enable table lock config. + config.GetGlobalConfig().EnableTableLock = true + + // Test concurrent lock tables read. + sql1 := "lock tables t1 read" + sql2 := "lock tables t1 read" + s.testParallelExecSQL(c, sql1, sql2, tk.Se, tk2.Se, func(c *C, err1, err2 error) { + c.Assert(err1, IsNil) + c.Assert(err2, IsNil) + }) + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + + // Test concurrent lock tables write. + sql1 = "lock tables t1 write" + sql2 = "lock tables t1 write" + s.testParallelExecSQL(c, sql1, sql2, tk.Se, tk2.Se, func(c *C, err1, err2 error) { + c.Assert(err1, IsNil) + c.Assert(terror.ErrorEqual(err2, infoschema.ErrTableLocked), IsTrue) + }) + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") + + // Test concurrent lock tables write local. + sql1 = "lock tables t1 write local" + sql2 = "lock tables t1 write local" + s.testParallelExecSQL(c, sql1, sql2, tk.Se, tk2.Se, func(c *C, err1, err2 error) { + c.Assert(err1, IsNil) + c.Assert(terror.ErrorEqual(err2, infoschema.ErrTableLocked), IsTrue) + }) + + tk.MustExec("unlock tables") + tk2.MustExec("unlock tables") +} + +func (s *testDBSuite2) testParallelExecSQL(c *C, sql1, sql2 string, se1, se2 session.Session, f checkRet) { + callback := &ddl.TestDDLCallback{} + times := 0 + callback.OnJobRunBeforeExported = func(job *model.Job) { + if times != 0 { + return + } + var qLen int + for { + err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + jobs, err1 := admin.GetDDLJobs(txn) + if err1 != nil { + return err1 + } + qLen = len(jobs) + return nil + }) + c.Assert(err, IsNil) + if qLen == 2 { + break + } + time.Sleep(5 * time.Millisecond) + } + times++ + } + d := s.dom.DDL() + originalCallback := d.GetHook() + defer d.(ddl.DDLForTest).SetHook(originalCallback) + d.(ddl.DDLForTest).SetHook(callback) + + wg := sync.WaitGroup{} + var err1 error + var err2 error + wg.Add(2) + ch := make(chan struct{}) + // Make sure the sql1 is put into the DDLJobQueue. + go func() { + var qLen int + for { + err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + jobs, err3 := admin.GetDDLJobs(txn) + if err3 != nil { + return err3 + } + qLen = len(jobs) + return nil + }) + c.Assert(err, IsNil) + if qLen == 1 { + // Make sure sql2 is executed after the sql1. + close(ch) + break + } + time.Sleep(5 * time.Millisecond) + } + }() + go func() { + defer wg.Done() + _, err1 = se1.Execute(context.Background(), sql1) + }() + go func() { + defer wg.Done() + <-ch + _, err2 = se2.Execute(context.Background(), sql2) + }() + + wg.Wait() + f(c, err1, err2) +} + +func checkTableLock(c *C, se session.Session, dbName, tableName string, lockTp model.TableLockType) { + tb := testGetTableByName(c, se, dbName, tableName) + dom := domain.GetDomain(se) + if lockTp != model.TableLockNone { + c.Assert(tb.Meta().Lock, NotNil) + c.Assert(tb.Meta().Lock.Tp, Equals, lockTp) + c.Assert(tb.Meta().Lock.State, Equals, model.TableLockStatePublic) + c.Assert(len(tb.Meta().Lock.Sessions) == 1, IsTrue) + c.Assert(tb.Meta().Lock.Sessions[0].ServerID, Equals, dom.DDL().GetID()) + c.Assert(tb.Meta().Lock.Sessions[0].SessionID, Equals, se.GetSessionVars().ConnectionID) + } else { + c.Assert(tb.Meta().Lock, IsNil) + } +} + func (s *testDBSuite2) TestDDLWithInvalidTableInfo(c *C) { s.tk = testkit.NewTestKit(c, s.store) tk := s.tk diff --git a/ddl/ddl.go b/ddl/ddl.go index 4f45c53b3f7c6..66ae7cb911ea3 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -154,6 +154,8 @@ var ( ErrInvalidIndexState = terror.ClassDDL.New(codeInvalidIndexState, "invalid index state") // ErrInvalidForeignKeyState returns for invalid foreign key state. ErrInvalidForeignKeyState = terror.ClassDDL.New(codeInvalidForeignKeyState, "invalid foreign key state") + // ErrInvalidTableLockState returns for invalid table state. + ErrInvalidTableLockState = terror.ClassDDL.New(codeInvalidTableLockState, "invalid table lock state") // ErrUnsupportedModifyPrimaryKey returns an error when add or drop the primary key. // It's exported for testing. ErrUnsupportedModifyPrimaryKey = terror.ClassDDL.New(codeUnsupportedModifyPrimaryKey, "unsupported %s primary key") @@ -247,6 +249,8 @@ type DDL interface { AlterTable(ctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident, isAlterTable bool) error + LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error + UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error // GetLease returns current schema lease time. GetLease() time.Duration @@ -656,6 +660,7 @@ const ( codeInvalidColumnState = 102 codeInvalidIndexState = 103 codeInvalidForeignKeyState = 104 + codeInvalidTableLockState = 105 codeCantDropColWithIndex = 201 codeUnsupportedAddColumn = 202 diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 291a5b316687a..ea6353a46c564 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" field_types "github.com/pingcap/parser/types" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/meta/autoid" @@ -163,7 +164,22 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - return errors.Trace(err) + if err != nil { + return errors.Trace(err) + } + if !config.TableLockEnabled() { + return nil + } + // Clear table locks hold by the session. + tbs := is.SchemaTables(schema) + lockTableIDs := make([]int64, 0) + for _, tb := range tbs { + if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok { + lockTableIDs = append(lockTableIDs, tb.Meta().ID) + } + } + ctx.ReleaseTableLockByTableIDs(lockTableIDs) + return nil } func checkTooLongSchema(schema model.CIStr) error { @@ -2859,7 +2875,16 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) { err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - return errors.Trace(err) + if err != nil { + return errors.Trace(err) + } + if !config.TableLockEnabled() { + return nil + } + if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok { + ctx.ReleaseTableLockByTableIDs([]int64{tb.Meta().ID}) + } + return nil } // DropView will proceed even if some view in the list does not exists. @@ -2902,9 +2927,28 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error { BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{newTableID}, } + if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok && config.TableLockEnabled() { + // AddTableLock here to avoid this ddl job was executed successfully but the session was been kill before return. + // The session will release all table locks it holds, if we don't add the new locking table id here, + // the session may forget to release the new locked table id when this ddl job was executed successfully + // but the session was killed before return. + ctx.AddTableLock(([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: newTableID, Tp: tb.Meta().Lock.Tp}})) + } err = d.doDDLJob(ctx, job) err = d.callHookOnChanged(err) - return errors.Trace(err) + if err != nil { + if config.TableLockEnabled() { + ctx.ReleaseTableLockByTableIDs([]int64{newTableID}) + } + return errors.Trace(err) + } + if !config.TableLockEnabled() { + return nil + } + if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok { + ctx.ReleaseTableLockByTableIDs([]int64{tb.Meta().ID}) + } + return nil } func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, isAlterTable bool) error { @@ -3232,3 +3276,100 @@ func buildPartitionInfo(meta *model.TableInfo, d *ddl, spec *ast.AlterTableSpec) } return part, nil } + +// LockTables uses to execute lock tables statement. +func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error { + lockTables := make([]model.TableLockTpInfo, 0, len(stmt.TableLocks)) + sessionInfo := model.SessionInfo{ + ServerID: d.GetID(), + SessionID: ctx.GetSessionVars().ConnectionID, + } + uniqueTableID := make(map[int64]struct{}) + // Check whether the table was already locked by other. + for _, tl := range stmt.TableLocks { + tb := tl.Table + // TODO: replace const string "performance_schema" with xxx.LowerName. + // Currently use perfschema.LowerName will have import cycle problem. + if tb.Schema.L == infoschema.LowerName || tb.Schema.L == "performance_schema" || tb.Schema.L == mysql.SystemDB { + if ctx.GetSessionVars().User != nil { + return infoschema.ErrAccessDenied.GenWithStackByArgs(ctx.GetSessionVars().User.Username, ctx.GetSessionVars().User.Hostname) + } + return infoschema.ErrAccessDenied + } + schema, t, err := d.getSchemaAndTableByIdent(ctx, ast.Ident{Schema: tb.Schema, Name: tb.Name}) + if err != nil { + return errors.Trace(err) + } + if t.Meta().IsView() { + return table.ErrUnsupportedOp.GenWithStackByArgs() + } + err = checkTableLocked(t.Meta(), tl.Type, sessionInfo) + if err != nil { + return err + } + if _, ok := uniqueTableID[t.Meta().ID]; ok { + return infoschema.ErrNonuniqTable.GenWithStackByArgs(t.Meta().Name) + } + uniqueTableID[t.Meta().ID] = struct{}{} + lockTables = append(lockTables, model.TableLockTpInfo{SchemaID: schema.ID, TableID: t.Meta().ID, Tp: tl.Type}) + } + + unlockTables := ctx.GetAllTableLocks() + arg := &lockTablesArg{ + LockTables: lockTables, + UnlockTables: unlockTables, + SessionInfo: sessionInfo, + } + job := &model.Job{ + SchemaID: lockTables[0].SchemaID, + TableID: lockTables[0].TableID, + Type: model.ActionLockTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{arg}, + } + // AddTableLock here is avoiding this job was executed successfully but the session was killed before return. + ctx.AddTableLock(lockTables) + err := d.doDDLJob(ctx, job) + if err == nil { + ctx.ReleaseTableLocks(unlockTables) + ctx.AddTableLock(lockTables) + } + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + +// UnlockTables uses to execute unlock tables statement. +func (d *ddl) UnlockTables(ctx sessionctx.Context, unlockTables []model.TableLockTpInfo) error { + if len(unlockTables) == 0 { + return nil + } + arg := &lockTablesArg{ + UnlockTables: unlockTables, + SessionInfo: model.SessionInfo{ + ServerID: d.GetID(), + SessionID: ctx.GetSessionVars().ConnectionID, + }, + } + job := &model.Job{ + SchemaID: unlockTables[0].SchemaID, + TableID: unlockTables[0].TableID, + Type: model.ActionUnlockTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{arg}, + } + + err := d.doDDLJob(ctx, job) + if err == nil { + ctx.ReleaseAllTableLocks() + } + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + +type lockTablesArg struct { + LockTables []model.TableLockTpInfo + IndexOfLock int + UnlockTables []model.TableLockTpInfo + IndexOfUnlock int + SessionInfo model.SessionInfo +} diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 2151d9efebb78..752c66b996e36 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -537,6 +537,10 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onModifyTableCharsetAndCollate(t, job) case model.ActionRecoverTable: ver, err = w.onRecoverTable(d, t, job) + case model.ActionLockTable: + ver, err = onLockTables(t, job) + case model.ActionUnlockTable: + ver, err = onUnlockTables(t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/table.go b/ddl/table.go index a466f40e2f2af..1ee96144732ac 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -454,12 +454,21 @@ func getTableInfoAndCancelFaultJob(t *meta.Meta, job *model.Job, schemaID int64) } func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) { - tableID := job.TableID + tblInfo, err := getTableInfo(t, job.TableID, schemaID) + if err == nil { + return tblInfo, nil + } + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { + job.State = model.JobStateCancelled + } + return nil, err +} + +func getTableInfo(t *meta.Meta, tableID, schemaID int64) (*model.TableInfo, error) { // Check this table's database. tblInfo, err := t.GetTable(schemaID, tableID) if err != nil { if meta.ErrDBNotExists.Equal(err) { - job.State = model.JobStateCancelled return nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs( fmt.Sprintf("(Schema ID %d)", schemaID), )) @@ -469,7 +478,6 @@ func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID // Check the table. if tblInfo == nil { - job.State = model.JobStateCancelled return nil, errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs( fmt.Sprintf("(Schema ID %d)", schemaID), fmt.Sprintf("(Table ID %d)", tableID), diff --git a/ddl/table_lock.go b/ddl/table_lock.go new file mode 100644 index 0000000000000..03d798a3911ab --- /dev/null +++ b/ddl/table_lock.go @@ -0,0 +1,229 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/meta" +) + +func onLockTables(t *meta.Meta, job *model.Job) (ver int64, err error) { + arg := &lockTablesArg{} + if err := job.DecodeArgs(arg); err != nil { + // Invalid arguments, cancel this job. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + // Unlock table first. + if arg.IndexOfUnlock < len(arg.UnlockTables) { + return unlockTables(t, job, arg) + } + + // Check table locked by other, this can be only checked at the first time. + if arg.IndexOfLock == 0 { + for i, tl := range arg.LockTables { + job.SchemaID = tl.SchemaID + job.TableID = tl.TableID + tbInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, err + } + err = checkTableLocked(tbInfo, arg.LockTables[i].Tp, arg.SessionInfo) + if err != nil { + // If any request table was locked by other session, just cancel this job. + // No need to rolling back the unlocked tables, MySQL will release the lock first + // and block if the request table was locked by other. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } + } + + // Lock tables. + if arg.IndexOfLock < len(arg.LockTables) { + job.SchemaID = arg.LockTables[arg.IndexOfLock].SchemaID + job.TableID = arg.LockTables[arg.IndexOfLock].TableID + var tbInfo *model.TableInfo + tbInfo, err = getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, err + } + err = lockTable(tbInfo, arg.IndexOfLock, arg) + if err != nil { + job.State = model.JobStateCancelled + return ver, err + } + + switch tbInfo.Lock.State { + case model.TableLockStateNone: + // none -> pre_lock + tbInfo.Lock.State = model.TableLockStatePreLock + tbInfo.Lock.TS = t.StartTS + ver, err = updateVersionAndTableInfo(t, job, tbInfo, true) + // If the state of the lock is public, it means the lock is a read lock and already locked by other session, + // so this request of lock table doesn't need pre-lock state, just update the TS and table info is ok. + case model.TableLockStatePreLock, model.TableLockStatePublic: + tbInfo.Lock.State = model.TableLockStatePublic + tbInfo.Lock.TS = t.StartTS + ver, err = updateVersionAndTableInfo(t, job, tbInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + arg.IndexOfLock++ + job.Args = []interface{}{arg} + if arg.IndexOfLock == len(arg.LockTables) { + // Finish this job. + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, nil) + } + default: + job.State = model.JobStateCancelled + return ver, ErrInvalidTableLockState.GenWithStack("invalid table lock state %v", tbInfo.Lock.State) + } + } + + return ver, err +} + +// findSessionInfoIndex gets the index of sessionInfo in the sessions. return -1 if sessions doesn't contain the sessionInfo. +func findSessionInfoIndex(sessions []model.SessionInfo, sessionInfo model.SessionInfo) int { + for i := range sessions { + if sessions[i].ServerID == sessionInfo.ServerID && sessions[i].SessionID == sessionInfo.SessionID { + return i + } + } + return -1 +} + +// lockTable uses to check table locked and acquire the table lock for the request session. +func lockTable(tbInfo *model.TableInfo, idx int, arg *lockTablesArg) error { + if !tbInfo.IsLocked() { + tbInfo.Lock = &model.TableLockInfo{ + Tp: arg.LockTables[idx].Tp, + } + tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, arg.SessionInfo) + return nil + } + // If the state of the lock is in pre-lock, then the lock must be locked by the current request. So we can just return here. + // Because the lock/unlock job must be serial execution in DDL owner now. + if tbInfo.Lock.State == model.TableLockStatePreLock { + return nil + } + if tbInfo.Lock.Tp == model.TableLockRead && arg.LockTables[idx].Tp == model.TableLockRead { + sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo) + // repeat lock. + if sessionIndex >= 0 { + return nil + } + tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, arg.SessionInfo) + return nil + } + + // Unlock tables should execute before lock tables. + // Normally execute to here is impossible. + return infoschema.ErrTableLocked.GenWithStackByArgs(tbInfo.Name.L, tbInfo.Lock.Tp, tbInfo.Lock.Sessions[0]) +} + +// checkTableLocked uses to check whether table was locked. +func checkTableLocked(tbInfo *model.TableInfo, lockTp model.TableLockType, sessionInfo model.SessionInfo) error { + if !tbInfo.IsLocked() { + return nil + } + if tbInfo.Lock.State == model.TableLockStatePreLock { + return nil + } + if tbInfo.Lock.Tp == model.TableLockRead && lockTp == model.TableLockRead { + return nil + } + sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, sessionInfo) + // If the request session already locked the table before, In other words, repeat lock. + if sessionIndex >= 0 { + if tbInfo.Lock.Tp == lockTp { + return nil + } + // If no other session locked this table. + if len(tbInfo.Lock.Sessions) == 1 { + return nil + } + } + return infoschema.ErrTableLocked.GenWithStackByArgs(tbInfo.Name.L, tbInfo.Lock.Tp, tbInfo.Lock.Sessions[0]) +} + +// unlockTables uses unlock a batch of table lock one by one. +func unlockTables(t *meta.Meta, job *model.Job, arg *lockTablesArg) (ver int64, err error) { + if arg.IndexOfUnlock >= len(arg.UnlockTables) { + return ver, nil + } + job.SchemaID = arg.UnlockTables[arg.IndexOfUnlock].SchemaID + job.TableID = arg.UnlockTables[arg.IndexOfUnlock].TableID + tbInfo, err := getTableInfo(t, job.TableID, job.SchemaID) + if err != nil { + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { + // The table maybe has been dropped. just ignore this err and go on. + arg.IndexOfUnlock++ + job.Args = []interface{}{arg} + return ver, nil + } + return ver, err + } + + needUpdateTableInfo := unlockTable(tbInfo, arg) + if needUpdateTableInfo { + ver, err = updateVersionAndTableInfo(t, job, tbInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + } + + arg.IndexOfUnlock++ + job.Args = []interface{}{arg} + return ver, nil +} + +// unlockTable uses to unlock table lock that hold by the session. +func unlockTable(tbInfo *model.TableInfo, arg *lockTablesArg) (needUpdateTableInfo bool) { + if !tbInfo.IsLocked() { + return false + } + sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo) + if sessionIndex < 0 { + // When session clean table lock, session maybe send unlock table even the table lock maybe not hold by the session. + // so just ignore and return here. + return false + } + oldSessionInfo := tbInfo.Lock.Sessions + tbInfo.Lock.Sessions = oldSessionInfo[:sessionIndex] + tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, oldSessionInfo[sessionIndex+1:]...) + if len(tbInfo.Lock.Sessions) == 0 { + tbInfo.Lock = nil + } + return true +} + +func onUnlockTables(t *meta.Meta, job *model.Job) (ver int64, err error) { + arg := &lockTablesArg{} + if err := job.DecodeArgs(arg); err != nil { + // Invalid arguments, cancel this job. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + ver, err = unlockTables(t, job, arg) + if arg.IndexOfUnlock == len(arg.UnlockTables) { + job.FinishTableJob(model.JobStateDone, model.StateNone, ver, nil) + } + return ver, err +} diff --git a/ddl/table_test.go b/ddl/table_test.go index ca189139b7d9d..335893cbe0c9b 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -181,6 +181,47 @@ func testRenameTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID, oldSchem return job } +func testLockTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo, lockTp model.TableLockType) *model.Job { + arg := &lockTablesArg{ + LockTables: []model.TableLockTpInfo{{SchemaID: newSchemaID, TableID: tblInfo.ID, Tp: lockTp}}, + SessionInfo: model.SessionInfo{ + ServerID: d.GetID(), + SessionID: ctx.GetSessionVars().ConnectionID, + }, + } + job := &model.Job{ + SchemaID: newSchemaID, + TableID: tblInfo.ID, + Type: model.ActionLockTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{arg}, + } + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + + v := getSchemaVer(c, ctx) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + return job +} + +func checkTableLockedTest(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, serverID string, sessionID uint64, lockTp model.TableLockType) { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + info, err := t.GetTable(dbInfo.ID, tblInfo.ID) + c.Assert(err, IsNil) + + c.Assert(info, NotNil) + c.Assert(info.Lock, NotNil) + c.Assert(len(info.Lock.Sessions) == 1, IsTrue) + c.Assert(info.Lock.Sessions[0].ServerID, Equals, serverID) + c.Assert(info.Lock.Sessions[0].SessionID, Equals, sessionID) + c.Assert(info.Lock.Tp, Equals, lockTp) + c.Assert(info.Lock.State, Equals, model.TableLockStatePublic) + return nil + }) + c.Assert(err, IsNil) +} + func testDropTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { job := &model.Job{ SchemaID: dbInfo.ID, @@ -217,7 +258,7 @@ func testTruncateTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInf } func testCheckTableState(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, state model.SchemaState) { - kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) info, err := t.GetTable(dbInfo.ID, tblInfo.ID) c.Assert(err, IsNil) @@ -231,6 +272,7 @@ func testCheckTableState(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.Tabl c.Assert(info.State, Equals, state) return nil }) + c.Assert(err, IsNil) } func testGetTable(c *C, d *ddl, schemaID int64, tableID int64) table.Table { @@ -317,6 +359,11 @@ func (s *testTableSuite) TestTable(c *C) { job = testRenameTable(c, ctx, d, dbInfo1.ID, s.dbInfo.ID, tblInfo) testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic) testCheckJobDone(c, d, job, true) + + job = testLockTable(c, ctx, d, dbInfo1.ID, tblInfo, model.TableLockWrite) + testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDone(c, d, job, true) + checkTableLockedTest(c, d, dbInfo1, tblInfo, d.GetID(), ctx.GetSessionVars().ConnectionID, model.TableLockWrite) } func (s *testTableSuite) TestTableResume(c *C) { diff --git a/executor/ddl.go b/executor/ddl.go index c7fbc4e79781a..d0d04f9d6e857 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -109,6 +109,10 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error) err = e.executeRenameTable(x) case *ast.TruncateTableStmt: err = e.executeTruncateTable(x) + case *ast.LockTablesStmt: + err = e.executeLockTables(x) + case *ast.UnlockTablesStmt: + err = e.executeUnlockTables(x) } if err != nil { return e.toErr(err) @@ -433,3 +437,18 @@ func (e *DDLExec) getRecoverTableByTableName(s *ast.RecoverTableStmt, t *meta.Me } return job, tblInfo, nil } + +func (e *DDLExec) executeLockTables(s *ast.LockTablesStmt) error { + if !config.TableLockEnabled() { + return nil + } + return domain.GetDomain(e.ctx).DDL().LockTables(e.ctx, s) +} + +func (e *DDLExec) executeUnlockTables(s *ast.UnlockTablesStmt) error { + if !config.TableLockEnabled() { + return nil + } + lockedTables := e.ctx.GetAllTableLocks() + return domain.GetDomain(e.ctx).DDL().UnlockTables(e.ctx, lockedTables) +} diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index ed455b3dacfcb..843b20f72a1bf 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -60,6 +60,16 @@ var ( ErrMultiplePriKey = terror.ClassSchema.New(codeMultiplePriKey, "Multiple primary key defined") // ErrTooManyKeyParts returns for too many key parts. ErrTooManyKeyParts = terror.ClassSchema.New(codeTooManyKeyParts, "Too many key parts specified; max %d parts allowed") + // ErrTableNotLockedForWrite returns for write tables when only hold the table read lock. + ErrTableNotLockedForWrite = terror.ClassSchema.New(codeErrTableNotLockedForWrite, mysql.MySQLErrName[mysql.ErrTableNotLockedForWrite]) + // ErrTableNotLocked returns when session has explicitly lock tables, then visit unlocked table will return this error. + ErrTableNotLocked = terror.ClassSchema.New(codeErrTableNotLocked, mysql.MySQLErrName[mysql.ErrTableNotLocked]) + // ErrNonuniqTable returns when none unique tables errors. + ErrNonuniqTable = terror.ClassSchema.New(codeErrTableNotLocked, mysql.MySQLErrName[mysql.ErrNonuniqTable]) + // ErrTableLocked returns when the table was locked by other session. + ErrTableLocked = terror.ClassSchema.New(codeTableLocked, mysql.MySQLErrName[mysql.ErrTableLocked]) + // ErrAccessDenied return when the user doesn't have the permission to access the table. + ErrAccessDenied = terror.ClassSchema.New(codeErrAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDenied]) ) // InfoSchema is the interface used to retrieve the schema information. @@ -86,7 +96,8 @@ type InfoSchema interface { // Information Schema Name. const ( - Name = "INFORMATION_SCHEMA" + Name = "INFORMATION_SCHEMA" + LowerName = "information_schema" ) type sortedTables []table.Table @@ -321,27 +332,38 @@ const ( codeTooManyKeyParts = 1070 codeKeyNameDuplicate = 1061 codeKeyNotExists = 1176 + + codeErrTableNotLockedForWrite = mysql.ErrTableNotLockedForWrite + codeErrTableNotLocked = mysql.ErrTableNotLocked + codeErrNonuniqTable = mysql.ErrNonuniqTable + codeErrAccessDenied = mysql.ErrAccessDenied + codeTableLocked = mysql.ErrTableLocked ) func init() { schemaMySQLErrCodes := map[terror.ErrCode]uint16{ - codeDBDropExists: mysql.ErrDBDropExists, - codeDatabaseNotExists: mysql.ErrBadDB, - codeTableNotExists: mysql.ErrNoSuchTable, - codeColumnNotExists: mysql.ErrBadField, - codeCannotAddForeign: mysql.ErrCannotAddForeign, - codeWrongFkDef: mysql.ErrWrongFkDef, - codeForeignKeyNotExists: mysql.ErrCantDropFieldOrKey, - codeDatabaseExists: mysql.ErrDBCreateExists, - codeTableExists: mysql.ErrTableExists, - codeBadTable: mysql.ErrBadTable, - codeBadUser: mysql.ErrBadUser, - codeColumnExists: mysql.ErrDupFieldName, - codeIndexExists: mysql.ErrDupIndex, - codeMultiplePriKey: mysql.ErrMultiplePriKey, - codeTooManyKeyParts: mysql.ErrTooManyKeyParts, - codeKeyNameDuplicate: mysql.ErrDupKeyName, - codeKeyNotExists: mysql.ErrKeyDoesNotExist, + codeDBDropExists: mysql.ErrDBDropExists, + codeDatabaseNotExists: mysql.ErrBadDB, + codeTableNotExists: mysql.ErrNoSuchTable, + codeColumnNotExists: mysql.ErrBadField, + codeCannotAddForeign: mysql.ErrCannotAddForeign, + codeWrongFkDef: mysql.ErrWrongFkDef, + codeForeignKeyNotExists: mysql.ErrCantDropFieldOrKey, + codeDatabaseExists: mysql.ErrDBCreateExists, + codeTableExists: mysql.ErrTableExists, + codeBadTable: mysql.ErrBadTable, + codeBadUser: mysql.ErrBadUser, + codeColumnExists: mysql.ErrDupFieldName, + codeIndexExists: mysql.ErrDupIndex, + codeMultiplePriKey: mysql.ErrMultiplePriKey, + codeTooManyKeyParts: mysql.ErrTooManyKeyParts, + codeKeyNameDuplicate: mysql.ErrDupKeyName, + codeKeyNotExists: mysql.ErrKeyDoesNotExist, + codeErrTableNotLockedForWrite: mysql.ErrTableNotLockedForWrite, + codeErrTableNotLocked: mysql.ErrTableNotLocked, + codeErrNonuniqTable: mysql.ErrNonuniqTable, + mysql.ErrAccessDenied: mysql.ErrAccessDenied, + codeTableLocked: mysql.ErrTableLocked, } terror.ErrClassToMySQLCodes[terror.ClassSchema] = schemaMySQLErrCodes initInfoSchemaDB() diff --git a/infoschema/perfschema/const.go b/infoschema/perfschema/const.go index db8f9af0f5e51..68dbbff24a080 100644 --- a/infoschema/perfschema/const.go +++ b/infoschema/perfschema/const.go @@ -15,7 +15,8 @@ package perfschema // Performance Schema Name. const ( - Name = "PERFORMANCE_SCHEMA" + Name = "PERFORMANCE_SCHEMA" + LowerName = "performance_schema" ) // perfSchemaTables is a shortcut to involve all table names. diff --git a/lock/lock.go b/lock/lock.go new file mode 100644 index 0000000000000..f96879b2a0a5d --- /dev/null +++ b/lock/lock.go @@ -0,0 +1,126 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package lock + +import ( + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/infoschema/perfschema" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/table" +) + +// Checker uses to check tables lock. +type Checker struct { + ctx sessionctx.Context + is infoschema.InfoSchema +} + +// NewChecker return new lock Checker. +func NewChecker(ctx sessionctx.Context, is infoschema.InfoSchema) *Checker { + return &Checker{ctx: ctx, is: is} +} + +// CheckTableLock uses to check table lock. +func (c *Checker) CheckTableLock(db, table string, privilege mysql.PrivilegeType) error { + if db == "" && table == "" { + return nil + } + // Below database are not support table lock. + if db == infoschema.LowerName || db == perfschema.LowerName || db == mysql.SystemDB { + return nil + } + // check operation on database. + if table == "" { + return c.CheckLockInDB(db, privilege) + } + switch privilege { + case mysql.ShowDBPriv, mysql.AllPrivMask: + // AllPrivMask only used in show create table statement now. + return nil + case mysql.CreatePriv, mysql.CreateViewPriv: + if c.ctx.HasLockedTables() { + // TODO: For `create table t_exists ...` statement, mysql will check out `t_exists` first, but in TiDB now, + // will return below error first. + return infoschema.ErrTableNotLocked.GenWithStackByArgs(table) + } + return nil + } + // TODO: try to remove this get for speed up. + tb, err := c.is.TableByName(model.NewCIStr(db), model.NewCIStr(table)) + // Ignore this error for "drop table if not exists t1" when t1 doesn't exists. + if infoschema.ErrTableNotExists.Equal(err) { + return nil + } + if err != nil { + return err + } + if c.ctx.HasLockedTables() { + if locked, tp := c.ctx.CheckTableLocked(tb.Meta().ID); locked { + if checkLockTpMeetPrivilege(tp, privilege) { + return nil + } + return infoschema.ErrTableNotLockedForWrite.GenWithStackByArgs(tb.Meta().Name) + } + return infoschema.ErrTableNotLocked.GenWithStackByArgs(tb.Meta().Name) + } + + if tb.Meta().Lock == nil { + return nil + } + + if privilege == mysql.SelectPriv { + switch tb.Meta().Lock.Tp { + case model.TableLockRead, model.TableLockWriteLocal: + return nil + } + } + return infoschema.ErrTableLocked.GenWithStackByArgs(tb.Meta().Name.L, tb.Meta().Lock.Tp, tb.Meta().Lock.Sessions[0]) +} + +func checkLockTpMeetPrivilege(tp model.TableLockType, privilege mysql.PrivilegeType) bool { + switch tp { + case model.TableLockWrite, model.TableLockWriteLocal: + return true + case model.TableLockRead: + // ShowDBPriv, AllPrivMask,CreatePriv, CreateViewPriv already checked before. + // The other privilege in read lock was not allowed. + if privilege == mysql.SelectPriv { + return true + } + } + return false +} + +// CheckLockInDB uses to check operation on database. +func (c *Checker) CheckLockInDB(db string, privilege mysql.PrivilegeType) error { + if c.ctx.HasLockedTables() { + switch privilege { + case mysql.CreatePriv, mysql.DropPriv, mysql.AlterPriv: + return table.ErrLockOrActiveTransaction.GenWithStackByArgs() + } + } + if privilege == mysql.CreatePriv { + return nil + } + tables := c.is.SchemaTables(model.NewCIStr(db)) + for _, tbl := range tables { + err := c.CheckTableLock(db, tbl.Meta().Name.L, privilege) + if err != nil { + return err + } + } + return nil +} diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index e0fd6c507c304..d41dadde11710 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -19,8 +19,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/auth" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/lock" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" @@ -97,6 +99,21 @@ func CheckPrivilege(activeRoles []*auth.RoleIdentity, pm privilege.Manager, vs [ return nil } +// CheckTableLock checks the table lock. +func CheckTableLock(ctx sessionctx.Context, is infoschema.InfoSchema, vs []visitInfo) error { + if !config.TableLockEnabled() { + return nil + } + checker := lock.NewChecker(ctx, is) + for i := range vs { + err := checker.CheckTableLock(vs[i].db, vs[i].table, vs[i].privilege) + if err != nil { + return err + } + } + return nil +} + // DoOptimize optimizes a logical plan to a physical plan. func DoOptimize(flag uint64, logic LogicalPlan) (PhysicalPlan, error) { logic, err := logicalOptimize(flag, logic) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 6ed3eb6c397dc..29c981b1e649b 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1973,6 +1973,8 @@ func (b *PlanBuilder) buildDDL(node ast.DDLNode) (Plan, error) { case *ast.RecoverTableStmt: // Recover table command can only be executed by administrator. b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil) + case *ast.LockTablesStmt, *ast.UnlockTablesStmt: + // TODO: add Lock Table privilege check. } p := &DDL{Statement: node} return p, nil diff --git a/planner/optimize.go b/planner/optimize.go index 820983d505b97..e658899b0efaa 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -50,6 +50,10 @@ func Optimize(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) ( } } + if err := plannercore.CheckTableLock(ctx, is, builder.GetVisitInfo()); err != nil { + return nil, err + } + // Handle the execute statement. if execPlan, ok := p.(*plannercore.Execute); ok { err := execPlan.OptimizePreparedPlan(ctx, is) diff --git a/session/session.go b/session/session.go index 94bc261ef5aca..f8be82fe08fe3 100644 --- a/session/session.go +++ b/session/session.go @@ -185,6 +185,59 @@ type session struct { statsCollector *handle.SessionStatsCollector // ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement; ddlOwnerChecker owner.DDLOwnerChecker + // lockedTables use to record the table locks hold by the session. + lockedTables map[int64]model.TableLockTpInfo +} + +// AddTableLock adds table lock to the session lock map. +func (s *session) AddTableLock(locks []model.TableLockTpInfo) { + for _, l := range locks { + s.lockedTables[l.TableID] = l + } +} + +// ReleaseTableLocks releases table lock in the session lock map. +func (s *session) ReleaseTableLocks(locks []model.TableLockTpInfo) { + for _, l := range locks { + delete(s.lockedTables, l.TableID) + } +} + +// ReleaseTableLockByTableIDs releases table lock in the session lock map by table ID. +func (s *session) ReleaseTableLockByTableIDs(tableIDs []int64) { + for _, tblID := range tableIDs { + delete(s.lockedTables, tblID) + } +} + +// CheckTableLocked checks the table lock. +func (s *session) CheckTableLocked(tblID int64) (bool, model.TableLockType) { + lt, ok := s.lockedTables[tblID] + if !ok { + return false, model.TableLockNone + } + return true, lt.Tp +} + +// GetAllTableLocks gets all table locks table id and db id hold by the session. +func (s *session) GetAllTableLocks() []model.TableLockTpInfo { + lockTpInfo := make([]model.TableLockTpInfo, 0, len(s.lockedTables)) + for _, tl := range s.lockedTables { + lockTpInfo = append(lockTpInfo, tl) + } + return lockTpInfo +} + +// HasLockedTables uses to check whether this session locked any tables. +// If so, the session can only visit the table which locked by self. +func (s *session) HasLockedTables() bool { + b := len(s.lockedTables) > 0 + return b +} + +// ReleaseAllTableLocks releases all table locks hold by the session. +func (s *session) ReleaseAllTableLocks() { + s.lockedTables = make(map[int64]model.TableLockTpInfo) } // DDLOwnerChecker returns s.ddlOwnerChecker. @@ -1281,7 +1334,17 @@ func (s *session) ClearValue(key fmt.Stringer) { } // Close function does some clean work when session end. +// Close should release the table locks which hold by the session. func (s *session) Close() { + // TODO: do clean table locks when session exited without execute Close. + // TODO: do clean table locks when tidb-server was `kill -9`. + if s.HasLockedTables() && config.TableLockEnabled() { + lockedTables := s.GetAllTableLocks() + err := domain.GetDomain(s).DDL().UnlockTables(s, lockedTables) + if err != nil { + logutil.Logger(context.Background()).Error("release table lock failed", zap.Uint64("conn", s.sessionVars.ConnectionID)) + } + } if s.statsCollector != nil { s.statsCollector.Delete() } @@ -1536,6 +1599,7 @@ func createSession(store kv.Storage) (*session, error) { plannercore.PreparedPlanCacheMemoryGuardRatio, plannercore.PreparedPlanCacheMaxMemory.Load()) } s.mu.values = make(map[fmt.Stringer]interface{}) + s.lockedTables = make(map[int64]model.TableLockTpInfo) domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s @@ -1559,6 +1623,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er plannercore.PreparedPlanCacheMemoryGuardRatio, plannercore.PreparedPlanCacheMaxMemory.Load()) } s.mu.values = make(map[fmt.Stringer]interface{}) + s.lockedTables = make(map[int64]model.TableLockTpInfo) domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s diff --git a/sessionctx/context.go b/sessionctx/context.go index a8d71a8f4626c..5b430dc938352 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -17,13 +17,14 @@ import ( "context" "fmt" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" - binlog "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tipb/go-binlog" ) // Context is an interface for transaction and executive args environment. @@ -83,6 +84,20 @@ type Context interface { StmtAddDirtyTableOP(op int, physicalID int64, handle int64, row []types.Datum) // DDLOwnerChecker returns owner.DDLOwnerChecker. DDLOwnerChecker() owner.DDLOwnerChecker + // AddTableLock adds table lock to the session lock map. + AddTableLock([]model.TableLockTpInfo) + // ReleaseTableLocks releases table locks in the session lock map. + ReleaseTableLocks(locks []model.TableLockTpInfo) + // ReleaseTableLockByTableID releases table locks in the session lock map by table ID. + ReleaseTableLockByTableIDs(tableIDs []int64) + // CheckTableLocked checks the table lock. + CheckTableLocked(tblID int64) (bool, model.TableLockType) + // GetAllTableLocks gets all table locks table id and db id hold by the session. + GetAllTableLocks() []model.TableLockTpInfo + // ReleaseAllTableLocks releases all table locks hold by the session. + ReleaseAllTableLocks() + // HasLockedTables uses to check whether this session locked any tables. + HasLockedTables() bool } type basicCtxType int diff --git a/table/table.go b/table/table.go index 2fba027e6df1c..a67ecdf67f5ff 100644 --- a/table/table.go +++ b/table/table.go @@ -79,11 +79,12 @@ var ( ErrTruncateWrongValue = terror.ClassTable.New(codeTruncateWrongValue, "incorrect value") // ErrTruncatedWrongValueForField returns for truncate wrong value for field. ErrTruncatedWrongValueForField = terror.ClassTable.New(codeTruncateWrongValue, mysql.MySQLErrName[mysql.ErrTruncatedWrongValueForField]) - // ErrUnknownPartition returns unknown partition error. ErrUnknownPartition = terror.ClassTable.New(codeUnknownPartition, mysql.MySQLErrName[mysql.ErrUnknownPartition]) // ErrNoPartitionForGivenValue returns table has no partition for value. ErrNoPartitionForGivenValue = terror.ClassTable.New(codeNoPartitionForGivenValue, mysql.MySQLErrName[mysql.ErrNoPartitionForGivenValue]) + // ErrLockOrActiveTransaction returns when execute unsupported statement in a lock session or an active transaction. + ErrLockOrActiveTransaction = terror.ClassTable.New(codeLockOrActiveTransaction, mysql.MySQLErrName[mysql.ErrLockOrActiveTransaction]) ) // RecordIterFunc is used for low-level record iteration. @@ -212,6 +213,7 @@ const ( codeUnknownPartition = mysql.ErrUnknownPartition codeNoPartitionForGivenValue = mysql.ErrNoPartitionForGivenValue + codeLockOrActiveTransaction = mysql.ErrLockOrActiveTransaction ) // Slice is used for table sorting. @@ -234,6 +236,7 @@ func init() { codeTruncateWrongValue: mysql.ErrTruncatedWrongValueForField, codeUnknownPartition: mysql.ErrUnknownPartition, codeNoPartitionForGivenValue: mysql.ErrNoPartitionForGivenValue, + codeLockOrActiveTransaction: mysql.ErrLockOrActiveTransaction, } terror.ErrClassToMySQLCodes[terror.ClassTable] = tableMySQLErrCodes } diff --git a/util/mock/context.go b/util/mock/context.go index c3419792ac857..befd00241b08f 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" @@ -215,6 +216,41 @@ func (c *Context) StmtGetMutation(tableID int64) *binlog.TableMutation { func (c *Context) StmtAddDirtyTableOP(op int, tid int64, handle int64, row []types.Datum) { } +// AddTableLock implements the sessionctx.Context interface. +func (c *Context) AddTableLock(_ []model.TableLockTpInfo) { +} + +// ReleaseTableLocks implements the sessionctx.Context interface. +func (c *Context) ReleaseTableLocks(locks []model.TableLockTpInfo) { +} + +// ReleaseTableLockByTableIDs implements the sessionctx.Context interface. +func (c *Context) ReleaseTableLockByTableIDs(tableIDs []int64) { +} + +// CheckTableLocked implements the sessionctx.Context interface. +func (c *Context) CheckTableLocked(_ int64) (bool, model.TableLockType) { + return false, model.TableLockNone +} + +// GetAllTableLocks implements the sessionctx.Context interface. +func (c *Context) GetAllTableLocks() []model.TableLockTpInfo { + return nil +} + +// ReleaseAllTableLocks implements the sessionctx.Context interface. +func (c *Context) ReleaseAllTableLocks() { +} + +// HasLockedTables implements the sessionctx.Context interface. +func (c *Context) HasLockedTables() bool { + return false +} + +// Close implements the sessionctx.Context interface. +func (c *Context) Close() { +} + // NewContext creates a new mocked sessionctx.Context. func NewContext() *Context { ctx, cancel := context.WithCancel(context.Background())