Skip to content

Commit

Permalink
Merge branch 'master' into remove-non-dry-sysvar-code
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo authored Sep 26, 2022
2 parents 634180f + 8a171c3 commit 0727a82
Show file tree
Hide file tree
Showing 59 changed files with 9,817 additions and 9,123 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4453,8 +4453,8 @@ def go_deps():
name = "org_uber_go_goleak",
build_file_proto_mode = "disable_global",
importpath = "go.uber.org/goleak",
sum = "h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=",
version = "v1.1.12",
sum = "h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=",
version = "v1.2.0",
)
go_repository(
name = "org_uber_go_multierr",
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/streamhelper/daemon/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "daemon",
srcs = [
"interface.go",
"owner_daemon.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/streamhelper/daemon",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//owner",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "daemon_test",
srcs = ["owner_daemon_test.go"],
flaky = True,
deps = [
":daemon",
"//owner",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
],
)
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/streamhelper",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/daemon",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/version",
Expand Down
1 change: 1 addition & 0 deletions ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//parser/mysql",
"//sessionctx/variable",
"//table",
"//util",
"//util/generic",
"//util/logutil",
"//util/mathutil",
Expand Down
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
deps = [
"//bindinfo",
"//br/pkg/streamhelper",
"//br/pkg/streamhelper/daemon",
"//config",
"//ddl",
"//ddl/placement",
Expand Down Expand Up @@ -51,6 +52,7 @@ go_library(
"//util/execdetails",
"//util/expensivequery",
"//util/logutil",
"//util/servermemorylimit",
"//util/sqlexec",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down
56 changes: 32 additions & 24 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/expensivequery"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/txnkv/transaction"
pd "github.com/tikv/pd/client"
Expand All @@ -87,30 +88,31 @@ func NewMockDomain() *Domain {
// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
type Domain struct {
store kv.Storage
infoCache *infoschema.InfoCache
privHandle *privileges.Handle
bindHandle *bindinfo.BindHandle
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *infosync.InfoSyncer
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
sysVarCache sysVarCache // replaces GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
wg util.WaitGroupWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
store kv.Storage
infoCache *infoschema.InfoCache
privHandle *privileges.Handle
bindHandle *bindinfo.BindHandle
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *infosync.InfoSyncer
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
sysVarCache sysVarCache // replaces GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
serverMemoryLimitHandle *servermemorylimit.Handle
wg util.WaitGroupWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -885,6 +887,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio

do.SchemaValidator = NewSchemaValidator(ddlLease, do)
do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit)
do.serverMemoryLimitHandle = servermemorylimit.NewServerMemoryLimitHandle(do.exit)
do.sysProcesses = SysProcesses{mu: &sync.RWMutex{}, procMap: make(map[uint64]sessionctx.Context)}
do.initDomainSysVars()
return do
Expand Down Expand Up @@ -1818,6 +1821,11 @@ func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle {
return do.expensiveQueryHandle
}

// ServerMemoryLimitHandle returns the expensive query handle.
func (do *Domain) ServerMemoryLimitHandle() *servermemorylimit.Handle {
return do.serverMemoryLimitHandle
}

const (
privilegeKey = "/tidb/privilege"
sysVarCacheKey = "/tidb/sysvars"
Expand Down
1 change: 0 additions & 1 deletion domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_library(
"//sessionctx/variable",
"//store/helper",
"//tablecodec",
"//types",
"//util",
"//util/codec",
"//util/dbterror",
Expand Down
2 changes: 2 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"errors.go",
"executor.go",
"explain.go",
"foreign_key.go",
"grant.go",
"hash_table.go",
"index_advise.go",
Expand Down Expand Up @@ -270,6 +271,7 @@ go_test(
"explain_test.go",
"explain_unit_test.go",
"explainfor_test.go",
"foreign_key_test.go",
"grant_test.go",
"hash_table_test.go",
"hot_regions_history_table_test.go",
Expand Down
1 change: 1 addition & 0 deletions executor/analyzetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//domain",
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else {
sc.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery)
sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker)
sc.MemTracker.IsRootTrackerOfSess, sc.MemTracker.SessionID = true, vars.ConnectionID
}

sc.InitDiskTracker(memory.LabelForSQLText, -1)
Expand Down
63 changes: 63 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6063,3 +6063,66 @@ func TestTableLockPrivilege(t *testing.T) {
tk2.MustExec("LOCK TABLE test.t WRITE, test2.t2 WRITE")
tk.MustExec("LOCK TABLE test.t WRITE, test2.t2 WRITE")
}

func TestGlobalMemoryControl(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_server_memory_limit = 512 << 20")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

tk1 := testkit.NewTestKit(t, store)
tracker1 := tk1.Session().GetSessionVars().StmtCtx.MemTracker

tk2 := testkit.NewTestKit(t, store)
tracker2 := tk2.Session().GetSessionVars().StmtCtx.MemTracker

tk3 := testkit.NewTestKit(t, store)
tracker3 := tk3.Session().GetSessionVars().StmtCtx.MemTracker

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk1.Session().ShowProcess(), tk2.Session().ShowProcess(), tk3.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tracker1.Consume(100 << 20) // 100 MB
tracker2.Consume(200 << 20) // 200 MB
tracker3.Consume(300 << 20) // 300 MB

test := make([]int, 128<<20) // Keep 1GB HeapInUse
time.Sleep(500 * time.Millisecond) // The check goroutine checks the memory usage every 100ms. The Sleep() make sure that Top1Tracker can be Canceled.

// Kill Top1
require.False(t, tracker1.NeedKill.Load())
require.False(t, tracker2.NeedKill.Load())
require.True(t, tracker3.NeedKill.Load())
require.Equal(t, memory.MemUsageTop1Tracker.Load(), tracker3)
util.WithRecovery( // Next Consume() will panic and cancel the SQL
func() {
tracker3.Consume(1)
}, func(r interface{}) {
require.True(t, strings.Contains(r.(string), "Out Of Memory Quota!"))
})
tracker2.Consume(300 << 20) // Sum 500MB, Not Panic, Waiting t3 cancel finish.
time.Sleep(500 * time.Millisecond)
require.False(t, tracker2.NeedKill.Load())
// Kill Finished
tracker3.Consume(-(300 << 20))
// Simulated SQL is Canceled and the time is updated
sm.PSMu.Lock()
ps := *sm.PS[2]
ps.Time = time.Now()
sm.PS[2] = &ps
sm.PSMu.Unlock()
time.Sleep(500 * time.Millisecond)
// Kill the Next SQL
util.WithRecovery( // Next Consume() will panic and cancel the SQL
func() {
tracker2.Consume(1)
}, func(r interface{}) {
require.True(t, strings.Contains(r.(string), "Out Of Memory Quota!"))
})
require.Equal(t, test[0], 0) // Keep 1GB HeapInUse
}
1 change: 1 addition & 0 deletions executor/memtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_test(
"mem_test.go",
],
flaky = True,
race = "on",
deps = [
"//config",
"//meta/autoid",
Expand Down
43 changes: 17 additions & 26 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/metrics"
Expand Down Expand Up @@ -53,28 +54,26 @@ func TestPrepared(t *testing.T) {
tk.MustExec(`execute stmt_test_1 using @a;`)
tk.MustExec(`prepare stmt_test_2 from 'select 1'`)
// Prepare multiple statement is not allowed.
_, err = tk.Exec(`prepare stmt_test_3 from 'select id from prepare_test where id > ?;select id from prepare_test where id > ?;'`)
require.True(t, executor.ErrPrepareMulti.Equal(err))
tk.MustGetErrCode(`prepare stmt_test_3 from 'select id from prepare_test where id > ?;select id from prepare_test where id > ?;'`, errno.ErrPrepareMulti)

// The variable count does not match.
tk.MustExec(`prepare stmt_test_4 from 'select id from prepare_test where id > ? and id < ?';`)
tk.MustExec(`set @a = 1;`)
_, err = tk.Exec(`execute stmt_test_4 using @a;`)
require.True(t, plannercore.ErrWrongParamCount.Equal(err))
tk.MustGetErrCode(`execute stmt_test_4 using @a;`, errno.ErrWrongParamCount)
// Prepare and deallocate prepared statement immediately.
tk.MustExec(`prepare stmt_test_5 from 'select id from prepare_test where id > ?';`)
tk.MustExec(`deallocate prepare stmt_test_5;`)

// Statement not found.
_, err = tk.Exec("deallocate prepare stmt_test_5")
err = tk.ExecToErr("deallocate prepare stmt_test_5")
require.True(t, plannercore.ErrStmtNotFound.Equal(err))

// incorrect SQLs in prepare. issue #3738, SQL in prepare stmt is parsed in DoPrepare.
_, err = tk.Exec(`prepare p from "delete from t where a = 7 or 1=1/*' and b = 'p'";`)
require.EqualError(t, err, `[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use near '/*' and b = 'p'' at line 1`)
tk.MustGetErrMsg(`prepare p from "delete from t where a = 7 or 1=1/*' and b = 'p'";`,
`[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use near '/*' and b = 'p'' at line 1`)

// The `stmt_test5` should not be found.
_, err = tk.Exec(`set @a = 1; execute stmt_test_5 using @a;`)
err = tk.ExecToErr(`set @a = 1; execute stmt_test_5 using @a;`)
require.True(t, plannercore.ErrStmtNotFound.Equal(err))

// Use parameter marker with argument will run prepared statement.
Expand Down Expand Up @@ -158,8 +157,7 @@ func TestPrepared(t *testing.T) {

// Make schema change.
tk.MustExec("drop table if exists prepare2")
_, err = tk.Exec("create table prepare2 (a int)")
require.NoError(t, err)
tk.MustExec("create table prepare2 (a int)")

// Should success as the changed schema do not affect the prepared statement.
rs, err = tk.Session().ExecutePreparedStmt(ctx, stmtID, expression.Args2Expressions4Test(1))
Expand All @@ -185,8 +183,7 @@ func TestPrepared(t *testing.T) {
tk.MustExec("drop table if exists prepare3")
tk.MustExec("create table prepare3 (a decimal(1))")
tk.MustExec("prepare stmt from 'insert into prepare3 value(123)'")
_, err = tk.Exec("execute stmt")
require.Error(t, err)
tk.MustExecToErr("execute stmt")

_, _, fields, err := tk.Session().PrepareStmt("select a from prepare3")
require.NoError(t, err)
Expand Down Expand Up @@ -230,27 +227,21 @@ func TestPrepared(t *testing.T) {
tk.MustExec("drop table if exists prepare1;")
tk.MustExec("create table prepare1 (a decimal(1))")
tk.MustExec("insert into prepare1 values(1);")
_, err = tk.Exec("prepare stmt FROM @sql1")
require.EqualError(t, err, "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 4 near \"NULL\" ")
tk.MustGetErrMsg("prepare stmt FROM @sql1",
"[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 4 near \"NULL\" ")
tk.MustExec("SET @sql = 'update prepare1 set a=5 where a=?';")
_, err = tk.Exec("prepare stmt FROM @sql")
require.NoError(t, err)
tk.MustExec("prepare stmt FROM @sql")
tk.MustExec("set @var=1;")
_, err = tk.Exec("execute stmt using @var")
require.NoError(t, err)
tk.MustExec("execute stmt using @var")
tk.MustQuery("select a from prepare1;").Check(testkit.Rows("5"))

// issue 19371
tk.MustExec("SET @sql = 'update prepare1 set a=a+1';")
_, err = tk.Exec("prepare stmt FROM @SQL")
require.NoError(t, err)
_, err = tk.Exec("execute stmt")
require.NoError(t, err)
tk.MustExec("prepare stmt FROM @SQL")
tk.MustExec("execute stmt")
tk.MustQuery("select a from prepare1;").Check(testkit.Rows("6"))
_, err = tk.Exec("prepare stmt FROM @Sql")
require.NoError(t, err)
_, err = tk.Exec("execute stmt")
require.NoError(t, err)
tk.MustExec("prepare stmt FROM @Sql")
tk.MustExec("execute stmt")
tk.MustQuery("select a from prepare1;").Check(testkit.Rows("7"))

// Coverage.
Expand Down
2 changes: 1 addition & 1 deletion executor/showtest/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,7 @@ func TestShowBuiltin(t *testing.T) {
res := tk.MustQuery("show builtins;")
require.NotNil(t, res)
rows := res.Rows()
const builtinFuncNum = 281
const builtinFuncNum = 282
require.Equal(t, builtinFuncNum, len(rows))
require.Equal(t, rows[0][0].(string), "abs")
require.Equal(t, rows[builtinFuncNum-1][0].(string), "yearweek")
Expand Down
1 change: 1 addition & 0 deletions executor/simpletest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_test(
"simple_test.go",
],
flaky = True,
race = "on",
shard_count = 30,
deps = [
"//config",
Expand Down
Loading

0 comments on commit 0727a82

Please sign in to comment.