Skip to content

Commit

Permalink
Merge branch 'master' into trace
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Aug 4, 2023
2 parents f153616 + c8c5e1e commit 19d92ff
Show file tree
Hide file tree
Showing 24 changed files with 814 additions and 220 deletions.
9 changes: 7 additions & 2 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Require review from domain experts when the PR modified significant config files.
# Require review from domain experts when the PR modified significant config files

/conf/config.toml @tikv/pd-configuration-reviewer
/server/config/config.go @tikv/pd-configuration-reviewer
/pkg/schedule/config/config.go @tikv/pd-configuration-reviewer
/pkg/schedule/schedulers/hot_region_config.go @tikv/pd-configuration-reviewer
/conf/config.toml @tikv/pd-configuration-reviewer
/pkg/mcs/resourcemanager/server/config.go @tikv/pd-configuration-reviewer
/pkg/mcs/scheduling/server/config/config.go @tikv/pd-configuration-reviewer
/pkg/mcs/tso/server/config.go @tikv/pd-configuration-reviewer
/metrics/grafana/pd.json @tikv/pd-configuration-reviewer
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ install-tools:

#### Static checks ####

check: install-tools tidy static generate-errdoc check-plugin check-test
check: install-tools tidy static generate-errdoc check-test

static: install-tools
@ echo "gofmt ..."
Expand Down
7 changes: 5 additions & 2 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ flag_management:
- type: project
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.
- type: patch
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.

ignore:
- tests/** # integration test cases or tools.
# Ignore the tool tests
- tests/dashboard
- tests/pdbackup
- tests/pdctl
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er
})

start := time.Now()
keyspaceRule := makeLabelRule(id)
keyspaceRule := MakeLabelRule(id)
cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler })
if !ok {
return errors.New("cluster does not support region label")
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func getRegionLabelID(id uint32) string {
return regionLabelIDPrefix + strconv.FormatUint(uint64(id), endpoint.SpaceIDBase)
}

// makeLabelRule makes the label rule for the given keyspace id.
func makeLabelRule(id uint32) *labeler.LabelRule {
// MakeLabelRule makes the label rule for the given keyspace id.
func MakeLabelRule(id uint32) *labeler.LabelRule {
return &labeler.LabelRule{
ID: getRegionLabelID(id),
Index: 0,
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ func TestMakeLabelRule(t *testing.T) {
},
}
for _, testCase := range testCases {
re.Equal(testCase.expectedLabelRule, makeLabelRule(testCase.id))
re.Equal(testCase.expectedLabelRule, MakeLabelRule(testCase.id))
}
}
69 changes: 35 additions & 34 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -54,18 +55,12 @@ func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, se

// Register registers the service to etcd.
func (sr *ServiceRegister) Register() error {
resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
id, err := sr.putWithTTL()
if err != nil {
sr.cancel()
return fmt.Errorf("grant lease failed: %v", err)
return fmt.Errorf("put the key with lease %s failed: %v", sr.key, err)
}

if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
sr.cancel()
return fmt.Errorf("put the key %s failed: %v", sr.key, err)
}

kresp, err := sr.cli.KeepAlive(sr.ctx, resp.ID)
kresp, err := sr.cli.KeepAlive(sr.ctx, id)
if err != nil {
sr.cancel()
return fmt.Errorf("keepalive failed: %v", err)
Expand All @@ -80,31 +75,7 @@ func (sr *ServiceRegister) Register() error {
case _, ok := <-kresp:
if !ok {
log.Error("keep alive failed", zap.String("key", sr.key))
// retry
t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2)
for {
select {
case <-sr.ctx.Done():
log.Info("exit register process", zap.String("key", sr.key))
t.Stop()
return
default:
}

<-t.C
resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
if err != nil {
log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err))
t.Stop()
continue
}

if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err))
t.Stop()
continue
}
}
kresp = sr.renewKeepalive()
}
}
}
Expand All @@ -113,6 +84,36 @@ func (sr *ServiceRegister) Register() error {
return nil
}

func (sr *ServiceRegister) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse {
t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2)
defer t.Stop()
for {
select {
case <-sr.ctx.Done():
log.Info("exit register process", zap.String("key", sr.key))
return nil
case <-t.C:
id, err := sr.putWithTTL()
if err != nil {
log.Error("put the key with lease failed", zap.String("key", sr.key), zap.Error(err))
continue
}
kresp, err := sr.cli.KeepAlive(sr.ctx, id)
if err != nil {
log.Error("client keep alive failed", zap.String("key", sr.key), zap.Error(err))
continue
}
return kresp
}
}
}

func (sr *ServiceRegister) putWithTTL() (clientv3.LeaseID, error) {
ctx, cancel := context.WithTimeout(sr.ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
return etcdutil.EtcdKVPutWithTTL(ctx, sr.cli, sr.key, sr.value, sr.ttl)
}

// Deregister deregisters the service from etcd.
func (sr *ServiceRegister) Deregister() error {
sr.cancel()
Expand Down
44 changes: 33 additions & 11 deletions pkg/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)
Expand All @@ -29,17 +30,13 @@ func TestRegister(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.NewFromURL(ep)
re.NoError(err)

<-etcd.Server.ReadyNotify()
// with http prefix

// Test register with http prefix.
sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
re.NoError(err)
err = sr.Register()
Expand All @@ -49,20 +46,45 @@ func TestRegister(t *testing.T) {
re.NoError(err)
re.Equal("http://127.0.0.1:1", string(resp.Kvs[0].Value))

// Test deregister.
err = sr.Deregister()
re.NoError(err)
resp, err = client.Get(context.Background(), sr.key)
re.NoError(err)
re.Empty(resp.Kvs)

// Test the case that ctx is canceled.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
re.NoError(err)
err = sr.Register()
re.NoError(err)
sr.cancel()
// ensure that the lease is expired
time.Sleep(3 * time.Second)
resp, err = client.Get(context.Background(), sr.key)
re.Empty(getKeyAfterLeaseExpired(re, client, sr.key))

// Test the case that keepalive is failed when the etcd is restarted.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
re.Empty(resp.Kvs)
for i := 0; i < 3; i++ {
re.Equal("127.0.0.1:2", getKeyAfterLeaseExpired(re, client, sr.key))
etcd.Server.HardStop() // close the etcd to make the keepalive failed
time.Sleep(etcdutil.DefaultDialTimeout) // ensure that the request is timeout
etcd.Close()
etcd, err = embed.StartEtcd(cfg)
re.NoError(err)
<-etcd.Server.ReadyNotify()
testutil.Eventually(re, func() bool {
return getKeyAfterLeaseExpired(re, client, sr.key) == "127.0.0.1:2"
})
}
etcd.Close()
}

func getKeyAfterLeaseExpired(re *require.Assertions, client *clientv3.Client, key string) string {
time.Sleep(3 * time.Second) // ensure that the lease is expired
resp, err := client.Get(context.Background(), key)
re.NoError(err)
if len(resp.Kvs) == 0 {
return ""
}
return string(resp.Kvs[0].Value)
}
File renamed without changes.
Loading

0 comments on commit 19d92ff

Please sign in to comment.