Skip to content

Commit

Permalink
discovery: use thanos resolver for endpoint groups
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Jul 31, 2024
1 parent 639bf8f commit ef38b8a
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Changed

- [#7494](https://github.com/thanos-io/thanos/pull/7494) Ruler: remove trailing period from SRV records returned by discovery `dnsnosrva` lookups
- [#7567](https://github.com/thanos-io/thanos/pull/7565) Query: Use thanos resolver for endpoint groups.

### Removed

Expand Down
16 changes: 12 additions & 4 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,16 @@ func runQuery(
}
}

// Register resolver for the "thanos:///" scheme for endpoint-groups
dns.RegisterGRPCResolver(
dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg),
dns.ResolverType(dnsSDResolver),
),
dnsSDInterval,
)

dnsEndpointProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
Expand Down Expand Up @@ -891,14 +901,12 @@ func prepareEndpointSet(
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

Expand Down
97 changes: 97 additions & 0 deletions pkg/discovery/dns/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package dns

import (
"context"
"sync"
"time"

grpcresolver "google.golang.org/grpc/resolver"
)

var (
_ grpcresolver.Builder = &builder{}
_ grpcresolver.Resolver = &resolver{}
)

type builder struct {
resolveInterval time.Duration
provider *Provider
}

func RegisterGRPCResolver(provider *Provider, interval time.Duration) {
grpcresolver.Register(&builder{
resolveInterval: interval,
provider: provider,
})
}

func (b *builder) Scheme() string { return "thanos" }

func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grpcresolver.BuildOptions) (grpcresolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
r := &resolver{
provider: b.provider,
target: t.Endpoint(),
ctx: ctx,
cancel: cancel,
cc: cc,
interval: b.resolveInterval,
}
r.wg.Add(1)
go r.run()

return r, nil
}

type resolver struct {
provider *Provider

target string
ctx context.Context
cancel context.CancelFunc
cc grpcresolver.ClientConn
interval time.Duration

wg sync.WaitGroup
}

func (r *resolver) Close() {
r.cancel()
r.wg.Wait()
}

func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {}

func (r *resolver) resolve() error {
ctx, cancel := context.WithTimeout(r.ctx, r.interval)
defer cancel()
return r.provider.Resolve(ctx, []string{r.target})
}

func (r *resolver) addresses() []string {
return r.provider.AddressesForHost(r.target)
}

func (r *resolver) run() {
defer r.wg.Done()
for {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
} else {
state := grpcresolver.State{}
for _, addr := range r.addresses() {
raddr := grpcresolver.Address{Addr: addr}
state.Addresses = append(state.Addresses, raddr)
}
_ = r.cc.UpdateState(state)
}
select {
case <-r.ctx.Done():
return
case <-time.After(r.interval):
}
}
}
13 changes: 13 additions & 0 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,16 @@ func (p *Provider) Addresses() []string {
}
return result
}

// AddressesForHost returns the latest addresses present for the host in the Provider.
func (p *Provider) AddressesForHost(host string) []string {
p.RLock()
defer p.RUnlock()

addrs := p.resolved[host]

res := make([]string, len(addrs))
copy(res, addrs)

return res
}

0 comments on commit ef38b8a

Please sign in to comment.