From fc1312f8689d5333f762d712e64a69aecd54ddb0 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Wed, 20 Nov 2024 15:49:08 +0100 Subject: [PATCH 1/2] *: Truncate partition with global index improvement (#55831) close pingcap/tidb#55819 --- pkg/ddl/partition.go | 461 +++++++--------- pkg/ddl/tests/partition/db_partition_test.go | 52 +- pkg/ddl/tests/partition/multi_domain_test.go | 533 ++++++++++++++++++- pkg/ddl/tests/tiflash/ddl_tiflash_test.go | 7 +- pkg/executor/test/admintest/admin_test.go | 4 +- pkg/meta/model/table.go | 13 - pkg/planner/core/find_best_task.go | 1 + pkg/planner/core/point_get_plan.go | 1 + pkg/table/tables/index.go | 35 +- pkg/table/tables/mutation_checker.go | 1 + pkg/table/tables/partition.go | 23 +- pkg/table/tables/tables.go | 2 + pkg/tablecodec/tablecodec.go | 14 +- 13 files changed, 834 insertions(+), 313 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index f0f210e44d659..635c05290ef03 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -2035,12 +2035,11 @@ func CheckDropTablePartition(meta *model.TableInfo, partLowerNames []string) err return nil } -// updateDroppingPartitionInfo move dropping partitions to DroppingDefinitions, and return partitionIDs -func updateDroppingPartitionInfo(tblInfo *model.TableInfo, partLowerNames []string) []int64 { +// updateDroppingPartitionInfo move dropping partitions to DroppingDefinitions +func updateDroppingPartitionInfo(tblInfo *model.TableInfo, partLowerNames []string) { oldDefs := tblInfo.Partition.Definitions newDefs := make([]model.PartitionDefinition, 0, len(oldDefs)-len(partLowerNames)) droppingDefs := make([]model.PartitionDefinition, 0, len(partLowerNames)) - pids := make([]int64, 0, len(partLowerNames)) // consider using a map to probe partLowerNames if too many partLowerNames for i := range oldDefs { @@ -2052,7 +2051,6 @@ func updateDroppingPartitionInfo(tblInfo *model.TableInfo, partLowerNames []stri } } if found { - pids = append(pids, oldDefs[i].ID) droppingDefs = append(droppingDefs, oldDefs[i]) } else { newDefs = append(newDefs, oldDefs[i]) @@ -2061,7 +2059,6 @@ func updateDroppingPartitionInfo(tblInfo *model.TableInfo, partLowerNames []stri tblInfo.Partition.Definitions = newDefs tblInfo.Partition.DroppingDefinitions = droppingDefs - return pids } func getPartitionDef(tblInfo *model.TableInfo, partName string) (index int, def *model.PartitionDefinition, _ error) { @@ -2105,34 +2102,6 @@ func getTableInfoWithDroppingPartitions(t *model.TableInfo) *model.TableInfo { return nt } -// getTableInfoWithOriginalPartitions builds oldTableInfo including truncating partitions, only used by onTruncateTablePartition. 
-func getTableInfoWithOriginalPartitions(t *model.TableInfo, oldIDs []int64, newIDs []int64) *model.TableInfo { - nt := t.Clone() - np := nt.Partition - - // reconstruct original definitions - for _, oldDef := range np.DroppingDefinitions { - var newID int64 - for i := range newIDs { - if oldDef.ID == oldIDs[i] { - newID = newIDs[i] - break - } - } - for i := range np.Definitions { - newDef := &np.Definitions[i] - if newDef.ID == newID { - newDef.ID = oldDef.ID - break - } - } - } - - np.DroppingDefinitions = nil - np.NewPartitionIDs = nil - return nt -} - func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames []string) error { deleteRules := make([]string, 0, len(partNames)) for _, partName := range partNames { @@ -2252,7 +2221,6 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i return ver, errors.Trace(err) } - var physicalTableIDs []int64 switch job.SchemaState { case model.StatePublic: // Here we mark the partitions to be dropped, so they are not read or written @@ -2264,7 +2232,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i // Reason, see https://github.com/pingcap/tidb/issues/55888 // Only mark the partitions as to be dropped, so they are not used, but not yet removed. originalDefs := tblInfo.Partition.Definitions - physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames) + updateDroppingPartitionInfo(tblInfo, partNames) tblInfo.Partition.Definitions = originalDefs job.SchemaState = model.StateWriteOnly tblInfo.Partition.DDLState = job.SchemaState @@ -2275,7 +2243,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i // Since the previous state do not use the dropping partitions, // we can now actually remove them, allowing to write into the overlapping range // of the higher range partition or LIST default partition. - physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames) + updateDroppingPartitionInfo(tblInfo, partNames) err = dropLabelRules(jobCtx.stepCtx, job.SchemaName, tblInfo.Name.L, partNames) if err != nil { // TODO: Add failpoint error/cancel injection and test failure/rollback and cancellation! @@ -2325,60 +2293,16 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i tblInfo.Partition.DDLState = job.SchemaState ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) case model.StateDeleteReorganization: - oldTblInfo := getTableInfoWithDroppingPartitions(tblInfo) - physicalTableIDs = getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) - tbl, err := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, oldTblInfo) - if err != nil { - return ver, errors.Trace(err) - } - dbInfo, err := metaMut.GetDatabase(job.SchemaID) - if err != nil { - return ver, errors.Trace(err) - } - // If table has global indexes, we need reorg to clean up them. - if pt, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) { - // Build elements for compatible with modify column type. elements will not be used when reorganizing. 
- elements := make([]*meta.Element, 0, len(tblInfo.Indices)) - for _, idxInfo := range tblInfo.Indices { - if idxInfo.Global { - elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey}) - } - } - sctx, err1 := w.sessPool.Get() - if err1 != nil { - return ver, err1 - } - defer w.sessPool.Put(sctx) - rh := newReorgHandler(sess.NewSession(sctx)) - reorgInfo, err := getReorgInfoFromPartitions(jobCtx.oldDDLCtx.jobContext(job.ID, job.ReorgMeta), jobCtx, rh, job, dbInfo, pt, physicalTableIDs, elements) - - if err != nil || reorgInfo.first { - // If we run reorg firstly, we should update the job snapshot version - // and then run the reorg next time. - return ver, errors.Trace(err) - } - err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) { - defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition", - func() { - dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic") - }, false) - return w.cleanupGlobalIndexes(pt, physicalTableIDs, reorgInfo) - }) - if err != nil { - if dbterror.ErrWaitReorgTimeout.Equal(err) { - // if timeout, we should return, check for the owner and re-wait job done. - return ver, nil - } - if dbterror.ErrPausedDDLJob.Equal(err) { - // if ErrPausedDDLJob, we should return, check for the owner and re-wait job done. - return ver, nil - } + physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) + if hasGlobalIndex(tblInfo) { + oldTblInfo := getTableInfoWithDroppingPartitions(tblInfo) + var done bool + done, err = w.cleanGlobalIndexEntriesFromDroppedPartitions(jobCtx, job, oldTblInfo, physicalTableIDs) + if err != nil || !done { return ver, errors.Trace(err) } } - if tblInfo.TiFlashReplica != nil { - removeTiFlashAvailablePartitionIDs(tblInfo, physicalTableIDs) - } + removeTiFlashAvailablePartitionIDs(tblInfo, physicalTableIDs) droppedDefs := tblInfo.Partition.DroppingDefinitions tblInfo.Partition.DroppingDefinitions = nil job.SchemaState = model.StateNone @@ -2410,6 +2334,9 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i } func removeTiFlashAvailablePartitionIDs(tblInfo *model.TableInfo, pids []int64) { + if tblInfo.TiFlashReplica == nil { + return + } // Remove the partitions ids := tblInfo.TiFlashReplica.AvailablePartitionIDs // Rarely called, so OK to take some time, to make it easy @@ -2426,209 +2353,221 @@ func removeTiFlashAvailablePartitionIDs(tblInfo *model.TableInfo, pids []int64) tblInfo.TiFlashReplica.AvailablePartitionIDs = ids } +func replaceTruncatePartitions(job *model.Job, t *meta.Mutator, tblInfo *model.TableInfo, oldIDs, newIDs []int64) ([]model.PartitionDefinition, []model.PartitionDefinition, error) { + oldDefinitions := make([]model.PartitionDefinition, 0, len(oldIDs)) + newDefinitions := make([]model.PartitionDefinition, 0, len(oldIDs)) + pi := tblInfo.Partition + for i, id := range oldIDs { + for defIdx := range pi.Definitions { + // use a reference to actually set the new ID! + def := &pi.Definitions[defIdx] + if id == def.ID { + oldDefinitions = append(oldDefinitions, def.Clone()) + def.ID = newIDs[i] + // Shallow copy, since we do not need to replace them. 
+ newDefinitions = append(newDefinitions, *def) + break + } + } + } + + if err := clearTruncatePartitionTiflashStatus(tblInfo, newDefinitions, oldIDs); err != nil { + return nil, nil, err + } + + if err := updateTruncatePartitionLabelRules(job, t, oldDefinitions, newDefinitions, tblInfo, oldIDs); err != nil { + return nil, nil, err + } + return oldDefinitions, newDefinitions, nil +} + +func (w *worker) cleanGlobalIndexEntriesFromDroppedPartitions(jobCtx *jobContext, job *model.Job, tblInfo *model.TableInfo, oldIDs []int64) (bool, error) { + tbl, err := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, tblInfo) + if err != nil { + return false, errors.Trace(err) + } + dbInfo, err := jobCtx.metaMut.GetDatabase(job.SchemaID) + if err != nil { + return false, errors.Trace(err) + } + pt, ok := tbl.(table.PartitionedTable) + if !ok { + return false, dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState) + } + + elements := make([]*meta.Element, 0, len(tblInfo.Indices)) + for _, idxInfo := range tblInfo.Indices { + if idxInfo.Global { + elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey}) + } + } + if len(elements) == 0 { + return true, nil + } + sctx, err1 := w.sessPool.Get() + if err1 != nil { + return false, err1 + } + defer w.sessPool.Put(sctx) + rh := newReorgHandler(sess.NewSession(sctx)) + reorgInfo, err := getReorgInfoFromPartitions(jobCtx.oldDDLCtx.jobContext(job.ID, job.ReorgMeta), jobCtx, rh, job, dbInfo, pt, oldIDs, elements) + + if err != nil || reorgInfo.first { + // If we run reorg firstly, we should update the job snapshot version + // and then run the reorg next time. + return false, errors.Trace(err) + } + err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) { + defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition", + func() { + dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic") + }, false) + return w.cleanupGlobalIndexes(pt, oldIDs, reorgInfo) + }) + if err != nil { + if dbterror.ErrWaitReorgTimeout.Equal(err) { + // if timeout, we should return, check for the owner and re-wait job done. + return false, nil + } + if dbterror.ErrPausedDDLJob.Equal(err) { + // if ErrPausedDDLJob, we should return, check for the owner and re-wait job done. + return false, nil + } + return false, errors.Trace(err) + } + return true, nil +} + // onTruncateTablePartition truncates old partition meta. +// +// # StateNone +// +// Unaware of DDL. +// +// # StateWriteOnly +// +// Still sees and uses the old partition, but should filter out index reads of +// global index which has ids from pi.NewPartitionIDs. +// Allow duplicate key errors even if one cannot access the global index entry by reading! +// This state is not really needed if there are no global indexes, but used for consistency. +// +// # StateDeleteOnly +// +// Sees new partition, but should filter out index reads of global index which +// has ids from pi.DroppingDefinitions. +// Allow duplicate key errors even if one cannot access the global index entry by reading! +// +// # StateDeleteReorganization +// +// Now no other session has access to the old partition, +// but there are global index entries left pointing to the old partition, +// so they should be filtered out (see pi.DroppingDefinitions) and on write (insert/update) +// the old partition's row should be deleted and the global index key allowed +// to be overwritten. 
+// During this time the old partition is read and removing matching entries in +// smaller batches. +// This state is not really needed if there are no global indexes, but used for consistency. +// +// # StatePublic +// +// DDL done. func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (int64, error) { var ver int64 + canCancel := false + if job.SchemaState == model.StatePublic { + canCancel = true + } args, err := model.GetTruncateTableArgs(job) if err != nil { - job.State = model.JobStateCancelled + if canCancel { + job.State = model.JobStateCancelled + } return ver, errors.Trace(err) } jobCtx.jobArgs = args oldIDs, newIDs := args.OldPartitionIDs, args.NewPartitionIDs if len(oldIDs) != len(newIDs) { - job.State = model.JobStateCancelled + if canCancel { + job.State = model.JobStateCancelled + } return ver, errors.Trace(errors.New("len(oldIDs) must be the same as len(newIDs)")) } - metaMut := jobCtx.metaMut - tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID) + tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID) if err != nil { + if canCancel { + job.State = model.JobStateCancelled + } return ver, errors.Trace(err) } pi := tblInfo.GetPartitionInfo() if pi == nil { - return ver, errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned) - } - - if !hasGlobalIndex(tblInfo) { - oldPartitions := make([]model.PartitionDefinition, 0, len(oldIDs)) - newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs)) - for k, oldID := range oldIDs { - for i := 0; i < len(pi.Definitions); i++ { - def := &pi.Definitions[i] - if def.ID == oldID { - oldPartitions = append(oldPartitions, def.Clone()) - def.ID = newIDs[k] - // Shallow copy only use the def.ID in event handle. - newPartitions = append(newPartitions, *def) - break - } - } - } - if len(newPartitions) == 0 { - job.State = model.JobStateCancelled - return ver, table.ErrUnknownPartition.GenWithStackByArgs(fmt.Sprintf("pid:%v", oldIDs), tblInfo.Name.O) - } - - if err = clearTruncatePartitionTiflashStatus(tblInfo, newPartitions, oldIDs); err != nil { - job.State = model.JobStateCancelled - return ver, err - } - - if err = updateTruncatePartitionLabelRules(job, jobCtx.metaMut, oldPartitions, newPartitions, tblInfo, oldIDs); err != nil { + if canCancel { job.State = model.JobStateCancelled - return ver, err } - - preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions) - - args.ShouldUpdateAffectedPartitions = true - ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) - if err != nil { - return ver, errors.Trace(err) - } - truncatePartitionEvent := notifier.NewTruncatePartitionEvent( - tblInfo, - &model.PartitionInfo{Definitions: newPartitions}, - &model.PartitionInfo{Definitions: oldPartitions}, - ) - err = asyncNotifyEvent(jobCtx, truncatePartitionEvent, job, noSubJob, w.sess) - if err != nil { - return ver, errors.Trace(err) - } - - // Finish this job. - job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) - // A background job will be created to delete old partition data. - job.FillFinishedArgs(&model.TruncateTableArgs{ - OldPartitionIDs: oldIDs, - }) - - return ver, err + return ver, errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned) } - // When table has global index, public->deleteOnly->deleteReorg->none schema changes should be handled. 
+ var oldDefinitions []model.PartitionDefinition + var newDefinitions []model.PartitionDefinition + switch job.SchemaState { case model.StatePublic: - // Step1: generate new partition ids - truncatingDefinitions := make([]model.PartitionDefinition, 0, len(oldIDs)) - for i, oldID := range oldIDs { - for j := 0; j < len(pi.Definitions); j++ { - def := &pi.Definitions[j] - if def.ID == oldID { - truncatingDefinitions = append(truncatingDefinitions, def.Clone()) - def.ID = newIDs[i] - break - } - } - } - pi.DroppingDefinitions = truncatingDefinitions + // This work as a flag to ignore Global Index entries from the new partitions! + // Used in IDsInDDLToIgnore() for filtering new partitions from + // the global index pi.NewPartitionIDs = newIDs[:] + pi.DDLAction = model.ActionTruncateTablePartition + job.SchemaState = model.StateWriteOnly + pi.DDLState = job.SchemaState + return updateVersionAndTableInfo(jobCtx, job, tblInfo, true) + case model.StateWriteOnly: + oldDefinitions, newDefinitions, err = replaceTruncatePartitions(job, jobCtx.metaMut, tblInfo, oldIDs, newIDs) + if err != nil { + return ver, errors.Trace(err) + } + preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newDefinitions) + // This work as a flag to ignore Global Index entries from the old partitions! + // Used in IDsInDDLToIgnore() for filtering old partitions from + // the global index + pi.DroppingDefinitions = oldDefinitions + // And we don't need to filter for new partitions any longer job.SchemaState = model.StateDeleteOnly pi.DDLState = job.SchemaState - pi.DDLAction = job.Type - ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) + return updateVersionAndTableInfo(jobCtx, job, tblInfo, true) case model.StateDeleteOnly: - // This state is not a real 'DeleteOnly' state, because tidb does not maintaining the state check in partitionDefinition. - // Insert this state to confirm all servers can not see the old partitions when reorg is running, - // so that no new data will be inserted into old partitions when reorganizing. + // Now we don't see the old partitions, but other sessions may still use them. + // So to keep the Global Index consistent, we will still keep it up-to-date with + // the old partitions, as well as the new partitions. + // Also ensures that no writes will happen after GC in DeleteRanges. + job.SchemaState = model.StateDeleteReorganization pi.DDLState = job.SchemaState - ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) + return updateVersionAndTableInfo(jobCtx, job, tblInfo, true) case model.StateDeleteReorganization: - // Step2: clear global index rows. - physicalTableIDs := oldIDs - oldTblInfo := getTableInfoWithOriginalPartitions(tblInfo, oldIDs, newIDs) + // Now the old partitions are no longer accessible, but they are still referenced in + // the global indexes (although allowed to be overwritten). + // So time to clear them. 
- tbl, err := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, oldTblInfo) - if err != nil { + var done bool + done, err = w.cleanGlobalIndexEntriesFromDroppedPartitions(jobCtx, job, tblInfo, oldIDs) + if err != nil || !done { return ver, errors.Trace(err) } - dbInfo, err := metaMut.GetDatabase(job.SchemaID) - if err != nil { - return ver, errors.Trace(err) + // For the truncatePartitionEvent + oldDefinitions = pi.DroppingDefinitions + newDefinitions = make([]model.PartitionDefinition, 0, len(oldIDs)) + for i, def := range oldDefinitions { + newDef := def.Clone() + newDef.ID = newIDs[i] + newDefinitions = append(newDefinitions, newDef) } - // If table has global indexes, we need reorg to clean up them. - if pt, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) { - // Build elements for compatible with modify column type. elements will not be used when reorganizing. - elements := make([]*meta.Element, 0, len(tblInfo.Indices)) - for _, idxInfo := range tblInfo.Indices { - if idxInfo.Global { - elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey}) - } - } - sctx, err1 := w.sessPool.Get() - if err1 != nil { - return ver, err1 - } - defer w.sessPool.Put(sctx) - rh := newReorgHandler(sess.NewSession(sctx)) - reorgInfo, err := getReorgInfoFromPartitions(jobCtx.oldDDLCtx.jobContext(job.ID, job.ReorgMeta), jobCtx, rh, job, dbInfo, pt, physicalTableIDs, elements) + // TODO: Test injecting failure - if err != nil || reorgInfo.first { - // If we run reorg firstly, we should update the job snapshot version - // and then run the reorg next time. - return ver, errors.Trace(err) - } - err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (dropIndexErr error) { - defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition", - func() { - dropIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("drop partition panic") - }, false) - return w.cleanupGlobalIndexes(pt, physicalTableIDs, reorgInfo) - }) - if err != nil { - if dbterror.ErrWaitReorgTimeout.Equal(err) { - // if timeout, we should return, check for the owner and re-wait job done. - return ver, nil - } - return ver, errors.Trace(err) - } - } - - // Step3: generate new partition ids and finish rest works - oldPartitions := make([]model.PartitionDefinition, 0, len(oldIDs)) - newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs)) - for _, oldDef := range pi.DroppingDefinitions { - var newID int64 - for i := range oldIDs { - if oldDef.ID == oldIDs[i] { - newID = newIDs[i] - break - } - } - for i := 0; i < len(pi.Definitions); i++ { - def := &pi.Definitions[i] - if newID == def.ID { - oldPartitions = append(oldPartitions, oldDef.Clone()) - newPartitions = append(newPartitions, def.Clone()) - break - } - } - } - if len(newPartitions) == 0 { - job.State = model.JobStateCancelled - return ver, table.ErrUnknownPartition.GenWithStackByArgs(fmt.Sprintf("pid:%v", oldIDs), tblInfo.Name.O) - } - - if err = clearTruncatePartitionTiflashStatus(tblInfo, newPartitions, oldIDs); err != nil { - job.State = model.JobStateCancelled - return ver, err - } - - if err = updateTruncatePartitionLabelRules(job, jobCtx.metaMut, oldPartitions, newPartitions, tblInfo, oldIDs); err != nil { - job.State = model.JobStateCancelled - return ver, err - } - - // Step4: clear DroppingDefinitions and finish job. 
- tblInfo.Partition.DroppingDefinitions = nil - tblInfo.Partition.NewPartitionIDs = nil - tblInfo.Partition.DDLAction = model.ActionNone - tblInfo.Partition.DDLState = model.StateNone - - preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions) + pi.DroppingDefinitions = nil + pi.NewPartitionIDs = nil + pi.DDLState = model.StateNone + pi.DDLAction = model.ActionNone // used by ApplyDiff in updateSchemaVersion args.ShouldUpdateAffectedPartitions = true @@ -2636,16 +2575,17 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i if err != nil { return ver, errors.Trace(err) } + // Finish this job. + job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) truncatePartitionEvent := notifier.NewTruncatePartitionEvent( tblInfo, - &model.PartitionInfo{Definitions: newPartitions}, - &model.PartitionInfo{Definitions: oldPartitions}, + &model.PartitionInfo{Definitions: newDefinitions}, + &model.PartitionInfo{Definitions: oldDefinitions}, ) err = asyncNotifyEvent(jobCtx, truncatePartitionEvent, job, noSubJob, w.sess) if err != nil { return ver, errors.Trace(err) } - // Finish this job. job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) // A background job will be created to delete old partition data. @@ -2653,9 +2593,8 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i OldPartitionIDs: oldIDs, }) default: - err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState) + return ver, dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState) } - return ver, errors.Trace(err) } @@ -2685,7 +2624,6 @@ func updateTruncatePartitionLabelRules(job *model.Job, t *meta.Mutator, oldParti tableBundle, err := placement.NewTableBundle(t, tblInfo) if err != nil { - job.State = model.JobStateCancelled return errors.Trace(err) } @@ -2697,7 +2635,6 @@ func updateTruncatePartitionLabelRules(job *model.Job, t *meta.Mutator, oldParti // These placements groups will be deleted after GC keepDroppedBundles, err := droppedPartitionBundles(t, tblInfo, oldPartitions) if err != nil { - job.State = model.JobStateCancelled return errors.Trace(err) } bundles = append(bundles, keepDroppedBundles...) @@ -3225,7 +3162,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // move the adding definition into tableInfo. 
updateAddingPartitionInfo(partInfo, tblInfo) orgDefs := tblInfo.Partition.Definitions - _ = updateDroppingPartitionInfo(tblInfo, partNames) + updateDroppingPartitionInfo(tblInfo, partNames) // Reset original partitions, and keep DroppedDefinitions tblInfo.Partition.Definitions = orgDefs diff --git a/pkg/ddl/tests/partition/db_partition_test.go b/pkg/ddl/tests/partition/db_partition_test.go index 6d4c5f6fd3d5c..e50a2d0a2eb5b 100644 --- a/pkg/ddl/tests/partition/db_partition_test.go +++ b/pkg/ddl/tests/partition/db_partition_test.go @@ -1429,6 +1429,15 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) { time.Sleep(10 * time.Millisecond) } } + waitFor(4, "write only") + tkTmp := testkit.NewTestKit(t, store) + tkTmp.MustExec(`begin`) + tkTmp.MustExec("use test") + tkTmp.MustQuery(`select count(*) from test_global`).Check(testkit.Rows("5")) + tk2.MustExec(`rollback`) + tk2.MustExec(`begin`) + tk2.MustExec(`insert into test_global values (5,5,5)`) + tkTmp.MustExec(`rollback`) waitFor(4, "delete only") tk3 := testkit.NewTestKit(t, store) tk3.MustExec(`begin`) @@ -1437,16 +1446,21 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) { tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Point_Get") tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows()) tk3.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows()) - // Here it will fail with - // the partition is not in public. err := tk3.ExecToErr(`insert into test_global values (15,15,15)`) require.Error(t, err) - require.ErrorContains(t, err, "the partition is in not in public") + require.ErrorContains(t, err, "[kv:1062]Duplicate entry '15' for key 'test_global.idx_b'") tk2.MustExec(`commit`) + waitFor(4, "delete reorganization") + tk2.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows()) + tk2.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows()) + err = tk2.ExecToErr(`insert into test_global values (15,15,15)`) + require.NoError(t, err) + tk2.MustExec(`begin`) tk3.MustExec(`commit`) + tk.MustExec(`commit`) <-syncChan result := tk.MustQuery("select * from test_global;") - result.Sort().Check(testkit.Rows(`1 1 1`, `2 2 2`, `5 5 5`)) + result.Sort().Check(testkit.Rows(`1 1 1`, `15 15 15`, `2 2 2`, `5 5 5`)) tt = external.GetTableByName(t, tk, "test", "test_global") idxInfo := tt.Meta().FindIndexByName("idx_b") @@ -1487,12 +1501,12 @@ func TestGlobalIndexUpdateInTruncatePartition(t *testing.T) { tk1 := testkit.NewTestKit(t, store) tk1.MustExec("use test") err := tk1.ExecToErr("update test_global set a = 2 where a = 11") - assert.NotNil(t, err) + assert.NoError(t, err) } }) tk.MustExec("alter table test_global truncate partition p1") - tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("11 11 11", "12 12 12")) + tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("2 11 11", "12 12 12")) } func TestGlobalIndexUpdateInTruncatePartition4Hash(t *testing.T) { @@ -1515,7 +1529,7 @@ func TestGlobalIndexUpdateInTruncatePartition4Hash(t *testing.T) { tk1 := testkit.NewTestKit(t, store) tk1.MustExec("use test") err = tk1.ExecToErr("update test_global set a = 1 where a = 12") - assert.NotNil(t, err) + assert.NoError(t, err) } }) @@ -1577,7 +1591,7 @@ func TestGlobalIndexInsertInTruncatePartition(t *testing.T) { tk1 := testkit.NewTestKit(t, store) tk1.MustExec("use test") 
err = tk1.ExecToErr("insert into test_global values(2, 2, 2)") - assert.NotNil(t, err) + assert.NoError(t, err) } }) @@ -3168,6 +3182,8 @@ func TestRemovePartitioningAutoIDs(t *testing.T) { tk2.MustExec(`COMMIT`) /* + // Currently there is an duplicate entry issue, so it will rollback in WriteReorganization + // instead of continuing. waitFor(4, "t", "delete reorganization") tk2.MustExec(`BEGIN`) tk2.MustExec(`insert into t values (null, 24)`) @@ -3655,3 +3671,23 @@ func checkGlobalAndPK(t *testing.T, tk *testkit.TestKit, name string, indexes in require.True(t, idxInfo.Primary) } } + +func TestTruncateNumberOfPhases(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create table t (a int primary key , b varchar(255)) partition by hash(a) partitions 3`) + ctx := tk.Session() + dom := domain.GetDomain(ctx) + schemaVersion := dom.InfoSchema().SchemaMetaVersion() + tk.MustExec(`insert into t values (1,1),(2,2),(3,3)`) + tk.MustExec(`alter table t truncate partition p1`) + // Without global index, truncate partition should be a single state change + require.Equal(t, int64(4), dom.InfoSchema().SchemaMetaVersion()-schemaVersion) + tk.MustExec(`drop table t`) + tk.MustExec(`create table t (a int primary key , b varchar(255), unique key (b) global) partition by hash(a) partitions 3`) + schemaVersion = dom.InfoSchema().SchemaMetaVersion() + tk.MustExec(`insert into t values (1,1),(2,2),(3,3)`) + tk.MustExec(`alter table t truncate partition p1`) + require.Equal(t, int64(4), dom.InfoSchema().SchemaMetaVersion()-schemaVersion) +} diff --git a/pkg/ddl/tests/partition/multi_domain_test.go b/pkg/ddl/tests/partition/multi_domain_test.go index 5b1503251e4ed..f3a21cf6fe862 100644 --- a/pkg/ddl/tests/partition/multi_domain_test.go +++ b/pkg/ddl/tests/partition/multi_domain_test.go @@ -51,10 +51,7 @@ func TestMultiSchemaReorganizePartitionIssue56819(t *testing.T) { tkO.MustQuery(`select * from t where b = "4"`).Sort().Check(testkit.Rows("4 4")) } } - postFn := func(_ *testkit.TestKit, _ kv.Storage) { - // nothing - } - runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) + runMultiSchemaTest(t, createSQL, alterSQL, initFn, nil, loopFn) } func TestMultiSchemaDropRangePartition(t *testing.T) { @@ -459,6 +456,140 @@ func TestMultiSchemaReorganizePartition(t *testing.T) { runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) } +// TestMultiSchemaModifyColumn to show behavior when changing a column +func TestMultiSchemaModifyColumn(t *testing.T) { + createSQL := `create table t (a int primary key, b varchar(255), unique key uk_b (b))` + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(`insert into t values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9)`) + } + alterSQL := `alter table t modify column b int unsigned not null` + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + switch schemaState { + case model.StateDeleteOnly.String(): + // we are only interested in StateWriteReorganization + case model.StateWriteOnly.String(): + // we are only interested in StateDeleteReorganization->StatePublic + case model.StateWriteReorganization.String(): + case model.StatePublic.String(): + // tkNO sees varchar column and tkO sees int column + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` 
int(11) NOT NULL,\n" + + " `b` int(10) unsigned NOT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `uk_b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tkNO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `uk_b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + tkO.MustExec(`insert into t values (10, " 09.60 ")`) + // No warning!? Same in MySQL... + tkNO.MustQuery(`show warnings`).Check(testkit.Rows()) + tkNO.MustContainErrMsg(`insert into t values (11, "09.60")`, "[kv:1062]Duplicate entry '10' for key 't._Idx$_uk_b_0'") + tkO.MustQuery(`select * from t where a = 10`).Check(testkit.Rows("10 10")) + // ?!? + tkNO.MustQuery(`select * from t where a = 10`).Check(testkit.Rows("10 ")) + // If the original b was defined as 'NOT NULL', then it would give an error: + // [table:1364]Field 'b' doesn't have a default value + + tkNO.MustExec(`insert into t values (11, " 011.50 ")`) + tkNO.MustQuery(`show warnings`).Check(testkit.Rows()) + // Anomaly, the different sessions sees different data. + // So it should be acceptable for partitioning DDLs as well. + // It may be possible to check that writes from StateWriteOnly convert 1:1 + // to the new type, and block writes otherwise. But then it would break the first tkO insert above... + tkO.MustQuery(`select * from t where a = 11`).Check(testkit.Rows("11 12")) + tkNO.MustQuery(`select * from t where a = 11`).Check(testkit.Rows("11 011.50 ")) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Greater(t, tblO.Columns[1].ID, tblNO.Columns[1].ID) + // This also means that old copies of the columns will be left in the row, until the row is updated or deleted. + // But I guess that is at least documented. 
+ default: + require.Failf(t, "unhandled schema state '%s'", schemaState) + } + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, nil, loopFn) +} + +// TestMultiSchemaDropUniqueIndex to show behavior when +// dropping a unique index +func TestMultiSchemaDropUniqueIndex(t *testing.T) { + createSQL := `create table t (a int primary key, b varchar(255), unique key uk_b (b))` + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(`insert into t values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9)`) + } + alterSQL := `alter table t drop index uk_b` + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + switch schemaState { + case "write only": + tkNO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `uk_b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tkO.MustContainErrMsg(`insert into t values (10,1)`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + tkNO.MustContainErrMsg(`insert into t values (10,1)`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + case "delete only": + // Delete only from the uk_b unique index, cannot have errors + tkO.MustExec(`insert into t values (10,1)`) + tkO.MustExec(`insert into t values (11,11)`) + tkO.MustExec(`delete from t where a = 2`) + // Write only for uk_b, we cannot find anything through the index or read from the index, but still gives duplicate keys on insert/updates + // So we already have two duplicates of b = 1, but only one in the unique index uk_a, so here we cannot insert any. + tkNO.MustContainErrMsg(`insert into t values (12,1)`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + tkNO.MustContainErrMsg(`update t set b = 1 where a = 9`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + // Deleted from the index! + tkNO.MustExec(`insert into t values (13,2)`) + tkNO.MustContainErrMsg(`insert into t values (14,3)`, "[kv:1062]Duplicate entry '3' for key 't.uk_b'") + // b = 11 never written to the index! 
+ tkNO.MustExec(`insert into t values (15,11)`) + tkNO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + case "delete reorganization": + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + case "none": + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + default: + require.Failf(t, "unhandled schema state '%s'", schemaState) + } + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, nil, loopFn) +} + // TODO: Also add test for REMOVE PARTITIONING! ///* //// TODO: complete this test, so that we test all four changes: @@ -603,11 +734,11 @@ func runMultiSchemaTest(t *testing.T, createSQL, alterSQL string, initFn func(*t logutil.BgLogger().Info("XXXXXXXXXXX Hook released", zap.String("job.State", job.State.String()), zap.String("job.SchemaStage", job.SchemaState.String())) } testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunAfter", hookFunc) - alterChan := make(chan struct{}) + alterChan := make(chan error) go func() { - tkDDLOwner.MustExec(alterSQL) - logutil.BgLogger().Info("XXXXXXXXXXX drop partition done!") - alterChan <- struct{}{} + err := tkDDLOwner.ExecToErr(alterSQL) + logutil.BgLogger().Info("XXXXXXXXXXX DDL done!", zap.String("alterSQL", alterSQL)) + alterChan <- err }() // Skip the first state, since we want to compare before vs after in the loop <-hookChan @@ -620,7 +751,8 @@ func runMultiSchemaTest(t *testing.T, createSQL, alterSQL string, initFn func(*t for { select { case <-hookChan: - case <-alterChan: + case err := <-alterChan: + require.NoError(t, err) releaseHook = false logutil.BgLogger().Info("XXXXXXXXXXX release hook") break @@ -690,3 +822,386 @@ func HaveEntriesForTableIndex(t *testing.T, tk *testkit.TestKit, tableID, indexI } return false } + +// TestMultiSchemaTruncatePartitionWithGlobalIndex to show behavior when +// truncating a partition with a global index +func TestMultiSchemaTruncatePartitionWithGlobalIndex(t *testing.T) { + // TODO: Also test non-int PK, multi-column PK + createSQL := `create table t (a int primary key, b varchar(255), c varchar(255) default 'Filler', unique key uk_b (b) global) partition by hash (a) partitions 2` + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(`insert into t (a,b) values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7)`) + } + alterSQL := `alter table t truncate partition p1` + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + logutil.BgLogger().Info("XXXXXXXXXXX loopFn", zap.String("schemaState", schemaState)) + switch schemaState { + case "write only": + // tkNO is seeing state None, so unaware of DDL + // tkO is seeing state write only, so using the old partition, + // but are aware of new ids, so should filter them from global 
index reads. + // Duplicate key errors (from delete only state) are allowed on insert/update, + // even if it cannot read them from the global index, due to filtering. + rows := tkNO.MustQuery(`select * from t`).Sort().Rows() + tkO.MustQuery(`select * from t`).Sort().Check(rows) + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateNone, tblNO.Partition.DDLState) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateWriteOnly, tblO.Partition.DDLState) + require.Equal(t, tblNO.Partition.Definitions[1].ID, tblO.Partition.Definitions[1].ID) + tkNO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + tkO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + case "delete only": + // tkNO is seeing state write only, so still can access the dropped partition + // tkO is seeing state delete only, so cannot see the dropped partition, + // but must still write to the shared global indexes. + // So they will get errors on the same entries in the global index. + + tkNO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + tkO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateWriteOnly, tblNO.Partition.DDLState) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateDeleteOnly, tblO.Partition.DDLState) + require.NotEqual(t, tblNO.Partition.Definitions[1].ID, tblO.Partition.Definitions[1].ID) + + tkNO.MustExec(`insert into t values (21,21,"OK")`) + tkNO.MustExec(`insert into t values (23,23,"OK")`) + tkO.MustContainErrMsg(`insert into t values (21,21,"Duplicate key")`, "[kv:1062]Duplicate entry '21' for key 't.uk_b'") + tkO.MustContainErrMsg(`insert into t values (6,23,"Duplicate key")`, "[kv:1062]Duplicate entry '23' for key 't.uk_b'") + // Primary is not global, so here we can insert into the new partition, without + // conflicting to the old one + tkO.MustExec(`insert into t values (21,25,"OK")`) + tkO.MustExec(`insert into t values (99,99,"OK")`) + tkNO.MustContainErrMsg(`insert into t values (8,25,"Duplicate key")`, "[kv:1062]Duplicate entry '25' for key 't.uk_b'") + // type differences, cannot use index + tkNO.MustQuery(`select count(*) from t where b = 25`).Check(testkit.Rows("0")) + tkNO.MustQuery(`select b from t where b = 25`).Check(testkit.Rows()) + // PointGet should not find new partitions for StateWriteOnly + tkNO.MustQuery(`select count(*) from t where b = "25"`).Check(testkit.Rows("0")) + tkNO.MustQuery(`select b from t where b = "25"`).Check(testkit.Rows()) + tkNO.MustExec(`update t set a = 2, c = "'a' Updated" where b = "25"`) + require.Equal(t, uint64(0), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkNO.MustExec(`update t set a = 2, c = "'a' Updated" where b = "25"`) + require.Equal(t, uint64(0), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + // Primary is not global, so here we can insert into the old partition, without + // 
conflicting to the new one + tkO.MustQuery(`select count(*) from t where a = 99`).Check(testkit.Rows("1")) + tkNO.MustExec(`insert into t values (99,27,"OK")`) + + tkO.MustQuery(`select count(*) from t where b = "23"`).Check(testkit.Rows("0")) + tkO.MustExec(`update t set a = 2, c = "'a' Updated" where b = "23"`) + require.Equal(t, uint64(0), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkNO.MustQuery(`select count(*) from t where a = 23`).Check(testkit.Rows("1")) + tkNO.MustQuery(`select * from t where a = 23`).Check(testkit.Rows("23 23 OK")) + tkNO.MustExec(`update t set b = 10 where a = 23`) + require.Equal(t, uint64(1), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkNO.MustExec(`update t set b = 23 where a = 23`) + require.Equal(t, uint64(1), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkNO.MustContainErrMsg(`update t set b = 25 where a = 23`, "[kv:1062]Duplicate entry '25' for key 't.uk_b'") + tkO.MustExec(`update t set b = 23 where a = 25`) + require.Equal(t, uint64(0), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkO.MustContainErrMsg(`update t set b = 21 where a = 21`, "[kv:1062]Duplicate entry '21' for key 't.uk_b'") + tkO.MustContainErrMsg(`update t set b = 23 where b = "25"`, "[kv:1062]Duplicate entry '23' for key 't.uk_b'") + + tkO.MustExec(`update t set b = 29 where a = 21`) + require.Equal(t, uint64(1), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkNO.MustExec(`update t set b = 25 where b = "27"`) + require.Equal(t, uint64(1), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkO.MustExec(`update t set b = 27, a = 27 where b = "29"`) + require.Equal(t, uint64(1), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "1 1 Filler", + "2 2 Filler", + "21 21 OK", + "23 23 OK", + "3 3 Filler", + "4 4 Filler", + "5 5 Filler", + "6 6 Filler", + "7 7 Filler", + "99 25 OK")) + tkNO.MustQuery(`select b from t order by b`).Check(testkit.Rows(""+ + "1", + "2", + "21", + "23", + "25", + "3", + "4", + "5", + "6", + "7")) + + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "2 2 Filler", + "27 27 OK", + "4 4 Filler", + "6 6 Filler", + "99 99 OK")) + tkO.MustQuery(`select b from t order by b`).Check(testkit.Rows(""+ + "2", + "27", + "4", + "6", + "99")) + // TODO: Add tests for delete + case "delete reorganization": + // tkNO is seeing state delete only, so cannot see the dropped partition, + // but must still must give duplicate errors when writes to the global indexes collide + // with the dropped partitions. + // tkO is seeing state delete reorganization, so cannot see the dropped partition, + // and can ignore the dropped partitions entries in the Global Indexes, i.e. overwrite them! 
+ rows := tkO.MustQuery(`select * from t`).Sort().Rows() + tkNO.MustQuery(`select * from t`).Sort().Check(rows) + rows = tkO.MustQuery(`select b from t order by b`).Rows() + tkNO.MustQuery(`select b from t order by b`).Check(rows) + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateDeleteOnly, tblNO.Partition.DDLState) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateDeleteReorganization, tblO.Partition.DDLState) + require.Equal(t, tblNO.Partition.Definitions[1].ID, tblO.Partition.Definitions[1].ID) + tkO.MustQuery(`select b from t where b = "1"`).Check(testkit.Rows()) + tkO.MustExec(`insert into t values (1,1,"OK")`) + tkO.MustQuery(`select b from t where b = "1"`).Check(testkit.Rows("1")) + tkO.MustQuery(`select b from t where b = 1`).Check(testkit.Rows("1")) + tkO.MustContainErrMsg(`insert into t values (3,1,"Duplicate")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + // b = 23 was inserted into the dropped partition, OK to delete + tkO.MustExec(`insert into t values (10,23,"OK")`) + tkNO.MustExec(`insert into t values (41,41,"OK")`) + tkNO.MustContainErrMsg(`insert into t values (12,25,"Duplicate key")`, "[kv:1062]Duplicate entry '25' for key 't.uk_b'") + tkNO.MustContainErrMsg(`insert into t values (25,25,"Duplicate key")`, "[kv:1062]Duplicate entry '25' for key 't.uk_b'") + tkNO.MustContainErrMsg(`insert into t values (41,27,"Duplicate key")`, "[kv:1062]Duplicate entry '27' for key 't.uk_b'") + tkO.MustExec(`insert into t values (43,43,"OK")`) + tkO.MustContainErrMsg(`insert into t values (44,43,"Duplicate key")`, "[kv:1062]Duplicate entry '43' for key 't.uk_b'") + tkNO.MustContainErrMsg(`update t set b = 5 where a = 41`, "[kv:1062]Duplicate entry '5' for key 't.uk_b'") + tkNO.MustExec(`update t set a = 5 where b = "41"`) + require.Equal(t, uint64(1), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkO.MustExec(`update t set a = 7 where b = "43"`) + require.Equal(t, uint64(1), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + // This should be silently deleted / overwritten + tkO.MustExec(`update t set b = 5 where b = "43"`) + require.Equal(t, uint64(1), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkO.MustExec(`update t set b = 3 where b = 41`) + require.Equal(t, uint64(1), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + rows = tkNO.MustQuery(`select * from t`).Sort().Rows() + tkO.MustQuery(`select * from t`).Sort().Check(rows) + case "none": + tkNO.MustExec(`insert into t values (81,81,"OK")`) + tkO.MustContainErrMsg(`insert into t values (81,81,"Duplicate key")`, "[kv:1062]Duplicate entry '81' for key 't.uk_b'") + tkNO.MustExec(`insert into t values (85,85,"OK")`) + tkO.MustExec(`insert into t values (87,87,"OK")`) + rows := tkNO.MustQuery(`select * from t`).Sort().Rows() + tkO.MustQuery(`select * from t`).Sort().Check(rows) + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateDeleteReorganization, tblNO.Partition.DDLState) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateNone, tblO.Partition.DDLState) + default: + require.Failf(t, "unhandled schema state '%s'", schemaState) + } + } + 
runMultiSchemaTest(t, createSQL, alterSQL, initFn, nil, loopFn) +} + +func TestMultiSchemaTruncatePartitionWithPKGlobal(t *testing.T) { + createSQL := `create table t (a int primary key nonclustered global, b int, c varchar(255) default 'Filler', unique key uk_b (b)) partition by hash (b) partitions 2` + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(`insert into t (a,b) values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7)`) + } + alterSQL := `alter table t truncate partition p1` + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + switch schemaState { + case "write only": + // tkNO is seeing state None, so unaware of DDL + // tkO is seeing state write only, so using the old partition, + // but are aware of new ids, so should filter them from global index reads. + // Duplicate key errors (from delete only state) are allowed on insert/update, + // even if it cannot read them from the global index, due to filtering. + rows := tkNO.MustQuery(`select * from t`).Sort().Rows() + tkO.MustQuery(`select * from t`).Sort().Check(rows) + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateNone, tblNO.Partition.DDLState) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateWriteOnly, tblO.Partition.DDLState) + require.Equal(t, tblNO.Partition.Definitions[1].ID, tblO.Partition.Definitions[1].ID) + case "delete only": + // tkNO is seeing state write only, so still can access the dropped partition + // tkO is seeing state delete only, so cannot see the dropped partition, + // but must still write to the shared global indexes. + // So they will get errors on the same entries in the global index. + + tkNO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.") + tkNO.MustContainErrMsg(`insert into t values (11,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + + tkO.MustQuery(`select a from t where a = 1`).Check(testkit.Rows()) + // OK! PK violation due to old partition is still accessible!!! + // Similar to when dropping a unique index, see TestMultiSchemaDropUniqueIndex + tkO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.PRIMARY'") + // Note that PK (global) is not violated! and Unique key (b) is not global, + // and the partition is dropped, so OK to write. + tkO.MustExec(`insert into t values (11,1,"OK, non global unique index")`) + // The anomaly here is that tkNO and tkO sees different versions of the table, + // and therefore different data! 
+ tkO.MustQuery(`select * from t where b = 1`).Check(testkit.Rows("11 1 OK, non global unique index")) + tkNO.MustQuery(`select * from t where b = 1`).Check(testkit.Rows("1 1 Filler")) + + tkO.MustExec(`insert into t values (13,13,"OK")`) + tkNO.MustExec(`insert into t values (15,13,"OK, non global unique index")`) + tkO.MustQuery(`select * from t where b = 13`).Check(testkit.Rows("13 13 OK")) + tkNO.MustQuery(`select * from t where b = 13`).Check(testkit.Rows("15 13 OK, non global unique index")) + + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateWriteOnly, tblNO.Partition.DDLState) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateDeleteOnly, tblO.Partition.DDLState) + require.NotEqual(t, tblNO.Partition.Definitions[1].ID, tblO.Partition.Definitions[1].ID) + + tkNO.MustExec(`insert into t values (21,21,"OK")`) + tkNO.MustExec(`insert into t values (23,23,"OK")`) + tkO.MustContainErrMsg(`insert into t values (21,21,"Duplicate key")`, "[kv:1062]Duplicate entry '21' for key 't.PRIMARY'") + tkO.MustContainErrMsg(`insert into t values (6,23,"Duplicate key")`, "[kv:1062]Duplicate entry '6' for key 't.PRIMARY'") + // Primary is global, so here we cannot insert into the new partition, without + // conflicting to the old one + tkO.MustContainErrMsg(`insert into t values (21,25,"Duplicate key")`, "[kv:1062]Duplicate entry '21' for key 't.PRIMARY'") + tkO.MustExec(`insert into t values (25,25,"OK")`) + // Should be able to insert to the new partition, with a duplicate of non-global key + tkNO.MustExec(`insert into t values (95,25,"OK, non global unique key")`) + tkNO.MustContainErrMsg(`insert into t values (25,95,"Duplicate key")`, "[kv:1062]Duplicate entry '25' for key 't.PRIMARY'") + // PointGet should not find new partitions for StateWriteOnly + tkNO.MustQuery(`select count(*) from t where a = 25`).Check(testkit.Rows("0")) + tkNO.MustQuery(`select count(*) from t where b = 25`).Check(testkit.Rows("1")) + tkNO.MustQuery(`select * from t where b = 25`).Check(testkit.Rows("95 25 OK, non global unique key")) + tkNO.MustExec(`update t set a = 17, c = "Updated" where b = 25`) + require.Equal(t, uint64(1), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + + tkO.MustQuery(`select count(*) from t where b = 23`).Check(testkit.Rows("0")) + tkO.MustExec(`update t set a = 19, c = "Updated" where b = 23`) + require.Equal(t, uint64(0), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkNO.MustQuery(`select count(*) from t where a = 23`).Check(testkit.Rows("1")) + tkNO.MustQuery(`select * from t where a = 23`).Check(testkit.Rows("23 23 OK")) + tkNO.MustExec(`update t set b = 10 where a = 23`) + require.Equal(t, uint64(1), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkNO.MustExec(`update t set b = 23 where a = 23`) + require.Equal(t, uint64(1), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) + tkNO.MustContainErrMsg(`update t set b = 25 where a = 23`, "[kv:1062]Duplicate entry '25' for key 't.uk_b'") + tkO.MustExec(`update t set b = 23 where a = 25`) + require.Equal(t, uint64(1), tkO.Session().GetSessionVars().StmtCtx.AffectedRows()) + // non-global unique index + // Same state's partition: + tkO.MustContainErrMsg(`update t set b = 23 where a = 13`, "[kv:1062]Duplicate entry '23' for key 't.uk_b'") + tkNO.MustContainErrMsg(`update t set b = 23 
where a = 21`, "[kv:1062]Duplicate entry '23' for key 't.uk_b'") + // Others state's partition: + tkO.MustExec(`update t set b = 21, c = "Updated" where a = 13`) + tkO.MustExec(`insert into t values (19,19, "OK")`) + tkNO.MustExec(`update t set b = 19, c = "Updated" where a = 21`) + + // PK + // Same state's partition: + tkO.MustContainErrMsg(`update t set a = 13 where b = 19`, "[kv:1062]Duplicate entry '13' for key 't.PRIMARY'") + tkNO.MustContainErrMsg(`update t set a = 7 where b = 3`, "[kv:1062]Duplicate entry '7' for key 't.PRIMARY'") + // Others state's partition: + tkO.MustContainErrMsg(`update t set a = 7 where b = 19`, "[kv:1062]Duplicate entry '7' for key 't.PRIMARY'") + tkNO.MustContainErrMsg(`update t set a = 13 where b = 13`, "[kv:1062]Duplicate entry '13' for key 't.PRIMARY'") + + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "0 0 Filler", + "1 1 Filler", + "15 13 OK, non global unique index", + "17 25 Updated", + "2 2 Filler", + "21 19 Updated", + "23 23 OK", + "3 3 Filler", + "4 4 Filler", + "5 5 Filler", + "6 6 Filler", + "7 7 Filler")) + + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "0 0 Filler", + "11 1 OK, non global unique index", + "13 21 Updated", + "19 19 OK", + "2 2 Filler", + "25 23 OK", + "4 4 Filler", + "6 6 Filler")) + tkO.MustExec(`admin check table t`) + tkNO.MustExec(`admin check table t`) + // TODO: Add tests for delete as well + + case "delete reorganization": + // tkNO is seeing state delete only, so cannot see the dropped partition, + // but must still must give duplicate errors when writes to the global indexes collide + // with the dropped partitions. + // tkO is seeing state delete reorganization, so cannot see the dropped partition, + // and can ignore the dropped partitions entries in the Global Indexes, i.e. overwrite them! + rows := tkO.MustQuery(`select * from t`).Sort().Rows() + tkNO.MustQuery(`select * from t`).Sort().Check(rows) + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateDeleteOnly, tblNO.Partition.DDLState) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateDeleteReorganization, tblO.Partition.DDLState) + require.Equal(t, tblNO.Partition.Definitions[1].ID, tblO.Partition.Definitions[1].ID) + tkO.MustQuery(`select a, b from t where b = 1`).Check(testkit.Rows("11 1")) + tkO.MustQuery(`select b from t where a = 1`).Check(testkit.Rows()) + tkO.MustContainErrMsg(`insert into t values (3,1,"Duplicate")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + // Old partition should be OK to overwrite for tkO, but not tkNO! 
+ tkO.MustExec(`insert into t values (3,3,"OK")`) + tkNO.MustContainErrMsg(`insert into t values (5,5, "Duplicate pk")`, "[kv:1062]Duplicate entry '5' for key 't.PRIMARY'") + tkO.MustExec(`update t set a = 5 where b = 3`) + tkNO.MustContainErrMsg(`update t set a = 7 where b = 3`, "[kv:1062]Duplicate entry '7' for key 't.PRIMARY'") + res := tkNO.MustQuery(`select * from t`).Sort() + res.Check(testkit.Rows(""+ + "0 0 Filler", + "11 1 OK, non global unique index", + "13 21 Updated", + "19 19 OK", + "2 2 Filler", + "25 23 OK", + "4 4 Filler", + "5 3 OK", + "6 6 Filler")) + tkO.MustQuery(`select * from t`).Sort().Check(res.Rows()) + + tkO.MustExec(`admin check table t`) + tkNO.MustExec(`admin check table t`) + case "none": + tkNO.MustExec(`insert into t values (81,81,"OK")`) + tkO.MustContainErrMsg(`insert into t values (81,81,"Duplicate key")`, "[kv:1062]Duplicate entry '81' for key 't.") + tkNO.MustExec(`insert into t values (85,85,"OK")`) + tkO.MustExec(`insert into t values (87,87,"OK")`) + rows := tkNO.MustQuery(`select * from t`).Sort().Rows() + tkO.MustQuery(`select * from t`).Sort().Check(rows) + tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateDeleteReorganization, tblNO.Partition.DDLState) + tblO, err := tkO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, model.StateNone, tblO.Partition.DDLState) + require.Equal(t, tblNO.Partition.Definitions[1].ID, tblO.Partition.Definitions[1].ID) + default: + require.Fail(t, "Unhandled schema state", "State: '%s'", schemaState) + } + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, nil, loopFn) +} diff --git a/pkg/ddl/tests/tiflash/ddl_tiflash_test.go b/pkg/ddl/tests/tiflash/ddl_tiflash_test.go index 32fdebd9dc124..7d8472ae956d7 100644 --- a/pkg/ddl/tests/tiflash/ddl_tiflash_test.go +++ b/pkg/ddl/tests/tiflash/ddl_tiflash_test.go @@ -411,20 +411,23 @@ func TestTiFlashFailTruncatePartition(t *testing.T) { s, teardown := createTiFlashContext(t) defer teardown() tk := testkit.NewTestKit(t, s.store) + // TODO: Fix this test, when fixing rollback in https://github.com/pingcap/tidb/pull/56029 + tk.MustExec("set @@global.tidb_ddl_error_count_limit = 3") tk.MustExec("use test") tk.MustExec("drop table if exists ddltiflash") tk.MustExec("create table ddltiflash(i int not null, s varchar(255)) partition by range (i) (partition p0 values less than (10), partition p1 values less than (20))") tk.MustExec("alter table ddltiflash set tiflash replica 1") - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/FailTiFlashTruncatePartition", `return`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/FailTiFlashTruncatePartition", `5*return`)) defer func() { failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/FailTiFlashTruncatePartition") }() time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) tk.MustExec("insert into ddltiflash values(1, 'abc'), (11, 'def')") - tk.MustGetErrMsg("alter table ddltiflash truncate partition p1", "[ddl:-1]enforced error") + tk.MustExec("alter table ddltiflash truncate partition p1") + //tk.MustGetErrMsg("alter table ddltiflash truncate partition p1", "[ddl:-1]enforced error") time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash") } diff --git 
a/pkg/executor/test/admintest/admin_test.go b/pkg/executor/test/admintest/admin_test.go index 5abeac81b11a0..66aab286beabf 100644 --- a/pkg/executor/test/admintest/admin_test.go +++ b/pkg/executor/test/admintest/admin_test.go @@ -2091,8 +2091,8 @@ func TestAdminCheckGlobalIndexDuringDDL(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecution")) testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated") - // Should have 3 different schema states, `none`, `deleteOnly`, `deleteReorg` - require.Len(t, schemaMap, 3) + // Should have 4 different schema states, `none`, `writeOnly`, `deleteOnly`, `deleteReorg` + require.Len(t, schemaMap, 4) for ss := range schemaMap { delete(schemaMap, ss) } diff --git a/pkg/meta/model/table.go b/pkg/meta/model/table.go index 17c963260851d..be68126488354 100644 --- a/pkg/meta/model/table.go +++ b/pkg/meta/model/table.go @@ -843,16 +843,6 @@ func (pi *PartitionInfo) GCPartitionStates() { pi.States = newStates } -// HasTruncatingPartitionID checks whether the pid is truncating. -func (pi *PartitionInfo) HasTruncatingPartitionID(pid int64) bool { - for i := range pi.NewPartitionIDs { - if pi.NewPartitionIDs[i] == pid { - return true - } - } - return false -} - // ClearReorgIntermediateInfo remove intermediate information used during reorganize partition. func (pi *PartitionInfo) ClearReorgIntermediateInfo() { pi.DDLAction = ActionNone @@ -1018,9 +1008,6 @@ func (pi *PartitionInfo) SetOriginalPartitionIDs() { // For example during truncate or drop partition. func (pi *PartitionInfo) IDsInDDLToIgnore() []int64 { // TODO: - // Truncate partition: - // write only => should not see NewPartitionIDs - // delete only => should not see DroppingPartitions // Drop partition: // TODO: Make similar changes as in Truncate Partition: // Add a state blocking read and write in the partitions to be dropped, diff --git a/pkg/planner/core/find_best_task.go b/pkg/planner/core/find_best_task.go index c6beb7f89b830..275d614bec2a0 100644 --- a/pkg/planner/core/find_best_task.go +++ b/pkg/planner/core/find_best_task.go @@ -2266,6 +2266,7 @@ func (is *PhysicalIndexScan) addSelectionConditionForGlobalIndex(p *logicalop.Da return nil, err } needNot := false + // TODO: Move all this into PartitionPruning or the PartitionProcessor! 
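+	// Note: a global index may still contain entries pointing to partitions that
+	// are not visible in the current schema state (e.g. during DROP/TRUNCATE
+	// PARTITION), so such rows are filtered out by the condition built below.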
pInfo := p.TableInfo.GetPartitionInfo() if len(idxArr) == 1 && idxArr[0] == FullRange { // Filter away partitions that may exists in Global Index, diff --git a/pkg/planner/core/point_get_plan.go b/pkg/planner/core/point_get_plan.go index e42146f8c9d1b..a4b12096fb001 100644 --- a/pkg/planner/core/point_get_plan.go +++ b/pkg/planner/core/point_get_plan.go @@ -1394,6 +1394,7 @@ func checkTblIndexForPointPlan(ctx base.PlanContext, tblName *resolve.TableNameW if idxInfo.Global { if tblName.TableInfo == nil || len(tbl.GetPartitionInfo().AddingDefinitions) > 0 || + len(tbl.GetPartitionInfo().NewPartitionIDs) > 0 || len(tbl.GetPartitionInfo().DroppingDefinitions) > 0 { continue } diff --git a/pkg/table/tables/index.go b/pkg/table/tables/index.go index 4516ff8283d72..cdf502ee2733f 100644 --- a/pkg/table/tables/index.go +++ b/pkg/table/tables/index.go @@ -179,6 +179,15 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu } writeBufs := sctx.GetMutateBuffers().GetWriteStmtBufs() skipCheck := opt.DupKeyCheck() == table.DupKeyCheckSkip + allowOverwriteOfOldGlobalIndex := false + if c.idxInfo.Global && c.tblInfo.Partition.DDLState == model.StateDeleteReorganization && + // TODO: Also do the same for DROP PARTITION + c.tblInfo.Partition.DDLAction == model.ActionTruncateTablePartition { + allowOverwriteOfOldGlobalIndex = true + if len(c.tblInfo.Partition.DroppingDefinitions) > 0 { + skipCheck = false + } + } evalCtx := sctx.GetExprCtx().GetEvalCtx() loc, ec := evalCtx.Location(), evalCtx.ErrCtx() for _, value := range indexedValues { @@ -281,7 +290,31 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu } var value []byte - if c.tblInfo.TempTableType != model.TempTableNone { + if allowOverwriteOfOldGlobalIndex { + // In DeleteReorganization, overwrite Global Index keys pointing to + // old dropped/truncated partitions. 
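+			// The index value stores the handle together with its partition ID
+			// (kv.PartitionHandle); below it is compared against IDsInDDLToIgnore()
+			// to decide whether the existing entry may be overwritten.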
+ // Note that a partitioned table cannot be temporary table + value, err = txn.Get(ctx, key) + if err == nil && len(value) != 0 { + handle, errPart := tablecodec.DecodeHandleInIndexValue(value) + if errPart != nil { + return nil, errPart + } + if partHandle, ok := handle.(kv.PartitionHandle); ok { + for _, id := range c.tblInfo.Partition.IDsInDDLToIgnore() { + if id == partHandle.PartitionID { + // Simply overwrite it + err = txn.SetAssertion(key, kv.SetAssertUnknown) + if err != nil { + return nil, err + } + value = nil + break + } + } + } + } + } else if c.tblInfo.TempTableType != model.TempTableNone { // Always check key for temporary table because it does not write to TiKV value, err = txn.Get(ctx, key) } else if opt.DupKeyCheck() == table.DupKeyCheckLazy && !keyIsTempIdxKey { diff --git a/pkg/table/tables/mutation_checker.go b/pkg/table/tables/mutation_checker.go index 7ac1fc12f60b5..f894e3d4e3562 100644 --- a/pkg/table/tables/mutation_checker.go +++ b/pkg/table/tables/mutation_checker.go @@ -88,6 +88,7 @@ func checkDataConsistency( extraIndexesLayout table.IndexesLayout, ) error { if t.Meta().GetPartitionInfo() != nil { + // TODO: Support check for partitions as well return nil } if txn.IsPipelined() { diff --git a/pkg/table/tables/partition.go b/pkg/table/tables/partition.go index 331a581b0cd13..0ddc1b6277238 100644 --- a/pkg/table/tables/partition.go +++ b/pkg/table/tables/partition.go @@ -180,9 +180,20 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part partitions[p.ID] = &t } ret.partitions = partitions - if pi.DDLAction != model.ActionReorganizePartition && - pi.DDLAction != model.ActionRemovePartitioning && - pi.DDLAction != model.ActionAlterTablePartitioning { + switch pi.DDLAction { + case model.ActionReorganizePartition, model.ActionRemovePartitioning, + model.ActionAlterTablePartitioning: + // continue after switch! 
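+	// For TRUNCATE PARTITION, also keep the partitions that are being dropped
+	// (DroppingDefinitions) initialized and reachable through the table object,
+	// so their rows and index entries can still be accessed while the DDL runs.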
+ case model.ActionTruncateTablePartition: + for _, def := range pi.DroppingDefinitions { + p, err := initPartition(ret, def) + if err != nil { + return nil, err + } + partitions[def.ID] = p + } + fallthrough + default: return ret, nil } // In StateWriteReorganization we are using the 'old' partition definitions @@ -1735,9 +1746,6 @@ func partitionedTableAddRecord(ctx table.MutateContext, txn kv.Transaction, t *p return nil, errors.WithStack(table.ErrRowDoesNotMatchGivenPartitionSet) } } - if t.Meta().Partition.HasTruncatingPartitionID(pid) { - return nil, errors.WithStack(dbterror.ErrInvalidDDLState.GenWithStack("the partition is in not in public")) - } exchangePartitionInfo := t.Meta().ExchangePartitionInfo if exchangePartitionInfo != nil && exchangePartitionInfo.ExchangePartitionDefID == pid && variable.EnableCheckConstraint.Load() { @@ -1872,9 +1880,6 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t return errors.WithStack(table.ErrRowDoesNotMatchGivenPartitionSet) } } - if t.Meta().Partition.HasTruncatingPartitionID(to) { - return errors.WithStack(dbterror.ErrInvalidDDLState.GenWithStack("the partition is in not in public")) - } exchangePartitionInfo := t.Meta().ExchangePartitionInfo if exchangePartitionInfo != nil && exchangePartitionInfo.ExchangePartitionDefID == to && variable.EnableCheckConstraint.Load() { diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index f756ac867fc2e..cff833538f04b 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -871,6 +871,8 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, txn kv.Transaction, r _, err = txn.Get(ctx, key) } if err == nil { + // If Global Index and reorganization truncate/drop partition, old partition, + // Accept and set Assertion key to kv.SetAssertUnknown for overwrite instead dupErr := getDuplicateError(t.Meta(), recordID, r) return recordID, dupErr } else if !kv.ErrNotExist.Equal(err) { diff --git a/pkg/tablecodec/tablecodec.go b/pkg/tablecodec/tablecodec.go index eadc777d27074..ea888ccbb772d 100644 --- a/pkg/tablecodec/tablecodec.go +++ b/pkg/tablecodec/tablecodec.go @@ -946,7 +946,7 @@ func decodeIndexKvOldCollation(key, value []byte, hdStatus HandleStatus, buf []b } } else { // In unique int handle index. - handle = decodeIntHandleInIndexValue(value) + handle = DecodeIntHandleInIndexValue(value) resultValues, err = reEncodeHandleTo(handle, hdStatus == HandleIsUnsigned, buf, resultValues) if err != nil { return nil, errors.Trace(err) @@ -1025,11 +1025,11 @@ func decodeHandleInIndexKey(keySuffix []byte) (kv.Handle, error) { // DecodeHandleInIndexValue decodes handle in unqiue index value. func DecodeHandleInIndexValue(value []byte) (handle kv.Handle, err error) { if len(value) <= MaxOldEncodeValueLen { - return decodeIntHandleInIndexValue(value), nil + return DecodeIntHandleInIndexValue(value), nil } seg := SplitIndexValue(value) if len(seg.IntHandle) != 0 { - handle = decodeIntHandleInIndexValue(seg.IntHandle) + handle = DecodeIntHandleInIndexValue(seg.IntHandle) } if len(seg.CommonHandle) != 0 { handle, err = kv.NewCommonHandle(seg.CommonHandle) @@ -1047,8 +1047,8 @@ func DecodeHandleInIndexValue(value []byte) (handle kv.Handle, err error) { return handle, nil } -// decodeIntHandleInIndexValue uses to decode index value as int handle id. -func decodeIntHandleInIndexValue(data []byte) kv.Handle { +// DecodeIntHandleInIndexValue uses to decode index value as int handle id. 
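+// It reads data as a big-endian encoded uint64 and wraps it in a kv.IntHandle.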
+func DecodeIntHandleInIndexValue(data []byte) kv.Handle { return kv.IntHandle(binary.BigEndian.Uint64(data)) } @@ -1434,7 +1434,7 @@ func (v *TempIndexValueElem) DecodeOne(b []byte) (remain []byte, err error) { hLen := (uint16(b[0]) << 8) + uint16(b[1]) b = b[2:] if hLen == idLen { - v.Handle = decodeIntHandleInIndexValue(b[:idLen]) + v.Handle = DecodeIntHandleInIndexValue(b[:idLen]) } else { v.Handle, _ = kv.NewCommonHandle(b[:hLen]) } @@ -1905,7 +1905,7 @@ func decodeIndexKvGeneral(key, value []byte, colsLen int, hdStatus HandleStatus, if segs.IntHandle != nil { // In unique int handle index. - handle = decodeIntHandleInIndexValue(segs.IntHandle) + handle = DecodeIntHandleInIndexValue(segs.IntHandle) } else if segs.CommonHandle != nil { // In unique common handle index. handle, err = decodeHandleInIndexKey(segs.CommonHandle) From 291a2e970569e2f7b04b54dbd06e440585351775 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 26 Nov 2024 04:48:53 +0100 Subject: [PATCH 2/2] *: Truncate partition failpoint tests (#56029) ref pingcap/tidb#55819 --- pkg/ddl/partition.go | 37 ++- pkg/ddl/placement_policy_test.go | 1 + pkg/ddl/reorg.go | 4 +- pkg/ddl/rollingback.go | 39 +++- pkg/ddl/tests/partition/BUILD.bazel | 1 + pkg/ddl/tests/partition/db_partition_test.go | 5 +- .../tests/partition/error_injection_test.go | 211 ++++++++++++++++++ pkg/ddl/tests/partition/multi_domain_test.go | 20 +- .../tests/partition/reorg_partition_test.go | 9 +- pkg/ddl/tests/tiflash/ddl_tiflash_test.go | 6 +- pkg/meta/model/job.go | 4 +- 11 files changed, 310 insertions(+), 27 deletions(-) create mode 100644 pkg/ddl/tests/partition/error_injection_test.go diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 635c05290ef03..43bcd1a785001 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -2507,6 +2507,18 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i return ver, errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned) } + if job.IsRollingback() { + return convertTruncateTablePartitionJob2RollbackJob(jobCtx, job, dbterror.ErrCancelledDDLJob, tblInfo) + } + + failpoint.Inject("truncatePartCancel1", func(val failpoint.Value) { + if val.(bool) { + job.State = model.JobStateCancelled + err = errors.New("Injected error by truncatePartCancel1") + failpoint.Return(ver, err) + } + }) + var oldDefinitions []model.PartitionDefinition var newDefinitions []model.PartitionDefinition @@ -2522,11 +2534,19 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i pi.DDLState = job.SchemaState return updateVersionAndTableInfo(jobCtx, job, tblInfo, true) case model.StateWriteOnly: + // We can still rollback here, since we have not yet started to write to the new partitions! oldDefinitions, newDefinitions, err = replaceTruncatePartitions(job, jobCtx.metaMut, tblInfo, oldIDs, newIDs) if err != nil { return ver, errors.Trace(err) } preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newDefinitions) + failpoint.Inject("truncatePartFail1", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + err = errors.New("Injected error by truncatePartFail1") + failpoint.Return(ver, err) + } + }) // This work as a flag to ignore Global Index entries from the old partitions! 
// Used in IDsInDDLToIgnore() for filtering old partitions from // the global index @@ -2554,6 +2574,13 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i if err != nil || !done { return ver, errors.Trace(err) } + failpoint.Inject("truncatePartFail2", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + err = errors.New("Injected error by truncatePartFail2") + failpoint.Return(ver, err) + } + }) // For the truncatePartitionEvent oldDefinitions = pi.DroppingDefinitions newDefinitions = make([]model.PartitionDefinition, 0, len(oldIDs)) @@ -2562,21 +2589,25 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i newDef.ID = newIDs[i] newDefinitions = append(newDefinitions, newDef) } - // TODO: Test injecting failure pi.DroppingDefinitions = nil pi.NewPartitionIDs = nil pi.DDLState = model.StateNone pi.DDLAction = model.ActionNone + failpoint.Inject("truncatePartFail3", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + err = errors.New("Injected error by truncatePartFail3") + failpoint.Return(ver, err) + } + }) // used by ApplyDiff in updateSchemaVersion args.ShouldUpdateAffectedPartitions = true ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) } - // Finish this job. - job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) truncatePartitionEvent := notifier.NewTruncatePartitionEvent( tblInfo, &model.PartitionInfo{Definitions: newDefinitions}, diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 20ed4212aa532..3e250f096824f 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -1956,6 +1956,7 @@ func TestTruncateTablePartitionWithPlacement(t *testing.T) { " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PLACEMENT POLICY=`p3` */,\n" + " PARTITION `p3` VALUES LESS THAN (100000))")) + dom.Reload() checkExistTableBundlesInPD(t, dom, "test", "tp") checkWaitingGCPartitionBundlesInPD(t, dom, checkOldPartitions) diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index ff2fae066f2de..40201d2e4b040 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -932,8 +932,8 @@ func getReorgInfo(ctx *ReorgContext, jobCtx *jobContext, rh *reorgHandler, job * zap.String("startKey", hex.EncodeToString(start)), zap.String("endKey", hex.EncodeToString(end))) - failpoint.Inject("errorUpdateReorgHandle", func() (*reorgInfo, error) { - return &info, errors.New("occur an error when update reorg handle") + failpoint.Inject("errorUpdateReorgHandle", func() { + failpoint.Return(&info, errors.New("occur an error when update reorg handle")) }) err = rh.InitDDLReorgHandle(job, start, end, pid, elements[0]) if err != nil { diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index 5270437a9b857..2960f109b669c 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -316,6 +316,41 @@ func rollingbackExchangeTablePartition(jobCtx *jobContext, job *model.Job) (ver return ver, errors.Trace(err) } +func rollingbackTruncateTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, err error) { + tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + + return convertTruncateTablePartitionJob2RollbackJob(jobCtx, job, 
dbterror.ErrCancelledDDLJob, tblInfo)
+}
+
+func convertTruncateTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
+	if !job.IsRollbackable() {
+		// Only the original state and StateWriteOnly can be rolled back, since otherwise
+		// the new partitions may already have been used and new data would be lost.
+		// So we must continue to roll forward!
+		job.State = model.JobStateRunning
+		return ver, nil
+	}
+	pi := tblInfo.Partition
+	if len(pi.NewPartitionIDs) != 0 || pi.DDLAction != model.ActionNone || pi.DDLState != model.StateNone {
+		// Roll back the changes; note that no new partitions have been used yet,
+		// so only the metadata needs to be rolled back and the DDL can be cancelled.
+		tblInfo.Partition.NewPartitionIDs = nil
+		tblInfo.Partition.DDLAction = model.ActionNone
+		tblInfo.Partition.DDLState = model.StateNone
+		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
+		if err != nil {
+			return ver, errors.Trace(err)
+		}
+		return ver, nil
+	}
+	// No change yet, just cancel the job.
+	job.State = model.JobStateCancelled
+	return ver, errors.Trace(otherwiseErr)
+}
+
 func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
 	addingDefinitions := tblInfo.Partition.AddingDefinitions
 	partNames := make([]string, 0, len(addingDefinitions))
@@ -590,8 +625,10 @@ func convertJob2RollbackJob(w *worker, jobCtx *jobContext, job *model.Job) (ver
 		ver, err = rollingbackTruncateTable(jobCtx, job)
 	case model.ActionModifyColumn:
 		ver, err = rollingbackModifyColumn(jobCtx, job)
-	case model.ActionDropForeignKey, model.ActionTruncateTablePartition:
+	case model.ActionDropForeignKey:
 		ver, err = cancelOnlyNotHandledJob(job, model.StatePublic)
+	case model.ActionTruncateTablePartition:
+		ver, err = rollingbackTruncateTablePartition(jobCtx, job)
 	case model.ActionRebaseAutoID, model.ActionShardRowID,
 		model.ActionAddForeignKey, model.ActionRenameTable, model.ActionRenameTables,
 		model.ActionModifyTableCharsetAndCollate,
diff --git a/pkg/ddl/tests/partition/BUILD.bazel b/pkg/ddl/tests/partition/BUILD.bazel
index 7f65dece9dc1e..8f6564a732171 100644
--- a/pkg/ddl/tests/partition/BUILD.bazel
+++ b/pkg/ddl/tests/partition/BUILD.bazel
@@ -5,6 +5,7 @@ go_test(
     timeout = "short",
     srcs = [
         "db_partition_test.go",
+        "error_injection_test.go",
         "main_test.go",
         "multi_domain_test.go",
         "placement_test.go",
diff --git a/pkg/ddl/tests/partition/db_partition_test.go b/pkg/ddl/tests/partition/db_partition_test.go
index e50a2d0a2eb5b..18f1999063d84 100644
--- a/pkg/ddl/tests/partition/db_partition_test.go
+++ b/pkg/ddl/tests/partition/db_partition_test.go
@@ -3679,15 +3679,18 @@ func TestTruncateNumberOfPhases(t *testing.T) {
 	tk.MustExec(`create table t (a int primary key , b varchar(255)) partition by hash(a) partitions 3`)
 	ctx := tk.Session()
 	dom := domain.GetDomain(ctx)
+	dom.Reload()
 	schemaVersion := dom.InfoSchema().SchemaMetaVersion()
 	tk.MustExec(`insert into t values (1,1),(2,2),(3,3)`)
 	tk.MustExec(`alter table t truncate partition p1`)
-	// Without global index, truncate partition should be a single state change
+	dom.Reload()
+	// Without global index, truncate partition could be a single state change
 	require.Equal(t, int64(4), dom.InfoSchema().SchemaMetaVersion()-schemaVersion)
 	tk.MustExec(`drop table t`)
 	tk.MustExec(`create table t (a int primary key , b varchar(255), unique key (b) global) partition by hash(a) partitions 3`)
 	schemaVersion = dom.InfoSchema().SchemaMetaVersion()
tk.MustExec(`insert into t values (1,1),(2,2),(3,3)`) tk.MustExec(`alter table t truncate partition p1`) + dom.Reload() require.Equal(t, int64(4), dom.InfoSchema().SchemaMetaVersion()-schemaVersion) } diff --git a/pkg/ddl/tests/partition/error_injection_test.go b/pkg/ddl/tests/partition/error_injection_test.go new file mode 100644 index 0000000000000..dac8aca75e5bd --- /dev/null +++ b/pkg/ddl/tests/partition/error_injection_test.go @@ -0,0 +1,211 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package partition + +import ( + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/external" + "github.com/stretchr/testify/require" +) + +type InjectedTest struct { + Name string + Recoverable bool + Rollback bool +} + +type FailureTest struct { + FailpointPrefix string + Tests []InjectedTest +} + +var truncateTests = FailureTest{ + FailpointPrefix: "truncatePart", + Tests: []InjectedTest{ + { + Name: "Cancel1", + Recoverable: false, + Rollback: true, + }, + { + Name: "Fail1", + Recoverable: true, + Rollback: true, + }, + { + Name: "Fail2", + Recoverable: true, + Rollback: false, + }, + { + Name: "Fail3", + Recoverable: true, + Rollback: false, + }, + }, +} + +func TestTruncatePartitionListFailuresWithGlobalIndex(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered global, b int not null, c varchar(255), unique index (c) global) partition by list(b) ( + partition p0 values in (1,2,3), + partition p1 values in (4,5,6), + partition p2 values in (7,8,9))` + alter := `alter table t truncate partition p0,p2` + beforeDML := []string{ + `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + `update t set a = 7, b = 7, c = 7 where a = 1`, + `update t set b = 3, c = 3 where c = 4`, + `delete from t where a = 8`, + `delete from t where b = 2`, + } + beforeResult := testkit.Rows("4 3 3", "6 6 6", "7 7 7", "9 9 9") + afterDML := []string{ + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + `update t set a = 2, b = 2, c = 2 where a = 1`, + `update t set a = 1, b = 1, c = 1 where c = 6`, + `update t set a = 6, b = 6 where a = 9`, + `delete from t where a = 5`, + `delete from t where b = 3`, + } + afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") + afterRecover := testkit.Rows("1 1 1", "2 2 2", "8 8 8") + testDDLWithInjectedErrors(t, truncateTests, create, alter, beforeDML, beforeResult, afterDML, afterResult, afterRecover, "Cancel2") +} + +func TestTruncatePartitionListFailures(t *testing.T) { + create := `create table t (a int unsigned primary key, b int not null, c varchar(255)) partition by list(a) ( + partition p0 values in (1,2,3), + partition p1 values in (4,5,6), + partition p2 values in (7,8,9))` + alter := `alter table t truncate partition p0,p2` + beforeDML := []string{ + `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + `update t set a = 7, b = 7, c = 7 
where a = 1`, + `update t set b = 3, c = 3, a = 3 where c = 4`, + `delete from t where a = 8`, + `delete from t where b = 2`, + } + beforeResult := testkit.Rows("3 3 3", "6 6 6", "7 7 7", "9 9 9") + afterDML := []string{ + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + `update t set a = 2, b = 2, c = 2 where a = 1`, + `update t set a = 1, b = 1, c = 1 where c = 6`, + `update t set a = 6, b = 6, c = 6 where a = 9`, + `delete from t where a = 5`, + `delete from t where b = 3`, + } + afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 6", "7 7 7", "8 8 8") + afterRecover := testkit.Rows("1 1 1", "2 2 2", "8 8 8") + testDDLWithInjectedErrors(t, truncateTests, create, alter, beforeDML, beforeResult, afterDML, afterResult, afterRecover, "Fail1", "Fail2", "Fail3") +} + +func testDDLWithInjectedErrors(t *testing.T, tests FailureTest, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterRollback, afterRecover [][]any, skipTests ...string) { +TEST: + for _, test := range tests.Tests { + for _, skip := range skipTests { + if test.Name == skip { + continue TEST + } + } + if test.Recoverable { + runOneTest(t, test, true, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRecover) + } + if test.Rollback { + runOneTest(t, test, false, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRollback) + } + } +} + +func runOneTest(t *testing.T, test InjectedTest, recoverable bool, failpointName, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterResult [][]any) { + name := failpointName + test.Name + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_global_index=true") + defer func() { + tk.MustExec("set tidb_enable_global_index=default") + }() + oldWaitTimeWhenErrorOccurred := ddl.WaitTimeWhenErrorOccurred + defer func() { + ddl.WaitTimeWhenErrorOccurred = oldWaitTimeWhenErrorOccurred + }() + ddl.WaitTimeWhenErrorOccurred = 0 + tk.MustExec(createSQL) + for _, sql := range beforeDML { + tk.MustExec(sql + ` /* ` + name + ` */`) + } + tk.MustQuery(`select * from t /* ` + name + ` */`).Sort().Check(beforeResult) + tOrg := external.GetTableByName(t, tk, "test", "t") + idxIDs := make([]int64, 0, len(tOrg.Meta().Indices)) + for _, idx := range tOrg.Meta().Indices { + idxIDs = append(idxIDs, idx.ID) + } + pids := make([]int64, 0, len(tOrg.Meta().Partition.Definitions)) + for _, def := range tOrg.Meta().Partition.Definitions { + pids = append(pids, def.ID) + } + oldCreate := tk.MustQuery(`show create table t`).Rows() + fullName := "github.com/pingcap/tidb/pkg/ddl/" + name + term := "return(true)" + if recoverable { + // test that it should handle recover/retry on error + term = "1*return(true)" + } + require.NoError(t, failpoint.Enable(fullName, term)) + err := tk.ExecToErr(alterSQL + " /* " + name + " */") + require.NoError(t, failpoint.Disable(fullName)) + tt := external.GetTableByName(t, tk, "test", "t") + pi := tt.Meta().Partition + if recoverable { + require.NoError(t, err) + equal := true + for i, pid := range pids { + equal = equal && pid == pi.Definitions[i].ID + } + require.False(t, equal, name) + return + } + require.Error(t, err, "failpoint "+name) + require.ErrorContains(t, err, "Injected error by "+name) + tk.MustQuery(`show create table t /* ` + name + ` */`).Check(oldCreate) + require.Equal(t, len(tOrg.Meta().Partition.Definitions), len(pi.Definitions), name) + 
require.Equal(t, 0, len(pi.AddingDefinitions), name) + require.Equal(t, 0, len(pi.DroppingDefinitions), name) + require.Equal(t, 0, len(pi.NewPartitionIDs), name) + require.Equal(t, len(tOrg.Meta().Indices), len(tt.Meta().Indices), name) + for i := range tOrg.Meta().Indices { + require.Equal(t, idxIDs[i], tt.Meta().Indices[i].ID, name) + } + for i, pid := range pids { + require.Equal(t, pid, tt.Meta().Partition.Definitions[i].ID, name) + } + tk.MustExec(`admin check table t /* ` + name + ` */`) + tk.MustExec(`update t set b = 7 where a = 9 /* ` + name + ` */`) + for _, sql := range afterDML { + tk.MustExec(sql + " /* " + name + " */") + } + tk.MustQuery(`select * from t /* ` + name + ` */`).Sort().Check(afterResult) + tk.MustExec(`drop table t /* ` + name + ` */`) + // TODO: Check no rows on new partitions + // TODO: Check TiFlash replicas + // TODO: Check Label rules + // TODO: Check bundles + // TODO: Check autoIDs + // TODO: Check delete_range tables, so no delete request for old partitions in failed alters! +} diff --git a/pkg/ddl/tests/partition/multi_domain_test.go b/pkg/ddl/tests/partition/multi_domain_test.go index f3a21cf6fe862..eb618bd1f1f4f 100644 --- a/pkg/ddl/tests/partition/multi_domain_test.go +++ b/pkg/ddl/tests/partition/multi_domain_test.go @@ -852,16 +852,16 @@ func TestMultiSchemaTruncatePartitionWithGlobalIndex(t *testing.T) { require.NoError(t, err) require.Equal(t, model.StateWriteOnly, tblO.Partition.DDLState) require.Equal(t, tblNO.Partition.Definitions[1].ID, tblO.Partition.Definitions[1].ID) - tkNO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") - tkO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + tkNO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.") + tkO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.") case "delete only": // tkNO is seeing state write only, so still can access the dropped partition // tkO is seeing state delete only, so cannot see the dropped partition, // but must still write to the shared global indexes. // So they will get errors on the same entries in the global index. 
- tkNO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") - tkO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.uk_b'") + tkNO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.") + tkO.MustContainErrMsg(`insert into t values (1,1,"Duplicate key")`, "[kv:1062]Duplicate entry '1' for key 't.") tblNO, err := tkNO.Session().GetInfoSchema().TableInfoByName(pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) require.Equal(t, model.StateWriteOnly, tblNO.Partition.DDLState) @@ -872,8 +872,8 @@ func TestMultiSchemaTruncatePartitionWithGlobalIndex(t *testing.T) { tkNO.MustExec(`insert into t values (21,21,"OK")`) tkNO.MustExec(`insert into t values (23,23,"OK")`) - tkO.MustContainErrMsg(`insert into t values (21,21,"Duplicate key")`, "[kv:1062]Duplicate entry '21' for key 't.uk_b'") - tkO.MustContainErrMsg(`insert into t values (6,23,"Duplicate key")`, "[kv:1062]Duplicate entry '23' for key 't.uk_b'") + tkO.MustContainErrMsg(`insert into t values (21,21,"Duplicate key")`, "[kv:1062]Duplicate entry '21' for key 't.") + tkO.MustContainErrMsg(`insert into t values (6,23,"Duplicate key")`, "[kv:1062]Duplicate entry '") // Primary is not global, so here we can insert into the new partition, without // conflicting to the old one tkO.MustExec(`insert into t values (21,25,"OK")`) @@ -978,10 +978,10 @@ func TestMultiSchemaTruncatePartitionWithGlobalIndex(t *testing.T) { tkO.MustExec(`insert into t values (10,23,"OK")`) tkNO.MustExec(`insert into t values (41,41,"OK")`) tkNO.MustContainErrMsg(`insert into t values (12,25,"Duplicate key")`, "[kv:1062]Duplicate entry '25' for key 't.uk_b'") - tkNO.MustContainErrMsg(`insert into t values (25,25,"Duplicate key")`, "[kv:1062]Duplicate entry '25' for key 't.uk_b'") - tkNO.MustContainErrMsg(`insert into t values (41,27,"Duplicate key")`, "[kv:1062]Duplicate entry '27' for key 't.uk_b'") + tkNO.MustContainErrMsg(`insert into t values (25,25,"Duplicate key")`, "[kv:1062]Duplicate entry '25' for key 't.") + tkNO.MustContainErrMsg(`insert into t values (41,27,"Duplicate key")`, "[kv:1062]Duplicate entry '") tkO.MustExec(`insert into t values (43,43,"OK")`) - tkO.MustContainErrMsg(`insert into t values (44,43,"Duplicate key")`, "[kv:1062]Duplicate entry '43' for key 't.uk_b'") + tkO.MustContainErrMsg(`insert into t values (44,43,"Duplicate key")`, "[kv:1062]Duplicate entry '") tkNO.MustContainErrMsg(`update t set b = 5 where a = 41`, "[kv:1062]Duplicate entry '5' for key 't.uk_b'") tkNO.MustExec(`update t set a = 5 where b = "41"`) require.Equal(t, uint64(1), tkNO.Session().GetSessionVars().StmtCtx.AffectedRows()) @@ -996,7 +996,7 @@ func TestMultiSchemaTruncatePartitionWithGlobalIndex(t *testing.T) { tkO.MustQuery(`select * from t`).Sort().Check(rows) case "none": tkNO.MustExec(`insert into t values (81,81,"OK")`) - tkO.MustContainErrMsg(`insert into t values (81,81,"Duplicate key")`, "[kv:1062]Duplicate entry '81' for key 't.uk_b'") + tkO.MustContainErrMsg(`insert into t values (81,81,"Duplicate key")`, "[kv:1062]Duplicate entry '81' for key 't.") tkNO.MustExec(`insert into t values (85,85,"OK")`) tkO.MustExec(`insert into t values (87,87,"OK")`) rows := tkNO.MustQuery(`select * from t`).Sort().Rows() diff --git a/pkg/ddl/tests/partition/reorg_partition_test.go b/pkg/ddl/tests/partition/reorg_partition_test.go index 41871048893ad..54b883c9eff53 100644 --- 
a/pkg/ddl/tests/partition/reorg_partition_test.go +++ b/pkg/ddl/tests/partition/reorg_partition_test.go @@ -114,7 +114,7 @@ ROW: } } -func getAllDataForPhysicalTable(t *testing.T, ctx sessionctx.Context, physTable table.PhysicalTable) allTableData { +func getAllDataForTableID(t *testing.T, ctx sessionctx.Context, tableID int64) allTableData { require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx)) txn, err := ctx.Txn(true) require.NoError(t, err) @@ -128,8 +128,7 @@ func getAllDataForPhysicalTable(t *testing.T, ctx sessionctx.Context, physTable vals: make([][]byte, 0), tp: make([]string, 0), } - pid := physTable.GetPhysicalID() - prefix := tablecodec.EncodeTablePrefix(pid) + prefix := tablecodec.EncodeTablePrefix(tableID) it, err := txn.Iter(prefix, nil) require.NoError(t, err) for it.Valid() { @@ -141,7 +140,7 @@ func getAllDataForPhysicalTable(t *testing.T, ctx sessionctx.Context, physTable if tablecodec.IsRecordKey(it.Key()) { all.tp = append(all.tp, "Record") tblID, kv, _ := tablecodec.DecodeRecordKey(it.Key()) - require.Equal(t, pid, tblID) + require.Equal(t, tableID, tblID) vals, _ := tablecodec.DecodeValuesBytesToStrings(it.Value()) logutil.DDLLogger().Info("Record", zap.Int64("pid", tblID), @@ -736,7 +735,7 @@ func getNumRowsFromPartitionDefs(t *testing.T, tk *testkit.TestKit, tbl table.Ta require.NotNil(t, pt) cnt := 0 for _, def := range defs { - data := getAllDataForPhysicalTable(t, ctx, pt.GetPartition(def.ID)) + data := getAllDataForTableID(t, ctx, def.ID) require.True(t, len(data.keys) == len(data.vals)) require.True(t, len(data.keys) == len(data.tp)) for _, s := range data.tp { diff --git a/pkg/ddl/tests/tiflash/ddl_tiflash_test.go b/pkg/ddl/tests/tiflash/ddl_tiflash_test.go index 7d8472ae956d7..081670bba0133 100644 --- a/pkg/ddl/tests/tiflash/ddl_tiflash_test.go +++ b/pkg/ddl/tests/tiflash/ddl_tiflash_test.go @@ -411,7 +411,6 @@ func TestTiFlashFailTruncatePartition(t *testing.T) { s, teardown := createTiFlashContext(t) defer teardown() tk := testkit.NewTestKit(t, s.store) - // TODO: Fix this test, when fixing rollback in https://github.com/pingcap/tidb/pull/56029 tk.MustExec("set @@global.tidb_ddl_error_count_limit = 3") tk.MustExec("use test") @@ -419,15 +418,14 @@ func TestTiFlashFailTruncatePartition(t *testing.T) { tk.MustExec("create table ddltiflash(i int not null, s varchar(255)) partition by range (i) (partition p0 values less than (10), partition p1 values less than (20))") tk.MustExec("alter table ddltiflash set tiflash replica 1") - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/FailTiFlashTruncatePartition", `5*return`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/FailTiFlashTruncatePartition", `return`)) defer func() { failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/FailTiFlashTruncatePartition") }() time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) tk.MustExec("insert into ddltiflash values(1, 'abc'), (11, 'def')") - tk.MustExec("alter table ddltiflash truncate partition p1") - //tk.MustGetErrMsg("alter table ddltiflash truncate partition p1", "[ddl:-1]enforced error") + tk.MustGetErrMsg("alter table ddltiflash truncate partition p1", "[ddl:-1]DDL job rollback, error msg: enforced error") time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash") } diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index a2dc4d4d3e35a..3876f217bd29c 100644 --- a/pkg/meta/model/job.go +++ 
b/pkg/meta/model/job.go @@ -828,8 +828,10 @@ func (job *Job) IsRollbackable() bool { case ActionAddTablePartition: return job.SchemaState == StateNone || job.SchemaState == StateReplicaOnly case ActionDropColumn, ActionDropSchema, ActionDropTable, ActionDropSequence, - ActionDropForeignKey, ActionDropTablePartition, ActionTruncateTablePartition: + ActionDropForeignKey, ActionDropTablePartition: return job.SchemaState == StatePublic + case ActionTruncateTablePartition: + return job.SchemaState == StatePublic || job.SchemaState == StateWriteOnly case ActionRebaseAutoID, ActionShardRowID, ActionTruncateTable, ActionAddForeignKey, ActionRenameTable, ActionRenameTables, ActionModifyTableCharsetAndCollate,