Skip to content

Commit

Permalink
chore: simplify DNSUpstreamController and DNSUpstream resource
Browse files Browse the repository at this point in the history
This PR does those things:
- Fixes race condition where controller could potentially modify upstream, while other controller is copying its internals to the slice.
- Simplifies `run` function in `DNSUpstreamController` by removing all `Idx` handling.
- Removes `Idx` field from `DNSUpstream`. Upstreams are now sorted by their id with №X prefix.
- `Proxy` Stop is now called from the finalizer. In combination with iterators, this ensures that we only stop upstream when it's fully unreachable.

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
DmitriyMV committed Oct 24, 2024
1 parent 62d1854 commit a13cf76
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 111 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ require (
github.com/siderolabs/crypto v0.5.0
github.com/siderolabs/discovery-api v0.1.4
github.com/siderolabs/discovery-client v0.1.10
github.com/siderolabs/gen v0.5.0
github.com/siderolabs/gen v0.6.1
github.com/siderolabs/go-api-signature v0.3.6
github.com/siderolabs/go-blockdevice v0.4.8
github.com/siderolabs/go-blockdevice/v2 v2.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,8 @@ github.com/siderolabs/discovery-api v0.1.4 h1:2fMEFSMiWaD1zDiBDY5md8VxItvL1rDQRS
github.com/siderolabs/discovery-api v0.1.4/go.mod h1:kaBy+G42v2xd/uAF/NIe383sjNTBE2AhxPTyi9SZI0s=
github.com/siderolabs/discovery-client v0.1.10 h1:bTAvFLiISSzVXyYL1cIgAz8cPYd9ZfvhxwdebgtxARA=
github.com/siderolabs/discovery-client v0.1.10/go.mod h1:Ew1z07eyJwqNwum84IKYH4S649KEKK5WUmRW49HlXS8=
github.com/siderolabs/gen v0.5.0 h1:Afdjx+zuZDf53eH5DB+E+T2JeCwBXGinV66A6osLgQI=
github.com/siderolabs/gen v0.5.0/go.mod h1:1GUMBNliW98Xeq8GPQeVMYqQE09LFItE8enR3wgMh3Q=
github.com/siderolabs/gen v0.6.1 h1:Mex6Q41Tlw3e+4cGvlju2x4UwULD5WMo/D82n7IxV0Y=
github.com/siderolabs/gen v0.6.1/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
github.com/siderolabs/go-api-signature v0.3.6 h1:wDIsXbpl7Oa/FXvxB6uz4VL9INA9fmr3EbmjEZYFJrU=
github.com/siderolabs/go-api-signature v0.3.6/go.mod h1:hoH13AfunHflxbXfh+NoploqV13ZTDfQ1mQJWNVSW9U=
github.com/siderolabs/go-blockdevice v0.4.8 h1:KfdWvIx0Jft5YVuCsFIJFwjWEF1oqtzkgX9PeU9cX4c=
Expand Down
2 changes: 1 addition & 1 deletion hack/cloud-image-uploader/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/blang/semver/v4 v4.0.0
github.com/google/uuid v1.6.0
github.com/klauspost/compress v1.17.11
github.com/siderolabs/gen v0.5.0
github.com/siderolabs/gen v0.6.1
github.com/siderolabs/go-retry v0.3.3
github.com/spf13/pflag v1.0.5
golang.org/x/sync v0.8.0
Expand Down
4 changes: 2 additions & 2 deletions hack/cloud-image-uploader/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/siderolabs/gen v0.5.0 h1:Afdjx+zuZDf53eH5DB+E+T2JeCwBXGinV66A6osLgQI=
github.com/siderolabs/gen v0.5.0/go.mod h1:1GUMBNliW98Xeq8GPQeVMYqQE09LFItE8enR3wgMh3Q=
github.com/siderolabs/gen v0.6.1 h1:Mex6Q41Tlw3e+4cGvlju2x4UwULD5WMo/D82n7IxV0Y=
github.com/siderolabs/gen v0.6.1/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
github.com/siderolabs/go-retry v0.3.3 h1:zKV+S1vumtO72E6sYsLlmIdV/G/GcYSBLiEx/c9oCEg=
github.com/siderolabs/go-retry v0.3.3/go.mod h1:Ff/VGc7v7un4uQg3DybgrmOWHEmJ8BzZds/XNn/BqMI=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
Expand Down
2 changes: 1 addition & 1 deletion hack/docgen/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/invopop/jsonschema v0.12.0
github.com/microcosm-cc/bluemonday v1.0.27
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/siderolabs/gen v0.5.0
github.com/siderolabs/gen v0.6.1
github.com/wk8/go-ordered-map/v2 v2.1.8
gopkg.in/yaml.v3 v3.0.1
mvdan.cc/gofumpt v0.7.0
Expand Down
4 changes: 2 additions & 2 deletions hack/docgen/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
github.com/siderolabs/gen v0.5.0 h1:Afdjx+zuZDf53eH5DB+E+T2JeCwBXGinV66A6osLgQI=
github.com/siderolabs/gen v0.5.0/go.mod h1:1GUMBNliW98Xeq8GPQeVMYqQE09LFItE8enR3wgMh3Q=
github.com/siderolabs/gen v0.6.1 h1:Mex6Q41Tlw3e+4cGvlju2x4UwULD5WMo/D82n7IxV0Y=
github.com/siderolabs/gen v0.6.1/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/unix4ever/yaml v0.0.0-20220527175918-f17b0f05cf2c h1:Vn6nVVu9MdOYvXPkJP83iX5jVIfvxFC9v9xIKb+DlaQ=
Expand Down
35 changes: 21 additions & 14 deletions internal/app/machined/pkg/controllers/network/dns_resolve_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package network

import (
"cmp"
"context"
"errors"
"fmt"
Expand All @@ -23,7 +22,9 @@ import (
dnssrv "github.com/miekg/dns"
"github.com/siderolabs/gen/optional"
"github.com/siderolabs/gen/pair"
"github.com/siderolabs/gen/xiter"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/siderolabs/talos/internal/pkg/dns"
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
Expand Down Expand Up @@ -161,10 +162,15 @@ func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Run
return fmt.Errorf("error getting resolver status: %w", err)
}

prxs, addrs := SortedProxies(upstreams)
prxs := xiter.Map(
upstreams.All(),
// We are using iterator here to preserve finalizer on
func(upstream *network.DNSUpstream) *proxy.Proxy {
return upstream.TypedSpec().Value.Conn.Proxy().(*proxy.Proxy)
})

if ctrl.handler.SetProxy(prxs) {
ctrl.Logger.Info("updated dns server nameservers", zap.Strings("addrs", addrs))
ctrl.Logger.Info("updated dns server nameservers", zap.Array("addrs", addrsArr(upstreams)))
}

if err = safe.CleanupOutputs[*network.DNSResolveCache](ctx, r); err != nil {
Expand All @@ -173,17 +179,6 @@ func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Run
}
}

// SortedProxies returns sorted list of proxies and their addresses.
func SortedProxies(upstreams safe.List[*network.DNSUpstream]) ([]*proxy.Proxy, []string) {
upstreams.SortFunc(func(a, b *network.DNSUpstream) int {
return cmp.Compare(a.TypedSpec().Value.Idx, b.TypedSpec().Value.Idx)
})

//nolint:forcetypeassert
return safe.ToSlice(upstreams, func(d *network.DNSUpstream) *proxy.Proxy { return d.TypedSpec().Value.Prx.(*proxy.Proxy) }),
safe.ToSlice(upstreams, func(d *network.DNSUpstream) string { return d.TypedSpec().Value.Prx.Addr() })
}

func (ctrl *DNSResolveCacheController) writeDNSStatus(ctx context.Context, r controller.Runtime, config runnerConfig) error {
return safe.WriterModify(ctx, r, network.NewDNSResolveCache(fmt.Sprintf("%s-%s", config.net, config.addr)), func(drc *network.DNSResolveCache) error {
drc.TypedSpec().Status = "running"
Expand Down Expand Up @@ -353,3 +348,15 @@ func fqdnMatch(what, where string) bool {

return what == first
}

type addrsArr safe.List[*network.DNSUpstream]

func (a addrsArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
list := safe.List[*network.DNSUpstream](a)

for u := range list.All() {
encoder.AppendString(u.TypedSpec().Value.Conn.Addr())
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"errors"
"net"
"net/netip"
"slices"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -306,23 +308,37 @@ func (suite *DNSUpstreams) TestOrder() {
{"1.1.1.1", "8.8.8.8", "1.0.0.1", "8.0.0.8"},
{"192.168.0.1"},
} {
resolverSpec.TypedSpec().DNSServers = xslices.Map(addrs, netip.MustParseAddr)
if !suite.Run(strings.Join(addrs, ","), func() {
resolverSpec.TypedSpec().DNSServers = xslices.Map(addrs, netip.MustParseAddr)

switch i {
case 0:
suite.Require().NoError(suite.State().Create(suite.Ctx(), resolverSpec))
default:
suite.Require().NoError(suite.State().Update(suite.Ctx(), resolverSpec))
}
switch i {
case 0:
suite.Require().NoError(suite.State().Create(suite.Ctx(), resolverSpec))
default:
suite.Require().NoError(suite.State().Update(suite.Ctx(), resolverSpec))
}

expected := xslices.Map(addrs, func(t string) string { return t + ":53" })

rtestutils.AssertLength[*network.DNSUpstream](suite.Ctx(), suite.T(), suite.State(), len(addrs))

rtestutils.AssertLength[*network.DNSUpstream](suite.Ctx(), suite.T(), suite.State(), len(addrs))
var actual []string

upstreams, err := safe.ReaderListAll[*network.DNSUpstream](suite.Ctx(), suite.State())
suite.Require().NoError(err)
defer func() { suite.Require().Equal(expected, actual) }()

_, upstreamAddrs := netctrl.SortedProxies(upstreams)
for suite.Ctx().Err() == nil {
upstreams, err := safe.ReaderListAll[*network.DNSUpstream](suite.Ctx(), suite.State())
suite.Require().NoError(err)

suite.Require().Equal(xslices.Map(addrs, func(t string) string { return t + ":53" }), upstreamAddrs)
actual = safe.ToSlice(upstreams, func(u *network.DNSUpstream) string { return u.TypedSpec().Value.Conn.Addr() })

if slices.Equal(expected, actual) {
break
}
}
}) {
break
}
}
}

Expand Down
80 changes: 50 additions & 30 deletions internal/app/machined/pkg/controllers/network/dns_upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package network

import (
"context"
"fmt"
"net"
"time"

"github.com/coredns/coredns/plugin/pkg/proxy"
"github.com/cosi-project/runtime/pkg/controller"
Expand Down Expand Up @@ -58,7 +58,7 @@ func (ctrl *DNSUpstreamController) Outputs() []controller.Output {

// Run implements controller.Controller interface.
func (ctrl *DNSUpstreamController) Run(ctx context.Context, r controller.Runtime, l *zap.Logger) error {
defer ctrl.cleanupUpstream(context.Background(), r, nil, l)
defer cleanupUpstream(context.Background(), r, nil, l)

for {
select {
Expand All @@ -78,7 +78,7 @@ func (ctrl *DNSUpstreamController) Run(ctx context.Context, r controller.Runtime
func (ctrl *DNSUpstreamController) run(ctx context.Context, r controller.Runtime, l *zap.Logger) error {
touchedIDs := map[resource.ID]struct{}{}

defer ctrl.cleanupUpstream(ctx, r, touchedIDs, l)
defer cleanupUpstream(ctx, r, touchedIDs, l)

cfg, err := safe.ReaderGetByID[*network.HostDNSConfig](ctx, r, network.HostDNSConfigID)
if err != nil {
Expand All @@ -103,36 +103,22 @@ func (ctrl *DNSUpstreamController) run(ctx context.Context, r controller.Runtime
return err
}

for i, s := range rs.TypedSpec().DNSServers {
remoteAddr := s.String()
initConn, err := existingConnections(ctx, r)
if err != nil {
return err
}

for i, srv := range rs.TypedSpec().DNSServers {
remoteHost := srv.String()

if err = safe.WriterModify[*network.DNSUpstream](
ctx,
r,
network.NewDNSUpstream(remoteAddr),
network.NewDNSUpstream(fmt.Sprintf("#%03d %s", i, remoteHost)),
func(u *network.DNSUpstream) error {
touchedIDs[u.Metadata().ID()] = struct{}{}

if u.TypedSpec().Value.Prx != nil {
// Found upstream, update index
if u.TypedSpec().Value.Idx != i {
old := u.TypedSpec().Value.Idx
u.TypedSpec().Value.Idx = i

l.Info("updated dns upstream idx", zap.String("addr", remoteAddr), zap.Int("was", old), zap.Int("now", i))
}

return nil
}

prx := proxy.NewProxy(remoteAddr, net.JoinHostPort(remoteAddr, "53"), "dns")

prx.Start(500 * time.Millisecond)

u.TypedSpec().Value.Prx = prx
u.TypedSpec().Value.Idx = i

l.Info("created dns upstream", zap.String("addr", remoteAddr), zap.Int("idx", i))
initConn(&u.TypedSpec().Value, remoteHost, l)

return nil
},
Expand All @@ -144,7 +130,43 @@ func (ctrl *DNSUpstreamController) run(ctx context.Context, r controller.Runtime
return nil
}

func (ctrl *DNSUpstreamController) cleanupUpstream(ctx context.Context, r controller.Runtime, touchedIDs map[resource.ID]struct{}, l *zap.Logger) {
func existingConnections(ctx context.Context, r controller.Runtime) (func(*network.DNSUpstreamSpecSpec, string, *zap.Logger), error) {
upstream, err := safe.ReaderListAll[*network.DNSUpstream](ctx, r)
if err != nil {
return nil, err
}

existingConn := make(map[string]*network.DNSConn, upstream.Len())

for u := range upstream.All() {
existingConn[u.TypedSpec().Value.Conn.Addr()] = u.TypedSpec().Value.Conn
}

return func(spec *network.DNSUpstreamSpecSpec, remoteHost string, l *zap.Logger) {
remoteAddr := net.JoinHostPort(remoteHost, "53")
if spec.Conn != nil && spec.Conn.Addr() == remoteAddr {
l.Debug("reusing existing upstream spec", zap.String("addr", remoteAddr))

return
}

if conn, ok := existingConn[remoteAddr]; ok {
spec.Conn = conn

l.Debug("reusing existing upstream connection", zap.String("addr", remoteAddr))

return
}

spec.Conn = network.NewDNSConn(proxy.NewProxy(remoteHost, remoteAddr, "dns"), l)

l.Debug("created new upstream connection", zap.String("addr", remoteAddr))

existingConn[remoteAddr] = spec.Conn
}, nil
}

func cleanupUpstream(ctx context.Context, r controller.Runtime, touchedIDs map[resource.ID]struct{}, l *zap.Logger) {
list, err := safe.ReaderListAll[*network.DNSUpstream](ctx, r)
if err != nil {
l.Error("error listing upstreams", zap.Error(err))
Expand All @@ -156,15 +178,13 @@ func (ctrl *DNSUpstreamController) cleanupUpstream(ctx context.Context, r contro
md := val.Metadata()

if _, ok := touchedIDs[md.ID()]; !ok {
val.TypedSpec().Value.Prx.Stop()

if err = r.Destroy(ctx, md); err != nil {
l.Error("error destroying upstream", zap.Error(err), zap.String("id", md.ID()))

return
}

l.Info("destroyed dns upstream", zap.String("addr", md.ID()))
l.Debug("destroyed dns upstream", zap.String("addr", md.ID()))
}
}
}
Loading

0 comments on commit a13cf76

Please sign in to comment.