Skip to content

Commit 2bf22b3

Browse files
authored
Merge branch 'grpc:master' into master
2 parents 786724d + c63aeef commit 2bf22b3

14 files changed

+226
-235
lines changed

Diff for: balancer/pickfirst/pickfirst_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ func Test(t *testing.T) {
4848
grpctest.RunSubTests(t, s{})
4949
}
5050

51-
// TestPickFirstLeaf_InitialResolverError sends a resolver error to the balancer
51+
// TestPickFirst_InitialResolverError sends a resolver error to the balancer
5252
// before a valid resolver update. It verifies that the clientconn state is
5353
// updated to TRANSIENT_FAILURE.
54-
func (s) TestPickFirstLeaf_InitialResolverError(t *testing.T) {
54+
func (s) TestPickFirst_InitialResolverError(t *testing.T) {
5555
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5656
defer cancel()
5757
cc := testutils.NewBalancerClientConn(t)
58-
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
58+
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
5959
defer bal.Close()
6060
bal.ResolverError(errors.New("resolution failed: test error"))
6161

@@ -81,14 +81,14 @@ func (s) TestPickFirstLeaf_InitialResolverError(t *testing.T) {
8181
}
8282
}
8383

84-
// TestPickFirstLeaf_ResolverErrorinTF sends a resolver error to the balancer
84+
// TestPickFirst_ResolverErrorinTF sends a resolver error to the balancer
8585
// before when it's attempting to connect to a SubConn TRANSIENT_FAILURE. It
8686
// verifies that the picker is updated and the SubConn is not closed.
87-
func (s) TestPickFirstLeaf_ResolverErrorinTF(t *testing.T) {
87+
func (s) TestPickFirst_ResolverErrorinTF(t *testing.T) {
8888
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
8989
defer cancel()
9090
cc := testutils.NewBalancerClientConn(t)
91-
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
91+
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
9292
defer bal.Close()
9393

9494
// After sending a valid update, the LB policy should report CONNECTING.

Diff for: internal/transport/client_stream.go

+35-6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ package transport
2121
import (
2222
"sync/atomic"
2323

24+
"golang.org/x/net/http2"
25+
"google.golang.org/grpc/mem"
2426
"google.golang.org/grpc/metadata"
2527
"google.golang.org/grpc/status"
2628
)
@@ -29,7 +31,7 @@ import (
2931
type ClientStream struct {
3032
*Stream // Embed for common stream functionality.
3133

32-
ct ClientTransport
34+
ct *http2Client
3335
done chan struct{} // closed at the end of stream to unblock writers.
3436
doneFunc func() // invoked at the end of stream.
3537

@@ -42,29 +44,56 @@ type ClientStream struct {
4244
header metadata.MD // the received header metadata
4345
noHeaders bool // set if the client never received headers (set only after the stream is done).
4446

45-
bytesReceived uint32 // indicates whether any bytes have been received on this stream
46-
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
47+
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
48+
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
4749

4850
status *status.Status // the status error received from the server
4951
}
5052

53+
// Read reads an n byte message from the input stream.
54+
func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
55+
b, err := s.Stream.read(n)
56+
if err == nil {
57+
s.ct.incrMsgRecv()
58+
}
59+
return b, err
60+
}
61+
62+
// Close closes the stream and popagates err to any readers.
63+
func (s *ClientStream) Close(err error) {
64+
var (
65+
rst bool
66+
rstCode http2.ErrCode
67+
)
68+
if err != nil {
69+
rst = true
70+
rstCode = http2.ErrCodeCancel
71+
}
72+
s.ct.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
73+
}
74+
75+
// Write writes the hdr and data bytes to the output stream.
76+
func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
77+
return s.ct.write(s, hdr, data, opts)
78+
}
79+
5180
// BytesReceived indicates whether any bytes have been received on this stream.
5281
func (s *ClientStream) BytesReceived() bool {
53-
return atomic.LoadUint32(&s.bytesReceived) == 1
82+
return s.bytesReceived.Load()
5483
}
5584

5685
// Unprocessed indicates whether the server did not process this stream --
5786
// i.e. it sent a refused stream or GOAWAY including this stream ID.
5887
func (s *ClientStream) Unprocessed() bool {
59-
return atomic.LoadUint32(&s.unprocessed) == 1
88+
return s.unprocessed.Load()
6089
}
6190

6291
func (s *ClientStream) waitOnHeader() {
6392
select {
6493
case <-s.ctx.Done():
6594
// Close the stream to prevent headers/trailers from changing after
6695
// this function returns.
67-
s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
96+
s.Close(ContextErr(s.ctx.Err()))
6897
// headerChan could possibly not be closed yet if closeStream raced
6998
// with operateHeaders; wait until it is closed explicitly here.
7099
<-s.headerChan

Diff for: internal/transport/handler_server.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func (ht *serverHandlerTransport) do(fn func()) error {
225225
}
226226
}
227227

228-
func (ht *serverHandlerTransport) WriteStatus(s *ServerStream, st *status.Status) error {
228+
func (ht *serverHandlerTransport) writeStatus(s *ServerStream, st *status.Status) error {
229229
ht.writeStatusMu.Lock()
230230
defer ht.writeStatusMu.Unlock()
231231

@@ -333,7 +333,7 @@ func (ht *serverHandlerTransport) writeCustomHeaders(s *ServerStream) {
333333
s.hdrMu.Unlock()
334334
}
335335

336-
func (ht *serverHandlerTransport) Write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *Options) error {
336+
func (ht *serverHandlerTransport) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
337337
// Always take a reference because otherwise there is no guarantee the data will
338338
// be available after this function returns. This is what callers to Write
339339
// expect.
@@ -357,7 +357,7 @@ func (ht *serverHandlerTransport) Write(s *ServerStream, hdr []byte, data mem.Bu
357357
return nil
358358
}
359359

360-
func (ht *serverHandlerTransport) WriteHeader(s *ServerStream, md metadata.MD) error {
360+
func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) error {
361361
if err := s.SetHeader(md); err != nil {
362362
return err
363363
}
@@ -473,9 +473,7 @@ func (ht *serverHandlerTransport) runStream() {
473473
}
474474
}
475475

476-
func (ht *serverHandlerTransport) IncrMsgSent() {}
477-
478-
func (ht *serverHandlerTransport) IncrMsgRecv() {}
476+
func (ht *serverHandlerTransport) incrMsgRecv() {}
479477

480478
func (ht *serverHandlerTransport) Drain(string) {
481479
panic("Drain() is not implemented")

Diff for: internal/transport/handler_server_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (s) TestHandlerTransport_HandleStreams(t *testing.T) {
310310
}
311311

312312
st.bodyw.Close() // no body
313-
st.ht.WriteStatus(s, status.New(codes.OK, ""))
313+
s.WriteStatus(status.New(codes.OK, ""))
314314
}
315315
st.ht.HandleStreams(
316316
context.Background(), func(s *ServerStream) { go handleStream(s) },
@@ -343,7 +343,7 @@ func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string)
343343
st := newHandleStreamTest(t)
344344

345345
handleStream := func(s *ServerStream) {
346-
st.ht.WriteStatus(s, status.New(statusCode, msg))
346+
s.WriteStatus(status.New(statusCode, msg))
347347
}
348348
st.ht.HandleStreams(
349349
context.Background(), func(s *ServerStream) { go handleStream(s) },
@@ -392,7 +392,7 @@ func (s) TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
392392
t.Errorf("ctx.Err = %v; want %v", err, context.DeadlineExceeded)
393393
return
394394
}
395-
ht.WriteStatus(s, status.New(codes.DeadlineExceeded, "too slow"))
395+
s.WriteStatus(status.New(codes.DeadlineExceeded, "too slow"))
396396
}
397397
ht.HandleStreams(
398398
context.Background(), func(s *ServerStream) { go runStream(s) },
@@ -423,7 +423,7 @@ func (s) TestHandlerTransport_HandleStreams_MultiWriteStatus(t *testing.T) {
423423
for i := 0; i < 5; i++ {
424424
go func() {
425425
defer wg.Done()
426-
st.ht.WriteStatus(s, status.New(codes.OK, ""))
426+
s.WriteStatus(status.New(codes.OK, ""))
427427
}()
428428
}
429429
wg.Wait()
@@ -439,8 +439,8 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
439439
}
440440
st.bodyw.Close() // no body
441441

442-
st.ht.WriteStatus(s, status.New(codes.OK, ""))
443-
st.ht.Write(s, []byte("hdr"), newBufferSlice([]byte("data")), &Options{})
442+
s.WriteStatus(status.New(codes.OK, ""))
443+
s.Write([]byte("hdr"), newBufferSlice([]byte("data")), &WriteOptions{})
444444
})
445445
}
446446

@@ -477,7 +477,7 @@ func (s) TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
477477

478478
hst := newHandleStreamTest(t)
479479
handleStream := func(s *ServerStream) {
480-
hst.ht.WriteStatus(s, st)
480+
s.WriteStatus(st)
481481
}
482482
hst.ht.HandleStreams(
483483
context.Background(), func(s *ServerStream) { go handleStream(s) },

Diff for: internal/transport/http2_client.go

+17-26
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt
508508
ctxDone: s.ctx.Done(),
509509
recv: s.buf,
510510
closeStream: func(err error) {
511-
t.CloseStream(s, err)
511+
s.Close(err)
512512
},
513513
},
514514
windowHandler: func(n int) {
@@ -759,7 +759,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
759759
return
760760
}
761761
// The stream was unprocessed by the server.
762-
atomic.StoreUint32(&s.unprocessed, 1)
762+
s.unprocessed.Store(true)
763763
s.write(recvMsg{err: err})
764764
close(s.done)
765765
// If headerChan isn't closed, then close it.
@@ -904,20 +904,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
904904
return s, nil
905905
}
906906

907-
// CloseStream clears the footprint of a stream when the stream is not needed any more.
908-
// This must not be executed in reader's goroutine.
909-
func (t *http2Client) CloseStream(s *ClientStream, err error) {
910-
var (
911-
rst bool
912-
rstCode http2.ErrCode
913-
)
914-
if err != nil {
915-
rst = true
916-
rstCode = http2.ErrCodeCancel
917-
}
918-
t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
919-
}
920-
921907
func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
922908
// Set stream status to done.
923909
if s.swapState(streamDone) == streamDone {
@@ -1081,7 +1067,7 @@ func (t *http2Client) GracefulClose() {
10811067

10821068
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
10831069
// should proceed only if Write returns nil.
1084-
func (t *http2Client) Write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *Options) error {
1070+
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
10851071
reader := data.Reader()
10861072

10871073
if opts.Last {
@@ -1110,6 +1096,7 @@ func (t *http2Client) Write(s *ClientStream, hdr []byte, data mem.BufferSlice, o
11101096
_ = reader.Close()
11111097
return err
11121098
}
1099+
t.incrMsgSent()
11131100
return nil
11141101
}
11151102

@@ -1238,7 +1225,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
12381225
}
12391226
if f.ErrCode == http2.ErrCodeRefusedStream {
12401227
// The stream was unprocessed by the server.
1241-
atomic.StoreUint32(&s.unprocessed, 1)
1228+
s.unprocessed.Store(true)
12421229
}
12431230
statusCode, ok := http2ErrConvTab[f.ErrCode]
12441231
if !ok {
@@ -1383,7 +1370,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
13831370
for streamID, stream := range t.activeStreams {
13841371
if streamID > id && streamID <= upperLimit {
13851372
// The stream was unprocessed by the server.
1386-
atomic.StoreUint32(&stream.unprocessed, 1)
1373+
stream.unprocessed.Store(true)
13871374
streamsToClose = append(streamsToClose, stream)
13881375
}
13891376
}
@@ -1435,7 +1422,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
14351422
return
14361423
}
14371424
endStream := frame.StreamEnded()
1438-
atomic.StoreUint32(&s.bytesReceived, 1)
1425+
s.bytesReceived.Store(true)
14391426
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
14401427

14411428
if !initialHeader && !endStream {
@@ -1805,14 +1792,18 @@ func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
18051792

18061793
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
18071794

1808-
func (t *http2Client) IncrMsgSent() {
1809-
t.channelz.SocketMetrics.MessagesSent.Add(1)
1810-
t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
1795+
func (t *http2Client) incrMsgSent() {
1796+
if channelz.IsOn() {
1797+
t.channelz.SocketMetrics.MessagesSent.Add(1)
1798+
t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
1799+
}
18111800
}
18121801

1813-
func (t *http2Client) IncrMsgRecv() {
1814-
t.channelz.SocketMetrics.MessagesReceived.Add(1)
1815-
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
1802+
func (t *http2Client) incrMsgRecv() {
1803+
if channelz.IsOn() {
1804+
t.channelz.SocketMetrics.MessagesReceived.Add(1)
1805+
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
1806+
}
18161807
}
18171808

18181809
func (t *http2Client) getOutFlowWindow() int64 {

Diff for: internal/transport/http2_server.go

+15-10
Original file line numberDiff line numberDiff line change
@@ -969,7 +969,7 @@ func (t *http2Server) streamContextErr(s *ServerStream) error {
969969
}
970970

971971
// WriteHeader sends the header metadata md back to the client.
972-
func (t *http2Server) WriteHeader(s *ServerStream, md metadata.MD) error {
972+
func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
973973
s.hdrMu.Lock()
974974
defer s.hdrMu.Unlock()
975975
if s.getState() == streamDone {
@@ -1042,7 +1042,7 @@ func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
10421042
// There is no further I/O operations being able to perform on this stream.
10431043
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
10441044
// OK is adopted.
1045-
func (t *http2Server) WriteStatus(s *ServerStream, st *status.Status) error {
1045+
func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
10461046
s.hdrMu.Lock()
10471047
defer s.hdrMu.Unlock()
10481048

@@ -1113,11 +1113,11 @@ func (t *http2Server) WriteStatus(s *ServerStream, st *status.Status) error {
11131113

11141114
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
11151115
// is returns if it fails (e.g., framing error, transport error).
1116-
func (t *http2Server) Write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *Options) error {
1116+
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
11171117
reader := data.Reader()
11181118

11191119
if !s.isHeaderSent() { // Headers haven't been written yet.
1120-
if err := t.WriteHeader(s, nil); err != nil {
1120+
if err := t.writeHeader(s, nil); err != nil {
11211121
_ = reader.Close()
11221122
return err
11231123
}
@@ -1143,6 +1143,7 @@ func (t *http2Server) Write(s *ServerStream, hdr []byte, data mem.BufferSlice, _
11431143
_ = reader.Close()
11441144
return err
11451145
}
1146+
t.incrMsgSent()
11461147
return nil
11471148
}
11481149

@@ -1411,14 +1412,18 @@ func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
14111412
}
14121413
}
14131414

1414-
func (t *http2Server) IncrMsgSent() {
1415-
t.channelz.SocketMetrics.MessagesSent.Add(1)
1416-
t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
1415+
func (t *http2Server) incrMsgSent() {
1416+
if channelz.IsOn() {
1417+
t.channelz.SocketMetrics.MessagesSent.Add(1)
1418+
t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
1419+
}
14171420
}
14181421

1419-
func (t *http2Server) IncrMsgRecv() {
1420-
t.channelz.SocketMetrics.MessagesReceived.Add(1)
1421-
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
1422+
func (t *http2Server) incrMsgRecv() {
1423+
if channelz.IsOn() {
1424+
t.channelz.SocketMetrics.MessagesReceived.Add(1)
1425+
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
1426+
}
14221427
}
14231428

14241429
func (t *http2Server) getOutFlowWindow() int64 {

0 commit comments

Comments
 (0)