Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use an uncached direct client to read the kcp and md to bypass delay in cache #7779

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions controllers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ func (f *Factory) WithKubeadmControlPlaneReconciler() *Factory {

f.reconcilers.KubeadmControlPlaneReconciler = NewKubeadmControlPlaneReconciler(
f.manager.GetClient(),
f.manager.GetAPIReader(),
)

return nil
Expand All @@ -619,6 +620,7 @@ func (f *Factory) WithMachineDeploymentReconciler() *Factory {

f.reconcilers.MachineDeploymentReconciler = NewMachineDeploymentReconciler(
f.manager.GetClient(),
f.manager.GetAPIReader(),
)

return nil
Expand Down
2 changes: 2 additions & 0 deletions controllers/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestFactoryWithKubeadmControlPlaneReconciler(t *testing.T) {
ctrl := gomock.NewController(t)
manager := mocks.NewMockManager(ctrl)
manager.EXPECT().GetClient().AnyTimes()
manager.EXPECT().GetAPIReader().AnyTimes()
manager.EXPECT().GetScheme().AnyTimes()

f := controllers.NewFactory(logger, manager).
Expand All @@ -257,6 +258,7 @@ func TestFactoryWithMachineDeploymentReconciler(t *testing.T) {
ctrl := gomock.NewController(t)
manager := mocks.NewMockManager(ctrl)
manager.EXPECT().GetClient().AnyTimes()
manager.EXPECT().GetAPIReader().AnyTimes()
manager.EXPECT().GetScheme().AnyTimes()

f := controllers.NewFactory(logger, manager).
Expand Down
52 changes: 28 additions & 24 deletions controllers/kubeadmcontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,24 @@ import (
const (
kcpInPlaceUpgradeNeededAnnotation = "controlplane.clusters.x-k8s.io/in-place-upgrade-needed"
controlPlaneMachineLabel = "cluster.x-k8s.io/control-plane-name"
kubeadmControlPlaneKind = "KubeadmControlPlane"
)

// KubeadmControlPlaneReconciler reconciles a KubeadmControlPlaneReconciler object.
type KubeadmControlPlaneReconciler struct {
// client reads from a cache and is not a fully direct client.
client client.Client
log logr.Logger
// uncachedClient reads directly from the API server and is slightly slower.
uncachedClient client.Reader
log logr.Logger
}

// NewKubeadmControlPlaneReconciler returns a new instance of KubeadmControlPlaneReconciler.
func NewKubeadmControlPlaneReconciler(client client.Client) *KubeadmControlPlaneReconciler {
func NewKubeadmControlPlaneReconciler(client client.Client, uncachedClient client.Reader) *KubeadmControlPlaneReconciler {
return &KubeadmControlPlaneReconciler{
client: client,
log: ctrl.Log.WithName("KubeadmControlPlaneController"),
client: client,
uncachedClient: uncachedClient,
log: ctrl.Log.WithName("KubeadmControlPlaneController"),
}
}

Expand All @@ -69,7 +74,7 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
log := r.log.WithValues("KubeadmControlPlane", req.NamespacedName)

kcp := &controlplanev1.KubeadmControlPlane{}
if err := r.client.Get(ctx, req.NamespacedName, kcp); err != nil {
if err := r.uncachedClient.Get(ctx, req.NamespacedName, kcp); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -117,26 +122,26 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
}

mhc := &clusterv1.MachineHealthCheck{}
if err := r.client.Get(ctx, GetNamespacedNameType(cpMachineHealthCheckName(kcp.Name), constants.EksaSystemNamespace), mhc); err != nil {
if err := r.client.Get(ctx, GetNamespacedNameType(cpMachineHealthCheckName(kcp.ObjectMeta.Name), constants.EksaSystemNamespace), mhc); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}
return ctrl.Result{}, fmt.Errorf("getting MachineHealthCheck %s: %v", cpMachineHealthCheckName(kcp.Name), err)
return ctrl.Result{}, fmt.Errorf("getting MachineHealthCheck %s: %v", cpMachineHealthCheckName(kcp.ObjectMeta.Name), err)
}
mhcPatchHelper, err := patch.NewHelper(mhc, r.client)
if err != nil {
return ctrl.Result{}, err
}

cpUpgrade := &anywherev1.ControlPlaneUpgrade{}
cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.Name), constants.EksaSystemNamespace), cpUpgrade)
cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.ObjectMeta.Name), constants.EksaSystemNamespace), cpUpgrade)
if cpuGetErr == nil {
if cpUpgrade.Status.Ready && kcp.Status.Version != nil && *kcp.Status.Version == cpUpgrade.Spec.KubernetesVersion {
log.Info("Control plane upgrade complete, deleting object", "ControlPlaneUpgrade", cpUpgrade.Name)
if err := r.client.Delete(ctx, cpUpgrade); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err)
}
log.Info("Resuming control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
log.Info("Resuming control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.ObjectMeta.Name))
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
Expand All @@ -159,32 +164,31 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err)
}

log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.ObjectMeta.Name))
if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}

if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)
return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.ObjectMeta.Name, err)
}
return ctrl.Result{}, nil
}

return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)

return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.ObjectMeta.Name, err)
}

func (r *KubeadmControlPlaneReconciler) inPlaceUpgradeNeeded(kcp *controlplanev1.KubeadmControlPlane) bool {
return strings.ToLower(kcp.Annotations[kcpInPlaceUpgradeNeededAnnotation]) == "true"
}

func (r *KubeadmControlPlaneReconciler) machinesToUpgrade(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) ([]corev1.ObjectReference, error) {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{controlPlaneMachineLabel: kcp.Name}})
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{controlPlaneMachineLabel: kcp.ObjectMeta.Name}})
if err != nil {
return nil, err
}
machineList := &clusterv1.MachineList{}
if err := r.client.List(ctx, machineList, &client.ListOptions{LabelSelector: selector, Namespace: kcp.Namespace}); err != nil {
if err := r.client.List(ctx, machineList, &client.ListOptions{LabelSelector: selector, Namespace: kcp.ObjectMeta.Namespace}); err != nil {
return nil, err
}
machines := collections.FromMachineList(machineList).SortedByCreationTimestamp()
Expand All @@ -203,7 +207,7 @@ func (r *KubeadmControlPlaneReconciler) machinesToUpgrade(ctx context.Context, k

func (r *KubeadmControlPlaneReconciler) validateStackedEtcd(kcp *controlplanev1.KubeadmControlPlane) error {
if kcp.Spec.KubeadmConfigSpec.ClusterConfiguration == nil {
return fmt.Errorf("ClusterConfiguration not set for KubeadmControlPlane \"%s\", unable to retrieve etcd information", kcp.Name)
return fmt.Errorf("ClusterConfiguration not set for KubeadmControlPlane \"%s\", unable to retrieve etcd information", kcp.ObjectMeta.Name)
}
if kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.Local == nil {
return fmt.Errorf("local etcd configuration is missing")
Expand All @@ -229,20 +233,20 @@ func controlPlaneUpgrade(kcp *controlplanev1.KubeadmControlPlane, machines []cor

return &anywherev1.ControlPlaneUpgrade{
ObjectMeta: metav1.ObjectMeta{
Name: cpUpgradeName(kcp.Name),
Name: cpUpgradeName(kcp.ObjectMeta.Name),
Namespace: constants.EksaSystemNamespace,
OwnerReferences: []metav1.OwnerReference{{
APIVersion: kcp.APIVersion,
Kind: kcp.Kind,
Name: kcp.Name,
UID: kcp.UID,
APIVersion: controlplanev1.GroupVersion.String(),
Kind: kubeadmControlPlaneKind,
Name: kcp.ObjectMeta.Name,
UID: kcp.ObjectMeta.UID,
}},
},
Spec: anywherev1.ControlPlaneUpgradeSpec{
ControlPlane: corev1.ObjectReference{
Kind: kcp.Kind,
Namespace: kcp.Namespace,
Name: kcp.Name,
Kind: kubeadmControlPlaneKind,
Namespace: kcp.ObjectMeta.Namespace,
Name: kcp.ObjectMeta.Name,
},
KubernetesVersion: kcp.Spec.Version,
EtcdVersion: kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.Local.ImageTag,
Expand Down
20 changes: 10 additions & 10 deletions controllers/kubeadmcontrolplane_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type kcpObjects struct {

func TestKCPSetupWithManager(t *testing.T) {
client := env.Client()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)

g := NewWithT(t)
g.Expect(r.SetupWithManager(env.Manager())).To(Succeed())
Expand All @@ -54,7 +54,7 @@ func TestKCPReconcileNotNeeded(t *testing.T) {

runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())
Expand All @@ -72,7 +72,7 @@ func TestKCPReconcile(t *testing.T) {

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())
Expand All @@ -89,7 +89,7 @@ func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestKCPReconcileFullFlow(t *testing.T) {

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestKCPReconcileNotFound(t *testing.T) {
kcpObjs := getObjectsForKCP()

client := fake.NewClientBuilder().WithRuntimeObjects().Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).To(MatchError("kubeadmcontrolplanes.controlplane.cluster.x-k8s.io \"my-cluster\" not found"))
Expand All @@ -220,7 +220,7 @@ func TestKCPReconcileMHCNotFound(t *testing.T) {

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).To(MatchError("machinehealthchecks.cluster.x-k8s.io \"my-cluster-kcp-unhealthy\" not found"))
Expand All @@ -235,7 +235,7 @@ func TestKCPReconcileClusterConfigurationMissing(t *testing.T) {

runtimeObjs := []runtime.Object{kcpObjs.kcp}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).To(MatchError("ClusterConfiguration not set for KubeadmControlPlane \"my-cluster\", unable to retrieve etcd information"))
Expand All @@ -250,7 +250,7 @@ func TestKCPReconcileStackedEtcdMissing(t *testing.T) {

runtimeObjs := []runtime.Object{kcpObjs.kcp}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
r := controllers.NewKubeadmControlPlaneReconciler(client, client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).To(MatchError("local etcd configuration is missing"))
Expand Down
Loading