@@ -20,13 +20,15 @@ import (
20
20
"context"
21
21
"encoding/hex"
22
22
"encoding/json"
23
+ goerrors "errors"
23
24
"fmt"
24
25
"os"
25
26
"slices"
26
27
"strings"
27
28
"sync/atomic"
28
29
"time"
29
30
31
+ "github.com/docker/go-units"
30
32
"github.com/pingcap/errors"
31
33
"github.com/pingcap/failpoint"
32
34
"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -36,6 +38,7 @@ import (
36
38
"github.com/pingcap/tidb/pkg/ddl/logutil"
37
39
"github.com/pingcap/tidb/pkg/ddl/notifier"
38
40
sess "github.com/pingcap/tidb/pkg/ddl/session"
41
+ "github.com/pingcap/tidb/pkg/ddl/systable"
39
42
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
40
43
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
41
44
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
@@ -1523,7 +1526,7 @@ func runReorgJobAndHandleErr(
1523
1526
func () {
1524
1527
addIndexErr = dbterror .ErrCancelledDDLJob .GenWithStack ("add table `%v` index `%v` panic" , tbl .Meta ().Name , allIndexInfos [0 ].Name )
1525
1528
}, false )
1526
- return w .addTableIndex (jobCtx . stepCtx , tbl , reorgInfo )
1529
+ return w .addTableIndex (jobCtx , tbl , reorgInfo )
1527
1530
})
1528
1531
if err != nil {
1529
1532
if dbterror .ErrPausedDDLJob .Equal (err ) {
@@ -2386,14 +2389,15 @@ func (w *worker) addPhysicalTableIndex(
2386
2389
2387
2390
// addTableIndex handles the add index reorganization state for a table.
2388
2391
func (w * worker ) addTableIndex (
2389
- ctx context. Context ,
2392
+ jobCtx * jobContext ,
2390
2393
t table.Table ,
2391
2394
reorgInfo * reorgInfo ,
2392
2395
) error {
2396
+ ctx := jobCtx .stepCtx
2393
2397
// TODO: Support typeAddIndexMergeTmpWorker.
2394
2398
if reorgInfo .ReorgMeta .IsDistReorg && ! reorgInfo .mergingTmpIdx {
2395
2399
if reorgInfo .ReorgMeta .ReorgTp == model .ReorgTypeLitMerge {
2396
- err := w .executeDistTask (ctx , t , reorgInfo )
2400
+ err := w .executeDistTask (jobCtx , t , reorgInfo )
2397
2401
if err != nil {
2398
2402
return err
2399
2403
}
@@ -2484,11 +2488,12 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
2484
2488
return nil
2485
2489
}
2486
2490
2487
- func (w * worker ) executeDistTask (stepCtx context. Context , t table.Table , reorgInfo * reorgInfo ) error {
2491
+ func (w * worker ) executeDistTask (jobCtx * jobContext , t table.Table , reorgInfo * reorgInfo ) error {
2488
2492
if reorgInfo .mergingTmpIdx {
2489
2493
return errors .New ("do not support merge index" )
2490
2494
}
2491
2495
2496
+ stepCtx := jobCtx .stepCtx
2492
2497
taskType := proto .Backfill
2493
2498
taskKey := fmt .Sprintf ("ddl/%s/%d" , taskType , reorgInfo .Job .ID )
2494
2499
g , ctx := errgroup .WithContext (w .workCtx )
@@ -2511,19 +2516,32 @@ func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgIn
2511
2516
return err
2512
2517
}
2513
2518
task , err := taskManager .GetTaskByKeyWithHistory (w .workCtx , taskKey )
2514
- if err != nil && err != storage .ErrTaskNotFound {
2519
+ if err != nil && ! goerrors . Is ( err , storage .ErrTaskNotFound ) {
2515
2520
return err
2516
2521
}
2522
+
2523
+ var (
2524
+ taskID int64
2525
+ lastConcurrency , lastBatchSize , lastMaxWriteSpeed int
2526
+ )
2517
2527
if task != nil {
2518
2528
// It's possible that the task state is succeed but the ddl job is paused.
2519
- // When task in succeed state, we can skip the dist task execution/scheduing process.
2529
+ // When task in succeed state, we can skip the dist task execution/scheduling process.
2520
2530
if task .State == proto .TaskStateSucceed {
2521
2531
w .updateDistTaskRowCount (taskKey , reorgInfo .Job .ID )
2522
2532
logutil .DDLLogger ().Info (
2523
2533
"task succeed, start to resume the ddl job" ,
2524
2534
zap .String ("task-key" , taskKey ))
2525
2535
return nil
2526
2536
}
2537
+ taskMeta := & BackfillTaskMeta {}
2538
+ if err := json .Unmarshal (task .Meta , taskMeta ); err != nil {
2539
+ return errors .Trace (err )
2540
+ }
2541
+ taskID = task .ID
2542
+ lastConcurrency = task .Concurrency
2543
+ lastBatchSize = taskMeta .Job .ReorgMeta .GetBatchSize ()
2544
+ lastMaxWriteSpeed = taskMeta .Job .ReorgMeta .GetMaxWriteSpeed ()
2527
2545
g .Go (func () error {
2528
2546
defer close (done )
2529
2547
backoffer := backoff .NewExponential (scheduler .RetrySQLInterval , 2 , scheduler .RetrySQLMaxInterval )
@@ -2547,11 +2565,10 @@ func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgIn
2547
2565
} else {
2548
2566
job := reorgInfo .Job
2549
2567
workerCntLimit := job .ReorgMeta .GetConcurrency ()
2550
- cpuCount , err := handle . GetCPUCountOfNode (ctx )
2568
+ concurrency , err := adjustConcurrency (ctx , taskManager , workerCntLimit )
2551
2569
if err != nil {
2552
2570
return err
2553
2571
}
2554
- concurrency := min (workerCntLimit , cpuCount )
2555
2572
logutil .DDLLogger ().Info ("adjusted add-index task concurrency" ,
2556
2573
zap .Int ("worker-cnt" , workerCntLimit ), zap .Int ("task-concurrency" , concurrency ),
2557
2574
zap .String ("task-key" , taskKey ))
@@ -2569,9 +2586,19 @@ func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgIn
2569
2586
return err
2570
2587
}
2571
2588
2589
+ task , err := handle .SubmitTask (ctx , taskKey , taskType , concurrency , reorgInfo .ReorgMeta .TargetScope , metaData )
2590
+ if err != nil {
2591
+ return err
2592
+ }
2593
+
2594
+ taskID = task .ID
2595
+ lastConcurrency = concurrency
2596
+ lastBatchSize = taskMeta .Job .ReorgMeta .GetBatchSize ()
2597
+ lastMaxWriteSpeed = taskMeta .Job .ReorgMeta .GetMaxWriteSpeed ()
2598
+
2572
2599
g .Go (func () error {
2573
2600
defer close (done )
2574
- err := submitAndWaitTask (ctx , taskKey , taskType , concurrency , reorgInfo . ReorgMeta . TargetScope , metaData )
2601
+ err := handle . WaitTaskDoneOrPaused (ctx , task . ID )
2575
2602
failpoint .InjectCall ("pauseAfterDistTaskFinished" )
2576
2603
if err := w .isReorgRunnable (stepCtx , true ); err != nil {
2577
2604
if dbterror .ErrPausedDDLJob .Equal (err ) {
@@ -2616,10 +2643,125 @@ func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgIn
2616
2643
}
2617
2644
}
2618
2645
})
2646
+
2647
+ g .Go (func () error {
2648
+ modifyTaskParamLoop (ctx , jobCtx .sysTblMgr , taskManager , done ,
2649
+ reorgInfo .Job .ID , taskID , lastConcurrency , lastBatchSize , lastMaxWriteSpeed )
2650
+ return nil
2651
+ })
2652
+
2619
2653
err = g .Wait ()
2620
2654
return err
2621
2655
}
2622
2656
2657
+ // Note: we can achieve the same effect by calling ModifyTaskByID directly inside
2658
+ // the process of 'ADMIN ALTER DDL JOB xxx', so we can eliminate the goroutine,
2659
+ // but if the task hasn't been created we need to make sure the task is created
2660
+ // with config after ALTER DDL JOB is executed. A possible solution is to make
2661
+ // the DXF task submission and 'ADMIN ALTER DDL JOB xxx' txn conflict with each
2662
+ // other when they overlap in time, by modify the job at the same time when submit
2663
+ // task, as we are using optimistic txn. But this will cause WRITE CONFLICT with
2664
+ // outer txn in transitOneJobStep.
2665
+ func modifyTaskParamLoop (
2666
+ ctx context.Context ,
2667
+ sysTblMgr systable.Manager ,
2668
+ taskManager storage.Manager ,
2669
+ done chan struct {},
2670
+ jobID , taskID int64 ,
2671
+ lastConcurrency , lastBatchSize , lastMaxWriteSpeed int ,
2672
+ ) {
2673
+ logger := logutil .DDLLogger ().With (zap .Int64 ("jobId" , jobID ), zap .Int64 ("taskId" , taskID ))
2674
+ ticker := time .NewTicker (UpdateDDLJobReorgCfgInterval )
2675
+ defer ticker .Stop ()
2676
+ for {
2677
+ select {
2678
+ case <- done :
2679
+ return
2680
+ case <- ticker .C :
2681
+ }
2682
+
2683
+ latestJob , err := sysTblMgr .GetJobByID (ctx , jobID )
2684
+ if err != nil {
2685
+ if goerrors .Is (err , systable .ErrNotFound ) {
2686
+ logger .Info ("job not found, might already finished" )
2687
+ return
2688
+ }
2689
+ logger .Error ("get job failed, will retry later" , zap .Error (err ))
2690
+ continue
2691
+ }
2692
+
2693
+ modifies := make ([]proto.Modification , 0 , 3 )
2694
+ workerCntLimit := latestJob .ReorgMeta .GetConcurrency ()
2695
+ concurrency , err := adjustConcurrency (ctx , taskManager , workerCntLimit )
2696
+ if err != nil {
2697
+ logger .Error ("adjust concurrency failed" , zap .Error (err ))
2698
+ continue
2699
+ }
2700
+ if concurrency != lastConcurrency {
2701
+ modifies = append (modifies , proto.Modification {
2702
+ Type : proto .ModifyConcurrency ,
2703
+ To : int64 (concurrency ),
2704
+ })
2705
+ }
2706
+ batchSize := latestJob .ReorgMeta .GetBatchSize ()
2707
+ if batchSize != lastBatchSize {
2708
+ modifies = append (modifies , proto.Modification {
2709
+ Type : proto .ModifyBatchSize ,
2710
+ To : int64 (batchSize ),
2711
+ })
2712
+ }
2713
+ maxWriteSpeed := latestJob .ReorgMeta .GetMaxWriteSpeed ()
2714
+ if maxWriteSpeed != lastMaxWriteSpeed {
2715
+ modifies = append (modifies , proto.Modification {
2716
+ Type : proto .ModifyMaxWriteSpeed ,
2717
+ To : int64 (maxWriteSpeed ),
2718
+ })
2719
+ }
2720
+ if len (modifies ) == 0 {
2721
+ continue
2722
+ }
2723
+ currTask , err := taskManager .GetTaskByID (ctx , taskID )
2724
+ if err != nil {
2725
+ if goerrors .Is (err , storage .ErrTaskNotFound ) {
2726
+ logger .Info ("task not found, might already finished" )
2727
+ return
2728
+ }
2729
+ logger .Error ("get task failed, will retry later" , zap .Error (err ))
2730
+ continue
2731
+ }
2732
+ if ! currTask .State .CanMoveToModifying () {
2733
+ // user might modify param again while another modify is ongoing.
2734
+ logger .Info ("task state is not suitable for modifying, will retry later" ,
2735
+ zap .String ("state" , currTask .State .String ()))
2736
+ continue
2737
+ }
2738
+ if err = taskManager .ModifyTaskByID (ctx , taskID , & proto.ModifyParam {
2739
+ PrevState : currTask .State ,
2740
+ Modifications : modifies ,
2741
+ }); err != nil {
2742
+ logger .Error ("modify task failed" , zap .Error (err ))
2743
+ continue
2744
+ }
2745
+ logger .Info ("modify task success" ,
2746
+ zap .Int ("oldConcurrency" , lastConcurrency ), zap .Int ("newConcurrency" , concurrency ),
2747
+ zap .Int ("oldBatchSize" , lastBatchSize ), zap .Int ("newBatchSize" , batchSize ),
2748
+ zap .String ("oldMaxWriteSpeed" , units .HumanSize (float64 (lastMaxWriteSpeed ))),
2749
+ zap .String ("newMaxWriteSpeed" , units .HumanSize (float64 (maxWriteSpeed ))),
2750
+ )
2751
+ lastConcurrency = concurrency
2752
+ lastBatchSize = batchSize
2753
+ lastMaxWriteSpeed = maxWriteSpeed
2754
+ }
2755
+ }
2756
+
2757
+ func adjustConcurrency (ctx context.Context , taskMgr storage.Manager , workerCnt int ) (int , error ) {
2758
+ cpuCount , err := taskMgr .GetCPUCountOfNode (ctx )
2759
+ if err != nil {
2760
+ return 0 , err
2761
+ }
2762
+ return min (workerCnt , cpuCount ), nil
2763
+ }
2764
+
2623
2765
// EstimateTableRowSizeForTest is used for test.
2624
2766
var EstimateTableRowSizeForTest = estimateTableRowSize
2625
2767
0 commit comments