From 0f15b7c28fcd067176aade50310c4d2906e3997c Mon Sep 17 00:00:00 2001 From: Denys Zhdanov Date: Fri, 13 Sep 2024 11:18:27 +0200 Subject: [PATCH] gRPC: Support S2 compression Support S2 and S2 in Snappy compatible mode gRPC compression. Signed-off-by: Arve Knudsen --- CHANGELOG.md | 9 ++ cmd/mimir/config-descriptor.json | 16 +-- cmd/mimir/help-all.txt.tmpl | 16 +-- .../configuration-parameters/index.md | 4 +- go.mod | 4 +- go.sum | 8 +- pkg/alertmanager/alertmanager_client.go | 2 + pkg/frontend/v2/frontend.go | 2 + pkg/ingester/client/client.go | 2 + pkg/querier/worker/worker.go | 3 + pkg/ruler/remotequerier.go | 2 + pkg/ruler/ruler.go | 2 + pkg/scheduler/scheduler.go | 2 + pkg/util/grpcencoding/s2/s2.go | 97 +++++++++++++++++++ .../grafana/dskit/grpcclient/grpcclient.go | 26 +++-- .../grafana/dskit/httpgrpc/server/server.go | 2 +- vendor/github.com/grafana/dskit/ring/batch.go | 6 +- .../grafana/dskit/ring/lifecycler.go | 4 +- .../github.com/grafana/dskit/server/server.go | 8 +- .../grpc/experimental/experimental.go | 65 ------------- vendor/modules.txt | 5 +- 21 files changed, 175 insertions(+), 110 deletions(-) create mode 100644 pkg/util/grpcencoding/s2/s2.go delete mode 100644 vendor/google.golang.org/grpc/experimental/experimental.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 37fb48684f7..0b3ccd9bafb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,15 @@ * [FEATURE] Query frontend: added new query pruning middleware to enable pruning dead code (eg. expressions that cannot produce any results) and simplifying expressions (eg. expressions that can be evaluated immediately) in queries. #9086 * [FEATURE] Ruler: added experimental configuration, `-ruler.rule-evaluation-write-enabled`, to disable writing the result of rule evaluation to ingesters. This feature can be used for testing purposes. #9060 * [FEATURE] Ingester: added experimental configuration `ingester.ignore-ooo-exemplars`. When set to `true` out of order exemplars are no longer reported to the remote write client. #9151 +* [FEATURE] gRPC: Add flags for using respectively S2 or Snappy compatible S2 gRPC compression. #9322 + * `-alertmanager.alertmanager-client.grpc-compression` + * `-ingester.client.grpc-compression` + * `-querier.frontend-client.grpc-compression` + * `-querier.scheduler-client.grpc-compression` + * `-query-frontend.grpc-client-config.grpc-compression` + * `-query-scheduler.grpc-client-config.grpc-compression` + * `-ruler.client.grpc-compression` + * `-ruler.query-frontend.grpc-client-config.grpc-compression` * [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371 * [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378 * [ENHANCEMENT] Rules: Added per namespace max rule groups per tenant limit. The maximum number of rule groups per rule tenant for all namespaces continues to be configured by `-ruler.max-rule-groups-per-tenant`, but now, this can be superseded by the new `-ruler.max-rule-groups-per-tenant-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8425 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 9765237a1ba..2a3c7fa8ee8 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2101,7 +2101,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ingester.client.grpc-compression", @@ -4838,7 +4838,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "querier.frontend-client.grpc-compression", @@ -5100,7 +5100,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "querier.scheduler-client.grpc-compression", @@ -5491,7 +5491,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "query-frontend.grpc-client-config.grpc-compression", @@ -11455,7 +11455,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ruler.client.grpc-compression", @@ -12420,7 +12420,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ruler.query-frontend.grpc-client-config.grpc-compression", @@ -14655,7 +14655,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "alertmanager.alertmanager-client.grpc-compression", @@ -16232,7 +16232,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "query-scheduler.grpc-client-config.grpc-compression", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index a5755b29a85..7cf04c8cbd0 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -154,7 +154,7 @@ Usage of ./cmd/mimir/mimir: -alertmanager.alertmanager-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -alertmanager.alertmanager-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -alertmanager.alertmanager-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -alertmanager.alertmanager-client.grpc-max-send-msg-size int @@ -1414,7 +1414,7 @@ Usage of ./cmd/mimir/mimir: -ingester.client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ingester.client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ingester.client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ingester.client.grpc-max-send-msg-size int @@ -1868,7 +1868,7 @@ Usage of ./cmd/mimir/mimir: -querier.frontend-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -querier.frontend-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -querier.frontend-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -querier.frontend-client.grpc-max-send-msg-size int @@ -1972,7 +1972,7 @@ Usage of ./cmd/mimir/mimir: -querier.scheduler-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -querier.scheduler-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -querier.scheduler-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -querier.scheduler-client.grpc-max-send-msg-size int @@ -2050,7 +2050,7 @@ Usage of ./cmd/mimir/mimir: -query-frontend.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -query-frontend.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -query-frontend.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -query-frontend.grpc-client-config.grpc-max-send-msg-size int @@ -2256,7 +2256,7 @@ Usage of ./cmd/mimir/mimir: -query-scheduler.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -query-scheduler.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -query-scheduler.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -query-scheduler.grpc-client-config.grpc-max-send-msg-size int @@ -2628,7 +2628,7 @@ Usage of ./cmd/mimir/mimir: -ruler.client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ruler.client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ruler.client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ruler.client.grpc-max-send-msg-size int @@ -2712,7 +2712,7 @@ Usage of ./cmd/mimir/mimir: -ruler.query-frontend.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ruler.query-frontend.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ruler.query-frontend.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ruler.query-frontend.grpc-client-config.grpc-max-send-msg-size int diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 9420968be78..c2054fa8ade 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -2328,7 +2328,7 @@ alertmanager_client: [max_send_msg_size: | default = 104857600] # (advanced) Use compression when sending messages. Supported values are: - # 'gzip', 'snappy' and '' (disable compression) + # 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) # CLI flag: -alertmanager.alertmanager-client.grpc-compression [grpc_compression: | default = ""] @@ -2591,7 +2591,7 @@ The `grpc_client` block configures the gRPC client used to communicate between t [max_send_msg_size: | default = 104857600] # (advanced) Use compression when sending messages. Supported values are: -# 'gzip', 'snappy' and '' (disable compression) +# 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) # CLI flag: -.grpc-compression [grpc_compression: | default = ""] diff --git a/go.mod b/go.mod index 4df2d1201fb..e16b89bf8bf 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20240826073544-47b1b6311db3 + github.com/grafana/dskit v0.0.0-20240917154516-a871de81908f github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 @@ -199,7 +199,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20240711041743-f6c9dda6c6da // indirect github.com/google/s2a-go v0.1.8 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.3.3 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/gosimple/slug v1.1.1 // indirect github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect diff --git a/go.sum b/go.sum index b0eaf1727a6..edba430dc28 100644 --- a/go.sum +++ b/go.sum @@ -1223,8 +1223,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= -github.com/googleapis/enterprise-certificate-proxy v0.3.3 h1:QRje2j5GZimBzlbhGA2V2QlGNgL8G6e+wGo/+/2bWI0= -github.com/googleapis/enterprise-certificate-proxy v0.3.3/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= +github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw= +github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= @@ -1255,8 +1255,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20240906191856-cdc634f213ea h1:AGmVRk+9ZmzuiLJl6hzQE1vBlVz9wbEb2+J52Gui2ys= github.com/grafana/alerting v0.0.0-20240906191856-cdc634f213ea/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4= -github.com/grafana/dskit v0.0.0-20240826073544-47b1b6311db3 h1:fc4ORkqFiLzuCRD2wGmMXsyDTOLkcl3QUqQwjtAlKcE= -github.com/grafana/dskit v0.0.0-20240826073544-47b1b6311db3/go.mod h1:wJbJeQ2ygiGuBKsur7BPPNe+3pSyHEDPtKa7IU3I8ZA= +github.com/grafana/dskit v0.0.0-20240917154516-a871de81908f h1:wNlcmc+dYc6HAUcDlpcwNIYZ5o46ezaEA9y19ZXUGts= +github.com/grafana/dskit v0.0.0-20240917154516-a871de81908f/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ= diff --git a/pkg/alertmanager/alertmanager_client.go b/pkg/alertmanager/alertmanager_client.go index dfb4b0d7ea5..d1976d2355e 100644 --- a/pkg/alertmanager/alertmanager_client.go +++ b/pkg/alertmanager/alertmanager_client.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) // ClientsPool is the interface used to get the client from the pool for a specified address. @@ -45,6 +46,7 @@ type ClientConfig struct { // RegisterFlagsWithPrefix registers flags with prefix. func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f) f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 2*time.Second, "Timeout for downstream alertmanagers.") } diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 81ae91baf8a..da14ddfa40a 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/util/globalerror" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/httpgrpcutil" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -76,6 +77,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.StringVar(&cfg.Addr, "query-frontend.instance-addr", "", "IP address to advertise to the querier (via scheduler) (default is auto-detected from network interfaces).") f.IntVar(&cfg.Port, "query-frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).") + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-frontend.grpc-client-config", f) } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 84b3ed00c5e..4717e13675b 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" querierapi "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) // HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient. @@ -70,6 +71,7 @@ type Config struct { // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f) } diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index da801030ba1..75e605ee83b 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/math" ) @@ -48,7 +49,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to the query-frontend to identify requests from the same querier. Defaults to hostname.") f.BoolVar(&cfg.ResponseStreamingEnabled, "querier.response-streaming-enabled", false, "Enables streaming of responses from querier to query-frontend for response types that support it (currently only `active_series` responses do).") + cfg.QueryFrontendGRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.QueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) + cfg.QuerySchedulerGRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.QuerySchedulerGRPCClientConfig.RegisterFlagsWithPrefix("querier.scheduler-client", f) } diff --git a/pkg/ruler/remotequerier.go b/pkg/ruler/remotequerier.go index e6329ce1352..89ee2947843 100644 --- a/pkg/ruler/remotequerier.go +++ b/pkg/ruler/remotequerier.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/version" ) @@ -75,6 +76,7 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) { "GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) "+ "to enable client side load balancing.") + c.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f) f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", "))) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index c527fcf67e2..01b84467058 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -40,6 +40,7 @@ import ( "github.com/grafana/mimir/pkg/ruler/rulestore" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" @@ -165,6 +166,7 @@ func (cfg *Config) Validate(limits validation.Limits) error { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { + cfg.ClientTLSConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", f) cfg.Ring.RegisterFlags(f, logger) cfg.Notifier.RegisterFlags(f) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 05692da39e5..82a588fa19a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/scheduler/schedulerpb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/httpgrpcutil" "github.com/grafana/mimir/pkg/util/validation" ) @@ -106,6 +107,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) cfg.ServiceDiscovery.RegisterFlags(f, logger) } diff --git a/pkg/util/grpcencoding/s2/s2.go b/pkg/util/grpcencoding/s2/s2.go new file mode 100644 index 00000000000..af9160fdbef --- /dev/null +++ b/pkg/util/grpcencoding/s2/s2.go @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/mostynb/go-grpc-compression/blob/f7e92b39057ca421a6485f650243a3e804036498/internal/s2/s2.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright 2022 Mostyn Bramley-Moore. + +// Package s2 is an experimental wrapper for using +// github.com/klauspost/compress/s2 stream compression with gRPC. +package s2 + +import ( + "io" + "sync" + + "github.com/klauspost/compress/s2" + "google.golang.org/grpc/encoding" +) + +const ( + // Name is the name of the S2 compressor. + Name = "s2" + // SnappyCompatName is the name of the Snappy compatible S2 compressor. + SnappyCompatName = "s2-snappy" +) + +type compressor struct { + name string + poolCompressor sync.Pool + poolDecompressor sync.Pool +} + +type writer struct { + *s2.Writer + pool *sync.Pool +} + +type reader struct { + *s2.Reader + pool *sync.Pool +} + +func init() { + encoding.RegisterCompressor(newCompressor(false)) + encoding.RegisterCompressor(newCompressor(true)) +} + +func newCompressor(snappyCompat bool) *compressor { + opts := []s2.WriterOption{s2.WriterConcurrency(1)} + var name string + if snappyCompat { + opts = append(opts, s2.WriterSnappyCompat()) + name = SnappyCompatName + } else { + name = Name + } + c := &compressor{ + name: name, + } + c.poolCompressor.New = func() interface{} { + w := s2.NewWriter(io.Discard, opts...) + return &writer{Writer: w, pool: &c.poolCompressor} + } + return c +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + s := c.poolCompressor.Get().(*writer) + s.Writer.Reset(w) + return s, nil +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + s, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newR := s2.NewReader(r) + return &reader{Reader: newR, pool: &c.poolDecompressor}, nil + } + s.Reset(r) + return s, nil +} + +func (c *compressor) Name() string { + return c.name +} + +func (s *writer) Close() error { + err := s.Writer.Close() + s.pool.Put(s) + return err +} + +func (s *reader) Read(p []byte) (n int, err error) { + n, err = s.Reader.Read(p) + if err == io.EOF { + s.pool.Put(s) + } + return n, err +} diff --git a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go index 75189904715..a8f728c61e2 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go +++ b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go @@ -2,6 +2,8 @@ package grpcclient import ( "flag" + "slices" + "strings" "time" "github.com/pkg/errors" @@ -40,6 +42,9 @@ type Config struct { Middleware []grpc.UnaryClientInterceptor `yaml:"-"` StreamMiddleware []grpc.StreamClientInterceptor `yaml:"-"` + + // CustomCompressors allows configuring custom compressors. + CustomCompressors []string `yaml:"-"` } // RegisterFlags registers flags. @@ -55,9 +60,19 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.InitialStreamWindowSize = defaultInitialWindowSize cfg.InitialConnectionWindowSize = defaultInitialWindowSize + var supportedCompressors strings.Builder + supportedCompressors.WriteString("Use compression when sending messages. Supported values are: 'gzip', 'snappy'") + for _, cmp := range cfg.CustomCompressors { + supportedCompressors.WriteString(", ") + supportedCompressors.WriteString("'") + supportedCompressors.WriteString(cmp) + supportedCompressors.WriteString("'") + } + supportedCompressors.WriteString(" and '' (disable compression)") + f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 100<<20, "gRPC client max send message size (bytes).") - f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") + f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", supportedCompressors.String()) f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit rate limits.") @@ -74,11 +89,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { } func (cfg *Config) Validate() error { - switch cfg.GRPCCompression { - case gzip.Name, snappy.Name, "": - // valid - default: - return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) + supportedCompressors := []string{gzip.Name, snappy.Name, ""} + supportedCompressors = append(supportedCompressors, cfg.CustomCompressors...) + if !slices.Contains(supportedCompressors, cfg.GRPCCompression) { + return errors.Errorf("unsupported compression type: %q", cfg.GRPCCompression) } return nil } diff --git a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go index 6a831dac0f8..935ec0fc5e3 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go @@ -186,7 +186,7 @@ func NewClient(address string) (*Client, error) { ), } - conn, err := grpc.Dial(address, dialOptions...) + conn, err := grpc.NewClient(address, dialOptions...) if err != nil { return nil, err } diff --git a/vendor/github.com/grafana/dskit/ring/batch.go b/vendor/github.com/grafana/dskit/ring/batch.go index f982bd6c68c..e107cab830f 100644 --- a/vendor/github.com/grafana/dskit/ring/batch.go +++ b/vendor/github.com/grafana/dskit/ring/batch.go @@ -131,7 +131,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [ // Get call below takes ~1 microsecond for ~500 instances. // Checking every 10K calls would be every 10ms. if i%10e3 == 0 { - if err := ctx.Err(); err != nil { + if err := context.Cause(ctx); err != nil { o.Cleanup() return err } @@ -161,7 +161,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [ } // One last check before calling the callbacks: it doesn't make sense if context is canceled. - if err := ctx.Err(); err != nil { + if err := context.Cause(ctx); err != nil { o.Cleanup() return err } @@ -196,7 +196,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [ case <-tracker.done: return nil case <-ctx.Done(): - return ctx.Err() + return context.Cause(ctx) } } diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 8507e89ba54..083f112bdf1 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -724,8 +724,8 @@ func (i *Lifecycler) initRing(ctx context.Context) error { } tokens := Tokens(instanceDesc.Tokens) - level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens", - len(tokens), "ring", i.RingName) + ro, rots := instanceDesc.GetReadOnlyState() + level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens", len(tokens), "ring", i.RingName, "readOnly", ro, "readOnlyStateUpdate", rots) // If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its // ring state as LEAVING. Make sure to switch to the ACTIVE state. diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index a23eead3891..7b8e7593d9e 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -31,7 +31,6 @@ import ( "golang.org/x/net/netutil" "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/experimental" "google.golang.org/grpc/keepalive" "github.com/grafana/dskit/httpgrpc" @@ -197,7 +196,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.GRPCServerMinTimeBetweenPings, "server.grpc.keepalive.min-time-between-pings", 5*time.Minute, "Minimum amount of time a client should wait before sending a keepalive ping. If client sends keepalive ping more often, server will send GOAWAY and close the connection.") f.BoolVar(&cfg.GRPCServerPingWithoutStreamAllowed, "server.grpc.keepalive.ping-without-stream-allowed", false, "If true, server allows keepalive pings even when there are no active streams(RPCs). If false, and client sends ping when there are no active streams, server will send GOAWAY and close the connection.") f.BoolVar(&cfg.GRPCServerStatsTrackingEnabled, "server.grpc.stats-tracking-enabled", true, "If true, the request_message_bytes, response_message_bytes, and inflight_requests metrics will be tracked. Enabling this option prevents the use of memory pools for parsing gRPC request bodies and may lead to more memory allocations.") - f.BoolVar(&cfg.GRPCServerRecvBufferPoolsEnabled, "server.grpc.recv-buffer-pools-enabled", false, "If true, gGPC's buffer pools will be used to handle incoming requests. Enabling this feature can reduce memory allocation, but also requires disabling GRPC server stats tracking by setting `server.grpc.stats-tracking-enabled=false`. This is an experimental gRPC feature, so it might be removed in a future version of the gRPC library.") + f.BoolVar(&cfg.GRPCServerRecvBufferPoolsEnabled, "server.grpc.recv-buffer-pools-enabled", false, "Deprecated option, has no effect and will be removed in a future version.") f.IntVar(&cfg.GRPCServerNumWorkers, "server.grpc.num-workers", 0, "If non-zero, configures the amount of GRPC server workers used to serve the requests.") f.StringVar(&cfg.PathPrefix, "server.path-prefix", "", "Base path to serve all API routes from (e.g. /v1/)") f.StringVar(&cfg.LogFormat, "log.format", log.LogfmtFormat, "Output log messages in the given format. Valid formats: [logfmt, json]") @@ -439,10 +438,7 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { } if cfg.GRPCServerRecvBufferPoolsEnabled { - if cfg.GRPCServerStatsTrackingEnabled { - return nil, fmt.Errorf("grpc_server_stats_tracking_enabled must be set to false if grpc_server_recv_buffer_pools_enabled is true") - } - grpcOptions = append(grpcOptions, experimental.RecvBufferPool(grpc.NewSharedBufferPool())) + level.Warn(logger).Log("msg", "'server.grpc.recv-buffer-pools-enabled' is a deprecated option that currently has no effect and will be removed in a future version") } grpcOptions = append(grpcOptions, cfg.GRPCOptions...) diff --git a/vendor/google.golang.org/grpc/experimental/experimental.go b/vendor/google.golang.org/grpc/experimental/experimental.go deleted file mode 100644 index de7f13a2210..00000000000 --- a/vendor/google.golang.org/grpc/experimental/experimental.go +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright 2023 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package experimental is a collection of experimental features that might -// have some rough edges to them. Housing experimental features in this package -// results in a user accessing these APIs as `experimental.Foo`, thereby making -// it explicit that the feature is experimental and using them in production -// code is at their own risk. -// -// All APIs in this package are experimental. -package experimental - -import ( - "google.golang.org/grpc" - "google.golang.org/grpc/internal" -) - -// WithRecvBufferPool returns a grpc.DialOption that configures the use of -// bufferPool for parsing incoming messages on a grpc.ClientConn. Depending on -// the application's workload, this could result in reduced memory allocation. -// -// If you are unsure about how to implement a memory pool but want to utilize -// one, begin with grpc.NewSharedBufferPool. -// -// Note: The shared buffer pool feature will not be active if any of the -// following options are used: WithStatsHandler, EnableTracing, or binary -// logging. In such cases, the shared buffer pool will be ignored. -// -// Note: It is not recommended to use the shared buffer pool when compression is -// enabled. -func WithRecvBufferPool(bufferPool grpc.SharedBufferPool) grpc.DialOption { - return internal.WithRecvBufferPool.(func(grpc.SharedBufferPool) grpc.DialOption)(bufferPool) -} - -// RecvBufferPool returns a grpc.ServerOption that configures the server to use -// the provided shared buffer pool for parsing incoming messages. Depending on -// the application's workload, this could result in reduced memory allocation. -// -// If you are unsure about how to implement a memory pool but want to utilize -// one, begin with grpc.NewSharedBufferPool. -// -// Note: The shared buffer pool feature will not be active if any of the -// following options are used: StatsHandler, EnableTracing, or binary logging. -// In such cases, the shared buffer pool will be ignored. -// -// Note: It is not recommended to use the shared buffer pool when compression is -// enabled. -func RecvBufferPool(bufferPool grpc.SharedBufferPool) grpc.ServerOption { - return internal.RecvBufferPool.(func(grpc.SharedBufferPool) grpc.ServerOption)(bufferPool) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 325406828aa..f8718009df4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -560,7 +560,7 @@ github.com/google/s2a-go/stream # github.com/google/uuid v1.6.0 ## explicit github.com/google/uuid -# github.com/googleapis/enterprise-certificate-proxy v0.3.3 +# github.com/googleapis/enterprise-certificate-proxy v0.3.4 ## explicit; go 1.19 github.com/googleapis/enterprise-certificate-proxy/client github.com/googleapis/enterprise-certificate-proxy/client/util @@ -614,7 +614,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20240826073544-47b1b6311db3 +# github.com/grafana/dskit v0.0.0-20240917154516-a871de81908f ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast @@ -1489,7 +1489,6 @@ google.golang.org/grpc/credentials/oauth google.golang.org/grpc/encoding google.golang.org/grpc/encoding/gzip google.golang.org/grpc/encoding/proto -google.golang.org/grpc/experimental google.golang.org/grpc/grpclog google.golang.org/grpc/health/grpc_health_v1 google.golang.org/grpc/internal