Skip to content

Commit

Permalink
fix: address all gRPC deprecations
Browse files Browse the repository at this point in the history
This addresses all deprecation warnings from `grpc-go` in tests and examples
(e.g. use `NewClient` instead of `DialContext`, the context from `DialContext`
was only useful if the [`grpc.WithBlock()`] option was used,
this is also deprecated)

The only exceptions is the change in `proxy/codec.go` to implement
`encoding.Codec` instead `grpc.Codec` which is a breaking change.

The clients must now use `grpc.NewClient` with the
`grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec()))` option.
And servers must now use `grpc.NewServer` `encoding.RegisterCodec(proxy.Codec())`
`grpc.ForceServerCodec(proxy.Codec())` option.

[`grpc.WithBlock()`]: https://pkg.go.dev/google.golang.org/grpc#WithBlock

Signed-off-by: Maxime Brunet <max@brnt.mx>
Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
maxbrunet authored and DmitriyMV committed Jul 8, 2024
1 parent 02f82db commit ec3b59c
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 75 deletions.
24 changes: 9 additions & 15 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,19 @@ linters-settings:
cyclop:
# the maximal code complexity to report
max-complexity: 20
# depguard:
# Main:
# deny:
# - github.com/OpenPeeDeeP/depguard # this is just an example
# depguard:
# rules:
# Main:
# list-mode: lax
# deny:
# - pkg: github.com/OpenPeeDeeP/depguard
# desc: this is just an example

linters:
enable-all: true
disable-all: false
fast: false
disable:
- exhaustivestruct
- exhaustruct
- err113
- forbidigo
Expand All @@ -120,27 +122,19 @@ linters:
- mnd
- nestif
- nonamedreturns
- nosnakecase
- paralleltest
- tagalign
- tagliatelle
- thelper
- typecheck
- varnamelen
- wrapcheck
- depguard # Disabled because starting with golangci-lint 1.53.0 it doesn't allow denylist alone anymore
- depguard # an Allow and/or Deny package list must be configured
- testifylint # complains about our assert recorder and has a number of false positives for assert.Greater(t, thing, 1)
- protogetter # complains about us using Value field on typed spec, instead of GetValue which has a different signature
- perfsprint # complains about us using fmt.Sprintf in non-performance critical code, updating just kres took too long
# abandoned linters for which golangci shows the warning that the repo is archived by the owner
- deadcode
- golint
- ifshort
- interfacer
- maligned
- scopelint
- structcheck
- varcheck
- execinquery
# disabled as it seems to be broken - goes into imported libraries and reports issues there
- musttag
- goimports # same as gci
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ GRPC_GATEWAY_VERSION ?= 2.20.0
VTPROTOBUF_VERSION ?= 0.6.0
GOIMPORTS_VERSION ?= 0.21.0
DEEPCOPY_VERSION ?= v0.5.6
GOLANGCILINT_VERSION ?= v1.58.2
GOLANGCILINT_VERSION ?= v1.59.1
GOFUMPT_VERSION ?= v0.6.0
GO_VERSION ?= 1.22.3
GO_BUILDFLAGS ?=
Expand Down
30 changes: 19 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,39 +36,47 @@ First, define `Backend` implementation to identify specific upstream.
For one to one proxying, `SingleBackend` might be used:

```go
conn, err := grpc.NewClient(
"api-service.staging.svc.local",
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())),
)
if err != nil {
log.Fatal(err)
}

backend := &proxy.SingleBackend{
GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)

// Copy the inbound metadata explicitly.
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) // nolint: staticcheck

return outCtx, conn, err
return outCtx, conn, nil
},
}
```

Defining a `StreamDirector` that decides where (if at all) to send the request

```go
director = func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromContext(ctx)

md, ok := metadata.FromIncomingContext(ctx)

if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
return ctx, backend1, nil
return proxy.One2One, []proxy.Backend{stagingBackend}, nil
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
return ctx, backend2, nil
return proxy.One2One, []proxy.Backend{prodBackend}, nil
}
}
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")

return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
```

Expand All @@ -77,7 +85,7 @@ The server may have other handlers that will be served locally:

```go
server := grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
grpc.ForceServerCodec(proxy.Codec()),
grpc.UnknownServiceHandler(
proxy.TransparentHandler(director),
proxy.WithMode(proxy.One2One),
Expand Down
22 changes: 9 additions & 13 deletions proxy/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,29 @@ package proxy
import (
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/proto"
)

// Codec returns a proxying grpc.Codec with the default protobuf codec as parent.
// Codec returns a proxying [encoding.Coder] with the default protobuf codec as parent.
//
// See CodecWithParent.
//
//nolint:staticcheck
func Codec() grpc.Codec { //nolint:ireturn
func Codec() encoding.Codec {
return CodecWithParent(&protoCodec{})
}

// CodecWithParent returns a proxying grpc.Codec with a user provided codec as parent.
// CodecWithParent returns a proxying [encoding.Codec] with a user provided codec as parent.
//
// This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious
// to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes.
// However, if the server handler, or the client caller are not proxy-internal functions it will fall back
// to trying to decode the message using a fallback codec.
//
//nolint:staticcheck
func CodecWithParent(fallback grpc.Codec) grpc.Codec { //nolint:ireturn
func CodecWithParent(fallback encoding.Codec) encoding.Codec {
return &rawCodec{fallback}
}

type rawCodec struct {
parentCodec grpc.Codec //nolint: staticcheck
parentCodec encoding.Codec
}

type frame struct {
Expand Down Expand Up @@ -61,8 +57,8 @@ func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
return nil
}

func (c *rawCodec) String() string {
return fmt.Sprintf("proxy>%s", c.parentCodec.String())
func (c *rawCodec) Name() string {
return fmt.Sprintf("proxy>%s", c.parentCodec.Name())
}

// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
Expand All @@ -76,6 +72,6 @@ func (protoCodec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message)) //nolint:forcetypeassert
}

func (protoCodec) String() string {
func (protoCodec) Name() string {
return "proto"
}
40 changes: 30 additions & 10 deletions proxy/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package proxy_test

import (
"context"
"log"
"strings"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

Expand All @@ -20,7 +22,7 @@ var director proxy.StreamDirector
// ExampleRegisterService is a simple example of registering a service with the proxy.
func ExampleRegisterService() {
// A gRPC server with the proxying codec enabled.
server := grpc.NewServer(grpc.CustomCodec(proxy.Codec())) //nolint: staticcheck
server := grpc.NewServer(grpc.ForceServerCodec(proxy.Codec()))

// Register a TestService with 4 of its methods explicitly.
proxy.RegisterService(server, director,
Expand All @@ -35,28 +37,46 @@ func ExampleRegisterService() {
// ExampleTransparentHandler is an example of redirecting all requests to the proxy.
func ExampleTransparentHandler() {
grpc.NewServer(
grpc.CustomCodec(proxy.Codec()), //nolint: staticcheck
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
grpc.ForceServerCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
)

// Output:
}

// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
func ExampleStreamDirector() {
simpleBackendGen := func(hostname string) proxy.Backend {
simpleBackendGen := func(hostname string) (proxy.Backend, error) {
conn, err := grpc.NewClient(
hostname,
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, err
}

return &proxy.SingleBackend{
GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)

// Copy the inbound metadata explicitly.
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
// Make sure we use DialContext so the dialing can be canceled/time out together with the context.
conn, err := grpc.DialContext(ctx, hostname, grpc.WithCodec(proxy.Codec())) //nolint: staticcheck

return outCtx, conn, err
return outCtx, conn, nil
},
}
}, nil
}

stagingBackend, err := simpleBackendGen("api-service.staging.svc.local")
if err != nil {
log.Fatal("failed to create staging backend:", err)
}

prodBackend, err := simpleBackendGen("api-service.prod.svc.local")
if err != nil {
log.Fatal("failed to create production backend:", err)
}

director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
Expand All @@ -70,9 +90,9 @@ func ExampleStreamDirector() {
if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
return proxy.One2One, []proxy.Backend{simpleBackendGen("api-service.staging.svc.local")}, nil
return proxy.One2One, []proxy.Backend{stagingBackend}, nil
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
return proxy.One2One, []proxy.Backend{simpleBackendGen("api-service.prod.svc.local")}, nil
return proxy.One2One, []proxy.Backend{prodBackend}, nil
}
}

Expand Down
36 changes: 14 additions & 22 deletions proxy/handler_one2many_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ import (
"errors"
"fmt"
"io"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -141,11 +139,7 @@ func (s *assertingMultiService) PingStreamError(pb.MultiService_PingStreamErrorS

type assertingBackend struct {
conn *grpc.ClientConn

addr string
i int

mu sync.Mutex
}

func (b *assertingBackend) String() string {
Expand All @@ -157,21 +151,11 @@ func (b *assertingBackend) GetConnection(ctx context.Context, _ string) (context
// Explicitly copy the metadata, otherwise the tests will fail.
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())

if b.addr == "fail" {
if b.conn == nil {
return ctx, nil, status.Error(codes.Unavailable, "backend connection failed")
}

b.mu.Lock()
defer b.mu.Unlock()

if b.conn != nil {
return outCtx, b.conn, nil
}

var err error
b.conn, err = grpc.DialContext(ctx, b.addr, grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) //nolint: staticcheck

return outCtx, b.conn, err
return outCtx, b.conn, nil
}

func (b *assertingBackend) AppendInfo(streaming bool, resp []byte) ([]byte, error) {
Expand Down Expand Up @@ -574,15 +558,23 @@ func (s *ProxyOne2ManySuite) SetupSuite() {
backends := make([]*assertingBackend, numUpstreams)

for i := range backends {
var conn *grpc.ClientConn
conn, err = grpc.NewClient(
s.serverListeners[i].Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())),
)
require.NoError(s.T(), err)

backends[i] = &assertingBackend{
conn: conn,
i: i,
addr: s.serverListeners[i].Addr().String(),
}
}

failingBackend := &assertingBackend{
conn: nil,
i: -1,
addr: "fail",
}

// Setup of the proxy's Director.
Expand Down Expand Up @@ -629,7 +621,7 @@ func (s *ProxyOne2ManySuite) SetupSuite() {
}

s.proxy = grpc.NewServer(
grpc.CustomCodec(proxy.Codec()), //nolint: staticcheck
grpc.ForceServerCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
)
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
Expand Down Expand Up @@ -694,5 +686,5 @@ func TestProxyOne2ManySuite(t *testing.T) {
}

func init() {
grpclog.SetLogger(log.New(os.Stderr, "grpc: ", log.LstdFlags)) //nolint: staticcheck
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
}
Loading

0 comments on commit ec3b59c

Please sign in to comment.