-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: retry large archived wf. Fixes #12740 #12741
fix: retry large archived wf. Fixes #12740 #12741
Conversation
|
||
wf.ObjectMeta.ResourceVersion = "" | ||
wf.ObjectMeta.UID = "" | ||
result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{}) | ||
if err != nil { | ||
return nil, sutils.ToStatusError(err, codes.Internal) | ||
} | ||
if large { | ||
offloadedNodes, err := w.offloadNodeStatusRepo.Get(string(oriUid), wf.GetOffloadNodeStatusVersion()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if wf.GetOffloadNodeStatusVersion() is "" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to retry a archived workflow, it will create a new workflow, it is different to retry a not-archived workflow. So when retry large archived workflow, we need to Dehydrate
the wf ( offload the node status) first, the key of these node status is the uid
of workflow, and then we need to create a new workflow according the Dehydrated workflow. And as for the controller, it will read the offload-node status by the uid
of the new created workflow, so we copy the node status info of the archived workflow and save it with new uid
of the new created workflow. we could only get the uid
after creating the new workflow, and the controller may process the new created workflow before we update node status with new uid
, so we catch the db error and make it retry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I known we should get offloadedNodes. But if version is "", we got err:
time="2024-03-12T14:01:23.864Z" level=info msg="Workflow to be dehydrated" Workflow Size=3361331
time="2024-03-12T14:01:24.370Z" level=debug msg="Getting offloaded nodes" uid=06ab13ed-7e5a-4a0c-aeae-40c19f196bc5 version=
time="2024-03-12T14:01:24.371Z" level=error msg="finished unary call with code Internal" error="rpc error: code = Internal desc = upper: no more rows in this result set" grpc.code=Internal grpc.method=RetryArchivedWorkflow grpc.service=workflowarchive.ArchivedWorkflowService grpc.start_time="2024-03-12T14:01:23Z" grpc.time_ms=1242.014 span.kind=server system=grpc
you can test it argo server UI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't need to check "large", becaust Dehydrate->CompressWorkflowIfNeeded>IsLarge have.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't need to check "large", becaust Dehydrate->CompressWorkflowIfNeeded>IsLarge have.
yeah, I have removed this check 'large'. And use w.hydrator.IsHydrated(wf)
to assert it is been Dehydrated
actually.
util/errors/errors.go
Outdated
@@ -37,7 +37,8 @@ func IsTransientErr(err error) bool { | |||
apierr.IsServiceUnavailable(err) || | |||
isTransientEtcdErr(err) || | |||
matchTransientErrPattern(err) || | |||
errors.Is(err, NewErrTransient("")) | |||
errors.Is(err, NewErrTransient("")) || | |||
strings.Contains(err.Error(), "upper: no more rows in ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When will this error occur ? it's better to build a function like “matchTransientErrPattern” || "isTransientEtcdErr", maybe we can add like "isTransientSqbErr".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, the root cause is as mentioned above. and I will have a test for the function isTransientSqbErr
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did this error occur? Is there an example to reproduce?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I run it local with make start UI=true PROFILE=mysql ALWAYS_OFFLOAD_NODE_STATUS=true
, and submit a workflow and stop it when one step succeed, after it is archived, then retry it. (remove this error caght, it will retry failed at here https://github.com/argoproj/argo-workflows/blob/main/workflow/controller/controller.go#L854)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
steps:
- - name: run-pod1
template: run-pod
- name: run-pod2
template: run-pod
- name: run-pod3
template: run-pod
- - name: run-pod2-sleep
template: run-pod-sleep
- name: run-pod
container:
image: docker/whalesay:latest
command: [python3, -c]
args: ["print('gfn')"]
- name: run-pod-sleep
container:
image: docker/whalesay:latest
command: [python3, -c]
args: ["print('gfn')\nimport time\ntime.sleep(100)"]
ttlStrategy:
secondsAfterCompletion: 3
podGC:
strategy: OnPodCompletion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may occur in old version:#2333
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah,I have checked this issue before, and I got the error with evrsion v3.5.0, and the latest commit it also has this error. I test it at locally according to https://argo-workflows.readthedocs.io/en/latest/running-locally/, and do you add the env ALWAYS_OFFLOAD_NODE_STATUS=true
? @shuangkun
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use make start UI=true PROFILE=mysql ALWAYS_OFFLOAD_NODE_STATUS=true
run worflow-controller and server and I use latest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I just pull my branch heidongxianhua:fix_retry_large_archived_wf
and rebased by the branch [argoproj:main](https://github.com/argoproj/argo-workflows/tree/main)
. If i add the error check https://github.com/argoproj/argo-workflows/pull/12741/files#diff-c2fc14b8209201e80dbf52f4293639d125ce3b6df398ebc93f5ee4e1abf05842R41, it will run ok and the server.log contains this, but there is no error in controller.log:
But if i delete this check (run make clean
and make start UI=true PROFILE=mysql ALWAYS_OFFLOAD_NODE_STATUS=true
), we will get the error in controller.log, and it lost the node:
@shuangkun
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, thanks! I reproduced it. LGTM
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
Change the title. add "Fixes #12740" at the end. And commit empty Re-trigger CI. |
3142972
to
7d0bb9c
Compare
yeah, i have changed the title and Re-trigger the CI again, couly you have a look? @shuangkun |
thanks, but it has not been merged, could you merge it again? @shuangkun |
Sorry, I don't have permission. I just gave it a high priority to review. @juliev0 @isubasinghe Can you help have a look? Thanks! |
okay, I'll try to take a look at this one later |
@sarabala1979 Is it okay I assigned you to take a look at this? I see you worked on |
sorry to bother, could you have a look as this PR? @sarabala1979 |
wf.ObjectMeta.ResourceVersion = "" | ||
wf.ObjectMeta.UID = "" | ||
result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{}) | ||
if err != nil { | ||
return nil, sutils.ToStatusError(err, codes.Internal) | ||
} | ||
if !w.hydrator.IsHydrated(wf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume this if statement is just in case? there isn't any reason why it shouldn't be dehydrated, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nothing wrong with that of course...just trying to make sure I understand)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In line 307(https://github.com/argoproj/argo-workflows/pull/12741/files#diff-f854ed23b7da7ee7a4f8bd0dc4e2114f284610f55f9604b1c09b37c0fe35d933R307), we call the func w.hydrator.Dehydrate(wf)
to Dehydrate the wf, but within this function, it will automatically determine whether(according to the size of this wf, https://github.com/argoproj/argo-workflows/blob/main/workflow/hydrator/hydrator.go#L100) dehydration really needs to be performed, so we should add this if statement . @juliev0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it. Okay, please add some comments around all of this code, because I think it will be hard otherwise for people to understand the purpose.
Can you add comments to the code? |
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com>
@@ -299,12 +303,30 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req | |||
} | |||
} | |||
|
|||
log.WithFields(log.Fields{"Dehydrate workflow uid=": wf.UID}).Info("RetryArchivedWorkflow") | |||
// It will dehydrate the wf actually only if needed (when the size of workflow is too large or set the env `ALWAYS_OFFLOAD_NODE_STATUS`=true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// It will dehydrate the wf actually only if needed (when the size of workflow is too large or set the env `ALWAYS_OFFLOAD_NODE_STATUS`=true) | |
// If the Workflow needs to be dehydrated in order to capture and retain all of the previous state for the subsequent workflow, then do so |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think this is accurate?
wf.ObjectMeta.ResourceVersion = "" | ||
wf.ObjectMeta.UID = "" | ||
result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{}) | ||
if err != nil { | ||
return nil, sutils.ToStatusError(err, codes.Internal) | ||
} | ||
// determine whether th wf is `Dehydrated` actually before |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// determine whether th wf is `Dehydrated` actually before | |
// if the Workflow was dehydrated before, we need to capture and maintain its previous state for the new Workflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a correct statement?
Signed-off-by: heidongxianhua <18207133434@163.com>
Signed-off-by: heidongxianhua <18207133434@163.com> Signed-off-by: xiaowu.zhu <xiaowu.zhu@daocloud.io>
Fixes #12740
Motivation
Modifications
Verification