Skip to content

Commit

Permalink
rpc_util: add default recv buffer pool with nop functionality
Browse files Browse the repository at this point in the history
This change eliminates the need for nil checks on the shared recv buffer pool, resulting in a more simplified codebase.
  • Loading branch information
hueypark committed Apr 9, 2023
1 parent 8f33b9b commit 5155566
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
1 change: 1 addition & 0 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ func defaultDialOptions() dialOptions {
ReadBufferSize: defaultReadBufSize,
UseProxy: true,
},
sharedRecvBufferPool: nopBufferPool{},
}
}

Expand Down
18 changes: 3 additions & 15 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,11 +613,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
if p.sharedRecvBufferPool != nil {
msg = p.sharedRecvBufferPool.Get(int(length))
} else {
msg = make([]byte, int(length))
}
msg = p.sharedRecvBufferPool.Get(int(length))
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
Expand Down Expand Up @@ -805,16 +801,8 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
}
if payInfo != nil {
if p.sharedRecvBufferPool != nil {
if len(buf) != 0 {
payInfo.uncompressedBytes = make([]byte, len(buf))
copy(payInfo.uncompressedBytes, buf)
}
} else {
payInfo.uncompressedBytes = buf
}
}
if p.sharedRecvBufferPool != nil {
payInfo.uncompressedBytes = buf
} else {
p.sharedRecvBufferPool.Put(&buf)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s) TestSimpleParsing(t *testing.T) {
{append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
} {
buf := fullReader{bytes.NewReader(test.p)}
parser := &parser{r: buf}
parser := &parser{r: buf, sharedRecvBufferPool: nopBufferPool{}}
pt, b, err := parser.recvMsg(math.MaxInt32)
if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
Expand All @@ -77,7 +77,7 @@ func (s) TestMultipleParsing(t *testing.T) {
// Set a byte stream consists of 3 messages with their headers.
p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
b := fullReader{bytes.NewReader(p)}
parser := &parser{r: b}
parser := &parser{r: b, sharedRecvBufferPool: nopBufferPool{}}

wantRecvs := []struct {
pt payloadFormat
Expand Down
1 change: 1 addition & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ var defaultServerOptions = serverOptions{
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
sharedRecvBufferPool: nopBufferPool{},
}
var globalServerOptions []ServerOption

Expand Down
12 changes: 12 additions & 0 deletions shared_buffer_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,15 @@ func makeFallbackBytesPool() bufferPool {
},
}
}

// nopBufferPool is a buffer pool just makes new buffer without pooling.
type nopBufferPool struct {
}

func (nopBufferPool) Get(length int) []byte {
return make([]byte, length)
}

func (nopBufferPool) Put(*[]byte) {

}

0 comments on commit 5155566

Please sign in to comment.