Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented Conditionally annotate outputs of script template only when consumed #1359 #1462

Merged
merged 34 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
00f08b1
CheckandEstimate implementation
sarabala1979 Apr 4, 2019
e7ac42a
fixed variable rename
sarabala1979 Apr 4, 2019
d956bfc
fixed gofmt
sarabala1979 Apr 5, 2019
be9b3b5
fixed feedbacks
sarabala1979 Apr 5, 2019
0d15503
merge
sarabala1979 Apr 9, 2019
d967df3
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Apr 9, 2019
4c9b645
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Apr 10, 2019
c40cc5f
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Apr 11, 2019
51e2d5f
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Apr 26, 2019
5f41f3e
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 May 9, 2019
9a95a48
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 May 31, 2019
6a080b5
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Jun 7, 2019
0a48649
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Jun 11, 2019
7b273ab
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Jun 17, 2019
0ba8910
Merge remote-tracking branch 'upstream/master' into Issue1359
sarabala1979 Jul 1, 2019
b0e4288
Implement the exclude script output in step and DAG workflow
sarabala1979 Jul 3, 2019
14af0f7
Update operator.go
sarabala1979 Jul 3, 2019
e94f5b6
Update workflowpod.go
sarabala1979 Jul 3, 2019
9693af7
updated Review Comments
sarabala1979 Jul 16, 2019
730c763
Update exec_control.go
sarabala1979 Jul 17, 2019
3a7a544
fixed typo
sarabala1979 Jul 19, 2019
9fd3109
Updated Review comments
sarabala1979 Jul 19, 2019
4d65f75
updated review comments
sarabala1979 Jul 22, 2019
08779f0
Update wait.go
sarabala1979 Jul 22, 2019
a8c5c97
Update operator.go
sarabala1979 Jul 22, 2019
c9716aa
Merge remote-tracking branch 'upstream/master' into Issue1359
sarabala1979 Jul 22, 2019
2d6b309
Merge branch 'master' into Issue1359
sarabala1979 Jul 22, 2019
413c1f5
fixed merge issue
sarabala1979 Jul 22, 2019
a70662d
Update operator_test.go
sarabala1979 Jul 22, 2019
164519b
updated review comments
sarabala1979 Jul 25, 2019
a4a79b3
updated comments
sarabala1979 Jul 25, 2019
737c595
updated
sarabala1979 Jul 25, 2019
f21ca51
Update operator_test.go
sarabala1979 Jul 25, 2019
c4d2b50
updated comments
sarabala1979 Jul 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (

// AnnotationKeyNodeName is the pod metadata annotation key containing the workflow node name
AnnotationKeyNodeName = workflow.FullName + "/node-name"

// AnnotationKeyNodeMessage is the pod metadata annotation key the executor will use to
// communicate errors encountered by the executor during artifact load/save, etc...
AnnotationKeyNodeMessage = workflow.FullName + "/node-message"
Expand Down Expand Up @@ -130,6 +131,8 @@ type ExecutionControl struct {
// It is used to signal the executor to terminate a daemoned container. In the future it will be
// used to support workflow or steps/dag level timeouts.
Deadline *time.Time `json:"deadline,omitempty"`
// IncludeScriptOutput is containing flag to include script output
IncludeScriptOutput bool `json:"includeScriptOutput,omitempty"`
}

type ResourceInterface interface {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d *dagContext) assertBranchFinished(targetTaskName string) bool {
// We should ensure that from the bottom to the top,
// all the nodes of this branch have at least one failure.
// If successful, we should continue to run down until the leaf node
taskNode := d.getTaskNode(targetTaskName)
taskNode := d.GetTaskNode(targetTaskName)
if taskNode == nil {
taskObject := d.getTask(targetTaskName)
if taskObject != nil {
Expand Down
19 changes: 10 additions & 9 deletions workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,33 @@ func (woc *wfOperationCtx) applyExecutionControl(pod *apiv1.Pod, wfNodesLock *sy
}
}

// Now ensure the pod's current annotation matches our desired deadline
desiredExecCtl := common.ExecutionControl{
Deadline: woc.workflowDeadline,
}
var podExecCtl common.ExecutionControl
if execCtlStr, ok := pod.Annotations[common.AnnotationKeyExecutionControl]; ok && execCtlStr != "" {
err := json.Unmarshal([]byte(execCtlStr), &podExecCtl)
if err != nil {
woc.log.Warnf("Failed to unmarshal execution control from pod %s", pod.Name)
}
}
if podExecCtl.Deadline == nil && desiredExecCtl.Deadline == nil {
if podExecCtl.Deadline == nil && woc.workflowDeadline == nil {
return nil
} else if podExecCtl.Deadline != nil && desiredExecCtl.Deadline != nil {
if podExecCtl.Deadline.Equal(*desiredExecCtl.Deadline) {
} else if podExecCtl.Deadline != nil && woc.workflowDeadline != nil {
if podExecCtl.Deadline.Equal(*woc.workflowDeadline) {
return nil
}
}

if podExecCtl.Deadline != nil && podExecCtl.Deadline.IsZero() {
// If the pod has already been explicitly signaled to terminate, then do nothing.
// This can happen when daemon steps are terminated.
woc.log.Infof("Skipping sync of execution control of pod %s. pod has been signaled to terminate", pod.Name)
return nil
}
woc.log.Infof("Execution control for pod %s out-of-sync desired: %v, actual: %v", pod.Name, desiredExecCtl.Deadline, podExecCtl.Deadline)
return woc.updateExecutionControl(pod.Name, desiredExecCtl)

// Assign new deadline value to PodExeCtl
podExecCtl.Deadline = woc.workflowDeadline

woc.log.Infof("Execution control for pod %s out-of-sync desired: %v, actual: %v", pod.Name, woc.workflowDeadline, podExecCtl.Deadline)
return woc.updateExecutionControl(pod.Name, podExecCtl)
}

// killDaemonedChildren kill any daemoned pods of a steps or DAG template node.
Expand Down
49 changes: 46 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template
return node
}
woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl, false)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down Expand Up @@ -1365,14 +1365,57 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out
return &outputs, nil
}

// hasOutputResultRef will check given template output has any reference
func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool {

var variableRefName string
if parentTmpl.DAG != nil {
variableRefName = "{{tasks." + name + ".outputs.result}}"
} else if parentTmpl.Steps != nil {
variableRefName = "{{steps." + name + ".outputs.result}}"
}

jsonValue, err := json.Marshal(parentTmpl)
if err != nil {
log.Warnf("Unable to marshal the template. %v, %v", parentTmpl, err)
}

return strings.Contains(string(jsonValue), variableRefName)
}

// getStepOrDAGTaskName will extract the node from NodeStatus Name
func getStepOrDAGTaskName(nodeName string, hasRetryStrategy bool) string {
if strings.Contains(nodeName, ".") {
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
name := nodeName[strings.LastIndex(nodeName, ".")+1:]
// Check retry scenario
if hasRetryStrategy {
if indx := strings.LastIndex(name, "("); indx > 0 {
return name[0:indx]
}
}
return name
}
return nodeName
}

func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus {

boundaryNode := woc.wf.Status.Nodes[boundaryID]
parentTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName)

includeScriptOutput := false
if parentTemplate != nil {
name := getStepOrDAGTaskName(nodeName, tmpl.RetryStrategy != nil)
includeScriptOutput = hasOutputResultRef(name, parentTemplate)
}
node := woc.getNodeByName(nodeName)

if node != nil {
return node
}
mainCtr := tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl)
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl, includeScriptOutput)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down Expand Up @@ -1607,7 +1650,7 @@ func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template,
mainCtr.VolumeMounts = []apiv1.VolumeMount{
volumeMountPodMetadata,
}
_, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl)
_, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl, false)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down
128 changes: 125 additions & 3 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"fmt"
"github.com/argoproj/argo/workflow/config"
"strings"
"testing"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
Expand Down Expand Up @@ -1160,6 +1161,127 @@ func TestResourceWithOwnerReferenceTemplate(t *testing.T) {
}
}

var stepScriptTmpl = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: scripts-bash-
spec:
entrypoint: bash-script-example
templates:
- name: bash-script-example
steps:
- - name: generate
template: gen-random-int
- - name: print
template: print-message
arguments:
parameters:
- name: message
value: "{{steps.generate.outputs.result}}"

- name: gen-random-int
script:
image: debian:9.4
command: [bash]
source: |
cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'

- name: print-message
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo result was: {{inputs.parameters.message}}"]
`

var dagScriptTmpl = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-target-
spec:
entrypoint: dag-target
arguments:
parameters:
- name: target
value: E

templates:
- name: dag-target
dag:
tasks:
- name: A
template: echo
arguments:
parameters: [{name: message, value: A}]
- name: B
template: echo
arguments:
parameters: [{name: message, value: B}]
- name: C
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: "{{tasks.A.outputs.result}}"}]
- name: echo
script:
image: debian:9.4
command: [bash]
source: |
cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'`

func TestStepWFGetNodeName(t *testing.T) {

controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(stepScriptTmpl)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
assert.True(t, hasOutputResultRef("generate", &wf.Spec.Templates[0]))
assert.False(t, hasOutputResultRef("print-message", &wf.Spec.Templates[0]))
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
for _, node := range wf.Status.Nodes {
if strings.Contains(node.Name, "generate") {
assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil) == "generate")
} else if strings.Contains(node.Name, "print-message") {
assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil) == "print-message")
}
}
}

func TestDAGWFGetNodeName(t *testing.T) {

controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(dagScriptTmpl)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
assert.True(t, hasOutputResultRef("A", &wf.Spec.Templates[0]))
assert.False(t, hasOutputResultRef("B", &wf.Spec.Templates[0]))
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
for _, node := range wf.Status.Nodes {
if strings.Contains(node.Name, ".A") {
assert.True(t, getStepOrDAGTaskName(node.Name, wf.Spec.Templates[0].RetryStrategy != nil) == "A")
}
if strings.Contains(node.Name, ".B") {
assert.True(t, getStepOrDAGTaskName(node.Name, wf.Spec.Templates[0].RetryStrategy != nil) == "B")
}
}
}

var withParamAsJsonList = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand All @@ -1181,14 +1303,14 @@ spec:
- name: message
value: "{{item}}"
withParam: "{{workflow.parameters.input}}"
- name: whalesay
- name: whalesay
inputs:
parameters:
- name: message
container:
script:
image: alpine:latest
command: [sh, -c]
args: ["echo "]
args: ["echo result was: {{inputs.parameters.message}}"]
`

func TestWithParamAsJsonList(t *testing.T) {
Expand Down
19 changes: 13 additions & 6 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume {
}
}

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template) (*apiv1.Pod, error) {
func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)
woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)
tmpl = tmpl.DeepCopy()
Expand Down Expand Up @@ -168,7 +168,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
}

addSchedulingConstraints(pod, wfSpec, tmpl)
woc.addMetadata(pod, tmpl)
woc.addMetadata(pod, tmpl, includeScriptOutput)

err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims)
if err != nil {
Expand Down Expand Up @@ -446,21 +446,28 @@ func isResourcesSpecified(ctr *apiv1.Container) bool {
}

// addMetadata applies metadata specified in the template
func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template) {
func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, includeScriptOutput bool) {
for k, v := range tmpl.Metadata.Annotations {
pod.ObjectMeta.Annotations[k] = v
}
for k, v := range tmpl.Metadata.Labels {
pod.ObjectMeta.Labels[k] = v
}

execCtl := common.ExecutionControl{
IncludeScriptOutput: includeScriptOutput,
}

if woc.workflowDeadline != nil {
execCtl := common.ExecutionControl{
Deadline: woc.workflowDeadline,
}
execCtl.Deadline = woc.workflowDeadline

}
if woc.workflowDeadline != nil || includeScriptOutput {
execCtlBytes, err := json.Marshal(execCtl)
if err != nil {
panic(err)
}

pod.ObjectMeta.Annotations[common.AnnotationKeyExecutionControl] = string(execCtlBytes)
}
}
Expand Down
5 changes: 5 additions & 0 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,11 @@ func (we *WorkflowExecutor) GetMainContainerID() (string, error) {

// CaptureScriptResult will add the stdout of a script template as output result
func (we *WorkflowExecutor) CaptureScriptResult() error {

if we.ExecutionControl == nil || !we.ExecutionControl.IncludeScriptOutput {
log.Infof("No Script output reference in workflow. Capturing script output ignored")
return nil
}
if we.Template.Script == nil {
return nil
}
Expand Down