Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML-184]Fix code style issues #195

Merged
merged 8 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions mllib-dal/src/main/java/com/intel/oap/mllib/LibLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ public static String getTempSubDir() {
* Load all native libs
*/
public static synchronized void loadLibraries() throws IOException {
if (isLoaded)
if (isLoaded) {
return;
}

if (!loadLibSYCL()) {
log.debug("SYCL libraries are not available, will load CPU libraries only.");
Expand Down Expand Up @@ -80,7 +81,8 @@ private static synchronized void loadLibCCL() throws IOException {
*/
private static synchronized Boolean loadLibSYCL() throws IOException {
// Check if SYCL libraries are available
InputStream streamIn = LibLoader.class.getResourceAsStream(LIBRARY_PATH_IN_JAR + "/libsycl.so.5");
InputStream streamIn = LibLoader.class.getResourceAsStream(LIBRARY_PATH_IN_JAR +
"/libsycl.so.5");
if (streamIn == null) {
return false;
}
Expand Down Expand Up @@ -160,7 +162,7 @@ private static void loadFromJar(String path, String name) throws IOException {
streamIn.close();
}

System.load(fileOut.toString());
System.load(fileOut.toString());
log.debug("DONE: Loading library " + fileOut.toString() +" as resource.");
}

Expand Down
264 changes: 132 additions & 132 deletions mllib-dal/src/main/native/CorrelationDALImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,117 +31,118 @@ using namespace daal::algorithms;

typedef double algorithmFPType; /* Algorithm floating-point type */

static void correlation_compute(JNIEnv *env,
jobject obj,
int rankId,
ccl::communicator &comm,
const NumericTablePtr &pData,
size_t nBlocks,
jobject resultObj) {
using daal::byte;
auto t1 = std::chrono::high_resolution_clock::now();

const bool isRoot = (rankId == ccl_root);

covariance::Distributed<step1Local, algorithmFPType> localAlgorithm;

/* Set the input data set to the algorithm */
localAlgorithm.input.set(covariance::data, pData);

/* Compute covariance */
localAlgorithm.compute();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correleation (native): local step took " << duration << " secs"
<< std::endl;

t1 = std::chrono::high_resolution_clock::now();

/* Serialize partial results required by step 2 */
services::SharedPtr<byte> serializedData;
InputDataArchive dataArch;
localAlgorithm.getPartialResult()->serialize(dataArch);
size_t perNodeArchLength = dataArch.getSizeOfArchive();

serializedData =
services::SharedPtr<byte>(new byte[perNodeArchLength * nBlocks]);

byte *nodeResults = new byte[perNodeArchLength];
dataArch.copyArchiveToArray(nodeResults, perNodeArchLength);
std::vector<size_t> aReceiveCount(comm.size(),
perNodeArchLength); // 4 x "14016"

/* Transfer partial results to step 2 on the root node */
ccl::gather((int8_t *)nodeResults, perNodeArchLength,
(int8_t *)(serializedData.get()), perNodeArchLength, comm)
.wait();
t2 = std::chrono::high_resolution_clock::now();

duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correleation (native): ccl_allgatherv took " << duration << " secs"
<< std::endl;
if (isRoot) {
auto t1 = std::chrono::high_resolution_clock::now();
/* Create an algorithm to compute covariance on the master node */
covariance::Distributed<step2Master, algorithmFPType> masterAlgorithm;

for (size_t i = 0; i < nBlocks; i++) {
/* Deserialize partial results from step 1 */
OutputDataArchive dataArch(serializedData.get() +
perNodeArchLength * i,
perNodeArchLength);

covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult());
dataForStep2FromStep1->deserialize(dataArch);

/* Set local partial results as input for the master-node algorithm
*/
masterAlgorithm.input.add(covariance::partialResults,
dataForStep2FromStep1);
}

/* Set the parameter to choose the type of the output matrix */
masterAlgorithm.parameter.outputMatrixType = covariance::correlationMatrix;

/* Merge and finalizeCompute covariance decomposition on the master node */
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();

/* Retrieve the algorithm results */
covariance::ResultPtr result = masterAlgorithm.getResult();
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correlation (native): master step took " << duration << " secs"
<< std::endl;

/* Print the results */
printNumericTable(result->get(covariance::correlation),
"Correlation first 20 columns of "
"correlation matrix:",
1, 20);
static void correlation_compute(JNIEnv *env, jobject obj, int rankId,
ccl::communicator &comm,
const NumericTablePtr &pData, size_t nBlocks,
jobject resultObj) {
using daal::byte;
auto t1 = std::chrono::high_resolution_clock::now();

const bool isRoot = (rankId == ccl_root);

covariance::Distributed<step1Local, algorithmFPType> localAlgorithm;

/* Set the input data set to the algorithm */
localAlgorithm.input.set(covariance::data, pData);

/* Compute covariance */
localAlgorithm.compute();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correleation (native): local step took " << duration
<< " secs" << std::endl;

t1 = std::chrono::high_resolution_clock::now();

/* Serialize partial results required by step 2 */
services::SharedPtr<byte> serializedData;
InputDataArchive dataArch;
localAlgorithm.getPartialResult()->serialize(dataArch);
size_t perNodeArchLength = dataArch.getSizeOfArchive();

serializedData =
services::SharedPtr<byte>(new byte[perNodeArchLength * nBlocks]);

byte *nodeResults = new byte[perNodeArchLength];
dataArch.copyArchiveToArray(nodeResults, perNodeArchLength);
std::vector<size_t> aReceiveCount(comm.size(),
perNodeArchLength); // 4 x "14016"

/* Transfer partial results to step 2 on the root node */
ccl::gather((int8_t *)nodeResults, perNodeArchLength,
(int8_t *)(serializedData.get()), perNodeArchLength, comm)
.wait();
t2 = std::chrono::high_resolution_clock::now();

duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correleation (native): ccl_allgatherv took " << duration
<< " secs" << std::endl;
if (isRoot) {
auto t1 = std::chrono::high_resolution_clock::now();
/* Create an algorithm to compute covariance on the master node */
covariance::Distributed<step2Master, algorithmFPType> masterAlgorithm;

for (size_t i = 0; i < nBlocks; i++) {
/* Deserialize partial results from step 1 */
OutputDataArchive dataArch(serializedData.get() +
perNodeArchLength * i,
perNodeArchLength);

covariance::PartialResultPtr dataForStep2FromStep1(
new covariance::PartialResult());
dataForStep2FromStep1->deserialize(dataArch);

/* Set local partial results as input for the master-node algorithm
*/
masterAlgorithm.input.add(covariance::partialResults,
dataForStep2FromStep1);
}

/* Set the parameter to choose the type of the output matrix */
masterAlgorithm.parameter.outputMatrixType =
covariance::correlationMatrix;

/* Merge and finalizeCompute covariance decomposition on the master node
*/
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();

/* Retrieve the algorithm results */
covariance::ResultPtr result = masterAlgorithm.getResult();
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correlation (native): master step took " << duration
<< " secs" << std::endl;

/* Print the results */
printNumericTable(result->get(covariance::correlation),
"Correlation first 20 columns of "
"correlation matrix:",
1, 20);
// Return all covariance & mean
jclass clazz = env->GetObjectClass(resultObj);

// Get Field references
jfieldID correlationNumericTableField =
env->GetFieldID(clazz, "correlationNumericTable", "J");

NumericTablePtr *correlation =
new NumericTablePtr(result->get(covariance::correlation));
// Get Field references
jfieldID correlationNumericTableField =
env->GetFieldID(clazz, "correlationNumericTable", "J");

env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation);
NumericTablePtr *correlation =
new NumericTablePtr(result->get(covariance::correlation));

}
env->SetLongField(resultObj, correlationNumericTableField,
(jlong)correlation);
}
}

JNIEXPORT jlong JNICALL
Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
JNIEnv *env, jobject obj, jlong pNumTabData,
jint executor_num, jint executor_cores, jboolean use_gpu, jintArray gpu_idx_array, jobject resultObj) {
JNIEnv *env, jobject obj, jlong pNumTabData, jint executor_num,
jint executor_cores, jboolean use_gpu, jintArray gpu_idx_array,
jobject resultObj) {

ccl::communicator &comm = getComm();
size_t rankId = comm.rank();
Expand All @@ -150,43 +151,42 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(

NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);

#ifdef CPU_GPU_PROFILE
#ifdef CPU_GPU_PROFILE

if (use_gpu) {
int n_gpu = env->GetArrayLength(gpu_idx_array);
cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)"
<< endl;
if (use_gpu) {
int n_gpu = env->GetArrayLength(gpu_idx_array);
cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)"
<< endl;

jint *gpu_indices = env->GetIntArrayElements(gpu_idx_array, 0);
jint *gpu_indices = env->GetIntArrayElements(gpu_idx_array, 0);

int size = comm.size();
auto assigned_gpu =
getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu);
int size = comm.size();
auto assigned_gpu =
getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu);

// Set SYCL context
cl::sycl::queue queue(assigned_gpu);
daal::services::SyclExecutionContext ctx(queue);
daal::services::Environment::getInstance()->setDefaultExecutionContext(
ctx);
// Set SYCL context
cl::sycl::queue queue(assigned_gpu);
daal::services::SyclExecutionContext ctx(queue);
daal::services::Environment::getInstance()->setDefaultExecutionContext(
ctx);

correlation_compute(
env, obj, rankId, comm, pData, nBlocks, resultObj);
correlation_compute(env, obj, rankId, comm, pData, nBlocks, resultObj);

env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0);
} else
#endif
{
// Set number of threads for oneDAL to use for each rank
services::Environment::getInstance()->setNumberOfThreads(executor_cores);
env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0);
} else
#endif
{
// Set number of threads for oneDAL to use for each rank
services::Environment::getInstance()->setNumberOfThreads(
executor_cores);

int nThreadsNew =
services::Environment::getInstance()->getNumberOfThreads();
cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew
<< endl;
int nThreadsNew =
services::Environment::getInstance()->getNumberOfThreads();
cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew
<< endl;

correlation_compute(
env, obj, rankId, comm, pData, nBlocks, resultObj);
}
correlation_compute(env, obj, rankId, comm, pData, nBlocks, resultObj);
}

return 0;
return 0;
}
12 changes: 7 additions & 5 deletions mllib-dal/src/main/native/KMeansDALImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,18 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansDALComputeWithInitCente
using daal::data_management::internal::convertToSyclHomogen;

Status st;
NumericTablePtr pSyclHomogen = convertToSyclHomogen<algorithmFPType>(*pData, st);
NumericTablePtr pSyclHomogen =
convertToSyclHomogen<algorithmFPType>(*pData, st);
if (!st.ok()) {
std::cout << "Failed to convert row merged table to SYCL homogen one"
<< std::endl;
std::cout
<< "Failed to convert row merged table to SYCL homogen one"
<< std::endl;
return 0L;
}

ret = doKMeansDALComputeWithInitCenters(
env, obj, rankId, comm, pSyclHomogen, centroids, cluster_num, tolerance,
iteration_num, executor_num, resultObj);
env, obj, rankId, comm, pSyclHomogen, centroids, cluster_num,
tolerance, iteration_num, executor_num, resultObj);

env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0);
} else
Expand Down
10 changes: 4 additions & 6 deletions mllib-dal/src/main/native/OneCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
return 1;
}

JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(
JNIEnv *env, jobject obj) {
JNIEXPORT void JNICALL
Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(JNIEnv *env, jobject obj) {

g_comms.pop_back();

Expand Down Expand Up @@ -176,10 +176,8 @@ static bool is_valid_ip(char ip[]) {
return false;
}

JNIEXPORT jint JNICALL
Java_com_intel_oap_mllib_OneCCL_00024_c_1getAvailPort(JNIEnv *env,
jobject obj,
jstring localIP) {
JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1getAvailPort(
JNIEnv *env, jobject obj, jstring localIP) {

// start from beginning of dynamic port
const int port_start_base = 3000;
Expand Down
Loading