Skip to content

Commit

Permalink
fix spd checkpoint is corrupted when api was changed
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Apr 9, 2024
1 parent 7f30841 commit 779d640
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 23 deletions.
14 changes: 11 additions & 3 deletions cmd/katalyst-agent/app/options/metaserver/metaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ const (
defaultConfigCheckpointGraceTime = 2 * time.Hour
)

const defaultServiceProfileCacheTTL = 1 * time.Minute
const (
defaultServiceProfileSkipCorruptionError = true
defaultServiceProfileCacheTTL = 1 * time.Minute
)

const defaultMetricInsurancePeriod = 0 * time.Second

Expand Down Expand Up @@ -70,7 +73,8 @@ type MetaServerOptions struct {
ConfigCheckpointGraceTime time.Duration

// configurations for spd
ServiceProfileCacheTTL time.Duration
ServiceProfileSkipCorruptionError bool
ServiceProfileCacheTTL time.Duration

// configurations for metric-fetcher
MetricInsurancePeriod time.Duration
Expand Down Expand Up @@ -100,7 +104,8 @@ func NewMetaServerOptions() *MetaServerOptions {
ConfigSkipFailedInitialization: defaultConfigSkipFailedInitialization,
ConfigCheckpointGraceTime: defaultConfigCheckpointGraceTime,

ServiceProfileCacheTTL: defaultServiceProfileCacheTTL,
ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError,
ServiceProfileCacheTTL: defaultServiceProfileCacheTTL,

MetricInsurancePeriod: defaultMetricInsurancePeriod,
MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet},
Expand Down Expand Up @@ -136,6 +141,8 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs.DurationVar(&o.ConfigCheckpointGraceTime, "config-checkpoint-grace-time", o.ConfigCheckpointGraceTime,
"The grace time of meta server config checkpoint")

fs.BoolVar(&o.ServiceProfileSkipCorruptionError, "service-profile-skip-corruption-error", o.ServiceProfileSkipCorruptionError,
"Whether to skip corruption error when loading spd checkpoint")
fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL,
"The ttl of service profile manager cache remote spd")

Expand Down Expand Up @@ -171,6 +178,7 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error
c.ConfigSkipFailedInitialization = o.ConfigSkipFailedInitialization
c.ConfigCheckpointGraceTime = o.ConfigCheckpointGraceTime

c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError
c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL

c.MetricInsurancePeriod = o.MetricInsurancePeriod
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/agent/metaserver/spd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package metaserver
import "time"

type SPDConfiguration struct {
ServiceProfileCacheTTL time.Duration
ServiceProfileSkipCorruptionError bool
ServiceProfileCacheTTL time.Duration
}

func NewSPDConfiguration() *SPDConfiguration {
Expand Down
31 changes: 18 additions & 13 deletions pkg/metaserver/spd/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,26 @@ type spdInfo struct {
type Cache struct {
sync.RWMutex

expiredTime time.Duration
cacheTTL time.Duration
jitterFactor float64
maxRetryCount int64
skipCorruptionError bool
expiredTime time.Duration
cacheTTL time.Duration
jitterFactor float64
maxRetryCount int64

manager checkpointmanager.CheckpointManager
spdInfo map[string]*spdInfo
}

func NewSPDCache(manager checkpointmanager.CheckpointManager, cacheTTL, expiredTime time.Duration,
maxRetryCount int64, jitterFactor float64) *Cache {
func NewSPDCache(manager checkpointmanager.CheckpointManager, skipCorruptionError bool,
cacheTTL, expiredTime time.Duration, maxRetryCount int64, jitterFactor float64) *Cache {
cache := &Cache{
spdInfo: map[string]*spdInfo{},
manager: manager,
expiredTime: expiredTime,
cacheTTL: cacheTTL,
jitterFactor: jitterFactor,
maxRetryCount: maxRetryCount,
spdInfo: map[string]*spdInfo{},
manager: manager,
skipCorruptionError: skipCorruptionError,
expiredTime: expiredTime,
cacheTTL: cacheTTL,
jitterFactor: jitterFactor,
maxRetryCount: maxRetryCount,
}

err := cache.restore()
Expand Down Expand Up @@ -210,13 +212,16 @@ func (s *Cache) restore() error {
s.Lock()
defer s.Unlock()

spdList, err := checkpoint.LoadSPDs(s.manager)
spdList, err := checkpoint.LoadSPDs(s.manager, s.skipCorruptionError)
if err != nil {
return fmt.Errorf("restore spd failed: %v", err)
}

now := time.Now()
for _, spd := range spdList {
if spd == nil {
continue
}
key := native.GenerateUniqObjectNameKey(spd)
s.initSPDInfoWithoutLock(key)
s.spdInfo[key].spd = spd
Expand Down
16 changes: 12 additions & 4 deletions pkg/metaserver/spd/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package checkpoint

import (
"encoding/json"
"errors"
"fmt"
"strings"

"k8s.io/klog/v2"

"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
cpmerrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"

"github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
)
Expand Down Expand Up @@ -79,7 +80,7 @@ func getSPDKey(spd *v1alpha1.ServiceProfileDescriptor) string {
}

// LoadSPDs Loads All Checkpoints from disk
func LoadSPDs(cpm checkpointmanager.CheckpointManager) ([]*v1alpha1.ServiceProfileDescriptor, error) {
func LoadSPDs(cpm checkpointmanager.CheckpointManager, skipCorruptionError bool) ([]*v1alpha1.ServiceProfileDescriptor, error) {
spd := make([]*v1alpha1.ServiceProfileDescriptor, 0)

checkpointKeys, err := cpm.ListCheckpoints()
Expand All @@ -95,8 +96,15 @@ func LoadSPDs(cpm checkpointmanager.CheckpointManager) ([]*v1alpha1.ServiceProfi
checkpoint := NewServiceProfileCheckpoint(nil)
err := cpm.GetCheckpoint(key, checkpoint)
if err != nil {
klog.Errorf("Failed to retrieve checkpoint for spd %q: %v", key, err)
continue
klog.Errorf("Failed to retrieve checkpoint for spd %q, error: %v", key, err)
if !errors.Is(err, cpmerrors.ErrCorruptCheckpoint) {
continue
} else {
if !skipCorruptionError {
continue
}
klog.Warningf("Skip corruption error for spd %q", key)
}
}
spd = append(spd, checkpoint.GetSPD())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/metaserver/spd/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestWriteLoadDeleteSPDs(t *testing.T) {
}

// verify the correct written files are loaded from disk
spdList, err := LoadSPDs(cpm)
spdList, err := LoadSPDs(cpm, false)
if err != nil {
t.Errorf("failed to Load spds: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/metaserver/spd/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func NewSPDFetcher(clientSet *client.GenericClientSet, emitter metrics.MetricEmi
}

m.getPodSPDNameFunc = util.GetPodSPDName
m.spdCache = NewSPDCache(checkpointManager, conf.ServiceProfileCacheTTL, defaultClearUnusedSPDPeriod, defaultMaxRetryCount, defaultJitterFactor)
m.spdCache = NewSPDCache(checkpointManager, conf.ServiceProfileSkipCorruptionError, conf.ServiceProfileCacheTTL,
defaultClearUnusedSPDPeriod, defaultMaxRetryCount, defaultJitterFactor)

return m, nil
}
Expand Down

0 comments on commit 779d640

Please sign in to comment.