Skip to content

Commit a263158

Browse files
committed
easwars review 1
1 parent aaa667d commit a263158

File tree

4 files changed

+69
-69
lines changed

4 files changed

+69
-69
lines changed

xds/internal/clients/lrsclient/load_store.go

+16-24
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
//
3434
// It is safe for concurrent use.
3535
type LoadStore struct {
36-
lrsStream *streamImpl
36+
stop func(ctx context.Context) // Function to call to Stop the LoadStore
3737

3838
// mu only protects the map (2 layers). The read/write to
3939
// *PerClusterReporter doesn't need to hold the mu.
@@ -49,31 +49,23 @@ type LoadStore struct {
4949
clusters map[string]map[string]*PerClusterReporter
5050
}
5151

52-
// newStore creates a LoadStore.
53-
func newLoadStore(lrsStream *streamImpl) *LoadStore {
52+
// newLoadStore creates a LoadStore.
53+
func newLoadStore() *LoadStore {
5454
return &LoadStore{
55-
clusters: make(map[string]map[string]*PerClusterReporter),
56-
lrsStream: lrsStream,
55+
clusters: make(map[string]map[string]*PerClusterReporter),
5756
}
5857
}
5958

6059
// Stop stops the LRS stream associated with this LoadStore.
6160
//
62-
// If this LoadStore is the only one using the underlying LRS stream, the
63-
// stream will be closed. If other LoadStores are also using the same stream,
64-
// the reference count to the stream is decremented, and the stream remains
65-
// open until all LoadStores have called Stop().
61+
// If this is the last reference to the underlying LRS stream, the
62+
// stream will be closed.
6663
//
67-
// If this is the last LoadStore for the stream, this method makes a last
68-
// attempt to flush any unreported load data to the LRS server. It will either
69-
// wait for this attempt to complete, or for the provided context to be done
70-
// before canceling the LRS stream.
64+
// The provided context should have a deadline or timeout set. If this is the
65+
// last reference, Stop will block until the context is done, allowing any
66+
// pending load reports to be flushed before closing the stream.
7167
func (ls *LoadStore) Stop(ctx context.Context) {
72-
// Wait for the provided context to be done (timeout or cancellation).
73-
if ctx != nil {
74-
<-ctx.Done()
75-
}
76-
ls.lrsStream.stop()
68+
ls.stop(ctx)
7769
}
7870

7971
// ReporterForCluster returns the PerClusterReporter for the given cluster and
@@ -111,22 +103,22 @@ func (ls *LoadStore) ReporterForCluster(clusterName, serviceName string) *PerClu
111103
// If a cluster's loadData is empty (no load to report), it's not appended to
112104
// the returned slice.
113105
func (ls *LoadStore) stats(clusterNames []string) []*loadData {
114-
var ret []*loadData
115106
ls.mu.Lock()
116107
defer ls.mu.Unlock()
117108

109+
var ret []*loadData
118110
if len(clusterNames) == 0 {
119111
for _, c := range ls.clusters {
120112
ret = appendClusterStats(ret, c)
121113
}
122114
return ret
123115
}
124-
125116
for _, n := range clusterNames {
126117
if c, ok := ls.clusters[n]; ok {
127118
ret = appendClusterStats(ret, c)
128119
}
129120
}
121+
130122
return ret
131123
}
132124

@@ -314,12 +306,12 @@ type serverLoadData struct {
314306
sum float64
315307
}
316308

317-
// appendClusterStats gets Data for the given cluster, append to ret, and return
318-
// the new slice.
309+
// appendClusterStats gets the Data for all the given clusters, append to ret,
310+
// and return the new slice.
319311
//
320312
// Data is only appended to ret if it's not empty.
321-
func appendClusterStats(ret []*loadData, cluster map[string]*PerClusterReporter) []*loadData {
322-
for _, d := range cluster {
313+
func appendClusterStats(ret []*loadData, clusters map[string]*PerClusterReporter) []*loadData {
314+
for _, d := range clusters {
323315
data := d.stats()
324316
if data == nil {
325317
// Skip this data if it doesn't contain any information.

xds/internal/clients/lrsclient/load_store_test.go

+24-19
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ type rpcData struct {
3939
serverData map[string]float64 // Will be reported with successful RPCs.
4040
}
4141

42+
func verifyLoadStoreData(wantStoreData, gotStoreData []*loadData) error {
43+
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval"), sortDataSlice); diff != "" {
44+
return fmt.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
45+
}
46+
return nil
47+
}
48+
4249
// TestDrops spawns a bunch of goroutines which report drop data. After the
4350
// goroutines have exited, the test dumps the stats from the Store and makes
4451
// sure they are as expected.
@@ -72,8 +79,8 @@ func TestDrops(t *testing.T) {
7279
wg.Wait()
7380

7481
gotStoreData := ls.stats()
75-
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval")); diff != "" {
76-
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
82+
if err := verifyLoadStoreData([]*loadData{wantStoreData}, []*loadData{gotStoreData}); err != nil {
83+
t.Error(err)
7784
}
7885
}
7986

@@ -165,8 +172,8 @@ func TestLocalityStats(t *testing.T) {
165172
}
166173

167174
gotStoreData := ls.stats()
168-
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval")); diff != "" {
169-
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
175+
if err := verifyLoadStoreData([]*loadData{wantStoreData}, []*loadData{gotStoreData}); err != nil {
176+
t.Error(err)
170177
}
171178
}
172179

@@ -260,8 +267,8 @@ func TestResetAfterStats(t *testing.T) {
260267
ls := PerClusterReporter{}
261268
reportLoad(&ls)
262269
gotStoreData := ls.stats()
263-
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval")); diff != "" {
264-
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
270+
if err := verifyLoadStoreData([]*loadData{wantStoreData}, []*loadData{gotStoreData}); err != nil {
271+
t.Error(err)
265272
}
266273

267274
// The above call to stats() should have reset all load reports except the
@@ -274,8 +281,8 @@ func TestResetAfterStats(t *testing.T) {
274281
}
275282
reportLoad(&ls)
276283
gotStoreData = ls.stats()
277-
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval")); diff != "" {
278-
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
284+
if err := verifyLoadStoreData([]*loadData{wantStoreData}, []*loadData{gotStoreData}); err != nil {
285+
t.Error(err)
279286
}
280287
}
281288

@@ -304,7 +311,7 @@ func TestStoreStats(t *testing.T) {
304311
testLocality = "test-locality"
305312
)
306313

307-
store := newLoadStore(nil)
314+
store := newLoadStore()
308315
for _, c := range testClusters {
309316
for _, s := range testServices {
310317
store.ReporterForCluster(c, s).CallStarted(testLocality)
@@ -339,9 +346,7 @@ func TestStoreStats(t *testing.T) {
339346
// Call Stats with just "c0", this should return data for "c0", and not
340347
// touch data for other clusters.
341348
gotC0 := store.stats([]string{"c0"})
342-
if diff := cmp.Diff(wantC0, gotC0, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval"), sortDataSlice); diff != "" {
343-
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
344-
}
349+
verifyLoadStoreData(wantC0, gotC0)
345350

346351
wantOther := []*loadData{
347352
{
@@ -388,8 +393,8 @@ func TestStoreStats(t *testing.T) {
388393
// Call Stats with empty slice, this should return data for all the
389394
// remaining clusters, and not include c0 (because c0 data was cleared).
390395
gotOther := store.stats(nil)
391-
if diff := cmp.Diff(wantOther, gotOther, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval"), sortDataSlice); diff != "" {
392-
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
396+
if err := verifyLoadStoreData(wantOther, gotOther); err != nil {
397+
t.Error(err)
393398
}
394399
}
395400

@@ -401,7 +406,7 @@ func TestStoreStatsEmptyDataNotReported(t *testing.T) {
401406
testLocality = "test-locality"
402407
)
403408

404-
store := newLoadStore(nil)
409+
store := newLoadStore()
405410
// "c0"'s RPCs all finish with success.
406411
for _, s := range testServices {
407412
store.ReporterForCluster("c0", s).CallStarted(testLocality)
@@ -441,8 +446,8 @@ func TestStoreStatsEmptyDataNotReported(t *testing.T) {
441446
// Call Stats with empty slice, this should return data for all the
442447
// clusters.
443448
got0 := store.stats(nil)
444-
if diff := cmp.Diff(want0, got0, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval"), sortDataSlice); diff != "" {
445-
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
449+
if err := verifyLoadStoreData(want0, got0); err != nil {
450+
t.Error(err)
446451
}
447452

448453
want1 := []*loadData{
@@ -462,7 +467,7 @@ func TestStoreStatsEmptyDataNotReported(t *testing.T) {
462467
// Call Stats with empty slice again, this should return data only for "c1",
463468
// because "c0" data was cleared, but "c1" has in-progress RPCs.
464469
got1 := store.stats(nil)
465-
if diff := cmp.Diff(want1, got1, cmpopts.EquateEmpty(), cmp.AllowUnexported(loadData{}, localityData{}, requestData{}, serverLoadData{}), cmpopts.IgnoreFields(loadData{}, "reportInterval"), sortDataSlice); diff != "" {
466-
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
470+
if err := verifyLoadStoreData(want1, got1); err != nil {
471+
t.Error(err)
467472
}
468473
}

xds/internal/clients/lrsclient/lrs_stream.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ type streamImpl struct {
5959

6060
cancelStream context.CancelFunc // Cancel the stream. If nil, the stream is not active.
6161
loadStore *LoadStore // LoadStore returned to user for pushing loads.
62-
cleanup func() // Function to call after the stream is stopped.
6362
}
6463

6564
// streamOpts holds the options for creating an lrsStream.
@@ -75,14 +74,20 @@ type streamOpts struct {
7574
// The actual streaming RPC call is initiated when the first call to ReportLoad
7675
// is made, and is terminated when the last call to ReportLoad is canceled.
7776
func newStreamImpl(opts streamOpts) *streamImpl {
77+
ctx, cancel := context.WithCancel(context.Background())
78+
7879
lrs := &streamImpl{
79-
transport: opts.transport,
80-
backoff: opts.backoff,
81-
nodeProto: opts.nodeProto,
80+
transport: opts.transport,
81+
backoff: opts.backoff,
82+
nodeProto: opts.nodeProto,
83+
cancelStream: cancel,
84+
doneCh: make(chan struct{}),
8285
}
8386

8487
l := grpclog.Component("xds")
8588
lrs.logger = igrpclog.NewPrefixLogger(l, opts.logPrefix+fmt.Sprintf("[lrs-stream %p] ", lrs))
89+
lrs.loadStore = newLoadStore()
90+
go lrs.runner(ctx)
8691
return lrs
8792
}
8893

@@ -183,7 +188,9 @@ func (lrs *streamImpl) recvFirstLoadStatsResponse(stream clients.Stream) ([]stri
183188
}
184189
var resp v3lrspb.LoadStatsResponse
185190
if err := proto.Unmarshal(r, &resp); err != nil {
186-
lrs.logger.Infof("Failed to unmarshal response to LoadStatsResponse: %v", err)
191+
if lrs.logger.V(2) {
192+
lrs.logger.Infof("Failed to unmarshal response to LoadStatsResponse: %v", err)
193+
}
187194
return nil, time.Duration(0), fmt.Errorf("lrs: unexpected message type %T", r)
188195
}
189196
if lrs.logger.V(perRPCVerbosityLevel) {
@@ -260,7 +267,9 @@ func (lrs *streamImpl) sendLoadStatsRequest(stream clients.Stream, loads []*load
260267
}
261268
msg, err := proto.Marshal(req)
262269
if err != nil {
263-
lrs.logger.Warningf("Failed to marshal LoadStatsRequest: %v", err)
270+
if lrs.logger.V(2) {
271+
lrs.logger.Infof("Failed to marshal LoadStatsRequest: %v", err)
272+
}
264273
return err
265274
}
266275
err = stream.Send(msg)
@@ -278,11 +287,6 @@ func getStreamError(stream clients.Stream) error {
278287
}
279288
}
280289

281-
// stop calls the registered cleanup function by LRS client to stop the stream.
282-
func (lrs *streamImpl) stop() {
283-
lrs.cleanup()
284-
}
285-
286290
// localityFromString converts a json representation of locality, into a
287291
// clients.Locality struct.
288292
func localityFromString(s string) (ret clients.Locality, _ error) {

xds/internal/clients/lrsclient/lrsclient.go

+14-15
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type LRSClient struct {
6363
func New(config Config) (*LRSClient, error) {
6464
switch {
6565
case config.Node.ID == "":
66-
return nil, errors.New("lrsclient: node ID is empty")
66+
return nil, errors.New("lrsclient: node ID in node is empty")
6767
case config.TransportBuilder == nil:
6868
return nil, errors.New("lrsclient: transport builder is nil")
6969
}
@@ -81,6 +81,9 @@ func New(config Config) (*LRSClient, error) {
8181

8282
// ReportLoad creates and returns a LoadStore for the caller to report loads
8383
// using a LoadReportingStream.
84+
//
85+
// Caller must call Stop on the returned LoadStore when they are done reporting
86+
// load to this server.
8487
func (c *LRSClient) ReportLoad(si clients.ServerIdentifier) (*LoadStore, error) {
8588
lrs, err := c.getOrCreateLRSStream(si)
8689
if err != nil {
@@ -133,17 +136,13 @@ func (c *LRSClient) getOrCreateLRSStream(serverIdentifier clients.ServerIdentifi
133136
nodeProto: nodeProto,
134137
logPrefix: logPrefix,
135138
})
136-
ctx, cancel := context.WithCancel(context.Background())
137-
lrs.cancelStream = cancel
138-
lrs.doneCh = make(chan struct{})
139-
lrs.loadStore = newLoadStore(lrs)
140-
go lrs.runner(ctx)
141139

142-
// Register a cleanup function that decrements the reference count, stops
140+
// Register a stop function that decrements the reference count, stops
143141
// the LRS stream when the last reference is removed and closes the
144142
// transport and removes the lrs stream and its references from the
145-
// respective maps.
146-
cleanup := func() {
143+
// respective maps. Before closing the stream, it waits for the provided
144+
// context to be done (timeout or cancellation).
145+
stop := func(ctx context.Context) {
147146
c.mu.Lock()
148147
defer c.mu.Unlock()
149148

@@ -156,12 +155,12 @@ func (c *LRSClient) getOrCreateLRSStream(serverIdentifier clients.ServerIdentifi
156155
return
157156
}
158157

159-
if lrs.cancelStream == nil {
160-
// It is possible that Stop() is called before the cleanup function
161-
// is called, thereby setting cancelStream to nil. Hence we need a
162-
// nil check here bofore invoking the cancel function.
163-
return
158+
// Wait for the provided context to be done (timeout or cancellation)
159+
// before closing the stream.
160+
if ctx != nil {
161+
<-ctx.Done()
164162
}
163+
165164
lrs.cancelStream()
166165
lrs.cancelStream = nil
167166
lrs.logger.Infof("Stopping LRS stream")
@@ -170,7 +169,7 @@ func (c *LRSClient) getOrCreateLRSStream(serverIdentifier clients.ServerIdentifi
170169
delete(c.lrsStreams, serverIdentifier)
171170
tr.Close()
172171
}
173-
lrs.cleanup = cleanup
172+
lrs.loadStore.stop = stop
174173

175174
c.lrsStreams[serverIdentifier] = lrs
176175
c.lrsRefs[serverIdentifier] = 1

0 commit comments

Comments
 (0)