Skip to content

Commit

Permalink
session: adjust the upgrade compatibility behavior (#46644)
Browse files Browse the repository at this point in the history
close #46639
  • Loading branch information
zimulala authored Sep 5, 2023
1 parent d4d025c commit 5d4cea5
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 35 deletions.
9 changes: 2 additions & 7 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,11 +1179,8 @@ func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) {
}

var (
// SupportUpgradeStateVer is exported for testing.
// The minimum version that can be upgraded by paused user DDL.
SupportUpgradeStateVer int64 = version145
// SupportUpgradeHTTPOpVer is exported for testing.
// The minimum version of the upgrade can be notified through the HTTP API.
// The minimum version of the upgrade by paused user DDL can be notified through the HTTP API.
SupportUpgradeHTTPOpVer int64 = version173
)

Expand All @@ -1196,9 +1193,7 @@ func upgrade(s Session) {
// It is already bootstrapped/upgraded by a higher version TiDB server.
return
}
if ver >= SupportUpgradeStateVer {
checkOrSyncUpgrade(s, ver)
}
printClusterState(s, ver)

// Only upgrade from under version92 and this TiDB is not owner set.
// The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue.
Expand Down
2 changes: 1 addition & 1 deletion session/bootstraptest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//config",
"//ddl",
Expand Down
69 changes: 56 additions & 13 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestUpgradeVersionMockLatest(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)
dom.Close()
startUpgrade(store, session.CurrentBootstrapVersion-1)
startUpgrade(store)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand Down Expand Up @@ -299,8 +299,8 @@ func TestUpgradeVersionMockLatest(t *testing.T) {
" PARTITION `p4` VALUES LESS THAN (7096))"))
}

// TestUpgradeVersionForUpgradeHTTPOp tests SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++.
func TestUpgradeVersionForUpgradeHTTPOp(t *testing.T) {
// TestUpgradeVersionWithUpgradeHTTPOp tests SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++ with HTTP op.
func TestUpgradeVersionWithUpgradeHTTPOp(t *testing.T) {
*session.WithMockUpgrade = true
session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest

Expand Down Expand Up @@ -347,6 +347,53 @@ func TestUpgradeVersionForUpgradeHTTPOp(t *testing.T) {
require.Equal(t, false, isUpgrading)
}

// TestUpgradeVersionWithoutUpgradeHTTPOp tests SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++ without HTTP op.
func TestUpgradeVersionWithoutUpgradeHTTPOp(t *testing.T) {
*session.WithMockUpgrade = true
session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest

store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

seV := session.CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
err = m.FinishBootstrap(session.SupportUpgradeHTTPOpVer)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.SupportUpgradeHTTPOpVer))
session.UnsetStoreBootstrapped(store.UUID())
ver, err := session.GetBootstrapVersion(seV)
require.NoError(t, err)
require.Equal(t, session.SupportUpgradeHTTPOpVer, ver)
dom.Close()

// Start the upgrade test.
// Current cluster state is normal.
isUpgrading, err := session.IsUpgradingClusterState(seV)
require.NoError(t, err)
require.Equal(t, false, isUpgrading)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion+1, ver)
// Current cluster state is upgrading.
isUpgrading, err = session.IsUpgradingClusterState(seLatestV)
require.NoError(t, err)
require.Equal(t, false, isUpgrading)
upgradeHandler := handler.NewClusterUpgradeHandler(store)
upgradeHandler.FinishUpgrade()
// Upgrading is finished and current cluster state is normal.
isUpgrading, err = session.IsUpgradingClusterState(seV)
require.NoError(t, err)
require.Equal(t, false, isUpgrading)
}

func TestUpgradeVersionForPausedJob(t *testing.T) {
store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()
Expand Down Expand Up @@ -386,7 +433,7 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store, session.CurrentBootstrapVersion-1)
startUpgrade(store)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand Down Expand Up @@ -471,7 +518,7 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store, session.CurrentBootstrapVersion-1)
startUpgrade(store)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand Down Expand Up @@ -560,7 +607,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store, session.CurrentBootstrapVersion-1)
startUpgrade(store)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand Down Expand Up @@ -614,11 +661,7 @@ func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.R
return rows, nil
}

func startUpgrade(store kv.Storage, currVer int64) {
// It's used for compatible tests upgraded from previous versions of SupportUpgradeHTTPOpVer.
if currVer < session.SupportUpgradeHTTPOpVer {
return
}
func startUpgrade(store kv.Storage) {
upgradeHandler := handler.NewClusterUpgradeHandler(store)
upgradeHandler.StartUpgrade()
}
Expand All @@ -633,7 +676,7 @@ func finishUpgrade(store kv.Storage) {
// 1.Before and after each test bootstrap, the DDL of the user DB is paused, but the DDL of the system DB is not paused.
// 2.Check user DDLs are handled after system DDLs.
func TestUpgradeWithPauseDDL(t *testing.T) {
session.SupportUpgradeStateVer--
session.SupportUpgradeHTTPOpVer--
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()
Expand Down Expand Up @@ -739,7 +782,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)
dom.Close()
startUpgrade(store, session.CurrentBootstrapVersion-1)
startUpgrade(store)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand Down
25 changes: 11 additions & 14 deletions session/sync_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/ddl/syncer"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -120,12 +119,12 @@ func IsUpgradingClusterState(s sessionctx.Context) (bool, error) {
return stateInfo.State == syncer.StateUpgrading, nil
}

func checkOrSyncUpgrade(s Session, ver int64) {
if ver < SupportUpgradeHTTPOpVer {
terror.MustNil(SyncUpgradeState(s, time.Duration(internalSQLTimeout)*time.Second))
return
func printClusterState(s Session, ver int64) {
// After SupportUpgradeHTTPOpVer version, the upgrade by paused user DDL can be notified through the HTTP API.
// We check the global state see if we are upgrading by paused the user DDL.
if ver >= SupportUpgradeHTTPOpVer {
isUpgradingClusterStateWithRetry(s, ver, currentBootstrapVersion, time.Duration(internalSQLTimeout)*time.Second)
}
isUpgradingClusterStateWithRetry(s, ver, currentBootstrapVersion, time.Duration(internalSQLTimeout)*time.Second)
}

func isUpgradingClusterStateWithRetry(s sessionctx.Context, oldVer, newVer int64, timeout time.Duration) {
Expand All @@ -135,19 +134,17 @@ func isUpgradingClusterStateWithRetry(s sessionctx.Context, oldVer, newVer int64
for i := 0; ; i++ {
isUpgrading, err := IsUpgradingClusterState(s)
if err == nil {
if isUpgrading {
break
}
logger.Fatal("global state isn't upgrading, please send a request to start the upgrade first", zap.Error(err))
logger.Info("get global state", zap.Int64("old version", oldVer), zap.Int64("latest version", newVer), zap.Bool("is upgrading state", isUpgrading))
return
}

if time.Since(now) >= timeout {
logger.Fatal("get global state failed", zap.Error(err))
logger.Error("get global state failed", zap.Int64("old version", oldVer), zap.Int64("latest version", newVer), zap.Error(err))
return
}
if i%10 == 0 {
logger.Warn("get global state failed", zap.Error(err))
if i%25 == 0 {
logger.Warn("get global state failed", zap.Int64("old version", oldVer), zap.Int64("latest version", newVer), zap.Error(err))
}
time.Sleep(interval)
}
logger.Info("global state is upgrading", zap.Int64("old version", oldVer), zap.Int64("latest version", newVer))
}

0 comments on commit 5d4cea5

Please sign in to comment.