Skip to content

Commit

Permalink
Merge branch 'oap-project:master' into update-build
Browse files Browse the repository at this point in the history
  • Loading branch information
xwu99 authored May 31, 2023
2 parents 04ebf40 + cc7198f commit 5826a9f
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 733 deletions.
3 changes: 3 additions & 0 deletions mllib-dal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<systemProperties>
<computeDevice>${computeDevice}</computeDevice>
</systemProperties>
<filereports>test-reports.txt</filereports>
</configuration>
<executions>
Expand Down
52 changes: 30 additions & 22 deletions mllib-dal/src/main/native/LinearRegressionImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ namespace ridge_regression_cpu = daal::algorithms::ridge_regression;

typedef double algorithmFPType; /* Algorithm floating-point type */

static NumericTablePtr linear_regression_compute(size_t rankId,
ccl::communicator &comm,
const NumericTablePtr &pData,
const NumericTablePtr &pLabel,
size_t nBlocks) {
static NumericTablePtr linear_regression_compute(
size_t rankId, ccl::communicator &comm, const NumericTablePtr &pData,
const NumericTablePtr &pLabel, bool fitIntercept, size_t nBlocks) {
using daal::byte;

linear_regression_cpu::training::Distributed<step1Local> localAlgorithm;
Expand All @@ -54,6 +52,7 @@ static NumericTablePtr linear_regression_compute(size_t rankId,
localAlgorithm.input.set(linear_regression_cpu::training::data, pData);
localAlgorithm.input.set(
linear_regression_cpu::training::dependentVariables, pLabel);
localAlgorithm.parameter.interceptFlag = fitIntercept;

/* Train the multiple linear regression model on local nodes */
localAlgorithm.compute();
Expand Down Expand Up @@ -107,6 +106,7 @@ static NumericTablePtr linear_regression_compute(size_t rankId,

/* Merge and finalizeCompute the multiple linear regression model on the
* master node */
masterAlgorithm.parameter.interceptFlag = fitIntercept;
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();

Expand All @@ -125,17 +125,19 @@ static NumericTablePtr linear_regression_compute(size_t rankId,
return resultTable;
}

static NumericTablePtr ridge_regression_compute(
size_t rankId, ccl::communicator &comm, const NumericTablePtr &pData,
const NumericTablePtr &pLabel, double regParam, size_t nBlocks) {
static NumericTablePtr
ridge_regression_compute(size_t rankId, ccl::communicator &comm,
const NumericTablePtr &pData,
const NumericTablePtr &pLabel, bool fitIntercept,
double regParam, size_t nBlocks) {

using daal::byte;

NumericTablePtr ridgeParams(new HomogenNumericTable<double>(
1, 1, NumericTable::doAllocate, regParam));

ridge_regression_cpu::training::Distributed<step1Local> localAlgorithm;
localAlgorithm.parameter.ridgeParameters = ridgeParams;
localAlgorithm.parameter.interceptFlag = fitIntercept;

/* Pass a training data set and dependent values to the algorithm */
localAlgorithm.input.set(ridge_regression_cpu::training::data, pData);
Expand Down Expand Up @@ -195,6 +197,7 @@ static NumericTablePtr ridge_regression_compute(

/* Merge and finalizeCompute the multiple ridge regression model on the
* master node */
masterAlgorithm.parameter.interceptFlag = fitIntercept;
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();

Expand All @@ -215,11 +218,13 @@ static NumericTablePtr ridge_regression_compute(
#ifdef CPU_GPU_PROFILE
static jlong doLROneAPICompute(JNIEnv *env, size_t rankId,
ccl::communicator &cclComm, sycl::queue &queue,
jlong pData, jlong pLabel, jint executorNum,
jlong pData, jlong pLabel,
jboolean jfitIntercept, jint executorNum,
jobject resultObj) {
std::cout << "oneDAL (native): GPU compute start , rankid " << rankId
<< std::endl;
const bool isRoot = (rankId == ccl_root);
bool fitIntercept = bool(jfitIntercept);

int size = cclComm.size();
ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
Expand All @@ -230,7 +235,8 @@ static jlong doLROneAPICompute(JNIEnv *env, size_t rankId,
homogen_table ytrain = *reinterpret_cast<const homogen_table *>(pLabel);

linear_regression_gpu::train_input local_input{xtrain, ytrain};
const auto linear_regression_desc = linear_regression_gpu::descriptor<>();
const auto linear_regression_desc =
linear_regression_gpu::descriptor<>(fitIntercept);

linear_regression_gpu::train_result result_train =
preview::train(comm, linear_regression_desc, xtrain, ytrain);
Expand All @@ -248,14 +254,14 @@ static jlong doLROneAPICompute(JNIEnv *env, size_t rankId,
/*
* Class: com_intel_oap_mllib_regression_LinearRegressionDALImpl
* Method: cLinearRegressionTrainDAL
* Signature:
* (JJDDIIIILjava/lang/String;Lcom/intel/oap/mllib/regression/LiRResult;)J
* Signature: (JJZDDIII[ILcom/intel/oap/mllib/regression/LiRResult;)J
*/
JNIEXPORT jlong JNICALL
Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTrainDAL(
JNIEnv *env, jobject obj, jlong data, jlong label, jdouble regParam,
jdouble elasticNetParam, jint executorNum, jint executorCores,
jint computeDeviceOrdinal, jintArray gpuIdxArray, jobject resultObj) {
JNIEnv *env, jobject obj, jlong data, jlong label, jboolean fitIntercept,
jdouble regParam, jdouble elasticNetParam, jint executorNum,
jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray,
jobject resultObj) {

std::cout << "oneDAL (native): use DPC++ kernels "
<< "; device " << ComputeDeviceString[computeDeviceOrdinal]
Expand Down Expand Up @@ -284,8 +290,9 @@ Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTra

jlong pDatagpu = (jlong)data;
jlong pLabelgpu = (jlong)label;
resultptr = doLROneAPICompute(env, rankId, cclComm, queue, pDatagpu,
pLabelgpu, executorNum, resultObj);
resultptr =
doLROneAPICompute(env, rankId, cclComm, queue, pDatagpu, pLabelgpu,
fitIntercept, executorNum, resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
#endif
} else {
Expand All @@ -300,11 +307,12 @@ Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTra
cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew
<< endl;
if (regParam == 0) {
resultTable = linear_regression_compute(rankId, cclComm, pData,
pLabel, executorNum);
resultTable = linear_regression_compute(
rankId, cclComm, pData, pLabel, fitIntercept, executorNum);
} else {
resultTable = ridge_regression_compute(
rankId, cclComm, pData, pLabel, regParam, executorNum);
resultTable =
ridge_regression_compute(rankId, cclComm, pData, pLabel,
fitIntercept, regParam, executorNum);
}

NumericTablePtr *coeffvectors = new NumericTablePtr(resultTable);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)

val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false)

val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd)

val labeledPointsTables = if (useDevice == "GPU") {
if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.rddLabeledPointToMergedHomogenTables(labeledPoints, labelCol, featuresCol, executorNum, computeDevice)
} else {
val msg = s"OAPMLlib: Sparse table is not supported for gpu now."
//todo sparse table is not supported

val msg = s"OAP MLlib: Sparse table is not supported for GPU now."
logError(msg)
throw new SparkException(msg)
}
Expand All @@ -94,22 +94,36 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
}
}

// OAP MLlib: Only normal linear regression is supported for GPU currently
if (useDevice == "GPU" && regParam != 0){
val msg = s"OAP MLlib: Regularization parameter is not supported for GPU now."
logError(msg)
throw new SparkException(msg)
}

val results = labeledPointsTables.mapPartitionsWithIndex {
case (rank: Int, tables: Iterator[(Long, Long)]) =>
val (featureTabAddr, lableTabAddr) = tables.next()
OneCCL.init(executorNum, rank, kvsIPPort)
val result = new LiRResult()

val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
// OAP MLlib: This ia a hack for unit test, and will be repaireid in the future.
// GPU info can't be detected automatically in UT enviroment.
if (isTest) {
Array(0)
} else {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
}
} else {
null
}

val cbeta = cLinearRegressionTrainDAL(
featureTabAddr,
lableTabAddr,
fitIntercept,
regParam,
elasticNetParam,
executorNum,
Expand Down Expand Up @@ -149,6 +163,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
// Single entry to call Linear Regression DAL backend with parameters
@native private def cLinearRegressionTrainDAL(data: Long,
label: Long,
fitIntercept: Boolean,
regParam: Double,
elasticNetParam: Double,
executorNum: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,9 @@ class LinearRegression @Since("1.3") (@Since("1.3.0") override val uid: String)
private def trainWithNormal(
dataset: Dataset[_],
instr: Instrumentation): LinearRegressionModel = {
// oneDAL only support simple linear regression and ridge regression
val paramSupported = ($(regParam) == 0) || ($(regParam) != 0 && $(elasticNetParam) == 0)
val paramSupported = ($(regParam) == 0) && (!isDefined(weightCol) || getWeightCol.isEmpty)
val sparkContext = dataset.sparkSession.sparkContext
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val isPlatformSupported = Utils.checkClusterPlatformCompatibility(
dataset.sparkSession.sparkContext)
if (paramSupported && Utils.isOAPEnabled && isPlatformSupported) {
Expand Down Expand Up @@ -489,6 +490,13 @@ class LinearRegression @Since("1.3") (@Since("1.3.0") override val uid: String)
} else {
// For low dimensional data, WeightedLeastSquares is more efficient since the
// training algorithm only requires one pass through the data. (SPARK-10668)
val fallbackmsg = s"OAP MLlib: Fall back to vanilla Spark MLlib"
logError(fallbackmsg)
if (!paramSupported && useDevice == "GPU") {
val msg = s"OAP MLlib: Parameter used is not supported for GPU now."
logError(msg)
throw new SparkException(msg)
}

val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam),
elasticNetParam = $(elasticNetParam), $(standardization), true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
private def trainWithNormal(
dataset: Dataset[_],
instr: Instrumentation): LinearRegressionModel = {
// oneDAL only support simple linear regression and ridge regression
val paramSupported = ($(regParam) == 0) || ($(regParam) != 0 && $(elasticNetParam) == 0)
val paramSupported = ($(regParam) == 0) && (!isDefined(weightCol) || getWeightCol.isEmpty)
val sparkContext = dataset.sparkSession.sparkContext
val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
val isPlatformSupported = Utils.checkClusterPlatformCompatibility(
dataset.sparkSession.sparkContext)
if (paramSupported && Utils.isOAPEnabled && isPlatformSupported) {
Expand Down Expand Up @@ -488,6 +489,13 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
} else {
// For low dimensional data, WeightedLeastSquares is more efficient since the
// training algorithm only requires one pass through the data. (SPARK-10668)
val fallbackmsg = s"OAP MLlib: Fall back to vanilla Spark MLlib"
logError(fallbackmsg)
if (!paramSupported && useDevice == "GPU") {
val msg = s"OAP MLlib: Parameter used is not supported for GPU now."
logError(msg)
throw new SparkException(msg)
}

val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam),
elasticNetParam = $(elasticNetParam), $(standardization), true,
Expand Down
Loading

0 comments on commit 5826a9f

Please sign in to comment.