diff --git a/errno/errcode.go b/errno/errcode.go index e2b33faf628aa..42d1d474f78a9 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -826,6 +826,8 @@ const ( ErrInvalidFieldSize = 3013 ErrInvalidArgumentForLogarithm = 3020 ErrAggregateOrderNonAggQuery = 3029 + ErrUserLockWrongName = 3057 + ErrUserLockDeadlock = 3058 ErrIncorrectType = 3064 ErrFieldInOrderNotSelect = 3065 ErrAggregateInOrderNotSelect = 3066 diff --git a/errno/errname.go b/errno/errname.go index 21d51d298c35c..34644172bf567 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1048,6 +1048,8 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrLockExpire: mysql.Message("TTL manager has timed out, pessimistic locks may expire, please commit or rollback this transaction", nil), ErrTableOptionUnionUnsupported: mysql.Message("CREATE/ALTER table with union option is not supported", nil), ErrTableOptionInsertMethodUnsupported: mysql.Message("CREATE/ALTER table with insert method option is not supported", nil), + ErrUserLockDeadlock: mysql.Message("Deadlock found when trying to get user-level lock; try rolling back transaction/releasing locks and restarting lock acquisition.", nil), + ErrUserLockWrongName: mysql.Message("Incorrect user-level lock name '%s'.", nil), ErrBRIEBackupFailed: mysql.Message("Backup failed: %s", nil), ErrBRIERestoreFailed: mysql.Message("Restore failed: %s", nil), diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go index e4a61da29c287..95ab9f271b538 100644 --- a/executor/infoschema_cluster_table_test.go +++ b/executor/infoschema_cluster_table_test.go @@ -330,7 +330,7 @@ func (s *infosSchemaClusterTableSuite) TestTableStorageStats() { "test 2", )) rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows() - result := 31 + result := 32 s.Require().Len(rows, result) // More tests about the privileges. diff --git a/executor/set_test.go b/executor/set_test.go index b1dce7b824621..89068e58cdd27 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -1336,28 +1336,16 @@ func TestEnableNoopFunctionsVar(t *testing.T) { tk.MustQuery(`select @@global.tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) - err := tk.ExecToErr(`select get_lock('lock1', 2);`) - require.True(t, terror.ErrorEqual(err, expression.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) - err = tk.ExecToErr(`select release_lock('lock1');`) - require.True(t, terror.ErrorEqual(err, expression.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) - // change session var to 1 tk.MustExec(`set tidb_enable_noop_functions=1;`) tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("ON")) tk.MustQuery(`select @@global.tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) - tk.MustQuery(`select get_lock("lock", 10)`).Check(testkit.Rows("1")) - tk.MustQuery(`select release_lock("lock")`).Check(testkit.Rows("1")) // restore to 0 tk.MustExec(`set tidb_enable_noop_functions=0;`) tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) tk.MustQuery(`select @@global.tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) - err = tk.ExecToErr(`select get_lock('lock2', 10);`) - require.True(t, terror.ErrorEqual(err, expression.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) - err = tk.ExecToErr(`select release_lock('lock2');`) - require.True(t, terror.ErrorEqual(err, expression.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) - // set test require.Error(t, tk.ExecToErr(`set tidb_enable_noop_functions='abc'`)) require.Error(t, tk.ExecToErr(`set tidb_enable_noop_functions=11`)) @@ -1368,7 +1356,7 @@ func TestEnableNoopFunctionsVar(t *testing.T) { tk.MustExec(`set tidb_enable_noop_functions=0;`) tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("OFF")) - err = tk.ExecToErr("SET SESSION tx_read_only = 1") + err := tk.ExecToErr("SET SESSION tx_read_only = 1") require.True(t, terror.ErrorEqual(err, variable.ErrFunctionsNoopImpl), fmt.Sprintf("err %v", err)) tk.MustExec("SET SESSION tx_read_only = 0") diff --git a/expression/builtin.go b/expression/builtin.go index 4200653a5a3ef..e6015a7504c31 100644 --- a/expression/builtin.go +++ b/expression/builtin.go @@ -743,8 +743,6 @@ var funcs = map[string]functionClass{ ast.BinToUUID: &binToUUIDFunctionClass{baseFunctionClass{ast.BinToUUID, 1, 2}}, ast.TiDBShard: &tidbShardFunctionClass{baseFunctionClass{ast.TiDBShard, 1, 1}}, - // get_lock() and release_lock() are parsed but do nothing. - // It is used for preventing error in Ruby's activerecord migrations. ast.GetLock: &lockFunctionClass{baseFunctionClass{ast.GetLock, 2, 2}}, ast.ReleaseLock: &releaseLockFunctionClass{baseFunctionClass{ast.ReleaseLock, 1, 1}}, diff --git a/expression/builtin_miscellaneous.go b/expression/builtin_miscellaneous.go index 18a5839ddb24a..a5ddaf9d3d6e5 100644 --- a/expression/builtin_miscellaneous.go +++ b/expression/builtin_miscellaneous.go @@ -20,12 +20,16 @@ import ( "fmt" "math" "net" + "strconv" "strings" "time" "github.com/google/uuid" + "github.com/pingcap/errors" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" "github.com/pingcap/tidb/util/chunk" @@ -65,6 +69,7 @@ var ( _ builtinFunc = &builtinSleepSig{} _ builtinFunc = &builtinLockSig{} _ builtinFunc = &builtinReleaseLockSig{} + _ builtinFunc = &builtinReleaseAllLocksSig{} _ builtinFunc = &builtinDecimalAnyValueSig{} _ builtinFunc = &builtinDurationAnyValueSig{} _ builtinFunc = &builtinIntAnyValueSig{} @@ -186,9 +191,55 @@ func (b *builtinLockSig) Clone() builtinFunc { // evalInt evals a builtinLockSig. // See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock -// The lock function will do nothing. -// Warning: get_lock() function is parsed but ignored. -func (b *builtinLockSig) evalInt(_ chunk.Row) (int64, bool, error) { +func (b *builtinLockSig) evalInt(row chunk.Row) (int64, bool, error) { + lockName, isNull, err := b.args[0].EvalString(b.ctx, row) + if err != nil { + return 0, isNull, err + } + // Validate that lockName is NOT NULL or empty string + if isNull { + return 0, false, errUserLockWrongName.GenWithStackByArgs("NULL") + } + if lockName == "" || len(lockName) > 64 { + return 0, false, errUserLockWrongName.GenWithStackByArgs(lockName) + } + maxTimeout := int64(variable.GetSysVar(variable.InnodbLockWaitTimeout).MaxValue) + timeout, isNullTimeout, err := b.args[1].EvalInt(b.ctx, row) + if err != nil { + return 0, false, err + } + if isNullTimeout { + timeout = maxTimeout // Observed behavior in MySQL + } + // A timeout less than zero is expected to be treated as unlimited. + // Because of our implementation being based on pessimistic locks, + // We can't have a timeout greater than innodb_lock_wait_timeout. + // So users are aware, we also attach a warning. + if timeout < 0 || timeout > maxTimeout { + err := errTruncatedWrongValue.GenWithStackByArgs("get_lock", strconv.FormatInt(timeout, 10)) + b.ctx.GetSessionVars().StmtCtx.AppendWarning(err) + timeout = maxTimeout + } + // Lock names are case insensitive. Because we can't rely on collations + // being enabled on the internal table, we have to lower it. + lockName = strings.ToLower(lockName) + if len(lockName) > 64 { + return 0, false, errIncorrectArgs.GenWithStackByArgs("get_lock") + } + err = b.ctx.GetAdvisoryLock(lockName, timeout) + if err != nil { + switch errors.Cause(err).(*terror.Error).Code() { + case mysql.ErrLockWaitTimeout: + return 0, false, nil // Another user has the lock + case mysql.ErrLockDeadlock: + // Currently this code is not reachable because each Advisory Lock + // Uses a separate session. Deadlock detection does not work across + // independent sessions. + return 0, false, errUserLockDeadlock + default: + return 0, false, err + } + } return 1, false, nil } @@ -221,10 +272,29 @@ func (b *builtinReleaseLockSig) Clone() builtinFunc { // evalInt evals a builtinReleaseLockSig. // See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_release-lock -// The release lock function will do nothing. -// Warning: release_lock() function is parsed but ignored. -func (b *builtinReleaseLockSig) evalInt(_ chunk.Row) (int64, bool, error) { - return 1, false, nil +func (b *builtinReleaseLockSig) evalInt(row chunk.Row) (int64, bool, error) { + lockName, isNull, err := b.args[0].EvalString(b.ctx, row) + if err != nil { + return 0, isNull, err + } + // Validate that lockName is NOT NULL or empty string + if isNull { + return 0, false, errUserLockWrongName.GenWithStackByArgs("NULL") + } + if lockName == "" || len(lockName) > 64 { + return 0, false, errUserLockWrongName.GenWithStackByArgs(lockName) + } + // Lock names are case insensitive. Because we can't rely on collations + // being enabled on the internal table, we have to lower it. + lockName = strings.ToLower(lockName) + if len(lockName) > 64 { + return 0, false, errIncorrectArgs.GenWithStackByArgs("release_lock") + } + released := int64(0) + if b.ctx.ReleaseAdvisoryLock(lockName) { + released = 1 + } + return released, false, nil } type anyValueFunctionClass struct { @@ -1070,7 +1140,33 @@ type releaseAllLocksFunctionClass struct { } func (c *releaseAllLocksFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { - return nil, errFunctionNotExists.GenWithStackByArgs("FUNCTION", "RELEASE_ALL_LOCKS") + if err := c.verifyArgs(args); err != nil { + return nil, err + } + bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETInt) + if err != nil { + return nil, err + } + sig := &builtinReleaseAllLocksSig{bf} + bf.tp.SetFlen(1) + return sig, nil +} + +type builtinReleaseAllLocksSig struct { + baseBuiltinFunc +} + +func (b *builtinReleaseAllLocksSig) Clone() builtinFunc { + newSig := &builtinReleaseAllLocksSig{} + newSig.cloneFrom(&b.baseBuiltinFunc) + return newSig +} + +// evalInt evals a builtinReleaseAllLocksSig. +// See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_release-all-locks +func (b *builtinReleaseAllLocksSig) evalInt(_ chunk.Row) (int64, bool, error) { + count := b.ctx.ReleaseAllAdvisoryLocks() + return int64(count), false, nil } type uuidFunctionClass struct { diff --git a/expression/builtin_miscellaneous_vec.go b/expression/builtin_miscellaneous_vec.go index 31a617cd31f10..cebdc56447a8b 100644 --- a/expression/builtin_miscellaneous_vec.go +++ b/expression/builtin_miscellaneous_vec.go @@ -227,23 +227,6 @@ func (b *builtinNameConstDurationSig) vecEvalDuration(input *chunk.Chunk, result return b.args[1].VecEvalDuration(b.ctx, input, result) } -func (b *builtinLockSig) vectorized() bool { - return true -} - -// See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock -// The lock function will do nothing. -// Warning: get_lock() function is parsed but ignored. -func (b *builtinLockSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { - n := input.NumRows() - result.ResizeInt64(n, false) - i64s := result.Int64s() - for i := range i64s { - i64s[i] = 1 - } - return nil -} - func (b *builtinDurationAnyValueSig) vectorized() bool { return true } @@ -633,23 +616,6 @@ func (b *builtinNameConstRealSig) vecEvalReal(input *chunk.Chunk, result *chunk. return b.args[1].VecEvalReal(b.ctx, input, result) } -func (b *builtinReleaseLockSig) vectorized() bool { - return true -} - -// See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_release-lock -// The release lock function will do nothing. -// Warning: release_lock() function is parsed but ignored. -func (b *builtinReleaseLockSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { - n := input.NumRows() - result.ResizeInt64(n, false) - i64s := result.Int64s() - for i := range i64s { - i64s[i] = 1 - } - return nil -} - func (b *builtinVitessHashSig) vectorized() bool { return true } diff --git a/expression/builtin_test.go b/expression/builtin_test.go index bdabe437df5e1..2e63c1f9f51d8 100644 --- a/expression/builtin_test.go +++ b/expression/builtin_test.go @@ -139,14 +139,14 @@ func TestIsNullFunc(t *testing.T) { func TestLock(t *testing.T) { ctx := createContext(t) lock := funcs[ast.GetLock] - f, err := lock.getFunction(ctx, datumsToConstants(types.MakeDatums(nil, 1))) + f, err := lock.getFunction(ctx, datumsToConstants(types.MakeDatums("mylock", 1))) require.NoError(t, err) v, err := evalBuiltinFunc(f, chunk.Row{}) require.NoError(t, err) require.Equal(t, int64(1), v.GetInt64()) releaseLock := funcs[ast.ReleaseLock] - f, err = releaseLock.getFunction(ctx, datumsToConstants(types.MakeDatums(1))) + f, err = releaseLock.getFunction(ctx, datumsToConstants(types.MakeDatums("mylock"))) require.NoError(t, err) v, err = evalBuiltinFunc(f, chunk.Row{}) require.NoError(t, err) diff --git a/expression/errors.go b/expression/errors.go index b30918b35bf5e..8a64c09e9787d 100644 --- a/expression/errors.go +++ b/expression/errors.go @@ -53,6 +53,8 @@ var ( errWrongValueForType = dbterror.ClassExpression.NewStd(mysql.ErrWrongValueForType) errUnknown = dbterror.ClassExpression.NewStd(mysql.ErrUnknown) errSpecificAccessDenied = dbterror.ClassExpression.NewStd(mysql.ErrSpecificAccessDenied) + errUserLockDeadlock = dbterror.ClassExpression.NewStd(mysql.ErrUserLockDeadlock) + errUserLockWrongName = dbterror.ClassExpression.NewStd(mysql.ErrUserLockWrongName) // Sequence usage privilege check. errSequenceAccessDenied = dbterror.ClassExpression.NewStd(mysql.ErrTableaccessDenied) diff --git a/expression/function_traits.go b/expression/function_traits.go index ed0188b13f36a..51a024e2da1dd 100644 --- a/expression/function_traits.go +++ b/expression/function_traits.go @@ -226,13 +226,9 @@ var mutableEffectsFunctions = map[string]struct{}{ ast.AnyValue: {}, } -// some functions like "get_lock" and "release_lock" currently do NOT have -// right implementations, but may have noop ones(like with any inputs, always return 1) +// some functions do NOT have right implementations, but may have noop ones(like with any inputs, always return 1) // if apps really need these "funcs" to run, we offer sys var(tidb_enable_noop_functions) to enable noop usage -var noopFuncs = map[string]struct{}{ - ast.GetLock: {}, - ast.ReleaseLock: {}, -} +var noopFuncs = map[string]struct{}{} // booleanFunctions stores boolean functions var booleanFunctions = map[string]struct{}{ diff --git a/expression/integration_serial_test.go b/expression/integration_serial_test.go index 2d0fb11ff28ef..fcebac007c9c8 100644 --- a/expression/integration_serial_test.go +++ b/expression/integration_serial_test.go @@ -4181,8 +4181,6 @@ func TestNoopFunctions(t *testing.T) { "SELECT * FROM t1 LOCK IN SHARE MODE", "SELECT * FROM t1 GROUP BY a DESC", "SELECT * FROM t1 GROUP BY a ASC", - "SELECT GET_LOCK('acdc', 10)", - "SELECT RELEASE_LOCK('acdc')", } for _, stmt := range stmts { diff --git a/expression/integration_test.go b/expression/integration_test.go index d1e2615508272..e3f2f0c628274 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -249,6 +249,136 @@ func TestBuiltinFuncJsonPretty(t *testing.T) { require.Equal(t, errors.ErrCode(mysql.ErrInvalidJSONText), terr.Code()) } +func TestGetLock(t *testing.T) { + ctx := context.Background() + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + + // No timeout specified + err := tk.ExecToErr("SELECT get_lock('testlock')") + require.Error(t, err) + terr := errors.Cause(err).(*terror.Error) + require.Equal(t, errors.ErrCode(mysql.ErrWrongParamcountToNativeFct), terr.Code()) + + // 0 timeout = immediate + // Negative timeout = convert to max value + tk.MustQuery("SELECT get_lock('testlock1', 0)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT get_lock('testlock2', -10)").Check(testkit.Rows("1")) + // show warnings: + tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect get_lock value: '-10'")) + tk.MustQuery("SELECT release_lock('testlock1'), release_lock('testlock2')").Check(testkit.Rows("1 1")) + tk.MustQuery("SELECT release_all_locks()").Check(testkit.Rows("0")) + + // GetLock/ReleaseLock with NULL name or '' name + rs, _ := tk.Exec("SELECT get_lock('', 10)") + _, err = session.GetRows4Test(ctx, tk.Session(), rs) + require.Error(t, err) + terr = errors.Cause(err).(*terror.Error) + require.Equal(t, errors.ErrCode(errno.ErrUserLockWrongName), terr.Code()) + + rs, _ = tk.Exec("SELECT get_lock(NULL, 10)") + _, err = session.GetRows4Test(ctx, tk.Session(), rs) + require.Error(t, err) + terr = errors.Cause(err).(*terror.Error) + require.Equal(t, errors.ErrCode(errno.ErrUserLockWrongName), terr.Code()) + + rs, _ = tk.Exec("SELECT release_lock('')") + _, err = session.GetRows4Test(ctx, tk.Session(), rs) + require.Error(t, err) + terr = errors.Cause(err).(*terror.Error) + require.Equal(t, errors.ErrCode(errno.ErrUserLockWrongName), terr.Code()) + + rs, _ = tk.Exec("SELECT release_lock(NULL)") + _, err = session.GetRows4Test(ctx, tk.Session(), rs) + require.Error(t, err) + terr = errors.Cause(err).(*terror.Error) + require.Equal(t, errors.ErrCode(errno.ErrUserLockWrongName), terr.Code()) + + // NULL timeout is fine (= unlimited) + tk.MustQuery("SELECT get_lock('aaa', NULL)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock('aaa')").Check(testkit.Rows("1")) + + // GetLock in CAPS, release lock in different case. + tk.MustQuery("SELECT get_lock('aBC', -10)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock('AbC')").Check(testkit.Rows("1")) + + // Release unacquired LOCK and previously released lock + tk.MustQuery("SELECT release_lock('randombytes')").Check(testkit.Rows("0")) + tk.MustQuery("SELECT release_lock('abc')").Check(testkit.Rows("0")) + + // GetLock with integer name, 64, character name. + tk.MustQuery("SELECT get_lock(1234, 10)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT get_lock(REPEAT('a', 64), 10)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock(1234), release_lock(REPEAT('aa', 32))").Check(testkit.Rows("1 1")) + tk.MustQuery("SELECT release_all_locks()").Check(testkit.Rows("0")) + + // 65 character name + rs, _ = tk.Exec("SELECT get_lock(REPEAT('a', 65), 10)") + _, err = session.GetRows4Test(ctx, tk.Session(), rs) + require.Error(t, err) + terr = errors.Cause(err).(*terror.Error) + require.Equal(t, errors.ErrCode(errno.ErrUserLockWrongName), terr.Code()) + + rs, _ = tk.Exec("SELECT release_lock(REPEAT('a', 65))") + _, err = session.GetRows4Test(ctx, tk.Session(), rs) + require.Error(t, err) + terr = errors.Cause(err).(*terror.Error) + require.Equal(t, errors.ErrCode(errno.ErrUserLockWrongName), terr.Code()) + + // Floating point timeout. + tk.MustQuery("SELECT get_lock('nnn', 1.2)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock('nnn')").Check(testkit.Rows("1")) + + // Multiple locks acquired in one statement. + // Release all locks and one not held lock + tk.MustQuery("SELECT get_lock('a1', 1.2), get_lock('a2', 1.2), get_lock('a3', 1.2), get_lock('a4', 1.2)").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("SELECT release_lock('a1'),release_lock('a2'),release_lock('a3'), release_lock('random'), release_lock('a4')").Check(testkit.Rows("1 1 1 0 1")) + tk.MustQuery("SELECT release_all_locks()").Check(testkit.Rows("0")) + + // Multiple locks acquired, released all at once. + tk.MustQuery("SELECT get_lock('a1', 1.2), get_lock('a2', 1.2), get_lock('a3', 1.2), get_lock('a4', 1.2)").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("SELECT release_all_locks()").Check(testkit.Rows("4")) + tk.MustQuery("SELECT release_lock('a1')").Check(testkit.Rows("0")) // lock is free + + // Multiple locks acquired, reference count increased, released all at once. + tk.MustQuery("SELECT get_lock('a1', 1.2), get_lock('a2', 1.2), get_lock('a3', 1.2), get_lock('a4', 1.2)").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("SELECT get_lock('a1', 1.2), get_lock('a2', 1.2), get_lock('a5', 1.2)").Check(testkit.Rows("1 1 1")) + tk.MustQuery("SELECT release_all_locks()").Check(testkit.Rows("7")) // 7 not 5, because the it includes ref count + tk.MustQuery("SELECT release_lock('a1')").Check(testkit.Rows("0")) // lock is free + tk.MustQuery("SELECT release_lock('a5')").Check(testkit.Rows("0")) // lock is free + tk.MustQuery("SELECT release_all_locks()").Check(testkit.Rows("0")) + + // Test common cases: + // Get a lock, release it immediately. + // Try to release it again (its released) + tk.MustQuery("SELECT get_lock('mygloballock', 1)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock('mygloballock')").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock('mygloballock')").Check(testkit.Rows("0")) + + // Get a lock, acquire it again, release it twice. + tk.MustQuery("SELECT get_lock('mygloballock', 1)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT get_lock('mygloballock', 1)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock('mygloballock')").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock('mygloballock')").Check(testkit.Rows("1")) + tk.MustQuery("SELECT release_lock('mygloballock')").Check(testkit.Rows("0")) + + // Test someone else has the lock with short timeout. + tk2 := testkit.NewTestKit(t, store) + tk2.MustQuery("SELECT get_lock('mygloballock', 1)").Check(testkit.Rows("1")) + tk.MustQuery("SELECT get_lock('mygloballock', 1)").Check(testkit.Rows("0")) // someone else has the lock + tk.MustQuery("SELECT release_lock('mygloballock')").Check(testkit.Rows("0")) // never had the lock + // try again + tk.MustQuery("SELECT get_lock('mygloballock', 0)").Check(testkit.Rows("0")) // someone else has the lock + tk.MustQuery("SELECT release_lock('mygloballock')").Check(testkit.Rows("0")) // never had the lock + // release it + tk2.MustQuery("SELECT release_lock('mygloballock')").Check(testkit.Rows("1")) // works + + // Confirm all locks are released + tk2.MustQuery("SELECT release_all_locks()").Check(testkit.Rows("0")) + tk.MustQuery("SELECT release_all_locks()").Check(testkit.Rows("0")) +} + func TestMiscellaneousBuiltin(t *testing.T) { ctx := context.Background() store, clean := testkit.CreateMockStore(t) @@ -320,7 +450,6 @@ func TestMiscellaneousBuiltin(t *testing.T) { tk.MustQuery("select a,any_value(b),sum(c) from t1 group by a order by a;").Check(testkit.Rows("1 10 0", "2 30 0")) // for locks - tk.MustExec(`set tidb_enable_noop_functions=1;`) result := tk.MustQuery(`SELECT GET_LOCK('test_lock1', 10);`) result.Check(testkit.Rows("1")) result = tk.MustQuery(`SELECT GET_LOCK('test_lock2', 10);`) @@ -330,6 +459,10 @@ func TestMiscellaneousBuiltin(t *testing.T) { result.Check(testkit.Rows("1")) result = tk.MustQuery(`SELECT RELEASE_LOCK('test_lock1');`) result.Check(testkit.Rows("1")) + result = tk.MustQuery(`SELECT RELEASE_LOCK('test_lock3');`) // not acquired + result.Check(testkit.Rows("0")) + tk.MustQuery(`SELECT RELEASE_ALL_LOCKS()`).Check(testkit.Rows("0")) // none acquired + } func TestConvertToBit(t *testing.T) { diff --git a/parser/ast/functions.go b/parser/ast/functions.go index 9779f7ba446b6..ae61e06682656 100644 --- a/parser/ast/functions.go +++ b/parser/ast/functions.go @@ -290,10 +290,8 @@ const ( BinToUUID = "bin_to_uuid" VitessHash = "vitess_hash" TiDBShard = "tidb_shard" - // get_lock() and release_lock() is parsed but do nothing. - // It is used for preventing error in Ruby's activerecord migrations. - GetLock = "get_lock" - ReleaseLock = "release_lock" + GetLock = "get_lock" + ReleaseLock = "release_lock" // encryption and compression functions AesDecrypt = "aes_decrypt" diff --git a/session/advisory_locks.go b/session/advisory_locks.go new file mode 100644 index 0000000000000..aca6914de2029 --- /dev/null +++ b/session/advisory_locks.go @@ -0,0 +1,88 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package session + +import ( + "context" + + "github.com/pingcap/tidb/parser/terror" +) + +// Advisory Locks are the locks in GET_LOCK() and RELEASE_LOCK(). +// We implement them in TiDB by using an INSERT into mysql.advisory_locks +// inside of a pessimistic transaction that is never committed. +// +// Each advisory lock requires its own session, since the pessimistic locks +// can be rolled back in any order (transactions can't release random locks +// like this even if savepoints was supported). +// +// We use referenceCount to track the number of references to the lock in the session. +// A little known feature of advisory locks is that you can call GET_LOCK +// multiple times on the same lock, and it will only be released when +// the reference count reaches zero. + +type advisoryLock struct { + ctx context.Context + session *session + referenceCount int +} + +// IncrReferences increments the reference count for the advisory lock. +func (a *advisoryLock) IncrReferences() { + a.referenceCount++ +} + +// DecrReferences decrements the reference count for the advisory lock. +func (a *advisoryLock) DecrReferences() { + a.referenceCount-- +} + +// References returns the current reference count for the advisory lock. +func (a *advisoryLock) ReferenceCount() int { + return a.referenceCount +} + +// Close releases the advisory lock, which includes +// rolling back the transaction and closing the session. +func (a *advisoryLock) Close() { + _, err := a.session.ExecuteInternal(a.ctx, "ROLLBACK") + terror.Log(err) + a.session.Close() +} + +// GetLock acquires a new advisory lock using a pessimistic transaction. +// The timeout is implemented by using the pessimistic lock timeout. +// We will never COMMIT the transaction, but the err indicates +// if the lock was successfully acquired. +func (a *advisoryLock) GetLock(lockName string, timeout int64) error { + _, err := a.session.ExecuteInternal(a.ctx, "SET innodb_lock_wait_timeout = %?", timeout) + if err != nil { + return err + } + _, err = a.session.ExecuteInternal(a.ctx, "BEGIN PESSIMISTIC") + if err != nil { + return err + } + _, err = a.session.ExecuteInternal(a.ctx, "INSERT INTO mysql.advisory_locks (lock_name) VALUES (%?)", lockName) + if err != nil { + // We couldn't acquire the LOCK so we close the session cleanly + // and return the error to the caller. The caller will need to interpret + // this differently if it is lock wait timeout or a deadlock. + a.Close() + return err + } + a.referenceCount++ + return nil +} diff --git a/session/bootstrap.go b/session/bootstrap.go index 9c8101826a674..c420a6c7fdf7a 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -416,6 +416,10 @@ const ( PRIMARY KEY (id), KEY (update_time) );` + // CreateAdvisoryLocks stores the advisory locks (get_lock, release_lock). + CreateAdvisoryLocks = `CREATE TABLE IF NOT EXISTS mysql.advisory_locks ( + lock_name VARCHAR(64) NOT NULL PRIMARY KEY + );` ) // bootstrap initiates system DB for a store. @@ -609,11 +613,13 @@ const ( version87 = 87 // version88 fixes the issue https://github.com/pingcap/tidb/issues/33650. version88 = 88 + // version89 adds the tables mysql.advisory_locks + version89 = 89 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version88 +var currentBootstrapVersion int64 = version89 var ( bootstrapVersion = []func(Session, int64){ @@ -705,6 +711,7 @@ var ( upgradeToVer86, upgradeToVer87, upgradeToVer88, + upgradeToVer89, } ) @@ -1808,6 +1815,13 @@ func upgradeToVer88(s Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.user CHANGE `Repl_client_priv` `Repl_client_priv` ENUM('N','Y') NOT NULL DEFAULT 'N' AFTER `Repl_slave_priv`") } +func upgradeToVer89(s Session, ver int64) { + if ver >= version89 { + return + } + doReentrantDDL(s, CreateAdvisoryLocks) +} + func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, @@ -1900,6 +1914,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateStatsMetaHistory) // Create analyze_jobs table. mustExecute(s, CreateAnalyzeJobs) + // Create advisory_locks table. + mustExecute(s, CreateAdvisoryLocks) } // doDMLWorks executes DML statements in bootstrap stage. diff --git a/session/session.go b/session/session.go index d7662bec8dbd6..5d584d9257f4a 100644 --- a/session/session.go +++ b/session/session.go @@ -244,6 +244,9 @@ type session struct { // all the local data in each session, and finally report them to the remote // regularly. stmtStats *stmtstats.StatementStats + + // Contains a list of sessions used to collect advisory locks. + advisoryLocks map[string]*advisoryLock } var parserPool = &sync.Pool{New: func() interface{} { return parser.New() }} @@ -1592,6 +1595,62 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...inter return stmts[0], nil } +// GetAdvisoryLock acquires an advisory lock of lockName. +// Note that a lock can be acquired multiple times by the same session, +// in which case we increment a reference count. +// Each lock needs to be held in a unique session because +// we need to be able to ROLLBACK in any arbitrary order +// in order to release the locks. +func (s *session) GetAdvisoryLock(lockName string, timeout int64) error { + if lock, ok := s.advisoryLocks[lockName]; ok { + lock.IncrReferences() + return nil + } + sess, err := createSession(s.GetStore()) + if err != nil { + return err + } + lock := &advisoryLock{session: sess, ctx: context.TODO()} + err = lock.GetLock(lockName, timeout) + if err != nil { + return err + } + s.advisoryLocks[lockName] = lock + return nil +} + +// ReleaseAdvisoryLock releases an advisory locks held by the session. +// It returns FALSE if no lock by this name was held (by this session), +// and TRUE if a lock was held and "released". +// Note that the lock is not actually released if there are multiple +// references to the same lockName by the session, instead the reference +// count is decremented. +func (s *session) ReleaseAdvisoryLock(lockName string) (released bool) { + if lock, ok := s.advisoryLocks[lockName]; ok { + lock.DecrReferences() + if lock.ReferenceCount() <= 0 { + lock.Close() + delete(s.advisoryLocks, lockName) + } + return true + } + return false +} + +// ReleaseAllAdvisoryLocks releases all advisory locks held by the session +// and returns a count of the locks that were released. +// The count is based on unique locks held, so multiple references +// to the same lock do not need to be accounted for. +func (s *session) ReleaseAllAdvisoryLocks() int { + var count int + for lockName, lock := range s.advisoryLocks { + lock.Close() + count += lock.ReferenceCount() + delete(s.advisoryLocks, lockName) + } + return count +} + // ParseWithParams4Test wrapper (s *session) ParseWithParams for test func ParseWithParams4Test(ctx context.Context, s Session, sql string, args ...interface{}) (ast.StmtNode, error) { @@ -2544,6 +2603,7 @@ func (s *session) Close() { logutil.BgLogger().Error("release table lock failed", zap.Uint64("conn", s.sessionVars.ConnectionID)) } } + s.ReleaseAllAdvisoryLocks() if s.statsCollector != nil { s.statsCollector.Delete() } @@ -2933,6 +2993,8 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { } s.mu.values = make(map[fmt.Stringer]interface{}) s.lockedTables = make(map[int64]model.TableLockTpInfo) + s.advisoryLocks = make(map[string]*advisoryLock) + domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s diff --git a/sessionctx/context.go b/sessionctx/context.go index dbc55c2635a66..a188eac69081e 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -150,6 +150,12 @@ type Context interface { GetStmtStats() *stmtstats.StatementStats // ShowProcess returns ProcessInfo running in current Context ShowProcess() *util.ProcessInfo + // GetAdvisoryLock acquires an advisory lock (aka GET_LOCK()). + GetAdvisoryLock(string, int64) error + // ReleaseAdvisoryLock releases an advisory lock (aka RELEASE_LOCK()). + ReleaseAdvisoryLock(string) bool + // ReleaseAllAdvisoryLocks releases all advisory locks that this session holds. + ReleaseAllAdvisoryLocks() int } type basicCtxType int diff --git a/util/mock/context.go b/util/mock/context.go index ea6a1109309f9..05fb15b2f933f 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -358,6 +358,21 @@ func (c *Context) GetStmtStats() *stmtstats.StatementStats { return nil } +// GetAdvisoryLock acquires an advisory lock +func (c *Context) GetAdvisoryLock(lockName string, timeout int64) error { + return nil +} + +// ReleaseAdvisoryLock releases an advisory lock +func (c *Context) ReleaseAdvisoryLock(lockName string) bool { + return true +} + +// ReleaseAllAdvisoryLocks releases all advisory locks +func (c *Context) ReleaseAllAdvisoryLocks() int { + return 0 +} + // Close implements the sessionctx.Context interface. func (c *Context) Close() { }