diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 8fce0e88a98df..f4e23d6bd586a 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -120,7 +120,6 @@ func NewAddIndexIngestPipeline( sessPool opSessPool, backendCtx ingest.BackendCtx, engines []ingest.Engine, - sessCtx sessionctx.Context, jobID int64, tbl table.PhysicalTable, idxInfos []*model.IndexInfo, @@ -137,7 +136,7 @@ func NewAddIndexIngestPipeline( indexes = append(indexes, index) } reqSrc := getDDLRequestSource(model.ActionAddIndex) - copCtx, err := copr.NewCopContext(tbl.Meta(), idxInfos, sessCtx, reqSrc) + copCtx, err := NewReorgCopContext(store, reorgMeta, tbl.Meta(), idxInfos, reqSrc) if err != nil { return nil, err } @@ -174,7 +173,6 @@ func NewWriteIndexToExternalStoragePipeline( store kv.Storage, extStoreURI string, sessPool opSessPool, - sessCtx sessionctx.Context, jobID, subtaskID int64, tbl table.PhysicalTable, idxInfos []*model.IndexInfo, @@ -193,7 +191,7 @@ func NewWriteIndexToExternalStoragePipeline( indexes = append(indexes, index) } reqSrc := getDDLRequestSource(model.ActionAddIndex) - copCtx, err := copr.NewCopContext(tbl.Meta(), idxInfos, sessCtx, reqSrc) + copCtx, err := NewReorgCopContext(store, reorgMeta, tbl.Meta(), idxInfos, reqSrc) if err != nil { return nil, err } diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index 571f2f298f116..003e23157b879 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/pkg/lightning/backend/external" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" tidblogutil "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" @@ -105,20 +104,15 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta return err } - sessCtx, err := newSessCtx(r.d.store, r.job.ReorgMeta) - if err != nil { - return err - } - opCtx := NewOperatorCtx(ctx, subtask.TaskID, subtask.ID) defer opCtx.Cancel() r.curRowCount.Store(0) var pipe *operator.AsyncPipeline if len(r.cloudStorageURI) > 0 { - pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, sm, subtask.Concurrency) + pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sm, subtask.Concurrency) } else { - pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, sm, subtask.Concurrency) + pipe, err = r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency) } if err != nil { return err @@ -203,7 +197,6 @@ func (r *readIndexExecutor) getTableStartEndKey(sm *BackfillSubTaskMeta) ( func (r *readIndexExecutor) buildLocalStorePipeline( opCtx *OperatorCtx, - sessCtx sessionctx.Context, sm *BackfillSubTaskMeta, concurrency int, ) (*operator.AsyncPipeline, error) { @@ -232,7 +225,6 @@ func (r *readIndexExecutor) buildLocalStorePipeline( d.sessPool, r.bc, engines, - sessCtx, r.job.ID, tbl, r.indexes, @@ -249,7 +241,6 @@ func (r *readIndexExecutor) buildLocalStorePipeline( func (r *readIndexExecutor) buildExternalStorePipeline( opCtx *OperatorCtx, subtaskID int64, - sessCtx sessionctx.Context, sm *BackfillSubTaskMeta, concurrency int, ) (*operator.AsyncPipeline, error) { @@ -278,7 +269,6 @@ func (r *readIndexExecutor) buildExternalStorePipeline( d.store, r.cloudStorageURI, r.d.sessPool, - sessCtx, r.job.ID, subtaskID, tbl, diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index dca7b075c5f39..4766b94afaa4c 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -141,6 +141,29 @@ func (b *txnBackfillScheduler) resultChan() <-chan *backfillResult { return b.resultCh } +// NewReorgCopContext creates a CopContext for reorg +func NewReorgCopContext( + store kv.Storage, + reorgMeta *model.DDLReorgMeta, + tblInfo *model.TableInfo, + allIdxInfo []*model.IndexInfo, + requestSource string, +) (copr.CopContext, error) { + sessCtx, err := newSessCtx(store, reorgMeta) + if err != nil { + return nil, err + } + return copr.NewCopContext( + sessCtx.GetExprCtx(), + sessCtx.GetDistSQLCtx(), + sessCtx.GetSessionVars().StmtCtx.PushDownFlags(), + sessCtx.GetTableCtx(), + tblInfo, + allIdxInfo, + requestSource, + ) +} + func newSessCtx(store kv.Storage, reorgMeta *model.DDLReorgMeta) (sessionctx.Context, error) { sessCtx := newReorgSessCtx(store) if err := initSessCtx(sessCtx, reorgMeta); err != nil { @@ -519,18 +542,13 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e } allIndexInfos = append(allIndexInfos, indexInfo) } - sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta) - if err != nil { - logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err)) - return nil, err - } reqSrc := getDDLRequestSource(model.ActionAddIndex) - copCtx, err := copr.NewCopContext(b.tbl.Meta(), allIndexInfos, sessCtx, reqSrc) + copCtx, err := NewReorgCopContext(ri.d.store, ri.ReorgMeta, b.tbl.Meta(), allIndexInfos, reqSrc) if err != nil { logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err)) return nil, err } - return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh, b.sessPool, b.checkpointMgr), nil + return newCopReqSenderPool(b.ctx, copCtx, ri.d.store, b.taskCh, b.sessPool, b.checkpointMgr), nil } func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) { diff --git a/pkg/ddl/bench_test.go b/pkg/ddl/bench_test.go index e26f751618a2a..19f460ae0eb90 100644 --- a/pkg/ddl/bench_test.go +++ b/pkg/ddl/bench_test.go @@ -43,7 +43,10 @@ func BenchmarkExtractDatumByOffsets(b *testing.B) { require.NoError(b, err) tblInfo := tbl.Meta() idxInfo := tblInfo.FindIndexByName("idx") - copCtx, err := copr.NewCopContextSingleIndex(tblInfo, idxInfo, tk.Session(), "") + sctx := tk.Session() + copCtx, err := ddl.NewReorgCopContext(store, ddl.NewDDLReorgMeta(sctx), tblInfo, []*model.IndexInfo{idxInfo}, "") + require.NoError(b, err) + require.IsType(b, copCtx, &copr.CopContextSingleIndex{}) require.NoError(b, err) startKey := tbl.RecordPrefix() endKey := startKey.PrefixNext() diff --git a/pkg/ddl/copr/BUILD.bazel b/pkg/ddl/copr/BUILD.bazel index 3cb9d8a12584e..c00643a429ee5 100644 --- a/pkg/ddl/copr/BUILD.bazel +++ b/pkg/ddl/copr/BUILD.bazel @@ -6,9 +6,13 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/ddl/copr", visibility = ["//visibility:public"], deps = [ + "//pkg/distsql/context", "//pkg/expression", + "//pkg/expression/context", + "//pkg/infoschema", + "//pkg/infoschema/context", "//pkg/parser/model", - "//pkg/sessionctx", + "//pkg/table", "//pkg/table/tables", "//pkg/types", "@com_github_pingcap_errors//:errors", diff --git a/pkg/ddl/copr/copr_ctx.go b/pkg/ddl/copr/copr_ctx.go index 87f1b43adea6f..3a468b86f060a 100644 --- a/pkg/ddl/copr/copr_ctx.go +++ b/pkg/ddl/copr/copr_ctx.go @@ -16,9 +16,14 @@ package copr import ( "github.com/pingcap/errors" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/expression" + exprctx "github.com/pingcap/tidb/pkg/expression/context" + // make sure mock.MockInfoschema is initialized to make sure the test pass + _ "github.com/pingcap/tidb/pkg/infoschema" + infoschema "github.com/pingcap/tidb/pkg/infoschema/context" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/types" ) @@ -35,7 +40,11 @@ type CopContext interface { type CopContextBase struct { TableInfo *model.TableInfo PrimaryKeyInfo *model.IndexInfo - SessionContext sessionctx.Context + ExprCtx exprctx.BuildContext + DistSQLCtx *distsqlctx.DistSQLContext + PushDownFlags uint64 + InfoSchema infoschema.MetaOnlyInfoSchema + TableCtx table.MutateContext RequestSource string @@ -66,9 +75,12 @@ type CopContextMultiIndex struct { // NewCopContextBase creates a CopContextBase. func NewCopContextBase( + exprCtx exprctx.BuildContext, + distSQLCtx *distsqlctx.DistSQLContext, + pushDownFlags uint64, + tableCtx table.MutateContext, tblInfo *model.TableInfo, idxCols []*model.IndexColumn, - sessCtx sessionctx.Context, requestSource string, ) (*CopContextBase, error) { var err error @@ -115,7 +127,7 @@ func NewCopContextBase( handleIDs = []int64{extra.ID} } - expColInfos, _, err := expression.ColumnInfos2ColumnsAndNames(sessCtx.GetExprCtx(), + expColInfos, _, err := expression.ColumnInfos2ColumnsAndNames(exprCtx, model.CIStr{} /* unused */, tblInfo.Name, colInfos, tblInfo) if err != nil { return nil, err @@ -126,7 +138,10 @@ func NewCopContextBase( return &CopContextBase{ TableInfo: tblInfo, PrimaryKeyInfo: primaryIdx, - SessionContext: sessCtx, + ExprCtx: exprCtx, + DistSQLCtx: distSQLCtx, + PushDownFlags: pushDownFlags, + TableCtx: tableCtx, RequestSource: requestSource, ColumnInfos: colInfos, FieldTypes: fieldTps, @@ -139,25 +154,31 @@ func NewCopContextBase( // NewCopContext creates a CopContext. func NewCopContext( + exprCtx exprctx.BuildContext, + distSQLCtx *distsqlctx.DistSQLContext, + pushDownFlags uint64, + tableCtx table.MutateContext, tblInfo *model.TableInfo, allIdxInfo []*model.IndexInfo, - sessCtx sessionctx.Context, requestSource string, ) (CopContext, error) { if len(allIdxInfo) == 1 { - return NewCopContextSingleIndex(tblInfo, allIdxInfo[0], sessCtx, requestSource) + return NewCopContextSingleIndex(exprCtx, distSQLCtx, pushDownFlags, tableCtx, tblInfo, allIdxInfo[0], requestSource) } - return NewCopContextMultiIndex(tblInfo, allIdxInfo, sessCtx, requestSource) + return NewCopContextMultiIndex(exprCtx, distSQLCtx, pushDownFlags, tableCtx, tblInfo, allIdxInfo, requestSource) } // NewCopContextSingleIndex creates a CopContextSingleIndex. func NewCopContextSingleIndex( + exprCtx exprctx.BuildContext, + distSQLCtx *distsqlctx.DistSQLContext, + pushDownFlags uint64, + tableCtx table.MutateContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, - sessCtx sessionctx.Context, requestSource string, ) (*CopContextSingleIndex, error) { - base, err := NewCopContextBase(tblInfo, idxInfo.Columns, sessCtx, requestSource) + base, err := NewCopContextBase(exprCtx, distSQLCtx, pushDownFlags, tableCtx, tblInfo, idxInfo.Columns, requestSource) if err != nil { return nil, err } @@ -186,9 +207,12 @@ func (c *CopContextSingleIndex) IndexInfo(_ int64) *model.IndexInfo { // NewCopContextMultiIndex creates a CopContextMultiIndex. func NewCopContextMultiIndex( + exprCtx exprctx.BuildContext, + distSQLCtx *distsqlctx.DistSQLContext, + pushDownFlags uint64, + tableCtx table.MutateContext, tblInfo *model.TableInfo, allIdxInfo []*model.IndexInfo, - sessCtx sessionctx.Context, requestSource string, ) (*CopContextMultiIndex, error) { approxColLen := 0 @@ -206,7 +230,7 @@ func NewCopContextMultiIndex( } } - base, err := NewCopContextBase(tblInfo, allIdxCols, sessCtx, requestSource) + base, err := NewCopContextBase(exprCtx, distSQLCtx, pushDownFlags, tableCtx, tblInfo, allIdxCols, requestSource) if err != nil { return nil, err } diff --git a/pkg/ddl/copr/copr_ctx_test.go b/pkg/ddl/copr/copr_ctx_test.go index 4d43f14f6de00..9a467d047e456 100644 --- a/pkg/ddl/copr/copr_ctx_test.go +++ b/pkg/ddl/copr/copr_ctx_test.go @@ -104,7 +104,14 @@ func TestNewCopContextSingleIndex(t *testing.T) { }) } - copCtx, err := NewCopContextSingleIndex(mockTableInfo, mockIdxInfo, mock.NewContext(), "") + sctx := mock.NewContext() + copCtx, err := NewCopContextSingleIndex( + sctx.GetExprCtx(), + sctx.GetDistSQLCtx(), + sctx.GetSessionVars().StmtCtx.PushDownFlags(), + sctx.GetTableCtx(), + mockTableInfo, mockIdxInfo, "", + ) require.NoError(t, err) base := copCtx.GetBase() require.Equal(t, "t", base.TableInfo.Name.L) diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 824debcf2d349..877442e981a4a 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -26,13 +26,14 @@ import ( "github.com/pingcap/tidb/pkg/ddl/ingest" sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/distsql" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/expression" + exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" @@ -269,7 +270,7 @@ func (c *copReqSenderPool) recycleChunk(chk *chunk.Chunk) { } func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) { - dagPB, err := buildDAGPB(c.SessionContext, c.TableInfo, c.ColumnInfos) + dagPB, err := buildDAGPB(c.ExprCtx, c.DistSQLCtx, c.PushDownFlags, c.TableInfo, c.ColumnInfos) if err != nil { return nil, err } @@ -280,8 +281,7 @@ func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64, SetStartTS(startTS). SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}). SetKeepOrder(true). - SetFromSessionVars(c.SessionContext.GetDistSQLCtx()). - SetFromInfoSchema(c.SessionContext.GetDomainInfoSchema()). + SetFromSessionVars(c.DistSQLCtx). SetConcurrency(1). Build() kvReq.RequestSource.RequestSourceInternal = true @@ -290,7 +290,7 @@ func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64, if err != nil { return nil, err } - return distsql.Select(ctx, c.SessionContext.GetDistSQLCtx(), kvReq, c.FieldTypes) + return distsql.Select(ctx, c.DistSQLCtx, kvReq, c.FieldTypes) } func fetchTableScanResult( @@ -308,7 +308,7 @@ func fetchTableScanResult( } err = table.FillVirtualColumnValue( copCtx.VirtualColumnsFieldTypes, copCtx.VirtualColumnsOutputOffsets, - copCtx.ExprColumnInfos, copCtx.ColumnInfos, copCtx.SessionContext.GetExprCtx(), chk) + copCtx.ExprColumnInfos, copCtx.ColumnInfos, copCtx.ExprCtx, chk) return false, err } @@ -346,27 +346,26 @@ func getRestoreData(tblInfo *model.TableInfo, targetIdx, pkIdx *model.IndexInfo, return dtToRestored } -func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) { +func buildDAGPB(exprCtx exprctx.BuildContext, distSQLCtx *distsqlctx.DistSQLContext, pushDownFlags uint64, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} - dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location()) - sc := sCtx.GetSessionVars().StmtCtx - dagReq.Flags = sc.PushDownFlags() + dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(exprCtx.GetEvalCtx().Location()) + dagReq.Flags = pushDownFlags for i := range colInfos { dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i)) } - execPB, err := constructTableScanPB(sCtx, tblInfo, colInfos) + execPB, err := constructTableScanPB(exprCtx, tblInfo, colInfos) if err != nil { return nil, err } dagReq.Executors = append(dagReq.Executors, execPB) - distsql.SetEncodeType(sCtx.GetDistSQLCtx(), dagReq) + distsql.SetEncodeType(distSQLCtx, dagReq) return dagReq, nil } -func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) { +func constructTableScanPB(ctx exprctx.BuildContext, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) { tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos) tblScan.TableId = tblInfo.ID - err := tables.SetPBColumnsDefaultValue(sCtx.GetExprCtx(), tblScan.Columns, colInfos) + err := tables.SetPBColumnsDefaultValue(ctx, tblScan.Columns, colInfos) return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err } diff --git a/pkg/ddl/index_cop_test.go b/pkg/ddl/index_cop_test.go index 6b33e3bb5037e..807271ebb3b4a 100644 --- a/pkg/ddl/index_cop_test.go +++ b/pkg/ddl/index_cop_test.go @@ -40,8 +40,11 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) { require.NoError(t, err) tblInfo := tbl.Meta() idxInfo := tblInfo.FindIndexByName(idx) - copCtx, err := copr.NewCopContextSingleIndex(tblInfo, idxInfo, tk.Session(), "") + + sctx := tk.Session() + copCtx, err := ddl.NewReorgCopContext(store, ddl.NewDDLReorgMeta(sctx), tblInfo, []*model.IndexInfo{idxInfo}, "") require.NoError(t, err) + require.IsType(t, copCtx, &copr.CopContextSingleIndex{}) startKey := tbl.RecordPrefix() endKey := startKey.PrefixNext() txn, err := store.Begin() diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index eb7eafaaa625c..130cfc673147c 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -189,7 +189,6 @@ func TestBackfillOperatorPipeline(t *testing.T) { sessPool, mockBackendCtx, []ingest.Engine{mockEngine}, - tk.Session(), 1, // job id tbl.(table.PhysicalTable), []*model.IndexInfo{idxInfo}, @@ -286,7 +285,6 @@ func TestBackfillOperatorPipelineException(t *testing.T) { sessPool, mockBackendCtx, []ingest.Engine{mockEngine}, - tk.Session(), 1, // job id tbl.(table.PhysicalTable), []*model.IndexInfo{idxInfo}, @@ -341,8 +339,10 @@ func prepare(t *testing.T, tk *testkit.TestKit, dom *domain.Domain, regionCnt in tblInfo := tbl.Meta() idxInfo = tblInfo.FindIndexByName("idx") - copCtx, err = copr.NewCopContextSingleIndex(tblInfo, idxInfo, tk.Session(), "") + sctx := tk.Session() + copCtx, err = ddl.NewReorgCopContext(dom.Store(), ddl.NewDDLReorgMeta(sctx), tblInfo, []*model.IndexInfo{idxInfo}, "") require.NoError(t, err) + require.IsType(t, copCtx, &copr.CopContextSingleIndex{}) return tbl, idxInfo, start, end, copCtx }