Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: report drops by circuit breaking #4171

Merged
merged 3 commits into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions xds/internal/balancer/edsbalancer/eds_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,11 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
if d.counter != nil {
if err := d.counter.StartRequest(); err != nil {
// Drops by circuit breaking are reported with empty category. They
// will be reported only in total drops, but not in per category.
if d.loadStore != nil {
d.loadStore.CallDropped("")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we report these with an empty category instead of using a category which makes it clear that these calls were dropped because of circuit-breaking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is defined in the design doc.
And I believe the reason behind is that envoy doesn't report those with category.

}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
}
pr, err := d.p.Pick(info)
Expand Down
48 changes: 42 additions & 6 deletions xds/internal/balancer/edsbalancer/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}

// The same locality, different drop rate, dropping 50%.
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50})
clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab5.Build()))

Expand Down Expand Up @@ -746,6 +746,10 @@ func (s) TestDropPicker(t *testing.T) {
}

func (s) TestEDS_LoadReport(t *testing.T) {
origCircuitBreakingSupport := env.CircuitBreakingSupport
env.CircuitBreakingSupport = true
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()

// We create an xdsClientWrapper with a dummy xdsClientInterface which only
// implements the LoadStore() method to return the underlying load.Store to
// be used.
Expand All @@ -758,10 +762,20 @@ func (s) TestEDS_LoadReport(t *testing.T) {
edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

const (
testServiceName = "test-service"
cbMaxRequests = 20
)
var maxRequestsTemp uint32 = cbMaxRequests
client.SetMaxRequests(testServiceName, &maxRequestsTemp)
defer client.ClearCounterForTesting(testServiceName)
edsb.updateServiceRequestsCounter(testServiceName)

backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID)

const testDropCategory = "test-drop"
// Two localities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{testDropCategory: 50})
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
Expand All @@ -788,20 +802,42 @@ func (s) TestEDS_LoadReport(t *testing.T) {
// the picks on sc1 should show up as inProgress.
locality1JSON, _ := locality1.ToString()
locality2JSON, _ := locality2.ToString()
const (
rpcCount = 100
// 50% will be dropped with category testDropCategory.
dropWithCategory = rpcCount / 2
// In the remaining RPCs, only cbMaxRequests are allowed by circuit
// breaking. Others will be dropped by CB.
dropWithCB = rpcCount - dropWithCategory - cbMaxRequests

rpcInProgress = cbMaxRequests / 2 // 50% of RPCs will be never done.
rpcSucceeded = cbMaxRequests / 2 // 50% of RPCs will succeed.
)
wantStoreData := []*load.Data{{
Cluster: testClusterNames[0],
Service: "",
LocalityStats: map[string]load.LocalityData{
locality1JSON: {RequestStats: load.RequestData{InProgress: 5}},
locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}},
locality1JSON: {RequestStats: load.RequestData{InProgress: rpcInProgress}},
locality2JSON: {RequestStats: load.RequestData{Succeeded: rpcSucceeded}},
},
TotalDrops: dropWithCategory + dropWithCB,
Drops: map[string]uint64{
testDropCategory: dropWithCategory,
},
}}
for i := 0; i < 10; i++ {

var rpcsToBeDone []balancer.PickResult
// Run the picks, but only pick with sc1 will be done later.
for i := 0; i < rpcCount; i++ {
scst, _ := p1.Pick(balancer.PickInfo{})
if scst.Done != nil && scst.SubConn != sc1 {
scst.Done(balancer.DoneInfo{})
rpcsToBeDone = append(rpcsToBeDone, scst)
}
}
// Call done on those sc1 picks.
for _, scst := range rpcsToBeDone {
scst.Done(balancer.DoneInfo{})
}

gotStoreData := loadStore.Stats(testClusterNames[0:1])
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {
Expand Down
13 changes: 13 additions & 0 deletions xds/internal/client/client_requests_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,16 @@ func (c *ServiceRequestsCounter) StartRequest() error {
func (c *ServiceRequestsCounter) EndRequest() {
atomic.AddUint32(&c.numRequests, ^uint32(0))
}

// ClearCounterForTesting clears the counter for the service. Should be only
// used in tests.
func ClearCounterForTesting(serviceName string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return an error if serviceName is not found in the map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doesn't matter.
Or are you suggesting that we should fail the test if this cleanup can't find the counter?

src.mu.Lock()
defer src.mu.Unlock()
c, ok := src.services[serviceName]
if !ok {
return
}
c.maxRequests = defaultMaxRequests
c.numRequests = 0
}
7 changes: 6 additions & 1 deletion xds/internal/client/load/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,12 @@ func (ls *perClusterStore) stats() *Data {
return true
}
sd.TotalDrops += d
sd.Drops[key.(string)] = d
keyStr := key.(string)
if keyStr != "" {
// Skip drops without category. They are counted in total_drops, but
// not in per category. One example is drops by circuit breaking.
sd.Drops[key.(string)] = d
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keyStr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
return true
})
ls.localityRPCCount.Range(func(key, val interface{}) bool {
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/client/load/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ func TestDrops(t *testing.T) {
drops = map[string]int{
dropCategories[0]: 30,
dropCategories[1]: 40,
"": 10,
}
wantStoreData = &Data{
TotalDrops: 70,
TotalDrops: 80,
Drops: map[string]uint64{
dropCategories[0]: 30,
dropCategories[1]: 40,
Expand Down
7 changes: 3 additions & 4 deletions xds/internal/testutils/protos.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package testutils

import (
"fmt"
"net"
"strconv"

Expand Down Expand Up @@ -59,11 +58,11 @@ type ClusterLoadAssignmentBuilder struct {
}

// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder.
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder {
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents map[string]uint32) *ClusterLoadAssignmentBuilder {
var drops []*v2xdspb.ClusterLoadAssignment_Policy_DropOverload
for i, d := range dropPercents {
for n, d := range dropPercents {
drops = append(drops, &v2xdspb.ClusterLoadAssignment_Policy_DropOverload{
Category: fmt.Sprintf("test-drop-%d", i),
Category: n,
DropPercentage: &v2typepb.FractionalPercent{
Numerator: d,
Denominator: v2typepb.FractionalPercent_HUNDRED,
Expand Down