From 44489c8ac98e87b8866b5e57be05fabd156a6c15 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Wed, 11 Oct 2023 02:58:34 +0000 Subject: [PATCH] Create new HomogenTable with the assigned GPU before running algorithms. Signed-off-by: minmingzhu --- mllib-dal/src/main/native/CorrelationImpl.cpp | 26 ++++++--- .../native/DecisionForestClassifierImpl.cpp | 56 ++++++++++++------- .../native/DecisionForestRegressorImpl.cpp | 50 ++++++++++++----- mllib-dal/src/main/native/KMeansImpl.cpp | 34 +++++++---- .../src/main/native/LinearRegressionImpl.cpp | 46 ++++++++++----- mllib-dal/src/main/native/PCAImpl.cpp | 27 ++++++--- mllib-dal/src/main/native/SummarizerImpl.cpp | 26 ++++++--- ...sification_RandomForestClassifierDALImpl.h | 2 +- ...intel_oap_mllib_clustering_KMeansDALImpl.h | 2 +- .../com_intel_oap_mllib_feature_PCADALImpl.h | 2 +- ...mllib_regression_LinearRegressionDALImpl.h | 2 +- ..._regression_RandomForestRegressorDALImpl.h | 2 +- ..._intel_oap_mllib_stat_CorrelationDALImpl.h | 4 +- ...m_intel_oap_mllib_stat_SummarizerDALImpl.h | 2 +- .../scala/com/intel/oap/mllib/OneDAL.scala | 32 +++++------ .../RandomForestClassifierDALImpl.scala | 27 +++++++-- .../oap/mllib/clustering/KMeansDALImpl.scala | 33 +++++++---- .../intel/oap/mllib/feature/PCADALImpl.scala | 13 ++++- .../regression/LinearRegressionDALImpl.scala | 27 ++++++++- .../RandomForestRegressorDALImpl.scala | 27 +++++++-- .../oap/mllib/stat/CorrelationDALImpl.scala | 13 ++++- .../oap/mllib/stat/SummarizerDALImpl.scala | 13 ++++- .../mllib/CorrelationHomogenTableSuite.scala | 2 +- .../oap/mllib/KmeansHomogenTableSuite.scala | 2 +- .../oap/mllib/PCAHomogenTableSuite.scala | 4 +- .../mllib/SummarizerHomogenTableSuite.scala | 2 +- .../org/apache/spark/ml/oneDALSuite.scala | 6 +- 27 files changed, 335 insertions(+), 147 deletions(-) diff --git a/mllib-dal/src/main/native/CorrelationImpl.cpp b/mllib-dal/src/main/native/CorrelationImpl.cpp index 49206f22a..abd7fd0f9 100644 --- a/mllib-dal/src/main/native/CorrelationImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationImpl.cpp @@ -149,13 +149,22 @@ static void doCorrelationDaalCompute(JNIEnv *env, jobject obj, size_t rankId, #ifdef CPU_GPU_PROFILE static void doCorrelationOneAPICompute( - JNIEnv *env, jlong pNumTabData, + JNIEnv *env, jlong pNumTabData, jlong numRows, jlong numClos, preview::spmd::communicator comm, - jobject resultObj) { + jobject resultObj, sycl::queue &queue) { logger::println(logger::INFO, "oneDAL (native): GPU compute start"); const bool isRoot = (comm.get_rank() == ccl_root); - homogen_table htable = - *reinterpret_cast(pNumTabData); + GpuAlgorithmFPType *htableArray = + reinterpret_cast(pNumTabData); + auto data = + sycl::malloc_shared(numRows * numClos, queue); + queue + .memcpy(data, htableArray, + sizeof(GpuAlgorithmFPType) * numRows * numClos) + .wait(); + homogen_table htable{ + queue, data, numRows, numClos, + detail::make_default_delete(queue)}; const auto cor_desc = covariance_gpu::descriptor{}.set_result_options( @@ -195,9 +204,9 @@ static void doCorrelationOneAPICompute( JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, jint executorNum, - jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray, - jobject resultObj) { + JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numClos, + jint executorNum, jint executorCores, jint computeDeviceOrdinal, + jintArray gpuIdxArray, jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): use DPC++ 
kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); @@ -240,7 +249,8 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL( auto comm = preview::spmd::make_communicator( queue, size, rankId, kvs); - doCorrelationOneAPICompute(env, pNumTabData, comm, resultObj); + doCorrelationOneAPICompute(env, pNumTabData, numRows, numClos, comm, + resultObj, queue); env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); break; } diff --git a/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp b/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp index ae4d084b7..535e6f1cd 100644 --- a/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp +++ b/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp @@ -208,25 +208,41 @@ jobject collect_model(JNIEnv *env, const df::model &m, } static jobject doRFClassifierOneAPICompute( - JNIEnv *env, jlong pNumTabFeature, jlong pNumTabLabel, jint executorNum, + JNIEnv *env, jlong pNumTabFeature, jlong featureRows, jlong featureCols, + jlong pNumTabLabel, jlong labelCols, jint executorNum, jint computeDeviceOrdinal, jint classCount, jint treeCount, jint numFeaturesPerNode, jint minObservationsLeafNode, jint minObservationsSplitNode, jdouble minWeightFractionLeafNode, jdouble minImpurityDecreaseSplitNode, jint maxTreeDepth, jlong seed, jint maxBins, jboolean bootstrap, preview::spmd::communicator comm, - jobject resultObj) { + jobject resultObj, sycl::queue &queue) { logger::println(logger::INFO, "oneDAL (native): GPU compute start"); const bool isRoot = (comm.get_rank() == ccl_root); - homogen_table hFeaturetable = - *reinterpret_cast(pNumTabFeature); - homogen_table hLabeltable = - *reinterpret_cast(pNumTabLabel); - logger::println(logger::INFO, - "doRFClassifierOneAPICompute get_column_count = %d", - hFeaturetable.get_column_count()); - logger::println(logger::INFO, "doRFClassifierOneAPICompute classCount = %d", - classCount); + GpuAlgorithmFPType *htableFeatureArray = + reinterpret_cast(pNumTabFeature); + GpuAlgorithmFPType *htableLabelArray = + reinterpret_cast(pNumTabLabel); + + auto featureData = sycl::malloc_shared( + featureRows * featureCols, queue); + queue + .memcpy(featureData, htableFeatureArray, + sizeof(GpuAlgorithmFPType) * featureRows * featureCols) + .wait(); + homogen_table hFeaturetable{ + queue, featureData, featureRows, featureCols, + detail::make_default_delete(queue)}; + + auto labelData = + sycl::malloc_shared(featureRows * labelCols, queue); + queue + .memcpy(labelData, htableLabelArray, + sizeof(GpuAlgorithmFPType) * featureRows * labelCols) + .wait(); + homogen_table hLabeltable{ + queue, labelData, featureRows, labelCols, + detail::make_default_delete(queue)}; const auto df_desc = df::descriptor( queue, size, rankId, kvs); jobject hashmapObj = doRFClassifierOneAPICompute( - env, pNumTabFeature, pNumTabLabel, executorNum, - computeDeviceOrdinal, classCount, treeCount, numFeaturesPerNode, - minObservationsLeafNode, minObservationsSplitNode, - minWeightFractionLeafNode, minImpurityDecreaseSplitNode, - maxTreeDepth, seed, maxBins, bootstrap, comm, resultObj); + env, pNumTabFeature, featureRows, featureCols, pNumTabLabel, + labelCols, executorNum, computeDeviceOrdinal, classCount, treeCount, + numFeaturesPerNode, minObservationsLeafNode, + minObservationsSplitNode, minWeightFractionLeafNode, + minImpurityDecreaseSplitNode, maxTreeDepth, seed, maxBins, + bootstrap, comm, resultObj, queue); return hashmapObj; } default: { diff --git 
a/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp b/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp index 3a96b52de..f2b784a53 100644 --- a/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp +++ b/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp @@ -207,18 +207,38 @@ jobject collect_model(JNIEnv *env, const df::model &m, } static jobject doRFRegressorOneAPICompute( - JNIEnv *env, jlong pNumTabFeature, jlong pNumTabLabel, jint executorNum, + JNIEnv *env, jlong pNumTabFeature, jlong featureRows, jlong featureCols, + jlong pNumTabLabel, jlong labelCols, jint executorNum, jint computeDeviceOrdinal, jint treeCount, jint numFeaturesPerNode, jint minObservationsLeafNode, jint maxTreeDepth, jlong seed, jint maxbins, jboolean bootstrap, preview::spmd::communicator comm, - jobject resultObj) { + jobject resultObj, sycl::queue &queue) { logger::println(logger::INFO, "OneDAL (native): GPU compute start"); const bool isRoot = (comm.get_rank() == ccl_root); - homogen_table hFeaturetable = - *reinterpret_cast(pNumTabFeature); - homogen_table hLabeltable = - *reinterpret_cast(pNumTabLabel); + GpuAlgorithmFPType *htableFeatureArray = + reinterpret_cast(pNumTabFeature); + GpuAlgorithmFPType *htableLabelArray = + reinterpret_cast(pNumTabLabel); + auto featureData = sycl::malloc_shared( + featureRows * featureCols, queue); + queue + .memcpy(featureData, htableFeatureArray, + sizeof(GpuAlgorithmFPType) * featureRows * featureCols) + .wait(); + homogen_table hFeaturetable{ + queue, featureData, featureRows, featureCols, + detail::make_default_delete(queue)}; + + auto labelData = + sycl::malloc_shared(featureRows * labelCols, queue); + queue + .memcpy(labelData, htableLabelArray, + sizeof(GpuAlgorithmFPType) * featureRows * labelCols) + .wait(); + homogen_table hLabeltable{ + queue, labelData, featureRows, labelCols, + detail::make_default_delete(queue)}; logger::println(logger::INFO, "doRFRegressorOneAPICompute get_column_count = %d", hFeaturetable.get_column_count()); @@ -290,11 +310,11 @@ static jobject doRFRegressorOneAPICompute( JNIEXPORT jobject JNICALL Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabFeature, jlong pNumTabLabel, - jint executorNum, jint computeDeviceOrdinal, jint treeCount, - jint numFeaturesPerNode, jint minObservationsLeafNode, jint maxTreeDepth, - jlong seed, jint maxbins, jboolean bootstrap, jintArray gpuIdxArray, - jobject resultObj) { + JNIEnv *env, jobject obj, jlong pNumTabFeature, jlong featureRows, + jlong featureCols, jlong pNumTabLabel, jlong labelCols, jint executorNum, + jint computeDeviceOrdinal, jint treeCount, jint numFeaturesPerNode, + jint minObservationsLeafNode, jint maxTreeDepth, jlong seed, jint maxbins, + jboolean bootstrap, jintArray gpuIdxArray, jobject resultObj) { logger::println(logger::INFO, "OneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); @@ -322,10 +342,10 @@ Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTra preview::spmd::make_communicator( queue, size, rankId, kvs); jobject hashmapObj = doRFRegressorOneAPICompute( - env, pNumTabFeature, pNumTabLabel, executorNum, - computeDeviceOrdinal, treeCount, numFeaturesPerNode, - minObservationsLeafNode, maxTreeDepth, seed, maxbins, bootstrap, - comm, resultObj); + env, pNumTabFeature, featureRows, featureCols, pNumTabLabel, + labelCols, executorNum, computeDeviceOrdinal, treeCount, + numFeaturesPerNode, 
minObservationsLeafNode, maxTreeDepth, seed, + maxbins, bootstrap, comm, resultObj, queue); return hashmapObj; } default: { diff --git a/mllib-dal/src/main/native/KMeansImpl.cpp b/mllib-dal/src/main/native/KMeansImpl.cpp index 3998a06ea..a0bc2fd41 100644 --- a/mllib-dal/src/main/native/KMeansImpl.cpp +++ b/mllib-dal/src/main/native/KMeansImpl.cpp @@ -243,14 +243,24 @@ static jlong doKMeansDaalCompute(JNIEnv *env, jobject obj, size_t rankId, #ifdef CPU_GPU_PROFILE static jlong doKMeansOneAPICompute( - JNIEnv *env, jlong pNumTabData, jlong pNumTabCenters, jint clusterNum, - jdouble tolerance, jint iterationNum, + JNIEnv *env, jlong pNumTabData, jlong numRows, jlong numClos, + jlong pNumTabCenters, jint clusterNum, jdouble tolerance, jint iterationNum, preview::spmd::communicator comm, - jobject resultObj) { + jobject resultObj, sycl::queue &queue) { logger::println(logger::INFO, "OneDAL (native): GPU compute start"); const bool isRoot = (comm.get_rank() == ccl_root); - homogen_table htable = - *reinterpret_cast(pNumTabData); + GpuAlgorithmFPType *htableArray = + reinterpret_cast(pNumTabData); + auto data = + sycl::malloc_shared(numRows * numClos, queue); + queue + .memcpy(data, htableArray, + sizeof(GpuAlgorithmFPType) * numRows * numClos) + .wait(); + homogen_table htable{ + queue, data, numRows, numClos, + detail::make_default_delete(queue)}; + homogen_table centroids = *reinterpret_cast(pNumTabCenters); const auto kmeans_desc = kmeans_gpu::descriptor() @@ -303,10 +313,10 @@ static jlong doKMeansOneAPICompute( */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCenters( - JNIEnv *env, jobject obj, jlong pNumTabData, jlong pNumTabCenters, - jint clusterNum, jdouble tolerance, jint iterationNum, jint executorNum, - jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray, - jobject resultObj) { + JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numClos, + jlong pNumTabCenters, jint clusterNum, jdouble tolerance, jint iterationNum, + jint executorNum, jint executorCores, jint computeDeviceOrdinal, + jintArray gpuIdxArray, jobject resultObj) { logger::println(logger::INFO, "OneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); @@ -352,9 +362,9 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe auto comm = preview::spmd::make_communicator( queue, size, rankId, kvs); - ret = - doKMeansOneAPICompute(env, pNumTabData, pNumTabCenters, clusterNum, - tolerance, iterationNum, comm, resultObj); + ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numClos, + pNumTabCenters, clusterNum, tolerance, + iterationNum, comm, resultObj, queue); env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); break; diff --git a/mllib-dal/src/main/native/LinearRegressionImpl.cpp b/mllib-dal/src/main/native/LinearRegressionImpl.cpp index a991140ac..753f1a7af 100644 --- a/mllib-dal/src/main/native/LinearRegressionImpl.cpp +++ b/mllib-dal/src/main/native/LinearRegressionImpl.cpp @@ -216,9 +216,10 @@ ridge_regression_compute(size_t rankId, ccl::communicator &comm, #ifdef CPU_GPU_PROFILE static jlong doLROneAPICompute(JNIEnv *env, size_t rankId, ccl::communicator &cclComm, sycl::queue &queue, - jlong pData, jlong pLabel, - jboolean jfitIntercept, jint executorNum, - jobject resultObj) { + jlong pNumTabFeature, jlong featureRows, + jlong featureCols, jlong pNumTabLabel, + jlong labelCols, jboolean jfitIntercept, + jint executorNum, jobject resultObj) { 
logger::println(logger::INFO, "oneDAL (native): GPU compute start , rankid %d", rankId); const bool isRoot = (rankId == ccl_root); @@ -228,9 +229,29 @@ static jlong doLROneAPICompute(JNIEnv *env, size_t rankId, ccl::shared_ptr_class &kvs = getKvs(); auto comm = preview::spmd::make_communicator( queue, size, rankId, kvs); - - homogen_table xtrain = *reinterpret_cast(pData); - homogen_table ytrain = *reinterpret_cast(pLabel); + GpuAlgorithmFPType *htableFeatureArray = + reinterpret_cast(pNumTabFeature); + GpuAlgorithmFPType *htableLabelArray = + reinterpret_cast(pNumTabLabel); + auto featureData = sycl::malloc_shared( + featureRows * featureCols, queue); + queue + .memcpy(featureData, htableFeatureArray, + sizeof(GpuAlgorithmFPType) * featureRows * featureCols) + .wait(); + homogen_table xtrain{ + queue, featureData, featureRows, featureCols, + detail::make_default_delete(queue)}; + + auto labelData = + sycl::malloc_shared(featureRows * labelCols, queue); + queue + .memcpy(labelData, htableLabelArray, + sizeof(GpuAlgorithmFPType) * featureRows * labelCols) + .wait(); + homogen_table ytrain{ + queue, labelData, featureRows, labelCols, + detail::make_default_delete(queue)}; linear_regression_gpu::train_input local_input{xtrain, ytrain}; const auto linear_regression_desc = @@ -256,7 +277,8 @@ static jlong doLROneAPICompute(JNIEnv *env, size_t rankId, */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTrainDAL( - JNIEnv *env, jobject obj, jlong data, jlong label, jboolean fitIntercept, + JNIEnv *env, jobject obj, jlong feature, jlong featureRows, + jlong featureCols, jlong label, jlong labelCols, jboolean fitIntercept, jdouble regParam, jdouble elasticNetParam, jint executorNum, jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray, jobject resultObj) { @@ -288,16 +310,14 @@ Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTra auto queue = getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu); - jlong pDatagpu = (jlong)data; - jlong pLabelgpu = (jlong)label; - resultptr = - doLROneAPICompute(env, rankId, cclComm, queue, pDatagpu, pLabelgpu, - fitIntercept, executorNum, resultObj); + resultptr = doLROneAPICompute( + env, rankId, cclComm, queue, feature, featureRows, featureCols, + label, labelCols, fitIntercept, executorNum, resultObj); env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); #endif } else { NumericTablePtr pLabel = *((NumericTablePtr *)label); - NumericTablePtr pData = *((NumericTablePtr *)data); + NumericTablePtr pData = *((NumericTablePtr *)feature); // Set number of threads for oneDAL to use for each rank services::Environment::getInstance()->setNumberOfThreads(executorCores); diff --git a/mllib-dal/src/main/native/PCAImpl.cpp b/mllib-dal/src/main/native/PCAImpl.cpp index cd163b8dd..37fd7fe0d 100644 --- a/mllib-dal/src/main/native/PCAImpl.cpp +++ b/mllib-dal/src/main/native/PCAImpl.cpp @@ -182,14 +182,22 @@ static void doPCADAALCompute(JNIEnv *env, jobject obj, size_t rankId, #ifdef CPU_GPU_PROFILE static void doPCAOneAPICompute( - JNIEnv *env, jlong pNumTabData, + JNIEnv *env, jlong pNumTabData, jlong numRows, jlong numClos, preview::spmd::communicator comm, - jobject resultObj) { + jobject resultObj, sycl::queue &queue) { logger::println(logger::INFO, "oneDAL (native): GPU compute start"); const bool isRoot = (comm.get_rank() == ccl_root); - homogen_table htable = - *reinterpret_cast(pNumTabData); - + GpuAlgorithmFPType *htableArray = + reinterpret_cast(pNumTabData); + 
auto data = + sycl::malloc_shared(numRows * numClos, queue); + queue + .memcpy(data, htableArray, + sizeof(GpuAlgorithmFPType) * numRows * numClos) + .wait(); + homogen_table htable{ + queue, data, numRows, numClos, + detail::make_default_delete(queue)}; const auto cov_desc = covariance_gpu::descriptor{}.set_result_options( covariance_gpu::result_options::cov_matrix); @@ -248,9 +256,9 @@ static void doPCAOneAPICompute( JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, jint executorNum, - jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray, - jobject resultObj) { + JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numClos, + jint executorNum, jint executorCores, jint computeDeviceOrdinal, + jintArray gpuIdxArray, jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); @@ -293,7 +301,8 @@ Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL( auto comm = preview::spmd::make_communicator( queue, size, rankId, kvs); - doPCAOneAPICompute(env, pNumTabData, comm, resultObj); + doPCAOneAPICompute(env, pNumTabData, numRows, numClos, comm, resultObj, + queue); env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); break; } diff --git a/mllib-dal/src/main/native/SummarizerImpl.cpp b/mllib-dal/src/main/native/SummarizerImpl.cpp index 0c28e1874..ec10aef28 100644 --- a/mllib-dal/src/main/native/SummarizerImpl.cpp +++ b/mllib-dal/src/main/native/SummarizerImpl.cpp @@ -202,13 +202,22 @@ static void doSummarizerDAALCompute(JNIEnv *env, jobject obj, size_t rankId, #ifdef CPU_GPU_PROFILE static void doSummarizerOneAPICompute( - JNIEnv *env, jlong pNumTabData, + JNIEnv *env, jlong pNumTabData, jlong numRows, jlong numClos, preview::spmd::communicator comm, - jobject resultObj) { + jobject resultObj, sycl::queue &queue) { logger::println(logger::INFO, "oneDAL (native): GPU compute start"); const bool isRoot = (comm.get_rank() == ccl_root); - homogen_table htable = - *reinterpret_cast(pNumTabData); + GpuAlgorithmFPType *htableArray = + reinterpret_cast(pNumTabData); + auto data = + sycl::malloc_shared(numRows * numClos, queue); + queue + .memcpy(data, htableArray, + sizeof(GpuAlgorithmFPType) * numRows * numClos) + .wait(); + homogen_table htable{ + queue, data, numRows, numClos, + detail::make_default_delete(queue)}; const auto bs_desc = basic_statistics::descriptor{}; auto t1 = std::chrono::high_resolution_clock::now(); const auto result_train = preview::compute(comm, bs_desc, htable); @@ -265,9 +274,9 @@ static void doSummarizerOneAPICompute( JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, jint executorNum, - jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray, - jobject resultObj) { + JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numClos, + jint executorNum, jint executorCores, jint computeDeviceOrdinal, + jintArray gpuIdxArray, jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); @@ -310,7 +319,8 @@ Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL( auto comm = preview::spmd::make_communicator( queue, size, rankId, kvs); - doSummarizerOneAPICompute(env, pNumTabData, comm, resultObj); + doSummarizerOneAPICompute(env, pNumTabData, numRows, numClos, comm, + 
resultObj, queue); env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); break; } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_classification_RandomForestClassifierDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_classification_RandomForestClassifierDALImpl.h index b04209914..8c0c4ecdd 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_classification_RandomForestClassifierDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_classification_RandomForestClassifierDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JJIIIIIIIDDIJIZ[ILcom/intel/oap/mllib/classification/RandomForestResult;)Ljava/util/HashMap; */ JNIEXPORT jobject JNICALL Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassifierTrainDAL - (JNIEnv *, jobject, jlong, jlong, jint, jint, jint, jint, jint, jint, jint, jdouble, jdouble, jint, jlong, jint, jboolean, jintArray, jobject); + (JNIEnv *, jobject, jlong, jlong, jlong, jlong, jlong, jint, jint, jint, jint, jint, jint, jint, jdouble, jdouble, jint, jlong, jint, jboolean, jintArray, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h index 445c364cb..595f69fb5 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_clustering_KMeansDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JJIDIIII[ILcom/intel/oap/mllib/clustering/KMeansResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCenters - (JNIEnv *, jobject, jlong, jlong, jint, jdouble, jint, jint, jint, jint, jintArray, jobject); + (JNIEnv *, jobject, jlong, jlong, jlong, jlong, jint, jdouble, jint, jint, jint, jint, jintArray, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h index 1960d384c..2ac220860 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_feature_PCADALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JIII[ILcom/intel/oap/mllib/feature/PCAResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL - (JNIEnv *, jobject, jlong, jint, jint, jint, jintArray, jobject); + (JNIEnv *, jobject, jlong, jlong, jlong, jint, jint, jint, jintArray, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_LinearRegressionDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_LinearRegressionDALImpl.h index d92ed6ece..28c7e8f42 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_LinearRegressionDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_LinearRegressionDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JJZDDIII[ILcom/intel/oap/mllib/regression/LiRResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTrainDAL - (JNIEnv *, jobject, jlong, jlong, jboolean, jdouble, jdouble, jint, jint, jint, jintArray, jobject); + (JNIEnv *, jobject, jlong, jlong, jlong, jlong, jlong, jboolean, jdouble, jdouble, jint, jint, jint, jintArray, jobject); #ifdef __cplusplus } diff --git 
a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_RandomForestRegressorDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_RandomForestRegressorDALImpl.h index b8f99f768..ac457b3bf 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_RandomForestRegressorDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_regression_RandomForestRegressorDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JJIIIIIIJIZ[ILcom/intel/oap/mllib/classification/RandomForestResult;)Ljava/util/HashMap; */ JNIEXPORT jobject JNICALL Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTrainDAL - (JNIEnv *, jobject, jlong, jlong, jint, jint, jint, jint, jint, jint, jlong, jint, jboolean, jintArray, jobject); + (JNIEnv *, jobject, jlong, jlong, jlong, jlong, jlong, jint, jint, jint, jint, jint, jint, jlong, jint, jboolean, jintArray, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h index e5fff962d..e0e4f38af 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_CorrelationDALImpl.h @@ -12,8 +12,8 @@ extern "C" { * Method: cCorrelationTrainDAL * Signature: (JIII[ILcom/intel/oap/mllib/stat/CorrelationResult;)J */ -JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL - (JNIEnv *, jobject, jlong, jint, jint, jint, jintArray, jobject); +JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL + (JNIEnv *, jobject, jlong, jlong, jlong, jint, jint, jint, jintArray, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h index 7058bfa6d..754a5b645 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_stat_SummarizerDALImpl.h @@ -13,7 +13,7 @@ extern "C" { * Signature: (JIII[ILcom/intel/oap/mllib/stat/SummarizerResult;)J */ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL - (JNIEnv *, jobject, jlong, jint, jint, jint, jintArray, jobject); + (JNIEnv *, jobject, jlong, jlong, jlong, jint, jint, jint, jintArray, jobject); #ifdef __cplusplus } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala index 9a925fb92..16d79fdcb 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala @@ -399,7 +399,7 @@ object OneDAL { labelCol: String, featuresCol: String, executorNum: Int, - device: Common.ComputeDevice): RDD[(Long, Long)] = { + device: Common.ComputeDevice): RDD[(String, String)] = { require(executorNum > 0) logger.info(s"Processing partitions with $executorNum executors") @@ -444,12 +444,12 @@ object OneDAL { val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) => val list = it.toList val subRowCount: Int = list.size / numberCores - val labeledPointsList: ListBuffer[Future[(Array[Double], Long)]] = - new ListBuffer[Future[(Array[Double], Long)]]() + val labeledPointsList: ListBuffer[Future[(Long, Long)]] = + new ListBuffer[Future[(Long, Long)]]() val 
numRows = list.size val numCols = list(0).getAs[Vector](1).toArray.size - val labelsArray = new Array[Double](numRows) + val labelsAddress = OneDAL.cNewDoubleArray(numRows.toLong) val featuresAddress= OneDAL.cNewDoubleArray(numRows.toLong * numCols) for ( i <- 0 until numberCores) { val f = Future { @@ -462,21 +462,17 @@ object OneDAL { slice.toArray.zipWithIndex.map { case (row, index) => val length = row.getAs[Vector](1).toArray.length OneDAL.cCopyDoubleArrayToNative(featuresAddress, row.getAs[Vector](1).toArray, subRowCount.toLong * numCols * i + length * index) - labelsArray(subRowCount * i + index) = row.getAs[Double](0) + OneDAL.cCopyDoubleArrayToNative(labelsAddress, Array(row.getAs[Double](0)), subRowCount * i + index) } - (labelsArray, featuresAddress) + (labelsAddress, featuresAddress) } labeledPointsList += f - - val result = Future.sequence(labeledPointsList) - Await.result(result, Duration.Inf) } - val labelsTable = new HomogenTable(numRows.toLong, 1, labelsArray, - device) - val featuresTable = new HomogenTable(numRows.toLong, numCols.toLong, featuresAddress, Common.DataType.FLOAT64, - device) + val result = Future.sequence(labeledPointsList) + Await.result(result, Duration.Inf) + + Iterator((featuresAddress + "_" + numRows.toLong + "_" + numCols.toLong, labelsAddress + "_" + numRows.toLong + "_" + 1)) - Iterator((featuresTable.getcObejct(), labelsTable.getcObejct())) }.setName("coalescedTables").cache() coalescedTables.count() @@ -583,7 +579,7 @@ object OneDAL { } def coalesceVectorsToHomogenTables(data: RDD[Vector], executorNum: Int, - device: Common.ComputeDevice): RDD[Long] = { + device: Common.ComputeDevice): RDD[String] = { logger.info(s"Processing partitions with $executorNum executors") val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1) @@ -640,13 +636,11 @@ } futureList += f + } val result = Future.sequence(futureList) Await.result(result, Duration.Inf) - } - val table = new HomogenTable(numRows.toLong, numCols.toLong, targetArrayAddress, - Common.DataType.FLOAT64, device) - Iterator(table.getcObejct()) + Iterator(targetArrayAddress + "_" + numRows.toLong + "_" + numCols.toLong) }.setName("coalescedTables").cache() coalescedTables.count() // Unpersist instances RDD diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index ed5cbe3fa..e671776f2 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -82,9 +82,22 @@ class RandomForestClassifierDALImpl(val uid: String, rfcTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { - (rank: Int, tables: Iterator[(Long, Long)]) => - val (featureTabAddr, lableTabAddr) = tables.next() - + (rank: Int, tables: Iterator[(String, String)]) => + val (feature, label) = tables.next() + val (featureTabAddr : Long, featureRows : Long, featureColumns : Long) = + if (useDevice == "GPU") { + val parts = feature.toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + (feature.toString.toLong, 0, 0) + } + val (labelTabAddr : Long, labelRows : Long, labelColumns : Long) = + if (useDevice == "GPU") { + val parts = label.toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + 
(label.toString.toLong, 0, 0) + } val gpuIndices = if (useDevice == "GPU") { if (isTest) { Array(0) @@ -99,7 +112,10 @@ class RandomForestClassifierDALImpl(val uid: String, val result = new RandomForestResult val hashmap = cRFClassifierTrainDAL( featureTabAddr, - lableTabAddr, + featureRows, + featureColumns, + labelTabAddr, + labelColumns, executorNum, computeDevice.ordinal(), classCount, @@ -140,7 +156,10 @@ class RandomForestClassifierDALImpl(val uid: String, } @native private[mllib] def cRFClassifierTrainDAL(featureTabAddr: Long, + numRows: Long, + numCols: Long, lableTabAddr: Long, + labelNumCols: Long, executorNum: Int, computeDeviceOrdinal: Int, classCount: Int, diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 9cd275b04..5c9936197 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -58,7 +58,7 @@ class KMeansDALImpl(var nClusters: Int, }.count() kmeansTimer.record("OneCCL Init") - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => + val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => var cCentroids = 0L val result = new KMeansResult() val gpuIndices = if (useDevice == "GPU") { @@ -68,14 +68,23 @@ class KMeansDALImpl(var nClusters: Int, null } - val tableArr = table.next() + val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { + val parts = iter.next().toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + (iter.next(), 0, 0) + } + val initCentroids = if (useDevice == "GPU") { OneDAL.makeHomogenTable(centers, computeDevice).getcObejct() } else { OneDAL.makeNumericTable(centers).getCNumericTable } + cCentroids = cKMeansOneapiComputeWithInitCenters( tableArr, + rows, + columns, initCentroids, nClusters, tolerance, @@ -129,13 +138,15 @@ class KMeansDALImpl(var nClusters: Int, } @native private[mllib] def cKMeansOneapiComputeWithInitCenters(data: Long, - centers: Long, - clusterNum: Int, - tolerance: Double, - iterationNum: Int, - executorNum: Int, - executorCores: Int, - computeDeviceOrdinal: Int, - gpuIndices: Array[Int], - result: KMeansResult): Long + numRows: Long, + numCols: Long, + centers: Long, + clusterNum: Int, + tolerance: Double, + iterationNum: Int, + executorNum: Int, + executorCores: Int, + computeDeviceOrdinal: Int, + gpuIndices: Array[Int], + result: KMeansResult): Long } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 03ae340a4..1e1713f1f 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -65,8 +65,13 @@ class PCADALImpl(val k: Int, }.count() pcaTimer.record("OneCCL Init") - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => - val tableArr = table.next() + val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => + val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { + val parts = iter.next().toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + (iter.next(), 0, 0) + } val result = new PCAResult() val gpuIndices = if (useDevice == "GPU") { val resources = TaskContext.get().resources() @@ -76,6 +81,8 @@ class 
PCADALImpl(val k: Int, } cPCATrainDAL( tableArr, + rows, + columns, executorNum, executorCores, computeDevice.ordinal(), @@ -209,6 +216,8 @@ class PCADALImpl(val k: Int, // Single entry to call Correlation PCA DAL backend with parameter K @native private[mllib] def cPCATrainDAL(data: Long, + numRows: Long, + numCols: Long, executorNum: Int, executorCores: Int, computeDeviceOrdinal: Int, diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index 86d574557..65c613780 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -107,8 +107,23 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, lrTimer.record("Data Convertion") val results = labeledPointsTables.mapPartitionsWithIndex { - case (rank: Int, tables: Iterator[(Long, Long)]) => - val (featureTabAddr, lableTabAddr) = tables.next() + case (rank: Int, tables: Iterator[(String, String)]) => + val (feature, label) = tables.next() + val (featureTabAddr : Long, featureRows : Long, featureColumns : Long) = + if (useDevice == "GPU") { + val parts = feature.toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + (feature.toString.toLong, 0, 0) + } + val (labelTabAddr : Long, labelRows : Long, labelColumns : Long) = + if (useDevice == "GPU") { + val parts = label.toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + (label.toString.toLong, 0, 0) + } + OneCCL.init(executorNum, rank, kvsIPPort) val result = new LiRResult() @@ -127,7 +142,10 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, val cbeta = cLinearRegressionTrainDAL( featureTabAddr, - lableTabAddr, + featureRows, + featureColumns, + labelTabAddr, + labelColumns, fitIntercept, regParam, elasticNetParam, @@ -169,7 +187,10 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, // Single entry to call Linear Regression DAL backend with parameters @native private def cLinearRegressionTrainDAL(data: Long, + numRows: Long, + numCols: Long, label: Long, + labelNumCols: Long, fitIntercept: Boolean, regParam: Double, elasticNetParam: Double, diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index 0b2c68a2e..d458a92b3 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -76,9 +76,22 @@ class RandomForestRegressorDALImpl(val uid: String, rfrTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { - (rank: Int, tables: Iterator[(Long, Long)]) => - val (featureTabAddr, lableTabAddr) = tables.next() - + (rank: Int, tables: Iterator[(String, String)]) => + val (feature, label) = tables.next() + val (featureTabAddr : Long, featureRows : Long, featureColumns : Long) = + if (useDevice == "GPU") { + val parts = feature.toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + (feature.toString.toLong, 0, 0) + } + val (labelTabAddr : Long, labelRows : Long, labelColumns : Long) = + if (useDevice == "GPU") { + val parts = label.toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + 
} else { + (label.toString.toLong, 0, 0) + } val gpuIndices = if (useDevice == "GPU") { if (isTest) { Array(0) @@ -94,7 +107,10 @@ class RandomForestRegressorDALImpl(val uid: String, val result = new RandomForestResult val hashmap = cRFRegressorTrainDAL( featureTabAddr, - lableTabAddr, + featureRows, + featureColumns, + labelTabAddr, + labelColumns, executorNum, computeDevice.ordinal(), treeCount, @@ -141,7 +157,10 @@ class RandomForestRegressorDALImpl(val uid: String, } @native private[mllib] def cRFRegressorTrainDAL(featureTabAddr: Long, + numRows: Long, + numCols: Long, lableTabAddr: Long, + labelNumCols: Long, executorNum: Int, computeDeviceOrdinal: Int, treeCount: Int, diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 800436387..d8df97b3f 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -52,8 +52,13 @@ class CorrelationDALImpl( }.count() corTimer.record("OneCCL Init") - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => - val tableArr = table.next() + val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => + val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { + val parts = iter.next().toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + (iter.next(), 0, 0) + } val computeStartTime = System.nanoTime() @@ -66,6 +71,8 @@ class CorrelationDALImpl( } cCorrelationTrainDAL( tableArr, + rows, + columns, executorNum, executorCores, computeDevice.ordinal(), @@ -113,6 +120,8 @@ class CorrelationDALImpl( @native private[mllib] def cCorrelationTrainDAL(data: Long, + numRows: Long, + numCols: Long, executorNum: Int, executorCores: Int, computeDeviceOrdinal: Int, diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index e325f007e..29275d689 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -53,8 +53,13 @@ class SummarizerDALImpl(val executorNum: Int, }.count() sumTimer.record("OneCCL Init") - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => - val tableArr = table.next() + val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => + val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { + val parts = iter.next().toString.split("_") + (parts(0).toLong, parts(1).toLong, parts(2).toLong) + } else { + (iter.next(), 0, 0) + } val computeStartTime = System.nanoTime() @@ -67,6 +72,8 @@ class SummarizerDALImpl(val executorNum: Int, } cSummarizerTrainDAL( tableArr, + rows, + columns, executorNum, executorCores, computeDevice.ordinal(), @@ -145,6 +152,8 @@ class SummarizerDALImpl(val executorNum: Int, } @native private[mllib] def cSummarizerTrainDAL(data: Long, + numRows: Long, + numCols: Long, executorNum: Int, executorCores: Int, computeDeviceOrdinal: Int, diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala index d79431c77..4836dd4e5 100644 --- a/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala +++ 
b/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala @@ -45,7 +45,7 @@ class CorrelationHomogenTableSuite extends FunctionsSuite with Logging { val correlationDAL = new CorrelationDALImpl(1, 1) val gpuIndices = Array(0) val result = new CorrelationResult() - correlationDAL.cCorrelationTrainDAL(dataTable.getcObejct(), 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result); + correlationDAL.cCorrelationTrainDAL(dataTable.getcObejct(), 200, 10, 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result); val correlationMatrix = TestCommon.getMatrixFromTable(OneDAL.makeHomogenTable( result.getCorrelationNumericTable), TestCommon.getComputeDevice) diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/KmeansHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/KmeansHomogenTableSuite.scala index f7a95e760..12d5c70e0 100644 --- a/mllib-dal/src/test/scala/com/intel/oap/mllib/KmeansHomogenTableSuite.scala +++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/KmeansHomogenTableSuite.scala @@ -49,7 +49,7 @@ class KmeansHomogenTableSuite extends FunctionsSuite with Logging { OneCCL.init(1, 1, "127.0.0.1_3000") val gpuIndices = Array(0) val result = new KMeansResult(); - val centroids = kmeansDAL.cKMeansOneapiComputeWithInitCenters(dataTable.getcObejct(), centroidsTable.getcObejct(),10, 0.001, + val centroids = kmeansDAL.cKMeansOneapiComputeWithInitCenters(dataTable.getcObejct(), 10000, 20, centroidsTable.getcObejct(),10, 0.001, 5, 1, 1, TestCommon.getComputeDevice.ordinal(), gpuIndices, result); val resultVectors = OneDAL.homogenTableToVectors(OneDAL.makeHomogenTable(centroids), TestCommon.getComputeDevice); assertArrayEquals(TestCommon.convertArray(expectCentroids), TestCommon.convertArray(resultVectors), 0.000001) diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/PCAHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/PCAHomogenTableSuite.scala index c56c357b7..f29d52684 100644 --- a/mllib-dal/src/test/scala/com/intel/oap/mllib/PCAHomogenTableSuite.scala +++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/PCAHomogenTableSuite.scala @@ -47,7 +47,7 @@ class PCAHomogenTableSuite extends FunctionsSuite with Logging { val pcaDAL = new PCADALImpl(5, 1, 1) val gpuIndices = Array(0) val result = new PCAResult() - pcaDAL.cPCATrainDAL(dataTable.getcObejct(), 1, 1, TestCommon.getComputeDevice.ordinal(), gpuIndices, result); + pcaDAL.cPCATrainDAL(dataTable.getcObejct(), 1000, 10, 1, 1, TestCommon.getComputeDevice.ordinal(), gpuIndices, result); val pcNumericTable = OneDAL.makeHomogenTable(result.getPcNumericTable) val explainedVarianceNumericTable = OneDAL.makeHomogenTable( result.getExplainedVarianceNumericTable) @@ -79,7 +79,7 @@ class PCAHomogenTableSuite extends FunctionsSuite with Logging { val pcaDAL = new PCADALImpl(5, 1, 1) val gpuIndices = Array(0) val result = new PCAResult() - pcaDAL.cPCATrainDAL(dataTable.getcObejct(), 1, 1, TestCommon.getComputeDevice.ordinal(), gpuIndices, result); + pcaDAL.cPCATrainDAL(dataTable.getcObejct(), 1000, 10, 1, 1, TestCommon.getComputeDevice.ordinal(), gpuIndices, result); val pcNumericTable = OneDAL.makeHomogenTable(result.getPcNumericTable) val explainedVarianceNumericTable = OneDAL.makeHomogenTable( result.getExplainedVarianceNumericTable) diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala index da94540a5..a1767626b 100644 --- 
a/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala +++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala @@ -35,7 +35,7 @@ class SummarizerHomogenTableSuite extends FunctionsSuite with Logging{ val summarizerDAL = new SummarizerDALImpl(1, 1) val gpuIndices = Array(0) val result = new SummarizerResult() - summarizerDAL.cSummarizerTrainDAL(dataTable.getcObejct(), 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result) + summarizerDAL.cSummarizerTrainDAL(dataTable.getcObejct(), 200, 10, 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result) val meanTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMeanNumericTable), Common.ComputeDevice.HOST) val varianceTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getVarianceNumericTable), Common.ComputeDevice.HOST) val minimumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMinimumNumericTable), Common.ComputeDevice.HOST) diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala index e44ab33a0..25f0bc011 100644 --- a/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala +++ b/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala @@ -84,8 +84,8 @@ class oneDALSuite extends FunctionsSuite with Logging { val mergedata = OneDAL.coalesceLabelPointsToHomogenTables(df, "label", "features", 1, TestCommon.getComputeDevice) val results = mergedata.collect() - val featureTable = new HomogenTable(results(0)._1) - val labelTable = new HomogenTable(results(0)._2) + val featureTable = new HomogenTable(results(0)._1.split("_")(0).toLong) + val labelTable = new HomogenTable(results(0)._2.split("_")(0).toLong) val fData: Array[Double] = featureTable.getDoubleData() val lData: Array[Double] = labelTable.getDoubleData() @@ -111,7 +111,7 @@ class oneDALSuite extends FunctionsSuite with Logging { val result = OneDAL.coalesceVectorsToHomogenTables(rddVectors, 1, TestCommon.getComputeDevice) val tableAddr = result.collect() - val table = new HomogenTable(tableAddr(0)) + val table = new HomogenTable(tableAddr(0).split("_")(0).toLong) val rData: Array[Double] = table.getDoubleData() assertArrayEquals(rData, expectData) }
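
Note on the handle format introduced by this patch: in the GPU path the coalesced feature and label buffers are no longer wrapped into a HomogenTable on the executor side; each partition instead emits a string handle of the form "<nativeAddress>_<rowCount>_<columnCount>", and the JNI layer decodes it and rebuilds a USM-backed homogen_table on the assigned GPU queue. The sketch below is a hypothetical Scala helper (TableHandle is not part of the patch) that centralizes the encode/decode the *DALImpl classes currently repeat inline; it assumes only the string layout visible in the diff.

object TableHandle {
  // Encode a native buffer address and its shape as "<address>_<rows>_<cols>",
  // matching the strings produced in OneDAL.coalesce*ToHomogenTables.
  def encode(address: Long, numRows: Long, numCols: Long): String =
    s"${address}_${numRows}_${numCols}"

  // Decode back to (address, rows, cols); mirrors the parts(0..2).toLong parsing
  // in the GPU branches of the *DALImpl classes.
  def decode(handle: String): (Long, Long, Long) = {
    val parts = handle.split("_")
    (parts(0).toLong, parts(1).toLong, parts(2).toLong)
  }
}

Usage mirroring the GPU branch in, for example, KMeansDALImpl: val (tableAddr, rows, cols) = TableHandle.decode(iter.next()).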