Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery: use thanos resolver for endpoint groups #7565

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Changed

- [#7494](https://github.com/thanos-io/thanos/pull/7494) Ruler: remove trailing period from SRV records returned by discovery `dnsnosrva` lookups
- [#7567](https://github.com/thanos-io/thanos/pull/7565) Query: Use thanos resolver for endpoint groups.

### Removed

Expand Down
16 changes: 12 additions & 4 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,16 @@ func runQuery(
}
}

// Register resolver for the "thanos:///" scheme for endpoint-groups
dns.RegisterGRPCResolver(
dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg),
dns.ResolverType(dnsSDResolver),
),
dnsSDInterval,
)

dnsEndpointProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
Expand Down Expand Up @@ -891,14 +901,12 @@ func prepareEndpointSet(
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this is a breaking change; its so that people can chose to keep previous behavior by adding "dns:///" prefix or opt into thanos resolver by adding "thanos:///" prefix; alternatively we could snoop if thanos:/// is added and if not add dns:/// or just add "thanos:///" by default, in theory it should be not noticeable mostly. I like it better to state the resolver explicitely here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the main difference between the Thanos DNS resolver and the default DNS resolver we are using?

Trying to understand if it is a big difference. If it is, then I think the detection is needed and if no prefix is added we can still default to dns://. If there is no big difference I think I am fine either way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thanos one understands our "dnssrv+" stuff while the dns one does not. Otherwise it should be roughly the same.

spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

Expand Down
97 changes: 97 additions & 0 deletions pkg/discovery/dns/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package dns

import (
"context"
"sync"
"time"

grpcresolver "google.golang.org/grpc/resolver"
)

var (
_ grpcresolver.Builder = &builder{}
_ grpcresolver.Resolver = &resolver{}
)

type builder struct {
resolveInterval time.Duration
provider *Provider
}

func RegisterGRPCResolver(provider *Provider, interval time.Duration) {
grpcresolver.Register(&builder{
resolveInterval: interval,
provider: provider,
})
}

func (b *builder) Scheme() string { return "thanos" }

func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grpcresolver.BuildOptions) (grpcresolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &resolver{
provider: b.provider,
target: t.Endpoint(),
ctx: ctx,
cancel: cancel,
cc: cc,
interval: b.resolveInterval,
}
r.wg.Add(1)
go r.run()

return r, nil
}

type resolver struct {
provider *Provider

target string
ctx context.Context
cancel context.CancelFunc
cc grpcresolver.ClientConn
interval time.Duration

wg sync.WaitGroup
}

func (r *resolver) Close() {
r.cancel()
r.wg.Wait()
}

func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {}
MichaHoffmann marked this conversation as resolved.
Show resolved Hide resolved

func (r *resolver) resolve() error {
ctx, cancel := context.WithTimeout(r.ctx, r.interval)
defer cancel()
return r.provider.Resolve(ctx, []string{r.target})
}

func (r *resolver) addresses() []string {
return r.provider.AddressesForHost(r.target)
}

func (r *resolver) run() {
defer r.wg.Done()
for {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
} else {
state := grpcresolver.State{}
for _, addr := range r.addresses() {
raddr := grpcresolver.Address{Addr: addr}
state.Addresses = append(state.Addresses, raddr)
}
_ = r.cc.UpdateState(state)
}
select {
case <-r.ctx.Done():
return
case <-time.After(r.interval):
}
}
}
13 changes: 13 additions & 0 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,16 @@ func (p *Provider) Addresses() []string {
}
return result
}

// AddressesForHost returns the latest addresses present for the host in the Provider.
func (p *Provider) AddressesForHost(host string) []string {
p.RLock()
defer p.RUnlock()

addrs := p.resolved[host]

res := make([]string, len(addrs))
copy(res, addrs)

return res
}
Loading