Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement error code handling #81

Merged
merged 3 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,7 @@ required = [
name = "k8s.io/code-generator"
unused-packages = false


[[constraint]]
name = "github.com/hashicorp/go-multierror"
version = "1.0.0"
45 changes: 34 additions & 11 deletions pkg/admission/admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/glog"

"github.com/hashicorp/go-multierror"
"k8s.io/api/admission/v1beta1"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -61,27 +62,49 @@ func ToAdmissionResponse(err error) *v1beta1.AdmissionResponse {
}
}

func CheckPolicyDuplicate(policies []v1alpha1.LifecyclePolicy) (string, bool) {
policyEvents := map[v1alpha1.Event]v1alpha1.Event{}
hasDuplicate := false
var duplicateInfo string
func ValidatePolicies(policies []v1alpha1.LifecyclePolicy) error {
var err error
policyEvents := map[v1alpha1.Event]struct{}{}
exitCodes := map[int32]struct{}{}

for _, policy := range policies {
if _, found := policyEvents[policy.Event]; found {
hasDuplicate = true
duplicateInfo = fmt.Sprintf("%v", policy.Event)
if policy.Event != "" && policy.ExitCode != nil {
err = multierror.Append(err, fmt.Errorf("must not specify event and exitCode simultaneously"))
break
}

if policy.Event == "" && policy.ExitCode == nil {
err = multierror.Append(err, fmt.Errorf("either event and exitCode should be specified"))
break
}

if policy.Event != "" {
// TODO: check event is in supported Event
if _, found := policyEvents[policy.Event]; found {
err = multierror.Append(err, fmt.Errorf("duplicate event %v", policy.Event))
break
} else {
policyEvents[policy.Event] = struct{}{}
}
} else {
policyEvents[policy.Event] = policy.Event
if *policy.ExitCode == 0 {
err = multierror.Append(err, fmt.Errorf("0 is not a valid error code"))
break
}
if _, found := exitCodes[*policy.ExitCode]; found {
err = multierror.Append(err, fmt.Errorf("duplicate exitCode %v", *policy.ExitCode))
break
} else {
exitCodes[*policy.ExitCode] = struct{}{}
}
}
}

if _, found := policyEvents[v1alpha1.AnyEvent]; found && len(policyEvents) > 1 {
hasDuplicate = true
duplicateInfo = "if there's * here, no other policy should be here"
err = multierror.Append(err, fmt.Errorf("if there's * here, no other policy should be here"))
}

return duplicateInfo, hasDuplicate
return err
}

func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource) (v1alpha1.Job, error) {
Expand Down
14 changes: 6 additions & 8 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"reflect"
"strings"
"volcano.sh/volcano/pkg/controllers/job/plugins"

"github.com/golang/glog"

Expand All @@ -29,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/controllers/job/plugins"
)

// job admit.
Expand Down Expand Up @@ -98,22 +98,20 @@ func validateJobSpec(jobSpec v1alpha1.JobSpec, reviewResponse *v1beta1.Admission
taskNames[task.Name] = task.Name
}

//duplicate task event policies
if duplicateInfo, ok := CheckPolicyDuplicate(task.Policies); ok {
msg = msg + fmt.Sprintf(" duplicated task event policies: %s;", duplicateInfo)
if err := ValidatePolicies(task.Policies); err != nil {
msg = msg + err.Error()
}
}

if totalReplicas < jobSpec.MinAvailable {
msg = msg + " 'minAvailable' should not be greater than total replicas in tasks;"
}

//duplicate job event policies
if duplicateInfo, ok := CheckPolicyDuplicate(jobSpec.Policies); ok {
msg = msg + fmt.Sprintf(" duplicated job event policies: %s;", duplicateInfo)
if err := ValidatePolicies(jobSpec.Policies); err != nil {
msg = msg + err.Error()
}

//invalid job plugins
// invalid job plugins
if len(jobSpec.Plugins) != 0 {
for name := range jobSpec.Plugins {
if _, found := plugins.GetPluginBuilder(name); !found {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/apis/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@ type Request struct {
TaskName string

Event v1alpha1.Event
ExitCode int32
Action v1alpha1.Action
JobVersion int32
}

func (r Request) String() string {
return fmt.Sprintf(
"Job: %s/%s, Task:%s, Event:%s, Action:%s, JobVersion: %d",
r.Namespace, r.JobName, r.TaskName, r.Event, r.Action, r.JobVersion)
"Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion)

}
5 changes: 5 additions & 0 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,13 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
}

event := vkbatchv1.OutOfSyncEvent
var exitCode int32
if oldPod.Status.Phase != v1.PodFailed &&
newPod.Status.Phase == v1.PodFailed {
event = vkbatchv1.PodFailedEvent
// TODO: currently only one container pod is supported by volcano
// Once multi containers pod is supported, update accordingly.
exitCode = newPod.Status.ContainerStatuses[0].State.Terminated.ExitCode
}

if oldPod.Status.Phase != v1.PodSucceeded &&
Expand All @@ -221,6 +225,7 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
TaskName: taskName,

Event: event,
ExitCode: exitCode,
JobVersion: int32(dVersion),
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
}

// 0 is not an error code, is prevented in validation admission controller
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
return policy.Action
}
}
break
}
Expand All @@ -196,6 +201,11 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
return policy.Action
}

// 0 is not an error code, is prevented in validation admission controller
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
return policy.Action
}
}

return vkv1.SyncJobAction
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var _ = Describe("Job E2E Test: Test Admission service", func() {
stError, ok := err.(*errors.StatusError)
Expect(ok).To(Equal(true))
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated job event policies"))
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicate event PodFailed"))
})

It("Min Available illegal", func() {
Expand Down
38 changes: 38 additions & 0 deletions test/e2e/job_error_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,4 +469,42 @@ var _ = Describe("Job Error Handling", func() {

})

It("job level LifecyclePolicy, error code: 3; Action: RestartJob", func() {
By("init test context")
context := initTestContext()
defer cleanupTestContext(context)

By("create job")
var erroCode int32 = 3
job := createJob(context, &jobSpec{
name: "errorcode-restart-job",
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.RestartJobAction,
ExitCode: &erroCode,
},
},
tasks: []taskSpec{
{
name: "success",
img: defaultNginxImage,
min: 1,
rep: 1,
},
{
name: "fail",
img: defaultNginxImage,
min: 1,
rep: 1,
command: "sleep 10s && exit 3",
restartPolicy: v1.RestartPolicyNever,
},
},
})

// job phase: pending -> running -> restarting
err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting})
Expect(err).NotTo(HaveOccurred())
})

})
Loading