From fbef542715a3ae7bbe28b5c1163376fc413b8dc9 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 6 Apr 2022 14:35:47 +0800 Subject: [PATCH 01/15] done Signed-off-by: wjhuang2016 --- ddl/delete_range.go | 54 ++++++++++++++++++++++++++------------------- ddl/sanity_check.go | 4 ---- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 2182c2919ec30..6b6377e2d641c 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -17,6 +17,7 @@ package ddl import ( "context" "encoding/hex" + "fmt" "math" "strings" "sync" @@ -233,12 +234,13 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { // insertJobIntoDeleteRangeTable parses the job into delete-range arguments, // and inserts a new record into gc_delete_range table. The primary key is -// job ID, so we ignore key conflict error. +// (job ID, element ID), so we ignore key conflict error. func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, job *model.Job) error { now, err := getNowTSO(sctx) if err != nil { return errors.Trace(err) } + var elementID int64 s := sctx.(sqlexec.SQLExecutor) switch job.Type { @@ -269,15 +271,16 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range physicalTableIDs { startKey = tablecodec.EncodeTablePrefix(pid) endKey := tablecodec.EncodeTablePrefix(pid + 1) - if err := doInsert(ctx, s, job.ID, pid, startKey, endKey, now); err != nil { + if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { return errors.Trace(err) } + elementID++ } return nil } startKey = tablecodec.EncodeTablePrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) - return doInsert(ctx, s, job.ID, tableID, startKey, endKey, now) + return doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) case model.ActionDropTablePartition, model.ActionTruncateTablePartition: var physicalTableIDs []int64 if err := job.DecodeArgs(&physicalTableIDs); err != nil { @@ -286,9 +289,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, physicalTableID := range physicalTableIDs { startKey := tablecodec.EncodeTablePrefix(physicalTableID) endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1) - if err := doInsert(ctx, s, job.ID, physicalTableID, startKey, endKey, now); err != nil { + if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil { return errors.Trace(err) } + elementID++ } // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. 
case model.ActionAddIndex, model.ActionAddPrimaryKey: @@ -302,14 +306,15 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range partitionIDs { startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID) endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1) - if err := doInsert(ctx, s, job.ID, indexID, startKey, endKey, now); err != nil { + if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { return errors.Trace(err) } + elementID++ } } else { startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) - return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now) + return doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) } case model.ActionDropIndex, model.ActionDropPrimaryKey: tableID := job.TableID @@ -323,14 +328,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range partitionIDs { startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID) endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1) - if err := doInsert(ctx, s, job.ID, indexID, startKey, endKey, now); err != nil { + if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { return errors.Trace(err) } } } else { startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) - return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now) + return doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID)) } case model.ActionDropIndexes: var indexIDs []int64 @@ -343,10 +348,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return nil } if len(partitionIDs) == 0 { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now) + return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &elementID) } for _, pID := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now); err != nil { + if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now, &elementID); err != nil { return errors.Trace(err) } } @@ -360,12 +365,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if len(indexIDs) > 0 { if len(partitionIDs) > 0 { for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil { + if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &elementID); err != nil { return errors.Trace(err) } } } else { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now) + return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &elementID) } } case model.ActionDropColumns: @@ -379,12 +384,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if len(indexIDs) > 0 { if len(partitionIDs) > 0 { for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil { + if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &elementID); err != nil { return errors.Trace(err) } } } else { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now) + return doBatchDeleteIndiceRange(ctx, s, job.ID, 
job.TableID, indexIDs, now, &elementID) } } case model.ActionModifyColumn: @@ -397,10 +402,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return nil } if len(partitionIDs) == 0 { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now) + return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &elementID) } for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil { + if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &elementID); err != nil { return errors.Trace(err) } } @@ -408,8 +413,8 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return nil } -func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64) error { - logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", indexIDs)) +func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64, elementID *int64) error { + logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64("tableID", tableID), zap.Int64s("indexIDs", indexIDs)) paramsList := make([]interface{}, 0, len(indexIDs)*5) var buf strings.Builder buf.WriteString(insertDeleteRangeSQLPrefix) @@ -422,14 +427,15 @@ func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, if i != len(indexIDs)-1 { buf.WriteString(",") } - paramsList = append(paramsList, jobID, indexID, startKeyEncoded, endKeyEncoded, ts) + paramsList = append(paramsList, jobID, *elementID, startKeyEncoded, endKeyEncoded, ts) + *elementID++ } _, err := s.ExecuteInternal(ctx, buf.String(), paramsList...) 
return errors.Trace(err) } -func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, elementID int64, startKey, endKey kv.Key, ts uint64) error { - logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID)) +func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID, elementID int64, startKey, endKey kv.Key, ts uint64, comment string) error { + logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID), zap.String("comment", comment)) startKeyEncoded := hex.EncodeToString(startKey) endKeyEncoded := hex.EncodeToString(endKey) // set session disk full opt @@ -442,10 +448,11 @@ func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, elementID } func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64) error { - logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", tableIDs)) + logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs)) var buf strings.Builder buf.WriteString(insertDeleteRangeSQLPrefix) paramsList := make([]interface{}, 0, len(tableIDs)*5) + elementID := 0 for i, tableID := range tableIDs { startKey := tablecodec.EncodeTablePrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) @@ -455,7 +462,8 @@ func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tabl if i != len(tableIDs)-1 { buf.WriteString(",") } - paramsList = append(paramsList, jobID, tableID, startKeyEncoded, endKeyEncoded, ts) + paramsList = append(paramsList, jobID, elementID, startKeyEncoded, endKeyEncoded, ts) + elementID++ } // set session disk full opt s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) diff --git a/ddl/sanity_check.go b/ddl/sanity_check.go index 2ec9ed638490c..de2e0c37c05bb 100644 --- a/ddl/sanity_check.go +++ b/ddl/sanity_check.go @@ -39,10 +39,6 @@ func checkRangeCntByTableIDs(physicalTableIDs []int64, cnt int64) { } func checkRangeCntByTableIDsAndIndexIDs(partitionTableIDs []int64, indexIDs []int64, cnt int64) { - if len(indexIDs) > 0 && len(partitionTableIDs) > 0 { - // Add this check after fixing the bug. 
- return - } if len(indexIDs) == 0 { return } From 06175723db6c4dabd4d32756d7671e8c335b98ab Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 6 Apr 2022 15:40:23 +0800 Subject: [PATCH 02/15] done Signed-off-by: wjhuang2016 --- ddl/delete_range.go | 1 + ddl/sanity_check.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 6b6377e2d641c..5c233d1388625 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -331,6 +331,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { return errors.Trace(err) } + elementID++ } } else { startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) diff --git a/ddl/sanity_check.go b/ddl/sanity_check.go index de2e0c37c05bb..87b316ced48d8 100644 --- a/ddl/sanity_check.go +++ b/ddl/sanity_check.go @@ -51,7 +51,7 @@ func checkRangeCntByTableIDsAndIndexIDs(partitionTableIDs []int64, indexIDs []in expectedCnt *= len(partitionTableIDs) } if expectedCnt != int(cnt) { - panic("should not happened") + panic("should not happened" + fmt.Sprintf("expect count: %d, rea count: %d", expectedCnt, cnt)) } } From e44e53ecba77f69c335b69f5bbeccda64b4f76b3 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 6 Apr 2022 17:20:04 +0800 Subject: [PATCH 03/15] fix test Signed-off-by: wjhuang2016 --- ddl/delete_range.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 5c233d1388625..47d9833b4785d 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -254,7 +254,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if batchEnd > i+batchInsertDeleteRangeSize { batchEnd = i + batchInsertDeleteRangeSize } - if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now); err != nil { + if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, &elementID); err != nil { return errors.Trace(err) } } @@ -448,12 +448,11 @@ func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID, elementID int64 return errors.Trace(err) } -func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64) error { +func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64, elementID *int64) error { logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs)) var buf strings.Builder buf.WriteString(insertDeleteRangeSQLPrefix) paramsList := make([]interface{}, 0, len(tableIDs)*5) - elementID := 0 for i, tableID := range tableIDs { startKey := tablecodec.EncodeTablePrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) @@ -463,8 +462,8 @@ func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tabl if i != len(tableIDs)-1 { buf.WriteString(",") } - paramsList = append(paramsList, jobID, elementID, startKeyEncoded, endKeyEncoded, ts) - elementID++ + paramsList = append(paramsList, jobID, *elementID, startKeyEncoded, endKeyEncoded, ts) + *elementID++ } // set session disk full opt s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) From e6cada32758bb7666e0dfa75b9f02d7011657045 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 6 Apr 2022 17:52:50 +0800 Subject: [PATCH 04/15] fix test Signed-off-by: wjhuang2016 --- ddl/column_modify_test.go | 4 ++-- 
ddl/db_partition_test.go | 4 ++-- ddl/index_modify_test.go | 6 +++--- ddl/main_test.go | 8 ++++---- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index 12cd8296682ac..f0e6821c67b1f 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -535,7 +535,7 @@ func TestCancelDropColumn(t *testing.T) { require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) if c3IdxID != 0 { // Check index is deleted - checkDelRangeAdded(tk, jobID, c3IdxID) + checkDelRangeAdded(tk, jobID) } } } @@ -636,7 +636,7 @@ func TestCancelDropColumns(t *testing.T) { require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) if c3IdxID != 0 { // Check index is deleted - checkDelRangeAdded(tk, jobID, c3IdxID) + checkDelRangeAdded(tk, jobID) } } } diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 74c2236c92f50..7fedd942f537b 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2780,7 +2780,7 @@ LOOP: num += step } } - checkDelRangeAdded(tk, jobIDExt.jobID, indexID) + checkDelRangeAdded(tk, jobIDExt.jobID) tk.MustExec("drop table partition_drop_idx;") } @@ -2873,7 +2873,7 @@ LOOP: times++ } } - checkDelRangeAdded(tk, jobIDExt.jobID, c3IdxInfo.ID) + checkDelRangeAdded(tk, jobIDExt.jobID) tk.MustExec("drop table t1") } diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go index 586e9c8ffa7c8..fc65aeb6af692 100644 --- a/ddl/index_modify_test.go +++ b/ddl/index_modify_test.go @@ -757,7 +757,7 @@ LOOP: times++ } } - checkDelRangeAdded(tk, jobIDExt.jobID, c3IdxInfo.ID) + checkDelRangeAdded(tk, jobIDExt.jobID) d.SetHook(originalHook) } @@ -1085,7 +1085,7 @@ LOOP: } } for _, idxID := range idxIDs { - checkDelRangeAdded(tk, jobIDExt.jobID, idxID) + checkDelRangeAdded(tk, jobIDExt.jobID) } } @@ -1285,7 +1285,7 @@ LOOP: rows := tk.MustQuery("explain select c1 from test_drop_index where c3 >= 0") require.NotContains(t, fmt.Sprintf("%v", rows), idxName) - checkDelRangeAdded(tk, jobIDExt.jobID, indexID) + checkDelRangeAdded(tk, jobIDExt.jobID) tk.MustExec("drop table test_drop_index") } diff --git a/ddl/main_test.go b/ddl/main_test.go index 3d8de547bd5dc..c0bcbc2730153 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -86,11 +86,11 @@ func setupJobIDExtCallback(ctx sessionctx.Context) (jobExt *testDDLJobIDCallback } } -func checkDelRangeAdded(tk *testkit.TestKit, jobID int64, elemID int64) { +func checkDelRangeAdded(tk *testkit.TestKit, jobID int64) { query := `select sum(cnt) from - (select count(1) cnt from mysql.gc_delete_range where job_id = ? and element_id = ? union - select count(1) cnt from mysql.gc_delete_range_done where job_id = ? and element_id = ?) as gdr;` - tk.MustQuery(query, jobID, elemID, jobID, elemID).Check(testkit.Rows("1")) + (select count(1) cnt from mysql.gc_delete_range where job_id = ? union + select count(1) cnt from mysql.gc_delete_range_done where job_id = ?) 
as gdr;` + tk.MustQuery(query, jobID, jobID).Check(testkit.Rows("1")) } type testDDLJobIDCallback struct { From 8e212ccaf397fe99dbb7b28eb1f966a1a9e717aa Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 6 Apr 2022 19:56:38 +0800 Subject: [PATCH 05/15] fix test Signed-off-by: wjhuang2016 --- ddl/column_modify_test.go | 8 -------- ddl/db_partition_test.go | 7 +------ ddl/index_modify_test.go | 8 +------- 3 files changed, 2 insertions(+), 21 deletions(-) diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index f0e6821c67b1f..863a863c9f3bb 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -533,10 +533,6 @@ func TestCancelDropColumn(t *testing.T) { require.Nil(t, col1) require.NoError(t, err) require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) - if c3IdxID != 0 { - // Check index is deleted - checkDelRangeAdded(tk, jobID) - } } } dom.DDL().SetHook(originalHook) @@ -634,10 +630,6 @@ func TestCancelDropColumns(t *testing.T) { require.Nil(t, idx3) require.NoError(t, err) require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) - if c3IdxID != 0 { - // Check index is deleted - checkDelRangeAdded(tk, jobID) - } } } dom.DDL().SetHook(originalHook) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 7fedd942f537b..d88e09c92dd3d 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2754,8 +2754,6 @@ func testPartitionDropIndex(t *testing.T, store kv.Storage, lease time.Duration, } tk.MustExec(addIdxSQL) - indexID := external.GetIndexID(t, tk, "test", "partition_drop_idx", idxName) - jobIDExt, reset := setupJobIDExtCallback(tk.Session()) defer reset() testutil.ExecMultiSQLInGoroutine(store, "test", []string{dropIdxSQL}, done) @@ -2780,7 +2778,6 @@ LOOP: num += step } } - checkDelRangeAdded(tk, jobIDExt.jobID) tk.MustExec("drop table partition_drop_idx;") } @@ -2833,13 +2830,12 @@ func testPartitionCancelAddIndex(t *testing.T, store kv.Storage, d ddl.DDL, leas } var checkErr error - var c3IdxInfo *model.IndexInfo hook := &ddl.TestDDLCallback{} originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") // Set batch size to lower try to slow down add-index reorganization, This if for hook to cancel this ddl job. tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 32") defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", originBatchSize.Rows()[0][0])) - hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExportedT(t, tk, store, hook, idxName) + hook.OnJobUpdatedExported, _, checkErr = backgroundExecOnJobUpdatedExportedT(t, tk, store, hook, idxName) originHook := d.GetHook() defer d.SetHook(originHook) jobIDExt := wrapJobIDExtCallback(hook) @@ -2873,7 +2869,6 @@ LOOP: times++ } } - checkDelRangeAdded(tk, jobIDExt.jobID) tk.MustExec("drop table t1") } diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go index fc65aeb6af692..c8aa0d5888b12 100644 --- a/ddl/index_modify_test.go +++ b/ddl/index_modify_test.go @@ -715,7 +715,6 @@ func testCancelAddIndex(t *testing.T, store kv.Storage, dom *domain.Domain, idxN batchInsert(tk, "t1", i, i+defaultBatchSize) } - var c3IdxInfo *model.IndexInfo hook := &ddl.TestDDLCallback{Do: dom} originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") // Set batch size to lower try to slow down add-index reorganization, This if for hook to cancel this ddl job. 
@@ -725,7 +724,7 @@ func testCancelAddIndex(t *testing.T, store kv.Storage, dom *domain.Domain, idxN // the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob. // After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case. var checkErr error - hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(t, tk, store, hook, idxName) + hook.OnJobUpdatedExported, _, checkErr = backgroundExecOnJobUpdatedExported(t, tk, store, hook, idxName) originalHook := d.GetHook() jobIDExt := wrapJobIDExtCallback(hook) d.SetHook(jobIDExt) @@ -757,7 +756,6 @@ LOOP: times++ } } - checkDelRangeAdded(tk, jobIDExt.jobID) d.SetHook(originalHook) } @@ -1084,9 +1082,6 @@ LOOP: num += step } } - for _, idxID := range idxIDs { - checkDelRangeAdded(tk, jobIDExt.jobID) - } } func testDropIndexesIfExists(t *testing.T, store kv.Storage) { @@ -1285,7 +1280,6 @@ LOOP: rows := tk.MustQuery("explain select c1 from test_drop_index where c3 >= 0") require.NotContains(t, fmt.Sprintf("%v", rows), idxName) - checkDelRangeAdded(tk, jobIDExt.jobID) tk.MustExec("drop table test_drop_index") } From 2bfef741b2255f97bf3802d9fb2080b1f006b66c Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 6 Apr 2022 20:14:05 +0800 Subject: [PATCH 06/15] fmt Signed-off-by: wjhuang2016 --- ddl/column_modify_test.go | 2 -- ddl/db_partition_test.go | 2 +- ddl/index_modify_test.go | 5 ++--- ddl/main_test.go | 4 ++-- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index 863a863c9f3bb..5cc6b4b8183f8 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -497,7 +497,6 @@ func TestCancelDropColumn(t *testing.T) { originalHook := dom.DDL().GetHook() dom.DDL().SetHook(hook) for i := range testCases { - var c3IdxID int64 testCase = &testCases[i] if testCase.needAddColumn { tk.MustExec("alter table test_drop_column add column c3 int") @@ -597,7 +596,6 @@ func TestCancelDropColumns(t *testing.T) { originalHook := dom.DDL().GetHook() dom.DDL().SetHook(hook) for i := range testCases { - var c3IdxID int64 testCase = &testCases[i] if testCase.needAddColumn { tk.MustExec("alter table test_drop_column add column c3 int, add column c4 int") diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index d88e09c92dd3d..82b0ad442a8e8 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2754,7 +2754,7 @@ func testPartitionDropIndex(t *testing.T, store kv.Storage, lease time.Duration, } tk.MustExec(addIdxSQL) - jobIDExt, reset := setupJobIDExtCallback(tk.Session()) + reset := setupJobIDExtCallback(tk.Session()) defer reset() testutil.ExecMultiSQLInGoroutine(store, "test", []string{dropIdxSQL}, done) ticker := time.NewTicker(lease / 2) diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go index c8aa0d5888b12..ffe1d86b44416 100644 --- a/ddl/index_modify_test.go +++ b/ddl/index_modify_test.go @@ -1057,7 +1057,7 @@ func testDropIndexes(t *testing.T, store kv.Storage, createSQL, dropIdxSQL strin for _, idxName := range idxNames { idxIDs = append(idxIDs, external.GetIndexID(t, tk, "test", "test_drop_indexes", idxName)) } - jobIDExt, reset := setupJobIDExtCallback(tk.Session()) + reset := setupJobIDExtCallback(tk.Session()) defer reset() testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done) @@ -1250,8 +1250,7 @@ func testDropIndex(t *testing.T, store kv.Storage, createSQL, dropIdxSQL, idxNam for i := 
0; i < num; i++ { tk.MustExec("insert into test_drop_index values (?, ?, ?)", i, i, i) } - indexID := external.GetIndexID(t, tk, "test", "test_drop_index", idxName) - jobIDExt, reset := setupJobIDExtCallback(tk.Session()) + reset := setupJobIDExtCallback(tk.Session()) defer reset() testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done) diff --git a/ddl/main_test.go b/ddl/main_test.go index c0bcbc2730153..c938d0d5bbae4 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -76,12 +76,12 @@ func wrapJobIDExtCallback(oldCallback ddl.Callback) *testDDLJobIDCallback { } } -func setupJobIDExtCallback(ctx sessionctx.Context) (jobExt *testDDLJobIDCallback, tearDown func()) { +func setupJobIDExtCallback(ctx sessionctx.Context) (tearDown func()) { dom := domain.GetDomain(ctx) originHook := dom.DDL().GetHook() jobIDExt := wrapJobIDExtCallback(originHook) dom.DDL().SetHook(jobIDExt) - return jobIDExt, func() { + return func() { dom.DDL().SetHook(originHook) } } From ef249f3d38a5a5baf19245a2f8ae31fa3189afa3 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 6 Apr 2022 20:32:01 +0800 Subject: [PATCH 07/15] fmt Signed-off-by: wjhuang2016 --- ddl/column_modify_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index 5cc6b4b8183f8..d07e4ac25385f 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -501,7 +501,6 @@ func TestCancelDropColumn(t *testing.T) { if testCase.needAddColumn { tk.MustExec("alter table test_drop_column add column c3 int") tk.MustExec("alter table test_drop_column add index idx_c3(c3)") - c3IdxID = external.GetIndexID(t, tk, "test", "test_drop_column", "idx_c3") } err := tk.ExecToErr("alter table test_drop_column drop column c3") @@ -600,7 +599,6 @@ func TestCancelDropColumns(t *testing.T) { if testCase.needAddColumn { tk.MustExec("alter table test_drop_column add column c3 int, add column c4 int") tk.MustExec("alter table test_drop_column add index idx_c3(c3)") - c3IdxID = external.GetIndexID(t, tk, "test", "test_drop_column", "idx_c3") } err := tk.ExecToErr("alter table test_drop_column drop column c3, drop column c4") tbl := external.GetTableByName(t, tk, "test", "test_drop_column") From 98b445dbe8cf4ff84ec6d0fb9ff59254c35f932e Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 6 Apr 2022 21:12:12 +0800 Subject: [PATCH 08/15] fmt Signed-off-by: wjhuang2016 --- ddl/main_test.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/ddl/main_test.go b/ddl/main_test.go index c938d0d5bbae4..20ed43d1ed9c7 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/testbridge" "github.com/tikv/client-go/v2/tikv" "go.uber.org/goleak" @@ -86,13 +85,6 @@ func setupJobIDExtCallback(ctx sessionctx.Context) (tearDown func()) { } } -func checkDelRangeAdded(tk *testkit.TestKit, jobID int64) { - query := `select sum(cnt) from - (select count(1) cnt from mysql.gc_delete_range where job_id = ? union - select count(1) cnt from mysql.gc_delete_range_done where job_id = ?) 
as gdr;` - tk.MustQuery(query, jobID, jobID).Check(testkit.Rows("1")) -} - type testDDLJobIDCallback struct { ddl.Callback jobID int64 From ab3de43ac0617abe7ed0a39a2fae308050682488 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Thu, 7 Apr 2022 17:18:21 +0800 Subject: [PATCH 09/15] refine Signed-off-by: wjhuang2016 --- ddl/sanity_check.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ddl/sanity_check.go b/ddl/sanity_check.go index ad43fbc2329bd..db662ee02da40 100644 --- a/ddl/sanity_check.go +++ b/ddl/sanity_check.go @@ -31,10 +31,10 @@ import ( func checkRangeCntByTableIDs(physicalTableIDs []int64, cnt int64) { if len(physicalTableIDs) > 0 { if len(physicalTableIDs) != int(cnt) { - panic("should not happened") + panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(physicalTableIDs), cnt)) } } else if cnt != 1 { - panic("should not happened") + panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", 1, cnt)) } } @@ -51,7 +51,7 @@ func checkRangeCntByTableIDsAndIndexIDs(partitionTableIDs []int64, indexIDs []in expectedCnt *= len(partitionTableIDs) } if expectedCnt != int(cnt) { - panic("should not happened" + fmt.Sprintf("expect count: %d, rea count: %d", expectedCnt, cnt)) + panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", expectedCnt, cnt)) } } @@ -89,7 +89,7 @@ func (d *ddl) checkDeleteRangeCnt(job *model.Job) { panic("should not happened") } if len(tableIDs) != int(cnt) { - panic("should not happened") + panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(tableIDs), cnt)) } case model.ActionDropTable, model.ActionTruncateTable: var startKey kv.Key @@ -105,7 +105,7 @@ func (d *ddl) checkDeleteRangeCnt(job *model.Job) { panic("should not happened") } if len(physicalTableIDs) != int(cnt) { - panic("should not happened") + panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(physicalTableIDs), cnt)) } case model.ActionAddIndex, model.ActionAddPrimaryKey: var indexID int64 From 52f421232ba050e42fa6ac513feadb7aec723316 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Thu, 7 Apr 2022 18:12:20 +0800 Subject: [PATCH 10/15] fix test Signed-off-by: wjhuang2016 --- ddl/sanity_check.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ddl/sanity_check.go b/ddl/sanity_check.go index db662ee02da40..528370eeee157 100644 --- a/ddl/sanity_check.go +++ b/ddl/sanity_check.go @@ -42,11 +42,7 @@ func checkRangeCntByTableIDsAndIndexIDs(partitionTableIDs []int64, indexIDs []in if len(indexIDs) == 0 { return } - uniqueIndexIDs := make(map[int64]struct{}) - for _, id := range indexIDs { - uniqueIndexIDs[id] = struct{}{} - } - expectedCnt := len(uniqueIndexIDs) + expectedCnt := len(indexIDs) if len(partitionTableIDs) > 0 { expectedCnt *= len(partitionTableIDs) } From d7d726dc338c225127557eabb66b06d248cbea67 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Fri, 8 Apr 2022 14:49:40 +0800 Subject: [PATCH 11/15] address comment Signed-off-by: wjhuang2016 --- ddl/db_partition_test.go | 2 -- ddl/delete_range.go | 2 +- ddl/index_modify_test.go | 4 ---- ddl/main_test.go | 11 ----------- 4 files changed, 1 insertion(+), 18 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 82b0ad442a8e8..58a644a6ff3ce 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2754,8 +2754,6 @@ func testPartitionDropIndex(t *testing.T, store kv.Storage, lease time.Duration, } tk.MustExec(addIdxSQL) - 
reset := setupJobIDExtCallback(tk.Session()) - defer reset() testutil.ExecMultiSQLInGoroutine(store, "test", []string{dropIdxSQL}, done) ticker := time.NewTicker(lease / 2) defer ticker.Stop() diff --git a/ddl/delete_range.go b/ddl/delete_range.go index e2ed4f6239cb1..2f403cedb1eac 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -276,7 +276,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range physicalTableIDs { startKey = tablecodec.EncodeTablePrefix(pid) endKey := tablecodec.EncodeTablePrefix(pid + 1) - if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { + if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil { return errors.Trace(err) } elementID++ diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go index ffe1d86b44416..1b682b9f8ac49 100644 --- a/ddl/index_modify_test.go +++ b/ddl/index_modify_test.go @@ -1057,8 +1057,6 @@ func testDropIndexes(t *testing.T, store kv.Storage, createSQL, dropIdxSQL strin for _, idxName := range idxNames { idxIDs = append(idxIDs, external.GetIndexID(t, tk, "test", "test_drop_indexes", idxName)) } - reset := setupJobIDExtCallback(tk.Session()) - defer reset() testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done) ticker := time.NewTicker(indexModifyLease / 2) @@ -1250,8 +1248,6 @@ func testDropIndex(t *testing.T, store kv.Storage, createSQL, dropIdxSQL, idxNam for i := 0; i < num; i++ { tk.MustExec("insert into test_drop_index values (?, ?, ?)", i, i, i) } - reset := setupJobIDExtCallback(tk.Session()) - defer reset() testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done) ticker := time.NewTicker(indexModifyLease / 2) diff --git a/ddl/main_test.go b/ddl/main_test.go index 20ed43d1ed9c7..c944d92799902 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/testbridge" "github.com/tikv/client-go/v2/tikv" "go.uber.org/goleak" @@ -75,16 +74,6 @@ func wrapJobIDExtCallback(oldCallback ddl.Callback) *testDDLJobIDCallback { } } -func setupJobIDExtCallback(ctx sessionctx.Context) (tearDown func()) { - dom := domain.GetDomain(ctx) - originHook := dom.DDL().GetHook() - jobIDExt := wrapJobIDExtCallback(originHook) - dom.DDL().SetHook(jobIDExt) - return func() { - dom.DDL().SetHook(originHook) - } -} - type testDDLJobIDCallback struct { ddl.Callback jobID int64 From 774fcd0a82e7addda093c8aed743bcbb63bd406b Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Fri, 8 Apr 2022 15:12:56 +0800 Subject: [PATCH 12/15] use elementID alloc Signed-off-by: wjhuang2016 --- ddl/delete_range.go | 57 ++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 2f403cedb1eac..7ff5f7c2c8809 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -237,6 +237,15 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { return nil } +type elementIDAlloc struct { + int64 +} + +func (ea elementIDAlloc) alloc() int64 { + ea.int64++ + return ea.int64 +} + // insertJobIntoDeleteRangeTable parses the job into delete-range arguments, // and inserts a new record into gc_delete_range table. 
The primary key is // (job ID, element ID), so we ignore key conflict error. @@ -245,7 +254,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if err != nil { return errors.Trace(err) } - var elementID int64 + var ea elementIDAlloc s := sctx.(sqlexec.SQLExecutor) switch job.Type { @@ -259,7 +268,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if batchEnd > i+batchInsertDeleteRangeSize { batchEnd = i + batchInsertDeleteRangeSize } - if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, &elementID); err != nil { + if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, &ea); err != nil { return errors.Trace(err) } } @@ -276,16 +285,15 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range physicalTableIDs { startKey = tablecodec.EncodeTablePrefix(pid) endKey := tablecodec.EncodeTablePrefix(pid + 1) - if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil { + if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil { return errors.Trace(err) } - elementID++ } return nil } startKey = tablecodec.EncodeTablePrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) - return doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) + return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) case model.ActionDropTablePartition, model.ActionTruncateTablePartition: var physicalTableIDs []int64 if err := job.DecodeArgs(&physicalTableIDs); err != nil { @@ -294,10 +302,9 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, physicalTableID := range physicalTableIDs { startKey := tablecodec.EncodeTablePrefix(physicalTableID) endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1) - if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil { + if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil { return errors.Trace(err) } - elementID++ } // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. 
case model.ActionAddIndex, model.ActionAddPrimaryKey: @@ -311,15 +318,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range partitionIDs { startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID) endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1) - if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { + if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { return errors.Trace(err) } - elementID++ } } else { startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) - return doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) + return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) } case model.ActionDropIndex, model.ActionDropPrimaryKey: tableID := job.TableID @@ -333,15 +339,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range partitionIDs { startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID) endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1) - if err := doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { + if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { return errors.Trace(err) } - elementID++ } } else { startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) - return doInsert(ctx, s, job.ID, elementID, startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID)) + return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID)) } case model.ActionDropIndexes: var indexIDs []int64 @@ -354,10 +359,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return nil } if len(partitionIDs) == 0 { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &elementID) + return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea) } for _, pID := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now, &elementID); err != nil { + if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now, &ea); err != nil { return errors.Trace(err) } } @@ -371,12 +376,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if len(indexIDs) > 0 { if len(partitionIDs) > 0 { for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &elementID); err != nil { + if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil { return errors.Trace(err) } } } else { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &elementID) + return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea) } } case model.ActionDropColumns: @@ -390,12 +395,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if len(indexIDs) > 0 { if len(partitionIDs) > 0 { for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &elementID); err != nil { + if err := doBatchDeleteIndiceRange(ctx, s, 
job.ID, pid, indexIDs, now, &ea); err != nil { return errors.Trace(err) } } } else { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &elementID) + return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea) } } case model.ActionModifyColumn: @@ -408,10 +413,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return nil } if len(partitionIDs) == 0 { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &elementID) + return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea) } for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &elementID); err != nil { + if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil { return errors.Trace(err) } } @@ -419,7 +424,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return nil } -func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64, elementID *int64) error { +func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64, ea *elementIDAlloc) error { logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64("tableID", tableID), zap.Int64s("indexIDs", indexIDs)) paramsList := make([]interface{}, 0, len(indexIDs)*5) var buf strings.Builder @@ -433,8 +438,7 @@ func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, if i != len(indexIDs)-1 { buf.WriteString(",") } - paramsList = append(paramsList, jobID, *elementID, startKeyEncoded, endKeyEncoded, ts) - *elementID++ + paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts) } _, err := s.ExecuteInternal(ctx, buf.String(), paramsList...) 
return errors.Trace(err) @@ -453,7 +457,7 @@ func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID, elementID int64 return errors.Trace(err) } -func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64, elementID *int64) error { +func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64, ea *elementIDAlloc) error { logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs)) var buf strings.Builder buf.WriteString(insertDeleteRangeSQLPrefix) @@ -467,8 +471,7 @@ func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tabl if i != len(tableIDs)-1 { buf.WriteString(",") } - paramsList = append(paramsList, jobID, *elementID, startKeyEncoded, endKeyEncoded, ts) - *elementID++ + paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts) } // set session disk full opt s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) From b33c466eb5f792964035e0adc65167bcb003871e Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Fri, 8 Apr 2022 15:42:33 +0800 Subject: [PATCH 13/15] fix id Signed-off-by: wjhuang2016 --- ddl/delete_range.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 7ff5f7c2c8809..996f0f70f6c62 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -238,12 +238,12 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { } type elementIDAlloc struct { - int64 + id int64 } func (ea elementIDAlloc) alloc() int64 { - ea.int64++ - return ea.int64 + ea.id++ + return ea.id } // insertJobIntoDeleteRangeTable parses the job into delete-range arguments, From 43e6767f9d65b76f9206d6772eb151135af7e4a4 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Fri, 8 Apr 2022 16:27:08 +0800 Subject: [PATCH 14/15] fix Signed-off-by: wjhuang2016 --- ddl/delete_range.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 996f0f70f6c62..d5fcfb1901341 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -241,7 +241,7 @@ type elementIDAlloc struct { id int64 } -func (ea elementIDAlloc) alloc() int64 { +func (ea *elementIDAlloc) alloc() int64 { ea.id++ return ea.id } From d60c3cb7b05eb2929521fbdec45160a85c3b1984 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Fri, 8 Apr 2022 17:44:58 +0800 Subject: [PATCH 15/15] add testcases Signed-off-by: wjhuang2016 --- ddl/index_modify_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go index 1b682b9f8ac49..5072c036ba745 100644 --- a/ddl/index_modify_test.go +++ b/ddl/index_modify_test.go @@ -1121,6 +1121,11 @@ func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) { tk.MustExec("insert into test_drop_indexes_from_partitioned_table values (?, ?, ?)", i, i, i) } tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i2;") + tk.MustExec("alter table test_drop_indexes_from_partitioned_table add index i1(c1)") + tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i1;") + tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column c2;") + tk.MustExec("alter table test_drop_indexes_from_partitioned_table add column c1 int") + tk.MustExec("alter table 
test_drop_indexes_from_partitioned_table drop column c1, drop column if exists c1;") } func testCancelDropIndexes(t *testing.T, store kv.Storage, d ddl.DDL) {
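For reference, a minimal standalone sketch (not part of the patch series) of the per-job element-ID allocator that the series converges on in PATCH 12/15 through PATCH 14/15: instead of reusing table or index IDs as element_id, each DDL job hands out its own monotonically increasing element IDs, so every row inserted into mysql.gc_delete_range gets a distinct (job_id, element_id) primary key and the counting checks in ddl/sanity_check.go can compare actual rows against the expected number of ranges. The type, field, and method names (elementIDAlloc, id, alloc) mirror the patch; the package main harness, the example partition IDs, and the printed output are invented for illustration only.

// Sketch under the assumptions stated above; the demo in main is not TiDB code.
package main

import "fmt"

// elementIDAlloc hands out monotonically increasing element IDs for a single
// DDL job, giving each delete-range row a unique (job_id, element_id) key.
type elementIDAlloc struct {
	id int64
}

// alloc needs a pointer receiver, as introduced in PATCH 14/15: with the value
// receiver from PATCH 12/15, the increment happens on a copy and every call
// would return 1, colliding on the (job_id, element_id) primary key.
func (ea *elementIDAlloc) alloc() int64 {
	ea.id++
	return ea.id
}

func main() {
	var ea elementIDAlloc
	// Simulate one job dropping a table with three partitions: each
	// partition's delete range receives its own element ID.
	for _, pid := range []int64{101, 102, 103} {
		fmt.Printf("job 1, partition %d -> element ID %d\n", pid, ea.alloc())
	}
	// Output:
	// job 1, partition 101 -> element ID 1
	// job 1, partition 102 -> element ID 2
	// job 1, partition 103 -> element ID 3
}

With IDs allocated this way, dropping N indexes across P partitions produces N*P distinct element IDs, which is exactly the expectation checkRangeCntByTableIDsAndIndexIDs enforces once the temporary early-return removed in PATCH 01/15 is gone.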