Bugfix/fix spark k8s pod duplicate issue #61110

Merged
jscheffl merged 22 commits into apache:main from Nataneljpwd:bugfix/fix-spark-k8s-pod-duplicate-issue on Feb 5, 2026
Conversation

@Nataneljpwd (Contributor)

Fixes an issue where, if a worker failed and the k8s Spark driver failed, restarted, and was left in a Pending state, the operator would fail even though recovery is possible. This PR allows the operator to recover from that state.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
  • No

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

Natanel Rudyuklakir added 2 commits January 27, 2026 09:11
@SameerMesiah97 (Contributor) left a comment


Excluding terminating pods is a valid filter but I’m not sure if we should remove the existing sorting keys.

@Nataneljpwd (Contributor, Author)

Hello @SameerMesiah97, I have replied to your comments; I would love to hear any further feedback you may have.

@Nataneljpwd force-pushed the bugfix/fix-spark-k8s-pod-duplicate-issue branch from b20fed6 to 6686be7 on January 27, 2026 20:12
@jscheffl (Contributor) left a comment


If okay with @SameerMesiah97 I'd be OK to merge. Not an expert in SparkK8s but changes seem to be reasonable.

@SameerMesiah97 (Contributor)

If okay with @SameerMesiah97 I'd be OK to merge. Not an expert in SparkK8s but changes seem to be reasonable.

I personally feel we should have another pair of expert eyes on this. @Nataneljpwd raised a point about quorum reads which I’m not knowledgeable enough to verify.

@Nataneljpwd force-pushed the bugfix/fix-spark-k8s-pod-duplicate-issue branch from 77456fc to 792f229 on January 29, 2026 17:41
@SameerMesiah97 (Contributor)

@Nataneljpwd

I still have two blocking concerns here:

  1. I don’t think the quorum-read argument holds. Not setting resourceVersion may require a quorum read, but that only guarantees freshness of what the API server returns. It doesn’t mean we’re reading directly from etcd, and more importantly it doesn’t enforce any stronger invariants (like uniqueness or lifecycle ordering).

  2. Even if the API response reflects the latest etcd state, Kubernetes does not guarantee that there can’t be multiple pods with the same labels in Pending or Running at the same time. I understand the SparkApplication controller intends there to be a single driver, but that guarantee is eventual. During restarts or reconciliation, overlapping driver pods are possible and expected. Your filter for deletion_timestamp partially mitigates this by excluding terminating pods, but I suspect there can still be windows where the replacement pod is created before the deletion of the existing pod is triggered.

Because of that, removing the existing phase-based prioritization is risky. I see that you are now prioritizing 'Succeeded' pods but since you have removed the ordering for Running and Pending pods, this does not mitigate the concerns I explained above.

I would advise you to implement something like this:

pod = max(
    pod_list,
    key=lambda p: (
        p.metadata.deletion_timestamp is None,
        p.status.phase == PodPhase.RUNNING,
        p.status.phase == PodPhase.SUCCEEDED,
        p.metadata.creation_timestamp or datetime.min.replace(tzinfo=timezone.utc),
        p.metadata.name or "",
    ),
)

This handles the edge cases revealed by this PR without accidentally returning multiple pods. I would advise removing the field selector as well because we are already filtering in the lambda expression.

@Nataneljpwd (Contributor, Author)

Hello, thank you for the comment!

@Nataneljpwd

I still have two blocking concerns here:

  1. I don’t think the quorum-read argument holds. Not setting resourceVersion may require a quorum read, but that only guarantees freshness of what the API server returns. It doesn’t mean we’re reading directly from etcd, and more importantly it doesn’t enforce any stronger invariants (like uniqueness or lifecycle ordering).

This is a valid concern, though from reading the k8s API documentation, not setting a resourceVersion requires the request to be served by a quorum read; the API server validates with etcd before returning a result, and so it seems to achieve the same result.
About the invariants, I understand the concern; I have explained in a code comment why we do not care about the ordering and why we won't get more than one result.
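
For context, a minimal sketch of the kind of list call being discussed, using the standard Kubernetes Python client; the namespace and label selector below are illustrative placeholders, not the operator's actual values:

from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() when running outside the cluster
v1 = client.CoreV1Api()

# Leaving resource_version unset asks the API server for the most recent state,
# which it must confirm against etcd (a quorum read) before answering.
pods = v1.list_namespaced_pod(
    namespace="spark-jobs",              # placeholder namespace
    label_selector="spark-role=driver",  # placeholder driver label
)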

  2. Even if the API response reflects the latest etcd state, Kubernetes does not guarantee that there can’t be multiple pods with the same labels in Pending or Running at the same time. I understand the SparkApplication controller intends there to be a single driver, but that guarantee is eventual. During restarts or reconciliation, overlapping driver pods are possible and expected. Your filter for deletion_timestamp partially mitigates this by excluding terminating pods, but I suspect there can still be windows where the replacement pod is created before the deletion of the existing pod is triggered.

Yes, though the Spark Kubernetes operator, which handles the Spark CRD, makes sure there is only one driver up at any given time; the overlapping transition does not happen, because before a new pod is created the old pods are deleted, as written here.

Because of that, removing the existing phase-based prioritization is risky. I see that you are now prioritizing 'Succeeded' pods but since you have removed the ordering for Running and Pending pods, this does not mitigate the concerns I explained above.

Don't we want to prioritize Succeeded pods? There is no case in which a driver for an application will be running while another pod for it is in the Succeeded state. I will add back the creation time check, as it is something I most likely deleted by accident.

I would advise you to implement something like this:

pod = max(
    pod_list,
    key=lambda p: (
        p.metadata.deletion_timestamp is None,
        p.status.phase == PodPhase.RUNNING,
        p.status.phase == PodPhase.SUCCEEDED,
        p.metadata.creation_timestamp or datetime.min.replace(tzinfo=timezone.utc),
        p.metadata.name or "",
    ),
)

This handles the edge cases revealed by this PR without accidentally returning multiple pods. I would advise removing the field selector as well because we are already filtering in the lambda expression.

We can leave the field selector, and we do not need to explicitly prefer Running pods: a Running pod can only coexist with another pod if it is terminating, meaning its deletion timestamp is not None. Keeping the RUNNING preference would also mean we might select a terminating pod over a newly created one.
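
For illustration, a rough sketch of the selection described above, assuming the Kubernetes Python client; the namespace, label selector, and field selector values are assumptions for the example and not necessarily the exact code in this PR:

from datetime import datetime, timezone

from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() outside the cluster
v1 = client.CoreV1Api()

# List candidate driver pods; the field selector drops terminal phases server-side.
pod_list = v1.list_namespaced_pod(
    namespace="spark-jobs",                                       # placeholder namespace
    label_selector="spark-role=driver",                           # placeholder driver label
    field_selector="status.phase!=Failed,status.phase!=Unknown",  # assumed filter
).items

# Ignore pods that are already terminating, then prefer a Succeeded driver and,
# failing that, the most recently created pod.
candidates = [p for p in pod_list if p.metadata.deletion_timestamp is None]
if candidates:
    pod = max(
        candidates,
        key=lambda p: (
            p.status.phase == "Succeeded",
            p.metadata.creation_timestamp or datetime.min.replace(tzinfo=timezone.utc),
            p.metadata.name or "",
        ),
    )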

@jscheffl (Contributor)

jscheffl commented Feb 2, 2026

In my view just one minor static check, then LGTM!

@SameerMesiah97 (Contributor)

@Nataneljpwd

I rest my case. I am not fully convinced that there cannot be multiple pods returned in your new implementation for find_spark_job but I will no longer block this PR based on the 2 concerns I communicated.

I appreciate all the thought and effort you put into this but whilst going through the diff, I did see quite a few areas in need of refinement. I have made comments where I saw specific areas which could be improved but I think it would not hurt to review the new implementation for find_spark_job and the relevant tests to make sure that tests are explicit, correctly documented and that they adequately guard against regressions. Once these gaps are addressed, I am okay with this PR being merged.

@Nataneljpwd (Contributor, Author)

@SameerMesiah97 Thank you for the review!

@Nataneljpwd

I rest my case. I am not fully convinced that there cannot be multiple pods returned in your new implementation for find_spark_job but I will no longer block this PR based on the 2 concerns I communicated.

It is fine to block this PR if you still think multiple pods can be returned; maybe I missed something, and I do not want a PR with a supposed bugfix merged when there is even a slight concern that the bug was not really fixed.
Could you explain in which case you think multiple pods can be returned? Maybe I just missed a detail.
Thank you

@SameerMesiah97 (Contributor)

It is fine to block this PR if you still think multiple pods can be returned; maybe I missed something, and I do not want a PR with a supposed bugfix merged when there is even a slight concern that the bug was not really fixed. Could you explain in which case you think multiple pods can be returned? Maybe I just missed a detail. Thank you

Let's just focus on getting this PR into a mergeable state.

@SameerMesiah97 (Contributor)

@Nataneljpwd

I would remove the test test_find_spark_job_picks_running_pod as it is now redundant. It is not something you touched. In fact, it is a test I added, which is no longer necessary. If it is not feasible, I can remove it myself later on.

@Nataneljpwd (Contributor, Author)

@SameerMesiah97 I have finished answering all of your comments; I would appreciate another review.

@SameerMesiah97 (Contributor)

@Nataneljpwd

Thanks for all the effort. The PR is now in a mergeable state.

Apparently, there was an unrelated CI failure on main (which has been fixed in #61469) the last time you pushed your changes. I would request you to rebase and force push again so that the CI workflow can run again and hopefully complete successfully. If the CI workflow is re-triggered by the time you see this comment, there is no need to do that.

Once CI is all green, I will approve this PR.

@SameerMesiah97 (Contributor)

If okay with @SameerMesiah97 I'd be OK to merge. Not an expert in SparkK8s but changes seem to be reasonable.

@jscheffl

I am okay with this PR being merged now.

@jscheffl (Contributor) left a comment


Thanks for the continued reviews @SameerMesiah97 and many more thanks to @Nataneljpwd for the patience and repeated rework. I think this is very very good now and I hope you have a reliable state with this merged.

Very positive constructive discussion folks!

@jscheffl merged commit 3d15dcf into apache:main on Feb 5, 2026
105 checks passed
jhgoebbert pushed a commit to jhgoebbert/airflow_Owen-CH-Leung that referenced this pull request Feb 8, 2026
* fix an edge case where if a pod was pending in SparkKubernetes operator, the task won't fail and will recover

* formatting

* fixed tests and removed redundant tests

* fixed pod status phase

* address comment

* added another test for the success case

* resolve CR comments

* fixed mypy issue

* address cr comments

* fix last test

* remove deletion timestamp

---------

Co-authored-by: Natanel Rudyuklakir <natanelrudyuklakir@gmail.com>
Ratasa143 pushed a commit to Ratasa143/airflow that referenced this pull request Feb 15, 2026

Labels

area:providers, provider:cncf-kubernetes (Kubernetes (k8s) provider related issues)


4 participants