Skip to content

Commit

Permalink
Machine: ignore attached Volumes referred by pods ignored during drain
Browse files Browse the repository at this point in the history
  • Loading branch information
chrischdi committed Oct 4, 2024
1 parent 9569cd6 commit 3057c55
Show file tree
Hide file tree
Showing 7 changed files with 585 additions and 79 deletions.
18 changes: 9 additions & 9 deletions internal/controllers/machine/drain/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ const (
expirationInterval = 10 * time.Hour
)

// CacheEntry is an entry of the drain cache. It stores at which time a Machine was drained the last time.
// CacheEntry is an entry of the requeue cache. It stores at which time a Machine was proceeded the last time.
type CacheEntry struct {
Machine types.NamespacedName
LastDrain time.Time
Machine types.NamespacedName
LastProceed time.Time
}

// Cache caches the time when the last drain was done for a Machine.
// Specifically we only use it to ensure we only retry drains
// Cache caches the time when the a Machine was processed last.
// Specifically we use it to ensure we only drain or wait for volume detachment
// at a specific interval and not more often.
type Cache interface {
// Add adds the given entry to the Cache.
Expand All @@ -53,7 +53,7 @@ type Cache interface {

// NewCache creates a new cache.
func NewCache() Cache {
r := &drainCache{
r := &retryCache{
Store: cache.NewTTLStore(func(obj interface{}) (string, error) {
// We only add CacheEntries to the cache, so it's safe to cast to CacheEntry.
return obj.(CacheEntry).Machine.String(), nil
Expand All @@ -72,13 +72,13 @@ func NewCache() Cache {
return r
}

type drainCache struct {
type retryCache struct {
cache.Store
}

// Add adds the given entry to the Cache.
// Note: entries expire after the ttl.
func (r *drainCache) Add(entry CacheEntry) {
func (r *retryCache) Add(entry CacheEntry) {
// Note: We can ignore the error here because by only allowing CacheEntries
// and providing the corresponding keyFunc ourselves we can guarantee that
// the error never occurs.
Expand All @@ -87,7 +87,7 @@ func (r *drainCache) Add(entry CacheEntry) {

// Has checks if the given key (still) exists in the Cache.
// Note: entries expire after the ttl.
func (r *drainCache) Has(machineName types.NamespacedName) (CacheEntry, bool) {
func (r *retryCache) Has(machineName types.NamespacedName) (CacheEntry, bool) {
// Note: We can ignore the error here because GetByKey never returns an error.
item, exists, _ := r.Store.GetByKey(machineName.String())
if exists {
Expand Down
25 changes: 4 additions & 21 deletions internal/controllers/machine/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

clog "sigs.k8s.io/cluster-api/util/log"
)

// Helper contains the parameters to control the behaviour of the drain helper.
Expand Down Expand Up @@ -351,33 +353,14 @@ func (r EvictionResult) ConditionMessage() string {

// podDeleteListToString returns a comma-separated list of the first n entries of the PodDelete list.
func podDeleteListToString(podList []PodDelete, n int) string {
return listToString(podList, func(pd PodDelete) string {
return clog.ListToString(podList, func(pd PodDelete) string {
return klog.KObj(pd.Pod).String()
}, n)
}

// PodListToString returns a comma-separated list of the first n entries of the Pod list.
func PodListToString(podList []*corev1.Pod, n int) string {
return listToString(podList, func(p *corev1.Pod) string {
return clog.ListToString(podList, func(p *corev1.Pod) string {
return klog.KObj(p).String()
}, n)
}

// listToString returns a comma-separated list of the first n entries of the list (strings are calculated via stringFunc).
func listToString[T any](list []T, stringFunc func(T) string, n int) string {
shortenedBy := 0
if len(list) > n {
shortenedBy = len(list) - n
list = list[:n]
}
stringList := []string{}
for _, p := range list {
stringList = append(stringList, stringFunc(p))
}

if shortenedBy > 0 {
stringList = append(stringList, fmt.Sprintf("... (%d more)", shortenedBy))
}

return strings.Join(stringList, ", ")
}
11 changes: 11 additions & 0 deletions internal/controllers/machine/drain/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func (l *PodDeleteList) Pods() []*corev1.Pod {
return pods
}

// IgnoredPods returns a list of Pods that have to be ignored before the Node can be considered completely drained.
func (l *PodDeleteList) IgnoredPods() []*corev1.Pod {
pods := []*corev1.Pod{}
for _, i := range l.items {
if !i.Status.Delete {
pods = append(pods, i.Pod)
}
}
return pods
}

func (l *PodDeleteList) errors() []error {
failedPods := make(map[string][]string)
for _, i := range l.items {
Expand Down
Loading

0 comments on commit 3057c55

Please sign in to comment.