Skip to content

Commit

Permalink
fix: ArtifactGC Fails for Stopped Workflows. Fixes #11879 (#11947)
Browse files Browse the repository at this point in the history
Signed-off-by: Garett MacGowan <garettsoftware@gmail.com>
Signed-off-by: Garett MacGowan <garett.macgowan@gmail.com>
  • Loading branch information
Garett-MacGowan authored Nov 13, 2023
1 parent 6b2e8d8 commit 6805c91
Show file tree
Hide file tree
Showing 40 changed files with 1,558 additions and 777 deletions.
7 changes: 7 additions & 0 deletions api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func initExecutor() *executor.WorkflowExecutor {
podName,
types.UID(os.Getenv(common.EnvVarPodUID)),
os.Getenv(common.EnvVarWorkflowName),
types.UID(os.Getenv(common.EnvVarWorkflowUID)),
os.Getenv(common.EnvVarNodeID),
namespace,
cre,
Expand Down
26 changes: 20 additions & 6 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ func NewWaitCommand() *cobra.Command {

func waitContainer(ctx context.Context) error {
wfExecutor := initExecutor()
defer wfExecutor.HandleError(ctx) // Must be placed at the bottom of defers stack.

// Don't allow cancellation to impact capture of results, parameters, artifacts, or defers.
bgCtx := context.Background()

defer wfExecutor.HandleError(bgCtx) // Must be placed at the bottom of defers stack.
defer wfExecutor.FinalizeOutput(bgCtx) // Ensures the LabelKeyReportOutputsCompleted is set to true.
defer stats.LogStats()
stats.StartStatsTicker(5 * time.Minute)

Expand All @@ -35,24 +40,33 @@ func waitContainer(ctx context.Context) error {
if err != nil {
wfExecutor.AddError(err)
}
ctx = context.Background() // don't allow cancellation to impact capture of results, parameters,or artifacts

// Capture output script result
err = wfExecutor.CaptureScriptResult(ctx)
err = wfExecutor.CaptureScriptResult(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Saving output parameters
err = wfExecutor.SaveParameters(ctx)
err = wfExecutor.SaveParameters(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Saving output artifacts
err = wfExecutor.SaveArtifacts(ctx)
err = wfExecutor.SaveArtifacts(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Save log artifacts
logArtifacts := wfExecutor.SaveLogs(bgCtx)

// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
err = wfExecutor.ReportOutputs(bgCtx, logArtifacts)
if err != nil {
wfExecutor.AddError(err)
}

wfExecutor.SaveLogs(ctx)
return wfExecutor.HasError()
}
1 change: 1 addition & 0 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ WorkflowStatus contains overall status information about a workflow
|`storedTemplates`|[`Template`](#template)|StoredTemplates is a mapping between a template ref and the node's status.|
|`storedWorkflowTemplateSpec`|[`WorkflowSpec`](#workflowspec)|StoredWorkflowSpec stores the WorkflowTemplate spec for future execution.|
|`synchronization`|[`SynchronizationStatus`](#synchronizationstatus)|Synchronization stores the status of synchronization locks|
|`taskResultsCompleted`|`Map< boolean , string >`|Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.|

## CronWorkflowSpec

Expand Down
4 changes: 4 additions & 0 deletions manifests/base/crds/full/argoproj.io_workflows.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,527 changes: 847 additions & 680 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,30 @@ type WorkflowStatus struct {

// ArtifactGCStatus maintains the status of Artifact Garbage Collection
ArtifactGCStatus *ArtGCStatus `json:"artifactGCStatus,omitempty" protobuf:"bytes,19,opt,name=artifactGCStatus"`

// Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.
TaskResultsCompleted map[string]bool `json:"taskResultsCompleted,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompleted"`
}

func (ws *WorkflowStatus) InitializeTaskResultIncomplete(resultName string) {
if ws.TaskResultsCompleted == nil {
ws.TaskResultsCompleted = make(map[string]bool)
}
if _, ok := ws.TaskResultsCompleted[resultName]; !ok {
ws.MarkTaskResultIncomplete(resultName)
}
}
func (ws *WorkflowStatus) MarkTaskResultComplete(name string) {
ws.TaskResultsCompleted[name] = true
}
func (ws *WorkflowStatus) MarkTaskResultIncomplete(name string) {
ws.TaskResultsCompleted[name] = false
}
func (ws *WorkflowStatus) GetTaskResultCompleted(name string) bool {
return ws.TaskResultsCompleted[name]
}
func (ws *WorkflowStatus) GetTaskResultsCompleted() map[string]bool {
return ws.TaskResultsCompleted
}

func (ws *WorkflowStatus) IsOffloadNodeStatus() bool {
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions sdks/python/client/docs/WorkflowServiceApi.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion test/e2e/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ spec:
`).
When().
SubmitWorkflow().
WaitForWorkflow(time.Minute + 5*time.Second).
WaitForWorkflow(2 * time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
Expand Down
95 changes: 93 additions & 2 deletions test/e2e/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/common"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
Expand Down Expand Up @@ -66,6 +67,96 @@ type artifactState struct {
deletedAtWFDeletion bool
}

func (s *ArtifactsSuite) TestStoppedWorkflow() {

for _, tt := range []struct {
workflowFile string
}{
{workflowFile: "@testdata/artifactgc/artgc-dag-wf-stopped.yaml"},
{workflowFile: "@testdata/artifactgc/artgc-dag-wf-stopped-pod-gc-on-pod-completion.yaml"},
} {
// Create the minio client for interacting with the bucket.
c, err := minio.New("localhost:9000", &minio.Options{
Creds: credentials.NewStaticV4("admin", "password", ""),
})
assert.NoError(s.T(), err)

// Ensure the artifacts aren't in the bucket.
_, err = c.StatObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-1", minio.StatObjectOptions{})
if err == nil {
err = c.RemoveObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-1", minio.RemoveObjectOptions{})
assert.NoError(s.T(), err)
}
_, err = c.StatObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-2", minio.StatObjectOptions{})
if err == nil {
err = c.RemoveObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-2", minio.RemoveObjectOptions{})
assert.NoError(s.T(), err)
}

then := s.Given().
Workflow(tt.workflowFile).
When().
Then()

// Assert the artifacts don't exist.
then.ExpectArtifactByKey("on-deletion-wf-stopped-1", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})
then.ExpectArtifactByKey("on-deletion-wf-stopped-2", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})

when := then.When().
SubmitWorkflow().
WaitForWorkflow(
fixtures.WorkflowCompletionOkay(true),
fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {

condition := "for artifacts to exist"

_, err1 := c.StatObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-1", minio.StatObjectOptions{})
_, err2 := c.StatObject(context.Background(), "my-bucket-3", "on-deletion-wf-stopped-2", minio.StatObjectOptions{})

if err1 == nil && err2 == nil {
return true, condition
}

return false, condition
}))

then = when.Then()

// Assert artifact exists
then.ExpectArtifactByKey("on-deletion-wf-stopped-1", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
})
then.ExpectArtifactByKey("on-deletion-wf-stopped-2", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
})

when = then.When()

when.
DeleteWorkflow().
WaitForWorkflowDeletion()

then = when.Then()

// Assert the artifacts don't exist.
then.ExpectArtifactByKey("on-deletion-wf-stopped-1", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})
then.ExpectArtifactByKey("on-deletion-wf-stopped-2", "my-bucket-3", func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})

when = then.When()

// Remove the finalizers so the workflow gets deleted in case the test failed.
when.RemoveFinalizers(false)
}
}

func (s *ArtifactsSuite) TestArtifactGC() {

s.Given().
Expand Down Expand Up @@ -231,7 +322,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
} else {
fmt.Printf("verifying artifact %s is not deleted\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.Nil(t, err)
assert.NoError(t, err)
})
}
}
Expand Down Expand Up @@ -270,7 +361,7 @@ spec:

// create a ServiceAccount which won't be tied to the artifactgc role and attempt to use that service account in the GC Pod
// Want to verify that this causes the ArtifactGCError Condition in the Workflow
func (s *ArtifactsSuite) TestArtifactGC_InsufficientRole() {
func (s *ArtifactsSuite) TestInsufficientRole() {
ctx := context.Background()
_, err := s.KubeClient.CoreV1().ServiceAccounts(fixtures.Namespace).Create(ctx, &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: "artgc-role-test-sa"}}, metav1.CreateOptions{})
assert.NoError(s.T(), err)
Expand Down
Loading

0 comments on commit 6805c91

Please sign in to comment.