Skip to content

Commit

Permalink
chore(v2): add metastore dns discovery (#3606)
Browse files Browse the repository at this point in the history
  • Loading branch information
korniltsev authored Oct 4, 2024
1 parent fff4231 commit 81ab235
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 30 deletions.
18 changes: 16 additions & 2 deletions pkg/experiment/metastore/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestUnavailable(t *testing.T) {
ports, err := test.GetFreePorts(nServers)
assert.NoError(t, err)

d.On("ServerError", mock.Anything).Run(func(args mock.Arguments) {
d.On("Rediscover").Run(func(args mock.Arguments) {
}).Return()

c.updateServers(createServers(ports))
Expand Down Expand Up @@ -98,7 +98,7 @@ func testRediscoverWrongLeader(t *testing.T, f func(c *Client)) {
defer servers.Close()

verify := func() {}
d.On("ServerError", mock.Anything).Run(func(args mock.Arguments) {
d.On("Rediscover", mock.Anything).Run(func(args mock.Arguments) {
m.Lock()
defer m.Unlock()
if servers == nil {
Expand All @@ -116,3 +116,17 @@ func testRediscoverWrongLeader(t *testing.T, f func(c *Client)) {
f(c)
verify()
}

func TestServerError(t *testing.T) {
d := mockdiscovery.NewMockDiscovery(t)
d.On("Subscribe", mock.Anything).Return()
l := testutil.NewLogger(t)
c := New(l, grpcclient.Config{}, d)

d.On("Rediscover").Run(func(args mock.Arguments) {
}).Return()

res, err := c.AddBlock(context.Background(), &metastorev1.AddBlockRequest{})
require.Error(t, err)
require.Nil(t, res)
}
3 changes: 2 additions & 1 deletion pkg/experiment/metastore/client/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func invoke[R any](ctx context.Context, cl *Client,
if it == nil {
cl.logger.Log("msg", "no instances available, backoff and retry")
time.Sleep(backoff)
cl.discovery.Rediscover()
continue
}
res, err := f(ctx, it)
Expand Down Expand Up @@ -65,9 +66,9 @@ func invoke[R any](ctx context.Context, cl *Client,
cl.leader = raft.ServerID(detailsLeader)
}
cl.mu.Unlock()
cl.discovery.ServerError(it.srv)
}
time.Sleep(backoff)
cl.discovery.Rediscover()
}
return nil, fmt.Errorf("metastore client retries failed")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/metastore/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ func (s *Server) String() string {

type Discovery interface {
Subscribe(updates Updates)
ServerError(srv Server)
Rediscover()
Close()
}
76 changes: 76 additions & 0 deletions pkg/experiment/metastore/discovery/dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package discovery

import (
"context"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/dns"
"github.com/hashicorp/raft"
"sync"
)

type DNSDiscovery struct {
logger log.Logger
addr string
provider *dns.Provider
m sync.Mutex
upd Updates
resolved []Server
}

func NewDNSDiscovery(l log.Logger, addr string, p *dns.Provider) *DNSDiscovery {
d := &DNSDiscovery{
logger: log.With(l, "addr", addr, "component", "dns-discovery"),
addr: addr,
provider: p,
}

return d
}

func (d *DNSDiscovery) Subscribe(updates Updates) {
d.m.Lock()
d.upd = updates
d.m.Unlock()
d.resolve()
}

func (d *DNSDiscovery) Rediscover() {
d.resolve()
}

func (d *DNSDiscovery) Close() {

}

func (d *DNSDiscovery) resolve() {
err := d.provider.Resolve(context.Background(), []string{d.addr})
if err != nil {
level.Error(d.logger).Log("msg", "failed to resolve DNS", "addr", d.addr, "err", err)
return
}
addrs := d.provider.Addresses()
if len(addrs) == 0 {
level.Error(d.logger).Log("msg", "failed to resolve DNS", "addr", d.addr, "err", "no addresses")
return
}
level.Debug(d.logger).Log("msg", "resolved DNS", "addr", d.addr, "addrs", fmt.Sprintf("%+v", addrs))

servers := make([]Server, 0, len(addrs))
for _, peer := range addrs {
servers = append(servers, Server{
Raft: raft.Server{
Suffrage: raft.Voter,
ID: raft.ServerID(peer),
Address: raft.ServerAddress(peer),
},
})
}
d.m.Lock()
defer d.m.Unlock()
d.resolved = servers
if d.upd != nil {
d.upd.Servers(servers)
}
}
2 changes: 1 addition & 1 deletion pkg/experiment/metastore/discovery/kuberesolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type KubeDiscovery struct {
upd Updates
}

func (g *KubeDiscovery) ServerError(srv Server) {
func (g *KubeDiscovery) Rediscover() {

}

Expand Down
24 changes: 15 additions & 9 deletions pkg/experiment/metastore/discovery/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,29 @@ package discovery

import (
"github.com/go-kit/log"
"github.com/grafana/dskit/dns"
kuberesolver2 "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery/kuberesolver"
"github.com/hashicorp/raft"
"github.com/prometheus/client_golang/prometheus"
"net"
"strings"
)

func NewDiscovery(l log.Logger, address string) (Discovery, error) {

kubeClient, err := kuberesolver2.NewInClusterK8sClient()
if err != nil {
return nil, err
}

if strings.HasPrefix(address, "dns:///_grpc._tcp.") {
address = strings.Replace(address, "dns:///_grpc._tcp.", "kubernetes:///", 1) // todo support dns discovery
func NewDiscovery(l log.Logger, address string, reg prometheus.Registerer) (Discovery, error) {
if strings.HasPrefix(address, "dnssrvnoa+") {
p := dns.NewProvider(l,
prometheus.WrapRegistererWithPrefix(
"pyroscope_metastore_client_",
reg,
),
dns.MiekgdnsResolverType)
return NewDNSDiscovery(l, address, p), nil
}
if strings.HasPrefix(address, "kubernetes:///") {
kubeClient, err := kuberesolver2.NewInClusterK8sClient()
if err != nil {
return nil, err
}
return NewKubeResolverDiscovery(l, address, kubeClient)
}
peers := ParsePeers(address)
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/metastore/discovery/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (s *StaticDiscovery) Subscribe(updates Updates) {
updates.Servers(s.servers)
}

func (s *StaticDiscovery) ServerError(srv Server) {
func (s *StaticDiscovery) Rediscover() {
}

func (s *StaticDiscovery) Close() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/metastore/test/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func MockStaticDiscovery(t *testing.T, servers []discovery.Server) *mockdiscover
upd := args.Get(0).(discovery.Updates)
upd.Servers(servers)
})
d.On("ServerError", mock.Anything).Return()
d.On("Rediscover", mock.Anything).Return()
d.On("Close").Return(nil)
return d
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/phlare/modules_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (f *Phlare) initMetastoreClient() (services.Service, error) {
return nil, err
}

disc, err := discovery.NewDiscovery(f.logger, f.Cfg.Metastore.Address)
disc, err := discovery.NewDiscovery(f.logger, f.Cfg.Metastore.Address, f.reg)
if err != nil {
return nil, fmt.Errorf("failed to create discovery: %w %s", err, f.Cfg.Metastore.Address)
}
Expand Down
25 changes: 12 additions & 13 deletions pkg/test/mocks/mockdiscovery/mock_discovery.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 81ab235

Please sign in to comment.