Skip to content

Commit

Permalink
chore: upgrade Codec implementations and usages to Codec2
Browse files Browse the repository at this point in the history
This is breaking change, that (potentially) should lower the memory usage on our gRPC layer.

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
DmitriyMV committed Oct 10, 2024
1 parent ec3b59c commit e1b4495
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 64 deletions.
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ require (
github.com/golang/protobuf v1.5.4
github.com/hashicorp/go-multierror v1.1.1
github.com/stretchr/testify v1.9.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
29 changes: 14 additions & 15 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand All @@ -14,20 +13,20 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e h1:Elxv5MwEkCI9f5SkoL6afed6NTdxaGoAo39eANBwHL8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
62 changes: 32 additions & 30 deletions proxy/codec.go
Original file line number Diff line number Diff line change
@@ -1,77 +1,79 @@
// Copyright 2024 Siderolabs. All Rights Reserved.
// See LICENSE for licensing terms.

package proxy

import (
"errors"
"fmt"

"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/proto"
gproto "google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/mem"
)

// Codec returns a proxying [encoding.Coder] with the default protobuf codec as parent.
// Codec returns a proxying [encoding.CodecV2] with the default protobuf codec as parent.
//
// See CodecWithParent.
func Codec() encoding.Codec {
return CodecWithParent(&protoCodec{})
// See [CodecWithParent].
func Codec() encoding.CodecV2 {
if c := encoding.GetCodecV2(gproto.Name); c != nil {
return CodecWithParent(c)
}

panic(errors.New(`no codec named "proto" found`))
}

// CodecWithParent returns a proxying [encoding.Codec] with a user provided codec as parent.
// CodecWithParent returns a proxying [encoding.CodecV2] with a user provided codec as parent.
//
// This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious
// to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes.
// However, if the server handler, or the client caller are not proxy-internal functions it will fall back
// to trying to decode the message using a fallback codec.
func CodecWithParent(fallback encoding.Codec) encoding.Codec {
return &rawCodec{fallback}
func CodecWithParent(fallback encoding.CodecV2) encoding.CodecV2 {
return &rawCodec{parentCodec: fallback}
}

type rawCodec struct {
parentCodec encoding.Codec
parentCodec encoding.CodecV2
}

type frame struct {
payload []byte
}

// NewFrame constructs a frame for raw codec.
func NewFrame(payload []byte) interface{} {
func NewFrame(payload []byte) any {
return &frame{payload: payload}
}

func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
out, ok := v.(*frame)
func (c *rawCodec) Marshal(v any) (data mem.BufferSlice, err error) {
f, ok := v.(*frame)
if !ok {
return c.parentCodec.Marshal(v)
}

return out.payload, nil
if mem.IsBelowBufferPoolingThreshold(len(f.payload)) {
data = append(data, mem.SliceBuffer(f.payload))
} else {
pool := mem.DefaultBufferPool()
buf := pool.Get(len(f.payload))
data = append(data, mem.NewBuffer(buf, pool))
}

return data, nil
}

func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
func (c *rawCodec) Unmarshal(data mem.BufferSlice, v any) error {
dst, ok := v.(*frame)
if !ok {
return c.parentCodec.Unmarshal(data, v)
}

dst.payload = data
dst.payload = data.Materialize()

return nil
}

func (c *rawCodec) Name() string {
return fmt.Sprintf("proxy>%s", c.parentCodec.Name())
}

// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
type protoCodec struct{}

func (protoCodec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message)) //nolint:forcetypeassert
}

func (protoCodec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message)) //nolint:forcetypeassert
}

func (protoCodec) Name() string {
return "proto"
}
19 changes: 15 additions & 4 deletions proxy/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/mem"

"github.com/siderolabs/grpc-proxy/proxy"
)
Expand All @@ -12,14 +13,24 @@ func TestCodec_ReadYourWrites(t *testing.T) {
framePtr := proxy.NewFrame(nil)
data := []byte{0xDE, 0xAD, 0xBE, 0xEF}
codec := proxy.Codec()
require.NoError(t, codec.Unmarshal(data, framePtr), "unmarshalling must go ok")

buffer := mem.Copy(data, mem.DefaultBufferPool())
defer buffer.Free()

require.NoError(t, codec.Unmarshal(mem.BufferSlice{buffer}, framePtr), "unmarshalling must go ok")
out, err := codec.Marshal(framePtr)
require.NoError(t, err, "no marshal error")
require.Equal(t, data, out, "output and data must be the same")
require.Equal(t, data, out.Materialize(), "output and data must be the same")

out.Free()
buffer.Free()
buffer = mem.Copy([]byte{0x55}, mem.DefaultBufferPool())

// reuse
require.NoError(t, codec.Unmarshal([]byte{0x55}, framePtr), "unmarshalling must go ok")
require.NoError(t, codec.Unmarshal(mem.BufferSlice{buffer}, framePtr), "unmarshalling must go ok")
out, err = codec.Marshal(framePtr)
require.NoError(t, err, "no marshal error")
require.Equal(t, []byte{0x55}, out, "output and data must be the same")
require.Equal(t, []byte{0x55}, out.Materialize(), "output and data must be the same")

out.Free()
}
6 changes: 3 additions & 3 deletions proxy/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var director proxy.StreamDirector
// ExampleRegisterService is a simple example of registering a service with the proxy.
func ExampleRegisterService() {
// A gRPC server with the proxying codec enabled.
server := grpc.NewServer(grpc.ForceServerCodec(proxy.Codec()))
server := grpc.NewServer(grpc.ForceServerCodecV2(proxy.Codec()))

// Register a TestService with 4 of its methods explicitly.
proxy.RegisterService(server, director,
Expand All @@ -37,7 +37,7 @@ func ExampleRegisterService() {
// ExampleTransparentHandler is an example of redirecting all requests to the proxy.
func ExampleTransparentHandler() {
grpc.NewServer(
grpc.ForceServerCodec(proxy.Codec()),
grpc.ForceServerCodecV2(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
)

Expand All @@ -50,7 +50,7 @@ func ExampleStreamDirector() {
simpleBackendGen := func(hostname string) (proxy.Backend, error) {
conn, err := grpc.NewClient(
hostname,
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())),
grpc.WithDefaultCallOptions(grpc.ForceCodecV2(proxy.Codec())),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions proxy/handler_one2many_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (s *ProxyOne2ManySuite) SetupSuite() {
conn, err = grpc.NewClient(
s.serverListeners[i].Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())),
grpc.WithDefaultCallOptions(grpc.ForceCodecV2(proxy.Codec())),
)
require.NoError(s.T(), err)

Expand Down Expand Up @@ -621,7 +621,7 @@ func (s *ProxyOne2ManySuite) SetupSuite() {
}

s.proxy = grpc.NewServer(
grpc.ForceServerCodec(proxy.Codec()),
grpc.ForceServerCodecV2(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
)
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
Expand Down
4 changes: 2 additions & 2 deletions proxy/handler_one2one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (s *ProxyOne2OneSuite) SetupSuite() {
s.serverClientConn, err = grpc.NewClient(
s.serverListener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())),
grpc.WithDefaultCallOptions(grpc.ForceCodecV2(proxy.Codec())),
)
require.NoError(s.T(), err, "must not error on deferred client Dial")

Expand All @@ -255,7 +255,7 @@ func (s *ProxyOne2OneSuite) SetupSuite() {
}

s.proxy = grpc.NewServer(
grpc.ForceServerCodec(proxy.Codec()),
grpc.ForceServerCodecV2(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
)

Expand Down
2 changes: 1 addition & 1 deletion proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func RegisterService(server grpc.ServiceRegistrar, director StreamDirector, serv

fakeDesc := &grpc.ServiceDesc{
ServiceName: serviceName,
HandlerType: (*interface{})(nil),
HandlerType: (*any)(nil),
}

for _, m := range streamer.options.methodNames {
Expand Down
2 changes: 1 addition & 1 deletion proxy/serverstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (wrapper *ServerStreamWrapper) SetTrailer(md metadata.MD) {
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
func (wrapper *ServerStreamWrapper) SendMsg(m interface{}) error {
func (wrapper *ServerStreamWrapper) SendMsg(m any) error {
wrapper.sendMu.Lock()
defer wrapper.sendMu.Unlock()

Expand Down

0 comments on commit e1b4495

Please sign in to comment.