[ML-290] Add logger to improve message output for C++ only (#358)
* update spark to 3.3.3

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* First commit

* Logger tmp

* ALS done

* Correlation tmp

* Add table output

* Use logger function to print table

* Correlation done

* Add println

* Fix typo

* HomogenTable done

* Summarizer done

* Kmeans done

* KMeans done

* error_handling done

* DFR and NB

* GPU done

* LR done

* PCA and DF classifier done

* table

* Format cpp

* All done

* Format

* Replace print()

* Move print(table) to service

* Format

* Change printf

* Remove spaces

* Make all error print in stderr

* Add switch

* Code style

* Recover

* Add environment control

* Format

* Clean up

* Fix other printf, fix new line bug

* Format

* Update run-gpu.sh

* Add level

* Format

* Remove variable

* Fix typo

* Add flush

* Fix device call host function

* Format

* OneDAL rename

* fix typo

* Fix typo

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* Update ALSShuffle.cpp

* Update ALSDALImpl.cpp

* Update ALSDALImpl.cpp

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

---------

Signed-off-by: minmingzhu <minming.zhu@intel.com>
Co-authored-by: kunpeng <kunpeng.jiang@intel.com>
Co-authored-by: Xiaochang Wu <xiaochang.wu@intel.com>
3 people authored Oct 9, 2023
1 parent 67747d0 commit d019fc1
Showing 27 changed files with 608 additions and 385 deletions.
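
Every file in this commit follows the same pattern: ad-hoc std::cout / printf calls are replaced with a printf-style logger::println(level, fmt, ...) helper. The new Logger.h itself is not among the files shown in this excerpt, so what follows is only a minimal sketch of the interface the call sites below assume, pieced together from the commit log ("Add level", "Add switch", "Make all error print in stderr", "Add environment control", "Add flush"). The level names INFO and ERROR appear at the call sites; the exact level set and the environment variable name are assumptions.

// Logger.h (sketch): not the committed implementation, only a minimal
// interface consistent with the call sites in the diffs below.
#pragma once

#include <cstdarg>
#include <cstdio>
#include <cstdlib>

namespace logger {

enum Level { DEBUG = 0, INFO = 1, WARN = 2, ERROR = 3, NONE = 4 };

// "Add environment control": pick the log level at run time without a
// rebuild. The variable name OAP_MLLIB_LOG_LEVEL is hypothetical.
inline Level threshold() {
    static const Level cached = [] {
        const char *env = std::getenv("OAP_MLLIB_LOG_LEVEL");
        if (env == nullptr)
            return INFO;
        switch (env[0]) {
        case 'D': return DEBUG;
        case 'W': return WARN;
        case 'E': return ERROR;
        case 'N': return NONE;
        default:  return INFO;
        }
    }();
    return cached;
}

// printf-style line logging: errors go to stderr ("Make all error print in
// stderr") and every line is flushed immediately ("Add flush").
inline void println(Level level, const char *fmt, ...) {
    if (level < threshold())
        return;
    std::FILE *out = (level >= ERROR) ? stderr : stdout;
    std::va_list args;
    va_start(args, fmt);
    std::vfprintf(out, fmt, args);
    va_end(args);
    std::fputc('\n', out);
    std::fflush(out);
}

} // namespace logger

With an interface like this, logger::println(logger::INFO, "ALS (native): trainModel took %d secs", durationTotal) only reaches stdout when the configured level admits INFO, which is what lets the diffs below drop their unconditional std::cout statements.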
65 changes: 36 additions & 29 deletions mllib-dal/src/main/native/ALSDALImpl.cpp
@@ -24,6 +24,8 @@
 
 #include "ALSShuffle.h"
 
+#include "Logger.h"
+
 using namespace std;
 using namespace daal;
 using namespace daal::algorithms;
@@ -210,7 +212,7 @@ void initializeStep2Local(
 
 void initializeModel(size_t rankId, ccl::communicator &comm, size_t partitionId,
                      size_t nBlocks, size_t nUsers, size_t nFactors) {
-    std::cout << "ALS (native): initializeModel " << std::endl;
+    logger::println(logger::INFO, "ALS (native): initializeModel");
 
     auto t1 = std::chrono::high_resolution_clock::now();
 
@@ -229,8 +231,8 @@ void initializeModel(size_t rankId, ccl::communicator &comm, size_t partitionId,
     auto t2 = std::chrono::high_resolution_clock::now();
     auto duration =
         std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
-    std::cout << "ALS (native): initializeModel took " << duration << " secs"
-              << std::endl;
+    logger::println(logger::INFO, "ALS (native): initializeModel took %d secs",
+                    duration);
 }
 
 training::DistributedPartialResultStep1Ptr computeStep1Local(
@@ -312,7 +314,7 @@ computeStep4Local(const CSRNumericTablePtr &dataTable,
 
 void trainModel(size_t rankId, ccl::communicator &comm, size_t partitionId,
                 size_t nBlocks, size_t nFactors, size_t maxIterations) {
-    std::cout << "ALS (native): trainModel" << std::endl;
+    logger::println(logger::INFO, "ALS (native): trainModel");
 
     auto tStart = std::chrono::high_resolution_clock::now();
 
@@ -338,7 +340,7 @@ void trainModel(size_t rankId, ccl::communicator &comm, size_t partitionId,
 
     serializeDAALObject(step1LocalResult.get(), nodeResults);
 
-    /* Gathering step1LocalResult on the master */
+    // Gathering step1LocalResult on the master
     gather(rankId, comm, nBlocks, nodeResults, step1LocalResultsOnMaster);
 
     if (rankId == ccl_root) {
@@ -381,7 +383,7 @@ void trainModel(size_t rankId, ccl::communicator &comm, size_t partitionId,
 
         serializeDAALObject(step1LocalResult.get(), nodeResults);
 
-        /* Gathering step1LocalResult on the master */
+        // Gathering step1LocalResult on the master
         gather(rankId, comm, nBlocks, nodeResults, step1LocalResultsOnMaster);
 
         if (rankId == ccl_root) {
@@ -409,7 +411,7 @@ void trainModel(size_t rankId, ccl::communicator &comm, size_t partitionId,
         step3LocalResult = computeStep3Local(
             userOffset, usersPartialResultLocal, userStep3LocalInput, nFactors);
 
-        /* MPI_Alltoallv to populate step4LocalInput */
+        // all2all to populate step4LocalInput
         for (size_t i = 0; i < nBlocks; i++) {
             serializeDAALObject((*step3LocalResult)[i].get(), nodeCPs[i]);
         }
@@ -421,15 +423,15 @@ void trainModel(size_t rankId, ccl::communicator &comm, size_t partitionId,
         auto t2 = std::chrono::high_resolution_clock::now();
         auto duration =
             std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
-        std::cout << "ALS (native): iteration " << iteration << " took "
-                  << duration << " secs" << std::endl;
+        logger::println(logger::INFO, "ALS (native): iteration %d took %d secs",
+                        iteration, duration);
     }
 
     auto tEnd = std::chrono::high_resolution_clock::now();
     auto durationTotal =
         std::chrono::duration_cast<std::chrono::seconds>(tEnd - tStart).count();
-    std::cout << "ALS (native): trainModel took " << durationTotal << " secs"
-              << std::endl;
+    logger::println(logger::INFO, "ALS (native): trainModel took %d secs",
+                    durationTotal);
 }
 
 static size_t getOffsetFromOffsetTable(NumericTablePtr offsetTable) {
@@ -446,8 +448,7 @@ JNIEXPORT jobject JNICALL
 Java_com_intel_oap_mllib_recommendation_ALSDALImpl_cShuffleData(
     JNIEnv *env, jobject obj, jobject dataBuffer, jint nTotalKeys, jint nBlocks,
     jobject infoObj) {
-    // cout << "cShuffleData: rank " << rankId << endl;
-    cout << "RATING_SIZE: " << RATING_SIZE << endl;
+    logger::println(logger::INFO, "RATING_SIZE: %d", RATING_SIZE);
 
     ccl::communicator &comm = getComm();
 
@@ -491,19 +492,23 @@ Java_com_intel_oap_mllib_recommendation_ALSDALImpl_cDALImplictALS(
 
     dataTable = *((CSRNumericTablePtr *)numTableAddr);
 
-    cout << "ALS (native): Input info: " << endl;
-    cout << "- NumberOfRows: " << dataTable->getNumberOfRows() << endl;
-    cout << "- NumberOfColumns: " << dataTable->getNumberOfColumns() << endl;
-    cout << "- NumberOfRatings: " << dataTable->getDataSize() << endl;
-    cout << "- fullNUsers: " << nUsers << endl;
-    cout << "- nFactors: " << nFactors << endl;
-
-    // Set number of threads for oneDAL to use for each rank
+    logger::println(logger::INFO, "ALS (native): Input info:");
+    logger::println(logger::INFO, "- NumberOfRows: %d",
+                    dataTable->getNumberOfRows());
+    logger::println(logger::INFO, "- NumberOfColumns: %d",
+                    dataTable->getNumberOfColumns());
+    logger::println(logger::INFO, "- NumberOfRatings: %d",
+                    dataTable->getDataSize());
+    logger::println(logger::INFO, "- fullNUsers: %d", nUsers);
+    logger::println(logger::INFO, "- nFactors: %d", nFactors);
+
+    // Set number of threads for OneDAL to use for each rank
     services::Environment::getInstance()->setNumberOfThreads(executor_cores);
     int nThreadsNew =
         services::Environment::getInstance()->getNumberOfThreads();
-    cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew
-         << endl;
+    logger::println(logger::INFO,
+                    "OneDAL (native): Number of CPU threads used: %d",
+                    nThreadsNew);
 
     int nBlocks = executor_num;
     initializeModel(rankId, comm, partitionId, nBlocks, nUsers, nFactors);
@@ -514,16 +519,18 @@ Java_com_intel_oap_mllib_recommendation_ALSDALImpl_cDALImplictALS(
     auto pItem = itemsPartialResultLocal->get(training::outputOfStep4ForStep1)
                      ->getFactors();
 
-    std::cout << "\n=== Results for Rank " << rankId << "===\n" << std::endl;
+    logger::println(logger::INFO, "");
+    logger::println(logger::INFO, "=== Results for Rank %d ===", rankId);
+    logger::println(logger::INFO, "");
     printNumericTable(pUser, "User Factors (first 10 rows x 20 columns):", 10,
                       20);
     printNumericTable(pItem, "Item Factors (first 10 rows x 20 columns):", 10,
                       20);
-    std::cout << "User Offset: " << getOffsetFromOffsetTable(userOffset)
-              << std::endl;
-    std::cout << "Item Offset: " << getOffsetFromOffsetTable(itemOffset)
-              << std::endl;
-    std::cout << std::endl;
+    logger::println(logger::INFO, "User Offset: %d",
+                    getOffsetFromOffsetTable(userOffset));
+    logger::println(logger::INFO, "Item Offset: %d",
+                    getOffsetFromOffsetTable(itemOffset));
+    logger::println(logger::INFO, "");
 
     // Get the class of the input object
     jclass clazz = env->GetObjectClass(resultObj);
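
A caveat when reading the new call sites above: values such as dataTable->getNumberOfRows() (a size_t) and the long long returned by std::chrono::...::count() are passed through %d. That happens to work on common LP64 toolchains but is formally a format-specifier mismatch. A defensive variant, shown here as a sketch rather than as part of the commit, casts at the call site:

// Sketch: explicit casts keep printf-style specifiers well-defined for
// size_t and chrono tick counts, whose underlying types vary by platform.
#include <chrono>
#include <cstddef>

#include "Logger.h" // the interface sketched above

int main() {
    auto t1 = std::chrono::high_resolution_clock::now();
    auto t2 = std::chrono::high_resolution_clock::now();
    auto secs =
        std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
    std::size_t rows = 1024; // stands in for dataTable->getNumberOfRows()

    logger::println(logger::INFO, "ALS (native): took %lld secs",
                    static_cast<long long>(secs));
    logger::println(logger::INFO, "- NumberOfRows: %llu",
                    static_cast<unsigned long long>(rows));
    return 0;
}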
19 changes: 5 additions & 14 deletions mllib-dal/src/main/native/ALSShuffle.cpp
@@ -22,6 +22,8 @@
 
 #include "ALSShuffle.h"
 
+#include "Logger.h"
+
 using namespace std;
 
 std::vector<Rating> recvData;
@@ -72,11 +74,9 @@ Rating *shuffle_all2all(ccl::communicator &comm,
     // Calculate send buffer size
     for (size_t i = 0; i < nBlocks; i++) {
         perNodeSendLens[i] = partitions[i].size() * RATING_SIZE;
-        // cout << "rank " << rankId << " Send partition " << i << " size " <<
-        // perNodeSendLens[i] << endl;
         sendBufSize += perNodeSendLens[i];
     }
-    cout << "sendData size " << sendBufSize << endl;
+    logger::println(logger::INFO, "sendData size %d", sendBufSize);
     sendData.resize(sendBufSize);
 
     // Fill in send buffer
@@ -94,8 +94,6 @@ Rating *shuffle_all2all(ccl::communicator &comm,
 
     // Calculate recv buffer size
     for (size_t i = 0; i < nBlocks; i++) {
-        // cout << "rank " << rankId << " Recv partition " << i << " size " <<
-        // perNodeRecvLens[i] << endl;
         recvBufSize += perNodeRecvLens[i];
     }
 
@@ -109,18 +107,11 @@ Rating *shuffle_all2all(ccl::communicator &comm,
 
     sort(recvData.begin(), recvData.end(), compareRatingByUser);
 
-    // for (auto r : recvData) {
-    //     cout << r.user << " " << r.item << " " << r.rating << endl;
-    // }
-
     newRatingsNum = recvData.size();
-    // RatingPartition::iterator iter = std::unique(recvData.begin(),
-    // recvData.end(), compareRatingUserEquality); newCsrRowNum =
-    // std::distance(recvData.begin(), iter);
     newCsrRowNum = distinct_count(recvData);
 
-    cout << "newRatingsNum: " << newRatingsNum
-         << " newCsrRowNum: " << newCsrRowNum << endl;
+    logger::println(logger::INFO, "newRatingsNum: %d, newCsrRowNum: %d",
+                    newRatingsNum, newCsrRowNum);
 
     return recvData.data();
 }
1 change: 0 additions & 1 deletion mllib-dal/src/main/native/Common.hpp
@@ -22,5 +22,4 @@
 
 #include "GPU.h"
 #include "Communicator.hpp"
-#include "OutputHelpers.hpp"
 #include "oneapi/dal/table/homogen.hpp"
55 changes: 33 additions & 22 deletions mllib-dal/src/main/native/CorrelationImpl.cpp
@@ -25,6 +25,8 @@
 #include "com_intel_oap_mllib_stat_CorrelationDALImpl.h"
 #include "service.h"
 
+#include "Logger.h"
+
 using namespace std;
 #ifdef CPU_GPU_PROFILE
 namespace covariance_gpu = oneapi::dal::covariance;
@@ -53,8 +55,9 @@ static void doCorrelationDaalCompute(JNIEnv *env, jobject obj, size_t rankId,
     auto t2 = std::chrono::high_resolution_clock::now();
     auto duration =
         std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
-    std::cout << "Correleation (native): local step took " << duration / 1000
-              << " secs" << std::endl;
+    logger::println(logger::INFO,
+                    "Correlation (native): local step took %d secs",
+                    duration / 1000);
 
     t1 = std::chrono::high_resolution_clock::now();
 
@@ -80,8 +83,9 @@ static void doCorrelationDaalCompute(JNIEnv *env, jobject obj, size_t rankId,
 
     duration =
         std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
-    std::cout << "Correleation (native): ccl_allgatherv took "
-              << duration / 1000 << " secs" << std::endl;
+    logger::println(logger::INFO,
+                    "Correlation (native): ccl_allgatherv took %d secs",
+                    duration / 1000);
     if (isRoot) {
         auto t1 = std::chrono::high_resolution_clock::now();
         /* Create an algorithm to compute covariance on the master node */
@@ -119,8 +123,9 @@ static void doCorrelationDaalCompute(JNIEnv *env, jobject obj, size_t rankId,
         auto duration =
             std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
                 .count();
-        std::cout << "Correlation (native): master step took "
-                  << duration / 1000 << " secs" << std::endl;
+        logger::println(logger::INFO,
+                        "Correlation (native): master step took %d secs",
+                        duration / 1000);
 
         /* Print the results */
         printNumericTable(result->get(covariance_cpu::correlation),
@@ -147,7 +152,7 @@ static void doCorrelationOneAPICompute(
     JNIEnv *env, jlong pNumTabData,
     preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
     jobject resultObj) {
-    std::cout << "oneDAL (native): GPU compute start" << std::endl;
+    logger::println(logger::INFO, "oneDAL (native): GPU compute start");
     const bool isRoot = (comm.get_rank() == ccl_root);
     homogen_table htable =
         *reinterpret_cast<const homogen_table *>(pNumTabData);
@@ -159,15 +164,18 @@ static void doCorrelationOneAPICompute(
     auto t1 = std::chrono::high_resolution_clock::now();
     const auto result_train = preview::compute(comm, cor_desc, htable);
     if (isRoot) {
-        std::cout << "Mean:\n" << result_train.get_means() << std::endl;
-        std::cout << "Correlation:\n"
-                  << result_train.get_cor_matrix() << std::endl;
+        logger::println(logger::INFO, "Mean:");
+        printHomegenTable(result_train.get_means());
+        logger::println(logger::INFO, "Correlation:");
+        printHomegenTable(result_train.get_cor_matrix());
         auto t2 = std::chrono::high_resolution_clock::now();
         auto duration =
             std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
                 .count();
-        std::cout << "Correlation batch(native): computing step took "
-                  << duration / 1000 << " secs." << std::endl;
+        logger::println(
+            logger::INFO,
+            "Correlation batch(native): computing step took %d secs.",
+            duration / 1000);
         // Return all covariance & mean
         jclass clazz = env->GetObjectClass(resultObj);
 
@@ -190,9 +198,10 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
     JNIEnv *env, jobject obj, jlong pNumTabData, jint executorNum,
     jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray,
     jobject resultObj) {
-    std::cout << "oneDAL (native): use DPC++ kernels "
-              << "; device " << ComputeDeviceString[computeDeviceOrdinal]
-              << std::endl;
+    logger::println(logger::INFO,
+                    "oneDAL (native): use DPC++ kernels; device %s",
+                    ComputeDeviceString[computeDeviceOrdinal].c_str());
+
     ccl::communicator &cclComm = getComm();
     int rankId = cclComm.rank();
     ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
@@ -205,18 +214,20 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
 
         int nThreadsNew =
             services::Environment::getInstance()->getNumberOfThreads();
-        std::cout << "oneDAL (native): Number of CPU threads used"
-                  << nThreadsNew << std::endl;
+        logger::println(logger::INFO,
+                        "oneDAL (native): Number of CPU threads used: %d",
+                        nThreadsNew);
         doCorrelationDaalCompute(env, obj, rankId, cclComm, pData, executorNum,
                                  resultObj);
         break;
     }
 #ifdef CPU_GPU_PROFILE
     case ComputeDevice::gpu: {
         int nGpu = env->GetArrayLength(gpuIdxArray);
-        std::cout << "oneDAL (native): use GPU kernels with " << nGpu
-                  << " GPU(s)"
-                  << " rankid " << rankId << std::endl;
+        logger::println(
+            logger::INFO,
+            "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
+            rankId);
 
         jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
 
@@ -235,8 +246,8 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
     }
 #endif
     default: {
-        std::cout << "no supported device!" << std::endl;
-        exit(-1);
+        deviceError("Correlation",
+                    ComputeDeviceString[computeDeviceOrdinal].c_str());
    }
    }
    return 0;
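
The default: branch above now reports the unsupported device through deviceError instead of printing to std::cout and calling exit(-1) inline. deviceError's definition is not among the files shown here; judging only from the call site (algorithm name plus device string) and the commit's rule that errors go to stderr, a compatible sketch could look like this:

// Sketch of a deviceError compatible with the call site above; the real
// definition lives elsewhere in the commit (the "error_handling done"
// change), so treat the exact message and exit code as assumptions.
#include <cstdlib>

#include "Logger.h" // the interface sketched earlier

inline void deviceError(const char *algorithm, const char *device) {
    // ERROR-level output goes to stderr in the sketched logger.
    logger::println(logger::ERROR,
                    "Error: %s does not support compute device %s", algorithm,
                    device);
    std::exit(-1);
}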
(The remaining 23 changed files in this commit are not shown.)
