From 623d4c9d7e60fb74d421ba3fbd4398b5b50bbe26 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 17 Sep 2024 15:15:08 +0200 Subject: [PATCH] grpcclient: Support custom gRPC compressors Signed-off-by: Arve Knudsen --- CHANGELOG.md | 1 + grpcclient/grpcclient.go | 24 ++++++++++++++++++------ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b11b6de2..3f9664645 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -224,6 +224,7 @@ * [ENHANCEMENT] Added new ring methods to expose number of writable instances with tokens per zone, and overall. #560 #562 * [ENHANCEMENT] `services.FailureWatcher` can now be closed, which unregisters all service and manager listeners, and closes channel used to receive errors. #564 * [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571 +* [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583 * [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 diff --git a/grpcclient/grpcclient.go b/grpcclient/grpcclient.go index 751899047..228ef6959 100644 --- a/grpcclient/grpcclient.go +++ b/grpcclient/grpcclient.go @@ -2,6 +2,8 @@ package grpcclient import ( "flag" + "slices" + "strings" "time" "github.com/pkg/errors" @@ -40,6 +42,9 @@ type Config struct { Middleware []grpc.UnaryClientInterceptor `yaml:"-"` StreamMiddleware []grpc.StreamClientInterceptor `yaml:"-"` + + // CustomCompressors allows configuring custom compressors. + CustomCompressors []string `yaml:"-"` } // RegisterFlags registers flags. @@ -55,9 +60,17 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.InitialStreamWindowSize = defaultInitialWindowSize cfg.InitialConnectionWindowSize = defaultInitialWindowSize + var supportedCompressors strings.Builder + supportedCompressors.WriteString("Use compression when sending messages. Supported values are: 'gzip', 'snappy'") + for _, cmp := range cfg.CustomCompressors { + supportedCompressors.WriteString(", ") + supportedCompressors.WriteString(cmp) + } + supportedCompressors.WriteString(", and '' (disable compression)") + f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 100<<20, "gRPC client max send message size (bytes).") - f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") + f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", supportedCompressors.String()) f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit rate limits.") @@ -74,11 +87,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { } func (cfg *Config) Validate() error { - switch cfg.GRPCCompression { - case gzip.Name, snappy.Name, "": - // valid - default: - return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) + supportedCompressors := []string{gzip.Name, snappy.Name} + supportedCompressors = append(supportedCompressors, cfg.CustomCompressors...) + if !slices.Contains(supportedCompressors, cfg.GRPCCompression) { + return errors.Errorf("unsupported compression type: %q", cfg.GRPCCompression) } return nil }