From f66bd486e7f712ca0754bdb8719fa8526de2b73f Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Fri, 7 Apr 2023 13:02:58 +0800 Subject: [PATCH] This is an automated cherry-pick of #42803 Signed-off-by: ti-chi-bot --- executor/adapter.go | 17 +-- executor/issuetest/BUILD.bazel | 2 + executor/issuetest/executor_issue_test.go | 114 ++++++++++++++++++++ executor/join.go | 9 ++ server/conn.go | 12 ++- sessionctx/stmtctx/stmtctx.go | 13 +++ testkit/mocksessionmanager.go | 14 +-- testkit/testkit.go | 6 +- util/memory/tracker.go | 5 + util/servermemorylimit/BUILD.bazel | 1 + util/servermemorylimit/servermemorylimit.go | 36 ++++++- 11 files changed, 200 insertions(+), 29 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 5e12cce1ccc69..61a10f9807b74 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -734,12 +734,7 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic // done in the `defer` function. If the rs is not nil, the detachment will be done in // `rs.Close` in `handleStmt` if handled && sc != nil && rs == nil { - if sc.MemTracker != nil { - sc.MemTracker.Detach() - } - if sc.DiskTracker != nil { - sc.DiskTracker.Detach() - } + sc.DetachMemDiskTracker() } }() @@ -1417,15 +1412,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) { a.FinishExecuteStmt(txnStartTS, lastErr, false) a.logAudit() - // Detach the Memory and disk tracker for the previous stmtCtx from GlobalMemoryUsageTracker and GlobalDiskUsageTracker - if stmtCtx := a.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil { - if stmtCtx.DiskTracker != nil { - stmtCtx.DiskTracker.Detach() - } - if stmtCtx.MemTracker != nil { - stmtCtx.MemTracker.Detach() - } - } + a.Ctx.GetSessionVars().StmtCtx.DetachMemDiskTracker() } // LogSlowQuery is used to print the slow query in the log files. 
diff --git a/executor/issuetest/BUILD.bazel b/executor/issuetest/BUILD.bazel index 77bfaf7f11290..1c2955d69327b 100644 --- a/executor/issuetest/BUILD.bazel +++ b/executor/issuetest/BUILD.bazel @@ -15,10 +15,12 @@ go_test( "//parser/auth", "//parser/charset", "//parser/mysql", + "//session", "//sessionctx/variable", "//statistics", "//testkit", "//util", + "//util/memory", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", diff --git a/executor/issuetest/executor_issue_test.go b/executor/issuetest/executor_issue_test.go index f528a54adb8a0..9f388adb34b8e 100644 --- a/executor/issuetest/executor_issue_test.go +++ b/executor/issuetest/executor_issue_test.go @@ -20,6 +20,7 @@ import ( "math/rand" "strings" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" @@ -27,10 +28,12 @@ import ( "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/charset" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/memory" "github.com/stretchr/testify/require" ) @@ -1348,3 +1351,114 @@ func TestIssue40158(t *testing.T) { tk.MustExec("insert into t1 values (1, null);") tk.MustQuery("select * from t1 where c1 is null and _id < 1;").Check(testkit.Rows()) } +<<<<<<< HEAD +======= + +func TestIssue40596(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec(`CREATE TABLE t1 ( + c1 double DEFAULT '1.335088259490289', + c2 set('mj','4s7ht','z','3i','b26','9','cg11','uvzcp','c','ns','fl9') NOT NULL DEFAULT 'mj,z,3i,9,cg11,c', + PRIMARY KEY (c2) /*T![clustered_index] CLUSTERED */, + KEY i1 (c1), + KEY i2 (c1), + KEY i3 (c1) +) ENGINE=InnoDB DEFAULT CHARSET=gbk COLLATE=gbk_chinese_ci;`) + 
tk.MustExec("INSERT INTO t1 VALUES (634.2783557491367,''),(2000.5041449792013,'4s7ht'),(634.2783557491367,'3i'),(634.2783557491367,'9'),(7803.173688589342,'uvzcp'),(634.2783557491367,'ns'),(634.2783557491367,'fl9');") + tk.MustExec(`CREATE TABLE t2 ( + c3 decimal(56,16) DEFAULT '931359772706767457132645278260455518957.9866038319986886', + c4 set('3bqx','g','6op3','2g','jf','arkd3','y0b','jdy','1g','ff5z','224b') DEFAULT '3bqx,2g,ff5z,224b', + c5 smallint(6) NOT NULL DEFAULT '-25973', + c6 year(4) DEFAULT '2122', + c7 text DEFAULT NULL, + PRIMARY KEY (c5) /*T![clustered_index] CLUSTERED */, + KEY i4 (c6), + KEY i5 (c5) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='' +PARTITION BY HASH (c5) PARTITIONS 4;`) + tk.MustExec("INSERT INTO t2 VALUES (465.0000000000000000,'jdy',-8542,2008,'FgZXe');") + tk.MustExec("set @@sql_mode='';") + tk.MustExec("set tidb_partition_prune_mode=dynamic;") + tk.MustExec("analyze table t1;") + tk.MustExec("analyze table t2;") + + // No nil pointer panic + tk.MustQuery("select /*+ inl_join( t1 , t2 ) */ avg( t2.c5 ) as r0 , repeat( t2.c7 , t2.c5 ) as r1 , locate( t2.c7 , t2.c7 ) as r2 , unhex( t1.c1 ) as r3 from t1 right join t2 on t1.c2 = t2.c5 where not( t2.c5 in ( -7860 ,-13384 ,-12940 ) ) and not( t1.c2 between '4s7ht' and 'mj' );").Check(testkit.Rows(" ")) + // Again, a simpler reproduce. 
+ tk.MustQuery("select /*+ inl_join (t1, t2) */ t2.c5 from t1 right join t2 on t1.c2 = t2.c5 where not( t1.c2 between '4s7ht' and 'mj' );").Check(testkit.Rows()) +} + +func TestIssueRaceWhenBuildingExecutorConcurrently(t *testing.T) { + // issue: https://github.com/pingcap/tidb/issues/41412 + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int, c int, index idx_a(a), index idx_b(b))") + for i := 0; i < 2000; i++ { + v := i * 100 + tk.MustExec("insert into t values(?, ?, ?)", v, v, v) + } + tk.MustQuery("select /*+ inl_merge_join(t1, t2) */ * from t t1 right join t t2 on t1.a = t2.b and t1.c = t2.c") +} + +func TestIssue42298(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + tk.MustExec("alter table t add column b int") + res := tk.MustQuery("admin show ddl job queries limit 268430000") + require.Greater(t, len(res.Rows()), 0, len(res.Rows())) + res = tk.MustQuery("admin show ddl job queries limit 999 offset 268430000") + require.Zero(t, len(res.Rows()), len(res.Rows())) +} + +func TestIssue42662(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.Session().GetSessionVars().ConnectionID = 12345 + tk.Session().GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, -1) + tk.Session().GetSessionVars().MemTracker.SessionID = 12345 + tk.Session().GetSessionVars().MemTracker.IsRootTrackerOfSess = true + + sm := &testkit.MockSessionManager{ + PS: []*util.ProcessInfo{tk.Session().ShowProcess()}, + } + sm.Conn = make(map[uint64]session.Session) + sm.Conn[tk.Session().GetSessionVars().ConnectionID] = tk.Session() + dom.ServerMemoryLimitHandle().SetSessionManager(sm) + go dom.ServerMemoryLimitHandle().Run() + + tk.MustExec("use 
test") + tk.MustQuery("select connection_id()").Check(testkit.Rows("12345")) + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1 (a int, b int, c int)") + tk.MustExec("create table t2 (a int, b int, c int)") + tk.MustExec("insert into t1 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)") + tk.MustExec("insert into t2 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)") + // set tidb_server_memory_limit to 1.6GB, tidb_server_memory_limit_sess_min_size to 128MB + tk.MustExec("set global tidb_server_memory_limit='1600MB'") + tk.MustExec("set global tidb_server_memory_limit_sess_min_size=128*1024*1024") + tk.MustExec("set global tidb_mem_oom_action = 'cancel'") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/issue42662_1", `return(true)`)) + // tk.Session() should be marked as MemoryTop1Tracker but not killed. + tk.MustQuery("select /*+ hash_join(t1)*/ * from t1 join t2 on t1.a = t2.a and t1.b = t2.b") + + // try to trigger the kill top1 logic + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/servermemorylimit/issue42662_2", `return(true)`)) + time.Sleep(1 * time.Second) + + // no error should be returned + tk.MustQuery("select count(*) from t1") + tk.MustQuery("select count(*) from t1") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/issue42662_1")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/servermemorylimit/issue42662_2")) +} +>>>>>>> d6a5054b723 (*: global memory controller should not kill session whose mem less than limit_sess_min_size (#42803)) diff --git a/executor/join.go b/executor/join.go index 8a762ee6ef851..72499a2f302f4 100644 --- a/executor/join.go +++ b/executor/join.go @@ -310,6 +310,15 @@ func (w *buildWorker) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chun return } }) + failpoint.Inject("issue42662_1", func(val failpoint.Value) { + if val.(bool) { + if w.hashJoinCtx.sessCtx.GetSessionVars().ConnectionID != 0 { + // consume 
170MB memory, this sql should be tracked into MemoryTop1Tracker + w.hashJoinCtx.memTracker.Consume(170 * 1024 * 1024) + } + return + } + }) sessVars := w.hashJoinCtx.sessCtx.GetSessionVars() for { if w.hashJoinCtx.finished.Load() { diff --git a/server/conn.go b/server/conn.go index d60c4042874a1..f98405ecb833c 100644 --- a/server/conn.go +++ b/server/conn.go @@ -2121,12 +2121,20 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [ cc.audit(plugin.Starting) rs, err := cc.ctx.ExecuteStmt(ctx, stmt) reg.End() - // The session tracker detachment from global tracker is solved in the `rs.Close` in most cases. - // If the rs is nil, the detachment will be done in the `handleNoDelay`. + // - If rs is not nil, the statement tracker detachment from session tracker + // is done in the `rs.Close` in most cases. + // - If the rs is nil and err is not nil, the detachment will be done in + // the `handleNoDelay`. if rs != nil { defer terror.Call(rs.Close) } if err != nil { + // If error is returned during the planner phase or the executor.Open + // phase, the rs will be nil, and StmtCtx.MemTracker and StmtCtx.DiskTracker + // will not be detached. We need to detach them manually. + if sv := cc.ctx.GetSessionVars(); sv != nil && sv.StmtCtx != nil { + sv.StmtCtx.DetachMemDiskTracker() + } return true, err } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index cdd5728d3ac54..1b2fabd4fcee2 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -1114,6 +1114,19 @@ func (sc *StatementContext) UseDynamicPartitionPrune() bool { return sc.UseDynamicPruneMode } +// DetachMemDiskTracker detaches the memory and disk tracker from the sessionTracker. 
+func (sc *StatementContext) DetachMemDiskTracker() { + if sc == nil { + return + } + if sc.MemTracker != nil { + sc.MemTracker.Detach() + } + if sc.DiskTracker != nil { + sc.DiskTracker.Detach() + } +} + // CopTasksDetails collects some useful information of cop-tasks during execution. type CopTasksDetails struct { NumCopTasks int diff --git a/testkit/mocksessionmanager.go b/testkit/mocksessionmanager.go index 550ff69132d91..8793dff550e76 100644 --- a/testkit/mocksessionmanager.go +++ b/testkit/mocksessionmanager.go @@ -33,7 +33,7 @@ type MockSessionManager struct { SerID uint64 TxnInfo []*txninfo.TxnInfo Dom *domain.Domain - conn map[uint64]session.Session + Conn map[uint64]session.Session mu sync.Mutex } @@ -44,8 +44,8 @@ func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo { if len(msm.TxnInfo) > 0 { return msm.TxnInfo } - rs := make([]*txninfo.TxnInfo, 0, len(msm.conn)) - for _, se := range msm.conn { + rs := make([]*txninfo.TxnInfo, 0, len(msm.Conn)) + for _, se := range msm.Conn { info := se.TxnInfo() if info != nil { rs = append(rs, info) @@ -66,7 +66,7 @@ func (msm *MockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { return ret } msm.mu.Lock() - for connID, pi := range msm.conn { + for connID, pi := range msm.Conn { ret[connID] = pi.ShowProcess() } msm.mu.Unlock() @@ -89,7 +89,7 @@ func (msm *MockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, boo } msm.mu.Lock() defer msm.mu.Unlock() - if sess := msm.conn[id]; sess != nil { + if sess := msm.Conn[id]; sess != nil { return sess.ShowProcess(), true } if msm.Dom != nil { @@ -130,7 +130,7 @@ func (*MockSessionManager) GetInternalSessionStartTSList() []uint64 { // KillNonFlashbackClusterConn implement SessionManager interface. 
func (msm *MockSessionManager) KillNonFlashbackClusterConn() { - for _, se := range msm.conn { + for _, se := range msm.Conn { processInfo := se.ShowProcess() ddl, ok := processInfo.StmtCtx.GetPlan().(*core.DDL) if !ok { @@ -148,7 +148,7 @@ func (msm *MockSessionManager) KillNonFlashbackClusterConn() { // CheckOldRunningTxn implement SessionManager interface. func (msm *MockSessionManager) CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string) { msm.mu.Lock() - for _, se := range msm.conn { + for _, se := range msm.Conn { session.RemoveLockDDLJobs(se, job2ver, job2ids) } msm.mu.Unlock() diff --git a/testkit/testkit.go b/testkit/testkit.go index db86548ee3bfd..e4461fee82ac6 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -69,10 +69,10 @@ func NewTestKit(t testing.TB, store kv.Storage) *TestKit { mockSm, ok := sm.(*MockSessionManager) if ok { mockSm.mu.Lock() - if mockSm.conn == nil { - mockSm.conn = make(map[uint64]session.Session) + if mockSm.Conn == nil { + mockSm.Conn = make(map[uint64]session.Session) } - mockSm.conn[tk.session.GetSessionVars().ConnectionID] = tk.session + mockSm.Conn[tk.session.GetSessionVars().ConnectionID] = tk.session mockSm.mu.Unlock() } tk.session.SetSessionManager(sm) diff --git a/util/memory/tracker.go b/util/memory/tracker.go index b4ffea612ec53..6ee472599e277 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -302,6 +302,9 @@ func (t *Tracker) AttachTo(parent *Tracker) { // Detach de-attach the tracker child from its parent, then set its parent property as nil func (t *Tracker) Detach() { + if t == nil { + return + } parent := t.getParent() if parent == nil { return @@ -446,6 +449,7 @@ func (t *Tracker) Consume(bs int64) { currentAction = nextAction nextAction = currentAction.GetFallback() } + logutil.BgLogger().Warn("global memory controller, lastAction", zap.Any("action", currentAction)) currentAction.Action(tracker) } } @@ -471,6 +475,7 @@ func (t *Tracker) Consume(bs int64) { } oldTracker 
= MemUsageTop1Tracker.Load() } + logutil.BgLogger().Error("global memory controller, update the Top1 session", zap.Int64("memUsage", memUsage), zap.Uint64("conn", sessionRootTracker.SessionID), zap.Uint64("limitSessMinSize", limitSessMinSize)) } } diff --git a/util/servermemorylimit/BUILD.bazel b/util/servermemorylimit/BUILD.bazel index 0d2c4d4f3cb59..881894c0e073b 100644 --- a/util/servermemorylimit/BUILD.bazel +++ b/util/servermemorylimit/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//util", "//util/logutil", "//util/memory", + "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], diff --git a/util/servermemorylimit/servermemorylimit.go b/util/servermemorylimit/servermemorylimit.go index 511a86703db17..38a679a4ad755 100644 --- a/util/servermemorylimit/servermemorylimit.go +++ b/util/servermemorylimit/servermemorylimit.go @@ -21,6 +21,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" @@ -88,6 +89,15 @@ type sessionToBeKilled struct { lastLogTime time.Time } +func (s *sessionToBeKilled) reset() { + s.isKilling = false + s.sqlStartTime = time.Time{} + s.sessionID = 0 + s.sessionTracker = nil + s.killStartTime = time.Time{} + s.lastLogTime = time.Time{} +} + func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) { if s.isKilling { if info, ok := sm.GetProcessInfo(s.sessionID); ok { @@ -104,7 +114,7 @@ func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) { return } } - s.isKilling = false + s.reset() IsKilling.Store(false) memory.MemUsageTop1Tracker.CompareAndSwap(s.sessionTracker, nil) //nolint: all_revive,revive @@ -115,14 +125,25 @@ func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) { if bt == 0 { return } + failpoint.Inject("issue42662_2", func(val failpoint.Value) { + if val.(bool) { + bt = 1 + } + }) instanceStats := 
memory.ReadMemStats() if instanceStats.HeapInuse > MemoryMaxUsed.Load() { MemoryMaxUsed.Store(instanceStats.HeapInuse) } + limitSessMinSize := memory.ServerMemoryLimitSessMinSize.Load() if instanceStats.HeapInuse > bt { t := memory.MemUsageTop1Tracker.Load() if t != nil { - if info, ok := sm.GetProcessInfo(t.SessionID); ok { + memUsage := t.BytesConsumed() + // If the memory usage of the top1 session is less than tidb_server_memory_limit_sess_min_size, we do not need to kill it. + if uint64(memUsage) < limitSessMinSize { + memory.MemUsageTop1Tracker.CompareAndSwap(t, nil) + t = nil + } else if info, ok := sm.GetProcessInfo(t.SessionID); ok { logutil.BgLogger().Warn("global memory controller tries to kill the top1 memory consumer", zap.Uint64("connID", info.ID), zap.String("sql digest", info.Digest), @@ -146,6 +167,17 @@ func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) { s.killStartTime = time.Now() } } + // If there is no top1 session, or its memory usage is below tidb_server_memory_limit_sess_min_size, we will not kill anyone. + if t == nil { + if s.lastLogTime.IsZero() { + s.lastLogTime = time.Now() + } + if time.Since(s.lastLogTime) < 5*time.Second { + return + } + logutil.BgLogger().Warn("global memory controller tries to kill the top1 memory consumer, but no one larger than tidb_server_memory_limit_sess_min_size is found", zap.Uint64("tidb_server_memory_limit_sess_min_size", limitSessMinSize)) + s.lastLogTime = time.Now() + } } }