From 3e508a3658d49c527e8161dde42f04488aed12b5 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 22 Jan 2021 10:24:37 -0800 Subject: [PATCH 1/3] [circuit_breaking_drop_reporting] xds: report drops by circuit breaking Those drops will be reported to store with category "". When reported via LRS, they will only be counted in total_drops, but not in per category. --- xds/internal/balancer/edsbalancer/eds_impl.go | 5 ++ .../balancer/edsbalancer/eds_impl_test.go | 47 ++++++++++++++++--- xds/internal/client/load/store.go | 7 ++- xds/internal/client/load/store_test.go | 3 +- xds/internal/testutils/protos.go | 7 ++- 5 files changed, 57 insertions(+), 12 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 17a41ef18e75..8c34e4b9d436 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -515,6 +515,11 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } if d.counter != nil { if err := d.counter.StartRequest(); err != nil { + // Drops by circuit breaking are reported with empty category. They + // will be reported only in total drops, but not in per category. + if d.loadStore != nil { + d.loadStore.CallDropped("") + } return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error()) } pr, err := d.p.Pick(info) diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 2eec6be30f10..9db07cf1fa72 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -142,7 +142,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { } // The same locality, different drop rate, dropping 50%. - clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50}) + clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50}) clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab5.Build())) @@ -746,6 +746,10 @@ func (s) TestDropPicker(t *testing.T) { } func (s) TestEDS_LoadReport(t *testing.T) { + origCircuitBreakingSupport := env.CircuitBreakingSupport + env.CircuitBreakingSupport = true + defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }() + // We create an xdsClientWrapper with a dummy xdsClientInterface which only // implements the LoadStore() method to return the underlying load.Store to // be used. @@ -758,10 +762,19 @@ func (s) TestEDS_LoadReport(t *testing.T) { edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState + const ( + testServiceName = "test-service" + cbMaxRequests = 20 + ) + var maxRequestsTemp uint32 = cbMaxRequests + client.SetMaxRequests(testServiceName, &maxRequestsTemp) + edsb.updateServiceRequestsCounter(testServiceName) + backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID) + const testDropCategory = "test-drop" // Two localities, each with one backend. - clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{testDropCategory: 50}) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build())) sc1 := <-cc.NewSubConnCh @@ -788,20 +801,42 @@ func (s) TestEDS_LoadReport(t *testing.T) { // the picks on sc1 should show up as inProgress. locality1JSON, _ := locality1.ToString() locality2JSON, _ := locality2.ToString() + const ( + rpcCount = 100 + // 50% will be dropped with category testDropCategory. + dropWithCategory = rpcCount / 2 + // In the remaining RPCs, only cbMaxRequests are allowed by circuit + // breaking. Others will be dropped by CB. + dropWithCB = rpcCount - dropWithCategory - cbMaxRequests + + rpcInProgress = cbMaxRequests / 2 // 50% of RPCs will be never done. + rpcSucceeded = cbMaxRequests / 2 // 50% of RPCs will succeed. + ) wantStoreData := []*load.Data{{ Cluster: testClusterNames[0], Service: "", LocalityStats: map[string]load.LocalityData{ - locality1JSON: {RequestStats: load.RequestData{InProgress: 5}}, - locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}}, + locality1JSON: {RequestStats: load.RequestData{InProgress: rpcInProgress}}, + locality2JSON: {RequestStats: load.RequestData{Succeeded: rpcSucceeded}}, + }, + TotalDrops: dropWithCategory + dropWithCB, + Drops: map[string]uint64{ + testDropCategory: dropWithCategory, }, }} - for i := 0; i < 10; i++ { + + var rpcsToBeDone []balancer.PickResult + // Run the picks, but only pick with sc1 will be done later. + for i := 0; i < rpcCount; i++ { scst, _ := p1.Pick(balancer.PickInfo{}) if scst.Done != nil && scst.SubConn != sc1 { - scst.Done(balancer.DoneInfo{}) + rpcsToBeDone = append(rpcsToBeDone, scst) } } + // Call done on those sc1 picks. + for _, scst := range rpcsToBeDone { + scst.Done(balancer.DoneInfo{}) + } gotStoreData := loadStore.Stats(testClusterNames[0:1]) if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" { diff --git a/xds/internal/client/load/store.go b/xds/internal/client/load/store.go index a6ec1ec337cd..3bbe2513fe45 100644 --- a/xds/internal/client/load/store.go +++ b/xds/internal/client/load/store.go @@ -283,7 +283,12 @@ func (ls *perClusterStore) stats() *Data { return true } sd.TotalDrops += d - sd.Drops[key.(string)] = d + keyStr := key.(string) + if keyStr != "" { + // Skip drops without category. They are counted in total_drops, but + // not in per category. One example is drops by circuit breaking. + sd.Drops[key.(string)] = d + } return true }) ls.localityRPCCount.Range(func(key, val interface{}) bool { diff --git a/xds/internal/client/load/store_test.go b/xds/internal/client/load/store_test.go index 4d62ebc4c621..46568591f9e4 100644 --- a/xds/internal/client/load/store_test.go +++ b/xds/internal/client/load/store_test.go @@ -47,9 +47,10 @@ func TestDrops(t *testing.T) { drops = map[string]int{ dropCategories[0]: 30, dropCategories[1]: 40, + "": 10, } wantStoreData = &Data{ - TotalDrops: 70, + TotalDrops: 80, Drops: map[string]uint64{ dropCategories[0]: 30, dropCategories[1]: 40, diff --git a/xds/internal/testutils/protos.go b/xds/internal/testutils/protos.go index 25a0944d96dc..e0dba0e2b301 100644 --- a/xds/internal/testutils/protos.go +++ b/xds/internal/testutils/protos.go @@ -18,7 +18,6 @@ package testutils import ( - "fmt" "net" "strconv" @@ -59,11 +58,11 @@ type ClusterLoadAssignmentBuilder struct { } // NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder. -func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder { +func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents map[string]uint32) *ClusterLoadAssignmentBuilder { var drops []*v2xdspb.ClusterLoadAssignment_Policy_DropOverload - for i, d := range dropPercents { + for n, d := range dropPercents { drops = append(drops, &v2xdspb.ClusterLoadAssignment_Policy_DropOverload{ - Category: fmt.Sprintf("test-drop-%d", i), + Category: n, DropPercentage: &v2typepb.FractionalPercent{ Numerator: d, Denominator: v2typepb.FractionalPercent_HUNDRED, From 7baecddb205cd143ba59688adbe49d4d09f26280 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 22 Jan 2021 11:57:05 -0800 Subject: [PATCH 2/3] [circuit_breaking_drop_reporting] cleanup counter --- xds/internal/balancer/edsbalancer/eds_impl_test.go | 1 + xds/internal/client/client_requests_counter.go | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 9db07cf1fa72..b4349a021dca 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -768,6 +768,7 @@ func (s) TestEDS_LoadReport(t *testing.T) { ) var maxRequestsTemp uint32 = cbMaxRequests client.SetMaxRequests(testServiceName, &maxRequestsTemp) + defer client.ClearCounterForTesting(testServiceName) edsb.updateServiceRequestsCounter(testServiceName) backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID) diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index 1e28fc003ff3..74b80f1c3f7c 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -87,3 +87,16 @@ func (c *ServiceRequestsCounter) StartRequest() error { func (c *ServiceRequestsCounter) EndRequest() { atomic.AddUint32(&c.numRequests, ^uint32(0)) } + +// ClearCounterForTesting clears the counter for the service. Should be only +// used in tests. +func ClearCounterForTesting(serviceName string) { + src.mu.Lock() + defer src.mu.Unlock() + c, ok := src.services[serviceName] + if !ok { + return + } + c.maxRequests = defaultMaxRequests + c.numRequests = 0 +} From f36be0eef7c1fc96a1ae7bc58d033e36cf34107a Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 27 Jan 2021 09:52:33 -0800 Subject: [PATCH 3/3] [circuit_breaking_drop_reporting] keystr --- xds/internal/client/load/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/client/load/store.go b/xds/internal/client/load/store.go index 3bbe2513fe45..551a5147b6bd 100644 --- a/xds/internal/client/load/store.go +++ b/xds/internal/client/load/store.go @@ -287,7 +287,7 @@ func (ls *perClusterStore) stats() *Data { if keyStr != "" { // Skip drops without category. They are counted in total_drops, but // not in per category. One example is drops by circuit breaking. - sd.Drops[key.(string)] = d + sd.Drops[keyStr] = d } return true })