diff --git a/ddl/mock.go b/ddl/mock.go
index 010aef6666222..e3c786e517f6e 100644
--- a/ddl/mock.go
+++ b/ddl/mock.go
@@ -143,11 +143,14 @@ func NewMockStateSyncer() syncer.StateSyncer {
 	return &MockStateSyncer{}
 }
 
+// clusterState mocks the cluster state.
+// It is kept outside MockStateSyncer so that it is not reset when DDL is closed.
+var clusterState *atomicutil.Pointer[syncer.StateInfo]
+
 // MockStateSyncer is a mock state syncer, it is exported for testing.
 type MockStateSyncer struct {
-	clusterState *atomicutil.Pointer[syncer.StateInfo]
-	globalVerCh  chan clientv3.WatchResponse
-	mockSession  chan struct{}
+	globalVerCh chan clientv3.WatchResponse
+	mockSession chan struct{}
 }
 
 // Init implements StateSyncer.Init interface.
@@ -155,7 +158,9 @@ func (s *MockStateSyncer) Init(context.Context) error {
 	s.globalVerCh = make(chan clientv3.WatchResponse, 1)
 	s.mockSession = make(chan struct{}, 1)
 	state := syncer.NewStateInfo(syncer.StateNormalRunning)
-	s.clusterState = atomicutil.NewPointer(state)
+	if clusterState == nil {
+		clusterState = atomicutil.NewPointer(state)
+	}
 	return nil
 }
 
@@ -163,23 +168,23 @@ func (s *MockStateSyncer) Init(context.Context) error {
 func (s *MockStateSyncer) UpdateGlobalState(_ context.Context, stateInfo *syncer.StateInfo) error {
 	failpoint.Inject("mockUpgradingState", func(val failpoint.Value) {
 		if val.(bool) {
-			s.clusterState.Store(stateInfo)
+			clusterState.Store(stateInfo)
 			failpoint.Return(nil)
 		}
 	})
 	s.globalVerCh <- clientv3.WatchResponse{}
-	s.clusterState.Store(stateInfo)
+	clusterState.Store(stateInfo)
 	return nil
 }
 
 // GetGlobalState implements StateSyncer.GetGlobalState interface.
-func (s *MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
-	return s.clusterState.Load(), nil
+func (*MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
+	return clusterState.Load(), nil
 }
 
 // IsUpgradingState implements StateSyncer.IsUpgradingState interface.
-func (s *MockStateSyncer) IsUpgradingState() bool {
-	return s.clusterState.Load().State == syncer.StateUpgrading
+func (*MockStateSyncer) IsUpgradingState() bool {
+	return clusterState.Load().State == syncer.StateUpgrading
 }
 
 // WatchChan implements StateSyncer.WatchChan interface.
diff --git a/docs/tidb_http_api.md b/docs/tidb_http_api.md
index a5fdfdf6bfb8a..b6fa14b6d43d5 100644
--- a/docs/tidb_http_api.md
+++ b/docs/tidb_http_api.md
@@ -564,7 +564,6 @@ timezone.*
    # reset the size of the ballast object (2GB in this example)
    curl -v -X POST -d "2147483648" http://{TiDBIP}:10080/debug/ballast-object-sz
    ```
-
1. Set deadlock history table capacity
 
@@ -591,3 +590,14 @@ timezone.*
    ```shell
    curl -X POST -d "transaction_summary_capacity={number}" http://{TiDBIP}:10080/settings
    ```
+
+1. Send upgrade operations to the cluster. The available operations include `start` and `finish`.
+
+    ```shell
+    curl -X POST http://{TiDBIP}:10080/upgrade/{op}
+    ```
+
+    ```shell
+    $ curl -X POST http://127.0.0.1:10080/upgrade/start
+    "success!"
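+    # The `finish` operation is symmetric; the output below is an
+    # illustrative example based on the handler's responses:
+    $ curl -X POST http://127.0.0.1:10080/upgrade/finish
+    "success!"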
+ ``` diff --git a/server/BUILD.bazel b/server/BUILD.bazel index 10e2e927341e5..82c8515a21502 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "stat.go", "statistics_handler.go", "tokenlimiter.go", + "upgrade_handler.go", "util.go", ], importpath = "github.com/pingcap/tidb/server", diff --git a/server/handler/BUILD.bazel b/server/handler/BUILD.bazel new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/server/handler/tests/BUILD.bazel b/server/handler/tests/BUILD.bazel new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/server/http_handler_test.go b/server/http_handler_test.go index 926a5d99b6997..e26e0931dfcbb 100644 --- a/server/http_handler_test.go +++ b/server/http_handler_test.go @@ -1243,3 +1243,88 @@ func TestSetLabelsConcurrentWithGetLabel(t *testing.T) { conf.Labels = map[string]string{} }) } + +func TestUpgrade(t *testing.T) { + ts := createBasicHTTPHandlerTestSuite() + ts.startServer(t) + defer ts.stopServer(t) + + resp, err := ts.fetchStatus("/upgrade/start") + require.NoError(t, err) + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + require.NoError(t, err) + require.NotNil(t, resp) + // test upgrade start + resp, err = ts.postStatus("/upgrade/start", "application/x-www-form-urlencoded", nil) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + b, err := httputil.DumpResponse(resp, true) + require.NoError(t, err) + require.Greater(t, len(b), 0) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.Equal(t, "\"success!\"", string(body)) + // check the result + se, err := session.CreateSession(ts.store) + require.NoError(t, err) + isUpgrading, err := session.IsUpgradingClusterState(se) + require.NoError(t, err) + require.True(t, isUpgrading) + + // Do start upgrade again. + resp, err = ts.postStatus("/upgrade/start", "application/x-www-form-urlencoded", nil) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + b, err = httputil.DumpResponse(resp, true) + require.NoError(t, err) + require.Greater(t, len(b), 0) + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.Equal(t, "\"It's a duplicated operation and the cluster is already in upgrading state.\"", string(body)) + // check the result + se, err = session.CreateSession(ts.store) + require.NoError(t, err) + isUpgrading, err = session.IsUpgradingClusterState(se) + require.NoError(t, err) + require.True(t, isUpgrading) + + // test upgrade finish + resp, err = ts.postStatus("/upgrade/finish", "application/x-www-form-urlencoded", nil) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + b, err = httputil.DumpResponse(resp, true) + require.NoError(t, err) + require.Greater(t, len(b), 0) + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.Equal(t, "\"success!\"", string(body)) + // check the result + se, err = session.CreateSession(ts.store) + require.NoError(t, err) + isUpgrading, err = session.IsUpgradingClusterState(se) + require.NoError(t, err) + require.False(t, isUpgrading) + + // Do finish upgrade again. 
+	resp, err = ts.postStatus("/upgrade/finish", "application/x-www-form-urlencoded", nil)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, resp.StatusCode)
+	b, err = httputil.DumpResponse(resp, true)
+	require.NoError(t, err)
+	require.Greater(t, len(b), 0)
+	body, err = io.ReadAll(resp.Body)
+	require.NoError(t, err)
+	require.NoError(t, resp.Body.Close())
+	require.Equal(t, "\"It's a duplicated operation and the cluster is already in normal state.\"", string(body))
+	// check the result
+	se, err = session.CreateSession(ts.store)
+	require.NoError(t, err)
+	isUpgrading, err = session.IsUpgradingClusterState(se)
+	require.NoError(t, err)
+	require.False(t, isUpgrading)
+}
diff --git a/server/http_status.go b/server/http_status.go
index 7ef34909f0779..4ded59a3b7ab3 100644
--- a/server/http_status.go
+++ b/server/http_status.go
@@ -239,6 +239,9 @@ func (s *Server) startHTTPServer() {
 	// HTTP path for get table tiflash replica info.
 	router.Handle("/tiflash/replica-deprecated", flashReplicaHandler{tikvHandlerTool})
 
+	// HTTP path for upgrade operations.
+	router.Handle("/upgrade/{op}", NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations")
+
 	if s.cfg.Store == "tikv" {
 		// HTTP path for tikv.
 		router.Handle("/tables/{db}/{table}/regions", tableHandler{tikvHandlerTool, opTableRegions})
diff --git a/server/upgrade_handler.go b/server/upgrade_handler.go
new file mode 100644
index 0000000000000..1a5aa75962f4d
--- /dev/null
+++ b/server/upgrade_handler.go
@@ -0,0 +1,216 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"context"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/gorilla/mux"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/domain/infosync"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/session"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+// ClusterUpgradeHandler is the handler for upgrading the cluster.
+type ClusterUpgradeHandler struct {
+	store kv.Storage
+}
+
+// NewClusterUpgradeHandler creates a new ClusterUpgradeHandler.
+func NewClusterUpgradeHandler(store kv.Storage) *ClusterUpgradeHandler {
+	return &ClusterUpgradeHandler{store: store}
+}
+
+// ServeHTTP handles cluster upgrade operation requests such as "/upgrade/start" and "/upgrade/finish".
+func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	if req.Method != http.MethodPost {
+		writeError(w, errors.Errorf("This API only supports the POST method"))
+		return
+	}
+
+	var err error
+	var hasDone bool
+	params := mux.Vars(req)
+	op := params["op"]
+	switch op {
+	case "start":
+		hasDone, err = h.StartUpgrade()
+	case "finish":
+		hasDone, err = h.FinishUpgrade()
+	case "show":
+		err = h.showUpgrade(w)
+	default:
+		writeError(w, errors.Errorf("wrong operation: %s", op))
+		return
+	}
+
+	if err != nil {
+		writeError(w, err)
+		logutil.Logger(req.Context()).Info("upgrade operation failed",
+			zap.String("category", "upgrading"), zap.String("operation", op), zap.Bool("hasDone", hasDone))
+		return
+	}
+	if hasDone {
+		switch op {
+		case "start":
+			writeData(w, "It's a duplicated operation and the cluster is already in upgrading state.")
+		case "finish":
+			writeData(w, "It's a duplicated operation and the cluster is already in normal state.")
+		}
+	} else {
+		writeData(w, "success!")
+	}
+	logutil.Logger(req.Context()).Info("upgrade operation success",
+		zap.String("category", "upgrading"), zap.String("operation", op), zap.Bool("hasDone", hasDone))
+}
+
+// StartUpgrade is used to start the upgrade.
+func (h ClusterUpgradeHandler) StartUpgrade() (hasDone bool, err error) {
+	se, err := session.CreateSession(h.store)
+	if err != nil {
+		return false, err
+	}
+	defer se.Close()
+
+	isUpgrading, err := session.IsUpgradingClusterState(se)
+	if err != nil {
+		return false, err
+	}
+	if isUpgrading {
+		return true, nil
+	}
+
+	err = session.SyncUpgradeState(se, 10*time.Second)
+	return false, err
+}
+
+// FinishUpgrade is used to finish the upgrade.
+func (h ClusterUpgradeHandler) FinishUpgrade() (hasDone bool, err error) {
+	se, err := session.CreateSession(h.store)
+	if err != nil {
+		return false, err
+	}
+	defer se.Close()
+
+	isUpgrading, err := session.IsUpgradingClusterState(se)
+	if err != nil {
+		return false, err
+	}
+	if !isUpgrading {
+		return true, nil
+	}
+
+	err = session.SyncNormalRunning(se)
+	return false, err
+}
+
+// SimpleServerInfo is some simple server information such as version and address.
+type SimpleServerInfo struct {
+	infosync.ServerVersionInfo
+	ID           string `json:"ddl_id"`
+	IP           string `json:"ip"`
+	Port         uint   `json:"listening_port"`
+	JSONServerID uint64 `json:"server_id"`
+}
+
+// ClusterUpgradeInfo is used to report cluster upgrade info in the HTTP response.
+type ClusterUpgradeInfo struct {
+	ServersNum          int                `json:"servers_num,omitempty"`
+	OwnerID             string             `json:"owner_id"`
+	UpgradedPercent     int                `json:"upgraded_percent"`
+	IsAllUpgraded       bool               `json:"is_all_server_version_consistent,omitempty"`
+	AllServersDiffInfos []SimpleServerInfo `json:"all_servers_diff_info,omitempty"`
+}
+
+func (h ClusterUpgradeHandler) showUpgrade(w http.ResponseWriter) error {
+	se, err := session.CreateSession(h.store)
+	if err != nil {
+		return err
+	}
+	defer se.Close()
+
+	// Check whether we are in the upgrading state that pauses user DDL (in other words, entered via "/upgrade/start").
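+	// Note: the state is read through the DDL StateSyncer (etcd-backed in a
+	// real cluster, MockStateSyncer in tests), so every TiDB server observes
+	// the same cluster state.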
+	isUpgrading, err := session.IsUpgradingClusterState(se)
+	if err != nil {
+		return err
+	}
+	if !isUpgrading {
+		writeData(w, "The cluster state is normal.")
+		return nil
+	}
+
+	do, err := session.GetDomain(h.store)
+	if err != nil {
+		return err
+	}
+	ctx := context.Background()
+	allServersInfo, err := infosync.GetAllServerInfo(ctx)
+	if err != nil {
+		return err
+	}
+	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
+	ownerID, err := do.DDL().OwnerManager().GetOwnerID(ctx)
+	cancel()
+	if err != nil {
+		return err
+	}
+
+	allVersionsMap := map[infosync.ServerVersionInfo]int{}
+	allVersions := make([]infosync.ServerVersionInfo, 0, len(allServersInfo))
+	for _, v := range allServersInfo {
+		if _, ok := allVersionsMap[v.ServerVersionInfo]; ok {
+			allVersionsMap[v.ServerVersionInfo]++
+			continue
+		}
+		allVersionsMap[v.ServerVersionInfo] = 1
+		allVersions = append(allVersions, v.ServerVersionInfo)
+	}
+	maxVerInfo := allVersions[0]
+	for k := range allVersionsMap {
+		if strings.Compare(maxVerInfo.Version, k.Version) < 0 {
+			maxVerInfo = k
+		}
+	}
+	upgradedPercent := (allVersionsMap[maxVerInfo] * 100) / len(allServersInfo)
+	upgradeInfo := ClusterUpgradeInfo{
+		ServersNum:      len(allServersInfo),
+		OwnerID:         ownerID,
+		UpgradedPercent: upgradedPercent,
+		IsAllUpgraded:   upgradedPercent == 100,
+	}
+	// If IsAllUpgraded is false, return the version info of all TiDB servers.
+	if !upgradeInfo.IsAllUpgraded {
+		allSimpleServerInfo := make([]SimpleServerInfo, 0, len(allServersInfo))
+		for _, info := range allServersInfo {
+			sInfo := SimpleServerInfo{
+				ServerVersionInfo: info.ServerVersionInfo,
+				ID:                info.ID,
+				IP:                info.IP,
+				Port:              info.Port,
+				JSONServerID:      info.JSONServerID,
+			}
+			allSimpleServerInfo = append(allSimpleServerInfo, sInfo)
+		}
+		upgradeInfo.AllServersDiffInfos = allSimpleServerInfo
+	}
+	writeData(w, upgradeInfo)
+	return nil
+}
diff --git a/session/BUILD.bazel b/session/BUILD.bazel
index 07791d003f1a8..b0de25f509a44 100644
--- a/session/BUILD.bazel
+++ b/session/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
         "mock_bootstrap.go",
         "nontransactional.go",
         "session.go",
+        "sync_upgrade.go",
         "testutil.go",  #keep
         "tidb.go",
         "txn.go",
diff --git a/session/bootstrap.go b/session/bootstrap.go
index 042f37aec8549..8af2369c209ec 100644
--- a/session/bootstrap.go
+++ b/session/bootstrap.go
@@ -32,7 +32,6 @@ import (
 	"github.com/pingcap/tidb/bindinfo"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl"
-	"github.com/pingcap/tidb/ddl/syncer"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/expression"
@@ -1071,9 +1070,6 @@ func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) {
 	return row.GetString(0), false, nil
 }
 
-// SupportUpgradeStateVer is exported for testing.
-var SupportUpgradeStateVer = version145
-
 // upgrade function will do some upgrade works, when the system is bootstrapped by low version TiDB server
 // For example, add new system variables into mysql.global_variables table.
 func upgrade(s Session) {
@@ -1083,6 +1079,7 @@ func upgrade(s Session) {
 		// It is already bootstrapped/upgraded by a higher version TiDB server.
 		return
 	}
+	// This only applies when upgrading from a version below version92 and this TiDB is not the owner.
 	// The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue.
if ver < version92 { @@ -1102,9 +1099,6 @@ func upgrade(s Session) { logutil.BgLogger().Fatal("[upgrade] init metadata lock failed", zap.Error(err)) } - if ver >= int64(SupportUpgradeStateVer) { - syncUpgradeState(s) - } if isNull { upgradeToVer99Before(s) } @@ -1117,9 +1111,6 @@ func upgrade(s Session) { if isNull { upgradeToVer99After(s) } - if ver >= int64(SupportUpgradeStateVer) { - syncNormalRunning(s) - } variable.DDLForce2Queue.Store(false) updateBootstrapVer(s) @@ -1147,54 +1138,6 @@ func upgrade(s Session) { } } -func syncUpgradeState(s Session) { - totalInterval := time.Duration(internalSQLTimeout) * time.Second - ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval) - defer cancelFunc() - dom := domain.GetDomain(s) - err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading)) - if err != nil { - logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err)) - } - - interval := 200 * time.Millisecond - retryTimes := int(totalInterval / interval) - for i := 0; i < retryTimes; i++ { - op, err := owner.GetOwnerOpValue(ctx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap") - if err == nil && op.String() == owner.OpGetUpgradingState.String() { - break - } - if i == retryTimes-1 { - logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) - } - if i%10 == 0 { - logutil.BgLogger().Warn("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) - } - time.Sleep(interval) - } - - logutil.BgLogger().Info("[upgrading] update global state to upgrading", zap.String("state", syncer.StateUpgrading)) -} - -func syncNormalRunning(s Session) { - jobErrs, err := ddl.ResumeAllJobsBySystem(s) - if err != nil { - logutil.BgLogger().Warn("[upgrading] resume all paused jobs failed", zap.Error(err)) - } - for _, e := range jobErrs { - logutil.BgLogger().Warn("[upgrading] resume the job failed ", zap.Error(e)) - } - - ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) - defer cancelFunc() - dom := domain.GetDomain(s) - err = dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateNormalRunning)) - if err != nil { - logutil.BgLogger().Fatal("[upgrading] update global state to normal failed", zap.Error(err)) - } - logutil.BgLogger().Info("[upgrading] update global state to normal running finished") -} - // checkOwnerVersion is used to wait the DDL owner to be elected in the cluster and check it is the same version as this TiDB. 
func checkOwnerVersion(ctx context.Context, dom *domain.Domain) (bool, error) { ticker := time.NewTicker(100 * time.Millisecond) diff --git a/session/bootstraptest/BUILD.bazel b/session/bootstraptest/BUILD.bazel index cedd8d3d1d49a..09cc8e2a0324f 100644 --- a/session/bootstraptest/BUILD.bazel +++ b/session/bootstraptest/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 9, + shard_count = 11, deps = [ "//config", "//ddl", @@ -17,6 +17,7 @@ go_test( "//meta", "//parser/model", "//parser/terror", + "//server", "//session", #keep "//sessionctx", "//testkit", #keep diff --git a/session/bootstraptest/bootstrap_upgrade_test.go b/session/bootstraptest/bootstrap_upgrade_test.go index 194fa5e41f55b..4d31c332cc768 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/testkit" @@ -255,10 +256,13 @@ func TestUpgradeVersionMockLatest(t *testing.T) { require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion-1, ver) dom.Close() + startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() + finishUpgrade(store) + seLatestV := session.CreateSessionAndSetID(t, store) ver, err = session.GetBootstrapVersion(seLatestV) require.NoError(t, err) @@ -294,6 +298,103 @@ func TestUpgradeVersionMockLatest(t *testing.T) { " PARTITION `p4` VALUES LESS THAN (7096))")) } +const supportUpgradeHTTPOpVer = int64(146) + +// TestUpgradeVersionWithUpgradeHTTPOp tests supportUpgradeHTTPOpVer upgrade supportUpgradeHTTPOpVer++ with HTTP op. +func TestUpgradeVersionWithUpgradeHTTPOp(t *testing.T) { + *session.WithMockUpgrade = true + session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest + + store, dom := session.CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + seV := session.CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(supportUpgradeHTTPOpVer) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", supportUpgradeHTTPOpVer)) + session.UnsetStoreBootstrapped(store.UUID()) + ver, err := session.GetBootstrapVersion(seV) + require.NoError(t, err) + require.Equal(t, supportUpgradeHTTPOpVer, ver) + dom.Close() + + // Start the upgrade test. + // Current cluster state is normal. + isUpgrading, err := session.IsUpgradingClusterState(seV) + require.NoError(t, err) + require.Equal(t, false, isUpgrading) + upgradeHandler := server.NewClusterUpgradeHandler(store) + upgradeHandler.StartUpgrade() + domLatestV, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domLatestV.Close() + seLatestV := session.CreateSessionAndSetID(t, store) + ver, err = session.GetBootstrapVersion(seLatestV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion+1, ver) + // Current cluster state is upgrading. 
+ isUpgrading, err = session.IsUpgradingClusterState(seLatestV) + require.NoError(t, err) + require.Equal(t, true, isUpgrading) + upgradeHandler.FinishUpgrade() + // Upgrading is finished and current cluster state is normal. + isUpgrading, err = session.IsUpgradingClusterState(seV) + require.NoError(t, err) + require.Equal(t, false, isUpgrading) +} + +// TestUpgradeVersionWithoutUpgradeHTTPOp tests supportUpgradeHTTPOpVer upgrade supportUpgradeHTTPOpVer++ without HTTP op. +func TestUpgradeVersionWithoutUpgradeHTTPOp(t *testing.T) { + *session.WithMockUpgrade = true + session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest + + store, dom := session.CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + seV := session.CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(supportUpgradeHTTPOpVer) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", supportUpgradeHTTPOpVer)) + session.UnsetStoreBootstrapped(store.UUID()) + ver, err := session.GetBootstrapVersion(seV) + require.NoError(t, err) + require.Equal(t, supportUpgradeHTTPOpVer, ver) + dom.Close() + + // Start the upgrade test. + // Current cluster state is normal. + isUpgrading, err := session.IsUpgradingClusterState(seV) + require.NoError(t, err) + require.Equal(t, false, isUpgrading) + domLatestV, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domLatestV.Close() + seLatestV := session.CreateSessionAndSetID(t, store) + ver, err = session.GetBootstrapVersion(seLatestV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion+1, ver) + // Current cluster state is upgrading. + isUpgrading, err = session.IsUpgradingClusterState(seLatestV) + require.NoError(t, err) + require.Equal(t, false, isUpgrading) + upgradeHandler := server.NewClusterUpgradeHandler(store) + upgradeHandler.FinishUpgrade() + // Upgrading is finished and current cluster state is normal. + isUpgrading, err = session.IsUpgradingClusterState(seV) + require.NoError(t, err) + require.Equal(t, false, isUpgrading) +} + func TestUpgradeVersionForPausedJob(t *testing.T) { store, dom := session.CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -333,6 +434,7 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { <-ch dom.Close() // Make sure upgrade is successful. + startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -341,6 +443,8 @@ func TestUpgradeVersionForPausedJob(t *testing.T) { require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion, ver) + finishUpgrade(store) + // Resume the DDL job, then add index operation can be executed successfully. session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID)) checkDDLJobExecSucc(t, seLatestV, jobID) @@ -434,6 +538,7 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) { <-ch dom.Close() // Make sure upgrade is successful. 
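+	// Bootstrap no longer switches the cluster upgrade state by itself;
+	// the test enters it explicitly through the upgrade HTTP handler helper.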
+ startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -442,6 +547,8 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) { require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion+1, ver) + finishUpgrade(store) + checkDDLJobExecSucc(t, seLatestV, jobID) } @@ -463,12 +570,21 @@ func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.R return rows, nil } +func startUpgrade(store kv.Storage) { + upgradeHandler := server.NewClusterUpgradeHandler(store) + upgradeHandler.StartUpgrade() +} + +func finishUpgrade(store kv.Storage) { + upgradeHandler := server.NewClusterUpgradeHandler(store) + upgradeHandler.FinishUpgrade() +} + // TestUpgradeWithPauseDDL adds a user and a system DB's DDL operations, before every test bootstrap(DDL operation). It tests: // // 1.Before and after each test bootstrap, the DDL of the user DB is paused, but the DDL of the system DB is not paused. // 2.Check user DDLs are handled after system DDLs. func TestUpgradeWithPauseDDL(t *testing.T) { - session.SupportUpgradeStateVer-- ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond) store, dom := session.CreateStoreAndBootstrap(t) defer func() { require.NoError(t, store.Close()) }() @@ -574,6 +690,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) { require.NoError(t, err) require.Equal(t, session.CurrentBootstrapVersion-1, ver) dom.Close() + startUpgrade(store) domLatestV, err := session.BootstrapSession(store) require.NoError(t, err) defer domLatestV.Close() @@ -584,6 +701,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) { require.Equal(t, session.CurrentBootstrapVersion+1, ver) wg.Wait() + finishUpgrade(store) tk := testkit.NewTestKit(t, store) var rows []chunk.Row diff --git a/session/mock_bootstrap.go b/session/mock_bootstrap.go index cfad1f8fa24e4..6168aef6bceb5 100644 --- a/session/mock_bootstrap.go +++ b/session/mock_bootstrap.go @@ -22,10 +22,6 @@ import ( "flag" "time" - "github.com/pingcap/errors" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/logutil" atomicutil "go.uber.org/atomic" "go.uber.org/zap" @@ -130,29 +126,13 @@ func mockSimpleUpgradeToVerLatest(s Session, ver int64) { // TestHook is exported for testing. var TestHook = TestCallback{} -// modifyBootstrapVersionForTest is used to get the bootstrap version from the SQL, i.e. skipping the mBootstrapKey method. -// This makes it easy to modify the bootstrap version through SQL for easy testing. -func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 { +// modifyBootstrapVersionForTest is used to test currentBootstrapVersion upgrade currentBootstrapVersion++. +func modifyBootstrapVersionForTest(ver int64) { if !*WithMockUpgrade { - return ver - } - - s, err := createSession(store) - var tmpVer int64 - if err == nil { - tmpVer, err = getBootstrapVersion(s) - } - if err == nil { - return tmpVer + return } - originErr := errors.Cause(err) - tErr, ok := originErr.(*terror.Error) - // If the error is ErrTableNotExists(mysql.global_variables), we can't replace the bootstrap version. 
- if !ok || tErr.Code() != mysql.ErrNoSuchTable { - logutil.BgLogger().Fatal("mock upgrade, check bootstrapped failed", zap.Error(err)) - } - return ver + currentBootstrapVersion = mockLatestVer } const ( @@ -175,7 +155,7 @@ func addMockBootstrapVersionForTest(s Session) { } else { bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest) } - currentBootstrapVersion++ + currentBootstrapVersion = mockLatestVer } // Callback is used for Test. diff --git a/session/session.go b/session/session.go index aee1f51ebd302..6c977418e0c1d 100644 --- a/session/session.go +++ b/session/session.go @@ -3663,7 +3663,8 @@ func getStoreBootstrapVersion(store kv.Storage) int64 { storeBootstrapped[store.UUID()] = true } - return modifyBootstrapVersionForTest(store, ver) + modifyBootstrapVersionForTest(ver) + return ver } func finishBootstrap(store kv.Storage) { diff --git a/session/sync_upgrade.go b/session/sync_upgrade.go new file mode 100644 index 0000000000000..59a9691f48148 --- /dev/null +++ b/session/sync_upgrade.go @@ -0,0 +1,120 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package session + +import ( + "context" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/syncer" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/owner" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +// isContextDone checks if context is done. +func isContextDone(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + } + return false +} + +// SyncUpgradeState syncs upgrade state to etcd. +func SyncUpgradeState(s sessionctx.Context, timeout time.Duration) error { + ctx, cancelFunc := context.WithTimeout(context.Background(), timeout) + defer cancelFunc() + dom := domain.GetDomain(s) + err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading)) + logger := logutil.BgLogger().With(zap.String("category", "upgrading")) + if err != nil { + logger.Error("update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err)) + return err + } + + interval := 200 * time.Millisecond + for i := 0; ; i++ { + if isContextDone(ctx) { + logger.Error("get owner op failed", zap.Duration("timeout", timeout), zap.Error(err)) + return ctx.Err() + } + + var op owner.OpType + childCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + op, err = owner.GetOwnerOpValue(childCtx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap") + cancel() + if err == nil && op.String() == owner.OpGetUpgradingState.String() { + break + } + if i%10 == 0 { + logger.Warn("get owner op failed", zap.Stringer("state", op), zap.Error(err)) + } + time.Sleep(interval) + } + + logger.Info("update global state to upgrading", zap.String("state", syncer.StateUpgrading)) + return nil +} + +// SyncNormalRunning syncs normal state to etcd. 
+func SyncNormalRunning(s sessionctx.Context) error { + failpoint.Inject("mockResumeAllJobsFailed", func(val failpoint.Value) { + if val.(bool) { + dom := domain.GetDomain(s) + //nolint: errcheck + dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), syncer.NewStateInfo(syncer.StateNormalRunning)) + failpoint.Return(nil) + } + }) + + logger := logutil.BgLogger().With(zap.String("category", "upgrading")) + jobErrs, err := ddl.ResumeAllJobsBySystem(s) + if err != nil { + logger.Warn("resume all paused jobs failed", zap.Error(err)) + } + for _, e := range jobErrs { + logger.Warn("resume the job failed", zap.Error(e)) + } + + ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) + defer cancelFunc() + dom := domain.GetDomain(s) + err = dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateNormalRunning)) + if err != nil { + logger.Error("update global state to normal failed", zap.Error(err)) + return err + } + logger.Info("update global state to normal running finished") + return nil +} + +// IsUpgradingClusterState checks whether the global state is upgrading. +func IsUpgradingClusterState(s sessionctx.Context) (bool, error) { + dom := domain.GetDomain(s) + ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) + defer cancelFunc() + stateInfo, err := dom.DDL().StateSyncer().GetGlobalState(ctx) + if err != nil { + return false, err + } + + return stateInfo.State == syncer.StateUpgrading, nil +}
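
For reference, here is a minimal sketch of how an operator or orchestration tool might drive the new endpoints during a rolling upgrade. The `/upgrade/{op}` paths and the response bodies come from the handler and tests above; the `callUpgradeOp` helper and the status address are hypothetical, not part of this change:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// callUpgradeOp posts /upgrade/{op} to one TiDB status port and returns the
// response body ("success!", or the duplicated-operation message on repeats).
func callUpgradeOp(statusAddr, op string) (string, error) {
	resp, err := http.Post(fmt.Sprintf("http://%s/upgrade/%s", statusAddr, op), "", nil)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("upgrade %s failed: %s: %s", op, resp.Status, body)
	}
	return string(body), nil
}

func main() {
	const statusAddr = "127.0.0.1:10080" // assumed TiDB status port

	// 1. Pause user DDL and mark the cluster as upgrading.
	if msg, err := callUpgradeOp(statusAddr, "start"); err == nil {
		fmt.Println("start:", msg)
	}

	// 2. ...roll the TiDB servers to the new version here...

	// 3. Resume paused DDL jobs and return the cluster to the normal state.
	if msg, err := callUpgradeOp(statusAddr, "finish"); err == nil {
		fmt.Println("finish:", msg)
	}
}
```

The handler also exposes an undocumented `/upgrade/show` operation, which reports the owner ID, the upgraded percentage, and per-server version info while the cluster is in the upgrading state.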