create kvs by store file
minmingzhu committed Aug 23, 2024
1 parent 00e411a commit 12774a8
Showing 18 changed files with 383 additions and 139 deletions.
21 changes: 11 additions & 10 deletions mllib-dal/src/main/native/CorrelationImpl.cpp
@@ -197,9 +197,10 @@ static void doCorrelationOneAPICompute(

JNIEXPORT jlong JNICALL
Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows, jlong numCols,
jint executorNum, jint executorCores, jint computeDeviceOrdinal,
jintArray gpuIdxArray, jstring ip_port, jobject resultObj) {
JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows,
jlong numCols, jint executorNum, jint executorCores,
jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring store_path,
jobject resultObj) {
logger::println(logger::INFO,
"oneDAL (native): use DPC++ kernels; device %s",
ComputeDeviceString[computeDeviceOrdinal].c_str());
@@ -225,17 +226,17 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d", rank);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

const char *str = env->GetStringUTFChars(ip_port, nullptr);
ccl::string ccl_ip_port(str);
auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);
const char *path = env->GetStringUTFChars(store_path, nullptr);
ccl::string kvs_store_path(path);
auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);

doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm,
resultObj);
env->ReleaseStringUTFChars(ip_port, str);
env->ReleaseStringUTFChars(store_path, path);
break;
}
#endif
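The same jstring handling recurs in each JNI entry point touched by this commit (Correlation above, the two random forest trainers, and KMeans below). As a reference, here is a minimal self-contained sketch of that pattern; it is not the committed code: the helper name withStorePath and the null check after GetStringUTFChars are illustrative additions, and only the convert/use/release sequence mirrors the diff above.

#include <jni.h>
#include <oneapi/ccl.hpp>
#include "GPU.h" // declares createDalCommunicator (see the GPU.h hunk below)

// Sketch: convert the Java store path, build the communicator, then release
// the UTF-8 chars once the native compute has finished.
static bool withStorePath(JNIEnv *env, jstring store_path, jint executorNum,
                          jint rank) {
    const char *path = env->GetStringUTFChars(store_path, nullptr);
    if (path == nullptr)
        return false; // OutOfMemoryError is already pending in the JVM
    ccl::string kvs_store_path(path);
    auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);
    // ... run the oneDAL compute with `comm` here ...
    env->ReleaseStringUTFChars(store_path, path);
    return true;
}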
26 changes: 13 additions & 13 deletions mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp
@@ -300,26 +300,26 @@ static jobject doRFClassifierOneAPICompute(
*/
JNIEXPORT jobject JNICALL
Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassifierTrainDAL(
JNIEnv *env, jobject obj, jint rank, jlong pNumTabFeature, jlong featureRows,
jlong featureCols, jlong pNumTabLabel, jlong labelCols, jint executorNum,
jint computeDeviceOrdinal, jint classCount, jint treeCount,
jint numFeaturesPerNode, jint minObservationsLeafNode,
JNIEnv *env, jobject obj, jint rank, jlong pNumTabFeature,
jlong featureRows, jlong featureCols, jlong pNumTabLabel, jlong labelCols,
jint executorNum, jint computeDeviceOrdinal, jint classCount,
jint treeCount, jint numFeaturesPerNode, jint minObservationsLeafNode,
jint minObservationsSplitNode, jdouble minWeightFractionLeafNode,
jdouble minImpurityDecreaseSplitNode, jint maxTreeDepth, jlong seed,
jint maxBins, jboolean bootstrap, jintArray gpuIdxArray,
jstring ip_port, jobject resultObj) {
jint maxBins, jboolean bootstrap, jintArray gpuIdxArray, jstring store_path,
jobject resultObj) {
logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels");

ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d", rank);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

const char *str = env->GetStringUTFChars(ip_port, nullptr);
ccl::string ccl_ip_port(str);
auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);
const char *path = env->GetStringUTFChars(store_path, nullptr);
ccl::string kvs_store_path(path);
auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);

jobject hashmapObj = doRFClassifierOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
@@ -328,7 +328,7 @@ Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassif
minObservationsSplitNode, minWeightFractionLeafNode,
minImpurityDecreaseSplitNode, maxTreeDepth, seed, maxBins,
bootstrap, comm, resultObj);
env->ReleaseStringUTFChars(ip_port, str);
env->ReleaseStringUTFChars(store_path, path);
return hashmapObj;
}
default: {
25 changes: 13 additions & 12 deletions mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp
@@ -292,32 +292,33 @@ static jobject doRFRegressorOneAPICompute(

JNIEXPORT jobject JNICALL
Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTrainDAL(
JNIEnv *env, jobject obj, jint rank, jlong pNumTabFeature, jlong featureRows,
jlong featureCols, jlong pNumTabLabel, jlong labelCols, jint executorNum,
jint computeDeviceOrdinal, jint treeCount, jint numFeaturesPerNode,
jint minObservationsLeafNode, jint maxTreeDepth, jlong seed, jint maxbins,
jboolean bootstrap, jintArray gpuIdxArray, jstring ip_port, jobject resultObj) {
JNIEnv *env, jobject obj, jint rank, jlong pNumTabFeature,
jlong featureRows, jlong featureCols, jlong pNumTabLabel, jlong labelCols,
jint executorNum, jint computeDeviceOrdinal, jint treeCount,
jint numFeaturesPerNode, jint minObservationsLeafNode, jint maxTreeDepth,
jlong seed, jint maxbins, jboolean bootstrap, jintArray gpuIdxArray,
jstring store_path, jobject resultObj) {
logger::println(logger::INFO,
"OneDAL (native): use DPC++ kernels; device %s",
ComputeDeviceString[computeDeviceOrdinal].c_str());

ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
logger::println(
logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d", rank);
logger::println(logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d",
rank);

const char *str = env->GetStringUTFChars(ip_port, nullptr);
ccl::string ccl_ip_port(str);
auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);
const char *path = env->GetStringUTFChars(store_path, nullptr);
ccl::string kvs_store_path(path);
auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);

jobject hashmapObj = doRFRegressorOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, treeCount,
numFeaturesPerNode, minObservationsLeafNode, maxTreeDepth, seed,
maxbins, bootstrap, comm, resultObj);
env->ReleaseStringUTFChars(ip_port, str);
env->ReleaseStringUTFChars(store_path, path);
return hashmapObj;
}
default: {
122 changes: 88 additions & 34 deletions mllib-dal/src/main/native/GPU.cpp
@@ -4,11 +4,14 @@

#include "GPU.h"
#include "Logger.h"
#define STORE_TIMEOUT_SEC 120
#define KVS_CREATE_SUCCESS 0
#define KVS_CREATE_FAILURE -1

typedef std::shared_ptr<sycl::queue> queuePtr;

static std::mutex g_mtx;
static std::vector<sycl::queue> g_queueVector;
std::shared_ptr<file_store> store;

static std::vector<sycl::device> get_gpus() {
auto platforms = sycl::platform::get_platforms();
@@ -24,6 +27,55 @@ static std::vector<sycl::device> get_gpus() {
return {};
}

int create_kvs_by_store(std::shared_ptr<file_store> store, int rank,
ccl::shared_ptr_class<ccl::kvs> &kvs) {
logger::println(logger::INFO, "OneCCL (native): create_kvs_by_store ");
auto t1 = std::chrono::high_resolution_clock::now();
ccl::kvs::address_type main_addr;
auto start = std::chrono::system_clock::now();
if (rank == 0) {
kvs = ccl::create_main_kvs();
main_addr = kvs->get_address();
if (store->write((void *)main_addr.data(), main_addr.size()) < 0) {
logger::println(
logger::INFO,
"OneCCL (native): error occurred during write attempt");
kvs.reset();
return KVS_CREATE_FAILURE;
}
auto end = std::chrono::system_clock::now();
auto exec_time =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(end -
start)
.count();
logger::println(logger::INFO,
"OneCCL (native): write to store time %f secs",
exec_time / 1000);
} else {
if (store->read((void *)main_addr.data(), main_addr.size()) < 0) {
logger::println(
logger::INFO,
"OneCCL (native): error occurred during read attempt");
kvs.reset();
return KVS_CREATE_FAILURE;
}
auto end = std::chrono::system_clock::now();
auto exec_time =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(end -
start)
.count();
logger::println(logger::INFO,
"OneCCL (native): read from store time %f secs",
exec_time / 1000);
kvs = ccl::create_kvs(main_addr);
}
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO,
"OneCCL (native): create_kvs_by_store took %f secs",
duration / 1000);
return KVS_CREATE_SUCCESS;
}

static int getLocalRank(ccl::communicator &comm, int size, int rank) {
/* Obtain local rank among nodes sharing the same host name */
char zero = static_cast<char>(0);
Expand Down Expand Up @@ -113,43 +165,45 @@ sycl::queue getQueue(const ComputeDevice device) {
}
}

preview::spmd::communicator<preview::spmd::device_memory_access::usm>
createDalCommunicator(const jint executorNum, const jint rank,
const ccl::string kvs_store_path) {
auto gpus = get_gpus();

preview::spmd::communicator<preview::spmd::device_memory_access::usm> createDalCommunicator(const jint executorNum, const jint rank, const ccl::string ccl_ip_port){
auto gpus = get_gpus();

auto t1 = std::chrono::high_resolution_clock::now();

ccl::init();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
auto t1 = std::chrono::high_resolution_clock::now();

logger::println(logger::INFO, "OneCCL singleton init took %f secs",
duration / 1000);
ccl::init();

t1 = std::chrono::high_resolution_clock::now();
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();

auto kvs_attr = ccl::create_kvs_attr();
logger::println(logger::INFO, "OneCCL singleton init took %f secs",
duration / 1000);

kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);
t1 = std::chrono::high_resolution_clock::now();
ccl::shared_ptr_class<ccl::kvs> kvs;

ccl::shared_ptr_class<ccl::kvs> kvs = ccl::create_main_kvs(kvs_attr);

t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs",
duration / 1000);
sycl::queue queue{gpus[0]};
t1 = std::chrono::high_resolution_clock::now();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
return comm;
store = std::make_shared<file_store>(
kvs_store_path, rank, std::chrono::seconds(STORE_TIMEOUT_SEC));
if (create_kvs_by_store(store, rank, kvs) != KVS_CREATE_SUCCESS) {
logger::println(logger::INFO, "can not create kvs by store");
throw std::runtime_error("Failed to create communicator");
}
t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs",
duration / 1000);
sycl::queue queue{gpus[0]};
t1 = std::chrono::high_resolution_clock::now();
auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
return comm;
}
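The file_store type used by create_kvs_by_store and createDalCommunicator comes from store.hpp, which is not among the hunks shown here. The sketch below illustrates the interface these functions appear to rely on, namely a (path, rank, timeout) constructor plus write/read calls that return a negative value on failure; the repository's real implementation may differ (oneCCL's external-launcher sample, for instance, adds file locking). The idea is a file-based rendezvous: rank 0 writes the main KVS address to a shared path, and the remaining ranks poll that file until the address appears or the timeout expires.

// Hypothetical minimal file_store (illustration only; not the repository's
// store.hpp).
#include <chrono>
#include <cstddef>
#include <fstream>
#include <string>
#include <thread>

class file_store {
  public:
    file_store(std::string path, int rank, std::chrono::seconds timeout)
        : path_(std::move(path)), rank_(rank), timeout_(timeout) {}

    // Called by rank 0: write the KVS address bytes; 0 on success, -1 on error.
    int write(const void *data, std::size_t size) {
        std::ofstream out(path_, std::ios::binary | std::ios::trunc);
        if (!out)
            return -1;
        out.write(static_cast<const char *>(data), size);
        return out.good() ? 0 : -1;
    }

    // Called by the other ranks: poll until `size` bytes are available.
    int read(void *data, std::size_t size) {
        const auto deadline = std::chrono::steady_clock::now() + timeout_;
        while (std::chrono::steady_clock::now() < deadline) {
            std::ifstream in(path_, std::ios::binary | std::ios::ate);
            if (in && static_cast<std::size_t>(in.tellg()) >= size) {
                in.seekg(0);
                in.read(static_cast<char *>(data), size);
                return in.good() ? 0 : -1;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        return -1; // timed out waiting for rank 0
    }

  private:
    std::string path_;
    int rank_;
    std::chrono::seconds timeout_;
};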
7 changes: 4 additions & 3 deletions mllib-dal/src/main/native/GPU.h
@@ -1,14 +1,15 @@
#pragma once

#include "Communicator.hpp"
#include "service.h"
#include "store.hpp"
#include <CL/cl.h>
#include <CL/sycl.hpp>
#include <daal_sycl.h>
#include <jni.h>
#include <oneapi/ccl.hpp>
#include "Communicator.hpp"

sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices);

sycl::queue getQueue(const ComputeDevice device);
preview::spmd::communicator<preview::spmd::device_memory_access::usm> createDalCommunicator(jint executorNum, jint rank, ccl::string ccl_ip_port);
preview::spmd::communicator<preview::spmd::device_memory_access::usm>
createDalCommunicator(jint executorNum, jint rank, ccl::string kvs_store_path);
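For illustration only, a hypothetical caller of the updated declaration. The function name and the path below are invented; in practice the path arrives from the JVM side as the store_path jstring seen in the *Impl.cpp hunks and must point to a location visible to every executor, such as a shared filesystem.

#include "GPU.h"

void exampleRendezvous(jint executorNum, jint rank) {
    // Illustrative path only; every rank must pass the same, shared location.
    ccl::string kvs_store_path = "/tmp/oap_mllib_kvs_store";
    auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);
    (void)comm; // would be handed on to a oneDAL SPMD compute call
}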
23 changes: 12 additions & 11 deletions mllib-dal/src/main/native/KMeansImpl.cpp
@@ -305,10 +305,11 @@ static jlong doKMeansOneAPICompute(
*/
JNIEXPORT jlong JNICALL
Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCenters(
JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows, jlong numCols,
jlong pNumTabCenters, jint clusterNum, jdouble tolerance, jint iterationNum,
jint executorNum, jint executorCores, jint computeDeviceOrdinal,
jintArray gpuIdxArray, jstring ip_port, jobject resultObj) {
JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows,
jlong numCols, jlong pNumTabCenters, jint clusterNum, jdouble tolerance,
jint iterationNum, jint executorNum, jint executorCores,
jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring store_path,
jobject resultObj) {
logger::println(logger::INFO,
"OneDAL (native): use DPC++ kernels; device %s",
ComputeDeviceString[computeDeviceOrdinal].c_str());
@@ -338,19 +339,19 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
logger::println(
logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d", rank);
logger::println(logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d",
rank);

const char *str = env->GetStringUTFChars(ip_port, nullptr);
ccl::string ccl_ip_port(str);
auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);
const char *path = env->GetStringUTFChars(store_path, nullptr);
ccl::string kvs_store_path(path);
auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);

ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numCols,
pNumTabCenters, clusterNum, tolerance,
iterationNum, comm, resultObj);

env->ReleaseStringUTFChars(ip_port, str);
env->ReleaseStringUTFChars(store_path, path);
break;
}
#endif