Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickfirst: Implement Happy Eyeballs #7725

Merged
merged 26 commits into from
Nov 12, 2024
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
db0dda7
Implement happy eyeballs
arjan-bal Oct 8, 2024
826bb03
Use timeAfterFunc
arjan-bal Oct 11, 2024
4e68e58
Address review comments
arjan-bal Oct 11, 2024
fe69816
Move timer func to internal, improve log statement and address review…
arjan-bal Oct 16, 2024
3022304
Remove env var
arjan-bal Oct 16, 2024
0a3ffd3
Change to e2e style test
arjan-bal Oct 16, 2024
6697267
Fix vet
arjan-bal Oct 16, 2024
67f7a1a
Fix vet
arjan-bal Oct 16, 2024
9712ec5
refactor test
arjan-bal Oct 16, 2024
6c8fb41
Avoid creating a context
arjan-bal Oct 16, 2024
04a912f
address review comments
arjan-bal Oct 17, 2024
0bb745f
Use BlockingDialer instead of implementing a hanging server
arjan-bal Oct 17, 2024
99e2e89
Fix test synchronization
arjan-bal Oct 17, 2024
592ba0d
Test refactorings
arjan-bal Oct 18, 2024
84d6ed4
Cancel timer when processing new resolver update
arjan-bal Oct 19, 2024
8f63d8e
Improve whitespaces and comments
arjan-bal Oct 23, 2024
09f27c6
Merge branch 'master' of github.com:grpc/grpc-go into grpc-go-happy-e…
arjan-bal Oct 23, 2024
6610516
Refactor fake timer
arjan-bal Oct 23, 2024
d6bc007
Don't use expired context
arjan-bal Oct 23, 2024
19a3165
Remove unnecessary timer in test
arjan-bal Oct 23, 2024
598fdd0
Address review comments
arjan-bal Oct 24, 2024
d3bde50
Merge remote-tracking branch 'source/master' into grpc-go-happy-eyeballs
arjan-bal Nov 6, 2024
8b4b28e
Remove stale comment
arjan-bal Nov 6, 2024
6c16943
Use rand/v2
arjan-bal Nov 7, 2024
11fe515
Address review comments
arjan-bal Nov 8, 2024
5c4ff49
Rename to connectionFailedInFirstPass
arjan-bal Nov 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Implement happy eyeballs
arjan-bal committed Oct 10, 2024
commit db0dda79189880c2606c62604c91d52322f72205
4 changes: 3 additions & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
@@ -73,7 +73,9 @@ jobs:
- type: tests
goversion: '1.22'
testflags: -race
grpcenv: 'GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true'
grpcenv: |
'GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true'
'GRPC_EXPERIMENTAL_ENABLE_PICK_FIRST_HAPPY_EYEBALLS=true'
steps:
# Setup the environment.
182 changes: 148 additions & 34 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
@@ -26,10 +26,12 @@
package pickfirstleaf

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/internal"
@@ -56,20 +58,29 @@
// It is changed to "pick_first" in init() if this balancer is to be
// registered as the default pickfirst.
Name = "pick_first_leaf"
// timerFunc allows mocking the timer for testing connection delay related
// functionality.
timerFunc = time.After
)

// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "
const (
// TODO: change to pick-first when this becomes the default pick_first policy.
logPrefix = "[pick-first-leaf-lb %p] "
// connectionDelayInterval is the time to wait for during the happy eyeballs
// pass before starting the next connection attempt.
connectionDelayInterval = 250 * time.Millisecond
)

type pickfirstBuilder struct{}

func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
callbackScheduler: callbackScheduler{},
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
@@ -104,8 +115,9 @@
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
lastErr error
state connectivity.State
lastErr error
connectionAttempted bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
@@ -137,10 +149,11 @@
mu sync.Mutex
state connectivity.State
// scData for active subConns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
callbackScheduler callbackScheduler
}

// ResolverError is called by the ClientConn when the name resolver produces
@@ -232,9 +245,6 @@
// SubConn multiple times in the same pass. We don't want this.
newAddrs = deDupAddresses(newAddrs)

// Since we have a new set of addresses, we are again at first pass.
b.firstPass = true

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
prevAddr := b.addressList.currentAddress()
@@ -259,11 +269,11 @@
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.requestConnectionLocked()
b.startFirstPassLocked()
} else if b.state == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.requestConnectionLocked()
b.startFirstPassLocked()
}
return nil
}
@@ -278,6 +288,7 @@
b.mu.Lock()
defer b.mu.Unlock()
b.closeSubConnsLocked()
b.callbackScheduler.close()
b.state = connectivity.Shutdown
}

@@ -288,9 +299,18 @@
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle && b.addressList.currentAddress() == b.addressList.first() {
b.firstPass = true
b.requestConnectionLocked()
b.startFirstPassLocked()
}
}

// startFirstPassLocked begins a fresh first pass: it sets b.firstPass, resets
// b.numTF to zero, clears the connectionAttempted flag on every scData stored
// in b.subConns, and then calls requestConnectionLocked. Callers are expected
// to hold b.mu.
func (b *pickfirstBalancer) startFirstPassLocked() {
	b.firstPass, b.numTF = true, 0
	// Forget which SubConns were already tried during an earlier pass.
	for _, v := range b.subConns.Values() {
		scd := v.(*scData)
		scd.connectionAttempted = false
	}
	b.requestConnectionLocked()
}

func (b *pickfirstBalancer) closeSubConnsLocked() {
@@ -341,6 +361,7 @@
// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
// becomes ready, which means that all other subConn must be shutdown.
func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
b.callbackScheduler.cancel()
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.subConn != selected.subConn {
@@ -384,8 +405,10 @@
switch scd.state {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
case connectivity.TransientFailure:
// Try the next address.
scd.connectionAttempted = true
lastErr = scd.lastErr
continue
case connectivity.Ready:
@@ -396,18 +419,44 @@
b.logger.Errorf("SubConn with state SHUTDOWN present in SubConns map")
case connectivity.Connecting:
// Wait for the SubConn to report success or failure.
b.scheduleNextConnectionLocked()
}
return
}

// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
// first pass.
b.endFirstPassLocked(lastErr)
// first pass if possible.
b.endFirstPassIfPossibleLocked(lastErr)
}

// scheduleNextConnectionLocked arms the Happy Eyeballs timer. It does nothing
// when envconfig.PickFirstHappyEyeballsEnabled is false. Otherwise it asks
// b.callbackScheduler to run a callback after connectionDelayInterval. The
// callback acquires b.mu and, unless its context was cancelled in the
// meantime, advances b.addressList and calls requestConnectionLocked when
// increment() reports true.
func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
if !envconfig.PickFirstHappyEyeballsEnabled {
return
}
b.callbackScheduler.schedule(func(ctx context.Context) {
// The callback is invoked from the scheduler's goroutine, so it has to
// take the balancer mutex itself.
b.mu.Lock()
defer b.mu.Unlock()
// If the scheduled task is cancelled while acquiring the mutex, return.
if ctx.Err() != nil {
return

Check warning on line 441 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L441

Added line #L441 was not covered by tests
}
if b.logger.V(2) {
b.logger.Infof("Happy Eyeballs timer expired.")

Check warning on line 444 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L444

Added line #L444 was not covered by tests
}
// Move on to the next address without waiting for the current attempt to
// finish.
if b.addressList.increment() {
b.requestConnectionLocked()
}
}, connectionDelayInterval)
}

func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionAttempted = true
}
sd.state = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
@@ -473,17 +522,20 @@
sd.lastErr = newState.ConnectionError
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
// a pass over the previous address list. We ignore such updates.

if curAddr := b.addressList.currentAddress(); !equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
return
}
if b.addressList.increment() {
b.requestConnectionLocked()
return
// a pass over the previous address list. Happy Eyeballs will also
// cause out of order updates to arrive.

if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
b.callbackScheduler.cancel()
if b.addressList.increment() {
b.requestConnectionLocked()
return
}
}
// End of the first pass.
b.endFirstPassLocked(newState.ConnectionError)

// End the first pass if we've seen a TRANSIENT_FAILURE from all
// SubConns once.
b.endFirstPassIfPossibleLocked(newState.ConnectionError)
}
return
}
@@ -508,9 +560,17 @@
}
}

func (b *pickfirstBalancer) endFirstPassLocked(lastErr error) {
func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
if b.addressList.isValid() || b.subConns.Len() < b.addressList.size() {
return
}
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if !sd.connectionAttempted {
return

Check warning on line 570 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L570

Added line #L570 was not covered by tests
}
}
b.firstPass = false
b.numTF = 0
b.state = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
@@ -622,3 +682,57 @@
a.Attributes.Equal(b.Attributes) &&
a.Metadata == b.Metadata
}

// callbackScheduler is used to schedule the execution of a callback after a
// specified delay. It is not safe for concurrent access.
type callbackScheduler struct {
// cancelScheduled cancels the context of the most recently scheduled
// callback. It is nil until schedule has been called.
cancelScheduled func()
// closed is set by close; once true, schedule returns without scheduling.
closed bool
// wg tracks the goroutines spawned by schedule so that close can wait for
// them to stop waiting on the timer.
wg sync.WaitGroup
}

// schedule schedules the execution of a callback. It cancels any previously
// scheduled callbacks.
//
// f is invoked on a new goroutine once timerFunc(after) fires, and receives a
// context that is cancelled by a later call to schedule, cancel or close. If
// that context is cancelled before the timer fires, f is not invoked. schedule
// is a no-op once the scheduler has been closed.
func (c *callbackScheduler) schedule(f func(context.Context), after time.Duration) {
if c.closed {
return

Check warning on line 698 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L698

Added line #L698 was not covered by tests
}
// Only one callback may be pending at a time.
c.cancel()
ctx, cancel := context.WithCancel(context.Background())
// Wrapped in OnceFunc so the stored cancel function runs at most once.
c.cancelScheduled = sync.OnceFunc(cancel)
c.wg.Add(1)

go func() {
select {
case <-timerFunc(after):
c.wg.Done()
// f() may try to acquire the balancer mutex. Calling wg.Done()
// after f() finishes may cause a deadlock because balancer.Close()
// would be holding the mutex when calling callbackScheduler.close()
// which waits for wg.Done().
f(ctx)
case <-ctx.Done():
c.wg.Done()
}
}()
}

// cancel invokes the stored cancelScheduled function, if any. For a callback
// that is still awaiting execution this prevents it from running; for a
// callback that is already being executed it cancels the context passed to
// it. It is a no-op when nothing has been scheduled yet.
func (c *callbackScheduler) cancel() {
	cancelFn := c.cancelScheduled
	if cancelFn == nil {
		return
	}
	cancelFn()
}

// close closes the callbackScheduler and waits for all spawned goroutines to
// exit. No callbacks are scheduled after this method returns.
//
// Calling close on an already closed scheduler returns immediately.
func (c *callbackScheduler) close() {
if c.closed {
return

Check warning on line 733 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L733

Added line #L733 was not covered by tests
}
// Cancel the pending callback (if any) so its goroutine stops waiting on the
// timer and calls wg.Done().
c.cancel()
c.closed = true
// schedule's goroutine calls wg.Done() before running the callback, so this
// waits only for the timer/cancellation select, not for the callback itself.
c.wg.Wait()
}
Loading