From 774c4e76f62e778346539dea533f15221734e358 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sat, 16 Sep 2023 23:27:10 +0800 Subject: [PATCH] ddl: Exchange part schema load fix (#46126) (#46191) close pingcap/tidb#45791, ref pingcap/tidb#46125 --- ddl/ddl_worker.go | 45 ++++++++++++++++++++------------ ddl/metadatalocktest/BUILD.bazel | 1 - ddl/metadatalocktest/mdl_test.go | 13 ++++----- ddl/partition.go | 23 ++++++++++++++++ infoschema/builder.go | 28 +++++++++++++++----- 5 files changed, 79 insertions(+), 31 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index e744e467f9650..b5cd5c9c06c6a 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1556,27 +1556,40 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: + // From start of function: diff.SchemaID = job.SchemaID + // Old is original non partitioned table diff.OldTableID = job.TableID diff.OldSchemaID = job.SchemaID + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return 0, errors.Trace(err) + } + // This is needed for not crashing TiFlash! + // TODO: Update TiFlash, to handle StateWriteOnly + diff.AffectedOpts = []*model.AffectedOption{{ + TableID: ptTableID, + }} if job.SchemaState != model.StatePublic { + // No change, just to refresh the non-partitioned table + // with its new ExchangePartitionInfo. diff.TableID = job.TableID - diff.SchemaID = job.SchemaID + // Keep this as Schema ID of non-partitioned table + // to avoid trigger early rename in TiFlash + diff.AffectedOpts[0].SchemaID = job.SchemaID } else { - // Update the partitioned table (it is only done in the last state) - var ( - ptSchemaID int64 - ptTableID int64 - ptDefID int64 // Not needed, will reload the whole table - partName string // Not used - withValidation bool // Not used - ) - // See ddl.ExchangeTablePartition - err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) - if err != nil { - return 0, errors.Trace(err) - } - diff.SchemaID = ptSchemaID - diff.TableID = ptTableID + // Swap + diff.TableID = ptDefID + // Also add correct SchemaID in case different schemas + diff.AffectedOpts[0].SchemaID = ptSchemaID } case model.ActionTruncateTablePartition: diff.TableID = job.TableID diff --git a/ddl/metadatalocktest/BUILD.bazel b/ddl/metadatalocktest/BUILD.bazel index 413fd554034f5..25e6058a3a7b4 100644 --- a/ddl/metadatalocktest/BUILD.bazel +++ b/ddl/metadatalocktest/BUILD.bazel @@ -13,7 +13,6 @@ go_test( "//ddl", "//errno", "//server", - "//sessionctx/variable", "//testkit", "//testkit/testsetup", "//util/logutil", diff --git a/ddl/metadatalocktest/mdl_test.go b/ddl/metadatalocktest/mdl_test.go index 5364809aecebf..0e9c3e32f06a6 100644 --- a/ddl/metadatalocktest/mdl_test.go +++ b/ddl/metadatalocktest/mdl_test.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/failpoint" mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/server" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/logutil" "github.com/stretchr/testify/require" @@ -1148,6 +1147,7 @@ func TestExchangePartitionStates(t *testing.T) { tk.MustExec("create database " + dbName) tk.MustExec("use " + dbName) tk.MustExec(`set @@global.tidb_enable_metadata_lock = ON`) + defer tk.MustExec(`set @@global.tidb_enable_metadata_lock = DEFAULT`) tk2 := testkit.NewTestKit(t, store) tk2.MustExec("use " + dbName) tk3 := testkit.NewTestKit(t, store) @@ -1159,7 +1159,6 @@ func TestExchangePartitionStates(t *testing.T) { tk.MustExec(`insert into t values (1, "1")`) tk.MustExec(`insert into tp values (2, "2")`) tk.MustExec(`analyze table t,tp`) - tk.MustQuery(`select * from information_schema.global_variables`).Check(testkit.Rows()) var wg sync.WaitGroup wg.Add(1) dumpChan := make(chan struct{}) @@ -1171,7 +1170,6 @@ func TestExchangePartitionStates(t *testing.T) { tk.MustExec("BEGIN") tk.MustQuery(`select * from t`).Check(testkit.Rows("1 1")) tk.MustQuery(`select * from tp`).Check(testkit.Rows("2 2")) - //require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `pause`)) alterChan := make(chan error) go func() { // WITH VALIDATION is the default @@ -1193,6 +1191,8 @@ func TestExchangePartitionStates(t *testing.T) { } time.Sleep(50 * time.Millisecond) } + // Sleep 50ms to wait load InfoSchema finish, issue #46815. + time.Sleep(50 * time.Millisecond) } waitFor("t", "write only", 4) tk3.MustExec(`BEGIN`) @@ -1205,7 +1205,6 @@ func TestExchangePartitionStates(t *testing.T) { // MDL will block the alter to not continue until all clients // are in StateWriteOnly, which tk is blocking until it commits tk.MustExec(`COMMIT`) - //require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID")) waitFor("t", "rollback done", 11) // MDL will block the alter from finish, tk is in 'rollbacked' schema version // but the alter is still waiting for tk3 to commit, before continuing @@ -1260,10 +1259,8 @@ func TestExchangePartitionStates(t *testing.T) { } func TestExchangePartitionMultiTable(t *testing.T) { - logutil.BgLogger().Info("mdl related variable status before bootstrap", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load())) store := testkit.CreateMockStore(t) tk1 := testkit.NewTestKit(t, store) - logutil.BgLogger().Info("mdl related variable status after bootstrap", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load())) dbName := "ExchangeMultiTable" tk1.MustExec(`create schema ` + dbName) @@ -1275,7 +1272,6 @@ func TestExchangePartitionMultiTable(t *testing.T) { tk1.MustExec(`insert into t1 values (0)`) tk1.MustExec(`insert into t2 values (3)`) tk1.MustExec(`insert into tp values (6)`) - logutil.BgLogger().Info("mdl related variable status after inserting rows", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load())) tk2 := testkit.NewTestKit(t, store) tk2.MustExec(`use ` + dbName) @@ -1304,6 +1300,8 @@ func TestExchangePartitionMultiTable(t *testing.T) { } time.Sleep(100 * time.Millisecond) } + // Sleep 50ms to wait load InfoSchema finish, issue #46815. + time.Sleep(50 * time.Millisecond) } var wg sync.WaitGroup wg.Add(1) @@ -1319,7 +1317,6 @@ func TestExchangePartitionMultiTable(t *testing.T) { tk3.MustExec(`insert into t1 values (1)`) tk3.MustExec(`insert into t2 values (2)`) tk3.MustExec(`insert into tp values (3)`) - //require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `pause`)) go func() { alterChan1 <- tk1.ExecToErr(`alter table tp exchange partition p0 with table t1`) }() diff --git a/ddl/partition.go b/ddl/partition.go index ba6110439c89a..734cf6179cf73 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2073,6 +2073,16 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + if defID != partDef.ID { + logutil.BgLogger().Info("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"), + zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID)) + job.Args[0] = partDef.ID + defID = partDef.ID + err = updateDDLJob2Table(w.sess, job, true) + if err != nil { + return ver, errors.Trace(err) + } + } nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ ExchangePartitionID: ptID, ExchangePartitionDefID: defID, @@ -2094,6 +2104,18 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo delayForAsyncCommit() } + if defID != partDef.ID { + // Should never happen, should have been updated above, in previous state! + logutil.BgLogger().Error("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"), + zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID)) + job.Args[0] = partDef.ID + defID = partDef.ID + err = updateDDLJob2Table(w.sess, job, true) + if err != nil { + return ver, errors.Trace(err) + } + } + if withValidation { err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) if err != nil { @@ -2201,6 +2223,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo ntr := rules[ntrID] ptr := rules[ptrID] + // This must be a bug, nt cannot be partitioned! partIDs := getPartitionIDs(nt) var setRules []*label.Rule diff --git a/infoschema/builder.go b/infoschema/builder.go index c8d36558c3800..18077a2663766 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -296,31 +296,46 @@ func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDi ntID := diff.OldTableID ptSchemaID := diff.SchemaID ptID := diff.TableID + partID := diff.TableID if len(diff.AffectedOpts) > 0 { - // From old version ptID = diff.AffectedOpts[0].TableID - ptSchemaID = diff.AffectedOpts[0].SchemaID + if diff.AffectedOpts[0].SchemaID != 0 { + ptSchemaID = diff.AffectedOpts[0].SchemaID + } } // The normal table needs to be updated first: // Just update the tables separately currDiff := &model.SchemaDiff{ + // This is only for the case since https://github.com/pingcap/tidb/pull/45877 + // Fixed now, by adding back the AffectedOpts + // to carry the partitioned Table ID. + Type: diff.Type, Version: diff.Version, TableID: ntID, SchemaID: ntSchemaID, } + if ptID != partID { + currDiff.TableID = partID + currDiff.OldTableID = ntID + currDiff.OldSchemaID = ntSchemaID + } ntIDs, err := b.applyTableUpdate(m, currDiff) if err != nil { return nil, errors.Trace(err) } - b.markPartitionBundleShouldUpdate(ntID) - // Then the partitioned table + // partID is the new id for the non-partitioned table! + b.markTableBundleShouldUpdate(partID) + // Then the partitioned table, will re-read the whole table, including all partitions! currDiff.TableID = ptID currDiff.SchemaID = ptSchemaID + currDiff.OldTableID = ptID + currDiff.OldSchemaID = ptSchemaID ptIDs, err := b.applyTableUpdate(m, currDiff) if err != nil { return nil, errors.Trace(err) } - b.markTableBundleShouldUpdate(ptID) + // ntID is the new id for the partition! + b.markPartitionBundleShouldUpdate(ntID) err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) if err != nil { return nil, errors.Trace(err) @@ -426,7 +441,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID - case model.ActionTruncateTable, model.ActionCreateView: + case model.ActionTruncateTable, model.ActionCreateView, + model.ActionExchangeTablePartition: oldTableID = diff.OldTableID newTableID = diff.TableID default: