Skip to content

Commit

Permalink
*: support /upgrade/start and upgrade/finish APIs (#45887)
Browse files Browse the repository at this point in the history
close #45886
  • Loading branch information
zimulala authored Aug 16, 2023
1 parent 95b4dcc commit e1a017c
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 10 deletions.
3 changes: 3 additions & 0 deletions server/handler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "handler",
srcs = [
"tikv_handler.go",
"upgrade_handler.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/server/handler",
Expand All @@ -22,10 +23,12 @@ go_library(
"//tablecodec",
"//types",
"//util/codec",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_zap//:zap",
],
)
2 changes: 1 addition & 1 deletion server/handler/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 35,
shard_count = 36,
deps = [
"//config",
"//ddl",
Expand Down
85 changes: 85 additions & 0 deletions server/handler/tests/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,3 +1241,88 @@ func TestSetLabelsConcurrentWithGetLabel(t *testing.T) {
conf.Labels = map[string]string{}
})
}

// TestUpgrade exercises the /upgrade status API end to end: a GET is rejected,
// then "start" and "finish" are each posted twice to cover both the state
// transition and the repeated (already done) request.
func TestUpgrade(t *testing.T) {
	ts := createBasicHTTPHandlerTestSuite()
	ts.startServer(t)
	defer ts.stopServer(t)

	// The handler only accepts POST, so a plain GET must be rejected.
	// resp is checked for nil before it is dereferenced.
	resp, err := ts.FetchStatus("/upgrade")
	require.NoError(t, err)
	require.NotNil(t, resp)
	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
	require.NoError(t, resp.Body.Close())

	// postOp posts the given op to /upgrade, checks the raw response body, and
	// then verifies the global upgrading state through a fresh session.
	postOp := func(op, wantBody string, wantUpgrading bool) {
		resp, err := ts.PostStatus("/upgrade", "application/x-www-form-urlencoded", bytes.NewBuffer([]byte("op="+op)))
		require.NoError(t, err)
		require.Equal(t, http.StatusOK, resp.StatusCode)
		// DumpResponse re-buffers the body, so it can still be read below.
		dump, err := httputil.DumpResponse(resp, true)
		require.NoError(t, err)
		require.Greater(t, len(dump), 0)
		body, err := io.ReadAll(resp.Body)
		require.NoError(t, err)
		require.NoError(t, resp.Body.Close())
		require.Equal(t, wantBody, string(body))

		// Check the resulting cluster state; the session is closed once the
		// check is done so that it does not outlive the test step.
		se, err := session.CreateSession(ts.store)
		require.NoError(t, err)
		defer se.Close()
		isUpgrading, err := session.IsUpgradingClusterState(se)
		require.NoError(t, err)
		require.Equal(t, wantUpgrading, isUpgrading)
	}

	// test upgrade start
	postOp("start", "\"success!\"", true)
	// Do start upgrade again.
	postOp("start", "\"Be upgrading.\"\"success!\"", true)
	// test upgrade finish
	postOp("finish", "\"success!\"", false)
	// Do finish upgrade again.
	postOp("finish", "\"Be normal.\"\"success!\"", false)
}
110 changes: 110 additions & 0 deletions server/handler/upgrade_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler

import (
"net/http"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// ClusterUpgradeHandler is the HTTP handler for the cluster upgrade
// operations ("start" and "finish") served by ServeHTTP.
type ClusterUpgradeHandler struct {
	// store is the storage handed to session.CreateSession for each operation.
	store kv.Storage
}

// NewClusterUpgradeHandler returns a ClusterUpgradeHandler that runs its
// upgrade operations against the given store.
func NewClusterUpgradeHandler(store kv.Storage) *ClusterUpgradeHandler {
	h := new(ClusterUpgradeHandler)
	h.store = store
	return h
}

// ServeHTTP handles the cluster upgrade operations requested over HTTP.
//
// Only POST requests are accepted. The "op" form value selects the operation:
// "start" runs startUpgrade and "finish" runs finishUpgrade; any other value,
// as well as any error from the operation, is reported through WriteError.
// When the operation reports hasDone (the cluster was already in the requested
// state), a notice ("Be upgrading." or "Be normal.") is written first, and
// "success!" is always written afterwards.
func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		WriteError(w, errors.Errorf("This API only support POST method"))
		return
	}

	op := req.FormValue("op")
	var (
		alreadyDone bool
		notice      string
		opErr       error
	)
	switch op {
	case "start":
		notice = "Be upgrading."
		alreadyDone, opErr = h.startUpgrade()
	case "finish":
		notice = "Be normal."
		alreadyDone, opErr = h.finishUpgrade()
	default:
		WriteError(w, errors.Errorf("wrong operation:%s", op))
		return
	}
	if opErr != nil {
		WriteError(w, opErr)
		return
	}

	// A repeated operation gets the notice in addition to the success message.
	if alreadyDone {
		WriteData(w, notice)
	}
	WriteData(w, "success!")
	logutil.Logger(req.Context()).Info("upgrade op success",
		zap.String("category", "upgrading"), zap.String("op", op), zap.Bool("hasDone", alreadyDone))
}

// startUpgrade switches the cluster to the upgrading state using a temporary
// session created on h.store.
// It returns hasDone=true, without calling session.SyncUpgradeState, when
// session.IsUpgradingClusterState reports that the cluster is already upgrading.
func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) {
	se, err := session.CreateSession(h.store)
	if err != nil {
		return false, err
	}
	defer se.Close()

	upgrading, err := session.IsUpgradingClusterState(se)
	switch {
	case err != nil:
		return false, err
	case upgrading:
		// Nothing to do: the global state is already "upgrading".
		return true, nil
	}
	return false, session.SyncUpgradeState(se)
}

// finishUpgrade switches the cluster back to the normal running state using a
// temporary session created on h.store.
// It returns hasDone=true, without calling session.SyncNormalRunning, when
// session.IsUpgradingClusterState reports that the cluster is not upgrading.
func (h ClusterUpgradeHandler) finishUpgrade() (hasDone bool, err error) {
	se, err := session.CreateSession(h.store)
	if err != nil {
		return false, err
	}
	defer se.Close()

	upgrading, err := session.IsUpgradingClusterState(se)
	switch {
	case err != nil:
		return false, err
	case !upgrading:
		// Nothing to do: the global state is not "upgrading".
		return true, nil
	}
	return false, session.SyncNormalRunning(se)
}
4 changes: 4 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/server/handler"
"github.com/pingcap/tidb/server/handler/optimizor"
"github.com/pingcap/tidb/server/handler/tikvhandler"
"github.com/pingcap/tidb/server/handler/ttlhandler"
Expand Down Expand Up @@ -246,6 +247,9 @@ func (s *Server) startHTTPServer() {
// HTTP path for get table tiflash replica info.
router.Handle("/tiflash/replica-deprecated", tikvhandler.NewFlashReplicaHandler(tikvHandlerTool))

// HTTP path for upgrade operations.
router.Handle("/upgrade", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations")

if s.cfg.Store == "tikv" {
// HTTP path for tikv.
router.Handle("/tables/{db}/{table}/regions", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRegions))
Expand Down
39 changes: 30 additions & 9 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,7 @@ func upgrade(s Session) {
}

if ver >= int64(SupportUpgradeStateVer) {
syncUpgradeState(s)
terror.MustNil(SyncUpgradeState(s))
}
if isNull {
upgradeToVer99Before(s)
Expand All @@ -1224,7 +1224,7 @@ func upgrade(s Session) {
upgradeToVer99After(s)
}
if ver >= int64(SupportUpgradeStateVer) {
syncNormalRunning(s)
terror.MustNil(SyncNormalRunning(s))
}

variable.DDLForce2Queue.Store(false)
Expand Down Expand Up @@ -1253,14 +1253,16 @@ func upgrade(s Session) {
}
}

func syncUpgradeState(s Session) {
// SyncUpgradeState syncs upgrade state to etcd.
func SyncUpgradeState(s Session) error {
totalInterval := time.Duration(internalSQLTimeout) * time.Second
ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval)
defer cancelFunc()
dom := domain.GetDomain(s)
err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading))
if err != nil {
logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err))
logutil.BgLogger().Error("update global state failed", zap.String("category", "upgrading"), zap.String("state", syncer.StateUpgrading), zap.Error(err))
return err
}

interval := 200 * time.Millisecond
Expand All @@ -1271,7 +1273,8 @@ func syncUpgradeState(s Session) {
break
}
if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
logutil.BgLogger().Error("get owner op failed", zap.String("category", "upgrading"), zap.Stringer("state", op), zap.Error(err))
return err
}
if i%10 == 0 {
logutil.BgLogger().Warn("get owner op failed", zap.String("category", "upgrading"), zap.Stringer("state", op), zap.Error(err))
Expand All @@ -1298,21 +1301,24 @@ func syncUpgradeState(s Session) {
}

if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
logutil.BgLogger().Error("pause all jobs failed", zap.String("category", "upgrading"), zap.Strings("errs", jobErrStrs), zap.Error(err))
return err
}
logutil.BgLogger().Warn("pause all jobs failed", zap.String("category", "upgrading"), zap.Strings("errs", jobErrStrs), zap.Error(err))
time.Sleep(interval)
}
logutil.BgLogger().Info("update global state to upgrading", zap.String("category", "upgrading"), zap.String("state", syncer.StateUpgrading))
return nil
}

func syncNormalRunning(s Session) {
// SyncNormalRunning syncs normal state to etcd.
func SyncNormalRunning(s Session) error {
failpoint.Inject("mockResumeAllJobsFailed", func(val failpoint.Value) {
if val.(bool) {
dom := domain.GetDomain(s)
//nolint: errcheck
dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), syncer.NewStateInfo(syncer.StateNormalRunning))
failpoint.Return()
failpoint.Return(nil)
}
})

Expand All @@ -1329,9 +1335,24 @@ func syncNormalRunning(s Session) {
dom := domain.GetDomain(s)
err = dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateNormalRunning))
if err != nil {
logutil.BgLogger().Fatal("[upgrading] update global state to normal failed", zap.Error(err))
logutil.BgLogger().Error("update global state to normal failed", zap.String("category", "upgrading"), zap.Error(err))
return err
}
logutil.BgLogger().Info("update global state to normal running finished", zap.String("category", "upgrading"))
return nil
}

// IsUpgradingClusterState reports whether the global state obtained from the
// DDL state syncer equals syncer.StateUpgrading.
// The lookup is bounded by a 3-second timeout; any error from it is returned
// together with false.
func IsUpgradingClusterState(s Session) (bool, error) {
	stateSyncer := domain.GetDomain(s).DDL().StateSyncer()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	info, err := stateSyncer.GetGlobalState(ctx)
	if err != nil {
		return false, err
	}
	return info.State == syncer.StateUpgrading, nil
}

// checkOwnerVersion is used to wait for the DDL owner to be elected in the cluster and check that it is the same version as this TiDB.
Expand Down

0 comments on commit e1a017c

Please sign in to comment.