fix comments
Signed-off-by: minmingzhu <minming.zhu@intel.com>
minmingzhu committed Oct 24, 2023
1 parent 7829b0c commit 83feaee
Showing 17 changed files with 103 additions and 112 deletions.
24 changes: 12 additions & 12 deletions examples/python/pca-pyspark/pca-pyspark.py
@@ -19,29 +19,29 @@
 
-from pyspark.ml.feature import VectorAssembler
 from pyspark.ml.feature import PCA
+from pyspark.ml.functions import array_to_vector
 from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, split
 
 if __name__ == "__main__":
-    spark = SparkSession\
-        .builder\
-        .appName("PCAExample")\
-        .getOrCreate()
+    spark = SparkSession.builder.master("local") \
+        .appName("PCAExample").getOrCreate()
 
     if (len(sys.argv) != 2) :
         print("bin/spark-submit pca-pyspark.py <data_set.csv>")
         sys.exit(1)
 
-    input = spark.read.load(sys.argv[1], format="csv", inferSchema="true", header="false")
+    input = spark.read.parquet(sys.argv[1]).toDF("features")
+    # input = spark.read.load(sys.argv[1], format="csv", inferSchema="true", header="false")
 
-    assembler = VectorAssembler(
-        inputCols=input.columns,
-        outputCol="features")
 
-    dataset = assembler.transform(input)
-    dataset.show()
+    input = input.select(split(col("features"), ",").alias("features"))
+    input = input.withColumn("features", input.features.cast("array<double>"))
+    input = input.withColumn("features", array_to_vector("features"))
+    input.show()
+    input.printSchema()
 
     pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
-    model = pca.fit(dataset)
+    model = pca.fit(input)
 
     print("Principal Components: ", model.pc, sep='\n')
     print("Explained Variance: ", model.explainedVariance, sep='\n')
34 changes: 34 additions & 0 deletions mllib-dal/src/main/native/Common.cpp
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright 2020 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+#pragma once
+
+#include "Common.hpp"
+#include "Logger.h"
+#include "error_handling.h"
+#include "service.h"
+
+HomogenTablePtr createHomogenTableWithArrayPtr(size_t pNumTabData,
+                                               size_t numRows, size_t numClos,
+                                               sycl::queue queue) {
+    double *htableArray = reinterpret_cast<double *>(pNumTabData);
+    auto data = sycl::malloc_shared<double>(numRows * numClos, queue);
+    queue.memcpy(data, htableArray, sizeof(double) * numRows * numClos).wait();
+    HomogenTablePtr tablePtr = std::make_shared<homogen_table>(
+        queue, data, numRows, numClos,
+        detail::make_default_delete<const double>(queue));
+    return tablePtr;
+}
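
The new helper folds the malloc_shared / memcpy / homogen_table boilerplate that each GPU algorithm below previously inlined into one translation unit. A minimal call-site sketch, assuming HomogenTablePtr wraps homogen_table (as the std::make_shared call above suggests) and that Common.hpp transitively provides the SYCL and oneDAL declarations; the buildTable wrapper is hypothetical, not part of this commit:

#include "Common.hpp"

// Hypothetical wrapper, for illustration only: rebuilds a oneDAL homogen_table
// from a host buffer address that crossed the JNI boundary as a size_t.
// pNumTabData is assumed to point at numRows * numClos doubles.
oneapi::dal::homogen_table buildTable(size_t pNumTabData, size_t numRows,
                                      size_t numClos, sycl::queue queue) {
    HomogenTablePtr tablePtr =
        createHomogenTableWithArrayPtr(pNumTabData, numRows, numClos, queue);
    // oneDAL tables have reference semantics: the copy below shares the USM
    // allocation and keeps its queue-aware deleter alive past this scope.
    return *tablePtr;
}

The deleter is the subtle part: the buffer comes from sycl::malloc_shared, so it must be released through the same queue's context, which detail::make_default_delete<const double>(queue) takes care of.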
2 changes: 2 additions & 0 deletions mllib-dal/src/main/native/Common.hpp
@@ -23,3 +23,5 @@
 #include "GPU.h"
 #include "Communicator.hpp"
 #include "oneapi/dal/table/homogen.hpp"
+
+HomogenTablePtr createHomogenTableWithArrayPtr(size_t pNumTabData, size_t numRows, size_t numClos, sycl::queue queue);
9 changes: 4 additions & 5 deletions mllib-dal/src/main/native/CorrelationImpl.cpp
@@ -154,11 +154,10 @@ static void doCorrelationOneAPICompute(
     jobject resultObj, sycl::queue &queue) {
     logger::println(logger::INFO, "oneDAL (native): GPU compute start");
     const bool isRoot = (comm.get_rank() == ccl_root);
-    double *htableArray = reinterpret_cast<double *>(pNumTabData);
-    auto data = sycl::malloc_shared<double>(numRows * numClos, queue);
-    queue.memcpy(data, htableArray, sizeof(double) * numRows * numClos).wait();
-    homogen_table htable{queue, data, numRows, numClos,
-                         detail::make_default_delete<const double>(queue)};
+    homogen_table htable = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabData, numRows, numClos,
+                                       comm.get_queue())
+            .get());
 
     const auto cor_desc =
         covariance_gpu::descriptor<GpuAlgorithmFPType>{}.set_result_options(
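
Every remaining GPU path in this commit collapses to the same four-line pattern shown above. One hedged observation: if HomogenTablePtr is indeed a std::shared_ptr<homogen_table>, the reinterpret_cast around .get() is a no-op and the table can be copied out directly. A sketch of the equivalent call, reusing the surrounding function's variables:

// Behaviorally identical to the diff above, minus the cast (assumes
// HomogenTablePtr::get() already yields a homogen_table *).
homogen_table htable =
    *createHomogenTableWithArrayPtr(pNumTabData, numRows, numClos,
                                    comm.get_queue());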
29 changes: 8 additions & 21 deletions mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp
@@ -219,27 +219,14 @@ static jobject doRFClassifierOneAPICompute(
     jobject resultObj, sycl::queue &queue) {
     logger::println(logger::INFO, "oneDAL (native): GPU compute start");
     const bool isRoot = (comm.get_rank() == ccl_root);
-    double *htableFeatureArray = reinterpret_cast<double *>(pNumTabFeature);
-    double *htableLabelArray = reinterpret_cast<double *>(pNumTabLabel);
-
-    auto featureData =
-        sycl::malloc_shared<double>(featureRows * featureCols, queue);
-    queue
-        .memcpy(featureData, htableFeatureArray,
-                sizeof(double) * featureRows * featureCols)
-        .wait();
-    homogen_table hFeaturetable{
-        queue, featureData, featureRows, featureCols,
-        detail::make_default_delete<const double>(queue)};
-
-    auto labelData =
-        sycl::malloc_shared<double>(featureRows * labelCols, queue);
-    queue
-        .memcpy(labelData, htableLabelArray,
-                sizeof(double) * featureRows * labelCols)
-        .wait();
-    homogen_table hLabeltable{queue, labelData, featureRows, labelCols,
-                              detail::make_default_delete<const double>(queue)};
+    homogen_table hFeaturetable = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabFeature, featureRows, featureCols,
+                                       comm.get_queue())
+            .get());
+    homogen_table hLabeltable = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabLabel, featureRows, labelCols,
+                                       comm.get_queue())
+            .get());
 
     const auto df_desc =
         df::descriptor<GpuAlgorithmFPType, df::method::hist,
31 changes: 8 additions & 23 deletions mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp
@@ -216,29 +216,14 @@ static jobject doRFRegressorOneAPICompute(
     jobject resultObj, sycl::queue &queue) {
     logger::println(logger::INFO, "OneDAL (native): GPU compute start");
     const bool isRoot = (comm.get_rank() == ccl_root);
-    double *htableFeatureArray = reinterpret_cast<double *>(pNumTabFeature);
-    double *htableLabelArray = reinterpret_cast<double *>(pNumTabLabel);
-    auto featureData =
-        sycl::malloc_shared<double>(featureRows * featureCols, queue);
-    queue
-        .memcpy(featureData, htableFeatureArray,
-                sizeof(double) * featureRows * featureCols)
-        .wait();
-    homogen_table hFeaturetable{
-        queue, featureData, featureRows, featureCols,
-        detail::make_default_delete<const double>(queue)};
-
-    auto labelData =
-        sycl::malloc_shared<double>(featureRows * labelCols, queue);
-    queue
-        .memcpy(labelData, htableLabelArray,
-                sizeof(double) * featureRows * labelCols)
-        .wait();
-    homogen_table hLabeltable{queue, labelData, featureRows, labelCols,
-                              detail::make_default_delete<const double>(queue)};
-    logger::println(logger::INFO,
-                    "doRFRegressorOneAPICompute get_column_count = %d",
-                    hFeaturetable.get_column_count());
+    homogen_table hFeaturetable = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabFeature, featureRows, featureCols,
+                                       comm.get_queue())
+            .get());
+    homogen_table hLabeltable = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabLabel, featureRows, labelCols,
+                                       comm.get_queue())
+            .get());
     const auto df_desc =
         df::descriptor<GpuAlgorithmFPType, df::method::hist,
                        df::task::regression>{}
14 changes: 6 additions & 8 deletions mllib-dal/src/main/native/KMeansImpl.cpp
@@ -246,15 +246,13 @@ static jlong doKMeansOneAPICompute(
     JNIEnv *env, jlong pNumTabData, jlong numRows, jlong numClos,
     jlong pNumTabCenters, jint clusterNum, jdouble tolerance, jint iterationNum,
     preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
-    jobject resultObj, sycl::queue &queue) {
+    jobject resultObj) {
     logger::println(logger::INFO, "OneDAL (native): GPU compute start");
     const bool isRoot = (comm.get_rank() == ccl_root);
-    double *htableArray = reinterpret_cast<double *>(pNumTabData);
-    auto data = sycl::malloc_shared<double>(numRows * numClos, queue);
-    queue.memcpy(data, htableArray, sizeof(double) * numRows * numClos).wait();
-    homogen_table htable{queue, data, numRows, numClos,
-                         detail::make_default_delete<const double>(queue)};
-
+    homogen_table htable = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabData, numRows, numClos,
+                                       comm.get_queue())
+            .get());
     homogen_table centroids =
         *reinterpret_cast<const homogen_table *>(pNumTabCenters);
     const auto kmeans_desc = kmeans_gpu::descriptor<GpuAlgorithmFPType>()
@@ -358,7 +356,7 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
             queue, size, rankId, kvs);
         ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numClos,
                                     pNumTabCenters, clusterNum, tolerance,
-                                    iterationNum, comm, resultObj, queue);
+                                    iterationNum, comm, resultObj);
 
         env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
         break;
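
doKMeansOneAPICompute (and doPCAOneAPICompute below) can drop their sycl::queue parameter because the SPMD communicator is constructed over that very queue and can hand it back. A sketch of the relationship, reusing the queue, size, rankId, and kvs variables from the unchanged caller code above; the equality comment is an assumption about the preview API, not something this diff verifies:

// The USM communicator is built from the device queue, so any callee that
// receives comm can recover the queue via comm.get_queue() instead of
// taking a second, redundant parameter.
auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
    queue, size, rankId, kvs);
sycl::queue q = comm.get_queue(); // assumed: same underlying device queue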
27 changes: 8 additions & 19 deletions mllib-dal/src/main/native/LinearRegressionImpl.cpp
@@ -229,25 +229,14 @@ static jlong doLROneAPICompute(JNIEnv *env, size_t rankId,
     ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
     auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
         queue, size, rankId, kvs);
-    double *htableFeatureArray = reinterpret_cast<double *>(pNumTabFeature);
-    double *htableLabelArray = reinterpret_cast<double *>(pNumTabLabel);
-    auto featureData =
-        sycl::malloc_shared<double>(featureRows * featureCols, queue);
-    queue
-        .memcpy(featureData, htableFeatureArray,
-                sizeof(double) * featureRows * featureCols)
-        .wait();
-    homogen_table xtrain{queue, featureData, featureRows, featureCols,
-                         detail::make_default_delete<const double>(queue)};
-
-    auto labelData =
-        sycl::malloc_shared<double>(featureRows * labelCols, queue);
-    queue
-        .memcpy(labelData, htableLabelArray,
-                sizeof(double) * featureRows * labelCols)
-        .wait();
-    homogen_table ytrain{queue, labelData, featureRows, labelCols,
-                         detail::make_default_delete<const double>(queue)};
+    homogen_table xtrain = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabFeature, featureRows, featureCols,
+                                       comm.get_queue())
+            .get());
+    homogen_table ytrain = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabLabel, featureRows, labelCols,
+                                       comm.get_queue())
+            .get());
 
     linear_regression_gpu::train_input local_input{xtrain, ytrain};
     const auto linear_regression_desc =
2 changes: 2 additions & 0 deletions mllib-dal/src/main/native/Makefile
@@ -82,6 +82,7 @@ else ifeq ($(PLATFORM_PROFILE),CPU_GPU_PROFILE)
 endif
 
 CPP_SRCS += \
+  ./Common.cpp \
   ./service.cpp ./error_handling.cpp \
   ./daal/csr_numeric_table_impl.cpp \
   ./daal/homogen_numeric_table_byte_buffer_impl.cpp \
@@ -109,6 +110,7 @@ CPP_SRCS += \
 
 
 OBJS += \
+  ./Common.o \
   ./service.o ./error_handling.o \
   ./daal/csr_numeric_table_impl.o \
   ./daal/homogen_numeric_table_byte_buffer_impl.o \
15 changes: 7 additions & 8 deletions mllib-dal/src/main/native/PCAImpl.cpp
@@ -184,14 +184,14 @@ static void doPCADAALCompute(JNIEnv *env, jobject obj, size_t rankId,
 static void doPCAOneAPICompute(
     JNIEnv *env, jlong pNumTabData, jlong numRows, jlong numClos,
     preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
-    jobject resultObj, sycl::queue &queue) {
+    jobject resultObj) {
     logger::println(logger::INFO, "oneDAL (native): GPU compute start");
     const bool isRoot = (comm.get_rank() == ccl_root);
-    double *htableArray = reinterpret_cast<double *>(pNumTabData);
-    auto data = sycl::malloc_shared<double>(numRows * numClos, queue);
-    queue.memcpy(data, htableArray, sizeof(double) * numRows * numClos).wait();
-    homogen_table htable{queue, data, numRows, numClos,
-                         detail::make_default_delete<const double>(queue)};
+    homogen_table htable = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabData, numRows, numClos,
+                                       comm.get_queue())
+            .get());
+
     const auto cov_desc =
         covariance_gpu::descriptor<GpuAlgorithmFPType>{}.set_result_options(
             covariance_gpu::result_options::cov_matrix);
@@ -295,8 +295,7 @@ Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL(
         auto comm =
             preview::spmd::make_communicator<preview::spmd::backend::ccl>(
                 queue, size, rankId, kvs);
-        doPCAOneAPICompute(env, pNumTabData, numRows, numClos, comm, resultObj,
-                           queue);
+        doPCAOneAPICompute(env, pNumTabData, numRows, numClos, comm, resultObj);
         env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
         break;
     }
10 changes: 5 additions & 5 deletions mllib-dal/src/main/native/SummarizerImpl.cpp
@@ -207,11 +207,11 @@ static void doSummarizerOneAPICompute(
     jobject resultObj, sycl::queue &queue) {
     logger::println(logger::INFO, "oneDAL (native): GPU compute start");
     const bool isRoot = (comm.get_rank() == ccl_root);
-    double *htableArray = reinterpret_cast<double *>(pNumTabData);
-    auto data = sycl::malloc_shared<double>(numRows * numClos, queue);
-    queue.memcpy(data, htableArray, sizeof(double) * numRows * numClos).wait();
-    homogen_table htable{queue, data, numRows, numClos,
-                         detail::make_default_delete<const double>(queue)};
+    homogen_table htable = *reinterpret_cast<homogen_table *>(
+        createHomogenTableWithArrayPtr(pNumTabData, numRows, numClos,
+                                       comm.get_queue())
+            .get());
+
     const auto bs_desc = basic_statistics::descriptor<GpuAlgorithmFPType>{};
     auto t1 = std::chrono::high_resolution_clock::now();
     const auto result_train = preview::compute(comm, bs_desc, htable);
4 changes: 2 additions & 2 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -579,7 +579,7 @@ object OneDAL {
   }
 
   def coalesceVectorsToHomogenTables(data: RDD[Vector], executorNum: Int,
-                                     device: Common.ComputeDevice): RDD[String] = {
+                                     device: Common.ComputeDevice): RDD[Tuple3[Long, Long, Long]] = {
     logger.info(s"Processing partitions with $executorNum executors")
     val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1)
 
@@ -640,7 +640,7 @@
       val result = Future.sequence(futureList)
       Await.result(result, Duration.Inf)
 
-      Iterator(targetArrayAddress + "_" + numRows.toLong + "_" + numCols.toLong)
+      Iterator((targetArrayAddress, numRows.toLong, numCols.toLong))
     }.setName("coalescedTables").cache()
     coalescedTables.count()
     // Unpersist instances RDD
@@ -69,8 +69,7 @@ class KMeansDALImpl(var nClusters: Int,
       }
 
       val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
-        val parts = iter.next().toString.split("_")
-        (parts(0).toLong, parts(1).toLong, parts(2).toLong)
+        iter.next()
       } else {
         (iter.next().toString.toLong, 0L, 0L)
       }
@@ -67,8 +67,7 @@ class PCADALImpl(val k: Int,
 
     val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
       val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
-        val parts = iter.next().toString.split("_")
-        (parts(0).toLong, parts(1).toLong, parts(2).toLong)
+        iter.next()
       } else {
         (iter.next().toString.toLong, 0L, 0L)
       }
@@ -54,8 +54,7 @@ class CorrelationDALImpl(
 
     val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
       val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
-        val parts = iter.next().toString.split("_")
-        (parts(0).toLong, parts(1).toLong, parts(2).toLong)
+        iter.next()
      } else {
        (iter.next().toString.toLong, 0L, 0L)
      }
@@ -55,8 +55,7 @@ class SummarizerDALImpl(val executorNum: Int,
 
     val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
      val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
-        val parts = iter.next().toString.split("_")
-        (parts(0).toLong, parts(1).toLong, parts(2).toLong)
+        iter.next()
      } else {
        (iter.next().toString.toLong, 0L, 0L)
      }
@@ -111,7 +111,7 @@ class oneDALSuite extends FunctionsSuite with Logging {
     val result = OneDAL.coalesceVectorsToHomogenTables(rddVectors, 1, TestCommon.getComputeDevice)
 
     val tableAddr = result.collect()
-    val table = new HomogenTable(tableAddr(0).split("_")(0).toLong)
+    val table = new HomogenTable(tableAddr(0)._1)
     val rData: Array[Double] = table.getDoubleData()
     assertArrayEquals(rData, expectData)
   }
