Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support keyspace group RESTful API #6229

Merged
merged 6 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,8 @@ func (c *tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoCo

type tsoConnectionContext struct {
streamAddr string
// Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluser,
// or tsopb.TSO_TsoClient for a primary/secondary in the TSO clusrer
// Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster,
// or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster
stream tsoStream
ctx context.Context
cancel context.CancelFunc
Expand Down
8 changes: 4 additions & 4 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type tsoServiceDiscovery struct {
clusterID uint64
keyspaceID uint32
urls atomic.Value // Store as []string
// primary key is the etcd path used for discoverying the serving endpoint of this keyspace
// primary key is the etcd path used for discovering the serving endpoint of this keyspace
primaryKey string
// TSO Primary URL
primary atomic.Value // Store as string
Expand All @@ -61,7 +61,7 @@ type tsoServiceDiscovery struct {
clientConns sync.Map // Store as map[string]*grpc.ClientConn

// localAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated.
// The input is a map {DC Localtion -> Leader Addr}
// The input is a map {DC Location -> Leader Addr}
localAllocPrimariesUpdatedCb tsoLocalServAddrsUpdatedFunc
// globalAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated.
globalAllocPrimariesUpdatedCb tsoGlobalServAddrUpdatedFunc
Expand Down Expand Up @@ -186,7 +186,7 @@ func (c *tsoServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn {
return nil
}

// GetClientConns returns the mapping {addr -> a gRPC connectio}
// GetClientConns returns the mapping {addr -> a gRPC connection}
func (c *tsoServiceDiscovery) GetClientConns() *sync.Map {
return &c.clientConns
}
Expand All @@ -209,7 +209,7 @@ func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...)
}

// ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in ervice endpoints.
// ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in service endpoints.
func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
select {
case c.checkMembershipCh <- struct{}{}:
Expand Down
12 changes: 12 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,16 @@ const (
TSOServiceName = "tso"
// ResourceManagerServiceName is the name of resource manager server.
ResourceManagerServiceName = "resource_manager"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
// that we support is MaxKeyspaceGroupCount. The keyspace group id is in the range
// [0, 99999], which explains we use five-digits number (%05d) to render the keyspace
// group id in the storage endpoint path.
MaxKeyspaceGroupCount = uint32(100000)
// MaxKeyspaceGroupCountInUse is the max count of keyspace groups in use, which should
// never exceed MaxKeyspaceGroupCount defined above. Compared to MaxKeyspaceGroupCount,
// MaxKeyspaceGroupCountInUse is a much more reasonable value of the max count in the
// foreseen future, and the former is just for extensibility in theory.
MaxKeyspaceGroupCountInUse = uint32(4096)
)
20 changes: 20 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ const (
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"

tsoKeyspaceGroupPrefix = "tso/keyspace_groups"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we change to utils.TSOServiceName + "keyspace_groups"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, I think there is no need and it's simpler. Once some other services need the group concept, we can refactor it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My current change is using it. Leave this refactor to me then. Your pr goest first.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it

keyspaceGroupMembershipKey = "membership"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
)
Expand Down Expand Up @@ -223,3 +226,20 @@ func KeyspaceIDAlloc() string {
func encodeKeyspaceID(spaceID uint32) string {
return fmt.Sprintf("%08d", spaceID)
}

// KeyspaceGroupIDPrefix returns the prefix of keyspace group id.
// Path: tso/keyspace_groups/membership
func KeyspaceGroupIDPrefix() string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey)
}

// KeyspaceGroupIDPath returns the path to keyspace id from the given name.
// Path: tso/keyspace_groups/membership/{id}
func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
}
93 changes: 93 additions & 0 deletions pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"context"
"encoding/json"

"github.com/tikv/pd/pkg/storage/kv"
"go.etcd.io/etcd/clientv3"
)

// KeyspaceGroup is the keyspace group.
type KeyspaceGroup struct {
ID uint32 `json:"id"`
UserKind string `json:"user-kind"`
// TODO: add `Members` field
Copy link
Contributor

@binshi-bing binshi-bing Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should call it Replicas. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both are ok to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up to you then

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep members then. I'm using members in my change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}

// KeyspaceGroupStorage is the interface for keyspace group storage.
type KeyspaceGroupStorage interface {
LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared to the function "LoadKeyspaceGroup(txn kv.Txn, id uint32)", if we load multiple groups here, do we need to use transaction to provide snapshot isolation consistency level?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGroup, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to load from single key-value pair, correct? If yes, do we still need transaction?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we use the storage interface, it will create a txn. I think it's ok.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

SaveKeyspaceGroup(txn kv.Txn, kg *KeyspaceGroup) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto - SaveKeyspaceGroup and DeleteKeyspaceGroup

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

DeleteKeyspaceGroup(txn kv.Txn, id uint32) error
// TODO: add more interfaces.
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
}

var _ KeyspaceGroupStorage = (*StorageEndpoint)(nil)

// LoadKeyspaceGroup loads the keyspace group by id.
func (se *StorageEndpoint) LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGroup, error) {
value, err := txn.Load(KeyspaceGroupIDPath(id))
if err != nil || value == "" {
return nil, err
}
kg := &KeyspaceGroup{}
if err := json.Unmarshal([]byte(value), kg); err != nil {
return nil, err
}
return kg, nil
}

// SaveKeyspaceGroup saves the keyspace group.
func (se *StorageEndpoint) SaveKeyspaceGroup(txn kv.Txn, kg *KeyspaceGroup) error {
key := KeyspaceGroupIDPath(kg.ID)
value, err := json.Marshal(kg)
if err != nil {
return err
}
return txn.Save(key, string(value))
}

// DeleteKeyspaceGroup deletes the keyspace group.
func (se *StorageEndpoint) DeleteKeyspaceGroup(txn kv.Txn, id uint32) error {
return txn.Remove(KeyspaceGroupIDPath(id))
}

// LoadKeyspaceGroups loads keyspace groups from the start ID with limit.
// If limit is 0, it will load all keyspace groups from the start ID.
func (se *StorageEndpoint) LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error) {
prefix := KeyspaceGroupIDPath(startID)
prefixEnd := clientv3.GetPrefixRangeEnd(KeyspaceGroupIDPrefix())
keys, values, err := se.LoadRange(prefix, prefixEnd, limit)
if err != nil {
return nil, err
}
if len(keys) == 0 {
return []*KeyspaceGroup{}, nil
}
kgs := make([]*KeyspaceGroup, 0, len(keys))
for _, value := range values {
kg := &KeyspaceGroup{}
if err = json.Unmarshal([]byte(value), kg); err != nil {
return nil, err
}
kgs = append(kgs, kg)
}
return kgs, nil
}
1 change: 1 addition & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Storage interface {
endpoint.KeyspaceStorage
endpoint.ResourceGroupStorage
endpoint.TSOStorage
endpoint.KeyspaceGroupStorage
}

// NewStorageWithMemoryBackend creates a new storage with memory backend.
Expand Down
27 changes: 7 additions & 20 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,8 @@ import (
"go.uber.org/zap"
)

const (
// maxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
// that we support is maxKeyspaceGroupCount. The keyspace group id is in the range
// [0, 99999], which explains we use five-digits number (%05d) to render the keyspace
// group id in the storage endpoint path.
maxKeyspaceGroupCount = uint32(100000)
// maxKeyspaceGroupCountInUse is the max count of keyspace groups in use, which should
// never exceed maxKeyspaceGroupCount defined above. Compared to maxKeyspaceGroupCount,
// maxKeyspaceGroupCountInUse is a much more reasonable value of the max count in the
// foreseen future, and the former is just for extensibility in theory.
maxKeyspaceGroupCountInUse = uint32(4096)
// primaryElectionSuffix is the suffix of the key for keyspace group primary election
primaryElectionSuffix = "primary"
)
// primaryElectionSuffix is the suffix of the key for keyspace group primary election
const primaryElectionSuffix = "primary"

// KeyspaceGroupManager manages the primary/secondaries of the keyspace groups
// assigned to this host. The primaries provide the tso service for the corresponding
Expand All @@ -56,7 +43,7 @@ type KeyspaceGroupManager struct {
// different keyspace groups for tso service.
// TODO: change item type to atomic.Value stored as *AllocatorManager after we
// support online keyspace group assignment.
ksgAllocatorManagers [maxKeyspaceGroupCountInUse]*AllocatorManager
ksgAllocatorManagers [mcsutils.MaxKeyspaceGroupCountInUse]*AllocatorManager

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -100,10 +87,10 @@ func NewKeyspaceGroupManager(
tsoSvcRootPath string,
cfg ServiceConfig,
) *KeyspaceGroupManager {
if maxKeyspaceGroupCountInUse > maxKeyspaceGroupCount {
log.Fatal("maxKeyspaceGroupCountInUse is larger than maxKeyspaceGroupCount",
zap.Uint32("maxKeyspaceGroupCountInUse", maxKeyspaceGroupCountInUse),
zap.Uint32("maxKeyspaceGroupCount", maxKeyspaceGroupCount))
if mcsutils.MaxKeyspaceGroupCountInUse > mcsutils.MaxKeyspaceGroupCount {
log.Fatal("MaxKeyspaceGroupCountInUse is larger than MaxKeyspaceGroupCount",
zap.Uint32("max-keyspace-group-count-in-use", mcsutils.MaxKeyspaceGroupCountInUse),
zap.Uint32("max-keyspace-group-count", mcsutils.MaxKeyspaceGroupCount))
}

ctx, cancel := context.WithCancel(ctx)
Expand Down
24 changes: 12 additions & 12 deletions server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type CreateKeyspaceParams struct {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces [post]
func CreateKeyspace(c *gin.Context) {
svr := c.MustGet("server").(*server.Server)
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
createParams := &CreateKeyspaceParams{}
err := c.BindJSON(createParams)
Expand Down Expand Up @@ -91,7 +91,7 @@ func CreateKeyspace(c *gin.Context) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces/{name} [get]
func LoadKeyspace(c *gin.Context) {
svr := c.MustGet("server").(*server.Server)
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
name := c.Param("name")
meta, err := manager.LoadKeyspace(name)
Expand All @@ -117,7 +117,7 @@ func LoadKeyspaceByID(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusInternalServerError, "invalid keyspace id")
return
}
svr := c.MustGet("server").(*server.Server)
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
meta, err := manager.LoadKeyspaceByID(uint32(id))
if err != nil {
Expand All @@ -127,18 +127,18 @@ func LoadKeyspaceByID(c *gin.Context) {
c.IndentedJSON(http.StatusOK, &KeyspaceMeta{meta})
}

// parseLoadAllQuery parses LoadAllKeyspaces' query parameters.
// parseLoadAllQuery parses LoadAllKeyspaces'/GetKeyspaceGroups' query parameters.
// page_token:
// The keyspace id of the scan start. If not set, scan from keyspace with id 1.
// It's string of spaceID of the previous scan result's last element (next_page_token).
// The keyspace/keyspace group id of the scan start. If not set, scan from keyspace/keyspace group with id 1.
// It's string of ID of the previous scan result's last element (next_page_token).
// limit:
// The maximum number of keyspace metas to return. If not set, no limit is posed.
// Every scan scans limit + 1 keyspaces (if limit != 0), the extra scanned keyspace
// The maximum number of keyspace metas/keyspace groups to return. If not set, no limit is posed.
// Every scan scans limit + 1 keyspaces/keyspace groups (if limit != 0), the extra scanned keyspace/keyspace group
// is to check if there's more, and used to set next_page_token in response.
func parseLoadAllQuery(c *gin.Context) (scanStart uint32, scanLimit int, err error) {
pageToken, set := c.GetQuery("page_token")
if !set || pageToken == "" {
// If pageToken is empty or unset, then scan from spaceID of 1.
// If pageToken is empty or unset, then scan from ID of 1.
scanStart = 0
} else {
scanStart64, err := strconv.ParseUint(pageToken, 10, 32)
Expand Down Expand Up @@ -185,7 +185,7 @@ type LoadAllKeyspacesResponse struct {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces [get]
func LoadAllKeyspaces(c *gin.Context) {
svr := c.MustGet("server").(*server.Server)
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
scanStart, scanLimit, err := parseLoadAllQuery(c)
if err != nil {
Expand Down Expand Up @@ -246,7 +246,7 @@ type UpdateConfigParams struct {
//
// Router /keyspaces/{name}/config [patch]
func UpdateKeyspaceConfig(c *gin.Context) {
svr := c.MustGet("server").(*server.Server)
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
name := c.Param("name")
configParams := &UpdateConfigParams{}
Expand Down Expand Up @@ -303,7 +303,7 @@ type UpdateStateParam struct {
//
// Router /keyspaces/{name}/state [put]
func UpdateKeyspaceState(c *gin.Context) {
svr := c.MustGet("server").(*server.Server)
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
name := c.Param("name")
param := &UpdateStateParam{}
Expand Down
Loading