From a7493819098b3d8defc729ec3aad60c1a14522b7 Mon Sep 17 00:00:00 2001
From: Jussi Nummelin
Date: Thu, 4 Feb 2021 00:04:38 +0000
Subject: [PATCH] Use true leader elector for controller counting

Signed-off-by: Jussi Nummelin

The previous iteration, while successful at proving the approach in
general, had the drawback of the konnectivity-server process flapping a
bit too much. This has the unwelcome side effect of the agents getting
slightly confused about which servers, and how many of them, they
should connect to. That causes connectivity issues between the API and
the workers, as all of that communication goes through the konnectivity
tunnels.

This commit changes a couple of things:
- Standard leader election is now used for the per-controller leases.
  The leases are more accurate, but use a bit more resources.
- Instead of restarting the whole konnectivity component, we now only
  restart the supervised part (i.e. the konnectivity-server process
  itself).
- The per-controller lease handling is broken out into a separate
  component to keep the concerns apart.

Signed-off-by: Jussi Nummelin
---
 cmd/server.go | 8 +
 pkg/component/server/controllerlease.go | 87 ++++++++
 pkg/component/server/konnectivity.go | 254 ++++++++++-----------
 pkg/kubernetes/lease.go | 285 +-----------------------
 pkg/kubernetes/lease_test.go | 38 ++++
 5 files changed, 260 insertions(+), 412 deletions(-)
 create mode 100644 pkg/component/server/controllerlease.go
 create mode 100644 pkg/kubernetes/lease_test.go

diff --git a/cmd/server.go b/cmd/server.go
index 1318a8cde3dd..5ac95ca08ad8 100644
--- a/cmd/server.go
+++ b/cmd/server.go
@@ -180,6 +180,14 @@ func startServer(token string) error {
 		LogLevel: logging["kube-apiserver"],
 		Storage: storageBackend,
 	})
+
+	if clusterConfig.Spec.API.ExternalAddress != "" {
+		componentManager.Add(&server.ControllerLease{
+			ClusterConfig: clusterConfig,
+			KubeClientFactory: adminClientFactory,
+		})
+	}
+
 	componentManager.Add(&server.Konnectivity{
 		ClusterConfig: clusterConfig,
 		LogLevel: logging["konnectivity-server"],
diff --git a/pkg/component/server/controllerlease.go b/pkg/component/server/controllerlease.go
new file mode 100644
index 000000000000..b22a9cbe1250
--- /dev/null
+++ b/pkg/component/server/controllerlease.go
@@ -0,0 +1,87 @@
+package server
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	"github.com/sirupsen/logrus"
+
+	config "github.com/k0sproject/k0s/pkg/apis/v1beta1"
+	kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
+	"github.com/k0sproject/k0s/pkg/leaderelection"
+)
+
+// ControllerLease implements a component that manages a lease per controller.
+// The per-controller leases are used to determine the number of currently running controllers.
+type ControllerLease struct {
+	ClusterConfig *config.ClusterConfig
+	KubeClientFactory kubeutil.ClientFactory
+
+	cancelCtx context.Context
+	cancelFunc context.CancelFunc
+	leaseCancel context.CancelFunc
+}
+
+// Init initializes the component
+func (c *ControllerLease) Init() error {
+	return nil
+}
+
+// Run runs the leader elector to keep the lease object up-to-date.
+func (c *ControllerLease) Run() error { + c.cancelCtx, c.cancelFunc = context.WithCancel(context.Background()) + log := logrus.WithFields(logrus.Fields{"component": "controllerlease"}) + client, err := c.KubeClientFactory.GetClient() + if err != nil { + return fmt.Errorf("can't create kubernetes rest client for lease pool: %v", err) + } + + // hostname used to make the lease names be clear to which controller they belong to + holderIdentity, err := os.Hostname() + if err != nil { + return nil + } + leaseID := fmt.Sprintf("k0s-ctrl-%s", holderIdentity) + + leasePool, err := leaderelection.NewLeasePool(client, leaseID, leaderelection.WithLogger(log)) + + if err != nil { + return err + } + events, cancel, err := leasePool.Watch() + if err != nil { + return err + } + + c.leaseCancel = cancel + + go func() { + for { + select { + case <-events.AcquiredLease: + log.Info("acquired leader lease") + case <-events.LostLease: + log.Error("lost leader lease, this should not really happen!?!?!?") + case <-c.cancelCtx.Done(): + return + } + } + }() + return nil +} + +// Stop stops the component +func (c *ControllerLease) Stop() error { + if c.leaseCancel != nil { + c.leaseCancel() + } + + if c.cancelFunc != nil { + c.cancelFunc() + } + return nil +} + +// Healthy is a no-op healchcheck +func (c *ControllerLease) Healthy() error { return nil } diff --git a/pkg/component/server/konnectivity.go b/pkg/component/server/konnectivity.go index f043c00a20a3..76ac1e198bb9 100644 --- a/pkg/component/server/konnectivity.go +++ b/pkg/component/server/konnectivity.go @@ -20,9 +20,12 @@ import ( "fmt" "os" "path/filepath" + "strconv" + "strings" "time" "github.com/sirupsen/logrus" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/k0sproject/k0s/internal/util" config "github.com/k0sproject/k0s/pkg/apis/v1beta1" @@ -33,27 +36,25 @@ import ( "github.com/k0sproject/k0s/pkg/supervisor" ) -const ( - konnectivityLeaseName = "konnectivity-server" - konnectivityLeaseNameSpace = "kube-node-lease" - serviceName = "konnectivity-server" -) - -// Konnectivity implement the component interface of konnectivity server +// Konnectivity implements the component interface of konnectivity server type Konnectivity struct { ClusterConfig *config.ClusterConfig K0sVars constant.CfgVars LogLevel string - supervisor supervisor.Supervisor + supervisor *supervisor.Supervisor uid int // used for lease lock KubeClientFactory k8sutil.ClientFactory serverCount int - leaseLock *kubernetes.LeaseLock - leaseCtx context.Context - leaseCancel context.CancelFunc + + serverCountChan chan int + + stopCtx context.Context + stopFunc context.CancelFunc + + log *logrus.Entry } // Init ... @@ -73,65 +74,102 @@ func (k *Konnectivity) Init() error { return fmt.Errorf("failed to chown %s: %v", k.K0sVars.KonnectivitySocketDir, err) } - // set default serverCount to 1 - k.serverCount = kubernetes.MinLeaseHolders + k.log = logrus.WithFields(logrus.Fields{"component": "konnectivity"}) + return assets.Stage(k.K0sVars.BinDir, "konnectivity-server", constant.BinDirMode) } // Run .. 
func (k *Konnectivity) Run() error { - args := []string{ - fmt.Sprintf("--uds-name=%s", filepath.Join(k.K0sVars.KonnectivitySocketDir, "konnectivity-server.sock")), - fmt.Sprintf("--cluster-cert=%s", filepath.Join(k.K0sVars.CertRootDir, "server.crt")), - fmt.Sprintf("--cluster-key=%s", filepath.Join(k.K0sVars.CertRootDir, "server.key")), - fmt.Sprintf("--kubeconfig=%s", k.K0sVars.KonnectivityKubeConfigPath), - "--mode=grpc", - "--server-port=0", - "--agent-port=8132", - "--admin-port=8133", - "--agent-namespace=kube-system", - "--agent-service-account=konnectivity-agent", - "--authentication-audience=system:konnectivity-server", - "--logtostderr=true", - "--stderrthreshold=1", - "-v=2", - fmt.Sprintf("--v=%s", k.LogLevel), - "--enable-profiling=false", - } + + // Buffered chan to send updates for the count of servers + k.serverCountChan = make(chan int, 1) + + k.stopCtx, k.stopFunc = context.WithCancel(context.Background()) + + go k.runServer() if k.ClusterConfig.Spec.API.ExternalAddress != "" { - serverID, err := util.MachineID() - if err != nil { - logrus.Errorf("failed to fetch server ID for %v", serviceName) - } - args = append(args, fmt.Sprintf("--server-count=%d", k.serverCount)) - args = append(args, fmt.Sprintf("--server-id=%s", serverID)) + go k.runLeaseCounter() + } else { + // It's a buffered channel so once we start the runServer routine it'll pick this up and just sees it never changing + k.serverCountChan <- 1 } - logrus.Info("Starting konnectivity") - k.supervisor = supervisor.Supervisor{ - Name: "konnectivity", - BinPath: assets.BinPath("konnectivity-server", k.K0sVars.BinDir), - DataDir: k.K0sVars.DataDir, - RunDir: k.K0sVars.RunDir, - Args: args, - UID: k.uid, - } + return k.writeKonnectivityAgent() +} - err := k.supervisor.Supervise() +func (k *Konnectivity) defaultArgs() util.MappedArgs { + serverID, err := util.MachineID() if err != nil { - return err + logrus.Errorf("failed to fetch server ID for konnectivity-server") } - if k.ClusterConfig.Spec.API.ExternalAddress != "" { - k.runLease() + return util.MappedArgs{ + "--uds-name": filepath.Join(k.K0sVars.KonnectivitySocketDir, "konnectivity-server.sock"), + "--cluster-cert": filepath.Join(k.K0sVars.CertRootDir, "server.crt"), + "--cluster-key": filepath.Join(k.K0sVars.CertRootDir, "server.key"), + "--kubeconfig": k.K0sVars.KonnectivityKubeConfigPath, + "--mode": "grpc", + "--server-port": "0", + "--agent-port": "8132", + "--admin-port": "8133", + "--agent-namespace": "kube-system", + "--agent-service-account": "konnectivity-agent", + "--authentication-audience": "system:konnectivity-server", + "--logtostderr": "true", + "--stderrthreshold": "1", + "--v": k.LogLevel, + "--enable-profiling": "false", + "--server-id": serverID, + } +} + +// runs the supervisor and restarts if the calculated server count changes +func (k *Konnectivity) runServer() { + for { + select { + case <-k.stopCtx.Done(): + return + case count := <-k.serverCountChan: + // restart only if the count actually changes + if count != k.serverCount { + // Stop supervisor + + if k.supervisor != nil { + if err := k.supervisor.Stop(); err != nil { + logrus.Errorf("failed to stop supervisor: %s", err) + // TODO Should we just return? 
That means other part will continue to run but the server is never properly restarted + } + } + args := k.defaultArgs() + args["--server-count"] = strconv.Itoa(count) + k.supervisor = &supervisor.Supervisor{ + Name: "konnectivity", + BinPath: assets.BinPath("konnectivity-server", k.K0sVars.BinDir), + DataDir: k.K0sVars.DataDir, + RunDir: k.K0sVars.RunDir, + Args: args.ToArgs(), + UID: k.uid, + } + err := k.supervisor.Supervise() + if err != nil { + logrus.Errorf("failed to start konnectivity supervisor: %s", err) + k.supervisor = nil // not to make the next loop to try to stop it first + continue + } + k.serverCount = count + } + } } - return k.writeKonnectivityAgent() } // Stop stops func (k *Konnectivity) Stop() error { - if k.ClusterConfig.Spec.API.ExternalAddress != "" { - k.stopLease() + if k.stopFunc != nil { + k.stopFunc() + } + if k.supervisor == nil { + return nil } return k.supervisor.Stop() } @@ -164,95 +202,47 @@ func (k *Konnectivity) writeKonnectivityAgent() error { return nil } -func (k *Konnectivity) runLease() { - k.leaseCtx, k.leaseCancel = context.WithCancel(context.Background()) - go func(ctx context.Context) { - logrus.Infof("starting %v lease watcher", serviceName) - leaseLock, err := k.newLeaseLock() - if err != nil { - logrus.Error(err) - } - k.leaseLock = leaseLock - for { - select { - case <-ctx.Done(): - logrus.Debugf("stopping lease watcher for %v", serviceName) - return - default: - k.leaseLock.LeaseRunner(context.Background()) +func (k *Konnectivity) runLeaseCounter() { + + logrus.Infof("starting to count controller lease holders every 10 secs") + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-k.stopCtx.Done(): + return + case <-ticker.C: + count, err := k.countLeaseHolders() + if err != nil { + logrus.Errorf("failed to count controller leases: %s", err) + continue } + k.serverCountChan <- count } - }(k.leaseCtx) - - go func(ctx context.Context) { - logrus.Infof("watching %v lease holders", serviceName) - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - logrus.Debugf("stopping lease holder count for %v", serviceName) - return - case <-ticker.C: - observedLeaseHolders := k.leaseLock.CountValidLeaseHolders(context.Background()) - if observedLeaseHolders != k.serverCount { - logrus.Debugf("change in %v lease holders detected. 
refreshing service.", serviceName) - k.serverCount = observedLeaseHolders - - // restarting service - if err := k.restartService(); err != nil { - logrus.Errorf("failed to restart %v: %v", serviceName, err) - } - } - } - logrus.Debugf("found %v lease holders for %v", k.serverCount, serviceName) - } - }(k.leaseCtx) + } } -func (k *Konnectivity) newLeaseLock() (*kubernetes.LeaseLock, error) { +func (k *Konnectivity) countLeaseHolders() (int, error) { client, err := k.KubeClientFactory.GetClient() if err != nil { - return nil, fmt.Errorf("failed to get lease client: %v", err) + return 0, err } - - holderIdentity, err := os.Hostname() + ctx, cancel := context.WithTimeout(k.stopCtx, 5*time.Second) + defer cancel() + count := 0 + leases, err := client.CoordinationV1().Leases("kube-node-lease").List(ctx, v1.ListOptions{}) if err != nil { - return nil, err + return 0, err } - - log := logrus.WithFields(logrus.Fields{"component": "konnectivity"}) - - leaseConfig := &kubernetes.LeaseConfig{ - HolderIdentity: holderIdentity, - Name: fmt.Sprintf("%v-%v", konnectivityLeaseName, holderIdentity), - Namespace: konnectivityLeaseNameSpace, - ServiceName: serviceName, - LeaseDuration: 120 * time.Second, // to prevent flapping of the konnectivity service, the lease is somewhat longer than normal - RenewDeadline: 40 * time.Second, - RetryPeriod: 30 * time.Second, - } - return &kubernetes.LeaseLock{ - Config: leaseConfig, - Client: client.CoordinationV1(), - Log: log, - }, nil -} - -func (k *Konnectivity) stopLease() { - if k.leaseCancel != nil { - k.leaseCancel() + for _, l := range leases.Items { + if strings.HasPrefix(l.ObjectMeta.Name, "k0s-ctrl") { + if kubernetes.IsValidLease(l) { + count++ + } + } } -} -func (k *Konnectivity) restartService() error { - if err := k.Stop(); err != nil { - return fmt.Errorf("failed to stop %v: %v", serviceName, err) - } - if err := k.Run(); err != nil { - return fmt.Errorf("failed to start %v: %v", serviceName, err) - } - return nil + return count, nil } const konnectivityAgentTemplate = ` @@ -338,5 +328,5 @@ spec: audience: system:konnectivity-server ` -// Health-check interface +// Healthy is a no-op check func (k *Konnectivity) Healthy() error { return nil } diff --git a/pkg/kubernetes/lease.go b/pkg/kubernetes/lease.go index d2f87d4a3ded..a380eb66bd02 100644 --- a/pkg/kubernetes/lease.go +++ b/pkg/kubernetes/lease.go @@ -16,291 +16,16 @@ limitations under the License. package kubernetes import ( - "bytes" - "context" - "encoding/json" - "fmt" - "os" "time" - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - coordinationv1 "k8s.io/api/coordination/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" -) - -var ( - MinLeaseHolders = 1 ) -type LeaseLock struct { - Client coordinationv1client.LeasesGetter - Config *LeaseConfig - Log *logrus.Entry - - lease *coordinationv1.Lease - observedSpec *coordinationv1.LeaseSpec - observedRawSpec []byte - observedTime time.Time -} - -type LeaseConfig struct { - HolderIdentity string - Name string - Namespace string - ServiceName string - - // LeaseDuration is the duration of a lease, before the lease can be re-acquired. A client needs to wait a full LeaseDuration without observing a change to - // the record before it can attempt to take over. 
- LeaseDuration time.Duration - - // RenewDeadline is the duration that the lease holder will retry refreshing leadership before giving up. - RenewDeadline time.Duration - - // RetryPeriod is the duration the lease clients should wait - // between tries of actions. - RetryPeriod time.Duration -} - -// getLease gets the LeaseSpec for a for the relevant LeaseMeta Name & Namespace -func (l *LeaseLock) getLease(ctx context.Context) (*coordinationv1.LeaseSpec, []byte, error) { - var err error - l.lease, err = l.Client.Leases(l.Config.Namespace).Get(ctx, l.Config.Name, metav1.GetOptions{}) - if err != nil { - return nil, nil, err - } - - leaseByte, err := json.Marshal(l.lease.Spec) - if err != nil { - return nil, nil, err - } - return &l.lease.Spec, leaseByte, nil -} - -// createLease creates the LeaseSpec for a for the relevant LeaseMeta Name, Namespace & LeaseSpec -func (l *LeaseLock) createLease(ctx context.Context, leaseSpec coordinationv1.LeaseSpec) error { - var err error - lease := &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Name: l.Config.Name, - Namespace: l.Config.Namespace, - Labels: map[string]string{"service": l.Config.ServiceName}, - }, - Spec: leaseSpec, - } - - l.lease, err = l.Client.Leases(l.Config.Namespace).Create(ctx, lease, metav1.CreateOptions{}) - return err -} - -// updateLease updates the LeaseSpec for a for the relevant LeaseMeta Name, Namespace & LeaseSpec -func (l *LeaseLock) updateLease(ctx context.Context, leaseSpec coordinationv1.LeaseSpec) error { - var err error - if l.lease == nil { - return fmt.Errorf("lease not initialized, call get or create first") - } - l.lease.Spec = leaseSpec - - lease, err := l.Client.Leases(l.Config.Namespace).Update(ctx, l.lease, metav1.UpdateOptions{}) - if err != nil { - return err - } - l.lease = lease - return nil -} - -func leaseConfigToLeaseSpec(lec LeaseConfig) *coordinationv1.LeaseSpec { - leaseDurationSeconds := int32(lec.LeaseDuration / time.Second) - holderIdentity, err := os.Hostname() - if err != nil { - return nil - } +// IsValidLease check whether or not the lease is expired +func IsValidLease(lease coordinationv1.Lease) bool { + leaseDur := time.Duration(*lease.Spec.LeaseDurationSeconds) - return &coordinationv1.LeaseSpec{ - HolderIdentity: &holderIdentity, - AcquireTime: &metav1.MicroTime{Time: time.Now()}, - LeaseDurationSeconds: &leaseDurationSeconds, - RenewTime: &metav1.MicroTime{Time: time.Now()}, - } -} - -func (l *LeaseLock) LeaseRunner(ctx context.Context) { - err := l.validateLeaseConfig() - if err != nil { - l.Log.Error(err) - } - - defer runtime.HandleCrash() - if !l.acquire(ctx) { - return // ctx signalled done - } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - l.renew(ctx) -} - -// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. -// Returns false if ctx signals done. -func (l *LeaseLock) acquire(ctx context.Context) bool { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - succeeded := false - desc := fmt.Sprintf("%v/%v", l.Config.Namespace, l.Config.Name) - l.Log.Infof("attempting to acquire lease %v...", desc) - wait.JitterUntil(func() { - succeeded = l.tryAcquireOrRenew(ctx) - if !succeeded { - l.Log.Infof("failed to acquire lease %v. 
will attempt to re-acquire.", desc) - return - } - l.Log.Infof("successfully acquired lease %v", desc) - cancel() - }, l.Config.RetryPeriod, 1.2, true, ctx.Done()) - return succeeded -} - -func (l *LeaseLock) renew(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - wait.Until(func() { - timeoutCtx, timeoutCancel := context.WithTimeout(ctx, l.Config.RenewDeadline) - defer timeoutCancel() - err := wait.PollImmediateUntil(l.Config.RetryPeriod, func() (bool, error) { - return l.tryAcquireOrRenew(timeoutCtx), nil - }, timeoutCtx.Done()) - - desc := fmt.Sprintf("%v/%v", l.Config.Namespace, l.Config.Name) - if err == nil { - l.Log.Infof("successfully renewed lease %v", desc) - return - } - // renewal failed - l.Log.Infof("failed to renew lease %v: %v", desc, err) - cancel() - }, l.Config.RetryPeriod, ctx.Done()) -} - -func (l *LeaseLock) tryAcquireOrRenew(ctx context.Context) bool { - desiredLeaseSpec := leaseConfigToLeaseSpec(*l.Config) - - // 1. obtain or create the lease - oldLeaseSpec, oldLeaseRawSpec, err := l.getLease(ctx) - - desc := fmt.Sprintf("%v/%v", l.Config.Namespace, l.Config.Name) - - if err != nil { - if !errors.IsNotFound(err) { - l.Log.Errorf("error retrieving resource lease %v: %v", desc, err) - return false - } - if err = l.createLease(ctx, *desiredLeaseSpec); err != nil { - l.Log.Errorf("error initially creating lease lock: %v", err) - return false - } - l.observedSpec = desiredLeaseSpec - l.observedTime = time.Now() - - return true - } - - // 2. record exists, check identity & time - if !bytes.Equal(l.observedRawSpec, oldLeaseRawSpec) { - l.observedSpec = oldLeaseSpec - l.observedRawSpec = oldLeaseRawSpec - l.observedTime = time.Now() - } - - expiredTime := l.observedTime.Add(l.Config.LeaseDuration) - - // check if the expired Time is after current time (I.E. not yet expired) - if len(*oldLeaseSpec.HolderIdentity) > 0 && expiredTime.After(time.Now()) { - l.Log.Debugf("lease is held by %v and has not yet expired (expiration time: %v)", *oldLeaseSpec.HolderIdentity, expiredTime) - return false - } - - // 3. update the lease - desiredLeaseSpec.AcquireTime = oldLeaseSpec.AcquireTime // leave the "acquired" field unchanged - - // update the lock itself - if err = l.updateLease(ctx, *desiredLeaseSpec); err != nil { - l.Log.Errorf("Failed to update lease: %v", err) - return false - } - - l.observedSpec = desiredLeaseSpec - l.observedTime = time.Now() - return true -} - -// fetch a list of all the leases that contain the label service=l.Config.ServiceName -func (l *LeaseLock) listLease(ctx context.Context) *coordinationv1.LeaseList { - label := map[string]string{"service": l.Config.ServiceName} - desc := fmt.Sprintf("%v/%v", l.Config.Namespace, l.Config.Name) - - ls := &metav1.LabelSelector{} - if err := metav1.Convert_Map_string_To_string_To_v1_LabelSelector(&label, ls, nil); err != nil { - logrus.Debugf("failed to parse label for listing lease %v", desc) - } - - opts := metav1.ListOptions{ - LabelSelector: metav1.FormatLabelSelector(ls), - } - - list, err := l.Client.Leases(l.Config.Namespace).List(ctx, opts) - if err != nil { - logrus.Errorf("failed to fetch lease holders for %v", desc) - } - return list -} - -// CountValidLeaseHolders fetches the leaseList and examines if a lease is still valid. 
-// if it is, it adds it to the validLeaseHolder count -func (l *LeaseLock) CountValidLeaseHolders(ctx context.Context) int { - var validLeases []coordinationv1.Lease - list := l.listLease(ctx) - - for i := range list.Items { - lease := list.Items[i] - leaseDurationSeconds := lease.Spec.LeaseDurationSeconds - leaseDuration := time.Duration(*leaseDurationSeconds) * time.Second - lastRenewed := lease.Spec.RenewTime - expires := lastRenewed.Add(leaseDuration).Add(l.Config.RenewDeadline) - - if expires.After(time.Now()) { - logrus.Debugf("lease for %v still valid, adding to valid lease count", *lease.Spec.HolderIdentity) - validLeases = append(validLeases, lease) - } - } - count := len(validLeases) - if count < MinLeaseHolders { - return MinLeaseHolders - } - return count -} + leaseExpiry := lease.Spec.RenewTime.Add(leaseDur * time.Second) -func (l *LeaseLock) validateLeaseConfig() error { - l.Log.Debug("Validating lease config") - var JitterFactor = 1.2 - if l.Config.LeaseDuration <= l.Config.RenewDeadline { - return fmt.Errorf("leaseDuration must be greater than renewDeadline") - } - if l.Config.RenewDeadline <= time.Duration(JitterFactor*float64(l.Config.RetryPeriod)) { - return fmt.Errorf("RenewDeadline must be greater than retryPeriod*JitterFactor") - } - if l.Config.LeaseDuration < 1 { - return fmt.Errorf("LeaseDuration must be greater than zero") - } - if l.Config.RenewDeadline < 1 { - return fmt.Errorf("RenewDeadline must be greater than zero") - } - if l.Config.RetryPeriod < 1 { - return fmt.Errorf("RetryPeriod must be greater than zero") - } - l.Log.Info("LeaseConfig is valid. Moving on!") - return nil + return leaseExpiry.After(time.Now()) } diff --git a/pkg/kubernetes/lease_test.go b/pkg/kubernetes/lease_test.go new file mode 100644 index 000000000000..2a73fa6a1cf3 --- /dev/null +++ b/pkg/kubernetes/lease_test.go @@ -0,0 +1,38 @@ +package kubernetes + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + coordination "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestValidLease(t *testing.T) { + leaseDuration := int32(60) + microNow := metav1.NowMicro() + + lease := coordination.Lease{ + Spec: coordination.LeaseSpec{ + LeaseDurationSeconds: &leaseDuration, + RenewTime: µNow, + }, + } + + assert.Equal(t, true, IsValidLease(lease)) +} + +func TestExpiredLease(t *testing.T) { + leaseDuration := int32(60) + renew := metav1.NewMicroTime(time.Now().Add(-62 * time.Second)) + + lease := coordination.Lease{ + Spec: coordination.LeaseSpec{ + LeaseDurationSeconds: &leaseDuration, + RenewTime: &renew, + }, + } + + assert.Equal(t, false, IsValidLease(lease)) +}
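For reference, the counting scheme this patch implements can be summarized as: list the Lease objects in the kube-node-lease namespace, keep the ones whose name carries the k0s-ctrl prefix and whose RenewTime plus LeaseDurationSeconds is still in the future, and hand that number to konnectivity-server as --server-count. The standalone sketch below illustrates the idea with plain client-go. The kubeconfig loading and the main function are illustrative assumptions only (inside k0s the client comes from the admin client factory); the namespace, the name prefix and the validity rule are taken straight from the patch.

package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// countControllers lists the per-controller leases in kube-node-lease and
// counts the ones that are still within their lease duration, mirroring what
// countLeaseHolders in the patch does.
func countControllers(ctx context.Context, client kubernetes.Interface) (int, error) {
	leases, err := client.CoordinationV1().Leases("kube-node-lease").List(ctx, metav1.ListOptions{})
	if err != nil {
		return 0, err
	}
	count := 0
	for _, l := range leases.Items {
		if !strings.HasPrefix(l.Name, "k0s-ctrl") {
			continue
		}
		if l.Spec.LeaseDurationSeconds == nil || l.Spec.RenewTime == nil {
			continue
		}
		expiry := l.Spec.RenewTime.Add(time.Duration(*l.Spec.LeaseDurationSeconds) * time.Second)
		if expiry.After(time.Now()) {
			count++
		}
	}
	return count, nil
}

func main() {
	// The kubeconfig handling here is purely illustrative; inside k0s the
	// client comes from the admin client factory instead.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	n, err := countControllers(context.Background(), client)
	if err != nil {
		panic(err)
	}
	// This is the value konnectivity-server would be restarted with.
	fmt.Printf("--server-count=%d\n", n)
}

Note that, unlike the removed CountValidLeaseHolders helper, there is no RenewDeadline grace period here: a lease only counts while RenewTime + LeaseDurationSeconds is in the future, which matches the new IsValidLease check.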
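The new lease_test.go only covers IsValidLease in isolation. If one wanted to exercise the list-filter-count loop end to end, a possible approach (not part of this patch, and the test name and lease names are hypothetical) is to run it against client-go's fake clientset from the pkg/kubernetes package, roughly as sketched below with one fresh and one stale controller lease.

package kubernetes

import (
	"context"
	"strings"
	"testing"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// TestCountValidControllerLeases mimics the list-filter-count loop that
// Konnectivity.countLeaseHolders performs, written out inline against a fake
// clientset seeded with one valid and one expired controller lease.
func TestCountValidControllerLeases(t *testing.T) {
	dur := int32(60)
	fresh := metav1.NowMicro()
	stale := metav1.NewMicroTime(time.Now().Add(-10 * time.Minute))

	client := fake.NewSimpleClientset(
		&coordinationv1.Lease{
			ObjectMeta: metav1.ObjectMeta{Name: "k0s-ctrl-node1", Namespace: "kube-node-lease"},
			Spec:       coordinationv1.LeaseSpec{LeaseDurationSeconds: &dur, RenewTime: &fresh},
		},
		&coordinationv1.Lease{
			ObjectMeta: metav1.ObjectMeta{Name: "k0s-ctrl-node2", Namespace: "kube-node-lease"},
			Spec:       coordinationv1.LeaseSpec{LeaseDurationSeconds: &dur, RenewTime: &stale},
		},
	)

	leases, err := client.CoordinationV1().Leases("kube-node-lease").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		t.Fatal(err)
	}

	count := 0
	for _, l := range leases.Items {
		if strings.HasPrefix(l.Name, "k0s-ctrl") && IsValidLease(l) {
			count++
		}
	}
	if count != 1 {
		t.Fatalf("expected 1 valid controller lease, got %d", count)
	}
}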