Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Fix for TRUNCATE PARTITION and Global Index #57724

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
480 changes: 224 additions & 256 deletions pkg/ddl/partition.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1956,6 +1956,7 @@ func TestTruncateTablePartitionWithPlacement(t *testing.T) {
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PLACEMENT POLICY=`p3` */,\n" +
" PARTITION `p3` VALUES LESS THAN (100000))"))
dom.Reload()
checkExistTableBundlesInPD(t, dom, "test", "tp")
checkWaitingGCPartitionBundlesInPD(t, dom, checkOldPartitions)

Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,8 +932,8 @@ func getReorgInfo(ctx *ReorgContext, jobCtx *jobContext, rh *reorgHandler, job *
zap.String("startKey", hex.EncodeToString(start)),
zap.String("endKey", hex.EncodeToString(end)))

failpoint.Inject("errorUpdateReorgHandle", func() (*reorgInfo, error) {
return &info, errors.New("occur an error when update reorg handle")
failpoint.Inject("errorUpdateReorgHandle", func() {
failpoint.Return(&info, errors.New("occur an error when update reorg handle"))
})
err = rh.InitDDLReorgHandle(job, start, end, pid, elements[0])
if err != nil {
Expand Down
39 changes: 38 additions & 1 deletion pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,41 @@ func rollingbackExchangeTablePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}

func rollingbackTruncateTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

return convertTruncateTablePartitionJob2RollbackJob(jobCtx, job, dbterror.ErrCancelledDDLJob, tblInfo)
}

func convertTruncateTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
if !job.IsRollbackable() {
// Only Original state and StateWrite can be rolled back, otherwise new partitions
// may have been used and new data would get lost.
// So we must continue to roll forward!
job.State = model.JobStateRunning
return ver, nil
}
pi := tblInfo.Partition
if len(pi.NewPartitionIDs) != 0 || pi.DDLAction != model.ActionNone || pi.DDLState != model.StateNone {
// Rollback the changes, note that no new partitions has been used yet!
// so only metadata rollback and we can cancel the DDL
tblInfo.Partition.NewPartitionIDs = nil
tblInfo.Partition.DDLAction = model.ActionNone
tblInfo.Partition.DDLState = model.StateNone
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}
// No change yet, just cancel the job.
job.State = model.JobStateCancelled
return ver, errors.Trace(otherwiseErr)
}

func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
addingDefinitions := tblInfo.Partition.AddingDefinitions
partNames := make([]string, 0, len(addingDefinitions))
Expand Down Expand Up @@ -590,8 +625,10 @@ func convertJob2RollbackJob(w *worker, jobCtx *jobContext, job *model.Job) (ver
ver, err = rollingbackTruncateTable(jobCtx, job)
case model.ActionModifyColumn:
ver, err = rollingbackModifyColumn(jobCtx, job)
case model.ActionDropForeignKey, model.ActionTruncateTablePartition:
case model.ActionDropForeignKey:
ver, err = cancelOnlyNotHandledJob(job, model.StatePublic)
case model.ActionTruncateTablePartition:
ver, err = rollingbackTruncateTablePartition(jobCtx, job)
case model.ActionRebaseAutoID, model.ActionShardRowID, model.ActionAddForeignKey,
model.ActionRenameTable, model.ActionRenameTables,
model.ActionModifyTableCharsetAndCollate,
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_test(
timeout = "short",
srcs = [
"db_partition_test.go",
"error_injection_test.go",
"main_test.go",
"multi_domain_test.go",
"placement_test.go",
Expand Down
55 changes: 47 additions & 8 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,15 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}
}
waitFor(4, "write only")
tkTmp := testkit.NewTestKit(t, store)
tkTmp.MustExec(`begin`)
tkTmp.MustExec("use test")
tkTmp.MustQuery(`select count(*) from test_global`).Check(testkit.Rows("5"))
tk2.MustExec(`rollback`)
tk2.MustExec(`begin`)
tk2.MustExec(`insert into test_global values (5,5,5)`)
tkTmp.MustExec(`rollback`)
waitFor(4, "delete only")
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec(`begin`)
Expand All @@ -1437,16 +1446,21 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Point_Get")
tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
tk3.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
// Here it will fail with
// the partition is not in public.
err := tk3.ExecToErr(`insert into test_global values (15,15,15)`)
require.Error(t, err)
require.ErrorContains(t, err, "the partition is in not in public")
require.ErrorContains(t, err, "[kv:1062]Duplicate entry '15' for key 'test_global.idx_b'")
tk2.MustExec(`commit`)
waitFor(4, "delete reorganization")
tk2.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
tk2.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
err = tk2.ExecToErr(`insert into test_global values (15,15,15)`)
require.NoError(t, err)
tk2.MustExec(`begin`)
tk3.MustExec(`commit`)
tk.MustExec(`commit`)
<-syncChan
result := tk.MustQuery("select * from test_global;")
result.Sort().Check(testkit.Rows(`1 1 1`, `2 2 2`, `5 5 5`))
result.Sort().Check(testkit.Rows(`1 1 1`, `15 15 15`, `2 2 2`, `5 5 5`))

tt = external.GetTableByName(t, tk, "test", "test_global")
idxInfo := tt.Meta().FindIndexByName("idx_b")
Expand Down Expand Up @@ -1487,12 +1501,12 @@ func TestGlobalIndexUpdateInTruncatePartition(t *testing.T) {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
err := tk1.ExecToErr("update test_global set a = 2 where a = 11")
assert.NotNil(t, err)
assert.NoError(t, err)
}
})

tk.MustExec("alter table test_global truncate partition p1")
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("11 11 11", "12 12 12"))
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("2 11 11", "12 12 12"))
}

func TestGlobalIndexUpdateInTruncatePartition4Hash(t *testing.T) {
Expand All @@ -1515,7 +1529,7 @@ func TestGlobalIndexUpdateInTruncatePartition4Hash(t *testing.T) {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
err = tk1.ExecToErr("update test_global set a = 1 where a = 12")
assert.NotNil(t, err)
assert.NoError(t, err)
}
})

Expand Down Expand Up @@ -1577,7 +1591,7 @@ func TestGlobalIndexInsertInTruncatePartition(t *testing.T) {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
err = tk1.ExecToErr("insert into test_global values(2, 2, 2)")
assert.NotNil(t, err)
assert.NoError(t, err)
}
})

Expand Down Expand Up @@ -3168,6 +3182,8 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk2.MustExec(`COMMIT`)

/*
// Currently there is an duplicate entry issue, so it will rollback in WriteReorganization
// instead of continuing.
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)
Expand Down Expand Up @@ -3655,3 +3671,26 @@ func checkGlobalAndPK(t *testing.T, tk *testkit.TestKit, name string, indexes in
require.True(t, idxInfo.Primary)
}
}

func TestTruncateNumberOfPhases(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`create table t (a int primary key , b varchar(255)) partition by hash(a) partitions 3`)
ctx := tk.Session()
dom := domain.GetDomain(ctx)
dom.Reload()
schemaVersion := dom.InfoSchema().SchemaMetaVersion()
tk.MustExec(`insert into t values (1,1),(2,2),(3,3)`)
tk.MustExec(`alter table t truncate partition p1`)
dom.Reload()
// Without global index, truncate partition could be a single state change
require.Equal(t, int64(4), dom.InfoSchema().SchemaMetaVersion()-schemaVersion)
tk.MustExec(`drop table t`)
tk.MustExec(`create table t (a int primary key , b varchar(255), unique key (b) global) partition by hash(a) partitions 3`)
schemaVersion = dom.InfoSchema().SchemaMetaVersion()
tk.MustExec(`insert into t values (1,1),(2,2),(3,3)`)
tk.MustExec(`alter table t truncate partition p1`)
dom.Reload()
require.Equal(t, int64(4), dom.InfoSchema().SchemaMetaVersion()-schemaVersion)
}
Loading