Skip to content

Commit ea942ff

Browse files
committed
ddl: remove mock.Context usage in addIndexIngestWorker
1 parent 6928519 commit ea942ff

9 files changed

+86
-89
lines changed

pkg/ddl/BUILD.bazel

+1-1
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ go_test(
276276
"//pkg/disttask/framework/storage",
277277
"//pkg/domain",
278278
"//pkg/domain/infosync",
279+
"//pkg/errctx",
279280
"//pkg/errno",
280281
"//pkg/executor",
281282
"//pkg/infoschema",
@@ -296,7 +297,6 @@ go_test(
296297
"//pkg/session",
297298
"//pkg/session/types",
298299
"//pkg/sessionctx",
299-
"//pkg/sessionctx/stmtctx",
300300
"//pkg/sessionctx/variable",
301301
"//pkg/sessiontxn",
302302
"//pkg/store/gcworker",

pkg/ddl/backfilling.go

-19
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"github.com/pingcap/tidb/pkg/util"
4343
"github.com/pingcap/tidb/pkg/util/dbterror"
4444
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
45-
"github.com/pingcap/tidb/pkg/util/timeutil"
4645
"github.com/pingcap/tidb/pkg/util/topsql"
4746
"github.com/prometheus/client_golang/prometheus"
4847
"github.com/tikv/client-go/v2/tikv"
@@ -543,24 +542,6 @@ func makeupDecodeColMap(dbName model.CIStr, t table.Table) (map[int64]decoder.Co
543542
return decodeColMap, nil
544543
}
545544

546-
func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocation) error {
547-
// It is set to SystemLocation to be compatible with nil LocationInfo.
548-
tz := *timeutil.SystemLocation()
549-
if sctx.GetSessionVars().TimeZone == nil {
550-
sctx.GetSessionVars().TimeZone = &tz
551-
} else {
552-
*sctx.GetSessionVars().TimeZone = tz
553-
}
554-
if tzLocation != nil {
555-
loc, err := tzLocation.GetLocation()
556-
if err != nil {
557-
return errors.Trace(err)
558-
}
559-
*sctx.GetSessionVars().TimeZone = *loc
560-
}
561-
return nil
562-
}
563-
564545
var backfillTaskChanSize = 128
565546

566547
// SetBackfillTaskChanSizeForTest is only used for test.

pkg/ddl/backfilling_operators.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -714,10 +714,7 @@ func (w *indexIngestBaseWorker) initSessCtx() {
714714
return
715715
}
716716
w.restore = restoreSessCtx(sessCtx)
717-
if err := initSessCtx(sessCtx,
718-
w.reorgMeta.SQLMode,
719-
w.reorgMeta.Location,
720-
w.reorgMeta.ResourceGroupName); err != nil {
717+
if err := initSessCtx(sessCtx, w.reorgMeta); err != nil {
721718
w.ctx.onError(err)
722719
return
723720
}
@@ -755,7 +752,8 @@ func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nex
755752

756753
oprStartTime := time.Now()
757754
vars := w.se.GetSessionVars()
758-
cnt, lastHandle, err := writeChunkToLocal(w.ctx, w.writers, w.indexes, w.copCtx, vars, rs.Chunk)
755+
sc := vars.StmtCtx
756+
cnt, lastHandle, err := writeChunkToLocal(w.ctx, w.writers, w.indexes, w.copCtx, sc.TimeZone(), sc.ErrCtx(), vars.GetWriteStmtBufs(), rs.Chunk)
759757
if err != nil || cnt == 0 {
760758
return 0, nil, err
761759
}

pkg/ddl/backfilling_read_index.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
105105
return err
106106
}
107107

108-
sessCtx, err := newSessCtx(
109-
r.d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName)
108+
sessCtx, err := newSessCtx(r.d.store, r.job.ReorgMeta)
110109
if err != nil {
111110
return err
112111
}

pkg/ddl/backfilling_scheduler.go

+16-39
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,14 @@ import (
2626
"github.com/pingcap/tidb/pkg/ddl/ingest"
2727
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
2828
ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil"
29-
"github.com/pingcap/tidb/pkg/errctx"
3029
"github.com/pingcap/tidb/pkg/kv"
3130
"github.com/pingcap/tidb/pkg/metrics"
3231
"github.com/pingcap/tidb/pkg/parser/model"
33-
"github.com/pingcap/tidb/pkg/parser/mysql"
3432
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
3533
poolutil "github.com/pingcap/tidb/pkg/resourcemanager/util"
3634
"github.com/pingcap/tidb/pkg/sessionctx"
3735
"github.com/pingcap/tidb/pkg/sessionctx/variable"
3836
"github.com/pingcap/tidb/pkg/table"
39-
"github.com/pingcap/tidb/pkg/types"
4037
"github.com/pingcap/tidb/pkg/util"
4138
"github.com/pingcap/tidb/pkg/util/dbterror"
4239
"github.com/pingcap/tidb/pkg/util/intest"
@@ -135,26 +132,16 @@ func (b *txnBackfillScheduler) resultChan() <-chan *backfillResult {
135132
return b.resultCh
136133
}
137134

138-
func newSessCtx(
139-
store kv.Storage,
140-
sqlMode mysql.SQLMode,
141-
tzLocation *model.TimeZoneLocation,
142-
resourceGroupName string,
143-
) (sessionctx.Context, error) {
135+
func newSessCtx(store kv.Storage, reorgMeta *model.DDLReorgMeta) (sessionctx.Context, error) {
144136
sessCtx := newReorgSessCtx(store)
145-
if err := initSessCtx(sessCtx, sqlMode, tzLocation, resourceGroupName); err != nil {
137+
if err := initSessCtx(sessCtx, reorgMeta); err != nil {
146138
return nil, errors.Trace(err)
147139
}
148140
return sessCtx, nil
149141
}
150142

151143
// initSessCtx initializes the session context. Be careful to the timezone.
152-
func initSessCtx(
153-
sessCtx sessionctx.Context,
154-
sqlMode mysql.SQLMode,
155-
tzLocation *model.TimeZoneLocation,
156-
resGroupName string,
157-
) error {
144+
func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) error {
158145
// Correct the initial timezone.
159146
tz := *time.UTC
160147
sessCtx.GetSessionVars().TimeZone = &tz
@@ -164,25 +151,21 @@ func initSessCtx(
164151
rowFormat := variable.GetDDLReorgRowFormat()
165152
sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != variable.DefTiDBRowFormatV1
166153
// Simulate the sql mode environment in the worker sessionCtx.
154+
sqlMode := reorgMeta.SQLMode
167155
sessCtx.GetSessionVars().SQLMode = sqlMode
168-
if err := setSessCtxLocation(sessCtx, tzLocation); err != nil {
156+
loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
157+
if err != nil {
169158
return errors.Trace(err)
170159
}
171-
sessCtx.GetSessionVars().StmtCtx.SetTimeZone(sessCtx.GetSessionVars().Location())
160+
sessCtx.GetSessionVars().TimeZone = loc
161+
sessCtx.GetSessionVars().StmtCtx.SetTimeZone(loc)
172162

173-
errLevels := sessCtx.GetSessionVars().StmtCtx.ErrLevels()
174-
errLevels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !sqlMode.HasStrictMode())
175-
errLevels[errctx.ErrGroupDividedByZero] =
176-
errctx.ResolveErrLevel(!sqlMode.HasErrorForDivisionByZeroMode(), !sqlMode.HasStrictMode())
163+
errLevels := reorgErrLevelsWithSQLMode(sqlMode)
177164
sessCtx.GetSessionVars().StmtCtx.SetErrLevels(errLevels)
178165

179-
typeFlags := types.StrictFlags.
180-
WithTruncateAsWarning(!sqlMode.HasStrictMode()).
181-
WithIgnoreInvalidDateErr(sqlMode.HasAllowInvalidDatesMode()).
182-
WithIgnoreZeroInDate(!sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode()).
183-
WithCastTimeToYearThroughConcat(true)
166+
typeFlags := reorgTypeFlagsWithSQLMode(sqlMode)
184167
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags)
185-
sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = resGroupName
168+
sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName
186169

187170
// Prevent initializing the mock context in the workers concurrently.
188171
// For details, see https://github.com/pingcap/tidb/issues/40879.
@@ -235,7 +218,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
235218
workerCnt := b.expectedWorkerSize()
236219
// Increase the worker.
237220
for i := len(b.workers); i < workerCnt; i++ {
238-
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
221+
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta)
239222
if err != nil {
240223
return err
241224
}
@@ -472,16 +455,10 @@ func (b *ingestBackfillScheduler) createWorker(
472455
) workerpool.Worker[IndexRecordChunk, workerpool.None] {
473456
reorgInfo := b.reorgInfo
474457
job := reorgInfo.Job
475-
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
476-
if err != nil {
477-
b.sendResult(&backfillResult{err: err})
478-
return nil
479-
}
480-
481458
worker, err := newAddIndexIngestWorker(
482-
b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID,
483-
reorgInfo.SchemaName, indexIDs, b.writerMaxID,
484-
b.copReqSenderPool, sessCtx, b.checkpointMgr)
459+
b.ctx, b.tbl, reorgInfo, engines, b.resultCh, job.ID,
460+
indexIDs, b.writerMaxID,
461+
b.copReqSenderPool, b.checkpointMgr)
485462
if err != nil {
486463
// Return an error only if it is the first worker.
487464
if b.writerMaxID == 0 {
@@ -508,7 +485,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
508485
}
509486
allIndexInfos = append(allIndexInfos, indexInfo)
510487
}
511-
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location, ri.ReorgMeta.ResourceGroupName)
488+
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta)
512489
if err != nil {
513490
logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err))
514491
return nil, err

pkg/ddl/export_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"github.com/ngaut/pools"
2222
"github.com/pingcap/tidb/pkg/ddl/copr"
2323
"github.com/pingcap/tidb/pkg/ddl/internal/session"
24+
"github.com/pingcap/tidb/pkg/errctx"
2425
"github.com/pingcap/tidb/pkg/kv"
25-
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
2626
"github.com/pingcap/tidb/pkg/sessionctx/variable"
2727
"github.com/pingcap/tidb/pkg/table"
2828
"github.com/pingcap/tidb/pkg/types"
@@ -72,7 +72,7 @@ func ConvertRowToHandleAndIndexDatum(
7272
c := copCtx.GetBase()
7373
idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
7474
handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
75-
handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, stmtctx.NewStmtCtxWithTimeZone(time.Local))
75+
handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, time.Local, errctx.StrictNoWarningContext)
7676
return handle, idxData, err
7777
}
7878

pkg/ddl/index.go

+30-17
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
4040
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
4141
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
42+
"github.com/pingcap/tidb/pkg/errctx"
4243
"github.com/pingcap/tidb/pkg/infoschema"
4344
"github.com/pingcap/tidb/pkg/kv"
4445
"github.com/pingcap/tidb/pkg/lightning/common"
@@ -50,7 +51,6 @@ import (
5051
"github.com/pingcap/tidb/pkg/parser/mysql"
5152
"github.com/pingcap/tidb/pkg/parser/terror"
5253
"github.com/pingcap/tidb/pkg/sessionctx"
53-
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
5454
"github.com/pingcap/tidb/pkg/sessionctx/variable"
5555
"github.com/pingcap/tidb/pkg/store/helper"
5656
"github.com/pingcap/tidb/pkg/table"
@@ -61,6 +61,7 @@ import (
6161
"github.com/pingcap/tidb/pkg/util/backoff"
6262
"github.com/pingcap/tidb/pkg/util/chunk"
6363
"github.com/pingcap/tidb/pkg/util/codec"
64+
contextutil "github.com/pingcap/tidb/pkg/util/context"
6465
"github.com/pingcap/tidb/pkg/util/dbterror"
6566
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
6667
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
@@ -1651,7 +1652,9 @@ type addIndexIngestWorker struct {
16511652
ctx context.Context
16521653
d *ddlCtx
16531654
metricCounter prometheus.Counter
1654-
sessCtx sessionctx.Context
1655+
writeLoc *time.Location
1656+
writeErrCtx errctx.Context
1657+
writeStmtBufs variable.WriteStmtBufs
16551658

16561659
tbl table.PhysicalTable
16571660
indexes []table.Index
@@ -1666,17 +1669,20 @@ type addIndexIngestWorker struct {
16661669
func newAddIndexIngestWorker(
16671670
ctx context.Context,
16681671
t table.PhysicalTable,
1669-
d *ddlCtx,
1672+
info *reorgInfo,
16701673
engines []ingest.Engine,
16711674
resultCh chan *backfillResult,
16721675
jobID int64,
1673-
schemaName string,
16741676
indexIDs []int64,
16751677
writerID int,
16761678
copReqSenderPool *copReqSenderPool,
1677-
sessCtx sessionctx.Context,
16781679
checkpointMgr *ingest.CheckpointManager,
16791680
) (*addIndexIngestWorker, error) {
1681+
writeLoc, err := reorgTimeZoneWithTzLoc(info.ReorgMeta.Location)
1682+
if err != nil {
1683+
return nil, err
1684+
}
1685+
16801686
indexes := make([]table.Index, 0, len(indexIDs))
16811687
writers := make([]ingest.Writer, 0, len(indexIDs))
16821688
for i, indexID := range indexIDs {
@@ -1690,12 +1696,18 @@ func newAddIndexIngestWorker(
16901696
writers = append(writers, lw)
16911697
}
16921698

1699+
writeErrCtx := errctx.NewContextWithLevels(
1700+
reorgErrLevelsWithSQLMode(info.ReorgMeta.SQLMode),
1701+
contextutil.IgnoreWarn,
1702+
)
1703+
16931704
return &addIndexIngestWorker{
1694-
ctx: ctx,
1695-
d: d,
1696-
sessCtx: sessCtx,
1705+
ctx: ctx,
1706+
d: info.d,
1707+
writeLoc: writeLoc,
1708+
writeErrCtx: writeErrCtx,
16971709
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
1698-
metrics.GenerateReorgLabel("add_idx_rate", schemaName, t.Meta().Name.O)),
1710+
metrics.GenerateReorgLabel("add_idx_rate", info.SchemaName, t.Meta().Name.O)),
16991711
tbl: t,
17001712
indexes: indexes,
17011713
writers: writers,
@@ -1710,9 +1722,8 @@ func newAddIndexIngestWorker(
17101722
func (w *addIndexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
17111723
oprStartTime := time.Now()
17121724
copCtx := w.copReqSenderPool.copCtx
1713-
vars := w.sessCtx.GetSessionVars()
17141725
cnt, lastHandle, err := writeChunkToLocal(
1715-
w.ctx, w.writers, w.indexes, copCtx, vars, rs.Chunk)
1726+
w.ctx, w.writers, w.indexes, copCtx, w.writeLoc, w.writeErrCtx, &w.writeStmtBufs, rs.Chunk)
17161727
if err != nil || cnt == 0 {
17171728
return 0, nil, err
17181729
}
@@ -1727,10 +1738,11 @@ func writeChunkToLocal(
17271738
writers []ingest.Writer,
17281739
indexes []table.Index,
17291740
copCtx copr.CopContext,
1730-
vars *variable.SessionVars,
1741+
loc *time.Location,
1742+
errCtx errctx.Context,
1743+
writeStmtBufs *variable.WriteStmtBufs,
17311744
copChunk *chunk.Chunk,
17321745
) (int, kv.Handle, error) {
1733-
sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs()
17341746
iter := chunk.NewIterator4Chunk(copChunk)
17351747
c := copCtx.GetBase()
17361748

@@ -1772,7 +1784,7 @@ func writeChunkToLocal(
17721784
restoreDataBuf[i] = *datum.Clone()
17731785
}
17741786
}
1775-
h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, sCtx)
1787+
h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, loc, errCtx)
17761788
if err != nil {
17771789
return 0, nil, errors.Trace(err)
17781790
}
@@ -1785,7 +1797,7 @@ func writeChunkToLocal(
17851797
if needRestoreForIndexes[i] {
17861798
rsData = getRestoreData(c.TableInfo, copCtx.IndexInfo(idxID), c.PrimaryKeyInfo, restoreDataBuf)
17871799
}
1788-
err = writeOneKVToLocal(ctx, writers[i], index, sCtx, writeBufs, idxData, rsData, h)
1800+
err = writeOneKVToLocal(ctx, writers[i], index, loc, errCtx, writeStmtBufs, idxData, rsData, h)
17891801
if err != nil {
17901802
return 0, nil, errors.Trace(err)
17911803
}
@@ -1811,12 +1823,13 @@ func writeOneKVToLocal(
18111823
ctx context.Context,
18121824
writer ingest.Writer,
18131825
index table.Index,
1814-
sCtx *stmtctx.StatementContext,
1826+
loc *time.Location,
1827+
errCtx errctx.Context,
18151828
writeBufs *variable.WriteStmtBufs,
18161829
idxDt, rsData []types.Datum,
18171830
handle kv.Handle,
18181831
) error {
1819-
iter := index.GenIndexKVIter(sCtx.ErrCtx(), sCtx.TimeZone(), idxDt, handle, rsData)
1832+
iter := index.GenIndexKVIter(errCtx, loc, idxDt, handle, rsData)
18201833
for iter.Valid() {
18211834
key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf, writeBufs.RowValBuf)
18221835
if err != nil {

pkg/ddl/index_cop.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ import (
2626
"github.com/pingcap/tidb/pkg/ddl/ingest"
2727
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
2828
"github.com/pingcap/tidb/pkg/distsql"
29+
"github.com/pingcap/tidb/pkg/errctx"
2930
"github.com/pingcap/tidb/pkg/expression"
3031
"github.com/pingcap/tidb/pkg/kv"
3132
"github.com/pingcap/tidb/pkg/metrics"
3233
"github.com/pingcap/tidb/pkg/parser/model"
3334
"github.com/pingcap/tidb/pkg/parser/terror"
3435
"github.com/pingcap/tidb/pkg/sessionctx"
35-
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
3636
"github.com/pingcap/tidb/pkg/sessionctx/variable"
3737
"github.com/pingcap/tidb/pkg/table"
3838
"github.com/pingcap/tidb/pkg/table/tables"
@@ -379,11 +379,11 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C
379379
}
380380

381381
func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
382-
pkInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) {
382+
pkInfo *model.IndexInfo, loc *time.Location, errCtx errctx.Context) (kv.Handle, error) {
383383
if tblInfo.IsCommonHandle {
384384
tablecodec.TruncateIndexValues(tblInfo, pkInfo, pkDts)
385-
handleBytes, err := codec.EncodeKey(stmtCtx.TimeZone(), nil, pkDts...)
386-
err = stmtCtx.HandleError(err)
385+
handleBytes, err := codec.EncodeKey(loc, nil, pkDts...)
386+
err = errCtx.HandleError(err)
387387
if err != nil {
388388
return nil, err
389389
}

0 commit comments

Comments
 (0)