From 29f239c7bb13b036b8ab20d4b7defeb17a1f0eb7 Mon Sep 17 00:00:00 2001 From: IronPan Date: Sat, 10 Aug 2019 12:18:18 -0700 Subject: [PATCH 01/11] garbage collect the completed workflow --- .../apiserver/resource/resource_manager.go | 94 +++++++++++-------- backend/src/common/util/workflow.go | 34 ++++--- 2 files changed, 75 insertions(+), 53 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 69b386cdc28..1dc0ffd7232 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -501,50 +501,64 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error if jobId == "" { // If a run doesn't have owner UID, it's a one-time run created by Pipeline API server. // In this case the DB entry should already been created when argo workflow CRD is created. - return r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) - } - - // Get the experiment resource reference for job. - experimentRef, err := r.resourceReferenceStore.GetResourceReference(jobId, common.Job, common.Experiment) - if err != nil { - return util.Wrap(err, "Failed to retrieve the experiment ID for the job that created the run.") - } - runDetail := &model.RunDetail{ - Run: model.Run{ - UUID: runId, - DisplayName: workflow.Name, - Name: workflow.Name, - StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), - Namespace: workflow.Namespace, - CreatedAtInSec: workflow.CreationTimestamp.Unix(), - ScheduledAtInSec: workflow.ScheduledAtInSecOr0(), - FinishedAtInSec: workflow.FinishedAt(), - Conditions: workflow.Condition(), - PipelineSpec: model.PipelineSpec{ - WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), - }, - ResourceReferences: []*model.ResourceReference{ - { - ResourceUUID: runId, - ResourceType: common.Run, - ReferenceUUID: jobId, - ReferenceType: common.Job, - Relationship: common.Creator, + err := r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) + if err != nil { + return util.Wrap(err, "Failed to update the run.") + } + } else { + // Get the experiment resource reference for job. + experimentRef, err := r.resourceReferenceStore.GetResourceReference(jobId, common.Job, common.Experiment) + if err != nil { + return util.Wrap(err, "Failed to retrieve the experiment ID for the job that created the run.") + } + runDetail := &model.RunDetail{ + Run: model.Run{ + UUID: runId, + DisplayName: workflow.Name, + Name: workflow.Name, + StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), + Namespace: workflow.Namespace, + CreatedAtInSec: workflow.CreationTimestamp.Unix(), + ScheduledAtInSec: workflow.ScheduledAtInSecOr0(), + FinishedAtInSec: workflow.FinishedAt(), + Conditions: workflow.Condition(), + PipelineSpec: model.PipelineSpec{ + WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), }, - { - ResourceUUID: runId, - ResourceType: common.Run, - ReferenceUUID: experimentRef.ReferenceUUID, - ReferenceType: common.Experiment, - Relationship: common.Owner, + ResourceReferences: []*model.ResourceReference{ + { + ResourceUUID: runId, + ResourceType: common.Run, + ReferenceUUID: jobId, + ReferenceType: common.Job, + Relationship: common.Creator, + }, + { + ResourceUUID: runId, + ResourceType: common.Run, + ReferenceUUID: experimentRef.ReferenceUUID, + ReferenceType: common.Experiment, + Relationship: common.Owner, + }, }, }, - }, - PipelineRuntime: model.PipelineRuntime{ - WorkflowRuntimeManifest: workflow.ToStringForStore(), - }, + PipelineRuntime: model.PipelineRuntime{ + WorkflowRuntimeManifest: workflow.ToStringForStore(), + }, + } + err = r.runStore.CreateOrUpdateRun(runDetail) + if err != nil { + return util.Wrap(err, "Failed to create or update runs.") + } + } + + if workflow.IsInFinalState() { + err := r.workflowClient.Delete(workflow.Name, &v1.DeleteOptions{}) + if err != nil { + return util.NewInternalServerError(err, "Failed to delete the completed workflow for run %s", runId) + } } - return r.runStore.CreateOrUpdateRun(runDetail) + return nil } func (r *ResourceManager) ReportScheduledWorkflowResource(swf *util.ScheduledWorkflow) error { diff --git a/backend/src/common/util/workflow.go b/backend/src/common/util/workflow.go index 7edb048adf9..53b7cce870b 100644 --- a/backend/src/common/util/workflow.go +++ b/backend/src/common/util/workflow.go @@ -210,21 +210,21 @@ func (w *Workflow) ReplaceUID(id string) error { return NewInternalServerError(err, "Failed to unmarshal workflow spec manifest. Workflow: %s", w.ToStringForStore()) } - w.Workflow = workflow - return nil - } + w.Workflow = workflow + return nil +} - func (w *Workflow) SetCannonicalLabels(name string, nextScheduledEpoch int64, index int64) { - w.SetLabels(LabelKeyWorkflowScheduledWorkflowName, name) - w.SetLabels(LabelKeyWorkflowEpoch, FormatInt64ForLabel(nextScheduledEpoch)) - w.SetLabels(LabelKeyWorkflowIndex, FormatInt64ForLabel(index)) - w.SetLabels(LabelKeyWorkflowIsOwnedByScheduledWorkflow, "true") - } +func (w *Workflow) SetCannonicalLabels(name string, nextScheduledEpoch int64, index int64) { + w.SetLabels(LabelKeyWorkflowScheduledWorkflowName, name) + w.SetLabels(LabelKeyWorkflowEpoch, FormatInt64ForLabel(nextScheduledEpoch)) + w.SetLabels(LabelKeyWorkflowIndex, FormatInt64ForLabel(index)) + w.SetLabels(LabelKeyWorkflowIsOwnedByScheduledWorkflow, "true") +} - // FindObjectStoreArtifactKeyOrEmpty loops through all node running statuses and look up the first - // S3 artifact with the specified nodeID and artifactName. Returns empty if nothing is found. - func (w *Workflow) FindObjectStoreArtifactKeyOrEmpty(nodeID string, artifactName string) string { - if w.Status.Nodes == nil { +// FindObjectStoreArtifactKeyOrEmpty loops through all node running statuses and look up the first +// S3 artifact with the specified nodeID and artifactName. Returns empty if nothing is found. +func (w *Workflow) FindObjectStoreArtifactKeyOrEmpty(nodeID string, artifactName string) string { + if w.Status.Nodes == nil { return "" } node, found := w.Status.Nodes[nodeID] @@ -243,3 +243,11 @@ func (w *Workflow) ReplaceUID(id string) error { } return s3Key } + +// IsInFinalState whether the workflow is in a final state. +func (w *Workflow) IsInFinalState() bool { + if w.Status.Phase == workflowapi.NodeSucceeded || w.Status.Phase == workflowapi.NodeFailed { + return true + } + return false +} From 7aedf53a2a085025045e2435c6d9f7fb9d9ba549 Mon Sep 17 00:00:00 2001 From: IronPan Date: Sat, 10 Aug 2019 12:32:21 -0700 Subject: [PATCH 02/11] add tests --- .../apiserver/resource/resource_manager.go | 4 +- .../resource/resource_manager_test.go | 55 +++++++++++++++++++ .../src/apiserver/resource/workflow_fake.go | 6 +- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 1dc0ffd7232..aff930119aa 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -499,7 +499,7 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error jobId := workflow.ScheduledWorkflowUUIDAsStringOrEmpty() if jobId == "" { - // If a run doesn't have owner UID, it's a one-time run created by Pipeline API server. + // If a run doesn't have job ID, it's a one-time run created by Pipeline API server. // In this case the DB entry should already been created when argo workflow CRD is created. err := r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) if err != nil { @@ -548,7 +548,7 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error } err = r.runStore.CreateOrUpdateRun(runDetail) if err != nil { - return util.Wrap(err, "Failed to create or update runs.") + return util.Wrap(err, "Failed to create or update the run.") } } diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index e2ee5e40dc6..50ce1d298bf 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -1063,6 +1063,61 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success assert.Equal(t, expectedRunDetail, runDetail) } +func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) { + store, manager, run := initWithOneTimeRun(t) + defer store.Close() + // report workflow + workflow := util.NewWorkflow(&v1alpha1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + UID: types.UID(run.UUID), + }, + Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, + }) + err := manager.ReportWorkflowResource(workflow) + assert.Nil(t, err) + runDetail, err := manager.GetRun(run.UUID) + assert.Nil(t, err) + expectedRun := model.Run{ + UUID: "123e4567-e89b-12d3-a456-426655440000", + DisplayName: "run1", + Name: "workflow-name", + StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), + CreatedAtInSec: 2, + Conditions: "Failed", + PipelineSpec: model.PipelineSpec{ + WorkflowSpecManifest: testWorkflow.ToStringForStore(), + Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", + }, + ResourceReferences: []*model.ResourceReference{ + { + ResourceUUID: "123e4567-e89b-12d3-a456-426655440000", + ResourceType: common.Run, + ReferenceUUID: DefaultFakeUUID, + ReferenceType: common.Experiment, + Relationship: common.Owner, + }, + }, + } + assert.Equal(t, expectedRun, runDetail.Run) +} + +func TestReportWorkflowResource_WorkflowCompleted_DeleteWorkflowFailed(t *testing.T) { + store, manager, run := initWithOneTimeRun(t) + manager.workflowClient = &FakeBadWorkflowClient{} + defer store.Close() + // report workflow + workflow := util.NewWorkflow(&v1alpha1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + UID: types.UID(run.UUID), + }, + Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, + }) + err := manager.ReportWorkflowResource(workflow) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "failed to delete workflow") +} + + func TestReportScheduledWorkflowResource_Success(t *testing.T) { store, manager, job := initWithJob(t) defer store.Close() diff --git a/backend/src/apiserver/resource/workflow_fake.go b/backend/src/apiserver/resource/workflow_fake.go index 3c47b1f5ee8..988199ff792 100644 --- a/backend/src/apiserver/resource/workflow_fake.go +++ b/backend/src/apiserver/resource/workflow_fake.go @@ -83,7 +83,6 @@ func (c *FakeWorkflowClient) Update(workflow *v1alpha1.Workflow) (*v1alpha1.Work } func (c *FakeWorkflowClient) Delete(name string, options *v1.DeleteOptions) error { - glog.Error("This fake method is not yet implemented.") return nil } @@ -142,6 +141,11 @@ func (FakeBadWorkflowClient) Create(*v1alpha1.Workflow) (*v1alpha1.Workflow, err func (FakeBadWorkflowClient) Get(name string, options v1.GetOptions) (*v1alpha1.Workflow, error) { return nil, errors.New("some error") } + func (c *FakeBadWorkflowClient) Update(workflow *v1alpha1.Workflow) (*v1alpha1.Workflow, error) { return nil, errors.New("failed to update workflow") } + +func (c *FakeBadWorkflowClient) Delete(name string, options *v1.DeleteOptions) error { + return errors.New("failed to delete workflow") +} \ No newline at end of file From 312677c89508a405866d578e5683d8081b2d19ef Mon Sep 17 00:00:00 2001 From: IronPan Date: Sat, 10 Aug 2019 12:59:19 -0700 Subject: [PATCH 03/11] fix tests --- backend/src/apiserver/resource/resource_manager.go | 2 +- backend/src/apiserver/resource/resource_manager_test.go | 3 +++ backend/src/apiserver/server/report_server_test.go | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index aff930119aa..246abfff9df 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -493,7 +493,7 @@ func (r *ResourceManager) DeleteJob(jobID string) error { func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error { if _, ok := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId]; !ok { // Skip reporting if the workflow doesn't have the run id label - return nil + return util.NewInvalidInputError("Workflow missing the Run ID label") } runId := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId] jobId := workflow.ScheduledWorkflowUUIDAsStringOrEmpty() diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 50ce1d298bf..d76ce6a732a 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -899,6 +899,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) { workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeRunning}, }) @@ -1070,6 +1071,7 @@ func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) { workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, }) @@ -1109,6 +1111,7 @@ func TestReportWorkflowResource_WorkflowCompleted_DeleteWorkflowFailed(t *testin workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, }) diff --git a/backend/src/apiserver/server/report_server_test.go b/backend/src/apiserver/server/report_server_test.go index ae5029e4195..2faa6b80867 100644 --- a/backend/src/apiserver/server/report_server_test.go +++ b/backend/src/apiserver/server/report_server_test.go @@ -25,6 +25,7 @@ func TestReportWorkflow(t *testing.T) { Name: "run1", Namespace: "default", UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Spec: v1alpha1.WorkflowSpec{ Arguments: v1alpha1.Arguments{ From 6315152e89bb19925cfeb6666557687521038a3d Mon Sep 17 00:00:00 2001 From: IronPan Date: Sat, 10 Aug 2019 13:01:55 -0700 Subject: [PATCH 04/11] fix tests --- backend/src/apiserver/resource/resource_manager_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index d76ce6a732a..a37765c3450 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -1336,6 +1336,7 @@ func TestReadArtifact_WorkflowNoStatus_NotFound(t *testing.T) { Name: "MY_NAME", Namespace: "MY_NAMESPACE", UID: "run-1", + Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()), OwnerReferences: []v1.OwnerReference{{ APIVersion: "kubeflow.org/v1beta1", From 05661b49ec47fc46b645d6100a18fa78ab51f20f Mon Sep 17 00:00:00 2001 From: IronPan Date: Sun, 11 Aug 2019 00:43:25 -0700 Subject: [PATCH 05/11] update e2e test --- test/frontend-integration-test/helloworld.spec.js | 9 --------- 1 file changed, 9 deletions(-) diff --git a/test/frontend-integration-test/helloworld.spec.js b/test/frontend-integration-test/helloworld.spec.js index 238e48defcc..939c30eb80a 100644 --- a/test/frontend-integration-test/helloworld.spec.js +++ b/test/frontend-integration-test/helloworld.spec.js @@ -178,15 +178,6 @@ describe('deploy helloworld sample run', () => { $('button=Logs').waitForVisible(); }); - it('shows logs from node', () => { - $('button=Logs').click(); - $('#logViewer').waitForVisible(); - browser.waitUntil(() => { - const logs = $('#logViewer').getText(); - return logs.indexOf(outputParameterValue + ' from node: ') > -1; - }, waitTimeout); - }); - it('navigates back to the experiment list', () => { $('button=Experiments').click(); browser.waitUntil(() => { From 251e6d52ccb3c54ff052eee22835db8682b18488 Mon Sep 17 00:00:00 2001 From: IronPan Date: Mon, 12 Aug 2019 12:55:17 -0700 Subject: [PATCH 06/11] update logic --- .../persistence/worker/workflow_saver.go | 5 +- .../apiserver/resource/resource_manager.go | 100 ++++++++---------- .../resource/resource_manager_util.go | 2 + backend/src/common/util/consts.go | 4 + backend/src/common/util/workflow.go | 13 ++- .../helloworld.spec.js | 9 ++ 6 files changed, 74 insertions(+), 59 deletions(-) diff --git a/backend/src/agent/persistence/worker/workflow_saver.go b/backend/src/agent/persistence/worker/workflow_saver.go index df8211aaf13..abd61585d9b 100644 --- a/backend/src/agent/persistence/worker/workflow_saver.go +++ b/backend/src/agent/persistence/worker/workflow_saver.go @@ -53,7 +53,10 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch "Workflow (%s): transient failure: %v", key, err) } - + if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < 0 { + log.Infof("Skip syncing Workflow (%v): workflow marked as persisted.", name) + return nil + } // Save this Workflow to the database. err = s.pipelineClient.ReportWorkflow(wf) retry := util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 246abfff9df..df780812ee6 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -498,67 +498,59 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error runId := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId] jobId := workflow.ScheduledWorkflowUUIDAsStringOrEmpty() + if workflow.PersistedFinalState() { + err := r.workflowClient.Delete(workflow.Name, &v1.DeleteOptions{}) + if err != nil { + return util.NewInternalServerError(err, "Failed to delete the completed workflow for run %s", runId) + } + } + if jobId == "" { // If a run doesn't have job ID, it's a one-time run created by Pipeline API server. // In this case the DB entry should already been created when argo workflow CRD is created. - err := r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) - if err != nil { - return util.Wrap(err, "Failed to update the run.") - } - } else { - // Get the experiment resource reference for job. - experimentRef, err := r.resourceReferenceStore.GetResourceReference(jobId, common.Job, common.Experiment) - if err != nil { - return util.Wrap(err, "Failed to retrieve the experiment ID for the job that created the run.") - } - runDetail := &model.RunDetail{ - Run: model.Run{ - UUID: runId, - DisplayName: workflow.Name, - Name: workflow.Name, - StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), - Namespace: workflow.Namespace, - CreatedAtInSec: workflow.CreationTimestamp.Unix(), - ScheduledAtInSec: workflow.ScheduledAtInSecOr0(), - FinishedAtInSec: workflow.FinishedAt(), - Conditions: workflow.Condition(), - PipelineSpec: model.PipelineSpec{ - WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), + return r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) + } + // Get the experiment resource reference for job. + experimentRef, err := r.resourceReferenceStore.GetResourceReference(jobId, common.Job, common.Experiment) + if err != nil { + return util.Wrap(err, "Failed to retrieve the experiment ID for the job that created the run.") + } + runDetail := &model.RunDetail{ + Run: model.Run{ + UUID: runId, + DisplayName: workflow.Name, + Name: workflow.Name, + StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), + Namespace: workflow.Namespace, + CreatedAtInSec: workflow.CreationTimestamp.Unix(), + ScheduledAtInSec: workflow.ScheduledAtInSecOr0(), + FinishedAtInSec: workflow.FinishedAt(), + Conditions: workflow.Condition(), + PipelineSpec: model.PipelineSpec{ + WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), + }, + ResourceReferences: []*model.ResourceReference{ + { + ResourceUUID: runId, + ResourceType: common.Run, + ReferenceUUID: jobId, + ReferenceType: common.Job, + Relationship: common.Creator, }, - ResourceReferences: []*model.ResourceReference{ - { - ResourceUUID: runId, - ResourceType: common.Run, - ReferenceUUID: jobId, - ReferenceType: common.Job, - Relationship: common.Creator, - }, - { - ResourceUUID: runId, - ResourceType: common.Run, - ReferenceUUID: experimentRef.ReferenceUUID, - ReferenceType: common.Experiment, - Relationship: common.Owner, - }, + { + ResourceUUID: runId, + ResourceType: common.Run, + ReferenceUUID: experimentRef.ReferenceUUID, + ReferenceType: common.Experiment, + Relationship: common.Owner, }, }, - PipelineRuntime: model.PipelineRuntime{ - WorkflowRuntimeManifest: workflow.ToStringForStore(), - }, - } - err = r.runStore.CreateOrUpdateRun(runDetail) - if err != nil { - return util.Wrap(err, "Failed to create or update the run.") - } - } - - if workflow.IsInFinalState() { - err := r.workflowClient.Delete(workflow.Name, &v1.DeleteOptions{}) - if err != nil { - return util.NewInternalServerError(err, "Failed to delete the completed workflow for run %s", runId) - } + }, + PipelineRuntime: model.PipelineRuntime{ + WorkflowRuntimeManifest: workflow.ToStringForStore(), + }, } - return nil + return r.runStore.CreateOrUpdateRun(runDetail) } func (r *ResourceManager) ReportScheduledWorkflowResource(swf *util.ScheduledWorkflow) error { diff --git a/backend/src/apiserver/resource/resource_manager_util.go b/backend/src/apiserver/resource/resource_manager_util.go index 924b5b6f8bf..da87816efac 100644 --- a/backend/src/apiserver/resource/resource_manager_util.go +++ b/backend/src/apiserver/resource/resource_manager_util.go @@ -126,6 +126,8 @@ func formulateRetryWorkflow(wf *util.Workflow) (*util.Workflow, []string, error) newWF := wf.DeepCopy() // Delete/reset fields which indicate workflow completed delete(newWF.Labels, common.LabelKeyCompleted) + // Delete/reset fields which indicate workflow is finished being persisted to the database + delete(newWF.Labels, util.LabelKeyWorkflowPersistedFinalState) newWF.ObjectMeta.Labels[common.LabelKeyPhase] = string(wfv1.NodeRunning) newWF.Status.Phase = wfv1.NodeRunning newWF.Status.Message = "" diff --git a/backend/src/common/util/consts.go b/backend/src/common/util/consts.go index a3ec99cce2f..e51dc4fa94b 100644 --- a/backend/src/common/util/consts.go +++ b/backend/src/common/util/consts.go @@ -39,4 +39,8 @@ const ( // LabelKeyWorkflowScheduledWorkflowName is a label on a Workflow. // It captures whether the name of the owning ScheduledWorkflow. LabelKeyWorkflowScheduledWorkflowName = constants.FullName + "/scheduledWorkflowName" + + + LabelKeyWorkflowRunId = "pipeline/runid" + LabelKeyWorkflowPersistedFinalState = "pipeline/persistedFinalState" ) diff --git a/backend/src/common/util/workflow.go b/backend/src/common/util/workflow.go index 53b7cce870b..63d62f1bf92 100644 --- a/backend/src/common/util/workflow.go +++ b/backend/src/common/util/workflow.go @@ -25,10 +25,6 @@ import ( "strings" ) -const ( - LabelKeyWorkflowRunId = "pipeline/runid" -) - // Workflow is a type to help manipulate Workflow objects. type Workflow struct { *workflowapi.Workflow @@ -251,3 +247,12 @@ func (w *Workflow) IsInFinalState() bool { } return false } + +// PersistedFinalState whether the workflow final state has being persisted. +func (w *Workflow) PersistedFinalState() bool { + if _, ok :=w.GetLabels()[LabelKeyWorkflowPersistedFinalState]; ok { + // If the label exist, workflow final state has being persisted. + return true + } + return false +} \ No newline at end of file diff --git a/test/frontend-integration-test/helloworld.spec.js b/test/frontend-integration-test/helloworld.spec.js index 939c30eb80a..238e48defcc 100644 --- a/test/frontend-integration-test/helloworld.spec.js +++ b/test/frontend-integration-test/helloworld.spec.js @@ -178,6 +178,15 @@ describe('deploy helloworld sample run', () => { $('button=Logs').waitForVisible(); }); + it('shows logs from node', () => { + $('button=Logs').click(); + $('#logViewer').waitForVisible(); + browser.waitUntil(() => { + const logs = $('#logViewer').getText(); + return logs.indexOf(outputParameterValue + ' from node: ') > -1; + }, waitTimeout); + }); + it('navigates back to the experiment list', () => { $('button=Experiments').click(); browser.waitUntil(() => { From 424ca8c737188a5378f0133f86da8590d81f1777 Mon Sep 17 00:00:00 2001 From: IronPan Date: Mon, 12 Aug 2019 13:19:30 -0700 Subject: [PATCH 07/11] update logic --- backend/Dockerfile.persistenceagent | 3 +- backend/src/agent/persistence/main.go | 41 +++--- .../agent/persistence/persistence_agent.go | 10 +- .../persistence/worker/workflow_saver.go | 21 ++-- .../apiserver/resource/resource_manager.go | 117 ++++++++++++------ 5 files changed, 121 insertions(+), 71 deletions(-) diff --git a/backend/Dockerfile.persistenceagent b/backend/Dockerfile.persistenceagent index a57c7435c9f..0645f65cca5 100644 --- a/backend/Dockerfile.persistenceagent +++ b/backend/Dockerfile.persistenceagent @@ -16,5 +16,6 @@ COPY --from=builder /bin/persistence_agent /bin/persistence_agent COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.txt /bin/license.txt ENV NAMESPACE "" +ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH "" -CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} +CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH} diff --git a/backend/src/agent/persistence/main.go b/backend/src/agent/persistence/main.go index 91b2b0ecfc5..b30e921b7e8 100644 --- a/backend/src/agent/persistence/main.go +++ b/backend/src/agent/persistence/main.go @@ -31,28 +31,30 @@ import ( ) var ( - masterURL string - kubeconfig string - initializeTimeout time.Duration - timeout time.Duration - mlPipelineAPIServerName string - mlPipelineAPIServerPort string - mlPipelineAPIServerBasePath string - mlPipelineServiceHttpPort string - mlPipelineServiceGRPCPort string - namespace string + masterURL string + kubeconfig string + initializeTimeout time.Duration + timeout time.Duration + mlPipelineAPIServerName string + mlPipelineAPIServerPort string + mlPipelineAPIServerBasePath string + mlPipelineServiceHttpPort string + mlPipelineServiceGRPCPort string + namespace string + ttlSecondsAfterWorkflowFinish int64 ) const ( - kubeconfigFlagName = "kubeconfig" - masterFlagName = "master" - initializationTimeoutFlagName = "initializeTimeout" - timeoutFlagName = "timeout" - mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath" - mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName" - mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort" - mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort" - namespaceFlagName = "namespace" + kubeconfigFlagName = "kubeconfig" + masterFlagName = "master" + initializationTimeoutFlagName = "initializeTimeout" + timeoutFlagName = "timeout" + mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath" + mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName" + mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort" + mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort" + namespaceFlagName = "namespace" + ttlSecondsAfterWorkflowFinishFlagName = "ttlSecondsAfterWorkflowFinish" ) func main() { @@ -122,4 +124,5 @@ func init() { flag.StringVar(&mlPipelineAPIServerBasePath, mlPipelineAPIServerBasePathFlagName, "/apis/v1beta1", "The base path for the ML pipeline API server.") flag.StringVar(&namespace, namespaceFlagName, "", "The namespace name used for Kubernetes informers to obtain the listers.") + flag.Int64Var(&ttlSecondsAfterWorkflowFinish, ttlSecondsAfterWorkflowFinishFlagName, 3600, "The TTL for Argo workflow to persist after workflow finish.") } diff --git a/backend/src/agent/persistence/persistence_agent.go b/backend/src/agent/persistence/persistence_agent.go index af58aa58fe9..bf530e236de 100644 --- a/backend/src/agent/persistence/persistence_agent.go +++ b/backend/src/agent/persistence/persistence_agent.go @@ -44,10 +44,10 @@ type PersistenceAgent struct { // NewPersistenceAgent returns a new persistence agent. func NewPersistenceAgent( - swfInformerFactory swfinformers.SharedInformerFactory, - workflowInformerFactory workflowinformers.SharedInformerFactory, - pipelineClient *client.PipelineClient, - time util.TimeInterface) *PersistenceAgent { + swfInformerFactory swfinformers.SharedInformerFactory, + workflowInformerFactory workflowinformers.SharedInformerFactory, + pipelineClient *client.PipelineClient, + time util.TimeInterface) *PersistenceAgent { // obtain references to shared informers swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows() workflowInformer := workflowInformerFactory.Argoproj().V1alpha1().Workflows() @@ -64,7 +64,7 @@ func NewPersistenceAgent( workflowWorker := worker.NewPersistenceWorker(time, workflowregister.Kind, workflowInformer.Informer(), true, - worker.NewWorkflowSaver(workflowClient, pipelineClient)) + worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish)) agent := &PersistenceAgent{ swfClient: swfClient, diff --git a/backend/src/agent/persistence/worker/workflow_saver.go b/backend/src/agent/persistence/worker/workflow_saver.go index abd61585d9b..1b9fbdd3a65 100644 --- a/backend/src/agent/persistence/worker/workflow_saver.go +++ b/backend/src/agent/persistence/worker/workflow_saver.go @@ -19,21 +19,24 @@ import ( "github.com/kubeflow/pipelines/backend/src/common/util" log "github.com/sirupsen/logrus" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "time" ) // WorkflowSaver provides a function to persist a workflow to a database. type WorkflowSaver struct { - client client.WorkflowClientInterface - pipelineClient client.PipelineClientInterface - metricsReporter *MetricsReporter + client client.WorkflowClientInterface + pipelineClient client.PipelineClientInterface + metricsReporter *MetricsReporter + ttlSecondsAfterWorkflowFinish int64 } func NewWorkflowSaver(client client.WorkflowClientInterface, - pipelineClient client.PipelineClientInterface) *WorkflowSaver { + pipelineClient client.PipelineClientInterface, ttlSecondsAfterWorkflowFinish int64) *WorkflowSaver { return &WorkflowSaver{ - client: client, - pipelineClient: pipelineClient, - metricsReporter: NewMetricsReporter(pipelineClient), + client: client, + pipelineClient: pipelineClient, + metricsReporter: NewMetricsReporter(pipelineClient), + ttlSecondsAfterWorkflowFinish: ttlSecondsAfterWorkflowFinish, } } @@ -53,7 +56,9 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch "Workflow (%s): transient failure: %v", key, err) } - if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < 0 { + if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < s.ttlSecondsAfterWorkflowFinish { + // Skip persisting the workflow if the workflow is finished + // and the workflow hasn't being passing the TTL log.Infof("Skip syncing Workflow (%v): workflow marked as persisted.", name) return nil } diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index df780812ee6..c7652a65843 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -499,6 +499,7 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error jobId := workflow.ScheduledWorkflowUUIDAsStringOrEmpty() if workflow.PersistedFinalState() { + // If workflow's final state has being persisted, the workflow should be garbage collected. err := r.workflowClient.Delete(workflow.Name, &v1.DeleteOptions{}) if err != nil { return util.NewInternalServerError(err, "Failed to delete the completed workflow for run %s", runId) @@ -508,49 +509,89 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error if jobId == "" { // If a run doesn't have job ID, it's a one-time run created by Pipeline API server. // In this case the DB entry should already been created when argo workflow CRD is created. - return r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) - } - // Get the experiment resource reference for job. - experimentRef, err := r.resourceReferenceStore.GetResourceReference(jobId, common.Job, common.Experiment) - if err != nil { - return util.Wrap(err, "Failed to retrieve the experiment ID for the job that created the run.") - } - runDetail := &model.RunDetail{ - Run: model.Run{ - UUID: runId, - DisplayName: workflow.Name, - Name: workflow.Name, - StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), - Namespace: workflow.Namespace, - CreatedAtInSec: workflow.CreationTimestamp.Unix(), - ScheduledAtInSec: workflow.ScheduledAtInSecOr0(), - FinishedAtInSec: workflow.FinishedAt(), - Conditions: workflow.Condition(), - PipelineSpec: model.PipelineSpec{ - WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), - }, - ResourceReferences: []*model.ResourceReference{ - { - ResourceUUID: runId, - ResourceType: common.Run, - ReferenceUUID: jobId, - ReferenceType: common.Job, - Relationship: common.Creator, + err := r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) + if err != nil { + return util.Wrap(err, "Failed to update the run.") + } + } else { + // Get the experiment resource reference for job. + experimentRef, err := r.resourceReferenceStore.GetResourceReference(jobId, common.Job, common.Experiment) + if err != nil { + return util.Wrap(err, "Failed to retrieve the experiment ID for the job that created the run.") + } + runDetail := &model.RunDetail{ + Run: model.Run{ + UUID: runId, + DisplayName: workflow.Name, + Name: workflow.Name, + StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), + Namespace: workflow.Namespace, + CreatedAtInSec: workflow.CreationTimestamp.Unix(), + ScheduledAtInSec: workflow.ScheduledAtInSecOr0(), + FinishedAtInSec: workflow.FinishedAt(), + Conditions: workflow.Condition(), + PipelineSpec: model.PipelineSpec{ + WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), }, - { - ResourceUUID: runId, - ResourceType: common.Run, - ReferenceUUID: experimentRef.ReferenceUUID, - ReferenceType: common.Experiment, - Relationship: common.Owner, + ResourceReferences: []*model.ResourceReference{ + { + ResourceUUID: runId, + ResourceType: common.Run, + ReferenceUUID: jobId, + ReferenceType: common.Job, + Relationship: common.Creator, + }, + { + ResourceUUID: runId, + ResourceType: common.Run, + ReferenceUUID: experimentRef.ReferenceUUID, + ReferenceType: common.Experiment, + Relationship: common.Owner, + }, }, }, + PipelineRuntime: model.PipelineRuntime{ + WorkflowRuntimeManifest: workflow.ToStringForStore(), + }, + } + err = r.runStore.CreateOrUpdateRun(runDetail) + if err != nil { + return util.Wrap(err, "Failed to create or update the run.") + } + } + + if workflow.IsInFinalState() { + err := AddWorkflowLabel(r.workflowClient, workflow.Name, util.LabelKeyWorkflowPersistedFinalState, "true") + if err != nil { + return util.Wrap(err, "Failed to add PersistedFinalState label to workflow") + } + } + + return nil +} + +// AddWorkflowLabel add label for a workflow +func AddWorkflowLabel(wfClient workflowclient.WorkflowInterface, name string, labelKey string, labelValue string) error { + patchObj := map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": map[string]interface{}{ + labelKey: labelValue, + }, }, - PipelineRuntime: model.PipelineRuntime{ - WorkflowRuntimeManifest: workflow.ToStringForStore(), - }, } - return r.runStore.CreateOrUpdateRun(runDetail) + + patch, err := json.Marshal(patchObj) + if err != nil { + return util.NewInternalServerError(err, "Unexpected error while marshalling a patch object.") + } + + var operation = func() error { + _, err = wfClient.Patch(name, types.MergePatchType, patch) + return err + } + var backoffPolicy = backoff.WithMaxRetries(backoff.NewConstantBackOff(100), 10) + err = backoff.Retry(operation, backoffPolicy) + return err } func (r *ResourceManager) ReportScheduledWorkflowResource(swf *util.ScheduledWorkflow) error { From 88e0d8b8ee31776752bb2b94c140a16b86b3877e Mon Sep 17 00:00:00 2001 From: IronPan Date: Mon, 12 Aug 2019 14:43:32 -0700 Subject: [PATCH 08/11] update logic --- .../worker/persistence_worker_test.go | 10 +-- .../persistence/worker/workflow_saver_test.go | 80 +++++++++++++++---- .../resource/resource_manager_test.go | 59 +++++++------- .../src/apiserver/resource/workflow_fake.go | 33 +++++--- 4 files changed, 121 insertions(+), 61 deletions(-) diff --git a/backend/src/agent/persistence/worker/persistence_worker_test.go b/backend/src/agent/persistence/worker/persistence_worker_test.go index 6bfaf11761a..15ccb12047d 100644 --- a/backend/src/agent/persistence/worker/persistence_worker_test.go +++ b/backend/src/agent/persistence/worker/persistence_worker_test.go @@ -54,7 +54,7 @@ func TestPersistenceWorker_Success(t *testing.T) { pipelineClient := client.NewPipelineClientFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -84,7 +84,7 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) { pipelineClient := client.NewPipelineClientFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -115,7 +115,7 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) { pipelineClient := client.NewPipelineClientFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -148,7 +148,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) { "My Retriable Error")) // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -181,7 +181,7 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) { "My Permanent Error")) // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), diff --git a/backend/src/agent/persistence/worker/workflow_saver_test.go b/backend/src/agent/persistence/worker/workflow_saver_test.go index 4f7360e5adf..99fe6139b31 100644 --- a/backend/src/agent/persistence/worker/workflow_saver_test.go +++ b/backend/src/agent/persistence/worker/workflow_saver_test.go @@ -16,6 +16,7 @@ package worker import ( "fmt" + "github.com/kubeflow/pipelines/bazel-pipelines/external/go_sdk/src/time" "testing" workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -39,9 +40,7 @@ func TestWorkflow_Save_Success(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -53,9 +52,7 @@ func TestWorkflow_Save_NotFoundDuringGet(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -70,9 +67,7 @@ func TestWorkflow_Save_ErrorDuringGet(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", nil) - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -97,9 +92,7 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -124,9 +117,7 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -134,3 +125,62 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) { assert.NotNil(t, err) assert.Contains(t, err.Error(), "transient failure") } + +func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) { + workflowFake := client.NewWorkflowClientFake() + pipelineFake := client.NewPipelineClientFake() + + // Add this will result in failure unless reporting is skipped + pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, + "My Permanent Error")) + + workflow := util.NewWorkflow(&workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "MY_NAMESPACE", + Name: "MY_NAME", + Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"}, + }, + }) + + workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) + + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) + + err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) + + assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)) + assert.Equal(t, nil, err) +} + +func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) { + workflowFake := client.NewWorkflowClientFake() + pipelineFake := client.NewPipelineClientFake() + + // Add this will result in failure unless reporting is skipped + pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, + "My Permanent Error")) + + workflow := util.NewWorkflow(&workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "MY_NAMESPACE", + Name: "MY_NAME", + Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"}, + }, + Status: workflowapi.WorkflowStatus{ + FinishedAt: metav1.Now(), + }, + }) + + workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) + + saver := NewWorkflowSaver(workflowFake, pipelineFake, 1) + + // Sleep 2 seconds to make sure workflow passed TTL + time.Sleep(2 * time.Second) + + err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) + + assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "permanent failure") +} diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index a37765c3450..e3958964777 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -898,8 +898,8 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) { // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ - UID: types.UID(run.UUID), - Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeRunning}, }) @@ -1070,48 +1070,46 @@ func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) { // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ - UID: types.UID(run.UUID), - Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, }) err := manager.ReportWorkflowResource(workflow) assert.Nil(t, err) - runDetail, err := manager.GetRun(run.UUID) + + actualRunDetail, err := manager.GetRun(run.UUID) assert.Nil(t, err) - expectedRun := model.Run{ - UUID: "123e4567-e89b-12d3-a456-426655440000", - DisplayName: "run1", - Name: "workflow-name", - StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), - CreatedAtInSec: 2, - Conditions: "Failed", - PipelineSpec: model.PipelineSpec{ - WorkflowSpecManifest: testWorkflow.ToStringForStore(), - Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", - }, - ResourceReferences: []*model.ResourceReference{ - { - ResourceUUID: "123e4567-e89b-12d3-a456-426655440000", - ResourceType: common.Run, - ReferenceUUID: DefaultFakeUUID, - ReferenceType: common.Experiment, - Relationship: common.Owner, - }, + + wf, err := store.workflowClientFake.Get(actualRunDetail.Run.Name, v1.GetOptions{}) + assert.Nil(t, err) + assert.Equal(t, wf.Labels[util.LabelKeyWorkflowPersistedFinalState], "true") +} + +func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted(t *testing.T) { + store, manager, run := initWithOneTimeRun(t) + defer store.Close() + // report workflow + workflow := util.NewWorkflow(&v1alpha1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, }, - } - assert.Equal(t, expectedRun, runDetail.Run) + Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, + }) + err := manager.ReportWorkflowResource(workflow) + assert.Nil(t, err) } -func TestReportWorkflowResource_WorkflowCompleted_DeleteWorkflowFailed(t *testing.T) { +func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted_DeleteFailed(t *testing.T) { store, manager, run := initWithOneTimeRun(t) manager.workflowClient = &FakeBadWorkflowClient{} defer store.Close() // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ - UID: types.UID(run.UUID), - Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, }) @@ -1120,7 +1118,6 @@ func TestReportWorkflowResource_WorkflowCompleted_DeleteWorkflowFailed(t *testin assert.Contains(t, err.Error(), "failed to delete workflow") } - func TestReportScheduledWorkflowResource_Success(t *testing.T) { store, manager, job := initWithJob(t) defer store.Close() @@ -1336,7 +1333,7 @@ func TestReadArtifact_WorkflowNoStatus_NotFound(t *testing.T) { Name: "MY_NAME", Namespace: "MY_NAMESPACE", UID: "run-1", - Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, + Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()), OwnerReferences: []v1.OwnerReference{{ APIVersion: "kubeflow.org/v1beta1", diff --git a/backend/src/apiserver/resource/workflow_fake.go b/backend/src/apiserver/resource/workflow_fake.go index 988199ff792..4099525399b 100644 --- a/backend/src/apiserver/resource/workflow_fake.go +++ b/backend/src/apiserver/resource/workflow_fake.go @@ -16,6 +16,7 @@ package resource import ( "encoding/json" + "github.com/kubeflow/pipelines/backend/src/common/util" "strconv" "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -87,32 +88,44 @@ func (c *FakeWorkflowClient) Delete(name string, options *v1.DeleteOptions) erro } func (c *FakeWorkflowClient) DeleteCollection(options *v1.DeleteOptions, - listOptions v1.ListOptions) error { + listOptions v1.ListOptions) error { glog.Error("This fake method is not yet implemented.") return nil } func (c *FakeWorkflowClient) Patch(name string, pt types.PatchType, data []byte, - subresources ...string) (*v1alpha1.Workflow, error) { + subresources ...string) (*v1alpha1.Workflow, error) { var dat map[string]interface{} json.Unmarshal(data, &dat) // TODO: Should we actually assert the type here, or just panic if it's wrong? - spec := dat["spec"].(map[string]interface{}) - activeDeadlineSeconds := spec["activeDeadlineSeconds"].(float64) + if _, ok := dat["spec"]; ok { + spec := dat["spec"].(map[string]interface{}) + activeDeadlineSeconds := spec["activeDeadlineSeconds"].(float64) + + // Simulate terminating a workflow + if pt == types.MergePatchType && activeDeadlineSeconds == 0 { + workflow, ok := c.workflows[name] + if ok { + newActiveDeadlineSeconds := int64(0) + workflow.Spec.ActiveDeadlineSeconds = &newActiveDeadlineSeconds + return workflow, nil + } + } + } - // Simulate terminating a workflow - if pt == types.MergePatchType && activeDeadlineSeconds == 0 { + if _, ok := dat["metadata"]; ok { workflow, ok := c.workflows[name] if ok { - newActiveDeadlineSeconds := int64(0) - workflow.Spec.ActiveDeadlineSeconds = &newActiveDeadlineSeconds + if workflow.Labels == nil { + workflow.Labels = map[string]string{} + } + workflow.Labels[util.LabelKeyWorkflowPersistedFinalState] = "true" return workflow, nil } } - return nil, errors.New("Failed to patch worfklow") } @@ -148,4 +161,4 @@ func (c *FakeBadWorkflowClient) Update(workflow *v1alpha1.Workflow) (*v1alpha1.W func (c *FakeBadWorkflowClient) Delete(name string, options *v1.DeleteOptions) error { return errors.New("failed to delete workflow") -} \ No newline at end of file +} From 512de51d39d8b6d67be8edec1fd8ce2229db474c Mon Sep 17 00:00:00 2001 From: IronPan Date: Mon, 12 Aug 2019 15:16:10 -0700 Subject: [PATCH 09/11] fix tests --- .../src/agent/persistence/worker/workflow_saver_test.go | 5 ++++- backend/src/apiserver/resource/resource_manager_test.go | 8 ++++---- backend/src/apiserver/resource/workflow_fake.go | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/backend/src/agent/persistence/worker/workflow_saver_test.go b/backend/src/agent/persistence/worker/workflow_saver_test.go index 99fe6139b31..57aa87c9149 100644 --- a/backend/src/agent/persistence/worker/workflow_saver_test.go +++ b/backend/src/agent/persistence/worker/workflow_saver_test.go @@ -16,8 +16,8 @@ package worker import ( "fmt" - "github.com/kubeflow/pipelines/bazel-pipelines/external/go_sdk/src/time" "testing" + "time" workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/kubeflow/pipelines/backend/src/agent/persistence/client" @@ -140,6 +140,9 @@ func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) { Name: "MY_NAME", Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"}, }, + Status: workflowapi.WorkflowStatus{ + FinishedAt: metav1.Now(), + }, }) workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index e3958964777..f89561564c8 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -1070,6 +1070,7 @@ func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) { // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ + Name: run.Name, UID: types.UID(run.UUID), Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, @@ -1078,10 +1079,7 @@ func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) { err := manager.ReportWorkflowResource(workflow) assert.Nil(t, err) - actualRunDetail, err := manager.GetRun(run.UUID) - assert.Nil(t, err) - - wf, err := store.workflowClientFake.Get(actualRunDetail.Run.Name, v1.GetOptions{}) + wf, err := store.workflowClientFake.Get(run.Run.Name, v1.GetOptions{}) assert.Nil(t, err) assert.Equal(t, wf.Labels[util.LabelKeyWorkflowPersistedFinalState], "true") } @@ -1092,6 +1090,7 @@ func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted(t *testing // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ + Name: run.Name, UID: types.UID(run.UUID), Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, }, @@ -1108,6 +1107,7 @@ func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted_DeleteFail // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ + Name: run.Name, UID: types.UID(run.UUID), Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, }, diff --git a/backend/src/apiserver/resource/workflow_fake.go b/backend/src/apiserver/resource/workflow_fake.go index 4099525399b..257a5bcc546 100644 --- a/backend/src/apiserver/resource/workflow_fake.go +++ b/backend/src/apiserver/resource/workflow_fake.go @@ -126,7 +126,7 @@ func (c *FakeWorkflowClient) Patch(name string, pt types.PatchType, data []byte, return workflow, nil } } - return nil, errors.New("Failed to patch worfklow") + return nil, errors.New("Failed to patch workflow") } func (c *FakeWorkflowClient) isTerminated(name string) (bool, error) { From 35ea2e29350b9cf24b3bb68cf0af62d6c86c3635 Mon Sep 17 00:00:00 2001 From: IronPan Date: Mon, 12 Aug 2019 16:27:52 -0700 Subject: [PATCH 10/11] fix tests --- backend/Dockerfile.persistenceagent | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/Dockerfile.persistenceagent b/backend/Dockerfile.persistenceagent index 0645f65cca5..bc1fcbff492 100644 --- a/backend/Dockerfile.persistenceagent +++ b/backend/Dockerfile.persistenceagent @@ -16,6 +16,6 @@ COPY --from=builder /bin/persistence_agent /bin/persistence_agent COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.txt /bin/license.txt ENV NAMESPACE "" -ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH "" +ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH 3600 CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH} From faa4dd64eefab05c628e6ef4d921a75b22fad897 Mon Sep 17 00:00:00 2001 From: IronPan Date: Mon, 12 Aug 2019 16:31:51 -0700 Subject: [PATCH 11/11] update --- backend/Dockerfile.persistenceagent | 4 +++- backend/src/agent/persistence/main.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/Dockerfile.persistenceagent b/backend/Dockerfile.persistenceagent index bc1fcbff492..e0cc5afd19c 100644 --- a/backend/Dockerfile.persistenceagent +++ b/backend/Dockerfile.persistenceagent @@ -16,6 +16,8 @@ COPY --from=builder /bin/persistence_agent /bin/persistence_agent COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.txt /bin/license.txt ENV NAMESPACE "" -ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH 3600 + +# Set Workflow TTL to 7 days +ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH 604800 CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH} diff --git a/backend/src/agent/persistence/main.go b/backend/src/agent/persistence/main.go index b30e921b7e8..21293674a70 100644 --- a/backend/src/agent/persistence/main.go +++ b/backend/src/agent/persistence/main.go @@ -124,5 +124,5 @@ func init() { flag.StringVar(&mlPipelineAPIServerBasePath, mlPipelineAPIServerBasePathFlagName, "/apis/v1beta1", "The base path for the ML pipeline API server.") flag.StringVar(&namespace, namespaceFlagName, "", "The namespace name used for Kubernetes informers to obtain the listers.") - flag.Int64Var(&ttlSecondsAfterWorkflowFinish, ttlSecondsAfterWorkflowFinishFlagName, 3600, "The TTL for Argo workflow to persist after workflow finish.") + flag.Int64Var(&ttlSecondsAfterWorkflowFinish, ttlSecondsAfterWorkflowFinishFlagName, 604800 /* 7 days */, "The TTL for Argo workflow to persist after workflow finish.") }