From cfdd74f0538ae5ee1f9e244469591562cae5e258 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 16 Jan 2023 10:55:48 +0800 Subject: [PATCH] *: prevent cursor read from being cancelled by GC (#39950) (#39989) close pingcap/tidb#39447 --- bindinfo/session_handle_test.go | 4 + ddl/db_test.go | 21 ++++-- domain/infosync/info.go | 16 +--- executor/executor_pkg_test.go | 4 + executor/infoschema_cluster_table_test.go | 4 + executor/prepared_test.go | 4 + executor/seqtest/prepared_test.go | 4 + infoschema/tables_test.go | 4 + server/conn_stmt.go | 6 ++ server/conn_stmt_test.go | 90 +++++++++++++++++++++++ server/driver.go | 2 + server/driver_tidb.go | 45 ++++++++++++ server/driver_tidb_test.go | 25 +++++++ server/mock_conn.go | 3 + server/server.go | 34 +++++++++ session/session.go | 1 + sessionctx/variable/session.go | 53 +++++++++++++ sessionctx/variable/session_test.go | 54 ++++++++++++++ testkit/mocksessionmanager.go | 12 +++ util/processinfo.go | 28 +++++++ 20 files changed, 394 insertions(+), 20 deletions(-) diff --git a/bindinfo/session_handle_test.go b/bindinfo/session_handle_test.go index 402f9574d911e..26412efdfb21d 100644 --- a/bindinfo/session_handle_test.go +++ b/bindinfo/session_handle_test.go @@ -414,6 +414,10 @@ func (msm *mockSessionManager) GetInternalSessionStartTSList() []uint64 { return nil } +func (msm *mockSessionManager) GetMinStartTS(lowerBound uint64) uint64 { + return 0 +} + func TestIssue19836(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() diff --git a/ddl/db_test.go b/ddl/db_test.go index 4bb9cd175abc2..f5bcd7e1cb66c 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -1415,8 +1415,10 @@ func TestLogAndShowSlowLog(t *testing.T) { } func TestReportingMinStartTimestamp(t *testing.T) { - _, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) defer clean() + tk := testkit.NewTestKit(t, store) + se := tk.Session() infoSyncer := dom.InfoSyncer() sm := &testkit.MockSessionManager{ @@ -1432,12 +1434,19 @@ func TestReportingMinStartTimestamp(t *testing.T) { validTS := oracle.GoTimeToLowerLimitStartTS(now.Add(time.Minute), tikv.MaxTxnTimeUse) lowerLimit := oracle.GoTimeToLowerLimitStartTS(now, tikv.MaxTxnTimeUse) sm.PS = []*util.ProcessInfo{ - {CurTxnStartTS: 0}, - {CurTxnStartTS: math.MaxUint64}, - {CurTxnStartTS: lowerLimit}, - {CurTxnStartTS: validTS}, + {CurTxnStartTS: 0, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, + {CurTxnStartTS: math.MaxUint64, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, + {CurTxnStartTS: lowerLimit, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, + {CurTxnStartTS: validTS, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, } - infoSyncer.SetSessionManager(sm) + infoSyncer.ReportMinStartTS(dom.Store()) + require.Equal(t, validTS, infoSyncer.GetMinStartTS()) + + unhold := se.GetSessionVars().ProtectedTSList.HoldTS(validTS - 1) + infoSyncer.ReportMinStartTS(dom.Store()) + require.Equal(t, validTS-1, infoSyncer.GetMinStartTS()) + + unhold() infoSyncer.ReportMinStartTS(dom.Store()) require.Equal(t, validTS, infoSyncer.GetMinStartTS()) } diff --git a/domain/infosync/info.go b/domain/infosync/info.go index cb2b844e440d6..232eb78688b86 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -629,8 +629,6 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { if sm == nil { return } - pl := sm.ShowProcessList() - innerSessionStartTSList := sm.GetInternalSessionStartTSList() // Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC. currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) @@ -644,18 +642,8 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { minStartTS := oracle.GoTimeToTS(now) logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("initial minStartTS", minStartTS), zap.Uint64("StartTSLowerLimit", startTSLowerLimit)) - for _, info := range pl { - if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS { - minStartTS = info.CurTxnStartTS - } - } - - for _, innerTS := range innerSessionStartTSList { - logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("Internal Session Transaction StartTS", innerTS)) - kv.PrintLongTimeInternalTxn(now, innerTS, false) - if innerTS > startTSLowerLimit && innerTS < minStartTS { - minStartTS = innerTS - } + if ts := sm.GetMinStartTS(startTSLowerLimit); ts > startTSLowerLimit && ts < minStartTS { + minStartTS = ts } is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS) diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index f8212dd6ff1ba..a0594e2835c38 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -106,6 +106,10 @@ func (msm *mockSessionManager) GetInternalSessionStartTSList() []uint64 { return nil } +func (msm *mockSessionManager) GetMinStartTS(lowerBound uint64) uint64 { + return 0 +} + func TestShowProcessList(t *testing.T) { // Compose schema. names := []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"} diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go index c790919a17004..ba7161b1975cd 100644 --- a/executor/infoschema_cluster_table_test.go +++ b/executor/infoschema_cluster_table_test.go @@ -212,6 +212,10 @@ func (sm *mockSessionManager) SetServerID(serverID uint64) { sm.serverID = serverID } +func (sm *mockSessionManager) GetMinStartTS(lowerBound uint64) uint64 { + return 0 +} + type mockStore struct { helper.Storage host string diff --git a/executor/prepared_test.go b/executor/prepared_test.go index 9fbdaf3b4c50f..33001c1db3979 100644 --- a/executor/prepared_test.go +++ b/executor/prepared_test.go @@ -130,6 +130,10 @@ func (sm *mockSessionManager2) GetInternalSessionStartTSList() []uint64 { return nil } +func (sm *mockSessionManager2) GetMinStartTS(lowerBound uint64) uint64 { + return 0 +} + func TestPreparedStmtWithHint(t *testing.T) { // see https://github.com/pingcap/tidb/issues/18535 store, dom, clean := testkit.CreateMockStoreAndDomain(t) diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go index 912bcf0014619..b8f9c860dbbaa 100644 --- a/executor/seqtest/prepared_test.go +++ b/executor/seqtest/prepared_test.go @@ -898,6 +898,10 @@ func (msm *mockSessionManager1) GetInternalSessionStartTSList() []uint64 { return nil } +func (msm *mockSessionManager1) GetMinStartTS(lowerBound uint64) uint64 { + return 0 +} + func TestPreparedIssue17419(t *testing.T) { store, dom, clean := testkit.CreateMockStoreAndDomain(t) defer clean() diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 962a8db8e0d13..1b6fb3aaf5f24 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -326,6 +326,10 @@ func (sm *mockSessionManager) GetInternalSessionStartTSList() []uint64 { return nil } +func (sm *mockSessionManager) GetMinStartTS(lowerBound uint64) uint64 { + return 0 +} + func TestSomeTables(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() diff --git a/server/conn_stmt.go b/server/conn_stmt.go index c59cba37693d3..3360feaccc331 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -245,6 +245,12 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm if useCursor { cc.initResultEncoder(ctx) defer cc.rsEncoder.clean() + // fix https://github.com/pingcap/tidb/issues/39447. we need to hold the start-ts here because the process info + // will be set to sleep after fetch returned. + if pi := cc.ctx.ShowProcess(); pi != nil && pi.ProtectedTSList != nil && pi.CurTxnStartTS > 0 { + unhold := pi.HoldTS(pi.CurTxnStartTS) + rs = &rsWithHooks{ResultSet: rs, onClosed: unhold} + } stmt.StoreResultSet(rs) err = cc.writeColumnInfo(rs.Columns(), mysql.ServerStatusCursorExists) if err != nil { diff --git a/server/conn_stmt_test.go b/server/conn_stmt_test.go index a4ff4c2ee7070..d4cda379138bf 100644 --- a/server/conn_stmt_test.go +++ b/server/conn_stmt_test.go @@ -15,11 +15,14 @@ package server import ( + "context" + "encoding/binary" "testing" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" "github.com/stretchr/testify/require" ) @@ -248,3 +251,90 @@ func TestParseStmtFetchCmd(t *testing.T) { require.Equal(t, tc.err, err) } } + +func TestCursorReadHoldTS(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + srv := CreateMockServer(t, store) + srv.SetDomain(dom) + defer srv.Close() + + appendUint32 := binary.LittleEndian.AppendUint32 + ctx := context.Background() + c := CreateMockConn(t, store, srv) + tk := testkit.NewTestKitWithSession(t, store, c.Context().Session) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int primary key)") + tk.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (7), (8)") + tk.MustQuery("select count(*) from t").Check(testkit.Rows("8")) + + stmt, _, _, err := c.Context().Prepare("select * from t") + require.NoError(t, err) + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + + // should hold ts after executing stmt with cursor + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), + 0x1, 0x1, 0x0, 0x0, 0x0, + ))) + ts := tk.Session().ShowProcess().GetMinStartTS(0) + require.Positive(t, ts) + // should unhold ts when result set exhausted + require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) + require.Equal(t, ts, tk.Session().ShowProcess().GetMinStartTS(0)) + require.Equal(t, ts, srv.GetMinStartTS(0)) + require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) + require.Equal(t, ts, tk.Session().ShowProcess().GetMinStartTS(0)) + require.Equal(t, ts, srv.GetMinStartTS(0)) + require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + + // should hold ts after executing stmt with cursor + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), + 0x1, 0x1, 0x0, 0x0, 0x0, + ))) + require.Positive(t, tk.Session().ShowProcess().GetMinStartTS(0)) + // should unhold ts when stmt reset + require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtReset}, uint32(stmt.ID())))) + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + + // should hold ts after executing stmt with cursor + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), + 0x1, 0x1, 0x0, 0x0, 0x0, + ))) + require.Positive(t, tk.Session().ShowProcess().GetMinStartTS(0)) + // should unhold ts when stmt closed + require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtClose}, uint32(stmt.ID())))) + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + + // create another 2 stmts and execute them + stmt1, _, _, err := c.Context().Prepare("select * from t") + require.NoError(t, err) + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt1.ID())), + 0x1, 0x1, 0x0, 0x0, 0x0, + ))) + ts1 := tk.Session().ShowProcess().GetMinStartTS(0) + require.Positive(t, ts1) + stmt2, _, _, err := c.Context().Prepare("select * from t") + require.NoError(t, err) + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt2.ID())), + 0x1, 0x1, 0x0, 0x0, 0x0, + ))) + ts2 := tk.Session().ShowProcess().GetMinStartTS(ts1) + require.Positive(t, ts2) + + require.Less(t, ts1, ts2) + require.Equal(t, ts1, srv.GetMinStartTS(0)) + require.Equal(t, ts2, srv.GetMinStartTS(ts1)) + require.Zero(t, srv.GetMinStartTS(ts2)) + + // should unhold all when session closed + c.Close() + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + require.Zero(t, srv.GetMinStartTS(0)) +} diff --git a/server/driver.go b/server/driver.go index 0363758e3e47b..ae996715113ae 100644 --- a/server/driver.go +++ b/server/driver.go @@ -72,6 +72,8 @@ type ResultSet interface { StoreFetchedRows(rows []chunk.Row) GetFetchedRows() []chunk.Row Close() error + // IsClosed checks whether the result set is closed. + IsClosed() bool } // fetchNotifier represents notifier will be called in COM_FETCH. diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 23bc6f90a7484..77f60ab550f41 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -343,6 +343,11 @@ func (trs *tidbResultSet) Close() error { return err } +// IsClosed implements ResultSet.IsClosed interface. +func (trs *tidbResultSet) IsClosed() bool { + return atomic.LoadInt32(&trs.closed) == 1 +} + // OnFetchReturned implements fetchNotifier#OnFetchReturned func (trs *tidbResultSet) OnFetchReturned() { if cl, ok := trs.recordSet.(fetchNotifier); ok { @@ -375,6 +380,46 @@ func (trs *tidbResultSet) Columns() []*ColumnInfo { return trs.columns } +// rsWithHooks wraps a ResultSet with some hooks (currently only onClosed). +type rsWithHooks struct { + ResultSet + onClosed func() +} + +// Close implements ResultSet#Close +func (rs *rsWithHooks) Close() error { + closed := rs.IsClosed() + err := rs.ResultSet.Close() + if !closed && rs.onClosed != nil { + rs.onClosed() + } + return err +} + +// OnFetchReturned implements fetchNotifier#OnFetchReturned +func (rs *rsWithHooks) OnFetchReturned() { + if impl, ok := rs.ResultSet.(fetchNotifier); ok { + impl.OnFetchReturned() + } +} + +// Unwrap returns the underlying result set +func (rs *rsWithHooks) Unwrap() ResultSet { + return rs.ResultSet +} + +// unwrapResultSet likes errors.Cause but for ResultSet +func unwrapResultSet(rs ResultSet) ResultSet { + var unRS ResultSet + if u, ok := rs.(interface{ Unwrap() ResultSet }); ok { + unRS = u.Unwrap() + } + if unRS == nil { + return rs + } + return unwrapResultSet(unRS) +} + func convertColumnInfo(fld *ast.ResultField) (ci *ColumnInfo) { ci = &ColumnInfo{ Name: fld.ColumnAsName.O, diff --git a/server/driver_tidb_test.go b/server/driver_tidb_test.go index b5f7e670aded0..b56632937e078 100644 --- a/server/driver_tidb_test.go +++ b/server/driver_tidb_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" "github.com/stretchr/testify/require" ) @@ -95,3 +96,27 @@ func TestConvertColumnInfo(t *testing.T) { colInfo = convertColumnInfo(&resultField) require.Equal(t, uint32(4), colInfo.ColumnLength) } + +func TestRSWithHooks(t *testing.T) { + closeCount := 0 + rs := &rsWithHooks{ + ResultSet: &tidbResultSet{recordSet: new(sqlexec.SimpleRecordSet)}, + onClosed: func() { closeCount++ }, + } + require.Equal(t, 0, closeCount) + rs.Close() + require.Equal(t, 1, closeCount) + rs.Close() + require.Equal(t, 1, closeCount) +} + +func TestUnwrapRS(t *testing.T) { + var nilRS ResultSet + require.Nil(t, unwrapResultSet(nilRS)) + rs0 := new(tidbResultSet) + rs1 := &rsWithHooks{ResultSet: rs0} + rs2 := &rsWithHooks{ResultSet: rs1} + for _, rs := range []ResultSet{rs0, rs1, rs2} { + require.Equal(t, rs0, unwrapResultSet(rs)) + } +} diff --git a/server/mock_conn.go b/server/mock_conn.go index 7ecb304426aa8..ce4629b6e8009 100644 --- a/server/mock_conn.go +++ b/server/mock_conn.go @@ -104,6 +104,9 @@ func CreateMockConn(t *testing.T, store kv.Storage, server *Server) MockConn { }, } cc.setCtx(tc) + cc.server.rwlock.Lock() + server.clients[cc.connectionID] = cc + cc.server.rwlock.Unlock() return &mockConn{ clientConn: cc, t: t, diff --git a/server/server.go b/server/server.go index 90b4a7e27101b..ef9228cf79a04 100644 --- a/server/server.go +++ b/server/server.go @@ -870,3 +870,37 @@ func setSystemTimeZoneVariable() { variable.SetSysVar("system_time_zone", tz) }) } + +// GetMinStartTS implements SessionManager interface. +func (s *Server) GetMinStartTS(lowerBound uint64) (ts uint64) { + // sys processes + if s.dom != nil { + for _, pi := range s.dom.SysProcTracker().GetSysProcessList() { + if thisTS := pi.GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + } + // user sessions + func() { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + for _, client := range s.clients { + if thisTS := client.ctx.ShowProcess().GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + }() + // internal sessions + func() { + s.sessionMapMutex.Lock() + defer s.sessionMapMutex.Unlock() + analyzeProcID := util.GetAutoAnalyzeProcID(s.ServerID) + for se := range s.internalSessions { + if thisTS, processInfoID := session.GetStartTSFromSession(se); processInfoID != analyzeProcID && thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + }() + return +} diff --git a/session/session.go b/session/session.go index 999215977d26a..49fa0735dfc36 100644 --- a/session/session.go +++ b/session/session.go @@ -1410,6 +1410,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu StatsInfo: plannercore.GetStatsInfo, MaxExecutionTime: maxExecutionTime, RedactSQL: s.sessionVars.EnableRedactLog, + ProtectedTSList: &s.sessionVars.ProtectedTSList, } oldPi := s.ShowProcess() if p == nil { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index d77d14e351556..f688aef32dd91 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1047,6 +1047,9 @@ type SessionVars struct { // MaxAllowedPacket indicates the maximum size of a packet for the MySQL protocol. MaxAllowedPacket uint64 + + // ProtectedTSList holds a list of timestamps that should delay GC. + ProtectedTSList protectedTSList } // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. @@ -2416,3 +2419,53 @@ func (s *SessionVars) IsRcCheckTsRetryable(err error) bool { // The `RCCheckTS` flag of `stmtCtx` is set. return s.RcReadCheckTS && s.StmtCtx.RCCheckTS && errors.ErrorEqual(err, kv.ErrWriteConflict) } + +// protectedTSList implements util/processinfo#ProtectedTSList +type protectedTSList struct { + sync.Mutex + items map[uint64]int +} + +// HoldTS holds the timestamp to prevent its data from being GCed. +func (lst *protectedTSList) HoldTS(ts uint64) (unhold func()) { + lst.Lock() + if lst.items == nil { + lst.items = map[uint64]int{} + } + lst.items[ts] += 1 + lst.Unlock() + var once sync.Once + return func() { + once.Do(func() { + lst.Lock() + if lst.items != nil { + if lst.items[ts] > 1 { + lst.items[ts] -= 1 + } else { + delete(lst.items, ts) + } + } + lst.Unlock() + }) + } +} + +// GetMinProtectedTS returns the minimum protected timestamp that greater than `lowerBound` (0 if no such one). +func (lst *protectedTSList) GetMinProtectedTS(lowerBound uint64) (ts uint64) { + lst.Lock() + for k, v := range lst.items { + if v > 0 && k > lowerBound && (k < ts || ts == 0) { + ts = k + } + } + lst.Unlock() + return +} + +// Size returns the number of protected timestamps (exported for test). +func (lst *protectedTSList) Size() (size int) { + lst.Lock() + size = len(lst.items) + lst.Unlock() + return +} diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 52c8fd5c818f6..1b63cd7c49406 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -291,3 +291,57 @@ func TestIsolationRead(t *testing.T) { _, ok = sessVars.IsolationReadEngines[kv.TiFlash] require.True(t, ok) } + +func TestPretectedTSList(t *testing.T) { + lst := &variable.NewSessionVars().ProtectedTSList + + // empty set + require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) + require.Equal(t, 0, lst.Size()) + + // hold 1 + unhold1 := lst.HoldTS(1) + require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) + + // hold 2 twice + unhold2a := lst.HoldTS(2) + unhold2b := lst.HoldTS(2) + require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) + require.Equal(t, 2, lst.Size()) + + // unhold 2a + unhold2a() + require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) + require.Equal(t, 2, lst.Size()) + // unhold 2a again + unhold2a() + require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) + require.Equal(t, 2, lst.Size()) + + // unhold 1 + unhold1() + require.Equal(t, uint64(2), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) + require.Equal(t, 1, lst.Size()) + + // unhold 2b + unhold2b() + require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) + require.Equal(t, 0, lst.Size()) + + // unhold 2b again + unhold2b() + require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) + require.Equal(t, 0, lst.Size()) +} diff --git a/testkit/mocksessionmanager.go b/testkit/mocksessionmanager.go index 5e775c02087e4..e0017566ec21c 100644 --- a/testkit/mocksessionmanager.go +++ b/testkit/mocksessionmanager.go @@ -77,3 +77,15 @@ func (msm *MockSessionManager) DeleteInternalSession(se interface{}) {} func (msm *MockSessionManager) GetInternalSessionStartTSList() []uint64 { return nil } + +// GetMinStartTS implements SessionManager interface. +func (msm *MockSessionManager) GetMinStartTS(lowerBound uint64) (ts uint64) { + if len(msm.PS) > 0 { + for _, pi := range msm.PS { + if thisTS := pi.GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + } + return +} diff --git a/util/processinfo.go b/util/processinfo.go index 996136d77619e..d868434de96d9 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -29,8 +29,17 @@ import ( "github.com/tikv/client-go/v2/oracle" ) +// ProtectedTSList holds a list of timestamps that should delay GC. +type ProtectedTSList interface { + // HoldTS holds the timestamp to prevent its data from being GCed. + HoldTS(ts uint64) (unhold func()) + // GetMinProtectedTS returns the minimum protected timestamp that greater than `lowerBound` (0 if no such one). + GetMinProtectedTS(lowerBound uint64) (ts uint64) +} + // ProcessInfo is a struct used for show processlist statement. type ProcessInfo struct { + ProtectedTSList ID uint64 User string Host string @@ -118,6 +127,23 @@ func (pi *ProcessInfo) ToRow(tz *time.Location) []interface{} { return append(pi.ToRowForShow(true), pi.Digest, bytesConsumed, diskConsumed, pi.txnStartTs(tz)) } +// GetMinStartTS returns the minimum start-ts (used to delay GC) that greater than `lowerBound` (0 if no such one). +func (pi *ProcessInfo) GetMinStartTS(lowerBound uint64) (ts uint64) { + if pi == nil { + return + } + if thisTS := pi.CurTxnStartTS; thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + if pi.ProtectedTSList == nil { + return + } + if thisTS := pi.GetMinProtectedTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + return +} + // ascServerStatus is a slice of all defined server status in ascending order. var ascServerStatus = []uint16{ mysql.ServerStatusInTrans, @@ -181,6 +207,8 @@ type SessionManager interface { DeleteInternalSession(se interface{}) // Get all startTS of every transactions running in the current internal sessions GetInternalSessionStartTSList() []uint64 + // GetMinStartTS returns the minimum start-ts (used to delay GC) that greater than `lowerBound` (0 if no such one). + GetMinStartTS(lowerBound uint64) uint64 } // GlobalConnID is the global connection ID, providing UNIQUE connection IDs across the whole TiDB cluster.