diff --git a/client/http/api.go b/client/http/api.go index 3376a48770d..d1bce99f4f9 100644 --- a/client/http/api.go +++ b/client/http/api.go @@ -79,6 +79,7 @@ const ( Status = "/pd/api/v1/status" Version = "/pd/api/v1/version" operators = "/pd/api/v1/operators" + safepoint = "/pd/api/v1/gc/safepoint" // Micro Service microServicePrefix = "/pd/api/v2/ms" // Keyspace @@ -215,3 +216,8 @@ func GetUpdateKeyspaceConfigURL(keyspaceName string) string { func GetKeyspaceMetaByNameURL(keyspaceName string) string { return fmt.Sprintf(GetKeyspaceMetaByName, keyspaceName) } + +// GetDeleteSafePointURI returns the URI for delete safepoint service +func GetDeleteSafePointURI(serviceID string) string { + return fmt.Sprintf("%s/%s", safepoint, serviceID) +} diff --git a/client/http/interface.go b/client/http/interface.go index cd9fc22702e..f5cd1a38211 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -100,6 +100,8 @@ type Client interface { /* Other interfaces */ GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) GetPDVersion(context.Context) (string, error) + GetGCSafePoint(context.Context) (ListServiceGCSafepoint, error) + DeleteGCSafePoint(context.Context, string) (string, error) /* Micro Service interfaces */ GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error) GetMicroServicePrimary(context.Context, string) (string, error) @@ -1024,3 +1026,31 @@ func (c *client) GetKeyspaceMetaByName(ctx context.Context, keyspaceName string) } return &keyspaceMetaPB, nil } + +// GetGCSafePoint gets the GC safe point list. +func (c *client) GetGCSafePoint(ctx context.Context) (ListServiceGCSafepoint, error) { + var gcSafePoint ListServiceGCSafepoint + err := c.request(ctx, newRequestInfo(). + WithName(GetGCSafePointName). + WithURI(safepoint). + WithMethod(http.MethodGet). + WithResp(&gcSafePoint)) + if err != nil { + return gcSafePoint, err + } + return gcSafePoint, nil +} + +// DeleteGCSafePoint deletes a GC safe point with the given service ID. +func (c *client) DeleteGCSafePoint(ctx context.Context, serviceID string) (string, error) { + var msg string + err := c.request(ctx, newRequestInfo(). + WithName(DeleteGCSafePointName). + WithURI(GetDeleteSafePointURI(serviceID)). + WithMethod(http.MethodDelete). + WithResp(&msg)) + if err != nil { + return msg, err + } + return msg, nil +} diff --git a/client/http/request_info.go b/client/http/request_info.go index 783220bcc60..94f71c6186e 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -85,6 +85,8 @@ const ( deleteOperators = "DeleteOperators" UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType" GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName" + GetGCSafePointName = "GetGCSafePoint" + DeleteGCSafePointName = "DeleteGCSafePoint" ) type requestInfo struct { diff --git a/client/http/types.go b/client/http/types.go index 55f9b65caad..4bc60978a0e 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -25,6 +25,22 @@ import ( pd "github.com/tikv/pd/client" ) +// ServiceSafePoint is the safepoint for a specific service +// NOTE: This type is in sync with pd/pkg/storage/endpoint/gc_safe_point.go +type ServiceSafePoint struct { + ServiceID string `json:"service_id"` + ExpiredAt int64 `json:"expired_at"` + SafePoint uint64 `json:"safe_point"` +} + +// ListServiceGCSafepoint is the response for list service GC safepoint. +// NOTE: This type is in sync with pd/server/api/service_gc_safepoint.go +type ListServiceGCSafepoint struct { + ServiceGCSafepoints []*ServiceSafePoint `json:"service_gc_safe_points"` + MinServiceGcSafepoint uint64 `json:"min_service_gc_safe_point,omitempty"` + GCSafePoint uint64 `json:"gc_safe_point"` +} + // ClusterState saves some cluster state information. // NOTE: This type sync with https://github.com/tikv/pd/blob/5eae459c01a797cbd0c416054c6f0cad16b8740a/server/cluster/cluster.go#L173 type ClusterState struct { diff --git a/pkg/storage/endpoint/gc_safe_point.go b/pkg/storage/endpoint/gc_safe_point.go index 8d59d827fa4..7b0b0bf86a7 100644 --- a/pkg/storage/endpoint/gc_safe_point.go +++ b/pkg/storage/endpoint/gc_safe_point.go @@ -28,6 +28,7 @@ import ( // ServiceSafePoint is the safepoint for a specific service // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +// This type is in sync with `client/http/types.go`. type ServiceSafePoint struct { ServiceID string `json:"service_id"` ExpiredAt int64 `json:"expired_at"` diff --git a/server/api/service_gc_safepoint.go b/server/api/service_gc_safepoint.go index d6bb153eb6f..ca29f9c352f 100644 --- a/server/api/service_gc_safepoint.go +++ b/server/api/service_gc_safepoint.go @@ -38,6 +38,7 @@ func newServiceGCSafepointHandler(svr *server.Server, rd *render.Render) *servic // ListServiceGCSafepoint is the response for list service GC safepoint. // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +// This type is in sync with `pd/client/http/types.go`. type ListServiceGCSafepoint struct { ServiceGCSafepoints []*endpoint.ServiceSafePoint `json:"service_gc_safe_points"` MinServiceGcSafepoint uint64 `json:"min_service_gc_safe_point,omitempty"` diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index fd8b65f01ba..fe0962012e6 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -37,9 +37,11 @@ import ( sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/versioninfo" + "github.com/tikv/pd/server/api" "github.com/tikv/pd/tests" ) @@ -835,3 +837,83 @@ func (suite *httpClientTestSuite) TestRetryOnLeaderChange() { cancel() wg.Wait() } + +func (suite *httpClientTestSuite) TestGetGCSafePoint() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + + // adding some safepoints to the server + list := &api.ListServiceGCSafepoint{ + ServiceGCSafepoints: []*endpoint.ServiceSafePoint{ + { + ServiceID: "AAA", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 1, + }, + { + ServiceID: "BBB", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 2, + }, + { + ServiceID: "CCC", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 3, + }, + }, + GCSafePoint: 1, + MinServiceGcSafepoint: 1, + } + + storage := suite.cluster.GetLeaderServer().GetServer().GetStorage() + for _, ssp := range list.ServiceGCSafepoints { + err := storage.SaveServiceGCSafePoint(ssp) + re.NoError(err) + } + storage.SaveGCSafePoint(1) + + // get the safepoints and start testing + l, err := client.GetGCSafePoint(ctx) + re.NoError(err) + + re.Equal(uint64(1), l.GCSafePoint) + re.Equal(uint64(1), l.MinServiceGcSafepoint) + re.Len(l.ServiceGCSafepoints, 3) + + // sort the gc safepoints based on order of ServiceID + sort.Slice(l.ServiceGCSafepoints, func(i, j int) bool { + return l.ServiceGCSafepoints[i].ServiceID < l.ServiceGCSafepoints[j].ServiceID + }) + + for i, val := range l.ServiceGCSafepoints { + re.Equal(list.ServiceGCSafepoints[i].ServiceID, val.ServiceID) + re.Equal(list.ServiceGCSafepoints[i].SafePoint, val.SafePoint) + } + + // delete the safepoints + for i := 0; i < 3; i++ { + msg, err := client.DeleteGCSafePoint(ctx, list.ServiceGCSafepoints[i].ServiceID) + re.NoError(err) + re.Equal("Delete service GC safepoint successfully.", msg) + } + + // check that the safepoitns are indeed deleted + l, err = client.GetGCSafePoint(ctx) + re.NoError(err) + + re.Equal(uint64(1), l.GCSafePoint) + re.Equal(uint64(0), l.MinServiceGcSafepoint) + re.Empty(l.ServiceGCSafepoints) + + // try delete gc_worker, should get an error + _, err = client.DeleteGCSafePoint(ctx, "gc_worker") + re.Error(err) + + // try delete some non-exist safepoints, should return normally + var msg string + msg, err = client.DeleteGCSafePoint(ctx, "non_exist") + re.NoError(err) + re.Equal("Delete service GC safepoint successfully.", msg) +} diff --git a/tools/pd-ctl/pdctl/command/gc_safepoint_command.go b/tools/pd-ctl/pdctl/command/gc_safepoint_command.go index f4a6b6fcfd0..9a07d92937f 100644 --- a/tools/pd-ctl/pdctl/command/gc_safepoint_command.go +++ b/tools/pd-ctl/pdctl/command/gc_safepoint_command.go @@ -15,24 +15,18 @@ package command import ( - "encoding/json" - "net/http" "sort" "github.com/spf13/cobra" - "github.com/tikv/pd/server/api" -) - -var ( - serviceGCSafepointPrefix = "pd/api/v1/gc/safepoint" ) // NewServiceGCSafepointCommand return a service gc safepoint subcommand of rootCmd func NewServiceGCSafepointCommand() *cobra.Command { l := &cobra.Command{ - Use: "service-gc-safepoint", - Short: "show all service gc safepoint", - Run: showSSPs, + Use: "service-gc-safepoint", + Short: "show all service gc safepoint", + PersistentPreRunE: requirePDClient, + Run: showSSPs, } l.AddCommand(NewDeleteServiceGCSafepointCommand()) return l @@ -50,25 +44,15 @@ func NewDeleteServiceGCSafepointCommand() *cobra.Command { } func showSSPs(cmd *cobra.Command, _ []string) { - r, err := doRequest(cmd, serviceGCSafepointPrefix, http.MethodGet, http.Header{}) + safepoint, err := PDCli.GetGCSafePoint(cmd.Context()) if err != nil { cmd.Printf("Failed to get service GC safepoint: %s\n", err) return } - var safepoint api.ListServiceGCSafepoint - if err := json.Unmarshal([]byte(r), &safepoint); err != nil { - cmd.Printf("Failed to unmarshal service GC safepoint: %s\n", err) - return - } sort.Slice(safepoint.ServiceGCSafepoints, func(i, j int) bool { return safepoint.ServiceGCSafepoints[i].SafePoint < safepoint.ServiceGCSafepoints[j].SafePoint }) - data, err := json.MarshalIndent(safepoint, "", " ") - if err != nil { - cmd.Printf("Failed to marshal service GC safepoint: %s\n", err) - return - } - cmd.Println(string(data)) + jsonPrint(cmd, safepoint) } func deleteSSP(cmd *cobra.Command, args []string) { @@ -76,12 +60,10 @@ func deleteSSP(cmd *cobra.Command, args []string) { cmd.Usage() return } - serviceID := args[0] - deleteURL := serviceGCSafepointPrefix + "/" + serviceID - r, err := doRequest(cmd, deleteURL, http.MethodDelete, http.Header{}) + r, err := PDCli.DeleteGCSafePoint(cmd.Context(), args[0]) if err != nil { cmd.Printf("Failed to delete service GC safepoint: %s\n", err) return } - cmd.Println(r) + jsonPrint(cmd, r) } diff --git a/tools/pd-ctl/tests/safepoint/safepoint_test.go b/tools/pd-ctl/tests/safepoint/safepoint_test.go new file mode 100644 index 00000000000..5551cce1fff --- /dev/null +++ b/tools/pd-ctl/tests/safepoint/safepoint_test.go @@ -0,0 +1,138 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package safepoint_test + +import ( + "context" + "encoding/json" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/server/api" + pdTests "github.com/tikv/pd/tests" + ctl "github.com/tikv/pd/tools/pd-ctl/pdctl" + "github.com/tikv/pd/tools/pd-ctl/tests" +) + +func TestSafepoint(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tc, err := pdTests.NewTestCluster(ctx, 3) + re.NoError(err) + defer tc.Destroy() + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + leaderServer := tc.GetLeaderServer() + re.NoError(leaderServer.BootstrapCluster()) + pdAddr := tc.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + // add some gc_safepoint to the server + list := &api.ListServiceGCSafepoint{ + ServiceGCSafepoints: []*endpoint.ServiceSafePoint{ + { + ServiceID: "AAA", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 1, + }, + { + ServiceID: "BBB", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 2, + }, + { + ServiceID: "CCC", + ExpiredAt: time.Now().Unix() + 10, + SafePoint: 3, + }, + }, + GCSafePoint: 1, + MinServiceGcSafepoint: 1, + } + + storage := leaderServer.GetServer().GetStorage() + for _, ssp := range list.ServiceGCSafepoints { + err := storage.SaveServiceGCSafePoint(ssp) + re.NoError(err) + } + storage.SaveGCSafePoint(1) + + // get the safepoints + args := []string{"-u", pdAddr, "service-gc-safepoint"} + output, err := tests.ExecuteCommand(cmd, args...) + re.NoError(err) + + // create an container to hold the received values + var l api.ListServiceGCSafepoint + re.NoError(json.Unmarshal(output, &l)) + + // test if the points are what we expected + re.Equal(uint64(1), l.GCSafePoint) + re.Equal(uint64(1), l.MinServiceGcSafepoint) + re.Len(l.ServiceGCSafepoints, 3) + + // sort the gc safepoints based on order of ServiceID + sort.Slice(l.ServiceGCSafepoints, func(i, j int) bool { + return l.ServiceGCSafepoints[i].ServiceID < l.ServiceGCSafepoints[j].ServiceID + }) + + for i, val := range l.ServiceGCSafepoints { + re.Equal(list.ServiceGCSafepoints[i].ServiceID, val.ServiceID) + re.Equal(list.ServiceGCSafepoints[i].SafePoint, val.SafePoint) + } + + // delete the safepoints + for i := 0; i < 3; i++ { + args = []string{"-u", pdAddr, "service-gc-safepoint", "delete", list.ServiceGCSafepoints[i].ServiceID} + output, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + var msg string + re.NoError(json.Unmarshal(output, &msg)) + re.Equal("Delete service GC safepoint successfully.", msg) + } + + // do a second round of get safepoints to ensure that the safe points are indeed deleted + args = []string{"-u", pdAddr, "service-gc-safepoint"} + output, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + + var ll api.ListServiceGCSafepoint + re.NoError(json.Unmarshal(output, &ll)) + + re.Equal(uint64(1), ll.GCSafePoint) + re.Equal(uint64(0), ll.MinServiceGcSafepoint) + re.Empty(ll.ServiceGCSafepoints) + + // try delete the "gc_worker", should get an error message + args = []string{"-u", pdAddr, "service-gc-safepoint", "delete", "gc_worker"} + output, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + + // output should be an error message + re.Equal("Failed to delete service GC safepoint: request pd http api failed with status: '500 Internal Server Error', body: '\"cannot remove service safe point of gc_worker\"'\n", string(output)) + + // try delete a non-exist safepoint, should return normally + args = []string{"-u", pdAddr, "service-gc-safepoint", "delete", "non_exist"} + output, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + var msg string + re.NoError(json.Unmarshal(output, &msg)) + re.Equal("Delete service GC safepoint successfully.", msg) +}