Skip to content

Commit

Permalink
*: enable gRPC pooling (#7742)
Browse files Browse the repository at this point in the history
Use the new CodecV2 interface to enable pooling gRPC
marshaling/unmarshaling buffers. Also, add missing includes to
scripts/genproto.sh so that we could enable the `pool` flag in the next
PR.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS authored Sep 13, 2024
1 parent 7bddb60 commit ca8ab90
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
60 changes: 38 additions & 22 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/common/version"
"go.uber.org/automaxprocs/maxprocs"
_ "google.golang.org/grpc/encoding/proto"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/extkingpin"
Expand All @@ -35,9 +34,11 @@ import (
// use the original golang/protobuf package we can continue serializing
// messages from our dependencies, particularly from OTEL. Original version
// from Vitess.
"google.golang.org/protobuf/proto"

"google.golang.org/grpc/encoding"
"google.golang.org/grpc/mem"

// Guarantee that the built-in proto is called registered before this one
// so that it can be replaced.
_ "google.golang.org/grpc/encoding/proto"
)

Expand All @@ -48,41 +49,56 @@ const Name = "proto"
// but also handles non-vtproto messages that are needed
// for stuff like OpenTelemetry. Otherwise, such errors appear:
// error while marshaling: failed to marshal, message is *v1.ExportTraceServiceRequest (missing vtprotobuf helpers).
type vtprotoCodec struct{}
type vtprotoCodec struct {
fallback encoding.CodecV2
}

type vtprotoMessage interface {
MarshalVT() ([]byte, error)
UnmarshalVT([]byte) error
MarshalToSizedBufferVT(data []byte) (int, error)
SizeVT() int
}

func (vtprotoCodec) Marshal(v any) ([]byte, error) {
switch v := v.(type) {
case vtprotoMessage:
return v.MarshalVT()
case proto.Message:
return proto.Marshal(v)
default:
return nil, fmt.Errorf("failed to marshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
func (c *vtprotoCodec) Marshal(v any) (mem.BufferSlice, error) {
if m, ok := v.(vtprotoMessage); ok {
size := m.SizeVT()
if mem.IsBelowBufferPoolingThreshold(size) {
buf := make([]byte, size)
if _, err := m.MarshalToSizedBufferVT(buf); err != nil {
return nil, err
}
return mem.BufferSlice{mem.SliceBuffer(buf)}, nil
}
pool := mem.DefaultBufferPool()
buf := pool.Get(size)
if _, err := m.MarshalToSizedBufferVT((*buf)[:size]); err != nil {
pool.Put(buf)
return nil, err
}
return mem.BufferSlice{mem.NewBuffer(buf, pool)}, nil
}

return c.fallback.Marshal(v)
}

func (vtprotoCodec) Unmarshal(data []byte, v any) error {
switch v := v.(type) {
case vtprotoMessage:
return v.UnmarshalVT(data)
case proto.Message:
return proto.Unmarshal(data, v)
default:
return fmt.Errorf("failed to unmarshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
func (c *vtprotoCodec) Unmarshal(data mem.BufferSlice, v any) error {
if m, ok := v.(vtprotoMessage); ok {
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
defer buf.Free()
return m.UnmarshalVT(buf.ReadOnlyData())
}
}

return c.fallback.Unmarshal(data, v)
}
func (vtprotoCodec) Name() string {
return Name
}

func init() {
encoding.RegisterCodec(vtprotoCodec{})
encoding.RegisterCodecV2(&vtprotoCodec{
fallback: encoding.GetCodecV2("proto"),
})
}

func main() {
Expand Down
6 changes: 4 additions & 2 deletions scripts/genproto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ PROTOC_GO_INJECT_TAG_BIN=${PROTOC_GO_INJECT_TAG_BIN:-protoc-go-inject-tag}
PROTOC_GEN_GO_BIN=${PROTOC_GEN_GO_BIN:-protoc-gen-go}
PROTOC_GEN_GO_GRPC_BIN=${PROTOC_GEN_GO_GRPC_BIN:-protoc-gen-go-grpc}
PROTOC_GEN_GO_VTPROTO_BIN=${PROTOC_GEN_GO_VTPROTO_BIN:-protoc-gen-go-vtproto}
VTPROTOBUF_VERSION="$(go list -m all | grep 'github.com/planetscale/vtprotobuf' | awk '{ print $2 }')"
VTPROTOBUF_INCLUDE_PATH="$(go env GOMODCACHE)/github.com/planetscale/vtprotobuf@${VTPROTOBUF_VERSION}/include"

if ! [[ "scripts/genproto.sh" =~ $0 ]]; then
echo "must be run from repository root"
Expand All @@ -34,7 +36,7 @@ for dir in ${DIRS}; do
--plugin=protoc-gen-go-grpc=${PROTOC_GEN_GO_GRPC_BIN} \
--plugin=protoc-gen-go-vtproto=${PROTOC_GEN_GO_VTPROTO_BIN} \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative -I=. -I=${INCLUDE_PATH} \
--go-grpc_out=. --go-grpc_opt=paths=source_relative -I=. -I=${INCLUDE_PATH} -I=${VTPROTOBUF_INCLUDE_PATH} \
--go_opt=Mstore/storepb/types.proto=github.com/thanos-io/thanos/pkg/store/storepb \
--go_opt=Mrules/rulespb/rpc.proto=github.com/thanos-io/thanos/pkg/rules/rulespb \
--go-vtproto_out=. --go-vtproto_opt=features=marshal+unmarshal+size+equal,paths=source_relative \
Expand All @@ -57,7 +59,7 @@ for dir in ${CORTEX_DIRS}; do
--plugin=protoc-gen-go-grpc=${PROTOC_GEN_GO_GRPC_BIN} \
--plugin=protoc-gen-go-vtproto=${PROTOC_GEN_GO_VTPROTO_BIN} \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative -I=. -I=${INCLUDE_PATH} \
--go-grpc_out=. --go-grpc_opt=paths=source_relative -I=. -I=${INCLUDE_PATH} -I=${VTPROTOBUF_INCLUDE_PATH} \
--go_opt=Mstore/storepb/types.proto=github.com/thanos-io/thanos/pkg/store/storepb \
--go_opt=Mrules/rulespb/rpc.proto=github.com/thanos-io/thanos/pkg/rules/rulespb \
--go-vtproto_out=. --go-vtproto_opt=features=marshal+unmarshal+size+equal,paths=source_relative \
Expand Down

0 comments on commit ca8ab90

Please sign in to comment.