Skip to content

Commit

Permalink
disttask: use merge sort meta (#47599)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
ywqzzy authored Oct 16, 2023
1 parent 34bc3a1 commit eafb78a
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 48 deletions.
62 changes: 53 additions & 9 deletions pkg/ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -85,17 +86,29 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch(
return nil, err
}

// StepOne: read index and write to backend.
// StepTwo: do merge sort to reduce the global sort reader reads files count. Only used in global sort.
// StepThree: ingest data.
// TODO: use planner.
switch step {
case proto.StepOne:
if tblInfo.Partition != nil {
return generatePartitionPlan(tblInfo)
}
return generateNonPartitionPlan(h.d, tblInfo, job)
case proto.StepTwo:
gTaskMeta.UseMergeSort = true
if err := updateMeta(gTask, &gTaskMeta); err != nil {
return nil, err
}
return generateMergePlan(taskHandle, gTask)
case proto.StepThree:
if useExtStore {
return generateMergeSortPlan(ctx, taskHandle, gTask, job.ID, gTaskMeta.CloudStorageURI)
prevStep := proto.StepOne
if gTaskMeta.UseMergeSort {
prevStep = proto.StepTwo
}
return generateGlobalSortIngestPlan(ctx, taskHandle, gTask, job.ID, gTaskMeta.CloudStorageURI, prevStep)
}
if tblInfo.Partition != nil {
return nil, nil
Expand All @@ -106,6 +119,15 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch(
}
}

// updateMeta serializes taskMeta back into gTask.Meta so that the mutated
// backfill meta (e.g. the UseMergeSort flag) is persisted with the task.
// A marshalling failure is returned wrapped with a stack trace.
func updateMeta(gTask *proto.Task, taskMeta *BackfillGlobalMeta) error {
	raw, marshalErr := json.Marshal(taskMeta)
	if marshalErr != nil {
		return errors.Trace(marshalErr)
	}
	gTask.Meta = raw
	return nil
}

func (*backfillingDispatcherExt) GetNextStep(
taskHandle dispatcher.TaskHandle,
task *proto.Task,
Expand All @@ -118,11 +140,24 @@ func (*backfillingDispatcherExt) GetNextStep(
if taskHandle == nil {
return proto.StepThree
}

var meta BackfillGlobalMeta
if err := json.Unmarshal(task.Meta, &meta); err != nil {
logutil.BgLogger().Info(
"unmarshal task meta met error",
zap.String("category", "ddl"),
zap.Error(err))
}
// don't need merge step in local backend.
if len(meta.CloudStorageURI) == 0 {
return proto.StepThree
}

// if data files overlaps too much, we need a merge step.
subTaskMetas, err := taskHandle.GetPreviousSubtaskMetas(task.ID, proto.StepInit)
if err != nil {
// TODO(lance6716): should we return error?
return proto.StepTwo
return proto.StepThree
}
multiStats := make([]external.MultipleFilesStat, 0, 100)
for _, bs := range subTaskMetas {
Expand All @@ -134,10 +169,10 @@ func (*backfillingDispatcherExt) GetNextStep(
}
multiStats = append(multiStats, subtask.MultipleFilesStats...)
}
if external.GetMaxOverlappingTotal(multiStats) > external.MergeSortOverlapThreshold {
return proto.StepTwo
if skipMergeSort(multiStats) {
return proto.StepThree
}
return proto.StepThree
return proto.StepTwo
case proto.StepTwo:
return proto.StepThree
default:
Expand All @@ -146,6 +181,13 @@ func (*backfillingDispatcherExt) GetNextStep(
}
}

// skipMergeSort reports whether the dedicated merge-sort step can be skipped:
// when the maximum overlapping total of the sorted data files is within
// external.MergeSortOverlapThreshold, the ingest step can read them directly.
// The "forceMergeSort" failpoint forces the merge step regardless (tests only).
func skipMergeSort(stats []external.MultipleFilesStat) bool {
	failpoint.Inject("forceMergeSort", func() {
		failpoint.Return(false)
	})
	maxOverlapping := external.GetMaxOverlappingTotal(stats)
	return maxOverlapping <= external.MergeSortOverlapThreshold
}

// OnErrStage generate error handling stage's plan.
func (*backfillingDispatcherExt) OnErrStage(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task, receiveErr []error) (meta []byte, err error) {
// We do not need extra meta info when rolling back
Expand Down Expand Up @@ -312,14 +354,15 @@ func generateIngestTaskPlan(
return subTaskMetas, nil
}

func generateMergeSortPlan(
func generateGlobalSortIngestPlan(
ctx context.Context,
taskHandle dispatcher.TaskHandle,
task *proto.Task,
jobID int64,
cloudStorageURI string,
step proto.Step,
) ([][]byte, error) {
firstKey, lastKey, totalSize, dataFiles, statFiles, err := getSummaryFromLastStep(taskHandle, task.ID)
firstKey, lastKey, totalSize, dataFiles, statFiles, err := getSummaryFromLastStep(taskHandle, task.ID, step)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -385,7 +428,7 @@ func generateMergePlan(
taskHandle dispatcher.TaskHandle,
task *proto.Task,
) ([][]byte, error) {
_, _, _, dataFiles, _, err := getSummaryFromLastStep(taskHandle, task.ID)
_, _, _, dataFiles, _, err := getSummaryFromLastStep(taskHandle, task.ID, proto.StepOne)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -456,8 +499,9 @@ func getRangeSplitter(
func getSummaryFromLastStep(
taskHandle dispatcher.TaskHandle,
gTaskID int64,
step proto.Step,
) (min, max kv.Key, totalKVSize uint64, dataFiles, statFiles []string, err error) {
subTaskMetas, err := taskHandle.GetPreviousSubtaskMetas(gTaskID, proto.StepOne)
subTaskMetas, err := taskHandle.GetPreviousSubtaskMetas(gTaskID, step)
if err != nil {
return nil, nil, 0, nil, nil, errors.Trace(err)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type BackfillGlobalMeta struct {
EleTypeKey []byte `json:"ele_type_key"`

CloudStorageURI string `json:"cloud_storage_uri"`
// UseMergeSort indicate whether the backfilling task use merge sort step for global sort.
// Merge Sort step aims to support more data.
UseMergeSort bool `json:"use_merge_sort"`
}

// BackfillSubTaskMeta is the sub-task meta for backfilling index.
Expand Down
44 changes: 28 additions & 16 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,27 @@ import (
"sync"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
)

type mergeSortExecutor struct {
jobID int64
index *model.IndexInfo
ptbl table.PhysicalTable
bc ingest.BackendCtx
cloudStoreURI string
dataFiles []string
statFiles []string
mu sync.Mutex
jobID int64
index *model.IndexInfo
ptbl table.PhysicalTable
bc ingest.BackendCtx
cloudStoreURI string
mu sync.Mutex
subtaskSortedKVMeta *external.SortedKVMeta
}

func newMergeSortExecutor(
Expand Down Expand Up @@ -78,21 +79,21 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
}

m.mu.Lock()
m.subtaskSortedKVMeta = &external.SortedKVMeta{}
onClose := func(summary *external.WriterSummary) {
for _, f := range summary.MultipleFilesStats {
for _, filename := range f.Filenames {
m.dataFiles = append(m.dataFiles, filename[0])
m.statFiles = append(m.statFiles, filename[1])
}
}
m.subtaskSortedKVMeta.MergeSummary(summary)
m.mu.Unlock()
}

storeBackend, err := storage.ParseBackend(m.cloudStoreURI, nil)
if err != nil {
return err
}
store, err := storage.New(ctx, storeBackend, nil)
opt := &storage.ExternalStorageOptions{}
if intest.InTest {
opt.NoCredentials = true
}
store, err := storage.New(ctx, storeBackend, opt)
if err != nil {
return err
}
Expand Down Expand Up @@ -120,8 +121,19 @@ func (*mergeSortExecutor) Cleanup(ctx context.Context) error {
return nil
}

func (*mergeSortExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) error {
// OnFinished folds the sorted-KV statistics accumulated while running the
// subtask into the subtask's meta, so later dispatcher steps can read the
// merged file summary back out of subtask.Meta.
func (m *mergeSortExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error {
	logutil.Logger(ctx).Info("merge sort finish subtask")
	var meta BackfillSubTaskMeta
	if unmarshalErr := json.Unmarshal(subtask.Meta, &meta); unmarshalErr != nil {
		return errors.Trace(unmarshalErr)
	}
	// NOTE(review): assumes RunSubtask already populated subtaskSortedKVMeta
	// for this subtask; a nil pointer here would panic — confirm the framework
	// guarantees RunSubtask completes before OnFinished is invoked.
	meta.SortedKVMeta = *m.subtaskSortedKVMeta
	m.subtaskSortedKVMeta = nil
	encoded, marshalErr := json.Marshal(meta)
	if marshalErr != nil {
		return errors.Trace(marshalErr)
	}
	subtask.Meta = encoded
	return nil
}

Expand Down
22 changes: 12 additions & 10 deletions pkg/disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,16 +400,18 @@ func (d *BaseDispatcher) replaceDeadNodesIfAny() error {
replaceNodes[nodeID] = disttaskutil.GenerateExecID(n.IP, n.Port)
}
}
logutil.Logger(d.logCtx).Info("reschedule subtasks to other nodes", zap.Int("node-cnt", len(replaceNodes)))
if err := d.taskMgr.UpdateFailedSchedulerIDs(d.Task.ID, replaceNodes); err != nil {
return err
}
// replace local cache.
for k, v := range replaceNodes {
for m, n := range d.taskNodes {
if n == k {
d.taskNodes[m] = v
break
if len(replaceNodes) > 0 {
logutil.Logger(d.logCtx).Info("reschedule subtasks to other nodes", zap.Int("node-cnt", len(replaceNodes)))
if err := d.taskMgr.UpdateFailedSchedulerIDs(d.Task.ID, replaceNodes); err != nil {
return err
}
// replace local cache.
for k, v := range replaceNodes {
for m, n := range d.taskNodes {
if n == k {
d.taskNodes[m] = v
break
}
}
}
}
Expand Down
40 changes: 27 additions & 13 deletions tests/realtikvtest/addindextest/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,21 @@ func genStorageURI(t *testing.T) (host string, port uint16, uri string) {
fmt.Sprintf("gs://sorted/addindex?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint)
}

func TestGlobalSortCleanupCloudFiles(t *testing.T) {
func checkFileCleaned(t *testing.T, jobID int64, sortStorageURI string) {
storeBackend, err := storage.ParseBackend(sortStorageURI, nil)
require.NoError(t, err)
opts := &storage.ExternalStorageOptions{NoCredentials: true}
extStore, err := storage.New(context.Background(), storeBackend, opts)
require.NoError(t, err)
prefix := strconv.Itoa(int(jobID))
dataFiles, statFiles, err := external.GetAllFileNames(context.Background(), extStore, prefix)
require.NoError(t, err)
require.Greater(t, jobID, int64(0))
require.Equal(t, 0, len(dataFiles))
require.Equal(t, 0, len(statFiles))
}

func TestGlobalSortBasic(t *testing.T) {
gcsHost, gcsPort, cloudStorageURI := genStorageURI(t)
opt := fakestorage.Options{
Scheme: "http",
Expand All @@ -67,7 +81,7 @@ func TestGlobalSortCleanupCloudFiles(t *testing.T) {
tk.MustExec("use addindexlit;")
tk.MustExec(`set @@global.tidb_ddl_enable_fast_reorg = 1;`)
tk.MustExec("set @@global.tidb_enable_dist_task = 1;")
variable.CloudStorageURI.Store(cloudStorageURI)
tk.MustExec(fmt.Sprintf(`set @@global.tidb_cloud_storage_uri = "%s"`, cloudStorageURI))
defer func() {
tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
variable.CloudStorageURI.Store("")
Expand All @@ -94,22 +108,22 @@ func TestGlobalSortCleanupCloudFiles(t *testing.T) {
hook := &callback.TestDDLCallback{}
hook.OnJobUpdatedExported.Store(&onJobUpdated)
dom.DDL().SetHook(hook)

tk.MustExec("alter table t add index idx(a);")
dom.DDL().SetHook(origin)
tk.MustExec("admin check table t;")
<-dispatcher.WaitCleanUpFinished
storeBackend, err := storage.ParseBackend(cloudStorageURI, nil)
require.NoError(t, err)
opts := &storage.ExternalStorageOptions{NoCredentials: true}
extStore, err := storage.New(context.Background(), storeBackend, opts)
require.NoError(t, err)
prefix := strconv.Itoa(int(jobID))
dataFiles, statFiles, err := external.GetAllFileNames(context.Background(), extStore, prefix)
require.NoError(t, err)
require.Greater(t, jobID, int64(0))
require.Equal(t, 0, len(dataFiles))
require.Equal(t, 0, len(statFiles))
checkFileCleaned(t, jobID, cloudStorageURI)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()"))
tk.MustExec("alter table t add index idx1(a);")
dom.DDL().SetHook(origin)
tk.MustExec("admin check table t;")
<-dispatcher.WaitCleanUpFinished

checkFileCleaned(t, jobID, cloudStorageURI)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/WaitCleanUpFinished"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort"))
}

func TestGlobalSortMultiSchemaChange(t *testing.T) {
Expand Down

0 comments on commit eafb78a

Please sign in to comment.