Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Exchange partition rollback #45877

Merged
merged 16 commits into from
Aug 10, 2023
65 changes: 65 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3282,6 +3282,71 @@ func TestExchangePartitionTableCompatiable(t *testing.T) {
require.NoError(t, err)
}

// TestExchangePartitionValidation verifies that
// ALTER TABLE ... EXCHANGE PARTITION ... WITH VALIDATION fails when the
// non-partitioned table holds a row that does not belong to the target
// partition, and that the table stays writable after the rejected exchange.
func TestExchangePartitionValidation(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)

	const schemaName = "ExchangeValidation"
	tk.MustExec(`create schema ` + schemaName)
	tk.MustExec(`use ` + schemaName)

	// Non-partitioned table with the same definition as the partitions below.
	tk.MustExec(`CREATE TABLE t1 (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name))`)

	// Partitioned counterpart: one partition per month plus a MAXVALUE catch-all.
	tk.MustExec(`CREATE TABLE t1p (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
)
PARTITION BY RANGE COLUMNS(d)
(PARTITION p202307 VALUES LESS THAN ('2023-08-01'),
PARTITION p202308 VALUES LESS THAN ('2023-09-01'),
PARTITION p202309 VALUES LESS THAN ('2023-10-01'),
PARTITION p202310 VALUES LESS THAN ('2023-11-01'),
PARTITION p202311 VALUES LESS THAN ('2023-12-01'),
PARTITION p202312 VALUES LESS THAN ('2024-01-01'),
PARTITION pfuture VALUES LESS THAN (MAXVALUE))`)

	// An August row does not fit into p202307, so validation must reject it.
	tk.MustExec(`insert into t1 values ("2023-08-06","0000")`)
	tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1 with validation`,
		"[ddl:1737]Found a row that does not match the partition")

	// The failed exchange must not leave t1 in a state that blocks writes.
	tk.MustExec(`insert into t1 values ("2023-08-06","0001")`)
}

func TestExchangePartitionPlacementPolicy(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can add some cases in TestExchangePartitionWithPlacement in the file placement_policy_test.go so that you can check some placement bundle settings are also set correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, I just added more complete tests there.

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec(`create schema ExchangePartWithPolicy`)
tk.MustExec(`use ExchangePartWithPolicy`)
tk.MustExec(`CREATE PLACEMENT POLICY rule1 FOLLOWERS=1`)
tk.MustExec(`CREATE PLACEMENT POLICY rule2 FOLLOWERS=2`)
tk.MustExec(`CREATE TABLE t1 (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
) PLACEMENT POLICY="rule1"`)

tk.MustExec(`CREATE TABLE t1p (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
) PLACEMENT POLICY="rule2"
PARTITION BY RANGE COLUMNS(d)
(PARTITION p202307 VALUES LESS THAN ('2023-08-01'),
PARTITION p202308 VALUES LESS THAN ('2023-09-01'),
PARTITION p202309 VALUES LESS THAN ('2023-10-01'),
PARTITION p202310 VALUES LESS THAN ('2023-11-01'),
PARTITION p202311 VALUES LESS THAN ('2023-12-01'),
PARTITION p202312 VALUES LESS THAN ('2024-01-01'),
PARTITION pfuture VALUES LESS THAN (MAXVALUE))`)

tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1`,
"[ddl:1736]Tables have different definitions")
tk.MustExec(`insert into t1 values ("2023-08-06","0000")`)
}

func TestExchangePartitionHook(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
1 change: 0 additions & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4746,7 +4746,6 @@ func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error {
return errors.Trace(dbterror.ErrPartitionExchangeForeignKey.GenWithStackByArgs(nt.Name))
}

// NOTE: if nt is temporary table, it should be checked
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was already checked, in checkTableDefCompatible()

return nil
}

Expand Down
36 changes: 20 additions & 16 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,24 +1372,28 @@
diff.OldSchemaID = oldSchemaIDs[0]
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
var (
ptSchemaID int64
ptTableID int64
partName string
withValidation bool
)
err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the partitionID as diff.TableID most likely caused #45920 !

if err != nil {
return 0, errors.Trace(err)
}
diff.OldTableID = job.TableID
affects := make([]*model.AffectedOption, 1)
affects[0] = &model.AffectedOption{
SchemaID: ptSchemaID,
TableID: ptTableID,
OldTableID: ptTableID,
diff.OldSchemaID = job.SchemaID
if job.SchemaState != model.StatePublic {
diff.TableID = job.TableID
diff.SchemaID = job.SchemaID
} else {
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64 // Not needed, will reload the whole table
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}

Check warning on line 1393 in ddl/ddl_worker.go

View check run for this annotation

Codecov / codecov/patch

ddl/ddl_worker.go#L1392-L1393

Added lines #L1392 - L1393 were not covered by tests
diff.SchemaID = ptSchemaID
diff.TableID = ptTableID
}
diff.AffectedOpts = affects
case model.ActionTruncateTablePartition:
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
Expand Down
115 changes: 59 additions & 56 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2380,6 +2380,9 @@
return ver, errors.Trace(err)
}

if job.IsRollingback() {
return rollbackExchangeTablePartition(d, t, job, nt)
}
pt, err := getTableInfo(t, ptID, ptSchemaID)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
Expand All @@ -2388,35 +2391,57 @@
return ver, errors.Trace(err)
}

if pt.State != model.StatePublic {
job.State = model.JobStateCancelled
return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State)
}

err = checkExchangePartition(pt, nt)
index, partDef, err := getPartitionDef(pt, partName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if job.SchemaState == model.StateNone {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a running Exchange Partition from previous version, there is no harm in redoing this step and set job.SchemaState and reset ExchangePartitionInfo.

if pt.State != model.StatePublic {
job.State = model.JobStateCancelled
return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State)
}

Check warning on line 2402 in ddl/partition.go

View check run for this annotation

Codecov / codecov/patch

ddl/partition.go#L2400-L2402

Added lines #L2400 - L2402 were not covered by tests
err = checkExchangePartition(pt, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Check warning on line 2407 in ddl/partition.go

View check run for this annotation

Codecov / codecov/patch

ddl/partition.go#L2405-L2407

Added lines #L2405 - L2407 were not covered by tests

err = checkTableDefCompatible(pt, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
err = checkTableDefCompatible(pt, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef)
if err != nil {
if partDef.PlacementPolicyRef != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// Also check if it uses the table level default
err = checkExchangePartitionPlacementPolicy(t, pt.PlacementPolicyRef, nt.PlacementPolicyRef)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}

index, _, err := getPartitionDef(pt, partName)
if err != nil {
return ver, errors.Trace(err)
}
if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag {
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionFlag: true,
ExchangePartitionID: ptID,
ExchangePartitionDefID: defID,
}
// We need an interim schema version,
// so there are no non-matching rows inserted
// into the table using the schema version
// before the exchange is made.
job.SchemaState = model.StateWriteOnly
return updateVersionAndTableInfoWithCheck(d, t, job, nt, true)
}
// From now on, nt (the non-partitioned table) has
// ExchangePartitionInfo set, meaning it is restricted
// to only allow writes that would match the
// partition to be exchange with.
// So we need to rollback that change, instead of just cancelling.

if d.lease > 0 {
delayForAsyncCommit()
Expand All @@ -2425,27 +2450,19 @@
if withValidation {
err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name)
if err != nil {
job.State = model.JobStateCancelled
job.State = model.JobStateRollingback
return ver, errors.Trace(err)
}
}

// partition table auto IDs.
ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get()
if err != nil {
job.State = model.JobStateCancelled
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let the runDDLJob etc handle the error, by retrying instead of cancelling or rolling back.

return ver, errors.Trace(err)
}
// non-partition table auto IDs.
ntAutoIDs, err := t.GetAutoIDAccessors(job.SchemaID, nt.ID).Get()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

_, partDef, err := getPartitionDef(pt, partName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -2458,35 +2475,32 @@
}
}

// exchange table meta id
partDef.ID, nt.ID = nt.ID, partDef.ID

err = t.UpdateTable(ptSchemaID, pt)
// Recreate non-partition table meta info,
// by first delete it with the old table id
err = t.DropTableOrView(job.SchemaID, nt.ID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) {
if val.(bool) {
job.State = model.JobStateCancelled
failpoint.Return(ver, errors.New("occur an error after updating partition id"))
}
})
// exchange table meta id
partDef.ID, nt.ID = nt.ID, partDef.ID

// recreate non-partition table meta info
err = t.DropTableOrView(job.SchemaID, partDef.ID)
err = t.UpdateTable(ptSchemaID, pt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = t.CreateTableOrView(job.SchemaID, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error after updating partition id"))
}
})

// Set both tables to the maximum auto IDs between normal table and partitioned table.
newAutoIDs := meta.AutoIDGroup{
RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID),
Expand All @@ -2495,12 +2509,10 @@
}
err = t.GetAutoIDAccessors(ptSchemaID, pt.ID).Put(newAutoIDs)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
err = t.GetAutoIDAccessors(job.SchemaID, nt.ID).Put(newAutoIDs)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -2519,23 +2531,15 @@
}
})

err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// the follow code is a swap function for rules of two partitions
// though partitions has exchanged their ID, swap still take effect

bundles, err := bundlesForExchangeTablePartition(t, job, pt, partDef, nt)
bundles, err := bundlesForExchangeTablePartition(t, pt, partDef, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

Expand All @@ -2544,7 +2548,6 @@

rules, err := infosync.GetLabelRules(context.TODO(), []string{ntrID, ptrID})
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to get PD the label rules")
}

Expand All @@ -2571,10 +2574,10 @@
patch := label.NewRulePatch(setRules, deleteRules)
err = infosync.UpdateLabelRules(context.TODO(), patch)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}

job.SchemaState = model.StatePublic
nt.ExchangePartitionInfo = nil
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true)
if err != nil {
Expand Down Expand Up @@ -3223,7 +3226,7 @@
return nil
}

func bundlesForExchangeTablePartition(t *meta.Meta, _ *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) {
func bundlesForExchangeTablePartition(t *meta.Meta, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) {
bundles := make([]*placement.Bundle, 0, 3)

ptBundle, err := placement.NewTableBundle(t, pt)
Expand Down
Loading