Skip to content

Commit

Permalink
Merge pull request #5 from lvyanru8200/main
Browse files (browse the repository at this point in the history)
feat(controller): finetunejob reconciliation logic refinement
  • Loading branch information
lvyanru8200 authored Nov 29, 2023
2 parents a9dd771 + 2713656 commit 89023c0
Show file tree
Hide file tree
Showing 9 changed files with 490 additions and 270 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/DataTunerX/finetune-experiment-controller
go 1.19

require (
github.com/DataTunerX/meta-server v0.0.0-20231116102108-24bd83a6be89
github.com/DataTunerX/meta-server v0.0.0-20231128065201-7109bd13c9cb
github.com/DataTunerX/utility-server v0.0.0-20231107081331-e4ac0bbd2db2
github.com/go-logr/zapr v1.2.3
github.com/operator-framework/operator-lib v0.11.0
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataTunerX/meta-server v0.0.0-20231113032938-bf87d14956b1 h1:WxEyoS9Dlkm2Yfcpn0sL0Gz/xfXdN0fdxb/dGYAQIqQ=
github.com/DataTunerX/meta-server v0.0.0-20231113032938-bf87d14956b1/go.mod h1:MrA+U+PYANBfU8B43hrkJQ3WOIFPzUqowUO7s+KafvU=
github.com/DataTunerX/meta-server v0.0.0-20231116063244-4b1d018072c0 h1:BJ6OqFz1ROHizgQ9eNWpWSCzMEe4PFLhCloBUsLrYa0=
github.com/DataTunerX/meta-server v0.0.0-20231116063244-4b1d018072c0/go.mod h1:MrA+U+PYANBfU8B43hrkJQ3WOIFPzUqowUO7s+KafvU=
github.com/DataTunerX/meta-server v0.0.0-20231116064242-ea7bb845394f h1:ivD0gAMQ0gWtJ1/xWeUqkOce0PEO2LXWfjAAGiPwTvw=
github.com/DataTunerX/meta-server v0.0.0-20231116064242-ea7bb845394f/go.mod h1:MrA+U+PYANBfU8B43hrkJQ3WOIFPzUqowUO7s+KafvU=
github.com/DataTunerX/meta-server v0.0.0-20231116102108-24bd83a6be89 h1:czoBDPd42BBGiCREjfnaxG5BNcHk+9MnkemXAnG/bEw=
github.com/DataTunerX/meta-server v0.0.0-20231116102108-24bd83a6be89/go.mod h1:MrA+U+PYANBfU8B43hrkJQ3WOIFPzUqowUO7s+KafvU=
github.com/DataTunerX/meta-server v0.0.0-20231128065201-7109bd13c9cb h1:ADOBX2XKCgG6cmTdYt4G0rt1pvDW6gVZHfrkNum8EQw=
github.com/DataTunerX/meta-server v0.0.0-20231128065201-7109bd13c9cb/go.mod h1:MrA+U+PYANBfU8B43hrkJQ3WOIFPzUqowUO7s+KafvU=
github.com/DataTunerX/utility-server v0.0.0-20231107081331-e4ac0bbd2db2 h1:3mBAWDqYrWtDk9xvIHDG/dN5zGcliwJnyvpWHFHcC+A=
github.com/DataTunerX/utility-server v0.0.0-20231107081331-e4ac0bbd2db2/go.mod h1:qL3DYjQa7av0QkZoFrycHbpXHGQfBNEDke8uv+FdDn4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
138 changes: 97 additions & 41 deletions internal/controller/finetune/finetuneexperiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@ package finetune
import (
"context"
"fmt"
"reflect"
"time"

"github.com/DataTunerX/utility-server/logging"
"k8s.io/apimachinery/pkg/types"

"github.com/DataTunerX/finetune-experiment-controller/pkg/util/handlererr"
finetunev1beta1 "github.com/DataTunerX/meta-server/api/finetune/v1beta1"
"github.com/DataTunerX/utility-server/logging"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// FinetuneExperimentReconciler reconciles a FinetuneExperiment object
Expand All @@ -50,7 +56,7 @@ func (r *FinetuneExperimentReconciler) Reconcile(ctx context.Context, req ctrl.R
finetuneExperiment := &finetunev1beta1.FinetuneExperiment{}
if err := r.Get(ctx, req.NamespacedName, finetuneExperiment); err != nil {
if errors.IsNotFound(err) {
r.Log.Errorf("FinetuneExperiment resource not found. Ignoring since object must be deleted.")
r.Log.Infof("FinetuneExperiment resource not found. Ignoring since object must be deleted.")
return handlererr.HandlerErr(nil)
}
r.Log.Errorf("Failed get finetuneExperiment: %s/%s, Err: %v", req.Name, req.Namespace, err)
Expand Down Expand Up @@ -79,6 +85,7 @@ func (r *FinetuneExperimentReconciler) Reconcile(ctx context.Context, req ctrl.R

if finetuneExperiment.Spec.Pending {
finetuneExperiment.Status.State = finetunev1beta1.FinetuneExperimentPending
finetuneExperiment.Status.Stats = metav1.Now().Format("2006-01-02 15:04:05")
if err := r.Client.Status().Update(ctx, finetuneExperiment); err != nil {
r.Log.Errorf("Update fineExperiment %s/%s status failed", finetuneExperiment.Name, finetuneExperiment.Namespace)
return handlererr.HandlerErr(err)
Expand All @@ -88,54 +95,84 @@ func (r *FinetuneExperimentReconciler) Reconcile(ctx context.Context, req ctrl.R

for i := range finetuneExperiment.Spec.FinetuneJobs {
finetuneJob := finetuneExperiment.Spec.FinetuneJobs[i]
if finetuneJob.Name == nil {
name := fmt.Sprintf("%s-%s", finetuneExperiment.Name, "finetunejob")
finetuneJob.Name = &name
}
finetuneJobInstance := &finetunev1beta1.FinetuneJob{}
finetuneJobInstance.Spec = finetuneJob.Spec
finetuneJobInstance.Name = *finetuneJob.Name
finetuneJobInstance.Namespace = finetuneExperiment.Namespace
if err := ctrl.SetControllerReference(finetuneExperiment, finetuneJobInstance, r.Scheme); err != nil {
r.Log.Errorf("SetControllerReference failed finetuneJob: %s/%s, owner finetuneExperiment: %s/%s, err: %v",
finetuneJobInstance.Name, finetuneJobInstance.Namespace, finetuneExperiment.Name, finetuneExperiment.Namespace, err)
return handlererr.HandlerErr(err)
}
if err := r.Client.Create(ctx, finetuneJobInstance); err != nil {
if !errors.IsAlreadyExists(err) {
r.Log.Errorf("Create finetuneJob %s/%s failed: %v", finetuneJobInstance.Name, finetuneJobInstance.Namespace, err)
return handlererr.HandlerErr(err)
}
if finetuneJob.Name == "" {
finetuneJob.Name = fmt.Sprintf("%s-%s-%d", finetuneExperiment.Name, "finetunejob", i+1)
finetuneExperiment.Spec.FinetuneJobs[i].Name = fmt.Sprintf("%s-%s-%d", finetuneExperiment.Name, "finetunejob", i+1)
}
existFinetuneJob := &finetunev1beta1.FinetuneJob{}
if err := r.Client.Get(ctx, types.NamespacedName{
Name: *finetuneJob.Name,
Name: finetuneJob.Name,
Namespace: finetuneExperiment.Namespace,
}, existFinetuneJob); err != nil {
r.Log.Errorf("Get finetuneJob failed: %v", err)
return handlererr.HandlerErr(err)
if errors.IsNotFound(err) {
finetuneJobInstance := &finetunev1beta1.FinetuneJob{}
finetuneJobInstance.Spec = finetuneJob.Spec
finetuneJobInstance.Name = finetuneJob.Name
r.Log.Infof("finetuneJob Name: %s", finetuneJobInstance.Name)
finetuneJobInstance.Namespace = finetuneExperiment.Namespace
if err := ctrl.SetControllerReference(finetuneExperiment, finetuneJobInstance, r.Scheme); err != nil {
r.Log.Errorf("SetControllerReference failed finetuneJob: %s/%s, owner finetuneExperiment: %s/%s, err: %v",
finetuneJobInstance.Name, finetuneJobInstance.Namespace, finetuneExperiment.Name, finetuneExperiment.Namespace, err)
return handlererr.HandlerErr(err)
}
if err := r.Client.Create(ctx, finetuneJobInstance); err != nil {
if !errors.IsAlreadyExists(err) {
r.Log.Errorf("Create finetuneJob %s/%s failed: %v", finetuneJobInstance.Name, finetuneJobInstance.Namespace, err)
return handlererr.HandlerErr(err)
}
}
} else {
r.Log.Errorf("Get finetuneJob %s/%s failed: %v", finetuneJob.Name, finetuneExperiment.Namespace, err)
return handlererr.HandlerErr(err)
}
}
alreadyExists := false
}
if finetuneExperiment.Status.State == finetunev1beta1.FinetuneExperimentProcessing {
for i := range finetuneExperiment.Spec.FinetuneJobs {
if finetuneExperiment.Spec.FinetuneJobs[i].Name == "" {
finetuneExperiment.Spec.FinetuneJobs[i].Name = fmt.Sprintf("%s-%s-%d", finetuneExperiment.Name, "finetunejob", i+1)
}
finetuneJobInstance := &finetunev1beta1.FinetuneJob{}
if err := r.Client.Get(ctx, types.NamespacedName{Name: finetuneExperiment.Spec.FinetuneJobs[i].Name, Namespace: finetuneExperiment.Namespace}, finetuneJobInstance); err != nil {
r.Log.Errorf("Get finetuneJob %s/%s failed, err: %v", finetuneExperiment.Spec.FinetuneJobs[i].Name, finetuneExperiment.Namespace, err)
return handlererr.HandlerErr(err)
}
if finetuneJobInstance.Status.FinetuneState == "" {
finetuneJobInstance.Status.State = finetunev1beta1.FinetuneJobInit
}

// Iterate over the JobsStatus to check if existFinetuneJob.Name exists
for _, jobStatus := range finetuneExperiment.Status.JobsStatus {
if jobStatus.Name == existFinetuneJob.Name {
alreadyExists = true
break
if finetuneExperiment.Status.JobsStatus == nil {
finetuneExperiment.Status.JobsStatus = make([]*finetunev1beta1.FinetuneJobStatusSetting, len(finetuneExperiment.Spec.FinetuneJobs))
}
if finetuneExperiment.Status.JobsStatus[i] != nil {
if !reflect.DeepEqual(finetuneExperiment.Status.JobsStatus[i].FinetuneJobStatus, finetuneJobInstance.Status) {
finetuneExperiment.Status.JobsStatus[i] = &finetunev1beta1.FinetuneJobStatusSetting{
Name: finetuneJobInstance.Name,
FinetuneJobStatus: finetuneJobInstance.Status,
}
}
} else {
finetuneExperiment.Status.JobsStatus[i] = &finetunev1beta1.FinetuneJobStatusSetting{
Name: finetuneJobInstance.Name,
FinetuneJobStatus: finetuneJobInstance.Status,
}
}
}
if !alreadyExists {
finetuneExperiment.Status.JobsStatus = append(finetuneExperiment.Status.JobsStatus, finetunev1beta1.FinetuneJobStatusSetting{
Name: existFinetuneJob.Name,
FinetuneJobStatus: existFinetuneJob.Status,
})
if err := r.Client.Update(ctx, finetuneExperiment); err != nil {
r.Log.Errorf("Update fineExperiment %s/%s failed", finetuneExperiment.Name, finetuneExperiment.Namespace)
return handlererr.HandlerErr(err)
}
if err := r.Client.Status().Update(ctx, finetuneExperiment); err != nil {
r.Log.Errorf("Update fineExperiment %s/%s status failed", finetuneExperiment.Name, finetuneExperiment.Namespace)
return handlererr.HandlerErr(err)
}

}
finetuneExperiment.Status.State = finetunev1beta1.FinetuneExperimentProcessing
if err := r.Client.Status().Update(ctx, finetuneExperiment); err != nil {
r.Log.Errorf("Update fineExperiment %s/%s status failed", finetuneExperiment.Name, finetuneExperiment.Namespace)
return handlererr.HandlerErr(err)
if finetuneExperiment.Status.State == "" {
finetuneExperiment.Status.State = finetunev1beta1.FinetuneExperimentProcessing
if err := r.Client.Status().Update(ctx, finetuneExperiment); err != nil {
r.Log.Errorf("Update fineExperiment %s/%s status failed", finetuneExperiment.Name, finetuneExperiment.Namespace)
return handlererr.HandlerErr(err)
}
}
return handlererr.HandlerErr(nil)
}
Expand All @@ -144,6 +181,25 @@ func (r *FinetuneExperimentReconciler) Reconcile(ctx context.Context, req ctrl.R
func (r *FinetuneExperimentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&finetunev1beta1.FinetuneExperiment{}).
Watches(&source.Kind{Type: &finetunev1beta1.FinetuneJob{}}, &handler.EnqueueRequestForOwner{
OwnerType: &finetunev1beta1.FinetuneExperiment{},
IsController: true,
}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
oldFinetuneJob := updateEvent.ObjectOld.(*finetunev1beta1.FinetuneJob)
newFinetuneJob := updateEvent.ObjectNew.(*finetunev1beta1.FinetuneJob)
if oldFinetuneJob.Status.State != newFinetuneJob.Status.State {
r.Log.Infof("Get finetuneJob %s/%s update event oldStatus: %s, newStatus: %s", oldFinetuneJob.Namespace, oldFinetuneJob.Name, oldFinetuneJob.Status.State, newFinetuneJob.Status.State)
return true
}
return false
},
CreateFunc: func(createEvent event.CreateEvent) bool {
finetuneJob := createEvent.Object.(*finetunev1beta1.FinetuneJob)
r.Log.Infof("Get finetuneJob %s/%s crate event, skip", finetuneJob.Name, finetuneJob.Namespace)
return false
},
})).
WithOptions(controller.Options{
CacheSyncTimeout: 10 * time.Second,
MaxConcurrentReconciles: 1}).
Expand Down
Loading

0 comments on commit 89023c0

Please sign in to comment.