Skip to content

Commit

Permalink
grpcclient: Support custom gRPC compressors
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Sep 17, 2024
1 parent 47b1b63 commit 623d4c9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
* [ENHANCEMENT] Added new ring methods to expose number of writable instances with tokens per zone, and overall. #560 #562
* [ENHANCEMENT] `services.FailureWatcher` can now be closed, which unregisters all service and manager listeners, and closes channel used to receive errors. #564
* [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571
* [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
Expand Down
24 changes: 18 additions & 6 deletions grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package grpcclient

import (
"flag"
"slices"
"strings"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -40,6 +42,9 @@ type Config struct {

Middleware []grpc.UnaryClientInterceptor `yaml:"-"`
StreamMiddleware []grpc.StreamClientInterceptor `yaml:"-"`

// CustomCompressors allows configuring custom compressors.
CustomCompressors []string `yaml:"-"`
}

// RegisterFlags registers flags.
Expand All @@ -55,9 +60,17 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.InitialStreamWindowSize = defaultInitialWindowSize
cfg.InitialConnectionWindowSize = defaultInitialWindowSize

var supportedCompressors strings.Builder
supportedCompressors.WriteString("Use compression when sending messages. Supported values are: 'gzip', 'snappy'")
for _, cmp := range cfg.CustomCompressors {
supportedCompressors.WriteString(", ")
supportedCompressors.WriteString(cmp)
}
supportedCompressors.WriteString(", and '' (disable compression)")

f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 100<<20, "gRPC client max send message size (bytes).")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", supportedCompressors.String())
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit rate limits.")
Expand All @@ -74,11 +87,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
}

func (cfg *Config) Validate() error {
switch cfg.GRPCCompression {
case gzip.Name, snappy.Name, "":
// valid
default:
return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression)
supportedCompressors := []string{gzip.Name, snappy.Name}
supportedCompressors = append(supportedCompressors, cfg.CustomCompressors...)
if !slices.Contains(supportedCompressors, cfg.GRPCCompression) {
return errors.Errorf("unsupported compression type: %q", cfg.GRPCCompression)
}
return nil
}
Expand Down

0 comments on commit 623d4c9

Please sign in to comment.