Skip to content

Commit

Permalink
resolver: State: add Endpoints and deprecate Addresses (#6471)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Jul 31, 2023
1 parent 20c51a9 commit 94df716
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 43 deletions.
12 changes: 9 additions & 3 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ type SubConn interface {
//
// This will trigger a state transition for the SubConn.
//
// Deprecated: This method is now part of the ClientConn interface and will
// eventually be removed from here.
// Deprecated: this method will be removed. Create new SubConns for new
// addresses instead.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
Expand Down Expand Up @@ -150,6 +150,9 @@ type ClientConn interface {
// NewSubConn is called by balancer to create a new SubConn.
// It doesn't block and wait for the connections to be established.
// Behaviors of the SubConn can be controlled by options.
//
// Deprecated: please be aware that in a future version, SubConns will only
// support one address per SubConn.
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
Expand All @@ -159,7 +162,10 @@ type ClientConn interface {
// If so, the connection will be kept. Else, the connection will be
// gracefully closed, and a new connection will be created.
//
// This will trigger a state transition for the SubConn.
// This may trigger a state transition for the SubConn.
//
// Deprecated: this method will be removed. Create new SubConns for new
// addresses instead.
UpdateAddresses(SubConn, []resolver.Address)

// UpdateState notifies gRPC that the balancer's internal state has
Expand Down
4 changes: 4 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ var (

// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)

// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra
// metadata to RPCs.
GRPCResolverSchemeExtraMetadata string = "xds"
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
36 changes: 29 additions & 7 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Address struct {
// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
//
// Deprecated: when an Address is inside an Endpoint, this field should not
// be used, and it will eventually be removed entirely.
BalancerAttributes *attributes.Attributes

// Metadata is the information associated with Addr, which may be used
Expand Down Expand Up @@ -167,11 +170,37 @@ type BuildOptions struct {
Dialer func(context.Context, string) (net.Conn, error)
}

// An Endpoint is one network endpoint, or server, which may have multiple
// addresses with which it can be accessed.
type Endpoint struct {
// Addresses contains a list of addresses used to access this endpoint.
Addresses []Address

// Attributes contains arbitrary data about this endpoint intended for
// consumption by the LB policy.
Attributes *attributes.Attributes
}

// State contains the current Resolver state relevant to the ClientConn.
type State struct {
// Addresses is the latest set of resolved addresses for the target.
//
// If a resolver sets Addresses but does not set Endpoints, one Endpoint
// will be created for each Address before the State is passed to the LB
// policy. The BalancerAttributes of each entry in Addresses will be set
// in Endpoints.Attributes, and be cleared in the Endpoint's Address's
// BalancerAttributes.
//
// Soon, Addresses will be deprecated and replaced fully by Endpoints.
Addresses []Address

// Endpoints is the latest set of resolved endpoints for the target.
//
// If a resolver produces a State containing Endpoints but not Addresses,
// it must take care to ensure the LB policies it selects will support
// Endpoints.
Endpoints []Endpoint

// ServiceConfig contains the result from parsing the latest service
// config. If it is nil, it indicates no service config is present or the
// resolver does not provide service configs.
Expand Down Expand Up @@ -294,10 +323,3 @@ type Resolver interface {
// Close closes the resolver.
Close()
}

// UnregisterForTesting removes the resolver builder with the given scheme from the
// resolver map.
// This function is for testing only.
func UnregisterForTesting(scheme string) {
delete(m, scheme)
}
8 changes: 8 additions & 0 deletions resolver_conn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context))
// which includes addresses and service config.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
errCh := make(chan error, 1)
if s.Endpoints == nil {
s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
for _, a := range s.Addresses {
ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
ep.Addresses[0].BalancerAttributes = nil
s.Endpoints = append(s.Endpoints, ep)
}
}
ok := ccr.serializer.Schedule(func(context.Context) {
ccr.addChannelzTraceEvent(s)
ccr.curState = s
Expand Down
51 changes: 51 additions & 0 deletions resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ import (
"net"
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)

type wrapResolverBuilder struct {
Expand Down Expand Up @@ -91,3 +96,49 @@ func (s) TestResolverCaseSensitivity(t *testing.T) {
}
cc.Close()
}

// TestResolverAddressesToEndpoints ensures one Endpoint is created for each
// entry in resolver.State.Addresses automatically.
func (s) TestResolverAddressesToEndpoints(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

const scheme = "testresolveraddressestoendpoints"
r := manual.NewBuilderWithScheme(scheme)

stateCh := make(chan balancer.ClientConnState, 1)
bf := stub.BalancerFuncs{
UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error {
stateCh <- ccs
return nil
},
}
balancerName := "stub-balancer-" + scheme
stub.Register(balancerName, bf)

a1 := attributes.New("x", "y")
a2 := attributes.New("a", "b")
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: "addr1", BalancerAttributes: a1}, {Addr: "addr2", BalancerAttributes: a2}}})

cc, err := Dial(r.Scheme()+":///",
WithTransportCredentials(insecure.NewCredentials()),
WithResolvers(r),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, balancerName)))
if err != nil {
t.Fatalf("Unexpected error dialing: %v", err)
}
defer cc.Close()

select {
case got := <-stateCh:
want := []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "addr1"}}, Attributes: a1},
{Addresses: []resolver.Address{{Addr: "addr2"}}, Attributes: a2},
}
if diff := cmp.Diff(got.ResolverState.Endpoints, want); diff != "" {
t.Errorf("Did not receive expected endpoints. Diff (-got +want):\n%v", diff)
}
case <-ctx.Done():
t.Fatalf("timed out waiting for endpoints")
}
}
3 changes: 2 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
Expand Down Expand Up @@ -433,7 +434,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
ctx = trace.NewContext(ctx, trInfo.tr)
}

if cs.cc.parsedTarget.URL.Scheme == "xds" {
if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
// Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them.
ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
Expand Down
13 changes: 5 additions & 8 deletions test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -196,14 +197,10 @@ func testPickExtraMetadata(t *testing.T, e env) {
te.startServer(&testServer{security: e.security})
defer te.tearDown()

// Set resolver to xds to trigger the extra metadata code path.
r := manual.NewBuilderWithScheme("xds")
resolver.Register(r)
defer func() {
resolver.UnregisterForTesting("xds")
}()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = "xds"
// Trigger the extra-metadata-adding code path.
defer func(old string) { internal.GRPCResolverSchemeExtraMetadata = old }(internal.GRPCResolverSchemeExtraMetadata)
internal.GRPCResolverSchemeExtraMetadata = "passthrough"

cc := te.clientConn()
tc := testgrpc.NewTestServiceClient(cc)

Expand Down
22 changes: 2 additions & 20 deletions xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,31 +60,13 @@ var (
)

func replaceResolvers() func() {
var registerForTesting bool
if resolver.Get(c2pScheme) == nil {
// If env var to enable c2p is not set, the resolver isn't registered.
// Need to register and unregister in defer.
registerForTesting = true
resolver.Register(&c2pResolverBuilder{})
}
oldDNS := resolver.Get("dns")
resolver.Register(testDNSResolver)
oldXDS := resolver.Get("xds")
resolver.Register(testXDSResolver)
return func() {
if oldDNS != nil {
resolver.Register(oldDNS)
} else {
resolver.UnregisterForTesting("dns")
}
if oldXDS != nil {
resolver.Register(oldXDS)
} else {
resolver.UnregisterForTesting("xds")
}
if registerForTesting {
resolver.UnregisterForTesting(c2pScheme)
}
resolver.Register(oldDNS)
resolver.Register(oldXDS)
}
}

Expand Down
7 changes: 7 additions & 0 deletions xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,15 @@ func (b *clusterResolverBalancer) updateChildConfig() {
}
b.logger.Infof("Built child policy config: %v", pretty.ToJSON(childCfg))

endpoints := make([]resolver.Endpoint, len(addrs))
for i, a := range addrs {
endpoints[i].Attributes = a.BalancerAttributes
endpoints[i].Addresses = []resolver.Address{a}
endpoints[i].Addresses[0].BalancerAttributes = nil
}
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: endpoints,
Addresses: addrs,
ServiceConfig: b.configRaw,
Attributes: b.attrsWithClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOp
mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }

dnsResolverBuilder := resolver.Get("dns")
resolver.UnregisterForTesting("dns")
resolver.Register(mr)

return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }
Expand Down
18 changes: 15 additions & 3 deletions xds/internal/balancer/clusterresolver/resource_resolver_dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,21 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
}

dr.mu.Lock()
addrs := make([]string, len(state.Addresses))
for i, a := range state.Addresses {
addrs[i] = a.Addr
var addrs []string
if len(state.Endpoints) > 0 {
// Assume 1 address per endpoint, which is how DNS is expected to
// behave. The slice will grow as needed, however.
addrs = make([]string, 0, len(state.Endpoints))
for _, e := range state.Endpoints {
for _, a := range e.Addresses {
addrs = append(addrs, a.Addr)
}
}
} else {
addrs = make([]string, len(state.Addresses))
for i, a := range state.Addresses {
addrs[i] = a.Addr
}
}
dr.addrs = addrs
dr.updateReceived = true
Expand Down

0 comments on commit 94df716

Please sign in to comment.