Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Introduce WorkflowPhase #4856

Merged
merged 12 commits into from
Jan 21, 2021
2 changes: 1 addition & 1 deletion cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func printWorkflowHelper(wf *wfv1.Workflow, getArgs getFlags) string {
if !wf.Status.StartedAt.IsZero() {
out += fmt.Sprintf(fmtStr, "Duration:", humanize.RelativeDuration(wf.Status.StartedAt.Time, wf.Status.FinishedAt.Time))
}
if wf.Status.Phase == wfv1.NodeRunning {
if wf.Status.Phase == wfv1.WorkflowRunning {
if wf.Status.EstimatedDuration > 0 {
out += fmt.Sprintf(fmtStr, "EstimatedDuration:", humanize.Duration(wf.Status.EstimatedDuration.ToDuration()))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/argo/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func waitOnOne(serviceClient workflowpkg.WorkflowServiceClient, ctx context.Cont
if !quiet {
fmt.Printf("%s %s at %v\n", wfName, wf.Status.Phase, wf.Status.FinishedAt)
}
if wf.Status.Phase == wfv1.NodeFailed || wf.Status.Phase == wfv1.NodeError {
if wf.Status.Phase == wfv1.WorkflowFailed || wf.Status.Phase == wfv1.WorkflowError {
return false
}
return true
Expand Down
16 changes: 8 additions & 8 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ const archiveTableName = "argo_archived_workflows"
const archiveLabelsTableName = archiveTableName + "_labels"

type archivedWorkflowMetadata struct {
ClusterName string `db:"clustername"`
InstanceID string `db:"instanceid"`
UID string `db:"uid"`
Name string `db:"name"`
Namespace string `db:"namespace"`
Phase wfv1.NodePhase `db:"phase"`
StartedAt time.Time `db:"startedat"`
FinishedAt time.Time `db:"finishedat"`
ClusterName string `db:"clustername"`
InstanceID string `db:"instanceid"`
UID string `db:"uid"`
Name string `db:"name"`
Namespace string `db:"namespace"`
Phase wfv1.WorkflowPhase `db:"phase"`
StartedAt time.Time `db:"startedat"`
FinishedAt time.Time `db:"finishedat"`
}

type archivedWorkflowRecord struct {
Expand Down
949 changes: 475 additions & 474 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_phase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package v1alpha1

// the workflow's phase
type WorkflowPhase string

const (
WorkflowUnknown WorkflowPhase = ""
WorkflowPending WorkflowPhase = "Pending" // pending some set-up - rarely used
WorkflowRunning WorkflowPhase = "Running" // any node has started; pods might not be running yet, the workflow maybe suspended too
WorkflowSucceeded WorkflowPhase = "Succeeded"
WorkflowFailed WorkflowPhase = "Failed" // it maybe that the the workflow was terminated
WorkflowError WorkflowPhase = "Error"
)

func (p WorkflowPhase) Completed() bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be fulfilled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure either fulfilled or complete should apply to the workflows phase at all. The source of truth is the label. Phase and conditions are just copies of this information.

switch p {
case WorkflowSucceeded, WorkflowFailed, WorkflowError:
return true
default:
return false
}
}
16 changes: 16 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_phase_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestWorkflowPhase_Completed(t *testing.T) {
assert.False(t, WorkflowUnknown.Completed())
assert.False(t, WorkflowPending.Completed())
assert.False(t, WorkflowRunning.Completed())
assert.True(t, WorkflowSucceeded.Completed())
assert.True(t, WorkflowFailed.Completed())
assert.True(t, WorkflowError.Completed())
}
14 changes: 7 additions & 7 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"strings"
"time"

"github.com/argoproj/argo/util/slice"

apiv1 "k8s.io/api/core/v1"
policyv1beta "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/argoproj/argo/util/slice"
)

// TemplateType is the type of a template
Expand Down Expand Up @@ -1291,7 +1291,7 @@ type UserContainer struct {
// WorkflowStatus contains overall status information about a workflow
type WorkflowStatus struct {
// Phase a simple, high-level summary of where the workflow is in its lifecycle.
Phase NodePhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=NodePhase"`
Phase WorkflowPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=WorkflowPhase"`

// Time at which this workflow started
StartedAt metav1.Time `json:"startedAt,omitempty" protobuf:"bytes,2,opt,name=startedAt"`
Expand Down Expand Up @@ -1646,19 +1646,19 @@ func (phase NodePhase) FailedOrError() bool {
return phase == NodeFailed || phase == NodeError
}

// Fulfilled returns whether or not the workflow has fulfilled its execution, i.e. it completed execution or was skipped
// Fulfilled returns whether or not the workflow has fulfilled its execution
func (ws WorkflowStatus) Fulfilled() bool {
return ws.Phase.Fulfilled()
return ws.Phase.Completed()
}

// Successful return whether or not the workflow has succeeded
func (ws WorkflowStatus) Successful() bool {
return ws.Phase == NodeSucceeded
return ws.Phase == WorkflowSucceeded
}

// Failed return whether or not the workflow has failed
func (ws WorkflowStatus) Failed() bool {
return ws.Phase == NodeFailed
return ws.Phase == WorkflowFailed
}

func (ws WorkflowStatus) StartTime() *metav1.Time {
Expand Down
6 changes: 3 additions & 3 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (t testWatchWorkflowServer) Send(*workflowpkg.WorkflowWatchEvent) error {
func TestWatchWorkflows(t *testing.T) {
server, ctx := getWorkflowServer()
wf := &v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeSucceeded},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.WorkflowSucceeded},
}
assert.NoError(t, json.Unmarshal([]byte(wf1), &wf))
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -645,7 +645,7 @@ func TestWatchWorkflows(t *testing.T) {
func TestWatchLatestWorkflow(t *testing.T) {
server, ctx := getWorkflowServer()
wf := &v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeSucceeded},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.WorkflowSucceeded},
}
assert.NoError(t, json.Unmarshal([]byte(wf1), &wf))
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -818,7 +818,7 @@ func TestStopWorkflow(t *testing.T) {
wf, err = server.StopWorkflow(ctx, &rsmWfReq)
if assert.NoError(t, err) {
assert.NotNil(t, wf)
assert.Equal(t, v1alpha1.NodeRunning, wf.Status.Phase)
assert.Equal(t, v1alpha1.WorkflowRunning, wf.Status.Phase)
}
}

Expand Down
18 changes: 9 additions & 9 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (s *CLISuite) TestRoot() {
WaitForWorkflow(createdWorkflowName).
Then().
ExpectWorkflowName(createdWorkflowName, func(t *testing.T, metadata *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
})
})
}
Expand All @@ -449,7 +449,7 @@ func (s *CLISuite) TestWorkflowSuspendResume() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
})
}

Expand All @@ -476,11 +476,11 @@ func (s *CLISuite) TestNodeSuspendResume() {
}
}).
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) bool {
return wf.Status.Phase == wfv1.NodeFailed
return wf.Status.Phase == wfv1.WorkflowFailed
}), "suspended node").
Then().
ExpectWorkflow(func(t *testing.T, _ *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
if assert.Equal(t, wfv1.NodeFailed, status.Phase) {
if assert.Equal(t, wfv1.WorkflowFailed, status.Phase) {
r := regexp.MustCompile(`child '(node-suspend-[0-9]+)' failed`)
res := r.FindStringSubmatch(status.Message)
assert.Equal(t, len(res), 2)
Expand Down Expand Up @@ -762,7 +762,7 @@ func (s *CLISuite) TestWorkflowRetry() {
}).
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) bool {
retryTime = wf.Status.FinishedAt
return wf.Status.Phase == wfv1.NodeFailed
return wf.Status.Phase == wfv1.WorkflowFailed
}), "is terminated", 20*time.Second).
Wait(3*time.Second).
RunCli([]string{"retry", "retry-test", "--restart-successful", "--node-field-selector", "templateName==steps-inner"}, func(t *testing.T, output string, err error) {
Expand Down Expand Up @@ -1184,13 +1184,13 @@ func (s *CLISuite) TestWorkflowLevelSemaphore() {
}).
SubmitWorkflow().
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) bool {
return wf.Status.Phase == ""
return wf.Status.Phase == wfv1.WorkflowUnknown
}), "Workflow is waiting for lock").
WaitForWorkflow().
DeleteConfigMap("my-config").
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
})
}

Expand All @@ -1206,7 +1206,7 @@ func (s *CLISuite) TestTemplateLevelSemaphore() {
CreateConfigMap("my-config", semaphoreData).
SubmitWorkflow().
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) bool {
return wf.Status.Phase == wfv1.NodeRunning
return wf.Status.Phase == wfv1.WorkflowRunning
}), "waiting for Workflow to run", 10*time.Second).
RunCli([]string{"get", "semaphore-tmpl-level"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Waiting for")
Expand Down Expand Up @@ -1446,7 +1446,7 @@ spec:
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("release")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, "Hello, World!", nodeStatus.Inputs.Parameters[0].Value.String())
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/cluster_workflow_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *ClusterWorkflowTemplateSuite) TestSubmitClusterWorkflowTemplate() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.NodeSucceeded)
assert.Equal(t, status.Phase, v1alpha1.WorkflowSucceeded)
})
}

Expand Down Expand Up @@ -68,7 +68,7 @@ spec:
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
assert.Equal(t, v1alpha1.WorkflowSucceeded, status.Phase)
})

}
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/estimated_duration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *EstimatedDurationSuite) TestWorkflowTemplate() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.NotEmpty(t, status.EstimatedDuration)
assert.NotEmpty(t, status.Nodes[metadata.Name].EstimatedDuration)
})
Expand All @@ -45,7 +45,7 @@ func (s *EstimatedDurationSuite) TestClusterWorkflowTemplate() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.NotEmpty(t, status.EstimatedDuration)
assert.NotEmpty(t, status.Nodes[metadata.Name].EstimatedDuration)
})
Expand All @@ -62,7 +62,7 @@ func (s *EstimatedDurationSuite) TestCronWorkflow() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.NotEmpty(t, status.EstimatedDuration)
assert.NotEmpty(t, status.Nodes[metadata.Name].EstimatedDuration)
})
Expand Down
Loading