Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup microcluster/k8s when scaling down cluster nodes #15

Merged
merged 10 commits into from
Jun 28, 2024
18 changes: 4 additions & 14 deletions bootstrap/controllers/ck8sconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,12 @@ func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scop
// injects into config.Version values from top level object
r.reconcileTopLevelObjectSettings(scope.Cluster, machine, scope.Config)

authToken, err := token.Lookup(ctx, r.Client, client.ObjectKeyFromObject(scope.Cluster))
if err != nil {
conditions.MarkFalse(scope.Config, bootstrapv1.DataSecretAvailableCondition, bootstrapv1.DataSecretGenerationFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
return err
}

if authToken == nil {
return fmt.Errorf("auth token not yet generated")
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(scope.Cluster))
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(scope.Cluster), scope.Config.Spec.ControlPlaneConfig.MicroclusterPort)
if err != nil {
return fmt.Errorf("failed to create remote cluster client: %w", err)
}

joinToken, err := workloadCluster.NewControlPlaneJoinToken(ctx, *authToken, scope.Config.Spec.ControlPlaneConfig.MicroclusterPort, scope.Config.Name)
joinToken, err := workloadCluster.NewControlPlaneJoinToken(ctx, scope.Config.Name)
if err != nil {
return fmt.Errorf("failed to request join token: %w", err)
}
Expand Down Expand Up @@ -303,12 +293,12 @@ func (r *CK8sConfigReconciler) joinWorker(ctx context.Context, scope *Scope) err
return fmt.Errorf("auth token not yet generated")
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(scope.Cluster))
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(scope.Cluster), scope.Config.Spec.ControlPlaneConfig.MicroclusterPort)
if err != nil {
return fmt.Errorf("failed to create remote cluster client: %w", err)
}

joinToken, err := workloadCluster.NewWorkerJoinToken(ctx, *authToken, scope.Config.Spec.ControlPlaneConfig.MicroclusterPort, scope.Config.Name)
joinToken, err := workloadCluster.NewWorkerJoinToken(ctx)
if err != nil {
return fmt.Errorf("failed to request join token: %w", err)
}
Expand Down
8 changes: 5 additions & 3 deletions controlplane/controllers/ck8scontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (r *CK8sControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr c

if r.managementClusterUncached == nil {
r.managementClusterUncached = &ck8s.Management{
Client: mgr.GetAPIReader(),
Client: mgr.GetClient(),
K8sdDialTimeout: r.K8sdDialTimeout,
}
}
Expand Down Expand Up @@ -378,7 +378,8 @@ func (r *CK8sControlPlaneReconciler) updateStatus(ctx context.Context, kcp *cont
}
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
microclusterPort := kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.MicroclusterPort
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster), microclusterPort)
if err != nil {
return fmt.Errorf("failed to create remote cluster client: %w", err)
}
Expand Down Expand Up @@ -665,7 +666,8 @@ func (r *CK8sControlPlaneReconciler) reconcileControlPlaneConditions(ctx context
return nil
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster))
microclusterPort := controlPlane.KCP.Spec.CK8sConfigSpec.ControlPlaneConfig.MicroclusterPort
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster), microclusterPort)
if err != nil {
return fmt.Errorf("cannot get remote client to workload cluster: %w", err)
}
Expand Down
14 changes: 6 additions & 8 deletions controlplane/controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/canonical/cluster-api-k8s/pkg/ck8s"
"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -24,10 +25,7 @@ type MachineReconciler struct {

K8sdDialTimeout time.Duration

// NOTE(neoaggelos): See note below
/**
managementCluster ck8s.ManagementCluster
**/
}

func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger) error {
Expand All @@ -36,15 +34,16 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
Build(r)

// NOTE(neoaggelos): See note below
/**
if r.managementCluster == nil {
r.managementCluster = &ck8s.Management{
Client: r.Client,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
K8sdDialTimeout: r.K8sdDialTimeout,
/*
neoaggelos marked this conversation as resolved.
Show resolved Hide resolved
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
*/
}
}
**/

return err
}
Expand Down Expand Up @@ -101,7 +100,6 @@ func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
logger.Info("unable to get cluster.")
return ctrl.Result{}, errors.Wrapf(err, "unable to get cluster")
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to create client to workload cluster")
Expand Down
13 changes: 13 additions & 0 deletions controlplane/controllers/remediation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
Expand Down Expand Up @@ -236,6 +237,18 @@ func (r *CK8sControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Cont
**/
}

microclusterPort := controlPlane.KCP.Spec.CK8sConfigSpec.ControlPlaneConfig.MicroclusterPort
clusterObjectKey := util.ObjectKey(controlPlane.Cluster)
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, clusterObjectKey, microclusterPort)
if err != nil {
log.Error(err, "failed to create client to workload cluster")
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
}

if err := workloadCluster.RemoveMachineFromCluster(ctx, machineToBeRemediated); err != nil {
log.Error(err, "failed to remove machine from microcluster")
}

// Delete the machine
if err := r.Client.Delete(ctx, machineToBeRemediated); err != nil {
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error())
Expand Down
12 changes: 12 additions & 0 deletions controlplane/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ func (r *CK8sControlPlaneReconciler) scaleDownControlPlane(
}
**/

microclusterPort := controlPlane.KCP.Spec.CK8sConfigSpec.ControlPlaneConfig.MicroclusterPort
clusterObjectKey := util.ObjectKey(cluster)
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, clusterObjectKey, microclusterPort)
if err != nil {
logger.Error(err, "failed to create client to workload cluster")
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
}

if err := workloadCluster.RemoveMachineFromCluster(ctx, machineToDelete); err != nil {
logger.Error(err, "failed to remove machine from microcluster")
}

logger = logger.WithValues("machine", machineToDelete)
if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to delete control plane machine")
Expand Down
2 changes: 1 addition & 1 deletion controlplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func main() {
flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute,
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

flag.DurationVar(&k8sdDialTimeout, "k8sd-dial-timeout-duration", 10*time.Second,
flag.DurationVar(&k8sdDialTimeout, "k8sd-dial-timeout-duration", 60*time.Second,
bschimke95 marked this conversation as resolved.
Show resolved Hide resolved
"Duration that the proxy client waits at most to establish a connection with k8sd")

flag.Parse()
Expand Down
7 changes: 7 additions & 0 deletions pkg/ck8s/api/cluster_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package apiv1

// RemoveNodeRequest is used to request to remove a node from the cluster.
type RemoveNodeRequest struct {
	// Name is the name of the node to remove from the cluster.
	Name string `json:"name"`
	// Force, when true, requests removal even if the node is unreachable
	// or not cleanly shut down — NOTE(review): exact semantics are defined
	// by the k8sd API server, confirm against its documentation.
	Force bool `json:"force"`
}
24 changes: 21 additions & 3 deletions pkg/ck8s/management_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/canonical/cluster-api-k8s/pkg/token"
"k8s.io/client-go/kubernetes/scheme"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
Expand All @@ -17,14 +18,14 @@ type ManagementCluster interface {
client.Reader

GetMachinesForCluster(ctx context.Context, cluster client.ObjectKey, filters ...collections.Func) (collections.Machines, error)
GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (*Workload, error)
GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey, microclusterPort int) (*Workload, error)
}

// Management holds operations on the management cluster.
type Management struct {
ManagementCluster

Client client.Reader
Client client.Client

K8sdDialTimeout time.Duration
}
Expand Down Expand Up @@ -70,7 +71,7 @@ const (

// GetWorkloadCluster builds a cluster object.
// The cluster comes with an etcd client generator to connect to any etcd pod living on a managed machine.
func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (*Workload, error) {
func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey, microclusterPort int) (*Workload, error) {
restConfig, err := remote.RESTConfig(ctx, CK8sControlPlaneControllerName, m.Client, clusterKey)
if err != nil {
return nil, fmt.Errorf("failed to get config: %w", err)
Expand All @@ -87,10 +88,25 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
return nil, &RemoteClusterConnectionError{Name: clusterKey.String(), Err: err}
}

authToken, err := token.Lookup(ctx, m.Client, clusterKey)
if err != nil {
return nil, fmt.Errorf("failed to lookup auth token: %w", err)
}

if authToken == nil {
return nil, fmt.Errorf("auth token not yet generated")
}

if microclusterPort == 0 {
microclusterPort = 2380
}
bschimke95 marked this conversation as resolved.
Show resolved Hide resolved

workload := &Workload{
authToken: *authToken,
Client: c,
ClientRestConfig: restConfig,
K8sdClientGenerator: g,
microclusterPort: microclusterPort,

/**
CoreDNSMigrator: &CoreDNSMigrator{},
Expand Down Expand Up @@ -155,3 +171,5 @@ func (m *Management) getEtcdCAKeyPair(ctx context.Context, clusterKey client.Obj
return crtData, keyData, nil
}
**/

var _ ManagementCluster = &Management{}
Loading