diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 2c0539ffaa..3a917623d6 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -128,6 +128,9 @@ func registerQuery(app *extkingpin.App) { endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups."). PlaceHolder("")) + endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "Experimental: DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components."). + PlaceHolder("")) + stores := extkingpin.Addrs(cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). PlaceHolder("")) @@ -151,6 +154,9 @@ func registerQuery(app *extkingpin.App) { strictEndpoints := cmd.Flag("endpoint-strict", "Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). PlaceHolder("").Strings() + strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "Experimental: DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails."). + PlaceHolder("")) + fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). PlaceHolder("").Strings() @@ -291,6 +297,7 @@ func registerQuery(app *extkingpin.App) { selectorLset, getFlagsMap(cmd.Flags()), *endpoints, + *endpointGroups, *stores, *ruleEndpoints, *targetEndpoints, @@ -312,6 +319,7 @@ func registerQuery(app *extkingpin.App) { *defaultMetadataTimeRange, *strictStores, *strictEndpoints, + *strictEndpointGroups, *webDisableCORS, enableQueryPushdown, *alertQueryURL, @@ -367,6 +375,7 @@ func runQuery( selectorLset labels.Labels, flagsMap map[string]string, endpointAddrs []string, + endpointGroupAddrs []string, storeAddrs []string, ruleAddrs []string, targetAddrs []string, @@ -388,6 +397,7 @@ func runQuery( defaultMetadataTimeRange time.Duration, strictStores []string, strictEndpoints []string, + strictEndpointGroups []string, disableCORS bool, enableQueryPushdown bool, alertQueryURL string, @@ -500,6 +510,18 @@ func runQuery( specs = append(specs, tmpSpecs...) } + for _, eg := range endpointGroupAddrs { + addr := fmt.Sprintf("dns:///%s", eg) + spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...) + specs = append(specs, spec) + } + + for _, eg := range strictEndpointGroups { + addr := fmt.Sprintf("dns:///%s", eg) + spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...) + specs = append(specs, spec) + } + return specs }, dialOpts, diff --git a/docs/components/query.md b/docs/components/query.md index 9de81aab5a..b37dd9e9ea 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -280,6 +280,17 @@ Flags: prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups. + --endpoint-group= ... + Experimental: DNS name of statically configured + Thanos API server groups (repeatable). Targets + resolved from the DNS name will be queried in + a round-robin, instead of a fanout manner. + This flag should be used when connecting a + Thanos Query to HA groups of Thanos components. + --endpoint-group-strict= ... + Experimental: DNS name of statically configured + Thanos API server groups (repeatable) that are + always used, even if the health check fails. --endpoint-strict= ... Addresses of only statically configured Thanos API servers that are always used, even if diff --git a/pkg/extgrpc/client.go b/pkg/extgrpc/client.go index 7db0c8e570..564261268d 100644 --- a/pkg/extgrpc/client.go +++ b/pkg/extgrpc/client.go @@ -20,6 +20,27 @@ import ( "github.com/thanos-io/thanos/pkg/tracing" ) +// EndpointGroupGRPCOpts creates gRPC dial options for connecting to endpoint groups. +// For details on retry capabilities, see https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy-capabilities +func EndpointGroupGRPCOpts() []grpc.DialOption { + serviceConfig := ` +{ + "loadBalancingPolicy":"round_robin", + "retryPolicy": { + "maxAttempts": 3, + "initialBackoff": "0.1s", + "backoffMultiplier": 2, + "retryableStatusCodes": [ + "UNAVAILABLE" + ] + } +}` + + return []grpc.DialOption{ + grpc.WithDefaultServiceConfig(serviceConfig), + } +} + // StoreClientGRPCOpts creates gRPC dial options for connecting to a store client. func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { grpcMets := grpc_prometheus.NewClientMetrics() diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 03a0c1cb29..0a2d21cf2b 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -48,14 +48,19 @@ const ( type GRPCEndpointSpec struct { addr string isStrictStatic bool + dialOpts []grpc.DialOption } const externalLabelLimit = 1000 // NewGRPCEndpointSpec creates gRPC endpoint spec. // It uses InfoAPI to get Metadata. -func NewGRPCEndpointSpec(addr string, isStrictStatic bool) *GRPCEndpointSpec { - return &GRPCEndpointSpec{addr: addr, isStrictStatic: isStrictStatic} +func NewGRPCEndpointSpec(addr string, isStrictStatic bool, dialOpts ...grpc.DialOption) *GRPCEndpointSpec { + return &GRPCEndpointSpec{ + addr: addr, + isStrictStatic: isStrictStatic, + dialOpts: dialOpts, + } } func (es *GRPCEndpointSpec) Addr() string { @@ -622,7 +627,8 @@ type endpointRef struct { // newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec) (*endpointRef, error) { - conn, err := grpc.DialContext(ctx, spec.Addr(), e.dialOpts...) + dialOpts := append(e.dialOpts, spec.dialOpts...) + conn, err := grpc.DialContext(ctx, spec.Addr(), dialOpts...) if err != nil { return nil, errors.Wrap(err, "dialing connection") }