diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go
new file mode 100644
index 000000000000..627fc3892a33
--- /dev/null
+++ b/balancer/randomsubsetting/randomsubsetting.go
@@ -0,0 +1,206 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package randomsubsetting defines a random subsetting balancer.
+//
+// To install the random subsetting balancer, import this package as:
+//
+//	import _ "google.golang.org/grpc/balancer/randomsubsetting"
+package randomsubsetting
+
+import (
+	"cmp"
+	"encoding/json"
+	"fmt"
+	"slices"
+	"time"
+
+	"github.com/cespare/xxhash/v2"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal/balancer/gracefulswitch"
+	internalgrpclog "google.golang.org/grpc/internal/grpclog"
+	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/serviceconfig"
+)
+
+const (
+	// Name is the name of the random subsetting load balancer.
+	Name = "random_subsetting"
+)
+
+var (
+	logger = grpclog.Component(Name)
+)
+
+func prefixLogger(p *subsettingBalancer) *internalgrpclog.PrefixLogger {
+	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[random-subsetting-lb %p] ", p))
+}
+
+func init() {
+	balancer.Register(bb{})
+}
+
+type bb struct{}
+
+func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+	b := &subsettingBalancer{
+		cc: cc,
+		// The hash seed is chosen once per balancer instance so that each
+		// client selects its own, stable subset of endpoints.
+		hashSeed: uint64(time.Now().UnixNano()),
+	}
+	// Create a logger with a prefix specific to this balancer instance.
+	b.logger = prefixLogger(b)
+
+	b.logger.Infof("Created")
+	b.child = gracefulswitch.NewBalancer(cc, bOpts)
+	return b
+}
+
+// LBConfig is the config for the random subsetting balancer.
+type LBConfig struct {
+	serviceconfig.LoadBalancingConfig `json:"-"`
+
+	SubsetSize  uint64                         `json:"subset_size,omitempty"`
+	ChildPolicy *iserviceconfig.BalancerConfig `json:"child_policy,omitempty"`
+}
+
+func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	lbCfg := &LBConfig{
+		SubsetSize:  2, // default value
+		ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
+	}
+
+	if err := json.Unmarshal(s, lbCfg); err != nil { // Validates child config if present as well.
+		return nil, fmt.Errorf("randomsubsetting: unable to unmarshal LBConfig: %s, error: %v", string(s), err)
+	}
+
+	// A subset size of 1 is not supported; use pick_first instead.
+	if lbCfg.SubsetSize < 2 {
+		return nil, fmt.Errorf("randomsubsetting: SubsetSize must be >= 2")
+	}
+
+	if lbCfg.ChildPolicy == nil {
+		return nil, fmt.Errorf("randomsubsetting: child policy field must be set")
+	}
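+
+	// For illustration only, a config accepted by this parser may look like
+	// the following (the field values here are examples, not defaults):
+	//
+	//	{"subset_size": 4, "child_policy": [{"round_robin": {}}]}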
+
+	// Reject the whole config if the child policy is not registered; do not
+	// persist it for later.
+	childBuilder := balancer.Get(lbCfg.ChildPolicy.Name)
+	if childBuilder == nil {
+		return nil, fmt.Errorf("randomsubsetting: child balancer %q not registered", lbCfg.ChildPolicy.Name)
+	}
+
+	return lbCfg, nil
+}
+
+func (bb) Name() string {
+	return Name
+}
+
+type subsettingBalancer struct {
+	cc       balancer.ClientConn
+	logger   *internalgrpclog.PrefixLogger
+	cfg      *LBConfig
+	hashSeed uint64
+	child    *gracefulswitch.Balancer
+}
+
+func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	lbCfg, ok := s.BalancerConfig.(*LBConfig)
+	if !ok {
+		b.logger.Errorf("randomsubsetting: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
+		return balancer.ErrBadResolverState
+	}
+
+	if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name {
+		if err := b.child.SwitchTo(balancer.Get(lbCfg.ChildPolicy.Name)); err != nil {
+			return fmt.Errorf("randomsubsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
+		}
+	}
+	b.cfg = lbCfg
+
+	return b.child.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState:  b.prepareChildResolverState(s),
+		BalancerConfig: b.cfg.ChildPolicy.Config,
+	})
+}
+
+type endpointWithHash struct {
+	hash uint64
+	ep   resolver.Endpoint
+}
+
+// prepareChildResolverState implements the subsetting algorithm described in
+// gRFC A68: https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md
+func (b *subsettingBalancer) prepareChildResolverState(s balancer.ClientConnState) resolver.State {
+	subsetSize := b.cfg.SubsetSize
+	endpoints := s.ResolverState.Endpoints
+	backendCount := len(endpoints)
+	if backendCount <= int(subsetSize) || subsetSize < 2 {
+		return s.ResolverState
+	}
+
+	// Calculate a hash for each endpoint. A fresh digest seeded with the
+	// per-client seed is used for every endpoint so that the hashes are
+	// independent of each other and stable across resolver updates.
+	endpointSet := make([]endpointWithHash, backendCount)
+	for i, endpoint := range endpoints {
+		hasher := xxhash.NewWithSeed(b.hashSeed)
+		hasher.Write([]byte(endpoint.Addresses[0].String()))
+		endpointSet[i] = endpointWithHash{
+			hash: hasher.Sum64(),
+			ep:   endpoint,
+		}
+	}
+
+	// Sort endpoints by hash.
+	slices.SortFunc(endpointSet, func(a, b endpointWithHash) int {
+		return cmp.Compare(a.hash, b.hash)
+	})
+
+	if b.logger.V(2) {
+		b.logger.Infof("randomsubsetting: resulting subset: %v", endpointSet[:subsetSize])
+	}
+
+	// Keep the first subsetSize endpoints and convert them back to
+	// resolver.Endpoints.
+	endpointSubset := make([]resolver.Endpoint, subsetSize)
+	for i, endpoint := range endpointSet[:subsetSize] {
+		endpointSubset[i] = endpoint.ep
+	}
+
+	return resolver.State{
+		Endpoints:     endpointSubset,
+		ServiceConfig: s.ResolverState.ServiceConfig,
+		Attributes:    s.ResolverState.Attributes,
+	}
+}
+
+func (b *subsettingBalancer) ResolverError(err error) {
+	b.child.ResolverError(err)
+}
+
+func (b *subsettingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+	b.child.UpdateSubConnState(sc, state)
+}
+
+func (b *subsettingBalancer) Close() {
+	b.child.Close()
+}
+
+func (b *subsettingBalancer) ExitIdle() {
+	b.child.ExitIdle()
+}
diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go
new file mode 100644
index 000000000000..b2f42d2a5180
--- /dev/null
+++ b/balancer/randomsubsetting/randomsubsetting_test.go
@@ -0,0 +1,291 @@
+/*
+ *
+ * Copyright 2025 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// This file contains e2e test cases for the random subsetting LB policy.
+package randomsubsetting
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"math"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/internal"
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/internal/stubserver"
+	"google.golang.org/grpc/peer"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/resolver/manual"
+	"google.golang.org/grpc/serviceconfig"
+
+	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
+	testgrpc "google.golang.org/grpc/interop/grpc_testing"
+	testpb "google.golang.org/grpc/interop/grpc_testing"
+)
+
+var defaultTestTimeout = 120 * time.Second
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+func (s) TestParseConfig(t *testing.T) {
+	parser := bb{}
+	tests := []struct {
+		name    string
+		input   string
+		wantCfg serviceconfig.LoadBalancingConfig
+		wantErr string
+	}{
+		{
+			name:  "happy-case-default",
+			input: `{}`,
+			wantCfg: &LBConfig{
+				SubsetSize:  2,
+				ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
+			},
+		},
+		{
+			name:  "happy-case-subset_size-set",
+			input: `{ "subset_size": 3 }`,
+			wantCfg: &LBConfig{
+				SubsetSize:  3,
+				ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
+			},
+		},
+		{
+			name: "subset_size-less-than-2",
+			input: `{ "subset_size": 1,
+			"child_policy": [{"round_robin": {}}]}`,
+			wantErr: "randomsubsetting: SubsetSize must be >= 2",
+		},
+		{
+			name:    "invalid-json",
+			input:   "{{invalidjson{{",
+			wantErr: "invalid character",
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
+			// Substring matching makes this tightly coupled to the
+			// internalserviceconfig.BalancerConfig error strings. However, it
+			// is important to distinguish between the different kinds of
+			// errors the parser can return, since it has a few defined
+			// buckets of ways it can error out.
+			if (gotErr != nil) != (test.wantErr != "") {
+				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
+			}
+			if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
+				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
+			}
+			if test.wantErr != "" {
+				return
+			}
+			if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" {
+				t.Fatalf("ParseConfig(%v) got unexpected output, diff (-got +want): %v", test.input, diff)
+			}
+		})
+	}
+}
+
+func setupBackends(t *testing.T, backendsCount int) ([]resolver.Address, func()) {
+	t.Helper()
+
+	backends := make([]*stubserver.StubServer, backendsCount)
+	addresses := make([]resolver.Address, backendsCount)
+	for i := 0; i < backendsCount; i++ {
+		backend := &stubserver.StubServer{
+			EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+				return &testpb.Empty{}, nil
+			},
+		}
+		if err := backend.StartServer(); err != nil {
+			t.Fatalf("Failed to start backend: %v", err)
+		}
+		t.Logf("Started good TestService backend at: %q", backend.Address)
+		backends[i] = backend
+		addresses[i] = resolver.Address{
+			Addr: backend.Address,
+		}
+	}
+
+	cancel := func() {
+		for _, backend := range backends {
+			backend.Stop()
+		}
+	}
+	return addresses, cancel
+}
+
+func setupClients(t *testing.T, clientsCount int, subsetSize int, addresses []resolver.Address) ([]testgrpc.TestServiceClient, func()) {
+	t.Helper()
+
+	clients := make([]testgrpc.TestServiceClient, clientsCount)
+	ccs := make([]*grpc.ClientConn, clientsCount)
+	var err error
+
+	for i := 0; i < clientsCount; i++ {
+		mr := manual.NewBuilderWithScheme("subsetting-e2e")
+		jsonConfig := fmt.Sprintf(`
+		{
+			"loadBalancingConfig": [
+				{
+					"random_subsetting": {
+						"subset_size": %d,
+						"child_policy": [{"round_robin": {}}]
+					}
+				}
+			]
+		}`, subsetSize)
+
+		sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonConfig)
+		mr.InitialState(resolver.State{
+			Addresses:     addresses,
+			ServiceConfig: sc,
+		})
+
+		ccs[i], err = grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
+		if err != nil {
+			t.Fatalf("grpc.NewClient() failed: %v", err)
+		}
+		clients[i] = testgrpc.NewTestServiceClient(ccs[i])
+	}
+
+	cancel := func() {
+		for _, cc := range ccs {
+			cc.Close()
+		}
+	}
+	return clients, cancel
+}
+
+func checkRoundRobinRPCs(ctx context.Context, t *testing.T, clients []testgrpc.TestServiceClient, subsetSize int, maxDiff int) {
+	t.Helper()
+
+	clientsPerBackend := map[string]map[int]struct{}{}
+
+	for clientIdx, client := range clients {
+		// Make sure that every client sends exactly one request to each
+		// server in its subset.
+		for i := 0; i < subsetSize; i++ {
+			var peer peer.Peer
+			_, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer))
+			if err != nil {
+				t.Fatalf("failed to call server: %v", err)
+			}
+			if peer.Addr != nil {
+				if m, ok := clientsPerBackend[peer.Addr.String()]; !ok {
+					clientsPerBackend[peer.Addr.String()] = map[int]struct{}{clientIdx: {}}
+				} else if _, ok := m[clientIdx]; !ok {
+					m[clientIdx] = struct{}{}
+				} else {
+					// The backend received a second request from the same
+					// client. This can happen if the client has one backend
+					// in READY state while the others are still CONNECTING;
+					// in that case round_robin picks the same address twice.
+					// Retry after a short delay.
+					time.Sleep(10 * time.Microsecond)
+					i--
+				}
+			} else {
+				t.Fatalf("peer.Addr == nil, peer: %v", peer)
+			}
+		}
+	}
+
+	minClientsPerBackend := math.MaxInt
+	maxClientsPerBackend := 0
+	for _, v := range clientsPerBackend {
+		if len(v) < minClientsPerBackend {
+			minClientsPerBackend = len(v)
+		}
+		if len(v) > maxClientsPerBackend {
+			maxClientsPerBackend = len(v)
+		}
+	}
+
+	if maxClientsPerBackend > minClientsPerBackend+maxDiff {
+		t.Fatalf("the difference between min and max clients per backend should be <= %d, clientsPerBackend: %v", maxDiff, clientsPerBackend)
+	}
+}
+
+func (s) TestSubsettingE2E(t *testing.T) {
+	tests := []struct {
+		name       string
+		subsetSize int
+		clients    int
+		backends   int
+		maxDiff    int
+	}{
+		{
+			name:       "backends could be evenly distributed between a small number of clients",
+			backends:   3,
+			clients:    2,
+			subsetSize: 2,
+			maxDiff:    1,
+		},
+		{
+			name:       "backends could be evenly distributed between clients",
+			backends:   12,
+			clients:    8,
+			subsetSize: 3,
+			maxDiff:    3,
+		},
+		{
+			name:       "backends could NOT be evenly distributed between clients",
+			backends:   37,
+			clients:    22,
+			subsetSize: 5,
+			maxDiff:    15,
+		},
+		{
+			name:       "backends % subsetSize == 0, but there are not enough clients to fill the last round",
+			backends:   20,
+			clients:    7,
+			subsetSize: 5,
+			maxDiff:    20,
+		},
+		{
+			name:       "last round is completely filled, but there are some excluded backends on every round",
+			backends:   21,
+			clients:    8,
+			subsetSize: 5,
+			maxDiff:    3,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			addresses, stopBackends := setupBackends(t, test.backends)
+			defer stopBackends()
+
+			clients, stopClients := setupClients(t, test.clients, test.subsetSize, addresses)
+			defer stopClients()
+
+			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+			defer cancel()
+
+			checkRoundRobinRPCs(ctx, t, clients, test.subsetSize, test.maxDiff)
+		})
+	}
+}