Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spd support extended indicator sdk and extended baseline #495

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/google/cadvisor v0.44.2
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.4.1-0.20240313101121-1cf6a8bac88a
github.com/kubewharf/katalyst-api v0.4.1-0.20240315044944-45cdd48ceedc
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240313101121-1cf6a8bac88a h1:76wdTiPIAgJzev8ZOun55tICXEfDM018zI0JtRJot9k=
github.com/kubewharf/katalyst-api v0.4.1-0.20240313101121-1cf6a8bac88a/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.4.1-0.20240315044944-45cdd48ceedc h1:KD5OnzzF1B44TpV2W+nTqCghwW7jlCqjfCZ94z6QWLg=
github.com/kubewharf/katalyst-api v0.4.1-0.20240315044944-45cdd48ceedc/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
22 changes: 21 additions & 1 deletion pkg/controller/spd/indicator-plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ const (
// IndicatorUpdater is used by IndicatorPlugin as a unified implementation
// to trigger indicator updating logic.
type IndicatorUpdater interface {
// UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus
// UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus
// for indicator add functions, IndicatorUpdater will try to merge them in local stores.
UpdateExtendedIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceExtendedIndicatorSpec)
UpdateBusinessIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorSpec)
UpdateSystemIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceSystemIndicatorSpec)
UpdateBusinessIndicatorStatus(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorStatus)
Expand Down Expand Up @@ -78,6 +79,24 @@ func NewIndicatorManager() *IndicatorManager {
}
}

func (u *IndicatorManager) UpdateExtendedIndicatorSpec(nn types.NamespacedName, indicators []apiworkload.ServiceExtendedIndicatorSpec) {
u.specMtx.Lock()

insert := false
if _, ok := u.specMap[nn]; !ok {
insert = true
u.specMap[nn] = initServiceProfileDescriptorSpec()
}
for _, indicator := range indicators {
util.InsertSPDExtendedIndicatorSpec(u.specMap[nn], &indicator)
}
u.specMtx.Unlock()

if insert {
u.specQueue <- nn
}
}

func (u *IndicatorManager) UpdateBusinessIndicatorSpec(nn types.NamespacedName, indicators []apiworkload.ServiceBusinessIndicatorSpec) {
u.specMtx.Lock()

Expand Down Expand Up @@ -173,6 +192,7 @@ func (u *IndicatorManager) GetIndicatorStatus(nn types.NamespacedName) *apiworkl

func initServiceProfileDescriptorSpec() *apiworkload.ServiceProfileDescriptorSpec {
return &apiworkload.ServiceProfileDescriptorSpec{
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{},
BusinessIndicator: []apiworkload.ServiceBusinessIndicatorSpec{},
SystemIndicator: []apiworkload.ServiceSystemIndicatorSpec{},
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/spd/indicator-plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ type IndicatorPlugin interface {
// is not supported by any indicator plugin, the controller will clear in CR.
GetSupportedBusinessIndicatorSpec() []apiworkload.ServiceBusinessIndicatorName
GetSupportedSystemIndicatorSpec() []apiworkload.ServiceSystemIndicatorName
GetSupportedExtendedIndicatorSpec() []string
GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName
}

type DummyIndicatorPlugin struct {
SystemSpecNames []apiworkload.ServiceSystemIndicatorName
BusinessSpecNames []apiworkload.ServiceBusinessIndicatorName
BusinessStatusNames []apiworkload.ServiceBusinessIndicatorName
ExtendedSpecNames []string
}

var _ IndicatorPlugin = DummyIndicatorPlugin{}
Expand All @@ -60,6 +62,9 @@ func (d DummyIndicatorPlugin) GetSupportedBusinessIndicatorSpec() []apiworkload.
func (d DummyIndicatorPlugin) GetSupportedSystemIndicatorSpec() []apiworkload.ServiceSystemIndicatorName {
return d.SystemSpecNames
}
func (d DummyIndicatorPlugin) GetSupportedExtendedIndicatorSpec() []string {
return d.ExtendedSpecNames
}
func (d DummyIndicatorPlugin) GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName {
return d.BusinessStatusNames
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/spd/spd.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type SPDController struct {
indicatorManager *indicator_plugin.IndicatorManager
indicatorPlugins map[string]indicator_plugin.IndicatorPlugin
indicatorsSpecBusiness map[apiworkload.ServiceBusinessIndicatorName]interface{}
indicatorsSpecExtended map[string]interface{}
indicatorsSpecSystem map[apiworkload.ServiceSystemIndicatorName]interface{}
indicatorsStatusBusiness map[apiworkload.ServiceBusinessIndicatorName]interface{}
}
Expand Down Expand Up @@ -247,6 +248,7 @@ func (sc *SPDController) initializeIndicatorPlugins(controlCtx *katalystbase.Gen
sc.indicatorPlugins = make(map[string]indicator_plugin.IndicatorPlugin)
sc.indicatorsSpecBusiness = make(map[apiworkload.ServiceBusinessIndicatorName]interface{})
sc.indicatorsSpecSystem = make(map[apiworkload.ServiceSystemIndicatorName]interface{})
sc.indicatorsSpecExtended = make(map[string]interface{})
sc.indicatorsStatusBusiness = make(map[apiworkload.ServiceBusinessIndicatorName]interface{})

initializers := indicator_plugin.GetPluginInitializers()
Expand All @@ -266,6 +268,9 @@ func (sc *SPDController) initializeIndicatorPlugins(controlCtx *katalystbase.Gen
for _, name := range plugin.GetSupportedSystemIndicatorSpec() {
sc.indicatorsSpecSystem[name] = struct{}{}
}
for _, name := range plugin.GetSupportedExtendedIndicatorSpec() {
sc.indicatorsSpecExtended[name] = struct{}{}
}
for _, name := range plugin.GetSupportedBusinessIndicatorStatus() {
sc.indicatorsStatusBusiness[name] = struct{}{}
}
Expand Down Expand Up @@ -596,7 +601,11 @@ func (sc *SPDController) getOrCreateSPDForWorkload(workload *unstructured.Unstru
AggMetrics: []apiworkload.AggPodMetrics{},
},
}
sc.updateBaselineSentinel(spd)

err := sc.updateBaselineSentinel(spd)
if err != nil {
return nil, err
}

return sc.spdControl.CreateSPD(sc.ctx, spd, metav1.CreateOptions{})
}
Expand Down
64 changes: 49 additions & 15 deletions pkg/controller/spd/spd_baseline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,46 +37,80 @@ func (sc *SPDController) updateBaselineSentinel(spd *v1alpha1.ServiceProfileDesc
return nil
}

if spd.Spec.BaselinePercent == nil || *spd.Spec.BaselinePercent >= consts.SPDBaselinePercentMax || *spd.Spec.BaselinePercent <= consts.SPDBaselinePercentMin {
// delete baseline sentinel annotation if baseline percent or extended indicator not set
if spd.Spec.BaselinePercent == nil && len(spd.Spec.ExtendedIndicator) == 0 {
util.SetSPDBaselineSentinel(spd, nil)
util.SetSPDExtendedBaselineSentinel(spd, nil)
return nil
}

podMeta, err := sc.calculateBaselineSentinel(spd)
podMetaList, err := sc.getSPDPodMetaList(spd)
if err != nil {
return err
}
util.SetSPDBaselineSentinel(spd, &podMeta)

// calculate baseline sentinel
baselineSentinel := calculateBaselineSentinel(podMetaList, spd.Spec.BaselinePercent)

// calculate extended baseline sentinel for each extended indicator
extendedBaselineSentinel := make(map[string]util.SPDBaselinePodMeta)
for _, indicator := range spd.Spec.ExtendedIndicator {
sentinel := calculateBaselineSentinel(podMetaList, indicator.BaselinePercent)
if sentinel == nil {
continue
}

extendedBaselineSentinel[indicator.Name] = *sentinel
}

util.SetSPDBaselineSentinel(spd, baselineSentinel)
util.SetSPDExtendedBaselineSentinel(spd, extendedBaselineSentinel)
return nil
}

// calculateBaselineSentinel returns the sentinel one for a list of pods
// referenced by the SPD. If one pod's createTime is less than the sentinel pod
func (sc *SPDController) calculateBaselineSentinel(spd *v1alpha1.ServiceProfileDescriptor) (util.SPDBaselinePodMeta, error) {
// getSPDPodMetaList get spd pod meta list in order
func (sc *SPDController) getSPDPodMetaList(spd *v1alpha1.ServiceProfileDescriptor) ([]util.SPDBaselinePodMeta, error) {
gvr, _ := meta.UnsafeGuessKindToResource(schema.FromAPIVersionAndKind(spd.Spec.TargetRef.APIVersion, spd.Spec.TargetRef.Kind))
workloadLister, ok := sc.workloadLister[gvr]
if !ok {
return util.SPDBaselinePodMeta{}, fmt.Errorf("without workload lister for gvr %v", gvr)
return nil, fmt.Errorf("without workload lister for gvr %v", gvr)
}

podList, err := util.GetPodListForSPD(spd, sc.podIndexer, sc.conf.SPDPodLabelIndexerKeys, workloadLister, sc.podLister)
if err != nil {
return util.SPDBaselinePodMeta{}, err
return nil, err
}

podList = native.FilterPods(podList, func(pod *v1.Pod) (bool, error) {
return native.PodIsActive(pod), nil
})
if len(podList) == 0 {
return util.SPDBaselinePodMeta{}, nil
return nil, nil
}

bcList := make([]util.SPDBaselinePodMeta, 0, len(podList))
podMetaList := make([]util.SPDBaselinePodMeta, 0, len(podList))
for _, p := range podList {
bcList = append(bcList, util.GetPodMeta(p))
podMetaList = append(podMetaList, util.GetPodMeta(p))
}
sort.SliceStable(bcList, func(i, j int) bool {
return bcList[i].Cmp(bcList[j]) < 0
sort.SliceStable(podMetaList, func(i, j int) bool {
return podMetaList[i].Cmp(podMetaList[j]) < 0
})
baselineIndex := int(math.Floor(float64(len(bcList)-1) * float64(*spd.Spec.BaselinePercent) / 100))
return bcList[baselineIndex], nil

return podMetaList, nil
}

// calculateBaselineSentinel returns the sentinel one for a list of pods
// referenced by the SPD. If one pod's createTime is less than the sentinel pod
func calculateBaselineSentinel(podMetaList []util.SPDBaselinePodMeta, baselinePercent *int32) *util.SPDBaselinePodMeta {
if baselinePercent == nil || *baselinePercent >= consts.SPDBaselinePercentMax ||
*baselinePercent <= consts.SPDBaselinePercentMin {
return nil
}

if len(podMetaList) == 0 {
return nil
}

baselineIndex := int(math.Floor(float64(len(podMetaList)-1) * float64(*baselinePercent) / 100))
return &podMetaList[baselineIndex]
}
24 changes: 20 additions & 4 deletions pkg/controller/spd/spd_baseline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "spd1",
Annotations: map[string]string{
consts.SPDAnnotationBaselineSentinelKey: util.SPDBaselinePodMeta{}.String(),
},
},
Spec: apiworkload.ServiceProfileDescriptorSpec{
TargetRef: apis.CrossVersionObjectReference{
Expand Down Expand Up @@ -288,6 +285,15 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
APIVersion: stsGVK.GroupVersion().String(),
},
BaselinePercent: pointer.Int32(50),
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{
{
Name: "TestExtended",
BaselinePercent: pointer.Int32(50),
Indicators: runtime.RawExtension{
Object: &apiworkload.TestExtendedIndicators{},
},
},
},
},
Status: apiworkload.ServiceProfileDescriptorStatus{},
},
Expand All @@ -297,7 +303,8 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
Namespace: "default",
Name: "spd1",
Annotations: map[string]string{
consts.SPDAnnotationBaselineSentinelKey: "{\"timeStamp\":\"2023-08-01T00:00:01Z\",\"podName\":\"pod2\"}",
consts.SPDAnnotationBaselineSentinelKey: "{\"timeStamp\":\"2023-08-01T00:00:01Z\",\"podName\":\"pod2\"}",
consts.SPDAnnotationExtendedBaselineSentinelKey: "{\"TestExtended\":{\"timeStamp\":\"2023-08-01T00:00:01Z\",\"podName\":\"pod2\"}}",
},
},
Spec: apiworkload.ServiceProfileDescriptorSpec{
Expand All @@ -307,6 +314,15 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
APIVersion: stsGVK.GroupVersion().String(),
},
BaselinePercent: pointer.Int32(50),
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{
{
Name: "TestExtended",
BaselinePercent: pointer.Int32(50),
Indicators: runtime.RawExtension{
Object: &apiworkload.TestExtendedIndicators{},
},
},
},
},
Status: apiworkload.ServiceProfileDescriptorStatus{},
},
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/spd/spd_indicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func (sc *SPDController) mergeIndicatorSpec(spd *apiworkload.ServiceProfileDescr
for _, indicator := range expected.SystemIndicator {
util.InsertSPDSystemIndicatorSpec(&spd.Spec, &indicator)
}
for _, indicator := range expected.ExtendedIndicator {
util.InsertSPDExtendedIndicatorSpec(&spd.Spec, &indicator)
}

for i := 0; i < len(spd.Spec.BusinessIndicator); i++ {
if _, ok := sc.indicatorsSpecBusiness[spd.Spec.BusinessIndicator[i].Name]; !ok {
Expand All @@ -200,6 +203,13 @@ func (sc *SPDController) mergeIndicatorSpec(spd *apiworkload.ServiceProfileDescr
spd.Spec.SystemIndicator = append(spd.Spec.SystemIndicator[:i], spd.Spec.SystemIndicator[i+1:]...)
}
}

for i := 0; i < len(spd.Spec.ExtendedIndicator); i++ {
if _, ok := sc.indicatorsSpecExtended[spd.Spec.ExtendedIndicator[i].Name]; !ok {
klog.Infof("skip spec extended %v for spd %v", spd.Spec.ExtendedIndicator[i].Name, spd.Name)
spd.Spec.ExtendedIndicator = append(spd.Spec.ExtendedIndicator[:i], spd.Spec.ExtendedIndicator[i+1:]...)
}
}
}

func (sc *SPDController) mergeIndicatorStatus(spd *apiworkload.ServiceProfileDescriptor, expected apiworkload.ServiceProfileDescriptorStatus) {
Expand Down
25 changes: 25 additions & 0 deletions pkg/controller/spd/spd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,16 @@ func TestIndicatorUpdater(t *testing.T) {
APIVersion: stsGVK.GroupVersion().String(),
},
BaselinePercent: pointer.Int32(20),
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{
{
Name: "TestExtended",
Indicators: runtime.RawExtension{
Object: &apiworkload.TestExtendedIndicators{
Indicators: &apiworkload.TestIndicators{},
},
},
},
},
BusinessIndicator: []apiworkload.ServiceBusinessIndicatorSpec{
{
Name: "business-1",
Expand Down Expand Up @@ -614,6 +624,9 @@ func TestIndicatorUpdater(t *testing.T) {
}

d1 := indicator_plugin.DummyIndicatorPlugin{
ExtendedSpecNames: []string{
"TestExtended",
},
SystemSpecNames: []apiworkload.ServiceSystemIndicatorName{
"system-1",
},
Expand Down Expand Up @@ -671,6 +684,17 @@ func TestIndicatorUpdater(t *testing.T) {
synced := cache.WaitForCacheSync(ctx.Done(), sc.syncedFunc...)
assert.True(t, synced)

sc.indicatorManager.UpdateExtendedIndicatorSpec(nn, []apiworkload.ServiceExtendedIndicatorSpec{
{
Name: "TestExtended",
Indicators: runtime.RawExtension{
Object: &apiworkload.TestExtendedIndicators{
Indicators: &apiworkload.TestIndicators{},
},
},
},
})

sc.indicatorManager.UpdateBusinessIndicatorSpec(nn, []apiworkload.ServiceBusinessIndicatorSpec{
{
Name: "business-1",
Expand Down Expand Up @@ -753,6 +777,7 @@ func TestIndicatorUpdater(t *testing.T) {
newSPD, err := controlCtx.Client.InternalClient.WorkloadV1alpha1().
ServiceProfileDescriptors("default").Get(ctx, "spd1", metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, expectedSpd.Spec.ExtendedIndicator, newSPD.Spec.ExtendedIndicator)
assert.Equal(t, expectedSpd.Spec.BusinessIndicator, newSPD.Spec.BusinessIndicator)
assert.Equal(t, expectedSpd.Spec.SystemIndicator, newSPD.Spec.SystemIndicator)
assert.Equal(t, expectedSpd.Status.BusinessStatus, newSPD.Status.BusinessStatus)
Expand Down
Loading
Loading