Skip to content

Commit

Permalink
status
Browse files Browse the repository at this point in the history
  • Loading branch information
jiuker committed Mar 19, 2024
1 parent 7fde11f commit 919e314
Showing 1 changed file with 66 additions and 1 deletion.
67 changes: 66 additions & 1 deletion pkg/controller/job-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ const (
minioJobCRName = "job.min.io/job-cr-name"
// DefaultMCImage - job mc image
DefaultMCImage = "minio/mc:latest"
// MinioJobPhaseError - error
MinioJobPhaseError = "Error"
// MinioJobPhaseSuccess - success
MinioJobPhaseSuccess = "Success"
// MinioJobPhaseRunning - running
MinioJobPhaseRunning = "Running"
// MinioJobPhaseFailed - failed
MinioJobPhaseFailed = "Failed"
)

var fileRegexp = regexp.MustCompile(`(\[file\([^)]+\)\])`)
Expand Down Expand Up @@ -232,6 +240,12 @@ func (c *JobController) SyncHandler(key string) (Result, error) {
}
return WrapResult(Result{}, err)
}
// if job is success, do nothing
if jobCR.Status.Phase == MinioJobPhaseSuccess {
// delete the job status
globalIntervalJobStatus.Delete(fmt.Sprintf("%s/%s", jobCR.Namespace, jobCR.Name))
return WrapResult(Result{}, nil)
}
intervalJob, err := checkMinIOJob(&jobCR)
if err != nil {
return WrapResult(Result{}, err)
Expand All @@ -245,7 +259,7 @@ func (c *JobController) SyncHandler(key string) (Result, error) {
}
err = c.k8sClient.Get(ctx, client.ObjectKeyFromObject(tenant), tenant)
if err != nil {
jobCR.Status.Phase = "Error"
jobCR.Status.Phase = MinioJobPhaseError
jobCR.Status.Message = fmt.Sprintf("Get tenant %s/%s error:%v", jobCR.Spec.TenantRef.Namespace, jobCR.Spec.TenantRef.Name, err)
err = c.updateJobStatus(ctx, &jobCR)
return WrapResult(Result{}, err)
Expand All @@ -272,6 +286,16 @@ func (c *JobController) SyncHandler(key string) (Result, error) {
return WrapResult(Result{}, fmt.Errorf("no serviceaccount found"))
}
err = intervalJob.createCommandJob(ctx, c.k8sClient)
if err != nil {
jobCR.Status.Phase = MinioJobPhaseError
jobCR.Status.Message = fmt.Sprintf("Create job error:%v", err)
err = c.updateJobStatus(ctx, &jobCR)
return WrapResult(Result{}, err)
}
// update status
status := intervalJob.getMinioJobStatus(ctx)
jobCR.Status = status
err = c.updateJobStatus(ctx, &jobCR)
return WrapResult(Result{}, err)
}

Expand Down Expand Up @@ -472,6 +496,47 @@ type MinIOIntervalJob struct {
CommandMap map[string]*MinIOIntervalJobCommand
}

func (intervalJob *MinIOIntervalJob) getMinioJobStatus(ctx context.Context) v1alpha1.MinIOJobStatus {
status := v1alpha1.MinIOJobStatus{}
failed := false
running := false
message := ""
for _, command := range intervalJob.Command {
command.Locker.RLock()
if command.Succeeded {
status.CommandsStatus = append(status.CommandsStatus, v1alpha1.CommandStatus{
Name: command.JobName,
Result: "success",
Message: command.Message,
})
} else {
status.CommandsStatus = append(status.CommandsStatus, v1alpha1.CommandStatus{
Name: command.JobName,
Result: "failed",
Message: command.Message,
})
failed = true
message = command.Message
// if success is false and message is empty, it means the job is running
if command.Message == "" {
running = true
}
}
command.Locker.RUnlock()
}
if running {
status.Phase = MinioJobPhaseRunning
} else {
if failed {
status.Phase = MinioJobPhaseFailed
status.Message = message
} else {
status.Phase = MinioJobPhaseSuccess
}
}
return status
}

func (intervalJob *MinIOIntervalJob) createCommandJob(ctx context.Context, k8sClient client.Client) error {
for _, command := range intervalJob.Command {
if len(command.DepnedOn) == 0 {
Expand Down

0 comments on commit 919e314

Please sign in to comment.