Skip to content

Commit

Permalink
discovery: preserve results from other resolve calls (#7886)
Browse files Browse the repository at this point in the history
Properly preserve results from other resolve calls. The code assumed
that resolve() is always called with the same addresses, but that is
not true with gRPC and `--endpoint-group`. Without this fix, multiple
resolves can happen at the same time, and some of the callers are then
unable to retrieve their results, leading to random errors.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS authored Nov 6, 2024
1 parent ebfc03e commit df3df36
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 60 deletions.
15 changes: 8 additions & 7 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
),
dnsSDInterval,
logger,
)

dnsEndpointProvider := dns.NewProvider(
Expand Down Expand Up @@ -608,7 +609,7 @@ func runQuery(
fileSDCache.Update(update)
endpoints.Update(ctxUpdate)

if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}

Expand All @@ -628,22 +629,22 @@ func runQuery(
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval)
defer resolveCancel()
if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
}
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
}
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil {
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err)
}
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs); err != nil {
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err)

}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func runRule(
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second)
defer resolveCancel()
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints); err != nil {
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/cortex/chunk/cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (c *memcachedClient) updateMemcacheServers() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := c.provider.Resolve(ctx, c.addresses); err != nil {
if err := c.provider.Resolve(ctx, c.addresses, true); err != nil {
return err
}
servers = c.provider.Addresses()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf

go func() {
for {
if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers); err != nil {
if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for groupcache", "err", err)
} else {
err := universe.Set(dnsGroupcacheProvider.Addresses()...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ type memcachedClient struct {
// AddressProvider performs node address resolution given a list of clusters.
type AddressProvider interface {
// Resolves the provided list of memcached cluster to the actual nodes
Resolve(context.Context, []string) error
Resolve(context.Context, []string, bool) error

// Returns the nodes
Addresses() []string
Expand Down Expand Up @@ -638,7 +638,7 @@ func (c *memcachedClient) resolveAddrs() error {
defer cancel()

// If some of the dns resolution fails, log the error.
if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
if err := c.addressProvider.Resolve(ctx, c.config.Addresses, true); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
Expand Down
4 changes: 2 additions & 2 deletions pkg/clientconfig/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (c HTTPFileSDConfig) convert() (file.SDConfig, error) {
}

type AddressProvider interface {
Resolve(context.Context, []string) error
Resolve(context.Context, []string, bool) error
Addresses() []string
}

Expand Down Expand Up @@ -433,5 +433,5 @@ func (c *HTTPClient) Discover(ctx context.Context) {

// Resolve refreshes and resolves the list of targets.
func (c *HTTPClient) Resolve(ctx context.Context) error {
return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...))
return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...), true)
}
36 changes: 25 additions & 11 deletions pkg/discovery/dns/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/go-kit/log"
grpcresolver "google.golang.org/grpc/resolver"
)

Expand All @@ -19,12 +20,14 @@ var (
type builder struct {
resolveInterval time.Duration
provider *Provider
logger log.Logger
}

func RegisterGRPCResolver(provider *Provider, interval time.Duration) {
func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) {
grpcresolver.Register(&builder{
resolveInterval: interval,
provider: provider,
logger: logger,
})
}

Expand All @@ -39,6 +42,7 @@ func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grp
cancel: cancel,
cc: cc,
interval: b.resolveInterval,
logger: b.logger,
}
r.wg.Add(1)
go r.run()
Expand All @@ -55,7 +59,8 @@ type resolver struct {
cc grpcresolver.ClientConn
interval time.Duration

wg sync.WaitGroup
wg sync.WaitGroup
logger log.Logger
}

func (r *resolver) Close() {
Expand All @@ -68,7 +73,7 @@ func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {}
func (r *resolver) resolve() error {
ctx, cancel := context.WithTimeout(r.ctx, r.interval)
defer cancel()
return r.provider.Resolve(ctx, []string{r.target})
return r.provider.Resolve(ctx, []string{r.target}, false)
}

func (r *resolver) addresses() []string {
Expand All @@ -78,16 +83,25 @@ func (r *resolver) addresses() []string {
func (r *resolver) run() {
defer r.wg.Done()
for {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
} else {
func() {
if err := r.resolve(); err != nil {
r.cc.ReportError(err)
r.logger.Log("msg", "failed to resolve", "err", err)
return
}
state := grpcresolver.State{}
for _, addr := range r.addresses() {
raddr := grpcresolver.Address{Addr: addr}
state.Addresses = append(state.Addresses, raddr)
addrs := r.addresses()
if len(addrs) == 0 {
r.logger.Log("msg", "no addresses resolved", "target", r.target)
return
}
_ = r.cc.UpdateState(state)
}
for _, addr := range addrs {
state.Addresses = append(state.Addresses, grpcresolver.Address{Addr: addr})
}
if err := r.cc.UpdateState(state); err != nil {
r.logger.Log("msg", "failed to update state", "err", err)
}
}()
select {
case <-r.ctx.Done():
return
Expand Down
15 changes: 8 additions & 7 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func GetQTypeName(addr string) (qtype, name string) {
// Resolve stores a list of provided addresses or their DNS records if requested.
// Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV).
// For non-SRV records, it will return an error if a port is not supplied.
func (p *Provider) Resolve(ctx context.Context, addrs []string) error {
func (p *Provider) Resolve(ctx context.Context, addrs []string, flushOld bool) error {
resolvedAddrs := map[string][]string{}
errs := errutil.MultiError{}

Expand All @@ -129,10 +129,7 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) error {
errs.Add(err)
// The DNS resolution failed. Continue without modifying the old records.
p.resolverFailuresCount.Inc()
// Use cached values.
p.RLock()
resolved = p.resolved[addr]
p.RUnlock()
continue
}
resolvedAddrs[addr] = resolved
}
Expand All @@ -143,13 +140,17 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) error {
defer p.Unlock()

p.resolverAddrs.ResetTx()
if flushOld && len(errs) == 0 {
p.resolved = map[string][]string{}
}
for name, addrs := range resolvedAddrs {
p.resolved[name] = addrs
}
for name, addrs := range p.resolved {
p.resolverAddrs.WithLabelValues(name).Set(float64(len(addrs)))
}
p.resolverAddrs.Submit()

p.resolved = resolvedAddrs

return errs.Err()
}

Expand Down
41 changes: 25 additions & 16 deletions pkg/discovery/dns/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,66 +33,75 @@ func TestProvider(t *testing.T) {
}
ctx := context.TODO()

err := prv.Resolve(ctx, []string{"any+x"})
err := prv.Resolve(ctx, []string{"any+x"}, false)
testutil.Ok(t, err)
result := prv.Addresses()
sort.Strings(result)
testutil.Equals(t, []string(nil), result)
testutil.Equals(t, 1, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+x")))

err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"})
err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, ips, result)
testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+a")))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+b", "any+c"})
err = prv.Resolve(ctx, []string{"any+b", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, ips[2:], result)
testutil.Equals(t, 2, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, ips, result)
testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+x"})
err = prv.Resolve(ctx, []string{"any+x"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, []string(nil), result)
testutil.Equals(t, 1, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, ips, result)
testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+x")))

err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"})
err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, ips, result)
testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+a")))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+b", "example.com:90", "any+c"})
err = prv.Resolve(ctx, []string{"any+b", "example.com:90", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, append(ips[2:], "example.com:90"), result)
testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, append(ips, "example.com:90"), result)
testutil.Equals(t, 5, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("example.com:90")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+b", "any+c"})
err = prv.Resolve(ctx, []string{"any+b", "any+c"}, false)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, append(ips, "example.com:90"), result)
testutil.Equals(t, 5, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))

err = prv.Resolve(ctx, []string{"any+b", "any+c"}, true)
testutil.Ok(t, err)
result = prv.Addresses()
sort.Strings(result)
testutil.Equals(t, ips[2:], result)
testutil.Equals(t, ips[2:5], result)
testutil.Equals(t, 2, promtestutil.CollectAndCount(prv.resolverAddrs))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b")))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c")))
Expand Down
17 changes: 9 additions & 8 deletions pkg/discovery/memcache/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.
}

// Resolve stores a list of nodes auto-discovered from the provided addresses.
func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
func (p *Provider) Resolve(ctx context.Context, addresses []string, flushOld bool) error {
clusterConfigs := map[string]*clusterConfig{}
errs := errutil.MultiError{}

Expand All @@ -74,13 +74,9 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
errs.Add(err)
p.resolverFailuresCount.Inc()

// Use cached values.
p.RLock()
clusterConfigs[address] = p.clusterConfigs[address]
p.RUnlock()
} else {
clusterConfigs[address] = clusterConfig
continue
}
clusterConfigs[address] = clusterConfig
}

p.Lock()
Expand All @@ -95,7 +91,12 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
p.resolvedAddresses.Submit()
p.configVersion.Submit()

p.clusterConfigs = clusterConfigs
if flushOld && len(errs) == 0 {
p.clusterConfigs = map[string]*clusterConfig{}
}
for addr, config := range clusterConfigs {
p.clusterConfigs[addr] = config
}

return errs.Err()
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/discovery/memcache/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestProviderUpdatesAddresses(t *testing.T) {
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
testutil.Ok(t, provider.Resolve(ctx, clusters, true))
addresses := provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
Expand All @@ -38,7 +38,7 @@ func TestProviderUpdatesAddresses(t *testing.T) {
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
}

testutil.Ok(t, provider.Resolve(ctx, clusters))
testutil.Ok(t, provider.Resolve(ctx, clusters, true))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
Expand All @@ -56,15 +56,15 @@ func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
testutil.Ok(t, provider.Resolve(ctx, clusters, true))
addresses := provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = nil
resolver.err = errors.New("oops")

testutil.NotOk(t, provider.Resolve(ctx, clusters))
testutil.NotOk(t, provider.Resolve(ctx, clusters, true))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
Expand Down

0 comments on commit df3df36

Please sign in to comment.