Skip to content

Commit

Permalink
Add annotation to configure healthcheck retries for EtcdadmCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavmpandey08 committed Sep 6, 2023
1 parent ec74d9d commit c7a2b6d
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 29 deletions.
2 changes: 2 additions & 0 deletions api/v1beta1/etcdadmcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
const (
UpgradeInProgressAnnotation = "etcdcluster.cluster.x-k8s.io/upgrading"

HealthCheckRetriesAnnotation = "etcdcluster.cluster.x-k8s.io/healthcheck-retries"

// EtcdadmClusterFinalizer is the finalizer applied to EtcdadmCluster resources
// by its managing controller.
EtcdadmClusterFinalizer = "etcdcluster.cluster.x-k8s.io"
Expand Down
1 change: 1 addition & 0 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type EtcdadmClusterReconciler struct {
Scheme *runtime.Scheme
etcdHealthCheckConfig etcdHealthCheckConfig
MaxConcurrentReconciles int
HealthCheckInterval time.Duration
GetEtcdClient func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error)
isPortOpen func(ctx context.Context, endpoint string) bool
}
Expand Down
33 changes: 21 additions & 12 deletions controllers/periodic_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package controllers

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -18,8 +20,7 @@ import (
)

const (
maxUnhealthyCount = 5
healthCheckInterval = 30
maxUnhealthyCount = 5
)

type etcdHealthCheckConfig struct {
Expand All @@ -38,7 +39,7 @@ type etcdadmClusterMemberHealthConfig struct {
func (r *EtcdadmClusterReconciler) startHealthCheckLoop(ctx context.Context, done <-chan struct{}) {
r.Log.Info("Starting periodic healthcheck loop")
etcdadmClusterMapper := make(map[types.UID]etcdadmClusterMemberHealthConfig)
ticker := time.NewTicker(healthCheckInterval * time.Second)
ticker := time.NewTicker(r.HealthCheckInterval)
defer ticker.Stop()

for {
Expand All @@ -64,6 +65,14 @@ func (r *EtcdadmClusterReconciler) startHealthCheck(ctx context.Context, etcdadm
log.Info("EtcdadmCluster reconciliation is paused, skipping health checks")
continue
}
if val, set := ec.Annotations[etcdv1.HealthCheckRetriesAnnotation]; set {
if retries, err := strconv.Atoi(val); err != nil || retries < 0 {
log.Info(fmt.Sprintf("healthcheck-retries annotation configured with invalid value: %v", err))
} else if retries == 0 {
log.Info("healthcheck-retries annotation configured to 0, skipping health checks")
continue
}
}
if conditions.IsFalse(&ec, etcdv1.EtcdCertificatesAvailableCondition) {
log.Info("EtcdadmCluster certificates are not ready, skipping health checks")
continue
Expand Down Expand Up @@ -139,7 +148,15 @@ func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Co
log.Info("Machine for member does not exist", "member", endpoint)
currClusterHFConfig.unhealthyMembersToRemove[endpoint] = m
}
if currClusterHFConfig.unhealthyMembersFrequency[endpoint] >= maxUnhealthyCount {
unhealthyCount := maxUnhealthyCount
if val, set := etcdCluster.Annotations[etcdv1.HealthCheckRetriesAnnotation]; set {
retries, err := strconv.Atoi(val)
if err != nil || retries < 0 {
log.Info("healthcheck-retries annotation configured with invalid value, using default retries")
}
unhealthyCount = retries
}
if currClusterHFConfig.unhealthyMembersFrequency[endpoint] >= unhealthyCount {
log.Info("Adding to list of unhealthy members to remove", "member", endpoint)
// member has been unresponsive, add the machine to unhealthyMembersToRemove queue
m := currClusterHFConfig.endpointToMachineMapper[endpoint]
Expand All @@ -158,13 +175,6 @@ func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Co
return nil
}

finalEndpoints := make([]string, 0, len(endpoints))
for _, endpoint := range endpoints {
if _, existsInUnhealthyMap := currClusterHFConfig.unhealthyMembersToRemove[endpoint]; !existsInUnhealthyMap {
finalEndpoints = append(finalEndpoints, endpoint)
}
}

var retErr error
for machineEndpoint, machineToDelete := range currClusterHFConfig.unhealthyMembersToRemove {
if err := r.removeEtcdMachine(ctx, etcdCluster, cluster, machineToDelete, getEtcdMachineAddressFromClientURL(machineEndpoint)); err != nil {
Expand All @@ -183,7 +193,6 @@ func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Co
return retErr
}

etcdCluster.Status.Endpoints = strings.Join(finalEndpoints, ",")
etcdCluster.Status.Ready = false
return r.Client.Status().Update(ctx, etcdCluster)
}
Expand Down
94 changes: 89 additions & 5 deletions controllers/periodic_healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"github.com/aws/etcdadm-controller/controllers/mocks"
"github.com/golang/mock/gomock"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func TestStartHealthCheckLoop(t *testing.T) {
_ = NewWithT(t)

ctrl := gomock.NewController(t)
mockEtcd := mocks.NewMockEtcdClient(ctrl)
mockRt := mocks.NewMockRoundTripper(ctrl)
Expand All @@ -27,15 +26,15 @@ func TestStartHealthCheckLoop(t *testing.T) {

fakeKubernetesClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(etcdTest.gatherObjects()...).Build()

etcdEtcdClient := func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
mockEtcdClient := func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
return mockEtcd, nil
}

r := &EtcdadmClusterReconciler{
Client: fakeKubernetesClient,
uncachedClient: fakeKubernetesClient,
Log: log.Log,
GetEtcdClient: etcdEtcdClient,
GetEtcdClient: mockEtcdClient,
}
mockHttpClient := &http.Client{
Transport: mockRt,
Expand All @@ -44,12 +43,97 @@ func TestStartHealthCheckLoop(t *testing.T) {
r.etcdHealthCheckConfig.clusterToHttpClient.Store(etcdTest.cluster.UID, mockHttpClient)
r.SetIsPortOpen(isPortOpenMock)

mockRt.EXPECT().RoundTrip(gomock.Any()).Return(healthyEtcdResponse, nil).Times(3)
mockRt.EXPECT().RoundTrip(gomock.Any()).Return(getHealthyEtcdResponse(), nil).Times(3)

etcdadmClusterMapper := make(map[types.UID]etcdadmClusterMemberHealthConfig)
r.startHealthCheck(context.Background(), etcdadmClusterMapper)
}

// TestStartHealthCheckLoopWithNoRetries verifies that setting the
// healthcheck-retries annotation to 0 disables health checking for the
// cluster: even though every stubbed HTTP probe would fail, no per-cluster
// health state is recorded in the mapper.
func TestStartHealthCheckLoopWithNoRetries(t *testing.T) {
	g := NewWithT(t)
	ctrl := gomock.NewController(t)
	mockEtcd := mocks.NewMockEtcdClient(ctrl)
	mockRt := mocks.NewMockRoundTripper(ctrl)

	// Build the standard external-etcd fixture and pin the retries
	// annotation to 0 (the "disabled" sentinel).
	etcdTest := newEtcdadmClusterTest()
	etcdTest.buildClusterWithExternalEtcd().withHealthCheckRetries(0)
	etcdTest.etcdadmCluster.Status.CreationComplete = true

	fakeKubernetesClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(etcdTest.gatherObjects()...).Build()

	// Stub client factory so the reconciler never dials a real etcd.
	mockEtcdClient := func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
		return mockEtcd, nil
	}

	r := &EtcdadmClusterReconciler{
		Client:         fakeKubernetesClient,
		uncachedClient: fakeKubernetesClient,
		Log:            log.Log,
		GetEtcdClient:  mockEtcdClient,
	}
	mockHttpClient := &http.Client{
		Transport: mockRt,
	}

	// Route this cluster's health probes through the mock transport and
	// pretend every member port is reachable.
	r.etcdHealthCheckConfig.clusterToHttpClient.Store(etcdTest.cluster.UID, mockHttpClient)
	r.SetIsPortOpen(isPortOpenMock)

	// Any probe that does run fails; one expectation per member endpoint.
	// NOTE(review): with retries == 0 the check loop is expected to skip
	// this cluster — confirm the gomock controller tolerates the unmet
	// Times(3) expectation in the pinned mock version.
	mockRt.EXPECT().RoundTrip(gomock.Any()).Return(nil, errors.New("error")).Times(3)

	etcdadmClusterMapper := make(map[types.UID]etcdadmClusterMemberHealthConfig)
	r.startHealthCheck(context.Background(), etcdadmClusterMapper)

	// No health-check bookkeeping should have been created for the cluster.
	g.Expect(etcdadmClusterMapper).To(BeEmpty())
}

// TestStartHealthCheckLoopWithCustomRetries verifies that the
// healthcheck-retries annotation overrides the default unhealthy threshold:
// with retries set to 3, members are only removed after the third consecutive
// failed health-check pass, not before.
func TestStartHealthCheckLoopWithCustomRetries(t *testing.T) {
	g := NewWithT(t)
	ctrl := gomock.NewController(t)
	mockEtcd := mocks.NewMockEtcdClient(ctrl)
	mockRt := mocks.NewMockRoundTripper(ctrl)
	etcdadmClusterMapper := make(map[types.UID]etcdadmClusterMemberHealthConfig)

	// Fixture cluster with the retries annotation set to 3.
	etcdTest := newEtcdadmClusterTest()
	etcdTest.buildClusterWithExternalEtcd().withHealthCheckRetries(3)
	etcdTest.etcdadmCluster.Status.CreationComplete = true

	fakeKubernetesClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(etcdTest.gatherObjects()...).Build()
	// Stub client factory so the reconciler never dials a real etcd.
	mockEtcdClient := func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
		return mockEtcd, nil
	}

	r := &EtcdadmClusterReconciler{
		Client:         fakeKubernetesClient,
		uncachedClient: fakeKubernetesClient,
		Log:            log.Log,
		GetEtcdClient:  mockEtcdClient,
	}
	mockHttpClient := &http.Client{
		Transport: mockRt,
	}

	r.etcdHealthCheckConfig.clusterToHttpClient.Store(etcdTest.cluster.UID, mockHttpClient)
	r.SetIsPortOpen(isPortOpenMock)

	// Every probe fails across all three passes: 9 = 3 endpoints x 3 rounds.
	// The member list and client Close are consulted once per removal pass.
	mockRt.EXPECT().RoundTrip(gomock.Any()).Return(nil, errors.New("error")).Times(9)
	mockEtcd.EXPECT().MemberList(gomock.Any()).Return(etcdTest.getMemberListResponse(), nil).Times(3)
	mockEtcd.EXPECT().Close().Times(3)

	// Passes 1 and 2: unhealthy counts are below the configured threshold,
	// so no machine may be deleted yet.
	r.startHealthCheck(context.Background(), etcdadmClusterMapper)
	g.Expect(etcdTest.getDeletedMachines(fakeKubernetesClient)).To(BeEmpty())

	r.startHealthCheck(context.Background(), etcdadmClusterMapper)
	g.Expect(etcdTest.getDeletedMachines(fakeKubernetesClient)).To(BeEmpty())

	// Pass 3: the failure count reaches the annotation value, so all three
	// member machines are marked for deletion.
	r.startHealthCheck(context.Background(), etcdadmClusterMapper)
	g.Expect(etcdTest.getDeletedMachines(fakeKubernetesClient)).To(HaveLen(3))
}

type RoundTripperFunc func(*http.Request) (*http.Response, error)

func (fn RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return fn(r)
}

func isPortOpenMock(_ context.Context, _ string) bool {
return true
Expand Down
1 change: 1 addition & 0 deletions controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (r *EtcdadmClusterReconciler) scaleDownEtcdCluster(ctx context.Context, ec
machineAddress := getEtcdMachineAddress(machineToDelete)
return ctrl.Result{}, r.removeEtcdMachine(ctx, ec, cluster, machineToDelete, machineAddress)
}

func (r *EtcdadmClusterReconciler) removeEtcdMachine(ctx context.Context, ec *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster, machineToDelete *clusterv1.Machine, machineAddress string) error {
peerURL := fmt.Sprintf("https://%s:2380", machineAddress)
etcdClient, err := r.GetEtcdClient(ctx, cluster, ec.Status.Endpoints)
Expand Down
62 changes: 52 additions & 10 deletions controllers/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
Expand All @@ -11,6 +12,8 @@ import (
etcdbootstrapv1 "github.com/aws/etcdadm-bootstrap-provider/api/v1beta1"
etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1"
"github.com/google/uuid"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -48,11 +51,6 @@ var (
},
},
}

healthyEtcdResponse = &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBufferString("{\"Health\": \"true\"}")),
}
)

type etcdadmClusterTest struct {
Expand All @@ -72,7 +70,7 @@ func newEtcdadmClusterTest() *etcdadmClusterTest {
}
}

func (e *etcdadmClusterTest) buildClusterWithExternalEtcd() {
func (e *etcdadmClusterTest) buildClusterWithExternalEtcd() *etcdadmClusterTest {
e.cluster = e.newClusterWithExternalEtcd()
e.etcdadmCluster = e.newEtcdadmCluster(e.cluster)
e.machines = []*clusterv1.Machine{}
Expand All @@ -83,6 +81,15 @@ func (e *etcdadmClusterTest) buildClusterWithExternalEtcd() {
endpoints = append(endpoints, fmt.Sprintf("https://%v:2379", machine.Status.Addresses[0].Address))
}
e.etcdadmCluster.Status.Endpoints = strings.Join(endpoints, ",")
return e
}

// withHealthCheckRetries sets the healthcheck-retries annotation on the test
// EtcdadmCluster to the given value and returns the receiver so fixture
// builders can be chained.
func (e *etcdadmClusterTest) withHealthCheckRetries(retries int) *etcdadmClusterTest {
	annotations := e.etcdadmCluster.Annotations
	if annotations == nil {
		// Lazily create the map; a fresh fixture has no annotations yet.
		annotations = map[string]string{}
		e.etcdadmCluster.Annotations = annotations
	}
	annotations[etcdv1.HealthCheckRetriesAnnotation] = fmt.Sprintf("%d", retries)
	return e
}

// newClusterWithExternalEtcd return a CAPI cluster object with managed external etcd ref
Expand Down Expand Up @@ -156,10 +163,11 @@ func (e *etcdadmClusterTest) newEtcdMachine() *clusterv1.Machine {
APIVersion: clusterv1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: names.SimpleNameGenerator.GenerateName(e.etcdadmCluster.Name + "-"),
Namespace: e.etcdadmCluster.Namespace,
Labels: EtcdLabelsForCluster(e.cluster.Name, e.etcdadmCluster.Name),
UID: types.UID(uuid.New().String()),
Name: names.SimpleNameGenerator.GenerateName(e.etcdadmCluster.Name + "-"),
Namespace: e.etcdadmCluster.Namespace,
Labels: EtcdLabelsForCluster(e.cluster.Name, e.etcdadmCluster.Name),
UID: types.UID(uuid.New().String()),
Finalizers: []string{etcdv1.EtcdadmClusterFinalizer},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(e.etcdadmCluster, etcdv1.GroupVersion.WithKind("EtcdadmCluster")),
},
Expand Down Expand Up @@ -195,3 +203,37 @@ func (e *etcdadmClusterTest) gatherObjects() []client.Object {
// getEtcdClusterName returns the deterministic name of the fixture's
// EtcdadmCluster: the base test name joined to the shared etcd cluster
// name suffix with a hyphen.
func (e *etcdadmClusterTest) getEtcdClusterName() string {
	return fmt.Sprintf("%s-%s", e.name, etcdClusterNameSuffix)
}

// getMemberListResponse fabricates an etcd MemberList response with one
// member per fixture machine, each advertising a URL built from the
// machine's first reported address on port 2379.
func (e *etcdadmClusterTest) getMemberListResponse() *clientv3.MemberListResponse {
	members := make([]*etcdserverpb.Member, 0, len(e.machines))
	for _, machine := range e.machines {
		memberURL := fmt.Sprintf("https://%s:2379", machine.Status.Addresses[0].Address)
		members = append(members, &etcdserverpb.Member{
			PeerURLs: []string{memberURL},
		})
	}
	return &clientv3.MemberListResponse{Members: members}
}

// getDeletedMachines returns the fixture machines whose live copies in the
// given client have been marked for deletion (non-nil DeletionTimestamp).
// Fixture machines carry a finalizer, so a deleted object is expected to
// remain fetchable with its deletion timestamp set rather than disappear.
func (e *etcdadmClusterTest) getDeletedMachines(client client.Client) []*clusterv1.Machine {
	machines := []*clusterv1.Machine{}
	for _, machine := range e.machines {
		m := &clusterv1.Machine{}
		// The original code ignored this error, which left m zero-valued
		// (nil DeletionTimestamp) and silently skipped the machine. Make
		// that explicit: an unfetchable machine is simply not counted.
		if err := client.Get(context.Background(), types.NamespacedName{
			Name:      machine.Name,
			Namespace: machine.Namespace,
		}, m); err != nil {
			continue
		}
		if m.DeletionTimestamp != nil {
			machines = append(machines, m)
		}
	}
	return machines
}

// getHealthyEtcdResponse builds a fresh HTTP 200 response whose body reports
// a healthy etcd member. A new response is constructed on every call because
// the body reader is single-use: once consumed it cannot be re-read.
func getHealthyEtcdResponse() *http.Response {
	body := bytes.NewBufferString(`{"Health": "true"}`)
	return &http.Response{
		StatusCode: http.StatusOK,
		Body:       io.NopCloser(body),
	}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ go 1.19
require (
github.com/aws/etcdadm-bootstrap-provider v1.0.9
github.com/go-logr/logr v1.2.3
github.com/go-logr/zapr v1.2.3
github.com/hashicorp/go-multierror v1.1.1
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.5
github.com/pkg/errors v0.9.1
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
go.uber.org/zap v1.24.0
k8s.io/api v0.26.1
k8s.io/apimachinery v0.26.1
k8s.io/apiserver v0.26.1
Expand All @@ -37,7 +39,6 @@ require (
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
Expand Down Expand Up @@ -71,7 +72,6 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.6.0 // indirect
Expand Down
Loading

0 comments on commit c7a2b6d

Please sign in to comment.