Implement happy eyeballs
arjan-bal committed Oct 10, 2024
1 parent 00b9e14 commit 7cb88fe
Showing 4 changed files with 500 additions and 35 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/testing.yml
@@ -73,7 +73,9 @@ jobs:
- type: tests
goversion: '1.22'
testflags: -race
grpcenv: 'GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true'
grpcenv: |
'GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true'
'GRPC_EXPERIMENTAL_ENABLE_PICK_FIRST_HAPPY_EYEBALLS=true'
steps:
# Setup the environment.
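
Aside (not part of the diff): the workflow change above enables the new pick_first policy and its happy-eyeballs pass through environment variables. Below is a minimal sketch of how such a boolean gate can be read, assuming a plain os.LookupEnv/strconv check; the names are hypothetical and grpc-go's real internal envconfig plumbing may differ.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// boolFromEnv is a hypothetical helper for this sketch; it treats an unset or
// unparsable value as the given default.
func boolFromEnv(name string, def bool) bool {
	v, ok := os.LookupEnv(name)
	if !ok {
		return def
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return def
	}
	return b
}

func main() {
	fmt.Println("new pick_first:",
		boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false))
	fmt.Println("happy eyeballs:",
		boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_PICK_FIRST_HAPPY_EYEBALLS", false))
}
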
182 changes: 148 additions & 34 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
@@ -26,10 +26,12 @@
package pickfirstleaf

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/internal"
@@ -56,20 +58,29 @@ var (
// It is changed to "pick_first" in init() if this balancer is to be
// registered as the default pickfirst.
Name = "pick_first_leaf"
// timerFunc allows mocking the timer for testing connection delay related
// functionality.
timerFunc = time.After
)

// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "
const (
// TODO: change to pick-first when this becomes the default pick_first policy.
logPrefix = "[pick-first-leaf-lb %p] "
// connectionDelayInterval is the time to wait during the happy eyeballs
// pass before starting the next connection attempt.
connectionDelayInterval = 250 * time.Millisecond
)

type pickfirstBuilder struct{}

func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
callbackScheduler: callbackScheduler{},
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
@@ -104,8 +115,9 @@ type scData struct {
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
lastErr error
state connectivity.State
lastErr error
connectionAttempted bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
@@ -137,10 +149,11 @@ type pickfirstBalancer struct {
mu sync.Mutex
state connectivity.State
// scData for active subconns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
callbackScheduler callbackScheduler
}

// ResolverError is called by the ClientConn when the name resolver produces
@@ -232,9 +245,6 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// SubConn multiple times in the same pass. We don't want this.
newAddrs = deDupAddresses(newAddrs)

// Since we have a new set of addresses, we are again at first pass.
b.firstPass = true

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
prevAddr := b.addressList.currentAddress()
@@ -259,11 +269,11 @@
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.requestConnectionLocked()
b.startFirstPassLocked()
} else if b.state == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.requestConnectionLocked()
b.startFirstPassLocked()
}
return nil
}
@@ -278,6 +288,7 @@ func (b *pickfirstBalancer) Close() {
b.mu.Lock()
defer b.mu.Unlock()
b.closeSubConnsLocked()
b.callbackScheduler.close()
b.state = connectivity.Shutdown
}

@@ -288,9 +299,18 @@ func (b *pickfirstBalancer) ExitIdle() {
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle && b.addressList.currentAddress() == b.addressList.first() {
b.firstPass = true
b.requestConnectionLocked()
b.startFirstPassLocked()
}
}

func (b *pickfirstBalancer) startFirstPassLocked() {
b.firstPass = true
b.numTF = 0
// Reset the connection attempt record for existing SubConns.
for _, sd := range b.subConns.Values() {
sd.(*scData).connectionAttempted = false
}
b.requestConnectionLocked()
}

func (b *pickfirstBalancer) closeSubConnsLocked() {
@@ -341,6 +361,7 @@ func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address)
// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
// becomes ready, which means that all other SubConns must be shut down.
func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
b.callbackScheduler.cancel()
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.subConn != selected.subConn {
@@ -384,8 +405,10 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
switch scd.state {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
case connectivity.TransientFailure:
// Try the next address.
scd.connectionAttempted = true
lastErr = scd.lastErr
continue
case connectivity.Ready:
@@ -396,18 +419,44 @@
b.logger.Errorf("SubConn with state SHUTDOWN present in SubConns map")
case connectivity.Connecting:
// Wait for the SubConn to report success or failure.
b.scheduleNextConnectionLocked()
}
return
}

// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
// first pass.
b.endFirstPassLocked(lastErr)
// first pass if possible.
b.endFirstPassIfPossibleLocked(lastErr)
}

func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
if !envconfig.PickFirstHappyEyeballsEnabled {
return
}
b.callbackScheduler.schedule(func(ctx context.Context) {
b.mu.Lock()
defer b.mu.Unlock()
// If the scheduled task is cancelled while acquiring the mutex, return.
if ctx.Err() != nil {
return
}
if b.logger.V(2) {
b.logger.Infof("Happy Eyeballs timer expired.")
}
if b.addressList.increment() {
b.requestConnectionLocked()
}
}, connectionDelayInterval)
}
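
Aside (not part of this commit): the effect of connectionDelayInterval is that connection attempts are staggered roughly 250ms apart rather than waiting for each attempt to fail before trying the next address. A self-contained sketch of that stagger, with hypothetical addresses and a made-up dial latency:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical addresses and a pretend dial latency, for illustration only.
	addrs := []string{"10.0.0.1:443", "[2001:db8::1]:443", "10.0.0.2:443"}
	const connectionDelayInterval = 250 * time.Millisecond // same value as the balancer

	done := make(chan string, len(addrs))
	for i, addr := range addrs {
		go func(addr string) {
			time.Sleep(400 * time.Millisecond) // pretend every dial takes ~400ms
			done <- addr
		}(addr)
		if i == len(addrs)-1 {
			break
		}
		select {
		case addr := <-done:
			fmt.Println("connected to", addr)
			return
		case <-time.After(connectionDelayInterval):
			// Previous attempt is still pending: start the next address in parallel.
		}
	}
	fmt.Println("connected to", <-done)
}
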

func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionAttempted = true
}
sd.state = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
@@ -473,17 +522,20 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
sd.lastErr = newState.ConnectionError
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
// a pass over the previous address list. We ignore such updates.

if curAddr := b.addressList.currentAddress(); !equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
return
}
if b.addressList.increment() {
b.requestConnectionLocked()
return
// a pass over the previous address list. Happy Eyeballs will also
// cause out of order updates to arrive.

if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
if b.addressList.increment() {
b.callbackScheduler.cancel()
b.requestConnectionLocked()
return
}
}
// End of the first pass.
b.endFirstPassLocked(newState.ConnectionError)

// End the first pass if we've seen a TRANSIENT_FAILURE from all
// SubConns once.
b.endFirstPassIfPossibleLocked(newState.ConnectionError)
}
return
}
@@ -508,9 +560,17 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
}
}

func (b *pickfirstBalancer) endFirstPassLocked(lastErr error) {
func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
if b.addressList.isValid() || b.subConns.Len() < b.addressList.size() {
return
}
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if !sd.connectionAttempted {
return
}
}
b.firstPass = false
b.numTF = 0
b.state = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
@@ -622,3 +682,57 @@ func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
a.Attributes.Equal(b.Attributes) &&
a.Metadata == b.Metadata
}

// callbackScheduler is used to schedule the execution of a callback after a
// specified delay. It is not safe for concurrent access.
type callbackScheduler struct {
cancelScheduled func()
closed bool
wg sync.WaitGroup
}

// schedule schedules the execution of a callback. It cancels any previously
// scheduled callbacks.
func (c *callbackScheduler) schedule(f func(context.Context), after time.Duration) {
if c.closed {
return
}
c.cancel()
ctx, cancel := context.WithCancel(context.Background())
c.cancelScheduled = sync.OnceFunc(cancel)
c.wg.Add(1)

go func() {
select {
case <-timerFunc(after):
c.wg.Done()
// f() may try to acquire the balancer mutex. Calling wg.Done()
// after f() finishes may cause a deadlock because balancer.Close()
// would be holding the mutex when calling callbackScheduler.close()
// which waits for wg.Done().
f(ctx)
case <-ctx.Done():
c.wg.Done()
}
}()
}

// cancel prevents the execution of the scheduled callback if a callback is
// awaiting execution. If a callback is already being executed,
// it cancels the context passed to it.
func (c *callbackScheduler) cancel() {
if c.cancelScheduled != nil {
c.cancelScheduled()
}
}

// close closes the callbackScheduler and waits for all spawned goroutines to
// exit. No callbacks are scheduled after this method returns.
func (c *callbackScheduler) close() {
if c.closed {
return
}
c.cancel()
c.closed = true
c.wg.Wait()
}
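
Aside (not part of the diff): a usage sketch of callbackScheduler mirroring how the balancer drives it, assuming the type above is in scope in the same package (so context, fmt, and time are already imported); the function name and the printed message are hypothetical.

func demoCallbackScheduler() {
	var cs callbackScheduler

	// Schedule the next connection attempt 250ms out, as
	// scheduleNextConnectionLocked does.
	cs.schedule(func(ctx context.Context) {
		if ctx.Err() != nil {
			// Cancelled while the callback was waiting for a lock, as the
			// balancer checks under its mutex.
			return
		}
		fmt.Println("happy eyeballs timer expired; try the next address")
	}, 250*time.Millisecond)

	// A SubConn became READY first: drop the pending attempt, as
	// shutdownRemainingLocked does.
	cs.cancel()

	// On balancer Close(), stop everything and wait for spawned goroutines.
	cs.close()
}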