Skip to content

Commit

Permalink
Merge pull request #81 from volcano-sh/exit-code
Browse files Browse the repository at this point in the history
implement error code handling
  • Loading branch information
Klaus Ma authored Apr 18, 2019
2 parents 90ae67f + 61e319c commit ae4a34f
Show file tree
Hide file tree
Showing 25 changed files with 1,429 additions and 23 deletions.
18 changes: 17 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,7 @@ required = [
name = "k8s.io/code-generator"
unused-packages = false


[[constraint]]
name = "github.com/hashicorp/go-multierror"
version = "1.0.0"
45 changes: 34 additions & 11 deletions pkg/admission/admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/glog"

"github.com/hashicorp/go-multierror"
"k8s.io/api/admission/v1beta1"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -61,27 +62,49 @@ func ToAdmissionResponse(err error) *v1beta1.AdmissionResponse {
}
}

func CheckPolicyDuplicate(policies []v1alpha1.LifecyclePolicy) (string, bool) {
policyEvents := map[v1alpha1.Event]v1alpha1.Event{}
hasDuplicate := false
var duplicateInfo string
func ValidatePolicies(policies []v1alpha1.LifecyclePolicy) error {
var err error
policyEvents := map[v1alpha1.Event]struct{}{}
exitCodes := map[int32]struct{}{}

for _, policy := range policies {
if _, found := policyEvents[policy.Event]; found {
hasDuplicate = true
duplicateInfo = fmt.Sprintf("%v", policy.Event)
if policy.Event != "" && policy.ExitCode != nil {
err = multierror.Append(err, fmt.Errorf("must not specify event and exitCode simultaneously"))
break
}

if policy.Event == "" && policy.ExitCode == nil {
err = multierror.Append(err, fmt.Errorf("either event and exitCode should be specified"))
break
}

if policy.Event != "" {
// TODO: check event is in supported Event
if _, found := policyEvents[policy.Event]; found {
err = multierror.Append(err, fmt.Errorf("duplicate event %v", policy.Event))
break
} else {
policyEvents[policy.Event] = struct{}{}
}
} else {
policyEvents[policy.Event] = policy.Event
if *policy.ExitCode == 0 {
err = multierror.Append(err, fmt.Errorf("0 is not a valid error code"))
break
}
if _, found := exitCodes[*policy.ExitCode]; found {
err = multierror.Append(err, fmt.Errorf("duplicate exitCode %v", *policy.ExitCode))
break
} else {
exitCodes[*policy.ExitCode] = struct{}{}
}
}
}

if _, found := policyEvents[v1alpha1.AnyEvent]; found && len(policyEvents) > 1 {
hasDuplicate = true
duplicateInfo = "if there's * here, no other policy should be here"
err = multierror.Append(err, fmt.Errorf("if there's * here, no other policy should be here"))
}

return duplicateInfo, hasDuplicate
return err
}

func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource) (v1alpha1.Job, error) {
Expand Down
14 changes: 6 additions & 8 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"reflect"
"strings"
"volcano.sh/volcano/pkg/controllers/job/plugins"

"github.com/golang/glog"

Expand All @@ -29,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/controllers/job/plugins"
)

// job admit.
Expand Down Expand Up @@ -98,22 +98,20 @@ func validateJobSpec(jobSpec v1alpha1.JobSpec, reviewResponse *v1beta1.Admission
taskNames[task.Name] = task.Name
}

//duplicate task event policies
if duplicateInfo, ok := CheckPolicyDuplicate(task.Policies); ok {
msg = msg + fmt.Sprintf(" duplicated task event policies: %s;", duplicateInfo)
if err := ValidatePolicies(task.Policies); err != nil {
msg = msg + err.Error()
}
}

if totalReplicas < jobSpec.MinAvailable {
msg = msg + " 'minAvailable' should not be greater than total replicas in tasks;"
}

//duplicate job event policies
if duplicateInfo, ok := CheckPolicyDuplicate(jobSpec.Policies); ok {
msg = msg + fmt.Sprintf(" duplicated job event policies: %s;", duplicateInfo)
if err := ValidatePolicies(jobSpec.Policies); err != nil {
msg = msg + err.Error()
}

//invalid job plugins
// invalid job plugins
if len(jobSpec.Plugins) != 0 {
for name := range jobSpec.Plugins {
if _, found := plugins.GetPluginBuilder(name); !found {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/apis/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@ type Request struct {
TaskName string

Event v1alpha1.Event
ExitCode int32
Action v1alpha1.Action
JobVersion int32
}

// String renders the request as a single human-readable line made up of the
// job namespace and name, the task name, the event, the exit code, the action
// and the job version.
func (r Request) String() string {
return fmt.Sprintf(
"Job: %s/%s, Task:%s, Event:%s, Action:%s, JobVersion: %d",
r.Namespace, r.JobName, r.TaskName, r.Event, r.Action, r.JobVersion)
"Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion)

}
5 changes: 5 additions & 0 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,13 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
}

// Default to an out-of-sync notification; a transition into PodFailed
// upgrades it to PodFailedEvent and records the container exit code.
event := vkbatchv1.OutOfSyncEvent
var exitCode int32
if oldPod.Status.Phase != v1.PodFailed &&
	newPod.Status.Phase == v1.PodFailed {
	event = vkbatchv1.PodFailedEvent
	// TODO: currently only one container pod is supported by volcano
	// Once multi containers pod is supported, update accordingly.
	//
	// ContainerStatuses may be empty and State.Terminated is a pointer that
	// may be nil, so both are checked before dereferencing. When no
	// terminated state is available exitCode stays 0, which matches no
	// exit-code policy.
	if statuses := newPod.Status.ContainerStatuses; len(statuses) > 0 && statuses[0].State.Terminated != nil {
		exitCode = statuses[0].State.Terminated.ExitCode
	}
}

if oldPod.Status.Phase != v1.PodSucceeded &&
Expand All @@ -221,6 +225,7 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
TaskName: taskName,

Event: event,
ExitCode: exitCode,
JobVersion: int32(dVersion),
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
}

// 0 is not an error code, is prevented in validation admission controller
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
return policy.Action
}
}
break
}
Expand All @@ -196,6 +201,11 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
}

// 0 is not an error code, is prevented in validation admission controller
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
return policy.Action
}
}

return vkv1.SyncJobAction
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var _ = Describe("Job E2E Test: Test Admission service", func() {
stError, ok := err.(*errors.StatusError)
Expect(ok).To(Equal(true))
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated job event policies"))
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicate event PodFailed"))
})

It("Min Available illegal", func() {
Expand Down
38 changes: 38 additions & 0 deletions test/e2e/job_error_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,4 +469,42 @@ var _ = Describe("Job Error Handling", func() {

})

It("job level LifecyclePolicy, error code: 3; Action: RestartJob", func() {
By("init test context")
context := initTestContext()
defer cleanupTestContext(context)

By("create job")
var erroCode int32 = 3
job := createJob(context, &jobSpec{
name: "errorcode-restart-job",
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.RestartJobAction,
ExitCode: &erroCode,
},
},
tasks: []taskSpec{
{
name: "success",
img: defaultNginxImage,
min: 1,
rep: 1,
},
{
name: "fail",
img: defaultNginxImage,
min: 1,
rep: 1,
command: "sleep 10s && exit 3",
restartPolicy: v1.RestartPolicyNever,
},
},
})

// job phase: pending -> running -> restarting
err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting})
Expect(err).NotTo(HaveOccurred())
})

})
Loading

0 comments on commit ae4a34f

Please sign in to comment.