From 545a313c84abbc6aeb78c411feb37b0fdd339eed Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 10 Apr 2023 02:44:16 +0200 Subject: [PATCH 1/4] WIP Add support for ALTER TABLE t PARTITION BY ... --- ddl/column.go | 6 +- ddl/ddl_api.go | 84 +++++++++++++++++- ddl/ddl_worker.go | 2 + ddl/partition.go | 156 ++++++++++++++++++++++++++++++---- ddl/partition_test.go | 28 ++++++ expression/simple_rewriter.go | 3 + parser/model/ddl.go | 2 + parser/model/model.go | 12 +++ table/tables/partition.go | 16 ++++ 9 files changed, 288 insertions(+), 21 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index 3c8a31156c6d7..fecb4a6ce8d90 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1063,7 +1063,11 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID) } workType := typeReorgPartitionWorker - if reorgInfo.Job.Type != model.ActionReorganizePartition { + switch reorgInfo.Job.Type { + case model.ActionReorganizePartition, + model.ActionAlterTablePartitioning: + // Expected + default: // workType = typeUpdateColumnWorker // TODO: Support Modify Column on partitioned table // https://github.com/pingcap/tidb/issues/38297 diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 58cc5030d24b0..8da0548ddb8ea 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3440,8 +3440,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast isAlterTable := true err = d.renameTable(sctx, ident, newIdent, isAlterTable) case ast.AlterTablePartition: - // Prevent silent succeed if user executes ALTER TABLE x PARTITION BY ... - err = errors.New("alter table partition is unsupported") + err = d.AlterTablePartitioning(sctx, ident, spec) case ast.AlterTableOption: var placementPolicyRef *model.PolicyRefInfo for i, opt := range spec.Options { @@ -3994,6 +3993,85 @@ func getReplacedPartitionIDs(names []model.CIStr, pi *model.PartitionInfo) (int, return firstPartIdx, lastPartIdx, idMap, nil } +func getPartitionInfoTypeNone() *model.PartitionInfo { + return &model.PartitionInfo{ + Type: model.PartitionTypeNone, + Enable: true, + Definitions: []model.PartitionDefinition{{ + Name: model.NewCIStr("pFullTable"), + Comment: "Intermediate partition during ALTER TABLE ... PARTITION BY ...", + }}, + Num: 1, + } +} + +// AlterTablePartitioning reorganize one set of partitions to a new set of partitions. 
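+// Unlike ReorganizePartitions it replaces the table's whole partitioning
+// scheme, and it also accepts a previously non-partitioned table, which is
+// then treated as a single intermediate partition (see
+// getPartitionInfoTypeNone above).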
+func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { + schema, t, err := d.getSchemaAndTableByIdent(ctx, ident) + if err != nil { + return errors.Trace(infoschema.ErrTableNotExists.FastGenByArgs(ident.Schema, ident.Name)) + } + + meta := t.Meta().Clone() + piOld := meta.GetPartitionInfo() + var partNames []model.CIStr + if piOld != nil { + partNames := make([]model.CIStr, 0, len(piOld.Definitions)) + for i := range piOld.Definitions { + partNames = append(partNames, piOld.Definitions[i].Name) + } + } else { + piOld = getPartitionInfoTypeNone() + meta.Partition = piOld + partNames = append(partNames, piOld.Definitions[0].Name) + } + newMeta := meta.Clone() + err = buildTablePartitionInfo(ctx, spec.Partition, newMeta) + if err != nil { + return err + } + newPartInfo := newMeta.Partition + + if err = d.assignPartitionIDs(newPartInfo.Definitions); err != nil { + return errors.Trace(err) + } + // A new table ID would be needed for + // the global index, which cannot be the same as the current table id, + // since this table id will be removed in the final state when removing + // all the data with this table id. + var newID []int64 + newID, err = d.genGlobalIDs(1) + if err != nil { + return errors.Trace(err) + } + newPartInfo.NewTableID = newID[0] + + tzName, tzOffset := ddlutil.GetTimeZone(ctx) + job := &model.Job{ + SchemaID: schema.ID, + TableID: meta.ID, + SchemaName: schema.Name.L, + TableName: t.Meta().Name.L, + Type: model.ActionAlterTablePartitioning, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{partNames, newPartInfo}, + ReorgMeta: &model.DDLReorgMeta{ + SQLMode: ctx.GetSessionVars().SQLMode, + Warnings: make(map[errors.ErrorID]*terror.Error), + WarningsCount: make(map[errors.ErrorID]int64), + Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, + }, + } + + // No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead. + err = d.DoDDLJob(ctx, job) + err = d.callHookOnChanged(job, err) + if err == nil { + ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("The statistics of new partitions will be outdated after reorganizing partitions. Please use 'ANALYZE TABLE' statement if you want to update it now")) + } + return errors.Trace(err) +} + // ReorganizePartitions reorganize one set of partitions to a new set of partitions. 
func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { schema, t, err := d.getSchemaAndTableByIdent(ctx, ident) @@ -7064,6 +7142,8 @@ func validateCommentLength(vars *variable.SessionVars, name string, comment *str // BuildAddedPartitionInfo build alter table add partition info func BuildAddedPartitionInfo(ctx sessionctx.Context, meta *model.TableInfo, spec *ast.AlterTableSpec) (*model.PartitionInfo, error) { switch meta.Partition.Type { + case model.PartitionTypeNone: + // OK case model.PartitionTypeList: if len(spec.PartDefinitions) == 0 { return nil, ast.ErrPartitionsMustBeDefined.GenWithStackByArgs(meta.Partition.Type) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 5536582a8576f..205960f247f4d 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1086,6 +1086,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onTTLInfoChange(d, t, job) case model.ActionAlterTTLRemove: ver, err = onTTLInfoRemove(d, t, job) + case model.ActionAlterTablePartitioning: + ver, err = w.onAlterTablePartitioning(d, t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/partition.go b/ddl/partition.go index 716d4fc7bb18e..156ac5e683c1d 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1741,7 +1741,8 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( if err != nil { return ver, errors.Trace(err) } - if job.Type == model.ActionAddTablePartition || job.Type == model.ActionReorganizePartition { + if job.Type == model.ActionAddTablePartition || job.Type == model.ActionReorganizePartition || + job.Type == model.ActionAlterTablePartitioning { // It is rollback from reorganize partition, just remove DroppingDefinitions from tableInfo tblInfo.Partition.DroppingDefinitions = nil // It is rollback from adding table partition, just remove addingDefinitions from tableInfo. 
@@ -1761,6 +1762,19 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 			job.State = model.JobStateCancelled
 			return ver, err
 		}
+		if job.Type == model.ActionAlterTablePartitioning {
+			// Also remove anything with the new table id
+			physicalTableIDs = append(physicalTableIDs, tblInfo.Partition.NewTableID)
+			// Reset if it was a normal table before
+			if tblInfo.Partition.Type == model.PartitionTypeNone {
+				tblInfo.Partition = nil
+			} else {
+				tblInfo.Partition.NewTableID = 0
+				tblInfo.Partition.NewExpr = ""
+				tblInfo.Partition.NewColumns = nil
+				tblInfo.Partition.NewType = model.PartitionTypeNone
+			}
+		}
 
 		ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
 		if err != nil {
@@ -2226,7 +2240,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
 	return ver, nil
 }
 
-func checkReorgPartition(t *meta.Meta, job *model.Job) (*model.TableInfo, []model.CIStr, *model.PartitionInfo, []model.PartitionDefinition, []model.PartitionDefinition, error) {
+func getReorgPartitionInfo(t *meta.Meta, job *model.Job) (*model.TableInfo, []model.CIStr, *model.PartitionInfo, []model.PartitionDefinition, []model.PartitionDefinition, error) {
 	schemaID := job.SchemaID
 	tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
 	if err != nil {
@@ -2239,8 +2253,18 @@ func checkReorgPartition(t *meta.Meta, job *model.Job) (*model.TableInfo, []mode
 		job.State = model.JobStateCancelled
 		return nil, nil, nil, nil, nil, errors.Trace(err)
 	}
-	addingDefs := tblInfo.Partition.AddingDefinitions
-	droppingDefs := tblInfo.Partition.DroppingDefinitions
+	var addingDefs, droppingDefs []model.PartitionDefinition
+	if tblInfo.Partition != nil {
+		addingDefs = tblInfo.Partition.AddingDefinitions
+		droppingDefs = tblInfo.Partition.DroppingDefinitions
+	} else {
+		tblInfo.Partition = getPartitionInfoTypeNone()
+		tblInfo.Partition.NewTableID = partInfo.NewTableID
+		tblInfo.Partition.Definitions[0].ID = tblInfo.ID
+		tblInfo.Partition.NewType = partInfo.Type
+		tblInfo.Partition.NewExpr = partInfo.Expr
+		tblInfo.Partition.NewColumns = partInfo.Columns
+	}
 	if len(addingDefs) == 0 {
 		addingDefs = []model.PartitionDefinition{}
 	}
@@ -2250,6 +2274,104 @@ func checkReorgPartition(t *meta.Meta, job *model.Job) (*model.TableInfo, []mode
 	return tblInfo, partNames, partInfo, droppingDefs, addingDefs, nil
 }
 
+// onAlterTablePartitioning uses onReorganizePartition to do the heavy lifting,
+// with only some differences in how to finalize the table with the new partitioning scheme.
+func (w *worker) onAlterTablePartitioning(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
+	// Handle the rolling back job
+	if job.IsRollingback() {
+		// TODO: Check and test
+		// including restoring to a non-partitioned table if the original was one (i.e. if the table id == first partition id)
+		ver, err = w.onDropTablePartition(d, t, job)
+		if err != nil {
+			return ver, errors.Trace(err)
+		}
+		return
+	}
+
+	// In order to skip maintaining the state check in partitionDefinition, TiDB uses dropping/addingDefinitions instead of a state field.
+	// So here `job.SchemaState` is used to judge what stage this job is at.
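+	// The flow is mostly shared with onReorganizePartition:
+	// StateNone -> StateDeleteOnly -> StateWriteOnly -> StateWriteReorganization
+	// are all delegated to it below; only StateDeleteReorganization differs,
+	// since that is where the table is switched over to the new partitioning
+	// scheme (and to the new table ID).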
+ switch job.SchemaState { + case model.StateNone, model.StateDeleteOnly, model.StateWriteOnly, + model.StateWriteReorganization: + return w.onReorganizePartition(d, t, job) + case model.StateDeleteReorganization: + // Drop the droppingDefinitions and finish the DDL + // This state is needed for the case where client A sees the schema + // with version of StateWriteReorg and would not see updates of + // client B that writes to the new partitions, previously + // addingDefinitions, since it would not double write to + // the droppingDefinitions during this time + // By adding StateDeleteReorg state, client B will write to both + // the new (previously addingDefinitions) AND droppingDefinitions + + tblInfo, partNamesCIStr, partInfo, _, _, err1 := getReorgPartitionInfo(t, job) + if err1 != nil { + return ver, err1 + } + partNames := make([]string, len(partNamesCIStr)) + for i := range partNamesCIStr { + partNames[i] = partNamesCIStr[i].L + } + + // Register the droppingDefinitions ids for rangeDelete + // and the addingDefinitions for handling in the updateSchemaVersion + physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) + newIDs := getPartitionIDsFromDefinitions(partInfo.Definitions) + job.CtxVars = []interface{}{physicalTableIDs, newIDs} + oldTblId := tblInfo.ID + // If no global index this is not really needed? + physicalTableIDs = append(physicalTableIDs, oldTblId) + err = t.DropTableOrView(job.SchemaID, tblInfo.ID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + // TODO: How to carrie over AUTO_INCREMENT etc.? + err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Del() + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + // TODO: Add failpoint here + definitionsToAdd := tblInfo.Partition.AddingDefinitions + tblInfo.Partition.AddingDefinitions = nil + tblInfo.Partition.DroppingDefinitions = nil + tblInfo.Partition.Type = tblInfo.Partition.NewType + tblInfo.Partition.Expr = tblInfo.Partition.NewExpr + tblInfo.Partition.Columns = tblInfo.Partition.NewColumns + tblInfo.ID = tblInfo.Partition.NewTableID + tblInfo.Partition.NewType = model.PartitionTypeNone + tblInfo.Partition.NewExpr = "" + tblInfo.Partition.NewColumns = nil + tblInfo.Partition.NewTableID = 0 + + err = t.CreateTableOrView(job.SchemaID, tblInfo) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true) + failpoint.Inject("reorgPartWriteReorgSchemaVersionUpdateFail", func(val failpoint.Value) { + if val.(bool) { + err = errors.New("Injected error by reorgPartWriteReorgSchemaVersionUpdateFail") + } + }) + if err != nil { + return ver, errors.Trace(err) + } + job.SchemaState = model.StateNone + tblInfo.Partition.DDLState = model.StateNone + job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) + // Register for update the statistics + // Seems to only trigger asynchronous. + // Should it actually be synchronous? + asyncNotifyEvent(d, &util.Event{Tp: job.Type, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: definitionsToAdd}}) + // A background job will be created to delete old partition data. 
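+		// Note that physicalTableIDs also includes the old table ID, so any
+		// remaining rows and index entries written under the previous table
+		// ID are removed as well.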
+ job.Args = []interface{}{physicalTableIDs} + + default: + err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState) + } + + return ver, errors.Trace(err) +} + func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { // Handle the rolling back job if job.IsRollingback() { @@ -2260,7 +2382,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) return ver, nil } - tblInfo, partNamesCIStr, partInfo, _, addingDefinitions, err := checkReorgPartition(t, job) + tblInfo, partNamesCIStr, partInfo, _, addingDefinitions, err := getReorgPartitionInfo(t, job) if err != nil { return ver, err } @@ -2281,9 +2403,8 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) // The partInfo may have been checked against an older schema version for example. // If the check is done here, it does not need to be repeated, since no other // DDL on the same table can be run concurrently. - err = checkAddPartitionTooManyPartitions(uint64(len(tblInfo.Partition.Definitions) + - len(partInfo.Definitions) - - len(partNames))) + num := len(partInfo.Definitions) - len(partNames) + len(tblInfo.Partition.Definitions) + err = checkAddPartitionTooManyPartitions(uint64(num)) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -2592,22 +2713,19 @@ func newReorgPartitionWorker(sessCtx sessionctx.Context, i int, t table.Physical if pt == nil { return nil, dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs() } - partColIDs := pt.GetPartitionColumnIDs() + partColIDs := reorgedTbl.GetPartitionColumnIDs() writeColOffsetMap := make(map[int64]int, len(partColIDs)) maxOffset := 0 - for _, col := range pt.Cols() { - found := false - for _, id := range partColIDs { + for _, id := range partColIDs { + var offset int + for _, col := range pt.Cols() { if col.ID == id { - found = true + offset = col.Offset break } } - if !found { - continue - } - writeColOffsetMap[col.ID] = col.Offset - maxOffset = mathutil.Max[int](maxOffset, col.Offset) + writeColOffsetMap[id] = offset + maxOffset = mathutil.Max[int](maxOffset, offset) } return &reorgPartitionWorker{ backfillCtx: newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false), @@ -2705,6 +2823,8 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo return false, nil } + // TODO: Extend for normal tables + // TODO: Extend for REMOVE PARTITIONING _, err := w.rowDecoder.DecodeTheExistedColumnMap(w.sessCtx, handle, rawRow, sysTZ, w.rowMap) if err != nil { return false, errors.Trace(err) diff --git a/ddl/partition_test.go b/ddl/partition_test.go index dd4024770cf2d..583655781cf6d 100644 --- a/ddl/partition_test.go +++ b/ddl/partition_test.go @@ -15,6 +15,7 @@ package ddl_test import ( + "fmt" "testing" "time" @@ -242,3 +243,30 @@ func TestReorganizePartitionRollback(t *testing.T) { // test then add index should success tk.MustExec("alter table t1 add index idx_kc (k, c)") } + +func TestAlterPartitionByRange(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("create schema AlterPartitionBy") + tk.MustExec("use AlterPartitionBy") + // Just for debug... 
+ //tk.MustExec(`create table t (a int primary key, b varchar(255), key (b)) partition by range (a) (partition p0 values less than (10))`) + // First easy example non-partitioned -> partitioned + tk.MustExec(`create table t (a int primary key, b varchar(255), key (b))`) + for i := 0; i < 1000; i++ { + tk.MustExec(fmt.Sprintf(`insert into t values (%d,'filler%d')`, i, i/3)) + } + tk.MustExec(`alter table t partition by range (a) (partition p0 values less than (1000000), partition p1 values less than (2000000), partition pMax values less than (maxvalue))`) + tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1105 The statistics of new partitions will be outdated after reorganizing partitions. Please use 'ANALYZE TABLE' statement if you want to update it now")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (1000000),\n" + + " PARTITION `p1` VALUES LESS THAN (2000000),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) +} diff --git a/expression/simple_rewriter.go b/expression/simple_rewriter.go index 3343a0cbaa169..17c2849a9669b 100644 --- a/expression/simple_rewriter.go +++ b/expression/simple_rewriter.go @@ -29,6 +29,9 @@ import ( // ParseSimpleExprWithTableInfo parses simple expression string to Expression. // The expression string must only reference the column in table Info. func ParseSimpleExprWithTableInfo(ctx sessionctx.Context, exprStr string, tableInfo *model.TableInfo) (Expression, error) { + if len(exprStr) == 0 { + return nil, nil + } exprStr = "select " + exprStr var stmts []ast.StmtNode var err error diff --git a/parser/model/ddl.go b/parser/model/ddl.go index d6d8790962382..cb5caddb2963a 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -102,6 +102,7 @@ const ( ActionCreateResourceGroup ActionType = 68 ActionAlterResourceGroup ActionType = 69 ActionDropResourceGroup ActionType = 70 + ActionAlterTablePartitioning ActionType = 71 ) var actionMap = map[ActionType]string{ @@ -170,6 +171,7 @@ var actionMap = map[ActionType]string{ ActionCreateResourceGroup: "create resource group", ActionAlterResourceGroup: "alter resource group", ActionDropResourceGroup: "drop resource group", + ActionAlterTablePartitioning: "alter table partition by", // `ActionAlterTableAlterPartition` is removed and will never be used. // Just left a tombstone here for compatibility. diff --git a/parser/model/model.go b/parser/model/model.go index 924188c903cdc..3ce18c1b8de6a 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1118,6 +1118,9 @@ const ( PartitionTypeList PartitionType = 3 PartitionTypeKey PartitionType = 4 PartitionTypeSystemTime PartitionType = 5 + // Actually non-partitioned, but during DDL keeping the table as + // a single partition + PartitionTypeNone PartitionType = 6 ) func (p PartitionType) String() string { @@ -1132,6 +1135,8 @@ func (p PartitionType) String() string { return "KEY" case PartitionTypeSystemTime: return "SYSTEM_TIME" + case PartitionTypeNone: + return "NONE" default: return "" } @@ -1164,6 +1169,13 @@ type PartitionInfo struct { Num uint64 `json:"num"` // Only used during ReorganizePartition so far DDLState SchemaState `json:"ddl_state"` + // Set during ALTER TABLE ... 
if the table id needs to change + // like if there is a global index + NewTableID int64 `json:"new_table_id"` + // Set during ALTER TABLE ... PARTITION BY ... + NewType PartitionType `json:"new_type"` + NewExpr string `json:"new_expr"` + NewColumns []CIStr `json:"new_columns"` } // Clone clones itself. diff --git a/table/tables/partition.go b/table/tables/partition.go index ad5ee36d54c90..3ae6da70fe442 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -237,6 +237,9 @@ func newPartitionExpr(tblInfo *model.TableInfo, defs []model.PartitionDefinition } pi := tblInfo.GetPartitionInfo() switch pi.Type { + case model.PartitionTypeNone: + // Nothing to do + return nil, nil case model.PartitionTypeRange: return generateRangePartitionExpr(ctx, pi, defs, columns, names) case model.PartitionTypeHash: @@ -1163,6 +1166,9 @@ func (t *partitionedTable) GetPartitionColumnIDs() []int64 { } return colIDs } + if t.partitionExpr == nil { + return nil + } partitionCols := expression.ExtractColumns(t.partitionExpr.Expr) colIDs := make([]int64, 0, len(partitionCols)) @@ -1426,6 +1432,16 @@ func GetReorganizedPartitionedTable(t table.Table) (table.PartitionedTable, erro tblInfo.Partition.Definitions = tblInfo.Partition.AddingDefinitions tblInfo.Partition.AddingDefinitions = nil tblInfo.Partition.DroppingDefinitions = nil + if tblInfo.Partition.NewType != model.PartitionTypeNone { + tblInfo.Partition.Type = tblInfo.Partition.NewType + tblInfo.Partition.Expr = tblInfo.Partition.NewExpr + tblInfo.Partition.Columns = tblInfo.Partition.NewColumns + tblInfo.ID = tblInfo.Partition.NewTableID + tblInfo.Partition.NewType = model.PartitionTypeNone + tblInfo.Partition.NewExpr = "" + tblInfo.Partition.NewColumns = nil + tblInfo.Partition.NewTableID = 0 + } var tc TableCommon initTableCommon(&tc, tblInfo, tblInfo.ID, t.Cols(), t.Allocators(nil)) From cb3020bc0223d93d4086c12b6c7f0c235549514c Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 10 Apr 2023 10:23:29 +0200 Subject: [PATCH 2/4] Linting --- ddl/partition.go | 7 +++++-- parser/model/model.go | 7 ++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index 156ac5e683c1d..8e52e3157c356 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2318,9 +2318,9 @@ func (w *worker) onAlterTablePartitioning(d *ddlCtx, t *meta.Meta, job *model.Jo physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) newIDs := getPartitionIDsFromDefinitions(partInfo.Definitions) job.CtxVars = []interface{}{physicalTableIDs, newIDs} - oldTblId := tblInfo.ID + oldTblID := tblInfo.ID // If no global index this is not really needed? - physicalTableIDs = append(physicalTableIDs, oldTblId) + physicalTableIDs = append(physicalTableIDs, oldTblID) err = t.DropTableOrView(job.SchemaID, tblInfo.ID) if err != nil { job.State = model.JobStateCancelled @@ -2346,6 +2346,9 @@ func (w *worker) onAlterTablePartitioning(d *ddlCtx, t *meta.Meta, job *model.Jo tblInfo.Partition.NewTableID = 0 err = t.CreateTableOrView(job.SchemaID, tblInfo) + if err != nil { + return ver, errors.Trace(err) + } ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true) failpoint.Inject("reorgPartWriteReorgSchemaVersionUpdateFail", func(val failpoint.Value) { if val.(bool) { diff --git a/parser/model/model.go b/parser/model/model.go index 3ce18c1b8de6a..02524735bc9c1 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1113,14 +1113,15 @@ type PartitionType int // Partition types. 
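+// Note: PartitionTypeNone is the zero value, so an uninitialized
+// PartitionType reads as non-partitioned.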
const ( + // Actually non-partitioned, but during DDL keeping the table as + // a single partition + PartitionTypeNone PartitionType = 0 + PartitionTypeRange PartitionType = 1 PartitionTypeHash PartitionType = 2 PartitionTypeList PartitionType = 3 PartitionTypeKey PartitionType = 4 PartitionTypeSystemTime PartitionType = 5 - // Actually non-partitioned, but during DDL keeping the table as - // a single partition - PartitionTypeNone PartitionType = 6 ) func (p PartitionType) String() string { From 592ad803210590ed73da3a1e1f1b2f42c21777ff Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 10 Apr 2023 21:31:51 +0200 Subject: [PATCH 3/4] Added support for repartitioning an already partitioned table --- ddl/ddl_api.go | 77 ++++++++++++++------------ ddl/ddl_worker.go | 28 ++++++++++ ddl/partition.go | 71 +++++++++++++----------- ddl/partition_test.go | 18 +++++++ infoschema/builder.go | 5 +- meta/autoid/autoid.go | 2 + parser/model/model.go | 11 ++-- table/tables/partition.go | 110 ++++++++++++++++++++------------------ 8 files changed, 200 insertions(+), 122 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 8da0548ddb8ea..f23b64173951f 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4016,7 +4016,7 @@ func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, sp piOld := meta.GetPartitionInfo() var partNames []model.CIStr if piOld != nil { - partNames := make([]model.CIStr, 0, len(piOld.Definitions)) + partNames = make([]model.CIStr, 0, len(piOld.Definitions)) for i := range piOld.Definitions { partNames = append(partNames, piOld.Definitions[i].Name) } @@ -4100,7 +4100,7 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec if err = d.assignPartitionIDs(partInfo.Definitions); err != nil { return errors.Trace(err) } - if err = checkReorgPartitionDefs(ctx, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil { + if err = checkReorgPartitionDefs(ctx, model.ActionAlterTablePartitioning, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil { return errors.Trace(err) } if err = handlePartitionPlacement(ctx, partInfo); err != nil { @@ -4133,55 +4133,66 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec return errors.Trace(err) } -func checkReorgPartitionDefs(ctx sessionctx.Context, tblInfo *model.TableInfo, partInfo *model.PartitionInfo, firstPartIdx, lastPartIdx int, idMap map[int]struct{}) error { +func checkReorgPartitionDefs(ctx sessionctx.Context, action model.ActionType, tblInfo *model.TableInfo, partInfo *model.PartitionInfo, firstPartIdx, lastPartIdx int, idMap map[int]struct{}) error { // partInfo contains only the new added partition, we have to combine it with the // old partitions to check all partitions is strictly increasing. 
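+	// This only applies to REORGANIZE PARTITION; for ALTER TABLE ...
+	// PARTITION BY the whole partitioning is replaced, see the action
+	// checks below.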
pi := tblInfo.Partition clonedMeta := tblInfo.Clone() clonedMeta.Partition.AddingDefinitions = partInfo.Definitions clonedMeta.Partition.Definitions = getReorganizedDefinitions(clonedMeta.Partition, firstPartIdx, lastPartIdx, idMap) + if action == model.ActionAlterTablePartitioning { + clonedMeta.Partition = partInfo + clonedMeta.ID = partInfo.NewTableID + } if err := checkPartitionDefinitionConstraints(ctx, clonedMeta); err != nil { return errors.Trace(err) } - if pi.Type == model.PartitionTypeRange { - if lastPartIdx == len(pi.Definitions)-1 { - // Last partition dropped, OK to change the end range - // Also includes MAXVALUE - return nil - } - // Check if the replaced end range is the same as before - lastAddingPartition := partInfo.Definitions[len(partInfo.Definitions)-1] - lastOldPartition := pi.Definitions[lastPartIdx] - if len(pi.Columns) > 0 { - newGtOld, err := checkTwoRangeColumns(ctx, &lastAddingPartition, &lastOldPartition, pi, tblInfo) + if action == model.ActionReorganizePartition { + if pi.Type == model.PartitionTypeRange { + if lastPartIdx == len(pi.Definitions)-1 { + // Last partition dropped, OK to change the end range + // Also includes MAXVALUE + return nil + } + // Check if the replaced end range is the same as before + lastAddingPartition := partInfo.Definitions[len(partInfo.Definitions)-1] + lastOldPartition := pi.Definitions[lastPartIdx] + if len(pi.Columns) > 0 { + newGtOld, err := checkTwoRangeColumns(ctx, &lastAddingPartition, &lastOldPartition, pi, tblInfo) + if err != nil { + return errors.Trace(err) + } + if newGtOld { + return errors.Trace(dbterror.ErrRangeNotIncreasing) + } + oldGtNew, err := checkTwoRangeColumns(ctx, &lastOldPartition, &lastAddingPartition, pi, tblInfo) + if err != nil { + return errors.Trace(err) + } + if oldGtNew { + return errors.Trace(dbterror.ErrRangeNotIncreasing) + } + return nil + } + + isUnsigned := isPartExprUnsigned(tblInfo) + currentRangeValue, _, err := getRangeValue(ctx, pi.Definitions[lastPartIdx].LessThan[0], isUnsigned) if err != nil { return errors.Trace(err) } - if newGtOld { - return errors.Trace(dbterror.ErrRangeNotIncreasing) - } - oldGtNew, err := checkTwoRangeColumns(ctx, &lastOldPartition, &lastAddingPartition, pi, tblInfo) + newRangeValue, _, err := getRangeValue(ctx, partInfo.Definitions[len(partInfo.Definitions)-1].LessThan[0], isUnsigned) if err != nil { return errors.Trace(err) } - if oldGtNew { + + if currentRangeValue != newRangeValue { return errors.Trace(dbterror.ErrRangeNotIncreasing) } - return nil } - - isUnsigned := isPartExprUnsigned(tblInfo) - currentRangeValue, _, err := getRangeValue(ctx, pi.Definitions[lastPartIdx].LessThan[0], isUnsigned) - if err != nil { - return errors.Trace(err) - } - newRangeValue, _, err := getRangeValue(ctx, partInfo.Definitions[len(partInfo.Definitions)-1].LessThan[0], isUnsigned) - if err != nil { - return errors.Trace(err) - } - - if currentRangeValue != newRangeValue { - return errors.Trace(dbterror.ErrRangeNotIncreasing) + } else { + if len(pi.Definitions) != (lastPartIdx - firstPartIdx + 1) { + // if not ActionReorganizePartition, require all partitions to be changed. + return errors.Trace(dbterror.ErrAlterOperationNotSupported) } } return nil diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 205960f247f4d..00345b8b5167f 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1373,6 +1373,34 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... 
} } } + case model.ActionAlterTablePartitioning: + diff.OldTableID = job.TableID + if job.SchemaState == model.StateDeleteReorganization { + // Final part, new table id is assigned + partInfo := &model.PartitionInfo{} + var partNames []model.CIStr + err = job.DecodeArgs(&partNames, &partInfo) + if err != nil { + return 0, errors.Trace(err) + } + diff.TableID = partInfo.NewTableID + if len(job.CtxVars) > 0 { + if droppedIDs, ok := job.CtxVars[0].([]int64); ok { + if addedIDs, ok := job.CtxVars[1].([]int64); ok { + // to use AffectedOpts we need both new and old to have the same length + maxParts := mathutil.Max[int](len(droppedIDs), len(addedIDs)) + // Also initialize them to 0! + oldIDs := make([]int64, maxParts) + copy(oldIDs, droppedIDs) + newIDs := make([]int64, maxParts) + copy(newIDs, addedIDs) + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + } + } + } + } else { + diff.TableID = job.TableID + } case model.ActionCreateTable: diff.TableID = job.TableID if len(job.Args) > 0 { diff --git a/ddl/partition.go b/ddl/partition.go index 8e52e3157c356..47c9bd132ed96 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1770,9 +1770,9 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( tblInfo.Partition = nil } else { tblInfo.Partition.NewTableID = 0 - tblInfo.Partition.NewExpr = "" - tblInfo.Partition.NewColumns = nil - tblInfo.Partition.NewType = model.PartitionTypeNone + tblInfo.Partition.DDLExpr = "" + tblInfo.Partition.DDLColumns = nil + tblInfo.Partition.DDLType = model.PartitionTypeNone } } @@ -2257,13 +2257,19 @@ func getReorgPartitionInfo(t *meta.Meta, job *model.Job) (*model.TableInfo, []mo if tblInfo.Partition != nil { addingDefs = tblInfo.Partition.AddingDefinitions droppingDefs = tblInfo.Partition.DroppingDefinitions + if job.Type == model.ActionAlterTablePartitioning { + tblInfo.Partition.NewTableID = partInfo.NewTableID + tblInfo.Partition.DDLType = partInfo.Type + tblInfo.Partition.DDLExpr = partInfo.Expr + tblInfo.Partition.DDLColumns = partInfo.Columns + } } else { tblInfo.Partition = getPartitionInfoTypeNone() tblInfo.Partition.NewTableID = partInfo.NewTableID tblInfo.Partition.Definitions[0].ID = tblInfo.ID - tblInfo.Partition.NewType = partInfo.Type - tblInfo.Partition.NewExpr = partInfo.Expr - tblInfo.Partition.NewColumns = partInfo.Columns + tblInfo.Partition.DDLType = partInfo.Type + tblInfo.Partition.DDLExpr = partInfo.Expr + tblInfo.Partition.DDLColumns = partInfo.Columns } if len(addingDefs) == 0 { addingDefs = []model.PartitionDefinition{} @@ -2327,23 +2333,27 @@ func (w *worker) onAlterTablePartitioning(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } // TODO: How to carrie over AUTO_INCREMENT etc.? - err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Del() - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } + // Check if they are carried over in ApplyDiff?!? 
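+	// (the deletion below is disabled until that is verified)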
+ /* + err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Del() + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + */ // TODO: Add failpoint here definitionsToAdd := tblInfo.Partition.AddingDefinitions tblInfo.Partition.AddingDefinitions = nil tblInfo.Partition.DroppingDefinitions = nil - tblInfo.Partition.Type = tblInfo.Partition.NewType - tblInfo.Partition.Expr = tblInfo.Partition.NewExpr - tblInfo.Partition.Columns = tblInfo.Partition.NewColumns + tblInfo.Partition.Type = tblInfo.Partition.DDLType + tblInfo.Partition.Expr = tblInfo.Partition.DDLExpr + tblInfo.Partition.Columns = tblInfo.Partition.DDLColumns tblInfo.ID = tblInfo.Partition.NewTableID - tblInfo.Partition.NewType = model.PartitionTypeNone - tblInfo.Partition.NewExpr = "" - tblInfo.Partition.NewColumns = nil + tblInfo.Partition.DDLType = model.PartitionTypeNone + tblInfo.Partition.DDLExpr = "" + tblInfo.Partition.DDLColumns = nil tblInfo.Partition.NewTableID = 0 + tblInfo.Partition.DDLState = model.StateNone err = t.CreateTableOrView(job.SchemaID, tblInfo) if err != nil { @@ -2358,8 +2368,6 @@ func (w *worker) onAlterTablePartitioning(d *ddlCtx, t *meta.Meta, job *model.Jo if err != nil { return ver, errors.Trace(err) } - job.SchemaState = model.StateNone - tblInfo.Partition.DDLState = model.StateNone job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) // Register for update the statistics // Seems to only trigger asynchronous. @@ -2394,9 +2402,6 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) partNames[i] = partNamesCIStr[i].L } - // In order to skip maintaining the state check in partitionDefinition, TiDB use dropping/addingDefinition instead of state field. - // So here using `job.SchemaState` to judge what the stage of this job is. - originalState := job.SchemaState switch job.SchemaState { case model.StateNone: // job.SchemaState == model.StateNone means the job is in the initial state of reorg partition. @@ -2426,7 +2431,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) return ver, err } sctx := w.sess.Context - if err = checkReorgPartitionDefs(sctx, tblInfo, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil { + if err = checkReorgPartitionDefs(sctx, job.Type, tblInfo, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil { job.State = model.JobStateCancelled return ver, err } @@ -2555,18 +2560,18 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) } } - job.SchemaState = model.StateWriteOnly tblInfo.Partition.DDLState = model.StateWriteOnly metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.2 / float64(math.MaxUint64)) - ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true) + job.SchemaState = model.StateWriteOnly case model.StateWriteOnly: // Insert this state to confirm all servers can see the new partitions when reorg is running, // so that new data will be updated in both old and new partitions when reorganizing. 
job.SnapshotVer = 0 - job.SchemaState = model.StateWriteReorganization tblInfo.Partition.DDLState = model.StateWriteReorganization metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64)) - ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true) + job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) tbl, err2 := getTable(d.store, job.SchemaID, tblInfo) @@ -2597,16 +2602,21 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) } newDefs := getReorganizedDefinitions(tblInfo.Partition, firstPartIdx, lastPartIdx, idMap) - // From now on, use the new definitions, but keep the Adding and Dropping for double write + // From now on, use the new partitioning, but keep the Adding and Dropping for double write tblInfo.Partition.Definitions = newDefs tblInfo.Partition.Num = uint64(len(newDefs)) + if job.Type == model.ActionAlterTablePartitioning { + tblInfo.Partition.Type, tblInfo.Partition.DDLType = tblInfo.Partition.DDLType, tblInfo.Partition.Type + tblInfo.Partition.Expr, tblInfo.Partition.DDLExpr = tblInfo.Partition.DDLExpr, tblInfo.Partition.Expr + tblInfo.Partition.Columns, tblInfo.Partition.DDLColumns = tblInfo.Partition.DDLColumns, tblInfo.Partition.Columns + } // Now all the data copying is done, but we cannot simply remove the droppingDefinitions // since they are a part of the normal Definitions that other nodes with // the current schema version. So we need to double write for one more schema version - job.SchemaState = model.StateDeleteReorganization tblInfo.Partition.DDLState = model.StateDeleteReorganization - ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true) + job.SchemaState = model.StateDeleteReorganization case model.StateDeleteReorganization: // Drop the droppingDefinitions and finish the DDL @@ -2635,7 +2645,6 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) if err != nil { return ver, errors.Trace(err) } - job.SchemaState = model.StateNone tblInfo.Partition.DDLState = model.StateNone job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) // How to handle this? 
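
For reference while reviewing, a minimal sketch (testkit style; the statements
mirror the tests below, and `tk` is a testkit.TestKit as in partition_test.go)
of what this series enables end to end:

	tk.MustExec(`create table t (a int primary key, b varchar(255))`)
	// non-partitioned -> partitioned; the data is copied and the table
	// gets a new table ID
	tk.MustExec(`alter table t partition by hash (a) partitions 4`)
	// partitioned -> a different partitioning scheme
	tk.MustExec(`alter table t partition by key (a) partitions 5`)
	// statistics are not carried over, as the added warning points out
	tk.MustExec(`analyze table t`)
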
diff --git a/ddl/partition_test.go b/ddl/partition_test.go index 583655781cf6d..9ab00ef8affb1 100644 --- a/ddl/partition_test.go +++ b/ddl/partition_test.go @@ -269,4 +269,22 @@ func TestAlterPartitionByRange(t *testing.T) { "(PARTITION `p0` VALUES LESS THAN (1000000),\n" + " PARTITION `p1` VALUES LESS THAN (2000000),\n" + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustExec(`alter table t partition by hash(a) partitions 7`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY HASH (`a`) PARTITIONS 7")) + tk.MustExec(`alter table t partition by key(a) partitions 5`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY KEY (`a`) PARTITIONS 5")) } diff --git a/infoschema/builder.go b/infoschema/builder.go index 3240e1c5c5c49..0c5a8818bb7f0 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -227,6 +227,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro return b.applyCreateTables(m, diff) case model.ActionReorganizePartition: return b.applyReorganizePartition(m, diff) + case model.ActionAlterTablePartitioning: + return b.applyReorganizePartition(m, diff) case model.ActionFlashbackCluster: return []int64{-1}, nil default: @@ -411,7 +413,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID - case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: + case model.ActionTruncateTable, model.ActionCreateView, + model.ActionExchangeTablePartition, model.ActionAlterTablePartitioning: oldTableID = diff.OldTableID newTableID = diff.TableID default: diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 4cd03bea89219..0fffa8410baa9 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -656,6 +656,8 @@ func NewSequenceAllocator(store kv.Storage, dbID, tbID int64, info *model.Sequen } } +// TODO: Handle allocators when changing Table ID during ALTER TABLE t PARTITION BY ... + // NewAllocatorsFromTblInfo creates an array of allocators of different types with the information of model.TableInfo. func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) Allocators { var allocs []Allocator diff --git a/parser/model/model.go b/parser/model/model.go index 02524735bc9c1..ca9ee1d9a1002 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1171,12 +1171,15 @@ type PartitionInfo struct { // Only used during ReorganizePartition so far DDLState SchemaState `json:"ddl_state"` // Set during ALTER TABLE ... if the table id needs to change - // like if there is a global index + // like if there is a global index or going between non-partitioned + // and partitioned table, to make the data dropping / range delete + // optimized. NewTableID int64 `json:"new_table_id"` // Set during ALTER TABLE ... PARTITION BY ... 
- NewType PartitionType `json:"new_type"` - NewExpr string `json:"new_expr"` - NewColumns []CIStr `json:"new_columns"` + // First as the new partition scheme, then in StateDeleteReorg as the old + DDLType PartitionType `json:"ddl_type"` + DDLExpr string `json:"ddl_expr"` + DDLColumns []CIStr `json:"ddl_columns"` } // Clone clones itself. diff --git a/table/tables/partition.go b/table/tables/partition.go index 3ae6da70fe442..65fcaf26540e3 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -112,7 +112,7 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part return nil, table.ErrUnknownPartition } ret := &partitionedTable{TableCommon: *tbl} - partitionExpr, err := newPartitionExpr(tblInfo, pi.Definitions) + partitionExpr, err := newPartitionExpr(tblInfo, pi.Type, pi.Expr, pi.Columns, pi.Definitions) if err != nil { return nil, errors.Trace(err) } @@ -147,7 +147,14 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part if pi.DDLState == model.StateDeleteReorganization { origIdx := setIndexesState(ret, pi.DDLState) defer unsetIndexesState(ret, origIdx) - ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.DroppingDefinitions) + if pi.NewTableID != 0 { + if pi.DDLType != model.PartitionTypeNone { + ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.DDLType, pi.DDLExpr, pi.DDLColumns, pi.DroppingDefinitions) + } + // If PartitionTypeNone, no partition expression + } else { + ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.Type, pi.Expr, pi.Columns, pi.DroppingDefinitions) + } if err != nil { return nil, errors.Trace(err) } @@ -168,7 +175,7 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part if len(pi.AddingDefinitions) > 0 { origIdx := setIndexesState(ret, pi.DDLState) defer unsetIndexesState(ret, origIdx) - ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.AddingDefinitions) + ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.DDLType, pi.DDLExpr, pi.DDLColumns, pi.AddingDefinitions) if err != nil { return nil, errors.Trace(err) } @@ -227,7 +234,7 @@ func initPartition(t *partitionedTable, def model.PartitionDefinition) (*partiti return &newPart, nil } -func newPartitionExpr(tblInfo *model.TableInfo, defs []model.PartitionDefinition) (*PartitionExpr, error) { +func newPartitionExpr(tblInfo *model.TableInfo, tp model.PartitionType, expr string, partCols []model.CIStr, defs []model.PartitionDefinition) (*PartitionExpr, error) { // a partitioned table cannot rely on session context/sql modes, so use a default one! 
ctx := mock.NewContext() dbName := model.NewCIStr(ctx.GetSessionVars().CurrentDB) @@ -235,19 +242,18 @@ func newPartitionExpr(tblInfo *model.TableInfo, defs []model.PartitionDefinition if err != nil { return nil, err } - pi := tblInfo.GetPartitionInfo() - switch pi.Type { + switch tp { case model.PartitionTypeNone: // Nothing to do return nil, nil case model.PartitionTypeRange: - return generateRangePartitionExpr(ctx, pi, defs, columns, names) + return generateRangePartitionExpr(ctx, expr, partCols, defs, columns, names) case model.PartitionTypeHash: - return generateHashPartitionExpr(ctx, pi, columns, names) + return generateHashPartitionExpr(ctx, expr, columns, names) case model.PartitionTypeKey: - return generateKeyPartitionExpr(ctx, pi, columns, names) + return generateKeyPartitionExpr(ctx, expr, partCols, columns, names) case model.PartitionTypeList: - return generateListPartitionExpr(ctx, tblInfo, defs, columns, names) + return generateListPartitionExpr(ctx, tblInfo, expr, partCols, defs, columns, names) } panic("cannot reach here") } @@ -641,25 +647,25 @@ func fixOldVersionPartitionInfo(sctx sessionctx.Context, str string) (int64, boo return ret, true } -func rangePartitionExprStrings(pi *model.PartitionInfo) []string { +func rangePartitionExprStrings(cols []model.CIStr, expr string) []string { var s []string - if len(pi.Columns) > 0 { - s = make([]string, 0, len(pi.Columns)) - for _, col := range pi.Columns { + if len(cols) > 0 { + s = make([]string, 0, len(cols)) + for _, col := range cols { s = append(s, stringutil.Escape(col.O, mysql.ModeNone)) } } else { - s = []string{pi.Expr} + s = []string{expr} } return s } -func generateKeyPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, +func generateKeyPartitionExpr(ctx sessionctx.Context, expr string, partCols []model.CIStr, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { ret := &PartitionExpr{ ForKeyPruning: &ForKeyPruning{}, } - _, partColumns, offset, err := extractPartitionExprColumns(ctx, pi, columns, names) + _, partColumns, offset, err := extractPartitionExprColumns(ctx, expr, partCols, columns, names) if err != nil { return nil, errors.Trace(err) } @@ -669,12 +675,12 @@ func generateKeyPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, return ret, nil } -func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, +func generateRangePartitionExpr(ctx sessionctx.Context, expr string, partCols []model.CIStr, defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { // The caller should assure partition info is not nil. p := parser.New() schema := expression.NewSchema(columns...) 
- partStrs := rangePartitionExprStrings(pi) + partStrs := rangePartitionExprStrings(partCols, expr) locateExprs, err := getRangeLocateExprs(ctx, p, defs, partStrs, schema, names) if err != nil { return nil, errors.Trace(err) @@ -683,13 +689,13 @@ func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, UpperBounds: locateExprs, } - partExpr, _, offset, err := extractPartitionExprColumns(ctx, pi, columns, names) + partExpr, _, offset, err := extractPartitionExprColumns(ctx, expr, partCols, columns, names) if err != nil { return nil, errors.Trace(err) } ret.ColumnOffset = offset - if len(pi.Columns) < 1 { + if len(partCols) < 1 { tmp, err := dataForRangePruning(ctx, defs) if err != nil { return nil, errors.Trace(err) @@ -759,19 +765,19 @@ func findIdxByColUniqueID(cols []*expression.Column, col *expression.Column) int return -1 } -func extractPartitionExprColumns(ctx sessionctx.Context, pi *model.PartitionInfo, columns []*expression.Column, names types.NameSlice) (expression.Expression, []*expression.Column, []int, error) { +func extractPartitionExprColumns(ctx sessionctx.Context, expr string, partCols []model.CIStr, columns []*expression.Column, names types.NameSlice) (expression.Expression, []*expression.Column, []int, error) { var cols []*expression.Column var partExpr expression.Expression - if len(pi.Columns) == 0 { + if len(partCols) == 0 { schema := expression.NewSchema(columns...) - exprs, err := expression.ParseSimpleExprsWithNames(ctx, pi.Expr, schema, names) + exprs, err := expression.ParseSimpleExprsWithNames(ctx, expr, schema, names) if err != nil { return nil, nil, nil, err } cols = expression.ExtractColumns(exprs[0]) partExpr = exprs[0] } else { - for _, col := range pi.Columns { + for _, col := range partCols { idx := expression.FindFieldNameIdxByColName(names, col.L) if idx < 0 { panic("should never happen") @@ -790,19 +796,18 @@ func extractPartitionExprColumns(ctx sessionctx.Context, pi *model.PartitionInfo return partExpr, deDupCols, offset, nil } -func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, +func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, expr string, partCols []model.CIStr, defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { // The caller should assure partition info is not nil. 
- pi := tblInfo.GetPartitionInfo() - partExpr, exprCols, offset, err := extractPartitionExprColumns(ctx, pi, columns, names) + partExpr, exprCols, offset, err := extractPartitionExprColumns(ctx, expr, partCols, columns, names) if err != nil { return nil, err } listPrune := &ForListPruning{} - if len(pi.Columns) == 0 { - err = listPrune.buildListPruner(ctx, tblInfo, defs, exprCols, columns, names) + if len(partCols) == 0 { + err = listPrune.buildListPruner(ctx, expr, defs, exprCols, columns, names) } else { - err = listPrune.buildListColumnsPruner(ctx, tblInfo, defs, columns, names) + err = listPrune.buildListColumnsPruner(ctx, tblInfo, partCols, defs, columns, names) } if err != nil { return nil, err @@ -838,15 +843,14 @@ func (lp *ForListPruning) Clone() *ForListPruning { return &ret } -func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model.TableInfo, defs []model.PartitionDefinition, exprCols []*expression.Column, +func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, exprStr string, defs []model.PartitionDefinition, exprCols []*expression.Column, columns []*expression.Column, names types.NameSlice) error { - pi := tblInfo.GetPartitionInfo() schema := expression.NewSchema(columns...) p := parser.New() - expr, err := parseSimpleExprWithNames(p, ctx, pi.Expr, schema, names) + expr, err := parseSimpleExprWithNames(p, ctx, exprStr, schema, names) if err != nil { // If it got an error here, ddl may hang forever, so this error log is important. - logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", pi.Expr), zap.Error(err)) + logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", exprStr), zap.Error(err)) return errors.Trace(err) } // Since need to change the column index of the expression, clone the expression first. @@ -869,20 +873,19 @@ func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model } func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, - tblInfo *model.TableInfo, defs []model.PartitionDefinition, + tblInfo *model.TableInfo, partCols []model.CIStr, defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) error { - pi := tblInfo.GetPartitionInfo() schema := expression.NewSchema(columns...) 
p := parser.New() - colPrunes := make([]*ForListColumnPruning, 0, len(pi.Columns)) - for colIdx := range pi.Columns { - colInfo := model.FindColumnInfo(tblInfo.Columns, pi.Columns[colIdx].L) + colPrunes := make([]*ForListColumnPruning, 0, len(partCols)) + for colIdx := range partCols { + colInfo := model.FindColumnInfo(tblInfo.Columns, partCols[colIdx].L) if colInfo == nil { - return table.ErrUnknownColumn.GenWithStackByArgs(pi.Columns[colIdx].L) + return table.ErrUnknownColumn.GenWithStackByArgs(partCols[colIdx].L) } - idx := expression.FindFieldNameIdxByColName(names, pi.Columns[colIdx].L) + idx := expression.FindFieldNameIdxByColName(names, partCols[colIdx].L) if idx < 0 { - return table.ErrUnknownColumn.GenWithStackByArgs(pi.Columns[colIdx].L) + return table.ErrUnknownColumn.GenWithStackByArgs(partCols[colIdx].L) } colPrune := &ForListColumnPruning{ ctx: ctx, @@ -1114,18 +1117,18 @@ func (lp *ForListColumnPruning) LocateRanges(sc *stmtctx.StatementContext, r *ra return locations, nil } -func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, +func generateHashPartitionExpr(ctx sessionctx.Context, exprStr string, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { // The caller should assure partition info is not nil. schema := expression.NewSchema(columns...) - origExpr, err := parseExpr(parser.New(), pi.Expr) + origExpr, err := parseExpr(parser.New(), exprStr) if err != nil { return nil, err } exprs, err := rewritePartitionExpr(ctx, origExpr, schema, names) if err != nil { // If it got an error here, ddl may hang forever, so this error log is important. - logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", pi.Expr), zap.Error(err)) + logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", exprStr), zap.Error(err)) return nil, errors.Trace(err) } // build column offset. 
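
Threading the type/expression/columns through explicitly (instead of reading
tblInfo.GetPartitionInfo() inside each generator) lets the same code build the
partition expression for either the current or the pending scheme. The two
call shapes, as used in newPartitionedTable above:

	// current scheme, for normal reads and writes
	partitionExpr, err := newPartitionExpr(tblInfo, pi.Type, pi.Expr, pi.Columns, pi.Definitions)
	...
	// pending scheme, for routing double-writes to AddingDefinitions during reorg
	ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.DDLType, pi.DDLExpr, pi.DDLColumns, pi.AddingDefinitions)
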
@@ -1430,16 +1433,17 @@ func GetReorganizedPartitionedTable(t table.Table) (table.PartitionedTable, erro } tblInfo := t.Meta().Clone() tblInfo.Partition.Definitions = tblInfo.Partition.AddingDefinitions + tblInfo.Partition.Num = uint64(len(tblInfo.Partition.Definitions)) tblInfo.Partition.AddingDefinitions = nil tblInfo.Partition.DroppingDefinitions = nil - if tblInfo.Partition.NewType != model.PartitionTypeNone { - tblInfo.Partition.Type = tblInfo.Partition.NewType - tblInfo.Partition.Expr = tblInfo.Partition.NewExpr - tblInfo.Partition.Columns = tblInfo.Partition.NewColumns + if tblInfo.Partition.DDLType != model.PartitionTypeNone { + tblInfo.Partition.Type = tblInfo.Partition.DDLType + tblInfo.Partition.Expr = tblInfo.Partition.DDLExpr + tblInfo.Partition.Columns = tblInfo.Partition.DDLColumns tblInfo.ID = tblInfo.Partition.NewTableID - tblInfo.Partition.NewType = model.PartitionTypeNone - tblInfo.Partition.NewExpr = "" - tblInfo.Partition.NewColumns = nil + tblInfo.Partition.DDLType = model.PartitionTypeNone + tblInfo.Partition.DDLExpr = "" + tblInfo.Partition.DDLColumns = nil tblInfo.Partition.NewTableID = 0 } var tc TableCommon From a0c8c1f4a4ec390b36477142bef33978923daac2 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 10 Apr 2023 22:14:26 +0200 Subject: [PATCH 4/4] Fixed a broken test case --- ddl/ddl_api.go | 2 +- ddl/partition_test.go | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index f23b64173951f..d9e95842a7ad0 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4100,7 +4100,7 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec if err = d.assignPartitionIDs(partInfo.Definitions); err != nil { return errors.Trace(err) } - if err = checkReorgPartitionDefs(ctx, model.ActionAlterTablePartitioning, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil { + if err = checkReorgPartitionDefs(ctx, model.ActionReorganizePartition, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil { return errors.Trace(err) } if err = handlePartitionPlacement(ctx, partInfo); err != nil { diff --git a/ddl/partition_test.go b/ddl/partition_test.go index 9ab00ef8affb1..0c14f1bae7ead 100644 --- a/ddl/partition_test.go +++ b/ddl/partition_test.go @@ -288,3 +288,23 @@ func TestAlterPartitionByRange(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + "PARTITION BY KEY (`a`) PARTITIONS 5")) } + +func TestReorgRangeTimestampMaxvalue(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("create schema AlterPartitionBy") + tk.MustExec("use AlterPartitionBy") + tk.MustExec(`CREATE TABLE t1 ( +a timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, +b varchar(10), +PRIMARY KEY (a) +) +PARTITION BY RANGE (UNIX_TIMESTAMP(a)) ( +PARTITION p1 VALUES LESS THAN (1199134800), +PARTITION pmax VALUES LESS THAN MAXVALUE +)`) + + tk.MustExec(`ALTER TABLE t1 REORGANIZE PARTITION pmax INTO ( +PARTITION p3 VALUES LESS THAN (1247688000), +PARTITION pmax VALUES LESS THAN MAXVALUE)`) +}
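
A rollback test would be a natural follow-up (sketch only, not part of this
series; the table name, the failure-injection mechanism, and the errno/require
imports are assumed), mirroring TestReorganizePartitionRollback above and the
onDropTablePartition changes in the first patch, which reset tblInfo.Partition
to nil when the table was non-partitioned before:

	tk.MustExec(`create table t2 (a int primary key, b varchar(255))`)
	// with an injected failure during the reorg (e.g. via a failpoint):
	tk.MustGetErrCode(`alter table t2 partition by hash (a) partitions 4`,
		errno.ErrCancelledDDLJob)
	// the table must come back as non-partitioned
	rows := tk.MustQuery(`show create table t2`).Rows()
	require.NotContains(t, rows[0][1], "PARTITION BY")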