diff --git a/mllib-dal/src/main/native/KMeansOneAPIImpl.cpp b/mllib-dal/src/main/native/KMeansOneAPIImpl.cpp
index a68b8568f..4c4b94c1e 100644
--- a/mllib-dal/src/main/native/KMeansOneAPIImpl.cpp
+++ b/mllib-dal/src/main/native/KMeansOneAPIImpl.cpp
@@ -35,46 +35,32 @@ using namespace std;
 using namespace oneapi::dal;
 const int ccl_root = 0;
 
-typedef std::shared_ptr<homogen_table> homogenPtr;
-
-std::mutex kmtx;
-std::vector<homogenPtr> cVector;
-
-static void saveShareHomogenPtrVector(const homogenPtr &ptr) {
-    kmtx.lock();
-    cVector.push_back(ptr);
-    kmtx.unlock();
-}
-
 static jlong doKMeansOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
-                                   jlong pNumTabCenters, jint cluster_num,
-                                   jdouble tolerance, jint iteration_num,
-                                   jint executor_num, const ccl::string &ipPort,
-                                   jint cComputeDevice, jobject resultObj) {
-    std::cout << "oneDAL (native): OneAPI compute start , rankid %ld " << rankId
+                                   jlong pNumTabCenters, jint clusterNum,
+                                   jdouble tolerance, jint iterationNum,
+                                   jint executorNum, const ccl::string &ipPort,
+                                   jint computeDeviceOrdinal,
+                                   jobject resultObj) {
+    std::cout << "oneDAL (native): GPU/CPU compute start , rankid = " << rankId
+              << "; device = " << computeDeviceOrdinal << "(0:HOST;1:GPU;2:CPU)"
               << std::endl;
     const bool isRoot = (rankId == ccl_root);
-    compute_device device = getComputeDevice(cComputeDevice);
+    ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
     homogen_table htable =
         *reinterpret_cast<const homogen_table *>(pNumTabData);
     homogen_table centroids =
         *reinterpret_cast<const homogen_table *>(pNumTabCenters);
     const auto kmeans_desc = kmeans::descriptor<>()
-                                 .set_cluster_count(cluster_num)
-                                 .set_max_iteration_count(iteration_num)
+                                 .set_cluster_count(clusterNum)
+                                 .set_max_iteration_count(iterationNum)
                                  .set_accuracy_threshold(tolerance);
     kmeans::train_input local_input{htable, centroids};
     auto queue = getQueue(device);
     auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
-        queue, executor_num, rankId, ipPort);
+        queue, executorNum, rankId, ipPort);
     kmeans::train_result result_train =
         preview::train(comm, kmeans_desc, local_input);
     if (isRoot) {
-        std::cout << "iteration_num: " << iteration_num << std::endl;
-        std::cout << "Iteration count: " << result_train.get_iteration_count()
-                  << std::endl;
-        std::cout << "Objective function value: "
-                  << result_train.get_objective_function_value() << std::endl;
         // Get the class of the input object
         jclass clazz = env->GetObjectClass(resultObj);
         // Get Field references
@@ -88,9 +74,9 @@ static jlong doKMeansOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
         env->SetDoubleField(resultObj, totalCostField,
                             result_train.get_objective_function_value());
 
-        homogenPtr centroidsPtr = std::make_shared<homogen_table>(
+        HomogenTablePtr centroidsPtr = std::make_shared<homogen_table>(
             result_train.get_model().get_centroids());
-        saveShareHomogenPtrVector(centroidsPtr);
+        saveHomogenTablePtrToVector(centroidsPtr);
         return (jlong)centroidsPtr.get();
     } else {
         return (jlong)0;
@@ -105,18 +91,16 @@ static jlong doKMeansOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
 JNIEXPORT jlong JNICALL
 Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCenters(
     JNIEnv *env, jobject obj, jlong pNumTabData, jlong pNumTabCenters,
-    jint cluster_num, jdouble tolerance, jint iteration_num, jint executor_num,
-    jint cComputeDevice, jint rankId, jstring ip_port, jobject resultObj) {
-    std::cout << "oneDAL (native): use GPU DPC++ kernels with " << std::endl;
-    const char *ipport = env->GetStringUTFChars(ip_port, 0);
-    std::string ipPort = std::string(ipport);
+    jint clusterNum, jdouble tolerance, jint iterationNum, jint executorNum,
+    jint computeDeviceOrdinal, jint rankId, jstring ipPort, jobject resultObj) {
+    std::cout << "oneDAL (native): use GPU DPC++ kernels " << std::endl;
+    const char *ipPortPtr = env->GetStringUTFChars(ipPort, 0);
+    std::string ipPortStr = std::string(ipPortPtr);
     jlong ret = 0L;
-    printf("oneDAL (native): KMeansOneapiComputeWithInitCenters %d \n",
-           cComputeDevice);
     ret = doKMeansOneAPICompute(
-        env, rankId, pNumTabData, pNumTabCenters, cluster_num, tolerance,
-        iteration_num, executor_num, ipPort, cComputeDevice, resultObj);
-    env->ReleaseStringUTFChars(ip_port, ipport);
+        env, rankId, pNumTabData, pNumTabCenters, clusterNum, tolerance,
+        iterationNum, executorNum, ipPortStr, computeDeviceOrdinal, resultObj);
+    env->ReleaseStringUTFChars(ipPort, ipPortPtr);
     return ret;
 }
 #endif
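
The removed file-local `saveShareHomogenPtrVector` is superseded by the shared `saveHomogenTablePtrToVector`, which both KMeans and PCA now call so that the `homogen_table` behind each returned `jlong` stays alive after the JNI call returns. A minimal sketch of that pattern, assuming `HomogenTablePtr` is `std::shared_ptr<oneapi::dal::homogen_table>`; the names `g_mtx` and `g_tableVector` are placeholders, the real helper lives in the shared service code:

```cpp
#include <memory>
#include <mutex>
#include <vector>

#include "oneapi/dal/table/homogen.hpp"

using HomogenTablePtr = std::shared_ptr<oneapi::dal::homogen_table>;

// Pin each result table for the lifetime of the process so that the raw
// address handed to the JVM as a jlong keeps pointing at a live table.
static std::mutex g_mtx;
static std::vector<HomogenTablePtr> g_tableVector;

void saveHomogenTablePtrToVector(const HomogenTablePtr &ptr) {
    // Scoped lock instead of the manual lock()/unlock() pair the removed
    // helper used; the mutex is released even if push_back throws.
    std::lock_guard<std::mutex> lock(g_mtx);
    g_tableVector.push_back(ptr);
}
```
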
diff --git a/mllib-dal/src/main/native/PCAOneAPIImpl.cpp b/mllib-dal/src/main/native/PCAOneAPIImpl.cpp
new file mode 100644
index 000000000..f0e7679b1
--- /dev/null
+++ b/mllib-dal/src/main/native/PCAOneAPIImpl.cpp
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Copyright 2020 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+
+#include <iostream>
+#include <memory>
+
+#ifdef CPU_GPU_PROFILE
+#include "GPU.h"
+#ifndef ONEDAL_DATA_PARALLEL
+#define ONEDAL_DATA_PARALLEL
+#endif
+#include "Communicator.hpp"
+#include "OutputHelpers.hpp"
+#include "com_intel_oap_mllib_feature_PCADALImpl.h"
+#include "oneapi/dal/algo/pca.hpp"
+#include "oneapi/dal/table/homogen.hpp"
+#include "service.h"
+
+using namespace std;
+using namespace oneapi::dal;
+const int ccl_root = 0;
+
+static void doPCAOneAPICompute(JNIEnv *env, jint rankId, jlong pNumTabData,
+                               jint executorNum, const ccl::string &ipPort,
+                               jint computeDeviceOrdinal, jobject resultObj) {
+    const bool isRoot = (rankId == ccl_root);
+    ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
+    std::cout << "oneDAL (native): GPU/CPU compute start , rankid = " << rankId
+              << "; device = " << computeDeviceOrdinal << "(0:HOST;1:GPU;2:CPU)"
+              << std::endl;
+    homogen_table htable =
+        *reinterpret_cast<const homogen_table *>(pNumTabData);
+
+    const auto pca_desc = pca::descriptor{};
+    auto queue = getQueue(device);
+    auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
+        queue, executorNum, rankId, ipPort);
+    pca::train_input local_input{htable};
+    const auto result_train = preview::train(comm, pca_desc, local_input);
+    if (isRoot) {
+        // Return all eigenvalues & eigenvectors
+        // Get the class of the input object
+        jclass clazz = env->GetObjectClass(resultObj);
+        // Get Field references
+        jfieldID pcNumericTableField =
+            env->GetFieldID(clazz, "pcNumericTable", "J");
+        jfieldID explainedVarianceNumericTableField =
+            env->GetFieldID(clazz, "explainedVarianceNumericTable", "J");
+
+        HomogenTablePtr eigenvectors =
+            std::make_shared<homogen_table>(result_train.get_eigenvectors());
+        saveHomogenTablePtrToVector(eigenvectors);
+
+        HomogenTablePtr eigenvalues =
+            std::make_shared<homogen_table>(result_train.get_eigenvalues());
+        saveHomogenTablePtrToVector(eigenvalues);
+
+        env->SetLongField(resultObj, pcNumericTableField,
+                          (jlong)eigenvectors.get());
+        env->SetLongField(resultObj, explainedVarianceNumericTableField,
+                          (jlong)eigenvalues.get());
+    }
+}
+
+JNIEXPORT jlong JNICALL
+Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL(
+    JNIEnv *env, jobject obj, jlong pNumTabData, jint executorNum,
+    jint computeDeviceOrdinal, jint rankId, jstring ipPort, jobject resultObj) {
+    std::cout << "oneDAL (native): use GPU DPC++ kernels " << std::endl;
+    const char *ipPortPtr = env->GetStringUTFChars(ipPort, 0);
+    std::string ipPortStr = std::string(ipPortPtr);
+    doPCAOneAPICompute(env, rankId, pNumTabData, executorNum, ipPortStr,
+                       computeDeviceOrdinal, resultObj);
+    env->ReleaseStringUTFChars(ipPort, ipPortPtr);
+    return 0;
+}
+#endif
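
The two `jlong` fields written into `resultObj` (`pcNumericTable`, `explainedVarianceNumericTable`) are raw addresses of tables pinned by `saveHomogenTablePtrToVector`. A hypothetical later native call can rehydrate such a handle with the same `reinterpret_cast` pattern used for `pNumTabData` above; `tableFromHandle` is an illustrative name, not part of this patch:

```cpp
#include <jni.h>

#include "oneapi/dal/table/homogen.hpp"

// Hypothetical helper: turn the jlong handle stored in the result object
// back into a table. Returning by value is cheap because oneDAL tables are
// reference-counted views over the underlying data.
static oneapi::dal::homogen_table tableFromHandle(jlong handle) {
    return *reinterpret_cast<const oneapi::dal::homogen_table *>(handle);
}
```
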
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
index eab407e22..10db5f50c 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
@@ -17,11 +17,7 @@
 package com.intel.oap.mllib.clustering
 
 import com.intel.oap.mllib.Utils.getOneCCLIPPort
-<<<<<<< HEAD
 import com.intel.oap.mllib.{OneCCL, OneDAL, Utils}
-=======
-import com.intel.oap.mllib.{OneCCL, OneDAL}
->>>>>>> Migrate KMeans daal to DPC++ (#209)
 import com.intel.oneapi.dal.table.Common
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
index b815daf6c..5eea3a32e 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
@@ -164,15 +164,15 @@ class PCADALImpl(val k: Int,
     val numCols = table.getNumberOfColumns.toInt
     require(k <= numRows, "k should be less or equal to row number")
 
-    val arrayDouble = getDoubleBufferDataFromDAL(table, numRows, numCols)
+    val arrayDouble = getDoubleBufferDataFromDAL(table, numRows, device)
 
     // Column-major, transpose of top K rows of NumericTable
    new DenseMatrix(numCols, k, arrayDouble.slice(0, numCols * k), false)
  }

-  private def getExplainedVarianceFromDAL(table_1xn: NumericTable, k: Int): DenseVector = {
-    val dataNumCols = table_1xn.getNumberOfColumns.toInt
-    val arrayDouble = getDoubleBufferDataFromDAL(table_1xn, 1, dataNumCols)
+  private[mllib] def getExplainedVarianceFromDAL(table_1xn: HomogenTable, k: Int,
+                                                 device: Common.ComputeDevice): DenseVector = {
+    val arrayDouble = getDoubleBufferDataFromDAL(table_1xn, 1, device)
     val sum = arrayDouble.sum
     val topK = Arrays.copyOfRange(arrayDouble, 0, k)
     for (i <- 0 until k)
@@ -182,15 +182,13 @@ class PCADALImpl(val k: Int,
 
   // table.asInstanceOf[HomogenNumericTable].getDoubleArray() would error on GPU,
   // so use table.getBlockOfRows instead of it.
-  private def getDoubleBufferDataFromDAL(table: NumericTable,
+  private[mllib] def getDoubleBufferDataFromDAL(table: HomogenTable,
                                          numRows: Int,
-                                         numCols: Int): Array[Double] = {
-    var dataDouble: DoubleBuffer = null
+                                         device: Common.ComputeDevice): Array[Double] = {
     // returned DoubleBuffer is ByteByffer, need to copy as double array
-    dataDouble = table.getBlockOfRows(0, numRows, dataDouble)
-    val arrayDouble: Array[Double] = new Array[Double](numRows * numCols)
-    dataDouble.get(arrayDouble)
+    val accessor = new RowAccessor(table.getcObejct(), device)
+    val arrayDouble: Array[Double] = accessor.pullDouble(0, numRows)
     arrayDouble
   }
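
On the JVM side, `RowAccessor.pullDouble(0, numRows)` wraps oneDAL's `row_accessor`, the read path that works for both CPU- and GPU-backed tables (unlike `getDoubleArray()`, as the comment above notes). A standalone, host-only sketch of the equivalent native pull:

```cpp
#include <iostream>

#include "oneapi/dal/table/homogen.hpp"
#include "oneapi/dal/table/row_accessor.hpp"

using namespace oneapi::dal;

int main() {
    // 3 rows x 2 columns, row-major, wrapped without copying.
    const double data[] = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0};
    const auto table = homogen_table::wrap(data, 3, 2);

    // Pull rows [0, 3) as one flat double block, like pullDouble(0, numRows).
    row_accessor<const double> accessor{table};
    const auto block = accessor.pull({0, 3});

    for (std::int64_t i = 0; i < block.get_count(); i++)
        std::cout << block[i] << " ";
    std::cout << std::endl;
    return 0;
}
```

In data-parallel (DPC++) builds, `pull` also has overloads that take a SYCL queue, which is what lets the same accessor read device-allocated tables.
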