diff --git a/cmd/server.go b/cmd/server.go
index 1318a8cde3dd..5ac95ca08ad8 100644
--- a/cmd/server.go
+++ b/cmd/server.go
@@ -180,6 +180,14 @@ func startServer(token string) error {
 		LogLevel: logging["kube-apiserver"],
 		Storage:  storageBackend,
 	})
+
+	if clusterConfig.Spec.API.ExternalAddress != "" {
+		componentManager.Add(&server.ControllerLease{
+			ClusterConfig:     clusterConfig,
+			KubeClientFactory: adminClientFactory,
+		})
+	}
+
 	componentManager.Add(&server.Konnectivity{
 		ClusterConfig: clusterConfig,
 		LogLevel:      logging["konnectivity-server"],
diff --git a/pkg/component/server/controllerlease.go b/pkg/component/server/controllerlease.go
new file mode 100644
index 000000000000..b22a9cbe1250
--- /dev/null
+++ b/pkg/component/server/controllerlease.go
@@ -0,0 +1,87 @@
+package server
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	"github.com/sirupsen/logrus"
+
+	config "github.com/k0sproject/k0s/pkg/apis/v1beta1"
+	kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
+	"github.com/k0sproject/k0s/pkg/leaderelection"
+)
+
+// ControllerLease implements a component that manages a lease per controller.
+// The per-controller leases are used to determine the number of currently running controllers.
+type ControllerLease struct {
+	ClusterConfig     *config.ClusterConfig
+	KubeClientFactory kubeutil.ClientFactory
+
+	cancelCtx   context.Context
+	cancelFunc  context.CancelFunc
+	leaseCancel context.CancelFunc
+}
+
+// Init initializes the component
+func (c *ControllerLease) Init() error {
+	return nil
+}
+
+// Run runs the leader elector to keep the lease object up-to-date.
+func (c *ControllerLease) Run() error {
+	c.cancelCtx, c.cancelFunc = context.WithCancel(context.Background())
+	log := logrus.WithFields(logrus.Fields{"component": "controllerlease"})
+	client, err := c.KubeClientFactory.GetClient()
+	if err != nil {
+		return fmt.Errorf("can't create kubernetes rest client for lease pool: %v", err)
+	}
+
+	// the hostname is used in the lease name to make it clear which controller each lease belongs to
+	holderIdentity, err := os.Hostname()
+	if err != nil {
+		return nil
+	}
+	leaseID := fmt.Sprintf("k0s-ctrl-%s", holderIdentity)
+
+	leasePool, err := leaderelection.NewLeasePool(client, leaseID, leaderelection.WithLogger(log))
+
+	if err != nil {
+		return err
+	}
+	events, cancel, err := leasePool.Watch()
+	if err != nil {
+		return err
+	}
+
+	c.leaseCancel = cancel
+
+	go func() {
+		for {
+			select {
+			case <-events.AcquiredLease:
+				log.Info("acquired leader lease")
+			case <-events.LostLease:
+				log.Error("lost leader lease, this should not really happen!?!?!?")
+			case <-c.cancelCtx.Done():
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+// Stop stops the component
+func (c *ControllerLease) Stop() error {
+	if c.leaseCancel != nil {
+		c.leaseCancel()
+	}
+
+	if c.cancelFunc != nil {
+		c.cancelFunc()
+	}
+	return nil
+}
+
+// Healthy is a no-op healthcheck
+func (c *ControllerLease) Healthy() error { return nil }
diff --git a/pkg/component/server/konnectivity.go b/pkg/component/server/konnectivity.go
index f043c00a20a3..76ac1e198bb9 100644
--- a/pkg/component/server/konnectivity.go
+++ b/pkg/component/server/konnectivity.go
@@ -20,9 +20,12 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/sirupsen/logrus"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/k0sproject/k0s/internal/util"
 	config "github.com/k0sproject/k0s/pkg/apis/v1beta1"
@@ -33,27 +36,25 @@ import (
 	"github.com/k0sproject/k0s/pkg/supervisor"
 )
 
-const (
-	konnectivityLeaseName      = "konnectivity-server"
-	konnectivityLeaseNameSpace = "kube-node-lease"
-	serviceName                = "konnectivity-server"
-)
-
-// Konnectivity implement the component interface of konnectivity server
+// Konnectivity implements the component interface of konnectivity server
 type Konnectivity struct {
 	ClusterConfig     *config.ClusterConfig
 	K0sVars           constant.CfgVars
 	LogLevel          string
-	supervisor        supervisor.Supervisor
+	supervisor        *supervisor.Supervisor
 	uid               int // used for lease lock
 	KubeClientFactory k8sutil.ClientFactory
 	serverCount       int
-	leaseLock         *kubernetes.LeaseLock
-	leaseCtx          context.Context
-	leaseCancel       context.CancelFunc
+
+	serverCountChan chan int
+
+	stopCtx  context.Context
+	stopFunc context.CancelFunc
+
+	log *logrus.Entry
 }
 
 // Init ...
@@ -73,65 +74,102 @@ func (k *Konnectivity) Init() error {
 		return fmt.Errorf("failed to chown %s: %v", k.K0sVars.KonnectivitySocketDir, err)
 	}
 
-	// set default serverCount to 1
-	k.serverCount = kubernetes.MinLeaseHolders
+	k.log = logrus.WithFields(logrus.Fields{"component": "konnectivity"})
+
 	return assets.Stage(k.K0sVars.BinDir, "konnectivity-server", constant.BinDirMode)
 }
 
 // Run ..
 func (k *Konnectivity) Run() error {
-	args := []string{
-		fmt.Sprintf("--uds-name=%s", filepath.Join(k.K0sVars.KonnectivitySocketDir, "konnectivity-server.sock")),
-		fmt.Sprintf("--cluster-cert=%s", filepath.Join(k.K0sVars.CertRootDir, "server.crt")),
-		fmt.Sprintf("--cluster-key=%s", filepath.Join(k.K0sVars.CertRootDir, "server.key")),
-		fmt.Sprintf("--kubeconfig=%s", k.K0sVars.KonnectivityKubeConfigPath),
-		"--mode=grpc",
-		"--server-port=0",
-		"--agent-port=8132",
-		"--admin-port=8133",
-		"--agent-namespace=kube-system",
-		"--agent-service-account=konnectivity-agent",
-		"--authentication-audience=system:konnectivity-server",
-		"--logtostderr=true",
-		"--stderrthreshold=1",
-		"-v=2",
-		fmt.Sprintf("--v=%s", k.LogLevel),
-		"--enable-profiling=false",
-	}
+
+	// Buffered chan to send updates for the count of servers
+	k.serverCountChan = make(chan int, 1)
+
+	k.stopCtx, k.stopFunc = context.WithCancel(context.Background())
+
+	go k.runServer()
 
 	if k.ClusterConfig.Spec.API.ExternalAddress != "" {
-		serverID, err := util.MachineID()
-		if err != nil {
-			logrus.Errorf("failed to fetch server ID for %v", serviceName)
-		}
-		args = append(args, fmt.Sprintf("--server-count=%d", k.serverCount))
-		args = append(args, fmt.Sprintf("--server-id=%s", serverID))
+		go k.runLeaseCounter()
+	} else {
+		// The channel is buffered, so the runServer routine picks this value up once and then simply never sees it change
+		k.serverCountChan <- 1
 	}
 
-	logrus.Info("Starting konnectivity")
-	k.supervisor = supervisor.Supervisor{
-		Name:    "konnectivity",
-		BinPath: assets.BinPath("konnectivity-server", k.K0sVars.BinDir),
-		DataDir: k.K0sVars.DataDir,
-		RunDir:  k.K0sVars.RunDir,
-		Args:    args,
-		UID:     k.uid,
-	}
+	return k.writeKonnectivityAgent()
+}
 
-	err := k.supervisor.Supervise()
+func (k *Konnectivity) defaultArgs() util.MappedArgs {
+	serverID, err := util.MachineID()
 	if err != nil {
-		return err
+		logrus.Errorf("failed to fetch server ID for konnectivity-server")
 	}
-	if k.ClusterConfig.Spec.API.ExternalAddress != "" {
-		k.runLease()
+	return util.MappedArgs{
+		"--uds-name":                filepath.Join(k.K0sVars.KonnectivitySocketDir, "konnectivity-server.sock"),
+		"--cluster-cert":            filepath.Join(k.K0sVars.CertRootDir, "server.crt"),
+		"--cluster-key":             filepath.Join(k.K0sVars.CertRootDir, "server.key"),
+		"--kubeconfig":              k.K0sVars.KonnectivityKubeConfigPath,
+		"--mode":                    "grpc",
+		"--server-port":             "0",
+		"--agent-port":              "8132",
+		"--admin-port":              "8133",
+		"--agent-namespace":         "kube-system",
+		"--agent-service-account":   "konnectivity-agent",
+		"--authentication-audience": "system:konnectivity-server",
+		"--logtostderr":             "true",
+		"--stderrthreshold":         "1",
+		"--v":                       k.LogLevel,
+		"--enable-profiling":        "false",
+		"--server-id":               serverID,
+	}
+}
+
+// runs the supervisor and restarts it if the calculated server count changes
+func (k *Konnectivity) runServer() {
+	for {
+		select {
+		case <-k.stopCtx.Done():
+			return
+		case count := <-k.serverCountChan:
+			// restart only if the count actually changes
+			if count != k.serverCount {
+				// Stop supervisor
+
+				if k.supervisor != nil {
+					if err := k.supervisor.Stop(); err != nil {
+						logrus.Errorf("failed to stop supervisor: %s", err)
+						// TODO Should we just return? That would leave the rest of the component running while the server is never properly restarted
+					}
+				}
+				args := k.defaultArgs()
+				args["--server-count"] = strconv.Itoa(count)
+				k.supervisor = &supervisor.Supervisor{
+					Name:    "konnectivity",
+					BinPath: assets.BinPath("konnectivity-server", k.K0sVars.BinDir),
+					DataDir: k.K0sVars.DataDir,
+					RunDir:  k.K0sVars.RunDir,
+					Args:    args.ToArgs(),
+					UID:     k.uid,
+				}
+				err := k.supervisor.Supervise()
+				if err != nil {
+					logrus.Errorf("failed to start konnectivity supervisor: %s", err)
+					k.supervisor = nil // so the next iteration does not try to stop it first
+					continue
+				}
+				k.serverCount = count
+			}
+		}
 	}
-	return k.writeKonnectivityAgent()
 }
 
 // Stop stops
 func (k *Konnectivity) Stop() error {
-	if k.ClusterConfig.Spec.API.ExternalAddress != "" {
-		k.stopLease()
+	if k.stopFunc != nil {
+		k.stopFunc()
+	}
+	if k.supervisor == nil {
+		return nil
 	}
 	return k.supervisor.Stop()
 }
@@ -164,95 +202,47 @@ func (k *Konnectivity) writeKonnectivityAgent() error {
 	return nil
 }
 
-func (k *Konnectivity) runLease() {
-	k.leaseCtx, k.leaseCancel = context.WithCancel(context.Background())
-	go func(ctx context.Context) {
-		logrus.Infof("starting %v lease watcher", serviceName)
-		leaseLock, err := k.newLeaseLock()
-		if err != nil {
-			logrus.Error(err)
-		}
-		k.leaseLock = leaseLock
-		for {
-			select {
-			case <-ctx.Done():
-				logrus.Debugf("stopping lease watcher for %v", serviceName)
-				return
-			default:
-				k.leaseLock.LeaseRunner(context.Background())
+func (k *Konnectivity) runLeaseCounter() {
+
+	logrus.Infof("starting to count controller lease holders every 10 secs")
+	ticker := time.NewTicker(10 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-k.stopCtx.Done():
+			return
+		case <-ticker.C:
+			count, err := k.countLeaseHolders()
+			if err != nil {
+				logrus.Errorf("failed to count controller leases: %s", err)
+				continue
 			}
+			k.serverCountChan <- count
 		}
-	}(k.leaseCtx)
-
-	go func(ctx context.Context) {
-		logrus.Infof("watching %v lease holders", serviceName)
-		ticker := time.NewTicker(30 * time.Second)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-ctx.Done():
-				logrus.Debugf("stopping lease holder count for %v", serviceName)
-				return
-			case <-ticker.C:
-				observedLeaseHolders := k.leaseLock.CountValidLeaseHolders(context.Background())
-				if observedLeaseHolders != k.serverCount {
-					logrus.Debugf("change in %v lease holders detected. refreshing service.", serviceName)
-					k.serverCount = observedLeaseHolders
-
-					// restarting service
-					if err := k.restartService(); err != nil {
-						logrus.Errorf("failed to restart %v: %v", serviceName, err)
-					}
-				}
-			}
-			logrus.Debugf("found %v lease holders for %v", k.serverCount, serviceName)
-		}
-	}(k.leaseCtx)
+	}
 }
 
-func (k *Konnectivity) newLeaseLock() (*kubernetes.LeaseLock, error) {
+func (k *Konnectivity) countLeaseHolders() (int, error) {
 	client, err := k.KubeClientFactory.GetClient()
 	if err != nil {
-		return nil, fmt.Errorf("failed to get lease client: %v", err)
+		return 0, err
 	}
-
-	holderIdentity, err := os.Hostname()
+	ctx, cancel := context.WithTimeout(k.stopCtx, 5*time.Second)
+	defer cancel()
+	count := 0
+	leases, err := client.CoordinationV1().Leases("kube-node-lease").List(ctx, v1.ListOptions{})
 	if err != nil {
-		return nil, err
+		return 0, err
 	}
-
-	log := logrus.WithFields(logrus.Fields{"component": "konnectivity"})
-
-	leaseConfig := &kubernetes.LeaseConfig{
-		HolderIdentity: holderIdentity,
-		Name:           fmt.Sprintf("%v-%v", konnectivityLeaseName, holderIdentity),
-		Namespace:      konnectivityLeaseNameSpace,
-		ServiceName:    serviceName,
-		LeaseDuration:  120 * time.Second, // to prevent flapping of the konnectivity service, the lease is somewhat longer than normal
-		RenewDeadline:  40 * time.Second,
-		RetryPeriod:    30 * time.Second,
-	}
-	return &kubernetes.LeaseLock{
-		Config: leaseConfig,
-		Client: client.CoordinationV1(),
-		Log:    log,
-	}, nil
-}
-
-func (k *Konnectivity) stopLease() {
-	if k.leaseCancel != nil {
-		k.leaseCancel()
+	for _, l := range leases.Items {
+		if strings.HasPrefix(l.ObjectMeta.Name, "k0s-ctrl") {
+			if kubernetes.IsValidLease(l) {
+				count++
+			}
+		}
 	}
-}
 
-func (k *Konnectivity) restartService() error {
-	if err := k.Stop(); err != nil {
-		return fmt.Errorf("failed to stop %v: %v", serviceName, err)
-	}
-	if err := k.Run(); err != nil {
-		return fmt.Errorf("failed to start %v: %v", serviceName, err)
-	}
-	return nil
+	return count, nil
 }
 
 const konnectivityAgentTemplate = `
@@ -338,5 +328,5 @@ spec:
       audience: system:konnectivity-server
 `
 
-// Health-check interface
+// Healthy is a no-op check
 func (k *Konnectivity) Healthy() error { return nil }
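Note (editor's sketch, not part of the patch): defaultArgs() and runServer() above rely on util.MappedArgs and its ToArgs() helper from k0s' internal util package, which this diff does not touch. A minimal sketch of what such a type could look like — the shape and behaviour here are assumptions for illustration, not the actual k0s implementation:

	// MappedArgs maps a flag name (including the leading dashes) to its value.
	type MappedArgs map[string]string

	// ToArgs renders the map as "--flag=value" strings for the supervisor.
	// Map iteration order is random, which is acceptable for CLI flags.
	func (m MappedArgs) ToArgs() []string {
		args := make([]string, 0, len(m))
		for flag, value := range m {
			args = append(args, flag+"="+value)
		}
		return args
	}

Representing the arguments as a map is what lets runServer() simply overwrite args["--server-count"] and rebuild the full argument list on every restart.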
diff --git a/pkg/kubernetes/lease.go b/pkg/kubernetes/lease.go
index d2f87d4a3ded..a380eb66bd02 100644
--- a/pkg/kubernetes/lease.go
+++ b/pkg/kubernetes/lease.go
@@ -16,291 +16,16 @@ limitations under the License.
 package kubernetes
 
 import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"fmt"
-	"os"
 	"time"
 
-	"github.com/sirupsen/logrus"
-	"k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
-
 	coordinationv1 "k8s.io/api/coordination/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
-)
-
-var (
-	MinLeaseHolders = 1
 )
 
-type LeaseLock struct {
-	Client coordinationv1client.LeasesGetter
-	Config *LeaseConfig
-	Log    *logrus.Entry
-
-	lease           *coordinationv1.Lease
-	observedSpec    *coordinationv1.LeaseSpec
-	observedRawSpec []byte
-	observedTime    time.Time
-}
-
-type LeaseConfig struct {
-	HolderIdentity string
-	Name           string
-	Namespace      string
-	ServiceName    string
-
-	// LeaseDuration is the duration of a lease, before the lease can be re-acquired. A client needs to wait a full LeaseDuration without observing a change to
-	// the record before it can attempt to take over.
-	LeaseDuration time.Duration
-
-	// RenewDeadline is the duration that the lease holder will retry refreshing leadership before giving up.
-	RenewDeadline time.Duration
-
-	// RetryPeriod is the duration the lease clients should wait
-	// between tries of actions.
-	RetryPeriod time.Duration
-}
-
-// getLease gets the LeaseSpec for a for the relevant LeaseMeta Name & Namespace
-func (l *LeaseLock) getLease(ctx context.Context) (*coordinationv1.LeaseSpec, []byte, error) {
-	var err error
-	l.lease, err = l.Client.Leases(l.Config.Namespace).Get(ctx, l.Config.Name, metav1.GetOptions{})
-	if err != nil {
-		return nil, nil, err
-	}
-
-	leaseByte, err := json.Marshal(l.lease.Spec)
-	if err != nil {
-		return nil, nil, err
-	}
-	return &l.lease.Spec, leaseByte, nil
-}
-
-// createLease creates the LeaseSpec for a for the relevant LeaseMeta Name, Namespace & LeaseSpec
-func (l *LeaseLock) createLease(ctx context.Context, leaseSpec coordinationv1.LeaseSpec) error {
-	var err error
-	lease := &coordinationv1.Lease{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      l.Config.Name,
-			Namespace: l.Config.Namespace,
-			Labels:    map[string]string{"service": l.Config.ServiceName},
-		},
-		Spec: leaseSpec,
-	}
-
-	l.lease, err = l.Client.Leases(l.Config.Namespace).Create(ctx, lease, metav1.CreateOptions{})
-	return err
-}
-
-// updateLease updates the LeaseSpec for a for the relevant LeaseMeta Name, Namespace & LeaseSpec
-func (l *LeaseLock) updateLease(ctx context.Context, leaseSpec coordinationv1.LeaseSpec) error {
-	var err error
-	if l.lease == nil {
-		return fmt.Errorf("lease not initialized, call get or create first")
-	}
-	l.lease.Spec = leaseSpec
-
-	lease, err := l.Client.Leases(l.Config.Namespace).Update(ctx, l.lease, metav1.UpdateOptions{})
-	if err != nil {
-		return err
-	}
-	l.lease = lease
-	return nil
-}
-
-func leaseConfigToLeaseSpec(lec LeaseConfig) *coordinationv1.LeaseSpec {
-	leaseDurationSeconds := int32(lec.LeaseDuration / time.Second)
-	holderIdentity, err := os.Hostname()
-	if err != nil {
-		return nil
-	}
+// IsValidLease checks whether or not the lease has expired
+func IsValidLease(lease coordinationv1.Lease) bool {
+	leaseDur := time.Duration(*lease.Spec.LeaseDurationSeconds)
 
-	return &coordinationv1.LeaseSpec{
-		HolderIdentity:       &holderIdentity,
-		AcquireTime:          &metav1.MicroTime{Time: time.Now()},
-		LeaseDurationSeconds: &leaseDurationSeconds,
-		RenewTime:            &metav1.MicroTime{Time: time.Now()},
-	}
-}
-
-func (l *LeaseLock) LeaseRunner(ctx context.Context) {
-	err := l.validateLeaseConfig()
-	if err != nil {
-		l.Log.Error(err)
-	}
-
-	defer runtime.HandleCrash()
-	if !l.acquire(ctx) {
-		return // ctx signalled done
-	}
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	l.renew(ctx)
-}
-
-// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
-// Returns false if ctx signals done.
-func (l *LeaseLock) acquire(ctx context.Context) bool {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	succeeded := false
-	desc := fmt.Sprintf("%v/%v", l.Config.Namespace, l.Config.Name)
-	l.Log.Infof("attempting to acquire lease %v...", desc)
-	wait.JitterUntil(func() {
-		succeeded = l.tryAcquireOrRenew(ctx)
-		if !succeeded {
-			l.Log.Infof("failed to acquire lease %v. will attempt to re-acquire.", desc)
-			return
-		}
-		l.Log.Infof("successfully acquired lease %v", desc)
-		cancel()
-	}, l.Config.RetryPeriod, 1.2, true, ctx.Done())
-	return succeeded
-}
-
-func (l *LeaseLock) renew(ctx context.Context) {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	wait.Until(func() {
-		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, l.Config.RenewDeadline)
-		defer timeoutCancel()
-		err := wait.PollImmediateUntil(l.Config.RetryPeriod, func() (bool, error) {
-			return l.tryAcquireOrRenew(timeoutCtx), nil
-		}, timeoutCtx.Done())
-
-		desc := fmt.Sprintf("%v/%v", l.Config.Namespace, l.Config.Name)
-		if err == nil {
-			l.Log.Infof("successfully renewed lease %v", desc)
-			return
-		}
-		// renewal failed
-		l.Log.Infof("failed to renew lease %v: %v", desc, err)
-		cancel()
-	}, l.Config.RetryPeriod, ctx.Done())
-}
-
-func (l *LeaseLock) tryAcquireOrRenew(ctx context.Context) bool {
-	desiredLeaseSpec := leaseConfigToLeaseSpec(*l.Config)
-
-	// 1. obtain or create the lease
-	oldLeaseSpec, oldLeaseRawSpec, err := l.getLease(ctx)
-
-	desc := fmt.Sprintf("%v/%v", l.Config.Namespace, l.Config.Name)
-
-	if err != nil {
-		if !errors.IsNotFound(err) {
-			l.Log.Errorf("error retrieving resource lease %v: %v", desc, err)
-			return false
-		}
-		if err = l.createLease(ctx, *desiredLeaseSpec); err != nil {
-			l.Log.Errorf("error initially creating lease lock: %v", err)
-			return false
-		}
-		l.observedSpec = desiredLeaseSpec
-		l.observedTime = time.Now()
-
-		return true
-	}
-
-	// 2. record exists, check identity & time
-	if !bytes.Equal(l.observedRawSpec, oldLeaseRawSpec) {
-		l.observedSpec = oldLeaseSpec
-		l.observedRawSpec = oldLeaseRawSpec
-		l.observedTime = time.Now()
-	}
-
-	expiredTime := l.observedTime.Add(l.Config.LeaseDuration)
-
-	// check if the expired Time is after current time (I.E. not yet expired)
-	if len(*oldLeaseSpec.HolderIdentity) > 0 && expiredTime.After(time.Now()) {
-		l.Log.Debugf("lease is held by %v and has not yet expired (expiration time: %v)", *oldLeaseSpec.HolderIdentity, expiredTime)
-		return false
-	}
-
-	// 3. update the lease
-	desiredLeaseSpec.AcquireTime = oldLeaseSpec.AcquireTime // leave the "acquired" field unchanged
-
-	// update the lock itself
-	if err = l.updateLease(ctx, *desiredLeaseSpec); err != nil {
-		l.Log.Errorf("Failed to update lease: %v", err)
-		return false
-	}
-
-	l.observedSpec = desiredLeaseSpec
-	l.observedTime = time.Now()
-	return true
-}
-
-// fetch a list of all the leases that contain the label service=l.Config.ServiceName
-func (l *LeaseLock) listLease(ctx context.Context) *coordinationv1.LeaseList {
-	label := map[string]string{"service": l.Config.ServiceName}
-	desc := fmt.Sprintf("%v/%v", l.Config.Namespace, l.Config.Name)
-
-	ls := &metav1.LabelSelector{}
-	if err := metav1.Convert_Map_string_To_string_To_v1_LabelSelector(&label, ls, nil); err != nil {
-		logrus.Debugf("failed to parse label for listing lease %v", desc)
-	}
-
-	opts := metav1.ListOptions{
-		LabelSelector: metav1.FormatLabelSelector(ls),
-	}
-
-	list, err := l.Client.Leases(l.Config.Namespace).List(ctx, opts)
-	if err != nil {
-		logrus.Errorf("failed to fetch lease holders for %v", desc)
-	}
-	return list
-}
-
-// CountValidLeaseHolders fetches the leaseList and examines if a lease is still valid.
-// if it is, it adds it to the validLeaseHolder count
-func (l *LeaseLock) CountValidLeaseHolders(ctx context.Context) int {
-	var validLeases []coordinationv1.Lease
-	list := l.listLease(ctx)
-
-	for i := range list.Items {
-		lease := list.Items[i]
-		leaseDurationSeconds := lease.Spec.LeaseDurationSeconds
-		leaseDuration := time.Duration(*leaseDurationSeconds) * time.Second
-		lastRenewed := lease.Spec.RenewTime
-		expires := lastRenewed.Add(leaseDuration).Add(l.Config.RenewDeadline)
-
-		if expires.After(time.Now()) {
-			logrus.Debugf("lease for %v still valid, adding to valid lease count", *lease.Spec.HolderIdentity)
-			validLeases = append(validLeases, lease)
-		}
-	}
-	count := len(validLeases)
-	if count < MinLeaseHolders {
-		return MinLeaseHolders
-	}
-	return count
-}
+	leaseExpiry := lease.Spec.RenewTime.Add(leaseDur * time.Second)
 
-func (l *LeaseLock) validateLeaseConfig() error {
-	l.Log.Debug("Validating lease config")
-	var JitterFactor = 1.2
-	if l.Config.LeaseDuration <= l.Config.RenewDeadline {
-		return fmt.Errorf("leaseDuration must be greater than renewDeadline")
-	}
-	if l.Config.RenewDeadline <= time.Duration(JitterFactor*float64(l.Config.RetryPeriod)) {
-		return fmt.Errorf("RenewDeadline must be greater than retryPeriod*JitterFactor")
-	}
-	if l.Config.LeaseDuration < 1 {
-		return fmt.Errorf("LeaseDuration must be greater than zero")
-	}
-	if l.Config.RenewDeadline < 1 {
-		return fmt.Errorf("RenewDeadline must be greater than zero")
-	}
-	if l.Config.RetryPeriod < 1 {
-		return fmt.Errorf("RetryPeriod must be greater than zero")
-	}
-	l.Log.Info("LeaseConfig is valid. Moving on!")
-	return nil
+	return leaseExpiry.After(time.Now())
 }
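Note (editor's sketch, not part of the patch): IsValidLease depends on a small Go subtlety: time.Duration(*lease.Spec.LeaseDurationSeconds) on its own is a raw nanosecond count, and only the later multiplication by time.Second turns it into the intended number of seconds. A worked example of the arithmetic, assuming a lease renewed at 12:00:00 with LeaseDurationSeconds = 60:

	leaseDur := time.Duration(60)                   // 60 raw units, i.e. 60ns
	expiry := renewTime.Add(leaseDur * time.Second) // 60 * 1e9 ns = 60s, so expiry is 12:01:00
	valid := expiry.After(time.Now())               // true until 12:01:00

So a lease now counts as valid for exactly LeaseDurationSeconds after its last RenewTime; the removed CountValidLeaseHolders additionally granted a RenewDeadline grace period on top of that.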
diff --git a/pkg/kubernetes/lease_test.go b/pkg/kubernetes/lease_test.go
new file mode 100644
index 000000000000..2a73fa6a1cf3
--- /dev/null
+++ b/pkg/kubernetes/lease_test.go
@@ -0,0 +1,38 @@
+package kubernetes
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	coordination "k8s.io/api/coordination/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func TestValidLease(t *testing.T) {
+	leaseDuration := int32(60)
+	microNow := metav1.NowMicro()
+
+	lease := coordination.Lease{
+		Spec: coordination.LeaseSpec{
+			LeaseDurationSeconds: &leaseDuration,
+			RenewTime:            &microNow,
+		},
+	}
+
+	assert.Equal(t, true, IsValidLease(lease))
+}
+
+func TestExpiredLease(t *testing.T) {
+	leaseDuration := int32(60)
+	renew := metav1.NewMicroTime(time.Now().Add(-62 * time.Second))
+
+	lease := coordination.Lease{
+		Spec: coordination.LeaseSpec{
+			LeaseDurationSeconds: &leaseDuration,
+			RenewTime:            &renew,
+		},
+	}
+
+	assert.Equal(t, false, IsValidLease(lease))
+}
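Note (editor's sketch, not part of the patch): the two tests above could also be written as one table-driven test, which makes it cheap to add boundary cases later. A possible variant, reusing the same imports as lease_test.go:

	func TestIsValidLease(t *testing.T) {
		cases := []struct {
			name   string
			offset time.Duration // how long ago the lease was last renewed
			valid  bool
		}{
			{"renewed just now", 0, true},
			{"renewed 30s ago", -30 * time.Second, true},
			{"expired", -62 * time.Second, false},
		}
		for _, tc := range cases {
			t.Run(tc.name, func(t *testing.T) {
				leaseDuration := int32(60)
				renew := metav1.NewMicroTime(time.Now().Add(tc.offset))
				lease := coordination.Lease{
					Spec: coordination.LeaseSpec{
						LeaseDurationSeconds: &leaseDuration,
						RenewTime:            &renew,
					},
				}
				assert.Equal(t, tc.valid, IsValidLease(lease))
			})
		}
	}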