Skip to content

Commit

Permalink
agent get SPD syncly & spd creation improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
ddjjia committed Jul 12, 2024
1 parent 65905ce commit b21825a
Show file tree
Hide file tree
Showing 12 changed files with 889 additions and 672 deletions.
5 changes: 5 additions & 0 deletions cmd/katalyst-agent/app/options/metaserver/metaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
const (
defaultServiceProfileSkipCorruptionError = true
defaultServiceProfileCacheTTL = 1 * time.Minute
defaultSPDGetFromRemote = false
)

const (
Expand Down Expand Up @@ -74,6 +75,7 @@ type MetaServerOptions struct {
// configurations for spd
ServiceProfileSkipCorruptionError bool
ServiceProfileCacheTTL time.Duration
SPDGetFromRemote bool

// configurations for pod-cache
KubeletPodCacheSyncPeriod time.Duration
Expand Down Expand Up @@ -104,6 +106,7 @@ func NewMetaServerOptions() *MetaServerOptions {

ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError,
ServiceProfileCacheTTL: defaultServiceProfileCacheTTL,
SPDGetFromRemote: defaultSPDGetFromRemote,

KubeletPodCacheSyncPeriod: defaultKubeletPodCacheSyncPeriod,
KubeletPodCacheSyncMaxRate: defaultKubeletPodCacheSyncMaxRate,
Expand Down Expand Up @@ -142,6 +145,7 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"Whether to skip corruption error when loading spd checkpoint")
fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL,
"The ttl of service profile manager cache remote spd")
fs.BoolVar(&o.SPDGetFromRemote, "spd-get-from-remote", o.SPDGetFromRemote, "get spd from remote if not in cache")

fs.DurationVar(&o.KubeletPodCacheSyncPeriod, "kubelet-pod-cache-sync-period", o.KubeletPodCacheSyncPeriod,
"The period of meta server to sync pod from kubelet 10255 port")
Expand Down Expand Up @@ -174,6 +178,7 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error

c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError
c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL
c.SPDGetFromRemote = o.SPDGetFromRemote

c.KubeletPodCacheSyncPeriod = o.KubeletPodCacheSyncPeriod
c.KubeletPodCacheSyncMaxRate = rate.Limit(o.KubeletPodCacheSyncMaxRate)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/metaserver/spd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import "time"
type SPDConfiguration struct {
ServiceProfileSkipCorruptionError bool
ServiceProfileCacheTTL time.Duration
SPDGetFromRemote bool
}

func NewSPDConfiguration() *SPDConfiguration {
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/spd/indicator-plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
// IndicatorUpdater is used by IndicatorPlugin as a unified implementation
// to trigger indicator updating logic.
type IndicatorUpdater interface {
// UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus
// UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus + UpdateAggMetrics
// for indicator add functions, IndicatorUpdater will try to merge them in local stores.
UpdateExtendedIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceExtendedIndicatorSpec)
UpdateBusinessIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorSpec)
Expand Down Expand Up @@ -136,15 +136,15 @@ func (u *IndicatorManager) UpdateSystemIndicatorSpec(nn types.NamespacedName, in
}
}

func (u *IndicatorManager) UpdateBusinessIndicatorStatus(nn types.NamespacedName, indicators []apiworkload.ServiceBusinessIndicatorStatus) {
func (u *IndicatorManager) UpdateBusinessIndicatorStatus(nn types.NamespacedName, businessIndicators []apiworkload.ServiceBusinessIndicatorStatus) {
u.statusMtx.Lock()

insert := false
if _, ok := u.statusMap[nn]; !ok {
insert = true
u.statusMap[nn] = initServiceProfileDescriptorStatus()
}
for _, indicator := range indicators {
for _, indicator := range businessIndicators {
util.InsertSPDBusinessIndicatorStatus(u.statusMap[nn], &indicator)
}

Expand Down Expand Up @@ -223,5 +223,6 @@ func initServiceProfileDescriptorSpec() *apiworkload.ServiceProfileDescriptorSpe
func initServiceProfileDescriptorStatus() *apiworkload.ServiceProfileDescriptorStatus {
return &apiworkload.ServiceProfileDescriptorStatus{
BusinessStatus: []apiworkload.ServiceBusinessIndicatorStatus{},
AggMetrics: []apiworkload.AggPodMetrics{},
}
}
8 changes: 8 additions & 0 deletions pkg/controller/spd/indicator-plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"

apiworkload "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
Expand All @@ -44,6 +45,9 @@ type IndicatorPlugin interface {
GetSupportedExtendedIndicatorSpec() []string
GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName
GetSupportedAggMetricsStatus() []string
// GetAggMetrics is only used in SPD creation for timing-sensitive status.
// The other non-timing-sensitive status are updated at UpdateAggMetrics in indicatorUpdater.
GetAggMetrics(workload *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error)
}

type DummyIndicatorPlugin struct {
Expand Down Expand Up @@ -78,6 +82,10 @@ func (d DummyIndicatorPlugin) GetSupportedAggMetricsStatus() []string {
return nil
}

func (d DummyIndicatorPlugin) GetAggMetrics(_ *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) {
return nil, nil
}

// pluginInitializers is used to store the initializing function for each plugin
var pluginInitializers sync.Map

Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/spd/indicator-plugin/plugins/ihpa/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -109,6 +110,10 @@ func (p *Plugin) GetSupportedAggMetricsStatus() []string {
return nil
}

func (p *Plugin) GetAggMetrics(_ *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) {
return nil, nil
}

func PluginInitFunc(ctx context.Context, conf *controller.SPDConfig, _ interface{},
_ map[schema.GroupVersionResource]native.DynamicInformer,
controlCtx *katalystbase.GenericContext, updater indicatorplugin.IndicatorUpdater,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (p *ResourcePortraitIndicatorPlugin) GetSupportedAggMetricsStatus() []strin
return []string{ResourcePortraitPluginName}
}

func (p *ResourcePortraitIndicatorPlugin) GetAggMetrics(_ *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) {
return nil, nil
}

// resyncSpecWorker is used to synchronize global configuration to SPD.
func (p *ResourcePortraitIndicatorPlugin) resyncSpecWorker() {
spdList, err := p.spdLister.List(labels.Everything())
Expand Down
Loading

0 comments on commit b21825a

Please sign in to comment.