feat: add retry API for archived workflows. Fixes #7906 #7988
Conversation
Signed-off-by: Dillen Padhiar <dpadhiar99@gmail.com>
// retryWorkflow takes a wf in method signature instead and has boolean to determine if this is from archive or not - archive means we must create the workflow before update
func retryWorkflow(ctx context.Context, kubeClient kubernetes.Interface, hydrator hydrator.Interface, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string, retryArchive bool) (*wfv1.Workflow, error) {
	switch wf.Status.Phase {
I think we should have a new func prepareWorkflowForRetry; specifically, it should not take any kubeClient, hydrator, or wfClient - all it should do is take a workflow and make it ready for retrying. The calling function can do any hydration or whatever.
func prepareWorkflowForRetry(wf *wfv1.Workflow) (*wfv1.Workflow, error) {
switch wf.Status.Phase {
case wfv1.WorkflowFailed, wfv1.WorkflowError:
default:
return nil, errors.Errorf(errors.CodeBadRequest, "workflow must be Failed/Error to retry")
}
newWF := wf.DeepCopy()
// Delete/reset fields which indicate workflow completed
delete(newWF.Labels, common.LabelKeyCompleted)
delete(newWF.Labels, common.LabelKeyWorkflowArchivingStatus)
newWF.Status.Conditions.UpsertCondition(wfv1.Condition{Status: metav1.ConditionFalse, Type: wfv1.ConditionTypeCompleted})
newWF.ObjectMeta.Labels[common.LabelKeyPhase] = string(wfv1.NodeRunning)
newWF.Status.Phase = wfv1.WorkflowRunning
newWF.Status.Nodes = make(wfv1.Nodes)
newWF.Status.Message = ""
newWF.Status.StartedAt = metav1.Time{Time: time.Now().UTC()}
newWF.Status.FinishedAt = metav1.Time{}
newWF.Spec.Shutdown = ""
if newWF.Spec.ActiveDeadlineSeconds != nil && *newWF.Spec.ActiveDeadlineSeconds == 0 {
// if it was terminated, unset the deadline
newWF.Spec.ActiveDeadlineSeconds = nil
}
newWF.Status.StoredTemplates = make(map[string]wfv1.Template)
for id, tmpl := range wf.Status.StoredTemplates {
newWF.Status.StoredTemplates[id] = tmpl
}
return newWF, nil
}
Should this function replace retryWorkflow? In that case, we would move all the node logic into the calling functions RetryWorkflow and RetryArchiveWorkflow.
prepareWorkflowForRetry is great.
Great work identifying that you cannot just resubmit an archived workflow.
changes requested
Some quick comments.
What happens if you move the Update/Create to the service level?
Functionality is the same if we move the update/create calls to the service level. We can remove the wfClient there as well.

I think that's the right thing to do. The current util func combines two responsibilities: formulating the retried workflow, and updating/creating it. If it only does the first, the second responsibility can be moved to the service layer. https://en.wikipedia.org/wiki/Single-responsibility_principle
… to service level Signed-off-by: Dillen Padhiar <dpadhiar99@gmail.com>
can you please add an e2e (API) test?
LGTM.
test/e2e/argo_server_test.go
Outdated
})

s.Run("Retry", func() {
	s.Need(fixtures.BaseLayerArtifacts)
not needed
Currently, retrying an archived workflow fails with a 500 internal server error if the original workflow isn't deleted, so for testing purposes this is required.
s.Need(fixtures.BaseLayerArtifacts) should not be needed?
workflow/util/util.go
Outdated
updated, err = retryWorkflow(ctx, kubeClient, hydrator, wfClient, name, restartSuccessful, nodeFieldSelector)
	return !errorsutil.IsTransientErr(err), err
})
func RetryWorkflow(ctx context.Context, podIf v1.PodInterface, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string) (*wfv1.Workflow, error) {
can we follow the pattern we found in Resubmit, i.e. create a FormulateResubmitWorkflow and move all the client-go calls into the service?
We can remove wfClient entirely from the RetryWorkflow and RetryArchivedWorkflow methods, since it is no longer used at all in the helper prepareWorkflowForRetry.
yes please
workflow/util/util.go
Outdated
func retryWorkflow(ctx context.Context, kubeClient kubernetes.Interface, hydrator hydrator.Interface, wfClient v1alpha1.WorkflowInterface, name string, restartSuccessful bool, nodeFieldSelector string) (*wfv1.Workflow, error) {
	wf, err := wfClient.Get(ctx, name, metav1.GetOptions{})
// RetryArchivedWorkflow creates a workflow from the workflow archive
func RetryArchivedWorkflow(ctx context.Context, podIf v1.PodInterface, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string) (*wfv1.Workflow, error) {
This is a weird one. We want to retry an archived workflow, but the workflow itself may or may not have been deleted. So I think the logic can be:
- Hydrate the workflow.
- Formulate the workflow for retry.
- Delete any pods for any failed node.
- De-hydrate the workflow.
- Update the workflow. If 404 error, zero fields like UID and resourceVersion, then create it.

I want to see these changes:
- Rename prepareWorkflowForRetry to FormulateRetryWorkflow.
- Rather than pass in podIf, instead return a list of pod names that need to be deleted.
- Wrap that in RetryWorkflow. I think you can use that in both cases.
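The flow proposed above can be sketched end to end. This is a minimal illustration with stand-in types (Workflow, store, errNotFound are all hypothetical scaffolding, not the PR's actual code); the real implementation would use wfv1.Workflow, client-go interfaces, and the apierr predicates:

```go
package main

import "errors"

// errNotFound stands in for a Kubernetes 404 (apierr.IsNotFound in the PR).
var errNotFound = errors.New("workflow not found")

// Workflow is a minimal stand-in for wfv1.Workflow.
type Workflow struct {
	Name            string
	UID             string
	ResourceVersion string
}

// store abstracts the client-go calls the service layer would make.
type store interface {
	Update(wf *Workflow) error
	Create(wf *Workflow) error
	DeletePod(name string) error
}

// retryArchived sketches the proposed flow: formulate the retry (a pure
// function that also returns the pods to delete), delete those pods, then
// update the workflow, falling back to create when the live object is gone.
func retryArchived(s store, wf *Workflow, formulate func(*Workflow) (*Workflow, []string, error)) (*Workflow, error) {
	newWF, podsToDelete, err := formulate(wf)
	if err != nil {
		return nil, err
	}
	for _, pod := range podsToDelete {
		if err := s.DeletePod(pod); err != nil {
			return nil, err
		}
	}
	if err := s.Update(newWF); err != nil {
		if !errors.Is(err, errNotFound) {
			return nil, err
		}
		// The live workflow was GC-ed: zero the server-assigned identity
		// fields before re-creating it from the archived copy.
		newWF.UID = ""
		newWF.ResourceVersion = ""
		if err := s.Create(newWF); err != nil {
			return nil, err
		}
	}
	return newWF, nil
}

// fakeStore records calls; Update returns errNotFound when the workflow
// no longer exists in the cluster.
type fakeStore struct {
	exists  bool
	deleted []string
	created bool
}

func (f *fakeStore) Update(wf *Workflow) error {
	if !f.exists {
		return errNotFound
	}
	return nil
}

func (f *fakeStore) Create(wf *Workflow) error { f.created = true; return nil }

func (f *fakeStore) DeletePod(name string) error {
	f.deleted = append(f.deleted, name)
	return nil
}
```

Keeping the formulate step pure is what makes it usable from both the live-workflow and archived-workflow code paths.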
Should we move deleting of the pods all to the server side then?
// Iterate the previous nodes. If it was successful Pod carry it forward
deletedNodes := make(map[string]bool)
for _, node := range wf.Status.Nodes {
doForceResetNode := false
if _, present := nodeIDsToReset[node.ID]; present {
// if we are resetting this node then don't carry it across regardless of its phase
doForceResetNode = true
}
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if !strings.HasPrefix(node.Name, onExitNodeName) && !doForceResetNode {
newWF.Status.Nodes[node.ID] = node
continue
}
case wfv1.NodeError, wfv1.NodeFailed, wfv1.NodeOmitted:
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeTaskGroup || node.Type == wfv1.NodeTypeStepGroup) {
newNode := node.DeepCopy()
newNode.Phase = wfv1.NodeRunning
newNode.Message = ""
newNode.StartedAt = metav1.Time{Time: time.Now().UTC()}
newNode.FinishedAt = metav1.Time{}
newWF.Status.Nodes[newNode.ID] = *newNode
continue
} else {
deletedNodes[node.ID] = true
}
// do not add this status to the node. pretend as if this node never existed.
default:
// Do not allow retry of workflows with pods in Running/Pending phase
return nil, errors.InternalErrorf("Workflow cannot be retried with node %s in %s phase", node.Name, node.Phase)
}
if node.Type == wfv1.NodeTypePod {
templateName := getTemplateFromNode(node)
version := GetWorkflowPodNameVersion(wf)
podName := PodName(wf.Name, node.Name, templateName, node.ID, version)
log.Infof("Deleting pod: %s", podName)
err := podIf.Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
return nil, errors.InternalWrapError(err)
}
} else if node.Name == wf.ObjectMeta.Name {
newNode := node.DeepCopy()
newNode.Phase = wfv1.NodeRunning
newNode.Message = ""
newNode.StartedAt = metav1.Time{Time: time.Now().UTC()}
newNode.FinishedAt = metav1.Time{}
newWF.Status.Nodes[newNode.ID] = *newNode
continue
}
}
if len(deletedNodes) > 0 {
for _, node := range newWF.Status.Nodes {
var newChildren []string
for _, child := range node.Children {
if !deletedNodes[child] {
newChildren = append(newChildren, child)
}
}
node.Children = newChildren
var outboundNodes []string
for _, outboundNode := range node.OutboundNodes {
if !deletedNodes[outboundNode] {
outboundNodes = append(outboundNodes, outboundNode)
}
}
node.OutboundNodes = outboundNodes
newWF.Status.Nodes[node.ID] = node
}
}
Update the workflow. If 404 error, zero fields like UID and resourceVersion, then create it.
This can also occur on the server side, so we can remove RetryArchivedWorkflow and use RetryWorkflow for both.
correct - to the service layer
Moved deletion of pods to the server side. RetryWorkflow now also returns a list of pod names to delete, which lets us avoid passing in podIf at all.
func RetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string) (*wfv1.Workflow, []string, error) {
updatedWf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, restartSuccessful, nodeFieldSelector)
if err != nil {
return nil, nil, err
}
return updatedWf, podsToDelete, err
}
func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string) (*wfv1.Workflow, []string, error) {
...
if node.Type == wfv1.NodeTypePod {
templateName := getTemplateFromNode(node)
version := GetWorkflowPodNameVersion(wf)
podName := PodName(wf.Name, node.Name, templateName, node.ID, version)
podsToDelete = append(podsToDelete, podName)
func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req *workflowarchivepkg.RetryArchivedWorkflowRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
kubeClient := auth.GetKubeClient(ctx)
wf, err := w.GetArchivedWorkflow(ctx, &workflowarchivepkg.GetArchivedWorkflowRequest{Uid: req.Uid})
if err != nil {
return nil, err
}
wf, podsToDelete, err := util.RetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector)
if err != nil {
return nil, err
}
for _, podName := range podsToDelete {
log.Infof("Deleting pod: %s", podName)
err := kubeClient.CoreV1().Pods(wf.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
return nil, errors.InternalWrapError(err)
}
}
wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Update(ctx, wf, metav1.UpdateOptions{})
if apierr.IsBadRequest(err) {
wf.ObjectMeta.ResourceVersion = ""
wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{})
if err != nil {
return nil, err
}
}
return wf, nil
}
…ForRetry to FormulateRetryWorkflow Signed-off-by: Dillen Padhiar <dpadhiar99@gmail.com>
workflow/util/util_test.go
Outdated
}
})
}
// func TestRetryArchivedWorkflow(t *testing.T) {
clean-up?
Missed it on a quick scan; removed.
server/workflow/workflow_server.go
Outdated
}

for _, podName := range podsToDelete {
	log.Infof("Deleting pod: %s", podName)
please use structured logging
server/workflow/workflow_server.go
Outdated
log.Infof("Deleting pod: %s", podName)
err := kubeClient.CoreV1().Pods(wf.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
	return nil, errors.InternalWrapError(err)
remove InternalWrapError, not needed
}

for _, podName := range podsToDelete {
	log.Infof("Deleting pod: %s", podName)
structured logging please
log.Infof("Deleting pod: %s", podName)
err := kubeClient.CoreV1().Pods(wf.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
	return nil, errors.InternalWrapError(err)
remove InternalWrapError
}

wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Update(ctx, wf, metav1.UpdateOptions{})
if apierr.IsAlreadyExists(err) {
is this condition correct?
IsNotFound? how did you test this?
I used the wrong error check - IsAlreadyExists is meant for going from Create to Update, but we're doing Update to Create. Will fix.
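The direction of each fallback can be illustrated with a small sketch (the error values here are stand-ins for the real Kubernetes apierr.IsNotFound / apierr.IsAlreadyExists predicates, which inspect the API status reason):

```go
package main

import "errors"

// Stand-ins for the Kubernetes apierr.IsNotFound / apierr.IsAlreadyExists
// predicates; the real ones inspect the API status reason, not a sentinel.
var (
	errNotFound      = errors.New("not found")
	errAlreadyExists = errors.New("already exists")
)

// createThenUpdate is the direction IsAlreadyExists guards: try Create
// first, and fall back to Update when the object already exists.
func createThenUpdate(create, update func() error) error {
	err := create()
	if errors.Is(err, errAlreadyExists) {
		return update()
	}
	return err
}

// updateThenCreate is what this PR needs: try Update first, and fall back
// to Create when the live workflow has been deleted - so the correct
// guard is the not-found check.
func updateThenCreate(update, create func() error) error {
	err := update()
	if errors.Is(err, errNotFound) {
		return create()
	}
	return err
}
```

Guarding an Update-then-Create path with IsAlreadyExists would never fire, since Update cannot fail that way.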
workflow/util/util.go
Outdated
updated, err = retryWorkflow(ctx, kubeClient, hydrator, wfClient, name, restartSuccessful, nodeFieldSelector)
	return !errorsutil.IsTransientErr(err), err
})
func RetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string) (*wfv1.Workflow, []string, error) {
I think you should delete this func
…proj#7988) Signed-off-by: Dillen Padhiar <dpadhiar99@gmail.com>
Fixes #7906
Previously, workflows could only be retried if they still existed in the cluster. If a workflow is GC-ed immediately after completion, you must save the workflow manually as plain YAML or into the workflow archive, then kubectl create it before using argo retry. This API allows archived workflows to be created and retried, removing the need for this otherwise hacky method.