Skip to content

Commit

Permalink
feat: add host dns support for resolving member addrs
Browse files Browse the repository at this point in the history
Closes siderolabs#8330

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
DmitriyMV committed Apr 17, 2024
1 parent 89040ce commit 8b61991
Show file tree
Hide file tree
Showing 10 changed files with 822 additions and 567 deletions.
1 change: 1 addition & 0 deletions api/resource/definitions/network/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ message HostDNSConfigSpec {
bool enabled = 1;
repeated common.NetIPPort listen_addresses = 2;
common.NetIP service_host_dns_address = 3;
bool resolve_member_names = 4;
}

// HostnameSpecSpec describes node hostname.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net"
"net/netip"
"strings"
"sync"
"time"

Expand All @@ -22,15 +23,18 @@ import (
"go.uber.org/zap"

"github.com/siderolabs/talos/internal/pkg/dns"
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
)

// DNSResolveCacheController starts dns server on both udp and tcp ports based on finalized network configuration.
type DNSResolveCacheController struct {
State state.State
Logger *zap.Logger

mx sync.Mutex
handler *dns.Handler
nodeHandler *dns.NodeHandler
cache *dns.Cache
runners map[runnerConfig]pair.Pair[func(), <-chan struct{}]
reconcile chan struct{}
Expand Down Expand Up @@ -115,6 +119,8 @@ func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Run
continue
}

ctrl.nodeHandler.SetEnabled(cfg.TypedSpec().ResolveMemberNames)

touchedRunners := make(map[runnerConfig]struct{}, len(ctrl.runners))

for _, addr := range cfg.TypedSpec().ListenAddresses {
Expand Down Expand Up @@ -191,7 +197,8 @@ func (ctrl *DNSResolveCacheController) init(ctx context.Context) {

ctrl.originalCtx = ctx
ctrl.handler = dns.NewHandler(ctrl.Logger)
ctrl.cache = dns.NewCache(ctrl.handler, ctrl.Logger)
ctrl.nodeHandler = dns.NewNodeHandler(ctrl.handler, &stateMapper{state: ctrl.State}, ctrl.Logger)
ctrl.cache = dns.NewCache(ctrl.nodeHandler, ctrl.Logger)
ctrl.runners = map[runnerConfig]pair.Pair[func(), <-chan struct{}]{}
ctrl.reconcile = make(chan struct{}, 1)

Expand Down Expand Up @@ -288,3 +295,24 @@ func newDNSRunner(cfg runnerConfig, cache *dns.Cache, logger *zap.Logger) (*dns.

return dns.NewServer(serverOpts), nil
}

// stateMapper looks up cluster members in the resource state.
type stateMapper struct {
	state state.State
}

// ResolveAddr looks up the cluster.Member whose ID equals name, with any trailing
// dots removed, and returns the first address listed in its spec.
//
// The boolean result is false when the member lookup returns an error or when
// the member has no addresses.
func (s *stateMapper) ResolveAddr(ctx context.Context, name string) (netip.Addr, bool) {
	member, err := safe.ReaderGetByID[*cluster.Member](ctx, s.state, strings.TrimRight(name, "."))
	if err != nil {
		return netip.Addr{}, false
	}

	if addrs := member.TypedSpec().Addresses; len(addrs) > 0 {
		return addrs[0], true
	}

	return netip.Addr{}, false
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest"
netctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/network"
"github.com/siderolabs/talos/pkg/machinery/config/machine"
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
)

Expand Down Expand Up @@ -141,6 +143,66 @@ func (suite *DNSServer) TestSetupStartStop() {
rtestutils.AssertLength[*network.DNSUpstream](suite.Ctx(), suite.T(), suite.State(), len(dnsSlice))
}

// TestResolveMembers creates a cluster member and a host DNS config with member
// name resolution enabled, then checks that an A query for the member ID is
// answered with the member's name and address.
func (suite *DNSServer) TestResolveMembers() {
	port := must.Value(getDynamicPort())(suite.T())

	const (
		id   = "talos-default-controlplane-1"
		addr = "172.20.0.2"
	)

	member := cluster.NewMember(cluster.NamespaceName, id)
	*member.TypedSpec() = cluster.MemberSpec{
		NodeID: id,
		Addresses: []netip.Addr{
			netip.MustParseAddr(addr),
		},
		Hostname:        id,
		MachineType:     machine.TypeControlPlane,
		OperatingSystem: "Talos dev",
		ControlPlane:    nil,
	}

	suite.Require().NoError(suite.State().Create(suite.Ctx(), member))

	cfg := network.NewHostDNSConfig(network.HostDNSConfigID)
	cfg.TypedSpec().Enabled = true
	cfg.TypedSpec().ListenAddresses = makeAddrs(port)
	cfg.TypedSpec().ResolveMemberNames = true
	suite.Require().NoError(suite.State().Create(suite.Ctx(), cfg))

	rtestutils.AssertResources(suite.Ctx(), suite.T(), suite.State(),
		expectedDNSRunners(port),
		func(r *network.DNSResolveCache, assert *assert.Assertions) {
			assert.Equal("running", r.TypedSpec().Status)
		},
	)

	suite.Require().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
		exchange, err := dns.Exchange(
			&dns.Msg{
				MsgHdr: dns.MsgHdr{Id: dns.Id(), RecursionDesired: true},
				Question: []dns.Question{
					{Name: dns.Fqdn(id), Qtype: dns.TypeA, Qclass: dns.ClassINET},
				},
			},
			"127.0.0.53:"+port,
		)
		if err != nil {
			return retry.ExpectedError(err)
		}

		if exchange.Rcode != dns.RcodeSuccess {
			return retry.ExpectedErrorf("expected rcode %d, got %d for %q", dns.RcodeSuccess, exchange.Rcode, id)
		}

		// A successful reply may still carry no records; retry instead of panicking on Answer[0].
		if len(exchange.Answer) == 0 {
			return retry.ExpectedErrorf("expected at least one answer for %q, got none", id)
		}

		proper := dns.Fqdn(id)

		if exchange.Answer[0].Header().Name != proper {
			return retry.ExpectedErrorf("expected answer name %q, got %q", proper, exchange.Answer[0].Header().Name)
		}

		// The answer must be an A record carrying the member's address, not just the right name.
		record, ok := exchange.Answer[0].(*dns.A)
		if !ok {
			return retry.ExpectedErrorf("expected A record for %q, got %T", id, exchange.Answer[0])
		}

		if got := record.A.String(); got != addr {
			return retry.ExpectedErrorf("expected address %q, got %q", addr, got)
		}

		return nil
	}))
}

func TestDNSServer(t *testing.T) {
suite.Run(t, &DNSServer{
DefaultSuite: ctest.DefaultSuite{
Expand All @@ -149,6 +211,7 @@ func TestDNSServer(t *testing.T) {
suite.Require().NoError(suite.Runtime().RegisterController(&netctrl.DNSUpstreamController{}))
suite.Require().NoError(suite.Runtime().RegisterController(&netctrl.DNSResolveCacheController{
Logger: zaptest.NewLogger(t),
State: suite.State(),
}))
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (ctrl *HostDNSConfigController) Run(ctx context.Context, r controller.Runti
}

res.TypedSpec().Enabled = cfgProvider.Machine().Features().HostDNS().Enabled()
res.TypedSpec().ResolveMemberNames = cfgProvider.Machine().Features().HostDNS().ResolveMemberNames()

if cfgProvider.Machine().Features().HostDNS().ForwardKubeDNSToHost() {
serviceCIDRStr := cfgProvider.Cluster().Network().ServiceCIDRs()[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (ctrl *Controller) Run(ctx context.Context, drainer *runtime.Drainer) error
&network.AddressStatusController{},
&network.DeviceConfigController{},
&network.DNSResolveCacheController{
State: ctrl.v1alpha1Runtime.State().V1Alpha2().Resources(),
Logger: dnsCacheLogger,
},
&network.DNSUpstreamController{},
Expand Down
85 changes: 82 additions & 3 deletions internal/pkg/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"math/rand/v2"
"net"
"net/netip"
"slices"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -127,7 +129,7 @@ func (h *Handler) ServeDNS(ctx context.Context, wrt dns.ResponseWriter, msg *dns

h.logger.Warn("dns response didn't match", zap.Stringer("data", resp))

return 0, nil
return dns.RcodeFormatError, nil
}

err = wrt.WriteMsg(resp)
Expand All @@ -138,7 +140,7 @@ func (h *Handler) ServeDNS(ctx context.Context, wrt dns.ResponseWriter, msg *dns

h.logger.Debug("dns response", zap.Stringer("data", resp))

return 0, nil
return dns.RcodeSuccess, nil
}

// SetProxy sets destination dns proxy servers.
Expand All @@ -158,6 +160,83 @@ func (h *Handler) SetProxy(prxs []*proxy.Proxy) bool {
// Stop resets the handler's dns proxies by calling SetProxy with nil.
func (h *Handler) Stop() {
	h.SetProxy(nil)
}

// NewNodeHandler returns a NodeHandler that uses hostMapper for name lookups,
// next as the handler to delegate to, and logger for logging.
func NewNodeHandler(next plugin.Handler, hostMapper HostMapper, logger *zap.Logger) *NodeHandler {
	return &NodeHandler{
		next:   next,
		mapper: hostMapper,
		logger: logger,
	}
}

// HostMapper is a name to node mapper.
//
// NodeHandler calls it with the name taken from the dns request and answers
// the request itself only when a mapping is reported.
type HostMapper interface {
// ResolveAddr returns the address mapped to name.
// The boolean result reports whether a mapping was found; when it is false
// NodeHandler passes the request to the next handler.
ResolveAddr(ctx context.Context, name string) (netip.Addr, bool)
}

// NodeHandler tries to resolve a dns request to a node. If the required node is not found, it moves on to the next handler.
type NodeHandler struct {
// next receives every request this handler does not answer itself.
next plugin.Handler
// mapper maps the requested name to a node address.
mapper HostMapper
// logger is used to report failures to write a response.
logger *zap.Logger

// enabled gates node resolution; while it is false all requests go straight to next.
enabled atomic.Bool
}

// Name returns the name of this handler, as required by plugin.Handler.
func (h *NodeHandler) Name() string { return "NodeHandler" }

// ServeDNS implements plugin.Handler.
//
// When the handler is enabled and the message contains an A or AAAA question,
// the requested name is passed to the HostMapper. If the mapper returns an
// address whose family matches the question type (IPv4 for A, IPv6 for AAAA),
// an authoritative reply with a single record is written to wrt.
//
// In every other case (handler disabled, no A/AAAA question, unknown name, or
// an address family that does not match the question) the request is passed
// to the next handler unchanged.
func (h *NodeHandler) ServeDNS(ctx context.Context, wrt dns.ResponseWriter, msg *dns.Msg) (int, error) {
	if !h.enabled.Load() {
		return h.next.ServeDNS(ctx, wrt, msg)
	}

	idx := slices.IndexFunc(msg.Question, func(q dns.Question) bool { return q.Qtype == dns.TypeA || q.Qtype == dns.TypeAAAA })
	if idx == -1 {
		return h.next.ServeDNS(ctx, wrt, msg)
	}

	req := request.Request{W: wrt, Req: msg}

	// Check if the request is for a node.
	result, ok := h.mapper.ResolveAddr(ctx, req.Name())
	if !ok {
		return h.next.ServeDNS(ctx, wrt, msg)
	}

	hdr := dns.RR_Header{
		Name:  req.Name(),
		Class: dns.ClassINET,
		Ttl:   10,
	}

	var answer dns.RR

	// The record type must match both the address family and the question type:
	// an IPv6 address cannot be carried in an A record, and an A record is not
	// a valid answer to an AAAA question (or vice versa).
	switch qtype := msg.Question[idx].Qtype; {
	case qtype == dns.TypeA && result.Is4():
		hdr.Rrtype = dns.TypeA
		answer = &dns.A{Hdr: hdr, A: result.AsSlice()}
	case qtype == dns.TypeAAAA && result.Is6():
		hdr.Rrtype = dns.TypeAAAA
		answer = &dns.AAAA{Hdr: hdr, AAAA: result.AsSlice()}
	default:
		// The node has no address of the requested family, let the next handler decide.
		return h.next.ServeDNS(ctx, wrt, msg)
	}

	resp := new(dns.Msg).SetReply(req.Req)
	resp.Authoritative = true
	resp.Answer = append(resp.Answer, answer)

	err := wrt.WriteMsg(resp)
	if err != nil {
		// We can't do much here, but at least log the error.
		h.logger.Warn("error writing dns response in node handler", zap.Error(err))
	}

	return dns.RcodeSuccess, nil
}

// SetEnabled atomically stores the enabled flag that ServeDNS checks before
// attempting to resolve a request to a node.
func (h *NodeHandler) SetEnabled(enabled bool) { h.enabled.Store(enabled) }

// ServerOptions is a Server options.
type ServerOptions struct {
Listener net.Listener
Expand Down
Loading

0 comments on commit 8b61991

Please sign in to comment.