Skip to content

Commit 9ab8b62

Browse files
authored
Implement new Codec that uses mem.BufferSlice instead of []byte (#7356)
1 parent 7e12068 commit 9ab8b62

39 files changed

+1842
-1065
lines changed

.github/workflows/testing.yml

+4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ jobs:
5555
goversion: '1.22'
5656
testflags: -race
5757

58+
- type: tests
59+
goversion: '1.22'
60+
testflags: '-race -tags=buffer_pooling'
61+
5862
- type: tests
5963
goversion: '1.22'
6064
goarch: 386

benchmark/benchmain/main.go

+30-4
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ import (
6666
"google.golang.org/grpc/benchmark/stats"
6767
"google.golang.org/grpc/credentials/insecure"
6868
"google.golang.org/grpc/encoding/gzip"
69-
"google.golang.org/grpc/experimental"
7069
"google.golang.org/grpc/grpclog"
7170
"google.golang.org/grpc/internal"
7271
"google.golang.org/grpc/internal/channelz"
7372
"google.golang.org/grpc/keepalive"
73+
"google.golang.org/grpc/mem"
7474
"google.golang.org/grpc/metadata"
7575
"google.golang.org/grpc/test/bufconn"
7676

@@ -153,6 +153,33 @@ const (
153153
warmuptime = time.Second
154154
)
155155

156+
var useNopBufferPool atomic.Bool
157+
158+
type swappableBufferPool struct {
159+
mem.BufferPool
160+
}
161+
162+
func (p swappableBufferPool) Get(length int) *[]byte {
163+
var pool mem.BufferPool
164+
if useNopBufferPool.Load() {
165+
pool = mem.NopBufferPool{}
166+
} else {
167+
pool = p.BufferPool
168+
}
169+
return pool.Get(length)
170+
}
171+
172+
func (p swappableBufferPool) Put(i *[]byte) {
173+
if useNopBufferPool.Load() {
174+
return
175+
}
176+
p.BufferPool.Put(i)
177+
}
178+
179+
func init() {
180+
internal.SetDefaultBufferPoolForTesting.(func(mem.BufferPool))(swappableBufferPool{mem.DefaultBufferPool()})
181+
}
182+
156183
var (
157184
allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
158185
allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
@@ -343,10 +370,9 @@ func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func())
343370
}
344371
switch bf.RecvBufferPool {
345372
case recvBufferPoolNil:
346-
// Do nothing.
373+
useNopBufferPool.Store(true)
347374
case recvBufferPoolSimple:
348-
opts = append(opts, experimental.WithRecvBufferPool(grpc.NewSharedBufferPool()))
349-
sopts = append(sopts, experimental.RecvBufferPool(grpc.NewSharedBufferPool()))
375+
// Do nothing as buffering is enabled by default.
350376
default:
351377
logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool)
352378
}

codec.go

+68-7
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,79 @@ package grpc
2121
import (
2222
"google.golang.org/grpc/encoding"
2323
_ "google.golang.org/grpc/encoding/proto" // to register the Codec for "proto"
24+
"google.golang.org/grpc/mem"
2425
)
2526

26-
// baseCodec contains the functionality of both Codec and encoding.Codec, but
27-
// omits the name/string, which vary between the two and are not needed for
28-
// anything besides the registry in the encoding package.
27+
// baseCodec captures the new encoding.CodecV2 interface without the Name
28+
// function, allowing it to be implemented by older Codec and encoding.Codec
29+
// implementations. The omitted Name function is only needed for the register in
30+
// the encoding package and is not part of the core functionality.
2931
type baseCodec interface {
30-
Marshal(v any) ([]byte, error)
31-
Unmarshal(data []byte, v any) error
32+
Marshal(v any) (mem.BufferSlice, error)
33+
Unmarshal(data mem.BufferSlice, v any) error
34+
}
35+
36+
// getCodec returns an encoding.CodecV2 for the codec of the given name (if
37+
// registered). Initially checks the V2 registry with encoding.GetCodecV2 and
38+
// returns the V2 codec if it is registered. Otherwise, it checks the V1 registry
39+
// with encoding.GetCodec and if it is registered wraps it with newCodecV1Bridge
40+
// to turn it into an encoding.CodecV2. Returns nil otherwise.
41+
func getCodec(name string) encoding.CodecV2 {
42+
codecV2 := encoding.GetCodecV2(name)
43+
if codecV2 != nil {
44+
return codecV2
45+
}
46+
47+
codecV1 := encoding.GetCodec(name)
48+
if codecV1 != nil {
49+
return newCodecV1Bridge(codecV1)
50+
}
51+
52+
return nil
3253
}
3354

34-
var _ baseCodec = Codec(nil)
35-
var _ baseCodec = encoding.Codec(nil)
55+
func newCodecV0Bridge(c Codec) baseCodec {
56+
return codecV0Bridge{codec: c}
57+
}
58+
59+
func newCodecV1Bridge(c encoding.Codec) encoding.CodecV2 {
60+
return codecV1Bridge{
61+
codecV0Bridge: codecV0Bridge{codec: c},
62+
name: c.Name(),
63+
}
64+
}
65+
66+
var _ baseCodec = codecV0Bridge{}
67+
68+
type codecV0Bridge struct {
69+
codec interface {
70+
Marshal(v any) ([]byte, error)
71+
Unmarshal(data []byte, v any) error
72+
}
73+
}
74+
75+
func (c codecV0Bridge) Marshal(v any) (mem.BufferSlice, error) {
76+
data, err := c.codec.Marshal(v)
77+
if err != nil {
78+
return nil, err
79+
}
80+
return mem.BufferSlice{mem.NewBuffer(&data, nil)}, nil
81+
}
82+
83+
func (c codecV0Bridge) Unmarshal(data mem.BufferSlice, v any) (err error) {
84+
return c.codec.Unmarshal(data.Materialize(), v)
85+
}
86+
87+
var _ encoding.CodecV2 = codecV1Bridge{}
88+
89+
type codecV1Bridge struct {
90+
codecV0Bridge
91+
name string
92+
}
93+
94+
func (c codecV1Bridge) Name() string {
95+
return c.name
96+
}
3697

3798
// Codec defines the interface gRPC uses to encode and decode messages.
3899
// Note that implementations of this interface must be thread safe;

dialoptions.go

+5-22
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"google.golang.org/grpc/internal/binarylog"
3434
"google.golang.org/grpc/internal/transport"
3535
"google.golang.org/grpc/keepalive"
36+
"google.golang.org/grpc/mem"
3637
"google.golang.org/grpc/resolver"
3738
"google.golang.org/grpc/stats"
3839
)
@@ -60,7 +61,7 @@ func init() {
6061
internal.WithBinaryLogger = withBinaryLogger
6162
internal.JoinDialOptions = newJoinDialOption
6263
internal.DisableGlobalDialOptions = newDisableGlobalDialOptions
63-
internal.WithRecvBufferPool = withRecvBufferPool
64+
internal.WithBufferPool = withBufferPool
6465
}
6566

6667
// dialOptions configure a Dial call. dialOptions are set by the DialOption
@@ -92,7 +93,6 @@ type dialOptions struct {
9293
defaultServiceConfigRawJSON *string
9394
resolvers []resolver.Builder
9495
idleTimeout time.Duration
95-
recvBufferPool SharedBufferPool
9696
defaultScheme string
9797
maxCallAttempts int
9898
}
@@ -679,11 +679,11 @@ func defaultDialOptions() dialOptions {
679679
WriteBufferSize: defaultWriteBufSize,
680680
UseProxy: true,
681681
UserAgent: grpcUA,
682+
BufferPool: mem.DefaultBufferPool(),
682683
},
683684
bs: internalbackoff.DefaultExponential,
684685
healthCheckFunc: internal.HealthCheckFunc,
685686
idleTimeout: 30 * time.Minute,
686-
recvBufferPool: nopBufferPool{},
687687
defaultScheme: "dns",
688688
maxCallAttempts: defaultMaxCallAttempts,
689689
}
@@ -760,25 +760,8 @@ func WithMaxCallAttempts(n int) DialOption {
760760
})
761761
}
762762

763-
// WithRecvBufferPool returns a DialOption that configures the ClientConn
764-
// to use the provided shared buffer pool for parsing incoming messages. Depending
765-
// on the application's workload, this could result in reduced memory allocation.
766-
//
767-
// If you are unsure about how to implement a memory pool but want to utilize one,
768-
// begin with grpc.NewSharedBufferPool.
769-
//
770-
// Note: The shared buffer pool feature will not be active if any of the following
771-
// options are used: WithStatsHandler, EnableTracing, or binary logging. In such
772-
// cases, the shared buffer pool will be ignored.
773-
//
774-
// Deprecated: use experimental.WithRecvBufferPool instead. Will be deleted in
775-
// v1.60.0 or later.
776-
func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption {
777-
return withRecvBufferPool(bufferPool)
778-
}
779-
780-
func withRecvBufferPool(bufferPool SharedBufferPool) DialOption {
763+
func withBufferPool(bufferPool mem.BufferPool) DialOption {
781764
return newFuncDialOption(func(o *dialOptions) {
782-
o.recvBufferPool = bufferPool
765+
o.copts.BufferPool = bufferPool
783766
})
784767
}

encoding/encoding_v2.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package encoding
20+
21+
import (
22+
"strings"
23+
24+
"google.golang.org/grpc/mem"
25+
)
26+
27+
// CodecV2 defines the interface gRPC uses to encode and decode messages. Note
28+
// that implementations of this interface must be thread safe; a CodecV2's
29+
// methods can be called from concurrent goroutines.
30+
type CodecV2 interface {
31+
// Marshal returns the wire format of v. The buffers in the returned
32+
// [mem.BufferSlice] must have at least one reference each, which will be freed
33+
// by gRPC when they are no longer needed.
34+
Marshal(v any) (out mem.BufferSlice, err error)
35+
// Unmarshal parses the wire format into v. Note that data will be freed as soon
36+
// as this function returns. If the codec wishes to guarantee access to the data
37+
// after this function, it must take its own reference that it frees when it is
38+
// no longer needed.
39+
Unmarshal(data mem.BufferSlice, v any) error
40+
// Name returns the name of the Codec implementation. The returned string
41+
// will be used as part of content type in transmission. The result must be
42+
// static; the result cannot change between calls.
43+
Name() string
44+
}
45+
46+
var registeredV2Codecs = make(map[string]CodecV2)
47+
48+
// RegisterCodecV2 registers the provided CodecV2 for use with all gRPC clients and
49+
// servers.
50+
//
51+
// The CodecV2 will be stored and looked up by result of its Name() method, which
52+
// should match the content-subtype of the encoding handled by the CodecV2. This
53+
// is case-insensitive, and is stored and looked up as lowercase. If the
54+
// result of calling Name() is an empty string, RegisterCodecV2 will panic. See
55+
// Content-Type on
56+
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
57+
// more details.
58+
//
59+
// If both a Codec and CodecV2 are registered with the same name, the CodecV2
60+
// will be used.
61+
//
62+
// NOTE: this function must only be called during initialization time (i.e. in
63+
// an init() function), and is not thread-safe. If multiple Codecs are
64+
// registered with the same name, the one registered last will take effect.
65+
func RegisterCodecV2(codec CodecV2) {
66+
if codec == nil {
67+
panic("cannot register a nil CodecV2")
68+
}
69+
if codec.Name() == "" {
70+
panic("cannot register CodecV2 with empty string result for Name()")
71+
}
72+
contentSubtype := strings.ToLower(codec.Name())
73+
registeredV2Codecs[contentSubtype] = codec
74+
}
75+
76+
// GetCodecV2 gets a registered CodecV2 by content-subtype, or nil if no CodecV2 is
77+
// registered for the content-subtype.
78+
//
79+
// The content-subtype is expected to be lowercase.
80+
func GetCodecV2(contentSubtype string) CodecV2 {
81+
return registeredV2Codecs[contentSubtype]
82+
}

encoding/proto/proto_v2.go

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package proto
20+
21+
import (
22+
"fmt"
23+
24+
"google.golang.org/grpc/encoding"
25+
"google.golang.org/grpc/mem"
26+
"google.golang.org/protobuf/proto"
27+
)
28+
29+
func init() {
30+
encoding.RegisterCodecV2(&codecV2{})
31+
}
32+
33+
// codec is a CodecV2 implementation with protobuf. It is the default codec for
34+
// gRPC.
35+
type codecV2 struct{}
36+
37+
var _ encoding.CodecV2 = (*codecV2)(nil)
38+
39+
func (c *codecV2) Marshal(v any) (data mem.BufferSlice, err error) {
40+
vv := messageV2Of(v)
41+
if vv == nil {
42+
return nil, fmt.Errorf("proto: failed to marshal, message is %T, want proto.Message", v)
43+
}
44+
45+
size := proto.Size(vv)
46+
if mem.IsBelowBufferPoolingThreshold(size) {
47+
buf, err := proto.Marshal(vv)
48+
if err != nil {
49+
return nil, err
50+
}
51+
data = append(data, mem.SliceBuffer(buf))
52+
} else {
53+
pool := mem.DefaultBufferPool()
54+
buf := pool.Get(size)
55+
if _, err := (proto.MarshalOptions{}).MarshalAppend((*buf)[:0], vv); err != nil {
56+
pool.Put(buf)
57+
return nil, err
58+
}
59+
data = append(data, mem.NewBuffer(buf, pool))
60+
}
61+
62+
return data, nil
63+
}
64+
65+
func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) (err error) {
66+
vv := messageV2Of(v)
67+
if vv == nil {
68+
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
69+
}
70+
71+
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
72+
defer buf.Free()
73+
// TODO: Upgrade proto.Unmarshal to support mem.BufferSlice. Right now, it's not
74+
// really possible without a major overhaul of the proto package, but the
75+
// vtprotobuf library may be able to support this.
76+
return proto.Unmarshal(buf.ReadOnlyData(), vv)
77+
}
78+
79+
func (c *codecV2) Name() string {
80+
return Name
81+
}

0 commit comments

Comments
 (0)