Skip to content

Commit

Permalink
internal/testutils: add a new test type that implements resolver.Clie…
Browse files Browse the repository at this point in the history
…ntConn (#6668)
  • Loading branch information
easwars authored Oct 12, 2023
1 parent 32e3ef1 commit dd4c0ad
Show file tree
Hide file tree
Showing 17 changed files with 1,158 additions and 1,370 deletions.
32 changes: 16 additions & 16 deletions balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func init() {
// glue code in weighted_target. It also tests an empty target config update,
// which should trigger a transient failure state update.
func (s) TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -329,7 +329,7 @@ func (s) TestWeightedTarget(t *testing.T) {
// have a weighted target balancer will one sub-balancer, and we add and remove
// backends from the subBalancer.
func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -427,7 +427,7 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
// TestWeightedTarget_TwoSubBalancers_OneBackend tests the case where we have a
// weighted target balancer with two sub-balancers, each with one backend.
func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -493,7 +493,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
// a weighted target balancer with two sub-balancers, each with more than one
// backend.
func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -637,7 +637,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
// case where we have a weighted target balancer with two sub-balancers of
// differing weights.
func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -718,7 +718,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *test
// have a weighted target balancer with three sub-balancers and we remove one of
// the subBalancers.
func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -879,7 +879,7 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// where we have a weighted target balancer with two sub-balancers, and we
// change the weight of these subBalancers.
func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -997,7 +997,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing
// the picks won't fail with transient_failure, and should instead wait for the
// other sub-balancer.
func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -1059,7 +1059,7 @@ func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
// connecting, the overall state stays in transient_failure, and all picks
// return transient failure error.
func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -1141,7 +1141,7 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes

// Verify that a SubConn is created with the expected address and hierarchy
// path cleared.
func verifyAddressInNewSubConn(t *testing.T, cc *testutils.TestClientConn, addr resolver.Address) {
func verifyAddressInNewSubConn(t *testing.T, cc *testutils.BalancerClientConn, addr resolver.Address) {
t.Helper()

gotAddr := <-cc.NewSubConnAddrsCh
Expand All @@ -1163,7 +1163,7 @@ type subConnWithAddr struct {
//
// Returned value is a map from subBalancer (identified by its config) to
// subConns created by it.
func waitForNewSubConns(t *testing.T, cc *testutils.TestClientConn, num int) map[string][]subConnWithAddr {
func waitForNewSubConns(t *testing.T, cc *testutils.BalancerClientConn, num int) map[string][]subConnWithAddr {
t.Helper()

scs := make(map[string][]subConnWithAddr)
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func init() {
// TestInitialIdle covers the case that if the child reports Idle, the overall
// state will be Idle.
func (s) TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -1274,7 +1274,7 @@ func (s) TestInitialIdle(t *testing.T) {
// TestIgnoreSubBalancerStateTransitions covers the case that if the child reports a
// transition from TF to Connecting, the overall state will still be TF.
func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}

wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
Expand Down Expand Up @@ -1314,17 +1314,17 @@ func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
// tcc wraps a testutils.TestClientConn but stores all state transitions in a
// slice.
type tcc struct {
*testutils.TestClientConn
*testutils.BalancerClientConn
states []balancer.State
}

func (t *tcc) UpdateState(bs balancer.State) {
t.states = append(t.states, bs)
t.TestClientConn.UpdateState(bs)
t.BalancerClientConn.UpdateState(bs)
}

func (s) TestUpdateStatePauses(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}

balFuncs := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
Expand Down
2 changes: 1 addition & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ import (
"google.golang.org/grpc/status"

_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
)

const (
Expand Down
4 changes: 2 additions & 2 deletions internal/balancer/gracefulswitch/gracefulswitch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

func setup(t *testing.T) (*testutils.TestClientConn, *Balancer) {
tcc := testutils.NewTestClientConn(t)
func setup(t *testing.T) (*testutils.BalancerClientConn, *Balancer) {
tcc := testutils.NewBalancerClientConn(t)
return tcc, NewBalancer(tcc, balancer.BuildOptions{})
}

Expand Down
14 changes: 7 additions & 7 deletions internal/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func Test(t *testing.T) {
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check for behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
Expand Down Expand Up @@ -176,7 +176,7 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
stub.Register(balancerName, stub.BalancerFuncs{})
builder := balancer.Get(balancerName)

cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
Expand All @@ -203,8 +203,8 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is removed later, so the balancer group returned has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]*testutils.TestSubConn) {
cc := testutils.NewTestClientConn(t)
func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.BalancerClientConn, map[resolver.Address]*testutils.TestSubConn) {
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
Expand Down Expand Up @@ -503,7 +503,7 @@ func (s) TestBalancerGroupBuildOptions(t *testing.T) {
return nil
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bg := New(Options{
CC: cc,
BuildOpts: bOpts,
Expand Down Expand Up @@ -531,7 +531,7 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
exitIdleCh <- struct{}{}
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bg := New(Options{
CC: cc,
BuildOpts: balancer.BuildOptions{},
Expand Down Expand Up @@ -561,7 +561,7 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
// for the second passed in address and also only picks that created SubConn.
// The new aggregated picker should reflect this change for the child.
func (s) TestBalancerGracefulSwitch(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
Expand Down
69 changes: 20 additions & 49 deletions internal/resolver/dns/dns_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package dns
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"os"
Expand All @@ -37,6 +36,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/resolver/dns/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand All @@ -47,15 +47,11 @@ var EnableSRVLookups = false

var logger = grpclog.Component("dns")

// Globals to stub out in tests. TODO: Perhaps these two can be combined into a
// single variable for testing the resolver?
var (
newTimer = time.NewTimer
newTimerDNSResRate = time.NewTimer
)

func init() {
resolver.Register(NewBuilder())
internal.TimeAfterFunc = time.After
internal.NewNetResolver = newNetResolver
internal.AddressDialer = addressDialer
}

const (
Expand All @@ -70,31 +66,18 @@ const (
txtAttribute = "grpc_config="
)

var (
errMissingAddr = errors.New("dns resolver: missing address")

// Addresses ending with a colon that is supposed to be the separator
// between host and port is not allowed. E.g. "::" is a valid address as
// it is an IPv6 address (host only) and "[::]:" is invalid as it ends with
// a colon as the host and port separator
errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
)

var (
defaultResolver netResolver = net.DefaultResolver
// To prevent excessive re-resolution, we enforce a rate limit on DNS
// resolution requests.
minDNSResRate = 30 * time.Second
)

var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) {
var dialer net.Dialer
return dialer.DialContext(ctx, network, address)
}
}

var newNetResolver = func(authority string) (netResolver, error) {
var newNetResolver = func(authority string) (internal.NetResolver, error) {
if authority == "" {
return net.DefaultResolver, nil
}

host, port, err := parseTarget(authority, defaultDNSSvrPort)
if err != nil {
return nil, err
Expand All @@ -104,7 +87,7 @@ var newNetResolver = func(authority string) (netResolver, error) {

return &net.Resolver{
PreferGo: true,
Dial: addressDialer(authorityWithPort),
Dial: internal.AddressDialer(authorityWithPort),
}, nil
}

Expand Down Expand Up @@ -142,13 +125,9 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
disableServiceConfig: opts.DisableServiceConfig,
}

if target.URL.Host == "" {
d.resolver = defaultResolver
} else {
d.resolver, err = newNetResolver(target.URL.Host)
if err != nil {
return nil, err
}
d.resolver, err = internal.NewNetResolver(target.URL.Host)
if err != nil {
return nil, err
}

d.wg.Add(1)
Expand All @@ -161,12 +140,6 @@ func (b *dnsBuilder) Scheme() string {
return "dns"
}

type netResolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}

// deadResolver is a resolver that does nothing.
type deadResolver struct{}

Expand All @@ -178,7 +151,7 @@ func (deadResolver) Close() {}
type dnsResolver struct {
host string
port string
resolver netResolver
resolver internal.NetResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
Expand Down Expand Up @@ -223,29 +196,27 @@ func (d *dnsResolver) watcher() {
err = d.cc.UpdateState(*state)
}

var timer *time.Timer
var waitTime time.Duration
if err == nil {
// Success resolving, wait for the next ResolveNow. However, also wait 30
// seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
timer = newTimerDNSResRate(minDNSResRate)
waitTime = internal.MinResolutionRate
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-d.rn:
}
} else {
// Poll on an error found in DNS Resolver or an error received from
// ClientConn.
timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
waitTime = backoff.DefaultExponential.Backoff(backoffIndex)
backoffIndex++
}
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-timer.C:
case <-internal.TimeAfterFunc(waitTime):
}
}
}
Expand Down Expand Up @@ -387,7 +358,7 @@ func formatIP(addr string) (addrIP string, ok bool) {
// target: ":80" defaultPort: "443" returns host: "localhost", port: "80"
func parseTarget(target, defaultPort string) (host, port string, err error) {
if target == "" {
return "", "", errMissingAddr
return "", "", internal.ErrMissingAddr
}
if ip := net.ParseIP(target); ip != nil {
// target is an IPv4 or IPv6(without brackets) address
Expand All @@ -397,7 +368,7 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {
if port == "" {
// If the port field is empty (target ends with colon), e.g. "[::1]:",
// this is an error.
return "", "", errEndsWithColon
return "", "", internal.ErrEndsWithColon
}
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {
Expand Down
Loading

0 comments on commit dd4c0ad

Please sign in to comment.