Skip to content

Commit

Permalink
spd support extended indicator sdk and extended baseline
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Mar 15, 2024
1 parent fbb326a commit 5d62ca7
Show file tree
Hide file tree
Showing 15 changed files with 851 additions and 123 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/google/cadvisor v0.44.2
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.4.1-0.20240222122824-be538f641f58
github.com/kubewharf/katalyst-api v0.4.1-0.20240315044944-45cdd48ceedc
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240222122824-be538f641f58 h1:D9dCR5EIR0k0Qil2A5biZjrubagRkEr7fyov6fb2ApY=
github.com/kubewharf/katalyst-api v0.4.1-0.20240222122824-be538f641f58/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.4.1-0.20240315044944-45cdd48ceedc h1:KD5OnzzF1B44TpV2W+nTqCghwW7jlCqjfCZ94z6QWLg=
github.com/kubewharf/katalyst-api v0.4.1-0.20240315044944-45cdd48ceedc/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
22 changes: 21 additions & 1 deletion pkg/controller/spd/indicator-plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ const (
// IndicatorUpdater is used by IndicatorPlugin as a unified implementation
// to trigger indicator updating logic.
type IndicatorUpdater interface {
// UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus
// UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus
// for indicator add functions, IndicatorUpdater will try to merge them in local stores.
UpdateExtendedIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceExtendedIndicatorSpec)
UpdateBusinessIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorSpec)
UpdateSystemIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceSystemIndicatorSpec)
UpdateBusinessIndicatorStatus(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorStatus)
Expand Down Expand Up @@ -78,6 +79,24 @@ func NewIndicatorManager() *IndicatorManager {
}
}

func (u *IndicatorManager) UpdateExtendedIndicatorSpec(nn types.NamespacedName, indicators []apiworkload.ServiceExtendedIndicatorSpec) {
u.specMtx.Lock()

insert := false
if _, ok := u.specMap[nn]; !ok {
insert = true
u.specMap[nn] = initServiceProfileDescriptorSpec()
}
for _, indicator := range indicators {
util.InsertSPDExtendedIndicatorSpec(u.specMap[nn], &indicator)
}
u.specMtx.Unlock()

if insert {
u.specQueue <- nn
}
}

func (u *IndicatorManager) UpdateBusinessIndicatorSpec(nn types.NamespacedName, indicators []apiworkload.ServiceBusinessIndicatorSpec) {
u.specMtx.Lock()

Expand Down Expand Up @@ -173,6 +192,7 @@ func (u *IndicatorManager) GetIndicatorStatus(nn types.NamespacedName) *apiworkl

func initServiceProfileDescriptorSpec() *apiworkload.ServiceProfileDescriptorSpec {
return &apiworkload.ServiceProfileDescriptorSpec{
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{},
BusinessIndicator: []apiworkload.ServiceBusinessIndicatorSpec{},
SystemIndicator: []apiworkload.ServiceSystemIndicatorSpec{},
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/spd/indicator-plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ type IndicatorPlugin interface {
// is not supported by any indicator plugin, the controller will clear in CR.
GetSupportedBusinessIndicatorSpec() []apiworkload.ServiceBusinessIndicatorName
GetSupportedSystemIndicatorSpec() []apiworkload.ServiceSystemIndicatorName
GetSupportedExtendedIndicatorSpec() []string
GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName
}

type DummyIndicatorPlugin struct {
SystemSpecNames []apiworkload.ServiceSystemIndicatorName
BusinessSpecNames []apiworkload.ServiceBusinessIndicatorName
BusinessStatusNames []apiworkload.ServiceBusinessIndicatorName
ExtendedSpecNames []string
}

var _ IndicatorPlugin = DummyIndicatorPlugin{}
Expand All @@ -60,6 +62,9 @@ func (d DummyIndicatorPlugin) GetSupportedBusinessIndicatorSpec() []apiworkload.
func (d DummyIndicatorPlugin) GetSupportedSystemIndicatorSpec() []apiworkload.ServiceSystemIndicatorName {
return d.SystemSpecNames
}
func (d DummyIndicatorPlugin) GetSupportedExtendedIndicatorSpec() []string {
return d.ExtendedSpecNames
}
func (d DummyIndicatorPlugin) GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName {
return d.BusinessStatusNames
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/spd/spd.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type SPDController struct {
indicatorManager *indicator_plugin.IndicatorManager
indicatorPlugins map[string]indicator_plugin.IndicatorPlugin
indicatorsSpecBusiness map[apiworkload.ServiceBusinessIndicatorName]interface{}
indicatorsSpecExtended map[string]interface{}
indicatorsSpecSystem map[apiworkload.ServiceSystemIndicatorName]interface{}
indicatorsStatusBusiness map[apiworkload.ServiceBusinessIndicatorName]interface{}
}
Expand Down Expand Up @@ -247,6 +248,7 @@ func (sc *SPDController) initializeIndicatorPlugins(controlCtx *katalystbase.Gen
sc.indicatorPlugins = make(map[string]indicator_plugin.IndicatorPlugin)
sc.indicatorsSpecBusiness = make(map[apiworkload.ServiceBusinessIndicatorName]interface{})
sc.indicatorsSpecSystem = make(map[apiworkload.ServiceSystemIndicatorName]interface{})
sc.indicatorsSpecExtended = make(map[string]interface{})
sc.indicatorsStatusBusiness = make(map[apiworkload.ServiceBusinessIndicatorName]interface{})

initializers := indicator_plugin.GetPluginInitializers()
Expand All @@ -266,6 +268,9 @@ func (sc *SPDController) initializeIndicatorPlugins(controlCtx *katalystbase.Gen
for _, name := range plugin.GetSupportedSystemIndicatorSpec() {
sc.indicatorsSpecSystem[name] = struct{}{}
}
for _, name := range plugin.GetSupportedExtendedIndicatorSpec() {
sc.indicatorsSpecExtended[name] = struct{}{}
}
for _, name := range plugin.GetSupportedBusinessIndicatorStatus() {
sc.indicatorsStatusBusiness[name] = struct{}{}
}
Expand Down Expand Up @@ -596,7 +601,11 @@ func (sc *SPDController) getOrCreateSPDForWorkload(workload *unstructured.Unstru
AggMetrics: []apiworkload.AggPodMetrics{},
},
}
sc.updateBaselineSentinel(spd)

err := sc.updateBaselineSentinel(spd)
if err != nil {
return nil, err
}

return sc.spdControl.CreateSPD(sc.ctx, spd, metav1.CreateOptions{})
}
Expand Down
64 changes: 49 additions & 15 deletions pkg/controller/spd/spd_baseline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,46 +37,80 @@ func (sc *SPDController) updateBaselineSentinel(spd *v1alpha1.ServiceProfileDesc
return nil
}

if spd.Spec.BaselinePercent == nil || *spd.Spec.BaselinePercent >= consts.SPDBaselinePercentMax || *spd.Spec.BaselinePercent <= consts.SPDBaselinePercentMin {
// delete baseline sentinel annotation if baseline percent or extended indicator not set
if spd.Spec.BaselinePercent == nil && len(spd.Spec.ExtendedIndicator) == 0 {
util.SetSPDBaselineSentinel(spd, nil)
util.SetSPDExtendedBaselineSentinel(spd, nil)
return nil
}

podMeta, err := sc.calculateBaselineSentinel(spd)
podMetaList, err := sc.getSPDPodMetaList(spd)
if err != nil {
return err
}
util.SetSPDBaselineSentinel(spd, &podMeta)

// calculate baseline sentinel
baselineSentinel := calculateBaselineSentinel(podMetaList, spd.Spec.BaselinePercent)

// calculate extended baseline sentinel for each extended indicator
extendedBaselineSentinel := make(map[string]util.SPDBaselinePodMeta)
for _, indicator := range spd.Spec.ExtendedIndicator {
sentinel := calculateBaselineSentinel(podMetaList, indicator.BaselinePercent)
if sentinel == nil {
continue
}

extendedBaselineSentinel[indicator.Name] = *sentinel
}

util.SetSPDBaselineSentinel(spd, baselineSentinel)
util.SetSPDExtendedBaselineSentinel(spd, extendedBaselineSentinel)
return nil
}

// calculateBaselineSentinel returns the sentinel one for a list of pods
// referenced by the SPD. If one pod's createTime is less than the sentinel pod
func (sc *SPDController) calculateBaselineSentinel(spd *v1alpha1.ServiceProfileDescriptor) (util.SPDBaselinePodMeta, error) {
// getSPDPodMetaList get spd pod meta list in order
func (sc *SPDController) getSPDPodMetaList(spd *v1alpha1.ServiceProfileDescriptor) ([]util.SPDBaselinePodMeta, error) {
gvr, _ := meta.UnsafeGuessKindToResource(schema.FromAPIVersionAndKind(spd.Spec.TargetRef.APIVersion, spd.Spec.TargetRef.Kind))
workloadLister, ok := sc.workloadLister[gvr]
if !ok {
return util.SPDBaselinePodMeta{}, fmt.Errorf("without workload lister for gvr %v", gvr)
return nil, fmt.Errorf("without workload lister for gvr %v", gvr)
}

podList, err := util.GetPodListForSPD(spd, sc.podIndexer, sc.conf.SPDPodLabelIndexerKeys, workloadLister, sc.podLister)
if err != nil {
return util.SPDBaselinePodMeta{}, err
return nil, err
}

podList = native.FilterPods(podList, func(pod *v1.Pod) (bool, error) {
return native.PodIsActive(pod), nil
})
if len(podList) == 0 {
return util.SPDBaselinePodMeta{}, nil
return nil, nil
}

bcList := make([]util.SPDBaselinePodMeta, 0, len(podList))
podMetaList := make([]util.SPDBaselinePodMeta, 0, len(podList))
for _, p := range podList {
bcList = append(bcList, util.GetPodMeta(p))
podMetaList = append(podMetaList, util.GetPodMeta(p))
}
sort.SliceStable(bcList, func(i, j int) bool {
return bcList[i].Cmp(bcList[j]) < 0
sort.SliceStable(podMetaList, func(i, j int) bool {
return podMetaList[i].Cmp(podMetaList[j]) < 0
})
baselineIndex := int(math.Floor(float64(len(bcList)-1) * float64(*spd.Spec.BaselinePercent) / 100))
return bcList[baselineIndex], nil

return podMetaList, nil
}

// calculateBaselineSentinel returns the sentinel one for a list of pods
// referenced by the SPD. If one pod's createTime is less than the sentinel pod
func calculateBaselineSentinel(podMetaList []util.SPDBaselinePodMeta, baselinePercent *int32) *util.SPDBaselinePodMeta {
if baselinePercent == nil || *baselinePercent >= consts.SPDBaselinePercentMax ||
*baselinePercent <= consts.SPDBaselinePercentMin {
return nil
}

if len(podMetaList) == 0 {
return nil
}

baselineIndex := int(math.Floor(float64(len(podMetaList)-1) * float64(*baselinePercent) / 100))
return &podMetaList[baselineIndex]
}
24 changes: 20 additions & 4 deletions pkg/controller/spd/spd_baseline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "spd1",
Annotations: map[string]string{
consts.SPDAnnotationBaselineSentinelKey: util.SPDBaselinePodMeta{}.String(),
},
},
Spec: apiworkload.ServiceProfileDescriptorSpec{
TargetRef: apis.CrossVersionObjectReference{
Expand Down Expand Up @@ -288,6 +285,15 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
APIVersion: stsGVK.GroupVersion().String(),
},
BaselinePercent: pointer.Int32(50),
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{
{
Name: "TestExtended",
BaselinePercent: pointer.Int32(50),
Indicators: runtime.RawExtension{
Object: &apiworkload.TestExtendedIndicators{},
},
},
},
},
Status: apiworkload.ServiceProfileDescriptorStatus{},
},
Expand All @@ -297,7 +303,8 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
Namespace: "default",
Name: "spd1",
Annotations: map[string]string{
consts.SPDAnnotationBaselineSentinelKey: "{\"timeStamp\":\"2023-08-01T00:00:01Z\",\"podName\":\"pod2\"}",
consts.SPDAnnotationBaselineSentinelKey: "{\"timeStamp\":\"2023-08-01T00:00:01Z\",\"podName\":\"pod2\"}",
consts.SPDAnnotationExtendedBaselineSentinelKey: "{\"TestExtended\":{\"timeStamp\":\"2023-08-01T00:00:01Z\",\"podName\":\"pod2\"}}",
},
},
Spec: apiworkload.ServiceProfileDescriptorSpec{
Expand All @@ -307,6 +314,15 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
APIVersion: stsGVK.GroupVersion().String(),
},
BaselinePercent: pointer.Int32(50),
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{
{
Name: "TestExtended",
BaselinePercent: pointer.Int32(50),
Indicators: runtime.RawExtension{
Object: &apiworkload.TestExtendedIndicators{},
},
},
},
},
Status: apiworkload.ServiceProfileDescriptorStatus{},
},
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/spd/spd_indicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func (sc *SPDController) mergeIndicatorSpec(spd *apiworkload.ServiceProfileDescr
for _, indicator := range expected.SystemIndicator {
util.InsertSPDSystemIndicatorSpec(&spd.Spec, &indicator)
}
for _, indicator := range expected.ExtendedIndicator {
util.InsertSPDExtendedIndicatorSpec(&spd.Spec, &indicator)
}

for i := 0; i < len(spd.Spec.BusinessIndicator); i++ {
if _, ok := sc.indicatorsSpecBusiness[spd.Spec.BusinessIndicator[i].Name]; !ok {
Expand All @@ -200,6 +203,13 @@ func (sc *SPDController) mergeIndicatorSpec(spd *apiworkload.ServiceProfileDescr
spd.Spec.SystemIndicator = append(spd.Spec.SystemIndicator[:i], spd.Spec.SystemIndicator[i+1:]...)
}
}

for i := 0; i < len(spd.Spec.ExtendedIndicator); i++ {
if _, ok := sc.indicatorsSpecExtended[spd.Spec.ExtendedIndicator[i].Name]; !ok {
klog.Infof("skip spec extended %v for spd %v", spd.Spec.ExtendedIndicator[i].Name, spd.Name)
spd.Spec.ExtendedIndicator = append(spd.Spec.ExtendedIndicator[:i], spd.Spec.ExtendedIndicator[i+1:]...)
}
}
}

func (sc *SPDController) mergeIndicatorStatus(spd *apiworkload.ServiceProfileDescriptor, expected apiworkload.ServiceProfileDescriptorStatus) {
Expand Down
25 changes: 25 additions & 0 deletions pkg/controller/spd/spd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,16 @@ func TestIndicatorUpdater(t *testing.T) {
APIVersion: stsGVK.GroupVersion().String(),
},
BaselinePercent: pointer.Int32(20),
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{
{
Name: "TestExtended",
Indicators: runtime.RawExtension{
Object: &apiworkload.TestExtendedIndicators{
Indicators: &apiworkload.TestIndicators{},
},
},
},
},
BusinessIndicator: []apiworkload.ServiceBusinessIndicatorSpec{
{
Name: "business-1",
Expand Down Expand Up @@ -614,6 +624,9 @@ func TestIndicatorUpdater(t *testing.T) {
}

d1 := indicator_plugin.DummyIndicatorPlugin{
ExtendedSpecNames: []string{
"TestExtended",
},
SystemSpecNames: []apiworkload.ServiceSystemIndicatorName{
"system-1",
},
Expand Down Expand Up @@ -671,6 +684,17 @@ func TestIndicatorUpdater(t *testing.T) {
synced := cache.WaitForCacheSync(ctx.Done(), sc.syncedFunc...)
assert.True(t, synced)

sc.indicatorManager.UpdateExtendedIndicatorSpec(nn, []apiworkload.ServiceExtendedIndicatorSpec{
{
Name: "TestExtended",
Indicators: runtime.RawExtension{
Object: &apiworkload.TestExtendedIndicators{
Indicators: &apiworkload.TestIndicators{},
},
},
},
})

sc.indicatorManager.UpdateBusinessIndicatorSpec(nn, []apiworkload.ServiceBusinessIndicatorSpec{
{
Name: "business-1",
Expand Down Expand Up @@ -753,6 +777,7 @@ func TestIndicatorUpdater(t *testing.T) {
newSPD, err := controlCtx.Client.InternalClient.WorkloadV1alpha1().
ServiceProfileDescriptors("default").Get(ctx, "spd1", metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, expectedSpd.Spec.ExtendedIndicator, newSPD.Spec.ExtendedIndicator)
assert.Equal(t, expectedSpd.Spec.BusinessIndicator, newSPD.Spec.BusinessIndicator)
assert.Equal(t, expectedSpd.Spec.SystemIndicator, newSPD.Spec.SystemIndicator)
assert.Equal(t, expectedSpd.Status.BusinessStatus, newSPD.Status.BusinessStatus)
Expand Down
Loading

0 comments on commit 5d62ca7

Please sign in to comment.