Skip to content

Conversation

@asb108
Copy link
Contributor

@asb108 asb108 commented May 19, 2025

Closes: #41211 where the SparkKubernetesOperator's reattach_on_restart functionality doesn't work correctly.

Problem

When reattach_on_restart is enabled, the SparkKubernetesOperator tries to find the driver pod by looking for pods with specific task context labels (dag_id, task_id, run_id). However, these labels are not actually added to the driver and executor pods when creating them, causing the reattach functionality to fail.

Solution

This PR adds code to the execute method of the SparkKubernetesOperator class to add task context labels to both the driver and executor pods when reattach_on_restart is enabled. This allows the operator to find the existing driver pod if the scheduler restarts.

Testing

I've performed comprehensive testing to ensure the fix works correctly:

  1. Unit tests that verify the task context labels are correctly added when reattach_on_restart is enabled
  2. Tests that verify the default behavior remains unchanged when reattach_on_restart is disabled
  3. Integration tests in a real Kubernetes environment using Kind
  4. Tests with different Spark application configurations to ensure compatibility

All tests passed, confirming that the fix works as expected.

@boring-cyborg boring-cyborg bot added area:providers provider:cncf-kubernetes Kubernetes (k8s) provider related issues labels May 19, 2025
@boring-cyborg
Copy link

boring-cyborg bot commented May 19, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@asb108 asb108 force-pushed the fix-spark-kubernetes-operator-reattach-41211 branch from b7c6702 to c7bf57b Compare May 20, 2025 07:15
@asb108
Copy link
Contributor Author

asb108 commented May 20, 2025

Hi @hussein-awala @jedcunningham @eladkal,

I've pushed the changes to address the raised issue and the previously failing test. When you have a moment, could you please review the updates? I'm new to the Airflow repository, and this is my first PR, so I really appreciate any feedback or suggestions you might have.

Thanks for your time and support!

@asb108
Copy link
Contributor Author

asb108 commented May 21, 2025

Thanks for triggering tests @eladkal,
I'm seeing consistent failures in the Airflow providers test suite, in the job runs at https://github.com/apache/airflow/actions/runs/15146342927/job/42604235636?pr=50803 (ref: 6fcc9f1). The jobs are being killed with exit code 137, indicating an out-of-memory (OOM) issue. There are no other explicit errors—detailed test output is missing as the process is terminated before it can be printed.

Should I consider optimizing the tests for memory usage? I'd appreciate any suggestions on improving test reliability or adjusting the job configuration to reduce resource consumption.

Thank you

@potiuk potiuk added the debug ci resources Set it on PR if you want to debug resource usage for it label May 29, 2025
@potiuk
Copy link
Member

potiuk commented May 29, 2025

Should I consider optimizing the tests for memory usage?

Yes - i think it's best you track down where it comes from. It's hard to say and we are not experts in all integrations, but yes indeed it looks like somewhere memory is leaked - but where and how - hard to say. Note that there are usually some logs uploaded as artifacts (you can scroll down summary page) - maybe they are helpful. I also added "debug ci resources" label that might help you to see if indeed memory is growing and when more easily - it will print resource stats in the periodic dump of your test run - you can trace it when you rebase and re-run the PR.

@asb108 asb108 force-pushed the fix-spark-kubernetes-operator-reattach-41211 branch 2 times, most recently from eb2fb26 to e495110 Compare June 2, 2025 13:07
@asb108 asb108 force-pushed the fix-spark-kubernetes-operator-reattach-41211 branch from e495110 to e105907 Compare June 11, 2025 04:19
asb108 added 5 commits June 11, 2025 09:53
- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing
- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling
- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation
Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211
@asb108 asb108 force-pushed the fix-spark-kubernetes-operator-reattach-41211 branch from 78d8625 to 5b28c36 Compare June 12, 2025 00:25
@eladkal eladkal requested a review from bugraoz93 September 10, 2025 03:19
@asb108
Copy link
Contributor Author

asb108 commented Sep 10, 2025

Hi @eladkal ,
the conflicts are resolved.

Sorry missed that. Can tou rebase again?

Okay

@eladkal eladkal merged commit 619ecf7 into apache:main Sep 14, 2025
86 checks passed
@boring-cyborg
Copy link

boring-cyborg bot commented Sep 14, 2025

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

suman-himanshu pushed a commit to suman-himanshu/airflow that referenced this pull request Sep 17, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
Brunda10 pushed a commit to Brunda10/airflow that referenced this pull request Sep 17, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Sep 30, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 1, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 2, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 3, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 4, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 7, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 8, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 9, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 10, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 11, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 12, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 14, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 15, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 17, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 19, 2025
…ernetesOperator reattach_on_restart functionality (apache#50803)

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix formatting in test_spark_kubernetes.py

* Fix test assertions in SparkKubernetesOperator tests to handle task context labels

* Fix whitespace issues in spark_kubernetes.py

* fix format and resolves failing tests

* Fix SparkKubernetesOperator test OOM issues

* Fix: Add task context labels to driver and executor pods for SparkKubernetesOperator reattach_on_restart functionality (apache#41211)

* Fix whitespace issues in spark_kubernetes.py

* Clean up merge conflict markers in test_spark_kubernetes.py

* Fix test assertions for SparkKubernetesOperator task context labels

- Fixed test structure expectations in test_adds_task_context_labels_to_driver_and_executor
- Changed assertion from created_body['spark']['spec'] to created_body['spec']
- This matches the actual structure passed to create_namespaced_custom_object after SparkJobSpec processing

* Fix compatibility issue with parent_dag attribute access

- Changed from checking is_subdag to parent_dag to match KubernetesPodOperator implementation
- This ensures compatibility with older Airflow versions where is_subdag may not exist
- Follows the same pattern used in the parent class for SubDAG handling

* Align _get_ti_pod_labels implementation with KubernetesPodOperator

- Use ti.map_index directly instead of getattr for consistency
- Convert try_number to string to match parent class behavior
- Convert map_index to string for label value consistency
- This ensures full compatibility with the parent class implementation

* feat: Add reattach functionality to SparkKubernetesOperator

Add reattach_on_restart parameter (default: True) to automatically reattach to
existing Spark applications on task restart, preventing duplicate job creation.

- Implement find_spark_job method for existing job detection
- Add task context labels for pod identification
- Maintain 100% backward compatibility
- Add comprehensive test coverage (2 new tests)

Fixes apache#41211

* Fix SparkKubernetesOperator reattach with task context labels

- Add task context labels to driver and executor pods when reattach_on_restart=True
- Fix execution flow to maintain test compatibility
- Preserve deferrable execution functionality
- Add comprehensive reattach logic with proper pod finding

Fixes apache#41211

* Fix code formatting for static checks

- Remove extra blank line in SparkKubernetesOperator
- Add required blank line in test file
- Ensure compliance with ruff formatting standards

* update  tests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers debug ci resources Set it on PR if you want to debug resource usage for it provider:cncf-kubernetes Kubernetes (k8s) provider related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SparkKubernetesOperator reattach_on_restart logic doesn't working

3 participants