Skip to content

Commit 4af33d1

Browse files
committed
fix race issues
1 parent 79d8768 commit 4af33d1

File tree

1 file changed

+9
-25
lines changed

1 file changed

+9
-25
lines changed

clientconn.go

+9-25
Original file line numberDiff line numberDiff line change
@@ -499,13 +499,7 @@ type ClientConn struct {
499499
// connections accordingly. If doneChan is not nil, it is closed after the
500500
// first successfull connection is made.
501501
func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
502-
var (
503-
firstFindConn *addrConn // tear down when the cc.conns is empty
504-
isPickFirst bool // true: roundrobin, false: pickfirst
505-
)
506-
if reflect.TypeOf(cc.dopts.balancer) == reflect.TypeOf(&pickFirst{}) {
507-
isPickFirst = true
508-
}
502+
isPickFirst := reflect.TypeOf(cc.dopts.balancer) == reflect.TypeOf(&pickFirst{})
509503
for addrs := range cc.dopts.balancer.Notify() {
510504
var (
511505
add []Address // Addresses need to setup connections.
@@ -515,12 +509,9 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
515509
for _, addr := range addrs {
516510
if _, ok := cc.conns[addr]; !ok {
517511
add = append(add, addr)
518-
} else {
519-
firstFindConn = cc.conns[addr]
520512
}
521513
}
522514
for k, c := range cc.conns {
523-
firstFindConn = c
524515
var keep bool
525516
for _, a := range addrs {
526517
if k == a {
@@ -554,20 +545,11 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
554545
}
555546
}
556547
}
557-
558-
if isPickFirst {
559-
// Notify may delete all address. time to tear down, or leave it?
560-
if len(cc.conns) == 0 {
561-
if firstFindConn != nil {
562-
firstFindConn.tearDown(errConnDrain)
563-
}
564-
}
565-
} else {
548+
if !isPickFirst {
566549
for _, c := range del {
567550
c.tearDown(errConnDrain)
568551
}
569552
}
570-
571553
}
572554
}
573555

@@ -599,6 +581,7 @@ func (cc *ClientConn) scWatcher() {
599581
func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr error) error {
600582
// if current transport in addrs, just change lists to update order and new addresses
601583
// not work for roundrobin
584+
cc.mu.Lock()
602585
if len(cc.conns) != 0 && (reflect.TypeOf(cc.dopts.balancer) == reflect.TypeOf(&pickFirst{})) {
603586
var currentAc *addrConn
604587
for _, v := range cc.conns {
@@ -607,7 +590,7 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err
607590
}
608591
var addrInUse bool
609592
for _, addr := range addrs {
610-
if addr == currentAc.curAddr {
593+
if strings.Compare(addr.Addr, currentAc.curAddr.Addr) == 0 {
611594
addrInUse = true
612595
break
613596
}
@@ -618,9 +601,11 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err
618601
cc.conns[addr] = currentAc
619602
}
620603
currentAc.addrs = addrs
604+
cc.mu.Unlock()
621605
return nil
622606
}
623607
}
608+
cc.mu.Unlock()
624609

625610
ac := &addrConn{
626611
cc: cc,
@@ -887,7 +872,7 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
887872
return ac.state, nil
888873
}
889874

890-
// resetTransport recreates a transport to the address for ac.
875+
/// resetTransport recreates a transport to the address for ac.
891876
// For the old transport:
892877
// - if drain is true, it will be gracefully closed.
893878
// - otherwise, it will be closed.
@@ -947,14 +932,13 @@ func (ac *addrConn) resetTransport(drain bool) error {
947932
cancel()
948933

949934
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
950-
ac.curAddr.Addr = ""
935+
//ac.curAddr.Addr = ""
951936
return err
952937
}
953938
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
954939
ac.mu.Lock()
955940
if ac.state == Shutdown {
956941
// ac.tearDown(...) has been invoked.
957-
ac.curAddr.Addr = ""
958942
ac.mu.Unlock()
959943
return errConnClosing
960944
}
@@ -977,7 +961,7 @@ func (ac *addrConn) resetTransport(drain bool) error {
977961
ac.printf("ready")
978962
if ac.state == Shutdown {
979963
// ac.tearDown(...) has been invoked.
980-
ac.curAddr.Addr = ""
964+
//ac.curAddr.Addr = ""
981965
ac.mu.Unlock()
982966
newTransport.Close()
983967
return errConnClosing

0 commit comments

Comments
 (0)