Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: add resource group for ttl query #40930

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,21 +743,21 @@ func (e *DDLExec) executeAlterPlacementPolicy(s *ast.AlterPlacementPolicyStmt) e
}

// executeCreateResourceGroup handles a CREATE RESOURCE GROUP statement.
// It returns infoschema.ErrResourceGroupSupportDisabled when the guard below
// rejects the statement; otherwise it forwards the statement to the domain's
// DDL AddResourceGroup and returns its error.
// NOTE(review): two forms of the guard are listed because this is a diff view —
// presumably the first is the removed line and the second (which also lets
// sessions with InRestrictedSQL set pass while resource control is off) is its
// replacement; confirm against the PR.
func (e *DDLExec) executeCreateResourceGroup(s *ast.CreateResourceGroupStmt) error {
if !variable.EnableResourceControl.Load() {
if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
return infoschema.ErrResourceGroupSupportDisabled
}
return domain.GetDomain(e.ctx).DDL().AddResourceGroup(e.ctx, s)
}

// executeAlterResourceGroup handles an ALTER RESOURCE GROUP statement.
// It returns infoschema.ErrResourceGroupSupportDisabled when the guard below
// rejects the statement; otherwise it forwards the statement to the domain's
// DDL AlterResourceGroup and returns its error.
// NOTE(review): two forms of the guard are listed because this is a diff view —
// presumably the first is the removed line and the second (which also lets
// sessions with InRestrictedSQL set pass while resource control is off) is its
// replacement; confirm against the PR.
func (e *DDLExec) executeAlterResourceGroup(s *ast.AlterResourceGroupStmt) error {
if !variable.EnableResourceControl.Load() {
if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
return infoschema.ErrResourceGroupSupportDisabled
}
return domain.GetDomain(e.ctx).DDL().AlterResourceGroup(e.ctx, s)
}

func (e *DDLExec) executeDropResourceGroup(s *ast.DropResourceGroupStmt) error {
if !variable.EnableResourceControl.Load() {
if !variable.EnableResourceControl.Load() && !e.ctx.GetSessionVars().InRestrictedSQL {
return infoschema.ErrResourceGroupSupportDisabled
}
return domain.GetDomain(e.ctx).DDL().DropResourceGroup(e.ctx, s)
Expand Down
18 changes: 17 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,10 @@ const (
key(parent_table_id, create_time),
key(create_time)
);`

YangKeao marked this conversation as resolved.
Show resolved Hide resolved
// CreateTTLResourceGroup creates the resource group for ttl
// TODO: set a more appropriate value for RU_PER_SEC
CreateTTLResourceGroup = `CREATE RESOURCE GROUP IF NOT EXISTS tidb_ttl RU_PER_SEC=10000000`
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I set a very large RU_PER_SEC value for tidb_ttl, so TTL is not limited by default. If users want to limit TTL, they can alter the tidb_ttl resource group. Is this a good pattern?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use CREATE RESOURCE GROUP IF NOT EXISTS tidb_ttl RU_PER_SEC=10000 BURSTABLE instead.

)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -804,11 +808,13 @@ const (
// - tidb_enable_foreign_key: off -> on
// - tidb_store_batch_size: 0 -> 4
version134 = 134
// version135 adds the tidb_ttl resource group
version135 = 135
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version134
var currentBootstrapVersion int64 = version135

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -930,6 +936,7 @@ var (
upgradeToVer132,
upgradeToVer133,
upgradeToVer134,
upgradeToVer135,
}
)

Expand Down Expand Up @@ -2316,6 +2323,13 @@ func upgradeToVer134(s Session, ver int64) {
mustExecute(s, "UPDATE HIGH_PRIORITY %n.%n SET VARIABLE_VALUE = %? WHERE VARIABLE_NAME = %? AND VARIABLE_VALUE = %?;", mysql.SystemDB, mysql.GlobalVariablesTable, "4", variable.TiDBStoreBatchSize, "0")
}

// upgradeToVer135 performs the bootstrap upgrade step for version135.
// It does nothing when ver is already at or beyond version135; otherwise it
// runs the CreateTTLResourceGroup statement through doReentrantDDL.
func upgradeToVer135(s Session, ver int64) {
if ver >= version135 {
return
}
doReentrantDDL(s, CreateTTLResourceGroup)
YangKeao marked this conversation as resolved.
Show resolved Hide resolved
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down Expand Up @@ -2426,6 +2440,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateTTLTask)
// Create tidb_ttl_job_history table
mustExecute(s, CreateTTLJobHistory)
// Create tidb_ttl resource group
mustExecute(s, CreateTTLResourceGroup)
YangKeao marked this conversation as resolved.
Show resolved Hide resolved
}

// doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
Expand Down
13 changes: 7 additions & 6 deletions telemetry/data_feature_usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,40 +356,41 @@ func TestPlacementPolicies(t *testing.T) {

// TestResourceGroups checks the resource-control section of the telemetry
// feature usage (Enabled flag and NumResourceGroups) while resource groups are
// created, altered and dropped and while the feature is switched on and off.
// NOTE(review): this is a diff view, so each count assertion appears twice —
// presumably the literal form is the removed line and the
// systemResourceGroupCount-based form is its replacement; confirm against the PR.
func TestResourceGroups(t *testing.T) {
store := testkit.CreateMockStore(t)
// Number of resource groups expected before the test creates any
// (presumably the tidb_ttl group created during bootstrap — confirm).
systemResourceGroupCount := 1

tk := testkit.NewTestKit(t, store)

// Initial state: feature disabled, only the pre-existing group(s) counted.
usage, err := telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(0), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, false, usage.ResourceControlUsage.Enabled)

// Enabling the feature and creating group x adds one group.
tk.MustExec("set global tidb_enable_resource_control = 'ON'")
tk.MustExec("create resource group x ru_per_sec=100")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, true, usage.ResourceControlUsage.Enabled)
require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)

// Creating group y adds another one.
tk.MustExec("create resource group y ru_per_sec=100")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(2), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+2), usage.ResourceControlUsage.NumResourceGroups)

// Altering an existing group must not change the count.
tk.MustExec("alter resource group y ru_per_sec=200")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(2), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+2), usage.ResourceControlUsage.NumResourceGroups)

// Dropping y removes it from the count.
tk.MustExec("drop resource group y")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)

// Disabling the feature keeps the existing groups counted but reports Enabled as false.
tk.MustExec("set global tidb_enable_resource_control = 'OFF'")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(1), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, uint64(systemResourceGroupCount+1), usage.ResourceControlUsage.NumResourceGroups)
require.Equal(t, false, usage.ResourceControlUsage.Enabled)
}

Expand Down
8 changes: 7 additions & 1 deletion ttl/ttlworker/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -173,7 +174,12 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
return t.result(sqlErr)
}
retrySQL = sql
retryTimes++

// If the SQL returns this error, it means we have reached the limit of the resource group (and have already waited for 1 second).
// TODO: throttle the traffic rather than retrying at a fixed interval, to make it easier to pass the limiter.
if !strings.Contains(sqlErr.Error(), "[resource group controller] limiter has no enough token or needs wait too long") {
retryTimes++
}

tracer.EnterPhase(metrics.PhaseWaitRetry)
select {
Expand Down
15 changes: 15 additions & 0 deletions ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ type mockScanTask struct {
tbl *cache.PhysicalTable
sessPool *mockSessionPool
sqlRetry []int
sqlErr map[int]error

delCh chan *ttlDeleteTask
prevSQL string
Expand All @@ -237,6 +238,7 @@ func newMockScanTask(t *testing.T, sqlCnt int) *mockScanTask {
delCh: make(chan *ttlDeleteTask, sqlCnt*(scanTaskExecuteSQLMaxRetry+1)),
sessPool: newMockSessionPool(t),
sqlRetry: make([]int, sqlCnt),
sqlErr: make(map[int]error),
schemaChangeIdx: -1,
}
task.sessPool.se.executeSQL = task.execSQL
Expand Down Expand Up @@ -379,6 +381,9 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...interface{})
}

if curRetry < t.sqlRetry[i] {
if err := t.sqlErr[i]; err != nil {
return nil, err
}
return nil, errors.New("mockErr")
}

Expand Down Expand Up @@ -407,3 +412,13 @@ func TestScanTaskDoScan(t *testing.T) {
task.schemaChangeInRetry = 2
task.runDoScanForTest(1, "table 'test.t1' meta changed, should abort current job: [schema:1146]Table 'test.t1' doesn't exist")
}

// TestScanTaskRetryWithLimit runs the mock scan task twice with the second SQL
// configured to fail scanTaskExecuteSQLMaxRetry+1 times, once with the default
// mock error and once with the resource-group limiter error.
// NOTE(review): runDoScanForTest presumably takes (expected number of
// successfully scanned SQLs, expected error message) — confirm against its
// definition.
func TestScanTaskRetryWithLimit(t *testing.T) {
	task := newMockScanTask(t, 3)

	// Generic failure: the scan is expected to end with "mockErr".
	task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry + 1
	task.runDoScanForTest(1, "mockErr")

	// Limiter failure: same number of failed attempts, but the scan is
	// expected to finish all 3 SQLs without an error.
	task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry + 1
	// errors.New instead of fmt.Errorf: the message is a constant with no format verbs.
	task.sqlErr[1] = errors.New("[resource group controller] limiter has no enough token or needs wait too long")
	task.runDoScanForTest(3, "")
}
43 changes: 10 additions & 33 deletions ttl/ttlworker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@ package ttlworker

import (
"context"
"fmt"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

// The following two functions are using `sqlexec.SQLExecutor` to represent session
Expand Down Expand Up @@ -85,21 +81,12 @@ func getSession(pool sessionPool) (session.Session, error) {
originalRetryLimit := sctx.GetSessionVars().RetryLimit
originalEnable1PC := sctx.GetSessionVars().Enable1PC
originalEnableAsyncCommit := sctx.GetSessionVars().EnableAsyncCommit
originalResourceGroupName := sctx.GetSessionVars().ResourceGroupName
se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
if err != nil {
logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
}

if !originalEnable1PC {
_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_1pc=OFF")
terror.Log(err)
}

if !originalEnableAsyncCommit {
_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_async_commit=OFF")
terror.Log(err)
}
sctx.GetSessionVars().RetryLimit = originalRetryLimit
sctx.GetSessionVars().Enable1PC = originalEnable1PC
sctx.GetSessionVars().EnableAsyncCommit = originalEnableAsyncCommit
sctx.GetSessionVars().ResourceGroupName = originalResourceGroupName

DetachStatsCollector(exec)

Expand All @@ -109,32 +96,22 @@ func getSession(pool sessionPool) (session.Session, error) {
exec = AttachStatsCollector(exec)

// store and set the retry limit to 0
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
if err != nil {
se.Close()
return nil, err
}
sctx.GetSessionVars().RetryLimit = 0

// set enable 1pc to ON
_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_1pc=ON")
if err != nil {
se.Close()
return nil, err
}
sctx.GetSessionVars().Enable1PC = true

// set enable async commit to ON
_, err = se.ExecuteSQL(context.Background(), "set tidb_enable_async_commit=ON")
if err != nil {
se.Close()
return nil, err
}
sctx.GetSessionVars().EnableAsyncCommit = true

// Force rollback the session to guarantee the session is not in any explicit transaction
if _, err = se.ExecuteSQL(context.Background(), "ROLLBACK"); err != nil {
se.Close()
return nil, err
}

sctx.GetSessionVars().ResourceGroupName = "tidb_ttl"

return se, nil
}

Expand Down