Skip to content

Commit

Permalink
Merge pull request #84 from furiko-io/irvinlim/feat/kill-job-cli
Browse files Browse the repository at this point in the history
  • Loading branch information
irvinlim authored May 29, 2022
2 parents 8320441 + e245a0f commit 9e17087
Show file tree
Hide file tree
Showing 12 changed files with 387 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/cli/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewRootCommand(streams *streams.Streams) *cobra.Command {
cmd.AddCommand(NewGetCommand(streams))
cmd.AddCommand(NewListCommand(streams))
cmd.AddCommand(NewRunCommand(streams))
cmd.AddCommand(NewKillCommand(streams))

return cmd
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cli/cmd/cmd_get_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
execution "github.com/furiko-io/furiko/apis/execution/v1alpha1"
"github.com/furiko-io/furiko/pkg/cli/formatter"
"github.com/furiko-io/furiko/pkg/cli/printer"
streams2 "github.com/furiko-io/furiko/pkg/cli/streams"
"github.com/furiko-io/furiko/pkg/cli/streams"
stringsutils "github.com/furiko-io/furiko/pkg/utils/strings"
)

Expand All @@ -43,10 +43,10 @@ var (
)

type GetJobCommand struct {
streams *streams2.Streams
streams *streams.Streams
}

func NewGetJobCommand(streams *streams2.Streams) *cobra.Command {
func NewGetJobCommand(streams *streams.Streams) *cobra.Command {
c := &GetJobCommand{
streams: streams,
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cli/cmd/cmd_get_jobconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
execution "github.com/furiko-io/furiko/apis/execution/v1alpha1"
"github.com/furiko-io/furiko/pkg/cli/formatter"
"github.com/furiko-io/furiko/pkg/cli/printer"
streams2 "github.com/furiko-io/furiko/pkg/cli/streams"
"github.com/furiko-io/furiko/pkg/cli/streams"
"github.com/furiko-io/furiko/pkg/config"
"github.com/furiko-io/furiko/pkg/execution/util/cronparser"
)
Expand All @@ -49,10 +49,10 @@ var (
)

type GetJobConfigCommand struct {
streams *streams2.Streams
streams *streams.Streams
}

func NewGetJobConfigCommand(streams *streams2.Streams) *cobra.Command {
func NewGetJobConfigCommand(streams *streams.Streams) *cobra.Command {
c := &GetJobConfigCommand{
streams: streams,
}
Expand Down
142 changes: 142 additions & 0 deletions pkg/cli/cmd/cmd_kill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2022 The Furiko Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd

import (
"fmt"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/furiko-io/furiko/pkg/cli/formatter"
"github.com/furiko-io/furiko/pkg/cli/streams"
"github.com/furiko-io/furiko/pkg/utils/ktime"
)

var (
KillExample = PrepareExample(`
# Request to kill an ongoing Job.
{{.CommandName}} kill job-sample-1653825000
# Request for an ongoing Job to be killed 60 seconds from now.
{{.CommandName}} kill job-sample-1653825000 -p 60s
# Request for an ongoing Job to be killed at a specific time in the future.
{{.CommandName}} kill job-sample-1653825000 -t 2023-01-01T00:00:00Z`)
)

type KillCommand struct {
streams *streams.Streams
name string
override bool
killAt string
killAfter time.Duration
}

func NewKillCommand(streams *streams.Streams) *cobra.Command {
c := &KillCommand{
streams: streams,
}

cmd := &cobra.Command{
Use: "kill",
Short: "Kill an ongoing Job.",
Long: `Kills an ongoing Job that is currently running or pending.`,
Example: KillExample,
Args: cobra.ExactArgs(1),
PreRunE: PrerunWithKubeconfig,
RunE: c.Run,
}

cmd.Flags().BoolVar(&c.override, "override", false,
"If the Job already has a kill timestamp, specifying this flag allows overriding the previous value.")
cmd.Flags().StringVarP(&c.killAt, "at", "t", "",
"Specify an explicit timestamp to kill the job at, in RFC3339 format.")
cmd.Flags().DurationVarP(&c.killAfter, "after", "p", 0,
"Specify a duration relative to the current time that the job should be killed.")

return cmd
}

func (c *KillCommand) Run(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
client := ctrlContext.Clientsets().Furiko().ExecutionV1alpha1()
namespace, err := GetNamespace(cmd)
if err != nil {
return err
}

if len(args) == 0 {
return errors.New("job name must be specified")
}
name := args[0]

// Validate flags.
if c.killAt != "" && c.killAfter != 0 {
return errors.Wrapf(err, "cannot only specify at most one of: --at, --after")
}
if c.killAfter < 0 {
return fmt.Errorf("must be a positive duration: %v", c.killAfter)
}

killAt := ktime.Now()
if len(c.killAt) > 0 {
parsed, err := time.Parse(time.RFC3339, c.killAt)
if err != nil {
return errors.Wrapf(err, "cannot parse kill timestamp: %v", c.killAt)
}
mt := metav1.NewTime(parsed)
killAt = &mt
}
if c.killAfter > 0 {
mt := metav1.NewTime(ktime.Now().Add(c.killAfter))
killAt = &mt
}

job, err := client.Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "cannot get job")
}

// Cannot kill if a kill timestamp is already set, unless --override is specified.
if !job.Spec.KillTimestamp.IsZero() && !c.override {
return fmt.Errorf("job already has a kill timestamp set to %v, use --override to override previous value",
job.Spec.KillTimestamp)
}

newJob := job.DeepCopy()
newJob.Spec.KillTimestamp = killAt

// Kill the job.
updatedJob, err := client.Jobs(namespace).Update(ctx, newJob, metav1.UpdateOptions{})
if err != nil {
return errors.Wrapf(err, "cannot update job")
}
klog.V(1).InfoS("updated job", "namespace", updatedJob.Namespace, "name", updatedJob.Name)

key, err := cache.MetaNamespaceKeyFunc(updatedJob)
if err != nil {
return errors.Wrapf(err, "key func error")
}

c.streams.Printf("Requested for job %v to be killed at %v\n", key, formatter.FormatTime(killAt))
return nil
}
197 changes: 197 additions & 0 deletions pkg/cli/cmd/cmd_kill_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright 2022 The Furiko Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd_test

import (
"regexp"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

execution "github.com/furiko-io/furiko/apis/execution/v1alpha1"
"github.com/furiko-io/furiko/pkg/cli/cmd"
"github.com/furiko-io/furiko/pkg/cli/formatter"
runtimetesting "github.com/furiko-io/furiko/pkg/runtime/testing"
"github.com/furiko-io/furiko/pkg/utils/testutils"
)

const (
killTime = "2021-02-09T04:04:04Z"
killTime2 = "2021-02-09T04:04:44Z"
)

var (
runningJob = &execution.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "running-job",
Namespace: DefaultNamespace,
},
Spec: execution.JobSpec{},
Status: execution.JobStatus{
Phase: execution.JobRunning,
Condition: execution.JobCondition{
Running: &execution.JobConditionRunning{
LatestCreationTimestamp: testutils.Mkmtime(taskCreateTime),
LatestRunningTimestamp: testutils.Mkmtime(taskLaunchTime),
},
},
StartTime: testutils.Mkmtimep(startTime),
CreatedTasks: 1,
},
}

runningJobKilled = &execution.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "running-job",
Namespace: DefaultNamespace,
},
Spec: execution.JobSpec{
KillTimestamp: testutils.Mkmtimep(killTime),
},
Status: execution.JobStatus{
Phase: execution.JobRunning,
Condition: execution.JobCondition{
Running: &execution.JobConditionRunning{
LatestCreationTimestamp: testutils.Mkmtime(taskCreateTime),
LatestRunningTimestamp: testutils.Mkmtime(taskLaunchTime),
},
},
StartTime: testutils.Mkmtimep(startTime),
CreatedTasks: 1,
},
}

runningJobKilledNewTimestamp = &execution.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "running-job",
Namespace: DefaultNamespace,
},
Spec: execution.JobSpec{
KillTimestamp: testutils.Mkmtimep(killTime2),
},
Status: execution.JobStatus{
Phase: execution.JobRunning,
Condition: execution.JobCondition{
Running: &execution.JobConditionRunning{
LatestCreationTimestamp: testutils.Mkmtime(taskCreateTime),
LatestRunningTimestamp: testutils.Mkmtime(taskLaunchTime),
},
},
StartTime: testutils.Mkmtimep(startTime),
CreatedTasks: 1,
},
}
)

func TestKillCommand(t *testing.T) {
runtimetesting.RunCommandTests(t, []runtimetesting.CommandTest{
{
Name: "display help",
Args: []string{"kill", "--help"},
Stdout: runtimetesting.Output{
Contains: cmd.KillExample,
},
},
{
Name: "need an argument",
Args: []string{"kill"},
WantError: assert.Error,
},
{
Name: "job does not exist",
Args: []string{"run", "running-job"},
WantError: runtimetesting.AssertErrorIsNotFound(),
},
{
Name: "kill running job",
Now: testutils.Mktime(killTime),
Args: []string{"kill", "running-job"},
Fixtures: []runtime.Object{runningJob},
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobAction(DefaultNamespace, runningJobKilled),
},
},
},
Stdout: runtimetesting.Output{
Matches: regexp.MustCompile(`^Requested for job [^\s]+ to be killed`),
Contains: formatter.FormatTime(testutils.Mkmtimep(killTime)),
},
},
{
Name: "already killed previously",
Now: testutils.Mktime(killTime),
Args: []string{"kill", "running-job"},
Fixtures: []runtime.Object{runningJobKilled},
WantError: assert.Error,
},
{
Name: "kill running job with override",
Now: testutils.Mktime(killTime2),
Args: []string{"kill", "running-job", "--override"},
Fixtures: []runtime.Object{runningJobKilled},
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobAction(DefaultNamespace, runningJobKilledNewTimestamp),
},
},
},
Stdout: runtimetesting.Output{
Matches: regexp.MustCompile(`^Requested for job [^\s]+ to be killed`),
Contains: formatter.FormatTime(testutils.Mkmtimep(killTime2)),
},
},
{
Name: "kill running job at specific time",
Now: testutils.Mktime(killTime),
Args: []string{"kill", "running-job", "--at", killTime2},
Fixtures: []runtime.Object{runningJob},
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobAction(DefaultNamespace, runningJobKilledNewTimestamp),
},
},
},
Stdout: runtimetesting.Output{
Matches: regexp.MustCompile(`^Requested for job [^\s]+ to be killed`),
Contains: formatter.FormatTime(testutils.Mkmtimep(killTime2)),
},
},
{
Name: "kill running job after certain duration",
Now: testutils.Mktime(killTime),
Args: []string{"kill", "running-job", "--after", "40s"},
Fixtures: []runtime.Object{runningJob},
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobAction(DefaultNamespace, runningJobKilledNewTimestamp),
},
},
},
Stdout: runtimetesting.Output{
Matches: regexp.MustCompile(`^Requested for job [^\s]+ to be killed`),
Contains: formatter.FormatTime(testutils.Mkmtimep(killTime2)),
},
},
})
}
Loading

0 comments on commit 9e17087

Please sign in to comment.