Added support for repartitioning an already partitioned table
mjonss committed Apr 10, 2023
1 parent cb3020b commit 592ad80
Showing 8 changed files with 200 additions and 122 deletions.
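In short, `ALTER TABLE ... PARTITION BY ...` now also works when the table already has a partitioning scheme, reusing the REORGANIZE PARTITION machinery together with a new table ID. A minimal usage sketch in the repository's testkit style (the create/partition-by-range setup is assumed from the updated TestAlterPartitionByRange test below):

    tk.MustExec(`create table t (a int primary key, b varchar(255), key (b))`)
    tk.MustExec(`alter table t partition by range (a) (partition p0 values less than (1000000), partition p1 values less than (2000000), partition pMax values less than (maxvalue))`)
    // New in this commit: changing the scheme of an already partitioned table.
    tk.MustExec(`alter table t partition by hash(a) partitions 7`)
    tk.MustExec(`alter table t partition by key(a) partitions 5`)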
77 changes: 44 additions & 33 deletions ddl/ddl_api.go
@@ -4016,7 +4016,7 @@ func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, sp
 	piOld := meta.GetPartitionInfo()
 	var partNames []model.CIStr
 	if piOld != nil {
-		partNames := make([]model.CIStr, 0, len(piOld.Definitions))
+		partNames = make([]model.CIStr, 0, len(piOld.Definitions))
 		for i := range piOld.Definitions {
 			partNames = append(partNames, piOld.Definitions[i].Name)
 		}
@@ -4100,7 +4100,7 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec
 	if err = d.assignPartitionIDs(partInfo.Definitions); err != nil {
 		return errors.Trace(err)
 	}
-	if err = checkReorgPartitionDefs(ctx, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil {
+	if err = checkReorgPartitionDefs(ctx, model.ActionAlterTablePartitioning, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil {
 		return errors.Trace(err)
 	}
 	if err = handlePartitionPlacement(ctx, partInfo); err != nil {
@@ -4133,55 +4133,66 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec
 	return errors.Trace(err)
 }
 
-func checkReorgPartitionDefs(ctx sessionctx.Context, tblInfo *model.TableInfo, partInfo *model.PartitionInfo, firstPartIdx, lastPartIdx int, idMap map[int]struct{}) error {
+func checkReorgPartitionDefs(ctx sessionctx.Context, action model.ActionType, tblInfo *model.TableInfo, partInfo *model.PartitionInfo, firstPartIdx, lastPartIdx int, idMap map[int]struct{}) error {
 	// partInfo contains only the newly added partitions; we have to combine it with the
 	// old partitions to check that all partitions are strictly increasing.
 	pi := tblInfo.Partition
 	clonedMeta := tblInfo.Clone()
 	clonedMeta.Partition.AddingDefinitions = partInfo.Definitions
 	clonedMeta.Partition.Definitions = getReorganizedDefinitions(clonedMeta.Partition, firstPartIdx, lastPartIdx, idMap)
+	if action == model.ActionAlterTablePartitioning {
+		clonedMeta.Partition = partInfo
+		clonedMeta.ID = partInfo.NewTableID
+	}
 	if err := checkPartitionDefinitionConstraints(ctx, clonedMeta); err != nil {
 		return errors.Trace(err)
 	}
-	if pi.Type == model.PartitionTypeRange {
-		if lastPartIdx == len(pi.Definitions)-1 {
-			// Last partition dropped, OK to change the end range
-			// Also includes MAXVALUE
-			return nil
-		}
-		// Check if the replaced end range is the same as before
-		lastAddingPartition := partInfo.Definitions[len(partInfo.Definitions)-1]
-		lastOldPartition := pi.Definitions[lastPartIdx]
-		if len(pi.Columns) > 0 {
-			newGtOld, err := checkTwoRangeColumns(ctx, &lastAddingPartition, &lastOldPartition, pi, tblInfo)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			if newGtOld {
-				return errors.Trace(dbterror.ErrRangeNotIncreasing)
-			}
-			oldGtNew, err := checkTwoRangeColumns(ctx, &lastOldPartition, &lastAddingPartition, pi, tblInfo)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			if oldGtNew {
-				return errors.Trace(dbterror.ErrRangeNotIncreasing)
-			}
-			return nil
-		}
-
-		isUnsigned := isPartExprUnsigned(tblInfo)
-		currentRangeValue, _, err := getRangeValue(ctx, pi.Definitions[lastPartIdx].LessThan[0], isUnsigned)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		newRangeValue, _, err := getRangeValue(ctx, partInfo.Definitions[len(partInfo.Definitions)-1].LessThan[0], isUnsigned)
-		if err != nil {
-			return errors.Trace(err)
-		}
-
-		if currentRangeValue != newRangeValue {
-			return errors.Trace(dbterror.ErrRangeNotIncreasing)
+	if action == model.ActionReorganizePartition {
+		if pi.Type == model.PartitionTypeRange {
+			if lastPartIdx == len(pi.Definitions)-1 {
+				// Last partition dropped, OK to change the end range
+				// Also includes MAXVALUE
+				return nil
+			}
+			// Check if the replaced end range is the same as before
+			lastAddingPartition := partInfo.Definitions[len(partInfo.Definitions)-1]
+			lastOldPartition := pi.Definitions[lastPartIdx]
+			if len(pi.Columns) > 0 {
+				newGtOld, err := checkTwoRangeColumns(ctx, &lastAddingPartition, &lastOldPartition, pi, tblInfo)
+				if err != nil {
+					return errors.Trace(err)
+				}
+				if newGtOld {
+					return errors.Trace(dbterror.ErrRangeNotIncreasing)
+				}
+				oldGtNew, err := checkTwoRangeColumns(ctx, &lastOldPartition, &lastAddingPartition, pi, tblInfo)
+				if err != nil {
+					return errors.Trace(err)
+				}
+				if oldGtNew {
+					return errors.Trace(dbterror.ErrRangeNotIncreasing)
+				}
+				return nil
+			}
+
+			isUnsigned := isPartExprUnsigned(tblInfo)
+			currentRangeValue, _, err := getRangeValue(ctx, pi.Definitions[lastPartIdx].LessThan[0], isUnsigned)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			newRangeValue, _, err := getRangeValue(ctx, partInfo.Definitions[len(partInfo.Definitions)-1].LessThan[0], isUnsigned)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			if currentRangeValue != newRangeValue {
+				return errors.Trace(dbterror.ErrRangeNotIncreasing)
+			}
+		}
+	} else {
+		if len(pi.Definitions) != (lastPartIdx - firstPartIdx + 1) {
+			// If not ActionReorganizePartition, require all partitions to be changed.
+			return errors.Trace(dbterror.ErrAlterOperationNotSupported)
 		}
 	}
 	return nil
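To make the two branches above concrete, a hedged testkit-style sketch (partition names and bounds are hypothetical, continuing the range-partitioned table t from the tests): REORGANIZE PARTITION may touch a subset of partitions but must keep the end bound of the reorganized span, while the new ActionAlterTablePartitioning path requires every partition to be replaced:

    // OK: p0's end bound (1000000) is preserved by the reorganized set.
    tk.MustExec(`alter table t reorganize partition p0 into (partition p0a values less than (500000), partition p0b values less than (1000000))`)
    // Rejected: raising the end bound of a non-last partition breaks the
    // strictly-increasing check above (ErrRangeNotIncreasing).
    tk.MustGetErrCode(`alter table t reorganize partition p0a into (partition p0x values less than (1500000))`, errno.ErrRangeNotIncreasing)
    // A full repartitioning always replaces all partitions (the else branch).
    tk.MustExec(`alter table t partition by hash(a) partitions 4`)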
28 changes: 28 additions & 0 deletions ddl/ddl_worker.go
@@ -1373,6 +1373,34 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
 			}
 		}
 	}
+	case model.ActionAlterTablePartitioning:
+		diff.OldTableID = job.TableID
+		if job.SchemaState == model.StateDeleteReorganization {
+			// Final part, new table id is assigned
+			partInfo := &model.PartitionInfo{}
+			var partNames []model.CIStr
+			err = job.DecodeArgs(&partNames, &partInfo)
+			if err != nil {
+				return 0, errors.Trace(err)
+			}
+			diff.TableID = partInfo.NewTableID
+			if len(job.CtxVars) > 0 {
+				if droppedIDs, ok := job.CtxVars[0].([]int64); ok {
+					if addedIDs, ok := job.CtxVars[1].([]int64); ok {
+						// To use AffectedOpts we need both new and old to have the same length
+						maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs))
+						// Also initialize them to 0!
+						oldIDs := make([]int64, maxParts)
+						copy(oldIDs, droppedIDs)
+						newIDs := make([]int64, maxParts)
+						copy(newIDs, addedIDs)
+						diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
+					}
+				}
+			}
+		} else {
+			diff.TableID = job.TableID
+		}
 	case model.ActionCreateTable:
 		diff.TableID = job.TableID
 		if len(job.Args) > 0 {
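The "initialize them to 0" comment works because make zero-fills its result, so the shorter ID list is effectively padded with zeros before buildPlacementAffects pairs the entries positionally. A standalone sketch with made-up IDs:

    droppedIDs := []int64{101, 102, 103}
    addedIDs := []int64{201, 202}
    maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs)) // 3
    oldIDs := make([]int64, maxParts) // zero-filled: [0 0 0]
    copy(oldIDs, droppedIDs)          // [101 102 103]
    newIDs := make([]int64, maxParts)
    copy(newIDs, addedIDs)            // [201 202 0], the trailing 0 is the padding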
71 changes: 40 additions & 31 deletions ddl/partition.go
@@ -1770,9 +1770,9 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 		tblInfo.Partition = nil
 	} else {
 		tblInfo.Partition.NewTableID = 0
-		tblInfo.Partition.NewExpr = ""
-		tblInfo.Partition.NewColumns = nil
-		tblInfo.Partition.NewType = model.PartitionTypeNone
+		tblInfo.Partition.DDLExpr = ""
+		tblInfo.Partition.DDLColumns = nil
+		tblInfo.Partition.DDLType = model.PartitionTypeNone
 	}
 }

@@ -2257,13 +2257,19 @@ func getReorgPartitionInfo(t *meta.Meta, job *model.Job) (*model.TableInfo, []mo
 	if tblInfo.Partition != nil {
 		addingDefs = tblInfo.Partition.AddingDefinitions
 		droppingDefs = tblInfo.Partition.DroppingDefinitions
+		if job.Type == model.ActionAlterTablePartitioning {
+			tblInfo.Partition.NewTableID = partInfo.NewTableID
+			tblInfo.Partition.DDLType = partInfo.Type
+			tblInfo.Partition.DDLExpr = partInfo.Expr
+			tblInfo.Partition.DDLColumns = partInfo.Columns
+		}
 	} else {
 		tblInfo.Partition = getPartitionInfoTypeNone()
 		tblInfo.Partition.NewTableID = partInfo.NewTableID
 		tblInfo.Partition.Definitions[0].ID = tblInfo.ID
-		tblInfo.Partition.NewType = partInfo.Type
-		tblInfo.Partition.NewExpr = partInfo.Expr
-		tblInfo.Partition.NewColumns = partInfo.Columns
+		tblInfo.Partition.DDLType = partInfo.Type
+		tblInfo.Partition.DDLExpr = partInfo.Expr
+		tblInfo.Partition.DDLColumns = partInfo.Columns
 	}
 	if len(addingDefs) == 0 {
 		addingDefs = []model.PartitionDefinition{}
@@ -2327,23 +2333,27 @@ func (w *worker) onAlterTablePartitioning(d *ddlCtx, t *meta.Meta, job *model.Jo
 		return ver, errors.Trace(err)
 	}
 	// TODO: How to carry over AUTO_INCREMENT etc.?
-	err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Del()
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, errors.Trace(err)
-	}
+	// Check if they are carried over in ApplyDiff?!?
+	/*
+		err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Del()
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+	*/
 	// TODO: Add failpoint here
 	definitionsToAdd := tblInfo.Partition.AddingDefinitions
 	tblInfo.Partition.AddingDefinitions = nil
 	tblInfo.Partition.DroppingDefinitions = nil
-	tblInfo.Partition.Type = tblInfo.Partition.NewType
-	tblInfo.Partition.Expr = tblInfo.Partition.NewExpr
-	tblInfo.Partition.Columns = tblInfo.Partition.NewColumns
+	tblInfo.Partition.Type = tblInfo.Partition.DDLType
+	tblInfo.Partition.Expr = tblInfo.Partition.DDLExpr
+	tblInfo.Partition.Columns = tblInfo.Partition.DDLColumns
 	tblInfo.ID = tblInfo.Partition.NewTableID
-	tblInfo.Partition.NewType = model.PartitionTypeNone
-	tblInfo.Partition.NewExpr = ""
-	tblInfo.Partition.NewColumns = nil
+	tblInfo.Partition.DDLType = model.PartitionTypeNone
+	tblInfo.Partition.DDLExpr = ""
+	tblInfo.Partition.DDLColumns = nil
 	tblInfo.Partition.NewTableID = 0
+	tblInfo.Partition.DDLState = model.StateNone
 
 	err = t.CreateTableOrView(job.SchemaID, tblInfo)
 	if err != nil {
@@ -2358,8 +2368,6 @@ func (w *worker) onAlterTablePartitioning(d *ddlCtx, t *meta.Meta, job *model.Jo
 	if err != nil {
 		return ver, errors.Trace(err)
 	}
-	job.SchemaState = model.StateNone
-	tblInfo.Partition.DDLState = model.StateNone
 	job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
 	// Register for updating the statistics
 	// Seems to only trigger asynchronously.
@@ -2394,9 +2402,6 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
 		partNames[i] = partNamesCIStr[i].L
 	}
 
-	// In order to skip maintaining the state check in partitionDefinition, TiDB uses dropping/addingDefinition instead of a state field.
-	// So `job.SchemaState` is used here to judge what stage this job is at.
-	originalState := job.SchemaState
 	switch job.SchemaState {
 	case model.StateNone:
 		// job.SchemaState == model.StateNone means the job is in the initial state of reorg partition.
@@ -2426,7 +2431,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
 			return ver, err
 		}
 		sctx := w.sess.Context
-		if err = checkReorgPartitionDefs(sctx, tblInfo, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil {
+		if err = checkReorgPartitionDefs(sctx, job.Type, tblInfo, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil {
 			job.State = model.JobStateCancelled
 			return ver, err
 		}
@@ -2555,18 +2560,18 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
 			}
 		}
 
-		job.SchemaState = model.StateWriteOnly
 		tblInfo.Partition.DDLState = model.StateWriteOnly
 		metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.2 / float64(math.MaxUint64))
-		ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState)
+		ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
+		job.SchemaState = model.StateWriteOnly
 	case model.StateWriteOnly:
 		// Insert this state to confirm all servers can see the new partitions when reorg is running,
 		// so that new data will be updated in both old and new partitions when reorganizing.
 		job.SnapshotVer = 0
-		job.SchemaState = model.StateWriteReorganization
 		tblInfo.Partition.DDLState = model.StateWriteReorganization
 		metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64))
-		ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState)
+		ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
+		job.SchemaState = model.StateWriteReorganization
 	case model.StateWriteReorganization:
 		physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
 		tbl, err2 := getTable(d.store, job.SchemaID, tblInfo)
@@ -2597,16 +2602,21 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
 		}
 		newDefs := getReorganizedDefinitions(tblInfo.Partition, firstPartIdx, lastPartIdx, idMap)
 
-		// From now on, use the new definitions, but keep the Adding and Dropping for double write
+		// From now on, use the new partitioning, but keep the Adding and Dropping for double write
 		tblInfo.Partition.Definitions = newDefs
 		tblInfo.Partition.Num = uint64(len(newDefs))
+		if job.Type == model.ActionAlterTablePartitioning {
+			tblInfo.Partition.Type, tblInfo.Partition.DDLType = tblInfo.Partition.DDLType, tblInfo.Partition.Type
+			tblInfo.Partition.Expr, tblInfo.Partition.DDLExpr = tblInfo.Partition.DDLExpr, tblInfo.Partition.Expr
+			tblInfo.Partition.Columns, tblInfo.Partition.DDLColumns = tblInfo.Partition.DDLColumns, tblInfo.Partition.Columns
+		}
 
 		// Now all the data copying is done, but we cannot simply remove the droppingDefinitions
 		// since they are still part of the normal Definitions that other nodes on the current
 		// schema version use. So we need to double write for one more schema version.
-		job.SchemaState = model.StateDeleteReorganization
 		tblInfo.Partition.DDLState = model.StateDeleteReorganization
-		ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState)
+		ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
+		job.SchemaState = model.StateDeleteReorganization
 
 	case model.StateDeleteReorganization:
 		// Drop the droppingDefinitions and finish the DDL
Expand Down Expand Up @@ -2635,7 +2645,6 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateNone
tblInfo.Partition.DDLState = model.StateNone
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
// How to handle this?
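Taken together, the scheduling changes above move each job.SchemaState assignment to after the corresponding updateVersionAndTableInfo call (which now always requests a version bump) and drop the originalState bookkeeping. A condensed, hedged sketch of the state flow visible in these hunks (duties paraphrased; earlier sub-states elided):

    // Hedged summary of onReorganizePartition's visible states:
    switch job.SchemaState {
    case model.StateNone:
        // validate the new definitions, register adding/dropping partitions
    case model.StateWriteOnly:
        // every server now sees the new partitions and double-writes
    case model.StateWriteReorganization:
        // backfill, then swap in the new definitions (and, for ALTER TABLE
        // ... PARTITION BY, swap Type/Expr/Columns with the DDL* fields)
    case model.StateDeleteReorganization:
        // double write for one more schema version, then drop old partitions
    }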
18 changes: 18 additions & 0 deletions ddl/partition_test.go
@@ -269,4 +269,22 @@ func TestAlterPartitionByRange(t *testing.T) {
 		"(PARTITION `p0` VALUES LESS THAN (1000000),\n" +
 			" PARTITION `p1` VALUES LESS THAN (2000000),\n" +
 			" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
+	tk.MustExec(`alter table t partition by hash(a) partitions 7`)
+	tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
+		"t CREATE TABLE `t` (\n" +
+		" `a` int(11) NOT NULL,\n" +
+		" `b` varchar(255) DEFAULT NULL,\n" +
+		" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
+		" KEY `b` (`b`)\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		"PARTITION BY HASH (`a`) PARTITIONS 7"))
+	tk.MustExec(`alter table t partition by key(a) partitions 5`)
+	tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
+		"t CREATE TABLE `t` (\n" +
+		" `a` int(11) NOT NULL,\n" +
+		" `b` varchar(255) DEFAULT NULL,\n" +
+		" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
+		" KEY `b` (`b`)\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		"PARTITION BY KEY (`a`) PARTITIONS 5"))
 }
5 changes: 4 additions & 1 deletion infoschema/builder.go
@@ -227,6 +227,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
 		return b.applyCreateTables(m, diff)
 	case model.ActionReorganizePartition:
 		return b.applyReorganizePartition(m, diff)
+	case model.ActionAlterTablePartitioning:
+		return b.applyReorganizePartition(m, diff)
 	case model.ActionFlashbackCluster:
 		return []int64{-1}, nil
 	default:
@@ -411,7 +413,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
 		newTableID = diff.TableID
 	case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
 		oldTableID = diff.TableID
-	case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition:
+	case model.ActionTruncateTable, model.ActionCreateView,
+		model.ActionExchangeTablePartition, model.ActionAlterTablePartitioning:
 		oldTableID = diff.OldTableID
 		newTableID = diff.TableID
 	default:
2 changes: 2 additions & 0 deletions meta/autoid/autoid.go
@@ -656,6 +656,8 @@ func NewSequenceAllocator(store kv.Storage, dbID, tbID int64, info *model.Sequen
 	}
 }
 
+// TODO: Handle allocators when changing Table ID during ALTER TABLE t PARTITION BY ...
+
 // NewAllocatorsFromTblInfo creates an array of allocators of different types with the information of model.TableInfo.
 func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) Allocators {
 	var allocs []Allocator
11 changes: 7 additions & 4 deletions parser/model/model.go
@@ -1171,12 +1171,15 @@ type PartitionInfo struct {
 	// Only used during ReorganizePartition so far
 	DDLState SchemaState `json:"ddl_state"`
 	// Set during ALTER TABLE ... if the table id needs to change
-	// like if there is a global index
+	// like if there is a global index, or when going between a non-partitioned
+	// and a partitioned table, to make data dropping / range deletion
+	// optimized.
 	NewTableID int64 `json:"new_table_id"`
 	// Set during ALTER TABLE ... PARTITION BY ...
-	NewType    PartitionType `json:"new_type"`
-	NewExpr    string        `json:"new_expr"`
-	NewColumns []CIStr       `json:"new_columns"`
+	// First as the new partition scheme, then in StateDeleteReorg as the old
+	DDLType    PartitionType `json:"ddl_type"`
+	DDLExpr    string        `json:"ddl_expr"`
+	DDLColumns []CIStr       `json:"ddl_columns"`
 }
 
 // Clone clones itself.
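For orientation, a hedged sketch of how the renamed DDL* fields travel through an ALTER TABLE ... PARTITION BY job, consolidated from the ddl/partition.go hunks above (pi stands for a *model.PartitionInfo; assignments paraphrased):

    // 1. Job start (getReorgPartitionInfo): park the target scheme.
    pi.DDLType, pi.DDLExpr, pi.DDLColumns = partInfo.Type, partInfo.Expr, partInfo.Columns
    pi.NewTableID = partInfo.NewTableID
    // 2. StateDeleteReorganization: swap, so Type/Expr/Columns describe the
    //    new scheme and DDL* keeps the old one for the last double-write round.
    pi.Type, pi.DDLType = pi.DDLType, pi.Type
    pi.Expr, pi.DDLExpr = pi.DDLExpr, pi.Expr
    pi.Columns, pi.DDLColumns = pi.DDLColumns, pi.Columns
    // 3. Job done (onAlterTablePartitioning): clear the DDL fields.
    pi.DDLType, pi.DDLExpr, pi.DDLColumns = model.PartitionTypeNone, "", nil
    pi.NewTableID = 0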