diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 17a41ef18e75..8c34e4b9d436 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -515,6 +515,11 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } if d.counter != nil { if err := d.counter.StartRequest(); err != nil { + // Drops by circuit breaking are reported with empty category. They + // will be reported only in total drops, but not in per category. + if d.loadStore != nil { + d.loadStore.CallDropped("") + } return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error()) } pr, err := d.p.Pick(info) diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 2eec6be30f10..b4349a021dca 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -142,7 +142,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { } // The same locality, different drop rate, dropping 50%. - clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50}) + clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50}) clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab5.Build())) @@ -746,6 +746,10 @@ func (s) TestDropPicker(t *testing.T) { } func (s) TestEDS_LoadReport(t *testing.T) { + origCircuitBreakingSupport := env.CircuitBreakingSupport + env.CircuitBreakingSupport = true + defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }() + // We create an xdsClientWrapper with a dummy xdsClientInterface which only // implements the LoadStore() method to return the underlying load.Store to // be used. @@ -758,10 +762,20 @@ func (s) TestEDS_LoadReport(t *testing.T) { edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState + const ( + testServiceName = "test-service" + cbMaxRequests = 20 + ) + var maxRequestsTemp uint32 = cbMaxRequests + client.SetMaxRequests(testServiceName, &maxRequestsTemp) + defer client.ClearCounterForTesting(testServiceName) + edsb.updateServiceRequestsCounter(testServiceName) + backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID) + const testDropCategory = "test-drop" // Two localities, each with one backend. - clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{testDropCategory: 50}) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build())) sc1 := <-cc.NewSubConnCh @@ -788,20 +802,42 @@ func (s) TestEDS_LoadReport(t *testing.T) { // the picks on sc1 should show up as inProgress. locality1JSON, _ := locality1.ToString() locality2JSON, _ := locality2.ToString() + const ( + rpcCount = 100 + // 50% will be dropped with category testDropCategory. + dropWithCategory = rpcCount / 2 + // In the remaining RPCs, only cbMaxRequests are allowed by circuit + // breaking. Others will be dropped by CB. + dropWithCB = rpcCount - dropWithCategory - cbMaxRequests + + rpcInProgress = cbMaxRequests / 2 // 50% of RPCs will be never done. + rpcSucceeded = cbMaxRequests / 2 // 50% of RPCs will succeed. + ) wantStoreData := []*load.Data{{ Cluster: testClusterNames[0], Service: "", LocalityStats: map[string]load.LocalityData{ - locality1JSON: {RequestStats: load.RequestData{InProgress: 5}}, - locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}}, + locality1JSON: {RequestStats: load.RequestData{InProgress: rpcInProgress}}, + locality2JSON: {RequestStats: load.RequestData{Succeeded: rpcSucceeded}}, + }, + TotalDrops: dropWithCategory + dropWithCB, + Drops: map[string]uint64{ + testDropCategory: dropWithCategory, }, }} - for i := 0; i < 10; i++ { + + var rpcsToBeDone []balancer.PickResult + // Run the picks, but only pick with sc1 will be done later. + for i := 0; i < rpcCount; i++ { scst, _ := p1.Pick(balancer.PickInfo{}) if scst.Done != nil && scst.SubConn != sc1 { - scst.Done(balancer.DoneInfo{}) + rpcsToBeDone = append(rpcsToBeDone, scst) } } + // Call done on those sc1 picks. + for _, scst := range rpcsToBeDone { + scst.Done(balancer.DoneInfo{}) + } gotStoreData := loadStore.Stats(testClusterNames[0:1]) if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" { diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index 1e28fc003ff3..74b80f1c3f7c 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -87,3 +87,16 @@ func (c *ServiceRequestsCounter) StartRequest() error { func (c *ServiceRequestsCounter) EndRequest() { atomic.AddUint32(&c.numRequests, ^uint32(0)) } + +// ClearCounterForTesting clears the counter for the service. Should be only +// used in tests. +func ClearCounterForTesting(serviceName string) { + src.mu.Lock() + defer src.mu.Unlock() + c, ok := src.services[serviceName] + if !ok { + return + } + c.maxRequests = defaultMaxRequests + c.numRequests = 0 +} diff --git a/xds/internal/client/load/store.go b/xds/internal/client/load/store.go index a6ec1ec337cd..551a5147b6bd 100644 --- a/xds/internal/client/load/store.go +++ b/xds/internal/client/load/store.go @@ -283,7 +283,12 @@ func (ls *perClusterStore) stats() *Data { return true } sd.TotalDrops += d - sd.Drops[key.(string)] = d + keyStr := key.(string) + if keyStr != "" { + // Skip drops without category. They are counted in total_drops, but + // not in per category. One example is drops by circuit breaking. + sd.Drops[keyStr] = d + } return true }) ls.localityRPCCount.Range(func(key, val interface{}) bool { diff --git a/xds/internal/client/load/store_test.go b/xds/internal/client/load/store_test.go index 4d62ebc4c621..46568591f9e4 100644 --- a/xds/internal/client/load/store_test.go +++ b/xds/internal/client/load/store_test.go @@ -47,9 +47,10 @@ func TestDrops(t *testing.T) { drops = map[string]int{ dropCategories[0]: 30, dropCategories[1]: 40, + "": 10, } wantStoreData = &Data{ - TotalDrops: 70, + TotalDrops: 80, Drops: map[string]uint64{ dropCategories[0]: 30, dropCategories[1]: 40, diff --git a/xds/internal/testutils/protos.go b/xds/internal/testutils/protos.go index 25a0944d96dc..e0dba0e2b301 100644 --- a/xds/internal/testutils/protos.go +++ b/xds/internal/testutils/protos.go @@ -18,7 +18,6 @@ package testutils import ( - "fmt" "net" "strconv" @@ -59,11 +58,11 @@ type ClusterLoadAssignmentBuilder struct { } // NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder. -func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder { +func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents map[string]uint32) *ClusterLoadAssignmentBuilder { var drops []*v2xdspb.ClusterLoadAssignment_Policy_DropOverload - for i, d := range dropPercents { + for n, d := range dropPercents { drops = append(drops, &v2xdspb.ClusterLoadAssignment_Policy_DropOverload{ - Category: fmt.Sprintf("test-drop-%d", i), + Category: n, DropPercentage: &v2typepb.FractionalPercent{ Numerator: d, Denominator: v2typepb.FractionalPercent_HUNDRED,