Skip to content

Commit

Permalink
feat: add archive resubmit command to argo CLI. Fixes #7910 (#8166)
Browse files Browse the repository at this point in the history
Signed-off-by: Dillen Padhiar <dpadhiar99@gmail.com>
  • Loading branch information
dpadhiar authored Mar 23, 2022
1 parent d8aa467 commit 8c77e89
Show file tree
Hide file tree
Showing 23 changed files with 1,240 additions and 936 deletions.
89 changes: 48 additions & 41 deletions cmd/argo/commands/archive/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/client"
workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func NewGetCommand() *cobra.Command {
Expand All @@ -32,49 +33,55 @@ func NewGetCommand() *cobra.Command {
errors.CheckError(err)
wf, err := serviceClient.GetArchivedWorkflow(ctx, &workflowarchivepkg.GetArchivedWorkflowRequest{Uid: uid})
errors.CheckError(err)
switch output {
case "json":
output, err := json.Marshal(wf)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(output))
case "yaml":
output, err := yaml.Marshal(wf)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(output))
default:
const fmtStr = "%-20s %v\n"
fmt.Printf(fmtStr, "Name:", wf.ObjectMeta.Name)
fmt.Printf(fmtStr, "Namespace:", wf.ObjectMeta.Namespace)
serviceAccount := wf.Spec.ServiceAccountName
if serviceAccount == "" {
// if serviceAccountName was not specified in a submitted Workflow, we will
// use the serviceAccountName provided in Workflow Defaults (if any). If that
// also isn't set, we will use the 'default' ServiceAccount in the namespace
// the workflow will run in.
serviceAccount = "unset (will run with the default ServiceAccount)"
}
fmt.Printf(fmtStr, "ServiceAccount:", serviceAccount)
fmt.Printf(fmtStr, "Status:", wf.Status.Phase)
if wf.Status.Message != "" {
fmt.Printf(fmtStr, "Message:", wf.Status.Message)
}
fmt.Printf(fmtStr, "Created:", humanize.Timestamp(wf.ObjectMeta.CreationTimestamp.Time))
if !wf.Status.StartedAt.IsZero() {
fmt.Printf(fmtStr, "Started:", humanize.Timestamp(wf.Status.StartedAt.Time))
}
if !wf.Status.FinishedAt.IsZero() {
fmt.Printf(fmtStr, "Finished:", humanize.Timestamp(wf.Status.FinishedAt.Time))
}
if !wf.Status.StartedAt.IsZero() {
fmt.Printf(fmtStr, "Duration:", humanize.RelativeDuration(wf.Status.StartedAt.Time, wf.Status.FinishedAt.Time))
}
}
printWorkflow(wf, output)
},
}
command.Flags().StringVarP(&output, "output", "o", "wide", "Output format. One of: json|yaml|wide")
return command
}

func printWorkflow(wf *wfv1.Workflow, output string) {

switch output {
case "json":
output, err := json.Marshal(wf)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(output))
case "yaml":
output, err := yaml.Marshal(wf)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(output))
default:
const fmtStr = "%-20s %v\n"
fmt.Printf(fmtStr, "Name:", wf.ObjectMeta.Name)
fmt.Printf(fmtStr, "Namespace:", wf.ObjectMeta.Namespace)
serviceAccount := wf.GetExecSpec().ServiceAccountName
if serviceAccount == "" {
// if serviceAccountName was not specified in a submitted Workflow, we will
// use the serviceAccountName provided in Workflow Defaults (if any). If that
// also isn't set, we will use the 'default' ServiceAccount in the namespace
// the workflow will run in.
serviceAccount = "unset (will run with the default ServiceAccount)"
}
fmt.Printf(fmtStr, "ServiceAccount:", serviceAccount)
fmt.Printf(fmtStr, "Status:", wf.Status.Phase)
if wf.Status.Message != "" {
fmt.Printf(fmtStr, "Message:", wf.Status.Message)
}
fmt.Printf(fmtStr, "Created:", humanize.Timestamp(wf.ObjectMeta.CreationTimestamp.Time))
if !wf.Status.StartedAt.IsZero() {
fmt.Printf(fmtStr, "Started:", humanize.Timestamp(wf.Status.StartedAt.Time))
}
if !wf.Status.FinishedAt.IsZero() {
fmt.Printf(fmtStr, "Finished:", humanize.Timestamp(wf.Status.FinishedAt.Time))
}
if !wf.Status.StartedAt.IsZero() {
fmt.Printf(fmtStr, "Duration:", humanize.RelativeDuration(wf.Status.StartedAt.Time, wf.Status.FinishedAt.Time))
}
}

}
44 changes: 27 additions & 17 deletions cmd/argo/commands/archive/list.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package archive

import (
"context"
"os"
"sort"

Expand Down Expand Up @@ -29,23 +30,8 @@ func NewListCommand() *cobra.Command {
serviceClient, err := apiClient.NewArchivedWorkflowServiceClient()
errors.CheckError(err)
namespace := client.Namespace()
listOpts := &metav1.ListOptions{
FieldSelector: "metadata.namespace=" + namespace,
LabelSelector: selector,
Limit: chunkSize,
}
var workflows wfv1.Workflows
for {
log.WithField("listOpts", listOpts).Debug()
resp, err := serviceClient.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{ListOptions: listOpts})
errors.CheckError(err)
workflows = append(workflows, resp.Items...)
if resp.Continue == "" {
break
}
listOpts.Continue = resp.Continue
}
sort.Sort(workflows)
workflows, err := listArchivedWorkflows(ctx, serviceClient, "metadata.namespace="+namespace, selector, chunkSize)
errors.CheckError(err)
err = printer.PrintWorkflows(workflows, os.Stdout, printer.PrintOpts{Output: output, Namespace: true, UID: true})
errors.CheckError(err)
},
Expand All @@ -55,3 +41,27 @@ func NewListCommand() *cobra.Command {
command.Flags().Int64VarP(&chunkSize, "chunk-size", "", 0, "Return large lists in chunks rather than all at once. Pass 0 to disable.")
return command
}

func listArchivedWorkflows(ctx context.Context, serviceClient workflowarchivepkg.ArchivedWorkflowServiceClient, fieldSelector string, labelSelector string, chunkSize int64) (wfv1.Workflows, error) {
listOpts := &metav1.ListOptions{
FieldSelector: fieldSelector,
LabelSelector: labelSelector,
Limit: chunkSize,
}
var workflows wfv1.Workflows
for {
log.WithField("listOpts", listOpts).Debug()
resp, err := serviceClient.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{ListOptions: listOpts})
if err != nil {
return nil, err
}
workflows = append(workflows, resp.Items...)
if resp.Continue == "" {
break
}
listOpts.Continue = resp.Continue
}
sort.Sort(workflows)

return workflows, nil
}
146 changes: 146 additions & 0 deletions cmd/argo/commands/archive/resubmit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package archive

import (
"context"

"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

client "github.com/argoproj/argo-workflows/v3/cmd/argo/commands/client"
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/common"
workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

type resubmitOps struct {
priority int32 // --priority
memoized bool // --memoized
namespace string // --namespace
labelSelector string // --selector
fieldSelector string // --field-selector
}

// hasSelector returns true if the CLI arguments selects multiple workflows
func (o *resubmitOps) hasSelector() bool {
if o.labelSelector != "" || o.fieldSelector != "" {
return true
}
return false
}

func NewResubmitCommand() *cobra.Command {
var (
resubmitOpts resubmitOps
cliSubmitOpts common.CliSubmitOpts
)
command := &cobra.Command{
Use: "resubmit [WORKFLOW...]",
Short: "resubmit one or more workflows",
Example: `# Resubmit a workflow:
argo archive resubmit uid
# Resubmit multiple workflows:
argo resubmit uid another-uid
# Resubmit multiple workflows by label selector:
argo resubmit -l workflows.argoproj.io/test=true
# Resubmit multiple workflows by field selector:
argo resubmit --field-selector metadata.namespace=argo
# Resubmit and wait for completion:
argo resubmit --wait uid
# Resubmit and watch until completion:
argo resubmit --watch uid
# Resubmit and tail logs until completion:
argo resubmit --log uid
`,
Run: func(cmd *cobra.Command, args []string) {
if cmd.Flag("priority").Changed {
cliSubmitOpts.Priority = &resubmitOpts.priority
}

ctx, apiClient := client.NewAPIClient(cmd.Context())
serviceClient := apiClient.NewWorkflowServiceClient() // needed for wait watch or log flags
archiveServiceClient, err := apiClient.NewArchivedWorkflowServiceClient()
errors.CheckError(err)
resubmitOpts.namespace = client.Namespace()
err = resubmitArchivedWorkflows(ctx, archiveServiceClient, serviceClient, resubmitOpts, cliSubmitOpts, args)
errors.CheckError(err)
},
}

command.Flags().Int32Var(&resubmitOpts.priority, "priority", 0, "workflow priority")
command.Flags().StringVarP(&cliSubmitOpts.Output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&cliSubmitOpts.Wait, "wait", "w", false, "wait for the workflow to complete, only works when a single workflow is resubmitted")
command.Flags().BoolVar(&cliSubmitOpts.Watch, "watch", false, "watch the workflow until it completes, only works when a single workflow is resubmitted")
command.Flags().BoolVar(&cliSubmitOpts.Log, "log", false, "log the workflow until it completes")
command.Flags().BoolVar(&resubmitOpts.memoized, "memoized", false, "re-use successful steps & outputs from the previous run")
command.Flags().StringVarP(&resubmitOpts.labelSelector, "selector", "l", "", "Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)")
command.Flags().StringVar(&resubmitOpts.fieldSelector, "field-selector", "", "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.")
return command
}

// resubmitWorkflows resubmits workflows by given resubmitOpts or workflow names
func resubmitArchivedWorkflows(ctx context.Context, archiveServiceClient workflowarchivepkg.ArchivedWorkflowServiceClient, serviceClient workflowpkg.WorkflowServiceClient, resubmitOpts resubmitOps, cliSubmitOpts common.CliSubmitOpts, args []string) error {
var (
wfs wfv1.Workflows
err error
)

if resubmitOpts.hasSelector() {
wfs, err = listArchivedWorkflows(ctx, archiveServiceClient, resubmitOpts.fieldSelector, resubmitOpts.labelSelector, 0)
if err != nil {
return err
}
}

for _, uid := range args {
wfs = append(wfs, wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(uid),
Namespace: resubmitOpts.namespace,
},
})
}

var lastResubmitted *wfv1.Workflow
resubmittedUids := make(map[string]bool)

for _, wf := range wfs {
if _, ok := resubmittedUids[string(wf.UID)]; ok {
// de-duplication in case there is an overlap between the selector and given workflow names
continue
}
resubmittedUids[string(wf.UID)] = true

lastResubmitted, err = archiveServiceClient.ResubmitArchivedWorkflow(ctx, &workflowarchivepkg.ResubmitArchivedWorkflowRequest{
Uid: string(wf.UID),
Namespace: wf.Namespace,
Name: wf.Name,
Memoized: resubmitOpts.memoized,
})
if err != nil {
return err
}
printWorkflow(lastResubmitted, cliSubmitOpts.Output)
}

if len(resubmittedUids) == 1 {
// watch or wait when there is only one workflow retried
common.WaitWatchOrLog(ctx, serviceClient, lastResubmitted.Namespace, []string{lastResubmitted.Name}, cliSubmitOpts)
}
return nil
}
2 changes: 2 additions & 0 deletions cmd/argo/commands/archive/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ func NewArchiveCommand() *cobra.Command {
cmd.HelpFunc()(cmd, args)
},
}

command.AddCommand(NewListCommand())
command.AddCommand(NewGetCommand())
command.AddCommand(NewDeleteCommand())
command.AddCommand(NewListLabelKeyCommand())
command.AddCommand(NewListLabelValueCommand())
command.AddCommand(NewResubmitCommand())
return command
}
Loading

0 comments on commit 8c77e89

Please sign in to comment.