Skip to content

Commit

Permalink
ttl: always enable all read engines for TTL sessions (#56604) (#56805)
Browse files Browse the repository at this point in the history
close #56402
  • Loading branch information
ti-chi-bot authored Dec 10, 2024
1 parent b298e21 commit b7de64f
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
35 changes: 35 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/domain"
Expand Down Expand Up @@ -64,6 +65,40 @@ func sessionFactory(t *testing.T, store kv.Storage) func() session.Session {
}
}

func TestGetSession(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@time_zone = 'Asia/Shanghai'")
tk.MustExec("set @@global.time_zone= 'Europe/Berlin'")
tk.MustExec("set @@tidb_retry_limit=1")
tk.MustExec("set @@tidb_enable_1pc=0")
tk.MustExec("set @@tidb_enable_async_commit=0")
tk.MustExec("set @@tidb_isolation_read_engines='tiflash,tidb'")
var getCnt atomic.Int32

pool := pools.NewResourcePool(func() (pools.Resource, error) {
if getCnt.CompareAndSwap(0, 1) {
return tk.Session(), nil
}
require.FailNow(t, "get session more than once")
return nil, nil
}, 1, 1, 0)
defer pool.Close()

se, err := ttlworker.GetSessionForTest(pool)
require.NoError(t, err)
defer se.Close()

// session variables should be set
tk.MustQuery("select @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit, @@tidb_isolation_read_engines").
Check(testkit.Rows("0 1 1 tikv,tiflash,tidb"))

// all session variables should be restored after close
se.Close()
tk.MustQuery("select @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit, @@tidb_isolation_read_engines").
Check(testkit.Rows("1 0 0 tiflash,tidb"))
}

func TestParallelLockNewJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
Expand Down
40 changes: 40 additions & 0 deletions pkg/ttl/ttlworker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -54,6 +55,12 @@ var DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
return s
}

var allIsolationReadEngines = map[kv.StoreType]struct{}{
kv.TiKV: {},
kv.TiFlash: {},
kv.TiDB: {},
}

type sessionPool interface {
Get() (pools.Resource, error)
Put(pools.Resource)
Expand Down Expand Up @@ -85,6 +92,8 @@ func getSession(pool sessionPool) (session.Session, error) {
originalRetryLimit := sctx.GetSessionVars().RetryLimit
originalEnable1PC := sctx.GetSessionVars().Enable1PC
originalEnableAsyncCommit := sctx.GetSessionVars().EnableAsyncCommit
originalIsolationReadEngines, restoreIsolationReadEngines := "", false

se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
if err != nil {
Expand All @@ -101,6 +110,11 @@ func getSession(pool sessionPool) (session.Session, error) {
terror.Log(err)
}

if restoreIsolationReadEngines {
_, err = se.ExecuteSQL(context.Background(), "set tidb_isolation_read_engines=%?", originalIsolationReadEngines)
terror.Log(err)
}

DetachStatsCollector(exec)

pool.Put(resource)
Expand Down Expand Up @@ -135,6 +149,32 @@ func getSession(pool sessionPool) (session.Session, error) {
return nil, err
}

// allow the session in TTL to use all read engines.
_, hasTiDBEngine := se.GetSessionVars().IsolationReadEngines[kv.TiDB]
_, hasTiKVEngine := se.GetSessionVars().IsolationReadEngines[kv.TiKV]
_, hasTiFlashEngine := se.GetSessionVars().IsolationReadEngines[kv.TiFlash]
if !hasTiDBEngine || !hasTiKVEngine || !hasTiFlashEngine {
rows, err := se.ExecuteSQL(context.Background(), "select @@tidb_isolation_read_engines")
if err != nil {
se.Close()
return nil, err
}

if len(rows) == 0 || rows[0].Len() == 0 {
se.Close()
return nil, errors.New("failed to get tidb_isolation_read_engines variable")
}
originalIsolationReadEngines = rows[0].GetString(0)

_, err = se.ExecuteSQL(context.Background(), "set tidb_isolation_read_engines='tikv,tiflash,tidb'")
if err != nil {
se.Close()
return nil, err
}

restoreIsolationReadEngines = true
}

return se, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/ttl/ttlworker/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,6 @@ func TestValidateTTLWork(t *testing.T) {
err = validateTTLWork(ctx, s, tbl, expire)
require.EqualError(t, err, "physical id changed")
}

// GetSessionForTest is used for test
var GetSessionForTest = getSession

0 comments on commit b7de64f

Please sign in to comment.