Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abandon recommendation missions #803

Merged
merged 5 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/controller/analytics/analytics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,10 @@ func ConvertToRecommendationRule(analytics *analysisv1alph1.Analytics) *analysis
recommendationRule.Spec.RunInterval = (time.Duration(*analytics.Spec.CompletionStrategy.PeriodSeconds) * time.Second).String()
}

recommendationRule.Status = analysisv1alph1.RecommendationRuleStatus{
/*recommendationRule.Status = analysisv1alph1.RecommendationRuleStatus{
LastUpdateTime: analytics.Status.LastUpdateTime,
Recommendations: analytics.Status.Recommendations,
}
}*/

return recommendationRule
}
Expand Down
260 changes: 134 additions & 126 deletions pkg/controller/recommendation/recommendation_rule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package recommendation
import (
"context"
"fmt"
"reflect"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -121,40 +121,6 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen
return false
}

timeNow := metav1.Now()

// if the first mission start time is last round, reset currMissions here
currMissions := newStatus.Recommendations
if currMissions != nil && len(currMissions) > 0 {
firstMissionStartTime := currMissions[0].LastStartTime
if firstMissionStartTime.IsZero() {
currMissions = nil
} else {
planingTime := firstMissionStartTime.Add(interval)
if time.Now().After(planingTime) {
currMissions = nil // reset missions to trigger creation for missions
}
}
}

if currMissions == nil {
// create recommendation rule missions for this round
// every recommendation rule have multi recommender for one identity
for _, id := range identities {
for _, recommender := range recommendationRule.Spec.Recommenders {
currMissions = append(currMissions, analysisv1alph1.RecommendationMission{
TargetRef: id.GetObjectReference(),
RecommenderRef: analysisv1alph1.Recommender{
Name: recommender.Name,
},
})
}
}

// +1 for runNumber
newStatus.RunNumber = newStatus.RunNumber + 1
}

var currRecommendations analysisv1alph1.RecommendationList
opts := []client.ListOption{
client.MatchingLabels(map[string]string{known.RecommendationRuleUidLabel: string(recommendationRule.UID)}),
Expand All @@ -174,20 +140,50 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen
return false
}

if klog.V(6).Enabled() {
// Print identities
for k, id := range identities {
klog.V(6).InfoS("identities", "RecommendationRule", klog.KObj(recommendationRule), "key", k, "apiVersion", id.APIVersion, "kind", id.Kind, "namespace", id.Namespace, "name", id.Name)
var identitiesArray []ObjectIdentity
keys := make([]string, 0, len(identities))
for k := range identities {
keys = append(keys, k)
}
sort.Strings(keys) // sort key to get a certain order
for _, key := range keys {
id := identities[key]
id.Recommendation = GetRecommendationFromIdentity(identities[key], currRecommendations)
identitiesArray = append(identitiesArray, id)
}

timeNow := metav1.Now()
newRound := false
if len(identitiesArray) > 0 {
firstRecommendation := identitiesArray[0].Recommendation
firstMissionStartTime, err := utils.GetLastStartTime(firstRecommendation)
if err != nil {
newRound = true
} else {
planingTime := firstMissionStartTime.Add(interval)
now := utils.NowUTC()
if now.After(planingTime) {
newRound = true
}
}
}

if newRound {
// +1 for runNumber
newStatus.RunNumber = newStatus.RunNumber + 1
}

maxConcurrency := 10
executionIndex := -1
var concurrency int
for index, mission := range currMissions {
if mission.LastStartTime != nil {
continue
for index, identity := range identitiesArray {
if identity.Recommendation != nil {
runNumber, _ := utils.GetRunNumber(identity.Recommendation)
if runNumber >= newStatus.RunNumber {
continue
}
}

if executionIndex == -1 {
executionIndex = index
}
Expand All @@ -198,22 +194,17 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen

wg := sync.WaitGroup{}
wg.Add(concurrency)
for index := executionIndex; index < len(currMissions) && index < concurrency+executionIndex; index++ {
var existingRecommendation *analysisv1alph1.Recommendation
for _, r := range currRecommendations.Items {
if reflect.DeepEqual(currMissions[index].TargetRef, r.Spec.TargetRef) && string(r.Spec.Type) == currMissions[index].RecommenderRef.Name {
existingRecommendation = &r
break
}
for index := executionIndex; index < len(identitiesArray) && index < concurrency+executionIndex; index++ {
if klog.V(6).Enabled() {
klog.V(6).InfoS("execute identities", "RecommendationRule", klog.KObj(recommendationRule), "target", identitiesArray[index].GetObjectReference())
}

go executeMission(ctx, &wg, c.RecommenderMgr, c.Provider, c.PredictorMgr, recommendationRule, identities, &currMissions[index], existingRecommendation, c.Client, c.ScaleClient, c.OOMRecorder, timeNow, newStatus.RunNumber)
go executeIdentity(ctx, &wg, c.RecommenderMgr, c.Provider, c.PredictorMgr, recommendationRule, identitiesArray[index], c.Client, c.ScaleClient, c.OOMRecorder, timeNow, newStatus.RunNumber)
}

wg.Wait()

finished := false
if executionIndex+concurrency == len(currMissions) || len(currMissions) == 0 {
if executionIndex+concurrency == len(identitiesArray) || len(identitiesArray) == 0 {
finished = true
}

Expand All @@ -222,8 +213,8 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen
// clean orphan recommendations
for _, recommendation := range currRecommendations.Items {
exist := false
for _, mission := range currMissions {
if recommendation.UID == mission.UID {
for _, id := range identitiesArray {
if recommendation.UID == id.Recommendation.UID {
exist = true
break
}
Expand All @@ -241,8 +232,6 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen

}

newStatus.Recommendations = currMissions

updateRecommendationRuleStatus(ctx, c.Client, c.Recorder, recommendationRule, newStatus)
return finished
}
Expand Down Expand Up @@ -303,15 +292,18 @@ func (c *RecommendationRuleController) getIdentities(ctx context.Context, recomm
}

for i := range filterdUnstructureds {
k := objRefKey(rs.Kind, rs.APIVersion, filterdUnstructureds[i].GetNamespace(), filterdUnstructureds[i].GetName())
if _, exists := identities[k]; !exists {
identities[k] = ObjectIdentity{
Namespace: filterdUnstructureds[i].GetNamespace(),
Name: filterdUnstructureds[i].GetName(),
Kind: rs.Kind,
APIVersion: rs.APIVersion,
Labels: filterdUnstructureds[i].GetLabels(),
Object: filterdUnstructureds[i],
for _, recommender := range recommendationRule.Spec.Recommenders {
k := objRefKey(rs.Kind, rs.APIVersion, filterdUnstructureds[i].GetNamespace(), filterdUnstructureds[i].GetName(), recommender.Name)
if _, exists := identities[k]; !exists {
identities[k] = ObjectIdentity{
Namespace: filterdUnstructureds[i].GetNamespace(),
Name: filterdUnstructureds[i].GetName(),
Kind: rs.Kind,
APIVersion: rs.APIVersion,
Labels: filterdUnstructureds[i].GetLabels(),
Object: filterdUnstructureds[i],
Recommender: recommender.Name,
}
}
}
}
Expand Down Expand Up @@ -351,12 +343,14 @@ func updateRecommendationRuleStatus(ctx context.Context, c client.Client, record
}

type ObjectIdentity struct {
Namespace string
APIVersion string
Kind string
Name string
Labels map[string]string
Object unstructuredv1.Unstructured
Namespace string
APIVersion string
Kind string
Name string
Labels map[string]string
Recommender string
Object unstructuredv1.Unstructured
Recommendation *analysisv1alph1.Recommendation
}

func (id ObjectIdentity) GetObjectReference() corev1.ObjectReference {
Expand All @@ -375,8 +369,22 @@ func newOwnerRef(a *analysisv1alph1.RecommendationRule) *metav1.OwnerReference {
}
}

func objRefKey(kind, apiVersion, namespace, name string) string {
return fmt.Sprintf("%s#%s#%s#%s", kind, apiVersion, namespace, name)
func objRefKey(kind, apiVersion, namespace, name, recommender string) string {
return fmt.Sprintf("%s#%s#%s#%s#%s", kind, apiVersion, namespace, name, recommender)
}

func GetRecommendationFromIdentity(id ObjectIdentity, currRecommendations analysisv1alph1.RecommendationList) *analysisv1alph1.Recommendation {
for _, r := range currRecommendations.Items {
if id.Kind == r.Spec.TargetRef.Kind &&
id.APIVersion == r.Spec.TargetRef.APIVersion &&
id.Namespace == r.Spec.TargetRef.Namespace &&
id.Name == r.Spec.TargetRef.Name &&
id.Recommender == string(r.Spec.Type) {
return &r
}
}

return nil
}

func CreateRecommendationObject(recommendationRule *analysisv1alph1.RecommendationRule,
Expand Down Expand Up @@ -404,6 +412,11 @@ func CreateRecommendationObject(recommendationRule *analysisv1alph1.Recommendati
},
}

recommendation.Labels = generateRecommendationLabels(recommendationRule, target, id, recommenderName)
return recommendation
}

func generateRecommendationLabels(recommendationRule *analysisv1alph1.RecommendationRule, target corev1.ObjectReference, id ObjectIdentity, recommenderName string) map[string]string {
labels := map[string]string{}
labels[known.RecommendationRuleNameLabel] = recommendationRule.Name
labels[known.RecommendationRuleUidLabel] = string(recommendationRule.UID)
Expand All @@ -414,87 +427,82 @@ func CreateRecommendationObject(recommendationRule *analysisv1alph1.Recommendati
labels[known.RecommendationRuleTargetKindLabel] = target.Kind
labels[known.RecommendationRuleTargetVersionLabel] = target.GroupVersionKind().Version
labels[known.RecommendationRuleTargetNameLabel] = target.Name
labels[known.RecommendationRuleTargetNamespaceLabel] = target.Namespace
for k, v := range id.Labels {
labels[k] = v
}

recommendation.Labels = labels
return recommendation
return labels
}

func executeMission(ctx context.Context, wg *sync.WaitGroup, recommenderMgr recommender.RecommenderManager, provider providers.History, predictorMgr predictormgr.Manager,
recommendationRule *analysisv1alph1.RecommendationRule, identities map[string]ObjectIdentity, mission *analysisv1alph1.RecommendationMission,
existingRecommendation *analysisv1alph1.Recommendation, client client.Client, scaleClient scale.ScalesGetter, oomRecorder oom.Recorder, timeNow metav1.Time, currentRunNumber int32) {
func executeIdentity(ctx context.Context, wg *sync.WaitGroup, recommenderMgr recommender.RecommenderManager, provider providers.History, predictorMgr predictormgr.Manager,
recommendationRule *analysisv1alph1.RecommendationRule, id ObjectIdentity, client client.Client, scaleClient scale.ScalesGetter, oomRecorder oom.Recorder, timeNow metav1.Time, currentRunNumber int32) {
defer func() {
mission.LastStartTime = &timeNow
klog.Infof("Mission message: %s", mission.Message)
if wg != nil {
wg.Done()
}
}()
var message string

k := objRefKey(mission.TargetRef.Kind, mission.TargetRef.APIVersion, mission.TargetRef.Namespace, mission.TargetRef.Name)
if id, exist := identities[k]; !exist {
mission.Message = fmt.Sprintf("Failed to get identity, key %s. ", k)
return
recommendation := id.Recommendation
if recommendation == nil {
recommendation = CreateRecommendationObject(recommendationRule, id.GetObjectReference(), id, id.Recommender)
} else {
recommendation := existingRecommendation
if recommendation == nil {
recommendation = CreateRecommendationObject(recommendationRule, mission.TargetRef, id, mission.RecommenderRef.Name)
// update existing recommendation's labels
for k, v := range generateRecommendationLabels(recommendationRule, id.GetObjectReference(), id, id.Recommender) {
recommendation.Labels[k] = v
}
}

r, err := recommenderMgr.GetRecommenderWithRule(mission.RecommenderRef.Name, *recommendationRule)
if err != nil {
mission.Message = fmt.Sprintf("get recommender %s failed, %v", mission.RecommenderRef.Name, err)
return
}
r, err := recommenderMgr.GetRecommenderWithRule(id.Recommender, *recommendationRule)
if err != nil {
message = fmt.Sprintf("get recommender %s failed, %v", id.Recommender, err)
} else {
p := make(map[providers.DataSourceType]providers.History)
p[providers.PrometheusDataSource] = provider
identity := framework.ObjectIdentity{
Namespace: identities[k].Namespace,
Name: identities[k].Name,
Kind: identities[k].Kind,
APIVersion: identities[k].APIVersion,
Labels: identities[k].Labels,
Object: identities[k].Object,
Namespace: id.Namespace,
Name: id.Name,
Kind: id.Kind,
APIVersion: id.APIVersion,
Labels: id.Labels,
Object: id.Object,
}
recommendationContext := framework.NewRecommendationContext(ctx, identity, recommendationRule, predictorMgr, p, recommendation, client, scaleClient, oomRecorder)
err = recommender.Run(&recommendationContext, r)
if err != nil {
mission.Message = fmt.Sprintf("Failed to run recommendation flow in recommender %s: %s", r.Name(), err.Error())
return
}

recommendation.Status.LastUpdateTime = &timeNow
if recommendation.Annotations == nil {
recommendation.Annotations = map[string]string{}
message = fmt.Sprintf("Failed to run recommendation flow in recommender %s: %s", r.Name(), err.Error())
}
recommendation.Annotations[known.RunNumberAnnotation] = strconv.Itoa(int(currentRunNumber))
}

if existingRecommendation != nil {
klog.Infof("Update recommendation %s", klog.KObj(recommendation))
if err := client.Update(ctx, recommendation); err != nil {
mission.Message = fmt.Sprintf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err)
return
}
if len(message) == 0 {
message = "Success"
}

klog.Infof("Successfully to update Recommendation %s", klog.KObj(recommendation))
} else {
klog.Infof("Create recommendation %s", klog.KObj(recommendation))
if err := client.Create(ctx, recommendation); err != nil {
mission.Message = fmt.Sprintf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err)
return
}
recommendation.Status.LastUpdateTime = &timeNow
if recommendation.Annotations == nil {
recommendation.Annotations = map[string]string{}
}
recommendation.Annotations[known.RunNumberAnnotation] = strconv.Itoa(int(currentRunNumber))
recommendation.Annotations[known.MessageAnnotation] = message
utils.SetLastStartTime(recommendation)

if id.Recommendation != nil {
klog.Infof("Update recommendation %s", klog.KObj(recommendation))
if err := client.Update(ctx, recommendation); err != nil {
klog.Errorf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err)
return
}

klog.Infof("Successfully to create Recommendation %s", klog.KObj(recommendation))
klog.Infof("Successfully to update Recommendation %s", klog.KObj(recommendation))
} else {
klog.Infof("Create recommendation %s", klog.KObj(recommendation))
if err := client.Create(ctx, recommendation); err != nil {
klog.Errorf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err)
return
}

mission.Message = "Success"
mission.UID = recommendation.UID
mission.Name = recommendation.Name
mission.Namespace = recommendation.Namespace
mission.Kind = recommendation.Kind
mission.APIVersion = recommendation.APIVersion
klog.Infof("Successfully to create Recommendation %s", klog.KObj(recommendation))
}
}

Expand Down
Loading