Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent get SPD syncly & spd creation improvement #606

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/katalyst-agent/app/options/metaserver/metaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
// Default values applied to the service-profile (SPD) related fields of
// MetaServerOptions before command-line flags are parsed.
const (
	// defaultServiceProfileSkipCorruptionError is the initial value of
	// MetaServerOptions.ServiceProfileSkipCorruptionError.
	defaultServiceProfileSkipCorruptionError = true
	// defaultServiceProfileCacheTTL is the initial value of
	// MetaServerOptions.ServiceProfileCacheTTL (one minute).
	defaultServiceProfileCacheTTL = time.Minute
	// defaultSPDGetFromRemote is the initial value of
	// MetaServerOptions.SPDGetFromRemote; fetching from remote is off by default.
	defaultSPDGetFromRemote = false
)

const (
Expand Down Expand Up @@ -74,6 +75,7 @@ type MetaServerOptions struct {
// configurations for spd
ServiceProfileSkipCorruptionError bool
ServiceProfileCacheTTL time.Duration
SPDGetFromRemote bool

// configurations for pod-cache
KubeletPodCacheSyncPeriod time.Duration
Expand Down Expand Up @@ -104,6 +106,7 @@ func NewMetaServerOptions() *MetaServerOptions {

ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError,
ServiceProfileCacheTTL: defaultServiceProfileCacheTTL,
SPDGetFromRemote: defaultSPDGetFromRemote,

KubeletPodCacheSyncPeriod: defaultKubeletPodCacheSyncPeriod,
KubeletPodCacheSyncMaxRate: defaultKubeletPodCacheSyncMaxRate,
Expand Down Expand Up @@ -142,6 +145,7 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"Whether to skip corruption error when loading spd checkpoint")
fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL,
"The ttl of service profile manager cache remote spd")
fs.BoolVar(&o.SPDGetFromRemote, "spd-get-from-remote", o.SPDGetFromRemote, "get spd from remote if not in cache")

fs.DurationVar(&o.KubeletPodCacheSyncPeriod, "kubelet-pod-cache-sync-period", o.KubeletPodCacheSyncPeriod,
"The period of meta server to sync pod from kubelet 10255 port")
Expand Down Expand Up @@ -174,6 +178,7 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error

c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError
c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL
c.SPDGetFromRemote = o.SPDGetFromRemote

c.KubeletPodCacheSyncPeriod = o.KubeletPodCacheSyncPeriod
c.KubeletPodCacheSyncMaxRate = rate.Limit(o.KubeletPodCacheSyncMaxRate)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/agent/metaserver/spd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import "time"
// SPDConfiguration groups the service-profile-descriptor (SPD) related
// settings of the meta server.
type SPDConfiguration struct {
// ServiceProfileSkipCorruptionError presumably controls whether a corruption
// error is skipped when loading the spd checkpoint (per the help text of the
// matching agent flag) — confirm against the consumer of this field.
ServiceProfileSkipCorruptionError bool
// ServiceProfileCacheTTL is a duration; the matching agent flag describes it
// as the ttl of the service profile manager's cache of remote spd.
ServiceProfileCacheTTL time.Duration
// SPDGetFromRemote mirrors the "spd-get-from-remote" agent flag, described
// there as "get spd from remote if not in cache".
SPDGetFromRemote bool
}

func NewSPDConfiguration() *SPDConfiguration {
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/spd/indicator-plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
// IndicatorUpdater is used by IndicatorPlugin as a unified implementation
// to trigger indicator updating logic.
type IndicatorUpdater interface {
// UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus
// UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus + UpdateAggMetrics
// for indicator add functions, IndicatorUpdater will try to merge them in local stores.
UpdateExtendedIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceExtendedIndicatorSpec)
UpdateBusinessIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorSpec)
Expand Down Expand Up @@ -136,15 +136,15 @@ func (u *IndicatorManager) UpdateSystemIndicatorSpec(nn types.NamespacedName, in
}
}

func (u *IndicatorManager) UpdateBusinessIndicatorStatus(nn types.NamespacedName, indicators []apiworkload.ServiceBusinessIndicatorStatus) {
func (u *IndicatorManager) UpdateBusinessIndicatorStatus(nn types.NamespacedName, businessIndicators []apiworkload.ServiceBusinessIndicatorStatus) {
u.statusMtx.Lock()

insert := false
if _, ok := u.statusMap[nn]; !ok {
insert = true
u.statusMap[nn] = initServiceProfileDescriptorStatus()
}
for _, indicator := range indicators {
for _, indicator := range businessIndicators {
util.InsertSPDBusinessIndicatorStatus(u.statusMap[nn], &indicator)
}

Expand Down Expand Up @@ -223,5 +223,6 @@ func initServiceProfileDescriptorSpec() *apiworkload.ServiceProfileDescriptorSpe
// initServiceProfileDescriptorStatus returns a fresh SPD status whose
// BusinessStatus and AggMetrics slices are empty but non-nil.
func initServiceProfileDescriptorStatus() *apiworkload.ServiceProfileDescriptorStatus {
	status := new(apiworkload.ServiceProfileDescriptorStatus)
	// Allocate zero-length slices explicitly so neither field is left nil.
	status.BusinessStatus = make([]apiworkload.ServiceBusinessIndicatorStatus, 0)
	status.AggMetrics = make([]apiworkload.AggPodMetrics, 0)
	return status
}
8 changes: 8 additions & 0 deletions pkg/controller/spd/indicator-plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"

apiworkload "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
Expand All @@ -44,6 +45,9 @@ type IndicatorPlugin interface {
GetSupportedExtendedIndicatorSpec() []string
GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName
GetSupportedAggMetricsStatus() []string
// GetAggMetrics is only used during SPD creation, for timing-sensitive status.
// Other, non-timing-sensitive statuses are updated through UpdateAggMetrics on IndicatorUpdater.
GetAggMetrics(workload *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error)
}

type DummyIndicatorPlugin struct {
Expand Down Expand Up @@ -78,6 +82,10 @@ func (d DummyIndicatorPlugin) GetSupportedAggMetricsStatus() []string {
return nil
}

// GetAggMetrics ignores the given workload and always reports no aggregated
// pod metrics (a nil slice) together with a nil error.
func (d DummyIndicatorPlugin) GetAggMetrics(_ *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) {
	var noMetrics []apiworkload.AggPodMetrics
	return noMetrics, nil
}

// pluginInitializers stores the initializing function for each plugin.
// It is a sync.Map, so it may be accessed from multiple goroutines without
// additional locking.
var pluginInitializers sync.Map

Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/spd/indicator-plugin/plugins/ihpa/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -109,6 +110,10 @@ func (p *Plugin) GetSupportedAggMetricsStatus() []string {
return nil
}

// GetAggMetrics ignores the given workload; this plugin supplies no aggregated
// pod metrics through this call and always returns a nil slice and a nil error.
func (p *Plugin) GetAggMetrics(_ *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) {
	var noMetrics []apiworkload.AggPodMetrics
	return noMetrics, nil
}

func PluginInitFunc(ctx context.Context, conf *controller.SPDConfig, _ interface{},
_ map[schema.GroupVersionResource]native.DynamicInformer,
controlCtx *katalystbase.GenericContext, updater indicatorplugin.IndicatorUpdater,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (p *ResourcePortraitIndicatorPlugin) GetSupportedAggMetricsStatus() []strin
return []string{ResourcePortraitPluginName}
}

// GetAggMetrics ignores the given workload and always returns a nil slice of
// aggregated pod metrics with a nil error.
func (p *ResourcePortraitIndicatorPlugin) GetAggMetrics(_ *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) {
	var noMetrics []apiworkload.AggPodMetrics
	return noMetrics, nil
}

// resyncSpecWorker is used to synchronize global configuration to SPD.
func (p *ResourcePortraitIndicatorPlugin) resyncSpecWorker() {
spdList, err := p.spdLister.List(labels.Everything())
Expand Down
Loading
Loading