Skip to content

Commit ec132e6

Browse files
committed
Implementing custom grpc codec that does not free the buffer when Unmarshalling
Signed-off-by: alanprot <alanprot@gmail.com>
1 parent d914875 commit ec132e6

File tree

6 files changed

+429
-120
lines changed

6 files changed

+429
-120
lines changed

integration/grpc_server_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,20 @@ type mockGprcServer struct {
3131
ingester_client.IngesterServer
3232
}
3333

34-
func (m mockGprcServer) QueryStream(_ *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
34+
func (m mockGprcServer) QueryStream(req *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
3535
md, _ := metadata.FromIncomingContext(streamServer.Context())
3636
i, _ := strconv.Atoi(md["i"][0])
3737
return streamServer.Send(createStreamResponse(i))
3838
}
3939

4040
func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
41+
defer request.Free()
4142
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
4243
md, _ := metadata.FromIncomingContext(ctx)
4344
i, _ := strconv.Atoi(md["i"][0])
4445
expected := createRequest(i)
46+
// Need to do this so the .String method return the same value for MessageWithBufRef
47+
expected.MessageWithBufRef = request.MessageWithBufRef
4548

4649
if expected.String() != request.String() {
4750
return nil, fmt.Errorf("expected %v, got %v", expected, request)

pkg/cortexpb/codec.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package cortexpb
2+
3+
import (
4+
"fmt"
5+
6+
"google.golang.org/grpc/encoding"
7+
"google.golang.org/grpc/mem"
8+
"google.golang.org/protobuf/proto"
9+
"google.golang.org/protobuf/protoadapt"
10+
)
11+
12+
func init() {
13+
c := encoding.GetCodecV2("proto")
14+
encoding.RegisterCodecV2(&cortexCodec{c: c})
15+
}
16+
17+
type ReleasableMessage interface {
18+
RegisterBuffer(mem.Buffer)
19+
}
20+
21+
type cortexCodec struct {
22+
c encoding.CodecV2
23+
}
24+
25+
func (c cortexCodec) Name() string {
26+
return c.c.Name()
27+
}
28+
29+
func (c cortexCodec) Marshal(v any) (mem.BufferSlice, error) {
30+
return c.c.Marshal(v)
31+
}
32+
33+
// Unmarshal Copied from https://github.com/grpc/grpc-go/blob/d2e836604b36400a54fbf04af495d12b38fa1e3a/encoding/proto/proto.go#L69-L81
34+
// but without releasing the buffer
35+
func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error {
36+
vv := messageV2Of(v)
37+
if vv == nil {
38+
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
39+
}
40+
41+
// To be in the safe side, we will never automatically release the buffer used to Unmarshal the message automatically.
42+
// This should simulate the same behavior of grpc v1.65.0 and before.
43+
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
44+
45+
err := proto.Unmarshal(buf.ReadOnlyData(), vv)
46+
47+
if err != nil {
48+
buf.Free()
49+
return err
50+
}
51+
52+
// If v implements ReleasableMessage interface, we add the buff to be freed later when the request is no longer being used
53+
if fm, ok := v.(ReleasableMessage); ok {
54+
fm.RegisterBuffer(buf)
55+
}
56+
57+
return err
58+
}
59+
60+
func messageV2Of(v any) proto.Message {
61+
switch v := v.(type) {
62+
case protoadapt.MessageV1:
63+
return protoadapt.MessageV2Of(v)
64+
case protoadapt.MessageV2:
65+
return v
66+
}
67+
68+
return nil
69+
}
70+
71+
var _ ReleasableMessage = &MessageWithBufRef{}
72+
73+
type MessageWithBufRef struct {
74+
bs mem.BufferSlice
75+
}
76+
77+
func (m *MessageWithBufRef) RegisterBuffer(buffer mem.Buffer) {
78+
m.bs = append(m.bs, buffer)
79+
}
80+
81+
func (m *MessageWithBufRef) Free() {
82+
m.bs.Free()
83+
m.bs = m.bs[:0]
84+
}

0 commit comments

Comments
 (0)