Skip to content

Commit 061729f

Browse files
committed
update after review
1 parent 181dc4f commit 061729f

File tree

3 files changed

+41
-28
lines changed

3 files changed

+41
-28
lines changed

balancer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,10 @@ type pickFirst struct {
400400
*roundRobin
401401
}
402402

403-
// PickFirstBalancer is a simple balancer for testing multi-addresses in one addrConn.
403+
// pickFirstBalancer is a simple balancer for testing multi-addresses in one addrConn.
404404
// By using this balancer, all address shares the same addrConn.
405405
// Although it wrapped by RoundRobin balancer, the logic of all methods work fine because
406-
// balancer.Get() returns the address Up by resetTransport()
406+
// balancer. Get() returns the address Up by resetTransport().
407407
func pickFirstBalancer(r naming.Resolver) Balancer {
408408
return &pickFirst{&roundRobin{r: r}}
409409
}

balancer_test.go

+33-20
Original file line numberDiff line numberDiff line change
@@ -442,10 +442,12 @@ func checkServerUp(t *testing.T, currentServer *server) {
442442

443443
func TestPickFirstEmptyAddrs(t *testing.T) {
444444
servers, r := startServers(t, 1, math.MaxUint32)
445+
defer servers[0].stop()
445446
cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
446447
if err != nil {
447448
t.Fatalf("Failed to create ClientConn: %v", err)
448449
}
450+
defer cc.Close()
449451
var reply string
450452
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
451453
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
@@ -465,12 +467,11 @@ func TestPickFirstEmptyAddrs(t *testing.T) {
465467
break
466468
}
467469
}
468-
cc.Close()
469-
servers[0].stop()
470470
}
471471

472472
func TestPickFirstCloseWithPendingRPC(t *testing.T) {
473473
servers, r := startServers(t, 1, math.MaxUint32)
474+
defer servers[0].stop()
474475
cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
475476
if err != nil {
476477
t.Fatalf("Failed to create ClientConn: %v", err)
@@ -514,18 +515,20 @@ func TestPickFirstCloseWithPendingRPC(t *testing.T) {
514515
time.Sleep(5 * time.Millisecond)
515516
cc.Close()
516517
wg.Wait()
517-
servers[0].stop()
518518
}
519519

520520
func TestPickFirstOrderAllServerUp(t *testing.T) {
521521
// Start 3 servers on 3 ports.
522522
numServers := 3
523523
servers, r := startServers(t, numServers, math.MaxUint32)
524+
for i := 0; i < numServers; i++ {
525+
defer servers[i].stop()
526+
}
524527
cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
525528
if err != nil {
526529
t.Fatalf("Failed to create ClientConn: %v", err)
527530
}
528-
531+
defer cc.Close()
529532
// Add servers[1] and [2] to the service discovery.
530533
u := &naming.Update{
531534
Op: naming.Add,
@@ -576,7 +579,7 @@ func TestPickFirstOrderAllServerUp(t *testing.T) {
576579
}
577580

578581
// Add server[0] back to the balancer, the incoming RPCs served in server[1]
579-
// Add is append operation, the order of Notify now is {server[2].port server[0].port}
582+
// Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
580583
u = &naming.Update{
581584
Op: naming.Add,
582585
Addr: "localhost:" + servers[0].port,
@@ -608,22 +611,38 @@ func TestPickFirstOrderAllServerUp(t *testing.T) {
608611
time.Sleep(10 * time.Millisecond)
609612
}
610613

611-
// After remove server[3], incoming RPCs still served in server[0]
612-
cc.Close()
613-
for i := 0; i < numServers; i++ {
614-
servers[i].stop()
614+
// Delete server[2] in the balancer, the incoming RPCs served in server[0]
615+
u = &naming.Update{
616+
Op: naming.Delete,
617+
Addr: "localhost:" + servers[2].port,
618+
}
619+
r.w.inject([]*naming.Update{u})
620+
for {
621+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
622+
break
623+
}
624+
time.Sleep(1 * time.Second)
625+
}
626+
for i := 0; i < 20; i++ {
627+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
628+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
629+
}
630+
time.Sleep(10 * time.Millisecond)
615631
}
616632
}
617633

618634
func TestPickFirstOrderOneServerDown(t *testing.T) {
619635
// Start 3 servers on 3 ports.
620636
numServers := 3
621637
servers, r := startServers(t, numServers, math.MaxUint32)
638+
for i := 0; i < numServers; i++ {
639+
defer servers[i].stop()
640+
}
622641
cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
623642
if err != nil {
624643
t.Fatalf("Failed to create ClientConn: %v", err)
625644
}
626-
645+
defer cc.Close()
627646
// Add servers[1] and [2] to the service discovery.
628647
u := &naming.Update{
629648
Op: naming.Add,
@@ -701,22 +720,20 @@ func TestPickFirstOrderOneServerDown(t *testing.T) {
701720
}
702721
time.Sleep(10 * time.Millisecond)
703722
}
704-
705-
// After remove server[3], incoming RPCs still served in server[0]
706-
cc.Close()
707-
for i := 0; i < numServers; i++ {
708-
servers[i].stop()
709-
}
710723
}
711724

712725
func TestPickFirstOneAddressRemoval(t *testing.T) {
713726
// Start 2 servers.
714727
numServers := 2
715728
servers, r := startServers(t, numServers, math.MaxUint32)
729+
for i := 0; i < numServers; i++ {
730+
defer servers[i].stop()
731+
}
716732
cc, err := Dial("localhost:"+servers[0].port, WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
717733
if err != nil {
718734
t.Fatalf("Failed to create ClientConn: %v", err)
719735
}
736+
defer cc.Close()
720737
// Add servers[1] to the service discovery.
721738
var updates []*naming.Update
722739
updates = append(updates, &naming.Update{
@@ -760,8 +777,4 @@ func TestPickFirstOneAddressRemoval(t *testing.T) {
760777
}()
761778
}
762779
wg.Wait()
763-
cc.Close()
764-
for i := 0; i < numServers; i++ {
765-
servers[i].stop()
766-
}
767780
}

clientconn.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
629629
}
630630
}
631631
} else {
632-
// Not pickFirst. All remains the same but changing addr to []Address{addr}
632+
// Not pickFirst, create a new addrConn for each address.
633633
var (
634634
add []Address // Addresses need to setup connections.
635635
del []*addrConn // Connections need to tear down.
@@ -694,8 +694,8 @@ func (cc *ClientConn) scWatcher() {
694694
}
695695
}
696696

697-
// UpdateAddresses checks whether current address in the updating list, Update the list if true.
698-
func (cc *ClientConn) UpdateAddresses(addrs []Address) bool {
697+
// addressesUpdated checks whether current address in the updating list, Update the list if true.
698+
func (cc *ClientConn) addressesUpdated(addrs []Address) bool {
699699
if len(cc.conns) == 0 {
700700
// No addrconn. Should go resetting addrconn.
701701
return false
@@ -723,6 +723,7 @@ func (cc *ClientConn) UpdateAddresses(addrs []Address) bool {
723723
return false
724724
}
725725

726+
// pickFirstAddrConnTearDown() should be called after lock.
726727
func (cc *ClientConn) pickFirstAddrConnTearDown() {
727728
if len(cc.conns) == 0 {
728729
return
@@ -748,11 +749,10 @@ func (cc *ClientConn) resetAddrConn(addrs []Address, block bool, tearDownErr err
748749
// if current transport in addrs, just change lists to update order and new addresses
749750
// not work for roundrobin
750751
cc.mu.Lock()
751-
_, isPickFirst := cc.dopts.balancer.(*pickFirst)
752-
if isPickFirst {
752+
if _, isPickFirst := cc.dopts.balancer.(*pickFirst); isPickFirst {
753753
// If Current address in use in the updating list, just update the list.
754754
// Otherwise, teardown current addrconn and create a new one.
755-
if cc.UpdateAddresses(addrs) {
755+
if cc.addressesUpdated(addrs) {
756756
cc.mu.Unlock()
757757
return nil
758758
}

0 commit comments

Comments
 (0)