Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix duplicate elementID allocation to make sure gc work for partition table #33726

Merged
merged 22 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,10 @@ func TestCancelDropColumn(t *testing.T) {
originalHook := dom.DDL().GetHook()
dom.DDL().SetHook(hook)
for i := range testCases {
var c3IdxID int64
testCase = &testCases[i]
if testCase.needAddColumn {
tk.MustExec("alter table test_drop_column add column c3 int")
tk.MustExec("alter table test_drop_column add index idx_c3(c3)")
c3IdxID = external.GetIndexID(t, tk, "test", "test_drop_column", "idx_c3")
}

err := tk.ExecToErr("alter table test_drop_column drop column c3")
Expand Down Expand Up @@ -533,10 +531,6 @@ func TestCancelDropColumn(t *testing.T) {
require.Nil(t, col1)
require.NoError(t, err)
require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
if c3IdxID != 0 {
// Check index is deleted
checkDelRangeAdded(tk, jobID, c3IdxID)
}
}
}
dom.DDL().SetHook(originalHook)
Expand Down Expand Up @@ -601,12 +595,10 @@ func TestCancelDropColumns(t *testing.T) {
originalHook := dom.DDL().GetHook()
dom.DDL().SetHook(hook)
for i := range testCases {
var c3IdxID int64
testCase = &testCases[i]
if testCase.needAddColumn {
tk.MustExec("alter table test_drop_column add column c3 int, add column c4 int")
tk.MustExec("alter table test_drop_column add index idx_c3(c3)")
c3IdxID = external.GetIndexID(t, tk, "test", "test_drop_column", "idx_c3")
}
err := tk.ExecToErr("alter table test_drop_column drop column c3, drop column c4")
tbl := external.GetTableByName(t, tk, "test", "test_drop_column")
Expand Down Expand Up @@ -634,10 +626,6 @@ func TestCancelDropColumns(t *testing.T) {
require.Nil(t, idx3)
require.NoError(t, err)
require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
if c3IdxID != 0 {
// Check index is deleted
checkDelRangeAdded(tk, jobID, c3IdxID)
}
}
}
dom.DDL().SetHook(originalHook)
Expand Down
9 changes: 1 addition & 8 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2754,10 +2754,6 @@ func testPartitionDropIndex(t *testing.T, store kv.Storage, lease time.Duration,
}
tk.MustExec(addIdxSQL)

indexID := external.GetIndexID(t, tk, "test", "partition_drop_idx", idxName)

jobIDExt, reset := setupJobIDExtCallback(tk.Session())
defer reset()
testutil.ExecMultiSQLInGoroutine(store, "test", []string{dropIdxSQL}, done)
ticker := time.NewTicker(lease / 2)
defer ticker.Stop()
Expand All @@ -2780,7 +2776,6 @@ LOOP:
num += step
}
}
checkDelRangeAdded(tk, jobIDExt.jobID, indexID)
tk.MustExec("drop table partition_drop_idx;")
}

Expand Down Expand Up @@ -2833,13 +2828,12 @@ func testPartitionCancelAddIndex(t *testing.T, store kv.Storage, d ddl.DDL, leas
}

var checkErr error
var c3IdxInfo *model.IndexInfo
hook := &ddl.TestDDLCallback{}
originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
// Set a lower batch size to slow down add-index reorganization. This gives the hook a chance to cancel this DDL job.
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 32")
defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", originBatchSize.Rows()[0][0]))
hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExportedT(t, tk, store, hook, idxName)
hook.OnJobUpdatedExported, _, checkErr = backgroundExecOnJobUpdatedExportedT(t, tk, store, hook, idxName)
originHook := d.GetHook()
defer d.SetHook(originHook)
jobIDExt := wrapJobIDExtCallback(hook)
Expand Down Expand Up @@ -2873,7 +2867,6 @@ LOOP:
times++
}
}
checkDelRangeAdded(tk, jobIDExt.jobID, c3IdxInfo.ID)
tk.MustExec("drop table t1")
}

Expand Down
61 changes: 36 additions & 25 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ddl
import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -236,14 +237,24 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
return nil
}

// elementIDAlloc hands out sequential int64 element IDs.
//
// The zero value is ready to use: its first alloc call returns 1. The type
// does no locking of its own.
type elementIDAlloc struct {
	// id is the most recently returned ID (0 before the first alloc).
	id int64
}

// alloc advances the counter by one and returns the new value.
func (ea *elementIDAlloc) alloc() int64 {
	next := ea.id + 1
	ea.id = next
	return next
}

// insertJobIntoDeleteRangeTable parses the job into delete-range arguments,
// and inserts a new record into gc_delete_range table. The primary key is
// job ID, so we ignore key conflict error.
// (job ID, element ID), so we ignore key conflict error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the bug exists from day 1 of table partition?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's a long story.

func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, job *model.Job) error {
now, err := getNowTSO(sctx)
if err != nil {
return errors.Trace(err)
}
var ea elementIDAlloc

s := sctx.(sqlexec.SQLExecutor)
switch job.Type {
Expand All @@ -257,7 +268,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
if batchEnd > i+batchInsertDeleteRangeSize {
batchEnd = i + batchInsertDeleteRangeSize
}
if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now); err != nil {
if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, &ea); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -274,15 +285,15 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range physicalTableIDs {
startKey = tablecodec.EncodeTablePrefix(pid)
endKey := tablecodec.EncodeTablePrefix(pid + 1)
if err := doInsert(ctx, s, job.ID, pid, startKey, endKey, now); err != nil {
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
return nil
}
startKey = tablecodec.EncodeTablePrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
return doInsert(ctx, s, job.ID, tableID, startKey, endKey, now)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
Expand All @@ -291,7 +302,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, physicalTableID := range physicalTableIDs {
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
if err := doInsert(ctx, s, job.ID, physicalTableID, startKey, endKey, now); err != nil {
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -307,14 +318,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
if err := doInsert(ctx, s, job.ID, indexID, startKey, endKey, now); err != nil {
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
Expand All @@ -328,14 +339,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
if err := doInsert(ctx, s, job.ID, indexID, startKey, endKey, now); err != nil {
if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now)
return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID))
}
case model.ActionDropIndexes:
var indexIDs []int64
Expand All @@ -348,10 +359,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
return nil
}
if len(partitionIDs) == 0 {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
}
for _, pID := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now, &ea); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -365,12 +376,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
if len(indexIDs) > 0 {
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
return errors.Trace(err)
}
}
} else {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
}
}
case model.ActionDropColumns:
Expand All @@ -384,12 +395,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
if len(indexIDs) > 0 {
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
return errors.Trace(err)
}
}
} else {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
}
}
case model.ActionModifyColumn:
Expand All @@ -402,19 +413,19 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
return nil
}
if len(partitionIDs) == 0 {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
}
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
return errors.Trace(err)
}
}
}
return nil
}

func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", indexIDs))
func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64, ea *elementIDAlloc) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64("tableID", tableID), zap.Int64s("indexIDs", indexIDs))
paramsList := make([]interface{}, 0, len(indexIDs)*5)
var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
Expand All @@ -427,14 +438,14 @@ func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID,
if i != len(indexIDs)-1 {
buf.WriteString(",")
}
paramsList = append(paramsList, jobID, indexID, startKeyEncoded, endKeyEncoded, ts)
paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts)
}
_, err := s.ExecuteInternal(ctx, buf.String(), paramsList...)
return errors.Trace(err)
}

func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, elementID int64, startKey, endKey kv.Key, ts uint64) error {
logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID))
func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID, elementID int64, startKey, endKey kv.Key, ts uint64, comment string) error {
logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID), zap.String("comment", comment))
startKeyEncoded := hex.EncodeToString(startKey)
endKeyEncoded := hex.EncodeToString(endKey)
// set session disk full opt
Expand All @@ -446,8 +457,8 @@ func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, elementID
return errors.Trace(err)
}

func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", tableIDs))
func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64, ea *elementIDAlloc) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs))
var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
paramsList := make([]interface{}, 0, len(tableIDs)*5)
Expand All @@ -460,7 +471,7 @@ func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tabl
if i != len(tableIDs)-1 {
buf.WriteString(",")
}
paramsList = append(paramsList, jobID, tableID, startKeyEncoded, endKeyEncoded, ts)
paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts)
}
// set session disk full opt
s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
Expand Down
13 changes: 1 addition & 12 deletions ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,6 @@ func testCancelAddIndex(t *testing.T, store kv.Storage, dom *domain.Domain, idxN
batchInsert(tk, "t1", i, i+defaultBatchSize)
}

var c3IdxInfo *model.IndexInfo
hook := &ddl.TestDDLCallback{Do: dom}
originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
// Set a lower batch size to slow down add-index reorganization. This gives the hook a chance to cancel this DDL job.
Expand All @@ -725,7 +724,7 @@ func testCancelAddIndex(t *testing.T, store kv.Storage, dom *domain.Domain, idxN
// hook.OnJobUpdatedExported is called when the job is updated: runReorgJob waits for ddl.ReorgWaitTimeout and then returns to ddl.runDDLJob.
// After that, ddl calls d.hook.OnJobUpdated(job), so that we can cancel the job in this test case.
var checkErr error
hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(t, tk, store, hook, idxName)
hook.OnJobUpdatedExported, _, checkErr = backgroundExecOnJobUpdatedExported(t, tk, store, hook, idxName)
originalHook := d.GetHook()
jobIDExt := wrapJobIDExtCallback(hook)
d.SetHook(jobIDExt)
Expand Down Expand Up @@ -757,7 +756,6 @@ LOOP:
times++
}
}
checkDelRangeAdded(tk, jobIDExt.jobID, c3IdxInfo.ID)
d.SetHook(originalHook)
}

Expand Down Expand Up @@ -1059,8 +1057,6 @@ func testDropIndexes(t *testing.T, store kv.Storage, createSQL, dropIdxSQL strin
for _, idxName := range idxNames {
idxIDs = append(idxIDs, external.GetIndexID(t, tk, "test", "test_drop_indexes", idxName))
}
jobIDExt, reset := setupJobIDExtCallback(tk.Session())
defer reset()
testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done)

ticker := time.NewTicker(indexModifyLease / 2)
Expand All @@ -1084,9 +1080,6 @@ LOOP:
num += step
}
}
for _, idxID := range idxIDs {
checkDelRangeAdded(tk, jobIDExt.jobID, idxID)
}
}

func testDropIndexesIfExists(t *testing.T, store kv.Storage) {
Expand Down Expand Up @@ -1255,9 +1248,6 @@ func testDropIndex(t *testing.T, store kv.Storage, createSQL, dropIdxSQL, idxNam
for i := 0; i < num; i++ {
tk.MustExec("insert into test_drop_index values (?, ?, ?)", i, i, i)
}
indexID := external.GetIndexID(t, tk, "test", "test_drop_index", idxName)
jobIDExt, reset := setupJobIDExtCallback(tk.Session())
defer reset()
testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done)

ticker := time.NewTicker(indexModifyLease / 2)
Expand Down Expand Up @@ -1285,7 +1275,6 @@ LOOP:
rows := tk.MustQuery("explain select c1 from test_drop_index where c3 >= 0")
require.NotContains(t, fmt.Sprintf("%v", rows), idxName)

checkDelRangeAdded(tk, jobIDExt.jobID, indexID)
tk.MustExec("drop table test_drop_index")
}

Expand Down
19 changes: 0 additions & 19 deletions ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/testbridge"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/goleak"
Expand Down Expand Up @@ -76,23 +74,6 @@ func wrapJobIDExtCallback(oldCallback ddl.Callback) *testDDLJobIDCallback {
}
}

// setupJobIDExtCallback wraps the DDL hook currently installed on the domain
// obtained from ctx with wrapJobIDExtCallback and installs the wrapper as the
// new hook.
//
// It returns the installed wrapper and a tearDown function that puts the
// previously installed hook back.
func setupJobIDExtCallback(ctx sessionctx.Context) (jobExt *testDDLJobIDCallback, tearDown func()) {
	d := domain.GetDomain(ctx)
	// Remember the hook that was active before we replace it.
	prevHook := d.DDL().GetHook()

	jobExt = wrapJobIDExtCallback(prevHook)
	d.DDL().SetHook(jobExt)

	tearDown = func() {
		d.DDL().SetHook(prevHook)
	}
	return jobExt, tearDown
}

// checkDelRangeAdded asserts that exactly one row for (jobID, elemID) exists
// across mysql.gc_delete_range and mysql.gc_delete_range_done combined.
//
// The two per-table counts are combined with `union all`. A plain `union`
// removes duplicate rows, so two equal counts (for example 1 and 1) would
// collapse into a single row and the sum would under-report the real total.
func checkDelRangeAdded(tk *testkit.TestKit, jobID int64, elemID int64) {
	query := `select sum(cnt) from
	(select count(1) cnt from mysql.gc_delete_range where job_id = ? and element_id = ? union all
	select count(1) cnt from mysql.gc_delete_range_done where job_id = ? and element_id = ?) as gdr;`
	tk.MustQuery(query, jobID, elemID, jobID, elemID).Check(testkit.Rows("1"))
}

type testDDLJobIDCallback struct {
ddl.Callback
jobID int64
Expand Down
Loading