From ac216f402fe48a29ccf62037bc4ee408597204c5 Mon Sep 17 00:00:00 2001 From: Xiaochang Wu Date: Thu, 14 Jan 2021 18:57:51 +0800 Subject: [PATCH 1/5] Port oneccl to new C++ apis --- mllib-dal/src/main/native/KMeansDALImpl.cpp | 35 +++++++--------- mllib-dal/src/main/native/OneCCL.cpp | 46 +++++++++++---------- mllib-dal/src/main/native/OneCCL.h | 5 +++ mllib-dal/src/main/native/PCADALImpl.cpp | 18 ++++---- 4 files changed, 51 insertions(+), 53 deletions(-) create mode 100644 mllib-dal/src/main/native/OneCCL.h diff --git a/mllib-dal/src/main/native/KMeansDALImpl.cpp b/mllib-dal/src/main/native/KMeansDALImpl.cpp index 790b6a154..688dd84b3 100644 --- a/mllib-dal/src/main/native/KMeansDALImpl.cpp +++ b/mllib-dal/src/main/native/KMeansDALImpl.cpp @@ -14,13 +14,14 @@ * limitations under the License. *******************************************************************************/ -#include +#include #include +#include +#include #include "service.h" #include "org_apache_spark_ml_clustering_KMeansDALImpl.h" -#include -#include +#include "OneCCL.h" using namespace std; using namespace daal; @@ -30,7 +31,8 @@ const int ccl_root = 0; typedef double algorithmFPType; /* Algorithm floating-point type */ -static NumericTablePtr kmeans_compute(int rankId, const NumericTablePtr & pData, const NumericTablePtr & initialCentroids, +static NumericTablePtr kmeans_compute(int rankId, ccl::communicator &comm, + const NumericTablePtr & pData, const NumericTablePtr & initialCentroids, size_t nClusters, size_t nBlocks, algorithmFPType &ret_cost) { const bool isRoot = (rankId == ccl_root); @@ -43,17 +45,13 @@ static NumericTablePtr kmeans_compute(int rankId, const NumericTablePtr & pData, CentroidsArchLength = inputArch.getSizeOfArchive(); } - ccl_request_t request; - /* Get partial results from the root node */ - ccl_bcast(&CentroidsArchLength, sizeof(size_t), ccl_dtype_char, ccl_root, NULL, NULL, NULL, &request); - ccl_wait(request); + ccl::broadcast(&CentroidsArchLength, sizeof(size_t), ccl::datatype::uint8, ccl_root, comm).wait(); ByteBuffer nodeCentroids(CentroidsArchLength); if (isRoot) inputArch.copyArchiveToArray(&nodeCentroids[0], CentroidsArchLength); - ccl_bcast(&nodeCentroids[0], CentroidsArchLength, ccl_dtype_char, ccl_root, NULL, NULL, NULL, &request); - ccl_wait(request); + ccl::broadcast(&nodeCentroids[0], CentroidsArchLength, ccl::datatype::uint8, ccl_root, comm).wait(); /* Deserialize centroids data */ OutputDataArchive outArch(nodeCentroids.size() ? &nodeCentroids[0] : NULL, CentroidsArchLength); @@ -79,7 +77,7 @@ static NumericTablePtr kmeans_compute(int rankId, const NumericTablePtr & pData, ByteBuffer serializedData; /* Serialized data is of equal size on each node if each node called compute() equal number of times */ - size_t* recvCounts = new size_t[nBlocks]; + vector recvCounts(nBlocks); for (size_t i = 0; i < nBlocks; i++) { recvCounts[i] = perNodeArchLength; @@ -90,10 +88,7 @@ static NumericTablePtr kmeans_compute(int rankId, const NumericTablePtr & pData, dataArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength); /* Transfer partial results to step 2 on the root node */ - ccl_allgatherv(&nodeResults[0], perNodeArchLength, &serializedData[0], recvCounts, ccl_dtype_char, NULL, NULL, NULL, &request); - ccl_wait(request); - - delete [] recvCounts; + ccl::allgatherv(&nodeResults[0], perNodeArchLength, &serializedData[0], recvCounts, ccl::datatype::uint8, comm).wait(); if (isRoot) { @@ -168,8 +163,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMean jint executor_num, jint executor_cores, jobject resultObj) { - size_t rankId; - ccl_get_comm_rank(NULL, &rankId); + ccl::communicator *comm = getComm(); + size_t rankId = comm->rank(); NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); NumericTablePtr centroids = *((NumericTablePtr *)pNumTabCenters); @@ -189,16 +184,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMean for (it = 0; it < iteration_num && !converged; it++) { auto t1 = std::chrono::high_resolution_clock::now(); - newCentroids = kmeans_compute(rankId, pData, centroids, cluster_num, executor_num, totalCost); + newCentroids = kmeans_compute(rankId, *comm, pData, centroids, cluster_num, executor_num, totalCost); if (rankId == ccl_root) { converged = areAllCentersConverged(centroids, newCentroids, tolerance); } // Sync converged status - ccl_request_t request; - ccl_bcast(&converged, 1, ccl_dtype_char, ccl_root, NULL, NULL, NULL, &request); - ccl_wait(request); + ccl::broadcast(&converged, 1, ccl::datatype::uint8, ccl_root, *comm).wait(); centroids = newCentroids; diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index b23be9737..9d84754cf 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -1,23 +1,36 @@ #include -#include +#include #include "org_apache_spark_ml_util_OneCCL__.h" +// todo: fill initial comm_size and rank_id +size_t comm_size; +size_t rank_id; + +ccl::communicator *getComm() { + ccl::shared_ptr_class kvs; + static ccl::communicator b = ccl::create_communicator(comm_size, rank_id, kvs); + return &b; +} + JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init (JNIEnv *env, jobject obj, jobject param) { std::cout << "oneCCL (native): init" << std::endl; - ccl_init(); + ccl::init(); - jclass cls = env->GetObjectClass(param); - jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J"); - jfieldID fid_rank_id = env->GetFieldID(cls, "rankId", "J"); + ccl::shared_ptr_class kvs; + ccl::kvs::address_type main_addr; + kvs = ccl::create_kvs(main_addr); + + auto comm = getComm(); - size_t comm_size; - size_t rank_id; + rank_id = comm->rank(); + comm_size = comm->size(); - ccl_get_comm_size(NULL, &comm_size); - ccl_get_comm_rank(NULL, &rank_id); + jclass cls = env->GetObjectClass(param); + jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J"); + jfieldID fid_rank_id = env->GetFieldID(cls, "rankId", "J"); env->SetLongField(param, fid_comm_size, comm_size); env->SetLongField(param, fid_rank_id, rank_id); @@ -35,7 +48,6 @@ JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup std::cout << "oneCCL (native): cleanup" << std::endl; - ccl_finalize(); } /* @@ -44,12 +56,9 @@ JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup * Signature: ()Z */ JNIEXPORT jboolean JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_isRoot - (JNIEnv *env, jobject obj) { - - size_t rank_id; - ccl_get_comm_rank(NULL, &rank_id); + (JNIEnv *env, jobject obj) { - return (rank_id == 0); + return getComm()->rank() == 0; } /* @@ -59,12 +68,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_isRoot */ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_rankID (JNIEnv *env, jobject obj) { - - size_t rank_id; - ccl_get_comm_rank(NULL, &rank_id); - - return rank_id; - + return getComm()->rank(); } /* diff --git a/mllib-dal/src/main/native/OneCCL.h b/mllib-dal/src/main/native/OneCCL.h new file mode 100644 index 000000000..bead7fdee --- /dev/null +++ b/mllib-dal/src/main/native/OneCCL.h @@ -0,0 +1,5 @@ +#pragma once + +#include + +ccl::communicator *getComm(); diff --git a/mllib-dal/src/main/native/PCADALImpl.cpp b/mllib-dal/src/main/native/PCADALImpl.cpp index 3b06fc0dc..7aa1dd488 100644 --- a/mllib-dal/src/main/native/PCADALImpl.cpp +++ b/mllib-dal/src/main/native/PCADALImpl.cpp @@ -1,4 +1,3 @@ -#include #include #include "service.h" @@ -7,6 +6,7 @@ #include #include "org_apache_spark_ml_feature_PCADALImpl.h" +#include "OneCCL.h" using namespace std; using namespace daal; @@ -24,8 +24,9 @@ typedef double algorithmFPType; /* Algorithm floating-point type */ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL( JNIEnv *env, jobject obj, jlong pNumTabData, jint k, jint executor_num, jint executor_cores, jobject resultObj) { - size_t rankId; - ccl_get_comm_rank(NULL, &rankId); + + ccl::communicator *comm = getComm(); + size_t rankId = comm->rank(); const size_t nBlocks = executor_num; const int comm_size = executor_num; @@ -59,9 +60,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL byte* nodeResults = new byte[perNodeArchLength]; dataArch.copyArchiveToArray(nodeResults, perNodeArchLength); - ccl_request_t request; - - size_t* recv_counts = new size_t[comm_size * perNodeArchLength]; + vector recv_counts(comm_size * perNodeArchLength); for (int i = 0; i < comm_size; i++) recv_counts[i] = perNodeArchLength; cout << "PCA (native): ccl_allgatherv receiving " << perNodeArchLength * nBlocks << " bytes" << endl; @@ -71,17 +70,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL /* Transfer partial results to step 2 on the root node */ // MPI_Gather(nodeResults, perNodeArchLength, MPI_CHAR, serializedData.get(), // perNodeArchLength, MPI_CHAR, ccl_root, MPI_COMM_WORLD); - ccl_allgatherv(nodeResults, perNodeArchLength, serializedData.get(), recv_counts, - ccl_dtype_char, NULL, NULL, NULL, &request); - ccl_wait(request); + ccl::allgatherv(nodeResults, perNodeArchLength, serializedData.get(), recv_counts, + ccl::datatype::uint8, *comm).wait(); auto t2 = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast( t2 - t1 ).count(); std::cout << "PCA (native): ccl_allgatherv took " << duration << " secs" << std::endl; - delete[] nodeResults; - if (rankId == ccl_root) { auto t1 = std::chrono::high_resolution_clock::now(); From 99b9551cde660154a688af43a9dd5240d9933ce5 Mon Sep 17 00:00:00 2001 From: Xiaochang Wu Date: Tue, 26 Jan 2021 14:33:05 +0800 Subject: [PATCH 2/5] Add IP Port kvs_attr and Kmeans, PCA test cases validated --- mllib-dal/build.sh | 32 ++++++++ mllib-dal/pom.xml | 16 ++-- mllib-dal/src/assembly/assembly.xml | 13 +--- .../org/apache/spark/ml/util/LibLoader.java | 5 +- mllib-dal/src/main/native/KMeansDALImpl.cpp | 8 +- mllib-dal/src/main/native/Makefile | 2 +- mllib-dal/src/main/native/OneCCL.cpp | 34 ++++---- mllib-dal/src/main/native/OneCCL.h | 2 +- mllib-dal/src/main/native/PCADALImpl.cpp | 6 +- .../javah/org_apache_spark_ml_util_OneCCL__.h | 4 +- .../spark/ml/clustering/KMeansDALImpl.scala | 4 +- .../apache/spark/ml/feature/PCADALImpl.scala | 4 +- .../org/apache/spark/ml/util/OneCCL.scala | 78 ++++++++++--------- 13 files changed, 126 insertions(+), 82 deletions(-) create mode 100755 mllib-dal/build.sh diff --git a/mllib-dal/build.sh b/mllib-dal/build.sh new file mode 100755 index 000000000..da1d8df75 --- /dev/null +++ b/mllib-dal/build.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# Check envs for building +if [[ -z $JAVA_HOME ]]; then + echo $JAVA_HOME not defined! + exit 1 +fi + +if [[ -z $DAALROOT ]]; then + echo DAALROOT not defined! + exit 1 +fi + +if [[ -z $TBBROOT ]]; then + echo TBBROOT not defined! + exit 1 +fi + +if [[ -z $CCL_ROOT ]]; then + echo CCL_ROOT not defined! + exit 1 +fi + +echo === Building Environments === +echo JAVA_HOME=$JAVA_HOME +echo DAALROOT=$DAALROOT +echo TBBROOT=$TBBROOT +echo CCL_ROOT=$CCL_ROOT +echo GCC Version: $(gcc -dumpversion) +echo ============================= + +mvn -DskipTests clean package diff --git a/mllib-dal/pom.xml b/mllib-dal/pom.xml index 01e002830..4e51f9157 100644 --- a/mllib-dal/pom.xml +++ b/mllib-dal/pom.xml @@ -218,10 +218,12 @@ ${env.CCL_ROOT}/lib - libpmi.so.1 - libresizable_pmi.so.1 + + + libmpi.so.12.0.0 libfabric.so.1 - libccl_atl_ofi.so.1 + libccl.so + @@ -271,9 +273,13 @@ ${project.build.testOutputDirectory}/lib/libtbbmalloc.so.2 - ${project.build.testOutputDirectory}/lib/libccl_atl_ofi.so.1 - ${project.build.testOutputDirectory}/lib/libccl_atl_ofi.so + ${project.build.testOutputDirectory}/lib/libmpi.so.12.0.0 + ${project.build.testOutputDirectory}/lib/libmpi.so.12 + + + + diff --git a/mllib-dal/src/assembly/assembly.xml b/mllib-dal/src/assembly/assembly.xml index 137f19b81..498b90e02 100644 --- a/mllib-dal/src/assembly/assembly.xml +++ b/mllib-dal/src/assembly/assembly.xml @@ -58,26 +58,21 @@ - ${env.CCL_ROOT}/lib/libpmi.so.1 + ${env.CCL_ROOT}/lib/libfabric.so.1 lib - ${env.CCL_ROOT}/lib/libresizable_pmi.so.1 + ${env.CCL_ROOT}/lib/libmpi.so.12.0.0 lib + libmpi.so.12 - ${env.CCL_ROOT}/lib//libfabric.so.1 + ${env.CCL_ROOT}/lib/libccl.so lib ${env.CCL_ROOT}/lib/prov/libsockets-fi.so lib - - - ${env.CCL_ROOT}/lib/libccl_atl_ofi.so.1 - lib - libccl_atl_ofi.so - \ No newline at end of file diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java index 5b51451ae..c11b4e56e 100644 --- a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java +++ b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java @@ -55,11 +55,10 @@ public static synchronized void loadLibraries() throws IOException { * Load oneCCL libs in dependency order */ public static synchronized void loadLibCCL() throws IOException { - loadFromJar(subDir, "libpmi.so.1"); - loadFromJar(subDir, "libresizable_pmi.so.1"); loadFromJar(subDir, "libfabric.so.1"); + loadFromJar(subDir, "libmpi.so.12"); + loadFromJar(subDir, "libccl.so"); loadFromJar(subDir, "libsockets-fi.so"); - loadFromJar(subDir, "libccl_atl_ofi.so"); } /** diff --git a/mllib-dal/src/main/native/KMeansDALImpl.cpp b/mllib-dal/src/main/native/KMeansDALImpl.cpp index 688dd84b3..d9c7a2f29 100644 --- a/mllib-dal/src/main/native/KMeansDALImpl.cpp +++ b/mllib-dal/src/main/native/KMeansDALImpl.cpp @@ -163,8 +163,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMean jint executor_num, jint executor_cores, jobject resultObj) { - ccl::communicator *comm = getComm(); - size_t rankId = comm->rank(); + ccl::communicator &comm = getComm(); + size_t rankId = comm.rank(); NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); NumericTablePtr centroids = *((NumericTablePtr *)pNumTabCenters); @@ -184,14 +184,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMean for (it = 0; it < iteration_num && !converged; it++) { auto t1 = std::chrono::high_resolution_clock::now(); - newCentroids = kmeans_compute(rankId, *comm, pData, centroids, cluster_num, executor_num, totalCost); + newCentroids = kmeans_compute(rankId, comm, pData, centroids, cluster_num, executor_num, totalCost); if (rankId == ccl_root) { converged = areAllCentersConverged(centroids, newCentroids, tolerance); } // Sync converged status - ccl::broadcast(&converged, 1, ccl::datatype::uint8, ccl_root, *comm).wait(); + ccl::broadcast(&converged, 1, ccl::datatype::uint8, ccl_root, comm).wait(); centroids = newCentroids; diff --git a/mllib-dal/src/main/native/Makefile b/mllib-dal/src/main/native/Makefile index bb071d6ec..d6f5ce431 100644 --- a/mllib-dal/src/main/native/Makefile +++ b/mllib-dal/src/main/native/Makefile @@ -31,7 +31,7 @@ INCS := -I $(JAVA_HOME)/include \ # Use static link if possible, TBB is only available as dynamic libs -LIBS := -L${CCL_ROOT}/lib -l:libccl.a \ +LIBS := -L${CCL_ROOT}/lib -lccl \ -L$(DAALROOT)/lib/intel64 -l:libdaal_core.a -l:libdaal_thread.a \ -L$(TBBROOT)/lib -ltbb -ltbbmalloc # TODO: Add signal chaining support, should fix linking, package so and loading diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index 9d84754cf..a0fb131a8 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -6,27 +6,32 @@ size_t comm_size; size_t rank_id; -ccl::communicator *getComm() { - ccl::shared_ptr_class kvs; - static ccl::communicator b = ccl::create_communicator(comm_size, rank_id, kvs); - return &b; +std::vector g_comms; + +ccl::communicator &getComm() { + return g_comms[0]; } JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init - (JNIEnv *env, jobject obj, jobject param) { + (JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, jobject param) { std::cout << "oneCCL (native): init" << std::endl; ccl::init(); + const char *str = env->GetStringUTFChars(ip_port, 0); + ccl::string ccl_ip_port(str); + + auto kvs_attr = ccl::create_kvs_attr(); + kvs_attr.set(ccl_ip_port); + ccl::shared_ptr_class kvs; - ccl::kvs::address_type main_addr; - kvs = ccl::create_kvs(main_addr); - - auto comm = getComm(); + kvs = ccl::create_main_kvs(kvs_attr); - rank_id = comm->rank(); - comm_size = comm->size(); + g_comms.push_back(ccl::create_communicator(size, rank, kvs)); + + rank_id = getComm().rank(); + comm_size = getComm().size(); jclass cls = env->GetObjectClass(param); jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J"); @@ -34,6 +39,7 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init env->SetLongField(param, fid_comm_size, comm_size); env->SetLongField(param, fid_rank_id, rank_id); + env->ReleaseStringUTFChars(ip_port, str); return 1; } @@ -46,6 +52,8 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup (JNIEnv *env, jobject obj) { + g_comms.pop_back(); + std::cout << "oneCCL (native): cleanup" << std::endl; } @@ -58,7 +66,7 @@ JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup JNIEXPORT jboolean JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_isRoot (JNIEnv *env, jobject obj) { - return getComm()->rank() == 0; + return getComm().rank() == 0; } /* @@ -68,7 +76,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_isRoot */ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_rankID (JNIEnv *env, jobject obj) { - return getComm()->rank(); + return getComm().rank(); } /* diff --git a/mllib-dal/src/main/native/OneCCL.h b/mllib-dal/src/main/native/OneCCL.h index bead7fdee..b579c4697 100644 --- a/mllib-dal/src/main/native/OneCCL.h +++ b/mllib-dal/src/main/native/OneCCL.h @@ -2,4 +2,4 @@ #include -ccl::communicator *getComm(); +ccl::communicator &getComm(); diff --git a/mllib-dal/src/main/native/PCADALImpl.cpp b/mllib-dal/src/main/native/PCADALImpl.cpp index 7aa1dd488..57e7f5dc5 100644 --- a/mllib-dal/src/main/native/PCADALImpl.cpp +++ b/mllib-dal/src/main/native/PCADALImpl.cpp @@ -25,8 +25,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL JNIEnv *env, jobject obj, jlong pNumTabData, jint k, jint executor_num, jint executor_cores, jobject resultObj) { - ccl::communicator *comm = getComm(); - size_t rankId = comm->rank(); + ccl::communicator &comm = getComm(); + size_t rankId = comm.rank(); const size_t nBlocks = executor_num; const int comm_size = executor_num; @@ -71,7 +71,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL // MPI_Gather(nodeResults, perNodeArchLength, MPI_CHAR, serializedData.get(), // perNodeArchLength, MPI_CHAR, ccl_root, MPI_COMM_WORLD); ccl::allgatherv(nodeResults, perNodeArchLength, serializedData.get(), recv_counts, - ccl::datatype::uint8, *comm).wait(); + ccl::datatype::uint8, comm).wait(); auto t2 = std::chrono::high_resolution_clock::now(); diff --git a/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h b/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h index 60825ae3f..4066067f6 100644 --- a/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h +++ b/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h @@ -10,10 +10,10 @@ extern "C" { /* * Class: org_apache_spark_ml_util_OneCCL__ * Method: c_init - * Signature: (Lorg/apache/spark/ml/util/CCLParam;)I + * Signature: (IILjava/lang/String;Lorg/apache/spark/ml/util/CCLParam;)I */ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init - (JNIEnv *, jobject, jobject); + (JNIEnv *, jobject, jint, jint, jstring, jobject); /* * Class: org_apache_spark_ml_util_OneCCL__ diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala index 8a69d13f6..d8829b2c9 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala @@ -111,9 +111,9 @@ class KMeansDALImpl ( }.cache() - val results = coalescedTables.mapPartitions { table => + val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, executorIPAddress, OneCCL.KVS_PORT) + OneCCL.init(executorNum, rank, executorIPAddress) val initCentroids = OneDAL.makeNumericTable(centers) val result = new KMeansResult() diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala index 1760aa171..6f9aaa442 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala @@ -48,9 +48,9 @@ class PCADALImpl ( val executorIPAddress = Utils.sparkFirstExecutorIP(input.sparkContext) - val results = coalescedTables.mapPartitions { table => + val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, executorIPAddress, OneCCL.KVS_PORT) + OneCCL.init(executorNum, rank, executorIPAddress) val result = new PCAResult() cPCATrainDAL( diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala index 87022f4c9..af9080856 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala @@ -23,52 +23,56 @@ object OneCCL { var cclParam = new CCLParam() - var kvsIPPort = sys.env.getOrElse("CCL_KVS_IP_PORT", "") - var worldSize = sys.env.getOrElse("CCL_WORLD_SIZE", "1").toInt - - val KVS_PORT = 51234 - - private def checkEnv() { - val altTransport = sys.env.getOrElse("CCL_ATL_TRANSPORT", "") - val pmType = sys.env.getOrElse("CCL_PM_TYPE", "") - val ipExchange = sys.env.getOrElse("CCL_KVS_IP_EXCHANGE", "") - - assert(altTransport == "ofi") - assert(pmType == "resizable") - assert(ipExchange == "env") - assert(kvsIPPort != "") - - } +// var kvsIPPort = sys.env.getOrElse("CCL_KVS_IP_PORT", "") +// var worldSize = sys.env.getOrElse("CCL_WORLD_SIZE", "1").toInt + + var kvsPort = 5000 + +// private def checkEnv() { +// val altTransport = sys.env.getOrElse("CCL_ATL_TRANSPORT", "") +// val pmType = sys.env.getOrElse("CCL_PM_TYPE", "") +// val ipExchange = sys.env.getOrElse("CCL_KVS_IP_EXCHANGE", "") +// +// assert(altTransport == "ofi") +// assert(pmType == "resizable") +// assert(ipExchange == "env") +// assert(kvsIPPort != "") +// +// } // Run on Executor - def setExecutorEnv(executor_num: Int, ip: String, port: Int): Unit = { - // Work around ccl by passings in a spark.executorEnv.CCL_KVS_IP_PORT. - val ccl_kvs_ip_port = sys.env.getOrElse("CCL_KVS_IP_PORT", s"${ip}_${port}") - - println(s"oneCCL: Initializing with CCL_KVS_IP_PORT: $ccl_kvs_ip_port") - - setEnv("CCL_PM_TYPE", "resizable") - setEnv("CCL_ATL_TRANSPORT","ofi") - setEnv("CCL_ATL_TRANSPORT_PATH", LibLoader.getTempSubDir()) - setEnv("CCL_KVS_IP_EXCHANGE","env") - setEnv("CCL_KVS_IP_PORT", ccl_kvs_ip_port) - setEnv("CCL_WORLD_SIZE", s"${executor_num}") - // Uncomment this if you whant to debug oneCCL - // setEnv("CCL_LOG_LEVEL", "2") - } - - def init(executor_num: Int, ip: String, port: Int) = { - - setExecutorEnv(executor_num, ip, port) +// def setExecutorEnv(executor_num: Int, ip: String, port: Int): Unit = { +// // Work around ccl by passings in a spark.executorEnv.CCL_KVS_IP_PORT. +// val ccl_kvs_ip_port = sys.env.getOrElse("CCL_KVS_IP_PORT", s"${ip}_${port}") +// +// println(s"oneCCL: Initializing with CCL_KVS_IP_PORT: $ccl_kvs_ip_port") +// +// setEnv("CCL_PM_TYPE", "resizable") +// setEnv("CCL_ATL_TRANSPORT","ofi") +// setEnv("CCL_ATL_TRANSPORT_PATH", LibLoader.getTempSubDir()) +// setEnv("CCL_KVS_IP_EXCHANGE","env") +// setEnv("CCL_KVS_IP_PORT", ccl_kvs_ip_port) +// setEnv("CCL_WORLD_SIZE", s"${executor_num}") +// // Uncomment this if you whant to debug oneCCL +// // setEnv("CCL_LOG_LEVEL", "2") +// } + + def init(executor_num: Int, rank: Int, ip: String) = { + +// setExecutorEnv(executor_num, ip, port) + println(s"oneCCL: Initializing with IP_PORT: ${ip}_${kvsPort}") // cclParam is output from native code - c_init(cclParam) + c_init(executor_num, rank, ip+"_"+kvsPort.toString, cclParam) // executor number should equal to oneCCL world size assert(executor_num == cclParam.commSize, "executor number should equal to oneCCL world size") println(s"oneCCL: Initialized with executorNum: $executor_num, commSize, ${cclParam.commSize}, rankId: ${cclParam.rankId}") + // Use a new port when calling init again + kvsPort = kvsPort + 1 + } // Run on Executor @@ -76,7 +80,7 @@ object OneCCL { c_cleanup() } - @native private def c_init(param: CCLParam) : Int + @native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam) : Int @native private def c_cleanup() : Unit @native def isRoot() : Boolean From 3c3e2cf0672da7c571f991beb174afe541dfb442 Mon Sep 17 00:00:00 2001 From: Xiaochang Wu Date: Tue, 26 Jan 2021 14:38:25 +0800 Subject: [PATCH 3/5] update CI --- dev/install-build-deps-centos.sh | 2 +- dev/install-build-deps-ubuntu.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/install-build-deps-centos.sh b/dev/install-build-deps-centos.sh index 7b27736ae..8a347fdef 100755 --- a/dev/install-build-deps-centos.sh +++ b/dev/install-build-deps-centos.sh @@ -23,7 +23,7 @@ cd /tmp rm -rf oneCCL git clone https://github.com/oneapi-src/oneCCL cd oneCCL -git checkout beta08 +git checkout 2021.1 mkdir -p build && cd build cmake .. make -j 2 install diff --git a/dev/install-build-deps-ubuntu.sh b/dev/install-build-deps-ubuntu.sh index 07019b834..d43e35b89 100755 --- a/dev/install-build-deps-ubuntu.sh +++ b/dev/install-build-deps-ubuntu.sh @@ -17,7 +17,7 @@ echo "Building oneCCL ..." cd /tmp git clone https://github.com/oneapi-src/oneCCL cd oneCCL -git checkout beta08 +git checkout 2021.1 mkdir build && cd build cmake .. make -j 2 install From 2d87d07940a4ed3709b825a8f5779ecefe1a7820 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Tue, 2 Feb 2021 17:38:51 +0800 Subject: [PATCH 4/5] fix LibLoader.loadLibMLlibDAL with loadLibraries --- .../src/main/java/org/apache/spark/ml/util/LibLoader.java | 6 +++--- .../src/main/scala/org/apache/spark/ml/util/Utils.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java index c11b4e56e..ed83f3fe8 100644 --- a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java +++ b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java @@ -54,7 +54,7 @@ public static synchronized void loadLibraries() throws IOException { /** * Load oneCCL libs in dependency order */ - public static synchronized void loadLibCCL() throws IOException { + private static synchronized void loadLibCCL() throws IOException { loadFromJar(subDir, "libfabric.so.1"); loadFromJar(subDir, "libmpi.so.12"); loadFromJar(subDir, "libccl.so"); @@ -65,12 +65,12 @@ public static synchronized void loadLibCCL() throws IOException { * Load MLlibDAL lib, it depends TBB libs that are loaded by oneDAL, * so this function should be called after oneDAL loadLibrary */ - public static synchronized void loadLibMLlibDAL() throws IOException { + private static synchronized void loadLibMLlibDAL() throws IOException { // oneDAL Java API doesn't load correct libtbb version for oneAPI Beta 10 // Rename in pom.xml and assembly.xml to workaround. // See https://github.com/oneapi-src/oneDAL/issues/1254 --> LibUtils.loadLibrary(); - + loadFromJar(subDir, "libMLlibDAL.so"); } diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala index 40a1c6823..a7b762945 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala @@ -72,7 +72,7 @@ object Utils { } def checkClusterPlatformCompatibility(sc: SparkContext) : Boolean = { - LibLoader.loadLibMLlibDAL() + LibLoader.loadLibraries() // check driver platform compatibility if (!OneDAL.cCheckPlatformCompatibility()) @@ -82,7 +82,7 @@ object Utils { val executor_num = Utils.sparkExecutorNum(sc) val data = sc.parallelize(1 to executor_num, executor_num) val result = data.map { p => - LibLoader.loadLibMLlibDAL() + LibLoader.loadLibraries() OneDAL.cCheckPlatformCompatibility() }.collect() From 318cae1790316faf8d968aad2c006b51feba1b57 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Tue, 2 Feb 2021 17:39:50 +0800 Subject: [PATCH 5/5] use spark.oap.mllib.oneccl.kvs.ip to workaround KVS IP hang issue --- .../scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala | 3 ++- .../main/scala/org/apache/spark/ml/feature/PCADALImpl.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala index d8829b2c9..31b7e7c75 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala @@ -41,6 +41,7 @@ class KMeansDALImpl ( instr.foreach(_.logInfo(s"Processing partitions with $executorNum executors")) val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext) + val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) // repartition to executorNum if not enough partitions val dataForConversion = if (data.getNumPartitions < executorNum) { @@ -113,7 +114,7 @@ class KMeansDALImpl ( val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, rank, executorIPAddress) + OneCCL.init(executorNum, rank, kvsIP) val initCentroids = OneDAL.makeNumericTable(centers) val result = new KMeansResult() diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala index 6f9aaa442..33dbe8349 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala @@ -47,10 +47,11 @@ class PCADALImpl ( val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum) val executorIPAddress = Utils.sparkFirstExecutorIP(input.sparkContext) + val kvsIP = input.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, rank, executorIPAddress) + OneCCL.init(executorNum, rank, kvsIP) val result = new PCAResult() cPCATrainDAL(