Commit 4bf624b

ddl: introduce newDefaultReorgDistSQLCtx to replace mock context in DDL reorg (#53467)

ref #53388
lcwangchao authored May 28, 2024
1 parent bf704fd commit 4bf624b
Showing 4 changed files with 92 additions and 6 deletions.
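
The new helper builds a distsqlctx.DistSQLContext with reorg-appropriate defaults (chunk RPC enabled, UTC time zone, a session-level memory tracker, default TiFlash settings) directly from a kv.Client, so the desc table scan in reorg.go no longer needs a mock session context. Below is a minimal illustrative sketch of the resulting call pattern; it is not part of this commit, it assumes it lives in package ddl, and buildReorgDistSQLCtxSketch is a hypothetical name, while the other identifiers come from the diff itself.

// Illustrative sketch only, not part of this commit; assumes package ddl.
func buildReorgDistSQLCtxSketch(store kv.Storage) *distsqlctx.DistSQLContext {
    // Before: a mock-backed session context was created just to reach its DistSQL context:
    //   sctx := newReorgSessCtx(store)
    //   dctx := sctx.GetDistSQLCtx()
    // After: the DistSQL context is built directly from the store's kv.Client.
    return newDefaultReorgDistSQLCtx(store.GetClient())
}

The same context value then flows into distsql.SetEncodeType, builder.SetHandleRanges, and distsql.Select, as the reorg.go hunks below show.
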
4 changes: 4 additions & 0 deletions pkg/ddl/BUILD.bazel
@@ -78,6 +78,7 @@ go_library(
"//pkg/ddl/syncer",
"//pkg/ddl/util",
"//pkg/distsql",
"//pkg/distsql/context",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
@@ -143,13 +144,15 @@ go_library(
"//pkg/util/dbterror/exeerrors",
"//pkg/util/domainutil",
"//pkg/util/engine",
"//pkg/util/execdetails",
"//pkg/util/filter",
"//pkg/util/gcutil",
"//pkg/util/generic",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/memory",
"//pkg/util/mock",
"//pkg/util/ranger",
"//pkg/util/resourcegrouptag",
@@ -162,6 +165,7 @@ go_library(
"//pkg/util/sqlkiller",
"//pkg/util/stringutil",
"//pkg/util/syncutil",
"//pkg/util/tiflash",
"//pkg/util/timeutil",
"//pkg/util/topsql",
"//pkg/util/topsql/state",
33 changes: 33 additions & 0 deletions pkg/ddl/backfilling_scheduler.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
@@ -34,15 +35,22 @@ import (
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
poolutil "github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/mock"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/tiflash"
tikvstore "github.com/tikv/client-go/v2/kv"
"go.uber.org/zap"
)

@@ -148,6 +156,31 @@ func newSessCtx(
    return sessCtx, nil
}

func newDefaultReorgDistSQLCtx(kvClient kv.Client) *distsqlctx.DistSQLContext {
    warnHandler := contextutil.NewStaticWarnHandler(0)
    var sqlKiller sqlkiller.SQLKiller
    var execDetails execdetails.SyncExecDetails
    return &distsqlctx.DistSQLContext{
        WarnHandler: warnHandler,
        Client: kvClient,
        EnableChunkRPC: true,
        EnabledRateLimitAction: variable.DefTiDBEnableRateLimitAction,
        KVVars: tikvstore.NewVariables(&sqlKiller.Signal),
        SessionMemTracker: memory.NewTracker(memory.LabelForSession, -1),
        Location: time.UTC,
        SQLKiller: &sqlKiller,
        ErrCtx: errctx.NewContextWithLevels(stmtctx.DefaultStmtErrLevels, warnHandler),
        TiFlashReplicaRead: tiflash.GetTiFlashReplicaReadByStr(variable.DefTiFlashReplicaRead),
        TiFlashMaxThreads: variable.DefTiFlashMaxThreads,
        TiFlashMaxBytesBeforeExternalJoin: variable.DefTiFlashMaxBytesBeforeExternalJoin,
        TiFlashMaxBytesBeforeExternalGroupBy: variable.DefTiFlashMaxBytesBeforeExternalGroupBy,
        TiFlashMaxBytesBeforeExternalSort: variable.DefTiFlashMaxBytesBeforeExternalSort,
        TiFlashMaxQueryMemoryPerNode: variable.DefTiFlashMemQuotaQueryPerNode,
        TiFlashQuerySpillRatio: variable.DefTiFlashQuerySpillRatio,
        ExecDetails: &execDetails,
    }
}

// initSessCtx initializes the session context. Be careful to the timezone.
func initSessCtx(
sessCtx sessionctx.Context,
Expand Down
48 changes: 48 additions & 0 deletions pkg/ddl/backfilling_test.go
@@ -20,10 +20,13 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
)

@@ -144,3 +147,48 @@ func TestReorgExprContext(t *testing.T) {
    require.Equal(t, evalCtx1.GetDefaultWeekFormatMode(), evalCtx.GetDefaultWeekFormatMode())
    require.Equal(t, evalCtx1.GetDivPrecisionIncrement(), evalCtx.GetDivPrecisionIncrement())
}

type mockStorage struct {
    kv.Storage
    client kv.Client
}

func (s *mockStorage) GetClient() kv.Client {
    return s.client
}

// TestReorgDistSQLCtx is used during the refactor to make sure newDefaultReorgDistSQLCtx() is
// compatible with newReorgSessCtx(store).GetDistSQLCtx(), so that it is safe to replace the `mock.Context` usage.
// After the refactor, TestReorgDistSQLCtx can be removed.
func TestReorgDistSQLCtx(t *testing.T) {
    store := &mockStorage{client: &mock.Client{}}
    ctx1 := newReorgSessCtx(store).GetDistSQLCtx()
    ctx2 := newDefaultReorgDistSQLCtx(store.client)

    // set the same warnHandler to make two contexts equal
    ctx1.WarnHandler = ctx2.WarnHandler

    // set the same KVVars to make two contexts equal
    require.Equal(t, uint32(0), *ctx1.KVVars.Killed)
    require.Equal(t, uint32(0), *ctx2.KVVars.Killed)
    ctx1.KVVars.Killed = ctx2.KVVars.Killed

    // set the same SessionMemTracker to make two contexts equal
    require.Equal(t, ctx1.SessionMemTracker.Label(), ctx2.SessionMemTracker.Label())
    require.Equal(t, ctx1.SessionMemTracker.GetBytesLimit(), ctx2.SessionMemTracker.GetBytesLimit())
    ctx1.SessionMemTracker = ctx2.SessionMemTracker

    // set the same ErrCtx to make two contexts equal
    require.Equal(t, ctx1.ErrCtx.LevelMap(), ctx2.ErrCtx.LevelMap())
    require.Equal(t, 0, ctx2.WarnHandler.(contextutil.WarnHandler).WarningCount())
    ctx2.ErrCtx.AppendWarning(errors.New("warn"))
    require.Equal(t, 1, ctx2.WarnHandler.(contextutil.WarnHandler).WarningCount())
    ctx1.ErrCtx = ctx2.ErrCtx

    // set the same ExecDetails to make two contexts equal
    require.NotNil(t, ctx1.ExecDetails)
    require.NotNil(t, ctx2.ExecDetails)
    ctx1.ExecDetails = ctx2.ExecDetails

    require.Equal(t, ctx1, ctx2)
}
13 changes: 7 additions & 6 deletions pkg/ddl/reorg.go
@@ -29,6 +29,7 @@ import (
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/distsql"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/expression/contextstatic"
"github.com/pingcap/tidb/pkg/kv"
@@ -510,7 +511,7 @@ func constructLimitPB(count uint64) *tipb.Executor {
return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}
}

func buildDescTableScanDAG(ctx sessionctx.Context, tbl table.PhysicalTable, handleCols []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
func buildDescTableScanDAG(distSQLCtx *distsqlctx.DistSQLContext, tbl table.PhysicalTable, handleCols []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
_, timeZoneOffset := time.Now().In(time.UTC).Zone()
dagReq.TimeZoneOffset = int64(timeZoneOffset)
@@ -522,7 +523,7 @@ func buildDescTableScanDAG(ctx sessionctx.Context, tbl table.PhysicalTable, hand
tblScanExec := constructDescTableScanPB(tbl.GetPhysicalID(), tbl.Meta(), handleCols)
dagReq.Executors = append(dagReq.Executors, tblScanExec)
dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit))
distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq)
distsql.SetEncodeType(distSQLCtx, dagReq)
return dagReq, nil
}

@@ -537,8 +538,8 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {
// buildDescTableScan builds a desc table scan upon tblInfo.
func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.PhysicalTable,
handleCols []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
sctx := newReorgSessCtx(dc.store)
dagPB, err := buildDescTableScanDAG(sctx, tbl, handleCols, limit)
distSQLCtx := newDefaultReorgDistSQLCtx(dc.store.GetClient())
dagPB, err := buildDescTableScanDAG(distSQLCtx, tbl, handleCols, limit)
if err != nil {
return nil, errors.Trace(err)
}
@@ -550,7 +551,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.
} else {
ranges = ranger.FullIntRange(false)
}
builder = b.SetHandleRanges(sctx.GetDistSQLCtx(), tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges)
builder = b.SetHandleRanges(distSQLCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges)
builder.SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeepOrder(true).
@@ -569,7 +570,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.
return nil, errors.Trace(err)
}

result, err := distsql.Select(ctx.ddlJobCtx, sctx.GetDistSQLCtx(), kvReq, getColumnsTypes(handleCols))
result, err := distsql.Select(ctx.ddlJobCtx, distSQLCtx, kvReq, getColumnsTypes(handleCols))
if err != nil {
return nil, errors.Trace(err)
}