Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1118 to release-2.0 (#1157)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

Co-authored-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
ti-srebot and lance6716 authored Oct 14, 2020
1 parent 01cdcf8 commit 666a89b
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 24 deletions.
38 changes: 30 additions & 8 deletions dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (
)

const (
oneselfLeader = "oneself"
oneselfLeader = "oneself"
oneselfStartingLeader = "starting"
)

func (s *Server) electionNotify(ctx context.Context) {
Expand All @@ -40,7 +41,7 @@ func (s *Server) electionNotify(ctx context.Context) {
case leaderInfo := <-s.election.LeaderNotify():
// retire from leader
if leaderInfo == nil {
if s.leader == oneselfLeader {
if s.leader.Get() == oneselfLeader {
s.retireLeader()
log.L().Info("current member retire from the leader", zap.String("current member", s.cfg.Name))
} else {
Expand All @@ -54,6 +55,7 @@ func (s *Server) electionNotify(ctx context.Context) {
if leaderInfo.ID == s.cfg.Name {
// this member become leader
log.L().Info("current member become the leader", zap.String("current member", s.cfg.Name))
s.leader.Set(oneselfStartingLeader)

ok := s.startLeaderComponent(ctx)

Expand All @@ -64,7 +66,7 @@ func (s *Server) electionNotify(ctx context.Context) {
}

s.Lock()
s.leader = oneselfLeader
s.leader.Set(oneselfLeader)
s.closeLeaderClient()
s.Unlock()

Expand All @@ -84,7 +86,7 @@ func (s *Server) electionNotify(ctx context.Context) {
log.L().Info("get new leader", zap.String("leader", leaderInfo.ID), zap.String("current member", s.cfg.Name))

s.Lock()
s.leader = leaderInfo.ID
s.leader.Set(leaderInfo.ID)
s.createLeaderClient(leaderInfo.Addr)
s.Unlock()
}
Expand Down Expand Up @@ -125,12 +127,32 @@ func (s *Server) closeLeaderClient() {
}
}

func (s *Server) isLeaderAndNeedForward() (isLeader bool, needForward bool) {
func (s *Server) isLeaderAndNeedForward(ctx context.Context) (isLeader bool, needForward bool) {
// maybe in `startLeaderComponent`, will wait for a short time
if s.leader.Get() == oneselfStartingLeader {
retry := 10
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for s.leader.Get() == oneselfStartingLeader {
if retry == 0 {
log.L().Error("leader didn't finish starting after retry, please manually retry later")
return false, false
}
select {
case <-ctx.Done():
return false, false
case <-ticker.C:
retry--
}
}
}

s.RLock()
defer s.RUnlock()

isLeader = (s.leader == oneselfLeader)
needForward = (s.leaderGrpcConn != nil)
isLeader = s.leader.Get() == oneselfLeader
needForward = s.leaderGrpcConn != nil
return
}

Expand Down Expand Up @@ -172,7 +194,7 @@ func (s *Server) retireLeader() {
s.scheduler.Close()

s.Lock()
s.leader = ""
s.leader.Set("")
s.closeLeaderClient()
s.Unlock()

Expand Down
9 changes: 6 additions & 3 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/atomic2"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/cputil"
Expand Down Expand Up @@ -95,7 +96,9 @@ type Server struct {
etcdClient *clientv3.Client
election *election.Election

leader string
// below three leader related variables should be protected by a lock (currently Server's lock) to provide integrity
// except for leader == oneselfStartingLeader which is a intermedia state, which means caller may retry sometime later
leader atomic2.AtomicString
leaderClient pb.MasterClient
leaderGrpcConn *grpc.ClientConn

Expand Down Expand Up @@ -1894,12 +1897,12 @@ func (s *Server) sharedLogic(ctx context.Context, req interface{}, respPointer i
// }
// return nil, terror.ErrMasterRequestIsNotForwardToLeader
// }
isLeader, needForward := s.isLeaderAndNeedForward()
isLeader, needForward := s.isLeaderAndNeedForward(ctx)
if isLeader {
return false
}
if needForward {
log.L().Info("will forward after a short interval", zap.String("from", s.cfg.Name), zap.String("to", s.leader), zap.String("request", methodName))
log.L().Info("will forward after a short interval", zap.String("from", s.cfg.Name), zap.String("to", s.leader.Get()), zap.String("request", methodName))
time.Sleep(100 * time.Millisecond)
params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(req)}
results := reflect.ValueOf(s.leaderClient).MethodByName(methodName).Call(params)
Expand Down
8 changes: 4 additions & 4 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func testDefaultMasterServer(c *check.C) *Server {
c.Assert(err, check.IsNil)
cfg.DataDir = c.MkDir()
server := NewServer(cfg)
server.leader = oneselfLeader
server.leader.Set(oneselfLeader)
go server.ap.Start(context.Background())

return server
Expand Down Expand Up @@ -1058,7 +1058,7 @@ func (t *testMaster) TestOperateSource(c *check.C) {
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)

s1 := NewServer(cfg1)
s1.leader = oneselfLeader
s1.leader.Set(oneselfLeader)
c.Assert(s1.Start(ctx), check.IsNil)
defer s1.Close()
mysqlCfg := config.NewSourceConfig()
Expand Down Expand Up @@ -1259,15 +1259,15 @@ func (t *testMaster) TestOfflineMember(c *check.C) {
// ensure s2 has got the right leader info, because it will be used to `OfflineMember`.
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
s2.RLock()
leader := s2.leader
leader := s2.leader.Get()
s2.RUnlock()
if leader == "" {
return false
}
if leader == oneselfLeader {
leaderID = s2.cfg.Name
} else {
leaderID = s2.leader
leaderID = s2.leader.Get()
}
return true
}), check.IsTrue)
Expand Down
19 changes: 19 additions & 0 deletions pkg/atomic2/atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,22 @@ func (e *AtomicError) Get() error {
func (e *AtomicError) Set(err error) {
atomic.StorePointer(&e.p, unsafe.Pointer(&err))
}

// AtomicString implements atomic string method
type AtomicString struct {
p unsafe.Pointer
}

// Get returns string
func (s *AtomicString) Get() string {
p := atomic.LoadPointer(&s.p)
if p == nil {
return ""
}
return *(*string)(p)
}

// Set sets string to AtomicString
func (s *AtomicString) Set(str string) {
atomic.StorePointer(&s.p, unsafe.Pointer(&str))
}
40 changes: 31 additions & 9 deletions pkg/atomic2/atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,57 @@ import (
"errors"
"testing"

"github.com/pingcap/check"
. "github.com/pingcap/check"
)

func TestT(t *testing.T) {
check.TestingT(t)
TestingT(t)
}

var _ = check.Suite(&testAtomicSuite{})
var _ = Suite(&testAtomicSuite{})

type testAtomicSuite struct{}

func (t *testAtomicSuite) TestAtomicError(c *check.C) {
func (t *testAtomicSuite) TestAtomicError(c *C) {
var (
e AtomicError
err = errors.New("test")
)
err2 := e.Get()
c.Assert(err2, check.Equals, nil)
c.Assert(err2, Equals, nil)

e.Set(err)
err2 = e.Get()
c.Assert(err2, check.DeepEquals, err)
c.Assert(err2, DeepEquals, err)

err = errors.New("test2")
err2 = e.Get()
c.Assert(err2.Error(), check.Equals, "test")
c.Assert(err2, check.Not(check.Equals), err)
c.Assert(err2.Error(), Equals, "test")
c.Assert(err2, Not(Equals), err)

e.Set(nil)
err2 = e.Get()
c.Assert(err2, check.Equals, nil)
c.Assert(err2, Equals, nil)
}

func (t *testAtomicSuite) TestAtomicString(c *C) {
var (
s AtomicString
str = "test"
)
str2 := s.Get()
c.Assert(str2, Equals, "")

s.Set(str)
str2 = s.Get()
c.Assert(str2, Equals, str)

originStr := str
str = "test2" //nolint:ineffassign
str2 = s.Get()
c.Assert(str2, Equals, originStr)

s.Set("")
str2 = s.Get()
c.Assert(str2, Equals, "")
}

0 comments on commit 666a89b

Please sign in to comment.