-
Notifications
You must be signed in to change notification settings - Fork 4.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
addrConn: change address to slice of address #1376
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rebase this and if it doesn't go pretty smoothly, let's refactor addrconn into its own file before working on this.
clientconn.go
Outdated
@@ -517,7 +517,7 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { | |||
} | |||
if !keep { | |||
del = append(del, c) | |||
delete(cc.conns, c.addr) | |||
delete(cc.conns, c.curAddr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this not be addrs[0]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change it to k seems better.
clientconn.go
Outdated
ac.printf("ready") | ||
if ac.state == Shutdown { | ||
// ac.tearDown(...) has been invoked. | ||
ac.curAddr = addr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to clear this ever?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initiate it each time as "0" at the beginning of resetTransport. It also avoids the race when other thread want to use conn[current address] to find current addrconn.
And also set to back to "0" when calling ac.teardown()
c062b44
to
5870195
Compare
3004921
to
9ea895d
Compare
balancer.go
Outdated
|
||
// PickFirst Balancer is a simple balancer for testing multi-addresses in one addrConn. | ||
// By using this balancer, all address shares the same addrConn. | ||
// Although it wrapped by RoundRobin balancer, the logic of all methods work fine because |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should go on pickFirst instead, otherwise it will be visible in our godoc, but it's intended for readers of the code (not the API).
balancer.go
Outdated
|
||
// The only difference is using ff.watchAddrUpdates() to use findFirstMD | ||
func (ff *pickFirst) Start(target string, config BalancerConfig) error { | ||
return ff.rr.Start(target, config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you embed the rr balancer in pickFirst, you won't need to rewrite functions like this:
type pickFirst struct {
*roundRobin
}
balancer.go
Outdated
return &pickFirst{rr: &roundRobin{r: r}} | ||
} | ||
|
||
// The only difference is using ff.watchAddrUpdates() to use findFirstMD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function comments must start with the name of the function.
In this case, you should just leave them out because the type (and, consequently this function) are not exported.
balancer.go
Outdated
} | ||
|
||
// The only difference is using ff.watchAddrUpdates() to use findFirstMD | ||
func (ff *pickFirst) Start(target string, config BalancerConfig) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pickFirst -> pf for receiver name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I'll change that. In my first implementation, I thought the word was findFirst, so I used ff.
And also I forgot to update the comment because I was using metadata type(findFirstMD) to decide which balancer in that version.
balancer.go
Outdated
return &pickFirst{rr: &roundRobin{r: r}} | ||
} | ||
|
||
// The only difference is using ff.watchAddrUpdates() to use findFirstMD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I think this comment is stale. watchAddrUpdates() wasn't changed and Start just calls the RR balancer's Start.
I think you can comment pickFirst as:
pickFirst is the same as roundRobin, but with a different type to control conditional behavior in grpc internals.
clientconn.go
Outdated
for _, a := range addrs { | ||
if _, ok := cc.conns[a]; !ok { | ||
add = append(add, a) | ||
for _, addr := range addrs { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is gratuitous and go style prefers shorter variable names for small loops like this (as it was before); please revert.
clientconn.go
Outdated
// not work for roundrobin | ||
cc.mu.Lock() | ||
if len(cc.conns) != 0 && (reflect.TypeOf(cc.dopts.balancer) == reflect.TypeOf(&pickFirst{})) { | ||
var currentAc *addrConn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This and the next 4 lines can be currentAc := cc.conns[0]
, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc.conns
is a map, not slice. This would work:
var currentAc *addrConn
for _, currentAc = range cc.conns {
 break
}
clientconn.go
Outdated
currentAc = v | ||
break | ||
} | ||
var addrInUse bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving the rest of this logic to a method in addrConn (UpdateAddresses).
clientconn.go
Outdated
stale := cc.conns[ac.addr] | ||
cc.conns[ac.addr] = ac | ||
stale := cc.conns[ac.addrs[0]] | ||
for i := 0; i < len(ac.addrs); i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for _, a := range ac.addrs {
...
clientconn.go
Outdated
// copy ac.addrs in case of race | ||
addrsIter := make([]Address, len(ac.addrs)) | ||
copy(addrsIter, ac.addrs) | ||
ac.mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I was going to suggest this:
// No ac.mu.Unlock()
for _, addr := range addrsIter {
// No ac.mu.Lock()
if state == shutdown
...
ac.mu.Unlock() // as before
......
ac.mu.Lock()
}
But the logic is so complicated (continues and returns) that that is hard.
Can the body of this loop be factored out into one or a few functions? It's quite long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returns won't change cause it has to be unlock before return anyway. But add some lock and unlock around continue and 'for' loop seems make the code hard to read and be modified next time.
Seems the second for loop (iterate each address) can create a single function like resetClientTransport([]Address) ?
balancer.go
Outdated
} | ||
|
||
func (ff *pickFirst) Notify() <-chan []Address { | ||
return ff.rr.addrCh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return ff.rr.Notify()
clientconn.go
Outdated
if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil { | ||
var inputaddr []Address | ||
inputaddr = append(inputaddr, Address{Addr: target}) | ||
if err := cc.resetAddrConn(inputaddr, cc.dopts.block, nil); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc.resetAddrConn([]Address{{Addr: target}}, cc.dopts.block, nil)
clientconn.go
Outdated
close(doneChan) | ||
doneChan = nil | ||
} | ||
} else { // isroundRobin, remain the same but change a to []Address{a} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not pickfirst.
clientconn.go
Outdated
// not work for roundrobin | ||
cc.mu.Lock() | ||
if len(cc.conns) != 0 && (reflect.TypeOf(cc.dopts.balancer) == reflect.TypeOf(&pickFirst{})) { | ||
var currentAc *addrConn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc.conns
is a map, not slice. This would work:
var currentAc *addrConn
for _, currentAc = range cc.conns {
 break
}
clientconn.go
Outdated
@@ -675,23 +694,81 @@ func (cc *ClientConn) scWatcher() { | |||
} | |||
} | |||
|
|||
// UpdateAddresses checks whether current address in the updating list, Update the list if true. | |||
func (cc *ClientConn) UpdateAddresses(addrs []Address) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not export this function.
clientconn.go
Outdated
// not work for roundrobin | ||
cc.mu.Lock() | ||
_, isPickFirst := cc.dopts.balancer.(*pickFirst) | ||
if isPickFirst { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if _, isPickFirst := cc.dopts.balancer.(*pickFirst); isPickFirst {
balancer.go
Outdated
*roundRobin | ||
} | ||
|
||
// PickFirstBalancer is a simple balancer for testing multi-addresses in one addrConn. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/PickFirstBalancer/pickFirstBalancer
And add period at the end.
clientconn.go
Outdated
for k, c := range cc.conns { | ||
var keep bool | ||
} else { | ||
// Not pickFirst. All remains the same but changing addr to []Address{addr} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment would look wired without the context of this PR. Try to make it more general, just describe what's happening.
Something like "Not pickFirst, create a new addrConn for each address."
balancer.go
Outdated
*roundRobin | ||
} | ||
|
||
// pickFirstBalancer is a simple balancer for testing multi-addresses in one addrConn. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this comment to type pickFirst
.
balancer.go
Outdated
// pickFirstBalancer is a simple balancer for testing multi-addresses in one addrConn. | ||
// By using this balancer, all address shares the same addrConn. | ||
// Although it wrapped by RoundRobin balancer, the logic of all methods work fine because | ||
// balancer. Get() returns the address Up by resetTransport(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra space before Get.
balancer.go
Outdated
|
||
// pickFirstBalancer is a simple balancer for testing multi-addresses in one addrConn. | ||
// By using this balancer, all address shares the same addrConn. | ||
// Although it wrapped by RoundRobin balancer, the logic of all methods work fine because |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although it's a wrapper around RoundRobin?
clientconn.go
Outdated
ac.ready = nil | ||
} | ||
ac.mu.Unlock() | ||
select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this select.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to check ac.ctx.Done()? Cause eg. there are 6 address and the ac.ctx.Done() return at the 2nd loop, the rest 4 loops don't need to continue?
clientconn.go
Outdated
return errConnClosing | ||
} | ||
ac.mu.Unlock() | ||
timeout := minConnectTimeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this before the for loop? We only need to calculate this once.
clientconn.go
Outdated
@@ -1150,6 +1245,7 @@ func (ac *addrConn) tearDown(err error) { | |||
ac.cancel() | |||
|
|||
ac.mu.Lock() | |||
ac.curAddr = Address{Addr: "0"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make curAddr a pointer and set it to nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Didn't know go can use nil as key.
clientconn.go
Outdated
cc.conns[addr] = currentAc | ||
} | ||
currentAc.addrs = addrs | ||
//cc.mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
clientconn.go
Outdated
@@ -675,23 +694,81 @@ func (cc *ClientConn) scWatcher() { | |||
} | |||
} | |||
|
|||
// addressesUpdated checks whether current address in the updating list, Update the list if true. | |||
func (cc *ClientConn) addressesUpdated(addrs []Address) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name this as pickFirstUpdateAddresses() and comment that it's only used when balancer is pickfirst.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple nits about comments.
balancer.go
Outdated
@@ -395,3 +395,14 @@ func (rr *roundRobin) Close() error { | |||
} | |||
return nil | |||
} | |||
|
|||
// pickFirst is used to test multi-addresses in one addrConn which all address shares the same addrConn. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... in which all addresses share ...
balancer.go
Outdated
@@ -395,3 +395,14 @@ func (rr *roundRobin) Close() error { | |||
} | |||
return nil | |||
} | |||
|
|||
// pickFirst is used to test multi-addresses in one addrConn which all address shares the same addrConn. | |||
// It wrappers around roundRobin balancer. The logic of all methods works fine because balancer.Get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"It wraps around" or "It is a wrapper around"
* addrConn: change address to slice of address * add pickfirst balancer to test new addrconn
addrConn.addr:Address
toaddrConn.addrs:[]Address
. cc.resetAddrConn receives []Address to create addrConn. All address shares the same addrConn/transport.ac.resetTransport
will iterate from the beginning to find a usable address to create transport. Backoff time now is for all address fail, not just single address fail.If current transport's address is in the slice, just update the slice without close and restart the transport to avoid reconnect. Otherwise reset the transport.