Skip to content

Commit

Permalink
feat: use discovery information for etcd join (and other etcd calls)
Browse files Browse the repository at this point in the history
Talos historically relied on the Kubernetes `Endpoints` resource (which
lists the `kube-apiserver` endpoints) to find the other control plane
members of the cluster and connect to their `etcd` instances (for example,
when the node-local etcd instance is not up). This method works well,
but it relies on the Kubernetes endpoint being up. If the Kubernetes API is
down for whatever reason, or if the load balancer malfunctions, the endpoints
are not available and join/leave operations don't work.

This PR changes the endpoints lookup to use the `Endpoints` COSI
resource, which is filled in using two methods:

* from the discovery data (if discovery is enabled, which is the default)
* from the Kubernetes `Endpoints` resource

If discovery is disabled (or not available), this change does almost
nothing: Kubernetes is still used to discover the control plane endpoints,
but as the data persists in memory, a cached copy is used to connect to
the endpoints even if the Kubernetes control plane endpoint goes down.

If discovery is enabled, Talos can join the etcd cluster immediately
on boot without waiting for Kubernetes to be up on the bootstrap node,
which means that the initial Talos cluster bootstrap runs in parallel on
all control plane nodes, while previously nodes had to wait for the first
node to get far enough through bootstrap to fill in the endpoints data.

As `etcd` communication is protected with mutual TLS anyway,
there's no risk even if the discovery data is stale or poisoned, as etcd
operations would fail on a TLS mismatch.

Most of the changes in this PR actually enable populating the Talos
`Endpoints` resource from the Kubernetes `Endpoints` resource
using the watch API.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Apr 21, 2022
1 parent 2b03057 commit b085343
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (s *Server) Upgrade(ctx context.Context, in *machine.UpgradeRequest) (reply
}

if s.Controller.Runtime().Config().Machine().Type() != machinetype.TypeWorker && !in.GetForce() {
client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().Config().Cluster().CA(), s.Controller.Runtime().Config().Cluster().Endpoint())
client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().State().V1Alpha2().Resources())
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
Expand Down Expand Up @@ -1782,7 +1782,7 @@ func (s *Server) EtcdMemberList(ctx context.Context, in *machine.EtcdMemberListR
if in.QueryLocal {
client, err = etcd.NewLocalClient()
} else {
client, err = etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().Config().Cluster().CA(), s.Controller.Runtime().Config().Cluster().Endpoint())
client, err = etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().State().V1Alpha2().Resources())
}

if err != nil {
Expand Down Expand Up @@ -1834,7 +1834,7 @@ func (s *Server) EtcdRemoveMember(ctx context.Context, in *machine.EtcdRemoveMem
return nil, err
}

client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().Config().Cluster().CA(), s.Controller.Runtime().Config().Cluster().Endpoint())
client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().State().V1Alpha2().Resources())
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
Expand Down Expand Up @@ -1863,7 +1863,7 @@ func (s *Server) EtcdLeaveCluster(ctx context.Context, in *machine.EtcdLeaveClus
return nil, err
}

client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().Config().Cluster().CA(), s.Controller.Runtime().Config().Cluster().Endpoint())
client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().State().V1Alpha2().Resources())
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
Expand Down Expand Up @@ -1892,7 +1892,7 @@ func (s *Server) EtcdForfeitLeadership(ctx context.Context, in *machine.EtcdForf
return nil, err
}

client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().Config().Cluster().CA(), s.Controller.Runtime().Config().Cluster().Endpoint())
client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().State().V1Alpha2().Resources())
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
Expand Down
218 changes: 176 additions & 42 deletions internal/app/machined/pkg/controllers/k8s/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@ import (
"inet.af/netaddr"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

"github.com/talos-systems/talos/pkg/conditions"
"github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/resources/config"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
"github.com/talos-systems/talos/pkg/machinery/resources/secrets"
)

// EndpointController looks up control plane endpoints.
Expand All @@ -38,14 +45,7 @@ func (ctrl *EndpointController) Name() string {

// Inputs implements controller.Controller interface.
func (ctrl *EndpointController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: config.MachineTypeType,
ID: pointer.ToString(config.MachineTypeID),
Kind: controller.InputWeak,
},
}
return nil
}

// Outputs implements controller.Controller interface.
Expand All @@ -59,7 +59,20 @@ func (ctrl *EndpointController) Outputs() []controller.Output {
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
if err := r.UpdateInputs([]controller.Input{
{
Namespace: config.NamespaceName,
Type: config.MachineTypeType,
ID: pointer.ToString(config.MachineTypeID),
Kind: controller.InputWeak,
},
}); err != nil {
return err
}

for {
select {
case <-ctx.Done():
Expand All @@ -78,25 +91,27 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l

machineType := machineTypeRes.(*config.MachineType).MachineType()

if machineType != machine.TypeWorker {
// TODO: implemented only for machine.TypeWorker for now, should be extended to support control plane machines (for etcd join).
continue
}

logger.Debug("waiting for kubelet client config", zap.String("file", constants.KubeletKubeconfig))

if err = conditions.WaitForKubeconfigReady(constants.KubeletKubeconfig).Wait(ctx); err != nil {
return err
}

if err = ctrl.watchEndpoints(ctx, r, logger); err != nil {
return err
switch machineType { //nolint:exhaustive
case machine.TypeWorker:
if err = ctrl.watchEndpointsOnWorker(ctx, r, logger); err != nil {
return err
}
case machine.TypeControlPlane, machine.TypeInit:
if err = ctrl.watchEndpointsOnControlPlane(ctx, r, logger); err != nil {
return err
}
}
}
}

//nolint:gocyclo
func (ctrl *EndpointController) watchEndpoints(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
func (ctrl *EndpointController) watchEndpointsOnWorker(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
logger.Debug("waiting for kubelet client config", zap.String("file", constants.KubeletKubeconfig))

if err := conditions.WaitForKubeconfigReady(constants.KubeletKubeconfig).Wait(ctx); err != nil {
return err
}

client, err := kubernetes.NewClientFromKubeletKubeconfig()
if err != nil {
return fmt.Errorf("error building Kubernetes client: %w", err)
Expand All @@ -114,38 +129,157 @@ func (ctrl *EndpointController) watchEndpoints(ctx context.Context, r controller
return fmt.Errorf("error getting endpoints: %w", err)
}

addrs := []netaddr.IP{}
if err = ctrl.updateEndpointsResource(ctx, r, logger, endpoints); err != nil {
return err
}

for _, endpoint := range endpoints.Subsets {
for _, addr := range endpoint.Addresses {
ip, err := netaddr.ParseIP(addr.IP)
if err == nil {
addrs = append(addrs, ip)
}
}
select {
case <-ctx.Done():
return nil
case <-ticker.C:
case <-r.EventCh():
}
}
}

sort.Slice(addrs, func(i, j int) bool { return addrs[i].Compare(addrs[j]) < 0 })
//nolint:gocyclo
func (ctrl *EndpointController) watchEndpointsOnControlPlane(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
if err := r.UpdateInputs([]controller.Input{
{
Namespace: config.NamespaceName,
Type: config.MachineTypeType,
ID: pointer.ToString(config.MachineTypeID),
Kind: controller.InputWeak,
},
{
Namespace: secrets.NamespaceName,
Type: secrets.KubernetesType,
ID: pointer.ToString(secrets.KubernetesID),
Kind: controller.InputWeak,
},
}); err != nil {
return err
}

if err := r.Modify(ctx,
k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneAPIServerEndpointsID),
func(r resource.Resource) error {
if !reflect.DeepEqual(r.(*k8s.Endpoint).TypedSpec().Addresses, addrs) {
logger.Debug("updated controlplane endpoints", zap.Any("endpoints", addrs))
}
r.QueueReconcile()

r.(*k8s.Endpoint).TypedSpec().Addresses = addrs
for {
select {
case <-r.EventCh():
case <-ctx.Done():
return nil
}

secretsResources, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesType, secrets.KubernetesID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
return nil
},
); err != nil {
return fmt.Errorf("error updating endpoints: %w", err)
}

return err
}

secrets := secretsResources.(*secrets.Kubernetes).Certs()

kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
// using here kubeconfig with cluster control plane endpoint, as endpoint discovery should work before local API server is ready
return clientcmd.Load([]byte(secrets.AdminKubeconfig))
})
if err != nil {
return fmt.Errorf("error loading kubeconfig: %w", err)
}

if err = ctrl.watchKubernetesEndpoint(ctx, r, logger, kubeconfig); err != nil {
return err
}
}
}

// updateEndpointsResource stores the addresses listed in the given Kubernetes
// Endpoints object into the control plane API server Endpoint resource.
//
// Every address of every subset is parsed as an IP; entries which fail to parse
// are skipped. The resulting list is sorted before it is written, and a debug
// message is logged when it differs from the addresses currently stored.
//
// It returns a wrapped error if the resource modification fails.
func (ctrl *EndpointController) updateEndpointsResource(ctx context.Context, r controller.Runtime, logger *zap.Logger, endpoints *corev1.Endpoints) error {
	// deliberately a non-nil empty slice: reflect.DeepEqual below treats nil and empty differently
	ips := []netaddr.IP{}

	for i := range endpoints.Subsets {
		for j := range endpoints.Subsets[i].Addresses {
			parsed, parseErr := netaddr.ParseIP(endpoints.Subsets[i].Addresses[j].IP)
			if parseErr != nil {
				// best-effort: ignore anything which is not a valid IP
				continue
			}

			ips = append(ips, parsed)
		}
	}

	sort.Slice(ips, func(a, b int) bool { return ips[a].Compare(ips[b]) < 0 })

	apply := func(res resource.Resource) error {
		spec := res.(*k8s.Endpoint).TypedSpec()

		if !reflect.DeepEqual(spec.Addresses, ips) {
			logger.Debug("updated controlplane endpoints", zap.Any("endpoints", ips))
		}

		spec.Addresses = ips

		return nil
	}

	target := k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneAPIServerEndpointsID)

	if err := r.Modify(ctx, target, apply); err != nil {
		return fmt.Errorf("error updating endpoints: %w", err)
	}

	return nil
}

func (ctrl *EndpointController) watchKubernetesEndpoint(ctx context.Context, r controller.Runtime, logger *zap.Logger, kubeconfig *rest.Config) error {
client, err := kubernetes.NewForConfig(kubeconfig)
if err != nil {
return fmt.Errorf("error building Kubernetes client: %w", err)
}

defer client.Close() //nolint:errcheck

// abort the watch on any return from this function
ctx, cancel := context.WithCancel(ctx)
defer cancel()

notifyCh := kubernetesEndpointWatcher(ctx, logger, client)

for {
select {
case endpoints := <-notifyCh:
if err = ctrl.updateEndpointsResource(ctx, r, logger, endpoints); err != nil {
return err
}
case <-ctx.Done():
return nil
case <-ticker.C:
case <-r.EventCh():
// something got updated, probably kubeconfig, restart the watch
r.QueueReconcile()

return nil
}
}
}

// kubernetesEndpointWatcher starts an informer for the "kubernetes" Endpoints
// object in the default namespace and returns a channel on which every observed
// state of that object is delivered.
//
// Add and update events deliver the current object; a delete event delivers an
// empty Endpoints value. Watch errors are logged via logger. The informer is
// stopped when ctx is canceled.
//
// The returned channel is never closed, so consumers should select on
// ctx.Done() as well as on the channel.
func kubernetesEndpointWatcher(ctx context.Context, logger *zap.Logger, client *kubernetes.Client) chan *corev1.Endpoints {
	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		client.Clientset, 30*time.Second,
		informers.WithNamespace(corev1.NamespaceDefault),
		informers.WithTweakListOptions(func(options *v1.ListOptions) {
			// only the single "kubernetes" object is of interest
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", "kubernetes").String()
		}),
	)

	notifyCh := make(chan *corev1.Endpoints, 1)

	// notify hands an update over to the consumer, but gives up once the watch
	// is canceled: an unconditional send would block the informer's handler
	// goroutine forever after the consumer has stopped reading from the channel.
	notify := func(endpoints *corev1.Endpoints) {
		select {
		case notifyCh <- endpoints:
		case <-ctx.Done():
		}
	}

	informer := informerFactory.Core().V1().Endpoints().Informer()
	informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { //nolint:errcheck
		logger.Error("kubernetes endpoint watch error", zap.Error(err))
	})
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { notify(obj.(*corev1.Endpoints)) },
		DeleteFunc: func(_ interface{}) { notify(&corev1.Endpoints{}) },
		UpdateFunc: func(_, obj interface{}) { notify(obj.(*corev1.Endpoints)) },
	})

	informerFactory.Start(ctx.Done())

	return notifyCh
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti
)

kubeconfig, err = clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return clientcmd.Load([]byte(secrets.AdminKubeconfig))
return clientcmd.Load([]byte(secrets.LocalhostAdminKubeconfig))
})
if err != nil {
return fmt.Errorf("error loading kubeconfig: %w", err)
Expand Down
25 changes: 20 additions & 5 deletions internal/app/machined/pkg/controllers/secrets/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,28 @@ func (ctrl *KubernetesController) updateSecrets(k8sRoot *secrets.KubernetesRootS

buf.Reset()

if err = kubeconfig.GenerateAdmin(&generateAdminAdapter{k8sRoot: k8sRoot}, &buf); err != nil {
if err = kubeconfig.GenerateAdmin(&generateAdminAdapter{
k8sRoot: k8sRoot,
endpoint: k8sRoot.Endpoint,
}, &buf); err != nil {
return fmt.Errorf("failed to generate admin kubeconfig: %w", err)
}

k8sSecrets.AdminKubeconfig = buf.String()

buf.Reset()

localhost, _ := url.Parse("https://localhost:6443/") //nolint:errcheck

if err = kubeconfig.GenerateAdmin(&generateAdminAdapter{
k8sRoot: k8sRoot,
endpoint: localhost,
}, &buf); err != nil {
return fmt.Errorf("failed to generate admin kubeconfig: %w", err)
}

k8sSecrets.LocalhostAdminKubeconfig = buf.String()

return nil
}

Expand All @@ -305,17 +321,16 @@ func (ctrl *KubernetesController) teardownAll(ctx context.Context, r controller.

// generateAdminAdapter allows to translate input config into GenerateAdmin input.
type generateAdminAdapter struct {
k8sRoot *secrets.KubernetesRootSpec
k8sRoot *secrets.KubernetesRootSpec
endpoint *url.URL
}

// Name returns the Name field of the wrapped Kubernetes root spec.
func (adapter *generateAdminAdapter) Name() string {
	root := adapter.k8sRoot

	return root.Name
}

func (adapter *generateAdminAdapter) Endpoint() *url.URL {
u, _ := url.Parse("https://localhost:6443/") //nolint:errcheck

return u
return adapter.endpoint
}

func (adapter *generateAdminAdapter) CA() *x509.PEMEncodedCertificateAndKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (suite *KubernetesSuite) TestReconcile() {
for _, kubeconfig := range []string{
kubernetesCerts.ControllerManagerKubeconfig,
kubernetesCerts.SchedulerKubeconfig,
kubernetesCerts.LocalhostAdminKubeconfig,
kubernetesCerts.AdminKubeconfig,
} {
config, err := clientcmd.Load([]byte(kubeconfig))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ func UncordonNode(seq runtime.Sequence, data interface{}) (runtime.TaskExecution
// LeaveEtcd represents the task for removing a control plane node from etcd.
func LeaveEtcd(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
client, err := etcd.NewClientFromControlPlaneIPs(ctx, r.Config().Cluster().CA(), r.Config().Cluster().Endpoint())
client, err := etcd.NewClientFromControlPlaneIPs(ctx, r.State().V1Alpha2().Resources())
if err != nil {
return fmt.Errorf("failed to create etcd client: %w", err)
}
Expand Down
Loading

0 comments on commit b085343

Please sign in to comment.