Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ctl: replace gc_safepoint call with PD HTTP SDK #8504

Merged
merged 36 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
3d65bc8
gc_safepoint HTTP SDK
JackL9u Aug 7, 2024
c307b89
fix an indentation mismatch
JackL9u Aug 7, 2024
637c50c
fix some coding style issues
JackL9u Aug 8, 2024
002862c
format the code
JackL9u Aug 8, 2024
419b903
fix a var-name issue
JackL9u Aug 8, 2024
6ac2bc2
add test functions
JackL9u Aug 8, 2024
8a8a40f
add test functions for gc safepoint call
JackL9u Aug 8, 2024
cc78799
fix coding style for testing suite
JackL9u Aug 8, 2024
c88639b
fix coding style for testing suite again
JackL9u Aug 8, 2024
870f9cc
changed the test case for get SafePoint
JackL9u Aug 8, 2024
be5c4a1
edit test function
JackL9u Aug 8, 2024
d6ed9d8
edited some testcases
JackL9u Aug 8, 2024
2d278ca
put get/delete safepoint test into a single function
JackL9u Aug 8, 2024
6c7de04
removed unnecessary comments
JackL9u Aug 8, 2024
11d5353
made other modifications
JackL9u Aug 9, 2024
acec910
changed test funciton name
JackL9u Aug 9, 2024
1a1912a
do some experiments
JackL9u Aug 9, 2024
83de36f
delete my test function
JackL9u Aug 9, 2024
cd0a7d5
add an empty test funciton
JackL9u Aug 9, 2024
3858cb3
remove the test delete logic from test funciton
JackL9u Aug 9, 2024
11be503
remove the test delete logic from test funciton, and move the test lo…
JackL9u Aug 9, 2024
dc62cc2
add requirePDClient to cobra command
JackL9u Aug 9, 2024
7adce30
add the delete test back
JackL9u Aug 9, 2024
014ec40
try again
JackL9u Aug 9, 2024
4666fb5
add sorting logic to gc_safepoint test
JackL9u Aug 15, 2024
670fe6a
add a second-round get check after safepoints are deleted
JackL9u Aug 15, 2024
fb8a5b5
remove the requirePDClient in delete subcommand
JackL9u Aug 15, 2024
7ef43f2
fix a typo
JackL9u Aug 15, 2024
98b09b7
added tests
JackL9u Aug 15, 2024
785e975
add the delete gc_worker check
JackL9u Aug 15, 2024
f522d45
add a comment
JackL9u Aug 15, 2024
6f4d633
modified the delete gc_worker test
JackL9u Aug 16, 2024
e3517c8
run the tests again
JackL9u Aug 16, 2024
d25953b
add a comment
JackL9u Aug 16, 2024
dded6c8
add delete non-exist safepoints tests
JackL9u Aug 21, 2024
7965d10
Merge branch 'master' into dev1
ti-chi-bot[bot] Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
operators = "/pd/api/v1/operators"
safepoint = "/pd/api/v1/gc/safepoint"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
// Keyspace
Expand Down Expand Up @@ -215,3 +216,8 @@ func GetUpdateKeyspaceConfigURL(keyspaceName string) string {
func GetKeyspaceMetaByNameURL(keyspaceName string) string {
return fmt.Sprintf(GetKeyspaceMetaByName, keyspaceName)
}

// GetDeleteSafePointURI returns the URI for delete safepoint service
func GetDeleteSafePointURI(serviceID string) string {
return fmt.Sprintf("%s/%s", safepoint, serviceID)
}
30 changes: 30 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetPDVersion(context.Context) (string, error)
GetGCSafePoint(context.Context) (ListServiceGCSafepoint, error)
DeleteGCSafePoint(context.Context, string) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error)
GetMicroServicePrimary(context.Context, string) (string, error)
Expand Down Expand Up @@ -1024,3 +1026,31 @@
}
return &keyspaceMetaPB, nil
}

// GetGCSafePoint gets the GC safe point list.
func (c *client) GetGCSafePoint(ctx context.Context) (ListServiceGCSafepoint, error) {
var gcSafePoint ListServiceGCSafepoint
err := c.request(ctx, newRequestInfo().
WithName(GetGCSafePointName).
WithURI(safepoint).
WithMethod(http.MethodGet).
WithResp(&gcSafePoint))
if err != nil {
return gcSafePoint, err

Check warning on line 1039 in client/http/interface.go

View check run for this annotation

Codecov / codecov/patch

client/http/interface.go#L1039

Added line #L1039 was not covered by tests
}
return gcSafePoint, nil
}

// DeleteGCSafePoint deletes a GC safe point with the given service ID.
func (c *client) DeleteGCSafePoint(ctx context.Context, serviceID string) (string, error) {
var msg string
err := c.request(ctx, newRequestInfo().
WithName(DeleteGCSafePointName).
WithURI(GetDeleteSafePointURI(serviceID)).
WithMethod(http.MethodDelete).
WithResp(&msg))
if err != nil {
return msg, err
}
return msg, nil
}
2 changes: 2 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
deleteOperators = "DeleteOperators"
UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType"
GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName"
GetGCSafePointName = "GetGCSafePoint"
DeleteGCSafePointName = "DeleteGCSafePoint"
)

type requestInfo struct {
Expand Down
14 changes: 14 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ import (
pd "github.com/tikv/pd/client"
)

// NOTE: This type is in sync with pd/pkg/storage/endpoint/gc_safe_point.go
JackL9u marked this conversation as resolved.
Show resolved Hide resolved
type ServiceSafePoint struct {
ServiceID string `json:"service_id"`
ExpiredAt int64 `json:"expired_at"`
SafePoint uint64 `json:"safe_point"`
}

// NOTE: This type is in sync with pd/server/api/service_gc_safepoint.go
JackL9u marked this conversation as resolved.
Show resolved Hide resolved
type ListServiceGCSafepoint struct {
ServiceGCSafepoints []*ServiceSafePoint `json:"service_gc_safe_points"`
MinServiceGcSafepoint uint64 `json:"min_service_gc_safe_point,omitempty"`
GCSafePoint uint64 `json:"gc_safe_point"`
}

// ClusterState saves some cluster state information.
// NOTE: This type sync with https://github.com/tikv/pd/blob/5eae459c01a797cbd0c416054c6f0cad16b8740a/server/cluster/cluster.go#L173
type ClusterState struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

// ServiceSafePoint is the safepoint for a specific service
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
// This type is in sync with `client/http/types.go`.
type ServiceSafePoint struct {
ServiceID string `json:"service_id"`
ExpiredAt int64 `json:"expired_at"`
Expand Down
1 change: 1 addition & 0 deletions server/api/service_gc_safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func newServiceGCSafepointHandler(svr *server.Server, rd *render.Render) *servic

// ListServiceGCSafepoint is the response for list service GC safepoint.
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
// This type is in sync with `pd/client/http/types.go`.
JackL9u marked this conversation as resolved.
Show resolved Hide resolved
type ListServiceGCSafepoint struct {
ServiceGCSafepoints []*endpoint.ServiceSafePoint `json:"service_gc_safe_points"`
MinServiceGcSafepoint uint64 `json:"min_service_gc_safe_point,omitempty"`
Expand Down
75 changes: 75 additions & 0 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ import (
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server/api"
"github.com/tikv/pd/tests"
)

Expand Down Expand Up @@ -835,3 +837,76 @@ func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
cancel()
wg.Wait()
}

func (suite *httpClientTestSuite) TestGetGCSafePoint() {
re := suite.Require()
client := suite.client
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

// adding some safepoints to the server
list := &api.ListServiceGCSafepoint{
ServiceGCSafepoints: []*endpoint.ServiceSafePoint{
{
ServiceID: "AAA",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 1,
},
{
ServiceID: "BBB",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 2,
},
{
ServiceID: "CCC",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 3,
},
},
GCSafePoint: 1,
MinServiceGcSafepoint: 1,
}

storage := suite.cluster.GetLeaderServer().GetServer().GetStorage()
for _, ssp := range list.ServiceGCSafepoints {
err := storage.SaveServiceGCSafePoint(ssp)
re.NoError(err)
}
storage.SaveGCSafePoint(1)

// get the safepoints and start testing
l, err := client.GetGCSafePoint(ctx)
re.NoError(err)

re.Equal(uint64(1), l.GCSafePoint)
re.Equal(uint64(1), l.MinServiceGcSafepoint)
re.Len(l.ServiceGCSafepoints, 3)

// sort the gc safepoints based on order of ServiceID
sort.Slice(l.ServiceGCSafepoints, func(i, j int) bool {
return l.ServiceGCSafepoints[i].ServiceID < l.ServiceGCSafepoints[j].ServiceID
})

for i, val := range l.ServiceGCSafepoints {
re.Equal(list.ServiceGCSafepoints[i].ServiceID, val.ServiceID)
re.Equal(list.ServiceGCSafepoints[i].SafePoint, val.SafePoint)
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}

// delete the safepoints
for i := 0; i < 3; i++ {
msg, err := client.DeleteGCSafePoint(ctx, list.ServiceGCSafepoints[i].ServiceID)
re.NoError(err)
re.Equal("Delete service GC safepoint successfully.", msg)
}

_, err = client.DeleteGCSafePoint(ctx, "gc_worker")
re.Error(err)
JackL9u marked this conversation as resolved.
Show resolved Hide resolved

// check that the safepoitns are indeed deleted
l, err = client.GetGCSafePoint(ctx)
re.NoError(err)

re.Equal(uint64(1), l.GCSafePoint)
re.Equal(uint64(0), l.MinServiceGcSafepoint)
re.Empty(l.ServiceGCSafepoints)
}
34 changes: 8 additions & 26 deletions tools/pd-ctl/pdctl/command/gc_safepoint_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,18 @@
package command

import (
"encoding/json"
"net/http"
"sort"

"github.com/spf13/cobra"
"github.com/tikv/pd/server/api"
)

var (
serviceGCSafepointPrefix = "pd/api/v1/gc/safepoint"
)

// NewServiceGCSafepointCommand return a service gc safepoint subcommand of rootCmd
func NewServiceGCSafepointCommand() *cobra.Command {
l := &cobra.Command{
Use: "service-gc-safepoint",
Short: "show all service gc safepoint",
Run: showSSPs,
Use: "service-gc-safepoint",
Short: "show all service gc safepoint",
PersistentPreRunE: requirePDClient,
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
Run: showSSPs,
}
l.AddCommand(NewDeleteServiceGCSafepointCommand())
return l
Expand All @@ -50,38 +44,26 @@ func NewDeleteServiceGCSafepointCommand() *cobra.Command {
}

func showSSPs(cmd *cobra.Command, _ []string) {
r, err := doRequest(cmd, serviceGCSafepointPrefix, http.MethodGet, http.Header{})
safepoint, err := PDCli.GetGCSafePoint(cmd.Context())
if err != nil {
cmd.Printf("Failed to get service GC safepoint: %s\n", err)
return
}
var safepoint api.ListServiceGCSafepoint
if err := json.Unmarshal([]byte(r), &safepoint); err != nil {
cmd.Printf("Failed to unmarshal service GC safepoint: %s\n", err)
return
}
sort.Slice(safepoint.ServiceGCSafepoints, func(i, j int) bool {
return safepoint.ServiceGCSafepoints[i].SafePoint < safepoint.ServiceGCSafepoints[j].SafePoint
})
data, err := json.MarshalIndent(safepoint, "", " ")
if err != nil {
cmd.Printf("Failed to marshal service GC safepoint: %s\n", err)
return
}
cmd.Println(string(data))
jsonPrint(cmd, safepoint)
}

func deleteSSP(cmd *cobra.Command, args []string) {
if len(args) != 1 {
cmd.Usage()
return
}
serviceID := args[0]
deleteURL := serviceGCSafepointPrefix + "/" + serviceID
r, err := doRequest(cmd, deleteURL, http.MethodDelete, http.Header{})
r, err := PDCli.DeleteGCSafePoint(cmd.Context(), args[0])
if err != nil {
cmd.Printf("Failed to delete service GC safepoint: %s\n", err)
return
}
cmd.Println(r)
jsonPrint(cmd, r)
}
130 changes: 130 additions & 0 deletions tools/pd-ctl/tests/safepoint/safepoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2019 TiKV Project Authors.
JackL9u marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package safepoint_test

import (
"context"
"encoding/json"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server/api"
pdTests "github.com/tikv/pd/tests"
ctl "github.com/tikv/pd/tools/pd-ctl/pdctl"
"github.com/tikv/pd/tools/pd-ctl/tests"
)

func TestSafepoint(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tc, err := pdTests.NewTestCluster(ctx, 3)
re.NoError(err)
defer tc.Destroy()
err = tc.RunInitialServers()
re.NoError(err)
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
re.NoError(leaderServer.BootstrapCluster())
pdAddr := tc.GetConfig().GetClientURL()
cmd := ctl.GetRootCmd()

// add some gc_safepoint to the server
list := &api.ListServiceGCSafepoint{
ServiceGCSafepoints: []*endpoint.ServiceSafePoint{
{
ServiceID: "AAA",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 1,
},
{
ServiceID: "BBB",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 2,
},
{
ServiceID: "CCC",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 3,
},
},
GCSafePoint: 1,
MinServiceGcSafepoint: 1,
}

storage := leaderServer.GetServer().GetStorage()
for _, ssp := range list.ServiceGCSafepoints {
err := storage.SaveServiceGCSafePoint(ssp)
re.NoError(err)
}
storage.SaveGCSafePoint(1)

// get the safepoints
args := []string{"-u", pdAddr, "service-gc-safepoint"}
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)

// create an container to hold the received values
var l api.ListServiceGCSafepoint
re.NoError(json.Unmarshal(output, &l))

// test if the points are what we expected
re.Equal(uint64(1), l.GCSafePoint)
re.Equal(uint64(1), l.MinServiceGcSafepoint)
re.Len(l.ServiceGCSafepoints, 3)

// sort the gc safepoints based on order of ServiceID
sort.Slice(l.ServiceGCSafepoints, func(i, j int) bool {
return l.ServiceGCSafepoints[i].ServiceID < l.ServiceGCSafepoints[j].ServiceID
})

for i, val := range l.ServiceGCSafepoints {
re.Equal(list.ServiceGCSafepoints[i].ServiceID, val.ServiceID)
re.Equal(list.ServiceGCSafepoints[i].SafePoint, val.SafePoint)
}

// delete the safepoints
for i := 0; i < 3; i++ {
args = []string{"-u", pdAddr, "service-gc-safepoint", "delete", list.ServiceGCSafepoints[i].ServiceID}
output, err = tests.ExecuteCommand(cmd, args...)
re.NoError(err)
var msg string
re.NoError(json.Unmarshal(output, &msg))
re.Equal("Delete service GC safepoint successfully.", msg)
}

// do a second round of get safepoints to ensure that the safe points are indeed deleted
args = []string{"-u", pdAddr, "service-gc-safepoint"}
output, err = tests.ExecuteCommand(cmd, args...)
re.NoError(err)

var ll api.ListServiceGCSafepoint
re.NoError(json.Unmarshal(output, &ll))

re.Equal(uint64(1), ll.GCSafePoint)
re.Equal(uint64(0), ll.MinServiceGcSafepoint)
re.Empty(ll.ServiceGCSafepoints)

// try delete the "gc_worker", should get an error message
args = []string{"-u", pdAddr, "service-gc-safepoint", "delete", "gc_worker"}
output, err = tests.ExecuteCommand(cmd, args...)
re.NoError(err)

// output should be an error message
re.Equal("Failed to delete service GC safepoint: request pd http api failed with status: '500 Internal Server Error', body: '\"cannot remove service safe point of gc_worker\"'\n", string(output))
}