diff --git a/go.mod b/go.mod index 3122c6e36..c3b7e3cf4 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 - google.golang.org/grpc v1.47.0 + google.golang.org/grpc v1.48.0 google.golang.org/protobuf v1.28.0 gopkg.in/square/go-jose.v2 v2.6.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 19b31007d..887183ae1 100644 --- a/go.sum +++ b/go.sum @@ -3848,8 +3848,9 @@ google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ5 google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.47.0 h1:9n77onPX5F3qfFCqjy9dhn8PbNQsIKeVU04J9G7umt8= google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= +google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b/go.mod h1:IBqQ7wSUJ2Ep09a8rMWFsg4fmI2r38zwsq8a0GgxXpM= google.golang.org/grpc/examples v0.0.0-20201130180447-c456688b1860/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE= diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go index a67074a3a..e8dfc828a 100644 --- a/vendor/google.golang.org/grpc/balancer/base/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go @@ -45,6 +45,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) scStates: make(map[balancer.SubConn]connectivity.State), csEvltr: &balancer.ConnectivityStateEvaluator{}, config: bb.config, + state: connectivity.Connecting, } // Initialize picker to a picker that always returns // ErrNoSubConnAvailable, because when state of a SubConn changes, we @@ -134,6 +135,9 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { b.ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } + + b.regeneratePicker() + b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) return nil } diff --git a/vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go b/vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go index f15dddb56..d82b714e0 100644 --- a/vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go +++ b/vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go @@ -26,12 +26,12 @@ import ( // Name is the name of weighted_round_robin balancer. const Name = "weighted_round_robin" -// attributeKey is the type used as the key to store AddrInfo in the Attributes -// field of resolver.Address. +// attributeKey is the type used as the key to store AddrInfo in the +// BalancerAttributes field of resolver.Address. type attributeKey struct{} -// AddrInfo will be stored inside Address metadata in order to use weighted -// roundrobin balancer. +// AddrInfo will be stored in the BalancerAttributes field of Address in order +// to use weighted roundrobin balancer. type AddrInfo struct { Weight uint32 } @@ -42,8 +42,8 @@ func (a AddrInfo) Equal(o interface{}) bool { return ok && oa.Weight == a.Weight } -// SetAddrInfo returns a copy of addr in which the Attributes field is updated -// with addrInfo. +// SetAddrInfo returns a copy of addr in which the BalancerAttributes field is +// updated with addrInfo. // // Experimental // @@ -54,7 +54,8 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address { return addr } -// GetAddrInfo returns the AddrInfo stored in the Attributes fields of addr. +// GetAddrInfo returns the AddrInfo stored in the BalancerAttributes field of +// addr. // // Experimental // diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index de6d41c23..0d21f2210 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -146,6 +146,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) + for _, opt := range extraDialOptions { + opt.apply(&cc.dopts) + } + for _, opt := range opts { opt.apply(&cc.dopts) } diff --git a/vendor/google.golang.org/grpc/credentials/google/xds.go b/vendor/google.golang.org/grpc/credentials/google/xds.go index b8c2e8f92..e32edc042 100644 --- a/vendor/google.golang.org/grpc/credentials/google/xds.go +++ b/vendor/google.golang.org/grpc/credentials/google/xds.go @@ -21,6 +21,7 @@ package google import ( "context" "net" + "net/url" "strings" "google.golang.org/grpc/credentials" @@ -28,12 +29,16 @@ import ( ) const cfeClusterNamePrefix = "google_cfe_" +const cfeClusterResourceNamePrefix = "/envoy.config.cluster.v3.Cluster/google_cfe_" +const cfeClusterAuthorityName = "traffic-director-c2p.xds.googleapis.com" // clusterTransportCreds is a combo of TLS + ALTS. // // On the client, ClientHandshake picks TLS or ALTS based on address attributes. // - if attributes has cluster name -// - if cluster name has prefix "google_cfe_", use TLS +// - if cluster name has prefix "google_cfe_", or +// "xdstp://traffic-director-c2p.xds.googleapis.com/envoy.config.cluster.v3.Cluster/google_cfe_", +// use TLS // - otherwise, use ALTS // - else, do TLS // @@ -50,18 +55,49 @@ func newClusterTransportCreds(tls, alts credentials.TransportCredentials) *clust } } -func (c *clusterTransportCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { +// clusterName returns the xDS cluster name stored in the attributes in the +// context. +func clusterName(ctx context.Context) string { chi := credentials.ClientHandshakeInfoFromContext(ctx) if chi.Attributes == nil { - return c.tls.ClientHandshake(ctx, authority, rawConn) + return "" + } + cluster, _ := internal.GetXDSHandshakeClusterName(chi.Attributes) + return cluster +} + +// isDirectPathCluster returns true if the cluster in the context is a +// directpath cluster, meaning ALTS should be used. +func isDirectPathCluster(ctx context.Context) bool { + cluster := clusterName(ctx) + if cluster == "" { + // No cluster; not xDS; use TLS. + return false + } + if strings.HasPrefix(cluster, cfeClusterNamePrefix) { + // xDS cluster prefixed by "google_cfe_"; use TLS. + return false } - cn, ok := internal.GetXDSHandshakeClusterName(chi.Attributes) - if !ok || strings.HasPrefix(cn, cfeClusterNamePrefix) { - return c.tls.ClientHandshake(ctx, authority, rawConn) + if !strings.HasPrefix(cluster, "xdstp:") { + // Other xDS cluster name; use ALTS. + return true + } + u, err := url.Parse(cluster) + if err != nil { + // Shouldn't happen, but assume ALTS. + return true + } + // If authority AND path match our CFE checks, use TLS; otherwise use ALTS. + return u.Host != cfeClusterAuthorityName || !strings.HasPrefix(u.Path, cfeClusterResourceNamePrefix) +} + +func (c *clusterTransportCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { + if isDirectPathCluster(ctx) { + // If attributes have cluster name, and cluster name is not cfe, it's a + // backend address, use ALTS. + return c.alts.ClientHandshake(ctx, authority, rawConn) } - // If attributes have cluster name, and cluster name is not cfe, it's a - // backend address, use ALTS. - return c.alts.ClientHandshake(ctx, authority, rawConn) + return c.tls.ClientHandshake(ctx, authority, rawConn) } func (c *clusterTransportCreds) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) { diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index f2f605a17..75d01ba77 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -35,6 +35,15 @@ import ( "google.golang.org/grpc/stats" ) +func init() { + internal.AddExtraDialOptions = func(opt ...DialOption) { + extraDialOptions = append(extraDialOptions, opt...) + } + internal.ClearExtraDialOptions = func() { + extraDialOptions = nil + } +} + // dialOptions configure a Dial call. dialOptions are set by the DialOption // values passed to Dial. type dialOptions struct { @@ -70,6 +79,8 @@ type DialOption interface { apply(*dialOptions) } +var extraDialOptions []DialOption + // EmptyDialOption does not alter the dial configuration. It can be embedded in // another structure to build custom dial options. // @@ -380,7 +391,7 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { // all the RPCs and underlying network connections in this ClientConn. func WithStatsHandler(h stats.Handler) DialOption { return newFuncDialOption(func(o *dialOptions) { - o.copts.StatsHandler = h + o.copts.StatsHandlers = append(o.copts.StatsHandlers, h) }) } diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go index 7ba8f4d18..08666f62a 100644 --- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go @@ -193,6 +193,8 @@ func (gsb *Balancer) ExitIdle() { ei.ExitIdle() return } + gsb.mu.Lock() + defer gsb.mu.Unlock() for sc := range balToUpdate.subconns { sc.Connect() } diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go index 0a25ce43f..e3dfe204f 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go @@ -42,14 +42,14 @@ var binLogger Logger var grpclogLogger = grpclog.Component("binarylog") -// SetLogger sets the binarg logger. +// SetLogger sets the binary logger. // // Only call this at init time. func SetLogger(l Logger) { binLogger = l } -// GetLogger gets the binarg logger. +// GetLogger gets the binary logger. // // Only call this at init time. func GetLogger() Logger { diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go index 7d996e51b..55aaeea8b 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go @@ -77,7 +77,7 @@ var ( // environment variable // "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to // "true". - XDSAggregateAndDNS = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true") + XDSAggregateAndDNS = !strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "false") // XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled, // which can be disabled by setting the environment variable diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 6d355b0b0..83018be7c 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -63,6 +63,76 @@ var ( // xDS-enabled server invokes this method on a grpc.Server when a particular // listener moves to "not-serving" mode. DrainServerTransports interface{} // func(*grpc.Server, string) + // AddExtraServerOptions adds an array of ServerOption that will be + // effective globally for newly created servers. The priority will be: 1. + // user-provided; 2. this method; 3. default values. + AddExtraServerOptions interface{} // func(opt ...ServerOption) + // ClearExtraServerOptions clears the array of extra ServerOption. This + // method is useful in testing and benchmarking. + ClearExtraServerOptions func() + // AddExtraDialOptions adds an array of DialOption that will be effective + // globally for newly created client channels. The priority will be: 1. + // user-provided; 2. this method; 3. default values. + AddExtraDialOptions interface{} // func(opt ...DialOption) + // ClearExtraDialOptions clears the array of extra DialOption. This + // method is useful in testing and benchmarking. + ClearExtraDialOptions func() + + // NewXDSResolverWithConfigForTesting creates a new xds resolver builder using + // the provided xds bootstrap config instead of the global configuration from + // the supported environment variables. The resolver.Builder is meant to be + // used in conjunction with the grpc.WithResolvers DialOption. + // + // Testing Only + // + // This function should ONLY be used for testing and may not work with some + // other features, including the CSDS service. + NewXDSResolverWithConfigForTesting interface{} // func([]byte) (resolver.Builder, error) + + // RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster + // Specifier Plugin for testing purposes, regardless of the XDSRLS environment + // variable. + // + // TODO: Remove this function once the RLS env var is removed. + RegisterRLSClusterSpecifierPluginForTesting func() + + // UnregisterRLSClusterSpecifierPluginForTesting unregisters the RLS Cluster + // Specifier Plugin for testing purposes. This is needed because there is no way + // to unregister the RLS Cluster Specifier Plugin after registering it solely + // for testing purposes using RegisterRLSClusterSpecifierPluginForTesting(). + // + // TODO: Remove this function once the RLS env var is removed. + UnregisterRLSClusterSpecifierPluginForTesting func() + + // RegisterRBACHTTPFilterForTesting registers the RBAC HTTP Filter for testing + // purposes, regardless of the RBAC environment variable. + // + // TODO: Remove this function once the RBAC env var is removed. + RegisterRBACHTTPFilterForTesting func() + + // UnregisterRBACHTTPFilterForTesting unregisters the RBAC HTTP Filter for + // testing purposes. This is needed because there is no way to unregister the + // HTTP Filter after registering it solely for testing purposes using + // RegisterRBACHTTPFilterForTesting(). + // + // TODO: Remove this function once the RBAC env var is removed. + UnregisterRBACHTTPFilterForTesting func() + + // RegisterOutlierDetectionBalancerForTesting registers the Outlier + // Detection Balancer for testing purposes, regardless of the Outlier + // Detection environment variable. + // + // TODO: Remove this function once the Outlier Detection env var is removed. + RegisterOutlierDetectionBalancerForTesting func() + + // UnregisterOutlierDetectionBalancerForTesting unregisters the Outlier + // Detection Balancer for testing purposes. This is needed because there is + // no way to unregister the Outlier Detection Balancer after registering it + // solely for testing purposes using + // RegisterOutlierDetectionBalancerForTesting(). + // + // TODO: Remove this function once the Outlier Detection env var is removed. + UnregisterOutlierDetectionBalancerForTesting func() ) // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index 1c3459c2b..090120925 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -49,7 +49,7 @@ import ( // NewServerHandlerTransport returns a ServerTransport handling gRPC // from inside an http.Handler. It requires that the http Server // supports HTTP/2. -func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) { +func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) { if r.ProtoMajor != 2 { return nil, errors.New("gRPC requires HTTP/2") } @@ -138,7 +138,7 @@ type serverHandlerTransport struct { // TODO make sure this is consistent across handler_server and http2_server contentSubtype string - stats stats.Handler + stats []stats.Handler } func (ht *serverHandlerTransport) Close() { @@ -228,10 +228,10 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro }) if err == nil { // transport has not been closed - if ht.stats != nil { - // Note: The trailer fields are compressed with hpack after this call returns. - // No WireLength field is set here. - ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{ + // Note: The trailer fields are compressed with hpack after this call returns. + // No WireLength field is set here. + for _, sh := range ht.stats { + sh.HandleRPC(s.Context(), &stats.OutTrailer{ Trailer: s.trailer.Copy(), }) } @@ -314,10 +314,10 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { }) if err == nil { - if ht.stats != nil { + for _, sh := range ht.stats { // Note: The header fields are compressed with hpack after this call returns. // No WireLength field is set here. - ht.stats.HandleRPC(s.Context(), &stats.OutHeader{ + sh.HandleRPC(s.Context(), &stats.OutHeader{ Header: md.Copy(), Compression: s.sendCompress, }) @@ -369,14 +369,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace } ctx = metadata.NewIncomingContext(ctx, ht.headerMD) s.ctx = peer.NewContext(ctx, pr) - if ht.stats != nil { - s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) + for _, sh := range ht.stats { + s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) inHeader := &stats.InHeader{ FullMethod: s.method, RemoteAddr: ht.RemoteAddr(), Compression: s.recvCompress, } - ht.stats.HandleRPC(s.ctx, inHeader) + sh.HandleRPC(s.ctx, inHeader) } s.trReader = &transportReader{ reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}}, diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index 24ca59084..be371c6e0 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -90,7 +90,7 @@ type http2Client struct { kp keepalive.ClientParameters keepaliveEnabled bool - statsHandler stats.Handler + statsHandlers []stats.Handler initialWindowSize int32 @@ -311,7 +311,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts isSecure: isSecure, perRPCCreds: perRPCCreds, kp: kp, - statsHandler: opts.StatsHandler, + statsHandlers: opts.StatsHandlers, initialWindowSize: initialWindowSize, onPrefaceReceipt: onPrefaceReceipt, nextID: 1, @@ -341,15 +341,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts updateFlowControl: t.updateFlowControl, } } - if t.statsHandler != nil { - t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ + for _, sh := range t.statsHandlers { + t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{ RemoteAddr: t.remoteAddr, LocalAddr: t.localAddr, }) connBegin := &stats.ConnBegin{ Client: true, } - t.statsHandler.HandleConn(t.ctx, connBegin) + sh.HandleConn(t.ctx, connBegin) } t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) if err != nil { @@ -773,24 +773,27 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true} } } - if t.statsHandler != nil { + if len(t.statsHandlers) != 0 { header, ok := metadata.FromOutgoingContext(ctx) if ok { header.Set("user-agent", t.userAgent) } else { header = metadata.Pairs("user-agent", t.userAgent) } - // Note: The header fields are compressed with hpack after this call returns. - // No WireLength field is set here. - outHeader := &stats.OutHeader{ - Client: true, - FullMethod: callHdr.Method, - RemoteAddr: t.remoteAddr, - LocalAddr: t.localAddr, - Compression: callHdr.SendCompress, - Header: header, + for _, sh := range t.statsHandlers { + // Note: The header fields are compressed with hpack after this call returns. + // No WireLength field is set here. + // Note: Creating a new stats object to prevent pollution. + outHeader := &stats.OutHeader{ + Client: true, + FullMethod: callHdr.Method, + RemoteAddr: t.remoteAddr, + LocalAddr: t.localAddr, + Compression: callHdr.SendCompress, + Header: header, + } + sh.HandleRPC(s.ctx, outHeader) } - t.statsHandler.HandleRPC(s.ctx, outHeader) } return s, nil } @@ -916,11 +919,11 @@ func (t *http2Client) Close(err error) { for _, s := range streams { t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false) } - if t.statsHandler != nil { + for _, sh := range t.statsHandlers { connEnd := &stats.ConnEnd{ Client: true, } - t.statsHandler.HandleConn(t.ctx, connEnd) + sh.HandleConn(t.ctx, connEnd) } } @@ -1432,7 +1435,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { close(s.headerChan) } - if t.statsHandler != nil { + for _, sh := range t.statsHandlers { if isHeader { inHeader := &stats.InHeader{ Client: true, @@ -1440,14 +1443,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { Header: metadata.MD(mdata).Copy(), Compression: s.recvCompress, } - t.statsHandler.HandleRPC(s.ctx, inHeader) + sh.HandleRPC(s.ctx, inHeader) } else { inTrailer := &stats.InTrailer{ Client: true, WireLength: int(frame.Header().Length), Trailer: metadata.MD(mdata).Copy(), } - t.statsHandler.HandleRPC(s.ctx, inTrailer) + sh.HandleRPC(s.ctx, inTrailer) } } diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 45d7bd145..2b0fde334 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -82,7 +82,7 @@ type http2Server struct { // updates, reset streams, and various settings) to the controller. controlBuf *controlBuffer fc *trInFlow - stats stats.Handler + stats []stats.Handler // Keepalive and max-age parameters for the server. kp keepalive.ServerParameters // Keepalive enforcement policy. @@ -257,7 +257,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, fc: &trInFlow{limit: uint32(icwz)}, state: reachable, activeStreams: make(map[uint32]*Stream), - stats: config.StatsHandler, + stats: config.StatsHandlers, kp: kp, idle: time.Now(), kep: kep, @@ -272,13 +272,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, updateFlowControl: t.updateFlowControl, } } - if t.stats != nil { - t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ + for _, sh := range t.stats { + t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{ RemoteAddr: t.remoteAddr, LocalAddr: t.localAddr, }) connBegin := &stats.ConnBegin{} - t.stats.HandleConn(t.ctx, connBegin) + sh.HandleConn(t.ctx, connBegin) } t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) if err != nil { @@ -570,8 +570,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( t.adjustWindow(s, uint32(n)) } s.ctx = traceCtx(s.ctx, s.method) - if t.stats != nil { - s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) + for _, sh := range t.stats { + s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) inHeader := &stats.InHeader{ FullMethod: s.method, RemoteAddr: t.remoteAddr, @@ -580,7 +580,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( WireLength: int(frame.Header().Length), Header: metadata.MD(mdata).Copy(), } - t.stats.HandleRPC(s.ctx, inHeader) + sh.HandleRPC(s.ctx, inHeader) } s.ctxDone = s.ctx.Done() s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) @@ -996,14 +996,14 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error { t.closeStream(s, true, http2.ErrCodeInternal, false) return ErrHeaderListSizeLimitViolation } - if t.stats != nil { + for _, sh := range t.stats { // Note: Headers are compressed with hpack after this call returns. // No WireLength field is set here. outHeader := &stats.OutHeader{ Header: s.header.Copy(), Compression: s.sendCompress, } - t.stats.HandleRPC(s.Context(), outHeader) + sh.HandleRPC(s.Context(), outHeader) } return nil } @@ -1064,10 +1064,10 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { // Send a RST_STREAM after the trailers if the client has not already half-closed. rst := s.getState() == streamActive t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true) - if t.stats != nil { + for _, sh := range t.stats { // Note: The trailer fields are compressed with hpack after this call returns. // No WireLength field is set here. - t.stats.HandleRPC(s.Context(), &stats.OutTrailer{ + sh.HandleRPC(s.Context(), &stats.OutTrailer{ Trailer: s.trailer.Copy(), }) } @@ -1222,9 +1222,9 @@ func (t *http2Server) Close() { for _, s := range streams { s.cancel() } - if t.stats != nil { + for _, sh := range t.stats { connEnd := &stats.ConnEnd{} - t.stats.HandleConn(t.ctx, connEnd) + sh.HandleConn(t.ctx, connEnd) } } diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go index d8247bcdf..b77513068 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http_util.go +++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go @@ -322,8 +322,6 @@ type bufWriter struct { batchSize int conn net.Conn err error - - onFlush func() } func newBufWriter(conn net.Conn, batchSize int) *bufWriter { @@ -360,9 +358,6 @@ func (w *bufWriter) Flush() error { if w.offset == 0 { return nil } - if w.onFlush != nil { - w.onFlush() - } _, w.err = w.conn.Write(w.buf[:w.offset]) w.offset = 0 return w.err diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index a9ce717f1..6c3ba8515 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -523,7 +523,7 @@ type ServerConfig struct { ConnectionTimeout time.Duration Credentials credentials.TransportCredentials InTapHandle tap.ServerInHandle - StatsHandler stats.Handler + StatsHandlers []stats.Handler KeepaliveParams keepalive.ServerParameters KeepalivePolicy keepalive.EnforcementPolicy InitialWindowSize int32 @@ -553,8 +553,8 @@ type ConnectOptions struct { CredsBundle credentials.Bundle // KeepaliveParams stores the keepalive parameters. KeepaliveParams keepalive.ClientParameters - // StatsHandler stores the handler for stats. - StatsHandler stats.Handler + // StatsHandlers stores the handler for stats. + StatsHandlers []stats.Handler // InitialWindowSize sets the initial window size for a stream. InitialWindowSize int32 // InitialConnWindowSize sets the initial window size for a connection. diff --git a/vendor/google.golang.org/grpc/regenerate.sh b/vendor/google.golang.org/grpc/regenerate.sh index 978b89f37..99db79faf 100644 --- a/vendor/google.golang.org/grpc/regenerate.sh +++ b/vendor/google.golang.org/grpc/regenerate.sh @@ -68,7 +68,6 @@ SOURCES=( ${WORKDIR}/grpc-proto/grpc/gcp/transport_security_common.proto ${WORKDIR}/grpc-proto/grpc/lookup/v1/rls.proto ${WORKDIR}/grpc-proto/grpc/lookup/v1/rls_config.proto - ${WORKDIR}/grpc-proto/grpc/service_config/service_config.proto ${WORKDIR}/grpc-proto/grpc/testing/*.proto ${WORKDIR}/grpc-proto/grpc/core/*.proto ) @@ -80,8 +79,7 @@ SOURCES=( # Note that the protos listed here are all for testing purposes. All protos to # be used externally should have a go_package option (and they don't need to be # listed here). -OPTS=Mgrpc/service_config/service_config.proto=/internal/proto/grpc_service_config,\ -Mgrpc/core/stats.proto=google.golang.org/grpc/interop/grpc_testing/core,\ +OPTS=Mgrpc/core/stats.proto=google.golang.org/grpc/interop/grpc_testing/core,\ Mgrpc/testing/benchmark_service.proto=google.golang.org/grpc/interop/grpc_testing,\ Mgrpc/testing/stats.proto=google.golang.org/grpc/interop/grpc_testing,\ Mgrpc/testing/report_qps_scenario_service.proto=google.golang.org/grpc/interop/grpc_testing,\ @@ -121,9 +119,6 @@ mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/ # see grpc_testing_not_regenerate/README.md for details. rm ${WORKDIR}/out/google.golang.org/grpc/reflection/grpc_testing_not_regenerate/*.pb.go -# grpc/service_config/service_config.proto does not have a go_package option. -mv ${WORKDIR}/out/grpc/service_config/service_config.pb.go internal/proto/grpc_service_config - # grpc/testing does not have a go_package option. mv ${WORKDIR}/out/grpc/testing/*.pb.go interop/grpc_testing/ mv ${WORKDIR}/out/grpc/core/*.pb.go interop/grpc_testing/core/ diff --git a/vendor/google.golang.org/grpc/resolver/map.go b/vendor/google.golang.org/grpc/resolver/map.go index e87ecd0ee..efcb7f3ef 100644 --- a/vendor/google.golang.org/grpc/resolver/map.go +++ b/vendor/google.golang.org/grpc/resolver/map.go @@ -28,25 +28,40 @@ type addressMapEntry struct { // Multiple accesses may not be performed concurrently. Must be created via // NewAddressMap; do not construct directly. type AddressMap struct { - m map[string]addressMapEntryList + // The underlying map is keyed by an Address with fields that we don't care + // about being set to their zero values. The only fields that we care about + // are `Addr`, `ServerName` and `Attributes`. Since we need to be able to + // distinguish between addresses with same `Addr` and `ServerName`, but + // different `Attributes`, we cannot store the `Attributes` in the map key. + // + // The comparison operation for structs work as follows: + // Struct values are comparable if all their fields are comparable. Two + // struct values are equal if their corresponding non-blank fields are equal. + // + // The value type of the map contains a slice of addresses which match the key + // in their `Addr` and `ServerName` fields and contain the corresponding value + // associated with them. + m map[Address]addressMapEntryList +} + +func toMapKey(addr *Address) Address { + return Address{Addr: addr.Addr, ServerName: addr.ServerName} } type addressMapEntryList []*addressMapEntry // NewAddressMap creates a new AddressMap. func NewAddressMap() *AddressMap { - return &AddressMap{m: make(map[string]addressMapEntryList)} + return &AddressMap{m: make(map[Address]addressMapEntryList)} } // find returns the index of addr in the addressMapEntry slice, or -1 if not // present. func (l addressMapEntryList) find(addr Address) int { - if len(l) == 0 { - return -1 - } for i, entry := range l { - if entry.addr.ServerName == addr.ServerName && - entry.addr.Attributes.Equal(addr.Attributes) { + // Attributes are the only thing to match on here, since `Addr` and + // `ServerName` are already equal. + if entry.addr.Attributes.Equal(addr.Attributes) { return i } } @@ -55,7 +70,8 @@ func (l addressMapEntryList) find(addr Address) int { // Get returns the value for the address in the map, if present. func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) { - entryList := a.m[addr.Addr] + addrKey := toMapKey(&addr) + entryList := a.m[addrKey] if entry := entryList.find(addr); entry != -1 { return entryList[entry].value, true } @@ -64,17 +80,19 @@ func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) { // Set updates or adds the value to the address in the map. func (a *AddressMap) Set(addr Address, value interface{}) { - entryList := a.m[addr.Addr] + addrKey := toMapKey(&addr) + entryList := a.m[addrKey] if entry := entryList.find(addr); entry != -1 { - a.m[addr.Addr][entry].value = value + entryList[entry].value = value return } - a.m[addr.Addr] = append(a.m[addr.Addr], &addressMapEntry{addr: addr, value: value}) + a.m[addrKey] = append(entryList, &addressMapEntry{addr: addr, value: value}) } // Delete removes addr from the map. func (a *AddressMap) Delete(addr Address) { - entryList := a.m[addr.Addr] + addrKey := toMapKey(&addr) + entryList := a.m[addrKey] entry := entryList.find(addr) if entry == -1 { return @@ -85,7 +103,7 @@ func (a *AddressMap) Delete(addr Address) { copy(entryList[entry:], entryList[entry+1:]) entryList = entryList[:len(entryList)-1] } - a.m[addr.Addr] = entryList + a.m[addrKey] = entryList } // Len returns the number of entries in the map. @@ -107,3 +125,14 @@ func (a *AddressMap) Keys() []Address { } return ret } + +// Values returns a slice of all current map values. +func (a *AddressMap) Values() []interface{} { + ret := make([]interface{}, 0, a.Len()) + for _, entryList := range a.m { + for _, entry := range entryList { + ret = append(ret, entry.value) + } + } + return ret +} diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 65de84b30..b54f5bb57 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -73,6 +73,12 @@ func init() { internal.DrainServerTransports = func(srv *Server, addr string) { srv.drainServerTransports(addr) } + internal.AddExtraServerOptions = func(opt ...ServerOption) { + extraServerOptions = opt + } + internal.ClearExtraServerOptions = func() { + extraServerOptions = nil + } } var statusOK = status.New(codes.OK, "") @@ -150,7 +156,7 @@ type serverOptions struct { chainUnaryInts []UnaryServerInterceptor chainStreamInts []StreamServerInterceptor inTapHandle tap.ServerInHandle - statsHandler stats.Handler + statsHandlers []stats.Handler maxConcurrentStreams uint32 maxReceiveMessageSize int maxSendMessageSize int @@ -174,6 +180,7 @@ var defaultServerOptions = serverOptions{ writeBufferSize: defaultWriteBufSize, readBufferSize: defaultReadBufSize, } +var extraServerOptions []ServerOption // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. type ServerOption interface { @@ -435,7 +442,7 @@ func InTapHandle(h tap.ServerInHandle) ServerOption { // StatsHandler returns a ServerOption that sets the stats handler for the server. func StatsHandler(h stats.Handler) ServerOption { return newFuncServerOption(func(o *serverOptions) { - o.statsHandler = h + o.statsHandlers = append(o.statsHandlers, h) }) } @@ -560,6 +567,9 @@ func (s *Server) stopServerWorkers() { // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions + for _, o := range extraServerOptions { + o.apply(&opts) + } for _, o := range opt { o.apply(&opts) } @@ -867,7 +877,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { ConnectionTimeout: s.opts.connectionTimeout, Credentials: s.opts.creds, InTapHandle: s.opts.inTapHandle, - StatsHandler: s.opts.statsHandler, + StatsHandlers: s.opts.statsHandlers, KeepaliveParams: s.opts.keepaliveParams, KeepalivePolicy: s.opts.keepalivePolicy, InitialWindowSize: s.opts.initialWindowSize, @@ -963,7 +973,7 @@ var _ http.Handler = (*Server)(nil) // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) + st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -1076,8 +1086,10 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) } err = t.Write(stream, hdr, payload, opts) - if err == nil && s.opts.statsHandler != nil { - s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) + if err == nil { + for _, sh := range s.opts.statsHandlers { + sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) + } } return err } @@ -1124,13 +1136,13 @@ func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerIn } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { - sh := s.opts.statsHandler - if sh != nil || trInfo != nil || channelz.IsOn() { + shs := s.opts.statsHandlers + if len(shs) != 0 || trInfo != nil || channelz.IsOn() { if channelz.IsOn() { s.incrCallsStarted() } var statsBegin *stats.Begin - if sh != nil { + for _, sh := range shs { beginTime := time.Now() statsBegin = &stats.Begin{ BeginTime: beginTime, @@ -1161,7 +1173,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. trInfo.tr.Finish() } - if sh != nil { + for _, sh := range shs { end := &stats.End{ BeginTime: statsBegin.BeginTime, EndTime: time.Now(), @@ -1243,7 +1255,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } var payInfo *payloadInfo - if sh != nil || binlog != nil { + if len(shs) != 0 || binlog != nil { payInfo = &payloadInfo{} } d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) @@ -1260,7 +1272,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) } - if sh != nil { + for _, sh := range shs { sh.HandleRPC(stream.Context(), &stats.InPayload{ RecvTime: time.Now(), Payload: v, @@ -1418,16 +1430,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp if channelz.IsOn() { s.incrCallsStarted() } - sh := s.opts.statsHandler + shs := s.opts.statsHandlers var statsBegin *stats.Begin - if sh != nil { + if len(shs) != 0 { beginTime := time.Now() statsBegin = &stats.Begin{ BeginTime: beginTime, IsClientStream: sd.ClientStreams, IsServerStream: sd.ServerStreams, } - sh.HandleRPC(stream.Context(), statsBegin) + for _, sh := range shs { + sh.HandleRPC(stream.Context(), statsBegin) + } } ctx := NewContextWithServerTransportStream(stream.Context(), stream) ss := &serverStream{ @@ -1439,10 +1453,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize, trInfo: trInfo, - statsHandler: sh, + statsHandler: shs, } - if sh != nil || trInfo != nil || channelz.IsOn() { + if len(shs) != 0 || trInfo != nil || channelz.IsOn() { // See comment in processUnaryRPC on defers. defer func() { if trInfo != nil { @@ -1456,7 +1470,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.mu.Unlock() } - if sh != nil { + if len(shs) != 0 { end := &stats.End{ BeginTime: statsBegin.BeginTime, EndTime: time.Now(), @@ -1464,7 +1478,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp if err != nil && err != io.EOF { end.Error = toRPCErr(err) } - sh.HandleRPC(stream.Context(), end) + for _, sh := range shs { + sh.HandleRPC(stream.Context(), end) + } } if channelz.IsOn() { diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 236fc17ec..6d82e0d7c 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -374,9 +374,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp) method := cs.callHdr.Method - sh := cs.cc.dopts.copts.StatsHandler var beginTime time.Time - if sh != nil { + shs := cs.cc.dopts.copts.StatsHandlers + for _, sh := range shs { ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast}) beginTime = time.Now() begin := &stats.Begin{ @@ -414,12 +414,12 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) } return &csAttempt{ - ctx: ctx, - beginTime: beginTime, - cs: cs, - dc: cs.cc.dopts.dc, - statsHandler: sh, - trInfo: trInfo, + ctx: ctx, + beginTime: beginTime, + cs: cs, + dc: cs.cc.dopts.dc, + statsHandlers: shs, + trInfo: trInfo, }, nil } @@ -536,8 +536,8 @@ type csAttempt struct { // and cleared when the finish method is called. trInfo *traceInfo - statsHandler stats.Handler - beginTime time.Time + statsHandlers []stats.Handler + beginTime time.Time // set for newStream errors that may be transparently retried allowTransparentRetry bool @@ -960,8 +960,8 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { } return io.EOF } - if a.statsHandler != nil { - a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now())) + for _, sh := range a.statsHandlers { + sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now())) } if channelz.IsOn() { a.t.IncrMsgSent() @@ -971,7 +971,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { cs := a.cs - if a.statsHandler != nil && payInfo == nil { + if len(a.statsHandlers) != 0 && payInfo == nil { payInfo = &payloadInfo{} } @@ -1008,8 +1008,8 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { } a.mu.Unlock() } - if a.statsHandler != nil { - a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{ + for _, sh := range a.statsHandlers { + sh.HandleRPC(a.ctx, &stats.InPayload{ Client: true, RecvTime: time.Now(), Payload: m, @@ -1068,7 +1068,7 @@ func (a *csAttempt) finish(err error) { ServerLoad: balancerload.Parse(tr), }) } - if a.statsHandler != nil { + for _, sh := range a.statsHandlers { end := &stats.End{ Client: true, BeginTime: a.beginTime, @@ -1076,7 +1076,7 @@ func (a *csAttempt) finish(err error) { Trailer: tr, Error: err, } - a.statsHandler.HandleRPC(a.ctx, end) + sh.HandleRPC(a.ctx, end) } if a.trInfo != nil && a.trInfo.tr != nil { if err == nil { @@ -1445,7 +1445,7 @@ type serverStream struct { maxSendMessageSize int trInfo *traceInfo - statsHandler stats.Handler + statsHandler []stats.Handler binlog binarylog.MethodLogger // serverHeaderBinlogged indicates whether server header has been logged. It @@ -1555,8 +1555,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { Message: data, }) } - if ss.statsHandler != nil { - ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) + if len(ss.statsHandler) != 0 { + for _, sh := range ss.statsHandler { + sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) + } } return nil } @@ -1590,7 +1592,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { } }() var payInfo *payloadInfo - if ss.statsHandler != nil || ss.binlog != nil { + if len(ss.statsHandler) != 0 || ss.binlog != nil { payInfo = &payloadInfo{} } if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { @@ -1605,15 +1607,17 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { } return toRPCErr(err) } - if ss.statsHandler != nil { - ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{ - RecvTime: time.Now(), - Payload: m, - // TODO truncate large payload. - Data: payInfo.uncompressedBytes, - WireLength: payInfo.wireLength + headerLen, - Length: len(payInfo.uncompressedBytes), - }) + if len(ss.statsHandler) != 0 { + for _, sh := range ss.statsHandler { + sh.HandleRPC(ss.s.Context(), &stats.InPayload{ + RecvTime: time.Now(), + Payload: m, + // TODO truncate large payload. + Data: payInfo.uncompressedBytes, + WireLength: payInfo.wireLength + headerLen, + Length: len(payInfo.uncompressedBytes), + }) + } } if ss.binlog != nil { ss.binlog.Log(&binarylog.ClientMessage{ diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 5bc03f9b3..0eb2998cb 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. -const Version = "1.47.0" +const Version = "1.48.0" diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/vendor/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/cdsbalancer.go index d057ed66a..14c1c2e76 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -272,12 +272,12 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc return provider, nil } -func outlierDetectionToConfig(od *xdsresource.OutlierDetection) *outlierdetection.LBConfig { // Already validated - no need to return error +func outlierDetectionToConfig(od *xdsresource.OutlierDetection) outlierdetection.LBConfig { // Already validated - no need to return error if od == nil { // "If the outlier_detection field is not set in the Cluster message, a // "no-op" outlier_detection config will be generated, with interval set // to the maximum possible value and all other fields unset." - A50 - return &outlierdetection.LBConfig{ + return outlierdetection.LBConfig{ Interval: 1<<63 - 1, } } @@ -308,7 +308,7 @@ func outlierDetectionToConfig(od *xdsresource.OutlierDetection) *outlierdetectio } } - return &outlierdetection.LBConfig{ + return outlierdetection.LBConfig{ Interval: od.Interval, BaseEjectionTime: od.BaseEjectionTime, MaxEjectionTime: od.MaxEjectionTime, diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go index d49014cfa..9b373fb36 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -23,10 +23,12 @@ import ( "encoding/json" "errors" "fmt" + "strings" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/grpclog" @@ -35,6 +37,7 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/priority" + "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -99,6 +102,9 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err if err := json.Unmarshal(c, &cfg); err != nil { return nil, fmt.Errorf("unable to unmarshal balancer config %s into cluster-resolver config, error: %v", string(c), err) } + if lbp := cfg.XDSLBPolicy; lbp != nil && !strings.EqualFold(lbp.Name, roundrobin.Name) && !strings.EqualFold(lbp.Name, ringhash.Name) { + return nil, fmt.Errorf("unsupported child policy with name %q, not one of {%q,%q}", lbp.Name, roundrobin.Name, ringhash.Name) + } return &cfg, nil } diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/config.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/config.go index 26e2812d2..2458b1067 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/config.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/config.go @@ -21,13 +21,10 @@ import ( "bytes" "encoding/json" "fmt" - "strings" - "google.golang.org/grpc/balancer/roundrobin" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/outlierdetection" - "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" ) @@ -105,7 +102,7 @@ type DiscoveryMechanism struct { DNSHostname string `json:"dnsHostname,omitempty"` // OutlierDetection is the Outlier Detection LB configuration for this // priority. - OutlierDetection *outlierdetection.LBConfig `json:"outlierDetection,omitempty"` + OutlierDetection outlierdetection.LBConfig `json:"outlierDetection,omitempty"` } // Equal returns whether the DiscoveryMechanism is the same with the parameter. @@ -121,7 +118,7 @@ func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool { return false case dm.DNSHostname != b.DNSHostname: return false - case !dm.OutlierDetection.EqualIgnoringChildPolicy(b.OutlierDetection): + case !dm.OutlierDetection.EqualIgnoringChildPolicy(&b.OutlierDetection): return false } @@ -167,19 +164,3 @@ type LBConfig struct { // is responsible for both locality picking and endpoint picking. XDSLBPolicy *internalserviceconfig.BalancerConfig `json:"xdsLbPolicy,omitempty"` } - -const ( - rrName = roundrobin.Name - rhName = ringhash.Name -) - -func parseConfig(c json.RawMessage) (*LBConfig, error) { - var cfg LBConfig - if err := json.Unmarshal(c, &cfg); err != nil { - return nil, err - } - if lbp := cfg.XDSLBPolicy; lbp != nil && !strings.EqualFold(lbp.Name, rrName) && !strings.EqualFold(lbp.Name, rhName) { - return nil, fmt.Errorf("unsupported child policy with name %q, not one of {%q,%q}", lbp.Name, rrName, rhName) - } - return &cfg, nil -} diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go index 4cce16ff9..a29658ec3 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go @@ -26,11 +26,13 @@ import ( "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/balancer/weightedtarget" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/hierarchy" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" + "google.golang.org/grpc/xds/internal/balancer/outlierdetection" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -51,6 +53,9 @@ type priorityConfig struct { edsResp xdsresource.EndpointsUpdate // addresses is set only if type is DNS. addresses []string + // Each discovery mechanism has a name generator so that the child policies + // can reuse names between updates (EDS updates for example). + childNameGen *nameGenerator } // buildPriorityConfigJSON builds balancer config for the passed in @@ -118,42 +123,80 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} retAddrs []resolver.Address ) - for i, p := range priorities { + for _, p := range priorities { switch p.mechanism.Type { case DiscoveryMechanismTypeEDS: - names, configs, addrs, err := buildClusterImplConfigForEDS(i, p.edsResp, p.mechanism, xdsLBPolicy) + names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy) if err != nil { return nil, nil, err } retConfig.Priorities = append(retConfig.Priorities, names...) + retAddrs = append(retAddrs, addrs...) + var odCfgs map[string]*outlierdetection.LBConfig + if envconfig.XDSOutlierDetection { + odCfgs = convertClusterImplMapToOutlierDetection(configs, p.mechanism.OutlierDetection) + for n, c := range odCfgs { + retConfig.Children[n] = &priority.Child{ + Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: c}, + // Ignore all re-resolution from EDS children. + IgnoreReresolutionRequests: true, + } + } + continue + } for n, c := range configs { retConfig.Children[n] = &priority.Child{ Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c}, // Ignore all re-resolution from EDS children. IgnoreReresolutionRequests: true, } + } - retAddrs = append(retAddrs, addrs...) case DiscoveryMechanismTypeLogicalDNS: - name, config, addrs := buildClusterImplConfigForDNS(i, p.addresses, p.mechanism) + name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism) retConfig.Priorities = append(retConfig.Priorities, name) + retAddrs = append(retAddrs, addrs...) + var odCfg *outlierdetection.LBConfig + if envconfig.XDSOutlierDetection { + odCfg = makeClusterImplOutlierDetectionChild(config, p.mechanism.OutlierDetection) + retConfig.Children[name] = &priority.Child{ + Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: odCfg}, + // Not ignore re-resolution from DNS children, they will trigger + // DNS to re-resolve. + IgnoreReresolutionRequests: false, + } + continue + } retConfig.Children[name] = &priority.Child{ Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config}, // Not ignore re-resolution from DNS children, they will trigger // DNS to re-resolve. IgnoreReresolutionRequests: false, } - retAddrs = append(retAddrs, addrs...) } } return retConfig, retAddrs, nil } -func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) { +func convertClusterImplMapToOutlierDetection(ciCfgs map[string]*clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) map[string]*outlierdetection.LBConfig { + odCfgs := make(map[string]*outlierdetection.LBConfig, len(ciCfgs)) + for n, c := range ciCfgs { + odCfgs[n] = makeClusterImplOutlierDetectionChild(c, odCfg) + } + return odCfgs +} + +func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) *outlierdetection.LBConfig { + odCfgRet := odCfg + odCfgRet.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: ciCfg} + return &odCfgRet +} + +func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) { // Endpoint picking policy for DNS is hardcoded to pick_first. const childPolicy = "pick_first" retAddrs := make([]resolver.Address, 0, len(addrStrs)) - pName := fmt.Sprintf("priority-%v", parentPriority) + pName := fmt.Sprintf("priority-%v", g.prefix) for _, addrStr := range addrStrs { retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName})) } @@ -172,7 +215,7 @@ func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string, mechani // - map{"p0":p0_config, "p1":p1_config} // - [p0_address_0, p0_address_1, p1_address_0, p1_address_1] // - p0 addresses' hierarchy attributes are set to p0 -func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) { +func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) { drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops)) for _, d := range edsResp.Drops { drops = append(drops, clusterimpl.DropConfig{ @@ -181,15 +224,12 @@ func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsresource.Endpoi }) } - priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities) - retNames := make([]string, 0, len(priorityChildNames)) - retAddrs := make([]resolver.Address, 0, len(priorityChildNames)) - retConfigs := make(map[string]*clusterimpl.LBConfig, len(priorityChildNames)) - for _, priorityName := range priorityChildNames { - priorityLocalities := priorities[priorityName] - // Prepend parent priority to the priority names, to avoid duplicates. - pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName) - retNames = append(retNames, pName) + priorities := groupLocalitiesByPriority(edsResp.Localities) + retNames := g.generate(priorities) + retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames)) + var retAddrs []resolver.Address + for i, pName := range retNames { + priorityLocalities := priorities[i] cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy) if err != nil { return nil, nil, nil, err @@ -202,33 +242,29 @@ func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsresource.Endpoi // groupLocalitiesByPriority returns the localities grouped by priority. // -// It also returns a list of strings where each string represents a priority, -// and the list is sorted from higher priority to lower priority. +// The returned list is sorted from higher priority to lower. Each item in the +// list is a group of localities. // // For example, for L0-p0, L1-p0, L2-p1, results will be -// - ["p0", "p1"] -// - map{"p0":[L0, L1], "p1":[L2]} -func groupLocalitiesByPriority(localities []xdsresource.Locality) ([]string, map[string][]xdsresource.Locality) { +// - [[L0, L1], [L2]] +func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresource.Locality { var priorityIntSlice []int - priorities := make(map[string][]xdsresource.Locality) + priorities := make(map[int][]xdsresource.Locality) for _, locality := range localities { - if locality.Weight == 0 { - continue - } - priorityName := fmt.Sprintf("%v", locality.Priority) - priorities[priorityName] = append(priorities[priorityName], locality) - priorityIntSlice = append(priorityIntSlice, int(locality.Priority)) + priority := int(locality.Priority) + priorities[priority] = append(priorities[priority], locality) + priorityIntSlice = append(priorityIntSlice, priority) } // Sort the priorities based on the int value, deduplicate, and then turn // the sorted list into a string list. This will be child names, in priority // order. sort.Ints(priorityIntSlice) priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice) - priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped)) + ret := make([][]xdsresource.Locality, 0, len(priorityIntSliceDeduped)) for _, p := range priorityIntSliceDeduped { - priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p)) + ret = append(ret, priorities[p]) } - return priorityNameSlice, priorities + return ret } func dedupSortedIntSlice(a []int) []int { @@ -265,22 +301,22 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority // ChildPolicy is not set. Will be set based on xdsLBPolicy } - if xdsLBPolicy == nil || xdsLBPolicy.Name == rrName { + if xdsLBPolicy == nil || xdsLBPolicy.Name == roundrobin.Name { // If lb policy is ROUND_ROBIN: // - locality-picking policy is weighted_target // - endpoint-picking policy is round_robin - logger.Infof("xds lb policy is %q, building config with weighted_target + round_robin", rrName) + logger.Infof("xds lb policy is %q, building config with weighted_target + round_robin", roundrobin.Name) // Child of weighted_target is hardcoded to round_robin. wtConfig, addrs := localitiesToWeightedTarget(localities, priorityName, rrBalancerConfig) clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig} return clusterImplCfg, addrs, nil } - if xdsLBPolicy.Name == rhName { + if xdsLBPolicy.Name == ringhash.Name { // If lb policy is RIHG_HASH, will build one ring_hash policy as child. // The endpoints from all localities will be flattened to one addresses // list, and the ring_hash policy will pick endpoints from it. - logger.Infof("xds lb policy is %q, building config with ring_hash", rhName) + logger.Infof("xds lb policy is %q, building config with ring_hash", ringhash.Name) addrs := localitiesToRingHash(localities, priorityName) // Set child to ring_hash, note that the ring_hash config is from // xdsLBPolicy. @@ -288,7 +324,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority return clusterImplCfg, addrs, nil } - return nil, nil, fmt.Errorf("unsupported xds LB policy %q, not one of {%q,%q}", xdsLBPolicy.Name, rrName, rhName) + return nil, nil, fmt.Errorf("unsupported xds LB policy %q, not one of {%q,%q}", xdsLBPolicy.Name, roundrobin.Name, ringhash.Name) } // localitiesToRingHash takes a list of localities (with the same priority), and diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder_childname.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder_childname.go new file mode 100644 index 000000000..119f4c474 --- /dev/null +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder_childname.go @@ -0,0 +1,88 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package clusterresolver + +import ( + "fmt" + + "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" +) + +// nameGenerator generates a child name for a list of priorities (each priority +// is a list of localities). +// +// The purpose of this generator is to reuse names between updates. So the +// struct keeps state between generate() calls, and a later generate() might +// return names returned by the previous call. +type nameGenerator struct { + existingNames map[internal.LocalityID]string + prefix uint64 + nextID uint64 +} + +func newNameGenerator(prefix uint64) *nameGenerator { + return &nameGenerator{prefix: prefix} +} + +// generate returns a list of names for the given list of priorities. +// +// Each priority is a list of localities. The name for the priority is picked as +// - for each locality in this priority, if it exists in the existing names, +// this priority will reuse the name +// - if no reusable name is found for this priority, a new name is generated +// +// For example: +// - update 1: [[L1], [L2], [L3]] --> ["0", "1", "2"] +// - update 2: [[L1], [L2], [L3]] --> ["0", "1", "2"] +// - update 3: [[L1, L2], [L3]] --> ["0", "2"] (Two priorities were merged) +// - update 4: [[L1], [L4]] --> ["0", "3",] (A priority was split, and a new priority was added) +func (ng *nameGenerator) generate(priorities [][]xdsresource.Locality) []string { + var ret []string + usedNames := make(map[string]bool) + newNames := make(map[internal.LocalityID]string) + for _, priority := range priorities { + var nameFound string + for _, locality := range priority { + if name, ok := ng.existingNames[locality.ID]; ok { + if !usedNames[name] { + nameFound = name + // Found a name to use. No need to process the remaining + // localities. + break + } + } + } + + if nameFound == "" { + // No appropriate used name is found. Make a new name. + nameFound = fmt.Sprintf("priority-%d-%d", ng.prefix, ng.nextID) + ng.nextID++ + } + + ret = append(ret, nameFound) + // All localities in this priority share the same name. Add them all to + // the new map. + for _, l := range priority { + newNames[l.ID] = nameFound + } + usedNames[nameFound] = true + } + ng.existingNames = newNames + return ret +} diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go index 9d7db26ad..3e4e7a7af 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -55,6 +55,8 @@ type resolverMechanismTuple struct { dm DiscoveryMechanism dmKey discoveryMechanismKey r discoveryMechanism + + childNameGen *nameGenerator } type resourceResolver struct { @@ -62,17 +64,28 @@ type resourceResolver struct { updateChannel chan *resourceUpdate // mu protects the slice and map, and content of the resolvers in the slice. - mu sync.Mutex - mechanisms []DiscoveryMechanism - children []resolverMechanismTuple - childrenMap map[discoveryMechanismKey]discoveryMechanism + mu sync.Mutex + mechanisms []DiscoveryMechanism + children []resolverMechanismTuple + // childrenMap's value only needs the resolver implementation (type + // discoveryMechanism) and the childNameGen. The other two fields are not + // used. + // + // TODO(cleanup): maybe we can make a new type with just the necessary + // fields, and use it here instead. + childrenMap map[discoveryMechanismKey]resolverMechanismTuple + // Each new discovery mechanism needs a child name generator to reuse child + // policy names. But to make sure the names across discover mechanism + // doesn't conflict, we need a seq ID. This ID is incremented for each new + // discover mechanism. + childNameGeneratorSeqID uint64 } func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver { return &resourceResolver{ parent: parent, updateChannel: make(chan *resourceUpdate, 1), - childrenMap: make(map[discoveryMechanismKey]discoveryMechanism), + childrenMap: make(map[discoveryMechanismKey]resolverMechanismTuple), } } @@ -112,31 +125,54 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) { dmKey := discoveryMechanismKey{typ: dm.Type, name: nameToWatch} newDMs[dmKey] = true - r := rr.childrenMap[dmKey] - if r == nil { - r = newEDSResolver(nameToWatch, rr.parent.xdsClient, rr) + r, ok := rr.childrenMap[dmKey] + if !ok { + r = resolverMechanismTuple{ + dm: dm, + dmKey: dmKey, + r: newEDSResolver(nameToWatch, rr.parent.xdsClient, rr), + childNameGen: newNameGenerator(rr.childNameGeneratorSeqID), + } rr.childrenMap[dmKey] = r + rr.childNameGeneratorSeqID++ + } else { + // If this is not new, keep the fields (especially + // childNameGen), and only update the DiscoveryMechanism. + // + // Note that the same dmKey doesn't mean the same + // DiscoveryMechanism. There are fields (e.g. + // MaxConcurrentRequests) in DiscoveryMechanism that are not + // copied to dmKey, we need to keep those updated. + r.dm = dm } - rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r} + rr.children[i] = r case DiscoveryMechanismTypeLogicalDNS: // Name to resolve in DNS is the hostname, not the ClientConn // target. dmKey := discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname} newDMs[dmKey] = true - r := rr.childrenMap[dmKey] - if r == nil { - r = newDNSResolver(dm.DNSHostname, rr) + r, ok := rr.childrenMap[dmKey] + if !ok { + r = resolverMechanismTuple{ + dm: dm, + dmKey: dmKey, + r: newDNSResolver(dm.DNSHostname, rr), + childNameGen: newNameGenerator(rr.childNameGeneratorSeqID), + } rr.childrenMap[dmKey] = r + rr.childNameGeneratorSeqID++ + } else { + r.dm = dm } - rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r} + rr.children[i] = r } } // Stop the resources that were removed. for dm, r := range rr.childrenMap { if !newDMs[dm] { delete(rr.childrenMap, dm) - r.stop() + r.r.stop() } } // Regenerate even if there's no change in discovery mechanism, in case @@ -150,7 +186,7 @@ func (rr *resourceResolver) resolveNow() { rr.mu.Lock() defer rr.mu.Unlock() for _, r := range rr.childrenMap { - r.resolveNow() + r.r.resolveNow() } } @@ -159,7 +195,7 @@ func (rr *resourceResolver) stop() { defer rr.mu.Unlock() for dm, r := range rr.childrenMap { delete(rr.childrenMap, dm) - r.stop() + r.r.stop() } rr.mechanisms = nil rr.children = nil @@ -174,13 +210,7 @@ func (rr *resourceResolver) stop() { func (rr *resourceResolver) generate() { var ret []priorityConfig for _, rDM := range rr.children { - r, ok := rr.childrenMap[rDM.dmKey] - if !ok { - rr.parent.logger.Infof("resolver for %+v not found, should never happen", rDM.dmKey) - continue - } - - u, ok := r.lastUpdate() + u, ok := rDM.r.lastUpdate() if !ok { // Don't send updates to parent until all resolvers have update to // send. @@ -188,9 +218,9 @@ func (rr *resourceResolver) generate() { } switch uu := u.(type) { case xdsresource.EndpointsUpdate: - ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu}) + ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu, childNameGen: rDM.childNameGen}) case []string: - ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu}) + ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu, childNameGen: rDM.childNameGen}) } } select { diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go b/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go new file mode 100644 index 000000000..872946138 --- /dev/null +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go @@ -0,0 +1,107 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package outlierdetection provides an implementation of the outlier detection +// LB policy, as defined in +// https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md. +package outlierdetection + +import ( + "encoding/json" + "errors" + "fmt" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/serviceconfig" +) + +// Name is the name of the outlier detection balancer. +const Name = "outlier_detection_experimental" + +func init() { + if envconfig.XDSOutlierDetection { + balancer.Register(bb{}) + } + // TODO: Remove these once the Outlier Detection env var is removed. + internal.RegisterOutlierDetectionBalancerForTesting = func() { + balancer.Register(bb{}) + } + internal.UnregisterOutlierDetectionBalancerForTesting = func() { + internal.BalancerUnregister(Name) + } +} + +type bb struct{} + +func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + return nil +} + +func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + var lbCfg *LBConfig + if err := json.Unmarshal(s, &lbCfg); err != nil { // Validates child config if present as well. + return nil, fmt.Errorf("xds: unable to unmarshal LBconfig: %s, error: %v", string(s), err) + } + + // Note: in the xds flow, these validations will never fail. The xdsclient + // performs the same validations as here on the xds Outlier Detection + // resource before parsing into the internal struct which gets marshaled + // into JSON before calling this function. A50 defines two separate places + // for these validations to take place, the xdsclient and this ParseConfig + // method. "When parsing a config from JSON, if any of these requirements is + // violated, that should be treated as a parsing error." - A50 + + switch { + // "The google.protobuf.Duration fields interval, base_ejection_time, and + // max_ejection_time must obey the restrictions in the + // google.protobuf.Duration documentation and they must have non-negative + // values." - A50 + // Approximately 290 years is the maximum time that time.Duration (int64) + // can represent. The restrictions on the protobuf.Duration field are to be + // within +-10000 years. Thus, just check for negative values. + case lbCfg.Interval < 0: + return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.interval = %s; must be >= 0", lbCfg.Interval) + case lbCfg.BaseEjectionTime < 0: + return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.base_ejection_time = %s; must be >= 0", lbCfg.BaseEjectionTime) + case lbCfg.MaxEjectionTime < 0: + return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_time = %s; must be >= 0", lbCfg.MaxEjectionTime) + // "The fields max_ejection_percent, + // success_rate_ejection.enforcement_percentage, + // failure_percentage_ejection.threshold, and + // failure_percentage.enforcement_percentage must have values less than or + // equal to 100." - A50 + case lbCfg.MaxEjectionPercent > 100: + return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_percent = %v; must be <= 100", lbCfg.MaxEjectionPercent) + case lbCfg.SuccessRateEjection != nil && lbCfg.SuccessRateEjection.EnforcementPercentage > 100: + return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.SuccessRateEjection.enforcement_percentage = %v; must be <= 100", lbCfg.SuccessRateEjection.EnforcementPercentage) + case lbCfg.FailurePercentageEjection != nil && lbCfg.FailurePercentageEjection.Threshold > 100: + return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.threshold = %v; must be <= 100", lbCfg.FailurePercentageEjection.Threshold) + case lbCfg.FailurePercentageEjection != nil && lbCfg.FailurePercentageEjection.EnforcementPercentage > 100: + return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.enforcement_percentage = %v; must be <= 100", lbCfg.FailurePercentageEjection.EnforcementPercentage) + case lbCfg.ChildPolicy == nil: + return nil, errors.New("OutlierDetectionLoadBalancingConfig.child_policy must be present") + } + + return lbCfg, nil +} + +func (bb) Name() string { + return Name +} diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/config.go b/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/config.go index da8311263..c931674ae 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/config.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/config.go @@ -15,8 +15,6 @@ * limitations under the License. */ -// Package outlierdetection implements a balancer that implements -// Outlier Detection. package outlierdetection import ( diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer.go b/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer.go index 672f10122..d05ef18c2 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer.go @@ -30,6 +30,8 @@ import ( "time" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancergroup" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/grpclog" @@ -53,7 +55,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba b := &priorityBalancer{ cc: cc, done: grpcsync.NewEvent(), - childToPriority: make(map[string]int), children: make(map[string]*childBalancer), childBalancerStateUpdate: buffer.NewUnbounded(), } @@ -90,16 +91,17 @@ type priorityBalancer struct { mu sync.Mutex childInUse string - // priority of the child that's current in use. Int starting from 0, and 0 - // is the higher priority. - priorityInUse int // priorities is a list of child names from higher to lower priority. priorities []string - // childToPriority is a map from the child name to it's priority. Priority - // is an int start from 0, and 0 is the higher priority. - childToPriority map[string]int // children is a map from child name to sub-balancers. children map[string]*childBalancer + + // Set during UpdateClientConnState when calling into sub-balancers. + // Prevents child updates from recomputing the active priority or sending + // an update of the aggregated picker to the parent. Cleared after all + // sub-balancers have finished UpdateClientConnState, after which + // syncPriority is called manually. + inhibitPickerUpdates bool } func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error { @@ -111,7 +113,6 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err addressesSplit := hierarchy.Group(s.ResolverState.Addresses) b.mu.Lock() - defer b.mu.Unlock() // Create and remove children, since we know all children from the config // are used by some priority. for name, newSubConfig := range newConfig.Children { @@ -146,15 +147,14 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err } // Update config and address, but note that this doesn't send the - // updates to child balancer (the child balancer might not be built, if - // it's a low priority). + // updates to non-started child balancers (the child balancer might not + // be built, if it's a low priority). currentChild.updateConfig(newSubConfig, resolver.State{ Addresses: addressesSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, }) } - // Remove child from children if it's not in new config. for name, oldChild := range b.children { if _, ok := newConfig.Children[name]; !ok { @@ -164,13 +164,32 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err // Update priorities and handle priority changes. b.priorities = newConfig.Priorities - b.childToPriority = make(map[string]int, len(newConfig.Priorities)) - for pi, pName := range newConfig.Priorities { - b.childToPriority[pName] = pi + + // Everything was removed by the update. + if len(b.priorities) == 0 { + b.childInUse = "" + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: base.NewErrPicker(ErrAllPrioritiesRemoved), + }) + b.mu.Unlock() + return nil } - // Sync the states of all children to the new updated priorities. This - // include starting/stopping child balancers when necessary. - b.syncPriority(true) + + // This will sync the states of all children to the new updated + // priorities. Includes starting/stopping child balancers when necessary. + // Block picker updates until all children have had a chance to call + // UpdateState to prevent races where, e.g., the active priority reports + // transient failure but a higher priority may have reported something that + // made it active, and if the transient failure update is handled first, + // RPCs could fail. + b.inhibitPickerUpdates = true + // Add an item to queue to notify us when the current items in the queue + // are done and syncPriority has been called. + done := make(chan struct{}) + b.childBalancerStateUpdate.Put(resumePickerUpdates{done: done}) + b.mu.Unlock() + <-done return nil } @@ -206,7 +225,7 @@ func (b *priorityBalancer) ExitIdle() { // UpdateState implements balancergroup.BalancerStateAggregator interface. The // balancer group sends new connectivity state and picker here. func (b *priorityBalancer) UpdateState(childName string, state balancer.State) { - b.childBalancerStateUpdate.Put(&childBalancerState{ + b.childBalancerStateUpdate.Put(childBalancerState{ name: childName, s: state, }) @@ -217,6 +236,10 @@ type childBalancerState struct { s balancer.State } +type resumePickerUpdates struct { + done chan struct{} +} + // run handles child update in a separate goroutine, so if the child sends // updates inline (when called by parent), it won't cause deadlocks (by trying // to hold the same mutex). @@ -225,11 +248,22 @@ func (b *priorityBalancer) run() { select { case u := <-b.childBalancerStateUpdate.Get(): b.childBalancerStateUpdate.Load() - s := u.(*childBalancerState) // Needs to handle state update in a goroutine, because each state // update needs to start/close child policy, could result in // deadlock. - b.handleChildStateUpdate(s.name, s.s) + b.mu.Lock() + if b.done.HasFired() { + return + } + switch s := u.(type) { + case childBalancerState: + b.handleChildStateUpdate(s.name, s.s) + case resumePickerUpdates: + b.inhibitPickerUpdates = false + b.syncPriority("") + close(s.done) + } + b.mu.Unlock() case <-b.done.Done(): return } diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer_child.go b/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer_child.go index c00a56b8f..34bab34c9 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer_child.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer_child.go @@ -44,7 +44,8 @@ type childBalancer struct { // will be restarted if the child has not reported TF more recently than it // reported Ready or Idle. reportedTF bool - state balancer.State + // The latest state the child balancer provided. + state balancer.State // The timer to give a priority some time to connect. And if the priority // doesn't go into Ready/Failure, the next priority will be started. initTimer *timerWrapper @@ -74,11 +75,14 @@ func (cb *childBalancer) updateBuilder(bb balancer.Builder) { } // updateConfig sets childBalancer's config and state, but doesn't send update to -// the child balancer. +// the child balancer unless it is started. func (cb *childBalancer) updateConfig(child *Child, rState resolver.State) { cb.ignoreReresolutionRequests = child.IgnoreReresolutionRequests cb.config = child.Config.Config cb.rState = rState + if cb.started { + cb.sendUpdate() + } } // start builds the child balancer if it's not already started. @@ -91,6 +95,7 @@ func (cb *childBalancer) start() { cb.started = true cb.parent.bg.Add(cb.name, cb.bb) cb.startInitTimer() + cb.sendUpdate() } // sendUpdate sends the addresses and config to the child balancer. @@ -145,7 +150,7 @@ func (cb *childBalancer) startInitTimer() { // Re-sync the priority. This will switch to the next priority if // there's any. Note that it's important sync() is called after setting // initTimer to nil. - cb.parent.syncPriority(false) + cb.parent.syncPriority("") }) } diff --git a/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer_priority.go b/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer_priority.go index 2487c2626..33068709e 100644 --- a/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer_priority.go +++ b/vendor/google.golang.org/grpc/xds/internal/balancer/priority/balancer_priority.go @@ -23,7 +23,6 @@ import ( "time" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/connectivity" ) @@ -59,7 +58,7 @@ var ( // - If balancer is Connecting and has non-nil initTimer (meaning it // transitioned from Ready or Idle to connecting, not from TF, so we // should give it init-time to connect). -// - If balancer is READY +// - If balancer is READY or IDLE // - If this is the lowest priority // - do the following: // - if this is not the old childInUse, override picker so old picker is no @@ -68,18 +67,10 @@ var ( // - forward the new addresses and config // // Caller must hold b.mu. -func (b *priorityBalancer) syncPriority(forceUpdate bool) { - // Everything was removed by the update. - if len(b.priorities) == 0 { - b.childInUse = "" - b.priorityInUse = 0 - b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.TransientFailure, - Picker: base.NewErrPicker(ErrAllPrioritiesRemoved), - }) +func (b *priorityBalancer) syncPriority(childUpdating string) { + if b.inhibitPickerUpdates { return } - for p, name := range b.priorities { child, ok := b.children[name] if !ok { @@ -92,23 +83,14 @@ func (b *priorityBalancer) syncPriority(forceUpdate bool) { child.state.ConnectivityState == connectivity.Idle || (child.state.ConnectivityState == connectivity.Connecting && child.initTimer != nil) || p == len(b.priorities)-1 { - if b.childInUse != "" && b.childInUse != child.name { - // childInUse was set and is different from this child, will - // change childInUse later. We need to update picker here - // immediately so parent stops using the old picker. + if b.childInUse != child.name || child.name == childUpdating { + logger.Warningf("ciu, cn, cu: %v, %v, %v", b.childInUse, child.name, childUpdating) + // If we switch children or the child in use just updated its + // picker, push the child's picker to the parent. b.cc.UpdateState(child.state) } b.logger.Infof("switching to (%q, %v) in syncPriority", child.name, p) - oldChildInUse := b.childInUse b.switchToChild(child, p) - if b.childInUse != oldChildInUse || forceUpdate { - // If child is switched, send the update to the new child. - // - // Or if forceUpdate is true (when this is triggered by a - // ClientConn update), because the ClientConn update might - // contain changes for this child. - child.sendUpdate() - } break } } @@ -163,7 +145,6 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) { return } b.childInUse = child.name - b.priorityInUse = priority if !child.started { child.start() @@ -173,40 +154,13 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) { // handleChildStateUpdate start/close priorities based on the connectivity // state. func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.State) { - b.mu.Lock() - defer b.mu.Unlock() - if b.done.HasFired() { - return - } - - priority, ok := b.childToPriority[childName] - if !ok { - b.logger.Warningf("priority: received picker update with unknown child %v", childName) - return - } - - if b.childInUse == "" { - b.logger.Warningf("priority: no child is in use when picker update is received") - return - } - - // priorityInUse is higher than this priority. - if b.priorityInUse < priority { - // Lower priorities should all be closed, this is an unexpected update. - // Can happen if the child policy sends an update after we tell it to - // close. - b.logger.Warningf("priority: received picker update from priority %v, lower than priority in use %v", priority, b.priorityInUse) - return - } - // Update state in child. The updated picker will be sent to parent later if // necessary. child, ok := b.children[childName] if !ok { - b.logger.Warningf("priority: child balancer not found for child %v, priority %v", childName, priority) + b.logger.Warningf("priority: child balancer not found for child %v", childName) return } - oldChildState := child.state child.state = s // We start/stop the init timer of this child based on the new connectivity @@ -227,36 +181,5 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S // New state is Shutdown, should never happen. Don't forward. } - oldPriorityInUse := b.priorityInUse - child.parent.syncPriority(false) - // If child is switched by syncPriority(), it also sends the update from the - // new child to overwrite the old picker used by the parent. - // - // But no update is sent if the child is not switches. That means if this - // update is from childInUse, and this child is still childInUse after - // syncing, the update being handled here is not sent to the parent. In that - // case, we need to do an explicit check here to forward the update. - if b.priorityInUse == oldPriorityInUse && b.priorityInUse == priority { - // Special handling for Connecting. If child was not switched, and this - // is a Connecting->Connecting transition, do not send the redundant - // update, since all Connecting pickers are the same (they tell the RPCs - // to repick). - // - // This can happen because the initial state of a child (before any - // update is received) is Connecting. When the child is started, it's - // picker is sent to the parent by syncPriority (to overwrite the old - // picker if there's any). When it reports Connecting after being - // started, it will send a Connecting update (handled here), causing a - // Connecting->Connecting transition. - if oldChildState.ConnectivityState == connectivity.Connecting && s.ConnectivityState == connectivity.Connecting { - return - } - // Only forward this update if sync() didn't switch child, and this - // child is in use. - // - // sync() forwards the update if the child was switched, so there's no - // need to forward again. - b.cc.UpdateState(child.state) - } - + child.parent.syncPriority(childName) } diff --git a/vendor/google.golang.org/grpc/xds/internal/clusterspecifier/rls/rls.go b/vendor/google.golang.org/grpc/xds/internal/clusterspecifier/rls/rls.go index 69fb7f4a9..a167cc5fa 100644 --- a/vendor/google.golang.org/grpc/xds/internal/clusterspecifier/rls/rls.go +++ b/vendor/google.golang.org/grpc/xds/internal/clusterspecifier/rls/rls.go @@ -38,27 +38,15 @@ func init() { if envconfig.XDSRLS { clusterspecifier.Register(rls{}) } -} - -// RegisterForTesting registers the RLS Cluster Specifier Plugin for testing -// purposes, regardless of the XDSRLS environment variable. This is needed -// because there is no way to set the XDSRLS environment variable to true in a -// test before init() in this package is run. -// -// TODO: Remove this function once the RLS env var is removed. -func RegisterForTesting() { - clusterspecifier.Register(rls{}) -} -// UnregisterForTesting unregisters the RLS Cluster Specifier Plugin for testing -// purposes. This is needed because there is no way to unregister the RLS -// Cluster Specifier Plugin after registering it solely for testing purposes -// using rls.RegisterForTesting(). -// -// TODO: Remove this function once the RLS env var is removed. -func UnregisterForTesting() { - for _, typeURL := range rls.TypeURLs(rls{}) { - clusterspecifier.UnregisterForTesting(typeURL) + // TODO: Remove these once the RLS env var is removed. + internal.RegisterRLSClusterSpecifierPluginForTesting = func() { + clusterspecifier.Register(rls{}) + } + internal.UnregisterRLSClusterSpecifierPluginForTesting = func() { + for _, typeURL := range rls.TypeURLs(rls{}) { + clusterspecifier.UnregisterForTesting(typeURL) + } } } diff --git a/vendor/google.golang.org/grpc/xds/internal/httpfilter/rbac/rbac.go b/vendor/google.golang.org/grpc/xds/internal/httpfilter/rbac/rbac.go index 3dc4b5682..209283c3b 100644 --- a/vendor/google.golang.org/grpc/xds/internal/httpfilter/rbac/rbac.go +++ b/vendor/google.golang.org/grpc/xds/internal/httpfilter/rbac/rbac.go @@ -27,6 +27,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/xds/rbac" @@ -41,26 +42,15 @@ func init() { if envconfig.XDSRBAC { httpfilter.Register(builder{}) } -} - -// RegisterForTesting registers the RBAC HTTP Filter for testing purposes, -// regardless of the RBAC environment variable. This is needed because there is -// no way to set the RBAC environment variable to true in a test before init() -// in this package is run. -// -// TODO: Remove this function once the RBAC env var is removed. -func RegisterForTesting() { - httpfilter.Register(builder{}) -} -// UnregisterForTesting unregisters the RBAC HTTP Filter for testing purposes. -// This is needed because there is no way to unregister the HTTP Filter after -// registering it solely for testing purposes using rbac.RegisterForTesting(). -// -// TODO: Remove this function once the RBAC env var is removed. -func UnregisterForTesting() { - for _, typeURL := range builder.TypeURLs(builder{}) { - httpfilter.UnregisterForTesting(typeURL) + // TODO: Remove these once the RBAC env var is removed. + internal.RegisterRBACHTTPFilterForTesting = func() { + httpfilter.Register(builder{}) + } + internal.UnregisterRBACHTTPFilterForTesting = func() { + for _, typeURL := range builder.TypeURLs(builder{}) { + httpfilter.UnregisterForTesting(typeURL) + } } } diff --git a/vendor/google.golang.org/grpc/xds/internal/resolver/xds_resolver.go b/vendor/google.golang.org/grpc/xds/internal/resolver/xds_resolver.go index 8a613c4c4..4f31d9c44 100644 --- a/vendor/google.golang.org/grpc/xds/internal/resolver/xds_resolver.go +++ b/vendor/google.golang.org/grpc/xds/internal/resolver/xds_resolver.go @@ -25,6 +25,7 @@ import ( "strings" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" @@ -37,10 +38,10 @@ import ( const xdsScheme = "xds" -// NewBuilderForTesting creates a new xds resolver builder using a specific xds +// newBuilderForTesting creates a new xds resolver builder using a specific xds // bootstrap config, so tests can use multiple xds clients in different // ClientConns at the same time. -func NewBuilderForTesting(config []byte) (resolver.Builder, error) { +func newBuilderForTesting(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ newXDSClient: func() (xdsclient.XDSClient, error) { return xdsclient.NewWithBootstrapContentsForTesting(config) @@ -53,6 +54,7 @@ var newXDSClient = func() (xdsclient.XDSClient, error) { return xdsclient.New() func init() { resolver.Register(&xdsResolverBuilder{}) + internal.NewXDSResolverWithConfigForTesting = newBuilderForTesting } type xdsResolverBuilder struct { diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go index 9bc4588c1..26db726dd 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" + v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/load" "google.golang.org/grpc/xds/internal/xdsclient/pubsub" @@ -102,7 +104,13 @@ func (c *clientImpl) newAuthorityLocked(config *bootstrap.ServerConfig) (_ *auth } // Make a new authority since there's no existing authority for this config. - ret := &authority{config: config, pubsub: pubsub.New(c.watchExpiryTimeout, c.config.XDSServer.NodeProto, c.logger)} + nodeID := "" + if v3, ok := c.config.XDSServer.NodeProto.(*v3corepb.Node); ok { + nodeID = v3.GetId() + } else if v2, ok := c.config.XDSServer.NodeProto.(*v2corepb.Node); ok { + nodeID = v2.GetId() + } + ret := &authority{config: config, pubsub: pubsub.New(c.watchExpiryTimeout, nodeID, c.logger)} defer func() { if retErr != nil { ret.close() diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/pubsub/pubsub.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/pubsub/pubsub.go index 48b5ce489..95e8ac773 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/pubsub/pubsub.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/pubsub/pubsub.go @@ -23,11 +23,9 @@ package pubsub import ( - "fmt" "sync" "time" - "github.com/golang/protobuf/proto" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" @@ -43,7 +41,7 @@ type Pubsub struct { done *grpcsync.Event logger *grpclog.PrefixLogger watchExpiryTimeout time.Duration - nodeIDJSON string + nodeID string updateCh *buffer.Unbounded // chan *watcherInfoWithUpdate // All the following maps are to keep the updates/metadata in a cache. @@ -65,12 +63,12 @@ type Pubsub struct { // New creates a new Pubsub. // // The passed in nodeID will be attached to all errors sent to the watchers. -func New(watchExpiryTimeout time.Duration, nodeID proto.Message, logger *grpclog.PrefixLogger) *Pubsub { +func New(watchExpiryTimeout time.Duration, nodeID string, logger *grpclog.PrefixLogger) *Pubsub { pb := &Pubsub{ done: grpcsync.NewEvent(), logger: logger, watchExpiryTimeout: watchExpiryTimeout, - nodeIDJSON: fmt.Sprint(nodeID), + nodeID: nodeID, updateCh: buffer.NewUnbounded(), ldsWatchers: make(map[string]map[*watchInfo]bool), diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/pubsub/watch.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/pubsub/watch.go index 2fc6bb2d6..bef179936 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/pubsub/watch.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/pubsub/watch.go @@ -115,9 +115,9 @@ func (wi *watchInfo) sendErrorLocked(err error) { errMsg := err.Error() errTyp := xdsresource.ErrType(err) if errTyp == xdsresource.ErrorTypeUnknown { - err = fmt.Errorf("%v, xDS client nodeID: %s", errMsg, wi.c.nodeIDJSON) + err = fmt.Errorf("%v, xDS client nodeID: %s", errMsg, wi.c.nodeID) } else { - err = xdsresource.NewErrorf(errTyp, "%v, xDS client nodeID: %s", errMsg, wi.c.nodeIDJSON) + err = xdsresource.NewErrorf(errTyp, "%v, xDS client nodeID: %s", errMsg, wi.c.nodeID) } wi.c.scheduleCallback(wi, u, err) diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go index ad590160f..ec70f32ca 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go @@ -64,7 +64,10 @@ type Locality struct { // EndpointsUpdate contains an EDS update. type EndpointsUpdate struct { - Drops []OverloadDropConfig + Drops []OverloadDropConfig + // Localities in the EDS response with `load_balancing_weight` field not set + // or explicitly set to 0 are ignored while parsing the resource, and + // therefore do not show up here. Localities []Locality // Raw is the resource from the xds response. diff --git a/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index 9eb7117d9..7cc12d73d 100644 --- a/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -57,7 +57,7 @@ func unmarshalEndpointsResource(r *anypb.Any, logger *grpclog.PrefixLogger) (str } logger.Infof("Resource with name: %v, type: %T, contains: %v", cla.GetClusterName(), cla, pretty.ToJSON(cla)) - u, err := parseEDSRespProto(cla) + u, err := parseEDSRespProto(cla, logger) if err != nil { return cla.GetClusterName(), EndpointsUpdate{}, err } @@ -102,7 +102,7 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) []Endpoint { return endpoints } -func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) { +func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.PrefixLogger) (EndpointsUpdate, error) { ret := EndpointsUpdate{} for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) @@ -113,6 +113,11 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, if l == nil { return EndpointsUpdate{}, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) } + weight := locality.GetLoadBalancingWeight().GetValue() + if weight == 0 { + logger.Warningf("Ignoring locality %s with weight 0", pretty.ToJSON(l)) + continue + } lid := internal.LocalityID{ Region: l.Region, Zone: l.Zone, diff --git a/vendor/google.golang.org/grpc/xds/xds.go b/vendor/google.golang.org/grpc/xds/xds.go index 1b2b0c579..3ff3c76bc 100644 --- a/vendor/google.golang.org/grpc/xds/xds.go +++ b/vendor/google.golang.org/grpc/xds/xds.go @@ -32,17 +32,15 @@ import ( v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" "google.golang.org/grpc" + _ "google.golang.org/grpc/credentials/tls/certprovider/pemfile" // Register the file watcher certificate provider plugin. internaladmin "google.golang.org/grpc/internal/admin" - "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/csds" - - _ "google.golang.org/grpc/credentials/tls/certprovider/pemfile" // Register the file watcher certificate provider plugin. _ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers. _ "google.golang.org/grpc/xds/internal/clusterspecifier/rls" // Register the RLS cluster specifier plugin. Note that this does not register the RLS LB policy. _ "google.golang.org/grpc/xds/internal/httpfilter/fault" // Register the fault injection filter. _ "google.golang.org/grpc/xds/internal/httpfilter/rbac" // Register the RBAC filter. _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter. - xdsresolver "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver. + _ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver _ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v2" // Register the v2 xDS API client. _ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v3" // Register the v3 xDS API client. ) @@ -75,21 +73,3 @@ func init() { return csdss.Close, nil }) } - -// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using -// the provided xds bootstrap config instead of the global configuration from -// the supported environment variables. The resolver.Builder is meant to be -// used in conjunction with the grpc.WithResolvers DialOption. -// -// Testing Only -// -// This function should ONLY be used for testing and may not work with some -// other features, including the CSDS service. -// -// Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. -func NewXDSResolverWithConfigForTesting(bootstrapConfig []byte) (resolver.Builder, error) { - return xdsresolver.NewBuilderForTesting(bootstrapConfig) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index f2abd6a98..a1e03188f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2016,7 +2016,7 @@ google.golang.org/genproto/protobuf/api google.golang.org/genproto/protobuf/field_mask google.golang.org/genproto/protobuf/ptype google.golang.org/genproto/protobuf/source_context -# google.golang.org/grpc v1.47.0 +# google.golang.org/grpc v1.48.0 ## explicit; go 1.14 google.golang.org/grpc google.golang.org/grpc/attributes