diff --git a/.github/workflows/oap-mllib-ci.yml b/.github/workflows/oap-mllib-ci.yml index 2c6973321..ef9695f1b 100644 --- a/.github/workflows/oap-mllib-ci.yml +++ b/.github/workflows/oap-mllib-ci.yml @@ -1,6 +1,6 @@ name: OAP MLlib CI -on: [push, pull_request] +on: [push] jobs: build: @@ -38,5 +38,4 @@ jobs: source /opt/intel/oneapi/dal/latest/env/vars.sh source /opt/intel/oneapi/tbb/latest/env/vars.sh source /tmp/oneCCL/build/_install/env/setvars.sh - # temp disable and will enable for new release of oneCCL - #./build.sh + ./test.sh diff --git a/.gitignore b/.gitignore index 1d621bdd4..21a5d40c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ *.o *.log .vscode -*.iml target/ .idea/ .idea_modules/ diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index 0f6c774c1..a0fb131a8 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -1,13 +1,5 @@ #include -#include - -#include -#include -#include -#include - #include - #include "org_apache_spark_ml_util_OneCCL__.h" // todo: fill initial comm_size and rank_id @@ -25,12 +17,10 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init std::cout << "oneCCL (native): init" << std::endl; - auto t1 = std::chrono::high_resolution_clock::now(); - ccl::init(); const char *str = env->GetStringUTFChars(ip_port, 0); - ccl::string ccl_ip_port(str); + ccl::string ccl_ip_port(str); auto kvs_attr = ccl::create_kvs_attr(); kvs_attr.set(ccl_ip_port); @@ -40,10 +30,6 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init g_comms.push_back(ccl::create_communicator(size, rank, kvs)); - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = std::chrono::duration_cast( t2 - t1 ).count(); - std::cout << "oneCCL (native): init took " << duration << " secs" << std::endl; - rank_id = getComm().rank(); comm_size = getComm().size(); @@ -111,40 +97,3 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_setEnv return err; } - -/* - * Class: org_apache_spark_ml_util_OneCCL__ - * Method: getAvailPort - * Signature: (Ljava/lang/String;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort - (JNIEnv *env, jobject obj, jstring localIP) { - - const int port_start_base = 3000; - - char* local_host_ip = (char *) env->GetStringUTFChars(localIP, NULL); - - struct sockaddr_in main_server_address; - int server_listen_sock; - - if ((server_listen_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("OneCCL (native) getAvailPort error!"); - return -1; - } - - main_server_address.sin_family = AF_INET; - main_server_address.sin_addr.s_addr = inet_addr(local_host_ip); - main_server_address.sin_port = port_start_base; - - while (bind(server_listen_sock, - (const struct sockaddr *)&main_server_address, - sizeof(main_server_address)) < 0) { - main_server_address.sin_port++; - } - - close(server_listen_sock); - - env->ReleaseStringUTFChars(localIP, local_host_ip); - - return main_server_address.sin_port; -} diff --git a/mllib-dal/src/main/native/build-jni.sh b/mllib-dal/src/main/native/build-jni.sh index a5ae7afe1..bee614dcd 100755 --- a/mllib-dal/src/main/native/build-jni.sh +++ b/mllib-dal/src/main/native/build-jni.sh @@ -19,4 +19,3 @@ javah -d $WORK_DIR/javah -classpath "$WORK_DIR/../../../target/classes:$DAAL_JAR org.apache.spark.ml.util.OneDAL$ \ org.apache.spark.ml.clustering.KMeansDALImpl \ org.apache.spark.ml.feature.PCADALImpl - diff --git a/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h b/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h index 52e6691ee..4066067f6 100644 --- a/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h +++ b/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h @@ -47,14 +47,6 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_rankID JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_setEnv (JNIEnv *, jobject, jstring, jstring, jboolean); -/* - * Class: org_apache_spark_ml_util_OneCCL__ - * Method: getAvailPort - * Signature: (Ljava/lang/String;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort - (JNIEnv *, jobject, jstring); - #ifdef __cplusplus } #endif diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala index 2ac551745..31b7e7c75 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala @@ -43,11 +43,6 @@ class KMeansDALImpl ( val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext) val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) - val kvsPortDetected = Utils.checkExecutorAvailPort(data.sparkContext, kvsIP) - val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected) - - val kvsIPPort = kvsIP+"_"+kvsPort - // repartition to executorNum if not enough partitions val dataForConversion = if (data.getNumPartitions < executorNum) { data.repartition(executorNum).setName("Repartitioned for conversion").cache() @@ -119,7 +114,7 @@ class KMeansDALImpl ( val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIP) val initCentroids = OneDAL.makeNumericTable(centers) val result = new KMeansResult() diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala index 15ee0538e..33dbe8349 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala @@ -40,23 +40,18 @@ class PCADALImpl ( res.map(_.asML) } - def fitWithDAL(data: RDD[Vector]) : MLlibPCAModel = { + def fitWithDAL(input: RDD[Vector]) : MLlibPCAModel = { - val normalizedData = normalizeData(data) + val normalizedData = normalizeData(input) val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum) - val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext) - val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) - - val kvsPortDetected = Utils.checkExecutorAvailPort(data.sparkContext, kvsIP) - val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected) - - val kvsIPPort = kvsIP+"_"+kvsPort + val executorIPAddress = Utils.sparkFirstExecutorIP(input.sparkContext) + val kvsIP = input.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIP) val result = new PCAResult() cPCATrainDAL( diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala index a0a1679e9..af9080856 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala @@ -26,7 +26,7 @@ object OneCCL { // var kvsIPPort = sys.env.getOrElse("CCL_KVS_IP_PORT", "") // var worldSize = sys.env.getOrElse("CCL_WORLD_SIZE", "1").toInt -// var kvsPort = 5000 + var kvsPort = 5000 // private def checkEnv() { // val altTransport = sys.env.getOrElse("CCL_ATL_TRANSPORT", "") @@ -57,13 +57,13 @@ object OneCCL { // // setEnv("CCL_LOG_LEVEL", "2") // } - def init(executor_num: Int, rank: Int, ip_port: String) = { + def init(executor_num: Int, rank: Int, ip: String) = { // setExecutorEnv(executor_num, ip, port) - println(s"oneCCL: Initializing with IP_PORT: ${ip_port}") + println(s"oneCCL: Initializing with IP_PORT: ${ip}_${kvsPort}") // cclParam is output from native code - c_init(executor_num, rank, ip_port, cclParam) + c_init(executor_num, rank, ip+"_"+kvsPort.toString, cclParam) // executor number should equal to oneCCL world size assert(executor_num == cclParam.commSize, "executor number should equal to oneCCL world size") @@ -71,7 +71,7 @@ object OneCCL { println(s"oneCCL: Initialized with executorNum: $executor_num, commSize, ${cclParam.commSize}, rankId: ${cclParam.rankId}") // Use a new port when calling init again -// kvsPort = kvsPort + 1 + kvsPort = kvsPort + 1 } @@ -87,5 +87,4 @@ object OneCCL { @native def rankID() : Int @native def setEnv(key: String, value: String, overwrite: Boolean = true): Int - @native def getAvailPort(localIP: String): Int } \ No newline at end of file diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala index 14cf1ab27..a7b762945 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala @@ -71,20 +71,6 @@ object Utils { ip } - def checkExecutorAvailPort(sc: SparkContext, localIP: String) : Int = { - val executor_num = Utils.sparkExecutorNum(sc) - val data = sc.parallelize(1 to executor_num, executor_num) - val result = data.mapPartitionsWithIndex { (index, p) => - LibLoader.loadLibraries() - if (index == 0) - Iterator(OneCCL.getAvailPort(localIP)) - else - Iterator() - }.collect() - - return result(0) - } - def checkClusterPlatformCompatibility(sc: SparkContext) : Boolean = { LibLoader.loadLibraries()