Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Add support for ALTER TABLE t PARTITION BY ... | tidb-test=pr/2112 #42882

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,11 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
workType := typeReorgPartitionWorker
if reorgInfo.Job.Type != model.ActionReorganizePartition {
switch reorgInfo.Job.Type {
case model.ActionReorganizePartition,
model.ActionAlterTablePartitioning:
// Expected
default:
// workType = typeUpdateColumnWorker
// TODO: Support Modify Column on partitioned table
// https://github.com/pingcap/tidb/issues/38297
Expand Down
159 changes: 125 additions & 34 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3440,8 +3440,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
isAlterTable := true
err = d.renameTable(sctx, ident, newIdent, isAlterTable)
case ast.AlterTablePartition:
// Prevent silent succeed if user executes ALTER TABLE x PARTITION BY ...
err = errors.New("alter table partition is unsupported")
err = d.AlterTablePartitioning(sctx, ident, spec)
case ast.AlterTableOption:
var placementPolicyRef *model.PolicyRefInfo
for i, opt := range spec.Options {
Expand Down Expand Up @@ -3994,6 +3993,85 @@ func getReplacedPartitionIDs(names []model.CIStr, pi *model.PartitionInfo) (int,
return firstPartIdx, lastPartIdx, idMap, nil
}

// getPartitionInfoTypeNone returns a minimal PartitionInfo that models a
// non-partitioned table as one single full-table partition. It is used as
// the starting point when ALTER TABLE ... PARTITION BY is applied to a
// table that is not yet partitioned.
func getPartitionInfoTypeNone() *model.PartitionInfo {
	fullTableDef := model.PartitionDefinition{
		Name:    model.NewCIStr("pFullTable"),
		Comment: "Intermediate partition during ALTER TABLE ... PARTITION BY ...",
	}
	return &model.PartitionInfo{
		Type:        model.PartitionTypeNone,
		Enable:      true,
		Definitions: []model.PartitionDefinition{fullTableDef},
		Num:         1,
	}
}

// AlterTablePartitioning reorganize one set of partitions to a new set of partitions.
func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ident)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.FastGenByArgs(ident.Schema, ident.Name))
}

meta := t.Meta().Clone()
piOld := meta.GetPartitionInfo()
var partNames []model.CIStr
if piOld != nil {
partNames = make([]model.CIStr, 0, len(piOld.Definitions))
for i := range piOld.Definitions {
partNames = append(partNames, piOld.Definitions[i].Name)
}
} else {
piOld = getPartitionInfoTypeNone()
meta.Partition = piOld
partNames = append(partNames, piOld.Definitions[0].Name)
}
newMeta := meta.Clone()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meta is either cloned from t.Meta() or generated by getPartitionInfoTypeNone(), do we need to clone it again?

err = buildTablePartitionInfo(ctx, spec.Partition, newMeta)
if err != nil {
return err
}
newPartInfo := newMeta.Partition

if err = d.assignPartitionIDs(newPartInfo.Definitions); err != nil {
return errors.Trace(err)
}
// A new table ID would be needed for
// the global index, which cannot be the same as the current table id,
// since this table id will be removed in the final state when removing
// all the data with this table id.
var newID []int64
newID, err = d.genGlobalIDs(1)
if err != nil {
return errors.Trace(err)
}
newPartInfo.NewTableID = newID[0]

tzName, tzOffset := ddlutil.GetTimeZone(ctx)
job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionAlterTablePartitioning,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partNames, newPartInfo},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
}

// No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead.
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
if err == nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("The statistics of new partitions will be outdated after reorganizing partitions. Please use 'ANALYZE TABLE' statement if you want to update it now"))
}
return errors.Trace(err)
}

// ReorganizePartitions reorganizes one set of partitions to a new set of partitions.
func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ident)
Expand Down Expand Up @@ -4022,7 +4100,7 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec
if err = d.assignPartitionIDs(partInfo.Definitions); err != nil {
return errors.Trace(err)
}
if err = checkReorgPartitionDefs(ctx, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil {
if err = checkReorgPartitionDefs(ctx, model.ActionReorganizePartition, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil {
return errors.Trace(err)
}
if err = handlePartitionPlacement(ctx, partInfo); err != nil {
Expand Down Expand Up @@ -4055,55 +4133,66 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec
return errors.Trace(err)
}

func checkReorgPartitionDefs(ctx sessionctx.Context, tblInfo *model.TableInfo, partInfo *model.PartitionInfo, firstPartIdx, lastPartIdx int, idMap map[int]struct{}) error {
func checkReorgPartitionDefs(ctx sessionctx.Context, action model.ActionType, tblInfo *model.TableInfo, partInfo *model.PartitionInfo, firstPartIdx, lastPartIdx int, idMap map[int]struct{}) error {
// partInfo contains only the new added partition, we have to combine it with the
// old partitions to check all partitions is strictly increasing.
pi := tblInfo.Partition
clonedMeta := tblInfo.Clone()
clonedMeta.Partition.AddingDefinitions = partInfo.Definitions
clonedMeta.Partition.Definitions = getReorganizedDefinitions(clonedMeta.Partition, firstPartIdx, lastPartIdx, idMap)
if action == model.ActionAlterTablePartitioning {
clonedMeta.Partition = partInfo
clonedMeta.ID = partInfo.NewTableID
}
if err := checkPartitionDefinitionConstraints(ctx, clonedMeta); err != nil {
return errors.Trace(err)
}
if pi.Type == model.PartitionTypeRange {
if lastPartIdx == len(pi.Definitions)-1 {
// Last partition dropped, OK to change the end range
// Also includes MAXVALUE
return nil
}
// Check if the replaced end range is the same as before
lastAddingPartition := partInfo.Definitions[len(partInfo.Definitions)-1]
lastOldPartition := pi.Definitions[lastPartIdx]
if len(pi.Columns) > 0 {
newGtOld, err := checkTwoRangeColumns(ctx, &lastAddingPartition, &lastOldPartition, pi, tblInfo)
if action == model.ActionReorganizePartition {
if pi.Type == model.PartitionTypeRange {
if lastPartIdx == len(pi.Definitions)-1 {
// Last partition dropped, OK to change the end range
// Also includes MAXVALUE
return nil
}
// Check if the replaced end range is the same as before
lastAddingPartition := partInfo.Definitions[len(partInfo.Definitions)-1]
lastOldPartition := pi.Definitions[lastPartIdx]
if len(pi.Columns) > 0 {
newGtOld, err := checkTwoRangeColumns(ctx, &lastAddingPartition, &lastOldPartition, pi, tblInfo)
if err != nil {
return errors.Trace(err)
}
if newGtOld {
return errors.Trace(dbterror.ErrRangeNotIncreasing)
}
oldGtNew, err := checkTwoRangeColumns(ctx, &lastOldPartition, &lastAddingPartition, pi, tblInfo)
if err != nil {
return errors.Trace(err)
}
if oldGtNew {
return errors.Trace(dbterror.ErrRangeNotIncreasing)
}
return nil
}

isUnsigned := isPartExprUnsigned(tblInfo)
currentRangeValue, _, err := getRangeValue(ctx, pi.Definitions[lastPartIdx].LessThan[0], isUnsigned)
if err != nil {
return errors.Trace(err)
}
if newGtOld {
return errors.Trace(dbterror.ErrRangeNotIncreasing)
}
oldGtNew, err := checkTwoRangeColumns(ctx, &lastOldPartition, &lastAddingPartition, pi, tblInfo)
newRangeValue, _, err := getRangeValue(ctx, partInfo.Definitions[len(partInfo.Definitions)-1].LessThan[0], isUnsigned)
if err != nil {
return errors.Trace(err)
}
if oldGtNew {

if currentRangeValue != newRangeValue {
return errors.Trace(dbterror.ErrRangeNotIncreasing)
}
return nil
}

isUnsigned := isPartExprUnsigned(tblInfo)
currentRangeValue, _, err := getRangeValue(ctx, pi.Definitions[lastPartIdx].LessThan[0], isUnsigned)
if err != nil {
return errors.Trace(err)
}
newRangeValue, _, err := getRangeValue(ctx, partInfo.Definitions[len(partInfo.Definitions)-1].LessThan[0], isUnsigned)
if err != nil {
return errors.Trace(err)
}

if currentRangeValue != newRangeValue {
return errors.Trace(dbterror.ErrRangeNotIncreasing)
} else {
if len(pi.Definitions) != (lastPartIdx - firstPartIdx + 1) {
// if not ActionReorganizePartition, require all partitions to be changed.
return errors.Trace(dbterror.ErrAlterOperationNotSupported)
}
}
return nil
Expand Down Expand Up @@ -7064,6 +7153,8 @@ func validateCommentLength(vars *variable.SessionVars, name string, comment *str
// BuildAddedPartitionInfo build alter table add partition info
func BuildAddedPartitionInfo(ctx sessionctx.Context, meta *model.TableInfo, spec *ast.AlterTableSpec) (*model.PartitionInfo, error) {
switch meta.Partition.Type {
case model.PartitionTypeNone:
// OK
case model.PartitionTypeList:
if len(spec.PartDefinitions) == 0 {
return nil, ast.ErrPartitionsMustBeDefined.GenWithStackByArgs(meta.Partition.Type)
Expand Down
30 changes: 30 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onTTLInfoChange(d, t, job)
case model.ActionAlterTTLRemove:
ver, err = onTTLInfoRemove(d, t, job)
case model.ActionAlterTablePartitioning:
ver, err = w.onAlterTablePartitioning(d, t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down Expand Up @@ -1371,6 +1373,34 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
}
}
}
case model.ActionAlterTablePartitioning:
diff.OldTableID = job.TableID
if job.SchemaState == model.StateDeleteReorganization {
// Final part, new table id is assigned
partInfo := &model.PartitionInfo{}
var partNames []model.CIStr
err = job.DecodeArgs(&partNames, &partInfo)
if err != nil {
return 0, errors.Trace(err)
}
diff.TableID = partInfo.NewTableID
if len(job.CtxVars) > 0 {
if droppedIDs, ok := job.CtxVars[0].([]int64); ok {
if addedIDs, ok := job.CtxVars[1].([]int64); ok {
// to use AffectedOpts we need both new and old to have the same length
maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs))
// Also initialize them to 0!
oldIDs := make([]int64, maxParts)
copy(oldIDs, droppedIDs)
newIDs := make([]int64, maxParts)
copy(newIDs, addedIDs)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
}
}
} else {
diff.TableID = job.TableID
}
case model.ActionCreateTable:
diff.TableID = job.TableID
if len(job.Args) > 0 {
Expand Down
Loading