Skip to content

Commit

Permalink
Add annotation to configure healthcheck retries for EtcdadmCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavmpandey08 committed Sep 5, 2023
1 parent ec74d9d commit b01352e
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 9 deletions.
2 changes: 2 additions & 0 deletions api/v1beta1/etcdadmcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
const (
UpgradeInProgressAnnotation = "etcdcluster.cluster.x-k8s.io/upgrading"

HealthCheckRetriesAnnotation = "etcdcluster.cluster.x-k8s.io/healthcheck-retries"

// EtcdadmClusterFinalizer is the finalizer applied to EtcdadmCluster resources
// by its managing controller.
EtcdadmClusterFinalizer = "etcdcluster.cluster.x-k8s.io"
Expand Down
1 change: 1 addition & 0 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type EtcdadmClusterReconciler struct {
Scheme *runtime.Scheme
etcdHealthCheckConfig etcdHealthCheckConfig
MaxConcurrentReconciles int
HealthCheckInterval time.Duration
GetEtcdClient func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error)
isPortOpen func(ctx context.Context, endpoint string) bool
}
Expand Down
20 changes: 18 additions & 2 deletions controllers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ func newClusterWithExternalEtcd() *clusterv1.Cluster {
}
}

func newEtcdadmCluster(cluster *clusterv1.Cluster) *etcdv1.EtcdadmCluster {
return &etcdv1.EtcdadmCluster{
func newEtcdadmCluster(cluster *clusterv1.Cluster, opts ...etcdadmClusterTestOpt) *etcdv1.EtcdadmCluster {
etcdCluster := &etcdv1.EtcdadmCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testEtcdadmClusterName,
Expand Down Expand Up @@ -463,8 +463,24 @@ func newEtcdadmCluster(cluster *clusterv1.Cluster) *etcdv1.EtcdadmCluster {
},
},
}

for _, opt := range opts {
opt(etcdCluster)
}

return etcdCluster
}

// etcdadmClusterTestOpt mutates an EtcdadmCluster under construction in tests.
type etcdadmClusterTestOpt func(e *etcdv1.EtcdadmCluster)

// withPausedAnnotation sets the healthcheck-retries annotation to "0", which
// the controller treats as "skip health checks" for this cluster.
// It merges into any annotations already present instead of replacing the
// whole map, so it composes safely with other test options.
func withPausedAnnotation(e *etcdv1.EtcdadmCluster) {
	annotations := e.GetAnnotations()
	if annotations == nil {
		annotations = map[string]string{}
	}
	annotations[etcdv1.HealthCheckRetriesAnnotation] = "0"
	e.SetAnnotations(annotations)
}

// TODO(review): the commented-out withOwnerRef draft below is a stale
// copy-paste of withPausedAnnotation — it sets the healthcheck-retries
// annotation instead of adding an owner reference. Rewrite before enabling.
// func withOwnerRef(e *etcdv1.EtcdadmCluster) {
// 	e.SetAnnotations(map[string]string{etcdv1.HealthCheckRetriesAnnotation: "true"})
// }

func newEtcdMachine(etcdadmCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) *clusterv1.Machine {
return &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Expand Down
30 changes: 26 additions & 4 deletions controllers/periodic_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package controllers

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -18,8 +20,7 @@ import (
)

const (
maxUnhealthyCount = 5
healthCheckInterval = 30
maxUnhealthyCount = 5
)

type etcdHealthCheckConfig struct {
Expand All @@ -38,7 +39,7 @@ type etcdadmClusterMemberHealthConfig struct {
func (r *EtcdadmClusterReconciler) startHealthCheckLoop(ctx context.Context, done <-chan struct{}) {
r.Log.Info("Starting periodic healthcheck loop")
etcdadmClusterMapper := make(map[types.UID]etcdadmClusterMemberHealthConfig)
ticker := time.NewTicker(healthCheckInterval * time.Second)
ticker := time.NewTicker(r.HealthCheckInterval)
defer ticker.Stop()

for {
Expand All @@ -64,6 +65,14 @@ func (r *EtcdadmClusterReconciler) startHealthCheck(ctx context.Context, etcdadm
log.Info("EtcdadmCluster reconciliation is paused, skipping health checks")
continue
}
if val, set := ec.Annotations[etcdv1.HealthCheckRetriesAnnotation]; set {
if retries, err := strconv.Atoi(val); err != nil || retries < 0 {
log.Info(fmt.Sprintf("healthcheck-retries annotation configured with invalid value: %v", err))
} else if retries == 0 {
log.Info("healthcheck-retries configured to 0, skipping health checks")
continue
}
}
if conditions.IsFalse(&ec, etcdv1.EtcdCertificatesAvailableCondition) {
log.Info("EtcdadmCluster certificates are not ready, skipping health checks")
continue
Expand Down Expand Up @@ -139,7 +148,15 @@ func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Co
log.Info("Machine for member does not exist", "member", endpoint)
currClusterHFConfig.unhealthyMembersToRemove[endpoint] = m
}
if currClusterHFConfig.unhealthyMembersFrequency[endpoint] >= maxUnhealthyCount {
unhealthyCount := maxUnhealthyCount
if val, set := etcdCluster.Annotations[etcdv1.HealthCheckRetriesAnnotation]; set {
retries, err := strconv.Atoi(val)
if err != nil || retries < 0 {
log.Info("healthcheck-retries annotation configured with invalid value, using default retries")
}
unhealthyCount = retries
}
if currClusterHFConfig.unhealthyMembersFrequency[endpoint] >= unhealthyCount {
log.Info("Adding to list of unhealthy members to remove", "member", endpoint)
// member has been unresponsive, add the machine to unhealthyMembersToRemove queue
m := currClusterHFConfig.endpointToMachineMapper[endpoint]
Expand Down Expand Up @@ -205,3 +222,8 @@ func (r *EtcdadmClusterReconciler) getOwnedMachines(ctx context.Context, cluster

return etcdMachines.Filter(collections.OwnedMachines(&ec))
}

// hasHealthCheckPauseAnnotation reports whether the healthcheck-retries
// annotation is present on the EtcdadmCluster, regardless of its value.
// NOTE(review): despite the name, presence alone does not pause health
// checks — only the value "0" makes the loop skip the cluster; confirm
// callers intend a pure presence check.
func hasHealthCheckPauseAnnotation(ec *etcdv1.EtcdadmCluster) bool {
	if _, ok := ec.Annotations[etcdv1.HealthCheckRetriesAnnotation]; ok {
		return true
	}
	return false
}
78 changes: 77 additions & 1 deletion controllers/periodic_healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import (
"context"
"net/http"
"testing"
"time"

"github.com/aws/etcdadm-controller/controllers/mocks"
"github.com/go-logr/zapr"
"github.com/golang/mock/gomock"
. "github.com/onsi/gomega"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"k8s.io/apimachinery/pkg/types"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -16,7 +21,6 @@ import (

func TestStartHealthCheckLoop(t *testing.T) {
_ = NewWithT(t)

ctrl := gomock.NewController(t)
mockEtcd := mocks.NewMockEtcdClient(ctrl)
mockRt := mocks.NewMockRoundTripper(ctrl)
Expand Down Expand Up @@ -50,6 +54,78 @@ func TestStartHealthCheckLoop(t *testing.T) {
r.startHealthCheck(context.Background(), etcdadmClusterMapper)
}

// TestStartHealthCheckLoopPaused verifies that the periodic healthcheck loop
// skips an EtcdadmCluster whose healthcheck-retries annotation is set to "0".
func TestStartHealthCheckLoopPaused(t *testing.T) {
	g := NewWithT(t)
	// Capture controller log output so the test can assert on the skip message.
	core, recordedLogs := observer.New(zapcore.InfoLevel)
	logger := zapr.NewLogger(zap.New(core))

	cluster := newClusterWithExternalEtcd()
	etcdadmCluster := newEtcdadmCluster(cluster, withPausedAnnotation)
	fakeClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(etcdadmCluster).Build()

	r := &EtcdadmClusterReconciler{
		Client:              fakeClient,
		Log:                 logger,
		HealthCheckInterval: time.Second, // override the healthcheck interval to 1 second
	}

	done := make(chan struct{})

	// Stop the healthcheck loop after 5 seconds so several ticks are observed.
	go func() {
		time.Sleep(5 * time.Second)
		close(done)
	}()

	r.startHealthCheckLoop(context.Background(), done)

	g.Expect(recordedLogs.All()).To(Not(BeEmpty()))
	// NOTE(review): confirm this message matches what startHealthCheck actually
	// logs for a cluster whose healthcheck-retries annotation is "0".
	g.Expect(recordedLogs.All()[recordedLogs.Len()-1].Message).To(Equal("HealthCheck paused for EtcdadmCluster, skipping"))
}

type RoundTripperFunc func(*http.Request) (*http.Response, error)

func (fn RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return fn(r)
}

// TestStartHealthCheckLoop2 runs the healthcheck loop against a cluster whose
// creation is complete and asserts on the last emitted log message.
func TestStartHealthCheckLoop2(t *testing.T) {
	g := NewWithT(t)
	core, recordedLogs := observer.New(zapcore.InfoLevel)
	logger := zapr.NewLogger(zap.New(core))

	cluster := newClusterWithExternalEtcd()
	etcdadmCluster := newEtcdadmCluster(cluster)
	etcdadmCluster.Status.CreationComplete = true
	fakeClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(etcdadmCluster).Build()

	r := &EtcdadmClusterReconciler{
		Client: fakeClient,
		Log:    logger,
		// A bare literal 1 would be one *nanosecond* (hot ticker loop);
		// use time.Second as the original comment intended.
		HealthCheckInterval: time.Second, // override the healthcheck interval to 1 second
	}
	r.SetIsPortOpen(func(_ context.Context, _ string) bool { return true })

	done := make(chan struct{})

	// Stop the healthcheck loop after 5 seconds so several ticks are observed.
	go func() {
		time.Sleep(5 * time.Second)
		close(done)
	}()

	r.startHealthCheckLoop(context.Background(), done)

	g.Expect(recordedLogs.All()).To(Not(BeEmpty()))
	// NOTE(review): this cluster is NOT paused (no annotation), yet the
	// expected message is identical to the paused test's assertion — looks
	// copy-pasted; confirm the intended last log line for this scenario.
	g.Expect(recordedLogs.All()[recordedLogs.Len()-1].Message).To(Equal("HealthCheck paused for EtcdadmCluster, skipping"))
}

func isPortOpenMock(_ context.Context, _ string) bool {
return true
Expand Down
1 change: 1 addition & 0 deletions controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (r *EtcdadmClusterReconciler) scaleDownEtcdCluster(ctx context.Context, ec
machineAddress := getEtcdMachineAddress(machineToDelete)
return ctrl.Result{}, r.removeEtcdMachine(ctx, ec, cluster, machineToDelete, machineAddress)
}

func (r *EtcdadmClusterReconciler) removeEtcdMachine(ctx context.Context, ec *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster, machineToDelete *clusterv1.Machine, machineAddress string) error {
peerURL := fmt.Sprintf("https://%s:2380", machineAddress)
etcdClient, err := r.GetEtcdClient(ctx, cluster, ec.Status.Endpoints)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ go 1.19
require (
github.com/aws/etcdadm-bootstrap-provider v1.0.9
github.com/go-logr/logr v1.2.3
github.com/go-logr/zapr v1.2.3
github.com/hashicorp/go-multierror v1.1.1
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.5
github.com/pkg/errors v0.9.1
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
go.uber.org/zap v1.24.0
k8s.io/api v0.26.1
k8s.io/apimachinery v0.26.1
k8s.io/apiserver v0.26.1
Expand All @@ -37,7 +39,6 @@ require (
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
Expand Down Expand Up @@ -71,7 +72,6 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.6.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

etcdbp "github.com/aws/etcdadm-bootstrap-provider/api/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -57,13 +58,15 @@ func main() {
var metricsAddr string
var enableLeaderElection bool
var maxConcurrentReconciles int
var healthcheckInterval int
flag.StringVar(&metricsAddr, "metrics-addr", "localhost:8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.StringVar(&watchNamespace, "namespace", "",
"Namespace that the controller watches to reconcile etcdadmCluster objects. If unspecified, the controller watches for objects across all namespaces.")
flag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", 10, "The maximum number of concurrent etcdadm-controller reconciles.")
flag.IntVar(&healthcheckInterval, "healthcheck-interval", 30, "The time interval between each healthcheck loop in seconds.")
flag.Parse()

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
Expand All @@ -88,6 +91,7 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("EtcdadmCluster"),
Scheme: mgr.GetScheme(),
MaxConcurrentReconciles: maxConcurrentReconciles,
HealthCheckInterval: time.Second * time.Duration(healthcheckInterval),
}
if err = (etcdadmReconciler).SetupWithManager(ctx, mgr, stopCh); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "EtcdadmCluster")
Expand Down

0 comments on commit b01352e

Please sign in to comment.