From 071f044562dd247dd54584d7b9fa0bb24d6f7599 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Tue, 11 May 2021 23:41:45 +0300 Subject: [PATCH] feat: implement AddressSpec handling This includes multiple controllers responsible for different stages of `AddressSpec` conversion: * `AddressConfigController` produces initial unmerged configuration from multiple sources (more sources coming later, e.g. DHCP) * `AddressMergeController` merges address configuration into final representation * `AddressSpecController` syncs resources with kernel state Signed-off-by: Andrey Smirnov --- .../pkg/controllers/network/address_config.go | 343 ++++++++++++++++++ .../network/address_config_test.go | 241 ++++++++++++ .../pkg/controllers/network/address_merge.go | 125 +++++++ .../controllers/network/address_merge_test.go | 196 ++++++++++ .../pkg/controllers/network/address_spec.go | 245 +++++++++++++ .../controllers/network/address_spec_test.go | 229 ++++++++++++ .../pkg/controllers/network/address_status.go | 39 +- .../pkg/controllers/network/link_status.go | 87 +---- .../pkg/controllers/network/watch/ethtool.go | 83 +++++ .../controllers/network/watch/rtnetlink.go | 60 +++ .../pkg/controllers/network/watch/watch.go | 11 + .../pkg/runtime/v1alpha1/v1alpha1_runtime.go | 7 + .../runtime/v1alpha2/v1alpha2_controller.go | 6 + .../pkg/runtime/v1alpha2/v1alpha2_state.go | 33 +- pkg/resources/network/address_spec.go | 83 +++++ pkg/resources/network/configlayer.go | 24 ++ pkg/resources/network/configlayer_string.go | 27 ++ pkg/resources/network/network.go | 10 + pkg/resources/network/network_test.go | 1 + 19 files changed, 1721 insertions(+), 129 deletions(-) create mode 100644 internal/app/machined/pkg/controllers/network/address_config.go create mode 100644 internal/app/machined/pkg/controllers/network/address_config_test.go create mode 100644 internal/app/machined/pkg/controllers/network/address_merge.go create mode 100644 internal/app/machined/pkg/controllers/network/address_merge_test.go create mode 100644 internal/app/machined/pkg/controllers/network/address_spec.go create mode 100644 internal/app/machined/pkg/controllers/network/address_spec_test.go create mode 100644 internal/app/machined/pkg/controllers/network/watch/ethtool.go create mode 100644 internal/app/machined/pkg/controllers/network/watch/rtnetlink.go create mode 100644 internal/app/machined/pkg/controllers/network/watch/watch.go create mode 100644 pkg/resources/network/address_spec.go create mode 100644 pkg/resources/network/configlayer.go create mode 100644 pkg/resources/network/configlayer_string.go diff --git a/internal/app/machined/pkg/controllers/network/address_config.go b/internal/app/machined/pkg/controllers/network/address_config.go new file mode 100644 index 0000000000..e82254f067 --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/address_config.go @@ -0,0 +1,343 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network + +import ( + "context" + "fmt" + "log" + "net" + "sort" + "strings" + + "github.com/AlekSi/pointer" + "github.com/cosi-project/runtime/pkg/controller" + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/state" + "github.com/talos-systems/go-procfs/procfs" + "inet.af/netaddr" + + talosconfig "github.com/talos-systems/talos/pkg/machinery/config" + "github.com/talos-systems/talos/pkg/resources/config" + "github.com/talos-systems/talos/pkg/resources/network" + "github.com/talos-systems/talos/pkg/resources/network/nethelpers" +) + +// AddressConfigController manages network.AddressSpec based on machine configuration, kernel cmdline and some built-in defaults. +type AddressConfigController struct { + Cmdline *procfs.Cmdline +} + +// Name implements controller.Controller interface. +func (ctrl *AddressConfigController) Name() string { + return "network.AddressConfigController" +} + +// Inputs implements controller.Controller interface. +func (ctrl *AddressConfigController) Inputs() []controller.Input { + return []controller.Input{ + { + Namespace: config.NamespaceName, + Type: config.MachineConfigType, + ID: pointer.ToString(config.V1Alpha1ID), + Kind: controller.InputWeak, + }, + } +} + +// Outputs implements controller.Controller interface. +func (ctrl *AddressConfigController) Outputs() []controller.Output { + return []controller.Output{ + { + Type: network.AddressSpecType, + Kind: controller.OutputShared, + }, + } +} + +// Run implements controller.Controller interface. +// +//nolint: gocyclo, cyclop +func (ctrl *AddressConfigController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error { + // apply defaults for the loopback interface once + defaultTouchedIDs, err := ctrl.apply(ctx, r, ctrl.loopbackDefaults()) + if err != nil { + return fmt.Errorf("error generating loopback interface defaults: %w", err) + } + + for { + select { + case <-ctx.Done(): + return nil + case <-r.EventCh(): + } + + touchedIDs := make(map[resource.ID]struct{}) + + for _, id := range defaultTouchedIDs { + touchedIDs[id] = struct{}{} + } + + var cfgProvider talosconfig.Provider + + cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineConfigType, config.V1Alpha1ID, resource.VersionUndefined)) + if err != nil { + if !state.IsNotFoundError(err) { + return fmt.Errorf("error getting config: %w", err) + } + } else { + cfgProvider = cfg.(*config.MachineConfig).Config() + } + + ignoredInterfaces := map[string]struct{}{} + + if cfgProvider != nil { + for _, device := range cfgProvider.Machine().Network().Devices() { + if device.Ignore() { + ignoredInterfaces[device.Interface()] = struct{}{} + } + } + } + + // parse kernel cmdline for the address + cmdlineAddress := ctrl.parseCmdline(logger) + if !cmdlineAddress.Address.IsZero() { + if _, ignored := ignoredInterfaces[cmdlineAddress.LinkName]; !ignored { + var ids []string + + ids, err = ctrl.apply(ctx, r, []network.AddressSpecSpec{cmdlineAddress}) + if err != nil { + return fmt.Errorf("error applying cmdline address: %w", err) + } + + for _, id := range ids { + touchedIDs[id] = struct{}{} + } + } + } + + // parse machine configuration for static addresses + if cfgProvider != nil { + addresses := ctrl.parseMachineConfiguration(logger, cfgProvider) + + var ids []string + + ids, err = ctrl.apply(ctx, r, addresses) + if err != nil { + return fmt.Errorf("error applying machine configuration address: %w", err) + } + + for _, id := range ids { + touchedIDs[id] = struct{}{} + } + } + + // list addresses for cleanup + list, err := r.List(ctx, resource.NewMetadata(network.ConfigNamespaceName, network.AddressSpecType, "", resource.VersionUndefined)) + if err != nil { + return fmt.Errorf("error listing resources: %w", err) + } + + for _, res := range list.Items { + if _, ok := touchedIDs[res.Metadata().ID()]; !ok { + if err = r.Destroy(ctx, res.Metadata()); err != nil { + return fmt.Errorf("error cleaning up addresses: %w", err) + } + } + } + } +} + +func (ctrl *AddressConfigController) apply(ctx context.Context, r controller.Runtime, addresses []network.AddressSpecSpec) ([]resource.ID, error) { + ids := make([]string, 0, len(addresses)) + + for _, address := range addresses { + address := address + id := network.LayeredID(address.Layer, network.AddressID(address.LinkName, address.Address)) + + if err := r.Modify( + ctx, + network.NewAddressSpec(network.ConfigNamespaceName, id), + func(r resource.Resource) error { + *r.(*network.AddressSpec).Status() = address + + return nil + }, + ); err != nil { + return ids, err + } + + ids = append(ids, id) + } + + return ids, nil +} + +func (ctrl *AddressConfigController) loopbackDefaults() []network.AddressSpecSpec { + return []network.AddressSpecSpec{ + { + Address: netaddr.IPPrefix{ + IP: netaddr.IPv4(127, 0, 0, 1), + Bits: 8, + }, + Family: nethelpers.FamilyInet4, + Scope: nethelpers.ScopeHost, + Flags: nethelpers.AddressFlags(nethelpers.AddressPermanent), + LinkName: "lo", + Layer: network.ConfigDefault, + }, + { + Address: netaddr.IPPrefix{ + IP: netaddr.IPFrom16([16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}), + Bits: 128, + }, + Family: nethelpers.FamilyInet6, + Scope: nethelpers.ScopeHost, + Flags: nethelpers.AddressFlags(nethelpers.AddressPermanent), + LinkName: "lo", + Layer: network.ConfigDefault, + }, + } +} + +//nolint: gocyclo +func (ctrl *AddressConfigController) parseCmdline(logger *log.Logger) (address network.AddressSpecSpec) { + if ctrl.Cmdline == nil { + return + } + + cmdline := ctrl.Cmdline.Get("ip").First() + if cmdline == nil { + return + } + + // https://www.kernel.org/doc/Documentation/filesystems/nfs/nfsroot.txt + // ip=::::::::: + fields := strings.Split(*cmdline, ":") + + // If dhcp is specified, we'll handle it as a normal discovered + // interface + if len(fields) == 1 && fields[0] == "dhcp" { + return + } + + var err error + + address.Address.IP, err = netaddr.ParseIP(fields[0]) + if err != nil { + logger.Printf("ignoring cmdline address parse failure: %s", err) + + return + } + + if len(fields) >= 4 { + netmask, err := netaddr.ParseIP(fields[3]) + if err != nil { + logger.Printf("ignoring cmdline netmask parse failure: %s", err) + + return + } + + ones, _ := net.IPMask(netmask.IPAddr().IP).Size() + + address.Address.Bits = uint8(ones) + } else { + // default is to have complete address masked + address.Address.Bits = address.Address.IP.BitLen() + } + + if address.Address.IP.Is6() { + address.Family = nethelpers.FamilyInet6 + } else { + address.Family = nethelpers.FamilyInet4 + } + + address.Scope = nethelpers.ScopeGlobal + address.Flags = nethelpers.AddressFlags(nethelpers.AddressPermanent) + + address.Layer = network.ConfigCmdline + + if len(fields) >= 6 { + address.LinkName = fields[5] + } else { + ifaces, _ := net.Interfaces() //nolint: errcheck // ignoring error here as ifaces will be empty + + sort.Slice(ifaces, func(i, j int) bool { return ifaces[i].Name < ifaces[j].Name }) + + for _, iface := range ifaces { + if iface.Flags&net.FlagLoopback != 0 { + continue + } + + address.LinkName = iface.Name + + break + } + } + + return address +} + +func (ctrl *AddressConfigController) parseMachineConfiguration(logger *log.Logger, cfgProvider talosconfig.Provider) (addresses []network.AddressSpecSpec) { + for _, device := range cfgProvider.Machine().Network().Devices() { + if device.Ignore() { + continue + } + + if device.CIDR() != "" { + ipPrefix, err := netaddr.ParseIPPrefix(device.CIDR()) + if err != nil { + logger.Printf("skipping address %q on interface %q: %s", device.CIDR(), device.Interface(), err) + + continue + } + + address := network.AddressSpecSpec{ + Address: ipPrefix, + Scope: nethelpers.ScopeGlobal, + LinkName: device.Interface(), + Layer: network.ConfigMachineConfiguration, + Flags: nethelpers.AddressFlags(nethelpers.AddressPermanent), + } + + if address.Address.IP.Is6() { + address.Family = nethelpers.FamilyInet6 + } else { + address.Family = nethelpers.FamilyInet4 + } + + addresses = append(addresses, address) + } + + for _, vlan := range device.Vlans() { + if vlan.CIDR() != "" { + ipPrefix, err := netaddr.ParseIPPrefix(vlan.CIDR()) + if err != nil { + logger.Printf("skipping address %q on interface %q vlan %d: %s", device.CIDR(), device.Interface(), vlan.ID(), err) + + continue + } + + address := network.AddressSpecSpec{ + Address: ipPrefix, + Scope: nethelpers.ScopeGlobal, + LinkName: fmt.Sprintf("%s.%d", device.Interface(), vlan.ID()), + Layer: network.ConfigMachineConfiguration, + Flags: nethelpers.AddressFlags(nethelpers.AddressPermanent), + } + + if address.Address.IP.Is6() { + address.Family = nethelpers.FamilyInet6 + } else { + address.Family = nethelpers.FamilyInet4 + } + + addresses = append(addresses, address) + } + } + } + + return addresses +} diff --git a/internal/app/machined/pkg/controllers/network/address_config_test.go b/internal/app/machined/pkg/controllers/network/address_config_test.go new file mode 100644 index 0000000000..5af53188d3 --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/address_config_test.go @@ -0,0 +1,241 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +//nolint:dupl +package network_test + +import ( + "context" + "fmt" + "log" + "net" + "net/url" + "sort" + "sync" + "testing" + "time" + + "github.com/cosi-project/runtime/pkg/controller/runtime" + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/state" + "github.com/cosi-project/runtime/pkg/state/impl/inmem" + "github.com/cosi-project/runtime/pkg/state/impl/namespaced" + "github.com/stretchr/testify/suite" + "github.com/talos-systems/go-procfs/procfs" + "github.com/talos-systems/go-retry/retry" + + netctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network" + "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1" + "github.com/talos-systems/talos/pkg/resources/config" + "github.com/talos-systems/talos/pkg/resources/network" + "github.com/talos-systems/talos/pkg/resources/network/nethelpers" +) + +type AddressConfigSuite struct { + suite.Suite + + state state.State + + runtime *runtime.Runtime + wg sync.WaitGroup + + ctx context.Context + ctxCancel context.CancelFunc +} + +func (suite *AddressConfigSuite) SetupTest() { + suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute) + + suite.state = state.WrapCore(namespaced.NewState(inmem.Build)) + + var err error + + logger := log.New(log.Writer(), "controller-runtime: ", log.Flags()) + + suite.runtime, err = runtime.NewRuntime(suite.state, logger) + suite.Require().NoError(err) +} + +func (suite *AddressConfigSuite) startRuntime() { + suite.wg.Add(1) + + go func() { + defer suite.wg.Done() + + suite.Assert().NoError(suite.runtime.Run(suite.ctx)) + }() +} + +func (suite *AddressConfigSuite) assertAddresses(requiredIDs []string, check func(*network.AddressSpec) error) error { + missingIDs := make(map[string]struct{}, len(requiredIDs)) + + for _, id := range requiredIDs { + missingIDs[id] = struct{}{} + } + + resources, err := suite.state.List(suite.ctx, resource.NewMetadata(network.ConfigNamespaceName, network.AddressSpecType, "", resource.VersionUndefined)) + if err != nil { + return retry.UnexpectedError(err) + } + + for _, res := range resources.Items { + _, required := missingIDs[res.Metadata().ID()] + if !required { + continue + } + + delete(missingIDs, res.Metadata().ID()) + + if err = check(res.(*network.AddressSpec)); err != nil { + return retry.ExpectedError(err) + } + } + + if len(missingIDs) > 0 { + return retry.ExpectedError(fmt.Errorf("some resources are missing: %q", missingIDs)) + } + + return nil +} + +func (suite *AddressConfigSuite) TestLoopback() { + suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressConfigController{})) + + suite.startRuntime() + + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertAddresses([]string{ + "default/lo/127.0.0.1/8", + "default/lo/::1/128", + }, func(r *network.AddressSpec) error { + suite.Assert().Equal("lo", r.Status().LinkName) + suite.Assert().Equal(nethelpers.ScopeHost, r.Status().Scope) + + return nil + }) + })) +} + +func (suite *AddressConfigSuite) TestCmdline() { + suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressConfigController{ + Cmdline: procfs.NewCmdline("ip=172.20.0.2::172.20.0.1:255.255.255.0::eth1:::::"), + })) + + suite.startRuntime() + + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertAddresses([]string{ + "cmdline/eth1/172.20.0.2/24", + }, func(r *network.AddressSpec) error { + suite.Assert().Equal("eth1", r.Status().LinkName) + + return nil + }) + })) +} + +func (suite *AddressConfigSuite) TestCmdlineNoNetmask() { + suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressConfigController{ + Cmdline: procfs.NewCmdline("ip=172.20.0.2::172.20.0.1"), + })) + + suite.startRuntime() + + ifaces, _ := net.Interfaces() //nolint: errcheck // ignoring error here as ifaces will be empty + + sort.Slice(ifaces, func(i, j int) bool { return ifaces[i].Name < ifaces[j].Name }) + + ifaceName := "" + + for _, iface := range ifaces { + if iface.Flags&net.FlagLoopback != 0 { + continue + } + + ifaceName = iface.Name + + break + } + + suite.Assert().NotEmpty(ifaceName) + + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertAddresses([]string{ + fmt.Sprintf("cmdline/%s/172.20.0.2/32", ifaceName), + }, func(r *network.AddressSpec) error { + suite.Assert().Equal(ifaceName, r.Status().LinkName) + suite.Assert().Equal(network.ConfigCmdline, r.Status().Layer) + + return nil + }) + })) +} + +func (suite *AddressConfigSuite) TestMachineConfiguration() { + suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressConfigController{})) + + suite.startRuntime() + + u, err := url.Parse("https://foo:6443") + suite.Require().NoError(err) + + cfg := config.NewMachineConfig(&v1alpha1.Config{ + ConfigVersion: "v1alpha1", + MachineConfig: &v1alpha1.MachineConfig{ + MachineNetwork: &v1alpha1.NetworkConfig{ + NetworkInterfaces: []*v1alpha1.Device{ + { + DeviceInterface: "eth3", + DeviceCIDR: "192.168.0.24/28", + }, + { + DeviceIgnore: true, + DeviceInterface: "eth4", + DeviceCIDR: "192.168.0.24/28", + }, + { + DeviceInterface: "eth2", + DeviceCIDR: "2001:470:6d:30e:8ed2:b60c:9d2f:803a/64", + }, + { + DeviceInterface: "eth0", + DeviceVlans: []*v1alpha1.Vlan{ + { + VlanID: 24, + VlanCIDR: "10.0.0.1/8", + }, + }, + }, + }, + }, + }, + ClusterConfig: &v1alpha1.ClusterConfig{ + ControlPlane: &v1alpha1.ControlPlaneConfig{ + Endpoint: &v1alpha1.Endpoint{ + URL: u, + }, + }, + }, + }) + + suite.Require().NoError(suite.state.Create(suite.ctx, cfg)) + + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertAddresses([]string{ + "configuration/eth2/2001:470:6d:30e:8ed2:b60c:9d2f:803a/64", + "configuration/eth3/192.168.0.24/28", + "configuration/eth0.24/10.0.0.1/8", + }, func(r *network.AddressSpec) error { + return nil + }) + })) +} + +func TestAddressConfigSuite(t *testing.T) { + suite.Run(t, new(AddressConfigSuite)) +} diff --git a/internal/app/machined/pkg/controllers/network/address_merge.go b/internal/app/machined/pkg/controllers/network/address_merge.go new file mode 100644 index 0000000000..37898bd683 --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/address_merge.go @@ -0,0 +1,125 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network + +import ( + "context" + "fmt" + "log" + + "github.com/cosi-project/runtime/pkg/controller" + "github.com/cosi-project/runtime/pkg/resource" + + "github.com/talos-systems/talos/pkg/resources/network" +) + +// AddressMergeController merges network.AddressSpec in network.ConfigNamespace and produces final network.AddressSpec in network.Namespace. +type AddressMergeController struct{} + +// Name implements controller.Controller interface. +func (ctrl *AddressMergeController) Name() string { + return "network.AddressMergeController" +} + +// Inputs implements controller.Controller interface. +func (ctrl *AddressMergeController) Inputs() []controller.Input { + return []controller.Input{ + { + Namespace: network.ConfigNamespaceName, + Type: network.AddressSpecType, + Kind: controller.InputWeak, + }, + // TODO: temporary hack to make controller watch its outputs to facilitate proper teardown sequence + // should be fixed in the runtime library to automatically support notifications on finalizer change + // on outputs + { + Namespace: network.NamespaceName, + Type: network.AddressSpecType, + Kind: controller.InputWeak, + }, + } +} + +// Outputs implements controller.Controller interface. +func (ctrl *AddressMergeController) Outputs() []controller.Output { + return []controller.Output{ + { + Type: network.AddressSpecType, + Kind: controller.OutputShared, + }, + } +} + +// Run implements controller.Controller interface. +// +//nolint: gocyclo +func (ctrl *AddressMergeController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error { + for { + select { + case <-ctx.Done(): + return nil + case <-r.EventCh(): + } + + // list source network configuration resources + list, err := r.List(ctx, resource.NewMetadata(network.ConfigNamespaceName, network.AddressSpecType, "", resource.VersionUndefined)) + if err != nil { + return fmt.Errorf("error listing source network addresses: %w", err) + } + + // address is allowed as long as it's not duplicate, for duplicate higher layer takes precedence + addresses := map[string]*network.AddressSpec{} + + for _, res := range list.Items { + address := res.(*network.AddressSpec) //nolint:errcheck,forcetypeassert + id := network.AddressID(address.Status().LinkName, address.Status().Address) + + existing, ok := addresses[id] + if ok && existing.Status().Layer > address.Status().Layer { + // skip this address, as existing one is higher layer + continue + } + + addresses[id] = address + } + + for id, address := range addresses { + address := address + + if err = r.Modify(ctx, network.NewAddressSpec(network.NamespaceName, id), func(res resource.Resource) error { + addr := res.(*network.AddressSpec) //nolint:errcheck,forcetypeassert + + *addr.Status() = *address.Status() + + return nil + }); err != nil { + return fmt.Errorf("error updating resource: %w", err) + } + } + + // list addresses for cleanup + list, err = r.List(ctx, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "", resource.VersionUndefined)) + if err != nil { + return fmt.Errorf("error listing resources: %w", err) + } + + for _, res := range list.Items { + if _, ok := addresses[res.Metadata().ID()]; !ok { + var okToDestroy bool + + okToDestroy, err = r.Teardown(ctx, res.Metadata()) + if err != nil { + return fmt.Errorf("error cleaning up addresses: %w", err) + } + + if okToDestroy { + if err = r.Destroy(ctx, res.Metadata()); err != nil { + return fmt.Errorf("error cleaning up addresses: %w", err) + } + } + } + } + } +} diff --git a/internal/app/machined/pkg/controllers/network/address_merge_test.go b/internal/app/machined/pkg/controllers/network/address_merge_test.go new file mode 100644 index 0000000000..a7da7c2e71 --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/address_merge_test.go @@ -0,0 +1,196 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +//nolint:dupl +package network_test + +import ( + "context" + "fmt" + "log" + "sync" + "testing" + "time" + + "github.com/cosi-project/runtime/pkg/controller/runtime" + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/state" + "github.com/cosi-project/runtime/pkg/state/impl/inmem" + "github.com/cosi-project/runtime/pkg/state/impl/namespaced" + "github.com/stretchr/testify/suite" + "github.com/talos-systems/go-retry/retry" + "inet.af/netaddr" + + netctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network" + "github.com/talos-systems/talos/pkg/resources/network" + "github.com/talos-systems/talos/pkg/resources/network/nethelpers" +) + +type AddressMergeSuite struct { + suite.Suite + + state state.State + + runtime *runtime.Runtime + wg sync.WaitGroup + + ctx context.Context + ctxCancel context.CancelFunc +} + +func (suite *AddressMergeSuite) SetupTest() { + suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute) + + suite.state = state.WrapCore(namespaced.NewState(inmem.Build)) + + var err error + + logger := log.New(log.Writer(), "controller-runtime: ", log.Flags()) + + suite.runtime, err = runtime.NewRuntime(suite.state, logger) + suite.Require().NoError(err) + + suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressMergeController{})) + + suite.startRuntime() +} + +func (suite *AddressMergeSuite) startRuntime() { + suite.wg.Add(1) + + go func() { + defer suite.wg.Done() + + suite.Assert().NoError(suite.runtime.Run(suite.ctx)) + }() +} + +func (suite *AddressMergeSuite) assertAddresses(requiredIDs []string, check func(*network.AddressSpec) error) error { + missingIDs := make(map[string]struct{}, len(requiredIDs)) + + for _, id := range requiredIDs { + missingIDs[id] = struct{}{} + } + + resources, err := suite.state.List(suite.ctx, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "", resource.VersionUndefined)) + if err != nil { + return retry.UnexpectedError(err) + } + + for _, res := range resources.Items { + _, required := missingIDs[res.Metadata().ID()] + if !required { + continue + } + + delete(missingIDs, res.Metadata().ID()) + + if err = check(res.(*network.AddressSpec)); err != nil { + return retry.ExpectedError(err) + } + } + + if len(missingIDs) > 0 { + return retry.ExpectedError(fmt.Errorf("some resources are missing: %q", missingIDs)) + } + + return nil +} + +func (suite *AddressMergeSuite) assertNoAddress(id string) error { + resources, err := suite.state.List(suite.ctx, resource.NewMetadata(network.NamespaceName, network.AddressStatusType, "", resource.VersionUndefined)) + if err != nil { + return retry.UnexpectedError(err) + } + + for _, res := range resources.Items { + if res.Metadata().ID() == id { + return retry.ExpectedError(fmt.Errorf("address %q is still there", id)) + } + } + + return nil +} + +func (suite *AddressMergeSuite) TestMerge() { + loopback := network.NewAddressSpec(network.ConfigNamespaceName, "default/lo/127.0.0.1/8") + *loopback.Status() = network.AddressSpecSpec{ + Address: netaddr.MustParseIPPrefix("127.0.0.1/8"), + LinkName: "lo", + Family: nethelpers.FamilyInet4, + Scope: nethelpers.ScopeHost, + Layer: network.ConfigDefault, + } + + dhcp := network.NewAddressSpec(network.ConfigNamespaceName, "dhcp/eth0/10.0.0.1/8") + *dhcp.Status() = network.AddressSpecSpec{ + Address: netaddr.MustParseIPPrefix("10.0.0.1/8"), + LinkName: "eth0", + Family: nethelpers.FamilyInet4, + Scope: nethelpers.ScopeGlobal, + Layer: network.ConfigDHCP, + } + + static := network.NewAddressSpec(network.ConfigNamespaceName, "configuration/eth0/10.0.0.35/32") + *static.Status() = network.AddressSpecSpec{ + Address: netaddr.MustParseIPPrefix("10.0.0.35/32"), + LinkName: "eth0", + Family: nethelpers.FamilyInet4, + Scope: nethelpers.ScopeGlobal, + Layer: network.ConfigMachineConfiguration, + } + + override := network.NewAddressSpec(network.ConfigNamespaceName, "configuration/eth0/10.0.0.1/8") + *override.Status() = network.AddressSpecSpec{ + Address: netaddr.MustParseIPPrefix("10.0.0.1/8"), + LinkName: "eth0", + Family: nethelpers.FamilyInet4, + Scope: nethelpers.ScopeHost, + Layer: network.ConfigMachineConfiguration, + } + + for _, res := range []resource.Resource{loopback, dhcp, static, override} { + suite.Require().NoError(suite.state.Create(suite.ctx, res), "%v", res.Spec()) + } + + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertAddresses([]string{ + "lo/127.0.0.1/8", + "eth0/10.0.0.1/8", + "eth0/10.0.0.35/32", + }, func(r *network.AddressSpec) error { + switch r.Metadata().ID() { + case "lo/127.0.0.1/8": + suite.Assert().Equal(*loopback.Status(), *r.Status()) + case "eth0/10.0.0.1/8": + suite.Assert().Equal(*override.Status(), *r.Status()) + case "eth0/10.0.0.35/32": + suite.Assert().Equal(*static.Status(), *r.Status()) + } + + return nil + }) + })) + + suite.Require().NoError(suite.state.Destroy(suite.ctx, static.Metadata())) + + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertAddresses([]string{ + "lo/127.0.0.1/8", + "eth0/10.0.0.35/32", + }, func(r *network.AddressSpec) error { + return nil + }) + })) + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertNoAddress("eth0/10.0.0.35/32") + })) +} + +func TestAddressMergeSuite(t *testing.T) { + suite.Run(t, new(AddressMergeSuite)) +} diff --git a/internal/app/machined/pkg/controllers/network/address_spec.go b/internal/app/machined/pkg/controllers/network/address_spec.go new file mode 100644 index 0000000000..0b2c8a2eed --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/address_spec.go @@ -0,0 +1,245 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network + +import ( + "context" + "fmt" + "log" + "net" + + "github.com/cosi-project/runtime/pkg/controller" + "github.com/cosi-project/runtime/pkg/resource" + "github.com/jsimonetti/rtnetlink" + "golang.org/x/sys/unix" + "inet.af/netaddr" + + "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network/watch" + "github.com/talos-systems/talos/pkg/resources/network" +) + +// AddressSpecController applies network.AddressSpec to the actual interfaces. +type AddressSpecController struct{} + +// Name implements controller.Controller interface. +func (ctrl *AddressSpecController) Name() string { + return "network.AddressSpecController" +} + +// Inputs implements controller.Controller interface. +func (ctrl *AddressSpecController) Inputs() []controller.Input { + return []controller.Input{ + { + Namespace: network.NamespaceName, + Type: network.AddressSpecType, + Kind: controller.InputStrong, + }, + } +} + +// Outputs implements controller.Controller interface. +func (ctrl *AddressSpecController) Outputs() []controller.Output { + return nil +} + +// Run implements controller.Controller interface. +// +//nolint: gocyclo +func (ctrl *AddressSpecController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error { + watchCh := make(chan struct{}) + + // watch link changes as some address might need to be re-applied if the link appears + watcher, err := watch.NewRtNetlink(ctx, watchCh, unix.RTMGRP_LINK) + if err != nil { + return err + } + + defer watcher.Done() + + conn, err := rtnetlink.Dial(nil) + if err != nil { + return fmt.Errorf("error dialing rtnetlink socket: %w", err) + } + + defer conn.Close() //nolint:errcheck + + for { + select { + case <-ctx.Done(): + return nil + case <-r.EventCh(): + case <-watchCh: + } + + // list source network configuration resources + list, err := r.List(ctx, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "", resource.VersionUndefined)) + if err != nil { + return fmt.Errorf("error listing source network addresses: %w", err) + } + + // add finalizers for all live resources + for _, res := range list.Items { + if res.Metadata().Phase() != resource.PhaseRunning { + continue + } + + if err = r.AddFinalizer(ctx, res.Metadata(), ctrl.Name()); err != nil { + return fmt.Errorf("error adding finalizer: %w", err) + } + } + + // list rtnetlink links (interfaces) + links, err := conn.Link.List() + if err != nil { + return fmt.Errorf("error listing links: %w", err) + } + + // list rtnetlink addresses + addrs, err := conn.Address.List() + if err != nil { + return fmt.Errorf("error listing addresses: %w", err) + } + + // loop over addresses and make reconcile decision + for _, res := range list.Items { + address := res.(*network.AddressSpec) //nolint:forcetypeassert,errcheck + + if err = ctrl.syncAddress(ctx, r, logger, conn, links, addrs, address); err != nil { + return err + } + } + } +} + +func resolveLinkName(links []rtnetlink.LinkMessage, linkName string) uint32 { + for _, link := range links { + if link.Attributes.Name == linkName { + return link.Index + } + } + + return 0 +} + +func findAddress(addrs []rtnetlink.AddressMessage, linkIndex uint32, ipPrefix netaddr.IPPrefix) *rtnetlink.AddressMessage { + for i, addr := range addrs { + if addr.Index != linkIndex { + continue + } + + if addr.PrefixLength != ipPrefix.Bits { + continue + } + + if !addr.Attributes.Address.Equal(ipPrefix.IP.IPAddr().IP) { + continue + } + + return &addrs[i] + } + + return nil +} + +//nolint:gocyclo +func (ctrl *AddressSpecController) syncAddress(ctx context.Context, r controller.Runtime, logger *log.Logger, conn *rtnetlink.Conn, + links []rtnetlink.LinkMessage, addrs []rtnetlink.AddressMessage, address *network.AddressSpec) error { + linkIndex := resolveLinkName(links, address.Status().LinkName) + + switch address.Metadata().Phase() { + case resource.PhaseTearingDown: + if linkIndex == 0 { + // address should be deleted, but link is gone, so assume address is gone + if err := r.RemoveFinalizer(ctx, address.Metadata(), ctrl.Name()); err != nil { + return fmt.Errorf("error removing finalizer: %w", err) + } + + return nil + } + + if existing := findAddress(addrs, linkIndex, address.Status().Address); existing != nil { + // delete address + if err := conn.Address.Delete(existing); err != nil { + return fmt.Errorf("error removing address: %w", err) + } + + logger.Printf("removed address %s from %q", address.Status().Address, address.Status().LinkName) + } + + // now remove finalizer as address was deleted + if err := r.RemoveFinalizer(ctx, address.Metadata(), ctrl.Name()); err != nil { + return fmt.Errorf("error removing finalizer: %w", err) + } + case resource.PhaseRunning: + if linkIndex == 0 { + // address can't be assigned as link doesn't exist (yet), skip it + return nil + } + + if existing := findAddress(addrs, linkIndex, address.Status().Address); existing != nil { + // check if existing matches the spec: if it does, skip update + if existing.Scope == uint8(address.Status().Scope) && existing.Flags == uint8(address.Status().Flags) && + existing.Attributes.Flags == uint32(address.Status().Flags) { + return nil + } + + // delete address to get new one assigned below + if err := conn.Address.Delete(existing); err != nil { + return fmt.Errorf("error removing address: %w", err) + } + + logger.Printf("removed address %s from %q", address.Status().Address, address.Status().LinkName) + } + + // add address + if err := conn.Address.New(&rtnetlink.AddressMessage{ + Family: uint8(address.Status().Family), + PrefixLength: address.Status().Address.Bits, + Flags: uint8(address.Status().Flags), + Scope: uint8(address.Status().Scope), + Index: linkIndex, + Attributes: rtnetlink.AddressAttributes{ + Address: address.Status().Address.IP.IPAddr().IP, + Local: address.Status().Address.IP.IPAddr().IP, + Broadcast: broadcastAddr(address.Status().Address), + Flags: uint32(address.Status().Flags), + }, + }); err != nil { + return fmt.Errorf("error adding address: %w", err) + } + + logger.Printf("assigned address %s to %q", address.Status().Address, address.Status().LinkName) + } + + return nil +} + +func broadcastAddr(addr netaddr.IPPrefix) net.IP { + if !addr.IP.Is4() { + return nil + } + + ipnet := addr.IPNet() + + ip := ipnet.IP.To4() + if ip == nil { + return nil + } + + mask := net.IP(ipnet.Mask).To4() + + n := len(ip) + if n != len(mask) { + return nil + } + + out := make(net.IP, n) + + for i := 0; i < n; i++ { + out[i] = ip[i] | ^mask[i] + } + + return out +} diff --git a/internal/app/machined/pkg/controllers/network/address_spec_test.go b/internal/app/machined/pkg/controllers/network/address_spec_test.go new file mode 100644 index 0000000000..ef4f898bd9 --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/address_spec_test.go @@ -0,0 +1,229 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +//nolint:dupl +package network_test + +import ( + "context" + "fmt" + "log" + "net" + "sync" + "testing" + "time" + + "github.com/cosi-project/runtime/pkg/controller/runtime" + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/state" + "github.com/cosi-project/runtime/pkg/state/impl/inmem" + "github.com/cosi-project/runtime/pkg/state/impl/namespaced" + "github.com/jsimonetti/rtnetlink" + "github.com/stretchr/testify/suite" + "github.com/talos-systems/go-retry/retry" + "golang.org/x/sys/unix" + "inet.af/netaddr" + + netctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network" + "github.com/talos-systems/talos/pkg/resources/network" + "github.com/talos-systems/talos/pkg/resources/network/nethelpers" +) + +type AddressSpecSuite struct { + suite.Suite + + state state.State + + runtime *runtime.Runtime + wg sync.WaitGroup + + ctx context.Context + ctxCancel context.CancelFunc +} + +func (suite *AddressSpecSuite) SetupTest() { + suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute) + + suite.state = state.WrapCore(namespaced.NewState(inmem.Build)) + + var err error + + logger := log.New(log.Writer(), "controller-runtime: ", log.Flags()) + + suite.runtime, err = runtime.NewRuntime(suite.state, logger) + suite.Require().NoError(err) + + suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressSpecController{})) + + suite.startRuntime() +} + +func (suite *AddressSpecSuite) startRuntime() { + suite.wg.Add(1) + + go func() { + defer suite.wg.Done() + + suite.Assert().NoError(suite.runtime.Run(suite.ctx)) + }() +} + +func (suite *AddressSpecSuite) assertLinkAddress(linkName, address string) error { + addr := netaddr.MustParseIPPrefix(address) + + iface, err := net.InterfaceByName(linkName) + suite.Require().NoError(err) + + conn, err := rtnetlink.Dial(nil) + suite.Require().NoError(err) + + defer conn.Close() //nolint: errcheck + + linkAddresses, err := conn.Address.List() + suite.Require().NoError(err) + + for _, linkAddress := range linkAddresses { + if linkAddress.Index != uint32(iface.Index) { + continue + } + + if linkAddress.PrefixLength != addr.Bits { + continue + } + + if !linkAddress.Attributes.Address.Equal(addr.IP.IPAddr().IP) { + continue + } + + return nil + } + + return retry.ExpectedError(fmt.Errorf("address %s not found on %q", addr, linkName)) +} + +func (suite *AddressSpecSuite) assertNoLinkAddress(linkName, address string) error { + addr := netaddr.MustParseIPPrefix(address) + + iface, err := net.InterfaceByName(linkName) + suite.Require().NoError(err) + + conn, err := rtnetlink.Dial(nil) + suite.Require().NoError(err) + + defer conn.Close() //nolint: errcheck + + linkAddresses, err := conn.Address.List() + suite.Require().NoError(err) + + for _, linkAddress := range linkAddresses { + if linkAddress.Index == uint32(iface.Index) && linkAddress.PrefixLength == addr.Bits && linkAddress.Attributes.Address.Equal(addr.IP.IPAddr().IP) { + return retry.ExpectedError(fmt.Errorf("address %s is assigned to %q", addr, linkName)) + } + } + + return nil +} + +func (suite *AddressSpecSuite) TestLoopback() { + loopback := network.NewAddressSpec(network.NamespaceName, "lo/127.0.0.1/8") + *loopback.Status() = network.AddressSpecSpec{ + Address: netaddr.MustParseIPPrefix("127.11.0.1/32"), + LinkName: "lo", + Family: nethelpers.FamilyInet4, + Scope: nethelpers.ScopeHost, + Layer: network.ConfigDefault, + Flags: nethelpers.AddressFlags(nethelpers.AddressPermanent), + } + + for _, res := range []resource.Resource{loopback} { + suite.Require().NoError(suite.state.Create(suite.ctx, res), "%v", res.Spec()) + } + + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertLinkAddress("lo", "127.11.0.1/32") + })) + + // teardown the address + for { + ready, err := suite.state.Teardown(suite.ctx, loopback.Metadata()) + suite.Require().NoError(err) + + if ready { + break + } + + time.Sleep(100 * time.Millisecond) + } + + // torn down address should be removed immediately + suite.Assert().NoError(suite.assertNoLinkAddress("lo", "127.11.0.1/32")) + + suite.Require().NoError(suite.state.Destroy(suite.ctx, loopback.Metadata())) +} + +func (suite *AddressSpecSuite) TestDummy() { + const dummyInterface = "dummy9" + + conn, err := rtnetlink.Dial(nil) + suite.Require().NoError(err) + + defer conn.Close() //nolint:errcheck + + dummy := network.NewAddressSpec(network.NamespaceName, "dummy/10.0.0.1/8") + *dummy.Status() = network.AddressSpecSpec{ + Address: netaddr.MustParseIPPrefix("10.0.0.1/8"), + LinkName: dummyInterface, + Family: nethelpers.FamilyInet4, + Scope: nethelpers.ScopeGlobal, + Layer: network.ConfigDefault, + Flags: nethelpers.AddressFlags(nethelpers.AddressPermanent), + } + + // it's fine to create the address before the interface is actually created + for _, res := range []resource.Resource{dummy} { + suite.Require().NoError(suite.state.Create(suite.ctx, res), "%v", res.Spec()) + } + + // create dummy interface + suite.Require().NoError(conn.Link.New(&rtnetlink.LinkMessage{ + Type: unix.ARPHRD_ETHER, + Attributes: &rtnetlink.LinkAttributes{ + Name: dummyInterface, + MTU: 1400, + Info: &rtnetlink.LinkInfo{ + Kind: "dummy", + }, + }, + })) + + iface, err := net.InterfaceByName(dummyInterface) + suite.Require().NoError(err) + + defer conn.Link.Delete(uint32(iface.Index)) //nolint: errcheck + + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + func() error { + return suite.assertLinkAddress(dummyInterface, "10.0.0.1/8") + })) + + // delete dummy interface, address should be unassigned automatically + suite.Require().NoError(conn.Link.Delete(uint32(iface.Index))) + + // teardown the address + for { + ready, err := suite.state.Teardown(suite.ctx, dummy.Metadata()) + suite.Require().NoError(err) + + if ready { + break + } + + time.Sleep(100 * time.Millisecond) + } +} + +func TestAddressSpecSuite(t *testing.T) { + suite.Run(t, new(AddressSpecSuite)) +} diff --git a/internal/app/machined/pkg/controllers/network/address_status.go b/internal/app/machined/pkg/controllers/network/address_status.go index 2ccaa7c8a5..b9e5c8d7e6 100644 --- a/internal/app/machined/pkg/controllers/network/address_status.go +++ b/internal/app/machined/pkg/controllers/network/address_status.go @@ -8,15 +8,14 @@ import ( "context" "fmt" "log" - "sync" "github.com/cosi-project/runtime/pkg/controller" "github.com/cosi-project/runtime/pkg/resource" "github.com/jsimonetti/rtnetlink" - "github.com/mdlayher/netlink" "golang.org/x/sys/unix" "inet.af/netaddr" + "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network/watch" "github.com/talos-systems/talos/pkg/resources/network" "github.com/talos-systems/talos/pkg/resources/network/nethelpers" ) @@ -48,40 +47,14 @@ func (ctrl *AddressStatusController) Outputs() []controller.Output { // //nolint:gocyclo func (ctrl *AddressStatusController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error { - watchConn, err := rtnetlink.Dial(&netlink.Config{ - Groups: unix.RTMGRP_LINK | unix.RTMGRP_IPV4_IFADDR | unix.RTMGRP_IPV6_IFADDR, - }) - if err != nil { - return fmt.Errorf("error dialing watch socket: %w", err) - } - - var wg sync.WaitGroup - - wg.Add(1) - watchCh := make(chan struct{}) - go func() { - defer wg.Done() - - for { - _, _, watchErr := watchConn.Receive() - if watchErr != nil { - return - } - - select { - case watchCh <- struct{}{}: - case <-ctx.Done(): - return - } - } - }() - - defer wg.Wait() + watcher, err := watch.NewRtNetlink(ctx, watchCh, unix.RTMGRP_LINK|unix.RTMGRP_IPV4_IFADDR|unix.RTMGRP_IPV6_IFADDR) + if err != nil { + return err + } - // close the watchConn first to abort the goroutine above on early exit - defer watchConn.Close() //nolint:errcheck + defer watcher.Done() conn, err := rtnetlink.Dial(nil) if err != nil { diff --git a/internal/app/machined/pkg/controllers/network/link_status.go b/internal/app/machined/pkg/controllers/network/link_status.go index 3ca7272a89..e958def8ec 100644 --- a/internal/app/machined/pkg/controllers/network/link_status.go +++ b/internal/app/machined/pkg/controllers/network/link_status.go @@ -10,16 +10,14 @@ import ( "fmt" "log" "os" - "sync" "github.com/cosi-project/runtime/pkg/controller" "github.com/cosi-project/runtime/pkg/resource" "github.com/jsimonetti/rtnetlink" "github.com/mdlayher/ethtool" - "github.com/mdlayher/genetlink" - "github.com/mdlayher/netlink" "golang.org/x/sys/unix" + "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network/watch" "github.com/talos-systems/talos/pkg/resources/network" "github.com/talos-systems/talos/pkg/resources/network/nethelpers" ) @@ -48,92 +46,25 @@ func (ctrl *LinkStatusController) Outputs() []controller.Output { } // Run implements controller.Controller interface. -// -//nolint:gocyclo,cyclop func (ctrl *LinkStatusController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error { // create watch connections to rtnetlink and ethtool via genetlink // these connections are used only to join multicast groups and receive notifications on changes // other connections are used to send requests and receive responses, as we can't mix the notifications and request/responses - watchRtnetlink, err := rtnetlink.Dial(&netlink.Config{ - Groups: unix.RTMGRP_LINK, - }) - if err != nil { - return fmt.Errorf("error dialing watch socket: %w", err) - } - - watchEthool, err := genetlink.Dial(nil) - if err != nil { - return fmt.Errorf("error dialing ethtool watch socket: %w", err) - } + watchCh := make(chan struct{}) - ethFamily, err := watchEthool.GetFamily(unix.ETHTOOL_GENL_NAME) + rtnetlinkWatcher, err := watch.NewRtNetlink(ctx, watchCh, unix.RTMGRP_LINK) if err != nil { - return fmt.Errorf("error getting family information for ethtool: %w", err) + return err } - var monitorID uint32 + defer rtnetlinkWatcher.Done() - for _, g := range ethFamily.Groups { - if g.Name == unix.ETHTOOL_MCGRP_MONITOR_NAME { - monitorID = g.ID - - break - } - } - - if monitorID == 0 { - return fmt.Errorf("could not find monitor multicast group ID for ethtool") - } - - if err = watchEthool.JoinGroup(monitorID); err != nil { - return fmt.Errorf("error joing multicast group for ethtool: %w", err) + ethtoolWatcher, err := watch.NewEthtool(ctx, watchCh) + if err != nil { + return err } - watchCh := make(chan struct{}) - - var wg sync.WaitGroup - - wg.Add(2) - - go func() { - defer wg.Done() - - for { - _, _, watchErr := watchRtnetlink.Receive() - if watchErr != nil { - return - } - - select { - case watchCh <- struct{}{}: - case <-ctx.Done(): - return - } - } - }() - - go func() { - defer wg.Done() - - for { - _, _, watchErr := watchEthool.Receive() - if watchErr != nil { - return - } - - select { - case watchCh <- struct{}{}: - case <-ctx.Done(): - return - } - } - }() - - defer wg.Wait() - - // close the watch connections first to abort the goroutines above on early exit - defer watchRtnetlink.Close() //nolint:errcheck - defer watchEthool.Close() //nolint:errcheck + defer ethtoolWatcher.Done() conn, err := rtnetlink.Dial(nil) if err != nil { diff --git a/internal/app/machined/pkg/controllers/network/watch/ethtool.go b/internal/app/machined/pkg/controllers/network/watch/ethtool.go new file mode 100644 index 0000000000..d74e77c4f6 --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/watch/ethtool.go @@ -0,0 +1,83 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package watch + +import ( + "context" + "fmt" + "sync" + + "github.com/mdlayher/genetlink" + "golang.org/x/sys/unix" +) + +type ethtoolWatcher struct { + wg sync.WaitGroup + conn *genetlink.Conn +} + +// NewEthtool starts ethtool watch. +// +//nolint: gocyclo +func NewEthtool(ctx context.Context, watchCh chan<- struct{}) (Watcher, error) { + watcher := ðtoolWatcher{} + + var err error + + watcher.conn, err = genetlink.Dial(nil) + if err != nil { + return nil, fmt.Errorf("error dialing ethtool watch socket: %w", err) + } + + ethFamily, err := watcher.conn.GetFamily(unix.ETHTOOL_GENL_NAME) + if err != nil { + return nil, fmt.Errorf("error getting family information for ethtool: %w", err) + } + + var monitorID uint32 + + for _, g := range ethFamily.Groups { + if g.Name == unix.ETHTOOL_MCGRP_MONITOR_NAME { + monitorID = g.ID + + break + } + } + + if monitorID == 0 { + return nil, fmt.Errorf("could not find monitor multicast group ID for ethtool") + } + + if err = watcher.conn.JoinGroup(monitorID); err != nil { + return nil, fmt.Errorf("error joing multicast group for ethtool: %w", err) + } + + watcher.wg.Add(1) + + go func() { + defer watcher.wg.Done() + + for { + _, _, watchErr := watcher.conn.Receive() + if watchErr != nil { + return + } + + select { + case watchCh <- struct{}{}: + case <-ctx.Done(): + return + } + } + }() + + return watcher, nil +} + +func (watcher *ethtoolWatcher) Done() { + watcher.conn.Close() //nolint: errcheck + + watcher.wg.Wait() +} diff --git a/internal/app/machined/pkg/controllers/network/watch/rtnetlink.go b/internal/app/machined/pkg/controllers/network/watch/rtnetlink.go new file mode 100644 index 0000000000..ededb7558f --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/watch/rtnetlink.go @@ -0,0 +1,60 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package watch + +import ( + "context" + "fmt" + "sync" + + "github.com/jsimonetti/rtnetlink" + "github.com/mdlayher/netlink" +) + +type rtnetlinkWatcher struct { + wg sync.WaitGroup + conn *rtnetlink.Conn +} + +// NewRtNetlink starts rtnetlink watch over specified groups. +func NewRtNetlink(ctx context.Context, watchCh chan<- struct{}, groups uint32) (Watcher, error) { + watcher := &rtnetlinkWatcher{} + + var err error + + watcher.conn, err = rtnetlink.Dial(&netlink.Config{ + Groups: groups, + }) + if err != nil { + return nil, fmt.Errorf("error dialing watch socket: %w", err) + } + + watcher.wg.Add(1) + + go func() { + defer watcher.wg.Done() + + for { + _, _, watchErr := watcher.conn.Receive() + if watchErr != nil { + return + } + + select { + case watchCh <- struct{}{}: + case <-ctx.Done(): + return + } + } + }() + + return watcher, nil +} + +func (watcher *rtnetlinkWatcher) Done() { + watcher.conn.Close() //nolint: errcheck + + watcher.wg.Wait() +} diff --git a/internal/app/machined/pkg/controllers/network/watch/watch.go b/internal/app/machined/pkg/controllers/network/watch/watch.go new file mode 100644 index 0000000000..24296670b3 --- /dev/null +++ b/internal/app/machined/pkg/controllers/network/watch/watch.go @@ -0,0 +1,11 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package watch provides netlink watchers via multicast groups. +package watch + +// Watcher interface allows to stop watching. +type Watcher interface { + Done() +} diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_runtime.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_runtime.go index 51ec578f61..a2e20b4613 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_runtime.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_runtime.go @@ -69,6 +69,8 @@ func (r *Runtime) SetConfig(b []byte) error { } // CanApplyImmediate implements the Runtime interface. +// +//nolint:gocyclo func (r *Runtime) CanApplyImmediate(b []byte) error { cfg, err := r.ValidateConfig(b) if err != nil { @@ -100,12 +102,17 @@ func (r *Runtime) CanApplyImmediate(b []byte) error { // the config changes allowed to be applied immediately are: // * cluster config // * .machine.time + // * .machine.network newConfig.ClusterConfig = currentConfig.ClusterConfig if newConfig.MachineConfig != nil && currentConfig.MachineConfig != nil { newConfig.MachineConfig.MachineTime = currentConfig.MachineConfig.MachineTime } + if newConfig.MachineConfig != nil && currentConfig.MachineConfig != nil { + newConfig.MachineConfig.MachineNetwork = currentConfig.MachineConfig.MachineNetwork + } + if !reflect.DeepEqual(currentConfig, newConfig) { diff := cmp.Diff(currentConfig, newConfig) diff --git a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go index 675258eac2..c30f3416c5 100644 --- a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go +++ b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go @@ -10,6 +10,7 @@ import ( "github.com/cosi-project/runtime/pkg/controller" osruntime "github.com/cosi-project/runtime/pkg/controller/runtime" + "github.com/talos-systems/go-procfs/procfs" "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/config" "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/k8s" @@ -64,6 +65,11 @@ func (ctrl *Controller) Run(ctx context.Context) error { &k8s.ManifestController{}, &k8s.ManifestApplyController{}, &k8s.RenderSecretsStaticPodController{}, + &network.AddressConfigController{ + Cmdline: procfs.ProcCmdline(), + }, + &network.AddressMergeController{}, + &network.AddressSpecController{}, &network.AddressStatusController{}, &network.LinkStatusController{}, &secrets.EtcdController{}, diff --git a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go index f64f71779a..06eee64075 100644 --- a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go +++ b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go @@ -49,24 +49,20 @@ func NewState() (*State, error) { } // register Talos namespaces - if err := s.namespaceRegistry.Register(ctx, v1alpha1.NamespaceName, "Talos v1alpha1 subsystems glue resources."); err != nil { - return nil, err - } - - if err := s.namespaceRegistry.Register(ctx, config.NamespaceName, "Talos node configuration."); err != nil { - return nil, err - } - - if err := s.namespaceRegistry.Register(ctx, k8s.ControlPlaneNamespaceName, "Kubernetes control plane resources."); err != nil { - return nil, err - } - - if err := s.namespaceRegistry.Register(ctx, secrets.NamespaceName, "Resources with secret material."); err != nil { - return nil, err - } - - if err := s.namespaceRegistry.Register(ctx, network.NamespaceName, "Networking resources."); err != nil { - return nil, err + for _, ns := range []struct { + name string + description string + }{ + {v1alpha1.NamespaceName, "Talos v1alpha1 subsystems glue resources."}, + {config.NamespaceName, "Talos node configuration."}, + {k8s.ControlPlaneNamespaceName, "Kubernetes control plane resources."}, + {secrets.NamespaceName, "Resources with secret material."}, + {network.NamespaceName, "Networking resources."}, + {network.ConfigNamespaceName, "Networking configuration resources."}, + } { + if err := s.namespaceRegistry.Register(ctx, ns.name, ns.description); err != nil { + return nil, err + } } // register Talos resources @@ -82,6 +78,7 @@ func NewState() (*State, error) { &k8s.StaticPodStatus{}, &k8s.SecretsStatus{}, &network.AddressStatus{}, + &network.AddressSpec{}, &network.LinkStatus{}, &secrets.Etcd{}, &secrets.Kubernetes{}, diff --git a/pkg/resources/network/address_spec.go b/pkg/resources/network/address_spec.go new file mode 100644 index 0000000000..6761d64652 --- /dev/null +++ b/pkg/resources/network/address_spec.go @@ -0,0 +1,83 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network + +import ( + "fmt" + + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/resource/meta" + "inet.af/netaddr" + + "github.com/talos-systems/talos/pkg/resources/network/nethelpers" +) + +// AddressSpecType is type of AddressSpec resource. +const AddressSpecType = resource.Type("AddressSpecs.net.talos.dev") + +// AddressSpec resource holds physical network link status. +type AddressSpec struct { + md resource.Metadata + spec AddressSpecSpec +} + +// AddressSpecSpec describes status of rendered secrets. +type AddressSpecSpec struct { + Address netaddr.IPPrefix `yaml:"address"` + LinkName string `yaml:"linkName"` + Family nethelpers.Family `yaml:"family"` + Scope nethelpers.Scope `yaml:"scope"` + Flags nethelpers.AddressFlags `yaml:"flags"` + Layer ConfigLayer `yaml:"layer"` +} + +// NewAddressSpec initializes a SecretsStatus resource. +func NewAddressSpec(namespace resource.Namespace, id resource.ID) *AddressSpec { + r := &AddressSpec{ + md: resource.NewMetadata(namespace, AddressSpecType, id, resource.VersionUndefined), + spec: AddressSpecSpec{}, + } + + r.md.BumpVersion() + + return r +} + +// Metadata implements resource.Resource. +func (r *AddressSpec) Metadata() *resource.Metadata { + return &r.md +} + +// Spec implements resource.Resource. +func (r *AddressSpec) Spec() interface{} { + return r.spec +} + +func (r *AddressSpec) String() string { + return fmt.Sprintf("network.AddressSpec(%q)", r.md.ID()) +} + +// DeepCopy implements resource.Resource. +func (r *AddressSpec) DeepCopy() resource.Resource { + return &AddressSpec{ + md: r.md, + spec: r.spec, + } +} + +// ResourceDefinition implements meta.ResourceDefinitionProvider interface. +func (r *AddressSpec) ResourceDefinition() meta.ResourceDefinitionSpec { + return meta.ResourceDefinitionSpec{ + Type: AddressSpecType, + Aliases: []resource.Type{}, + DefaultNamespace: NamespaceName, + PrintColumns: []meta.PrintColumn{}, + } +} + +// Status sets pod status. +func (r *AddressSpec) Status() *AddressSpecSpec { + return &r.spec +} diff --git a/pkg/resources/network/configlayer.go b/pkg/resources/network/configlayer.go new file mode 100644 index 0000000000..d9e3455348 --- /dev/null +++ b/pkg/resources/network/configlayer.go @@ -0,0 +1,24 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network + +//go:generate stringer -type=ConfigLayer -linecomment + +// ConfigLayer describes network configuration layers, with lowest priority first. +type ConfigLayer int + +// Configuration layers. +const ( + ConfigDefault ConfigLayer = iota // default + ConfigCmdline // cmdline + ConfigDHCP // dhcp + ConfigPlatform // platform + ConfigMachineConfiguration // configuration +) + +// MarshalYAML implements yaml.Marshaler. +func (layer ConfigLayer) MarshalYAML() (interface{}, error) { + return layer.String(), nil +} diff --git a/pkg/resources/network/configlayer_string.go b/pkg/resources/network/configlayer_string.go new file mode 100644 index 0000000000..9c5b6ed225 --- /dev/null +++ b/pkg/resources/network/configlayer_string.go @@ -0,0 +1,27 @@ +// Code generated by "stringer -type=ConfigLayer -linecomment"; DO NOT EDIT. + +package network + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[ConfigDefault-0] + _ = x[ConfigCmdline-1] + _ = x[ConfigDHCP-2] + _ = x[ConfigPlatform-3] + _ = x[ConfigMachineConfiguration-4] +} + +const _ConfigLayer_name = "defaultcmdlinedhcpplatformconfiguration" + +var _ConfigLayer_index = [...]uint8{0, 7, 14, 18, 26, 39} + +func (i ConfigLayer) String() string { + if i < 0 || i >= ConfigLayer(len(_ConfigLayer_index)-1) { + return "ConfigLayer(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _ConfigLayer_name[_ConfigLayer_index[i]:_ConfigLayer_index[i+1]] +} diff --git a/pkg/resources/network/network.go b/pkg/resources/network/network.go index 678ed2f367..b3efc15155 100644 --- a/pkg/resources/network/network.go +++ b/pkg/resources/network/network.go @@ -15,7 +15,17 @@ import ( // NamespaceName contains resources related to networking. const NamespaceName resource.Namespace = "network" +// ConfigNamespaceName contains umerged resources related to networking generate from the configuration. +// +// Resources in the ConfigNamespaceName namespace are merged to produce final versions in the NamespaceName namespace. +const ConfigNamespaceName resource.Namespace = "network-config" + // AddressID builds ID (primary key) for the address. func AddressID(linkName string, addr netaddr.IPPrefix) string { return fmt.Sprintf("%s/%s", linkName, addr) } + +// LayeredID builds configuration for the entity at some layer. +func LayeredID(layer ConfigLayer, id string) string { + return fmt.Sprintf("%s/%s", layer, id) +} diff --git a/pkg/resources/network/network_test.go b/pkg/resources/network/network_test.go index 0b0e054392..02b8e965b5 100644 --- a/pkg/resources/network/network_test.go +++ b/pkg/resources/network/network_test.go @@ -26,6 +26,7 @@ func TestRegisterResource(t *testing.T) { for _, resource := range []resource.Resource{ &network.AddressStatus{}, + &network.AddressSpec{}, &network.LinkStatus{}, } { assert.NoError(t, resourceRegistry.Register(ctx, resource))