Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add annotation to configure healthcheck retries for EtcdadmCluster #44

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v1beta1/etcdadmcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
const (
UpgradeInProgressAnnotation = "etcdcluster.cluster.x-k8s.io/upgrading"

HealthCheckRetriesAnnotation = "etcdcluster.cluster.x-k8s.io/healthcheck-retries"

// EtcdadmClusterFinalizer is the finalizer applied to EtcdadmCluster resources
// by its managing controller.
EtcdadmClusterFinalizer = "etcdcluster.cluster.x-k8s.io"
Expand Down
1 change: 1 addition & 0 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type EtcdadmClusterReconciler struct {
Scheme *runtime.Scheme
etcdHealthCheckConfig etcdHealthCheckConfig
MaxConcurrentReconciles int
HealthCheckInterval time.Duration
GetEtcdClient func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error)
isPortOpen func(ctx context.Context, endpoint string) bool
}
Expand Down
33 changes: 21 additions & 12 deletions controllers/periodic_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package controllers

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -18,8 +20,7 @@ import (
)

const (
maxUnhealthyCount = 5
healthCheckInterval = 30
maxUnhealthyCount = 5
)

type etcdHealthCheckConfig struct {
Expand All @@ -38,7 +39,7 @@ type etcdadmClusterMemberHealthConfig struct {
func (r *EtcdadmClusterReconciler) startHealthCheckLoop(ctx context.Context, done <-chan struct{}) {
r.Log.Info("Starting periodic healthcheck loop")
etcdadmClusterMapper := make(map[types.UID]etcdadmClusterMemberHealthConfig)
ticker := time.NewTicker(healthCheckInterval * time.Second)
ticker := time.NewTicker(r.HealthCheckInterval)
defer ticker.Stop()

for {
Expand All @@ -64,6 +65,14 @@ func (r *EtcdadmClusterReconciler) startHealthCheck(ctx context.Context, etcdadm
log.Info("EtcdadmCluster reconciliation is paused, skipping health checks")
continue
}
// Honor the per-cluster healthcheck-retries annotation: 0 disables
// health checking for this cluster; an unparsable or negative value is
// logged and ignored (default retry behavior applies further down).
if val, set := ec.Annotations[etcdv1.HealthCheckRetriesAnnotation]; set {
	if retries, err := strconv.Atoi(val); err != nil || retries < 0 {
		// err is nil when the value parsed but was negative, so include the
		// raw value to avoid logging a misleading "<nil>" error.
		log.Info(fmt.Sprintf("healthcheck-retries annotation configured with invalid value %q: %v", val, err))
	} else if retries == 0 {
		log.Info("healthcheck-retries annotation configured to 0, skipping health checks")
		continue
	}
}
if conditions.IsFalse(&ec, etcdv1.EtcdCertificatesAvailableCondition) {
log.Info("EtcdadmCluster certificates are not ready, skipping health checks")
continue
Expand Down Expand Up @@ -139,7 +148,15 @@ func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Co
log.Info("Machine for member does not exist", "member", endpoint)
currClusterHFConfig.unhealthyMembersToRemove[endpoint] = m
}
if currClusterHFConfig.unhealthyMembersFrequency[endpoint] >= maxUnhealthyCount {
// Threshold of consecutive failed health checks before a member is
// removed. Defaults to maxUnhealthyCount; the healthcheck-retries
// annotation can override it per cluster.
unhealthyCount := maxUnhealthyCount
if val, set := etcdCluster.Annotations[etcdv1.HealthCheckRetriesAnnotation]; set {
	if retries, err := strconv.Atoi(val); err != nil || retries < 0 {
		// Invalid or negative values must fall back to the default. Note the
		// original assigned the rejected parse result to unhealthyCount
		// unconditionally, silently setting the threshold to 0 (or a negative
		// number) despite logging that the default would be used.
		log.Info("healthcheck-retries annotation configured with invalid value, using default retries")
	} else {
		unhealthyCount = retries
	}
}
if currClusterHFConfig.unhealthyMembersFrequency[endpoint] >= unhealthyCount {
log.Info("Adding to list of unhealthy members to remove", "member", endpoint)
// member has been unresponsive, add the machine to unhealthyMembersToRemove queue
m := currClusterHFConfig.endpointToMachineMapper[endpoint]
Expand All @@ -158,13 +175,6 @@ func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Co
return nil
}

finalEndpoints := make([]string, 0, len(endpoints))
for _, endpoint := range endpoints {
if _, existsInUnhealthyMap := currClusterHFConfig.unhealthyMembersToRemove[endpoint]; !existsInUnhealthyMap {
finalEndpoints = append(finalEndpoints, endpoint)
}
}

var retErr error
for machineEndpoint, machineToDelete := range currClusterHFConfig.unhealthyMembersToRemove {
if err := r.removeEtcdMachine(ctx, etcdCluster, cluster, machineToDelete, getEtcdMachineAddressFromClientURL(machineEndpoint)); err != nil {
Expand All @@ -183,7 +193,6 @@ func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Co
return retErr
}

etcdCluster.Status.Endpoints = strings.Join(finalEndpoints, ",")
etcdCluster.Status.Ready = false
return r.Client.Status().Update(ctx, etcdCluster)
}
Expand Down
94 changes: 89 additions & 5 deletions controllers/periodic_healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"github.com/aws/etcdadm-controller/controllers/mocks"
"github.com/golang/mock/gomock"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func TestStartHealthCheckLoop(t *testing.T) {
_ = NewWithT(t)

ctrl := gomock.NewController(t)
mockEtcd := mocks.NewMockEtcdClient(ctrl)
mockRt := mocks.NewMockRoundTripper(ctrl)
Expand All @@ -27,15 +26,15 @@ func TestStartHealthCheckLoop(t *testing.T) {

fakeKubernetesClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(etcdTest.gatherObjects()...).Build()

etcdEtcdClient := func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
mockEtcdClient := func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
return mockEtcd, nil
}

r := &EtcdadmClusterReconciler{
Client: fakeKubernetesClient,
uncachedClient: fakeKubernetesClient,
Log: log.Log,
GetEtcdClient: etcdEtcdClient,
GetEtcdClient: mockEtcdClient,
}
mockHttpClient := &http.Client{
Transport: mockRt,
Expand All @@ -44,12 +43,97 @@ func TestStartHealthCheckLoop(t *testing.T) {
r.etcdHealthCheckConfig.clusterToHttpClient.Store(etcdTest.cluster.UID, mockHttpClient)
r.SetIsPortOpen(isPortOpenMock)

mockRt.EXPECT().RoundTrip(gomock.Any()).Return(healthyEtcdResponse, nil).Times(3)
mockRt.EXPECT().RoundTrip(gomock.Any()).Return(getHealthyEtcdResponse(), nil).Times(3)

etcdadmClusterMapper := make(map[types.UID]etcdadmClusterMemberHealthConfig)
r.startHealthCheck(context.Background(), etcdadmClusterMapper)
}

// TestStartHealthCheckLoopWithNoRetries verifies that setting the
// healthcheck-retries annotation to 0 disables health checking for the
// cluster: no per-cluster state is ever recorded in the health-config map.
func TestStartHealthCheckLoopWithNoRetries(t *testing.T) {
	g := NewWithT(t)
	mockCtrl := gomock.NewController(t)
	etcdClientMock := mocks.NewMockEtcdClient(mockCtrl)
	roundTripperMock := mocks.NewMockRoundTripper(mockCtrl)

	testSetup := newEtcdadmClusterTest()
	testSetup.buildClusterWithExternalEtcd().withHealthCheckRetries(0)
	testSetup.etcdadmCluster.Status.CreationComplete = true

	k8sClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(testSetup.gatherObjects()...).Build()

	reconciler := &EtcdadmClusterReconciler{
		Client:         k8sClient,
		uncachedClient: k8sClient,
		Log:            log.Log,
		GetEtcdClient: func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
			return etcdClientMock, nil
		},
	}
	reconciler.etcdHealthCheckConfig.clusterToHttpClient.Store(testSetup.cluster.UID, &http.Client{Transport: roundTripperMock})
	reconciler.SetIsPortOpen(isPortOpenMock)

	roundTripperMock.EXPECT().RoundTrip(gomock.Any()).Return(nil, errors.New("error")).Times(3)

	healthConfigByCluster := make(map[types.UID]etcdadmClusterMemberHealthConfig)
	reconciler.startHealthCheck(context.Background(), healthConfigByCluster)

	g.Expect(healthConfigByCluster).To(BeEmpty())
}

// TestStartHealthCheckLoopWithCustomRetries verifies that a positive
// healthcheck-retries annotation controls how many consecutive failed
// checks are tolerated: machines survive two failing rounds and are all
// deleted on the third.
func TestStartHealthCheckLoopWithCustomRetries(t *testing.T) {
	g := NewWithT(t)
	mockCtrl := gomock.NewController(t)
	etcdClientMock := mocks.NewMockEtcdClient(mockCtrl)
	roundTripperMock := mocks.NewMockRoundTripper(mockCtrl)
	healthConfigByCluster := make(map[types.UID]etcdadmClusterMemberHealthConfig)

	testSetup := newEtcdadmClusterTest()
	testSetup.buildClusterWithExternalEtcd().withHealthCheckRetries(3)
	testSetup.etcdadmCluster.Status.CreationComplete = true

	k8sClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(testSetup.gatherObjects()...).Build()

	reconciler := &EtcdadmClusterReconciler{
		Client:         k8sClient,
		uncachedClient: k8sClient,
		Log:            log.Log,
		GetEtcdClient: func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
			return etcdClientMock, nil
		},
	}
	reconciler.etcdHealthCheckConfig.clusterToHttpClient.Store(testSetup.cluster.UID, &http.Client{Transport: roundTripperMock})
	reconciler.SetIsPortOpen(isPortOpenMock)

	roundTripperMock.EXPECT().RoundTrip(gomock.Any()).Return(nil, errors.New("error")).Times(9)
	etcdClientMock.EXPECT().MemberList(gomock.Any()).Return(testSetup.getMemberListResponse(), nil).Times(3)
	etcdClientMock.EXPECT().Close().Times(3)

	// The first two rounds of failures stay below the threshold of 3.
	for round := 0; round < 2; round++ {
		reconciler.startHealthCheck(context.Background(), healthConfigByCluster)
		g.Expect(testSetup.getDeletedMachines(k8sClient)).To(BeEmpty())
	}

	// The third consecutive failure reaches the threshold; every member's
	// machine should now carry a deletion timestamp.
	reconciler.startHealthCheck(context.Background(), healthConfigByCluster)
	g.Expect(testSetup.getDeletedMachines(k8sClient)).To(HaveLen(3))
}

type RoundTripperFunc func(*http.Request) (*http.Response, error)

func (fn RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return fn(r)
}

func isPortOpenMock(_ context.Context, _ string) bool {
return true
Expand Down
1 change: 1 addition & 0 deletions controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (r *EtcdadmClusterReconciler) scaleDownEtcdCluster(ctx context.Context, ec
machineAddress := getEtcdMachineAddress(machineToDelete)
return ctrl.Result{}, r.removeEtcdMachine(ctx, ec, cluster, machineToDelete, machineAddress)
}

func (r *EtcdadmClusterReconciler) removeEtcdMachine(ctx context.Context, ec *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster, machineToDelete *clusterv1.Machine, machineAddress string) error {
peerURL := fmt.Sprintf("https://%s:2380", machineAddress)
etcdClient, err := r.GetEtcdClient(ctx, cluster, ec.Status.Endpoints)
Expand Down
62 changes: 52 additions & 10 deletions controllers/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
Expand All @@ -11,6 +12,8 @@ import (
etcdbootstrapv1 "github.com/aws/etcdadm-bootstrap-provider/api/v1beta1"
etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1"
"github.com/google/uuid"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -48,11 +51,6 @@ var (
},
},
}

healthyEtcdResponse = &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBufferString("{\"Health\": \"true\"}")),
}
)

type etcdadmClusterTest struct {
Expand All @@ -72,7 +70,7 @@ func newEtcdadmClusterTest() *etcdadmClusterTest {
}
}

func (e *etcdadmClusterTest) buildClusterWithExternalEtcd() {
func (e *etcdadmClusterTest) buildClusterWithExternalEtcd() *etcdadmClusterTest {
e.cluster = e.newClusterWithExternalEtcd()
e.etcdadmCluster = e.newEtcdadmCluster(e.cluster)
e.machines = []*clusterv1.Machine{}
Expand All @@ -83,6 +81,15 @@ func (e *etcdadmClusterTest) buildClusterWithExternalEtcd() {
endpoints = append(endpoints, fmt.Sprintf("https://%v:2379", machine.Status.Addresses[0].Address))
}
e.etcdadmCluster.Status.Endpoints = strings.Join(endpoints, ",")
return e
}

// withHealthCheckRetries stamps the healthcheck-retries annotation onto the
// test EtcdadmCluster and returns the receiver for builder-style chaining.
func (e *etcdadmClusterTest) withHealthCheckRetries(retries int) *etcdadmClusterTest {
	annotations := e.etcdadmCluster.Annotations
	if annotations == nil {
		annotations = map[string]string{}
		e.etcdadmCluster.Annotations = annotations
	}
	annotations[etcdv1.HealthCheckRetriesAnnotation] = fmt.Sprintf("%d", retries)
	return e
}

// newClusterWithExternalEtcd return a CAPI cluster object with managed external etcd ref
Expand Down Expand Up @@ -156,10 +163,11 @@ func (e *etcdadmClusterTest) newEtcdMachine() *clusterv1.Machine {
APIVersion: clusterv1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: names.SimpleNameGenerator.GenerateName(e.etcdadmCluster.Name + "-"),
Namespace: e.etcdadmCluster.Namespace,
Labels: EtcdLabelsForCluster(e.cluster.Name, e.etcdadmCluster.Name),
UID: types.UID(uuid.New().String()),
Name: names.SimpleNameGenerator.GenerateName(e.etcdadmCluster.Name + "-"),
Namespace: e.etcdadmCluster.Namespace,
Labels: EtcdLabelsForCluster(e.cluster.Name, e.etcdadmCluster.Name),
UID: types.UID(uuid.New().String()),
Finalizers: []string{etcdv1.EtcdadmClusterFinalizer},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(e.etcdadmCluster, etcdv1.GroupVersion.WithKind("EtcdadmCluster")),
},
Expand Down Expand Up @@ -195,3 +203,37 @@ func (e *etcdadmClusterTest) gatherObjects() []client.Object {
func (e *etcdadmClusterTest) getEtcdClusterName() string {
return fmt.Sprintf("%s-%s", e.name, etcdClusterNameSuffix)
}

// getMemberListResponse builds an etcd MemberListResponse whose members
// advertise the peer URLs of every machine in the test fixture.
func (e *etcdadmClusterTest) getMemberListResponse() *clientv3.MemberListResponse {
	resp := &clientv3.MemberListResponse{Members: []*etcdserverpb.Member{}}
	for _, machine := range e.machines {
		member := &etcdserverpb.Member{
			PeerURLs: []string{fmt.Sprintf("https://%s:2379", machine.Status.Addresses[0].Address)},
		}
		resp.Members = append(resp.Members, member)
	}
	return resp
}

// getDeletedMachines returns the fixture machines that currently carry a
// deletion timestamp in the supplied client's object store.
func (e *etcdadmClusterTest) getDeletedMachines(c client.Client) []*clusterv1.Machine {
	deleted := []*clusterv1.Machine{}
	for _, machine := range e.machines {
		fetched := &clusterv1.Machine{}
		key := types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace}
		// Best-effort lookup: a machine the client cannot find simply has no
		// deletion timestamp and is skipped below.
		_ = c.Get(context.Background(), key, fetched)
		if fetched.DeletionTimestamp != nil {
			deleted = append(deleted, fetched)
		}
	}
	return deleted
}

func getHealthyEtcdResponse() *http.Response {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBufferString("{\"Health\": \"true\"}")),
}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.19
require (
github.com/aws/etcdadm-bootstrap-provider v1.0.9
github.com/go-logr/logr v1.2.3
github.com/golang/mock v1.4.4
github.com/google/uuid v1.3.0
github.com/hashicorp/go-multierror v1.1.1
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.5
Expand Down Expand Up @@ -45,13 +47,11 @@ require (
github.com/gobuffalo/flect v1.0.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.4.4 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down
Loading