
Post merge fixes
mjonss committed Dec 30, 2022
1 parent 94b65bd, commit 7ef86c3
Showing 10 changed files with 79 additions and 48 deletions.
ddl/backfilling.go: 12 additions & 2 deletions
@@ -59,6 +59,7 @@ const (
 	typeUpdateColumnWorker     backfillerType = 1
 	typeCleanUpIndexWorker     backfillerType = 2
 	typeAddIndexMergeTmpWorker backfillerType = 3
+	typeReorgPartitionWorker   backfillerType = 4

 	// InstanceLease is the instance lease.
 	InstanceLease = 1 * time.Minute
@@ -74,6 +75,8 @@ func (bT backfillerType) String() string {
 		return "clean up index"
 	case typeAddIndexMergeTmpWorker:
 		return "merge temporary index"
+	case typeReorgPartitionWorker:
+		return "reorganize partition"
 	default:
 		return "unknown"
 	}
@@ -126,6 +129,7 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
 // 1: add-index
 // 2: modify-column-type
 // 3: clean-up global index
+// 4: reorganize partition
 //
 // They all have a write reorganization state to back fill data into the rows existed.
 // Backfilling is time consuming, to accelerate this process, TiDB has built some sub
@@ -238,7 +242,7 @@ type backfillWorker struct {
 	sessCtx  sessionctx.Context
 	taskCh   chan *reorgBackfillTask
 	resultCh chan *backfillResult
-	table    table.Table
+	table    table.PhysicalTable
 	priority int
 	tp       backfillerType
 	ctx      context.Context
@@ -540,7 +544,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
 }

 // handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
-func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
+func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.PhysicalTable,
 	totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
 	batchTasks := make([]*reorgBackfillTask, 0, backfillTaskChanSize)
 	reorgInfo := scheduler.reorgInfo
@@ -766,6 +770,12 @@ func (b *backfillScheduler) adjustWorkerSize() error {
 	case typeCleanUpIndexWorker:
 		idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
 		worker, runner = idxWorker, idxWorker.backfillWorker
+	case typeReorgPartitionWorker:
+		partWorker, err := newReorgPartitionWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
+		if err != nil {
+			return err
+		}
+		worker, runner = partWorker, partWorker.backfillWorker
 	default:
 		return errors.New("unknown backfill type")
 	}
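
For context: the new backfiller type slots into the same enum-plus-String() pattern as the existing workers. Below is a minimal, self-contained sketch of that pattern, reproducing only the constants and strings visible in this diff (illustrative, not the actual ddl package code):

	package main

	import "fmt"

	// Mirrors the backfillerType constants in ddl/backfilling.go; values not
	// shown in this diff are omitted.
	type backfillerType byte

	const (
		typeUpdateColumnWorker     backfillerType = 1
		typeCleanUpIndexWorker     backfillerType = 2
		typeAddIndexMergeTmpWorker backfillerType = 3
		typeReorgPartitionWorker   backfillerType = 4 // added by this commit
	)

	func (bT backfillerType) String() string {
		switch bT {
		case typeCleanUpIndexWorker:
			return "clean up index"
		case typeAddIndexMergeTmpWorker:
			return "merge temporary index"
		case typeReorgPartitionWorker:
			return "reorganize partition"
		default:
			return "unknown"
		}
	}

	func main() {
		// Log lines and error messages pick up the human-readable name.
		fmt.Println(typeReorgPartitionWorker) // prints "reorganize partition"
	}

Note that, unlike the sibling constructors in adjustWorkerSize, newReorgPartitionWorker returns an error, so the new case checks and propagates it rather than assuming construction succeeds.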
ddl/db_partition_test.go: 56 additions & 0 deletions
@@ -4578,3 +4578,59 @@ func TestDropPartitionKeyColumn(t *testing.T) {
 	require.Equal(t, "[ddl:3855]Column 'a' has a partitioning function dependency and cannot be dropped or renamed", err.Error())
 	tk.MustExec("alter table t4 drop column b")
 }
+
+func TestReorgPartitionConcurrent(t *testing.T) {
+	t.Skip("Needs PR 38460 as well")
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	schemaName := "ReorgPartConcurrent"
+	tk.MustExec("create database " + schemaName)
+	tk.MustExec("use " + schemaName)
+	tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` +
+		` partition by range (a) ` +
+		`(partition p0 values less than (10),` +
+		` partition p1 values less than (20),` +
+		` partition pMax values less than (MAXVALUE))`)
+	tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`)
+	dom := domain.GetDomain(tk.Session())
+	originHook := dom.DDL().GetHook()
+	defer dom.DDL().SetHook(originHook)
+	hook := &ddl.TestDDLCallback{Do: dom}
+	dom.DDL().SetHook(hook)
+
+	wait := make(chan bool)
+	defer close(wait)
+
+	injected := false
+	hook.OnJobRunBeforeExported = func(job *model.Job) {
+		if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization && !injected {
+			injected = true
+			<-wait
+			<-wait
+		}
+	}
+	alterErr := make(chan error, 1)
+	go backgroundExec(store, schemaName, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr)
+	wait <- true
+	tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`)
+	wait <- true
+	require.NoError(t, <-alterErr)
+	tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+
+		"12 12 21",
+		"14 14 14",
+		"15 15 15"))
+	tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
+		"t CREATE TABLE `t` (\n" +
+		"  `a` int(10) unsigned NOT NULL,\n" +
+		"  `b` varchar(255) DEFAULT NULL,\n" +
+		"  `c` int(11) DEFAULT NULL,\n" +
+		"  PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
+		"  KEY `b` (`b`),\n" +
+		"  KEY `c` (`c`,`b`)\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		"PARTITION BY RANGE (`a`)\n" +
+		"(PARTITION `p0` VALUES LESS THAN (10),\n" +
+		" PARTITION `p1a` VALUES LESS THAN (15),\n" +
+		" PARTITION `p1b` VALUES LESS THAN (20),\n" +
+		" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
+}
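
A note on the synchronization idiom used above: an unbuffered channel plus an injected DDL hook parks the reorganize-partition job in StateWriteReorganization while the test issues a concurrent insert, then releases it. Reduced to a standalone sketch (the names here are illustrative, not TiDB APIs):

	package main

	import "fmt"

	func main() {
		wait := make(chan bool)
		done := make(chan error, 1)

		// Stands in for the DDL job; the injected hook blocks it twice on
		// the unbuffered channel, as hook.OnJobRunBeforeExported does in
		// the test.
		go func() {
			<-wait
			<-wait
			done <- nil // the job finishes only after both releases
		}()

		wait <- true // the job is now parked inside the hook
		fmt.Println("concurrent DML runs here, against both old and new partitions")
		wait <- true // release the job
		if err := <-done; err != nil {
			fmt.Println("alter failed:", err)
		}
	}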
ddl/partition.go: 2 additions & 2 deletions
@@ -1762,7 +1762,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 		}
 	}
 	rh := newReorgHandler(t, w.sess)
-	reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements)
+	reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, pt, physicalTableIDs, elements)

 	if err != nil || reorgInfo.first {
 		// If we run reorg firstly, we should update the job snapshot version
@@ -2161,7 +2161,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job

 func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, physTblIDs []int64) (done bool, ver int64, err error) {
 	job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
-	rh := newReorgHandler(t, w.sess, w.concurrentDDL)
+	rh := newReorgHandler(t, w.sess)
 	elements := BuildElements(tbl.Meta().Columns[0], tbl.Meta().Indices)
 	partTbl, ok := tbl.(table.PartitionedTable)
 	if !ok {
ddl/reorg.go: 1 addition & 1 deletion
@@ -693,7 +693,7 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
 	return &info, nil
 }

-func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.PartitionedTable, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
 	var (
 		element *meta.Element
 		start   kv.Key
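
Several signatures in this commit (handleRangeTasks, the backfillWorker.table field, getReorgInfoFromPartitions) narrow table.Table to table.PhysicalTable or table.PartitionedTable, moving the "is this actually partitioned?" check from each call site into the type system. A toy sketch of the idea, with stand-in interfaces rather than the real table package:

	package main

	import "fmt"

	// Toy stand-ins for table.Table and table.PartitionedTable.
	type Table interface{ Name() string }

	type PartitionedTable interface {
		Table
		GetPartition(id int64) Table
	}

	type partTable struct{ name string }

	func (p partTable) Name() string { return p.name }
	func (p partTable) GetPartition(id int64) Table {
		return partTable{name: fmt.Sprintf("%s partition %d", p.name, id)}
	}

	// Before: callers accepted the wide interface and had to assert at runtime.
	func reorgLoose(t Table) error {
		pt, ok := t.(PartitionedTable)
		if !ok {
			return fmt.Errorf("%s is not partitioned", t.Name())
		}
		fmt.Println("reorganizing", pt.GetPartition(1).Name())
		return nil
	}

	// After: the signature itself guarantees a partitioned table.
	func reorgStrict(pt PartitionedTable) {
		fmt.Println("reorganizing", pt.GetPartition(1).Name())
	}

	func main() {
		t := partTable{name: "t"}
		_ = reorgLoose(t) // runtime check
		reorgStrict(t)    // compile-time guarantee
	}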
domain/domain.go: 0 additions & 4 deletions
@@ -1066,10 +1066,6 @@ func (do *Domain) Init(
 		return err
 	}

-	do.wg.Run(func() {
-		do.runTTLJobManager(ctx)
-	})
-
 	return nil
 }

executor/builder.go: 8 additions & 4 deletions
@@ -3524,10 +3524,14 @@ func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []int {
 		keyColOffsets[i] = offset
 	}

-	pe, err := pt.(interface {
-		PartitionExpr() (*tables.PartitionExpr, error)
-	}).PartitionExpr()
-	if err != nil {
+	t, ok := pt.(interface {
+		PartitionExpr() *tables.PartitionExpr
+	})
+	if !ok {
+		return nil
+	}
+	pe := t.PartitionExpr()
+	if pe == nil {
 		return nil
 	}

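The builder.go change swaps an error-returning PartitionExpr for one whose result can simply be absent or nil, handled with Go's comma-ok type assertion. A self-contained sketch of that pattern, with a hypothetical stand-in for tables.PartitionExpr:

	package main

	import "fmt"

	// Hypothetical stand-in for tables.PartitionExpr.
	type PartitionExpr struct{}

	// Mirrors the anonymous interface asserted in getPartitionKeyColOffsets.
	type exprProvider interface {
		PartitionExpr() *PartitionExpr
	}

	type withExpr struct{ pe *PartitionExpr }

	func (w withExpr) PartitionExpr() *PartitionExpr { return w.pe }

	// keyOffsetsPossible returns false when either the value does not expose
	// PartitionExpr at all, or it does but the expression is nil; these are
	// the two bail-out paths in the new code.
	func keyOffsetsPossible(pt interface{}) bool {
		t, ok := pt.(exprProvider)
		if !ok {
			return false
		}
		return t.PartitionExpr() != nil
	}

	func main() {
		fmt.Println(keyOffsetsPossible(struct{}{}))                 // false: no method
		fmt.Println(keyOffsetsPossible(withExpr{}))                 // false: nil expr
		fmt.Println(keyOffsetsPossible(withExpr{&PartitionExpr{}})) // true
	}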
expression/expression.go: 0 additions & 4 deletions
@@ -822,10 +822,6 @@ func EvaluateExprWithNull(ctx sessionctx.Context, schema *Schema, expr Expression
 		expr, _ = evaluateExprWithNullInNullRejectCheck(ctx, schema, expr)
 		return expr
 	}
-	if ctx.GetSessionVars().StmtCtx.InNullRejectCheck {
-		expr, _ = evaluateExprWithNullInNullRejectCheck(ctx, schema, expr)
-		return expr
-	}
 	return evaluateExprWithNull(ctx, schema, expr)
 }

infoschema/tables_test.go: 0 additions & 4 deletions
@@ -1670,10 +1670,6 @@ func TestVariablesInfo(t *testing.T) {

 	tk := testkit.NewTestKit(t, store)

-	if !variable.EnableConcurrentDDL.Load() {
-		t.Skip("skip test when concurrent DDL is disabled")
-	}
-
 	tk.MustExec("use information_schema")
 	tk.MustExec("SET GLOBAL innodb_compression_level = 8;")

meta/meta.go: 0 additions & 23 deletions
@@ -637,29 +637,6 @@ func (m *Meta) GetMetadataLock() (enable bool, isNull bool, err error) {
 	return bytes.Equal(val, []byte("1")), false, nil
 }

-// SetMetadataLock sets the metadata lock.
-func (m *Meta) SetMetadataLock(b bool) error {
-	var data []byte
-	if b {
-		data = []byte("1")
-	} else {
-		data = []byte("0")
-	}
-	return errors.Trace(m.txn.Set(mMetaDataLock, data))
-}
-
-// GetMetadataLock gets the metadata lock.
-func (m *Meta) GetMetadataLock() (enable bool, isNull bool, err error) {
-	val, err := m.txn.Get(mMetaDataLock)
-	if err != nil {
-		return false, false, errors.Trace(err)
-	}
-	if len(val) == 0 {
-		return false, true, nil
-	}
-	return bytes.Equal(val, []byte("1")), false, nil
-}
-
 // CreateTableAndSetAutoID creates a table with tableInfo in database,
 // and rebases the table autoID.
 func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoIncID, autoRandID int64) error {
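
The removed block is a verbatim duplicate of the SetMetadataLock/GetMetadataLock pair defined just above it, evidently introduced by the merge. Go rejects redeclared methods at compile time, so this hunk fixes a build break rather than changing behavior; a minimal reproduction with a toy type (not the real meta.Meta):

	package main

	type Meta struct{}

	func (m *Meta) GetMetadataLock() bool { return true }

	// Uncommenting this second declaration fails to compile with an error
	// along the lines of "method Meta.GetMetadataLock already declared".
	// func (m *Meta) GetMetadataLock() bool { return true }

	func main() {}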
planner/core/task.go: 0 additions & 4 deletions
@@ -1982,10 +1982,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
 		}
 		attachPlan2Task(proj, newMpp)
 		return newMpp
-	case NoMpp:
-		t = mpp.convertToRootTask(p.ctx)
-		attachPlan2Task(p, t)
-		return t
 	default:
 		return invalidTask
 	}
