From 8ad6cf967c3b61d7e0734f810686e4a7f7389f00 Mon Sep 17 00:00:00 2001 From: Jason Liu Date: Tue, 26 Apr 2022 08:26:30 +0800 Subject: [PATCH] add expire cache (#93) Signed-off-by: Jason Liu --- pkg/cache/expiration_cache.go | 128 ++++++++++++++++++++++++ pkg/cache/expiration_cache_test.go | 114 +++++++++++++++++++++ pkg/koordlet/resmanager/memory_evict.go | 4 +- pkg/koordlet/resmanager/resmanager.go | 31 +++++- 4 files changed, 272 insertions(+), 5 deletions(-) create mode 100644 pkg/cache/expiration_cache.go create mode 100644 pkg/cache/expiration_cache_test.go diff --git a/pkg/cache/expiration_cache.go b/pkg/cache/expiration_cache.go new file mode 100644 index 000000000..b8ec86153 --- /dev/null +++ b/pkg/cache/expiration_cache.go @@ -0,0 +1,128 @@ +/* + Copyright 2022 The Koordinator Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package cache + +import ( + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" +) + +const ( + defaultExpiration = 2 * time.Minute + defaultGCInterval = time.Minute +) + +type item struct { + object interface{} + expirationTime time.Time +} + +type Cache struct { + items map[string]item + defaultExpiration time.Duration + gcInterval time.Duration + gcStarted bool + mu sync.Mutex +} + +func NewCacheDefault() *Cache { + return &Cache{ + items: map[string]item{}, + defaultExpiration: defaultExpiration, + gcInterval: defaultGCInterval, + } +} + +func NewCache(expiration time.Duration, gcInterval time.Duration) *Cache { + cache := Cache{ + items: map[string]item{}, + defaultExpiration: expiration, + gcInterval: gcInterval, + } + if cache.defaultExpiration <= 0 { + cache.defaultExpiration = defaultExpiration + } + if cache.gcInterval <= time.Second { + cache.gcInterval = defaultGCInterval + } + return &cache +} + +func (c *Cache) Run(stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + c.gcStarted = true + go wait.Until(func() { + c.gcExpiredCache() + }, c.gcInterval, stopCh) + return nil +} + +func (c *Cache) gcExpiredCache() { + c.mu.Lock() + defer c.mu.Unlock() + gcTime := time.Now() + var gcKeys []string + for key, item := range c.items { + if gcTime.After(item.expirationTime) { + gcKeys = append(gcKeys, key) + } + } + for _, key := range gcKeys { + delete(c.items, key) + } + klog.V(5).Infof("gc resource update executor, current size %v", len(c.items)) +} + +func (c *Cache) Set(key string, value interface{}, expiration time.Duration) error { + return c.set(key, value, expiration) +} + +func (c *Cache) SetDefault(key string, value interface{}) error { + return c.set(key, value, c.defaultExpiration) +} + +func (c *Cache) set(key string, value interface{}, expiration time.Duration) error { + if !c.gcStarted { + return fmt.Errorf("cache GC is not started yet") + } + item := item{ + object: value, + expirationTime: time.Now().Add(expiration), + } + c.mu.Lock() + defer c.mu.Unlock() + c.items[key] = item + return nil +} + +func (c *Cache) Get(key string) (interface{}, bool) { + c.mu.Lock() + defer c.mu.Unlock() + item, ok := c.items[key] + if !ok { + return nil, false + } + if item.expirationTime.Before(time.Now()) { + return nil, false + } + return item.object, true +} diff --git a/pkg/cache/expiration_cache_test.go b/pkg/cache/expiration_cache_test.go new file mode 100644 index 000000000..984943682 --- /dev/null +++ b/pkg/cache/expiration_cache_test.go @@ -0,0 +1,114 @@ +/* + Copyright 2022 The Koordinator Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package cache + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_Cache_Get(t *testing.T) { + cache := NewCacheDefault() + cache.gcStarted = true + cache.items = map[string]item{ + "keyExpire": {object: "value1", expirationTime: time.Now().Add(-1 * time.Minute)}, + "keyNotExpire": {object: "value2", expirationTime: time.Now().Add(1 * time.Minute)}, + } + value, found := cache.Get("keyExpire") + assert.True(t, !found, "value not found", "keyExpire") + assert.Nil(t, value, "value must be nil", "keyExpire") + + value, found = cache.Get("keyNotExpire") + assert.True(t, found, "value found", "keyNotExpire") + assert.Equal(t, "value2", value, "keyNotExpire") +} + +func Test_Cache_Set(t *testing.T) { + cache := NewCacheDefault() + cache.gcStarted = true + value, found := cache.Get("key") + assert.True(t, !found, "value not found") + assert.Nil(t, value, "value must be nil") + + _ = cache.SetDefault("key", "value") + value, found = cache.Get("key") + assert.True(t, found, "value found", "checkSetDefault") + assert.Equal(t, "value", value, "checkSetDefault") + + _ = cache.Set("key", "value", -1*time.Minute) + value, found = cache.Get("key") + assert.True(t, !found, "value not found", "checkSet") + assert.Nil(t, value, "value must be nil", "checkSet") + +} + +func Test_gcExpiredCache(t *testing.T) { + tests := []struct { + name string + initItems map[string]item + cache *Cache + expectItemsAfterGC map[string]item + }{ + { + name: "test_gcExpiredCache_NewCacheDefault", + initItems: map[string]item{ + "keyNeedExpire": {object: "value1", expirationTime: time.Now().Add(-1 * time.Minute)}, + "keyNotExpire": {object: "value2", expirationTime: time.Now().Add(time.Minute)}, + }, + cache: NewCacheDefault(), + expectItemsAfterGC: map[string]item{ + "keyNotExpire": {object: "value2", expirationTime: time.Now().Add(time.Minute)}, + }, + }, + { + name: "test_gcExpiredCache_NewCache", + initItems: map[string]item{ + "keyNeedExpire": {object: "value1", expirationTime: time.Now().Add(-1 * time.Minute)}, + "keyNotExpire": {object: "value2", expirationTime: time.Now().Add(time.Minute)}, + }, + cache: NewCache(time.Minute, time.Minute), + expectItemsAfterGC: map[string]item{ + "keyNotExpire": {object: "value2", expirationTime: time.Now().Add(time.Minute)}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.cache.items = tt.initItems + tt.cache.gcStarted = true + tt.cache.gcExpiredCache() + got := tt.cache.items + assert.Equal(t, len(tt.expectItemsAfterGC), len(got), "checkLen") + checkValueEqual(t, tt.expectItemsAfterGC, got) + }) + } +} + +func checkValueEqual(t *testing.T, expect, got map[string]item) { + assert.Equal(t, len(expect), len(got), "checkLen") + for key, item := range expect { + gotItem, ok := got[key] + if !ok { + assert.True(t, ok, "checkFound", key) + return + } + assert.Equal(t, item.object, gotItem.object, "checkValue", key) + } +} diff --git a/pkg/koordlet/resmanager/memory_evict.go b/pkg/koordlet/resmanager/memory_evict.go index 8a0313806..c7e59ff4f 100644 --- a/pkg/koordlet/resmanager/memory_evict.go +++ b/pkg/koordlet/resmanager/memory_evict.go @@ -130,9 +130,7 @@ func (m *MemoryEvictor) killAndEvictBEPods(node *corev1.Node, podMetrics []*metr } } - for _, pod := range killedPods { - m.resManager.evictPod(pod, node, evictPodByNodeMemoryUsage, message) - } + m.resManager.evictPodsIfNotEvicted(killedPods, node, evictPodByNodeMemoryUsage, message) m.lastEvictTime = time.Now() klog.Infof("killAndEvictBEPods completed, memoryNeedRelease(%v) memoryReleased(%v)", memoryNeedRelease, memoryNeedRelease) diff --git a/pkg/koordlet/resmanager/resmanager.go b/pkg/koordlet/resmanager/resmanager.go index d15c158d6..7e7c80994 100644 --- a/pkg/koordlet/resmanager/resmanager.go +++ b/pkg/koordlet/resmanager/resmanager.go @@ -39,9 +39,11 @@ import ( "k8s.io/klog/v2" slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + expireCache "github.com/koordinator-sh/koordinator/pkg/cache" koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" slolisterv1alpha1 "github.com/koordinator-sh/koordinator/pkg/client/listers/slo/v1alpha1" "github.com/koordinator-sh/koordinator/pkg/features" + "github.com/koordinator-sh/koordinator/pkg/koordlet/audit" "github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" @@ -65,6 +67,7 @@ type resmanager struct { schema *apiruntime.Scheme statesInformer statesinformer.StatesInformer metricCache metriccache.MetricCache + podsEvicted *expireCache.Cache nodeSLOInformer cache.SharedIndexInformer nodeSLOLister slolisterv1alpha1.NodeSLOLister kubeClient clientset.Interface @@ -181,6 +184,7 @@ func NewResManager(cfg *Config, schema *apiruntime.Scheme, kubeClient clientset. schema: schema, statesInformer: statesInformer, metricCache: metricCache, + podsEvicted: expireCache.NewCacheDefault(), nodeSLOInformer: informer, nodeSLOLister: slolisterv1alpha1.NewNodeSLOLister(informer.GetIndexer()), kubeClient: kubeClient, @@ -239,6 +243,8 @@ func (r *resmanager) Run(stopCh <-chan struct{}) error { defer utilruntime.HandleCrash() klog.Info("Starting resmanager") + r.podsEvicted.Run(stopCh) + klog.Infof("starting informer for NodeSLO") go r.nodeSLOInformer.Run(stopCh) if !cache.WaitForCacheSync(stopCh, r.nodeSLOInformer.HasSynced) { @@ -277,9 +283,27 @@ func (r *resmanager) hasSynced() bool { return r.nodeSLO != nil && r.nodeSLO.Spec.ResourceUsedThresholdWithBE != nil } -func (r *resmanager) evictPod(evictPod *corev1.Pod, node *corev1.Node, reason string, message string) { - podEvictMessage := fmt.Sprintf("evict Pod:%s, reason: %s, message: %v", evictPod.Name, reason, message) +func (r *resmanager) evictPodsIfNotEvicted(evictPods []*corev1.Pod, node *corev1.Node, reason string, message string) { + for _, evictPod := range evictPods { + r.evictPodIfNotEvicted(evictPod, node, reason, message) + } +} + +func (r *resmanager) evictPodIfNotEvicted(evictPod *corev1.Pod, node *corev1.Node, reason string, message string) { + _, evicted := r.podsEvicted.Get(string(evictPod.UID)) + if evicted { + klog.V(5).Infof("Pod has been evicted! podID: %v, evict reason: %s", evictPod.UID, reason) + return + } + success := r.evictPod(evictPod, node, reason, message) + if success { + _ = r.podsEvicted.SetDefault(string(evictPod.UID), evictPod.UID) + } +} +func (r *resmanager) evictPod(evictPod *corev1.Pod, node *corev1.Node, reason string, message string) bool { + podEvictMessage := fmt.Sprintf("evict Pod:%s, reason: %s, message: %v", evictPod.Name, reason, message) + _ = audit.V(0).Pod(evictPod.Namespace, evictPod.Name).Reason(reason).Message(message).Do() podEvict := policyv1.Eviction{ ObjectMeta: metav1.ObjectMeta{ Name: evictPod.Name, @@ -291,10 +315,13 @@ func (r *resmanager) evictPod(evictPod *corev1.Pod, node *corev1.Node, reason st r.eventRecorder.Eventf(node, corev1.EventTypeWarning, evictPodSuccess, podEvictMessage) metrics.RecordPodEviction(reason) klog.Infof("evict pod %v/%v success, reason: %v", evictPod.Namespace, evictPod.Name, reason) + return true } else if !errors.IsNotFound(err) { r.eventRecorder.Eventf(node, corev1.EventTypeWarning, evictPodFail, podEvictMessage) klog.Errorf("evict pod %v/%v failed, reason: %v, error: %v", evictPod.Namespace, evictPod.Name, reason, err) + return false } + return true } // killContainers kills containers inside the pod