diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 7a7cda43361..a852b62331b 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -497,8 +497,8 @@ func (c *tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoCo type tsoConnectionContext struct { streamAddr string - // Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluser, - // or tsopb.TSO_TsoClient for a primary/secondary in the TSO clusrer + // Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster, + // or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster stream tsoStream ctx context.Context cancel context.CancelFunc diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 759e5306906..05d7acc3623 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -49,7 +49,7 @@ type tsoServiceDiscovery struct { clusterID uint64 keyspaceID uint32 urls atomic.Value // Store as []string - // primary key is the etcd path used for discoverying the serving endpoint of this keyspace + // primary key is the etcd path used for discovering the serving endpoint of this keyspace primaryKey string // TSO Primary URL primary atomic.Value // Store as string @@ -61,7 +61,7 @@ type tsoServiceDiscovery struct { clientConns sync.Map // Store as map[string]*grpc.ClientConn // localAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated. - // The input is a map {DC Localtion -> Leader Addr} + // The input is a map {DC Location -> Leader Addr} localAllocPrimariesUpdatedCb tsoLocalServAddrsUpdatedFunc // globalAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated. globalAllocPrimariesUpdatedCb tsoGlobalServAddrUpdatedFunc @@ -186,7 +186,7 @@ func (c *tsoServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil } -// GetClientConns returns the mapping {addr -> a gRPC connectio} +// GetClientConns returns the mapping {addr -> a gRPC connection} func (c *tsoServiceDiscovery) GetClientConns() *sync.Map { return &c.clientConns } @@ -209,7 +209,7 @@ func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...) } -// ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in ervice endpoints. +// ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in service endpoints. func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() { select { case c.checkMembershipCh <- struct{}{}: diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index a6b0b6aca7b..64e04d2a111 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -50,4 +50,16 @@ const ( TSOServiceName = "tso" // ResourceManagerServiceName is the name of resource manager server. ResourceManagerServiceName = "resource_manager" + + // MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso + // is the sharding unit, i.e., by the definition here, the max count of the shards + // that we support is MaxKeyspaceGroupCount. The keyspace group id is in the range + // [0, 99999], which explains we use five-digits number (%05d) to render the keyspace + // group id in the storage endpoint path. + MaxKeyspaceGroupCount = uint32(100000) + // MaxKeyspaceGroupCountInUse is the max count of keyspace groups in use, which should + // never exceed MaxKeyspaceGroupCount defined above. Compared to MaxKeyspaceGroupCount, + // MaxKeyspaceGroupCountInUse is a much more reasonable value of the max count in the + // foreseen future, and the former is just for extensibility in theory. + MaxKeyspaceGroupCountInUse = uint32(4096) ) diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index bdd53925a15..5eda3cb8fc7 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -53,6 +53,9 @@ const ( tsoServiceKey = utils.TSOServiceName timestampKey = "timestamp" + tsoKeyspaceGroupPrefix = "tso/keyspace_groups" + keyspaceGroupMembershipKey = "membership" + // we use uint64 to represent ID, the max length of uint64 is 20. keyLen = 20 ) @@ -223,3 +226,20 @@ func KeyspaceIDAlloc() string { func encodeKeyspaceID(spaceID uint32) string { return fmt.Sprintf("%08d", spaceID) } + +// KeyspaceGroupIDPrefix returns the prefix of keyspace group id. +// Path: tso/keyspace_groups/membership +func KeyspaceGroupIDPrefix() string { + return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey) +} + +// KeyspaceGroupIDPath returns the path to keyspace id from the given name. +// Path: tso/keyspace_groups/membership/{id} +func KeyspaceGroupIDPath(id uint32) string { + return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id)) +} + +// encodeKeyspaceGroupID from uint32 to string. +func encodeKeyspaceGroupID(groupID uint32) string { + return fmt.Sprintf("%05d", groupID) +} diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go new file mode 100644 index 00000000000..2d65dfee28a --- /dev/null +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -0,0 +1,93 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "context" + "encoding/json" + + "github.com/tikv/pd/pkg/storage/kv" + "go.etcd.io/etcd/clientv3" +) + +// KeyspaceGroup is the keyspace group. +type KeyspaceGroup struct { + ID uint32 `json:"id"` + UserKind string `json:"user-kind"` + // TODO: add `Members` field +} + +// KeyspaceGroupStorage is the interface for keyspace group storage. +type KeyspaceGroupStorage interface { + LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error) + LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGroup, error) + SaveKeyspaceGroup(txn kv.Txn, kg *KeyspaceGroup) error + DeleteKeyspaceGroup(txn kv.Txn, id uint32) error + // TODO: add more interfaces. + RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error +} + +var _ KeyspaceGroupStorage = (*StorageEndpoint)(nil) + +// LoadKeyspaceGroup loads the keyspace group by id. +func (se *StorageEndpoint) LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGroup, error) { + value, err := txn.Load(KeyspaceGroupIDPath(id)) + if err != nil || value == "" { + return nil, err + } + kg := &KeyspaceGroup{} + if err := json.Unmarshal([]byte(value), kg); err != nil { + return nil, err + } + return kg, nil +} + +// SaveKeyspaceGroup saves the keyspace group. +func (se *StorageEndpoint) SaveKeyspaceGroup(txn kv.Txn, kg *KeyspaceGroup) error { + key := KeyspaceGroupIDPath(kg.ID) + value, err := json.Marshal(kg) + if err != nil { + return err + } + return txn.Save(key, string(value)) +} + +// DeleteKeyspaceGroup deletes the keyspace group. +func (se *StorageEndpoint) DeleteKeyspaceGroup(txn kv.Txn, id uint32) error { + return txn.Remove(KeyspaceGroupIDPath(id)) +} + +// LoadKeyspaceGroups loads keyspace groups from the start ID with limit. +// If limit is 0, it will load all keyspace groups from the start ID. +func (se *StorageEndpoint) LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error) { + prefix := KeyspaceGroupIDPath(startID) + prefixEnd := clientv3.GetPrefixRangeEnd(KeyspaceGroupIDPrefix()) + keys, values, err := se.LoadRange(prefix, prefixEnd, limit) + if err != nil { + return nil, err + } + if len(keys) == 0 { + return []*KeyspaceGroup{}, nil + } + kgs := make([]*KeyspaceGroup, 0, len(keys)) + for _, value := range values { + kg := &KeyspaceGroup{} + if err = json.Unmarshal([]byte(value), kg); err != nil { + return nil, err + } + kgs = append(kgs, kg) + } + return kgs, nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index f2bfe1f74fb..da6a06c588a 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -44,6 +44,7 @@ type Storage interface { endpoint.KeyspaceStorage endpoint.ResourceGroupStorage endpoint.TSOStorage + endpoint.KeyspaceGroupStorage } // NewStorageWithMemoryBackend creates a new storage with memory backend. diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 43d178b972c..ccd5ddce169 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -31,21 +31,8 @@ import ( "go.uber.org/zap" ) -const ( - // maxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso - // is the sharding unit, i.e., by the definition here, the max count of the shards - // that we support is maxKeyspaceGroupCount. The keyspace group id is in the range - // [0, 99999], which explains we use five-digits number (%05d) to render the keyspace - // group id in the storage endpoint path. - maxKeyspaceGroupCount = uint32(100000) - // maxKeyspaceGroupCountInUse is the max count of keyspace groups in use, which should - // never exceed maxKeyspaceGroupCount defined above. Compared to maxKeyspaceGroupCount, - // maxKeyspaceGroupCountInUse is a much more reasonable value of the max count in the - // foreseen future, and the former is just for extensibility in theory. - maxKeyspaceGroupCountInUse = uint32(4096) - // primaryElectionSuffix is the suffix of the key for keyspace group primary election - primaryElectionSuffix = "primary" -) +// primaryElectionSuffix is the suffix of the key for keyspace group primary election +const primaryElectionSuffix = "primary" // KeyspaceGroupManager manages the primary/secondaries of the keyspace groups // assigned to this host. The primaries provide the tso service for the corresponding @@ -56,7 +43,7 @@ type KeyspaceGroupManager struct { // different keyspace groups for tso service. // TODO: change item type to atomic.Value stored as *AllocatorManager after we // support online keyspace group assignment. - ksgAllocatorManagers [maxKeyspaceGroupCountInUse]*AllocatorManager + ksgAllocatorManagers [mcsutils.MaxKeyspaceGroupCountInUse]*AllocatorManager ctx context.Context cancel context.CancelFunc @@ -100,10 +87,10 @@ func NewKeyspaceGroupManager( tsoSvcRootPath string, cfg ServiceConfig, ) *KeyspaceGroupManager { - if maxKeyspaceGroupCountInUse > maxKeyspaceGroupCount { - log.Fatal("maxKeyspaceGroupCountInUse is larger than maxKeyspaceGroupCount", - zap.Uint32("maxKeyspaceGroupCountInUse", maxKeyspaceGroupCountInUse), - zap.Uint32("maxKeyspaceGroupCount", maxKeyspaceGroupCount)) + if mcsutils.MaxKeyspaceGroupCountInUse > mcsutils.MaxKeyspaceGroupCount { + log.Fatal("MaxKeyspaceGroupCountInUse is larger than MaxKeyspaceGroupCount", + zap.Uint32("max-keyspace-group-count-in-use", mcsutils.MaxKeyspaceGroupCountInUse), + zap.Uint32("max-keyspace-group-count", mcsutils.MaxKeyspaceGroupCount)) } ctx, cancel := context.WithCancel(ctx) diff --git a/server/apiv2/handlers/keyspace.go b/server/apiv2/handlers/keyspace.go index a5f930a5375..1d607ab6017 100644 --- a/server/apiv2/handlers/keyspace.go +++ b/server/apiv2/handlers/keyspace.go @@ -60,7 +60,7 @@ type CreateKeyspaceParams struct { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /keyspaces [post] func CreateKeyspace(c *gin.Context) { - svr := c.MustGet("server").(*server.Server) + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceManager() createParams := &CreateKeyspaceParams{} err := c.BindJSON(createParams) @@ -91,7 +91,7 @@ func CreateKeyspace(c *gin.Context) { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /keyspaces/{name} [get] func LoadKeyspace(c *gin.Context) { - svr := c.MustGet("server").(*server.Server) + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceManager() name := c.Param("name") meta, err := manager.LoadKeyspace(name) @@ -117,7 +117,7 @@ func LoadKeyspaceByID(c *gin.Context) { c.AbortWithStatusJSON(http.StatusInternalServerError, "invalid keyspace id") return } - svr := c.MustGet("server").(*server.Server) + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceManager() meta, err := manager.LoadKeyspaceByID(uint32(id)) if err != nil { @@ -127,18 +127,18 @@ func LoadKeyspaceByID(c *gin.Context) { c.IndentedJSON(http.StatusOK, &KeyspaceMeta{meta}) } -// parseLoadAllQuery parses LoadAllKeyspaces' query parameters. +// parseLoadAllQuery parses LoadAllKeyspaces'/GetKeyspaceGroups' query parameters. // page_token: -// The keyspace id of the scan start. If not set, scan from keyspace with id 1. -// It's string of spaceID of the previous scan result's last element (next_page_token). +// The keyspace/keyspace group id of the scan start. If not set, scan from keyspace/keyspace group with id 1. +// It's string of ID of the previous scan result's last element (next_page_token). // limit: -// The maximum number of keyspace metas to return. If not set, no limit is posed. -// Every scan scans limit + 1 keyspaces (if limit != 0), the extra scanned keyspace +// The maximum number of keyspace metas/keyspace groups to return. If not set, no limit is posed. +// Every scan scans limit + 1 keyspaces/keyspace groups (if limit != 0), the extra scanned keyspace/keyspace group // is to check if there's more, and used to set next_page_token in response. func parseLoadAllQuery(c *gin.Context) (scanStart uint32, scanLimit int, err error) { pageToken, set := c.GetQuery("page_token") if !set || pageToken == "" { - // If pageToken is empty or unset, then scan from spaceID of 1. + // If pageToken is empty or unset, then scan from ID of 1. scanStart = 0 } else { scanStart64, err := strconv.ParseUint(pageToken, 10, 32) @@ -185,7 +185,7 @@ type LoadAllKeyspacesResponse struct { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /keyspaces [get] func LoadAllKeyspaces(c *gin.Context) { - svr := c.MustGet("server").(*server.Server) + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceManager() scanStart, scanLimit, err := parseLoadAllQuery(c) if err != nil { @@ -246,7 +246,7 @@ type UpdateConfigParams struct { // // Router /keyspaces/{name}/config [patch] func UpdateKeyspaceConfig(c *gin.Context) { - svr := c.MustGet("server").(*server.Server) + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceManager() name := c.Param("name") configParams := &UpdateConfigParams{} @@ -303,7 +303,7 @@ type UpdateStateParam struct { // // Router /keyspaces/{name}/state [put] func UpdateKeyspaceState(c *gin.Context) { - svr := c.MustGet("server").(*server.Server) + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceManager() name := c.Param("name") param := &UpdateStateParam{} diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go new file mode 100644 index 00000000000..a41d1f4c1b2 --- /dev/null +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -0,0 +1,138 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "net/http" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/pkg/errors" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/apiv2/middlewares" +) + +// RegisterTSOKeyspaceGroup registers keyspace group handlers to the server. +func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) { + router := r.Group("tso/keyspace-groups") + router.Use(middlewares.BootstrapChecker()) + router.POST("", CreateKeyspaceGroups) + router.GET("", GetKeyspaceGroups) + router.GET("/:id", GetKeyspaceGroupByID) + router.DELETE("/:id", DeleteKeyspaceGroupByID) +} + +// CreateKeyspaceGroupParams defines the params for creating keyspace groups. +type CreateKeyspaceGroupParams struct { + KeyspaceGroups []*endpoint.KeyspaceGroup `json:"keyspace-groups"` +} + +// CreateKeyspaceGroups creates keyspace groups. +func CreateKeyspaceGroups(c *gin.Context) { + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + createParams := &CreateKeyspaceGroupParams{} + err := c.BindJSON(createParams) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) + return + } + + for _, keyspaceGroup := range createParams.KeyspaceGroups { + if !isValid(keyspaceGroup.ID) { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + } + + err = manager.CreateKeyspaceGroups(createParams.KeyspaceGroups) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nil) +} + +// GetKeyspaceGroups gets keyspace groups from the start ID with limit. +// If limit is 0, it will load all keyspace groups from the start ID. +func GetKeyspaceGroups(c *gin.Context) { + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + scanStart, scanLimit, err := parseLoadAllQuery(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, err.Error()) + return + } + keyspaceGroups, err := manager.GetKeyspaceGroups(scanStart, scanLimit) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + + c.IndentedJSON(http.StatusOK, keyspaceGroups) +} + +// GetKeyspaceGroupByID gets keyspace group by id. +func GetKeyspaceGroupByID(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + kg, err := manager.GetKeyspaceGroupByID(id) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + + c.IndentedJSON(http.StatusOK, kg) +} + +// DeleteKeyspaceGroupByID deletes keyspace group by id. +func DeleteKeyspaceGroupByID(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + err = manager.DeleteKeyspaceGroupByID(id) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nil) +} + +func validateKeyspaceGroupID(c *gin.Context) (uint32, error) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil { + return 0, err + } + if !isValid(uint32(id)) { + return 0, errors.Errorf("invalid keyspace group id: %d", id) + } + return uint32(id), nil +} + +func isValid(id uint32) bool { + return id >= utils.DefaultKeySpaceGroupID && id <= utils.MaxKeyspaceGroupCountInUse +} diff --git a/server/apiv2/middlewares/bootstrap_checker.go b/server/apiv2/middlewares/bootstrap_checker.go index 384847be931..794316d3d0f 100644 --- a/server/apiv2/middlewares/bootstrap_checker.go +++ b/server/apiv2/middlewares/bootstrap_checker.go @@ -22,10 +22,13 @@ import ( "github.com/tikv/pd/server" ) +// ServerContextKey is the key to get server from gin.Context. +const ServerContextKey = "server" + // BootstrapChecker is a middleware to check if raft cluster is started. func BootstrapChecker() gin.HandlerFunc { return func(c *gin.Context) { - svr := c.MustGet("server").(*server.Server) + svr := c.MustGet(ServerContextKey).(*server.Server) rc := svr.GetRaftCluster() if rc == nil { c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error()) diff --git a/server/apiv2/middlewares/redirector.go b/server/apiv2/middlewares/redirector.go index 6f92c15754e..5539dd089dc 100644 --- a/server/apiv2/middlewares/redirector.go +++ b/server/apiv2/middlewares/redirector.go @@ -30,7 +30,7 @@ import ( // Redirector is a middleware to redirect the request to the right place. func Redirector() gin.HandlerFunc { return func(c *gin.Context) { - svr := c.MustGet("server").(*server.Server) + svr := c.MustGet(ServerContextKey).(*server.Server) allowFollowerHandle := len(c.Request.Header.Get(serverapi.PDAllowFollowerHandle)) > 0 isLeader := svr.GetMember().IsLeader() if !svr.IsClosed() && (allowFollowerHandle || isLeader) { diff --git a/server/apiv2/router.go b/server/apiv2/router.go index 5307b2b5c0f..383d336caae 100644 --- a/server/apiv2/router.go +++ b/server/apiv2/router.go @@ -57,11 +57,12 @@ func NewV2Handler(_ context.Context, svr *server.Server) (http.Handler, apiutil. }) router := gin.New() router.Use(func(c *gin.Context) { - c.Set("server", svr) + c.Set(middlewares.ServerContextKey, svr) c.Next() }) router.Use(middlewares.Redirector()) root := router.Group(apiV2Prefix) handlers.RegisterKeyspace(root) + handlers.RegisterTSOKeyspaceGroup(root) return router, group, nil } diff --git a/server/keyspace/tso_keyspace_group.go b/server/keyspace/tso_keyspace_group.go new file mode 100644 index 00000000000..a2165666aad --- /dev/null +++ b/server/keyspace/tso_keyspace_group.go @@ -0,0 +1,113 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keyspace + +import ( + "context" + + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" +) + +// GroupManager is the manager of keyspace group related data. +type GroupManager struct { + ctx context.Context + // store is the storage for keyspace group related information. + store endpoint.KeyspaceGroupStorage +} + +// NewKeyspaceGroupManager creates a Manager of keyspace group related data. +func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupStorage) *GroupManager { + return &GroupManager{ + ctx: ctx, + store: store, + } +} + +// Bootstrap saves default keyspace group info. +func (m *GroupManager) Bootstrap() error { + defaultKeyspaceGroup := &endpoint.KeyspaceGroup{ + ID: utils.DefaultKeySpaceGroupID, + // TODO: define a user kind type + UserKind: "default", + } + err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}) + // It's possible that default keyspace group already exists in the storage (e.g. PD restart/recover), + // so we ignore the ErrKeyspaceGroupExists. + if err != nil && err != ErrKeyspaceGroupExists { + return err + } + + return nil +} + +// CreateKeyspaceGroups creates keyspace groups. +func (m *GroupManager) CreateKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup) error { + return m.saveKeyspaceGroups(keyspaceGroups) +} + +// GetKeyspaceGroups gets keyspace groups from the start ID with limit. +// If limit is 0, it will load all keyspace groups from the start ID. +func (m *GroupManager) GetKeyspaceGroups(startID uint32, limit int) ([]*endpoint.KeyspaceGroup, error) { + return m.store.LoadKeyspaceGroups(startID, limit) +} + +// GetKeyspaceGroupByID returns the keyspace group by id. +func (m *GroupManager) GetKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGroup, error) { + var ( + kg *endpoint.KeyspaceGroup + err error + ) + + m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { + kg, err = m.store.LoadKeyspaceGroup(txn, id) + if err != nil { + return err + } + return nil + }) + return kg, nil +} + +// DeleteKeyspaceGroupByID deletes the keyspace group by id. +func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) error { + m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { + return m.store.DeleteKeyspaceGroup(txn, id) + }) + return nil +} + +func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup) error { + return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { + for _, keyspaceGroup := range keyspaceGroups { + // TODO: add replica count + newKG := &endpoint.KeyspaceGroup{ + ID: keyspaceGroup.ID, + UserKind: keyspaceGroup.UserKind, + } + // Check if keyspace group has already existed. + oldKG, err := m.store.LoadKeyspaceGroup(txn, keyspaceGroup.ID) + if err != nil { + return err + } + if oldKG != nil { + return ErrKeyspaceGroupExists + } + m.store.SaveKeyspaceGroup(txn, newKG) + } + return nil + }) +} diff --git a/server/keyspace/tso_keyspace_group_test.go b/server/keyspace/tso_keyspace_group_test.go new file mode 100644 index 00000000000..f55d3ce8028 --- /dev/null +++ b/server/keyspace/tso_keyspace_group_test.go @@ -0,0 +1,89 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keyspace + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" +) + +type keyspaceGroupTestSuite struct { + suite.Suite + manager *GroupManager +} + +func TestKeyspaceGroupTestSuite(t *testing.T) { + suite.Run(t, new(keyspaceGroupTestSuite)) +} + +func (suite *keyspaceGroupTestSuite) SetupTest() { + store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) + suite.manager = NewKeyspaceGroupManager(context.Background(), store) + suite.NoError(suite.manager.Bootstrap()) +} + +func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() { + re := suite.Require() + + keyspaceGroups := []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: "business", + }, + { + ID: uint32(2), + UserKind: "business", + }, + { + ID: uint32(3), + UserKind: "business", + }, + } + err := suite.manager.CreateKeyspaceGroups(keyspaceGroups) + re.NoError(err) + // list all keyspace groups + kgs, err := suite.manager.GetKeyspaceGroups(uint32(0), 0) + re.NoError(err) + re.Len(kgs, 4) + // list part of keyspace groups + kgs, err = suite.manager.GetKeyspaceGroups(uint32(1), 2) + re.NoError(err) + re.Len(kgs, 2) + // get the default keyspace group + kg, err := suite.manager.GetKeyspaceGroupByID(0) + re.NoError(err) + re.Equal(uint32(0), kg.ID) + re.Equal("default", kg.UserKind) + kg, err = suite.manager.GetKeyspaceGroupByID(3) + re.NoError(err) + re.Equal(uint32(3), kg.ID) + re.Equal("business", kg.UserKind) + // remove the keyspace group 3 + err = suite.manager.DeleteKeyspaceGroupByID(3) + re.NoError(err) + // get non-existing keyspace group + kg, err = suite.manager.GetKeyspaceGroupByID(3) + re.NoError(err) + re.Empty(kg) + + // create an existing keyspace group + keyspaceGroups = []*endpoint.KeyspaceGroup{{ID: uint32(1), UserKind: "business"}} + err = suite.manager.CreateKeyspaceGroups(keyspaceGroups) + re.Error(err) +} diff --git a/server/keyspace/util.go b/server/keyspace/util.go index a3402846d03..aa6c602a3a2 100644 --- a/server/keyspace/util.go +++ b/server/keyspace/util.go @@ -38,10 +38,12 @@ var ( // ErrKeyspaceNotFound is used to indicate target keyspace does not exist. ErrKeyspaceNotFound = errors.New("keyspace does not exist") // ErrKeyspaceExists indicates target keyspace already exists. - // Used when creating a new keyspace. - ErrKeyspaceExists = errors.New("keyspace already exists") - errModifyDefault = errors.New("cannot modify default keyspace's state") - errIllegalOperation = errors.New("unknown operation") + // It's used when creating a new keyspace. + ErrKeyspaceExists = errors.New("keyspace already exists") + // ErrKeyspaceGroupExists indicates target keyspace group already exists. + ErrKeyspaceGroupExists = errors.New("keyspace group already exists") + errModifyDefault = errors.New("cannot modify default keyspace's state") + errIllegalOperation = errors.New("unknown operation") // stateTransitionTable lists all allowed next state for the given current state. // Note that transit from any state to itself is allowed for idempotence. diff --git a/server/server.go b/server/server.go index f72884e90e2..7586cc23fe2 100644 --- a/server/server.go +++ b/server/server.go @@ -162,6 +162,8 @@ type Server struct { gcSafePointManager *gc.SafePointManager // keyspace manager keyspaceManager *keyspace.Manager + // keyspace group manager + keyspaceGroupManager *keyspace.GroupManager // for basicCluster operation. basicCluster *core.BasicCluster // for tso. @@ -439,6 +441,7 @@ func (s *Server) startServer(ctx context.Context) error { Step: keyspace.AllocStep, }) s.keyspaceManager = keyspace.NewKeyspaceManager(s.storage, s.cluster, keyspaceIDAllocator, s.cfg.Keyspace) + s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster) // initial hot_region_storage in here. s.hotRegionStorage, err = storage.NewHotRegionsStorage( @@ -699,6 +702,10 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe log.Warn("bootstrap keyspace manager failed", errs.ZapError(err)) } + if err = s.GetKeyspaceGroupManager().Bootstrap(); err != nil { + log.Warn("bootstrap keyspace group manager failed", errs.ZapError(err)) + } + return &pdpb.BootstrapResponse{ ReplicationStatus: s.cluster.GetReplicationMode().GetReplicationStatus(), }, nil @@ -826,6 +833,11 @@ func (s *Server) GetKeyspaceManager() *keyspace.Manager { return s.keyspaceManager } +// GetKeyspaceGroupManager returns the keyspace group manager of server. +func (s *Server) GetKeyspaceGroupManager() *keyspace.GroupManager { + return s.keyspaceGroupManager +} + // Name returns the unique etcd Name for this server in etcd cluster. func (s *Server) Name() string { return s.cfg.Name diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go new file mode 100644 index 00000000000..cee3c2f69f5 --- /dev/null +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -0,0 +1,126 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers_test + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/tests" +) + +const keyspaceGroupsPrefix = "/pd/api/v2/tso/keyspace-groups" + +type keyspaceGroupTestSuite struct { + suite.Suite + cleanup func() + cluster *tests.TestCluster + server *tests.TestServer +} + +func TestKeyspaceGroupTestSuite(t *testing.T) { + suite.Run(t, new(keyspaceGroupTestSuite)) +} + +func (suite *keyspaceGroupTestSuite) SetupTest() { + ctx, cancel := context.WithCancel(context.Background()) + suite.cleanup = cancel + cluster, err := tests.NewTestCluster(ctx, 1) + suite.cluster = cluster + suite.NoError(err) + suite.NoError(cluster.RunInitialServers()) + suite.NotEmpty(cluster.WaitLeader()) + suite.server = cluster.GetServer(cluster.GetLeader()) + suite.NoError(suite.server.BootstrapCluster()) +} + +func (suite *keyspaceGroupTestSuite) TearDownTest() { + suite.cleanup() + suite.cluster.Destroy() +} + +func (suite *keyspaceGroupTestSuite) TestCreateKeyspaceGroups() { + re := suite.Require() + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: "business", + }, + { + ID: uint32(2), + UserKind: "business", + }, + }} + + mustCreateKeyspaceGroup(re, suite.server, kgs) +} + +func (suite *keyspaceGroupTestSuite) TestLoadKeyspaceGroup() { + re := suite.Require() + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: "business", + }, + { + ID: uint32(2), + UserKind: "business", + }, + }} + + mustCreateKeyspaceGroup(re, suite.server, kgs) + resp := sendLoadKeyspaceGroupRequest(re, suite.server, "0", "0") + re.Equal(3, len(resp)) +} + +func sendLoadKeyspaceGroupRequest(re *require.Assertions, server *tests.TestServer, token, limit string) []*endpoint.KeyspaceGroup { + // Construct load range request. + httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+keyspaceGroupsPrefix, nil) + re.NoError(err) + query := httpReq.URL.Query() + query.Add("page_token", token) + query.Add("limit", limit) + httpReq.URL.RawQuery = query.Encode() + // Send request. + httpResp, err := dialClient.Do(httpReq) + re.NoError(err) + defer httpResp.Body.Close() + re.Equal(http.StatusOK, httpResp.StatusCode) + // Receive & decode response. + data, err := io.ReadAll(httpResp.Body) + re.NoError(err) + var resp []*endpoint.KeyspaceGroup + re.NoError(json.Unmarshal(data, &resp)) + return resp +} + +func mustCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceGroupParams) { + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix, bytes.NewBuffer(data)) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) +}