-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
xds_resolver.go
323 lines (290 loc) · 11.1 KB
/
xds_resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package resolver implements the xds resolver, that does LDS and RDS to find
// the cluster to use.
package resolver
import (
"errors"
"fmt"
"strings"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// xdsScheme is the scheme name reported by xdsResolverBuilder.Scheme.
const xdsScheme = "xds"
// newBuilderForTesting returns an xds resolver builder whose xds client
// factory is bound to the supplied bootstrap contents, so tests can use
// multiple xds clients in different ClientConns at the same time.
//
// The factory defers to xdsclient.NewWithBootstrapContentsForTesting each time
// it is invoked. The returned error is always nil.
func newBuilderForTesting(config []byte) (resolver.Builder, error) {
	clientFromConfig := func() (xdsclient.XDSClient, func(), error) {
		return xdsclient.NewWithBootstrapContentsForTesting(config)
	}
	builder := &xdsResolverBuilder{newXDSClient: clientFromConfig}
	return builder, nil
}
// newXDSClient is the xds client factory that Build falls back to when the
// builder carries no factory of its own. It is a variable, not a function,
// so that unit tests can override it.
var newXDSClient = func() (xdsclient.XDSClient, func(), error) { return xdsclient.New() }
func init() {
resolver.Register(&xdsResolverBuilder{})
internal.NewXDSResolverWithConfigForTesting = newBuilderForTesting
}
// xdsResolverBuilder builds xds resolvers; see its Build and Scheme methods.
type xdsResolverBuilder struct {
// newXDSClient, when non-nil, is the factory Build uses to create the xds
// client instead of the package-level newXDSClient. It returns the client,
// a function that Build stores as the resolver's xdsClientClose, and an
// error.
newXDSClient func() (xdsclient.XDSClient, func(), error)
}
// Build helps implement the resolver.Builder interface.
//
// Every call obtains a new xds client (from the builder's own factory when one
// is set, otherwise from the package-level newXDSClient), derives the listener
// resource name from the target and the bootstrap configuration, registers a
// watch for that resource and starts the run goroutine.
//
// If any step fails, the partially constructed resolver is closed before the
// error is returned.
func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) {
	r := &xdsResolver{
		cc:             cc,
		closed:         grpcsync.NewEvent(),
		updateCh:       make(chan suWithError, 1),
		activeClusters: make(map[string]*clusterInfo),
		channelID:      grpcrand.Uint64(),
	}
	// Release whatever has been acquired so far if Build bails out with an
	// error.
	defer func() {
		if retErr == nil {
			return
		}
		r.Close()
	}()
	r.logger = prefixLogger(r)
	r.logger.Infof("Creating resolver for target: %+v", target)

	// A factory installed on the builder takes precedence over the
	// package-level default.
	createClient := b.newXDSClient
	if createClient == nil {
		createClient = newXDSClient
	}
	xdsC, closeXDSClient, err := createClient()
	if err != nil {
		return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
	}
	r.xdsClient = xdsC
	r.xdsClientClose = closeXDSClient

	bsCfg := xdsC.BootstrapConfig()
	if bsCfg == nil {
		return nil, errors.New("bootstrap configuration is empty")
	}

	// If xds credentials were specified by the user, but the bootstrap config
	// does not contain any certificate provider configuration, it is better to
	// fail right now rather than failing when attempting to create certificate
	// providers after receiving a CDS response with security configuration.
	var creds credentials.TransportCredentials
	if opts.DialCreds != nil {
		creds = opts.DialCreds
	} else if opts.CredsBundle != nil {
		creds = opts.CredsBundle.TransportCredentials()
	}
	if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() && len(bsCfg.CertProviderConfigs) == 0 {
		return nil, errors.New("xds: xdsCreds specified but certificate_providers config missing in bootstrap file")
	}

	// Pick the client listener template from the bootstrap config:
	// - with no authority in the target, the top level template is used;
	// - with an authority, the template comes from the authority map.
	tmpl := bsCfg.ClientDefaultListenerResourceNameTemplate
	if authority := target.URL.Host; authority != "" {
		authCfg := bsCfg.Authorities[authority]
		if authCfg == nil {
			return nil, fmt.Errorf("xds: authority %q is not found in the bootstrap file", authority)
		}
		// NOTE(review): the original author states this template is never
		// empty because the bootstrap package supplies a default; the guard is
		// kept regardless.
		if t := authCfg.ClientListenerResourceNameTemplate; t != "" {
			tmpl = t
		}
	}

	// The endpoint is the URL path, or the opaque part when the path is empty,
	// with a leading "/" removed.
	endpoint := target.URL.Path
	if endpoint == "" {
		endpoint = target.URL.Opaque
	}
	r.ldsResourceName = bootstrap.PopulateResourceTemplate(tmpl, strings.TrimPrefix(endpoint, "/"))

	// Register a watch on the xdsClient for the resource name determined above.
	stopWatch := watchService(r.xdsClient, r.ldsResourceName, r.handleServiceUpdate, r.logger)
	r.logger.Infof("Watch started on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
	r.cancelWatch = func() {
		stopWatch()
		r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
	}

	go r.run()
	return r, nil
}
// Scheme helps implement the resolver.Builder interface. It returns the
// xdsScheme constant.
func (*xdsResolverBuilder) Scheme() string {
return xdsScheme
}
// suWithError wraps the ServiceUpdate and error received through a watch API
// callback, so that it can be pushed onto the update channel as a single
// entity.
type suWithError struct {
// su is the service update delivered by the watch callback.
su serviceUpdate
// emptyUpdate, when true (and err is nil), makes the run goroutine re-send
// the service config with the current config selector instead of building a
// new selector from su.
emptyUpdate bool
// err is the error delivered by the watch callback; the run goroutine
// checks it before anything else.
err error
}
// xdsResolver implements the resolver.Resolver interface.
//
// It registers a watcher for ServiceConfig updates with the xdsClient object
// (which performs LDS/RDS queries for the same), and passes the received
// updates to the ClientConn.
type xdsResolver struct {
// cc is the ClientConn that receives state updates and errors.
cc resolver.ClientConn
// closed is fired by Close. Once fired, the run goroutine returns and
// handleServiceUpdate drops incoming updates.
closed *grpcsync.Event
// logger is the prefix logger created for this resolver in Build.
logger *grpclog.PrefixLogger
// ldsResourceName is the resource name being watched; Build computes it from
// the bootstrap template and the target endpoint.
ldsResourceName string
// The underlying xdsClient which performs all xDS requests and responses.
xdsClient xdsclient.XDSClient
// xdsClientClose is the function returned alongside the client by the client
// factory; Close invokes it when it is non-nil.
xdsClientClose func()
// A channel for the watch API callback to write service updates on to. The
// updates are read by the run goroutine and passed on to the ClientConn.
// Build creates it with a buffer of one.
updateCh chan suWithError
// cancelWatch is the function to cancel the watcher.
cancelWatch func()
// activeClusters is a map from cluster name to a ref count. Only read or
// written during a service update (synchronous).
activeClusters map[string]*clusterInfo
// curConfigSelector is the config selector most recently installed by the
// run goroutine, or nil when none is installed.
curConfigSelector *configSelector
// A random number which uniquely identifies the channel which owns this
// resolver.
channelID uint64
}
// sendNewServiceConfig prunes the active clusters, builds a service config
// from the clusters that remain and pushes it to the ClientConn together with
// the config selector cs and the xds client.
//
// When cs is nil and no active clusters remain, an empty service config ("{}")
// is pushed instead and nothing else is attached to the state.
//
// It returns false only when generating the service config JSON fails; in that
// case the error is logged and reported to the ClientConn, and no state update
// is sent.
func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
	// Drop zero-reference entries first; otherwise serviceConfigJSON would
	// include them in the generated config.
	r.pruneActiveClusters()

	if len(r.activeClusters) == 0 && cs == nil {
		// No clusters and a failing (nil) config selector. Per the original
		// author, the empty config selects pick-first with no addresses and
		// puts the ClientConn into transient failure.
		r.cc.UpdateState(resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")})
		return true
	}

	scJSON, err := serviceConfigJSON(r.activeClusters)
	if err != nil {
		// JSON marshal error; should never happen.
		r.logger.Errorf("%v", err)
		r.cc.ReportError(err)
		return false
	}
	r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.ldsResourceName, r.xdsClient, pretty.FormatJSON(scJSON))

	// Assemble the state in stages: parsed config, then the config selector,
	// then the xds client, and hand the result to the ClientConn.
	base := resolver.State{ServiceConfig: r.cc.ParseServiceConfig(string(scJSON))}
	withSelector := iresolver.SetConfigSelector(base, cs)
	withClient := xdsclient.SetClient(withSelector, r.xdsClient)
	r.cc.UpdateState(withClient)
	return true
}
// run is the resolver's long-running goroutine. It returns once the closed
// event fires; until then it takes one entry at a time from updateCh and
// handles it according to its kind:
//
//   - watch error, resource-not-found: sendNewServiceConfig(nil) is called and
//     the current config selector is stopped and cleared;
//   - any other watch error: the error is reported to the ClientConn and the
//     existing configuration is left alone;
//   - empty update: the service config is re-sent with the current selector;
//   - regular update: a new config selector is created, sent, and installed
//     as the current one.
func (r *xdsResolver) run() {
	for {
		select {
		case <-r.closed.Done():
			return

		case u := <-r.updateCh:
			switch {
			case u.err != nil:
				r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.ldsResourceName, r.xdsClient, u.err)
				if xdsresource.ErrType(u.err) != xdsresource.ErrorTypeResourceNotFound {
					// Not a resource-not-found error: surface it to the
					// ClientConn (and balancers) and keep using the old
					// config; no resolver state update is needed.
					r.cc.ReportError(u.err)
					continue
				}
				// Resource-not-found means the LDS resource was removed.
				// sendNewServiceConfig(nil) sends either the empty service
				// config or, while clusters are still referenced, a normal
				// config paired with a nil config selector.
				r.sendNewServiceConfig(nil)
				// Stop and dereference the active config selector.
				// NOTE(review): curConfigSelector may be nil here; stop is
				// presumably nil-safe — confirm against its definition.
				r.curConfigSelector.stop()
				r.curConfigSelector = nil

			case u.emptyUpdate:
				r.sendNewServiceConfig(r.curConfigSelector)

			default:
				// Create the config selector for this update.
				next, err := r.newConfigSelector(u.su)
				if err != nil {
					r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.ldsResourceName, r.xdsClient, err)
					r.cc.ReportError(err)
					continue
				}
				if sent := r.sendNewServiceConfig(next); !sent {
					// Unexpected JSON error while creating the service
					// config: discard the new selector and carry on with the
					// previous one.
					next.stop()
					continue
				}
				// Release the references held by the old selector and make
				// the new one current.
				r.curConfigSelector.stop()
				r.curConfigSelector = next
			}
		}
	}
}
// handleServiceUpdate is the watch callback for service updates. Unless the
// resolver has already been closed, it discards any entry still pending in
// updateCh and then writes the new update and error as a single suWithError,
// which the run goroutine picks up.
func (r *xdsResolver) handleServiceUpdate(su serviceUpdate, err error) {
	if !r.closed.HasFired() {
		entry := suWithError{su: su, err: err}
		// Non-blocking receive: drop a stale pending entry, if there is one,
		// so that only the latest update is kept.
		select {
		case <-r.updateCh:
		default:
		}
		r.updateCh <- entry
	}
	// Once the resolver is closed, updates are no longer passed on to the
	// ClientConn.
}
// ResolveNow helps implement the resolver.Resolver interface. It is a no-op at
// this point: the options argument is ignored and nothing is done.
func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
// Close shuts the resolver down: it cancels the watch, invokes the close
// function of the underlying xdsClient, fires the closed event and logs the
// shutdown, in that order.
func (r *xdsResolver) Close() {
	// Build defers Close() on its error paths, where cancelWatch and
	// xdsClientClose may not have been assigned yet, so each one is invoked
	// only if it is set.
	callIfSet := func(f func()) {
		if f != nil {
			f()
		}
	}
	callIfSet(r.cancelWatch)
	callIfSet(r.xdsClientClose)

	r.closed.Fire()
	r.logger.Infof("Shutdown")
}