-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl: Exchange partition rollback #45877
Changes from 13 commits
638e273
7d80c9d
00bd267
d324142
50c666b
bab23d9
bb371a4
4c27d02
1b5c7c6
b42cb1c
6dbdd81
98584ec
04f7908
1aaf9eb
8402932
ccae6d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4746,7 +4746,6 @@ func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error { | |
return errors.Trace(dbterror.ErrPartitionExchangeForeignKey.GenWithStackByArgs(nt.Name)) | ||
} | ||
|
||
// NOTE: if nt is temporary table, it should be checked | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this was already checked, in |
||
return nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1372,24 +1372,28 @@ | |
diff.OldSchemaID = oldSchemaIDs[0] | ||
diff.AffectedOpts = affects | ||
case model.ActionExchangeTablePartition: | ||
var ( | ||
ptSchemaID int64 | ||
ptTableID int64 | ||
partName string | ||
withValidation bool | ||
) | ||
err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using the partitionID as diff.TableID most likely caused #45920! |
||
if err != nil { | ||
return 0, errors.Trace(err) | ||
} | ||
diff.OldTableID = job.TableID | ||
affects := make([]*model.AffectedOption, 1) | ||
affects[0] = &model.AffectedOption{ | ||
SchemaID: ptSchemaID, | ||
TableID: ptTableID, | ||
OldTableID: ptTableID, | ||
diff.OldSchemaID = job.SchemaID | ||
if job.SchemaState != model.StatePublic { | ||
diff.TableID = job.TableID | ||
diff.SchemaID = job.SchemaID | ||
} else { | ||
// Update the partitioned table (it is only done in the last state) | ||
var ( | ||
ptSchemaID int64 | ||
ptTableID int64 | ||
ptDefID int64 // Not needed, will reload the whole table | ||
partName string // Not used | ||
withValidation bool // Not used | ||
) | ||
// See ddl.ExchangeTablePartition | ||
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) | ||
if err != nil { | ||
return 0, errors.Trace(err) | ||
} | ||
diff.SchemaID = ptSchemaID | ||
diff.TableID = ptTableID | ||
} | ||
diff.AffectedOpts = affects | ||
case model.ActionTruncateTablePartition: | ||
diff.TableID = job.TableID | ||
if len(job.CtxVars) > 0 { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2380,6 +2380,9 @@ | |
return ver, errors.Trace(err) | ||
} | ||
|
||
if job.IsRollingback() { | ||
return rollbackExchangeTablePartition(d, t, job, nt) | ||
} | ||
pt, err := getTableInfo(t, ptID, ptSchemaID) | ||
if err != nil { | ||
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { | ||
|
@@ -2388,35 +2391,49 @@ | |
return ver, errors.Trace(err) | ||
} | ||
|
||
if pt.State != model.StatePublic { | ||
job.State = model.JobStateCancelled | ||
return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) | ||
} | ||
|
||
err = checkExchangePartition(pt, nt) | ||
index, partDef, err := getPartitionDef(pt, partName) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
if job.SchemaState == model.StateNone { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If there is a running Exchange Partition job from a previous version, there is no harm in redoing this step, setting job.SchemaState and resetting ExchangePartitionInfo. |
||
if pt.State != model.StatePublic { | ||
job.State = model.JobStateCancelled | ||
return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) | ||
} | ||
err = checkExchangePartition(pt, nt) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
err = checkTableDefCompatible(pt, nt) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
err = checkTableDefCompatible(pt, nt) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
err = checkExchangePartitionPlacementPolicy(t, nt.PlacementPolicyRef, pt.PlacementPolicyRef, partDef.PlacementPolicyRef) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
index, _, err := getPartitionDef(pt, partName) | ||
if err != nil { | ||
return ver, errors.Trace(err) | ||
} | ||
if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag { | ||
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ | ||
ExchangePartitionFlag: true, | ||
ExchangePartitionID: ptID, | ||
ExchangePartitionDefID: defID, | ||
} | ||
// We need an interim schema version, | ||
// so there are no non-matching rows inserted | ||
// into the table using the schema version | ||
// before the exchange is made. | ||
job.SchemaState = model.StateWriteOnly | ||
return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) | ||
} | ||
// From now on, nt (the non-partitioned table) has | ||
// ExchangePartitionInfo set, meaning it is restricted | ||
// to only allow writes that would match the | ||
// partition to be exchange with. | ||
// So we need to rollback that change, instead of just cancelling. | ||
|
||
if d.lease > 0 { | ||
delayForAsyncCommit() | ||
|
@@ -2425,27 +2442,19 @@ | |
if withValidation { | ||
err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
job.State = model.JobStateRollingback | ||
return ver, errors.Trace(err) | ||
} | ||
} | ||
|
||
// partition table auto IDs. | ||
ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let runDDLJob etc. handle the error by retrying, instead of cancelling or rolling back. |
||
return ver, errors.Trace(err) | ||
} | ||
// non-partition table auto IDs. | ||
ntAutoIDs, err := t.GetAutoIDAccessors(job.SchemaID, nt.ID).Get() | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
_, partDef, err := getPartitionDef(pt, partName) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
|
@@ -2458,35 +2467,32 @@ | |
} | ||
} | ||
|
||
// exchange table meta id | ||
partDef.ID, nt.ID = nt.ID, partDef.ID | ||
|
||
err = t.UpdateTable(ptSchemaID, pt) | ||
// Recreate non-partition table meta info, | ||
// by first delete it with the old table id | ||
err = t.DropTableOrView(job.SchemaID, nt.ID) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { | ||
if val.(bool) { | ||
job.State = model.JobStateCancelled | ||
failpoint.Return(ver, errors.New("occur an error after updating partition id")) | ||
} | ||
}) | ||
// exchange table meta id | ||
partDef.ID, nt.ID = nt.ID, partDef.ID | ||
|
||
// recreate non-partition table meta info | ||
err = t.DropTableOrView(job.SchemaID, partDef.ID) | ||
err = t.UpdateTable(ptSchemaID, pt) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
err = t.CreateTableOrView(job.SchemaID, nt) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { | ||
if val.(bool) { | ||
failpoint.Return(ver, errors.New("occur an error after updating partition id")) | ||
} | ||
}) | ||
|
||
// Set both tables to the maximum auto IDs between normal table and partitioned table. | ||
newAutoIDs := meta.AutoIDGroup{ | ||
RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID), | ||
|
@@ -2495,12 +2501,10 @@ | |
} | ||
err = t.GetAutoIDAccessors(ptSchemaID, pt.ID).Put(newAutoIDs) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
err = t.GetAutoIDAccessors(job.SchemaID, nt.ID).Put(newAutoIDs) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
|
@@ -2519,23 +2523,15 @@ | |
} | ||
}) | ||
|
||
err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
// the follow code is a swap function for rules of two partitions | ||
// though partitions has exchanged their ID, swap still take effect | ||
|
||
bundles, err := bundlesForExchangeTablePartition(t, job, pt, partDef, nt) | ||
bundles, err := bundlesForExchangeTablePartition(t, pt, partDef, nt) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Wrapf(err, "failed to notify PD the placement rules") | ||
} | ||
|
||
|
@@ -2544,7 +2540,6 @@ | |
|
||
rules, err := infosync.GetLabelRules(context.TODO(), []string{ntrID, ptrID}) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return 0, errors.Wrapf(err, "failed to get PD the label rules") | ||
} | ||
|
||
|
@@ -2571,10 +2566,10 @@ | |
patch := label.NewRulePatch(setRules, deleteRules) | ||
err = infosync.UpdateLabelRules(context.TODO(), patch) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Wrapf(err, "failed to notify PD the label rules") | ||
} | ||
|
||
job.SchemaState = model.StatePublic | ||
nt.ExchangePartitionInfo = nil | ||
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) | ||
if err != nil { | ||
|
@@ -3223,7 +3218,7 @@ | |
return nil | ||
} | ||
|
||
func bundlesForExchangeTablePartition(t *meta.Meta, _ *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { | ||
func bundlesForExchangeTablePartition(t *meta.Meta, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { | ||
bundles := make([]*placement.Bundle, 0, 3) | ||
|
||
ptBundle, err := placement.NewTableBundle(t, pt) | ||
|
@@ -3321,16 +3316,21 @@ | |
return nil | ||
} | ||
|
||
func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) error { | ||
if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { | ||
func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPPRef, ptPPRef, partPPRef *model.PolicyRefInfo) error { | ||
partitionPPRef := partPPRef | ||
if partitionPPRef == nil { | ||
partitionPPRef = ptPPRef | ||
} | ||
|
||
if ntPPRef == nil && partitionPPRef == nil { | ||
return nil | ||
} | ||
if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { | ||
if ntPPRef == nil || partitionPPRef == nil { | ||
return dbterror.ErrTablesDifferentMetadata | ||
} | ||
|
||
ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) | ||
ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) | ||
ptPlacementPolicyInfo, _ := getPolicyInfo(t, partitionPPRef.ID) | ||
ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPPRef.ID) | ||
if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { | ||
return nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can add some cases in
TestExchangePartitionWithPlacement
in the file placement_policy_test.go
so that you can check that some placement bundle settings are also set correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion, I just added more complete tests there.