Skip to content

Commit f68c03d

Browse files
committed
Interleave addresses for happy eyeballs
1 parent ad81c20 commit f68c03d

File tree

3 files changed

+189
-3
lines changed

3 files changed

+189
-3
lines changed

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

+78-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"encoding/json"
3030
"errors"
3131
"fmt"
32+
"net"
3233
"sync"
3334

3435
"google.golang.org/grpc/balancer"
@@ -61,6 +62,14 @@ var (
6162
// TODO: change to pick-first when this becomes the default pick_first policy.
6263
const logPrefix = "[pick-first-leaf-lb %p] "
6364

65+
type ipAddrType int
66+
67+
const (
68+
ipTypeUnknown ipAddrType = iota
69+
ipv4
70+
ipv6
71+
)
72+
6473
type pickfirstBuilder struct{}
6574

6675
func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
@@ -206,9 +215,6 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
206215
// "Flatten the list by concatenating the ordered list of addresses for
207216
// each of the endpoints, in order." - A61
208217
for _, endpoint := range endpoints {
209-
// "In the flattened list, interleave addresses from the two address
210-
// families, as per RFC-8305 section 4." - A61
211-
// TODO: support the above language.
212218
newAddrs = append(newAddrs, endpoint.Addresses...)
213219
}
214220
} else {
@@ -232,6 +238,10 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
232238
// SubConn multiple times in the same pass. We don't want this.
233239
newAddrs = deDupAddresses(newAddrs)
234240

241+
// Interleave addresses of both famalies (IPv4 and IPv6) as per RFC-8305
242+
// section 4.
243+
newAddrs = interleaveAddresses(newAddrs)
244+
235245
// Since we have a new set of addresses, we are again at first pass.
236246
b.firstPass = true
237247

@@ -314,6 +324,71 @@ func deDupAddresses(addrs []resolver.Address) []resolver.Address {
314324
return retAddrs
315325
}
316326

327+
func interleaveAddresses(addrs []resolver.Address) []resolver.Address {
328+
// Group the addresses by their type and determine the order in which to
329+
// interleave the address families. The order of interleaving the families
330+
// is the order in which the first address of a particular type appears in
331+
// the address list.
332+
familyAddrsMap := map[ipAddrType][]resolver.Address{}
333+
interleavingOrder := []ipAddrType{}
334+
for _, addr := range addrs {
335+
typ := addressType(addr.Addr)
336+
if _, found := familyAddrsMap[typ]; !found {
337+
interleavingOrder = append(interleavingOrder, typ)
338+
}
339+
familyAddrsMap[typ] = append(familyAddrsMap[typ], addr)
340+
}
341+
342+
interleavedAddrs := make([]resolver.Address, 0, len(addrs))
343+
curTypeIndex := 0
344+
for i := 0; i < len(addrs); i++ {
345+
// Some IP types may have fewer addresses than others, so we look for
346+
// the next type that has a remaining member to add to the interleaved
347+
// list.
348+
for {
349+
curType := interleavingOrder[curTypeIndex]
350+
remainingMembers := familyAddrsMap[curType]
351+
if len(remainingMembers) > 0 {
352+
break
353+
}
354+
curTypeIndex = (curTypeIndex + 1) % len(interleavingOrder)
355+
}
356+
curType := interleavingOrder[curTypeIndex]
357+
remainingMembers := familyAddrsMap[curType]
358+
interleavedAddrs = append(interleavedAddrs, remainingMembers[0])
359+
familyAddrsMap[curType] = remainingMembers[1:]
360+
curTypeIndex = (curTypeIndex + 1) % len(interleavingOrder)
361+
}
362+
363+
return interleavedAddrs
364+
}
365+
366+
func addressType(address string) ipAddrType {
367+
// Try parsing addresses without a port specified.
368+
ip := net.ParseIP(address)
369+
if ip == nil {
370+
// Try to parse the IP after removing the port.
371+
host, _, err := net.SplitHostPort(address)
372+
if err != nil {
373+
return ipTypeUnknown
374+
}
375+
ip = net.ParseIP(host)
376+
}
377+
378+
// If using the passthrough resolver, the hostnames would be unresolved
379+
// and therefore not valid IP addresses.
380+
if ip == nil {
381+
return ipTypeUnknown
382+
}
383+
384+
if ip.To4() != nil {
385+
return ipv4
386+
} else if ip.To16() != nil {
387+
return ipv6
388+
}
389+
return ipTypeUnknown
390+
}
391+
317392
// reconcileSubConnsLocked updates the active subchannels based on a new address
318393
// list from the resolver. It does this by:
319394
// - closing subchannels: any existing subchannels associated with addresses

balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go

+109
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"testing"
2525
"time"
2626

27+
"github.com/google/go-cmp/cmp"
2728
"google.golang.org/grpc/attributes"
2829
"google.golang.org/grpc/balancer"
2930
"google.golang.org/grpc/connectivity"
@@ -257,3 +258,111 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
257258
t.Fatalf("cc.WaitForPickerWithErr(%v) returned error: %v", newTfErr, err)
258259
}
259260
}
261+
262+
func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) {
263+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
264+
defer cancel()
265+
cc := testutils.NewBalancerClientConn(t)
266+
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
267+
defer bal.Close()
268+
ccState := balancer.ClientConnState{
269+
ResolverState: resolver.State{
270+
Endpoints: []resolver.Endpoint{
271+
{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}, // no port
272+
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
273+
{Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}},
274+
{Addresses: []resolver.Address{{Addr: "4.4.4.4:4"}}},
275+
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // ipv6 with port
276+
{Addresses: []resolver.Address{{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"}}},
277+
{Addresses: []resolver.Address{{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"}}},
278+
{Addresses: []resolver.Address{{Addr: "grpc.io:80"}}}, // not an IP.
279+
},
280+
},
281+
}
282+
if err := bal.UpdateClientConnState(ccState); err != nil {
283+
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
284+
}
285+
286+
wantAddrs := []resolver.Address{
287+
{Addr: "1.1.1.1"},
288+
{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"},
289+
{Addr: "grpc.io:80"},
290+
{Addr: "2.2.2.2:2"},
291+
{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"},
292+
{Addr: "3.3.3.3:3"},
293+
{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"},
294+
{Addr: "4.4.4.4:4"},
295+
}
296+
297+
gotAddrs, err := subConnAddresses(ctx, cc, 8)
298+
if err != nil {
299+
t.Fatalf("%v", err)
300+
}
301+
if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" {
302+
t.Errorf("subconn creation order mismatch (-want +got):\n%s", diff)
303+
}
304+
}
305+
306+
func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) {
307+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
308+
defer cancel()
309+
cc := testutils.NewBalancerClientConn(t)
310+
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
311+
defer bal.Close()
312+
ccState := balancer.ClientConnState{
313+
ResolverState: resolver.State{
314+
Endpoints: []resolver.Endpoint{
315+
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // ipv6 with port
316+
{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}, // not port
317+
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
318+
{Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}},
319+
{Addresses: []resolver.Address{{Addr: "4.4.4.4:4"}}},
320+
{Addresses: []resolver.Address{{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"}}},
321+
{Addresses: []resolver.Address{{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"}}},
322+
{Addresses: []resolver.Address{{Addr: "grpc.io:80"}}}, // not an IP.
323+
},
324+
},
325+
}
326+
if err := bal.UpdateClientConnState(ccState); err != nil {
327+
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
328+
}
329+
330+
wantAddrs := []resolver.Address{
331+
{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"},
332+
{Addr: "1.1.1.1"},
333+
{Addr: "grpc.io:80"},
334+
{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"},
335+
{Addr: "2.2.2.2:2"},
336+
{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"},
337+
{Addr: "3.3.3.3:3"},
338+
{Addr: "4.4.4.4:4"},
339+
}
340+
341+
gotAddrs, err := subConnAddresses(ctx, cc, 8)
342+
if err != nil {
343+
t.Fatalf("%v", err)
344+
}
345+
if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" {
346+
t.Errorf("subconn creation order mismatch (-want +got):\n%s", diff)
347+
}
348+
}
349+
func subConnAddresses(ctx context.Context, cc *testutils.BalancerClientConn, subConnCount int) ([]resolver.Address, error) {
350+
addresses := []resolver.Address{}
351+
for i := 0; i < subConnCount; i++ {
352+
select {
353+
case <-ctx.Done():
354+
return nil, fmt.Errorf("Context timed out waiting for SubConn")
355+
case sc := <-cc.NewSubConnCh:
356+
if len(sc.Addresses) != 1 {
357+
return nil, fmt.Errorf("len(SubConn.Addresses) = %d, want 1", len(sc.Addresses))
358+
}
359+
addresses = append(addresses, sc.Addresses[0])
360+
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
361+
sc.UpdateState(balancer.SubConnState{
362+
ConnectivityState: connectivity.TransientFailure,
363+
ConnectionError: fmt.Errorf("test error"),
364+
})
365+
}
366+
}
367+
return addresses, nil
368+
}

internal/testutils/balancer.go

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type TestSubConn struct {
3737
ConnectCh chan struct{}
3838
stateListener func(balancer.SubConnState)
3939
connectCalled *grpcsync.Event
40+
Addresses []resolver.Address
4041
}
4142

4243
// NewTestSubConn returns a newly initialized SubConn. Typically, subconns
@@ -131,6 +132,7 @@ func (tcc *BalancerClientConn) NewSubConn(a []resolver.Address, o balancer.NewSu
131132
ConnectCh: make(chan struct{}, 1),
132133
stateListener: o.StateListener,
133134
connectCalled: grpcsync.NewEvent(),
135+
Addresses: a,
134136
}
135137
tcc.subConnIdx++
136138
tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)

0 commit comments

Comments
 (0)