
Conversation

@bin-lian
Contributor

@bin-lian bin-lian commented Feb 17, 2025

SparkSubmitOperator on Kubernetes: regardless of cluster or client mode, only the final status of the subprocess needs to be monitored. The final status of the subprocess is the final status of the Spark program.

closes: #44810


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg

boring-cyborg bot commented Feb 17, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@jscheffl
Contributor

I assume the referenced bug ticket is not the right one; it refers to a PR.

@bin-lian
Contributor Author

I have modified it.

@bin-lian
Contributor Author

Hello, if you have any questions about the approach, I can modify it.

Contributor

@nevcohen nevcohen left a comment

In our cluster, we sometimes get a returncode of 0 while the Spark K8s driver exit code is 1.

This is a normal scenario that happens from time to time, and we want the process to be marked as failed in Airflow!

Even more, I would change the log message so that it is more indicative, because the spark-submit command did execute, but the Spark app failed.

@nevcohen
Contributor

I think I found your problem. Could you send the logs of your task that runs Spark in Airflow (the Airflow worker logs)?

I assume that the phrase "exit code" appears in your logs several times, but those occurrences do not belong to the driver, so the wrong value is loaded here.

Then, to fix your bug, you'll have to fix the regex.
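
For illustration, here is a minimal sketch of the kind of log scan involved; the regex, the extract_exit_code helper, and the sample log lines are all made up for the example and are not the hook's actual code:

import re

# Illustrative pattern: pull an "Exit code: N" value out of spark-submit output.
EXIT_CODE_RE = re.compile(r"[Ee]xit code: (\d+)")

def extract_exit_code(log_lines):
    """Return the last 'exit code' value seen in the log, or None if absent."""
    exit_code = None
    for line in log_lines:
        match = EXIT_CODE_RE.search(line)
        if match:
            exit_code = int(match.group(1))  # a later match overwrites an earlier one
    return exit_code

# Made-up sample lines: an executor line and a driver line both mention an
# exit code, so the value that wins depends purely on ordering in the log.
sample = [
    "... executor 1 terminated, Exit code: 1",
    "... driver container final state: Exit code: 0",
]
print(extract_exit_code(sample))  # -> 0

This is why a scan like this can latch onto an exit code that does not belong to the driver.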

@bin-lian
Contributor Author

bin-lian commented Feb 18, 2025

If the return code is 1, then the spark-submit subprocess exited abnormally.
(screenshot: worker log)
First, I am using the Kubernetes Executor with Spark client mode. This is the worker log. An exit code is displayed in the log, but it does not mean that the Spark program really exited with an error.

@bin-lian
Contributor Author

Cluster mode is similar. The spark-submit process is a child process, and an OOM exit code may appear occasionally.

@nevcohen
Contributor

Cluster mode is similar. The spark-submit process is a child process, and an OOM exit code may appear occasionally.

The exit code in your logs belongs to the executor, not the driver.

@nevcohen
Contributor

If the return code is 1, then the spark-submit subprocess exited abnormally.
First, I am using the Kubernetes Executor with Spark client mode. This is the worker log. An exit code is displayed in the log, but it does not mean that the Spark program really exited with an error.

Do you run the Spark driver on the worker itself?

@bin-lian
Contributor Author

bin-lian commented Feb 21, 2025

I did some tests here (Spark on Kubernetes).
Cluster mode: we get a returncode of 0, but the Spark K8s driver exit code is 1.

import subprocess

if __name__ == '__main__':
    spark_submit_cmd = ["/usr/hdp/3.1.5.0-152/spark3/bin/spark-submit",
                        "--master", "k8s://https://kubernetes.default.svc.cluster.local" ,
                        "--deploy-mode", "cluster" ,
                        "--conf" ,"spark.executor.memoryOverhead=1g" ,
                        "--conf" ,"spark.kubernetes.container.image=patsnap-us.tencentcloudcr.com/data/spark:v3.2.4.16" ,
                        "--conf" ,"spark.kubernetes.authenticate.driver.serviceAccountName=spark" ,
                        "--conf" ,"spark.kubernetes.file.upload.path=s3a://testpatsnapus-1251949819/patent2/spark/upload" ,
                        "--conf" ,"spark.hadoop.fs.s3a.endpoint=http://na-ashburn.lan.s3-proxy.info",
                        "--conf" ,"spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" ,
                        "--conf" ,"spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" ,
                        "--conf" ,"spark.hadoop.fs.s3a.fast.upload=true",
                        "--conf" ,"spark.hadoop.fs.s3a.access.key=***" ,
                        "--conf" ,"spark.hadoop.fs.s3a.secret.key='***'" ,
                        "--conf", "spark.hadoop.fs.ofs.user.appid=1250000000",
                        "--conf", "spark.hadoop.fs.ofs.tmp.cache.dir=/tmp/hadoop_cos",
                        "--conf" ,"spark.hadoop.fs.cosn.credentials.provider=org.apache.hadoop.fs.auth.SimpleCredentialProvider",
                        "--conf", "spark.hadoop.fs.cosn.impl=org.apache.hadoop.fs.CosFileSystem",
                        "--conf", "spark.hadoop.fs.cosn.bucket.region=na-ashburn",
                        "--conf", "spark.hadoop.fs.cosn.bucket.endpoint_suffix=cos.na-ashburn.myqcloud.com" ,
                        "--conf", "spark.hadoop.fs.cosn.userinfo.secretId=***",
                        "--conf", "spark.hadoop.fs.cosn.userinfo.secretKey=***",
                        "--conf", "spark.hadoop.fs.AbstractFileSystem.cosn.impl=org.apache.hadoop.fs.CosN",
                        "--conf", "spark.kubernetes.driver.podTemplateFile=/usr/hdp/3.1.5.0-152/spark3/kubernetes/template/driver-template.yml",
                        "--conf", "spark.kubernetes.executor.podTemplateFile=/usr/hdp/3.1.5.0-152/spark3/kubernetes/template/executor-template.yml",
                        "--conf", "spark.history.fs.logDirectory=cosn://testpatsnapus-1251949819/patent2/spark3/share_log/spark_history_server/",
                        "--conf", "spark.eventLog.dir=cosn://testpatsnapus-1251949819/patent2/spark3/share_log/spark_history_server/",
                        "--conf", "spark.memory.fraction=0.1",
                        "--conf" ,"spark.eventLog.enabled=True" ,
                        "--conf" ,"spark.kubernetes.namespace=dm-poc" ,
                        "--num-executors" ,"2" ,
                        "--executor-cores" ,"10" ,
                        "--executor-memory" ,"512m" ,
                        "--driver-memory" ,"512m" ,
                        "--name" ,"test_code",
                        "--class","org.apache.spark.examples.SparkPi" ,
                        "/usr/hdp/3.1.5.0-152/spark3/examples/jars/spark-examples_2.12-3.2.4.jar" ,
                        "100000000"]
    _submit_sp = subprocess.Popen(
        spark_submit_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=-1,
        universal_newlines=True,
    )

    # Stream the spark-submit output, then capture the subprocess return code.
    for line in _submit_sp.stdout:
        print(line)

    returncode = _submit_sp.wait()
    print(returncode)


(screenshot: output of the cluster-mode test run)

Client mode: when the driver program runs on the worker, an exit code appears in the log, but it cannot be used to judge the program status; the return code can be used directly instead.
The same SparkPi program shows an exit code in its log, yet the final calculation is successful.
(screenshots: client-mode worker logs)

I made the corresponding adjustments to the code.
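
For reference, here is a minimal sketch of the decision logic described above; spark_app_failed is a hypothetical helper name for illustration, not the actual SparkSubmitHook code:

def spark_app_failed(returncode, deploy_mode, driver_exit_code):
    """Hypothetical helper mirroring the behaviour discussed in this thread.

    returncode       -- exit status of the spark-submit subprocess
    deploy_mode      -- "cluster" or "client" (Spark on Kubernetes)
    driver_exit_code -- exit code parsed from the driver logs, or None
    """
    if returncode != 0:
        # spark-submit itself exited abnormally: always a failure.
        return True
    if deploy_mode == "cluster":
        # In cluster mode the subprocess can return 0 even though the driver
        # failed, so the exit code parsed from the logs is the signal to use.
        return driver_exit_code is not None and driver_exit_code != 0
    # In client mode the driver runs inside spark-submit, so the subprocess
    # return code already reflects the application's final status.
    return False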

@nevcohen
Contributor

I did some tests here (Spark on Kubernetes).
Cluster mode: we get a returncode of 0, but the Spark K8s driver exit code is 1.


Excellent! Now that makes sense! Looks great!

Contributor

@bugraoz93 bugraoz93 left a comment

Looks good! Thanks for the changes, and thanks to everyone for testing and getting involved!
Could you please also add/update the unit test for this new case?
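
For example, a pytest-style sketch for the new case might look like this; it exercises the hypothetical spark_app_failed helper from the earlier sketch rather than the real SparkSubmitHook:

import pytest

# Assumes spark_app_failed from the earlier illustrative sketch is in scope.
@pytest.mark.parametrize(
    "returncode, deploy_mode, driver_exit_code, expected",
    [
        (0, "cluster", 1, True),    # subprocess ok, driver failed -> task should fail
        (0, "cluster", 0, False),   # both ok -> task succeeds
        (0, "client", 1, False),    # client mode ignores exit codes found in logs
        (1, "client", None, True),  # spark-submit itself failed -> task fails
    ],
)
def test_spark_app_failed(returncode, deploy_mode, driver_exit_code, expected):
    assert spark_app_failed(returncode, deploy_mode, driver_exit_code) == expected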

Contributor

@nevcohen nevcohen left a comment

On second thought, I think it's better to approach this from another direction: instead of changing the if as you did, it's better to add an if here.

Only in cluster mode should the exit code be processed from the logs.
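
In other words, something along these lines (a sketch only, reusing the illustrative extract_exit_code helper from the earlier example rather than the hook's real internals):

def parse_driver_exit_code(deploy_mode, log_lines):
    # Only cluster mode trusts an "Exit code: N" value found in the
    # spark-submit output; client mode relies on the subprocess return code.
    if deploy_mode == "cluster":
        return extract_exit_code(log_lines)  # illustrative helper defined above
    return None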

@bin-lian
Contributor Author

OK, I'll make the adjustment and add/update a unit test accordingly.

@bin-lian
Contributor Author

I have made the change.

Contributor

@nevcohen nevcohen left a comment

Now that's great!

Contributor

@bugraoz93 bugraoz93 left a comment

This looks a lot cleaner! Thanks for including the tests!

@potiuk potiuk merged commit 8d0895b into apache:main Feb 24, 2025
61 checks passed
@boring-cyborg

boring-cyborg bot commented Feb 24, 2025

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

pull bot pushed a commit to aliavni/airflow that referenced this pull request Feb 24, 2025
* spark on kubernetes removes dependency on Spark Exit code

* spark on kubernetes removes dependency on Spark Exit code

* spark on kubernetes removes dependency on Spark Exit code

* spark on kubernetes client mode remove dependency on Spark Exit code

* spark on kubernetes client mode remove dependency on Spark Exit code

* spark on kubernetes client mode remove dependency on Spark Exit code

* spark on kubernetes client mode remove dependency on Spark Exit code

---------

Co-authored-by: Bin Lian <lianbin@patsnap.com>
@bin-lian bin-lian deleted the fix-spark-submit-kubernetes-exit-code branch February 25, 2025 02:12
@bin-lian
Copy link
Contributor Author

bin-lian commented Feb 25, 2025

Thanks everyone for the assistance!

@bugraoz93
Contributor

Congrats @bin-lian!

insomnes pushed a commit to insomnes/airflow that referenced this pull request Feb 26, 2025
potiuk pushed a commit that referenced this pull request Feb 26, 2025
potiuk pushed a commit that referenced this pull request Feb 26, 2025
potiuk pushed a commit that referenced this pull request Feb 26, 2025
insomnes pushed a commit to insomnes/airflow that referenced this pull request Feb 27, 2025
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 28, 2025
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 28, 2025
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025