Logs
@@ -57,11 +66,11 @@ export const WorkflowLogsViewer = ({workflow, nodeId, container, archived}: Work
Logs for archived workflows may be overwritten by a more recent workflow with the same name.
)}
-
+
{' '}
x.value === podName) || {}).label} onSelect={(_, item) => setPodName(item.value)} /> /{' '}
-
+
{selectedContainer === 'init' && (
diff --git a/ui/src/app/workflows/components/workflow-node-info/workflow-node-info.tsx b/ui/src/app/workflows/components/workflow-node-info/workflow-node-info.tsx
index 5d305c112ac5..9cc1a63e669b 100644
--- a/ui/src/app/workflows/components/workflow-node-info/workflow-node-info.tsx
+++ b/ui/src/app/workflows/components/workflow-node-info/workflow-node-info.tsx
@@ -118,7 +118,10 @@ const WorkflowNodeSummary = (props: Props) => {
);
}
if (props.node.type === 'Retry') {
- attributes.push({title: 'FAIL HOSTS', value:
{failHosts(props.node, props.workflow)} });
+ attributes.push({
+ title: 'FAIL HOSTS',
+ value:
{failHosts(props.node, props.workflow)}
+ });
}
if (props.node.resourcesDuration) {
attributes.push({
@@ -132,7 +135,7 @@ const WorkflowNodeSummary = (props: Props) => {
- {props.onShowYaml && props.onShowYaml(props.node.id)}>YAML }{' '}
+ {props.node.type !== 'Container' && props.onShowYaml && props.onShowYaml(props.node.id)}>YAML }{' '}
{props.node.type === 'Pod' && props.onShowContainerLogs && (
showLogs()}
@@ -148,20 +151,27 @@ const WorkflowNodeSummary = (props: Props) => {
EVENTS
)}{' '}
-
+ {props.node.type === 'Container' && props.onShowContainerLogs && (
+ props.onShowContainerLogs(props.node.name.replace(/.[^.]*$/, ''), props.node.name.replace(/.*\./, ''))}>
+ logs
+
+ )}{' '}
+ {props.node.type === 'Pod' && (
+
+ )}
);
@@ -311,7 +321,7 @@ class WorkflowNodeContainers extends React.Component
@@ -320,8 +330,10 @@ class WorkflowNodeContainers extends React.Component
);
}
- const container =
- (this.state.selectedSidecar && template.sidecars && template.sidecars.find(item => item.name === this.state.selectedSidecar)) || template.container || template.script;
+ const containers = (template.containerSet ? template.containerSet.graph || template.containerSet.sequence : []).concat(template.sidecars || []);
+
+ const container = (this.state.selectedSidecar && containers.find(item => item.name === this.state.selectedSidecar)) || template.container || template.script;
+
return (
{this.state.selectedSidecar &&
this.setState({selectedSidecar: null})} />}
@@ -331,10 +343,10 @@ class WorkflowNodeContainers extends React.Component
- {!this.state.selectedSidecar && template.sidecars && template.sidecars.length > 0 && (
+ {!this.state.selectedSidecar && (
SIDECARS:
- {template.sidecars.map(sidecar => (
+ {containers.map(sidecar => (
this.setState({selectedSidecar: sidecar.name})}>
{sidecar.name}
@@ -412,23 +424,28 @@ export const WorkflowNodeInfo = (props: Props) => (
)
- },
- {
- title: 'CONTAINERS',
- key: 'containers',
- content:
- },
- {
- title: 'INPUTS/OUTPUTS',
- key: 'inputs-outputs',
- content: (
- <>
-
-
- >
- )
}
- ]}
+ ].concat(
+ props.node.type !== 'Container'
+ ? [
+ {
+ title: 'CONTAINERS',
+ key: 'containers',
+ content:
+ },
+ {
+ title: 'INPUTS/OUTPUTS',
+ key: 'inputs-outputs',
+ content: (
+ <>
+
+
+ >
+ )
+ }
+ ]
+ : []
+ )}
/>
);
diff --git a/ui/src/models/workflows.ts b/ui/src/models/workflows.ts
index a941932ef85d..e1d68cecaebd 100644
--- a/ui/src/models/workflows.ts
+++ b/ui/src/models/workflows.ts
@@ -293,6 +293,10 @@ export interface SidecarOptions {
mirrorVolumeMounts?: boolean;
}
+export interface ContainerNode extends kubernetes.Container {
+ dependencies?: string[];
+}
+
/**
* Template is a reusable and composable unit of execution in a workflow
*/
@@ -310,6 +314,11 @@ export interface Template {
* Container is the main container image to run in the pod
*/
container?: kubernetes.Container;
+
+ containerSet?: {
+ sequence: kubernetes.Container[];
+ graph: ContainerNode[];
+ };
/**
* Deamon will allow a workflow to proceed to the next step so long as the container reaches readiness
*/
@@ -415,7 +424,7 @@ export interface Workflow {
export const execSpec = (w: Workflow) => Object.assign({}, w.status.storedWorkflowTemplateSpec, w.spec);
-export type NodeType = 'Pod' | 'Steps' | 'StepGroup' | 'DAG' | 'Retry' | 'Skipped' | 'TaskGroup' | 'Suspend';
+export type NodeType = 'Pod' | 'Container' | 'Steps' | 'StepGroup' | 'DAG' | 'Retry' | 'Skipped' | 'TaskGroup' | 'Suspend';
export interface NodeStatus {
/**
diff --git a/workflow/common/util.go b/workflow/common/util.go
index 536170ea1597..bf1634e9d0ef 100644
--- a/workflow/common/util.go
+++ b/workflow/common/util.go
@@ -34,17 +34,9 @@ import (
// user specified volumeMounts in the template, and returns the deepest volumeMount
// (if any). A return value of nil indicates the path is not under any volumeMount.
func FindOverlappingVolume(tmpl *wfv1.Template, path string) *apiv1.VolumeMount {
- var volMounts []apiv1.VolumeMount
- if tmpl.Container != nil {
- volMounts = tmpl.Container.VolumeMounts
- } else if tmpl.Script != nil {
- volMounts = tmpl.Script.VolumeMounts
- } else {
- return nil
- }
var volMnt *apiv1.VolumeMount
deepestLen := 0
- for _, mnt := range volMounts {
+ for _, mnt := range tmpl.GetVolumeMounts() {
if path != mnt.MountPath && !strings.HasPrefix(path, mnt.MountPath+"/") {
continue
}
diff --git a/workflow/controller/container_set_template.go b/workflow/controller/container_set_template.go
new file mode 100644
index 000000000000..dc15bbc08d6c
--- /dev/null
+++ b/workflow/controller/container_set_template.go
@@ -0,0 +1,51 @@
+package controller
+
+import (
+ "context"
+ "fmt"
+
+ wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
+ "github.com/argoproj/argo-workflows/v3/workflow/common"
+)
+
+func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
+ node := woc.wf.GetNodeByName(nodeName)
+ if node == nil {
+ node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending)
+ }
+
+ if woc.getContainerRuntimeExecutor() != common.ContainerRuntimeExecutorEmissary && tmpl.HasSequencedContainers() {
+ woc.markNodePhase(nodeName, wfv1.NodeFailed, fmt.Sprintf("template has sequenced containers, so you must use the emissary executor rather than %q, learn more: https://argoproj.github.io/argo-workflows/workflow-executors/#emissary-emissary", woc.getContainerRuntimeExecutor()))
+ return woc.wf.GetNodeByName(nodeName), nil
+ }
+
+ _, err := woc.createWorkflowPod(ctx, nodeName, tmpl.ContainerSet.GetContainers(), tmpl, &createWorkflowPodOpts{
+ includeScriptOutput: tmpl.HasOutput(),
+ onExitPod: opts.onExitTemplate,
+ executionDeadline: opts.executionDeadline,
+ })
+ if err != nil {
+ return woc.requeueIfTransientErr(err, node.Name)
+ }
+
+ // we only complete the graph if we actually managed to create the pod,
+ // which prevents creating many pending nodes that could never be scheduled
+ for _, c := range tmpl.ContainerSet.GetContainers() {
+ ctxNodeName := fmt.Sprintf("%s.%s", nodeName, c.Name)
+ ctrNode := woc.wf.GetNodeByName(ctxNodeName)
+ if ctrNode == nil {
+ _ = woc.initializeNode(ctxNodeName, wfv1.NodeTypeContainer, templateScope, orgTmpl, node.ID, wfv1.NodePending)
+ }
+ }
+ for _, c := range tmpl.ContainerSet.GetGraph() {
+ ctrNodeName := fmt.Sprintf("%s.%s", nodeName, c.Name)
+ if len(c.Dependencies) == 0 {
+ woc.addChildNode(nodeName, ctrNodeName)
+ }
+ for _, v := range c.Dependencies {
+ woc.addChildNode(fmt.Sprintf("%s.%s", nodeName, v), ctrNodeName)
+ }
+ }
+
+ return node, nil
+}
diff --git a/workflow/controller/container_set_template_test.go b/workflow/controller/container_set_template_test.go
new file mode 100644
index 000000000000..c4e4de5c21b2
--- /dev/null
+++ b/workflow/controller/container_set_template_test.go
@@ -0,0 +1,242 @@
+package controller
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+
+ wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
+ "github.com/argoproj/argo-workflows/v3/workflow/common"
+)
+
+func TestContainerSetTemplate(t *testing.T) {
+ wf := unmarshalWF(`
+metadata:
+ name: pod
+spec:
+ entrypoint: main
+ templates:
+ - name: main
+ volumes:
+ - name: workspace
+ emptyDir: { }
+ containerSet:
+ volumeMounts:
+ - name: workspace
+ mountPath: /workspace
+ containers:
+ - name: ctr-0
+ image: argoproj/argosay:v2
+`)
+ cancel, controller := newController(wf)
+ defer cancel()
+
+ woc := newWorkflowOperationCtx(wf, controller)
+ woc.operate(context.Background())
+
+ assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
+ assert.Len(t, woc.wf.Status.Nodes, 2)
+
+ pod, err := getPod(woc, "pod")
+ assert.NoError(t, err)
+
+ socket := corev1.HostPathSocket
+ assert.ElementsMatch(t, []corev1.Volume{
+ {
+ Name: "podmetadata",
+ VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
+ Items: []corev1.DownwardAPIVolumeFile{{
+ Path: "annotations",
+ FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
+ }},
+ }},
+ },
+ {Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
+ {Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
+ }, pod.Spec.Volumes)
+
+ assert.Empty(t, pod.Spec.InitContainers)
+
+ assert.Len(t, pod.Spec.Containers, 2)
+ for _, c := range pod.Spec.Containers {
+ switch c.Name {
+ case common.WaitContainerName:
+ assert.ElementsMatch(t, []corev1.VolumeMount{
+ {Name: "podmetadata", MountPath: "/argo/podmetadata"},
+ {Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
+ }, c.VolumeMounts)
+ case "ctr-0":
+ assert.ElementsMatch(t, []corev1.VolumeMount{
+ {Name: "workspace", MountPath: "/workspace"},
+ }, c.VolumeMounts)
+ default:
+ t.Fatalf(c.Name)
+ }
+ }
+}
+
+func TestContainerSetTemplateWithInputArtifacts(t *testing.T) {
+ wf := unmarshalWF(`
+metadata:
+ name: pod
+spec:
+ entrypoint: main
+ templates:
+ - name: main
+ inputs:
+ artifacts:
+ - name: in-0
+ path: /in/in-0
+ raw:
+ data: hi
+ - name: in-1
+ path: /workspace/in-1
+ raw:
+ data: hi
+ volumes:
+ - name: workspace
+ emptyDir: { }
+ containerSet:
+ volumeMounts:
+ - name: workspace
+ mountPath: /workspace
+ containers:
+ - name: main
+ image: argoproj/argosay:v2
+`)
+ cancel, controller := newController(wf)
+ defer cancel()
+
+ woc := newWorkflowOperationCtx(wf, controller)
+ woc.operate(context.Background())
+
+ assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
+ assert.Len(t, woc.wf.Status.Nodes, 2)
+
+ pod, err := getPod(woc, "pod")
+ assert.NoError(t, err)
+
+ socket := corev1.HostPathSocket
+ assert.ElementsMatch(t, []corev1.Volume{
+ {
+ Name: "podmetadata",
+ VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
+ Items: []corev1.DownwardAPIVolumeFile{{
+ Path: "annotations",
+ FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
+ }},
+ }},
+ },
+ {Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
+ {Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
+ {Name: "input-artifacts", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
+ }, pod.Spec.Volumes)
+
+ if assert.Len(t, pod.Spec.InitContainers, 1) {
+ c := pod.Spec.InitContainers[0]
+ assert.ElementsMatch(t, []corev1.VolumeMount{
+ {Name: "podmetadata", MountPath: "/argo/podmetadata"},
+ {Name: "input-artifacts", MountPath: "/argo/inputs/artifacts"},
+ {Name: "workspace", MountPath: "/mainctrfs/workspace"},
+ }, c.VolumeMounts)
+ }
+
+ assert.Len(t, pod.Spec.Containers, 2)
+ for _, c := range pod.Spec.Containers {
+ switch c.Name {
+ case common.WaitContainerName:
+ assert.ElementsMatch(t, []corev1.VolumeMount{
+ {Name: "podmetadata", MountPath: "/argo/podmetadata"},
+ {Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
+ {Name: "workspace", MountPath: "/mainctrfs/workspace"},
+ {Name: "input-artifacts", MountPath: "/mainctrfs/in/in-0", SubPath: "in-0"},
+ }, c.VolumeMounts)
+ case "main":
+ assert.ElementsMatch(t, []corev1.VolumeMount{
+ {Name: "workspace", MountPath: "/workspace"},
+ {Name: "input-artifacts", MountPath: "/in/in-0", SubPath: "in-0"},
+ }, c.VolumeMounts)
+ default:
+ t.Fatalf(c.Name)
+ }
+ }
+}
+
+func TestContainerSetTemplateWithOutputArtifacts(t *testing.T) {
+ wf := unmarshalWF(`
+metadata:
+ name: pod
+spec:
+ entrypoint: main
+ templates:
+ - name: main
+ volumes:
+ - name: workspace
+ emptyDir: { }
+ containerSet:
+ volumeMounts:
+ - name: workspace
+ mountPath: /workspace
+ containers:
+ - name: main
+ image: argoproj/argosay:v2
+ outputs:
+ artifacts:
+ - name: in-0
+ path: /in/in-0
+ raw:
+ data: hi
+ - name: in-1
+ path: /workspace/in-1
+ raw:
+ data: hi
+`)
+ cancel, controller := newController(wf)
+ defer cancel()
+
+ woc := newWorkflowOperationCtx(wf, controller)
+ woc.operate(context.Background())
+
+ assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
+ assert.Len(t, woc.wf.Status.Nodes, 2)
+
+ pod, err := getPod(woc, "pod")
+ assert.NoError(t, err)
+
+ socket := corev1.HostPathSocket
+ assert.ElementsMatch(t, []corev1.Volume{
+ {
+ Name: "podmetadata",
+ VolumeSource: corev1.VolumeSource{DownwardAPI: &corev1.DownwardAPIVolumeSource{
+ Items: []corev1.DownwardAPIVolumeFile{{
+ Path: "annotations",
+ FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.annotations"},
+ }},
+ }},
+ },
+ {Name: "docker-sock", VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/docker.sock", Type: &socket}}},
+ {Name: "workspace", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
+ }, pod.Spec.Volumes)
+
+ assert.Len(t, pod.Spec.InitContainers, 0)
+
+ assert.Len(t, pod.Spec.Containers, 2)
+ for _, c := range pod.Spec.Containers {
+ switch c.Name {
+ case common.WaitContainerName:
+ assert.ElementsMatch(t, []corev1.VolumeMount{
+ {Name: "podmetadata", MountPath: "/argo/podmetadata"},
+ {Name: "docker-sock", MountPath: "/var/run/docker.sock", ReadOnly: true},
+ {Name: "workspace", MountPath: "/mainctrfs/workspace"},
+ }, c.VolumeMounts)
+ case "main":
+ assert.ElementsMatch(t, []corev1.VolumeMount{
+ {Name: "workspace", MountPath: "/workspace"},
+ }, c.VolumeMounts)
+ default:
+ t.Fatalf(c.Name)
+ }
+ }
+}
diff --git a/workflow/controller/exec_control.go b/workflow/controller/exec_control.go
index 1649178f38ba..97d21644d7a6 100644
--- a/workflow/controller/exec_control.go
+++ b/workflow/controller/exec_control.go
@@ -73,26 +73,22 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
woc.log.Warnf("Failed to unmarshal execution control from pod %s", pod.Name)
}
}
- containerName := common.WaitContainerName
- // A resource template does not have a wait container,
- // instead the only container is the main container (which is running argoexec)
- if len(pod.Spec.Containers) == 1 {
- containerName = common.MainContainerName
- }
- if woc.wf.Spec.Shutdown != "" {
- if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
- podExecCtl.Deadline = &time.Time{}
- woc.log.Infof("Applying shutdown deadline for pod %s", pod.Name)
- return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, containerName)
+ for _, c := range woc.findTemplate(pod).GetMainContainerNames() {
+ if woc.wf.Spec.Shutdown != "" {
+ if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
+ podExecCtl.Deadline = &time.Time{}
+ woc.log.Infof("Applying shutdown deadline for pod %s", pod.Name)
+ return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, c)
+ }
}
- }
- if woc.workflowDeadline != nil {
- if podExecCtl.Deadline == nil || woc.workflowDeadline.Before(*podExecCtl.Deadline) {
- podExecCtl.Deadline = woc.workflowDeadline
- woc.log.Infof("Applying sooner Workflow Deadline for pod %s at: %v", pod.Name, woc.workflowDeadline)
- return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, containerName)
+ if woc.workflowDeadline != nil {
+ if podExecCtl.Deadline == nil || woc.workflowDeadline.Before(*podExecCtl.Deadline) {
+ podExecCtl.Deadline = woc.workflowDeadline
+ woc.log.Infof("Applying sooner Workflow Deadline for pod %s at: %v", pod.Name, woc.workflowDeadline)
+ return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, c)
+ }
}
}
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index dc80b6cd424a..3ed644931a70 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -1131,7 +1131,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
if node.IsDaemoned() {
newPhase = wfv1.NodeSucceeded
} else {
- newPhase, message = inferFailedReason(pod)
+ newPhase, message = woc.inferFailedReason(pod)
woc.log.WithField("displayName", node.DisplayName).WithField("templateName", node.TemplateName).
WithField("pod", pod.Name).Infof("Pod failed: %s", message)
}
@@ -1168,6 +1168,32 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
WithField("pod", pod.Name).Error(message)
}
+ for _, c := range pod.Status.ContainerStatuses {
+ ctrNodeName := fmt.Sprintf("%s.%s", node.Name, c.Name)
+ if woc.wf.GetNodeByName(ctrNodeName) == nil {
+ continue
+ }
+ switch {
+ case c.State.Waiting != nil:
+ woc.markNodePhase(ctrNodeName, wfv1.NodePending)
+ case c.State.Running != nil:
+ woc.markNodePhase(ctrNodeName, wfv1.NodeRunning)
+ case c.State.Terminated != nil:
+ exitCode := int(c.State.Terminated.ExitCode)
+ message := fmt.Sprintf("%s (exit code %d): %s", c.State.Terminated.Reason, exitCode, c.State.Terminated.Message)
+ switch exitCode {
+ case 0:
+ woc.markNodePhase(ctrNodeName, wfv1.NodeSucceeded)
+ case 64:
+ // special emissary exit code indicating the emissary errors, rather than the sub-process failure,
+ // (unless the sub-process coincidentally exits with code 64 of course)
+ woc.markNodePhase(ctrNodeName, wfv1.NodeError, message)
+ default:
+ woc.markNodePhase(ctrNodeName, wfv1.NodeFailed, message)
+ }
+ }
+ }
+
if newDaemonStatus != nil {
if !*newDaemonStatus {
// if the daemon status switched to false, we prefer to just unset daemoned status field
@@ -1278,25 +1304,28 @@ func getPendingReason(pod *apiv1.Pod) string {
// inferFailedReason returns metadata about a Failed pod to be used in its NodeStatus
// Returns a tuple of the new phase and message
-func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
+func (woc *wfOperationCtx) inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
if pod.Status.Message != "" {
// Pod has a nice error message. Use that.
return wfv1.NodeFailed, pod.Status.Message
}
+ tmpl := woc.findTemplate(pod)
+
// We only get one message to set for the overall node status.
// If multiple containers failed, in order of preference:
// init, main (annotated), main (exit code), wait, sidecars
order := func(n string) int {
- order, ok := map[string]int{
- common.InitContainerName: 0,
- common.MainContainerName: 1,
- common.WaitContainerName: 2,
- }[n]
- if ok {
- return order
+ switch {
+ case n == common.InitContainerName:
+ return 0
+ case tmpl.IsMainContainerName(n):
+ return 1
+ case n == common.WaitContainerName:
+ return 2
+ default:
+ return 3
}
- return 3
}
ctrs := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)
@@ -1326,12 +1355,12 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
msg = fmt.Sprintf("%s: %s", msg, t.Message)
}
- switch ctr.Name {
- case common.InitContainerName:
+ switch {
+ case ctr.Name == common.InitContainerName:
return wfv1.NodeError, msg
- case common.MainContainerName:
+ case tmpl.IsMainContainerName(ctr.Name):
return wfv1.NodeFailed, msg
- case common.WaitContainerName:
+ case ctr.Name == common.WaitContainerName:
return wfv1.NodeError, msg
default:
if t.ExitCode == 137 || t.ExitCode == 143 {
@@ -1721,6 +1750,8 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
switch processedTmpl.GetType() {
case wfv1.TemplateTypeContainer:
node, err = woc.executeContainer(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
+ case wfv1.TemplateTypeContainerSet:
+ node, err = woc.executeContainerSet(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypeSteps:
node, err = woc.executeSteps(ctx, nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypeScript:
@@ -1924,6 +1955,15 @@ func (woc *wfOperationCtx) hasDaemonNodes() bool {
return false
}
+func (woc *wfOperationCtx) findTemplate(pod *apiv1.Pod) *wfv1.Template {
+ nodeName := pod.Annotations[common.AnnotationKeyNodeName]
+ node := woc.wf.GetNodeByName(nodeName)
+ if node == nil {
+ return nil // I don't expect this to happen in production, just in tests
+ }
+ return woc.wf.GetTemplateByName(node.TemplateName)
+}
+
func (woc *wfOperationCtx) markWorkflowRunning(ctx context.Context) {
woc.markWorkflowPhase(ctx, wfv1.WorkflowRunning, "")
}
@@ -2201,7 +2241,7 @@ func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string
}
woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
- _, err = woc.createWorkflowPod(ctx, nodeName, *tmpl.Container, tmpl, &createWorkflowPodOpts{
+ _, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*tmpl.Container}, tmpl, &createWorkflowPodOpts{
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
@@ -2217,9 +2257,9 @@ func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string
func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
node := woc.wf.Status.Nodes[nodeID]
switch node.Type {
- case wfv1.NodeTypePod, wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend:
+ case wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend:
return []string{node.ID}
- case wfv1.NodeTypeTaskGroup:
+ case wfv1.NodeTypeContainer, wfv1.NodeTypePod, wfv1.NodeTypeTaskGroup:
if len(node.Children) == 0 {
return []string{node.ID}
}
@@ -2236,13 +2276,7 @@ func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
}
outbound := make([]string, 0)
for _, outboundNodeID := range node.OutboundNodes {
- outNode := woc.wf.Status.Nodes[outboundNodeID]
- if outNode.Type == wfv1.NodeTypePod {
- outbound = append(outbound, outboundNodeID)
- } else {
- subOutIDs := woc.getOutboundNodes(outboundNodeID)
- outbound = append(outbound, subOutIDs...)
- }
+ outbound = append(outbound, woc.getOutboundNodes(outboundNodeID)...)
}
return outbound
}
@@ -2339,7 +2373,7 @@ func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, t
mainCtr := tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
- _, err = woc.createWorkflowPod(ctx, nodeName, mainCtr, tmpl, &createWorkflowPodOpts{
+ _, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{mainCtr}, tmpl, &createWorkflowPodOpts{
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
@@ -2488,7 +2522,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(tmpl *wfv1.Template, scop
resultsList = append(resultsList, item)
}
}
- if tmpl.GetType() == wfv1.TemplateTypeScript || tmpl.GetType() == wfv1.TemplateTypeContainer {
+ if tmpl.HasOutput() {
resultsJSON, err := json.Marshal(resultsList)
if err != nil {
return err
@@ -2631,7 +2665,7 @@ func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string,
mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
mainCtr.Command = []string{"argoexec", "resource", tmpl.Resource.Action}
- _, err := woc.createWorkflowPod(ctx, nodeName, *mainCtr, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline})
+ _, err := woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline})
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
@@ -2654,7 +2688,7 @@ func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, tem
mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
mainCtr.Command = []string{"argoexec", "data", string(dataTemplate)}
- _, err = woc.createWorkflowPod(ctx, nodeName, *mainCtr, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline, includeScriptOutput: true})
+ _, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline, includeScriptOutput: true})
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go
index 3d5449d8a839..8cb873e31b5a 100644
--- a/workflow/controller/operator_test.go
+++ b/workflow/controller/operator_test.go
@@ -298,16 +298,11 @@ func TestGlobalParams(t *testing.T) {
// TestSidecarWithVolume verifies ia sidecar can have a volumeMount reference to both existing or volumeClaimTemplate volumes
func TestSidecarWithVolume(t *testing.T) {
- cancel, controller := newController()
+ wf := unmarshalWF(sidecarWithVol)
+ cancel, controller := newController(wf)
defer cancel()
ctx := context.Background()
- wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
- wf := unmarshalWF(sidecarWithVol)
- wf, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
- assert.NoError(t, err)
- wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{})
- assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
@@ -5319,7 +5314,7 @@ func TestPodFailureWithContainerWaitingState(t *testing.T) {
var pod apiv1.Pod
testutil.MustUnmarshallYAML(podWithFailed, &pod)
assert.NotNil(t, pod)
- nodeStatus, msg := inferFailedReason(&pod)
+ nodeStatus, msg := newWoc().inferFailedReason(&pod)
assert.Equal(t, wfv1.NodeError, nodeStatus)
assert.Contains(t, msg, "Pod failed before")
}
@@ -5371,6 +5366,12 @@ status:
var podWithMainContainerOOM = `
apiVersion: v1
kind: Pod
+spec:
+ containers:
+ - name: main
+ env:
+ - name: ARGO_CONTAINER_NAME
+ value: main
status:
containerStatuses:
- containerID: containerd://3e8c564c13893914ec81a2c105188fa5d34748576b368e709dbc2e71cbf23c5b
@@ -5427,7 +5428,7 @@ func TestPodFailureWithContainerOOM(t *testing.T) {
for _, tt := range tests {
testutil.MustUnmarshallYAML(tt.podDetail, &pod)
assert.NotNil(t, pod)
- nodeStatus, msg := inferFailedReason(&pod)
+ nodeStatus, msg := newWoc().inferFailedReason(&pod)
assert.Equal(t, tt.phase, nodeStatus)
assert.Contains(t, msg, "OOMKilled")
}
diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go
index 673411f02938..f68c24e3da85 100644
--- a/workflow/controller/workflowpod.go
+++ b/workflow/controller/workflowpod.go
@@ -137,7 +137,7 @@ type createWorkflowPodOpts struct {
executionDeadline time.Time
}
-func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, opts *createWorkflowPodOpts) (*apiv1.Pod, error) {
+func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName string, mainCtrs []apiv1.Container, tmpl *wfv1.Template, opts *createWorkflowPodOpts) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)
// we must check to see if the pod exists rather than just optimistically creating the pod and see if we get
@@ -164,14 +164,19 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
tmpl = tmpl.DeepCopy()
wfSpec := woc.execWf.Spec.DeepCopy()
- mainCtr.Name = common.MainContainerName
- // Allow customization of main container resources.
- if isResourcesSpecified(woc.controller.Config.MainContainer) {
- mainCtr.Resources = *woc.controller.Config.MainContainer.Resources.DeepCopy()
- }
- // Container resources in workflow spec takes precedence over the main container's configuration in controller.
- if isResourcesSpecified(tmpl.Container) && tmpl.Container.Name == common.MainContainerName {
- mainCtr.Resources = *tmpl.Container.Resources.DeepCopy()
+ for i, c := range mainCtrs {
+ if c.Name == "" {
+ c.Name = common.MainContainerName
+ }
+ // Allow customization of main container resources.
+ if isResourcesSpecified(woc.controller.Config.MainContainer) {
+ c.Resources = *woc.controller.Config.MainContainer.Resources.DeepCopy()
+ }
+ // Container resources in workflow spec takes precedence over the main container's configuration in controller.
+ if isResourcesSpecified(tmpl.Container) && tmpl.Container.Name == common.MainContainerName {
+ c.Resources = *tmpl.Container.Resources.DeepCopy()
+ }
+ mainCtrs[i] = c
}
var activeDeadlineSeconds *int64
@@ -258,7 +263,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
// each container sequentially in the order that they appear in this list. For PNS we want the
// wait container to start before the main, so that it always has the chance to see the main
// container's PID and root filesystem.
- pod.Spec.Containers = append(pod.Spec.Containers, mainCtr)
+ pod.Spec.Containers = append(pod.Spec.Containers, mainCtrs...)
// Add init container only if it needs input artifacts. This is also true for
// script templates (which needs to populate the script)
@@ -747,7 +752,7 @@ func addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *w
// These are either specified in the workflow.spec.volumes or the workflow.spec.volumeClaimTemplate section
func addVolumeReferences(pod *apiv1.Pod, vols []apiv1.Volume, tmpl *wfv1.Template, pvcs []apiv1.Volume) error {
switch tmpl.GetType() {
- case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript, wfv1.TemplateTypeData:
+ case wfv1.TemplateTypeContainer, wfv1.TemplateTypeContainerSet, wfv1.TemplateTypeScript, wfv1.TemplateTypeData:
default:
return nil
}
@@ -798,17 +803,9 @@ func addVolumeReferences(pod *apiv1.Pod, vols []apiv1.Volume, tmpl *wfv1.Templat
return nil
}
- if tmpl.Container != nil {
- err := addVolumeRef(tmpl.Container.VolumeMounts)
- if err != nil {
- return err
- }
- }
- if tmpl.Script != nil {
- err := addVolumeRef(tmpl.Script.VolumeMounts)
- if err != nil {
- return err
- }
+ err := addVolumeRef(tmpl.GetVolumeMounts())
+ if err != nil {
+ return err
}
for _, container := range tmpl.InitContainers {
@@ -890,13 +887,7 @@ func (woc *wfOperationCtx) addInputArtifactsVolumes(pod *apiv1.Pod, tmpl *wfv1.T
// We also add the user supplied mount paths to the init container,
// in case the executor needs to load artifacts to this volume
// instead of the artifacts volume
- tmplVolumeMounts := []apiv1.VolumeMount{}
- if tmpl.Container != nil {
- tmplVolumeMounts = tmpl.Container.VolumeMounts
- } else if tmpl.Script != nil {
- tmplVolumeMounts = tmpl.Script.Container.VolumeMounts
- }
- for _, mnt := range tmplVolumeMounts {
+ for _, mnt := range tmpl.GetVolumeMounts() {
mnt.MountPath = filepath.Join(common.ExecutorMainFilesystemDir, mnt.MountPath)
initCtr.VolumeMounts = append(initCtr.VolumeMounts, mnt)
}
@@ -906,47 +897,37 @@ func (woc *wfOperationCtx) addInputArtifactsVolumes(pod *apiv1.Pod, tmpl *wfv1.T
}
}
- mainCtrIndex := -1
- for i, ctr := range pod.Spec.Containers {
- switch ctr.Name {
- case common.MainContainerName:
- mainCtrIndex = i
- }
- }
- if mainCtrIndex == -1 {
- panic("Could not find main container in pod spec")
- }
- mainCtr := &pod.Spec.Containers[mainCtrIndex]
-
- for _, art := range tmpl.Inputs.Artifacts {
- if art.Path == "" {
- return errors.Errorf(errors.CodeBadRequest, "inputs.artifacts.%s did not specify a path", art.Name)
- }
- if !art.HasLocationOrKey() && art.Optional {
- woc.log.Infof("skip volume mount of %s (%s): optional artifact was not provided",
- art.Name, art.Path)
- continue
- }
- overlap := common.FindOverlappingVolume(tmpl, art.Path)
- if overlap != nil {
- // artifact path overlaps with a mounted volume. do not mount the
- // artifacts emptydir to the main container. init would have copied
- // the artifact to the user's volume instead
- woc.log.Debugf("skip volume mount of %s (%s): overlaps with mount %s at %s",
- art.Name, art.Path, overlap.Name, overlap.MountPath)
+ for i, c := range pod.Spec.Containers {
+ if c.Name != common.MainContainerName {
continue
}
- volMount := apiv1.VolumeMount{
- Name: artVol.Name,
- MountPath: art.Path,
- SubPath: art.Name,
- }
- if mainCtr.VolumeMounts == nil {
- mainCtr.VolumeMounts = make([]apiv1.VolumeMount, 0)
+ for _, art := range tmpl.Inputs.Artifacts {
+ if art.Path == "" {
+ return errors.Errorf(errors.CodeBadRequest, "inputs.artifacts.%s did not specify a path", art.Name)
+ }
+ if !art.HasLocationOrKey() && art.Optional {
+ woc.log.Infof("skip volume mount of %s (%s): optional artifact was not provided",
+ art.Name, art.Path)
+ continue
+ }
+ overlap := common.FindOverlappingVolume(tmpl, art.Path)
+ if overlap != nil {
+ // artifact path overlaps with a mounted volume. do not mount the
+ // artifacts emptydir to the main container. init would have copied
+ // the artifact to the user's volume instead
+ woc.log.Debugf("skip volume mount of %s (%s): overlaps with mount %s at %s",
+ art.Name, art.Path, overlap.Name, overlap.MountPath)
+ continue
+ }
+ volMount := apiv1.VolumeMount{
+ Name: artVol.Name,
+ MountPath: art.Path,
+ SubPath: art.Name,
+ }
+ c.VolumeMounts = append(c.VolumeMounts, volMount)
}
- mainCtr.VolumeMounts = append(mainCtr.VolumeMounts, volMount)
+ pod.Spec.Containers[i] = c
}
- pod.Spec.Containers[mainCtrIndex] = *mainCtr
return nil
}
@@ -959,28 +940,30 @@ func addOutputArtifactsVolumes(pod *apiv1.Pod, tmpl *wfv1.Template) {
if tmpl.GetType() == wfv1.TemplateTypeResource || tmpl.GetType() == wfv1.TemplateTypeData {
return
}
- mainCtrIndex := -1
+
waitCtrIndex := -1
- var mainCtr *apiv1.Container
for i, ctr := range pod.Spec.Containers {
switch ctr.Name {
- case common.MainContainerName:
- mainCtrIndex = i
case common.WaitContainerName:
waitCtrIndex = i
}
}
- if mainCtrIndex == -1 || waitCtrIndex == -1 {
- panic("Could not find main or wait container in pod spec")
+ if waitCtrIndex == -1 {
+ log.Info("Could not find wait container in pod spec")
+ return
}
- mainCtr = &pod.Spec.Containers[mainCtrIndex]
waitCtr := &pod.Spec.Containers[waitCtrIndex]
- for _, mnt := range mainCtr.VolumeMounts {
- mnt.MountPath = filepath.Join(common.ExecutorMainFilesystemDir, mnt.MountPath)
- // ReadOnly is needed to be false for overlapping volume mounts
- mnt.ReadOnly = false
- waitCtr.VolumeMounts = append(waitCtr.VolumeMounts, mnt)
+ for _, c := range pod.Spec.Containers {
+ if c.Name != common.MainContainerName {
+ continue
+ }
+ for _, mnt := range c.VolumeMounts {
+ mnt.MountPath = filepath.Join(common.ExecutorMainFilesystemDir, mnt.MountPath)
+ // ReadOnly is needed to be false for overlapping volume mounts
+ mnt.ReadOnly = false
+ waitCtr.VolumeMounts = append(waitCtr.VolumeMounts, mnt)
+ }
}
pod.Spec.Containers[waitCtrIndex] = *waitCtr
}
@@ -1094,16 +1077,10 @@ func addScriptStagingVolume(pod *apiv1.Pod) {
// addInitContainers adds all init containers to the pod spec of the step
// Optionally volume mounts from the main container to the init containers
func addInitContainers(pod *apiv1.Pod, tmpl *wfv1.Template) {
- if len(tmpl.InitContainers) == 0 {
- return
- }
mainCtr := findMainContainer(pod)
- if mainCtr == nil {
- panic("Unable to locate main container")
- }
for _, ctr := range tmpl.InitContainers {
log.Debugf("Adding init container %s", ctr.Name)
- if ctr.MirrorVolumeMounts != nil && *ctr.MirrorVolumeMounts {
+ if mainCtr != nil && ctr.MirrorVolumeMounts != nil && *ctr.MirrorVolumeMounts {
mirrorVolumeMounts(mainCtr, &ctr.Container)
}
pod.Spec.InitContainers = append(pod.Spec.InitContainers, ctr.Container)
@@ -1113,16 +1090,10 @@ func addInitContainers(pod *apiv1.Pod, tmpl *wfv1.Template) {
// addSidecars adds all sidecars to the pod spec of the step.
// Optionally volume mounts from the main container to the sidecar
func addSidecars(pod *apiv1.Pod, tmpl *wfv1.Template) {
- if len(tmpl.Sidecars) == 0 {
- return
- }
mainCtr := findMainContainer(pod)
- if mainCtr == nil {
- panic("Unable to locate main container")
- }
for _, sidecar := range tmpl.Sidecars {
log.Debugf("Adding sidecar container %s", sidecar.Name)
- if sidecar.MirrorVolumeMounts != nil && *sidecar.MirrorVolumeMounts {
+ if mainCtr != nil && sidecar.MirrorVolumeMounts != nil && *sidecar.MirrorVolumeMounts {
mirrorVolumeMounts(mainCtr, &sidecar.Container)
}
pod.Spec.Containers = append(pod.Spec.Containers, sidecar.Container)
@@ -1256,15 +1227,12 @@ func createSecretVal(volMap map[string]apiv1.Volume, secret *apiv1.SecretKeySele
// findMainContainer finds main container
func findMainContainer(pod *apiv1.Pod) *apiv1.Container {
- var mainCtr *apiv1.Container
for _, ctr := range pod.Spec.Containers {
- if ctr.Name != common.MainContainerName {
- continue
+ if common.MainContainerName == ctr.Name {
+ return &ctr
}
- mainCtr = &ctr
- break
}
- return mainCtr
+ return nil
}
// mirrorVolumeMounts mirrors volumeMounts of source container to target container
diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go
index a30b916a200f..f69466d2cd61 100644
--- a/workflow/controller/workflowpod_test.go
+++ b/workflow/controller/workflowpod_test.go
@@ -139,7 +139,7 @@ func TestScriptTemplateWithoutVolumeOptionalArtifact(t *testing.T) {
mainCtr := tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
ctx := context.Background()
- pod, err := woc.createWorkflowPod(ctx, tmpl.Name, mainCtr, tmpl, &createWorkflowPodOpts{})
+ pod, err := woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{mainCtr}, tmpl, &createWorkflowPodOpts{})
assert.NoError(t, err)
// Note: pod.Spec.Containers[0] is wait
assert.Contains(t, pod.Spec.Containers[1].VolumeMounts, volumeMount)
@@ -154,7 +154,7 @@ func TestScriptTemplateWithoutVolumeOptionalArtifact(t *testing.T) {
woc = newWoc(*wf)
mainCtr = tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
- pod, err = woc.createWorkflowPod(ctx, tmpl.Name, mainCtr, tmpl, &createWorkflowPodOpts{includeScriptOutput: true})
+ pod, err = woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{mainCtr}, tmpl, &createWorkflowPodOpts{includeScriptOutput: true})
assert.NoError(t, err)
assert.NotContains(t, pod.Spec.Containers[1].VolumeMounts, volumeMount)
assert.Contains(t, pod.Spec.Containers[1].VolumeMounts, customVolumeMount)
@@ -566,20 +566,20 @@ func Test_createWorkflowPod_emissary(t *testing.T) {
t.Run("NoCommand", func(t *testing.T) {
woc := newWoc()
woc.controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary
- _, err := woc.createWorkflowPod(context.Background(), "", apiv1.Container{}, &wfv1.Template{}, &createWorkflowPodOpts{})
+ _, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{}}, &wfv1.Template{}, &createWorkflowPodOpts{})
assert.Error(t, err)
})
t.Run("CommandNoArgs", func(t *testing.T) {
woc := newWoc()
woc.controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary
- pod, err := woc.createWorkflowPod(context.Background(), "", apiv1.Container{Command: []string{"foo"}}, &wfv1.Template{}, &createWorkflowPodOpts{})
+ pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Command: []string{"foo"}}}, &wfv1.Template{}, &createWorkflowPodOpts{})
assert.NoError(t, err)
assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary", "--", "foo"}, pod.Spec.Containers[1].Command)
})
t.Run("NoCommandWithImageIndex", func(t *testing.T) {
woc := newWoc()
woc.controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary
- pod, err := woc.createWorkflowPod(context.Background(), "", apiv1.Container{Image: "my-image"}, &wfv1.Template{}, &createWorkflowPodOpts{})
+ pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Image: "my-image"}}, &wfv1.Template{}, &createWorkflowPodOpts{})
if assert.NoError(t, err) {
assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary", "--", "my-cmd"}, pod.Spec.Containers[1].Command)
assert.Equal(t, []string{"my-args"}, pod.Spec.Containers[1].Args)
@@ -588,7 +588,7 @@ func Test_createWorkflowPod_emissary(t *testing.T) {
t.Run("NoCommandWithArgsWithImageIndex", func(t *testing.T) {
woc := newWoc()
woc.controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary
- pod, err := woc.createWorkflowPod(context.Background(), "", apiv1.Container{Image: "my-image", Args: []string{"foo"}}, &wfv1.Template{}, &createWorkflowPodOpts{})
+ pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Image: "my-image", Args: []string{"foo"}}}, &wfv1.Template{}, &createWorkflowPodOpts{})
if assert.NoError(t, err) {
assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary", "--", "my-cmd"}, pod.Spec.Containers[1].Command)
assert.Equal(t, []string{"foo"}, pod.Spec.Containers[1].Args)
@@ -1189,19 +1189,19 @@ func TestPodSpecPatch(t *testing.T) {
ctx := context.Background()
woc := newWoc(*wf)
mainCtr := woc.execWf.Spec.Templates[0].Container
- pod, _ := woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
wf = unmarshalWF(helloWorldWfWithWFPatch)
woc = newWoc(*wf)
mainCtr = woc.execWf.Spec.Templates[0].Container
- pod, _ = woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
wf = unmarshalWF(helloWorldWfWithWFYAMLPatch)
woc = newWoc(*wf)
mainCtr = woc.execWf.Spec.Templates[0].Container
- pod, _ = woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
assert.Equal(t, "104857600", pod.Spec.Containers[1].Resources.Limits.Memory().AsDec().String())
@@ -1224,7 +1224,7 @@ func TestMainContainerCustomization(t *testing.T) {
ctx := context.Background()
woc.controller.Config.MainContainer = mainCtrSpec
mainCtr := woc.execWf.Spec.Templates[0].Container
- pod, _ := woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
// The main container's resources should be changed since the existing
@@ -1234,7 +1234,7 @@ func TestMainContainerCustomization(t *testing.T) {
woc.controller.Config.MainContainer = mainCtrSpec
mainCtr = woc.execWf.Spec.Templates[0].Container
mainCtr.Resources = apiv1.ResourceRequirements{Limits: apiv1.ResourceList{}}
- pod, _ = woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "0.200", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
// Workflow spec's main container takes precedence over config in controller
@@ -1250,7 +1250,7 @@ func TestMainContainerCustomization(t *testing.T) {
apiv1.ResourceMemory: resource.MustParse("512Mi"),
},
}
- pod, _ = woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "0.900", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
// If the name of the container in the workflow spec is not "main",
@@ -1271,7 +1271,7 @@ func TestMainContainerCustomization(t *testing.T) {
apiv1.ResourceMemory: resource.MustParse("512Mi"),
},
}
- pod, _ = woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "0.100", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
}
@@ -1331,7 +1331,7 @@ func TestHybridWfVolumesWindows(t *testing.T) {
ctx := context.Background()
mainCtr := woc.execWf.Spec.Templates[0].Container
- pod, _ := woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "\\\\.\\pipe\\docker_engine", pod.Spec.Containers[0].VolumeMounts[1].MountPath)
assert.Equal(t, false, pod.Spec.Containers[0].VolumeMounts[1].ReadOnly)
assert.Equal(t, (*apiv1.HostPathType)(nil), pod.Spec.Volumes[1].HostPath.Type)
@@ -1343,7 +1343,7 @@ func TestHybridWfVolumesLinux(t *testing.T) {
ctx := context.Background()
mainCtr := woc.execWf.Spec.Templates[0].Container
- pod, _ := woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "/var/run/docker.sock", pod.Spec.Containers[0].VolumeMounts[1].MountPath)
assert.Equal(t, true, pod.Spec.Containers[0].VolumeMounts[1].ReadOnly)
assert.Equal(t, &hostPathSocket, pod.Spec.Volumes[1].HostPath.Type)
@@ -1370,7 +1370,7 @@ func TestPropagateMaxDuration(t *testing.T) {
woc := newWoc()
deadline := time.Now()
ctx := context.Background()
- pod, err := woc.createWorkflowPod(ctx, tmpl.Name, *tmpl.Container, tmpl, &createWorkflowPodOpts{executionDeadline: deadline})
+ pod, err := woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{*tmpl.Container}, tmpl, &createWorkflowPodOpts{executionDeadline: deadline})
assert.NoError(t, err)
out, err := json.Marshal(map[string]time.Time{"deadline": deadline})
if assert.NoError(t, err) {
@@ -1430,14 +1430,14 @@ func TestPodMetadata(t *testing.T) {
ctx := context.Background()
woc := newWoc(*wf)
mainCtr := woc.execWf.Spec.Templates[0].Container
- pod, _ := woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "foo", pod.ObjectMeta.Annotations["workflow-level-pod-annotation"])
assert.Equal(t, "bar", pod.ObjectMeta.Labels["workflow-level-pod-label"])
wf = unmarshalWF(wfWithPodMetadataAndTemplateMetadata)
woc = newWoc(*wf)
mainCtr = woc.execWf.Spec.Templates[0].Container
- pod, _ = woc.createWorkflowPod(ctx, wf.Name, *mainCtr, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+ pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.Equal(t, "fizz", pod.ObjectMeta.Annotations["workflow-level-pod-annotation"])
assert.Equal(t, "buzz", pod.ObjectMeta.Labels["workflow-level-pod-label"])
assert.Equal(t, "hello", pod.ObjectMeta.Annotations["template-level-pod-annotation"])
diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go
index c5d1b488f890..1ffc53efe717 100644
--- a/workflow/executor/executor.go
+++ b/workflow/executor/executor.go
@@ -137,7 +137,6 @@ func (we *WorkflowExecutor) HandleError(ctx context.Context) {
// LoadArtifacts loads artifacts from location to a container path
func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
log.Infof("Start loading input artifacts...")
-
for _, art := range we.Template.Inputs.Artifacts {
log.Infof("Downloading artifact: %s", art.Name)
@@ -523,7 +522,7 @@ func (we *WorkflowExecutor) SaveParameters(ctx context.Context) error {
// SaveLogs saves logs
func (we *WorkflowExecutor) SaveLogs(ctx context.Context) (*wfv1.Artifact, error) {
- if !we.Template.ArchiveLocation.IsArchiveLogs() {
+ if !we.Template.ArchiveLocation.IsArchiveLogs() || !we.Template.HasLogs() {
return nil, nil
}
log.Infof("Saving logs")
@@ -677,8 +676,8 @@ func (we *WorkflowExecutor) CaptureScriptResult(ctx context.Context) error {
log.Infof("No Script output reference in workflow. Capturing script output ignored")
return nil
}
- if we.Template.Script == nil && we.Template.Container == nil {
- log.Infof("Template type is neither of Script or Container. Capturing script output ignored")
+ if !we.Template.HasOutput() {
+ log.Infof("Template type is neither of Script, Container, or Pod. Capturing script output ignored")
return nil
}
log.Infof("Capturing script output")
@@ -711,8 +710,8 @@ func (we *WorkflowExecutor) CaptureScriptResult(ctx context.Context) error {
// CaptureScriptExitCode will add the exit code of a script template as output exit code
func (we *WorkflowExecutor) CaptureScriptExitCode(ctx context.Context) error {
- if we.Template.Script == nil && we.Template.Container == nil {
- log.Infof("Template type is neither of Script or Container. Capturing exit code ignored")
+ if !we.Template.HasOutput() {
+ log.Infof("Template type is neither of Script, Container, or Pod. Capturing exit code ignored")
return nil
}
log.Infof("Capturing script exit code")
@@ -923,7 +922,7 @@ func chmod(artPath string, mode int32, recurse bool) error {
// Also monitors for updates in the pod annotations which may change (e.g. terminate)
// Upon completion, kills any sidecars after it finishes.
func (we *WorkflowExecutor) Wait(ctx context.Context) error {
- containerNames := []string{common.MainContainerName}
+ containerNames := we.Template.GetMainContainerNames()
annotationUpdatesCh := we.monitorAnnotations(ctx)
go we.monitorDeadline(ctx, containerNames, annotationUpdatesCh)
err := waitutil.Backoff(ExecutorRetry, func() (bool, error) {
@@ -1054,6 +1053,7 @@ func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, containerNames
if we.ExecutionControl != nil && we.ExecutionControl.Deadline != nil {
if time.Now().UTC().After(*we.ExecutionControl.Deadline) {
var message string
+
// Zero value of the deadline indicates an intentional cancel vs. a timeout. We treat
// timeouts as a failure and the pod should be annotated with that error
if we.ExecutionControl.Deadline.IsZero() {
diff --git a/workflow/util/util.go b/workflow/util/util.go
index 7d231ddb3f46..20fc0c24d22b 100644
--- a/workflow/util/util.go
+++ b/workflow/util/util.go
@@ -1044,7 +1044,7 @@ func GetNodeType(tmpl *wfv1.Template) wfv1.NodeType {
return wfv1.NodeTypeRetry
}
switch tmpl.GetType() {
- case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript, wfv1.TemplateTypeResource, wfv1.TemplateTypeData:
+ case wfv1.TemplateTypeContainer, wfv1.TemplateTypeContainerSet, wfv1.TemplateTypeScript, wfv1.TemplateTypeResource, wfv1.TemplateTypeData:
return wfv1.NodeTypePod
case wfv1.TemplateTypeDAG:
return wfv1.NodeTypeDAG
diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go
index ff269298402d..cc218afd9789 100644
--- a/workflow/validate/validate.go
+++ b/workflow/validate/validate.go
@@ -434,18 +434,18 @@ func (ctx *templateValidationCtx) validateTemplateHolder(tmplHolder wfv1.Templat
// validateTemplateType validates that only one template type is defined
func validateTemplateType(tmpl *wfv1.Template) error {
numTypes := 0
- for _, tmplType := range []interface{}{tmpl.Container, tmpl.Steps, tmpl.Script, tmpl.Resource, tmpl.DAG, tmpl.Suspend, tmpl.Data} {
+ for _, tmplType := range []interface{}{tmpl.Container, tmpl.ContainerSet, tmpl.Steps, tmpl.Script, tmpl.Resource, tmpl.DAG, tmpl.Suspend, tmpl.Data} {
if !reflect.ValueOf(tmplType).IsNil() {
numTypes++
}
}
switch numTypes {
case 0:
- return errors.Errorf(errors.CodeBadRequest, "templates.%s template type unspecified. choose one of: container, steps, script, resource, dag, suspend, template, template ref", tmpl.Name)
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s template type unspecified. choose one of: container, containerSet, steps, script, resource, dag, suspend, template, template ref", tmpl.Name)
case 1:
// Do nothing
default:
- return errors.Errorf(errors.CodeBadRequest, "templates.%s multiple template types specified. choose one of: container, steps, script, resource, dag, suspend, template, template ref", tmpl.Name)
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s multiple template types specified. choose one of: container, containerSet, steps, script, resource, dag, suspend, template, template ref", tmpl.Name)
}
return nil
}
@@ -800,7 +800,7 @@ func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix
if tmpl.Daemon != nil && *tmpl.Daemon {
scope[fmt.Sprintf("%s.ip", prefix)] = true
}
- if tmpl.Script != nil || tmpl.Container != nil || tmpl.Data != nil {
+ if tmpl.HasOutput() {
scope[fmt.Sprintf("%s.outputs.result", prefix)] = true
scope[fmt.Sprintf("%s.exitCode", prefix)] = true
}
@@ -834,7 +834,7 @@ func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix
switch tmpl.GetType() {
// Not that we don't also include TemplateTypeContainer here, even though it uses `outputs.result` it uses
// `outputs.parameters` as its aggregator.
- case wfv1.TemplateTypeScript:
+ case wfv1.TemplateTypeScript, wfv1.TemplateTypeContainerSet:
scope[fmt.Sprintf("%s.outputs.result", prefix)] = true
scope[fmt.Sprintf("%s.exitCode", prefix)] = true
default:
@@ -891,7 +891,7 @@ func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
if param.ValueFrom != nil {
tmplType := tmpl.GetType()
switch tmplType {
- case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript:
+ case wfv1.TemplateTypeContainer, wfv1.TemplateTypeContainerSet, wfv1.TemplateTypeScript:
if param.ValueFrom.Path == "" {
return errors.Errorf(errors.CodeBadRequest, "%s.path must be specified for %s templates", paramRef, tmplType)
}
@@ -928,18 +928,9 @@ func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template)
for _, out := range tmpl.Outputs.Artifacts {
if common.FindOverlappingVolume(tmpl, out.Path) == nil {
// output is in the base image layer. need to verify there are no volume mounts under it
- if tmpl.Container != nil {
- for _, volMnt := range tmpl.Container.VolumeMounts {
- if strings.HasPrefix(volMnt.MountPath, out.Path+"/") {
- return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts.%s: %s", tmpl.Name, out.Name, errMsg)
- }
- }
- }
- if tmpl.Script != nil {
- for _, volMnt := range tmpl.Script.VolumeMounts {
- if strings.HasPrefix(volMnt.MountPath, out.Path+"/") {
- return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts.%s: %s", tmpl.Name, out.Name, errMsg)
- }
+ for _, volMnt := range tmpl.GetVolumeMounts() {
+ if strings.HasPrefix(volMnt.MountPath, out.Path+"/") {
+ return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts.%s: %s", tmpl.Name, out.Name, errMsg)
}
}
}