Skip to content

Commit

Permalink
Implement endpoint groups (thanos-io#5548)
Browse files Browse the repository at this point in the history
* Implement endpoint groups

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix imports

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Remove stray grpc option

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Mark as experimental and regenerate docs

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

---------

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski authored and Nathaniel Graham committed Apr 17, 2023
1 parent 07a8d7d commit 54ef3e8
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 3 deletions.
22 changes: 22 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func registerQuery(app *extkingpin.App) {
endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").
PlaceHolder("<endpoint>"))

endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "Experimental: DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components.").
PlaceHolder("<endpoint-group>"))

stores := extkingpin.Addrs(cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>"))

Expand All @@ -151,6 +154,9 @@ func registerQuery(app *extkingpin.App) {
strictEndpoints := cmd.Flag("endpoint-strict", "Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticendpoint>").Strings()

strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "Experimental: DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails.").
PlaceHolder("<endpoint-group-strict>"))

fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

Expand Down Expand Up @@ -291,6 +297,7 @@ func registerQuery(app *extkingpin.App) {
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
*endpointGroups,
*stores,
*ruleEndpoints,
*targetEndpoints,
Expand All @@ -312,6 +319,7 @@ func registerQuery(app *extkingpin.App) {
*defaultMetadataTimeRange,
*strictStores,
*strictEndpoints,
*strictEndpointGroups,
*webDisableCORS,
enableQueryPushdown,
*alertQueryURL,
Expand Down Expand Up @@ -367,6 +375,7 @@ func runQuery(
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
endpointGroupAddrs []string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
Expand All @@ -388,6 +397,7 @@ func runQuery(
defaultMetadataTimeRange time.Duration,
strictStores []string,
strictEndpoints []string,
strictEndpointGroups []string,
disableCORS bool,
enableQueryPushdown bool,
alertQueryURL string,
Expand Down Expand Up @@ -500,6 +510,18 @@ func runQuery(
specs = append(specs, tmpSpecs...)
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

return specs
},
dialOpts,
Expand Down
11 changes: 11 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ Flags:
prefixed with 'dns+' or 'dnssrv+' to detect
Thanos API servers through respective DNS
lookups.
--endpoint-group=<endpoint-group> ...
Experimental: DNS name of statically configured
Thanos API server groups (repeatable). Targets
resolved from the DNS name will be queried in
a round-robin, instead of a fanout manner.
This flag should be used when connecting a
Thanos Query to HA groups of Thanos components.
--endpoint-group-strict=<endpoint-group-strict> ...
Experimental: DNS name of statically configured
Thanos API server groups (repeatable) that are
always used, even if the health check fails.
--endpoint-strict=<staticendpoint> ...
Addresses of only statically configured Thanos
API servers that are always used, even if
Expand Down
21 changes: 21 additions & 0 deletions pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,27 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

// EndpointGroupGRPCOpts creates gRPC dial options for connecting to endpoint groups.
//
// The returned options install a default service config that:
//   - spreads calls over all addresses resolved for the target using the
//     round_robin load balancing policy, and
//   - retries calls that fail with UNAVAILABLE, up to 3 attempts in total.
//
// The retry policy must be declared inside a methodConfig entry; a top-level
// "retryPolicy" key is not part of the service config schema and is silently
// dropped by the JSON parser, which leaves retries disabled. The entry with
// an empty name ({}) acts as the default for every service and method.
// maxBackoff is mandatory: gRPC ignores a retry policy that omits it.
//
// For details on retry capabilities, see https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy-capabilities
func EndpointGroupGRPCOpts() []grpc.DialOption {
	// With 3 attempts, an initial backoff of 0.1s and a multiplier of 2 the
	// backoff upper bounds are 0.1s and 0.2s, so a maxBackoff of 1s never
	// caps them; it is present only because the field is required.
	const serviceConfig = `
{
  "loadBalancingPolicy": "round_robin",
  "methodConfig": [
    {
      "name": [{}],
      "retryPolicy": {
        "maxAttempts": 3,
        "initialBackoff": "0.1s",
        "maxBackoff": "1s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": [
          "UNAVAILABLE"
        ]
      }
    }
  ]
}`

	return []grpc.DialOption{
		grpc.WithDefaultServiceConfig(serviceConfig),
	}
}

// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
Expand Down
12 changes: 9 additions & 3 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,19 @@ const (
type GRPCEndpointSpec struct {
// addr is the target address handed to the gRPC dialer for this endpoint.
addr string
// isStrictStatic is the strictness flag supplied at construction time.
// NOTE(review): per the --endpoint-strict / --endpoint-group-strict help
// text, strict endpoints are always used even if the health check fails.
isStrictStatic bool
// dialOpts are extra, endpoint-specific dial options; they are appended
// after the EndpointSet-wide options when the connection is dialed.
dialOpts []grpc.DialOption
}

const externalLabelLimit = 1000

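// NewGRPCEndpointSpec creates a gRPC endpoint spec for the given address.
// Any dialOpts passed here are used in addition to the EndpointSet's own dial
// options when connecting to this endpoint.
// It uses InfoAPI to get Metadata.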
func NewGRPCEndpointSpec(addr string, isStrictStatic bool) *GRPCEndpointSpec {
return &GRPCEndpointSpec{addr: addr, isStrictStatic: isStrictStatic}
func NewGRPCEndpointSpec(addr string, isStrictStatic bool, dialOpts ...grpc.DialOption) *GRPCEndpointSpec {
	// Populate the spec field by field; dialOpts stays nil when the caller
	// supplies no endpoint-specific options.
	spec := new(GRPCEndpointSpec)
	spec.addr = addr
	spec.isStrictStatic = isStrictStatic
	spec.dialOpts = dialOpts
	return spec
}

func (es *GRPCEndpointSpec) Addr() string {
Expand Down Expand Up @@ -622,7 +627,8 @@ type endpointRef struct {
// newEndpointRef creates a new endpointRef with a gRPC channel to the address of the given spec.
// The call to newEndpointRef will return an error if establishing the channel fails.
func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec) (*endpointRef, error) {
conn, err := grpc.DialContext(ctx, spec.Addr(), e.dialOpts...)
dialOpts := append(e.dialOpts, spec.dialOpts...)
conn, err := grpc.DialContext(ctx, spec.Addr(), dialOpts...)
if err != nil {
return nil, errors.Wrap(err, "dialing connection")
}
Expand Down

0 comments on commit 54ef3e8

Please sign in to comment.