
[jvm-packages] spark hangs when training is run in quick succession #4628

Closed
thesuperzapper opened this issue Jul 2, 2019 · 16 comments

@thesuperzapper
Contributor

I am getting an infinite hang when I run the following code a few times in quick succession:

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val dataPath = "__SPARK_HOME_LOCATION__/data/mllib/sample_binary_classification_data.txt"
val data = spark.read.format("libsvm").option("vectorType", "dense").load(dataPath)
val xgbClassifier = new XGBoostClassifier()

xgbClassifier.fit(data).transform(data).show()

Steps to reproduce:

  1. Open spark-shell with the XGBoost jars.
  2. Run the above code.
  3. Quickly rerun the last line until a hang happens.

Other information:
When the hang happens I only get the tracker message below and nothing after it, and I have to kill the Spark job. (If I wait between runs, they always succeed.)

Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=XXX.XXX.XXX.XXX, DMLC_TRACKER_PORT=9096, DMLC_NUM_WORKER=2}

My environment:

  • XGBoost Master
  • Spark 2.4.3
  • (Happens in both: Zeppelin and Spark-Shell)
@CodingCat
Member

Currently, parallel training for multiple models is not supported

@thesuperzapper
Contributor Author

@CodingCat

In this case, I am letting each training finish, so it's only one model.

However, you're almost certainly right about the cause. That is, I think there is some "cool down" period after training each model, during which you can't train another without hanging.

@thesuperzapper
Contributor Author

@CodingCat
After further investigation, this hang is caused by rerunning the same training within 60 seconds of the last time you ran it.

Steps to reproduce:

  1. Download Spark 2.4.3 from here, and extract it.
  2. Download the following jars from Maven: xgboost4j-0.90.jar, xgboost4j-spark-0.90.jar, akka-actor_2.11-2.3.11.jar, and config-1.2.1.jar.
  3. Run:
./spark-2.4.3-bin-hadoop2.7/bin/spark-shell --master local[1] --jars ./xgboost4j-0.90.jar,./xgboost4j-spark-0.90.jar,./akka-actor_2.11-2.3.11.jar,./config-1.2.1.jar
  4. Run the following code:
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val dataPath = "./spark-2.4.3-bin-hadoop2.7/data/mllib/sample_binary_classification_data.txt"
val data = spark.read.format("libsvm").option("vectorType", "dense").load(dataPath)
val xgbClassifier = new XGBoostClassifier()
  5. Run the following code:
xgbClassifier.fit(data).transform(data).show()
  6. Wait less than 60 seconds.
  7. Run the following code again:
xgbClassifier.fit(data).transform(data).show()
  8. ...Crash/Hang...

Note:

  • This also happens with the Scala Rabit tracker.
  • I have tested this on multiple servers/computers running Ubuntu and RedHat.

@thesuperzapper
Contributor Author

@CodingCat are you able to reproduce this?

@chenqin
Contributor

chenqin commented Aug 16, 2019

Can you try adding Rabit.shutdown() after calling

xgbClassifier.fit(data).transform(data).show()
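
For reference, a rough sketch of one way to try that on the executors (rather than only in the driver) from spark-shell. This assumes the Java binding ml.dmlc.xgboost4j.java.Rabit is on the classpath and is untested here, so treat it as an idea rather than a verified fix; a barrier-mode variant of the same idea appears later in this thread:

import ml.dmlc.xgboost4j.java.Rabit

xgbClassifier.fit(data).transform(data).show()

// Run Rabit.shutdown() inside executor tasks to clear any worker left
// initialized by the transform above. Note: without barrier mode there is
// no guarantee that every executor slot is covered.
val sc = spark.sparkContext
sc.parallelize(0 until sc.defaultParallelism, sc.defaultParallelism)
  .foreachPartition(_ => Rabit.shutdown())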

@leafjungle

I got the same problem:

org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:956)
ml.dmlc.xgboost4j.scala.spark.XGBoost$$anon$2.run(XGBoost.scala:295)

The Spark application has been hanging here for more than 24 hours, even though I have only 10 training rows and set the number of iterations to 2. I am not sure what it is waiting for.

@a-johnston
Contributor

a-johnston commented Aug 24, 2020

I understand that this issue is regarding successive training runs rather than parallel ones, but @CodingCat can you give some brief context for why parallel training runs are not supported? Ideally I would like to allocate N workers for each of M parallel training calls (given a cluster allocated to have N*M available workers) to better scale tuning, since there's a sharp falloff in benefit from adding more workers to a single training job. Even setting that aside, this issue seems like it could be a blocker for purely sequential tuning as well, which IMO would be a pretty significant issue.

Edit: Forgot to mention that this is affecting some code of mine. It appears that the models do train; the hang occurs after training completes but (of course) before control returns to non-xgboost4j code. Running with verbosity=3, it is unclear what the issue could be, as Rabit seems to track all the tasks as expected without warnings, errors, etc. I'm fairly new to the code and am not sure where to look first, but I would love to better understand the issue so I can implement a workaround, if not a proper fix.

@trivialfis
Member

Closing as barrier mode is now used in the spark package. Feel free to reopen if the issue persists with the latest branch.
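
For anyone unfamiliar, "barrier mode" refers to Spark's barrier execution mode, where all tasks of a stage are launched together and can synchronize. A minimal, generic illustration using only the public Spark API (this is not the actual xgboost4j-spark code, just the scheduling primitive it relies on):

import org.apache.spark.BarrierTaskContext

// All four tasks are scheduled at the same time; barrier() blocks each task
// until every task in the stage has reached the same point.
val rdd = spark.sparkContext.parallelize(1 to 4, numSlices = 4)
val doubled = rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier()   // global sync point across all tasks in this stage
  iter.map(_ * 2)
}.collect()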

@austinzh
Contributor

austinzh commented Apr 17, 2023

The same issue still happens with Spark 3.2.2 and XGBoost 1.6.1:

spark-shell --master "local[1]" --packages ml.dmlc:xgboost4j-spark_2.12:1.6.1,ml.dmlc:xgboost4j_2.12:1.6.1,com.typesafe.akka:akka-actor_2.12:2.5.23,com.typesafe:config:1.3.3
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.2
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.18)

New observation:
In the same test, if I replace xgbClassifier.fit(data).transform(data).show() with xgbClassifier.fit(data), then I can run it without any failure.
And when it fails, I see a similar log line every time:
[07:49:07] [0] train-logloss:0.43654189050197600

I have confirmed that PreXGBoost.scala:transformDataset causes this issue.
We shut down Rabit only when next() is called and hasNext() is false, as shown below.

        override def next(): Row = {
          val ret = batchIterImpl.next()
          if (!batchIterImpl.hasNext) {
            Rabit.shutdown()
          }
          ret
        }

But when we call show() or limit() instead of collect() or write(), the iterator never reaches hasNext() == false, so Rabit.shutdown() is never called.
This can be proven by changing
xgbClassifier.fit(data).transform(data).show()
to
xgbClassifier.fit(data).transform(data).collect()
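
To illustrate the mechanism outside of xgboost4j-spark, here is a small self-contained sketch (CleanupIterator is a made-up name for this example, not a class in the codebase): a wrapper iterator that runs its cleanup only when fully drained, and a take-style consumer (effectively what show()/limit() does per partition) that therefore never triggers it:

// Hypothetical stand-in for the pattern quoted above: cleanup fires only once
// the underlying iterator has been fully consumed.
class CleanupIterator[T](underlying: Iterator[T], cleanup: () => Unit) extends Iterator[T] {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = {
    val ret = underlying.next()
    if (!underlying.hasNext) cleanup()   // never reached if the consumer stops early
    ret
  }
}

val it = new CleanupIterator((1 to 100).iterator, () => println("cleanup ran"))
it.take(20).foreach(_ => ())   // like show(): stops early, "cleanup ran" never prints
// it.foreach(_ => ())         // like collect(): drains everything, so cleanup runs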

@trivialfis Would you mind reopening this ticket?

I can work on it if you'd like.

@austinzh
Contributor

@trivialfis and @wbo4958
There is one more thing I am not sure about: why do we need Rabit for prediction (transform)? Rabit exists for distributed training, but for prediction the workers don't need to talk to each other, so why Rabit?

trivialfis reopened this Apr 17, 2023
@trivialfis
Member

You are correct that prediction doesn't need communication. A PR for the proposed fix would be really appreciated!

@wbo4958
Contributor

wbo4958 commented Apr 17, 2023

@austinzh will you put up a PR to fix it?

@austinzh
Contributor

Sure. Let me take it.

@austinzh
Contributor

austinzh commented Apr 17, 2023

For folks with a similar issue:
A quick fix can be achieved by running this before your training job.

    val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
    val nWorker = spark.sparkContext.defaultParallelism
    spark.range(0, nWorker).rdd.barrier.mapPartitions { x => { ml.dmlc.xgboost4j.java.Rabit.shutdown(); x } }.collect()
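
The idea is that the barrier stage forces one task onto every available task slot at the same time, so any slot still holding a Rabit worker initialized by an earlier transform gets a Rabit.shutdown() call before your next fit() starts. I haven't verified that this covers every cluster configuration, so treat it as a workaround rather than a real fix.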

@austinzh
Contributor

@wbo4958 Any docs or tips for setting up a development environment for xgboost4j?
I am using a Mac with IntelliJ or VS Code.

@wbo4958
Contributor

wbo4958 commented Apr 24, 2023

@trivialfis since the PR is merged, could you help close this issue?
