From d52fd51b795b25a374476c7c8c6a7c3b0bba89ff Mon Sep 17 00:00:00 2001
From: minmingzhu
Date: Mon, 30 Aug 2021 16:44:16 +0800
Subject: [PATCH 01/20] 1. enable correlation on OAP MLlib
 2. add Correlation example
 3. enable spark 3.1.1 and spark 3.0.0

Signed-off-by: minmingzhu
---
 examples/correlation/build.sh                 |   3 +
 examples/correlation/pom.xml                  |  94 ++++++++++
 examples/correlation/run.sh                   |  29 +++
 .../examples/ml/CorrelationExample.scala      |  63 +++++++
 .../spark/ml/stat/CorrelationResult.java      |  23 +++
 .../src/main/native/CorrelationDALImpl.cpp    | 177 ++++++++++++++++++
 mllib-dal/src/main/native/Makefile            |   6 +-
 mllib-dal/src/main/native/build-jni.sh        |   3 +-
 .../spark/ml/stat/CorrelationDALImpl.scala    | 108 +++++++++++
 .../apache/spark/ml/stat/Correlation.scala    | 109 +++++++++++
 .../apache/spark/ml/stat/Correlation.scala    | 104 ++++++++++
 11 files changed, 716 insertions(+), 3 deletions(-)
 create mode 100644 examples/correlation/build.sh
 create mode 100644 examples/correlation/pom.xml
 create mode 100644 examples/correlation/run.sh
 create mode 100644 examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala
 create mode 100644 mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java
 create mode 100644 mllib-dal/src/main/native/CorrelationDALImpl.cpp
 create mode 100644 mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala
 create mode 100644 mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala
 create mode 100644 mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala

diff --git a/examples/correlation/build.sh b/examples/correlation/build.sh
new file mode 100644
index 000000000..da373645b
--- /dev/null
+++ b/examples/correlation/build.sh
@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+
+mvn clean package
diff --git a/examples/correlation/pom.xml b/examples/correlation/pom.xml
new file mode 100644
index 000000000..98d0d7585
--- /dev/null
+++ b/examples/correlation/pom.xml
@@ -0,0 +1,94 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.intel.oap</groupId>
+    <artifactId>oap-mllib-examples</artifactId>
+    <version>1.2.0</version>
+    <packaging>jar</packaging>
+
+    <name>LinearRegressionExample</name>
+    <url>https://github.com/oap-project/oap-mllib.git</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <oap.version>1.2.0</oap.version>
+        <scala.version>2.12.10</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.1.1</spark.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.12.10</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_2.12</artifactId>
+            <version>3.7.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.12</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-target:jvm-1.8</arg>
+                    </args>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/examples/correlation/run.sh b/examples/correlation/run.sh
new file mode 100644
index 000000000..b7a0b64b5
--- /dev/null
+++ b/examples/correlation/run.sh
@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+
+source ../../conf/env.sh
+
+# Data file is from Spark Examples (data/mllib/sample_linear_regression_data.txt) and put in examples/data
+# The data file should be copied to $HDFS_ROOT before running examples
+DATA_FILE=$HDFS_ROOT/data/sample_linear_regression_data.txt
+
+APP_JAR=target/oap-mllib-examples-$OAP_MLLIB_VERSION.jar
+APP_CLASS=org.apache.spark.examples.ml.LinearRegressionExample
+
+time $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \
+    --num-executors $SPARK_NUM_EXECUTORS \
+    --executor-cores $SPARK_EXECUTOR_CORES \
+    --total-executor-cores $SPARK_TOTAL_CORES \
+    --driver-memory $SPARK_DRIVER_MEMORY \
+    
--executor-memory $SPARK_EXECUTOR_MEMORY \ + --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ + --conf "spark.default.parallelism=$SPARK_DEFAULT_PARALLELISM" \ + --conf "spark.sql.shuffle.partitions=$SPARK_DEFAULT_PARALLELISM" \ + --conf "spark.driver.extraClassPath=$SPARK_DRIVER_CLASSPATH" \ + --conf "spark.executor.extraClassPath=$SPARK_EXECUTOR_CLASSPATH" \ + --conf "spark.shuffle.reduceLocality.enabled=false" \ + --conf "spark.network.timeout=1200s" \ + --conf "spark.task.maxFailures=1" \ + --jars $OAP_MLLIB_JAR \ + --class $APP_CLASS \ + $APP_JAR $DATA_FILE \ + 2>&1 | tee LinearRegression-$(date +%m%d_%H_%M_%S).log diff --git a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala new file mode 100644 index 000000000..d7f1fc8ed --- /dev/null +++ b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.ml + +// $example on$ +import org.apache.spark.ml.linalg.{Matrix, Vectors} +import org.apache.spark.ml.stat.Correlation +import org.apache.spark.sql.Row +// $example off$ +import org.apache.spark.sql.SparkSession + +/** + * An example for computing correlation matrix. 
+ * Run with + * {{{ + * bin/run-example ml.CorrelationExample + * }}} + */ +object CorrelationExample { + + def main(args: Array[String]): Unit = { + val spark = SparkSession + .builder + .appName("CorrelationExample") + .getOrCreate() + import spark.implicits._ + + // $example on$ + val data = Seq( + Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))), + Vectors.dense(4.0, 5.0, 0.0, 3.0), + Vectors.dense(6.0, 7.0, 0.0, 8.0), + Vectors.sparse(4, Seq((0, 9.0), (3, 1.0))) + ) + + val df = data.map(Tuple1.apply).toDF("features") + val Row(coeff1: Matrix) = Correlation.corr(df, "features").head + println(s"Pearson correlation matrix:\n $coeff1") + + val Row(coeff2: Matrix) = Correlation.corr(df, "features", "spearman").head + println(s"Spearman correlation matrix:\n $coeff2") + // $example off$ + + spark.stop() + } +} +// scalastyle:on println diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java b/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java new file mode 100644 index 000000000..ddf094db9 --- /dev/null +++ b/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java @@ -0,0 +1,23 @@ +/******************************************************************************* + * Copyright 2020 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ + +package org.apache.spark.ml.stat; + +public class CorrelationResult { + public long covarianceNumericTable; + public long correlationNumericTable; + public long meanNumericTable; +} \ No newline at end of file diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp new file mode 100644 index 000000000..86cbca52c --- /dev/null +++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp @@ -0,0 +1,177 @@ +/******************************************************************************* + * Copyright 2020 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/
+
+#include <chrono>
+#include <iostream>
+#include <vector>
+
+#include "OneCCL.h"
+#include "org_apache_spark_ml_stat_CorrelationDALImpl.h"
+#include "service.h"
+
+
+using namespace std;
+using namespace daal;
+using namespace daal::algorithms;
+
+
+typedef double algorithmFPType; /* Algorithm floating-point type */
+
+/*
+ * Class:     org_apache_spark_ml_stat_CorrelationDALImpl
+ * Method:    cCorrelationTrainDAL
+ * Signature: (JIILorg/apache/spark/ml/stat/CorrelationResult;)J
+ */
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL(
+    JNIEnv *env, jobject obj, jlong pNumTabData,
+    jint executor_num, jint executor_cores, jobject resultObj) {
+
+    ccl::communicator &comm = getComm();
+    size_t rankId = comm.rank();
+    std::cout << " rankId : " << rankId << " ! "
+              << std::endl;
+
+    const size_t nBlocks = executor_num;
+
+    NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
+    // Source data already normalized
+    pData->setNormalizationFlag(NumericTableIface::standardScoreNormalized);
+
+    // Set number of threads for oneDAL to use for each rank
+    services::Environment::getInstance()->setNumberOfThreads(executor_cores);
+
+    int nThreadsNew =
+        services::Environment::getInstance()->getNumberOfThreads();
+    cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew
+         << endl;
+
+    auto t1 = std::chrono::high_resolution_clock::now();
+
+    const bool isRoot = (rankId == ccl_root);
+
+    covariance::Distributed<step1Local, algorithmFPType> localAlgorithm;
+
+    /* Set the input data set to the algorithm */
+    localAlgorithm.input.set(covariance::data, pData);
+
+    /* Compute covariance */
+    localAlgorithm.compute();
+
+    auto t2 = std::chrono::high_resolution_clock::now();
+    auto duration =
+        std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
+    std::cout << "Correlation (native): local step took " << duration << " secs"
+              << std::endl;
+
+    t1 = std::chrono::high_resolution_clock::now();
+
+    /* Serialize partial results required by step 2 */
+    InputDataArchive dataArch;
+    localAlgorithm.getPartialResult()->serialize(dataArch);
+    size_t perNodeArchLength = dataArch.getSizeOfArchive();
+
+    std::vector<size_t> aPerNodeArchLength(comm.size());
+    std::vector<size_t> aReceiveCount(comm.size(), 1);
+    /* Transfer archive length to the step 2 on the root node */
+    ccl::allgatherv(&perNodeArchLength, 1, aPerNodeArchLength.data(), aReceiveCount, comm).wait();
+
+    ByteBuffer serializedData;
+    /* Calculate total archive length */
+    int totalArchLength = 0;
+
+    for (size_t i = 0; i < nBlocks; ++i)
+    {
+        totalArchLength += aPerNodeArchLength[i];
+    }
+    aReceiveCount[ccl_root] = totalArchLength;
+
+    serializedData.resize(totalArchLength);
+
+
+    ByteBuffer nodeResults(perNodeArchLength);
+    dataArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength);
+
+    /* Transfer partial results to step 2 on the root node */
+    ccl::allgatherv(&nodeResults[0], perNodeArchLength, &serializedData[0], aPerNodeArchLength, comm).wait();
+    t2 = std::chrono::high_resolution_clock::now();
+
+    duration =
+        std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
+    std::cout << "Correlation (native): ccl_allgatherv took " << duration << " secs"
+              << std::endl;
+    if (isRoot) {
+        auto t1 = std::chrono::high_resolution_clock::now();
+        /* Create an algorithm to compute covariance on the master node */
+        covariance::Distributed<step2Master, algorithmFPType> masterAlgorithm;
+        std::cout << "nBlocks :" << nBlocks
+                  << std::endl;
+        for (size_t i = 0, shift = 0; i < nBlocks; shift += aPerNodeArchLength[i], ++i) {
+            /* Deserialize
partial results from step 1 */ + OutputDataArchive dataArch(&serializedData[shift], aPerNodeArchLength[i]); + covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult()); + dataForStep2FromStep1->deserialize(dataArch); + /* Set local partial results as input for the master-node algorithm + */ + masterAlgorithm.input.add(covariance::partialResults, + dataForStep2FromStep1); + } + /* Merge and finalizeCompute covariance decomposition on the master node */ + masterAlgorithm.compute(); + masterAlgorithm.finalizeCompute(); + + /* Retrieve the algorithm results */ + covariance::ResultPtr result = masterAlgorithm.getResult(); + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(t2 - t1).count(); + std::cout << "Correlation (native): master step took " << duration << " secs" + << std::endl; + + /* Print the results */ + printNumericTable(result->get(covariance::covariance), + "Covariance matrix:"); + printNumericTable(result->get(covariance::mean), + "Mean vector:"); + + // Return all covariance & mean + + // Get the class of the input object + jclass clazz = env->GetObjectClass(resultObj); + // Get Field references + jfieldID covarianceNumericTableField = + env->GetFieldID(clazz, "covarianceNumericTable", "J"); + jfieldID correlationNumericTableField = + env->GetFieldID(clazz, "correlationNumericTable", "J"); + jfieldID meanNumericTableField = + env->GetFieldID(clazz, "meanNumericTable", "J"); + + NumericTablePtr *covariance = + new NumericTablePtr(result->get(covariance::covariance)); + NumericTablePtr *correlation = + new NumericTablePtr(result->get(covariance::correlation)); + NumericTablePtr *mean = + new NumericTablePtr(result->get(covariance::mean)); + + env->SetLongField(resultObj, covarianceNumericTableField, (jlong)covariance); + env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); + env->SetLongField(resultObj, meanNumericTableField,(jlong)mean); + } + + return 0; + +} \ No newline at end of file diff --git a/mllib-dal/src/main/native/Makefile b/mllib-dal/src/main/native/Makefile index 94ab3a9b1..3baf5ec4c 100644 --- a/mllib-dal/src/main/native/Makefile +++ b/mllib-dal/src/main/native/Makefile @@ -60,7 +60,8 @@ CPP_SRCS += \ ./PCADALImpl.cpp \ ./ALSDALImpl.cpp ./ALSShuffle.cpp \ ./NaiveBayesDALImpl.cpp \ - ./LinearRegressionDALImpl.cpp + ./LinearRegressionDALImpl.cpp \ + ./CorrelationDALImpl.cpp OBJS += \ ./OneCCL.o ./OneDAL.o ./service.o ./error_handling.o \ @@ -68,7 +69,8 @@ OBJS += \ ./PCADALImpl.o \ ./ALSDALImpl.o ./ALSShuffle.o \ ./NaiveBayesDALImpl.o \ - ./LinearRegressionDALImpl.o + ./LinearRegressionDALImpl.o \ + ./CorrelationDALImpl.o DEFINES=-D$(PLATFORM_PROFILE) diff --git a/mllib-dal/src/main/native/build-jni.sh b/mllib-dal/src/main/native/build-jni.sh index 4da92dd67..c58385d40 100755 --- a/mllib-dal/src/main/native/build-jni.sh +++ b/mllib-dal/src/main/native/build-jni.sh @@ -35,4 +35,5 @@ javah -d $WORK_DIR/javah -classpath "$WORK_DIR/../../../target/classes:$DAAL_JAR org.apache.spark.ml.feature.PCADALImpl \ org.apache.spark.ml.recommendation.ALSDALImpl \ org.apache.spark.ml.classification.NaiveBayesDALImpl \ - org.apache.spark.ml.regression.LinearRegressionDALImpl + org.apache.spark.ml.regression.LinearRegressionDALImpl \ + org.apache.spark.ml.stat.CorrelationDALImpl diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala new file mode 100644 index 
000000000..3378f11bf --- /dev/null +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala @@ -0,0 +1,108 @@ +/* + * Copyright 2020 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import java.util.Arrays + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.ml.util.{OneCCL, OneDAL, Utils} +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.util.Utils.getOneCCLIPPort +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrix} + + +class CorrelationDALImpl( + val executorNum: Int, + val executorCores: Int) + extends Serializable with Logging { + + def computeCorrelationMatrix(data: RDD[Vector] ): Matrix = { + + val kvsIPPort = getOneCCLIPPort(data) + println(s"OneCCL ip port : ${kvsIPPort} ") + + val coalescedTables = OneDAL.rddVectorToMergedTables(data, executorNum) + println(s"executorNum : ${executorNum} ") + println(s"executorCores : ${executorCores} ") + + val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => + println(s"map partition :") + val tableArr = table.next() + + OneCCL.init(executorNum, rank, kvsIPPort) + + val computeStartTime = System.nanoTime() + + val result = new CorrelationResult() + cCorrelationTrainDAL( + tableArr, + executorNum, + executorCores, + result + ) + + val computeEndTime = System.nanoTime() + + val durationCompute = (computeEndTime - computeStartTime).toDouble / 1E9 + + println(s"CorrelationDAL compute took ${durationCompute} secs") + + val ret = if (OneCCL.isRoot()) { + + val convResultStartTime = System.nanoTime() + val covarianceNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.covarianceNumericTable)) + val correlationNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.correlationNumericTable)) + val meanNumericTable = OneDAL.numericTableToVectors(OneDAL.makeNumericTable(result.meanNumericTable)) + + val convResultEndTime = System.nanoTime() + + val durationCovResult = (convResultEndTime - convResultStartTime).toDouble / 1E9 + + println(s"CorrelationDAL result conversion took ${durationCovResult} secs") + + Iterator((covarianceNumericTable, correlationNumericTable, meanNumericTable)) + } else { + Iterator.empty + } + + OneCCL.cleanup() + + ret + }.collect() + + // Make sure there is only one result from rank 0 + assert(results.length == 1) + + // Release native memory for numeric tables + OneDAL.releaseNumericTables(data.sparkContext) + + val covarianceMatrix = results(0)._1 + val correlationMatrix = results(0)._2 + val meanVectors = results(0)._3 + + correlationMatrix + } + + + @native private def cCorrelationTrainDAL(data: Long, + executor_num: Int, + executor_cores: Int, + result: CorrelationResult): Long + + } \ No newline at end of file diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala 
b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala new file mode 100644 index 000000000..72ea0be76 --- /dev/null +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.linalg.{SQLDataTypes, Vector} +import org.apache.spark.ml.util.Utils +import org.apache.spark.mllib.linalg.{Vectors => OldVectors} +import org.apache.spark.mllib.stat.{Statistics => OldStatistics} +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.storage.StorageLevel + +import scala.collection.JavaConverters._ + +/** + * API for correlation functions in MLlib, compatible with DataFrames and Datasets. + * + * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset#stat]] + * to spark.ml's Vector types. + */ +@Since("2.2.0") +@Experimental +object Correlation { + + /** + * :: Experimental :: + * Compute the correlation matrix for the input Dataset of Vectors using the specified method. + * Methods currently supported: `pearson` (default), `spearman`. + * + * @param dataset A dataset or a dataframe + * @param column The name of the column of vectors for which the correlation coefficient needs + * to be computed. This must be a column of the dataset, and it must contain + * Vector objects. + * @param method String specifying the method to use for computing correlation. + * Supported: `pearson` (default), `spearman` + * @return A dataframe that contains the correlation matrix of the column of vectors. This + * dataframe contains a single row and a single column of name + * '$METHODNAME($COLUMN)'. + * @throws IllegalArgumentException if the column is not a valid column in the dataset, or if + * the content of this column is not of type Vector. + * + * Here is how to access the correlation coefficient: + * {{{ + * val data: Dataset[Vector] = ... + * val Row(coeff: Matrix) = Correlation.corr(data, "value").head + * // coeff now contains the Pearson correlation matrix. + * }}} + * + * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column + * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector], + * which is fairly costly. Cache the input Dataset before calling corr with `method = "spearman"` + * to avoid recomputing the common lineage. 
+ */ + @Since("2.2.0") + def corr(dataset: Dataset[_], column: String, method: String): DataFrame = { + + val isPlatformSupported = Utils.checkClusterPlatformCompatibility( + dataset.sparkSession.sparkContext) + if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { + val rdd = dataset.select(column).rdd.map { + case Row(v: Vector) => v + } + rdd.persist(StorageLevel.MEMORY_AND_DISK) + + // Cache for init + val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) + val executor_cores = Utils.sparkExecutorCores() + val oldM = new CorrelationDALImpl(executor_num, executor_cores) + .computeCorrelationMatrix(rdd) + rdd.unpersist() + val name = s"$method($column)" + val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) + dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + + } else { + val rdd = dataset.select(column).rdd.map { + case Row(v: Vector) => OldVectors.fromML(v) + } + val oldM = OldStatistics.corr(rdd, method) + val name = s"$method($column)" + val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) + dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + } + } + + /** + * Compute the Pearson correlation matrix for the input Dataset of Vectors. + */ + @Since("2.2.0") + def corr(dataset: Dataset[_], column: String): DataFrame = { + corr(dataset, column, "pearson") + } +} diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala new file mode 100644 index 000000000..851d0ddfd --- /dev/null +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Since +import org.apache.spark.ml.linalg.{SQLDataTypes, Vector} +import org.apache.spark.mllib.linalg.{Vectors => OldVectors} +import org.apache.spark.mllib.stat.{Statistics => OldStatistics} +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * API for correlation functions in MLlib, compatible with DataFrames and Datasets. + * + * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset#stat]] + * to spark.ml's Vector types. + */ +@Since("2.2.0") +object Correlation { + + /** + * Compute the correlation matrix for the input Dataset of Vectors using the specified method. + * Methods currently supported: `pearson` (default), `spearman`. 
+ * + * @param dataset A dataset or a dataframe + * @param column The name of the column of vectors for which the correlation coefficient needs + * to be computed. This must be a column of the dataset, and it must contain + * Vector objects. + * @param method String specifying the method to use for computing correlation. + * Supported: `pearson` (default), `spearman` + * @return A dataframe that contains the correlation matrix of the column of vectors. This + * dataframe contains a single row and a single column of name + * `$METHODNAME($COLUMN)`. + * @throws IllegalArgumentException if the column is not a valid column in the dataset, or if + * the content of this column is not of type Vector. + * + * Here is how to access the correlation coefficient: + * {{{ + * val data: Dataset[Vector] = ... + * val Row(coeff: Matrix) = Correlation.corr(data, "value").head + * // coeff now contains the Pearson correlation matrix. + * }}} + * + * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column + * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector], + * which is fairly costly. Cache the input Dataset before calling corr with `method = "spearman"` + * to avoid recomputing the common lineage. + */ + @Since("2.2.0") + def corr(dataset: Dataset[_], column: String, method: String): DataFrame = { + val isPlatformSupported = Utils.checkClusterPlatformCompatibility( + dataset.sparkSession.sparkContext) + if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { + val rdd = dataset.select(column).rdd.map { + case Row(v: Vector) => v + } + rdd.persist(StorageLevel.MEMORY_AND_DISK) + + // Cache for init + val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) + val executor_cores = Utils.sparkExecutorCores() + val oldM = new CorrelationDALImpl(executor_num, executor_cores) + .computeCorrelationMatrix(rdd) + rdd.unpersist() + val name = s"$method($column)" + val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) + dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + + } else { + val rdd = dataset.select(column).rdd.map { + case Row(v: Vector) => OldVectors.fromML(v) + } + val oldM = OldStatistics.corr(rdd, method) + val name = s"$method($column)" + val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) + dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + } + } + + /** + * Compute the Pearson correlation matrix for the input Dataset of Vectors. + */ + @Since("2.2.0") + def corr(dataset: Dataset[_], column: String): DataFrame = { + corr(dataset, column, "pearson") + } +} From 757561a09d5d733b0437a3828217ec2468e930e7 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Thu, 2 Sep 2021 18:12:28 +0800 Subject: [PATCH 02/20] 1. 
resolve comments

Signed-off-by: minmingzhu
---
 examples/correlation/pom.xml                  |  2 +-
 examples/correlation/run.sh                   |  8 ++-----
 .../examples/ml/CorrelationExample.scala      | 11 ++++++----
 .../spark/ml/stat/CorrelationResult.java      |  4 +---
 .../src/main/native/CorrelationDALImpl.cpp    | 22 ++++++-------------
 .../spark/ml/stat/CorrelationDALImpl.scala    | 12 ++--------
 .../org/apache/spark/ml/util/OneDAL.scala     | 22 ++++++++++++++++++-
 7 files changed, 41 insertions(+), 40 deletions(-)

diff --git a/examples/correlation/pom.xml b/examples/correlation/pom.xml
index 98d0d7585..40b64af2d 100644
--- a/examples/correlation/pom.xml
+++ b/examples/correlation/pom.xml
@@ -7,7 +7,7 @@
     <version>1.2.0</version>
     <packaging>jar</packaging>
 
-    <name>LinearRegressionExample</name>
+    <name>CorrelationExample</name>
     <url>https://github.com/oap-project/oap-mllib.git</url>
 
     <properties>
diff --git a/examples/correlation/run.sh b/examples/correlation/run.sh
index b7a0b64b5..fed6809d8 100644
--- a/examples/correlation/run.sh
+++ b/examples/correlation/run.sh
@@ -2,12 +2,8 @@
 
 source ../../conf/env.sh
 
-# Data file is from Spark Examples (data/mllib/sample_linear_regression_data.txt) and put in examples/data
-# The data file should be copied to $HDFS_ROOT before running examples
-DATA_FILE=$HDFS_ROOT/data/sample_linear_regression_data.txt
-
 APP_JAR=target/oap-mllib-examples-$OAP_MLLIB_VERSION.jar
-APP_CLASS=org.apache.spark.examples.ml.LinearRegressionExample
+APP_CLASS=org.apache.spark.examples.ml.CorrelationExample
 
 time $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \
@@ -26,4 +22,4 @@ time $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \
     --jars $OAP_MLLIB_JAR \
     --class $APP_CLASS \
     $APP_JAR $DATA_FILE \
-    2>&1 | tee LinearRegression-$(date +%m%d_%H_%M_%S).log
+    2>&1 | tee Correlation-$(date +%m%d_%H_%M_%S).log
\ No newline at end of file
diff --git a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala
index d7f1fc8ed..f2378831c 100644
--- a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala
+++ b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala
@@ -50,11 +50,14 @@ object CorrelationExample {
     )
 
     val df = data.map(Tuple1.apply).toDF("features")
-    val Row(coeff1: Matrix) = Correlation.corr(df, "features").head
-    println(s"Pearson correlation matrix:\n $coeff1")
+    Correlation.corr(df, "features").collect().foreach((coeff1) => {
+      println(s"Pearson correlation matrix:\n $coeff1.")
+    })
 
-    val Row(coeff2: Matrix) = Correlation.corr(df, "features", "spearman").head
-    println(s"Spearman correlation matrix:\n $coeff2")
+
+    Correlation.corr(df, "features", "spearman").collect().foreach((coeff2) => {
+      println(s"Spearman correlation matrix:\n $coeff2.")
+    })
     // $example off$
 
     spark.stop()
diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java b/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java
index ddf094db9..047c042a7 100644
--- a/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java
+++ b/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java
@@ -17,7 +17,5 @@
 package org.apache.spark.ml.stat;
 
 public class CorrelationResult {
-    public long covarianceNumericTable;
     public long correlationNumericTable;
-    public long meanNumericTable;
-}
\ No newline at end of file
+}
diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp
index
86cbca52c..3d83fb30e 100644 --- a/mllib-dal/src/main/native/CorrelationDALImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp @@ -64,7 +64,7 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( const bool isRoot = (rankId == ccl_root); - covariance::Distributed localAlgorithm; + covariance::Distributed localAlgorithm; /* Set the input data set to the algorithm */ localAlgorithm.input.set(covariance::data, pData); @@ -143,33 +143,25 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( << std::endl; /* Print the results */ - printNumericTable(result->get(covariance::covariance), - "Covariance matrix:"); - printNumericTable(result->get(covariance::mean), - "Mean vector:"); + printNumericTable(result->get(covariance::correlation), + "Correlation first 20 columns of " + "correlation matrix:", + 1, 20); // Return all covariance & mean // Get the class of the input object jclass clazz = env->GetObjectClass(resultObj); // Get Field references - jfieldID covarianceNumericTableField = - env->GetFieldID(clazz, "covarianceNumericTable", "J"); jfieldID correlationNumericTableField = env->GetFieldID(clazz, "correlationNumericTable", "J"); - jfieldID meanNumericTableField = - env->GetFieldID(clazz, "meanNumericTable", "J"); - NumericTablePtr *covariance = - new NumericTablePtr(result->get(covariance::covariance)); + NumericTablePtr *correlation = new NumericTablePtr(result->get(covariance::correlation)); - NumericTablePtr *mean = - new NumericTablePtr(result->get(covariance::mean)); - env->SetLongField(resultObj, covarianceNumericTableField, (jlong)covariance); + env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); - env->SetLongField(resultObj, meanNumericTableField,(jlong)mean); } return 0; diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala index 3378f11bf..b5e0d8d9b 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala @@ -66,17 +66,14 @@ class CorrelationDALImpl( val ret = if (OneCCL.isRoot()) { val convResultStartTime = System.nanoTime() - val covarianceNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.covarianceNumericTable)) val correlationNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.correlationNumericTable)) - val meanNumericTable = OneDAL.numericTableToVectors(OneDAL.makeNumericTable(result.meanNumericTable)) - val convResultEndTime = System.nanoTime() val durationCovResult = (convResultEndTime - convResultStartTime).toDouble / 1E9 println(s"CorrelationDAL result conversion took ${durationCovResult} secs") - Iterator((covarianceNumericTable, correlationNumericTable, meanNumericTable)) + Iterator(correlationNumericTable) } else { Iterator.empty } @@ -89,12 +86,7 @@ class CorrelationDALImpl( // Make sure there is only one result from rank 0 assert(results.length == 1) - // Release native memory for numeric tables - OneDAL.releaseNumericTables(data.sparkContext) - - val covarianceMatrix = results(0)._1 - val correlationMatrix = results(0)._2 - val meanVectors = results(0)._3 + val correlationMatrix = results(0) correlationMatrix } diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala index a77521d28..cd00d473c 100644 --- 
a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala
@@ -20,7 +20,7 @@ import com.intel.daal.data_management.data.{CSRNumericTable, HomogenNumericTable
 import com.intel.daal.services.DaalContext
 import org.apache.spark.SparkContext
 import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, Matrix, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.linalg.{Vector => OldVector}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Matrix => OldMatrix, DenseMatrix => OldDenseMatrix}
 import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, RDD}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
@@ -54,6 +54,24 @@ object OneDAL {
 
     matrix
   }
+
+  def numericTableToOldMatrix(table: NumericTable): OldMatrix = {
+    val numRows = table.getNumberOfRows.toInt
+    val numCols = table.getNumberOfColumns.toInt
+
+    var dataDouble: DoubleBuffer = null
+    // returned DoubleBuffer is ByteBuffer, need to copy as double array
+    dataDouble = table.getBlockOfRows(0, numRows, dataDouble)
+    val arrayDouble = new Array[Double](numRows * numCols)
+    dataDouble.get(arrayDouble)
+
+    // Transpose as DAL numeric table is row-major and DenseMatrix is column major
+    val OldMatrix = new OldDenseMatrix(numRows, numCols, arrayDouble, isTransposed = true)
+
+    table.releaseBlockOfRows(0, numRows, dataDouble)
+
+    OldMatrix
+  }
 
   def isDenseDataset(ds: Dataset[_]): Boolean = {
     val row = ds.select("features").head()
@@ -121,6 +139,7 @@ object OneDAL {
 
     matrix
   }
+
   def rddDoubleToNumericTables(doubles: RDD[Double], executorNum: Int): RDD[Long] = {
     require(executorNum > 0)
@@ -141,6 +160,7 @@ object OneDAL {
 
     doublesTables
   }
+
   def rddLabeledPointToSparseTables(labeledPoints: Dataset[_],
                                     executorNum: Int): RDD[(Long, Long)] = {
     require(executorNum > 0)

From c933bdde7156b2a1763773c09288fbe313d4096e Mon Sep 17 00:00:00 2001
From: minmingzhu
Date: Tue, 7 Sep 2021 00:17:32 +0800
Subject: [PATCH 03/20] 1. fix oap matrix result error.
Signed-off-by: minmingzhu
---
 .../spark/ml/stat/CorrelationResult.java      |  1 +
 .../src/main/native/CorrelationDALImpl.cpp    | 35 ++++++++++++-------
 .../spark/ml/stat/CorrelationDALImpl.scala    | 10 ++++--
 .../org/apache/spark/ml/util/OneDAL.scala     |  2 +-
 4 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java b/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java
index 047c042a7..3893b644b 100644
--- a/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java
+++ b/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java
@@ -18,4 +18,5 @@
 
 public class CorrelationResult {
     public long correlationNumericTable;
+    public long meanNumericTable;
 }
diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp
index 3d83fb30e..aa5f66c36 100644
--- a/mllib-dal/src/main/native/CorrelationDALImpl.cpp
+++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp
@@ -16,7 +16,6 @@
 
 #include <chrono>
 #include <iostream>
-#include <vector>
 
 #include "OneCCL.h"
 #include "org_apache_spark_ml_stat_CorrelationDALImpl.h"
@@ -49,8 +48,6 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL(
     const size_t nBlocks = executor_num;
 
     NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
-    // Source data already normalized
-    pData->setNormalizationFlag(NumericTableIface::standardScoreNormalized);
 
     // Set number of threads for oneDAL to use for each rank
     services::Environment::getInstance()->setNumberOfThreads(executor_cores);
@@ -83,7 +80,8 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL(
     /* Serialize partial results required by step 2 */
     InputDataArchive dataArch;
     localAlgorithm.getPartialResult()->serialize(dataArch);
-    size_t perNodeArchLength = dataArch.getSizeOfArchive();
+    const uint64_t perNodeArchLength = (size_t)dataArch.getSizeOfArchive();
+
 
     std::vector<size_t> aPerNodeArchLength(comm.size());
     std::vector<size_t> aReceiveCount(comm.size(), 1);
@@ -107,9 +105,10 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL(
     dataArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength);
 
     /* Transfer partial results to step 2 on the root node */
-    ccl::allgatherv(&nodeResults[0], perNodeArchLength, &serializedData[0], aPerNodeArchLength, comm).wait();
+    ccl::allgatherv((int8_t *)&nodeResults[0], perNodeArchLength, (int8_t *)&serializedData[0], aPerNodeArchLength, comm).wait();
     t2 = std::chrono::high_resolution_clock::now();
 
+
     duration =
         std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
     std::cout << "Correlation (native): ccl_allgatherv took " << duration << " secs"
@@ -118,18 +117,23 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL(
         auto t1 = std::chrono::high_resolution_clock::now();
         /* Create an algorithm to compute covariance on the master node */
         covariance::Distributed<step2Master, algorithmFPType> masterAlgorithm;
-        std::cout << "nBlocks :" << nBlocks
-                  << std::endl;
+
         for (size_t i = 0, shift = 0; i < nBlocks; shift += aPerNodeArchLength[i], ++i) {
             /* Deserialize partial results from step 1 */
             OutputDataArchive dataArch(&serializedData[shift], aPerNodeArchLength[i]);
+
             covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult());
             dataForStep2FromStep1->deserialize(dataArch);
+
             /* Set local partial results as input for the master-node algorithm
             */
             masterAlgorithm.input.add(covariance::partialResults,
                                       dataForStep2FromStep1);
         }
+
+        /* Set the parameter to choose the type of the output matrix */
+        
masterAlgorithm.parameter.outputMatrixType = covariance::correlationMatrix; + /* Merge and finalizeCompute covariance decomposition on the master node */ masterAlgorithm.compute(); masterAlgorithm.finalizeCompute(); @@ -147,21 +151,28 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( "Correlation first 20 columns of " "correlation matrix:", 1, 20); + printNumericTable(result->get(covariance::mean), + "Correlation first 20 columns of " + "mean matrix:", + 1, 20); - // Return all covariance & mean - - // Get the class of the input object + // Return all correlation & mean jclass clazz = env->GetObjectClass(resultObj); + // Get Field references jfieldID correlationNumericTableField = env->GetFieldID(clazz, "correlationNumericTable", "J"); - + jfieldID meanNumericTableField = + env->GetFieldID(clazz, "meanNumericTable", "J"); NumericTablePtr *correlation = new NumericTablePtr(result->get(covariance::correlation)); - + NumericTablePtr *mean = + new NumericTablePtr(result->get(covariance::mean)); env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); + env->SetLongField(resultObj, meanNumericTableField,(jlong)mean); + } return 0; diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala index b5e0d8d9b..365b23939 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala @@ -42,7 +42,6 @@ class CorrelationDALImpl( println(s"executorCores : ${executorCores} ") val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => - println(s"map partition :") val tableArr = table.next() OneCCL.init(executorNum, rank, kvsIPPort) @@ -67,13 +66,15 @@ class CorrelationDALImpl( val convResultStartTime = System.nanoTime() val correlationNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.correlationNumericTable)) + val meanNumericTable = OneDAL.numericTableToVectors(OneDAL.makeNumericTable(result.meanNumericTable)) + val convResultEndTime = System.nanoTime() val durationCovResult = (convResultEndTime - convResultStartTime).toDouble / 1E9 println(s"CorrelationDAL result conversion took ${durationCovResult} secs") - Iterator(correlationNumericTable) + Iterator((correlationNumericTable, meanNumericTable)) } else { Iterator.empty } @@ -86,7 +87,10 @@ class CorrelationDALImpl( // Make sure there is only one result from rank 0 assert(results.length == 1) - val correlationMatrix = results(0) + val correlationMatrix = results(0)._1 + val meanVectors = results(0)._2 + + println(s"correlationMatrix : ${correlationMatrix.toString} ") correlationMatrix } diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala index cd00d473c..9ae7d3fef 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala @@ -66,7 +66,7 @@ object OneDAL { dataDouble.get(arrayDouble) // Transpose as DAL numeric table is row-major and DenseMatrix is column major - val OldMatrix = new OldDenseMatrix(numRows, numCols, arrayDouble, isTransposed = true) + val OldMatrix = new OldDenseMatrix(numRows, numCols, arrayDouble, isTransposed = false) table.releaseBlockOfRows(0, numRows, dataDouble) From cebea3f42ab7073370eefb9a74cae008e880f43b Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Tue, 7 Sep 
2021 10:39:54 +0800
Subject: [PATCH 04/20] 1. add jni file

Signed-off-by: minmingzhu
---
 ..._apache_spark_ml_stat_CorrelationDALImpl.h | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)
 create mode 100644 mllib-dal/src/main/native/javah/org_apache_spark_ml_stat_CorrelationDALImpl.h

diff --git a/mllib-dal/src/main/native/javah/org_apache_spark_ml_stat_CorrelationDALImpl.h b/mllib-dal/src/main/native/javah/org_apache_spark_ml_stat_CorrelationDALImpl.h
new file mode 100644
index 000000000..6e4f033e1
--- /dev/null
+++ b/mllib-dal/src/main/native/javah/org_apache_spark_ml_stat_CorrelationDALImpl.h
@@ -0,0 +1,21 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_apache_spark_ml_stat_CorrelationDALImpl */
+
+#ifndef _Included_org_apache_spark_ml_stat_CorrelationDALImpl
+#define _Included_org_apache_spark_ml_stat_CorrelationDALImpl
+#ifdef __cplusplus
+extern "C" {
+#endif
+/*
+ * Class:     org_apache_spark_ml_stat_CorrelationDALImpl
+ * Method:    cCorrelationTrainDAL
+ * Signature: (JIILorg/apache/spark/ml/stat/CorrelationResult;)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL
+  (JNIEnv *, jobject, jlong, jint, jint, jobject);
+
+#ifdef __cplusplus
+}
+#endif
+#endif

From d5c1e249ae5204f9384ae808e5a68358f383e921 Mon Sep 17 00:00:00 2001
From: minmingzhu
Date: Tue, 7 Sep 2021 16:00:34 +0800
Subject: [PATCH 05/20] 1. add test code

Signed-off-by: minmingzhu
---
 .../spark/ml/stat/MLlibCorrelationSuite.scala | 77 +++++++++++++++++++
 mllib-dal/test-cpu-gpu.sh                     |  4 +-
 mllib-dal/test.sh                             |  3 +-
 3 files changed, 82 insertions(+), 2 deletions(-)
 create mode 100644 mllib-dal/src/test/scala/org/apache/spark/ml/stat/MLlibCorrelationSuite.scala

diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/stat/MLlibCorrelationSuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/stat/MLlibCorrelationSuite.scala
new file mode 100644
index 000000000..94339ab45
--- /dev/null
+++ b/mllib-dal/src/test/scala/org/apache/spark/ml/stat/MLlibCorrelationSuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.ml.stat + +import breeze.linalg.{DenseMatrix => BDM} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.Logging +import org.apache.spark.ml.linalg.{Matrices, Matrix, Vectors} +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{DataFrame, Row} + + +class MLlibCorrelationSuite extends SparkFunSuite with MLlibTestSparkContext with Logging { + + val xData = Array(1.0, 0.0, -2.0) + val yData = Array(4.0, 5.0, 3.0) + val zeros = new Array[Double](3) + val data = Seq( + Vectors.dense(1.0, 0.0, 0.0, -2.0), + Vectors.dense(4.0, 5.0, 0.0, 3.0), + Vectors.dense(6.0, 7.0, 0.0, 8.0), + Vectors.dense(9.0, 0.0, 0.0, 1.0) + ) + + private def X = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features") + + private def extract(df: DataFrame): BDM[Double] = { + val Array(Row(mat: Matrix)) = df.collect() + mat.asBreeze.toDenseMatrix + } + + + test("corr(X) default, pearson") { + val defaultMat = Correlation.corr(X, "features") + val pearsonMat = Correlation.corr(X, "features", "pearson") + // scalastyle:off + val expected = Matrices.fromBreeze(BDM( + (1.00000000, 0.05564149, Double.NaN, 0.4004714), + (0.05564149, 1.00000000, Double.NaN, 0.9135959), + (Double.NaN, Double.NaN, 1.00000000, Double.NaN), + (0.40047142, 0.91359586, Double.NaN, 1.0000000))) + // scalastyle:on + + assert(Matrices.fromBreeze(extract(defaultMat)) ~== expected absTol 1e-4) + assert(Matrices.fromBreeze(extract(pearsonMat)) ~== expected absTol 1e-4) + } + + test("corr(X) spearman") { + val spearmanMat = Correlation.corr(X, "features", "spearman") + // scalastyle:off + val expected = Matrices.fromBreeze(BDM( + (1.0000000, 0.1054093, Double.NaN, 0.4000000), + (0.1054093, 1.0000000, Double.NaN, 0.9486833), + (Double.NaN, Double.NaN, 1.00000000, Double.NaN), + (0.4000000, 0.9486833, Double.NaN, 1.0000000))) + // scalastyle:on + assert(Matrices.fromBreeze(extract(spearmanMat)) ~== expected absTol 1e-4) + } + +} diff --git a/mllib-dal/test-cpu-gpu.sh b/mllib-dal/test-cpu-gpu.sh index eb32e62c8..dc6ddd5ee 100755 --- a/mllib-dal/test-cpu-gpu.sh +++ b/mllib-dal/test-cpu-gpu.sh @@ -38,7 +38,9 @@ suiteArray=( "feature.MLlibPCASuite" \ "recommendation.MLlibALSSuite" \ "classification.MLlibNaiveBayesSuite" \ - "regression.MLlibLinearRegressionSuite" + "regression.MLlibLinearRegressionSuite" \ + "stat.MLlibCorrelationSuite" + ) # Set default version diff --git a/mllib-dal/test.sh b/mllib-dal/test.sh index b9bfd215d..b4c1cde36 100755 --- a/mllib-dal/test.sh +++ b/mllib-dal/test.sh @@ -38,7 +38,8 @@ suiteArray=( "feature.MLlibPCASuite" \ "recommendation.MLlibALSSuite" \ "classification.MLlibNaiveBayesSuite" \ - "regression.MLlibLinearRegressionSuite" + "regression.MLlibLinearRegressionSuite" \ + "stat.MLlibCorrelationSuite" ) # Set default version From 7ff6dd4428fda2885a4c5b5244d852102f3f253c Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Wed, 8 Sep 2021 13:28:38 +0800 Subject: [PATCH 06/20] 1. 
vanilla rdd add cache Signed-off-by: minmingzhu --- .../org/apache/spark/ml/stat/Correlation.scala | 5 ++++- .../org/apache/spark/ml/stat/Correlation.scala | 13 ++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala index 72ea0be76..7d220b1a2 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -69,10 +69,10 @@ object Correlation { */ @Since("2.2.0") def corr(dataset: Dataset[_], column: String, method: String): DataFrame = { - val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { + println("OAP MLlib running") val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } @@ -89,11 +89,14 @@ object Correlation { dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) } else { + println("Vanilla/MKL running") val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } + rdd.persist(StorageLevel.MEMORY_AND_DISK) val oldM = OldStatistics.corr(rdd, method) val name = s"$method($column)" + rdd.unpersist() val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) } diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala index 851d0ddfd..4c694a8aa 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -19,12 +19,14 @@ package org.apache.spark.ml.stat import scala.collection.JavaConverters._ -import org.apache.spark.annotation.Since +import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.linalg.{SQLDataTypes, Vector} +import org.apache.spark.ml.util.Utils import org.apache.spark.mllib.linalg.{Vectors => OldVectors} import org.apache.spark.mllib.stat.{Statistics => OldStatistics} -import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.storage.StorageLevel /** * API for correlation functions in MLlib, compatible with DataFrames and Datasets. 
@@ -68,11 +70,12 @@ object Correlation { val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { + println("OAP MLlib running") + val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } rdd.persist(StorageLevel.MEMORY_AND_DISK) - // Cache for init val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() @@ -84,10 +87,14 @@ object Correlation { dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) } else { + println("Vanilla/MKL running") + val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } + rdd.persist(StorageLevel.MEMORY_AND_DISK) val oldM = OldStatistics.corr(rdd, method) + rdd.unpersist() val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) From 01e1e8c85fc6f3ce6357beac7b36d394dab63d28 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Thu, 9 Sep 2021 23:30:39 +0800 Subject: [PATCH 07/20] remove rdd cache --- .../main/scala/org/apache/spark/ml/stat/Correlation.scala | 6 ------ .../main/scala/org/apache/spark/ml/stat/Correlation.scala | 5 ----- 2 files changed, 11 deletions(-) diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala index 7d220b1a2..bdf6bde69 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -76,14 +76,10 @@ object Correlation { val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } - rdd.persist(StorageLevel.MEMORY_AND_DISK) - - // Cache for init val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() val oldM = new CorrelationDALImpl(executor_num, executor_cores) .computeCorrelationMatrix(rdd) - rdd.unpersist() val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) @@ -93,10 +89,8 @@ object Correlation { val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } - rdd.persist(StorageLevel.MEMORY_AND_DISK) val oldM = OldStatistics.corr(rdd, method) val name = s"$method($column)" - rdd.unpersist() val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) } diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala index 4c694a8aa..99244122b 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -75,13 +75,10 @@ object Correlation { val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } - rdd.persist(StorageLevel.MEMORY_AND_DISK) - // Cache for init val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() val oldM = new CorrelationDALImpl(executor_num, 
executor_cores) .computeCorrelationMatrix(rdd) - rdd.unpersist() val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) @@ -92,9 +89,7 @@ object Correlation { val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } - rdd.persist(StorageLevel.MEMORY_AND_DISK) val oldM = OldStatistics.corr(rdd, method) - rdd.unpersist() val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) From 9a5f2d7bdf4820e3837b453abf483f0975d1d158 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Fri, 10 Sep 2021 13:30:11 +0800 Subject: [PATCH 08/20] 1. change algorithmFPType float to double 2. modify test 3. enable spark 3.0.1 and spark 3.0.2 4. remove print Signed-off-by: minmingzhu --- .../examples/ml/CorrelationExample.scala | 20 +-- .../src/main/native/CorrelationDALImpl.cpp | 163 +++++++++--------- .../spark/ml/stat/CorrelationDALImpl.scala | 91 +++++----- .../apache/spark/ml/stat/Correlation.scala | 2 - .../apache/spark/ml/stat/Correlation.scala | 104 +++++++++++ .../apache/spark/ml/stat/Correlation.scala | 104 +++++++++++ .../apache/spark/ml/stat/Correlation.scala | 4 - 7 files changed, 344 insertions(+), 144 deletions(-) create mode 100644 mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala create mode 100644 mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala diff --git a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala index f2378831c..dc9120964 100644 --- a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala +++ b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala @@ -43,10 +43,16 @@ object CorrelationExample { // $example on$ val data = Seq( - Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))), - Vectors.dense(4.0, 5.0, 0.0, 3.0), - Vectors.dense(6.0, 7.0, 0.0, 8.0), - Vectors.sparse(4, Seq((0, 9.0), (3, 1.0))) + Vectors.dense(5.308206,9.869278,1.018934,4.292158,6.081011,6.585723,2.411094,4.767308,-3.256320,-6.029562), + Vectors.dense(7.279464,0.390664,-9.619284,3.435376,-4.769490,-4.873188,-0.118791,-5.117316,-0.418655,-0.475422), + Vectors.dense(-6.615791,-6.191542,0.402459,-9.743521,-9.990568,9.105346,1.691312,-2.605659,9.534952,-7.829027), + Vectors.dense(-4.792007,-2.491098,-2.939393,8.086467,3.773812,-9.997300,0.222378,8.995244,-5.753282,6.091060), + Vectors.dense(7.700725,-6.414918,1.684476,-8.983361,4.284580,-9.017608,0.552379,-7.705741,2.589852,0.411561), + Vectors.dense(6.991900,-1.063721,9.321163,-0.429719,-2.167696,-1.736425,-0.919139,6.980681,-0.711914,3.414347), + Vectors.dense(5.794488,-1.062261,0.955322,0.389642,3.012921,-9.953994,-3.197309,3.992421,-6.935902,8.147622), + Vectors.dense(-2.486670,6.973242,-4.047004,-5.655629,5.081786,5.533859,7.821403,2.763313,-0.454056,6.554309), + Vectors.dense(2.204855,7.839522,7.381886,1.618749,-6.566877,7.584285,-8.355983,-5.501410,-8.191205,-2.608499), + Vectors.dense(-9.948613,-8.941953,-8.106389,4.863542,5.852806,-1.659259,6.342504,-8.190106,-3.110330,-7.484658), ) val df = data.map(Tuple1.apply).toDF("features") @@ -54,12 +60,6 @@ object CorrelationExample { println(s"Pearson correlation 
matrix:\n $coeff1.") }) - - Correlation.corr(df, "features", "spearman").collect().foreach((coeff2) => { - println(s"Pearson correlation matrix:\n $coeff2.") - }) - // $example off$ - spark.stop() } } diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp index aa5f66c36..90fabd184 100644 --- a/mllib-dal/src/main/native/CorrelationDALImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp @@ -21,12 +21,10 @@ #include "org_apache_spark_ml_stat_CorrelationDALImpl.h" #include "service.h" - using namespace std; using namespace daal; using namespace daal::algorithms; - typedef double algorithmFPType; /* Algorithm floating-point type */ /* @@ -37,13 +35,12 @@ typedef double algorithmFPType; /* Algorithm floating-point type */ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, - jint executor_num, jint executor_cores, jobject resultObj) { + JNIEnv *env, jobject obj, jlong pNumTabData, jint executor_num, + jint executor_cores, jobject resultObj) { ccl::communicator &comm = getComm(); size_t rankId = comm.rank(); - std::cout << " rankId : " << rankId << " ! " - << std::endl; + std::cout << " rankId : " << rankId << " ! " << std::endl; const size_t nBlocks = executor_num; @@ -58,12 +55,12 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( << endl; auto t1 = std::chrono::high_resolution_clock::now(); - + const bool isRoot = (rankId == ccl_root); - covariance::Distributed localAlgorithm; + covariance::Distributed localAlgorithm; - /* Set the input data set to the algorithm */ + /* Set the input data set to the algorithm */ localAlgorithm.input.set(covariance::data, pData); /* Compute covariance */ @@ -72,8 +69,8 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( auto t2 = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast(t2 - t1).count(); - std::cout << "Correleation (native): local step took " << duration << " secs" - << std::endl; + std::cout << "Correleation (native): local step took " << duration + << " secs" << std::endl; t1 = std::chrono::high_resolution_clock::now(); @@ -82,99 +79,103 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( localAlgorithm.getPartialResult()->serialize(dataArch); const uint64_t perNodeArchLength = (size_t)dataArch.getSizeOfArchive(); - std::vector aPerNodeArchLength(comm.size()); std::vector aReceiveCount(comm.size(), 1); /* Transfer archive length to the step 2 on the root node */ - ccl::allgatherv(&perNodeArchLength, 1, aPerNodeArchLength.data(), aReceiveCount, comm).wait(); + ccl::allgatherv(&perNodeArchLength, 1, aPerNodeArchLength.data(), + aReceiveCount, comm) + .wait(); ByteBuffer serializedData; /* Calculate total archive length */ int totalArchLength = 0; - for (size_t i = 0; i < nBlocks; ++i) - { + for (size_t i = 0; i < nBlocks; ++i) { totalArchLength += aPerNodeArchLength[i]; } aReceiveCount[ccl_root] = totalArchLength; serializedData.resize(totalArchLength); - ByteBuffer nodeResults(perNodeArchLength); dataArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength); /* Transfer partial results to step 2 on the root node */ - ccl::allgatherv((int8_t *)&nodeResults[0], perNodeArchLength, (int8_t *)&serializedData[0], aPerNodeArchLength, comm).wait(); + ccl::allgatherv((int8_t *)&nodeResults[0], perNodeArchLength, + (int8_t *)&serializedData[0], aPerNodeArchLength, comm) + .wait(); t2 = 
std::chrono::high_resolution_clock::now(); - duration = std::chrono::duration_cast(t2 - t1).count(); - std::cout << "Correleation (native): ccl_allgatherv took " << duration << " secs" - << std::endl; + std::cout << "Correleation (native): ccl_allgatherv took " << duration + << " secs" << std::endl; if (isRoot) { - auto t1 = std::chrono::high_resolution_clock::now(); - /* Create an algorithm to compute covariance on the master node */ - covariance::Distributed masterAlgorithm; - - for (size_t i = 0, shift = 0; i < nBlocks; shift += aPerNodeArchLength[i], ++i) { - /* Deserialize partial results from step 1 */ - OutputDataArchive dataArch(&serializedData[shift], aPerNodeArchLength[i]); - - covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult()); - dataForStep2FromStep1->deserialize(dataArch); - - /* Set local partial results as input for the master-node algorithm - */ - masterAlgorithm.input.add(covariance::partialResults, - dataForStep2FromStep1); - } - - /* Set the parameter to choose the type of the output matrix */ - masterAlgorithm.parameter.outputMatrixType = covariance::correlationMatrix; - - /* Merge and finalizeCompute covariance decomposition on the master node */ - masterAlgorithm.compute(); - masterAlgorithm.finalizeCompute(); - - /* Retrieve the algorithm results */ - covariance::ResultPtr result = masterAlgorithm.getResult(); - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = - std::chrono::duration_cast(t2 - t1).count(); - std::cout << "Correlation (native): master step took " << duration << " secs" - << std::endl; - - /* Print the results */ - printNumericTable(result->get(covariance::correlation), - "Correlation first 20 columns of " - "correlation matrix:", - 1, 20); - printNumericTable(result->get(covariance::mean), - "Correlation first 20 columns of " - "mean matrix:", - 1, 20); - - // Return all correlation & mean - jclass clazz = env->GetObjectClass(resultObj); - - // Get Field references - jfieldID correlationNumericTableField = - env->GetFieldID(clazz, "correlationNumericTable", "J"); - jfieldID meanNumericTableField = - env->GetFieldID(clazz, "meanNumericTable", "J"); - - NumericTablePtr *correlation = - new NumericTablePtr(result->get(covariance::correlation)); - NumericTablePtr *mean = - new NumericTablePtr(result->get(covariance::mean)); - - env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); - env->SetLongField(resultObj, meanNumericTableField,(jlong)mean); - + auto t1 = std::chrono::high_resolution_clock::now(); + /* Create an algorithm to compute covariance on the master node */ + covariance::Distributed masterAlgorithm; + + for (size_t i = 0, shift = 0; i < nBlocks; + shift += aPerNodeArchLength[i], ++i) { + /* Deserialize partial results from step 1 */ + OutputDataArchive dataArch(&serializedData[shift], + aPerNodeArchLength[i]); + + covariance::PartialResultPtr dataForStep2FromStep1( + new covariance::PartialResult()); + dataForStep2FromStep1->deserialize(dataArch); + + /* Set local partial results as input for the master-node algorithm + */ + masterAlgorithm.input.add(covariance::partialResults, + dataForStep2FromStep1); } - return 0; - + /* Set the parameter to choose the type of the output matrix */ + masterAlgorithm.parameter.outputMatrixType = + covariance::correlationMatrix; + + /* Merge and finalizeCompute covariance decomposition on the master node + */ + masterAlgorithm.compute(); + masterAlgorithm.finalizeCompute(); + + /* Retrieve the algorithm results */ + 
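/* getResult() exposes two tables: covariance::correlation holds the Pearson matrix (because outputMatrixType is set to correlationMatrix above) and covariance::mean holds the per-column means */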
covariance::ResultPtr result = masterAlgorithm.getResult(); + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(t2 - t1).count(); + std::cout << "Correlation (native): master step took " << duration + << " secs" << std::endl; + + /* Print the results */ + printNumericTable(result->get(covariance::correlation), + "Correlation first 20 columns of " + "correlation matrix:", + 1, 20); + printNumericTable(result->get(covariance::mean), + "Correlation first 20 columns of " + "mean matrix:", + 1, 20); + + // Return all correlation & mean + jclass clazz = env->GetObjectClass(resultObj); + + // Get Field references + jfieldID correlationNumericTableField = + env->GetFieldID(clazz, "correlationNumericTable", "J"); + jfieldID meanNumericTableField = + env->GetFieldID(clazz, "meanNumericTable", "J"); + + NumericTablePtr *correlation = + new NumericTablePtr(result->get(covariance::correlation)); + NumericTablePtr *mean = + new NumericTablePtr(result->get(covariance::mean)); + + env->SetLongField(resultObj, correlationNumericTableField, + (jlong)correlation); + env->SetLongField(resultObj, meanNumericTableField, (jlong)mean); + } + + return 0; } \ No newline at end of file diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala index 365b23939..ff0e0c846 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala @@ -28,77 +28,74 @@ import org.apache.spark.mllib.linalg.{Matrix} class CorrelationDALImpl( - val executorNum: Int, - val executorCores: Int) + val executorNum: Int, + val executorCores: Int) extends Serializable with Logging { - def computeCorrelationMatrix(data: RDD[Vector] ): Matrix = { + def computeCorrelationMatrix(data: RDD[Vector]): Matrix = { - val kvsIPPort = getOneCCLIPPort(data) - println(s"OneCCL ip port : ${kvsIPPort} ") + val kvsIPPort = getOneCCLIPPort(data) - val coalescedTables = OneDAL.rddVectorToMergedTables(data, executorNum) - println(s"executorNum : ${executorNum} ") - println(s"executorCores : ${executorCores} ") + val coalescedTables = OneDAL.rddVectorToMergedTables(data, executorNum) - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => - val tableArr = table.next() + val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => + val tableArr = table.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort) - val computeStartTime = System.nanoTime() + val computeStartTime = System.nanoTime() - val result = new CorrelationResult() - cCorrelationTrainDAL( - tableArr, - executorNum, - executorCores, - result - ) + val result = new CorrelationResult() + cCorrelationTrainDAL( + tableArr, + executorNum, + executorCores, + result + ) - val computeEndTime = System.nanoTime() + val computeEndTime = System.nanoTime() - val durationCompute = (computeEndTime - computeStartTime).toDouble / 1E9 + val durationCompute = (computeEndTime - computeStartTime).toDouble / 1E9 - println(s"CorrelationDAL compute took ${durationCompute} secs") + println(s"CorrelationDAL compute took ${durationCompute} secs") - val ret = if (OneCCL.isRoot()) { + val ret = if (OneCCL.isRoot()) { - val convResultStartTime = System.nanoTime() - val correlationNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.correlationNumericTable)) - val meanNumericTable = 
OneDAL.numericTableToVectors(OneDAL.makeNumericTable(result.meanNumericTable)) + val convResultStartTime = System.nanoTime() + val correlationNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.correlationNumericTable)) + val meanNumericTable = OneDAL.numericTableToVectors(OneDAL.makeNumericTable(result.meanNumericTable)) - val convResultEndTime = System.nanoTime() + val convResultEndTime = System.nanoTime() - val durationCovResult = (convResultEndTime - convResultStartTime).toDouble / 1E9 + val durationCovResult = (convResultEndTime - convResultStartTime).toDouble / 1E9 - println(s"CorrelationDAL result conversion took ${durationCovResult} secs") + println(s"CorrelationDAL result conversion took ${durationCovResult} secs") - Iterator((correlationNumericTable, meanNumericTable)) - } else { - Iterator.empty - } + Iterator((correlationNumericTable, meanNumericTable)) + } else { + Iterator.empty + } - OneCCL.cleanup() + OneCCL.cleanup() - ret - }.collect() + ret + }.collect() - // Make sure there is only one result from rank 0 - assert(results.length == 1) + // Make sure there is only one result from rank 0 + assert(results.length == 1) - val correlationMatrix = results(0)._1 - val meanVectors = results(0)._2 + val correlationMatrix = results(0)._1 + val meanVectors = results(0)._2 - println(s"correlationMatrix : ${correlationMatrix.toString} ") + println(s"correlationMatrix : ${correlationMatrix.toString} ") - correlationMatrix - } + correlationMatrix + } @native private def cCorrelationTrainDAL(data: Long, - executor_num: Int, - executor_cores: Int, - result: CorrelationResult): Long + executor_num: Int, + executor_cores: Int, + result: CorrelationResult): Long - } \ No newline at end of file +} \ No newline at end of file diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala index bdf6bde69..4eadc1e02 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -72,7 +72,6 @@ object Correlation { val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { - println("OAP MLlib running") val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } @@ -85,7 +84,6 @@ object Correlation { dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) } else { - println("Vanilla/MKL running") val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } diff --git a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala new file mode 100644 index 000000000..903c3abfc --- /dev/null +++ b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.linalg.{SQLDataTypes, Vector} +import org.apache.spark.ml.util.Utils +import org.apache.spark.mllib.linalg.{Vectors => OldVectors} +import org.apache.spark.mllib.stat.{Statistics => OldStatistics} +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.storage.StorageLevel + +import scala.collection.JavaConverters._ + +/** + * API for correlation functions in MLlib, compatible with DataFrames and Datasets. + * + * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset#stat]] + * to spark.ml's Vector types. + */ +@Since("2.2.0") +@Experimental +object Correlation { + + /** + * :: Experimental :: + * Compute the correlation matrix for the input Dataset of Vectors using the specified method. + * Methods currently supported: `pearson` (default), `spearman`. + * + * @param dataset A dataset or a dataframe + * @param column The name of the column of vectors for which the correlation coefficient needs + * to be computed. This must be a column of the dataset, and it must contain + * Vector objects. + * @param method String specifying the method to use for computing correlation. + * Supported: `pearson` (default), `spearman` + * @return A dataframe that contains the correlation matrix of the column of vectors. This + * dataframe contains a single row and a single column of name + * '$METHODNAME($COLUMN)'. + * @throws IllegalArgumentException if the column is not a valid column in the dataset, or if + * the content of this column is not of type Vector. + * + * Here is how to access the correlation coefficient: + * {{{ + * val data: Dataset[Vector] = ... + * val Row(coeff: Matrix) = Correlation.corr(data, "value").head + * // coeff now contains the Pearson correlation matrix. + * }}} + * + * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column + * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector], + * which is fairly costly. Cache the input Dataset before calling corr with `method = "spearman"` + * to avoid recomputing the common lineage. 
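The caching advice in this note can be followed directly from user code; a minimal sketch, where the DataFrame df and the column name are illustrative:

import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

// Cache once so Spearman's per-column ranking passes reuse the input.
df.cache()
val Row(spearmanCoeff: Matrix) = Correlation.corr(df, "features", "spearman").head
df.unpersist()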
+ */ + @Since("2.2.0") + def corr(dataset: Dataset[_], column: String, method: String): DataFrame = { + val isPlatformSupported = Utils.checkClusterPlatformCompatibility( + dataset.sparkSession.sparkContext) + if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { + val rdd = dataset.select(column).rdd.map { + case Row(v: Vector) => v + } + val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) + val executor_cores = Utils.sparkExecutorCores() + val oldM = new CorrelationDALImpl(executor_num, executor_cores) + .computeCorrelationMatrix(rdd) + val name = s"$method($column)" + val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) + dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + + } else { + val rdd = dataset.select(column).rdd.map { + case Row(v: Vector) => OldVectors.fromML(v) + } + val oldM = OldStatistics.corr(rdd, method) + val name = s"$method($column)" + val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) + dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + } + } + + /** + * Compute the Pearson correlation matrix for the input Dataset of Vectors. + */ + @Since("2.2.0") + def corr(dataset: Dataset[_], column: String): DataFrame = { + corr(dataset, column, "pearson") + } +} diff --git a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala new file mode 100644 index 000000000..4eadc1e02 --- /dev/null +++ b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.linalg.{SQLDataTypes, Vector} +import org.apache.spark.ml.util.Utils +import org.apache.spark.mllib.linalg.{Vectors => OldVectors} +import org.apache.spark.mllib.stat.{Statistics => OldStatistics} +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.storage.StorageLevel + +import scala.collection.JavaConverters._ + +/** + * API for correlation functions in MLlib, compatible with DataFrames and Datasets. + * + * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset#stat]] + * to spark.ml's Vector types. + */ +@Since("2.2.0") +@Experimental +object Correlation { + + /** + * :: Experimental :: + * Compute the correlation matrix for the input Dataset of Vectors using the specified method. + * Methods currently supported: `pearson` (default), `spearman`. 
+ * + * @param dataset A dataset or a dataframe + * @param column The name of the column of vectors for which the correlation coefficient needs + * to be computed. This must be a column of the dataset, and it must contain + * Vector objects. + * @param method String specifying the method to use for computing correlation. + * Supported: `pearson` (default), `spearman` + * @return A dataframe that contains the correlation matrix of the column of vectors. This + * dataframe contains a single row and a single column of name + * '$METHODNAME($COLUMN)'. + * @throws IllegalArgumentException if the column is not a valid column in the dataset, or if + * the content of this column is not of type Vector. + * + * Here is how to access the correlation coefficient: + * {{{ + * val data: Dataset[Vector] = ... + * val Row(coeff: Matrix) = Correlation.corr(data, "value").head + * // coeff now contains the Pearson correlation matrix. + * }}} + * + * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column + * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector], + * which is fairly costly. Cache the input Dataset before calling corr with `method = "spearman"` + * to avoid recomputing the common lineage. + */ + @Since("2.2.0") + def corr(dataset: Dataset[_], column: String, method: String): DataFrame = { + val isPlatformSupported = Utils.checkClusterPlatformCompatibility( + dataset.sparkSession.sparkContext) + if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { + val rdd = dataset.select(column).rdd.map { + case Row(v: Vector) => v + } + val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) + val executor_cores = Utils.sparkExecutorCores() + val oldM = new CorrelationDALImpl(executor_num, executor_cores) + .computeCorrelationMatrix(rdd) + val name = s"$method($column)" + val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) + dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + + } else { + val rdd = dataset.select(column).rdd.map { + case Row(v: Vector) => OldVectors.fromML(v) + } + val oldM = OldStatistics.corr(rdd, method) + val name = s"$method($column)" + val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) + dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + } + } + + /** + * Compute the Pearson correlation matrix for the input Dataset of Vectors. 
+ */ + @Since("2.2.0") + def corr(dataset: Dataset[_], column: String): DataFrame = { + corr(dataset, column, "pearson") + } +} diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala index 99244122b..15d707328 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -70,8 +70,6 @@ object Correlation { val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { - println("OAP MLlib running") - val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } @@ -84,8 +82,6 @@ object Correlation { dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) } else { - println("Vanilla/MKL running") - val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } From 4755d02caaf06b1d615940220cfee43a90448570 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Wed, 15 Sep 2021 22:07:43 +0800 Subject: [PATCH 09/20] 1.remove mean vector Signed-off-by: minmingzhu --- .../java/org/apache/spark/ml/stat/CorrelationResult.java | 1 - mllib-dal/src/main/native/CorrelationDALImpl.cpp | 9 --------- .../org/apache/spark/ml/stat/CorrelationDALImpl.scala | 8 ++------ 3 files changed, 2 insertions(+), 16 deletions(-) diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java b/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java index 3893b644b..047c042a7 100644 --- a/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java +++ b/mllib-dal/src/main/java/org/apache/spark/ml/stat/CorrelationResult.java @@ -18,5 +18,4 @@ public class CorrelationResult { public long correlationNumericTable; - public long meanNumericTable; } diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp index 90fabd184..d7d72efaf 100644 --- a/mllib-dal/src/main/native/CorrelationDALImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp @@ -153,10 +153,6 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( "Correlation first 20 columns of " "correlation matrix:", 1, 20); - printNumericTable(result->get(covariance::mean), - "Correlation first 20 columns of " - "mean matrix:", - 1, 20); // Return all correlation & mean jclass clazz = env->GetObjectClass(resultObj); @@ -164,17 +160,12 @@ Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( // Get Field references jfieldID correlationNumericTableField = env->GetFieldID(clazz, "correlationNumericTable", "J"); - jfieldID meanNumericTableField = - env->GetFieldID(clazz, "meanNumericTable", "J"); NumericTablePtr *correlation = new NumericTablePtr(result->get(covariance::correlation)); - NumericTablePtr *mean = - new NumericTablePtr(result->get(covariance::mean)); env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); - env->SetLongField(resultObj, meanNumericTableField, (jlong)mean); } return 0; diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala index ff0e0c846..2a5b424bf 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala +++ 
b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala @@ -63,7 +63,6 @@ class CorrelationDALImpl( val convResultStartTime = System.nanoTime() val correlationNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.correlationNumericTable)) - val meanNumericTable = OneDAL.numericTableToVectors(OneDAL.makeNumericTable(result.meanNumericTable)) val convResultEndTime = System.nanoTime() @@ -71,7 +70,7 @@ class CorrelationDALImpl( println(s"CorrelationDAL result conversion took ${durationCovResult} secs") - Iterator((correlationNumericTable, meanNumericTable)) + Iterator(correlationNumericTable) } else { Iterator.empty } @@ -84,10 +83,7 @@ class CorrelationDALImpl( // Make sure there is only one result from rank 0 assert(results.length == 1) - val correlationMatrix = results(0)._1 - val meanVectors = results(0)._2 - - println(s"correlationMatrix : ${correlationMatrix.toString} ") + val correlationMatrix = results(0) correlationMatrix } From dafda4a90f697c761ae12a6fc01b8122572fe1b4 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Sat, 18 Sep 2021 10:25:34 +0800 Subject: [PATCH 10/20] 1. enable Correlation GPU Signed-off-by: minmingzhu --- .../src/main/native/CorrelationDALImpl.cpp | 266 ++++++++++-------- .../spark/ml/stat/CorrelationDALImpl.scala | 18 +- 2 files changed, 172 insertions(+), 112 deletions(-) diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp index aa5f66c36..07a0205d1 100644 --- a/mllib-dal/src/main/native/CorrelationDALImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp @@ -17,6 +17,10 @@ #include #include +#ifdef CPU_GPU_PROFILE +#include "GPU.h" +#endif + #include "OneCCL.h" #include "org_apache_spark_ml_stat_CorrelationDALImpl.h" #include "service.h" @@ -29,152 +33,194 @@ using namespace daal::algorithms; typedef double algorithmFPType; /* Algorithm floating-point type */ -/* - * Class: org_apache_spark_ml_stat_CorrelationDALImpl - * Method: cCorrelationTrainDAL - * Signature: (JJDDIILorg/apache/spark/ml/stat/CorrelationResult;)J - */ +static NumericTablePtr correlation_compute(JNIEnv *env, + jobject obj, + int rankId, + ccl::communicator &comm, + const NumericTablePtr &pData, + size_t nBlocks, + jobject resultObj) { -JNIEXPORT jlong JNICALL -Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, - jint executor_num, jint executor_cores, jobject resultObj) { + auto t1 = std::chrono::high_resolution_clock::now(); - ccl::communicator &comm = getComm(); - size_t rankId = comm.rank(); - std::cout << " rankId : " << rankId << " ! 
" - << std::endl; + const bool isRoot = (rankId == ccl_root); - const size_t nBlocks = executor_num; + covariance::Distributed localAlgorithm; - NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); + /* Set the input data set to the algorithm */ + localAlgorithm.input.set(covariance::data, pData); + + /* Compute covariance */ + localAlgorithm.compute(); - // Set number of threads for oneDAL to use for each rank - services::Environment::getInstance()->setNumberOfThreads(executor_cores); + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(t2 - t1).count(); + std::cout << "Correleation (native): local step took " << duration << " secs" + << std::endl; - int nThreadsNew = - services::Environment::getInstance()->getNumberOfThreads(); - cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew - << endl; + t1 = std::chrono::high_resolution_clock::now(); - auto t1 = std::chrono::high_resolution_clock::now(); - - const bool isRoot = (rankId == ccl_root); + /* Serialize partial results required by step 2 */ + InputDataArchive dataArch; + localAlgorithm.getPartialResult()->serialize(dataArch); + const uint64_t perNodeArchLength = (size_t)dataArch.getSizeOfArchive(); - covariance::Distributed localAlgorithm; - /* Set the input data set to the algorithm */ - localAlgorithm.input.set(covariance::data, pData); + std::vector aPerNodeArchLength(comm.size()); + std::vector aReceiveCount(comm.size(), 1); + /* Transfer archive length to the step 2 on the root node */ + ccl::allgatherv(&perNodeArchLength, 1, aPerNodeArchLength.data(), aReceiveCount, comm).wait(); - /* Compute covariance */ - localAlgorithm.compute(); + ByteBuffer serializedData; + /* Calculate total archive length */ + int totalArchLength = 0; - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = - std::chrono::duration_cast(t2 - t1).count(); - std::cout << "Correleation (native): local step took " << duration << " secs" - << std::endl; + for (size_t i = 0; i < nBlocks; ++i) + { + totalArchLength += aPerNodeArchLength[i]; + } + aReceiveCount[ccl_root] = totalArchLength; - t1 = std::chrono::high_resolution_clock::now(); + serializedData.resize(totalArchLength); - /* Serialize partial results required by step 2 */ - InputDataArchive dataArch; - localAlgorithm.getPartialResult()->serialize(dataArch); - const uint64_t perNodeArchLength = (size_t)dataArch.getSizeOfArchive(); + ByteBuffer nodeResults(perNodeArchLength); + dataArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength); - std::vector aPerNodeArchLength(comm.size()); - std::vector aReceiveCount(comm.size(), 1); - /* Transfer archive length to the step 2 on the root node */ - ccl::allgatherv(&perNodeArchLength, 1, aPerNodeArchLength.data(), aReceiveCount, comm).wait(); + /* Transfer partial results to step 2 on the root node */ + ccl::allgatherv((int8_t *)&nodeResults[0], perNodeArchLength, (int8_t *)&serializedData[0], aPerNodeArchLength, comm).wait(); + t2 = std::chrono::high_resolution_clock::now(); - ByteBuffer serializedData; - /* Calculate total archive length */ - int totalArchLength = 0; - for (size_t i = 0; i < nBlocks; ++i) - { - totalArchLength += aPerNodeArchLength[i]; - } - aReceiveCount[ccl_root] = totalArchLength; + duration = + std::chrono::duration_cast(t2 - t1).count(); + std::cout << "Correleation (native): ccl_allgatherv took " << duration << " secs" + << std::endl; + if (isRoot) { + auto t1 = std::chrono::high_resolution_clock::now(); + /* Create an algorithm to compute 
covariance on the master node */ + covariance::Distributed masterAlgorithm; - serializedData.resize(totalArchLength); + for (size_t i = 0, shift = 0; i < nBlocks; shift += aPerNodeArchLength[i], ++i) { + /* Deserialize partial results from step 1 */ + OutputDataArchive dataArch(&serializedData[shift], aPerNodeArchLength[i]); + covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult()); + dataForStep2FromStep1->deserialize(dataArch); - ByteBuffer nodeResults(perNodeArchLength); - dataArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength); + /* Set local partial results as input for the master-node algorithm + */ + masterAlgorithm.input.add(covariance::partialResults, + dataForStep2FromStep1); + } - /* Transfer partial results to step 2 on the root node */ - ccl::allgatherv((int8_t *)&nodeResults[0], perNodeArchLength, (int8_t *)&serializedData[0], aPerNodeArchLength, comm).wait(); - t2 = std::chrono::high_resolution_clock::now(); + /* Set the parameter to choose the type of the output matrix */ + masterAlgorithm.parameter.outputMatrixType = covariance::correlationMatrix; + /* Merge and finalizeCompute covariance decomposition on the master node */ + masterAlgorithm.compute(); + masterAlgorithm.finalizeCompute(); - duration = - std::chrono::duration_cast(t2 - t1).count(); - std::cout << "Correleation (native): ccl_allgatherv took " << duration << " secs" - << std::endl; - if (isRoot) { - auto t1 = std::chrono::high_resolution_clock::now(); - /* Create an algorithm to compute covariance on the master node */ - covariance::Distributed masterAlgorithm; + /* Retrieve the algorithm results */ + covariance::ResultPtr result = masterAlgorithm.getResult(); + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(t2 - t1).count(); + std::cout << "Correlation (native): master step took " << duration << " secs" + << std::endl; - for (size_t i = 0, shift = 0; i < nBlocks; shift += aPerNodeArchLength[i], ++i) { - /* Deserialize partial results from step 1 */ - OutputDataArchive dataArch(&serializedData[shift], aPerNodeArchLength[i]); + /* Print the results */ + printNumericTable(result->get(covariance::correlation), + "Correlation first 20 columns of " + "correlation matrix:", + 1, 20); + printNumericTable(result->get(covariance::mean), + "Correlation first 20 columns of " + "mean matrix:", + 1, 20); - covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult()); - dataForStep2FromStep1->deserialize(dataArch); + printNumericTable(result->get(covariance::correlation), "correlation matrix:"); + printNumericTable(result->get(covariance::mean), "Mean vector:"); + // Return all covariance & mean + jclass clazz = env->GetObjectClass(resultObj); - /* Set local partial results as input for the master-node algorithm - */ - masterAlgorithm.input.add(covariance::partialResults, - dataForStep2FromStep1); - } + // Get Field references + jfieldID correlationNumericTableField = + env->GetFieldID(clazz, "correlationNumericTable", "J"); + jfieldID meanNumericTableField = + env->GetFieldID(clazz, "meanNumericTable", "J"); - /* Set the parameter to choose the type of the output matrix */ - masterAlgorithm.parameter.outputMatrixType = covariance::correlationMatrix; + NumericTablePtr *correlation = + new NumericTablePtr(result->get(covariance::correlation)); + NumericTablePtr *mean = + new NumericTablePtr(result->get(covariance::mean)); - /* Merge and finalizeCompute covariance decomposition on the master node */ - 
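/* compute() merges the partial results gathered from all ranks into one partial state, and finalizeCompute() converts that state into the final correlation matrix, so only the root rank materializes a result */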
masterAlgorithm.compute(); - masterAlgorithm.finalizeCompute(); + env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); + env->SetLongField(resultObj, meanNumericTableField,(jlong)mean); - /* Retrieve the algorithm results */ - covariance::ResultPtr result = masterAlgorithm.getResult(); - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = - std::chrono::duration_cast(t2 - t1).count(); - std::cout << "Correlation (native): master step took " << duration << " secs" - << std::endl; + } + return NumericTablePtr(); - /* Print the results */ - printNumericTable(result->get(covariance::correlation), - "Correlation first 20 columns of " - "correlation matrix:", - 1, 20); - printNumericTable(result->get(covariance::mean), - "Correlation first 20 columns of " - "mean matrix:", - 1, 20); +} - // Return all correlation & mean - jclass clazz = env->GetObjectClass(resultObj); +/* + * Class: org_apache_spark_ml_stat_CorrelationDALImpl + * Method: cCorrelationTrainDAL + * Signature: (JJDDIILorg/apache/spark/ml/stat/CorrelationResult;)J + */ + +JNIEXPORT jlong JNICALL +Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( + JNIEnv *env, jobject obj, jlong pNumTabData, + jint executor_num, jint executor_cores, jboolean use_gpu, jintArray gpu_idx_array, jobject resultObj) { + + ccl::communicator &comm = getComm(); + size_t rankId = comm.rank(); + std::cout << " rankId : " << rankId << " ! " + << std::endl; + + const size_t nBlocks = executor_num; + + NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); + + #ifdef CPU_GPU_PROFILE + + if (use_gpu) { + int n_gpu = env->GetArrayLength(gpu_idx_array); + cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)" + << endl; + + jint *gpu_indices = env->GetIntArrayElements(gpu_idx_array, 0); + + int size = comm.size(); + auto assigned_gpu = + getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu); + + // Set SYCL context + cl::sycl::queue queue(assigned_gpu); + daal::services::SyclExecutionContext ctx(queue); + daal::services::Environment::getInstance()->setDefaultExecutionContext( + ctx); - // Get Field references - jfieldID correlationNumericTableField = - env->GetFieldID(clazz, "correlationNumericTable", "J"); - jfieldID meanNumericTableField = - env->GetFieldID(clazz, "meanNumericTable", "J"); + correlation_compute( + env, obj, rankId, comm, pData, nBlocks, resultObj); - NumericTablePtr *correlation = - new NumericTablePtr(result->get(covariance::correlation)); - NumericTablePtr *mean = - new NumericTablePtr(result->get(covariance::mean)); + env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0); + } else + #endif + { + // Set number of threads for oneDAL to use for each rank + services::Environment::getInstance()->setNumberOfThreads(executor_cores); - env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); - env->SetLongField(resultObj, meanNumericTableField,(jlong)mean); + int nThreadsNew = + services::Environment::getInstance()->getNumberOfThreads(); + cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew + << endl; + correlation_compute( + env, obj, rankId, comm, pData, nBlocks, resultObj); } return 0; - } \ No newline at end of file diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala index 365b23939..4205f55ab 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala +++ 
b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala @@ -17,7 +17,6 @@ package org.apache.spark.ml.stat import java.util.Arrays - import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.util.{OneCCL, OneDAL, Utils} @@ -26,7 +25,6 @@ import org.apache.spark.ml.util.Utils.getOneCCLIPPort import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Matrix} - class CorrelationDALImpl( val executorNum: Int, val executorCores: Int) @@ -37,11 +35,23 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(data) println(s"OneCCL ip port : ${kvsIPPort} ") + val sparkContext = data.sparkContext + val useGPU = sparkContext.conf.getBoolean("spark.oap.mllib.useGPU", false) + val coalescedTables = OneDAL.rddVectorToMergedTables(data, executorNum) println(s"executorNum : ${executorNum} ") println(s"executorCores : ${executorCores} ") val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => + + val gpuIndices = if (useGPU) { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + + val tableArr = table.next() OneCCL.init(executorNum, rank, kvsIPPort) @@ -53,6 +63,8 @@ class CorrelationDALImpl( tableArr, executorNum, executorCores, + useGPU, + gpuIndices, result ) @@ -99,6 +111,8 @@ class CorrelationDALImpl( @native private def cCorrelationTrainDAL(data: Long, executor_num: Int, executor_cores: Int, + useGPU: Boolean, + gpuIndices: Array[Int], result: CorrelationResult): Long } \ No newline at end of file From cf4bd580802bf679805b0cebb3eafce1e6710da1 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Sat, 18 Sep 2021 15:45:49 +0800 Subject: [PATCH 11/20] 1. add GPU test script. Signed-off-by: minmingzhu --- examples/correlation/run-gpu-standalone.sh | 42 ++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100755 examples/correlation/run-gpu-standalone.sh diff --git a/examples/correlation/run-gpu-standalone.sh b/examples/correlation/run-gpu-standalone.sh new file mode 100755 index 000000000..b5a432398 --- /dev/null +++ b/examples/correlation/run-gpu-standalone.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash + +source ../../conf/env.sh + +# Data file is from Spark Examples (data/mllib/sample_kmeans_data.txt) and put in examples/data +# The data file should be copied to $HDFS_ROOT before running examples +DATA_FILE=$HDFS_ROOT/data/sample_kmeans_data.txt + +APP_JAR=target/oap-mllib-examples-$OAP_MLLIB_VERSION.jar +APP_CLASS=org.apache.spark.examples.ml.CorrelationExample + +USE_GPU=true +RESOURCE_FILE=$PWD/IntelGpuResourceFile.json +WORKER_GPU_AMOUNT=4 +EXECUTOR_GPU_AMOUNT=1 +TASK_GPU_AMOUNT=1 + +# Should run in standalone mode +time $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \ + --num-executors $SPARK_NUM_EXECUTORS \ + --executor-cores $SPARK_EXECUTOR_CORES \ + --total-executor-cores $SPARK_TOTAL_CORES \ + --driver-memory $SPARK_DRIVER_MEMORY \ + --executor-memory $SPARK_EXECUTOR_MEMORY \ + --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ + --conf "spark.default.parallelism=$SPARK_DEFAULT_PARALLELISM" \ + --conf "spark.sql.shuffle.partitions=$SPARK_DEFAULT_PARALLELISM" \ + --conf "spark.driver.extraClassPath=$SPARK_DRIVER_CLASSPATH" \ + --conf "spark.executor.extraClassPath=$SPARK_EXECUTOR_CLASSPATH" \ + --conf "spark.oap.mllib.useGPU=$USE_GPU" \ + --conf "spark.worker.resourcesFile=$RESOURCE_FILE" \ + --conf "spark.worker.resource.gpu.amount=$WORKER_GPU_AMOUNT" \ + --conf 
"spark.executor.resource.gpu.amount=$EXECUTOR_GPU_AMOUNT" \ + --conf "spark.task.resource.gpu.amount=$TASK_GPU_AMOUNT" \ + --conf "spark.shuffle.reduceLocality.enabled=false" \ + --conf "spark.network.timeout=1200s" \ + --conf "spark.task.maxFailures=1" \ + --jars $OAP_MLLIB_JAR \ + --class $APP_CLASS \ + $APP_JAR $DATA_FILE \ + 2>&1 | tee Correlation-$(date +%m%d_%H_%M_%S).log + From 04fbe03f447106cfa07ffcd0664938b01a39b522 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Mon, 20 Sep 2021 15:33:04 +0800 Subject: [PATCH 12/20] 1. update correlation 2. add GPU test Signed-off-by: minmingzhu --- examples/correlation/IntelGpuResourceFile.json | 1 + .../scala/org/apache/spark/examples/ml/CorrelationExample.scala | 2 +- mllib-dal/src/assembly/assembly-cpu-gpu.xml | 2 +- mllib-dal/src/main/native/Makefile | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) create mode 100644 examples/correlation/IntelGpuResourceFile.json diff --git a/examples/correlation/IntelGpuResourceFile.json b/examples/correlation/IntelGpuResourceFile.json new file mode 100644 index 000000000..4b5c3cc98 --- /dev/null +++ b/examples/correlation/IntelGpuResourceFile.json @@ -0,0 +1 @@ +[{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1","2","3"]}] diff --git a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala index dc9120964..a7beade5e 100644 --- a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala +++ b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala @@ -55,7 +55,7 @@ object CorrelationExample { Vectors.dense(-9.948613,-8.941953,-8.106389,4.863542,5.852806,-1.659259,6.342504,-8.190106,-3.110330,-7.484658), ) - val df = data.map(Tuple1.apply).toDF("features") + val df = data.map(Tuple1.apply).toDF("features").cache() Correlation.corr(df, "features").collect().foreach((coeff1) => { println(s"Pearson correlation matrix:\n $coeff1.") }) diff --git a/mllib-dal/src/assembly/assembly-cpu-gpu.xml b/mllib-dal/src/assembly/assembly-cpu-gpu.xml index d49fd1adf..3eb9a0079 100644 --- a/mllib-dal/src/assembly/assembly-cpu-gpu.xml +++ b/mllib-dal/src/assembly/assembly-cpu-gpu.xml @@ -43,7 +43,7 @@ ${env.TBBROOT}/lib/intel64/gcc4.8/${tbb.lib} lib - libtbb.so.2 + libtbb.so.12 ${env.TBBROOT}/lib/intel64/gcc4.8/${tbb.malloc.lib} diff --git a/mllib-dal/src/main/native/Makefile b/mllib-dal/src/main/native/Makefile index 3baf5ec4c..4f18a363b 100644 --- a/mllib-dal/src/main/native/Makefile +++ b/mllib-dal/src/main/native/Makefile @@ -51,7 +51,7 @@ LIBS_COMMON := -L$(CCL_ROOT)/lib -lccl \ ifeq ($(PLATFORM_PROFILE),CPU_ONLY_PROFILE) LIBS := $(LIBS_COMMON) else ifeq ($(PLATFORM_PROFILE),CPU_GPU_PROFILE) - LIBS := "$(LIBS_COMMON) -l:libonedal_sycl.a" + LIBS := $(LIBS_COMMON) -l:libonedal_sycl.a endif CPP_SRCS += \ From 268e9df8c70bfac09c65279146f192cc8cf3e25d Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Fri, 8 Oct 2021 11:05:45 +0800 Subject: [PATCH 13/20] 1. 
resolve comments Signed-off-by: minmingzhu --- .../apache/spark/examples/ml/CorrelationExample.scala | 5 ++--- mllib-dal/src/main/native/CorrelationDALImpl.cpp | 10 +++++----- .../org_apache_spark_ml_stat_CorrelationDALImpl.h | 10 +++++----- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala index a7beade5e..b5245b4ba 100644 --- a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala +++ b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala @@ -56,9 +56,8 @@ object CorrelationExample { ) val df = data.map(Tuple1.apply).toDF("features").cache() - Correlation.corr(df, "features").collect().foreach((coeff1) => { - println(s"Pearson correlation matrix:\n $coeff1.") - }) + val Row(coeff1: Matrix) = Correlation.corr(df, "features").head + println(s"Pearson correlation matrix:\n $coeff1") spark.stop() } diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp index cb3a7bfd3..7d6631ce5 100644 --- a/mllib-dal/src/main/native/CorrelationDALImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp @@ -31,7 +31,7 @@ using namespace daal::algorithms; typedef double algorithmFPType; /* Algorithm floating-point type */ -static NumericTablePtr correlation_compute(JNIEnv *env, +static void correlation_compute(JNIEnv *env, jobject obj, int rankId, ccl::communicator &comm, @@ -43,7 +43,7 @@ static NumericTablePtr correlation_compute(JNIEnv *env, const bool isRoot = (rankId == ccl_root); - covariance::Distributed localAlgorithm; + covariance::Distributed localAlgorithm; /* Set the input data set to the algorithm */ localAlgorithm.input.set(covariance::data, pData); @@ -68,7 +68,7 @@ static NumericTablePtr correlation_compute(JNIEnv *env, std::vector aPerNodeArchLength(comm.size()); std::vector aReceiveCount(comm.size(), 1); /* Transfer archive length to the step 2 on the root node */ - ccl::allgatherv(&perNodeArchLength, 1, aPerNodeArchLength.data(), aReceiveCount, comm).wait(); + ccl::gatherv(&perNodeArchLength, 1, aPerNodeArchLength.data(), aReceiveCount, comm).wait(); ByteBuffer serializedData; /* Calculate total archive length */ @@ -87,7 +87,7 @@ static NumericTablePtr correlation_compute(JNIEnv *env, dataArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength); /* Transfer partial results to step 2 on the root node */ - ccl::allgatherv((int8_t *)&nodeResults[0], perNodeArchLength, (int8_t *)&serializedData[0], aPerNodeArchLength, comm).wait(); + ccl::gatherv((int8_t *)&nodeResults[0], perNodeArchLength, (int8_t *)&serializedData[0], aPerNodeArchLength, comm).wait(); t2 = std::chrono::high_resolution_clock::now(); duration = @@ -97,7 +97,7 @@ static NumericTablePtr correlation_compute(JNIEnv *env, if (isRoot) { auto t1 = std::chrono::high_resolution_clock::now(); /* Create an algorithm to compute covariance on the master node */ - covariance::Distributed masterAlgorithm; + covariance::Distributed masterAlgorithm; for (size_t i = 0, shift = 0; i < nBlocks; shift += aPerNodeArchLength[i], ++i) { /* Deserialize partial results from step 1 */ diff --git a/mllib-dal/src/main/native/javah/org_apache_spark_ml_stat_CorrelationDALImpl.h b/mllib-dal/src/main/native/javah/org_apache_spark_ml_stat_CorrelationDALImpl.h index 6e4f033e1..4cc53b475 100644 --- 
a/mllib-dal/src/main/native/javah/org_apache_spark_ml_stat_CorrelationDALImpl.h +++ b/mllib-dal/src/main/native/javah/org_apache_spark_ml_stat_CorrelationDALImpl.h @@ -8,12 +8,12 @@ extern "C" { #endif /* - * * Class: org_apache_spark_ml_stat_CorrelationDALImpl - * * Method: cCorrelationTrainDAL - * * Signature: (JIILorg/apache/spark/ml/stat/CorrelationResult;)J - * */ + * Class: org_apache_spark_ml_stat_CorrelationDALImpl + * Method: cCorrelationTrainDAL + * Signature: (JIIZ[ILorg/apache/spark/ml/stat/CorrelationResult;)J + */ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL - (JNIEnv *, jobject, jlong, jint, jint, jobject); + (JNIEnv *, jobject, jlong, jint, jint, jboolean, jintArray, jobject); #ifdef __cplusplus } From ad7ac6b9783468acd4ab3530164192361538c912 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Mon, 11 Oct 2021 14:51:04 +0800 Subject: [PATCH 14/20] 1. used ccl::gatherv in OneCCL.h instead of ccl::allgatherv Signed-off-by: minmingzhu --- .../src/main/native/CorrelationDALImpl.cpp | 136 ++++++++---------- 1 file changed, 63 insertions(+), 73 deletions(-) diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp index 7d6631ce5..202341cd4 100644 --- a/mllib-dal/src/main/native/CorrelationDALImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp @@ -38,7 +38,7 @@ static void correlation_compute(JNIEnv *env, const NumericTablePtr &pData, size_t nBlocks, jobject resultObj) { - + using daal::byte; auto t1 = std::chrono::high_resolution_clock::now(); const bool isRoot = (rankId == ccl_root); @@ -60,34 +60,24 @@ static void correlation_compute(JNIEnv *env, t1 = std::chrono::high_resolution_clock::now(); /* Serialize partial results required by step 2 */ + services::SharedPtr serializedData; InputDataArchive dataArch; localAlgorithm.getPartialResult()->serialize(dataArch); - const uint64_t perNodeArchLength = (size_t)dataArch.getSizeOfArchive(); - - - std::vector aPerNodeArchLength(comm.size()); - std::vector aReceiveCount(comm.size(), 1); - /* Transfer archive length to the step 2 on the root node */ - ccl::gatherv(&perNodeArchLength, 1, aPerNodeArchLength.data(), aReceiveCount, comm).wait(); + size_t perNodeArchLength = dataArch.getSizeOfArchive(); - ByteBuffer serializedData; - /* Calculate total archive length */ - int totalArchLength = 0; - - for (size_t i = 0; i < nBlocks; ++i) - { - totalArchLength += aPerNodeArchLength[i]; - } - aReceiveCount[ccl_root] = totalArchLength; + serializedData = + services::SharedPtr(new byte[perNodeArchLength * nBlocks]); - serializedData.resize(totalArchLength); - - - ByteBuffer nodeResults(perNodeArchLength); - dataArch.copyArchiveToArray(&nodeResults[0], perNodeArchLength); + byte *nodeResults = new byte[perNodeArchLength]; + dataArch.copyArchiveToArray(nodeResults, perNodeArchLength); + std::vector aReceiveCount(comm.size(), + perNodeArchLength); // 4 x "14016" /* Transfer partial results to step 2 on the root node */ - ccl::gatherv((int8_t *)&nodeResults[0], perNodeArchLength, (int8_t *)&serializedData[0], aPerNodeArchLength, comm).wait(); + ccl::gather((int8_t *)nodeResults, perNodeArchLength, + (int8_t *)(serializedData.get()), perNodeArchLength, comm) + .wait(); + t2 = std::chrono::high_resolution_clock::now(); duration = @@ -95,58 +85,58 @@ static void correlation_compute(JNIEnv *env, std::cout << "Correleation (native): ccl_allgatherv took " << duration << " secs" << std::endl; if (isRoot) { - auto t1 = 
std::chrono::high_resolution_clock::now(); - /* Create an algorithm to compute covariance on the master node */ - covariance::Distributed masterAlgorithm; - - for (size_t i = 0, shift = 0; i < nBlocks; shift += aPerNodeArchLength[i], ++i) { - /* Deserialize partial results from step 1 */ - OutputDataArchive dataArch(&serializedData[shift], aPerNodeArchLength[i]); - - covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult()); - dataForStep2FromStep1->deserialize(dataArch); - - /* Set local partial results as input for the master-node algorithm - */ - masterAlgorithm.input.add(covariance::partialResults, - dataForStep2FromStep1); - } - - /* Set the parameter to choose the type of the output matrix */ - masterAlgorithm.parameter.outputMatrixType = covariance::correlationMatrix; - - /* Merge and finalizeCompute covariance decomposition on the master node */ - masterAlgorithm.compute(); - masterAlgorithm.finalizeCompute(); - - /* Retrieve the algorithm results */ - covariance::ResultPtr result = masterAlgorithm.getResult(); - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = - std::chrono::duration_cast(t2 - t1).count(); - std::cout << "Correlation (native): master step took " << duration << " secs" - << std::endl; - - /* Print the results */ - printNumericTable(result->get(covariance::correlation), - "Correlation first 20 columns of " - "correlation matrix:", - 1, 20); - // Return all covariance & mean - jclass clazz = env->GetObjectClass(resultObj); - - // Get Field references - jfieldID correlationNumericTableField = - env->GetFieldID(clazz, "correlationNumericTable", "J"); - - NumericTablePtr *correlation = - new NumericTablePtr(result->get(covariance::correlation)); - - env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); - + auto t1 = std::chrono::high_resolution_clock::now(); + /* Create an algorithm to compute covariance on the master node */ + covariance::Distributed masterAlgorithm; + + for (size_t i = 0; i < nBlocks; i++) { + /* Deserialize partial results from step 1 */ + OutputDataArchive dataArch(serializedData.get() + + perNodeArchLength * i, + perNodeArchLength); + + covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult()); + dataForStep2FromStep1->deserialize(dataArch); + + /* Set local partial results as input for the master-node algorithm + */ + masterAlgorithm.input.add(covariance::partialResults, + dataForStep2FromStep1); } - return NumericTablePtr(); + /* Set the parameter to choose the type of the output matrix */ + masterAlgorithm.parameter.outputMatrixType = covariance::correlationMatrix; + + /* Merge and finalizeCompute covariance decomposition on the master node */ + masterAlgorithm.compute(); + masterAlgorithm.finalizeCompute(); + + /* Retrieve the algorithm results */ + covariance::ResultPtr result = masterAlgorithm.getResult(); + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + std::chrono::duration_cast(t2 - t1).count(); + std::cout << "Correlation (native): master step took " << duration << " secs" + << std::endl; + + /* Print the results */ + printNumericTable(result->get(covariance::correlation), + "Correlation first 20 columns of " + "correlation matrix:", + 1, 20); + // Return all covariance & mean + jclass clazz = env->GetObjectClass(resultObj); + + // Get Field references + jfieldID correlationNumericTableField = + env->GetFieldID(clazz, "correlationNumericTable", "J"); + + NumericTablePtr *correlation = + new 
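/* heap-allocated so the table's address can cross JNI as a jlong; the Scala side rebuilds a NumericTable from this handle stored in the result object */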
NumericTablePtr(result->get(covariance::correlation)); + + env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation); + + } } /* From bf6c00cfee4d6eb06977f87674fee9288bb5bfa4 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Mon, 11 Oct 2021 15:20:38 +0800 Subject: [PATCH 15/20] 1.used ml.Matrix for Correlation, reduce a step to mllib.Matrix convert to ml.Matrix. Signed-off-by: minmingzhu --- .../org/apache/spark/ml/stat/CorrelationDALImpl.scala | 7 +++---- .../main/scala/org/apache/spark/ml/stat/Correlation.scala | 4 ++-- .../main/scala/org/apache/spark/ml/stat/Correlation.scala | 4 ++-- .../main/scala/org/apache/spark/ml/stat/Correlation.scala | 4 ++-- .../main/scala/org/apache/spark/ml/stat/Correlation.scala | 4 ++-- 5 files changed, 11 insertions(+), 12 deletions(-) diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala index 64bc74923..0a068cebd 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala @@ -20,10 +20,9 @@ import java.util.Arrays import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.util.{OneCCL, OneDAL, Utils} -import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.linalg.{Vector, Matrix} import org.apache.spark.ml.util.Utils.getOneCCLIPPort import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.{Matrix} class CorrelationDALImpl( val executorNum: Int, @@ -32,7 +31,7 @@ class CorrelationDALImpl( def computeCorrelationMatrix(data: RDD[Vector]): Matrix = { - val kvsIPPort = getOneCCLIPPort(data) + val kvsIPPort = getOneCCLIPPort(data) val sparkContext = data.sparkContext val useGPU = sparkContext.conf.getBoolean("spark.oap.mllib.useGPU", false) @@ -74,7 +73,7 @@ class CorrelationDALImpl( val ret = if (OneCCL.isRoot()) { val convResultStartTime = System.nanoTime() - val correlationNumericTable = OneDAL.numericTableToOldMatrix(OneDAL.makeNumericTable(result.correlationNumericTable)) + val correlationNumericTable = OneDAL.numericTableToMatrix(OneDAL.makeNumericTable(result.correlationNumericTable)) val convResultEndTime = System.nanoTime() diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala index 4eadc1e02..5d5ef199c 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -77,11 +77,11 @@ object Correlation { } val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() - val oldM = new CorrelationDALImpl(executor_num, executor_cores) + val matrix = new CorrelationDALImpl(executor_num, executor_cores) .computeCorrelationMatrix(rdd) val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) - dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema) } else { val rdd = dataset.select(column).rdd.map { diff --git a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala index 903c3abfc..8affe56de 100644 
--- a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -77,11 +77,11 @@ object Correlation { } val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() - val oldM = new CorrelationDALImpl(executor_num, executor_cores) + val matrix = new CorrelationDALImpl(executor_num, executor_cores) .computeCorrelationMatrix(rdd) val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) - dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema) } else { val rdd = dataset.select(column).rdd.map { diff --git a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala index 4eadc1e02..5d5ef199c 100644 --- a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -77,11 +77,11 @@ object Correlation { } val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() - val oldM = new CorrelationDALImpl(executor_num, executor_cores) + val matrix = new CorrelationDALImpl(executor_num, executor_cores) .computeCorrelationMatrix(rdd) val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) - dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema) } else { val rdd = dataset.select(column).rdd.map { diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala index 15d707328..b1df0334f 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -75,11 +75,11 @@ object Correlation { } val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() - val oldM = new CorrelationDALImpl(executor_num, executor_cores) + val matrix = new CorrelationDALImpl(executor_num, executor_cores) .computeCorrelationMatrix(rdd) val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) - dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema) + dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema) } else { val rdd = dataset.select(column).rdd.map { From e1d8b31986bc936d3bd546a52f7ebe4202852f7e Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Fri, 15 Oct 2021 11:44:29 +0800 Subject: [PATCH 16/20] 1. reformat code style 2. 
make-up code Signed-off-by: minmingzhu --- examples/correlation/run.sh | 1 - .../examples/ml/CorrelationExample.scala | 2 +- .../src/main/native/CorrelationDALImpl.cpp | 2 - mllib-dal/src/main/native/OneCCL.h | 2 +- .../spark/ml/stat/CorrelationDALImpl.scala | 56 +++++++++---------- 5 files changed, 29 insertions(+), 34 deletions(-) diff --git a/examples/correlation/run.sh b/examples/correlation/run.sh index b7bec0391..5505d60c1 100644 --- a/examples/correlation/run.sh +++ b/examples/correlation/run.sh @@ -20,7 +20,6 @@ time $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \ --conf "spark.shuffle.reduceLocality.enabled=false" \ --conf "spark.network.timeout=1200s" \ --conf "spark.task.maxFailures=1" \ - --conf "spark.oap.mllib.enabled=true" \ --conf "spark.eventLog.enabled=true" \ --conf "spark.eventLog.dir=hdfs://beaver:9000/spark-history-server" \ --conf "spark.history.fs.logDirectory=hdfs://beaver:9000/spark-history-server" \ diff --git a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala index b5245b4ba..19b1c253a 100644 --- a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala +++ b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala @@ -57,7 +57,7 @@ object CorrelationExample { val df = data.map(Tuple1.apply).toDF("features").cache() val Row(coeff1: Matrix) = Correlation.corr(df, "features").head - println(s"Pearson correlation matrix:\n $coeff1") + println(s"Pearson correleation matrix:\n $coeff1") spark.stop() } diff --git a/mllib-dal/src/main/native/CorrelationDALImpl.cpp b/mllib-dal/src/main/native/CorrelationDALImpl.cpp index 202341cd4..347f5afda 100644 --- a/mllib-dal/src/main/native/CorrelationDALImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationDALImpl.cpp @@ -77,7 +77,6 @@ static void correlation_compute(JNIEnv *env, ccl::gather((int8_t *)nodeResults, perNodeArchLength, (int8_t *)(serializedData.get()), perNodeArchLength, comm) .wait(); - t2 = std::chrono::high_resolution_clock::now(); duration = @@ -144,7 +143,6 @@ static void correlation_compute(JNIEnv *env, * Method: cCorrelationTrainDAL * Signature: (JJDDIILorg/apache/spark/ml/stat/CorrelationResult;)J */ - JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_stat_CorrelationDALImpl_cCorrelationTrainDAL( JNIEnv *env, jobject obj, jlong pNumTabData, diff --git a/mllib-dal/src/main/native/OneCCL.h b/mllib-dal/src/main/native/OneCCL.h index 5550ce904..8079a3ced 100644 --- a/mllib-dal/src/main/native/OneCCL.h +++ b/mllib-dal/src/main/native/OneCCL.h @@ -36,7 +36,7 @@ event CCL_API gather(const BufferType *sendbuf, int sendcount, send_counts[root_rank] = sendcount; if (comm.rank() == root_rank) - std::fill(recv_counts.begin(), recv_counts.end(), sendcount); + std::fill(recv_counts.begin(), recv_counts.end(), recvcount); return ccl::alltoallv(sendbuf, send_counts, recvbuf, recv_counts, comm); } diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala index 0a068cebd..146c8fcd0 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/stat/CorrelationDALImpl.scala @@ -31,44 +31,42 @@ class CorrelationDALImpl( def computeCorrelationMatrix(data: RDD[Vector]): Matrix = { - val kvsIPPort = getOneCCLIPPort(data) + val kvsIPPort = 
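/* getOneCCLIPPort derives a single rendezvous IP:port from the input RDD;
 * every executor passes this same address to OneCCL.init below, so the
 * executorNum ranks can join one oneCCL communicator. */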
getOneCCLIPPort(data)
- val sparkContext = data.sparkContext
- val useGPU = sparkContext.conf.getBoolean("spark.oap.mllib.useGPU", false)
+ val sparkContext = data.sparkContext
+ val useGPU = sparkContext.conf.getBoolean("spark.oap.mllib.useGPU", false)
- val coalescedTables = OneDAL.rddVectorToMergedTables(data, executorNum)
- println(s"executorNum : ${executorNum} ")
- println(s"executorCores : ${executorCores} ")
+ val coalescedTables = OneDAL.rddVectorToMergedTables(data, executorNum)
- val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
+ val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
- val gpuIndices = if (useGPU) {
- val resources = TaskContext.get().resources()
- resources("gpu").addresses.map(_.toInt)
- } else {
- null
- }
+ val gpuIndices = if (useGPU) {
+ val resources = TaskContext.get().resources()
+ resources("gpu").addresses.map(_.toInt)
+ } else {
+ null
+ }
val tableArr = table.next() OneCCL.init(executorNum, rank, kvsIPPort) val computeStartTime = System.nanoTime()
- val result = new CorrelationResult()
- cCorrelationTrainDAL(
- tableArr,
- executorNum,
- executorCores,
- useGPU,
- gpuIndices,
- result
- )
+ val result = new CorrelationResult()
+ cCorrelationTrainDAL(
+ tableArr,
+ executorNum,
+ executorCores,
+ useGPU,
+ gpuIndices,
+ result
+ )
val computeEndTime = System.nanoTime() val durationCompute = (computeEndTime - computeStartTime).toDouble / 1E9
- println(s"CorrelationDAL compute took ${durationCompute} secs")
+ logInfo(s"CorrelationDAL compute took ${durationCompute} secs")
val ret = if (OneCCL.isRoot()) { @@ -79,7 +77,7 @@ class CorrelationDALImpl( val durationCovResult = (convResultEndTime - convResultStartTime).toDouble / 1E9
- println(s"CorrelationDAL result conversion took ${durationCovResult} secs")
+ logInfo(s"CorrelationDAL result conversion took ${durationCovResult} secs")
Iterator(correlationNumericTable) } else { @@ -101,9 +99,9 @@ class CorrelationDALImpl( @native private def cCorrelationTrainDAL(data: Long,
- executor_num: Int,
- executor_cores: Int,
- useGPU: Boolean,
- gpuIndices: Array[Int],
- result: CorrelationResult): Long
+ executor_num: Int,
+ executor_cores: Int,
+ useGPU: Boolean,
+ gpuIndices: Array[Int],
+ result: CorrelationResult): Long
}
From 89846f72e5f98c155012055d92fcfea0cd118473 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Mon, 18 Oct 2021 13:29:29 +0800 Subject: [PATCH 17/20] check whether the data is cached; if not, cache it
Signed-off-by: minmingzhu --- .../apache/spark/examples/ml/CorrelationExample.scala | 4 ++-- .../scala/org/apache/spark/ml/stat/Correlation.scala | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala index 19b1c253a..26c546edc 100644 --- a/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala +++ b/examples/correlation/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala @@ -55,9 +55,9 @@ object CorrelationExample { Vectors.dense(-9.948613,-8.941953,-8.106389,4.863542,5.852806,-1.659259,6.342504,-8.190106,-3.110330,-7.484658), )
- val df = data.map(Tuple1.apply).toDF("features").cache()
+ val df = data.map(Tuple1.apply).toDF("features")
val Row(coeff1: Matrix) = Correlation.corr(df, "features").head
- println(s"Pearson correleation matrix:\n $coeff1")
+ println(s"Pearson correlation
matrix:\n $coeff1")
spark.stop() }
diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala index 5d5ef199c..5b411011a 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -72,6 +72,10 @@ object Correlation { val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") {
+ val handlePersistence = (dataset.storageLevel == StorageLevel.NONE)
+ if (handlePersistence) {
+ dataset.persist(StorageLevel.MEMORY_AND_DISK)
+ }
val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } @@ -81,8 +85,11 @@ object Correlation { .computeCorrelationMatrix(rdd) val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false)))
- dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema)
-
+ val dataframe = dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema)
+ if (handlePersistence) {
+ dataset.unpersist()
+ }
+ dataframe
} else { val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v)
From c7972e0e0fe4aa442546ee55eec301f85d590636 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Tue, 19 Oct 2021 13:42:46 +0800 Subject: [PATCH 18/20] update Correlation on Spark 3.0.1, 3.0.2 and 3.1.1 to check whether the data is cached and cache it if not
Signed-off-by: minmingzhu --- .../scala/org/apache/spark/ml/stat/Correlation.scala | 11 +++++++++-- .../scala/org/apache/spark/ml/stat/Correlation.scala | 11 +++++++++-- .../scala/org/apache/spark/ml/stat/Correlation.scala | 11 +++++++++-- 3 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala index 8affe56de..13e1f4993 100644 --- a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -72,6 +72,10 @@ object Correlation { val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") {
+ val handlePersistence = (dataset.storageLevel == StorageLevel.NONE)
+ if (handlePersistence) {
+ dataset.persist(StorageLevel.MEMORY_AND_DISK)
+ }
val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } @@ -81,8 +85,11 @@ object Correlation { .computeCorrelationMatrix(rdd) val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false)))
- dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema)
-
+ val dataframe = dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema)
+ if (handlePersistence) {
+ dataset.unpersist()
+ }
+ dataframe
} else { val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v)
diff --git a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala index 5d5ef199c..5b411011a 100644 ---
a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -72,6 +72,10 @@ object Correlation { val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { + val handlePersistence = (dataset.storageLevel == StorageLevel.NONE) + if (handlePersistence) { + dataset.persist(StorageLevel.MEMORY_AND_DISK) + } val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } @@ -81,8 +85,11 @@ object Correlation { .computeCorrelationMatrix(rdd) val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) - dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema) - + val dataframe = dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema) + if (handlePersistence) { + dataset.unpersist() + } + dataframe } else { val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala index b1df0334f..81e75e64d 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/stat/Correlation.scala @@ -70,6 +70,10 @@ object Correlation { val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) if (Utils.isOAPEnabled() && isPlatformSupported && method == "pearson") { + val handlePersistence = (dataset.storageLevel == StorageLevel.NONE) + if (handlePersistence) { + dataset.persist(StorageLevel.MEMORY_AND_DISK) + } val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => v } @@ -79,8 +83,11 @@ object Correlation { .computeCorrelationMatrix(rdd) val name = s"$method($column)" val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false))) - dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema) - + val dataframe = dataset.sparkSession.createDataFrame(Seq(Row(matrix)).asJava, schema) + if (handlePersistence) { + dataset.unpersist() + } + dataframe } else { val rdd = dataset.select(column).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) From ee8a105ff7a2c1f2845804958810716c24351551 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Tue, 19 Oct 2021 15:56:23 +0800 Subject: [PATCH 19/20] 1. 
add Correlation to CI
Signed-off-by: minmingzhu --- examples/correlation/run.sh | 4 ---- examples/run-all-scala.sh | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-)
diff --git a/examples/correlation/run.sh b/examples/correlation/run.sh index 5505d60c1..43b7c3300 100644 --- a/examples/correlation/run.sh +++ b/examples/correlation/run.sh @@ -20,10 +20,6 @@ time $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \ --conf "spark.shuffle.reduceLocality.enabled=false" \ --conf "spark.network.timeout=1200s" \ --conf "spark.task.maxFailures=1" \
- --conf "spark.eventLog.enabled=true" \
- --conf "spark.eventLog.dir=hdfs://beaver:9000/spark-history-server" \
- --conf "spark.history.fs.logDirectory=hdfs://beaver:9000/spark-history-server" \
- --conf "spark.driver.log.dfsDir=/user/spark/driverLogs" \
--jars $OAP_MLLIB_JAR \ --class $APP_CLASS \ $APP_JAR $DATA_FILE \
diff --git a/examples/run-all-scala.sh b/examples/run-all-scala.sh index 684ef7433..00f9b2129 100755 --- a/examples/run-all-scala.sh +++ b/examples/run-all-scala.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash
-exampleDirs=(kmeans pca als naive-bayes linear-regression)
+exampleDirs=(kmeans pca als naive-bayes linear-regression correlation)
for dir in ${exampleDirs[*]} do
From 9ed980707b39afffaa7f8a3004fd27008f1d99bc Mon Sep 17 00:00:00 2001 From: minmingzhu <45281494+minmingzhu@users.noreply.github.com> Date: Wed, 20 Oct 2021 16:40:09 +0800 Subject: [PATCH 20/20] Update build-all.sh to build correlation
--- examples/build-all.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/examples/build-all.sh b/examples/build-all.sh index 0136e603d..ee5aa64e3 100755 --- a/examples/build-all.sh +++ b/examples/build-all.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash
-exampleDirs=(kmeans pca als naive-bayes linear-regression)
+exampleDirs=(kmeans pca als naive-bayes linear-regression correlation)
for dir in ${exampleDirs[*]} do
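# correlation ships its own build.sh and run.sh like the other examples,
# so the same loop builds it without further changes.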