diff --git a/cmd/katalyst-agent/app/options/metaserver/metaserver.go b/cmd/katalyst-agent/app/options/metaserver/metaserver.go index 2b4afeda0b..6fe24106d7 100644 --- a/cmd/katalyst-agent/app/options/metaserver/metaserver.go +++ b/cmd/katalyst-agent/app/options/metaserver/metaserver.go @@ -38,7 +38,10 @@ const ( defaultConfigCheckpointGraceTime = 2 * time.Hour ) -const defaultServiceProfileCacheTTL = 1 * time.Minute +const ( + defaultServiceProfileSkipCorruptionError = true + defaultServiceProfileCacheTTL = 1 * time.Minute +) const defaultMetricInsurancePeriod = 0 * time.Second @@ -70,7 +73,8 @@ type MetaServerOptions struct { ConfigCheckpointGraceTime time.Duration // configurations for spd - ServiceProfileCacheTTL time.Duration + ServiceProfileSkipCorruptionError bool + ServiceProfileCacheTTL time.Duration // configurations for metric-fetcher MetricInsurancePeriod time.Duration @@ -100,7 +104,8 @@ func NewMetaServerOptions() *MetaServerOptions { ConfigSkipFailedInitialization: defaultConfigSkipFailedInitialization, ConfigCheckpointGraceTime: defaultConfigCheckpointGraceTime, - ServiceProfileCacheTTL: defaultServiceProfileCacheTTL, + ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError, + ServiceProfileCacheTTL: defaultServiceProfileCacheTTL, MetricInsurancePeriod: defaultMetricInsurancePeriod, MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet}, @@ -136,6 +141,8 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.DurationVar(&o.ConfigCheckpointGraceTime, "config-checkpoint-grace-time", o.ConfigCheckpointGraceTime, "The grace time of meta server config checkpoint") + fs.BoolVar(&o.ServiceProfileSkipCorruptionError, "service-profile-skip-corruption-error", o.ServiceProfileSkipCorruptionError, + "Whether to skip corruption error when loading spd checkpoint") fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL, "The ttl of service profile manager cache remote spd") @@ -171,6 +178,7 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error c.ConfigSkipFailedInitialization = o.ConfigSkipFailedInitialization c.ConfigCheckpointGraceTime = o.ConfigCheckpointGraceTime + c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL c.MetricInsurancePeriod = o.MetricInsurancePeriod diff --git a/pkg/config/agent/metaserver/spd.go b/pkg/config/agent/metaserver/spd.go index d8169003bf..45a4f99c33 100644 --- a/pkg/config/agent/metaserver/spd.go +++ b/pkg/config/agent/metaserver/spd.go @@ -19,7 +19,8 @@ package metaserver import "time" type SPDConfiguration struct { - ServiceProfileCacheTTL time.Duration + ServiceProfileSkipCorruptionError bool + ServiceProfileCacheTTL time.Duration } func NewSPDConfiguration() *SPDConfiguration { diff --git a/pkg/metaserver/spd/cache.go b/pkg/metaserver/spd/cache.go index fddd7d9052..30775b3a4e 100644 --- a/pkg/metaserver/spd/cache.go +++ b/pkg/metaserver/spd/cache.go @@ -55,24 +55,26 @@ type spdInfo struct { type Cache struct { sync.RWMutex - expiredTime time.Duration - cacheTTL time.Duration - jitterFactor float64 - maxRetryCount int64 + skipCorruptionError bool + expiredTime time.Duration + cacheTTL time.Duration + jitterFactor float64 + maxRetryCount int64 manager checkpointmanager.CheckpointManager spdInfo map[string]*spdInfo } -func NewSPDCache(manager checkpointmanager.CheckpointManager, cacheTTL, expiredTime time.Duration, - maxRetryCount int64, jitterFactor float64) *Cache { +func NewSPDCache(manager checkpointmanager.CheckpointManager, skipCorruptionError bool, + cacheTTL, expiredTime time.Duration, maxRetryCount int64, jitterFactor float64) *Cache { cache := &Cache{ - spdInfo: map[string]*spdInfo{}, - manager: manager, - expiredTime: expiredTime, - cacheTTL: cacheTTL, - jitterFactor: jitterFactor, - maxRetryCount: maxRetryCount, + spdInfo: map[string]*spdInfo{}, + manager: manager, + skipCorruptionError: skipCorruptionError, + expiredTime: expiredTime, + cacheTTL: cacheTTL, + jitterFactor: jitterFactor, + maxRetryCount: maxRetryCount, } err := cache.restore() @@ -210,13 +212,16 @@ func (s *Cache) restore() error { s.Lock() defer s.Unlock() - spdList, err := checkpoint.LoadSPDs(s.manager) + spdList, err := checkpoint.LoadSPDs(s.manager, s.skipCorruptionError) if err != nil { return fmt.Errorf("restore spd failed: %v", err) } now := time.Now() for _, spd := range spdList { + if spd == nil { + continue + } key := native.GenerateUniqObjectNameKey(spd) s.initSPDInfoWithoutLock(key) s.spdInfo[key].spd = spd diff --git a/pkg/metaserver/spd/checkpoint/checkpoint.go b/pkg/metaserver/spd/checkpoint/checkpoint.go index c54d36eceb..f3c37782a6 100644 --- a/pkg/metaserver/spd/checkpoint/checkpoint.go +++ b/pkg/metaserver/spd/checkpoint/checkpoint.go @@ -18,13 +18,14 @@ package checkpoint import ( "encoding/json" + "errors" "fmt" "strings" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" + cpmerrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" ) @@ -79,7 +80,7 @@ func getSPDKey(spd *v1alpha1.ServiceProfileDescriptor) string { } // LoadSPDs Loads All Checkpoints from disk -func LoadSPDs(cpm checkpointmanager.CheckpointManager) ([]*v1alpha1.ServiceProfileDescriptor, error) { +func LoadSPDs(cpm checkpointmanager.CheckpointManager, skipCorruptionError bool) ([]*v1alpha1.ServiceProfileDescriptor, error) { spd := make([]*v1alpha1.ServiceProfileDescriptor, 0) checkpointKeys, err := cpm.ListCheckpoints() @@ -95,8 +96,15 @@ func LoadSPDs(cpm checkpointmanager.CheckpointManager) ([]*v1alpha1.ServiceProfi checkpoint := NewServiceProfileCheckpoint(nil) err := cpm.GetCheckpoint(key, checkpoint) if err != nil { - klog.Errorf("Failed to retrieve checkpoint for spd %q: %v", key, err) - continue + klog.Errorf("Failed to retrieve checkpoint for spd %q, error: %v", key, err) + if !errors.Is(err, cpmerrors.ErrCorruptCheckpoint) { + continue + } else { + if !skipCorruptionError { + continue + } + klog.Warningf("Skip corruption error for spd %q", key) + } } spd = append(spd, checkpoint.GetSPD()) } diff --git a/pkg/metaserver/spd/checkpoint/checkpoint_test.go b/pkg/metaserver/spd/checkpoint/checkpoint_test.go index 01b2967010..430cb3601b 100644 --- a/pkg/metaserver/spd/checkpoint/checkpoint_test.go +++ b/pkg/metaserver/spd/checkpoint/checkpoint_test.go @@ -81,7 +81,7 @@ func TestWriteLoadDeleteSPDs(t *testing.T) { } // verify the correct written files are loaded from disk - spdList, err := LoadSPDs(cpm) + spdList, err := LoadSPDs(cpm, false) if err != nil { t.Errorf("failed to Load spds: %v", err) } diff --git a/pkg/metaserver/spd/fetcher.go b/pkg/metaserver/spd/fetcher.go index 0272b5d772..1c6781369d 100644 --- a/pkg/metaserver/spd/fetcher.go +++ b/pkg/metaserver/spd/fetcher.go @@ -108,7 +108,8 @@ func NewSPDFetcher(clientSet *client.GenericClientSet, emitter metrics.MetricEmi } m.getPodSPDNameFunc = util.GetPodSPDName - m.spdCache = NewSPDCache(checkpointManager, conf.ServiceProfileCacheTTL, defaultClearUnusedSPDPeriod, defaultMaxRetryCount, defaultJitterFactor) + m.spdCache = NewSPDCache(checkpointManager, conf.ServiceProfileSkipCorruptionError, conf.ServiceProfileCacheTTL, + defaultClearUnusedSPDPeriod, defaultMaxRetryCount, defaultJitterFactor) return m, nil }