diff --git a/errno/errcode.go b/errno/errcode.go index e061f0c25a827..aed20ef194135 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -825,6 +825,7 @@ const ( ErrInvalidFieldSize = 3013 ErrInvalidArgumentForLogarithm = 3020 ErrAggregateOrderNonAggQuery = 3029 + ErrUserLockDeadlock = 3058 ErrIncorrectType = 3064 ErrFieldInOrderNotSelect = 3065 ErrAggregateInOrderNotSelect = 3066 diff --git a/errno/errname.go b/errno/errname.go index 503578504b44c..0bc913b33baea 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1046,6 +1046,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrLockExpire: mysql.Message("TTL manager has timed out, pessimistic locks may expire, please commit or rollback this transaction", nil), ErrTableOptionUnionUnsupported: mysql.Message("CREATE/ALTER table with union option is not supported", nil), ErrTableOptionInsertMethodUnsupported: mysql.Message("CREATE/ALTER table with insert method option is not supported", nil), + ErrUserLockDeadlock: mysql.Message("Deadlock found when trying to get user-level lock; try rolling back transaction/releasing locks and restarting lock acquisition.", nil), ErrBRIEBackupFailed: mysql.Message("Backup failed: %s", nil), ErrBRIERestoreFailed: mysql.Message("Restore failed: %s", nil), diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go index 3ca92396aa309..d39e82a8410a0 100644 --- a/executor/infoschema_cluster_table_test.go +++ b/executor/infoschema_cluster_table_test.go @@ -319,7 +319,7 @@ func (s *infosSchemaClusterTableSuite) TestTableStorageStats() { "test 2", )) rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows() - s.Require().Len(rows, 30) + s.Require().Len(rows, 31) // More tests about the privileges. tk.MustExec("create user 'testuser'@'localhost'") @@ -345,12 +345,12 @@ func (s *infosSchemaClusterTableSuite) TestTableStorageStats() { Hostname: "localhost", }, nil, nil)) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("30")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("31")) s.Require().True(tk.Session().Auth(&auth.UserIdentity{ Username: "testuser3", Hostname: "localhost", }, nil, nil)) - tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("30")) + tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("31")) } diff --git a/executor/set_test.go b/executor/set_test.go index 42cd1340f09c0..be8291d5ff708 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -1292,28 +1292,16 @@ func TestEnableNoopFunctionsVar(t *testing.T) { tk.MustQuery(`select @@global.tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) - err := tk.ExecToErr(`select get_lock('lock1', 2);`) - require.True(t, terror.ErrorEqual(err, expression.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) - err = tk.ExecToErr(`select release_lock('lock1');`) - require.True(t, terror.ErrorEqual(err, expression.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) - // change session var to 1 tk.MustExec(`set tidb_enable_noop_functions=1;`) tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("ON")) tk.MustQuery(`select @@global.tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) - tk.MustQuery(`select get_lock("lock", 10)`).Check(testkit.Rows("1")) - tk.MustQuery(`select release_lock("lock")`).Check(testkit.Rows("1")) // restore to 0 tk.MustExec(`set tidb_enable_noop_functions=0;`) tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) tk.MustQuery(`select @@global.tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) - err = tk.ExecToErr(`select get_lock('lock2', 10);`) - require.True(t, terror.ErrorEqual(err, expression.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) - err = tk.ExecToErr(`select release_lock('lock2');`) - require.True(t, terror.ErrorEqual(err, expression.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) - // set test require.Error(t, tk.ExecToErr(`set tidb_enable_noop_functions='abc'`)) require.Error(t, tk.ExecToErr(`set tidb_enable_noop_functions=11`)) @@ -1324,7 +1312,7 @@ func TestEnableNoopFunctionsVar(t *testing.T) { tk.MustExec(`set tidb_enable_noop_functions=0;`) tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) - err = tk.ExecToErr("SET SESSION tx_read_only = 1") + err := tk.ExecToErr("SET SESSION tx_read_only = 1") require.True(t, terror.ErrorEqual(err, variable.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) tk.MustExec("SET SESSION tx_read_only = 0") diff --git a/expression/builtin.go b/expression/builtin.go index b693d21145223..b3a9ff1ba3fc0 100644 --- a/expression/builtin.go +++ b/expression/builtin.go @@ -783,8 +783,6 @@ var funcs = map[string]functionClass{ ast.BinToUUID: &binToUUIDFunctionClass{baseFunctionClass{ast.BinToUUID, 1, 2}}, ast.TiDBShard: &tidbShardFunctionClass{baseFunctionClass{ast.TiDBShard, 1, 1}}, - // get_lock() and release_lock() are parsed but do nothing. - // It is used for preventing error in Ruby's activerecord migrations. ast.GetLock: &lockFunctionClass{baseFunctionClass{ast.GetLock, 2, 2}}, ast.ReleaseLock: &releaseLockFunctionClass{baseFunctionClass{ast.ReleaseLock, 1, 1}}, diff --git a/expression/builtin_miscellaneous.go b/expression/builtin_miscellaneous.go index 8963132c736cf..59b385bd95533 100644 --- a/expression/builtin_miscellaneous.go +++ b/expression/builtin_miscellaneous.go @@ -25,7 +25,9 @@ import ( "github.com/google/uuid" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" "github.com/pingcap/tidb/util/chunk" @@ -65,6 +67,7 @@ var ( _ builtinFunc = &builtinSleepSig{} _ builtinFunc = &builtinLockSig{} _ builtinFunc = &builtinReleaseLockSig{} + _ builtinFunc = &builtinReleaseAllLocksSig{} _ builtinFunc = &builtinDecimalAnyValueSig{} _ builtinFunc = &builtinDurationAnyValueSig{} _ builtinFunc = &builtinIntAnyValueSig{} @@ -186,9 +189,43 @@ func (b *builtinLockSig) Clone() builtinFunc { // evalInt evals a builtinLockSig. // See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock -// The lock function will do nothing. -// Warning: get_lock() function is parsed but ignored. -func (b *builtinLockSig) evalInt(_ chunk.Row) (int64, bool, error) { +func (b *builtinLockSig) evalInt(row chunk.Row) (int64, bool, error) { + lockName, isNull, err := b.args[0].EvalString(b.ctx, row) + if err != nil { + return 0, isNull, err + } + timeout, isNullTimeout, err := b.args[1].EvalInt(b.ctx, row) + if err != nil { + return 0, isNull, err + } + // Validate that neither argument is NULL and there is a lockName + if isNull || isNullTimeout || lockName == "" { + return 0, false, errIncorrectArgs.GenWithStackByArgs("get_lock") + } + // A timeout less than zero is expected to be treated as unlimited. + // Because of our implementation being based on pessimistic locks, + // We can't have a timeout greater than innodb_lock_wait_timeout. + maxTimeout := int64(variable.GetSysVar("innodb_lock_wait_timeout").MaxValue) + if timeout < 0 || timeout > maxTimeout { + timeout = maxTimeout + } + // Lock names are case insensitive. Because we can't rely on collations + // being enabled on the internal table, we have to lower it. + lockName = strings.ToLower(lockName) + err = b.ctx.GetAdvisoryLock(lockName, timeout) + if err != nil { + switch err.(*terror.Error).Code() { + case mysql.ErrLockWaitTimeout: + return 0, false, nil // Another user has the lock + case mysql.ErrLockDeadlock: + // TODO: currently this code is not reachable because each Advisory Lock + // Uses a separate session. Deadlock detection does not work across + // independent sessions. + return 0, false, errUserLockDeadlock + default: + return 0, false, err + } + } return 1, false, nil } @@ -221,10 +258,19 @@ func (b *builtinReleaseLockSig) Clone() builtinFunc { // evalInt evals a builtinReleaseLockSig. // See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_release-lock -// The release lock function will do nothing. -// Warning: release_lock() function is parsed but ignored. -func (b *builtinReleaseLockSig) evalInt(_ chunk.Row) (int64, bool, error) { - return 1, false, nil +func (b *builtinReleaseLockSig) evalInt(row chunk.Row) (int64, bool, error) { + lockName, isNull, err := b.args[0].EvalString(b.ctx, row) + if err != nil { + return 0, isNull, err + } + // Lock names are case insensitive. Because we can't rely on collations + // being enabled on the internal table, we have to lower it. + lockName = strings.ToLower(lockName) + released := int64(0) + if b.ctx.ReleaseAdvisoryLock(lockName) { + released = 1 + } + return released, false, nil } type anyValueFunctionClass struct { @@ -1062,7 +1108,33 @@ type releaseAllLocksFunctionClass struct { } func (c *releaseAllLocksFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { - return nil, errFunctionNotExists.GenWithStackByArgs("FUNCTION", "RELEASE_ALL_LOCKS") + if err := c.verifyArgs(args); err != nil { + return nil, err + } + bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETInt) + if err != nil { + return nil, err + } + sig := &builtinReleaseAllLocksSig{bf} + bf.tp.Flen = 1 + return sig, nil +} + +type builtinReleaseAllLocksSig struct { + baseBuiltinFunc +} + +func (b *builtinReleaseAllLocksSig) Clone() builtinFunc { + newSig := &builtinReleaseAllLocksSig{} + newSig.cloneFrom(&b.baseBuiltinFunc) + return newSig +} + +// evalInt evals a builtinReleaseAllLocksSig. +// See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_release-all-locks +func (b *builtinReleaseAllLocksSig) evalInt(_ chunk.Row) (int64, bool, error) { + count := b.ctx.ReleaseAllAdvisoryLocks() + return int64(count), false, nil } type uuidFunctionClass struct { diff --git a/expression/builtin_miscellaneous_vec.go b/expression/builtin_miscellaneous_vec.go index 31a617cd31f10..d03c5d8cfe686 100644 --- a/expression/builtin_miscellaneous_vec.go +++ b/expression/builtin_miscellaneous_vec.go @@ -228,20 +228,7 @@ func (b *builtinNameConstDurationSig) vecEvalDuration(input *chunk.Chunk, result } func (b *builtinLockSig) vectorized() bool { - return true -} - -// See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock -// The lock function will do nothing. -// Warning: get_lock() function is parsed but ignored. -func (b *builtinLockSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { - n := input.NumRows() - result.ResizeInt64(n, false) - i64s := result.Int64s() - for i := range i64s { - i64s[i] = 1 - } - return nil + return false } func (b *builtinDurationAnyValueSig) vectorized() bool { @@ -634,20 +621,7 @@ func (b *builtinNameConstRealSig) vecEvalReal(input *chunk.Chunk, result *chunk. } func (b *builtinReleaseLockSig) vectorized() bool { - return true -} - -// See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_release-lock -// The release lock function will do nothing. -// Warning: release_lock() function is parsed but ignored. -func (b *builtinReleaseLockSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { - n := input.NumRows() - result.ResizeInt64(n, false) - i64s := result.Int64s() - for i := range i64s { - i64s[i] = 1 - } - return nil + return false } func (b *builtinVitessHashSig) vectorized() bool { diff --git a/expression/builtin_test.go b/expression/builtin_test.go index d9a77a41bb8b2..6e55accb44bc4 100644 --- a/expression/builtin_test.go +++ b/expression/builtin_test.go @@ -139,14 +139,14 @@ func TestIsNullFunc(t *testing.T) { func TestLock(t *testing.T) { ctx := createContext(t) lock := funcs[ast.GetLock] - f, err := lock.getFunction(ctx, datumsToConstants(types.MakeDatums(nil, 1))) + f, err := lock.getFunction(ctx, datumsToConstants(types.MakeDatums("mylock", 1))) require.NoError(t, err) v, err := evalBuiltinFunc(f, chunk.Row{}) require.NoError(t, err) require.Equal(t, int64(1), v.GetInt64()) releaseLock := funcs[ast.ReleaseLock] - f, err = releaseLock.getFunction(ctx, datumsToConstants(types.MakeDatums(1))) + f, err = releaseLock.getFunction(ctx, datumsToConstants(types.MakeDatums("mylock"))) require.NoError(t, err) v, err = evalBuiltinFunc(f, chunk.Row{}) require.NoError(t, err) diff --git a/expression/errors.go b/expression/errors.go index b30918b35bf5e..a4b3f71f7b319 100644 --- a/expression/errors.go +++ b/expression/errors.go @@ -53,6 +53,7 @@ var ( errWrongValueForType = dbterror.ClassExpression.NewStd(mysql.ErrWrongValueForType) errUnknown = dbterror.ClassExpression.NewStd(mysql.ErrUnknown) errSpecificAccessDenied = dbterror.ClassExpression.NewStd(mysql.ErrSpecificAccessDenied) + errUserLockDeadlock = dbterror.ClassExpression.NewStd(mysql.ErrUserLockDeadlock) // Sequence usage privilege check. errSequenceAccessDenied = dbterror.ClassExpression.NewStd(mysql.ErrTableaccessDenied) diff --git a/expression/function_traits.go b/expression/function_traits.go index ed0188b13f36a..51a024e2da1dd 100644 --- a/expression/function_traits.go +++ b/expression/function_traits.go @@ -226,13 +226,9 @@ var mutableEffectsFunctions = map[string]struct{}{ ast.AnyValue: {}, } -// some functions like "get_lock" and "release_lock" currently do NOT have -// right implementations, but may have noop ones(like with any inputs, always return 1) +// some functions do NOT have right implementations, but may have noop ones(like with any inputs, always return 1) // if apps really need these "funcs" to run, we offer sys var(tidb_enable_noop_functions) to enable noop usage -var noopFuncs = map[string]struct{}{ - ast.GetLock: {}, - ast.ReleaseLock: {}, -} +var noopFuncs = map[string]struct{}{} // booleanFunctions stores boolean functions var booleanFunctions = map[string]struct{}{ diff --git a/expression/integration_serial_test.go b/expression/integration_serial_test.go index b1eaf4b2596f2..c37223fa96e00 100644 --- a/expression/integration_serial_test.go +++ b/expression/integration_serial_test.go @@ -4183,8 +4183,6 @@ func TestNoopFunctions(t *testing.T) { "SELECT * FROM t1 LOCK IN SHARE MODE", "SELECT * FROM t1 GROUP BY a DESC", "SELECT * FROM t1 GROUP BY a ASC", - "SELECT GET_LOCK('acdc', 10)", - "SELECT RELEASE_LOCK('acdc')", } for _, stmt := range stmts { diff --git a/expression/integration_test.go b/expression/integration_test.go index 6370346d7658a..b7ad32fe44ff5 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -320,7 +320,6 @@ func TestMiscellaneousBuiltin(t *testing.T) { tk.MustQuery("select a,any_value(b),sum(c) from t1 group by a order by a;").Check(testkit.Rows("1 10 0", "2 30 0")) // for locks - tk.MustExec(`set tidb_enable_noop_functions=1;`) result := tk.MustQuery(`SELECT GET_LOCK('test_lock1', 10);`) result.Check(testkit.Rows("1")) result = tk.MustQuery(`SELECT GET_LOCK('test_lock2', 10);`) @@ -330,6 +329,8 @@ func TestMiscellaneousBuiltin(t *testing.T) { result.Check(testkit.Rows("1")) result = tk.MustQuery(`SELECT RELEASE_LOCK('test_lock1');`) result.Check(testkit.Rows("1")) + result = tk.MustQuery(`SELECT RELEASE_LOCK('test_lock3');`) // not acquired + result.Check(testkit.Rows("0")) } func TestConvertToBit(t *testing.T) { diff --git a/parser/ast/functions.go b/parser/ast/functions.go index 9779f7ba446b6..ae61e06682656 100644 --- a/parser/ast/functions.go +++ b/parser/ast/functions.go @@ -290,10 +290,8 @@ const ( BinToUUID = "bin_to_uuid" VitessHash = "vitess_hash" TiDBShard = "tidb_shard" - // get_lock() and release_lock() is parsed but do nothing. - // It is used for preventing error in Ruby's activerecord migrations. - GetLock = "get_lock" - ReleaseLock = "release_lock" + GetLock = "get_lock" + ReleaseLock = "release_lock" // encryption and compression functions AesDecrypt = "aes_decrypt" diff --git a/session/advisory_locks.go b/session/advisory_locks.go new file mode 100644 index 0000000000000..145ee75015035 --- /dev/null +++ b/session/advisory_locks.go @@ -0,0 +1,87 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package session + +import ( + "context" + + "github.com/pingcap/tidb/parser/terror" +) + +// Advisory Locks are the locks in GET_LOCK() and RELEASE_LOCK(). +// We implement them in TiDB by using an INSERT into mysql.advisory_locks +// inside of a pessimistic transaction that is never committed. +// +// Each advisory lock requires its own session, since the pessimistic locks +// can be rolled back in any order (transactions can't release random locks +// like this even if savepoints was supported). +// +// We use referenceCount to track the number of references to the lock in the session. +// A little known feature of advisory locks is that you can call GET_LOCK +// multiple times on the same lock, and it will only be released when +// the reference count reaches zero. + +type advisoryLock struct { + session *session + referenceCount int +} + +// IncrReferences increments the reference count for the advisory lock. +func (a *advisoryLock) IncrReferences() { + a.referenceCount++ +} + +// DecrReferences decrements the reference count for the advisory lock. +func (a *advisoryLock) DecrReferences() { + a.referenceCount-- +} + +// References returns the current reference count for the advisory lock. +func (a *advisoryLock) ReferenceCount() int { + return a.referenceCount +} + +// Close releases the advisory lock, which includes +// rolling back the transaction and closing the session. +func (a *advisoryLock) Close() { + _, err := a.session.ExecuteInternal(context.Background(), "ROLLBACK") + terror.Log(err) + a.session.Close() +} + +// GetLock acquires a new advisory lock using a pessimistic transaction. +// The timeout is implemented by using the pessimistic lock timeout. +// We will never COMMIT the transaction, but the err indicates +// if the lock was successfully acquired. +func (a *advisoryLock) GetLock(lockName string, timeout int64) error { + _, err := a.session.ExecuteInternal(context.Background(), "SET innodb_lock_wait_timeout = %?", timeout) + if err != nil { + return err + } + _, err = a.session.ExecuteInternal(context.Background(), "BEGIN PESSIMISTIC") + if err != nil { + return err + } + _, err = a.session.ExecuteInternal(context.Background(), "INSERT INTO mysql.advisory_locks (lock_name) VALUES (%?)", lockName) + if err != nil { + // We couldn't acquire the LOCK so we close the session cleanly + // and return the error to the caller. The caller will need to interpret + // this differently if it is lock wait timeout or a deadlock. + a.Close() + return err + } + a.referenceCount++ + return nil +} diff --git a/session/bootstrap.go b/session/bootstrap.go index 50fae971872d3..d54837809c1c0 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -396,6 +396,11 @@ const ( UNIQUE KEY table_version (table_id, version), KEY table_create_time (table_id, create_time) );` + + // CreateAdvisoryLocks stores the advisory locks (get_lock, release_lock). + CreateAdvisoryLocks = `CREATE TABLE IF NOT EXISTS mysql.advisory_locks ( + lock_name VARCHAR(255) NOT NULL PRIMARY KEY + );` ) // bootstrap initiates system DB for a store. @@ -585,11 +590,13 @@ const ( version85 = 85 // version86 changes global variable `tidb_enable_top_sql` value from false to true. version86 = 86 + // version87 adds the tables mysql.advisory_locks + version87 = 87 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version86 +var currentBootstrapVersion int64 = version87 var ( bootstrapVersion = []func(Session, int64){ @@ -679,6 +686,7 @@ var ( upgradeToVer84, upgradeToVer85, upgradeToVer86, + upgradeToVer87, } ) @@ -1763,6 +1771,14 @@ func upgradeToVer86(s Session, ver int64) { mustExecute(s, "set @@global.tidb_enable_top_sql = 1") } +func upgradeToVer87(s Session, ver int64) { + if ver >= version87 { + return + } + // Create Advisory Locks + doReentrantDDL(s, CreateAdvisoryLocks) +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, @@ -1853,6 +1869,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateStatsHistory) // Create stats_meta_history table. mustExecute(s, CreateStatsMetaHistory) + // Create advisory_locks table. + mustExecute(s, CreateAdvisoryLocks) } // doDMLWorks executes DML statements in bootstrap stage. diff --git a/session/session.go b/session/session.go index e0236ef93675d..86988e072076e 100644 --- a/session/session.go +++ b/session/session.go @@ -245,6 +245,9 @@ type session struct { // all the local data in each session, and finally report them to the remote // regularly. stmtStats *stmtstats.StatementStats + + // Contains a list of sessions used to collect advisory locks. + advisoryLocks map[string]*advisoryLock } var parserPool = &sync.Pool{New: func() interface{} { return parser.New() }} @@ -1634,6 +1637,66 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...inter return stmts[0], nil } +func (s *session) hasAdvisoryLocks() bool { + return len(s.advisoryLocks) != 0 +} + +// GetAdvisoryLock acquires an advisory lock of lockName. +// Note that a lock can be acquired multiple times by the same session, +// in which case we increment a reference count. +// Each lock needs to be held in a unique session because +// we need to be able to ROLLBACK in any arbitrary order +// in order to release the locks. +func (s *session) GetAdvisoryLock(lockName string, timeout int64) error { + if lock, ok := s.advisoryLocks[lockName]; ok { + lock.IncrReferences() + return nil + } + sess, err := createSession(s.GetStore()) + if err != nil { + return err + } + lock := &advisoryLock{session: sess} + err = lock.GetLock(lockName, timeout) + if err != nil { + return err + } + s.advisoryLocks[lockName] = lock + return nil +} + +// ReleaseAdvisoryLock releases an advisory locks held by the session. +// It returns FALSE if no lock by this name was held (by this session), +// and TRUE if a lock was held and "released". +// Note that the lock is not actually released if there are multiple +// references to the same lockName by the session, instead the reference +// count is decremented. +func (s *session) ReleaseAdvisoryLock(lockName string) (released bool) { + if lock, ok := s.advisoryLocks[lockName]; ok { + lock.DecrReferences() + if lock.ReferenceCount() <= 0 { + lock.Close() + delete(s.advisoryLocks, lockName) + } + return true + } + return false +} + +// ReleaseAllAdvisoryLocks releases all advisory locks held by the session +// and returns a count of the locks that were released. +// The count is based on unique locks held, so multiple references +// to the same lock do not need to be accounted for. +func (s *session) ReleaseAllAdvisoryLocks() int { + var count int + for lockName, lock := range s.advisoryLocks { + lock.Close() + delete(s.advisoryLocks, lockName) + count++ + } + return count +} + // ExecRestrictedStmt implements RestrictedSQLExecutor interface. func (s *session) ExecRestrictedStmt(ctx context.Context, stmtNode ast.StmtNode, opts ...sqlexec.OptionFuncAlias) ( []chunk.Row, []*ast.ResultField, error) { @@ -2551,6 +2614,9 @@ func (s *session) Close() { logutil.BgLogger().Error("release table lock failed", zap.Uint64("conn", s.sessionVars.ConnectionID)) } } + if s.hasAdvisoryLocks() { + s.ReleaseAllAdvisoryLocks() + } if s.statsCollector != nil { s.statsCollector.Delete() } @@ -2962,6 +3028,8 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { } s.mu.values = make(map[fmt.Stringer]interface{}) s.lockedTables = make(map[int64]model.TableLockTpInfo) + s.advisoryLocks = make(map[string]*advisoryLock) + domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s diff --git a/sessionctx/context.go b/sessionctx/context.go index f7394a5e85e7e..dd63ee928099c 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -148,6 +148,12 @@ type Context interface { GetStmtStats() *stmtstats.StatementStats // ShowProcess returns ProcessInfo running in current Context ShowProcess() *util.ProcessInfo + // GetAdvisoryLock acquires an advisory lock (aka GET_LOCK()). + GetAdvisoryLock(string, int64) error + // ReleaseAdvisoryLock releases an advisory lock (aka RELEASE_LOCK()). + ReleaseAdvisoryLock(string) bool + // ReleaseAllAdvisoryLocks releases all advisory locks that this session holds. + ReleaseAllAdvisoryLocks() int } type basicCtxType int diff --git a/util/mock/context.go b/util/mock/context.go index ea6a1109309f9..05fb15b2f933f 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -358,6 +358,21 @@ func (c *Context) GetStmtStats() *stmtstats.StatementStats { return nil } +// GetAdvisoryLock acquires an advisory lock +func (c *Context) GetAdvisoryLock(lockName string, timeout int64) error { + return nil +} + +// ReleaseAdvisoryLock releases an advisory lock +func (c *Context) ReleaseAdvisoryLock(lockName string) bool { + return true +} + +// ReleaseAllAdvisoryLocks releases all advisory locks +func (c *Context) ReleaseAllAdvisoryLocks() int { + return 0 +} + // Close implements the sessionctx.Context interface. func (c *Context) Close() { }