Manual merge of master into feature/reorganize-partition
mjonss committed Jan 25, 2023
1 parent 1e0956d commit 35dd111
Showing 40 changed files with 2,454 additions and 226 deletions.
9 changes: 5 additions & 4 deletions .github/CODEOWNERS
@@ -1,5 +1,6 @@
# Require review from domain experts when the PR modified significant config files.
/sessionctx/variable @pingcap/tidb-configuration-reviewer
/config/config.toml.example @pingcap/tidb-configuration-reviewer
/session/bootstrap.go @pingcap/tidb-configuration-reviewer
/telemetry/ @pingcap/telemetry-reviewer
# TODO: Enable these again before merging the feature branch to pingcap/master
#/sessionctx/variable @pingcap/tidb-configuration-reviewer
#/config/config.toml.example @pingcap/tidb-configuration-reviewer
#/session/bootstrap.go @pingcap/tidb-configuration-reviewer
#/telemetry/ @pingcap/telemetry-reviewer
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -257,6 +257,7 @@ go_test(
"//util/domainutil",
"//util/gcutil",
"//util/logutil",
"//util/mathutil",
"//util/mock",
"//util/sem",
"//util/sqlexec",
20 changes: 14 additions & 6 deletions ddl/backfilling.go
@@ -59,6 +59,7 @@ const (
typeUpdateColumnWorker backfillerType = 1
typeCleanUpIndexWorker backfillerType = 2
typeAddIndexMergeTmpWorker backfillerType = 3
typeReorgPartitionWorker backfillerType = 4

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
@@ -80,6 +81,8 @@ func (bT backfillerType) String() string {
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
case typeReorgPartitionWorker:
return "reorganize partition"
default:
return "unknown"
}
@@ -141,6 +144,7 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
// 1: add-index
// 2: modify-column-type
// 3: clean-up global index
// 4: reorganize partition
//
// They all have a write reorganization state to back fill data into the rows existed.
// Backfilling is time consuming, to accelerate this process, TiDB has built some sub
@@ -606,7 +610,6 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return errors.Trace(err)
}

// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
dc.getReorgCtx(reorgInfo.Job.ID).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
@@ -620,7 +623,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return nil
}

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
func getBatchTasks(t table.PhysicalTable, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, batch)
physicalTableID := reorgInfo.PhysicalTableID
var prefix kv.Key
@@ -650,13 +653,11 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
endKey = prefix.PrefixNext()
}

//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
task := &reorgBackfillTask{
id: i,
jobID: reorgInfo.Job.ID,
physicalTableID: physicalTableID,
physicalTable: phyTbl,
physicalTable: t,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
@@ -672,7 +673,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
}

// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.PhysicalTable,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize)
if len(batchTasks) == 0 {
@@ -863,6 +864,13 @@ func (b *backfillScheduler) adjustWorkerSize() error {
idxWorker := newCleanUpIndexWorker(sessCtx, b.tbl, b.decodeColMap, reorgInfo, jc)
runner = newBackfillWorker(jc.ddlJobCtx, i, idxWorker)
worker = idxWorker
case typeReorgPartitionWorker:
partWorker, err := newReorgPartitionWorker(sessCtx, b.tbl, b.decodeColMap, reorgInfo, jc)
if err != nil {
return err
}
runner = newBackfillWorker(jc.ddlJobCtx, i, partWorker)
worker = partWorker
default:
return errors.New("unknown backfill type")
}
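For context, the ddl/backfilling.go changes above add a new backfiller kind, typeReorgPartitionWorker, give it a display name, and teach the backfill scheduler to construct it (noting that, unlike the other constructors, newReorgPartitionWorker can return an error). The following is a minimal, self-contained sketch of that enum-plus-factory pattern with simplified names and types; it is an illustration, not the actual TiDB code.

package main

import (
	"errors"
	"fmt"
)

// backfillerType mirrors the enum extended above: each backfill job kind gets
// its own constant and a human-readable name. The concrete numeric values of
// the pre-existing kinds are simplified here via iota.
type backfillerType int

const (
	typeAddIndexWorker backfillerType = iota
	typeUpdateColumnWorker
	typeCleanUpIndexWorker
	typeAddIndexMergeTmpWorker
	typeReorgPartitionWorker // the new kind added in this commit
)

func (bT backfillerType) String() string {
	switch bT {
	case typeAddIndexWorker:
		return "add index"
	case typeUpdateColumnWorker:
		return "update column"
	case typeCleanUpIndexWorker:
		return "clean up index"
	case typeAddIndexMergeTmpWorker:
		return "merge temporary index"
	case typeReorgPartitionWorker:
		return "reorganize partition"
	default:
		return "unknown"
	}
}

// newWorker mimics the factory switch in adjustWorkerSize: most constructors
// cannot fail, but the reorganize-partition constructor may return an error,
// which is why the diff adds an explicit error check for that case only.
func newWorker(tp backfillerType) (fmt.Stringer, error) {
	switch tp {
	case typeReorgPartitionWorker:
		// stand-in for newReorgPartitionWorker(...), which may fail
		return tp, nil
	case typeAddIndexWorker, typeUpdateColumnWorker, typeCleanUpIndexWorker, typeAddIndexMergeTmpWorker:
		return tp, nil
	default:
		return nil, errors.New("unknown backfill type")
	}
}

func main() {
	w, err := newWorker(typeReorgPartitionWorker)
	if err != nil {
		panic(err)
	}
	fmt.Println(w) // prints "reorganize partition"
}
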
40 changes: 35 additions & 5 deletions ddl/column.go
@@ -1052,9 +1052,35 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf
return elements
}

func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo)
if tbl, ok := t.(table.PartitionedTable); ok {
done := false
for !done {
p := tbl.GetPartition(reorgInfo.PhysicalTableID)
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
workType := typeReorgPartitionWorker
if reorgInfo.Job.Type != model.ActionReorganizePartition {
workType = typeUpdateColumnWorker
panic("FIXME: See https://github.com/pingcap/tidb/issues/39915")
}
err := w.writePhysicalTableRecord(w.sessPool, p, workType, reorgInfo)
if err != nil {
return err
}
done, err = w.updateReorgInfo(tbl, reorgInfo)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
if tbl, ok := t.(table.PhysicalTable); ok {
return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
}
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}

// TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
@@ -1085,6 +1111,10 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
}
}

if _, ok := t.(table.PartitionedTable); ok {
// TODO: Remove this
panic("FIXME: this got reverted and needs to be back!")
}
// Get the original start handle and end handle.
currentVer, err := getValidCurrentVersion(reorgInfo.d.store)
if err != nil {
@@ -1213,8 +1243,8 @@ type rowRecord struct {
warning *terror.Error // It's used to record the cast warning of a record.
}

// getNextKey gets next handle of entry that we are going to process.
func (*updateColumnWorker) getNextKey(taskRange reorgBackfillTask,
// getNextHandleKey gets next handle of entry that we are going to process.
func getNextHandleKey(taskRange reorgBackfillTask,
taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) {
if !taskDone {
// The task is not done. So we need to pick the last processed entry's handle and add one.
@@ -1264,7 +1294,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
}

logutil.BgLogger().Debug("[ddl] txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
return w.rowRecords, w.getNextKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
}

func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error {
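The reworked updatePhysicalTableRow above loops over the partitions of a partitioned table, backfilling one physical partition at a time and advancing the reorg info between iterations (falling back to the single-physical-table path otherwise). A condensed sketch of that control flow follows; the types and helpers here are hypothetical stand-ins, not the real table and reorg structures.

package main

import (
	"errors"
	"fmt"
)

// PhysicalTable stands in for one partition's physical storage.
type PhysicalTable struct{ ID int64 }

// PartitionedTable stands in for a table with multiple partitions.
type PartitionedTable struct{ partitions map[int64]*PhysicalTable }

func (t *PartitionedTable) GetPartition(id int64) *PhysicalTable { return t.partitions[id] }

// reorgInfo tracks which physical partition the backfill is currently on.
type reorgInfo struct{ PhysicalTableID int64 }

// writePhysicalTableRecord stands in for the real backfill of one partition.
func writePhysicalTableRecord(p *PhysicalTable) error {
	fmt.Printf("backfilling partition %d\n", p.ID)
	return nil
}

// updateReorgInfo advances to the next partition; done==true when none are left.
func updateReorgInfo(order []int64, info *reorgInfo) (done bool) {
	for i, id := range order {
		if id == info.PhysicalTableID && i+1 < len(order) {
			info.PhysicalTableID = order[i+1]
			return false
		}
	}
	return true
}

// updateRows mirrors the loop added in updatePhysicalTableRow: fetch the current
// partition, backfill it, then move the reorg pointer forward until all are done.
func updateRows(t *PartitionedTable, order []int64, info *reorgInfo) error {
	for done := false; !done; {
		p := t.GetPartition(info.PhysicalTableID)
		if p == nil {
			return errors.New("cannot find partition for reorg")
		}
		if err := writePhysicalTableRecord(p); err != nil {
			return err
		}
		done = updateReorgInfo(order, info)
	}
	return nil
}

func main() {
	order := []int64{101, 102, 103}
	t := &PartitionedTable{partitions: map[int64]*PhysicalTable{
		101: {ID: 101}, 102: {ID: 102}, 103: {ID: 103},
	}}
	if err := updateRows(t, order, &reorgInfo{PhysicalTableID: 101}); err != nil {
		panic(err)
	}
}
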
4 changes: 2 additions & 2 deletions ddl/column_modify_test.go
@@ -688,15 +688,15 @@ func TestTransactionWithWriteOnlyColumn(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t1 add column c int not null", done)
go backgroundExec(store, "test", "alter table t1 add column c int not null", done)
err := <-done
require.NoError(t, err)
require.NoError(t, checkErr)
tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
tk.MustExec("delete from t1")

// test transaction on drop column.
go backgroundExec(store, "alter table t1 drop column c", done)
go backgroundExec(store, "test", "alter table t1 drop column c", done)
err = <-done
require.NoError(t, err)
require.NoError(t, checkErr)
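The call sites above now pass the target schema explicitly to backgroundExec, whose updated definition appears in ddl/db_integration_test.go below. A minimal sketch of this run-DDL-in-the-background pattern, using hypothetical helpers rather than the real session package, looks like this:

package main

import "fmt"

// execSQL stands in for a real session executing one statement.
func execSQL(schema, sql string) error {
	fmt.Printf("[%s] %s\n", schema, sql)
	return nil
}

// backgroundExec mirrors the updated helper: it now takes the schema to switch
// to ("use <schema>") instead of hard-coding "test", and reports its result on
// a channel so the test can run other statements concurrently with the DDL.
func backgroundExec(schema, sql string, done chan error) {
	if err := execSQL(schema, "use "+schema); err != nil {
		done <- err
		return
	}
	done <- execSQL(schema, sql)
}

func main() {
	done := make(chan error, 1)
	go backgroundExec("test", "alter table t1 add column c int not null", done)
	// ... the test would issue concurrent DML here ...
	if err := <-done; err != nil {
		panic(err)
	}
}
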
79 changes: 77 additions & 2 deletions ddl/db_integration_test.go
@@ -1215,14 +1215,14 @@ func TestBitDefaultValue(t *testing.T) {
);`)
}

func backgroundExec(s kv.Storage, sql string, done chan error) {
func backgroundExec(s kv.Storage, schema, sql string, done chan error) {
se, err := session.CreateSession4Test(s)
if err != nil {
done <- errors.Trace(err)
return
}
defer se.Close()
_, err = se.Execute(context.Background(), "use test")
_, err = se.Execute(context.Background(), "use "+schema)
if err != nil {
done <- errors.Trace(err)
return
@@ -4311,3 +4311,78 @@ func TestRegexpFunctionsGeneratedColumn(t *testing.T) {

tk.MustExec("drop table if exists reg_like")
}

func TestReorgPartitionRangeFailure(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`create schema reorgfail`)
tk.MustExec("use reorgfail")

tk.MustExec("CREATE TABLE t (id int, d varchar(255)) partition by range (id) (partition p0 values less than (1000000), partition p1 values less than (2000000), partition p2 values less than (3000000))")
tk.MustContainErrMsg(`ALTER TABLE t REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (1000000))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
tk.MustContainErrMsg(`ALTER TABLE t REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (4000000))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
}

func TestReorgPartitionDocs(t *testing.T) {
// To test what is added as partition management in the docs
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`create schema reorgdocs`)
tk.MustExec("use reorgdocs")
tk.MustExec(`CREATE TABLE members (
id int,
fname varchar(255),
lname varchar(255),
dob date,
data json
)
PARTITION BY RANGE (YEAR(dob)) (
PARTITION pBefore1950 VALUES LESS THAN (1950),
PARTITION p1950 VALUES LESS THAN (1960),
PARTITION p1960 VALUES LESS THAN (1970),
PARTITION p1970 VALUES LESS THAN (1980),
PARTITION p1980 VALUES LESS THAN (1990),
PARTITION p1990 VALUES LESS THAN (2000))`)
tk.MustExec(`CREATE TABLE member_level (
id int,
level int,
achievements json
)
PARTITION BY LIST (level) (
PARTITION l1 VALUES IN (1),
PARTITION l2 VALUES IN (2),
PARTITION l3 VALUES IN (3),
PARTITION l4 VALUES IN (4),
PARTITION l5 VALUES IN (5));`)
tk.MustExec(`ALTER TABLE members DROP PARTITION p1990`)
tk.MustExec(`ALTER TABLE member_level DROP PARTITION l5`)
tk.MustExec(`ALTER TABLE members TRUNCATE PARTITION p1980`)
tk.MustExec(`ALTER TABLE member_level TRUNCATE PARTITION l4`)
tk.MustExec("ALTER TABLE members ADD PARTITION (PARTITION `p1990to2010` VALUES LESS THAN (2010))")
tk.MustExec(`ALTER TABLE member_level ADD PARTITION (PARTITION l5_6 VALUES IN (5,6))`)
tk.MustContainErrMsg(`ALTER TABLE members ADD PARTITION (PARTITION p1990 VALUES LESS THAN (2000))`, "[ddl:1493]VALUES LESS THAN value must be strictly increasing for each partition")
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p1990to2010 INTO
(PARTITION p1990 VALUES LESS THAN (2000),
PARTITION p2000 VALUES LESS THAN (2010),
PARTITION p2010 VALUES LESS THAN (2020),
PARTITION p2020 VALUES LESS THAN (2030),
PARTITION pMax VALUES LESS THAN (MAXVALUE))`)
tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l5_6 INTO
(PARTITION l5 VALUES IN (5),
PARTITION l6 VALUES IN (6))`)
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION pBefore1950,p1950 INTO (PARTITION pBefore1960 VALUES LESS THAN (1960))`)
tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l1,l2 INTO (PARTITION l1_2 VALUES IN (1,2))`)
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION pBefore1960,p1960,p1970,p1980,p1990,p2000,p2010,p2020,pMax INTO
(PARTITION p1800 VALUES LESS THAN (1900),
PARTITION p1900 VALUES LESS THAN (2000),
PARTITION p2000 VALUES LESS THAN (2100))`)
tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l1_2,l3,l4,l5,l6 INTO
(PARTITION lOdd VALUES IN (1,3,5),
PARTITION lEven VALUES IN (2,4,6))`)
tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p1800,p2000 INTO (PARTITION p2000 VALUES LESS THAN (2100))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
tk.MustExec(`INSERT INTO members VALUES (313, "John", "Doe", "2022-11-22", NULL)`)
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2050))`)
tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2020))`, "[table:1526]Table has no partition for value 2022")
tk.MustExec(`INSERT INTO member_level (id, level) values (313, 6)`)
tk.MustContainErrMsg(`ALTER TABLE member_level REORGANIZE PARTITION lEven INTO (PARTITION lEven VALUES IN (2,4))`, "[table:1526]Table has no partition for value 6")
}
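TestReorgPartitionRangeFailure and TestReorgPartitionDocs above exercise the rule that reorganizing non-adjacent RANGE partitions (for example p0,p2) is rejected with "not adjacent partitions". The sketch below is a conceptual illustration of that adjacency rule written against a plain slice of partition names; it is not the actual TiDB validation code.

package main

import (
	"errors"
	"fmt"
)

// checkAdjacent reports an error unless the partitions being reorganized form a
// contiguous run, in definition order, of the table's existing partitions.
func checkAdjacent(existing, toReorg []string) error {
	// Locate the first partition to reorganize within the existing order.
	first := -1
	for i, name := range existing {
		if name == toReorg[0] {
			first = i
			break
		}
	}
	if first < 0 {
		return fmt.Errorf("unknown partition %q", toReorg[0])
	}
	// Every subsequent name must be the next partition in definition order.
	for j, name := range toReorg {
		if first+j >= len(existing) || existing[first+j] != name {
			return errors.New("Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
		}
	}
	return nil
}

func main() {
	existing := []string{"p0", "p1", "p2"}
	fmt.Println(checkAdjacent(existing, []string{"p0", "p1"})) // <nil>
	fmt.Println(checkAdjacent(existing, []string{"p0", "p2"})) // not adjacent partitions
}
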