Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickfirst: Interleave IPv6 and IPv4 addresses for happy eyeballs #7742

Merged
74 changes: 71 additions & 3 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
"encoding/json"
"errors"
"fmt"
"net"
"sync"

"google.golang.org/grpc/balancer"
@@ -61,6 +62,14 @@
// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "

type ipAddrType int

const (
ipTypeUnknown ipAddrType = iota
ipv4
ipv6
)

type pickfirstBuilder struct{}

func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
@@ -206,9 +215,6 @@
// "Flatten the list by concatenating the ordered list of addresses for
// each of the endpoints, in order." - A61
for _, endpoint := range endpoints {
// "In the flattened list, interleave addresses from the two address
// families, as per RFC-8305 section 4." - A61
// TODO: support the above language.
newAddrs = append(newAddrs, endpoint.Addresses...)
}
} else {
@@ -232,6 +238,10 @@
// SubConn multiple times in the same pass. We don't want this.
newAddrs = deDupAddresses(newAddrs)

// Interleave addresses of both families (IPv4 and IPv6) as per RFC-8305
// section 4.
newAddrs = interleaveAddresses(newAddrs)

// Since we have a new set of addresses, we are again at first pass.
b.firstPass = true

@@ -314,6 +324,64 @@
return retAddrs
}

func interleaveAddresses(addrs []resolver.Address) []resolver.Address {
// Group the addresses by their type and determine the order in which to
// interleave the address families. The order of interleaving the families
// is the order in which the first address of a particular type appears in
// the address list.
familyAddrsMap := map[ipAddrType][]resolver.Address{}
interleavingOrder := []ipAddrType{}
for _, addr := range addrs {
typ := addressType(addr.Addr)
if _, found := familyAddrsMap[typ]; !found {
interleavingOrder = append(interleavingOrder, typ)
}
familyAddrsMap[typ] = append(familyAddrsMap[typ], addr)
}

interleavedAddrs := make([]resolver.Address, 0, len(addrs))

for curTypeIndex := 0; len(interleavedAddrs) < len(addrs); curTypeIndex = (curTypeIndex + 1) % len(interleavingOrder) {
// Some IP types may have fewer addresses than others, so we look for
// the next type that has a remaining member to add to the interleaved
// list.
typ := interleavingOrder[curTypeIndex]
remainingMembers := familyAddrsMap[typ]
if len(remainingMembers) > 0 {
interleavedAddrs = append(interleavedAddrs, remainingMembers[0])
familyAddrsMap[typ] = remainingMembers[1:]
}
}

return interleavedAddrs
}

func addressType(address string) ipAddrType {
// Try parsing addresses without a port specified.
ip := net.ParseIP(address)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not safely assume a port is always present (if it's any IP address)?

I was pretty sure we require that? The DNS resolver is where we add the default port to addresses if needed. So I think we can always do SplitHostPort and return Unknown on error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct, the port must be specified. I didn't check if the default port is added in the DNS resolver or somewhere before the dialer. I tried using passthrough to omit the port and the rpc fails with the following error:

code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp: address 127.0.0.1: missing port in address"

Removed the handling of addresses without ports, updating the test cases accordingly.

if ip == nil {
// Try to parse the IP after removing the port.
host, _, err := net.SplitHostPort(address)
if err != nil {
return ipTypeUnknown
}
ip = net.ParseIP(host)
}

// If using the passthrough resolver, the hostnames would be unresolved
// and therefore not valid IP addresses.
if ip == nil {
return ipTypeUnknown
}

if ip.To4() != nil {
return ipv4
} else if ip.To16() != nil {
return ipv6
}
return ipTypeUnknown

Check warning on line 382 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L382

Added line #L382 was not covered by tests
}

// reconcileSubConnsLocked updates the active subchannels based on a new address
// list from the resolver. It does this by:
// - closing subchannels: any existing subchannels associated with addresses
108 changes: 108 additions & 0 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
@@ -851,6 +851,114 @@ func (s) TestPickFirstLeaf_EmptyAddressList(t *testing.T) {
}
}

func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}, // no port
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
{Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}},
{Addresses: []resolver.Address{{Addr: "4.4.4.4:4"}}},
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // ipv6 with port
{Addresses: []resolver.Address{{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"}}},
{Addresses: []resolver.Address{{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"}}},
{Addresses: []resolver.Address{{Addr: "grpc.io:80"}}}, // not an IP.
},
},
}
if err := bal.UpdateClientConnState(ccState); err != nil {
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
}

wantAddrs := []resolver.Address{
{Addr: "1.1.1.1"},
{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"},
{Addr: "grpc.io:80"},
{Addr: "2.2.2.2:2"},
{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"},
{Addr: "3.3.3.3:3"},
{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"},
{Addr: "4.4.4.4:4"},
}

gotAddrs, err := subConnAddresses(ctx, cc, 8)
if err != nil {
t.Fatalf("%v", err)
}
if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" {
t.Errorf("subconn creation order mismatch (-want +got):\n%s", diff)
}
}

func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // ipv6 with port
{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}, // no port
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
{Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}},
{Addresses: []resolver.Address{{Addr: "4.4.4.4:4"}}},
{Addresses: []resolver.Address{{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"}}},
{Addresses: []resolver.Address{{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"}}},
{Addresses: []resolver.Address{{Addr: "grpc.io:80"}}}, // not an IP.
},
},
}
if err := bal.UpdateClientConnState(ccState); err != nil {
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
}

wantAddrs := []resolver.Address{
{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"},
{Addr: "1.1.1.1"},
{Addr: "grpc.io:80"},
{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"},
{Addr: "2.2.2.2:2"},
{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"},
{Addr: "3.3.3.3:3"},
{Addr: "4.4.4.4:4"},
}

gotAddrs, err := subConnAddresses(ctx, cc, 8)
if err != nil {
t.Fatalf("%v", err)
}
if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" {
t.Errorf("subconn creation order mismatch (-want +got):\n%s", diff)
}
}
func subConnAddresses(ctx context.Context, cc *testutils.BalancerClientConn, subConnCount int) ([]resolver.Address, error) {
addresses := []resolver.Address{}
for i := 0; i < subConnCount; i++ {
select {
case <-ctx.Done():
return nil, fmt.Errorf("Context timed out waiting for SubConn")
case sc := <-cc.NewSubConnCh:
if len(sc.Addresses) != 1 {
return nil, fmt.Errorf("len(SubConn.Addresses) = %d, want 1", len(sc.Addresses))
}
addresses = append(addresses, sc.Addresses[0])
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: fmt.Errorf("test error"),
})
}
}
return addresses, nil
}

// stateStoringBalancer stores the state of the subconns being created.
type stateStoringBalancer struct {
balancer.Balancer
2 changes: 2 additions & 0 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ type TestSubConn struct {
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
connectCalled *grpcsync.Event
Addresses []resolver.Address
}

// NewTestSubConn returns a newly initialized SubConn. Typically, subconns
@@ -131,6 +132,7 @@ func (tcc *BalancerClientConn) NewSubConn(a []resolver.Address, o balancer.NewSu
ConnectCh: make(chan struct{}, 1),
stateListener: o.StateListener,
connectCalled: grpcsync.NewEvent(),
Addresses: a,
}
tcc.subConnIdx++
tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)