diff --git a/mllib-dal/src/main/native/CorrelationImpl.cpp b/mllib-dal/src/main/native/CorrelationImpl.cpp index f2dcac75b..7fc481ba8 100644 --- a/mllib-dal/src/main/native/CorrelationImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationImpl.cpp @@ -197,19 +197,20 @@ static void doCorrelationOneAPICompute( JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numCols, - jint executorNum, jint executorCores, jint computeDeviceOrdinal, - jintArray gpuIdxArray, jobject resultObj) { + JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows, + jlong numCols, jint executorNum, jint executorCores, + jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring store_path, + jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); - ccl::communicator &cclComm = getComm(); - int rankId = cclComm.rank(); ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); switch (device) { case ComputeDevice::host: case ComputeDevice::cpu: { + ccl::communicator &cclComm = getComm(); + int rankId = cclComm.rank(); NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); // Set number of threads for oneDAL to use for each rank services::Environment::getInstance()->setNumberOfThreads(executorCores); @@ -225,26 +226,17 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL( } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rankId); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - int size = cclComm.size(); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); - auto queue = - getAssignedGPU(device, cclComm, size, rankId, gpuIndices, 
nGpu); + const char* path = env->GetStringUTFChars(store_path, nullptr); + ccl::string kvs_store_path(path); + auto comm = createDalCommunicator(executorNum, rank, kvs_store_path); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, size, rankId, kvs); doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); + env->ReleaseStringUTFChars(store_path, path); break; } #endif diff --git a/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp b/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp index c1d064d1b..f56cf23b5 100644 --- a/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp +++ b/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp @@ -300,39 +300,27 @@ static jobject doRFClassifierOneAPICompute( */ JNIEXPORT jobject JNICALL Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassifierTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabFeature, jlong featureRows, - jlong featureCols, jlong pNumTabLabel, jlong labelCols, jint executorNum, - jint computeDeviceOrdinal, jint classCount, jint treeCount, - jint numFeaturesPerNode, jint minObservationsLeafNode, + JNIEnv *env, jobject obj, jint rank, jlong pNumTabFeature, + jlong featureRows, jlong featureCols, jlong pNumTabLabel, jlong labelCols, + jint executorNum, jint computeDeviceOrdinal, jint classCount, + jint treeCount, jint numFeaturesPerNode, jint minObservationsLeafNode, jint minObservationsSplitNode, jdouble minWeightFractionLeafNode, jdouble minImpurityDecreaseSplitNode, jint maxTreeDepth, jlong seed, - jint maxBins, jboolean bootstrap, jintArray gpuIdxArray, + jint maxBins, jboolean bootstrap, jintArray gpuIdxArray, jstring store_path, jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels"); - ccl::communicator &cclComm = getComm(); - int rankId = cclComm.rank(); ComputeDevice device = 
getComputeDeviceByOrdinal(computeDeviceOrdinal); switch (device) { case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rankId); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - int size = cclComm.size(); - ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); - auto queue = - getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu); + const char* path = env->GetStringUTFChars(store_path, nullptr); + ccl::string kvs_store_path(path); + auto comm = createDalCommunicator(executorNum, rank, kvs_store_path); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, size, rankId, kvs); jobject hashmapObj = doRFClassifierOneAPICompute( env, pNumTabFeature, featureRows, featureCols, pNumTabLabel, labelCols, executorNum, computeDeviceOrdinal, classCount, treeCount, @@ -340,6 +328,7 @@ Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassif minObservationsSplitNode, minWeightFractionLeafNode, minImpurityDecreaseSplitNode, maxTreeDepth, seed, maxBins, bootstrap, comm, resultObj); + env->ReleaseStringUTFChars(store_path, path); return hashmapObj; } default: { diff --git a/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp b/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp index 7619c2879..e560cb3ca 100644 --- a/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp +++ b/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp @@ -292,42 +292,33 @@ static jobject doRFRegressorOneAPICompute( JNIEXPORT jobject JNICALL Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabFeature, jlong featureRows, - jlong featureCols, jlong pNumTabLabel, jlong labelCols, jint 
executorNum, - jint computeDeviceOrdinal, jint treeCount, jint numFeaturesPerNode, - jint minObservationsLeafNode, jint maxTreeDepth, jlong seed, jint maxbins, - jboolean bootstrap, jintArray gpuIdxArray, jobject resultObj) { + JNIEnv *env, jobject obj, jint rank, jlong pNumTabFeature, + jlong featureRows, jlong featureCols, jlong pNumTabLabel, jlong labelCols, + jint executorNum, jint computeDeviceOrdinal, jint treeCount, + jint numFeaturesPerNode, jint minObservationsLeafNode, jint maxTreeDepth, + jlong seed, jint maxbins, jboolean bootstrap, jintArray gpuIdxArray, + jstring store_path, jobject resultObj) { logger::println(logger::INFO, "OneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); - ccl::communicator &cclComm = getComm(); - int rankId = cclComm.rank(); ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); switch (device) { case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rankId); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - int size = cclComm.size(); + logger::println(logger::INFO, + "OneDAL (native): use GPU kernels with rankid %d", + rank); - auto queue = - getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu); + const char* path = env->GetStringUTFChars(store_path, nullptr); + ccl::string kvs_store_path(path); + auto comm = createDalCommunicator(executorNum, rank, kvs_store_path); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, size, rankId, kvs); jobject hashmapObj = doRFRegressorOneAPICompute( env, pNumTabFeature, featureRows, featureCols, pNumTabLabel, labelCols, executorNum, computeDeviceOrdinal, treeCount, numFeaturesPerNode, minObservationsLeafNode, maxTreeDepth, seed, maxbins, bootstrap, comm, resultObj); + env->ReleaseStringUTFChars(store_path, path); return 
hashmapObj; } default: { diff --git a/mllib-dal/src/main/native/GPU.cpp b/mllib-dal/src/main/native/GPU.cpp index 4d60f9d78..019e574e8 100644 --- a/mllib-dal/src/main/native/GPU.cpp +++ b/mllib-dal/src/main/native/GPU.cpp @@ -4,11 +4,14 @@ #include "GPU.h" #include "Logger.h" +#define STORE_TIMEOUT_SEC 120 +#define KVS_CREATE_SUCCESS 0 +#define KVS_CREATE_FAILURE -1 typedef std::shared_ptr queuePtr; - static std::mutex g_mtx; static std::vector g_queueVector; +std::shared_ptr store; static std::vector get_gpus() { auto platforms = sycl::platform::get_platforms(); @@ -24,8 +27,56 @@ static std::vector get_gpus() { return {}; } +int create_kvs_by_store(std::shared_ptr store, int rank, + ccl::shared_ptr_class &kvs) { + logger::println(logger::INFO, "OneCCL (native): create_kvs_by_store "); + auto t1 = std::chrono::high_resolution_clock::now(); + ccl::kvs::address_type main_addr; + auto start = std::chrono::system_clock::now(); + if (rank == 0) { + kvs = ccl::create_main_kvs(); + main_addr = kvs->get_address(); + if (store->write((void *)main_addr.data(), main_addr.size()) < 0) { + logger::println( + logger::INFO, + "OneCCL (native): error occurred during write attempt"); + kvs.reset(); + return KVS_CREATE_FAILURE; + } + auto end = std::chrono::system_clock::now(); + auto exec_time = + (float)std::chrono::duration_cast(end - + start) + .count(); + logger::println(logger::INFO, + "OneCCL (native): write to store time %f secs", + exec_time / 1000); + } else { + if (store->read((void *)main_addr.data(), main_addr.size()) < 0) { + logger::println( + logger::INFO, + "OneCCL (native): error occurred during read attempt"); + kvs.reset(); + return KVS_CREATE_FAILURE; + } + auto end = std::chrono::system_clock::now(); + auto exec_time = + (float)std::chrono::duration_cast(end - + start) + .count(); + logger::println(logger::INFO, + "OneCCL (native): read from store time %f secs", + exec_time / 1000); + kvs = ccl::create_kvs(main_addr); + } + auto t2 = 
std::chrono::high_resolution_clock::now(); + auto duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + return KVS_CREATE_SUCCESS; +} + static int getLocalRank(ccl::communicator &comm, int size, int rank) { - const int MPI_MAX_PROCESSOR_NAME = 128; /* Obtain local rank among nodes sharing the same host name */ char zero = static_cast(0); std::vector name(MPI_MAX_PROCESSOR_NAME + 1, zero); @@ -66,8 +117,7 @@ static sycl::queue getSyclQueue(const sycl::device device) { } } -sycl::queue getAssignedGPU(const ComputeDevice device, ccl::communicator &comm, - int size, int rankId, jint *gpu_indices, int n_gpu) { +sycl::queue getAssignedGPU(const ComputeDevice device, int *gpu_indices) { switch (device) { case ComputeDevice::host: case ComputeDevice::cpu: { @@ -78,19 +128,8 @@ sycl::queue getAssignedGPU(const ComputeDevice device, ccl::communicator &comm, } case ComputeDevice::gpu: { logger::println(logger::INFO, "selector GPU"); - auto local_rank = getLocalRank(comm, size, rankId); auto gpus = get_gpus(); - - logger::println(logger::INFO, - "rank: %d size: %d local_rank: %d n_gpu: %d", rankId, - size, local_rank, n_gpu); - - auto gpu_selected = gpu_indices[local_rank % n_gpu]; - logger::println(logger::INFO, "GPU selected for current rank: %d", - gpu_selected); - - // In case gpu_selected index is larger than number of GPU SYCL devices - auto rank_gpu = gpus[gpu_selected % gpus.size()]; + auto rank_gpu = gpus[0]; sycl::queue q{rank_gpu}; return q; } @@ -125,3 +164,46 @@ sycl::queue getQueue(const ComputeDevice device) { } } } + +preview::spmd::communicator +createDalCommunicator(const jint executorNum, const jint rank, + const ccl::string kvs_store_path) { + auto gpus = get_gpus(); + + auto t1 = std::chrono::high_resolution_clock::now(); + + ccl::init(); + + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + + logger::println(logger::INFO, "OneCCL singleton init took %f secs", + 
duration / 1000); + + t1 = std::chrono::high_resolution_clock::now(); + ccl::shared_ptr_class kvs; + + store = std::make_shared( + kvs_store_path, rank, std::chrono::seconds(STORE_TIMEOUT_SEC)); + if (create_kvs_by_store(store, rank, kvs) != KVS_CREATE_SUCCESS) { + logger::println(logger::INFO, "can not create kvs by store"); + throw std::runtime_error("Failed to create communicator"); + } + t2 = std::chrono::high_resolution_clock::now(); + duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs", + duration / 1000); + sycl::queue queue{gpus[0]}; + t1 = std::chrono::high_resolution_clock::now(); + auto comm = preview::spmd::make_communicator( + queue, executorNum, rank, kvs); + t2 = std::chrono::high_resolution_clock::now(); + duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + return comm; +} diff --git a/mllib-dal/src/main/native/GPU.h b/mllib-dal/src/main/native/GPU.h index 818d3ddb4..2798a0574 100644 --- a/mllib-dal/src/main/native/GPU.h +++ b/mllib-dal/src/main/native/GPU.h @@ -1,13 +1,15 @@ #pragma once +#include "Communicator.hpp" #include "service.h" +#include "store.hpp" #include #include #include #include #include - -sycl::queue getAssignedGPU(const ComputeDevice device, ccl::communicator &comm, - int size, int rankId, jint *gpu_indices, int n_gpu); +sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices); sycl::queue getQueue(const ComputeDevice device); +preview::spmd::communicator +createDalCommunicator(jint executorNum, jint rank, ccl::string ccl_ip_port); diff --git a/mllib-dal/src/main/native/KMeansImpl.cpp b/mllib-dal/src/main/native/KMeansImpl.cpp index a1c629612..3186a184d 100644 --- a/mllib-dal/src/main/native/KMeansImpl.cpp +++ b/mllib-dal/src/main/native/KMeansImpl.cpp @@ -305,21 +305,23 @@ static jlong doKMeansOneAPICompute( */ JNIEXPORT jlong JNICALL 
Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCenters( - JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numCols, - jlong pNumTabCenters, jint clusterNum, jdouble tolerance, jint iterationNum, - jint executorNum, jint executorCores, jint computeDeviceOrdinal, - jintArray gpuIdxArray, jobject resultObj) { + JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows, + jlong numCols, jlong pNumTabCenters, jint clusterNum, jdouble tolerance, + jint iterationNum, jint executorNum, jint executorCores, + jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring store_path, + jobject resultObj) { logger::println(logger::INFO, "OneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); jlong ret = 0L; - ccl::communicator &cclComm = getComm(); - int rankId = cclComm.rank(); + ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); switch (device) { case ComputeDevice::host: case ComputeDevice::cpu: { + ccl::communicator &cclComm = getComm(); + int rankId = cclComm.rank(); NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); NumericTablePtr centroids = *((NumericTablePtr *)pNumTabCenters); // Set number of threads for OneDAL to use for each rank @@ -337,28 +339,19 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rankId); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - int size = cclComm.size(); + logger::println(logger::INFO, + "OneDAL (native): use GPU kernels with rankid %d", + rank); - auto queue = - getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu); + const char* path = env->GetStringUTFChars(store_path, nullptr); + ccl::string kvs_store_path(path); + auto comm = 
createDalCommunicator(executorNum, rank, kvs_store_path); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, size, rankId, kvs); ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numCols, pNumTabCenters, clusterNum, tolerance, iterationNum, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); + env->ReleaseStringUTFChars(store_path, path); break; } #endif diff --git a/mllib-dal/src/main/native/LinearRegressionImpl.cpp b/mllib-dal/src/main/native/LinearRegressionImpl.cpp index 017b7706f..27baf6d1c 100644 --- a/mllib-dal/src/main/native/LinearRegressionImpl.cpp +++ b/mllib-dal/src/main/native/LinearRegressionImpl.cpp @@ -214,21 +214,17 @@ ridge_regression_compute(size_t rankId, ccl::communicator &comm, } #ifdef CPU_GPU_PROFILE -static jlong doLROneAPICompute(JNIEnv *env, size_t rankId, - ccl::communicator &cclComm, sycl::queue &queue, - jlong pNumTabFeature, jlong featureRows, - jlong featureCols, jlong pNumTabLabel, - jlong labelCols, jboolean jfitIntercept, - jint executorNum, jobject resultObj) { +static jlong doLROneAPICompute( + JNIEnv *env, size_t rankId, + preview::spmd::communicator comm, + jlong pNumTabFeature, jlong featureRows, jlong featureCols, + jlong pNumTabLabel, jlong labelCols, jboolean jfitIntercept, + jint executorNum, jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): GPU compute start , rankid %d", rankId); const bool isRoot = (rankId == ccl_root); bool fitIntercept = bool(jfitIntercept); - int size = cclComm.size(); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = preview::spmd::make_communicator( - queue, size, rankId, kvs); homogen_table xtrain = *reinterpret_cast( createHomogenTableWithArrayPtr(pNumTabFeature, featureRows, featureCols, comm.get_queue()) @@ -262,19 +258,16 @@ static jlong doLROneAPICompute(JNIEnv *env, size_t rankId, */ JNIEXPORT jlong JNICALL 
Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTrainDAL( - JNIEnv *env, jobject obj, jlong feature, jlong featureRows, + JNIEnv *env, jobject obj, jint rank, jlong feature, jlong featureRows, jlong featureCols, jlong label, jlong labelCols, jboolean fitIntercept, jdouble regParam, jdouble elasticNetParam, jint executorNum, jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray, - jobject resultObj) { + jstring store_path, jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); - ccl::communicator &cclComm = getComm(); - size_t rankId = cclComm.rank(); - ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); bool useGPU = false; if (device == ComputeDevice::gpu && regParam == 0) { @@ -284,23 +277,23 @@ Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTra jlong resultptr = 0L; if (useGPU) { #ifdef CPU_GPU_PROFILE - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rankId); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - int size = cclComm.size(); - auto queue = - getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu); - - resultptr = doLROneAPICompute( - env, rankId, cclComm, queue, feature, featureRows, featureCols, - label, labelCols, fitIntercept, executorNum, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); + + const char* path = env->GetStringUTFChars(store_path, nullptr); + ccl::string kvs_store_path(path); + auto comm = createDalCommunicator(executorNum, rank, kvs_store_path); + + resultptr = doLROneAPICompute(env, rank, comm, feature, featureRows, + featureCols, label, labelCols, + fitIntercept, executorNum, resultObj); + 
env->ReleaseStringUTFChars(store_path, path); #endif } else { + ccl::communicator &cclComm = getComm(); + size_t rankId = cclComm.rank(); + NumericTablePtr pLabel = *((NumericTablePtr *)label); NumericTablePtr pData = *((NumericTablePtr *)feature); @@ -323,22 +316,18 @@ Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTra NumericTablePtr *coeffvectors = new NumericTablePtr(resultTable); resultptr = (jlong)coeffvectors; - } - - jlong ret = 0L; - if (rankId == ccl_root) { - // Get the class of the result object - jclass clazz = env->GetObjectClass(resultObj); - // Get Field references - jfieldID coeffNumericTableField = - env->GetFieldID(clazz, "coeffNumericTable", "J"); + if (rankId == ccl_root) { + // Get the class of the result object + jclass clazz = env->GetObjectClass(resultObj); + // Get Field references + jfieldID coeffNumericTableField = + env->GetFieldID(clazz, "coeffNumericTable", "J"); - env->SetLongField(resultObj, coeffNumericTableField, resultptr); + env->SetLongField(resultObj, coeffNumericTableField, resultptr); - // intercept is already in first column of coeffvectors - ret = resultptr; - } else { - ret = (jlong)0; + // intercept is already in first column of coeffvectors + resultptr = (jlong)coeffvectors; + } } - return ret; + return resultptr; } diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index 7d147d1a8..e9a164507 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -32,6 +32,7 @@ #include "Logger.h" #include "OneCCL.h" #include "com_intel_oap_mllib_OneCCL__.h" +#include "service.h" extern const size_t ccl_root = 0; @@ -54,6 +55,7 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( auto t1 = std::chrono::high_resolution_clock::now(); ccl::init(); + auto t2 = std::chrono::high_resolution_clock::now(); const char *str = env->GetStringUTFChars(ip_port, 0); ccl::string ccl_ip_port(str); @@ -63,8 +65,6 @@ JNIEXPORT 
jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( g_kvs.push_back(singletonCCLInit.kvs); g_comms.push_back( ccl::create_communicator(size, rank, singletonCCLInit.kvs)); - - auto t2 = std::chrono::high_resolution_clock::now(); auto duration = (float)std::chrono::duration_cast(t2 - t1) .count(); @@ -85,19 +85,6 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( return 1; } -/* - * Class: com_intel_oap_mllib_OneCCL__ - * Method: c_init - * Signature: ()I - */ -JNIEXPORT jint JNICALL -Java_com_intel_oap_mllib_OneCCL_00024_c_1initDpcpp(JNIEnv *env, jobject) { - logger::printerrln(logger::INFO, "OneCCL (native): init dpcpp"); - ccl::init(); - - return 1; -} - JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(JNIEnv *env, jobject obj) { logger::printerrln(logger::INFO, "OneCCL (native): cleanup"); diff --git a/mllib-dal/src/main/native/PCAImpl.cpp b/mllib-dal/src/main/native/PCAImpl.cpp index 0600b47d9..640c95477 100644 --- a/mllib-dal/src/main/native/PCAImpl.cpp +++ b/mllib-dal/src/main/native/PCAImpl.cpp @@ -250,19 +250,19 @@ static void doPCAOneAPICompute( JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numCols, - jint executorNum, jint executorCores, jint computeDeviceOrdinal, - jintArray gpuIdxArray, jobject resultObj) { + JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows, + jlong numCols, jint executorNum, jint executorCores, + jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring store_path, + jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); - - ccl::communicator &cclComm = getComm(); - size_t rankId = cclComm.rank(); ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); switch (device) { case ComputeDevice::host: case ComputeDevice::cpu: { + ccl::communicator &cclComm 
= getComm(); + size_t rankId = cclComm.rank(); NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); // Set number of threads for oneDAL to use for each rank services::Environment::getInstance()->setNumberOfThreads(executorCores); @@ -278,25 +278,16 @@ Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL( } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rankId); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - int size = cclComm.size(); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); - auto queue = - getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu); + const char* path = env->GetStringUTFChars(store_path, nullptr); + ccl::string kvs_store_path(path); + auto comm = createDalCommunicator(executorNum, rank, kvs_store_path); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, size, rankId, kvs); doPCAOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); + env->ReleaseStringUTFChars(store_path, path); break; } #endif diff --git a/mllib-dal/src/main/native/SummarizerImpl.cpp b/mllib-dal/src/main/native/SummarizerImpl.cpp index 52b585dc2..6e89f293f 100644 --- a/mllib-dal/src/main/native/SummarizerImpl.cpp +++ b/mllib-dal/src/main/native/SummarizerImpl.cpp @@ -268,19 +268,19 @@ static void doSummarizerOneAPICompute( JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numCols, - jint executorNum, jint executorCores, jint computeDeviceOrdinal, - jintArray gpuIdxArray, jobject resultObj) { + JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows, + jlong numCols, jint executorNum, jint 
executorCores, + jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring store_path, + jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); - - ccl::communicator &cclComm = getComm(); - int rankId = cclComm.rank(); ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); switch (device) { case ComputeDevice::host: case ComputeDevice::cpu: { + ccl::communicator &cclComm = getComm(); + int rankId = cclComm.rank(); NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); // Set number of threads for oneDAL to use for each rank services::Environment::getInstance()->setNumberOfThreads(executorCores); @@ -296,31 +296,22 @@ Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL( } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rankId); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - int size = cclComm.size(); - - auto queue = - getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); + const char* path = env->GetStringUTFChars(store_path, nullptr); + ccl::string kvs_store_path(path); + auto comm = createDalCommunicator(executorNum, rank, kvs_store_path); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, size, rankId, kvs); doSummarizerOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); + env->ReleaseStringUTFChars(store_path, path); break; } #endif default: { - deviceError("PCA", ComputeDeviceString[computeDeviceOrdinal].c_str()); + deviceError("Summarizer", + ComputeDeviceString[computeDeviceOrdinal].c_str()); } } return 0; } diff --git 
a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_classification_RandomForestClassifierDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_classification_RandomForestClassifierDALImpl.h index 8c0c4ecdd..79bd6f16f 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_classification_RandomForestClassifierDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_classification_RandomForestClassifierDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JJIIIIIIIDDIJIZ[ILcom/intel/oap/mllib/classification/RandomForestResult;)Ljava/util/HashMap; */ JNIEXPORT jobject JNICALL Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassifierTrainDAL - (JNIEnv *, jobject, jlong, jlong, jlong, jlong, jlong, jint, jint, jint, jint, jint, jint, jint, jdouble, jdouble, jint, jlong, jint, jboolean, jintArray, jobject); + (JNIEnv *, jobject, jint, jlong, jlong, jlong, jlong, jlong, jint, jint, jint, jint, jint, jint, jint, jdouble, jdouble, jint, jlong, jint, jboolean, jintArray, jstring, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h index 595f69fb5..9a00db0a2 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JJIDIIII[ILcom/intel/oap/mllib/clustering/KMeansResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCenters - (JNIEnv *, jobject, jlong, jlong, jlong, jlong, jint, jdouble, jint, jint, jint, jint, jintArray, jobject); + (JNIEnv *, jobject, jint, jlong, jlong, jlong, jlong, jint, jdouble, jint, jint, jint, jint, jintArray, jstring, jobject); #ifdef __cplusplus } diff --git 
a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h index 2ac220860..3f1875ca9 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JIII[ILcom/intel/oap/mllib/feature/PCAResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL - (JNIEnv *, jobject, jlong, jlong, jlong, jint, jint, jint, jintArray, jobject); + (JNIEnv *, jobject, jint, jlong, jlong, jlong, jint, jint, jint, jintArray, jstring, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_LinearRegressionDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_LinearRegressionDALImpl.h index 28c7e8f42..4f90f23f8 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_LinearRegressionDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_LinearRegressionDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JJZDDIII[ILcom/intel/oap/mllib/regression/LiRResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTrainDAL - (JNIEnv *, jobject, jlong, jlong, jlong, jlong, jlong, jboolean, jdouble, jdouble, jint, jint, jint, jintArray, jobject); + (JNIEnv *, jobject, jint, jlong, jlong, jlong, jlong, jlong, jboolean, jdouble, jdouble, jint, jint, jint, jintArray, jstring, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_RandomForestRegressorDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_RandomForestRegressorDALImpl.h index ac457b3bf..7bf694a19 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_RandomForestRegressorDALImpl.h +++ 
b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_RandomForestRegressorDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JJIIIIIIJIZ[ILcom/intel/oap/mllib/classification/RandomForestResult;)Ljava/util/HashMap; */ JNIEXPORT jobject JNICALL Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTrainDAL - (JNIEnv *, jobject, jlong, jlong, jlong, jlong, jlong, jint, jint, jint, jint, jint, jint, jlong, jint, jboolean, jintArray, jobject); + (JNIEnv *, jobject, jint, jlong, jlong, jlong, jlong, jlong, jint, jint, jint, jint, jint, jint, jlong, jint, jboolean, jintArray, jstring, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h index 96219ae4f..4c404b452 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JIII[ILcom/intel/oap/mllib/stat/CorrelationResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL - (JNIEnv *, jobject, jlong, jlong, jlong, jint, jint, jint, jintArray, jobject); + (JNIEnv *, jobject, jint, jlong, jlong, jlong, jint, jint, jint, jintArray, jstring, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h index 754a5b645..4261d6fdd 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JIII[ILcom/intel/oap/mllib/stat/SummarizerResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL - (JNIEnv *, 
jobject, jlong, jlong, jlong, jint, jint, jint, jintArray, jobject);
+ (JNIEnv *, jobject, jint, jlong, jlong, jlong, jint, jint, jint, jintArray, jstring, jobject);

#ifdef __cplusplus
}
diff --git a/mllib-dal/src/main/native/store.hpp b/mllib-dal/src/main/native/store.hpp
new file mode 100644
index 000000000..9a39a4199
--- /dev/null
+++ b/mllib-dal/src/main/native/store.hpp
@@ -0,0 +1,165 @@
+#pragma once
+
+#include <sys/file.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <chrono>
+#include <cstdio>
+#include <cstdint>
+#include <mutex>
+#include <string>
+#include <system_error>
+#include <thread>
+
+#define CHECK(ret, msg) \
+    if ((ret) < 0) { \
+        throw std::system_error(errno, std::system_category(), msg); \
+    }
+
+class base_store {
+public:
+    base_store(){};
+
+    virtual ~base_store(){};
+
+    virtual int write(const void* data, size_t size) = 0;
+
+    virtual int read(void* data, size_t size) = 0;
+};
+
+class file_store : public base_store {
+public:
+    file_store(const file_store& other) = delete;
+    file_store& operator=(const file_store& other) = delete;
+    file_store(std::string path, int rank, const std::chrono::seconds& timeout)
+            : base_store(),
+              path(path),
+              rank(rank),
+              pos(0),
+              fd(-1),
+              timeout(timeout){};
+
+    virtual ~file_store() {
+        if (rank == 0)
+            std::remove(path.c_str());
+    };
+
+    void release_resources() {
+        try {
+            CHECK(flock(fd, LOCK_UN), "Unlocking file: ");
+        }
+        catch (const std::system_error& e) {
+            fprintf(stderr, "%d\n%s\n", e.code().value(), e.what());
+        }
+
+        close(fd);
+        fd = -1;
+    }
+
+    int write(const void* data, size_t size) override {
+        int ret = 0;
+        std::unique_lock<std::mutex> locker(mtx);
+        fd = open(path.c_str(), O_CREAT | O_RDWR, 0644);
+        CHECK(fd, "Open file to write into (" + path + "): ");
+
+        try {
+            CHECK(flock(fd, LOCK_EX), "Setting exclusive rights for writing to the file: ");
+            CHECK(lseek(fd, 0, SEEK_END), "Setting a cursor at the EOF: ");
+
+            // writing into the file
+            while (size > 0) {
+                auto wr_v = ::write(fd, data, size);
+                CHECK(wr_v, "An error occured while writing to the file: ");
+                data = (uint8_t*)data + wr_v;
+                size -= wr_v;
+            }
+            CHECK(fsync(fd), "Flushing file content: ");
+        }
+        catch (const std::system_error& e) {
+            fprintf(stderr, "%d\n%s\n", e.code().value(), e.what());
+            ret = -1;
+        }
+
+        release_resources();
+        return ret;
+    };
+
+    int read(void* data, size_t size) override {
+        const auto time_start = std::chrono::steady_clock::now();
+        while (1) {
+            std::unique_lock<std::mutex> locker(mtx);
+            fd = open(path.c_str(), O_RDONLY);
+            if (fd < 0 && errno == ENOENT) {
+                // file might not exist yet
+                const auto time_passed = std::chrono::duration_cast<std::chrono::seconds>(
+                    std::chrono::steady_clock::now() - time_start);
+                if (time_passed > timeout) {
+                    throw std::runtime_error("Timeout " + std::to_string(timeout.count()) +
+                                             "s waiting for the file " + path + " to open");
+                }
+                std::this_thread::sleep_for(std::chrono::milliseconds(10 * rank));
+                continue;
+            }
+            else {
+                CHECK(fd, "Open the file to read from (" + path + "): ");
+            }
+
+            try {
+                CHECK(flock(fd, LOCK_SH), "Setting shared rights for reading the file: ");
+
+                auto start = lseek(fd, 0, SEEK_SET);
+                CHECK(start, "Setting the cursor at the beginning of the file: ");
+
+                // find the real size of the file
+                auto len = lseek(fd, 0, SEEK_END);
+                CHECK(len, "Setting the cursor at the EOF: ");
+
+                if (len == start) {
+                    // nothing has been written yet
+                    release_resources();
+                    locker.unlock();
+                    const auto time_passed = std::chrono::duration_cast<std::chrono::seconds>(
+                        std::chrono::steady_clock::now() - time_start);
+                    if (time_passed > timeout) {
+                        throw std::runtime_error("Timeout " + std::to_string(timeout.count()) +
+                                                 "s waiting for the file " + path + " to read");
+                    }
+                    std::this_thread::sleep_for(std::chrono::milliseconds(10 * rank));
+                    continue;
+                }
+
+                // start from where we stopped last time
+                start = lseek(fd, pos, SEEK_SET);
+                CHECK(start, "Setting the cursor at the last known position: ");
+
+                // if there are still some bytes to read
+                if (len > start && size > 0) {
+                    // only the bytes beyond the last saved position remain to be
+                    // read; without this, reading `len` (whole-file size) bytes
+                    // from offset `pos` hits EOF (rd == 0) and loops forever
+                    len -= start;
+                    size -= len;
+                    while (len > 0) {
+                        auto rd = ::read(fd, data, len);
+                        CHECK(rd, "An error occured while reading the
file: ") + data = (uint8_t*)data + rd; + len -= rd; + } + pos = lseek(fd, 0, SEEK_CUR); + CHECK(pos, "Saving the cursor current position: "); + } + else { + release_resources(); + break; + } + } + catch (const std::system_error& e) { + fprintf(stderr, "%d\n%s\n", e.code().value(), e.what()); + release_resources(); + return -1; + } + } + return 0; + }; + +protected: + std::string path; + int rank; + off_t pos; + int fd; + std::chrono::seconds timeout; + std::mutex mtx; +}; \ No newline at end of file diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala new file mode 100644 index 000000000..e3e0aab58 --- /dev/null +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2020 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.intel.oap.mllib
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+
+object CommonJob {
+
+  def setAffinityMask(data: RDD[_], useDevice: String): Unit = {
+    data.mapPartitionsWithIndex { (rank, iter) =>
+      // "gpu" task resources only exist on the GPU path; guarding here avoids a
+      // NullPointerException when this is invoked for CPU/HOST jobs.
+      if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        val gpuIndices = resources("gpu").addresses.map(_.toInt)
+        OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
+      }
+      Iterator.empty
+    }.count()
+  }
+
+  def createCCLInit(data: RDD[_], executorNum: Int, kvsIPPort: String, useDevice: String): Unit = {
+    if (useDevice == "CPU") {
+      data.mapPartitionsWithIndex { (rank, table) =>
+        OneCCL.init(executorNum, rank, kvsIPPort)
+        Iterator.empty
+      }.count()
+    }
+  }
+
+}
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
index 48caebe1b..c89c9ffd2 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
@@ -42,12 +42,8 @@ object OneCCL extends Logging {
   }

   // Run on Executor
-  def setExecutorEnv(): Unit = {
-    setEnv("CCL_ATL_TRANSPORT", "ofi")
-    // Set CCL_ROOT to workaround CCL_ROOT env read bug, should remove when upstream fix this
-    setEnv("CCL_ROOT", "/opt/intel/oneapi/ccl/latest")
-    // Uncomment this if you whant to debug oneCCL
-    // setEnv("CCL_LOG_LEVEL", "debug")
+  def setExecutorEnv(key: String, value: String): Unit = {
+    setEnv(key, value)
   }

   // Run on Executor
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala
index 8e4d27160..1636c281d 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala
@@ -16,7 +16,7
@@ package com.intel.oap.mllib.classification import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.annotation.Since import org.apache.spark.TaskContext @@ -29,6 +29,7 @@ import org.apache.spark.sql.Dataset import org.apache.spark.ml.tree import org.apache.spark.mllib.tree.model.ImpurityStats +import java.time.Instant import java.util import java.util.{ArrayList, Map} import scala.collection.mutable.HashMap @@ -57,6 +58,7 @@ class RandomForestClassifierDALImpl(val uid: String, val sparkContext = labeledPoints.rdd.sparkContext val rfcTimer = new Utils.AlgoTimeMetrics("RandomForestClassifier", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() // used run Random Forest unit test val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) @@ -75,10 +77,8 @@ class RandomForestClassifierDALImpl(val uid: String, rfcTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(labeledPointsTables, useDevice) + CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice) rfcTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => @@ -96,6 +96,7 @@ class RandomForestClassifierDALImpl(val uid: String, val computeStartTime = System.nanoTime() val result = new RandomForestResult val hashmap = cRFClassifierTrainDAL( + rank, feature._1, feature._2, feature._3, @@ -115,6 +116,7 @@ class RandomForestClassifierDALImpl(val 
uid: String, maxBins, bootstrap, gpuIndices, + storePath, result) val computeEndTime = System.nanoTime() @@ -140,7 +142,8 @@ class RandomForestClassifierDALImpl(val uid: String, results(0) } - @native private[mllib] def cRFClassifierTrainDAL(featureTabAddr: Long, + @native private[mllib] def cRFClassifierTrainDAL(rank: Int, + featureTabAddr: Long, numRows: Long, numCols: Long, lableTabAddr: Long, @@ -159,6 +162,7 @@ class RandomForestClassifierDALImpl(val uid: String, maxBins: Int, bootstrap: Boolean, gpuIndices: Array[Int], + storePath: String, result: RandomForestResult): java.util.HashMap[java.lang.Integer, java.util.ArrayList[LearningNode]] } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index e194e9d22..9d829220a 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.clustering import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -27,6 +27,8 @@ import org.apache.spark.mllib.clustering.{KMeansModel => MLlibKMeansModel} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.rdd.RDD +import java.time.Instant + class KMeansDALImpl(var nClusters: Int, var maxIterations: Int, var tolerance: Double, @@ -40,6 +42,7 @@ class KMeansDALImpl(var nClusters: Int, val sparkContext = data.sparkContext val kmeansTimer = new Utils.AlgoTimeMetrics("KMeans", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) + val storePath = 
sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) kmeansTimer.record("Preprocessing") @@ -52,10 +55,8 @@ class KMeansDALImpl(var nClusters: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(coalescedTables, useDevice) + CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice) kmeansTimer.record("OneCCL Init") val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => @@ -81,6 +82,7 @@ class KMeansDALImpl(var nClusters: Int, } cCentroids = cKMeansOneapiComputeWithInitCenters( + rank, tableArr, rows, columns, @@ -92,6 +94,7 @@ class KMeansDALImpl(var nClusters: Int, executorCores, computeDevice.ordinal(), gpuIndices, + storePath, result ) @@ -107,7 +110,9 @@ class KMeansDALImpl(var nClusters: Int, } else { Iterator.empty } - OneCCL.cleanup() + if (useDevice == "CPU") { + OneCCL.cleanup() + } ret }.collect() @@ -136,7 +141,8 @@ class KMeansDALImpl(var nClusters: Int, parentModel } - @native private[mllib] def cKMeansOneapiComputeWithInitCenters(data: Long, + @native private[mllib] def cKMeansOneapiComputeWithInitCenters( rank: Int, + data: Long, numRows: Long, numCols: Long, centers: Long, @@ -147,5 +153,6 @@ class KMeansDALImpl(var nClusters: Int, executorCores: Int, computeDeviceOrdinal: Int, gpuIndices: Array[Int], + storePath: String, result: KMeansResult): Long } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 0410c18a7..042b54d23 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -19,7 +19,7 @@ package com.intel.oap.mllib.feature import 
java.nio.DoubleBuffer import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable} import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Service, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Service, Utils} import org.apache.spark.TaskContext import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging @@ -32,6 +32,8 @@ import java.util.Arrays import com.intel.oneapi.dal.table.{Common, HomogenTable, RowAccessor} import org.apache.spark.storage.StorageLevel +import java.time.Instant + class PCADALModel private[mllib] ( val k: Int, val pc: OldDenseMatrix, @@ -47,6 +49,7 @@ class PCADALImpl(val k: Int, val sparkContext = normalizedData.sparkContext val pcaTimer = new Utils.AlgoTimeMetrics("PCA", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) pcaTimer.record("Preprocessing") @@ -59,10 +62,8 @@ class PCADALImpl(val k: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) pcaTimer.record("Data Convertion") - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(coalescedTables, useDevice) + CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice) pcaTimer.record("OneCCL Init") val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => @@ -79,6 +80,7 @@ class PCADALImpl(val k: Int, null } cPCATrainDAL( + rank, tableArr, rows, columns, @@ -86,6 +88,7 @@ class PCADALImpl(val k: Int, executorCores, computeDevice.ordinal(), gpuIndices, + storePath, result ) @@ -214,12 +217,14 @@ class PCADALImpl(val k: Int, // Single entry to call Correlation PCA DAL backend with parameter K - @native private[mllib] def cPCATrainDAL(data: 
Long, + @native private[mllib] def cPCATrainDAL(rank: Int, + data: Long, numRows: Long, numCols: Long, executorNum: Int, executorCores: Int, computeDeviceOrdinal: Int, gpuIndices: Array[Int], + storePath: String, result: PCAResult): Long } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index 806fdb40c..ab5a0aac1 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.regression import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.SparkException import org.apache.spark.TaskContext @@ -29,6 +29,8 @@ import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors import org.apache.spark.sql.Dataset import org.apache.spark.rdd.RDD +import java.time.Instant + /** * Model fitted by [[LinearRegressionDALImpl]]. 
@@ -73,6 +75,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, val sparkContext = labeledPoints.sparkSession.sparkContext val lrTimer = new Utils.AlgoTimeMetrics("LinearRegression", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false) @@ -106,6 +109,10 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } lrTimer.record("Data Convertion") + CommonJob.setAffinityMask(labeledPointsTables, useDevice) + CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice) + lrTimer.record("OneCCL Init") + val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => val (feature, label) = tables.next() val (featureTabAddr : Long, featureRows : Long, featureColumns : Long) = @@ -121,7 +128,6 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, (label.toString.toLong, 0L, 0L) } - OneCCL.init(executorNum, rank, kvsIPPort) val result = new LiRResult() val gpuIndices = if (useDevice == "GPU") { @@ -138,6 +144,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } val cbeta = cLinearRegressionTrainDAL( + rank, featureTabAddr, featureRows, featureColumns, @@ -150,6 +157,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, executorCores, computeDevice.ordinal(), gpuIndices, + storePath, result ) @@ -183,7 +191,8 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } // Single entry to call Linear Regression DAL backend with parameters - @native private def cLinearRegressionTrainDAL(data: Long, + @native private def cLinearRegressionTrainDAL(rank: Int, + data: Long, numRows: Long, numCols: Long, label: Long, @@ -195,6 +204,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, executorCores: 
Int, computeDeviceOrdinal: Int, gpuIndices: Array[Int], + storePath: String, result: LiRResult): Long } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index 16fd17cdb..1f03d4206 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.regression import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.classification.{LearningNode, RandomForestResult} -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -25,6 +25,7 @@ import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.linalg.Matrix import org.apache.spark.sql.Dataset +import java.time.Instant import java.util import scala.collection.JavaConversions._ @@ -49,6 +50,7 @@ class RandomForestRegressorDALImpl(val uid: String, val sparkContext = labeledPoints.rdd.sparkContext val rfrTimer = new Utils.AlgoTimeMetrics("RandomForestRegressor", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) // used run Random Forest unit test val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false) @@ -69,10 +71,8 @@ class RandomForestRegressorDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, 
kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(labeledPointsTables, useDevice) + CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice) rfrTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => @@ -91,6 +91,7 @@ class RandomForestRegressorDALImpl(val uid: String, val computeStartTime = System.nanoTime() val result = new RandomForestResult val hashmap = cRFRegressorTrainDAL( + rank, feature._1, feature._2, feature._3, @@ -106,6 +107,7 @@ class RandomForestRegressorDALImpl(val uid: String, maxbins, bootstrap, gpuIndices, + storePath, result) val computeEndTime = System.nanoTime() @@ -141,7 +143,8 @@ class RandomForestRegressorDALImpl(val uid: String, results(0)._2 } - @native private[mllib] def cRFRegressorTrainDAL(featureTabAddr: Long, + @native private[mllib] def cRFRegressorTrainDAL(rank: Int, + featureTabAddr: Long, numRows: Long, numCols: Long, lableTabAddr: Long, @@ -156,5 +159,6 @@ class RandomForestRegressorDALImpl(val uid: String, maxbins: Int, bootstrap: Boolean, gpuIndices: Array[Int], + storePath: String, result: RandomForestResult): java.util.HashMap[java.lang.Integer, java.util.ArrayList[LearningNode]] } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 21465e1af..ab40a74d5 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -17,13 +17,15 @@ package com.intel.oap.mllib.stat import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.{Matrix, Vector} import 
org.apache.spark.rdd.RDD +import java.time.Instant + class CorrelationDALImpl( val executorNum: Int, val executorCores: Int) @@ -33,6 +35,7 @@ class CorrelationDALImpl( val sparkContext = data.sparkContext val corTimer = new Utils.AlgoTimeMetrics("Correlation", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) corTimer.record("Preprocessing") @@ -46,10 +49,8 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(coalescedTables) - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(coalescedTables, useDevice) + CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice) corTimer.record("OneCCL Init") val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => @@ -69,6 +70,7 @@ class CorrelationDALImpl( null } cCorrelationTrainDAL( + rank, tableArr, rows, columns, @@ -76,6 +78,7 @@ class CorrelationDALImpl( executorCores, computeDevice.ordinal(), gpuIndices, + storePath, result ) @@ -118,12 +121,14 @@ class CorrelationDALImpl( } - @native private[mllib] def cCorrelationTrainDAL(data: Long, + @native private[mllib] def cCorrelationTrainDAL(rank: Int, + data: Long, numRows: Long, numCols: Long, executorNum: Int, executorCores: Int, computeDeviceOrdinal: Int, gpuIndices: Array[Int], + storePath: String, result: CorrelationResult): Long } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index a516962c3..294f6ec0e 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -16,7 +16,7 @@ 
package com.intel.oap.mllib.stat -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.Vector @@ -26,6 +26,8 @@ import org.apache.spark.rdd.RDD import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oneapi.dal.table.Common +import java.time.Instant + class SummarizerDALImpl(val executorNum: Int, val executorCores: Int) extends Serializable with Logging { @@ -34,6 +36,7 @@ class SummarizerDALImpl(val executorNum: Int, val sparkContext = data.sparkContext val sumTimer = new Utils.AlgoTimeMetrics("Summarizer", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) + val storePath = sparkContext.getConf.get("spark.oap.mllib.kvsStorePath") + "/" + Instant.now() val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) sumTimer.record("Preprocessing") @@ -47,10 +50,8 @@ class SummarizerDALImpl(val executorNum: Int, val kvsIPPort = getOneCCLIPPort(data) - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(coalescedTables, useDevice) + CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice) sumTimer.record("OneCCL Init") val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => @@ -70,6 +71,7 @@ class SummarizerDALImpl(val executorNum: Int, null } cSummarizerTrainDAL( + rank, tableArr, rows, columns, @@ -77,6 +79,7 @@ class SummarizerDALImpl(val executorNum: Int, executorCores, computeDevice.ordinal(), gpuIndices, + storePath, result ) @@ -150,12 +153,14 @@ class SummarizerDALImpl(val executorNum: Int, summary } - @native private[mllib] def cSummarizerTrainDAL(data: Long, + @native private[mllib] def cSummarizerTrainDAL(rank: Int, + data: Long, numRows: Long, numCols: Long, executorNum: Int, 
executorCores: Int,
 computeDeviceOrdinal: Int,
 gpuIndices: Array[Int],
+ storePath: String,
 result: SummarizerResult): Long
}
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/ConvertHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/ConvertHomogenTableSuite.scala
index bbb6bbe7e..3246387b3 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/ConvertHomogenTableSuite.scala
+++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/ConvertHomogenTableSuite.scala
@@ -57,7 +57,7 @@ class ConvertHomogenTableSuite extends FunctionsSuite with Logging {
     val metadata = table.getMetaData
     for (i <- 0 until 10) {
       assert(metadata.getDataType(i) == FLOAT64)
-      assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO)
+      assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO)
     }

     assertArrayEquals(table.getDoubleData, TestCommon.convertArray(data))
@@ -75,7 +75,7 @@ class ConvertHomogenTableSuite extends FunctionsSuite with Logging {
     val metadata = table.getMetaData
     for (i <- 0 until 10) {
       assert(metadata.getDataType(i) == FLOAT64)
-      assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO)
+      assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO)
     }

     assertArrayEquals(table.getDoubleData, data)
@@ -105,7 +105,7 @@ class ConvertHomogenTableSuite extends FunctionsSuite with Logging {
     val metadata = table.getMetaData
     for (i <- 0 until 10) {
       assert(metadata.getDataType(i) == FLOAT64)
-      assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO)
+      assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO)
     }

     assertArrayEquals(table.getDoubleData, TestCommon.convertArray(data))
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala
index 34361766d..98d37a338 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala
+++
b/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala
@@ -45,7 +45,7 @@ class CorrelationHomogenTableSuite extends FunctionsSuite with Logging {
     val correlationDAL = new CorrelationDALImpl(1, 1)
     val gpuIndices = Array(0)
     val result = new CorrelationResult()
-    correlationDAL.cCorrelationTrainDAL(dataTable.getcObejct(), sourceData.length, sourceData(0).length, 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result);
+    correlationDAL.cCorrelationTrainDAL(0, dataTable.getcObejct(), sourceData.length, sourceData(0).length, 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, "/tmp", result);

     val correlationMatrix = TestCommon.getMatrixFromTable(OneDAL.makeHomogenTable(
       result.getCorrelationNumericTable), TestCommon.getComputeDevice)
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala
index 712cccbfa..5917af2e1 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala
+++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala
@@ -31,15 +31,15 @@ class SummarizerHomogenTableSuite extends FunctionsSuite with Logging{
     val sourceData = TestCommon.readCSV("src/test/resources/data/covcormoments_dense.csv")

-    val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), Common.ComputeDevice.HOST);
+    val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), Common.ComputeDevice.HOST);

     val summarizerDAL = new SummarizerDALImpl(1, 1)
     val gpuIndices = Array(0)
     val result = new SummarizerResult()
-    summarizerDAL.cSummarizerTrainDAL(dataTable.getcObejct(), sourceData.length, sourceData(0).length, 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result)
-    val meanTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMeanNumericTable),
Common.ComputeDevice.HOST)
-    val varianceTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getVarianceNumericTable), Common.ComputeDevice.HOST)
-    val minimumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMinimumNumericTable), Common.ComputeDevice.HOST)
-    val maximumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMaximumNumericTable), Common.ComputeDevice.HOST)
+    summarizerDAL.cSummarizerTrainDAL(0, dataTable.getcObejct(), sourceData.length, sourceData(0).length, 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, "/tmp", result)
+    val meanTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMeanNumericTable), Common.ComputeDevice.HOST)
+    val varianceTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getVarianceNumericTable), Common.ComputeDevice.HOST)
+    val minimumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMinimumNumericTable), Common.ComputeDevice.HOST)
+    val maximumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMaximumNumericTable), Common.ComputeDevice.HOST)

     assertArrayEquals(expectMean , meanTable.toArray, 0.000001)
     assertArrayEquals(expectVariance, varianceTable.toDense.values, 0.000001)
diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala
index 5a2ecef27..9ae20cec4 100644
--- a/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala
+++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala
@@ -84,7 +84,7 @@ object TestCommon {
     arrayDouble
   }

   def getMatrixFromTable(table: HomogenTable,
-                         device: Common.ComputeDevice): DenseMatrix = {
+                         device: Common.ComputeDevice): DenseMatrix = {
     val numRows = table.getRowCount.toInt
     val numCols = table.getColumnCount.toInt
     // returned DoubleBuffer is ByteByffer, need to copy as double array
@@ -97,14 +97,14 @@ object TestCommon {
     matrix
   }

-  def
getComputeDevice: Common.ComputeDevice = {
+  def getComputeDevice: Common.ComputeDevice = {
     val device = System.getProperty("computeDevice")
-    var computeDevice: Common.ComputeDevice = Common.ComputeDevice.HOST
+    var computeDevice: Common.ComputeDevice = Common.ComputeDevice.HOST
     if(device != null) {
       device.toUpperCase match {
-        case "HOST" => computeDevice = Common.ComputeDevice.HOST
-        case "CPU" => computeDevice = Common.ComputeDevice.CPU
-        case "GPU" => computeDevice = Common.ComputeDevice.GPU
+        case "HOST" => computeDevice = Common.ComputeDevice.HOST
+        case "CPU" => computeDevice = Common.ComputeDevice.CPU
+        case "GPU" => computeDevice = Common.ComputeDevice.GPU
         case _ => "Invalid Device"
       }
     }