Merge branch 'master' into extract-conn
rleungx authored Jun 13, 2023
2 parents fe0d036 + d09aaf5 commit 06ddb44
Showing 16 changed files with 453 additions and 177 deletions.
6 changes: 4 additions & 2 deletions client/tso_dispatcher.go
@@ -739,8 +739,10 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf("%s timestamp fallback, newly acquired ts (%d, %d) is less or equal to last one (%d, %d)",
dcLocation, physical, firstLogical, lastPhysical, lastLogical))
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID()))
}
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
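For context on the fallback check above: the comparison assumes tsoutil.TSLessEqual treats a TSO as a (physical, logical) pair and compares the pairs lexicographically. A minimal sketch of that assumed semantics (not the library code itself):

// tsLessEqual reports whether the TSO (physical, logical) is less than or equal
// to (thanPhysical, thanLogical): smaller physical part wins, and equal physical
// parts fall back to comparing the logical part.
func tsLessEqual(physical, logical, thanPhysical, thanLogical int64) bool {
	if physical == thanPhysical {
		return logical <= thanLogical
	}
	return physical < thanPhysical
}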
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace_test.go
@@ -80,7 +80,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(kgm.Bootstrap(suite.ctx))
suite.NoError(suite.manager.Bootstrap())
}

55 changes: 36 additions & 19 deletions pkg/keyspace/tso_keyspace_group.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"
"time"

@@ -51,9 +52,11 @@ const (

// GroupManager is the manager of keyspace group related data.
type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client
clusterID uint64

sync.RWMutex
// groups is the cache of keyspace group related information.
@@ -90,24 +93,24 @@ func NewKeyspaceGroupManager(
cancel: cancel,
store: store,
groups: groups,
client: client,
clusterID: clusterID,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}

// If the etcd client is not nil, start the watch loop for the registered tso servers.
// The PD(TSO) Client relies on this info to discover tso servers.
if client != nil {
m.initTSONodesWatcher(client, clusterID)
m.wg.Add(2)
if m.client != nil {
m.initTSONodesWatcher(m.client, m.clusterID)
m.wg.Add(1)
go m.tsoNodesWatcher.StartWatchLoop()
go m.allocNodesToAllKeyspaceGroups()
}

return m
}

// Bootstrap saves default keyspace group info and init group mapping in the memory.
func (m *GroupManager) Bootstrap() error {
func (m *GroupManager) Bootstrap(ctx context.Context) error {
// Force the membership restriction that the default keyspace must belong to default keyspace group.
// Have no information to specify the distribution of the default keyspace group replicas, so just
// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
@@ -137,6 +140,11 @@ func (m *GroupManager) Bootstrap() error {
m.groups[userKind].Put(group)
}

// Nodes are allocated only when the group manager is running on the API leader.
if m.client != nil {
m.wg.Add(1)
go m.allocNodesToAllKeyspaceGroups(ctx)
}
return nil
}
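Bootstrap now hands its context to the background allocation goroutine (see the allocNodesToAllKeyspaceGroups hunk below). A hedged sketch of that context-bound ticker pattern, with illustrative names and the standard context and time packages assumed:

// runAllocLoop is a stand-in for the periodic allocation worker: it ticks at a
// fixed interval and stops as soon as the context passed in by Bootstrap is
// canceled. allocOnce represents the real per-tick allocation work.
func runAllocLoop(ctx context.Context, interval time.Duration, allocOnce func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done(): // canceled when the caller shuts down
			return
		case <-ticker.C:
			allocOnce()
		}
	}
}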

@@ -146,7 +154,7 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
@@ -158,7 +166,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
log.Info("start to alloc nodes to all keyspace groups")
for {
select {
case <-m.ctx.Done():
case <-ctx.Done():
log.Info("stop to alloc nodes to all keyspace groups")
return
case <-ticker.C:
@@ -338,11 +346,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
Members: keyspaceGroup.Members,
Keyspaces: keyspaceGroup.Keyspaces,
}
if oldKG.IsSplitting() {
newKG.SplitState = &endpoint.SplitState{
SplitSource: oldKG.SplitState.SplitSource,
}
}
err = m.store.SaveKeyspaceGroup(txn, newKG)
if err != nil {
return err
@@ -380,6 +383,8 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind)
return config, nil
}

var failpointOnce sync.Once

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
// when server is not in API mode, we don't need to update the keyspace for keyspace group
@@ -391,6 +396,12 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
return err
}

failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
failpointOnce.Do(func() {
addrs := val.(string)
m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
})
})
m.Lock()
defer m.Unlock()
return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
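The failpoint block above runs the external node assignment at most once per process by pairing failpoint.Inject with a package-level sync.Once. A hedged sketch of the same pattern in isolation, with an illustrative helper name and the sync, strings, and pingcap/failpoint packages assumed:

var assignOnce sync.Once

// injectExternalNodes mirrors the shape of the failpoint above: when the
// "externalAllocNode" failpoint is enabled with a comma-separated address list,
// the assignment callback runs exactly once for the lifetime of the process.
func injectExternalNodes(assign func(addrs []string)) {
	failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
		assignOnce.Do(func() {
			assign(strings.Split(val.(string), ","))
		})
	})
}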
@@ -425,7 +436,6 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
return err
}

m.groups[userKind].Put(kg)
}
return nil
@@ -696,8 +706,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
var kg *endpoint.KeyspaceGroup
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
var err error
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
@@ -714,6 +726,11 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
if err != nil {
return err
}
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
return nil
}

// IsExistNode checks if the node exists.
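The last two hunks above refactor SetNodesForKeyspaceGroup so the group loaded inside the transaction is kept around and the in-memory cache is refreshed only after the transaction succeeds. A hedged sketch of that shape with stand-in types and function parameters (not PD's kv or endpoint types):

// Group is an illustrative stand-in for endpoint.KeyspaceGroup.
type Group struct {
	ID       uint32
	UserKind string
	Members  []string
}

// setNodes runs the storage transaction first, keeps the loaded value outside
// the closure, and only touches the in-memory cache once the transaction has
// committed successfully.
func setNodes(runInTxn func(func() error) error, load func(uint32) (*Group, error),
	save func(*Group) error, cache map[string]*Group, id uint32, nodes []string) error {
	var kg *Group
	err := runInTxn(func() error {
		var err error
		kg, err = load(id) // captured for use after the transaction
		if err != nil {
			return err
		}
		kg.Members = nodes
		return save(kg)
	})
	if err != nil {
		return err // the cache is left untouched if the transaction failed
	}
	cache[kg.UserKind] = kg // refresh the in-memory view only on success
	return nil
}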
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
@@ -48,7 +48,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.NoError(suite.kgm.Bootstrap())
suite.NoError(suite.kgm.Bootstrap(suite.ctx))
}

func (suite *keyspaceGroupTestSuite) TearDownTest() {
13 changes: 12 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
@@ -378,7 +378,7 @@ type LoopWatcher struct {
postEventFn func() error

// forceLoadMu is used to ensure two force loads have minimal interval.
forceLoadMu sync.Mutex
forceLoadMu sync.RWMutex
// lastTimeForceLoad is used to record the last time force loading data from etcd.
lastTimeForceLoad time.Time

@@ -608,6 +608,17 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)

// ForceLoad forces to load the key.
func (lw *LoopWatcher) ForceLoad() {
// When NotLeader error happens, a large volume of force load requests will be received here,
// so the minimal interval between two force loads (from etcd) is used to avoid the congestion.
// Two-phase locking is also used to let most of the requests return directly without acquiring
// the write lock and causing the system to choke.
lw.forceLoadMu.RLock()
if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
lw.forceLoadMu.RUnlock()
return
}
lw.forceLoadMu.RUnlock()

lw.forceLoadMu.Lock()
if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
lw.forceLoadMu.Unlock()
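The ForceLoad change above takes a read-lock fast path before the write lock, so most callers bail out without contending for exclusive access. A hedged, stand-alone sketch of that double-checked throttle (type and names are illustrative, with sync and time assumed):

type throttle struct {
	mu   sync.RWMutex
	last time.Time
}

// allow reports whether enough time has passed since the last accepted call.
// The read-lock fast path lets most callers return early; only the survivors
// take the write lock, re-check the interval, and update the timestamp.
func (t *throttle) allow(minInterval time.Duration) bool {
	t.mu.RLock()
	if time.Since(t.last) < minInterval {
		t.mu.RUnlock()
		return false
	}
	t.mu.RUnlock()

	t.mu.Lock()
	defer t.mu.Unlock()
	if time.Since(t.last) < minInterval {
		return false
	}
	t.last = time.Now()
	return true
}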
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -281,7 +281,7 @@ func (c *RaftCluster) Start(s Server) error {
}

if s.IsAPIServiceMode() {
err = c.keyspaceGroupManager.Bootstrap()
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
return err
}
28 changes: 26 additions & 2 deletions server/config/config.go
@@ -80,6 +80,15 @@ type Config struct {
LogFileDeprecated string `toml:"log-file" json:"log-file,omitempty"`
LogLevelDeprecated string `toml:"log-level" json:"log-level,omitempty"`

// MaxConcurrentTSOProxyStreamings is the maximum number of concurrent TSO proxy streaming process routines allowed.
// Exceeding this limit causes an error to be returned to the client when a new client starts a TSO stream.
// Setting this to 0 disables the TSO proxy.
// Setting this to a negative value removes the limit.
MaxConcurrentTSOProxyStreamings int `toml:"max-concurrent-tso-proxy-streamings" json:"max-concurrent-tso-proxy-streamings"`
// TSOProxyClientRecvTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
// After the timeout, the TSO proxy will close the grpc TSO stream.
TSOProxyClientRecvTimeout typeutil.Duration `toml:"tso-proxy-client-recv-timeout" json:"tso-proxy-client-recv-timeout"`

// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`

@@ -219,6 +228,9 @@ const (

defaultDRWaitStoreTimeout = time.Minute

defaultMaxConcurrentTSOProxyStreamings = 5000
defaultTSOProxyClientRecvTimeout = 1 * time.Hour

defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
@@ -442,10 +454,11 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
}
}

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
configutil.AdjustInt(&c.MaxConcurrentTSOProxyStreamings, defaultMaxConcurrentTSOProxyStreamings)
configutil.AdjustDuration(&c.TSOProxyClientRecvTimeout, defaultTSOProxyClientRecvTimeout)

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval)

configutil.AdjustDuration(&c.TSOUpdatePhysicalInterval, defaultTSOUpdatePhysicalInterval)

if c.TSOUpdatePhysicalInterval.Duration > maxTSOUpdatePhysicalInterval {
@@ -1252,6 +1265,17 @@ func (c *Config) IsLocalTSOEnabled() bool {
return c.EnableLocalTSO
}

// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings.
// If the value is negative, there is no limit.
func (c *Config) GetMaxConcurrentTSOProxyStreamings() int {
return c.MaxConcurrentTSOProxyStreamings
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (c *Config) GetTSOProxyClientRecvTimeout() time.Duration {
return c.TSOProxyClientRecvTimeout.Duration
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (c *Config) GetTSOUpdatePhysicalInterval() time.Duration {
return c.TSOUpdatePhysicalInterval.Duration
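Per the comments on MaxConcurrentTSOProxyStreamings above, 0 disables the TSO proxy and a negative value removes the limit. A hedged sketch of how a server-side admission check could apply those documented semantics (illustrative helper, not PD's actual gRPC handler):

// allowNewTSOProxyStream applies the documented semantics of
// max-concurrent-tso-proxy-streamings to a hypothetical stream counter.
func allowNewTSOProxyStream(current, max int) bool {
	switch {
	case max < 0:
		return true // negative: no limit
	case max == 0:
		return false // zero: TSO proxy disabled entirely
	default:
		return current < max
	}
}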
(The diffs of the remaining 9 changed files are not shown here.)
