Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery: preserve results from other resolve calls #7886

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
),
dnsSDInterval,
logger,
)

dnsEndpointProvider := dns.NewProvider(
Expand Down Expand Up @@ -608,7 +609,7 @@ func runQuery(
fileSDCache.Update(update)
endpoints.Update(ctxUpdate)

if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}

Expand All @@ -628,22 +629,22 @@ func runQuery(
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval)
defer resolveCancel()
if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
}
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
}
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil {
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err)
}
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs); err != nil {
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err)

}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func runRule(
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second)
defer resolveCancel()
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints); err != nil {
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/cortex/chunk/cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (c *memcachedClient) updateMemcacheServers() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := c.provider.Resolve(ctx, c.addresses); err != nil {
if err := c.provider.Resolve(ctx, c.addresses, true); err != nil {
return err
}
servers = c.provider.Addresses()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf

go func() {
for {
if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers); err != nil {
if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for groupcache", "err", err)
} else {
err := universe.Set(dnsGroupcacheProvider.Addresses()...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ type memcachedClient struct {
// AddressProvider performs node address resolution given a list of clusters.
type AddressProvider interface {
// Resolves the provided list of memcached cluster to the actual nodes
Resolve(context.Context, []string) error
Resolve(context.Context, []string, bool) error

// Returns the nodes
Addresses() []string
Expand Down Expand Up @@ -638,7 +638,7 @@ func (c *memcachedClient) resolveAddrs() error {
defer cancel()

// If some of the dns resolution fails, log the error.
if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
if err := c.addressProvider.Resolve(ctx, c.config.Addresses, true); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
Expand Down
4 changes: 2 additions & 2 deletions pkg/clientconfig/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (c HTTPFileSDConfig) convert() (file.SDConfig, error) {
}

type AddressProvider interface {
Resolve(context.Context, []string) error
Resolve(context.Context, []string, bool) error
Addresses() []string
}

Expand Down Expand Up @@ -433,5 +433,5 @@ func (c *HTTPClient) Discover(ctx context.Context) {

// Resolve refreshes and resolves the list of targets.
func (c *HTTPClient) Resolve(ctx context.Context) error {
return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...))
return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...), true)
}
36 changes: 25 additions & 11 deletions pkg/discovery/dns/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/go-kit/log"
grpcresolver "google.golang.org/grpc/resolver"
)

Expand All @@ -19,12 +20,14 @@ var (
type builder struct {
resolveInterval time.Duration
provider *Provider
logger log.Logger
}

func RegisterGRPCResolver(provider *Provider, interval time.Duration) {
func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) {
grpcresolver.Register(&builder{
resolveInterval: interval,
provider: provider,
logger: logger,
})
}

Expand All @@ -39,6 +42,7 @@ func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grp
cancel: cancel,
cc: cc,
interval: b.resolveInterval,
logger: b.logger,
}
r.wg.Add(1)
go r.run()
Expand All @@ -55,7 +59,8 @@ type resolver struct {
cc grpcresolver.ClientConn
interval time.Duration

wg sync.WaitGroup
wg sync.WaitGroup
logger log.Logger
}

func (r *resolver) Close() {
Expand All @@ -68,7 +73,7 @@ func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {}
func (r *resolver) resolve() error {
ctx, cancel := context.WithTimeout(r.ctx, r.interval)
defer cancel()
return r.provider.Resolve(ctx, []string{r.target})
return r.provider.Resolve(ctx, []string{r.target}, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @GiedriusS, can you please help me understand this change? This is the only place where we set it to false. Why is that the case?
Can we just set it to true everywhere?

Copy link
Member Author

@GiedriusS GiedriusS Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we cannot, because in gRPC the same resolver is reused between --endpoint-groups. In other words, Build() above is called from multiple places in gRPC but they all reuse the same resolver. They first resolve and then fetch the values from cache. If we flush here then some of the results are lost and the Query component will not connect to some of the endpoints.

Some addresses can be shared between --endpoint-groups so that's why I opted to reuse the same resolver.

}

func (r *resolver) addresses() []string {
Expand All @@ -78,16 +83,25 @@ func (r *resolver) addresses() []string {
func (r *resolver) run() {
defer r.wg.Done()
for {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
} else {
func() {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
r.logger.Log("msg", "failed to resolve", "err", err)
return
}
state := grpcresolver.State{}
for _, addr := range r.addresses() {
raddr := grpcresolver.Address{Addr: addr}
state.Addresses = append(state.Addresses, raddr)
addrs := r.addresses()
if len(addrs) == 0 {
r.logger.Log("msg", "no addresses resolved", "target", r.target)
return
}
_ = r.cc.UpdateState(state)
}
for _, addr := range addrs {
state.Addresses = append(state.Addresses, grpcresolver.Address{Addr: addr})
}
if err := r.cc.UpdateState(state); err != nil {
r.logger.Log("msg", "failed to update state", "err", err)
}
}()
select {
case <-r.ctx.Done():
return
Expand Down
15 changes: 8 additions & 7 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func GetQTypeName(addr string) (qtype, name string) {
// Resolve stores a list of provided addresses or their DNS records if requested.
// Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV).
// For non-SRV records, it will return an error if a port is not supplied.
func (p *Provider) Resolve(ctx context.Context, addrs []string) error {
func (p *Provider) Resolve(ctx context.Context, addrs []string, flushOld bool) error {
resolvedAddrs := map[string][]string{}
errs := errutil.MultiError{}

Expand All @@ -129,10 +129,7 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) error {
errs.Add(err)
// The DNS resolution failed. Continue without modifying the old records.
p.resolverFailuresCount.Inc()
// Use cached values.
p.RLock()
resolved = p.resolved[addr]
p.RUnlock()
continue
}
resolvedAddrs[addr] = resolved
}
Expand All @@ -143,13 +140,17 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) error {
defer p.Unlock()

p.resolverAddrs.ResetTx()
if flushOld && len(errs) == 0 {
p.resolved = map[string][]string{}
}
for name, addrs := range resolvedAddrs {
p.resolved[name] = addrs
}
for name, addrs := range p.resolved {
p.resolverAddrs.WithLabelValues(name).Set(float64(len(addrs)))
}
p.resolverAddrs.Submit()

p.resolved = resolvedAddrs

return errs.Err()
}

Expand Down
41 changes: 25 additions & 16 deletions pkg/discovery/dns/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,66 +33,75 @@ func TestProvider(t *testing.T) {
}
ctx := context.TODO()

err := prv.Resolve(ctx, []string{"any+x"})
err := prv.Resolve(ctx, []string{"any+x"}, false)
testutil.Ok(t, err)
result := prv.Addresses()
sort.Strings(result)
testutil.Equals(t, []string(nil), result)
testutil.Equals(t, 1, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+x")))

err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"})
err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, ips, result)
testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+a")))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+b", "any+c"})
err = prv.Resolve(ctx, []string{"any+b", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, ips[2:], result)
testutil.Equals(t, 2, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, ips, result)
testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+x"})
err = prv.Resolve(ctx, []string{"any+x"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, []string(nil), result)
testutil.Equals(t, 1, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, ips, result)
testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+x")))

err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"})
err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, ips, result)
testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+a")))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+b", "example.com:90", "any+c"})
err = prv.Resolve(ctx, []string{"any+b", "example.com:90", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, append(ips[2:], "example.com:90"), result)
testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, append(ips, "example.com:90"), result)
testutil.Equals(t, 5, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("example.com:90")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+b", "any+c"})
err = prv.Resolve(ctx, []string{"any+b", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, append(ips, "example.com:90"), result)
testutil.Equals(t, 5, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+b", "any+c"}, true)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, ips[2:], result)
testutil.Equals(t, ips[2:5], result)
testutil.Equals(t, 2, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))
Expand Down
17 changes: 9 additions & 8 deletions pkg/discovery/memcache/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.
}

// Resolve stores a list of nodes auto-discovered from the provided addresses.
func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
func (p *Provider) Resolve(ctx context.Context, addresses []string, flushOld bool) error {
clusterConfigs := map[string]*clusterConfig{}
errs := errutil.MultiError{}

Expand All @@ -74,13 +74,9 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
errs.Add(err)
p.resolverFailuresCount.Inc()

// Use cached values.
p.RLock()
clusterConfigs[address] = p.clusterConfigs[address]
p.RUnlock()
} else {
clusterConfigs[address] = clusterConfig
continue
}
clusterConfigs[address] = clusterConfig
}

p.Lock()
Expand All @@ -95,7 +91,12 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
p.resolvedAddresses.Submit()
p.configVersion.Submit()

p.clusterConfigs = clusterConfigs
if flushOld && len(errs) == 0 {
p.clusterConfigs = map[string]*clusterConfig{}
}
for addr, config := range clusterConfigs {
p.clusterConfigs[addr] = config
}

return errs.Err()
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/discovery/memcache/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestProviderUpdatesAddresses(t *testing.T) {
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
testutil.Ok(t, provider.Resolve(ctx, clusters, true))
addresses := provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
Expand All @@ -38,7 +38,7 @@ func TestProviderUpdatesAddresses(t *testing.T) {
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
}

testutil.Ok(t, provider.Resolve(ctx, clusters))
testutil.Ok(t, provider.Resolve(ctx, clusters, true))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
Expand All @@ -56,15 +56,15 @@ func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
testutil.Ok(t, provider.Resolve(ctx, clusters, true))
addresses := provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = nil
resolver.err = errors.New("oops")

testutil.NotOk(t, provider.Resolve(ctx, clusters))
testutil.NotOk(t, provider.Resolve(ctx, clusters, true))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
Expand Down
Loading