Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Implement /upgrade/start and upgrade/finish APIs #47000

Merged
merged 6 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,43 +143,48 @@ func NewMockStateSyncer() syncer.StateSyncer {
return &MockStateSyncer{}
}

// clusterState mocks cluster state.
// We move it from MockStateSyncer to here. Because we want to make it unaffected by ddl close.
var clusterState *atomicutil.Pointer[syncer.StateInfo]

// MockStateSyncer is a mock state syncer, it is exported for testing.
type MockStateSyncer struct {
clusterState *atomicutil.Pointer[syncer.StateInfo]
globalVerCh chan clientv3.WatchResponse
mockSession chan struct{}
globalVerCh chan clientv3.WatchResponse
mockSession chan struct{}
}

// Init implements StateSyncer.Init interface.
func (s *MockStateSyncer) Init(context.Context) error {
s.globalVerCh = make(chan clientv3.WatchResponse, 1)
s.mockSession = make(chan struct{}, 1)
state := syncer.NewStateInfo(syncer.StateNormalRunning)
s.clusterState = atomicutil.NewPointer(state)
if clusterState == nil {
clusterState = atomicutil.NewPointer(state)
}
return nil
}

// UpdateGlobalState implements StateSyncer.UpdateGlobalState interface.
func (s *MockStateSyncer) UpdateGlobalState(_ context.Context, stateInfo *syncer.StateInfo) error {
failpoint.Inject("mockUpgradingState", func(val failpoint.Value) {
if val.(bool) {
s.clusterState.Store(stateInfo)
clusterState.Store(stateInfo)
failpoint.Return(nil)
}
})
s.globalVerCh <- clientv3.WatchResponse{}
s.clusterState.Store(stateInfo)
clusterState.Store(stateInfo)
return nil
}

// GetGlobalState implements StateSyncer.GetGlobalState interface.
func (s *MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
return s.clusterState.Load(), nil
func (*MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
return clusterState.Load(), nil
}

// IsUpgradingState implements StateSyncer.IsUpgradingState interface.
func (s *MockStateSyncer) IsUpgradingState() bool {
return s.clusterState.Load().State == syncer.StateUpgrading
func (*MockStateSyncer) IsUpgradingState() bool {
return clusterState.Load().State == syncer.StateUpgrading
}

// WatchChan implements StateSyncer.WatchChan interface.
Expand Down
12 changes: 11 additions & 1 deletion docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,6 @@ timezone.*
# reset the size of the ballast object (2GB in this example)
curl -v -X POST -d "2147483648" http://{TiDBIP}:10080/debug/ballast-object-sz
```


1. Set deadlock history table capacity

Expand All @@ -591,3 +590,14 @@ timezone.*
```shell
curl -X POST -d "transaction_summary_capacity={number}" http://{TiDBIP}:10080/settings
```

1. Send upgrade operations to the cluster. The operations here include `start` and `finish`.

```shell
curl -X POST http://{TiDBIP}:10080/upgrade/{op}
```

```shell
$curl -X POST http://127.0.0.1:10080/upgrade/start
"success!"
```
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"stat.go",
"statistics_handler.go",
"tokenlimiter.go",
"upgrade_handler.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/server",
Expand Down
Empty file added server/handler/BUILD.bazel
Empty file.
Empty file.
85 changes: 85 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,3 +1243,88 @@ func TestSetLabelsConcurrentWithGetLabel(t *testing.T) {
conf.Labels = map[string]string{}
})
}

func TestUpgrade(t *testing.T) {
ts := createBasicHTTPHandlerTestSuite()
ts.startServer(t)
defer ts.stopServer(t)

resp, err := ts.fetchStatus("/upgrade/start")
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
require.NoError(t, resp.Body.Close())

require.NoError(t, err)
require.NotNil(t, resp)
// test upgrade start
resp, err = ts.postStatus("/upgrade/start", "application/x-www-form-urlencoded", nil)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
b, err := httputil.DumpResponse(resp, true)
require.NoError(t, err)
require.Greater(t, len(b), 0)
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, "\"success!\"", string(body))
// check the result
se, err := session.CreateSession(ts.store)
require.NoError(t, err)
isUpgrading, err := session.IsUpgradingClusterState(se)
require.NoError(t, err)
require.True(t, isUpgrading)

// Do start upgrade again.
resp, err = ts.postStatus("/upgrade/start", "application/x-www-form-urlencoded", nil)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
b, err = httputil.DumpResponse(resp, true)
require.NoError(t, err)
require.Greater(t, len(b), 0)
body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, "\"It's a duplicated operation and the cluster is already in upgrading state.\"", string(body))
// check the result
se, err = session.CreateSession(ts.store)
require.NoError(t, err)
isUpgrading, err = session.IsUpgradingClusterState(se)
require.NoError(t, err)
require.True(t, isUpgrading)

// test upgrade finish
resp, err = ts.postStatus("/upgrade/finish", "application/x-www-form-urlencoded", nil)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
b, err = httputil.DumpResponse(resp, true)
require.NoError(t, err)
require.Greater(t, len(b), 0)
body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, "\"success!\"", string(body))
// check the result
se, err = session.CreateSession(ts.store)
require.NoError(t, err)
isUpgrading, err = session.IsUpgradingClusterState(se)
require.NoError(t, err)
require.False(t, isUpgrading)

// Do finish upgrade again.
resp, err = ts.postStatus("/upgrade/finish", "application/x-www-form-urlencoded", nil)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
b, err = httputil.DumpResponse(resp, true)
require.NoError(t, err)
require.Greater(t, len(b), 0)
body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, "\"It's a duplicated operation and the cluster is already in normal state.\"", string(body))
// check the result
se, err = session.CreateSession(ts.store)
require.NoError(t, err)
isUpgrading, err = session.IsUpgradingClusterState(se)
require.NoError(t, err)
require.False(t, isUpgrading)
}
3 changes: 3 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ func (s *Server) startHTTPServer() {
// HTTP path for get table tiflash replica info.
router.Handle("/tiflash/replica-deprecated", flashReplicaHandler{tikvHandlerTool})

// HTTP path for upgrade operations.
router.Handle("/upgrade/{op}", NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations")

if s.cfg.Store == "tikv" {
// HTTP path for tikv.
router.Handle("/tables/{db}/{table}/regions", tableHandler{tikvHandlerTool, opTableRegions})
Expand Down
Loading