diff --git a/session/bootstrap.go b/session/bootstrap.go index ab4ab97ef4136..4c200adab7f35 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -1179,11 +1179,8 @@ func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) { } var ( - // SupportUpgradeStateVer is exported for testing. - // The minimum version that can be upgraded by paused user DDL. - SupportUpgradeStateVer int64 = version145 // SupportUpgradeHTTPOpVer is exported for testing. - // The minimum version of the upgrade can be notified through the HTTP API. + // The minimum version of the upgrade by paused user DDL can be notified through the HTTP API. SupportUpgradeHTTPOpVer int64 = version173 ) @@ -1196,9 +1193,7 @@ func upgrade(s Session) { // It is already bootstrapped/upgraded by a higher version TiDB server. return } - if ver >= SupportUpgradeStateVer { - checkOrSyncUpgrade(s, ver) - } + printClusterState(s, ver) // Only upgrade from under version92 and this TiDB is not owner set. // The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue. diff --git a/session/bootstraptest/BUILD.bazel b/session/bootstraptest/BUILD.bazel index b5a3893d8c4f3..be1feb9c4c709 100644 --- a/session/bootstraptest/BUILD.bazel +++ b/session/bootstraptest/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 12, + shard_count = 13, deps = [ "//config", "//ddl", diff --git a/session/bootstraptest/bootstrap_upgrade_test.go b/session/bootstraptest/bootstrap_upgrade_test.go index 12e14ebbc3940..4dc1be3c72ed8 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -257,7 +257,7 @@ func TestUpgradeVersionMockLatest(t *testing.T) { require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion-1, ver) dom.Close() - startUpgrade(store, session.CurrentBootstrapVersion-1) + startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -299,8 +299,8 @@ func TestUpgradeVersionMockLatest(t *testing.T) { " PARTITION `p4` VALUES LESS THAN (7096))")) } -// TestUpgradeVersionForUpgradeHTTPOp tests SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++. -func TestUpgradeVersionForUpgradeHTTPOp(t *testing.T) { +// TestUpgradeVersionWithUpgradeHTTPOp tests SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++ with HTTP op. +func TestUpgradeVersionWithUpgradeHTTPOp(t *testing.T) { *session.WithMockUpgrade = true session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest @@ -347,6 +347,53 @@ func TestUpgradeVersionForUpgradeHTTPOp(t *testing.T) { require.Equal(t, false, isUpgrading) } +// TestUpgradeVersionWithoutUpgradeHTTPOp tests SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++ without HTTP op. +func TestUpgradeVersionWithoutUpgradeHTTPOp(t *testing.T) { + *session.WithMockUpgrade = true + session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest + + store, dom := session.CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + seV := session.CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(session.SupportUpgradeHTTPOpVer) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.SupportUpgradeHTTPOpVer)) + session.UnsetStoreBootstrapped(store.UUID()) + ver, err := session.GetBootstrapVersion(seV) + require.NoError(t, err) + require.Equal(t, session.SupportUpgradeHTTPOpVer, ver) + dom.Close() + + // Start the upgrade test. + // Current cluster state is normal. + isUpgrading, err := session.IsUpgradingClusterState(seV) + require.NoError(t, err) + require.Equal(t, false, isUpgrading) + domLatestV, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domLatestV.Close() + seLatestV := session.CreateSessionAndSetID(t, store) + ver, err = session.GetBootstrapVersion(seLatestV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion+1, ver) + // Current cluster state is upgrading. + isUpgrading, err = session.IsUpgradingClusterState(seLatestV) + require.NoError(t, err) + require.Equal(t, false, isUpgrading) + upgradeHandler := handler.NewClusterUpgradeHandler(store) + upgradeHandler.FinishUpgrade() + // Upgrading is finished and current cluster state is normal. + isUpgrading, err = session.IsUpgradingClusterState(seV) + require.NoError(t, err) + require.Equal(t, false, isUpgrading) +} + func TestUpgradeVersionForPausedJob(t *testing.T) { store, dom := session.CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -386,7 +433,7 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { <-ch dom.Close() // Make sure upgrade is successful. - startUpgrade(store, session.CurrentBootstrapVersion-1) + startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -471,7 +518,7 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) { <-ch dom.Close() // Make sure upgrade is successful. - startUpgrade(store, session.CurrentBootstrapVersion-1) + startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -560,7 +607,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) { <-ch dom.Close() // Make sure upgrade is successful. - startUpgrade(store, session.CurrentBootstrapVersion-1) + startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -614,11 +661,7 @@ func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.R return rows, nil } -func startUpgrade(store kv.Storage, currVer int64) { - // It's used for compatible tests upgraded from previous versions of SupportUpgradeHTTPOpVer. - if currVer < session.SupportUpgradeHTTPOpVer { - return - } +func startUpgrade(store kv.Storage) { upgradeHandler := handler.NewClusterUpgradeHandler(store) upgradeHandler.StartUpgrade() } @@ -633,7 +676,7 @@ func finishUpgrade(store kv.Storage) { // 1.Before and after each test bootstrap, the DDL of the user DB is paused, but the DDL of the system DB is not paused. // 2.Check user DDLs are handled after system DDLs. func TestUpgradeWithPauseDDL(t *testing.T) { - session.SupportUpgradeStateVer-- + session.SupportUpgradeHTTPOpVer-- ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond) store, dom := session.CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -739,7 +782,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) { require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion-1, ver) dom.Close() - startUpgrade(store, session.CurrentBootstrapVersion-1) + startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() diff --git a/session/sync_upgrade.go b/session/sync_upgrade.go index 311c2423d7e3c..ae51998830648 100644 --- a/session/sync_upgrade.go +++ b/session/sync_upgrade.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/tidb/ddl/syncer" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/owner" - "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -120,12 +119,12 @@ func IsUpgradingClusterState(s sessionctx.Context) (bool, error) { return stateInfo.State == syncer.StateUpgrading, nil } -func checkOrSyncUpgrade(s Session, ver int64) { - if ver < SupportUpgradeHTTPOpVer { - terror.MustNil(SyncUpgradeState(s, time.Duration(internalSQLTimeout)*time.Second)) - return +func printClusterState(s Session, ver int64) { + // After SupportUpgradeHTTPOpVer version, the upgrade by paused user DDL can be notified through the HTTP API. + // We check the global state see if we are upgrading by paused the user DDL. + if ver >= SupportUpgradeHTTPOpVer { + isUpgradingClusterStateWithRetry(s, ver, currentBootstrapVersion, time.Duration(internalSQLTimeout)*time.Second) } - isUpgradingClusterStateWithRetry(s, ver, currentBootstrapVersion, time.Duration(internalSQLTimeout)*time.Second) } func isUpgradingClusterStateWithRetry(s sessionctx.Context, oldVer, newVer int64, timeout time.Duration) { @@ -135,19 +134,17 @@ func isUpgradingClusterStateWithRetry(s sessionctx.Context, oldVer, newVer int64 for i := 0; ; i++ { isUpgrading, err := IsUpgradingClusterState(s) if err == nil { - if isUpgrading { - break - } - logger.Fatal("global state isn't upgrading, please send a request to start the upgrade first", zap.Error(err)) + logger.Info("get global state", zap.Int64("old version", oldVer), zap.Int64("latest version", newVer), zap.Bool("is upgrading state", isUpgrading)) + return } if time.Since(now) >= timeout { - logger.Fatal("get global state failed", zap.Error(err)) + logger.Error("get global state failed", zap.Int64("old version", oldVer), zap.Int64("latest version", newVer), zap.Error(err)) + return } - if i%10 == 0 { - logger.Warn("get global state failed", zap.Error(err)) + if i%25 == 0 { + logger.Warn("get global state failed", zap.Int64("old version", oldVer), zap.Int64("latest version", newVer), zap.Error(err)) } time.Sleep(interval) } - logger.Info("global state is upgrading", zap.Int64("old version", oldVer), zap.Int64("latest version", newVer)) }