diff --git a/mllib-dal/src/main/native/Common.hpp b/mllib-dal/src/main/native/Common.hpp
new file mode 100644
index 000000000..baaf4b234
--- /dev/null
+++ b/mllib-dal/src/main/native/Common.hpp
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Copyright 2020 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+
+#pragma once
+
+#ifndef ONEDAL_DATA_PARALLEL
+#define ONEDAL_DATA_PARALLEL
+#endif
+
+#include "GPU.h"
+#include "Communicator.hpp"
+#include "OutputHelpers.hpp"
+#include "oneapi/dal/table/homogen.hpp"
diff --git a/mllib-dal/src/main/native/Communicator.hpp b/mllib-dal/src/main/native/Communicator.hpp
index 26559a499..bdd5072c9 100644
--- a/mllib-dal/src/main/native/Communicator.hpp
+++ b/mllib-dal/src/main/native/Communicator.hpp
@@ -33,13 +33,10 @@ class ccl_info {
     friend class de::singleton<ccl_info>;

 private:
-    ccl_info(int size, int rankId, const ccl::string &ipPort) {
+    ccl_info(int size, int rankId, ccl::shared_ptr_class<ccl::kvs> keyvs) {
         rank = rankId;
         rank_count = size;
-        ccl::string ccl_ip_port(ipPort);
-        auto kvs_attr = ccl::create_kvs_attr();
-        kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);
-        kvs = ccl::create_main_kvs(kvs_attr);
+        kvs = keyvs;
     }

 public:
@@ -49,8 +46,8 @@ class ccl_info {
 };

 template <typename memory_access_kind>
-communicator<memory_access_kind> make_communicator(int size, int rank, const ccl::string &ipPort) {
-    auto& info = de::singleton<ccl_info>::get(size, rank, ipPort);
+communicator<memory_access_kind> make_communicator(int size, int rank, const ccl::shared_ptr_class<ccl::kvs> kvs) {
+    auto& info = de::singleton<ccl_info>::get(size, rank, kvs);
     // integral cast
     return oneapi::dal::detail::ccl_communicator<memory_access_kind>{
         info.kvs,
         info.rank,
@@ -58,8 +55,8 @@ }

 template <typename memory_access_kind>
-communicator<memory_access_kind> make_communicator(sycl::queue& queue, int size, int rank, const ccl::string &ipPort) {
-    auto& info = de::singleton<ccl_info>::get(size, rank, ipPort);
+communicator<memory_access_kind> make_communicator(sycl::queue& queue, int size, int rank, const ccl::shared_ptr_class<ccl::kvs> kvs) {
+    auto& info = de::singleton<ccl_info>::get(size, rank, kvs);
     return oneapi::dal::detail::ccl_communicator<memory_access_kind>{
         queue,
         info.kvs,
diff --git a/mllib-dal/src/main/native/CorrelationImpl.cpp b/mllib-dal/src/main/native/CorrelationImpl.cpp
index ebf94433f..a86e33a1a 100644
--- a/mllib-dal/src/main/native/CorrelationImpl.cpp
+++ b/mllib-dal/src/main/native/CorrelationImpl.cpp
@@ -17,14 +17,8 @@
 #include

 #ifdef CPU_GPU_PROFILE
-#include "GPU.h"
-#ifndef ONEDAL_DATA_PARALLEL
-#define ONEDAL_DATA_PARALLEL
-#endif
-#include "Communicator.hpp"
-#include "OutputHelpers.hpp"
+#include "Common.hpp"
 #include "oneapi/dal/algo/covariance.hpp"
-#include "oneapi/dal/table/homogen.hpp"
 #endif

 #include "OneCCL.h"
@@ -153,41 +147,30 @@ static void doCorrelationDaalCompute(JNIEnv *env, jobject obj, int rankId,
 #endif

 #ifdef CPU_GPU_PROFILE
-static void doCorrelationOneAPICompute(JNIEnv *env, jint rankId,
-                                       jlong pNumTabData, jint executorNum,
-                                       const ccl::string &ipPort,
-                                       ComputeDevice &device,
-                                       jobject resultObj) {
-    std::cout << "oneDAL (native): GPU compute start , rankid " << rankId
-              << std::endl;
-    const bool isRoot = (rankId == ccl_root);
+static void doCorrelationOneAPICompute(
+    JNIEnv *env, jlong pNumTabData,
+    preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
+    jobject resultObj) {
+    std::cout << "oneDAL (native): GPU compute start" << std::endl;
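+    // The SPMD communicator already carries this process's rank, so rank is
+    // no longer passed down through JNI.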
+    const bool isRoot = (comm.get_rank() == ccl_root);
     homogen_table htable =
         *reinterpret_cast<const homogen_table *>(pNumTabData);

     const auto cor_desc = covariance::descriptor{}.set_result_options(
         covariance::result_options::cor_matrix |
         covariance::result_options::means);
-    auto queue = getQueue(device);
-    auto comm = preview::spmd::make_communicator<preview::spmd::device_memory_access::usm>(
-        queue, executorNum, rankId, ipPort);
     auto t1 = std::chrono::high_resolution_clock::now();
     const auto result_train = preview::compute(comm, cor_desc, htable);
-    auto t2 = std::chrono::high_resolution_clock::now();
-    auto duration =
-        std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
-    std::cout << "Correlation (native): rankid " << rankId
-              << "; computing step took " << duration / 1000 << " secs"
-              << std::endl;
     if (isRoot) {
         std::cout << "Mean:\n" << result_train.get_means() << std::endl;
         std::cout << "Correlation:\n"
                   << result_train.get_cor_matrix() << std::endl;
-        t2 = std::chrono::high_resolution_clock::now();
-        duration =
+        auto t2 = std::chrono::high_resolution_clock::now();
+        auto duration =
             std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
                 .count();
         std::cout << "Correlation batch(native): computing step took "
-                  << duration / 1000 << " secs in end. " << std::endl;
+                  << duration / 1000 << " secs." << std::endl;

         // Return all covariance & mean
         jclass clazz = env->GetObjectClass(resultObj);
@@ -208,18 +191,18 @@ static void doCorrelationOneAPICompute(JNIEnv *env, jint rankId,
 JNIEXPORT jlong JNICALL
 Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
     JNIEnv *env, jobject obj, jlong pNumTabData, jint executorNum,
-    jint executorCores, jint computeDeviceOrdinal, jint rankId, jstring ipPort,
+    jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray,
     jobject resultObj) {
-    std::cout << "oneDAL (native): use DPC++ kernels " << std::endl;
-    const char *ipPortPtr = env->GetStringUTFChars(ipPort, 0);
-    std::string ipPortStr = std::string(ipPortPtr);
-
+    std::cout << "oneDAL (native): use DPC++ kernels "
+              << "; device " << ComputeDeviceString[computeDeviceOrdinal]
+              << std::endl;
+    ccl::communicator &cclComm = getComm();
+    int rankId = cclComm.rank();
     ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
     switch (device) {
 #ifdef CPU_ONLY_PROFILE
     case ComputeDevice::host:
     case ComputeDevice::cpu: {
-        ccl::communicator &comm = getComm();
         NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
         // Set number of threads for oneDAL to use for each rank
         services::Environment::getInstance()->setNumberOfThreads(executorCores);
@@ -228,17 +211,32 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
             services::Environment::getInstance()->getNumberOfThreads();
         std::cout << "oneDAL (native): Number of CPU threads used"
                   << nThreadsNew << std::endl;
-        doCorrelationDaalCompute(env, obj, rankId, comm, pData, executorNum,
+        doCorrelationDaalCompute(env, obj, rankId, cclComm, pData, executorNum,
                                  resultObj);
     }
 #else
     case ComputeDevice::gpu: {
-        doCorrelationOneAPICompute(env, rankId, pNumTabData, executorNum,
-                                   ipPortStr, device, resultObj);
+        int nGpu = env->GetArrayLength(gpuIdxArray);
+        std::cout << "oneDAL (native): use GPU kernels with " << nGpu
+                  << " GPU(s)"
+                  << " rankid " << rankId << std::endl;
+
+        jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
+
+        int size = cclComm.size();
+        ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
+
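+        // Map this rank onto one of the GPUs Spark assigned to the executor
+        // and build a SYCL queue on it.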
+        auto queue =
+            getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu);
+
+        ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
+        auto comm =
+            preview::spmd::make_communicator<preview::spmd::device_memory_access::usm>(
+                queue, size, rankId, kvs);
+        doCorrelationOneAPICompute(env, pNumTabData, comm, resultObj);
     }
 #endif
     }
-    env->ReleaseStringUTFChars(ipPort, ipPortPtr);
     return 0;
 }
diff --git a/mllib-dal/src/main/native/GPU.cpp b/mllib-dal/src/main/native/GPU.cpp
index 1574c430e..4205b8c26 100644
--- a/mllib-dal/src/main/native/GPU.cpp
+++ b/mllib-dal/src/main/native/GPU.cpp
@@ -50,24 +50,6 @@ static int getLocalRank(ccl::communicator &comm, int size, int rank) {
     //    return 0;
 }

-sycl::device getAssignedGPU(ccl::communicator &comm, int size, int rankId,
-                            jint *gpu_indices, int n_gpu) {
-    auto local_rank = getLocalRank(comm, size, rankId);
-    auto gpus = get_gpus();
-
-    std::cout << "rank: " << rankId << " size: " << size
-              << " local_rank: " << local_rank << " n_gpu: " << n_gpu
-              << std::endl;
-
-    auto gpu_selected = gpu_indices[local_rank % n_gpu];
-    std::cout << "GPU selected for current rank: " << gpu_selected << std::endl;
-
-    // In case gpu_selected index is larger than number of GPU SYCL devices
-    auto rank_gpu = gpus[gpu_selected % gpus.size()];
-
-    return rank_gpu;
-}
-
 static sycl::queue getSyclQueue(const sycl::device device) {
     g_mtx.lock();
     if (!g_queueVector.empty()) {
@@ -83,9 +65,8 @@ static sycl::queue getSyclQueue(const sycl::device device) {
     }
 }

-sycl::queue getQueue(const ComputeDevice device) {
-    std::cout << "Get Queue" << std::endl;
-
+sycl::queue getAssignedGPU(const ComputeDevice device, ccl::communicator &comm,
+                           int size, int rankId, jint *gpu_indices, int n_gpu) {
     switch (device) {
     case ComputeDevice::host:
     case ComputeDevice::cpu: {
@@ -94,13 +75,49 @@ sycl::queue getQueue(const ComputeDevice device) {
                   << std::endl;
         exit(-1);
     }
+    case ComputeDevice::gpu: {
+        std::cout << "selector GPU" << std::endl;
+        auto local_rank = getLocalRank(comm, size, rankId);
+        auto gpus = get_gpus();
+
+        std::cout << "rank: " << rankId << " size: " << size
+                  << " local_rank: " << local_rank << " n_gpu: " << n_gpu
+                  << std::endl;
+
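+        // Round-robin: the node-local rank indexes into the GPU ids handed
+        // to this executor.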
+        auto gpu_selected = gpu_indices[local_rank % n_gpu];
+        std::cout << "GPU selected for current rank: " << gpu_selected
+                  << std::endl;
+
+        // In case gpu_selected index is larger than number of GPU SYCL devices
+        auto rank_gpu = gpus[gpu_selected % gpus.size()];
+        sycl::queue q{rank_gpu};
+        return q;
+    }
+
+    default: {
+        std::cout << "No Device!" << std::endl;
+        exit(-1);
+    }
+    }
+}
+
+sycl::queue getQueue(const ComputeDevice device) {
+    std::cout << "Get Queue" << std::endl;
+
+    switch (device) {
+    case ComputeDevice::host:
+    case ComputeDevice::cpu: {
+        std::cout << "Not implemented for HOST/CPU device, Please run on "
+                     "GPU device."
+                  << std::endl;
+        exit(-1);
+    }
     case ComputeDevice::gpu: {
         std::cout << "selector GPU" << std::endl;
         auto device_gpu = sycl::gpu_selector{}.select_device();
         std::cout << "selector GPU end" << std::endl;
         return getSyclQueue(device_gpu);
     }
-
     default: {
         std::cout << "No Device!"
                   << std::endl;
         exit(-1);
diff --git a/mllib-dal/src/main/native/GPU.h b/mllib-dal/src/main/native/GPU.h
index 7c6aa8d64..818d3ddb4 100644
--- a/mllib-dal/src/main/native/GPU.h
+++ b/mllib-dal/src/main/native/GPU.h
@@ -1,9 +1,5 @@
 #pragma once

-#ifndef ONEDAL_DATA_PARALLEL
-#define ONEDAL_DATA_PARALLEL
-#endif
-
 #include "service.h"
 #include
 #include
@@ -11,7 +7,7 @@
 #include
 #include

-sycl::device getAssignedGPU(ccl::communicator &comm, int size, int rankId,
-                            jint *gpu_indices, int n_gpu);
+sycl::queue getAssignedGPU(const ComputeDevice device, ccl::communicator &comm,
+                           int size, int rankId, jint *gpu_indices, int n_gpu);

 sycl::queue getQueue(const ComputeDevice device);
diff --git a/mllib-dal/src/main/native/KMeansImpl.cpp b/mllib-dal/src/main/native/KMeansImpl.cpp
index d58fae796..8a7d969ff 100644
--- a/mllib-dal/src/main/native/KMeansImpl.cpp
+++ b/mllib-dal/src/main/native/KMeansImpl.cpp
@@ -19,14 +19,8 @@
 #include
 #include

 #ifdef CPU_GPU_PROFILE
-#include "GPU.h"
-#ifndef ONEDAL_DATA_PARALLEL
-#define ONEDAL_DATA_PARALLEL
-#endif
-#include "Communicator.hpp"
-#include "OutputHelpers.hpp"
+#include "Common.hpp"
 #include "oneapi/dal/algo/kmeans.hpp"
-#include "oneapi/dal/table/homogen.hpp"
 #endif

 #include "OneCCL.h"
@@ -187,8 +181,7 @@ static jlong doKMeansDaalCompute(JNIEnv *env, jobject obj, int rankId,
                                  NumericTablePtr &centroids,
                                  jint cluster_num, jdouble tolerance,
                                  jint iteration_num, jint executor_num,
                                  jobject resultObj) {
-    std::cout << "oneDAL (native): CPU compute start , rankid " << rankId
-              << std::endl;
+    std::cout << "oneDAL (native): CPU compute start" << std::endl;
     algorithmFPType totalCost;

     NumericTablePtr newCentroids;
@@ -249,14 +242,13 @@ static jlong doKMeansDaalCompute(JNIEnv *env, jobject obj, int rankId,
 #endif

 #ifdef CPU_GPU_PROFILE
-static jlong doKMeansOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
-                                   jlong pNumTabCenters, jint clusterNum,
-                                   jdouble tolerance, jint iterationNum,
-                                   jint executorNum, const ccl::string &ipPort,
-                                   ComputeDevice &device, jobject resultObj) {
-    std::cout << "oneDAL (native): GPU compute start , rankid " << rankId
-              << std::endl;
-    const bool isRoot = (rankId == ccl_root);
+static jlong doKMeansOneAPICompute(
+    JNIEnv *env, jlong pNumTabData, jlong pNumTabCenters, jint clusterNum,
+    jdouble tolerance, jint iterationNum,
+    preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
+    jobject resultObj) {
+    std::cout << "oneDAL (native): GPU compute start" << std::endl;
+    const bool isRoot = (comm.get_rank() == ccl_root);
     homogen_table htable =
         *reinterpret_cast<const homogen_table *>(pNumTabData);
     homogen_table centroids =
@@ -266,29 +258,20 @@ static jlong doKMeansOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
                                .set_max_iteration_count(iterationNum)
                                .set_accuracy_threshold(tolerance);
     kmeans::train_input local_input{htable, centroids};
-    auto queue = getQueue(device);
-    auto comm = preview::spmd::make_communicator<preview::spmd::device_memory_access::usm>(
-        queue, executorNum, rankId, ipPort);
     auto t1 = std::chrono::high_resolution_clock::now();
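+    // preview::train runs distributed K-Means; all ranks attached to the
+    // communicator take part in this collective call.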
     kmeans::train_result result_train =
         preview::train(comm, kmeans_desc, local_input);
-    auto t2 = std::chrono::high_resolution_clock::now();
-    auto duration =
-        std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
-    std::cout << "KMeans (native): rankid " << rankId
-              << "; training step took " << duration / 1000 << " secs"
-              << std::endl;
     if (isRoot) {
         std::cout << "Iteration count: " << result_train.get_iteration_count()
                   << std::endl;
         std::cout << "Centroids:\n"
                   << result_train.get_model().get_centroids() << std::endl;
-        t2 = std::chrono::high_resolution_clock::now();
-        duration =
+        auto t2 = std::chrono::high_resolution_clock::now();
+        auto duration =
             std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
                 .count();
         std::cout << "KMeans (native): training step took " << duration / 1000
-                  << " secs in end. " << std::endl;
+                  << " secs." << std::endl;
         // Get the class of the input object
         jclass clazz = env->GetObjectClass(resultObj);
         // Get Field references
@@ -321,20 +304,19 @@
 JNIEXPORT jlong JNICALL
 Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCenters(
     JNIEnv *env, jobject obj, jlong pNumTabData, jlong pNumTabCenters,
     jint clusterNum, jdouble tolerance, jint iterationNum, jint executorNum,
-    jint executorCores, jint computeDeviceOrdinal, jint rankId, jstring ipPort,
+    jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray,
     jobject resultObj) {
     std::cout << "oneDAL (native): use DPC++ kernels "
               << "; device " << ComputeDeviceString[computeDeviceOrdinal]
               << std::endl;
-    const char *ipPortPtr = env->GetStringUTFChars(ipPort, 0);
-    std::string ipPortStr = std::string(ipPortPtr);
     jlong ret = 0L;
+    ccl::communicator &cclComm = getComm();
+    int rankId = cclComm.rank();
     ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
     switch (device) {
 #ifdef CPU_ONLY_PROFILE
     case ComputeDevice::host:
     case ComputeDevice::cpu: {
-        ccl::communicator &comm = getComm();
         NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
         NumericTablePtr centroids = *((NumericTablePtr *)pNumTabCenters);
         // Set number of threads for oneDAL to use for each rank
@@ -344,19 +326,34 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
             services::Environment::getInstance()->getNumberOfThreads();
         std::cout << "oneDAL (native): Number of CPU threads used "
                   << nThreadsNew << std::endl;
-        ret = doKMeansDaalCompute(env, obj, rankId, comm, pData, centroids,
+        ret = doKMeansDaalCompute(env, obj, rankId, cclComm, pData, centroids,
                                   clusterNum, tolerance, iterationNum,
                                   executorNum, resultObj);
     }
 #else
     case ComputeDevice::gpu: {
-        ret = doKMeansOneAPICompute(env, rankId, pNumTabData, pNumTabCenters,
-                                    clusterNum, tolerance, iterationNum,
-                                    executorNum, ipPortStr, device, resultObj);
+        int nGpu = env->GetArrayLength(gpuIdxArray);
+        std::cout << "oneDAL (native): use GPU kernels with " << nGpu
+                  << " GPU(s)"
+                  << " rankid " << rankId << std::endl;
+
+        jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
+
+        int size = cclComm.size();
+        ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
+
+        auto queue =
+            getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu);
+
+        ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
+        auto comm =
+            preview::spmd::make_communicator<preview::spmd::device_memory_access::usm>(
+                queue, size, rankId, kvs);
+        ret =
+            doKMeansOneAPICompute(env, pNumTabData, pNumTabCenters, clusterNum,
+                                  tolerance, iterationNum, comm, resultObj);
     }
 #endif
     }
-
-    env->ReleaseStringUTFChars(ipPort, ipPortPtr);
     return ret;
 }
diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp
index 49ad53386..bbfc9fbe9 100644
--- a/mllib-dal/src/main/native/OneCCL.cpp
+++ b/mllib-dal/src/main/native/OneCCL.cpp
@@ -38,8 +38,10 @@ static std::list<std::string> local_host_ips;
 static size_t comm_size = 0;
 static size_t rank_id = 0;
 static std::vector<ccl::communicator> g_comms;
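+// Cache the main KVS created in c_init; GPU code paths reuse it to build
+// oneDAL SPMD communicators instead of re-deriving it from an ip:port string.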
+static std::vector<ccl::shared_ptr_class<ccl::kvs>> g_kvs;

 ccl::communicator &getComm() { return g_comms[0]; }
+ccl::shared_ptr_class<ccl::kvs> &getKvs() { return g_kvs[0]; }

 JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
     JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port,
@@ -60,6 +62,7 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
     ccl::shared_ptr_class<ccl::kvs> kvs;
     kvs = ccl::create_main_kvs(kvs_attr);

+    g_kvs.push_back(kvs);
     g_comms.push_back(ccl::create_communicator(size, rank, kvs));

     auto t2 = std::chrono::high_resolution_clock::now();
@@ -98,6 +101,7 @@ Java_com_intel_oap_mllib_OneCCL_00024_c_1initDpcpp(JNIEnv *env, jobject) {

 JNIEXPORT void JNICALL
 Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(JNIEnv *env, jobject obj) {
+    g_kvs.pop_back();
     g_comms.pop_back();

     std::cerr << "OneCCL (native): cleanup" << std::endl;
diff --git a/mllib-dal/src/main/native/OneCCL.h b/mllib-dal/src/main/native/OneCCL.h
index 8079a3ced..4be15fcab 100644
--- a/mllib-dal/src/main/native/OneCCL.h
+++ b/mllib-dal/src/main/native/OneCCL.h
@@ -43,4 +43,5 @@ event CCL_API gather(const BufferType *sendbuf, int sendcount,
 } // namespace ccl

 ccl::communicator &getComm();
+ccl::shared_ptr_class<ccl::kvs> &getKvs();
 extern const int ccl_root;
diff --git a/mllib-dal/src/main/native/PCAImpl.cpp b/mllib-dal/src/main/native/PCAImpl.cpp
index 1f8cf8d2f..1996d7291 100644
--- a/mllib-dal/src/main/native/PCAImpl.cpp
+++ b/mllib-dal/src/main/native/PCAImpl.cpp
@@ -18,15 +18,9 @@
 #include

 #ifdef CPU_GPU_PROFILE
-#include "GPU.h"
-#ifndef ONEDAL_DATA_PARALLEL
-#define ONEDAL_DATA_PARALLEL
-#endif
-#include "Communicator.hpp"
-#include "OutputHelpers.hpp"
+#include "Common.hpp"
 #include "oneapi/dal/algo/covariance.hpp"
 #include "oneapi/dal/algo/pca.hpp"
-#include "oneapi/dal/table/homogen.hpp"
 #endif

 #include "OneCCL.h"
@@ -48,8 +42,7 @@ typedef double algorithmFPType; /* Algorithm floating-point type */
 static void doPCADAALCompute(JNIEnv *env, jobject obj, int rankId,
                              ccl::communicator &comm, NumericTablePtr &pData,
                              int nBlocks, jobject resultObj) {
-    std::cout << "oneDAL (native): CPU compute start , rankid " << rankId
-              << std::endl;
+    std::cout << "oneDAL (native): CPU compute start" << std::endl;
     using daal::byte;
     auto t1 = std::chrono::high_resolution_clock::now();
@@ -185,20 +178,17 @@ static void doPCADAALCompute(JNIEnv *env, jobject obj, int rankId,
 #endif

 #ifdef CPU_GPU_PROFILE
-static void doPCAOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
-                               jint executorNum, const ccl::string &ipPort,
-                               ComputeDevice &device, jobject resultObj) {
-    std::cout << "oneDAL (native): GPU compute start , rankid " << rankId
-              << std::endl;
-    const bool isRoot = (rankId == ccl_root);
+static void doPCAOneAPICompute(
+    JNIEnv *env, jlong pNumTabData,
+    preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
+    jobject resultObj) {
+    std::cout << "oneDAL (native): GPU compute start" << std::endl;
+    const bool isRoot = (comm.get_rank() == ccl_root);
     homogen_table htable =
         *reinterpret_cast<const homogen_table *>(pNumTabData);

     const auto cov_desc = covariance::descriptor{}.set_result_options(
         covariance::result_options::cov_matrix);
-    auto queue = getQueue(device);
-    auto comm = preview::spmd::make_communicator<preview::spmd::device_memory_access::usm>(
-        queue, executorNum, rankId, ipPort);
     auto t1 = std::chrono::high_resolution_clock::now();
     const auto result = preview::compute(comm, cov_desc, htable);
@@ -214,15 +204,15 @@ static void doPCAOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
         using descriptor_t = pca::descriptor;

         const auto pca_desc = descriptor_t().set_deterministic(true);
-        auto t1 = std::chrono::high_resolution_clock::now();
+        t1 = std::chrono::high_resolution_clock::now();
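+        // The Eigen step trains PCA on the covariance matrix computed above,
+        // going through the same SPMD communicator.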
         const auto result_train =
-            train(queue, pca_desc, result.get_cov_matrix());
+            preview::train(comm, pca_desc, result.get_cov_matrix());
         t2 = std::chrono::high_resolution_clock::now();
         duration =
             std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
                 .count();
-        std::cout << "PCA (native): rankid " << rankId << "; Eigen step took "
-                  << duration / 1000 << " secs in end. " << std::endl;
+        std::cout << "PCA (native): Eigen step took " << duration / 1000
+                  << " secs." << std::endl;
         // Return all eigenvalues & eigenvectors
         // Get the class of the input object
         jclass clazz = env->GetObjectClass(resultObj);
@@ -255,19 +245,18 @@ static void doPCAOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
 JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL(
     JNIEnv *env, jobject obj, jlong pNumTabData, jint executorNum,
-    jint executorCores, jint computeDeviceOrdinal, jint rankId, jstring ipPort,
+    jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray,
     jobject resultObj) {
     std::cout << "oneDAL (native): use DPC++ kernels "
               << "; device " << ComputeDeviceString[computeDeviceOrdinal]
               << std::endl;
-    const char *ipPortPtr = env->GetStringUTFChars(ipPort, 0);
-    std::string ipPortStr = std::string(ipPortPtr);
+    ccl::communicator &cclComm = getComm();
+    int rankId = cclComm.rank();
     ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
     switch (device) {
 #ifdef CPU_ONLY_PROFILE
     case ComputeDevice::host:
     case ComputeDevice::cpu: {
-        ccl::communicator &comm = getComm();
         NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
         // Set number of threads for oneDAL to use for each rank
         services::Environment::getInstance()->setNumberOfThreads(executorCores);
@@ -276,16 +265,31 @@ Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL(
             services::Environment::getInstance()->getNumberOfThreads();
         std::cout << "oneDAL (native): Number of CPU threads used "
                   << nThreadsNew << std::endl;
-        doPCADAALCompute(env, obj, rankId, comm, pData, executorNum, resultObj);
+        doPCADAALCompute(env, obj, rankId, cclComm, pData, executorNum,
+                         resultObj);
     }
 #else
     case ComputeDevice::gpu: {
-        doPCAOneAPICompute(env, rankId, pNumTabData, executorNum, ipPortStr,
-                           device, resultObj);
+        int nGpu = env->GetArrayLength(gpuIdxArray);
+        std::cout << "oneDAL (native): use GPU kernels with " << nGpu
+                  << " GPU(s)"
+                  << " rankid " << rankId << std::endl;
+
+        jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
+
+        int size = cclComm.size();
+        ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
+
+        auto queue =
+            getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu);
+
+        ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
+        auto comm =
+            preview::spmd::make_communicator<preview::spmd::device_memory_access::usm>(
+                queue, size, rankId, kvs);
+        doPCAOneAPICompute(env, pNumTabData, comm, resultObj);
     }
 #endif
     }
-
-    env->ReleaseStringUTFChars(ipPort, ipPortPtr);
     return 0;
 }
diff --git a/mllib-dal/src/main/native/Singleton.hpp b/mllib-dal/src/main/native/Singleton.hpp
index 913ac09fb..1169feac4 100644
--- a/mllib-dal/src/main/native/Singleton.hpp
+++ b/mllib-dal/src/main/native/Singleton.hpp
@@ -23,17 +23,17 @@ namespace v1 {

 template <typename T> class singleton {
 public:
-    static T& get(int size, int rank, ccl::string ip_port) {
+    static T& get(int size, int rank, ccl::shared_ptr_class<ccl::kvs> kvs) {
         static std::once_flag flag;
-        std::call_once(flag, [size, rank, ip_port] {
-            get_instance(size, rank, ip_port);
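+        // std::call_once guarantees a single ccl_info per process; later
+        // calls return the same instance regardless of the arguments passed.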
+        std::call_once(flag, [size, rank, kvs] {
+            get_instance(size, rank, kvs);
         });
-        return get_instance(size, rank, ip_port);
+        return get_instance(size, rank, kvs);
     }

 private:
-    static T& get_instance(int size, int rank, ccl::string ip_port) {
-        static T instance{size, rank, ip_port};
+    static T& get_instance(int size, int rank, ccl::shared_ptr_class<ccl::kvs> kvs) {
+        static T instance{size, rank, kvs};
         return instance;
     }
 };
diff --git a/mllib-dal/src/main/native/SummarizerImpl.cpp b/mllib-dal/src/main/native/SummarizerImpl.cpp
index 362ae223b..22c50ace2 100644
--- a/mllib-dal/src/main/native/SummarizerImpl.cpp
+++ b/mllib-dal/src/main/native/SummarizerImpl.cpp
@@ -17,14 +17,8 @@
 #include

 #ifdef CPU_GPU_PROFILE
-#include "GPU.h"
-#ifndef ONEDAL_DATA_PARALLEL
-#define ONEDAL_DATA_PARALLEL
-#endif
-#include "Communicator.hpp"
-#include "OutputHelpers.hpp"
+#include "Common.hpp"
 #include "oneapi/dal/algo/basic_statistics.hpp"
-#include "oneapi/dal/table/homogen.hpp"
 #endif

 #include "OneCCL.h"
@@ -48,8 +42,7 @@ static void doSummarizerDAALCompute(JNIEnv *env, jobject obj, int rankId,
                                     ccl::communicator &comm,
                                     const NumericTablePtr &pData,
                                     size_t nBlocks, jobject resultObj) {
-    std::cout << "oneDAL (native): CPU compute start , rankid " << rankId
-              << std::endl;
+    std::cout << "oneDAL (native): CPU compute start" << std::endl;
     using daal::byte;
     auto t1 = std::chrono::high_resolution_clock::now();
@@ -209,40 +202,29 @@ static void doSummarizerDAALCompute(JNIEnv *env, jobject obj, int rankId,
 #endif

 #ifdef CPU_GPU_PROFILE
-static void doSummarizerOneAPICompute(JNIEnv *env, jint rankId,
-                                      jlong pNumTabData, jint executorNum,
-                                      const ccl::string &ipPort,
-                                      ComputeDevice &device,
-                                      jobject resultObj) {
-    std::cout << "oneDAL (native): GPU compute start , rankid " << rankId
-              << std::endl;
-    const bool isRoot = (rankId == ccl_root);
+static void doSummarizerOneAPICompute(
+    JNIEnv *env, jlong pNumTabData,
+    preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
+    jobject resultObj) {
+    std::cout << "oneDAL (native): GPU compute start" << std::endl;
+    const bool isRoot = (comm.get_rank() == ccl_root);
     homogen_table htable =
         *reinterpret_cast<const homogen_table *>(pNumTabData);

     const auto bs_desc = basic_statistics::descriptor{};
-    auto queue = getQueue(device);
-    auto comm = preview::spmd::make_communicator<preview::spmd::device_memory_access::usm>(
-        queue, executorNum, rankId, ipPort);
     auto t1 = std::chrono::high_resolution_clock::now();
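+    // One distributed pass computes min, max, mean, and variance per column.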
     const auto result_train = preview::compute(comm, bs_desc, htable);
-    auto t2 = std::chrono::high_resolution_clock::now();
-    auto duration =
-        (float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
-            .count();
-    std::cout << "Summarizer (native): rankid " << rankId
-              << "; computing step took " << duration / 1000 << " secs"
-              << std::endl;
     if (isRoot) {
         std::cout << "Minimum:\n" << result_train.get_min() << std::endl;
         std::cout << "Maximum:\n" << result_train.get_max() << std::endl;
         std::cout << "Mean:\n" << result_train.get_mean() << std::endl;
         std::cout << "Variance:\n" << result_train.get_variance() << std::endl;
-        t2 = std::chrono::high_resolution_clock::now();
-        duration = (float)std::chrono::duration_cast<std::chrono::milliseconds>(
-                       t2 - t1)
-                       .count();
+        auto t2 = std::chrono::high_resolution_clock::now();
+        auto duration =
+            (float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 -
+                                                                         t1)
+                .count();
         std::cout << "Summarizer (native): computing step took "
-                  << duration / 1000 << " secs in end. " << std::endl;
+                  << duration / 1000 << " secs." << std::endl;

         // Return all covariance & mean
         jclass clazz = env->GetObjectClass(resultObj);
@@ -280,21 +262,18 @@ static void doSummarizerOneAPICompute(JNIEnv *env, jint rankId,
 JNIEXPORT jlong JNICALL
 Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL(
     JNIEnv *env, jobject obj, jlong pNumTabData, jint executorNum,
-    jint executorCores, jint computeDeviceOrdinal, jint rankId, jstring ipPort,
+    jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray,
     jobject resultObj) {
     std::cout << "oneDAL (native): use DPC++ kernels "
               << "; device " << ComputeDeviceString[computeDeviceOrdinal]
               << std::endl;
-    const char *ipPortPtr = env->GetStringUTFChars(ipPort, 0);
-    std::string ipPortStr = std::string(ipPortPtr);
-
+    ccl::communicator &cclComm = getComm();
+    int rankId = cclComm.rank();
     ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
     switch (device) {
 #ifdef CPU_ONLY_PROFILE
     case ComputeDevice::host:
     case ComputeDevice::cpu: {
-        ccl::communicator &comm = getComm();
-
         NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
         // Set number of threads for oneDAL to use for each rank
         services::Environment::getInstance()->setNumberOfThreads(executorCores);
@@ -303,17 +282,32 @@ Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL(
             services::Environment::getInstance()->getNumberOfThreads();
         std::cout << "oneDAL (native): Number of CPU threads used "
                   << nThreadsNew << std::endl;
-        doSummarizerDAALCompute(env, obj, rankId, comm, pData, executorNum,
+        doSummarizerDAALCompute(env, obj, rankId, cclComm, pData, executorNum,
                                 resultObj);
     }
 #else
     case ComputeDevice::gpu: {
-        doSummarizerOneAPICompute(env, rankId, pNumTabData, executorNum,
-                                  ipPortStr, device, resultObj);
+        int nGpu = env->GetArrayLength(gpuIdxArray);
+        std::cout << "oneDAL (native): use GPU kernels with " << nGpu
+                  << " GPU(s)"
+                  << " rankid " << rankId << std::endl;
+
+        jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
+
+        int size = cclComm.size();
+        ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
+
+        auto queue =
+            getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu);
+
+        ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
+        auto comm =
+            preview::spmd::make_communicator<preview::spmd::device_memory_access::usm>(
+                queue, size, rankId, kvs);
+        doSummarizerOneAPICompute(env, pNumTabData, comm, resultObj);
     }
 #endif
     }
-    env->ReleaseStringUTFChars(ipPort, ipPortPtr);
     return 0;
 }
diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneCCL__.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneCCL__.h
index 71069a6cb..6d8a86f6a 100644
--- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneCCL__.h
+++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneCCL__.h
@@ -7,30 +7,6 @@
 #ifdef __cplusplus
 extern "C" {
 #endif
-/*
- * Class:     com_intel_oap_mllib_OneCCL__
- * Method:    c_init
- * Signature: (IILjava/lang/String;Lcom/intel/oap/mllib/CCLParam;)I
- */
-JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init
-  (JNIEnv *, jobject, jint, jint, jstring, jobject);
-
-/*
- * Class:     com_intel_oap_mllib_OneCCL__
- * Method:    c_initDpcpp
- * Signature: ()I
- */
-JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1initDpcpp
-  (JNIEnv *, jobject);
-
-/*
- * Class:     com_intel_oap_mllib_OneCCL__
- * Method:    c_cleanup
- * Signature: ()V
- */
-JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup
-  (JNIEnv *, jobject);
-
 /*
  * Class:     com_intel_oap_mllib_OneCCL__
  * Method:    isRoot
@@ -63,6 +39,30 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_setEnv
 JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1getAvailPort
   (JNIEnv *, jobject, jstring);

+/*
+ * Class:     com_intel_oap_mllib_OneCCL__
+ * Method:    c_init
+ * Signature: (IILjava/lang/String;Lcom/intel/oap/mllib/CCLParam;)I
+ */
+JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init
+  (JNIEnv *, jobject, jint, jint, jstring, jobject);
+
+/*
+ * Class:     com_intel_oap_mllib_OneCCL__
+ * Method:    c_initDpcpp
+ * Signature: ()I
+ */
+JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1initDpcpp
+  (JNIEnv *, jobject);
+
+/*
+ * Class:     com_intel_oap_mllib_OneCCL__
+ * Method:    c_cleanup
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup
+  (JNIEnv *, jobject);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h
index d6c95b3c9..445c364cb 100644
--- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h
+++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h
@@ -10,10 +10,10 @@ extern "C" {
 /*
  * Class:     com_intel_oap_mllib_clustering_KMeansDALImpl
  * Method:    cKMeansOneapiComputeWithInitCenters
- * Signature: (JJIDIIIIILjava/lang/String;Lcom/intel/oap/mllib/clustering/KMeansResult;)J
+ * Signature: (JJIDIIII[ILcom/intel/oap/mllib/clustering/KMeansResult;)J
 */
 JNIEXPORT jlong JNICALL
 Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCenters
-  (JNIEnv *, jobject, jlong, jlong, jint, jdouble, jint, jint, jint, jint, jint, jstring, jobject);
+  (JNIEnv *, jobject, jlong, jlong, jint, jdouble, jint, jint, jint, jint, jintArray, jobject);

 #ifdef __cplusplus
 }
diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h
index 1b0fe0f91..1960d384c 100644
--- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h
+++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h
@@ -10,10 +10,10 @@ extern "C" {
 /*
  * Class:     com_intel_oap_mllib_feature_PCADALImpl
  * Method:    cPCATrainDAL
- * Signature: (JIIIILjava/lang/String;Lcom/intel/oap/mllib/feature/PCAResult;)J
+ * Signature: (JIII[ILcom/intel/oap/mllib/feature/PCAResult;)J
 */
 JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL
-  (JNIEnv *, jobject, jlong, jint, jint, jint, jint, jstring, jobject);
+  (JNIEnv *, jobject, jlong, jint, jint, jint, jintArray, jobject);

 #ifdef __cplusplus
 }
diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h
index 052bd2934..e5fff962d 100644
--- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h
+++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h
@@ -10,10 +10,10 @@ extern "C" {
 /*
  * Class:     com_intel_oap_mllib_stat_CorrelationDALImpl
  * Method:    cCorrelationTrainDAL
- * Signature: (JIIIILjava/lang/String;Lcom/intel/oap/mllib/stat/CorrelationResult;)J
+ * Signature: (JIII[ILcom/intel/oap/mllib/stat/CorrelationResult;)J
 */
 JNIEXPORT jlong JNICALL
 Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL
-  (JNIEnv *, jobject, jlong, jint, jint, jint, jint, jstring, jobject);
+  (JNIEnv *, jobject, jlong, jint, jint, jint, jintArray, jobject);

 #ifdef __cplusplus
 }
diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h
index a453b3315..7058bfa6d 100644
--- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h
+++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h
@@ -10,10 +10,10 @@ extern "C" {
 /*
  * Class:     com_intel_oap_mllib_stat_SummarizerDALImpl
  * Method:    cSummarizerTrainDAL
- * Signature: (JIIIILjava/lang/String;Lcom/intel/oap/mllib/stat/SummarizerResult;)J
+ * Signature: (JIII[ILcom/intel/oap/mllib/stat/SummarizerResult;)J
 */
 JNIEXPORT jlong JNICALL
 Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL
-  (JNIEnv *, jobject, jlong, jint, jint, jint, jint, jstring, jobject);
+  (JNIEnv *, jobject, jlong, jint, jint, jint, jintArray, jobject);

 #ifdef __cplusplus
 }
diff --git a/mllib-dal/src/main/native/oneapi/dal/ColumnAccessorImpl.cpp b/mllib-dal/src/main/native/oneapi/dal/ColumnAccessorImpl.cpp
index 9dc5df8a9..479e4fcb3 100644
--- a/mllib-dal/src/main/native/oneapi/dal/ColumnAccessorImpl.cpp
+++ b/mllib-dal/src/main/native/oneapi/dal/ColumnAccessorImpl.cpp
@@ -23,14 +23,9 @@
 #include

 #ifdef CPU_GPU_PROFILE
-#include "GPU.h"
-
-#ifndef ONEDAL_DATA_PARALLEL
-#define ONEDAL_DATA_PARALLEL
-#endif
+#include "Common.hpp"

 #include "com_intel_oneapi_dal_table_ColumnAccessor.h"
-#include "oneapi/dal/table/homogen.hpp"
 #include "oneapi/dal/table/column_accessor.hpp"
 #include "service.h"
diff --git a/mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp b/mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp
index 88077b85f..4821d61b9 100644
--- a/mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp
+++ b/mllib-dal/src/main/native/oneapi/dal/HomogenTableImpl.cpp
@@ -24,14 +24,9 @@
 #include

 #ifdef CPU_GPU_PROFILE
-#include "GPU.h"
-
-#ifndef ONEDAL_DATA_PARALLEL
-#define ONEDAL_DATA_PARALLEL
-#endif
+#include "Common.hpp"

 #include "com_intel_oneapi_dal_table_HomogenTableImpl.h"
-#include "oneapi/dal/table/homogen.hpp"
 #include "service.h"

 using namespace std;
diff --git a/mllib-dal/src/main/native/oneapi/dal/RowAccessorImpl.cpp b/mllib-dal/src/main/native/oneapi/dal/RowAccessorImpl.cpp
index 1f31ef0e4..f2cd64d9e 100644
--- a/mllib-dal/src/main/native/oneapi/dal/RowAccessorImpl.cpp
+++ b/mllib-dal/src/main/native/oneapi/dal/RowAccessorImpl.cpp
@@ -23,14 +23,9 @@
 #include

 #ifdef CPU_GPU_PROFILE
-#include "GPU.h"
-
-#ifndef ONEDAL_DATA_PARALLEL
-#define ONEDAL_DATA_PARALLEL
-#endif
+#include "Common.hpp"

 #include "com_intel_oneapi_dal_table_RowAccessor.h"
-#include "oneapi/dal/table/homogen.hpp"
 #include "oneapi/dal/table/row_accessor.hpp"
 #include "service.h"
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
index 7821ffd62..aa94be073 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
@@ -40,13 +40,6 @@ object OneCCL extends Logging {
       s"commSize, ${cclParam.commSize}, rankId: ${cclParam.rankId}")
   }

-  def initDpcpp(): Unit = {
-    setExecutorEnv()
-    logInfo(s"oneccl init")
-    // cclParam is output from native code
-    c_initDpcpp()
-  }
-
   // Run on Executor
   def setExecutorEnv(): Unit = {
     setEnv("CCL_ATL_TRANSPORT", "ofi")
@@ -75,7 +68,5 @@ object OneCCL extends Logging {
   @native private def c_init(size: Int, rank: Int, ip_port: String,
                              param: CCLParam): Int

-  @native private def c_initDpcpp(): Int
-
   @native private def c_cleanup(): Unit
 }
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
index 2ab94c7a1..f68c8d4f1 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
@@ -50,12 +50,18 @@ class KMeansDALImpl(var nClusters: Int,
     val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
       var cCentroids = 0L
       val result = new KMeansResult()
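+      // Spark schedules GPUs to tasks as resource addresses; reuse them as
+      // device indices on the native side.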
+      val gpuIndices = if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        resources("gpu").addresses.map(_.toInt)
+      } else {
+        null
+      }
+      val tableArr = table.next()
+      OneCCL.init(executorNum, rank, kvsIPPort)
       val initCentroids = if (useDevice == "GPU") {
-        OneCCL.initDpcpp()
         OneDAL.makeHomogenTable(centers, computeDevice).getcObejct()
       } else {
-        OneCCL.init(executorNum, rank, kvsIPPort)
         OneDAL.makeNumericTable(centers).getCNumericTable
       }
       cCentroids = cKMeansOneapiComputeWithInitCenters(
@@ -67,8 +73,7 @@ class KMeansDALImpl(var nClusters: Int,
         executorNum,
         executorCores,
         computeDevice.ordinal(),
-        rank,
-        kvsIPPort,
+        gpuIndices,
         result
       )

@@ -80,14 +85,11 @@ class KMeansDALImpl(var nClusters: Int,
       } else {
         OneDAL.numericTableToVectors(OneDAL.makeNumericTable(cCentroids))
       }
-
       Iterator((centerVectors, result.totalCost, result.iterationNum))
     } else {
       Iterator.empty
     }
-      if (useDevice == "CPU") {
-        OneCCL.cleanup()
-      }
+      OneCCL.cleanup()
       ret
     }.collect()
@@ -122,7 +124,6 @@ class KMeansDALImpl(var nClusters: Int,
       executorNum: Int,
       executorCores: Int,
       computeDeviceOrdinal: Int,
-      rankId: Int,
-      ipPort: String,
+      gpuIndices: Array[Int],
       result: KMeansResult): Long
 }
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
index b815daf6c..13a7056b6 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import java.util.Arrays

 import com.intel.oneapi.dal.table.{Common, HomogenTable, RowAccessor}
+import org.apache.spark.storage.StorageLevel

 class PCADALModel private[mllib] (
     val k: Int,
@@ -56,19 +57,20 @@ class PCADALImpl(val k: Int,
     val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
       val tableArr = table.next()
-      if (useDevice == "GPU") {
-        OneCCL.initDpcpp()
+      OneCCL.init(executorNum, rank, kvsIPPort)
+      val result = new PCAResult()
+      val gpuIndices = if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        resources("gpu").addresses.map(_.toInt)
       } else {
-        OneCCL.init(executorNum, rank, kvsIPPort)
+        null
       }
-      val result = new PCAResult()
       cPCATrainDAL(
         tableArr,
         executorNum,
         executorCores,
         computeDevice.ordinal(),
-        rank,
-        kvsIPPort,
+        gpuIndices,
         result
       )

@@ -96,9 +98,7 @@ class PCADALImpl(val k: Int,
       } else {
         Iterator.empty
       }
-      if (useDevice == "CPU") {
-        OneCCL.cleanup()
-      }
+      OneCCL.cleanup()
       ret
     }.collect()
@@ -201,7 +201,6 @@ class PCADALImpl(val k: Int,
       executorNum: Int,
       executorCores: Int,
       computeDeviceOrdinal: Int,
-      rankId: Int,
-      ipPort: String,
+      gpuIndices: Array[Int],
       result: PCAResult): Long
 }
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala
index 33ffe4a81..55ba0da09 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala
@@ -42,24 +42,24 @@ class CorrelationDALImpl(
     val kvsIPPort = getOneCCLIPPort(coalescedTables)

     val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
       val tableArr = table.next()
-      if (useDevice == "GPU") {
-        OneCCL.initDpcpp()
-      } else {
-        OneCCL.init(executorNum, rank, kvsIPPort)
-      }
+      OneCCL.init(executorNum, rank, kvsIPPort)

       val computeStartTime = System.nanoTime()

       val result = new CorrelationResult()
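+      // GPU tasks read their device indices from Spark's resource assignment.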
+      val gpuIndices = if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        resources("gpu").addresses.map(_.toInt)
+      } else {
+        null
+      }
       cCorrelationTrainDAL(
         tableArr,
         executorNum,
         executorCores,
         computeDevice.ordinal(),
-        rank,
-        kvsIPPort,
+        gpuIndices,
         result
       )

@@ -87,9 +87,7 @@ class CorrelationDALImpl(
       } else {
         Iterator.empty
       }
-      if (useDevice == "CPU") {
-        OneCCL.cleanup()
-      }
+      OneCCL.cleanup()
       ret
     }.collect()
@@ -106,7 +104,6 @@ class CorrelationDALImpl(
       executorNum: Int,
       executorCores: Int,
       computeDeviceOrdinal: Int,
-      rankId: Int,
-      ipPort: String,
+      gpuIndices: Array[Int],
       result: CorrelationResult): Long
 }
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala
index 8ce7014b2..4141cf99d 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala
@@ -44,22 +44,23 @@ class SummarizerDALImpl(val executorNum: Int,
     val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
       val tableArr = table.next()
-      if (useDevice == "GPU") {
-        OneCCL.initDpcpp()
-      } else {
-        OneCCL.init(executorNum, rank, kvsIPPort)
-      }
+      OneCCL.init(executorNum, rank, kvsIPPort)

       val computeStartTime = System.nanoTime()

       val result = new SummarizerResult()
+      val gpuIndices = if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        resources("gpu").addresses.map(_.toInt)
+      } else {
+        null
+      }
       cSummarizerTrainDAL(
         tableArr,
         executorNum,
         executorCores,
         computeDevice.ordinal(),
-        rank,
-        kvsIPPort,
+        gpuIndices,
         result
       )

@@ -111,9 +112,7 @@ class SummarizerDALImpl(val executorNum: Int,
       } else {
         Iterator.empty
       }
-      if (useDevice == "CPU") {
-        OneCCL.cleanup()
-      }
+      OneCCL.cleanup()
       ret
     }.collect()
@@ -137,7 +136,6 @@ class SummarizerDALImpl(val executorNum: Int,
       executorNum: Int,
       executorCores: Int,
       computeDeviceOrdinal: Int,
-      rankId: Int,
-      ipPort: String,
+      gpuIndices: Array[Int],
       result: SummarizerResult): Long
 }
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala
index 4156c8ea5..74d28fe24 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala
+++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala
@@ -40,13 +40,14 @@ class CorrelationHomogenTableSuite extends FunctionsSuite with Logging {
       Vectors.dense(0.010893,-0.076968,0.023333,0.021695,0.041524,0.082077,0.053211,0.157492,-0.058518,1.000000))

     val sourceData = TestCommon.readCSV("src/test/resources/data/covcormoments_dense.csv")
-    val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), Common.ComputeDevice.HOST);
+    val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), TestCommon.getComputeDevice);

     val correlationDAL = new CorrelationDALImpl(1, 1)
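+    // gpuIndices is only consulted on the GPU code path; tests pass a single index.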
+    val gpuIndices = Array(0)
     val result = new CorrelationResult()
-    correlationDAL.cCorrelationTrainDAL(dataTable.getcObejct(), 1, 1, Common.ComputeDevice.HOST.ordinal(), 0, "127.0.0.1_3000", result);
+    correlationDAL.cCorrelationTrainDAL(dataTable.getcObejct(), 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result);
     val correlationMatrix = TestCommon.getMatrixFromTable(OneDAL.makeHomogenTable(
-      result.correlationNumericTable), Common.ComputeDevice.HOST)
+      result.correlationNumericTable), TestCommon.getComputeDevice)

     assertArrayEquals(TestCommon.convertArray(expectCorrelation), correlationMatrix.toArray, 0.000001)
   }
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/KmeansHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/KmeansHomogenTableSuite.scala
index bdcc7a6a4..f7a95e760 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/KmeansHomogenTableSuite.scala
+++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/KmeansHomogenTableSuite.scala
@@ -23,7 +23,6 @@ import org.apache.spark.ml.FunctionsSuite
 import org.apache.spark.ml.linalg.Vectors
 import org.junit.jupiter.api.Assertions.assertArrayEquals

-
 class KmeansHomogenTableSuite extends FunctionsSuite with Logging {

   test("test compute Kmeans HomogenTable") {
@@ -47,9 +46,11 @@ class KmeansHomogenTableSuite extends FunctionsSuite with Logging {

     val kmeansDAL = new KMeansDALImpl(0, 0, 0, null, null, 0, 0);
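+    // The native kernel now pulls rank and KVS from OneCCL, so initialize it
+    // first even in single-process tests.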
+    OneCCL.init(1, 1, "127.0.0.1_3000")
+    val gpuIndices = Array(0)
     val result = new KMeansResult();
     val centroids = kmeansDAL.cKMeansOneapiComputeWithInitCenters(dataTable.getcObejct(), centroidsTable.getcObejct(),10, 0.001,
-      5, 1, 1, TestCommon.getComputeDevice.ordinal(), 0, "127.0.0.1_3000" , result);
+      5, 1, 1, TestCommon.getComputeDevice.ordinal(), gpuIndices, result);
     val resultVectors = OneDAL.homogenTableToVectors(OneDAL.makeHomogenTable(centroids), TestCommon.getComputeDevice);
     assertArrayEquals(TestCommon.convertArray(expectCentroids), TestCommon.convertArray(resultVectors), 0.000001)
   }
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/PCAHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/PCAHomogenTableSuite.scala
index f3c8dbcdf..d9178161a 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/PCAHomogenTableSuite.scala
+++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/PCAHomogenTableSuite.scala
@@ -45,8 +45,9 @@ class PCAHomogenTableSuite extends FunctionsSuite with Logging {
     val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), TestCommon.getComputeDevice);

     val pcaDAL = new PCADALImpl(5, 1, 1)
+    val gpuIndices = Array(0)
     val result = new PCAResult()
-    pcaDAL.cPCATrainDAL(dataTable.getcObejct(), 1, 1, TestCommon.getComputeDevice.ordinal(), 0, "127.0.0.1_3000", result);
+    pcaDAL.cPCATrainDAL(dataTable.getcObejct(), 1, 1, TestCommon.getComputeDevice.ordinal(), gpuIndices, result);
     val pcNumericTable = OneDAL.makeHomogenTable(result.pcNumericTable)
     val explainedVarianceNumericTable = OneDAL.makeHomogenTable(
       result.explainedVarianceNumericTable)
@@ -76,8 +77,9 @@ class PCAHomogenTableSuite extends FunctionsSuite with Logging {
     val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), TestCommon.getComputeDevice);

     val pcaDAL = new PCADALImpl(5, 1, 1)
+    val gpuIndices = Array(0)
     val result = new PCAResult()
-    pcaDAL.cPCATrainDAL(dataTable.getcObejct(), 1, 1, TestCommon.getComputeDevice.ordinal(), 0, "127.0.0.1_3000", result);
+    pcaDAL.cPCATrainDAL(dataTable.getcObejct(), 1, 1, TestCommon.getComputeDevice.ordinal(), gpuIndices, result);
     val pcNumericTable = OneDAL.makeHomogenTable(result.pcNumericTable)
     val explainedVarianceNumericTable = OneDAL.makeHomogenTable(
       result.explainedVarianceNumericTable)
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala
index fc2315650..9e41c2aad 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala
+++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala
@@ -33,8 +33,9 @@ class SummarizerHomogenTableSuite extends FunctionsSuite with Logging{
     val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), Common.ComputeDevice.HOST);

     val summarizerDAL = new SummarizerDALImpl(1, 1)
+    val gpuIndices = Array(0)
     val result = new SummarizerResult()
-    summarizerDAL.cSummarizerTrainDAL(dataTable.getcObejct(), 1, 1, Common.ComputeDevice.HOST.ordinal(), 0, "127.0.0.1_3000", result)
+    summarizerDAL.cSummarizerTrainDAL(dataTable.getcObejct(), 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result)
     val meanTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.meanNumericTable), Common.ComputeDevice.HOST)
     val varianceTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.varianceNumericTable), Common.ComputeDevice.HOST)
     val minimumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.minimumNumericTable), Common.ComputeDevice.HOST)
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala
index 3e9e11b54..5a2ecef27 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala
+++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala
@@ -99,7 +99,7 @@ object TestCommon {

   def getComputeDevice: Common.ComputeDevice = {
     val device = System.getProperty("computeDevice")
-    var computeDevice: Common.ComputeDevice = Common.ComputeDevice.GPU
+    var computeDevice: Common.ComputeDevice = Common.ComputeDevice.HOST
     if(device != null) {
       device.toUpperCase match {
         case "HOST" => computeDevice = Common.ComputeDevice.HOST