From cf4d65b8b82ee152dff7ee25d631ef5e43cb1a6c Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Mon, 21 Aug 2023 08:28:12 +0000 Subject: [PATCH 1/9] add barrier and used context.partitionId() replace rank Signed-off-by: minmingzhu --- .../oap/mllib/classification/NaiveBayesDALImpl.scala | 10 ++++++---- .../RandomForestClassifierDALImpl.scala | 10 ++++++---- .../intel/oap/mllib/clustering/KMeansDALImpl.scala | 11 ++++++----- .../com/intel/oap/mllib/feature/PCADALImpl.scala | 10 ++++++---- .../intel/oap/mllib/recommendation/ALSDALImpl.scala | 8 +++++--- .../mllib/regression/LinearRegressionDALImpl.scala | 11 ++++++----- .../regression/RandomForestRegressorDALImpl.scala | 12 ++++++------ .../intel/oap/mllib/stat/CorrelationDALImpl.scala | 10 ++++++---- .../com/intel/oap/mllib/stat/SummarizerDALImpl.scala | 10 ++++++---- 9 files changed, 53 insertions(+), 39 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala index 5a274cf7f..04807c51c 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala @@ -18,6 +18,7 @@ package com.intel.oap.mllib.classification import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL} +import org.apache.spark.BarrierTaskContext import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml.classification.NaiveBayesModel @@ -50,11 +51,12 @@ class NaiveBayesDALImpl(val uid: String, labelCol, featuresCol, executorNum) } - val results = labeledPointsTables.mapPartitionsWithIndex { + val results = labeledPointsTables.barrier().mapPartitionsWithIndex { case (rank: Int, tables: Iterator[(Long, Long)]) => + val context = BarrierTaskContext.get() val (featureTabAddr, lableTabAddr) = tables.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val computeStartTime = System.nanoTime() @@ -68,7 +70,7 @@ class NaiveBayesDALImpl(val uid: String, println(s"NaiveBayesDAL compute took ${durationCompute} secs") - val ret = if (OneCCL.isRoot()) { + val ret = if (context.partitionId() == 0) { val convResultStartTime = System.nanoTime() val pi = OneDAL.numericTableNx1ToVector(OneDAL.makeNumericTable(result.getPiNumericTable)) @@ -86,7 +88,7 @@ class NaiveBayesDALImpl(val uid: String, } else { Iterator.empty } - + context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index cb3ef3e52..b8ea65b8f 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -19,7 +19,7 @@ import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.annotation.Since -import org.apache.spark.TaskContext +import org.apache.spark.{BarrierTaskContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.linalg.{Matrix, Vector} @@ -75,8 +75,9 @@ class 
RandomForestClassifierDALImpl(val uid: String, rfcTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - val results = labeledPointsTables.mapPartitionsWithIndex { + val results = labeledPointsTables.barrier().mapPartitionsWithIndex { (rank: Int, tables: Iterator[(Long, Long)]) => + val context = BarrierTaskContext.get() val (featureTabAddr, lableTabAddr) = tables.next() val gpuIndices = if (useDevice == "GPU") { @@ -89,7 +90,7 @@ class RandomForestClassifierDALImpl(val uid: String, } else { null } - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val computeStartTime = System.nanoTime() val result = new RandomForestResult val hashmap = cRFClassifierTrainDAL( @@ -117,11 +118,12 @@ class RandomForestClassifierDALImpl(val uid: String, logInfo(s"RandomForestClassifierDAL compute took ${durationCompute} secs") - val ret = if (rank == 0) { + val ret = if (context.partitionId() == 0) { Iterator(hashmap) } else { Iterator.empty } + context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 3a23a1d65..1122d4ceb 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -19,7 +19,7 @@ package com.intel.oap.mllib.clustering import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common -import org.apache.spark.TaskContext +import org.apache.spark.{BarrierTaskContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.util._ @@ -51,7 +51,8 @@ class KMeansDALImpl(var nClusters: Int, kmeansTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(coalescedTables) - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => + val results = coalescedTables.barrier().mapPartitionsWithIndex { (rank, table) => + val context = BarrierTaskContext.get() var cCentroids = 0L val result = new KMeansResult() val gpuIndices = if (useDevice == "GPU") { @@ -62,7 +63,7 @@ class KMeansDALImpl(var nClusters: Int, } val tableArr = table.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val initCentroids = if (useDevice == "GPU") { OneDAL.makeHomogenTable(centers, computeDevice).getcObejct() } else { @@ -81,7 +82,7 @@ class KMeansDALImpl(var nClusters: Int, result ) - val ret = if (rank == 0) { + val ret = if (context.partitionId() == 0) { assert(cCentroids != 0) val centerVectors = if (useDevice == "GPU") { OneDAL.homogenTableToVectors(OneDAL.makeHomogenTable(cCentroids), @@ -93,10 +94,10 @@ class KMeansDALImpl(var nClusters: Int, } else { Iterator.empty } + context.barrier() OneCCL.cleanup() ret }.collect() - // Make sure there is only one result from rank 0 assert(results.length == 1) kmeansTimer.record("Training") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 4f0807abd..b86cd6202 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -20,7 +20,7 @@ import java.nio.DoubleBuffer import 
com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable} import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Service, Utils} -import org.apache.spark.TaskContext +import org.apache.spark.{BarrierTaskContext, TaskContext} import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg._ @@ -59,9 +59,10 @@ class PCADALImpl(val k: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) pcaTimer.record("Data Convertion") - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => + val results = coalescedTables.barrier().mapPartitionsWithIndex { (rank, table) => + val context = BarrierTaskContext.get() val tableArr = table.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val result = new PCAResult() val gpuIndices = if (useDevice == "GPU") { val resources = TaskContext.get().resources() @@ -78,7 +79,7 @@ class PCADALImpl(val k: Int, result ) - val ret = if (rank == 0) { + val ret = if (context.partitionId() == 0) { val principleComponents = if (useDevice == "GPU") { val pcNumericTable = OneDAL.makeHomogenTable(result.getPcNumericTable) getPrincipleComponentsFromOneAPI(pcNumericTable, k, computeDevice) @@ -102,6 +103,7 @@ class PCADALImpl(val k: Int, } else { Iterator.empty } + context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala index f8c781caf..2976a76d3 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala @@ -20,7 +20,7 @@ import com.intel.daal.data_management.data.CSRNumericTable import com.intel.daal.services.DaalContext import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} -import org.apache.spark.Partitioner +import org.apache.spark.{BarrierTaskContext, Partitioner} import org.apache.spark.internal.Logging import org.apache.spark.ml.recommendation.ALS.Rating import org.apache.spark.rdd.RDD @@ -83,8 +83,9 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], .map { p => Rating(p.item, p.user, p.rating) } - .mapPartitionsWithIndex { (rank, iter) => - OneCCL.init(executorNum, rank, kvsIPPort) + .barrier().mapPartitionsWithIndex { (rank, iter) => + val context = BarrierTaskContext.get() + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val rankId = OneCCL.rankID() println("rankId", rankId, "nUsers", nVectors, "nItems", nFeatures) @@ -105,6 +106,7 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], rankId, result ) + context.barrier() Iterator(result) }.cache() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index ee93892b2..bbdd8909f 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -19,8 +19,7 @@ package com.intel.oap.mllib.regression import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common -import org.apache.spark.SparkException -import 
org.apache.spark.TaskContext +import org.apache.spark.{BarrierTaskContext, SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.{DenseVector, Vector} import org.apache.spark.ml.util._ @@ -106,10 +105,11 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } lrTimer.record("Data Convertion") - val results = labeledPointsTables.mapPartitionsWithIndex { + val results = labeledPointsTables.barrier().mapPartitionsWithIndex { case (rank: Int, tables: Iterator[(Long, Long)]) => + val context = BarrierTaskContext.get() val (featureTabAddr, lableTabAddr) = tables.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val result = new LiRResult() val gpuIndices = if (useDevice == "GPU") { @@ -138,7 +138,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, result ) - val ret = if (rank == 0) { + val ret = if (context.partitionId() == 0) { val coefficientArray = if (useDevice == "GPU") { OneDAL.homogenTableToVectors(OneDAL.makeHomogenTable(cbeta), computeDevice) @@ -149,6 +149,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } else { Iterator.empty } + context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index a4e9a0f78..d02c3f655 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -19,7 +19,7 @@ import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.classification.{LearningNode, RandomForestResult} import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common -import org.apache.spark.TaskContext +import org.apache.spark.{BarrierTaskContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.linalg.Matrix @@ -69,10 +69,10 @@ class RandomForestRegressorDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - val results = labeledPointsTables.mapPartitionsWithIndex { + val results = labeledPointsTables.barrier().mapPartitionsWithIndex { (rank: Int, tables: Iterator[(Long, Long)]) => + val context = BarrierTaskContext.get() val (featureTabAddr, lableTabAddr) = tables.next() - val gpuIndices = if (useDevice == "GPU") { if (isTest) { Array(0) @@ -84,7 +84,7 @@ class RandomForestRegressorDALImpl(val uid: String, null } - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val computeStartTime = System.nanoTime() val result = new RandomForestResult @@ -109,7 +109,7 @@ class RandomForestRegressorDALImpl(val uid: String, logInfo(s"RandomForestRegressorDALImpl compute took ${durationCompute} secs") - val ret = if (rank == 0) { + val ret = if (context.partitionId() == 0) { val convResultStartTime = System.nanoTime() val predictionNumericTable = OneDAL.homogenTableToMatrix( OneDAL.makeHomogenTable(result.getPredictionNumericTable), @@ -124,7 +124,7 @@ class RandomForestRegressorDALImpl(val uid: String, } else { Iterator.empty } - + context.barrier() ret }.collect() rfrTimer.record("Training") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala 
b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 20362b896..9fb34ff6c 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -19,7 +19,7 @@ package com.intel.oap.mllib.stat import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common -import org.apache.spark.TaskContext +import org.apache.spark.{BarrierTaskContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.{Matrix, Vector} import org.apache.spark.rdd.RDD @@ -46,9 +46,10 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(coalescedTables) - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => + val results = coalescedTables.barrier().mapPartitionsWithIndex { (rank, table) => + val context = BarrierTaskContext.get() val tableArr = table.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val computeStartTime = System.nanoTime() @@ -74,7 +75,7 @@ class CorrelationDALImpl( logInfo(s"CorrelationDAL compute took ${durationCompute} secs") - val ret = if (rank == 0) { + val ret = if (context.partitionId() == 0) { val convResultStartTime = System.nanoTime() val correlationNumericTable = if (useDevice == "GPU") { OneDAL.homogenTableToMatrix(OneDAL.makeHomogenTable(result.getCorrelationNumericTable), @@ -92,6 +93,7 @@ class CorrelationDALImpl( } else { Iterator.empty } + context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index 3f108364c..acb55c871 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.stat import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} -import org.apache.spark.TaskContext +import org.apache.spark.{BarrierTaskContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.Vector import org.apache.spark.mllib.linalg.{Vectors => OldVectors} @@ -47,9 +47,10 @@ class SummarizerDALImpl(val executorNum: Int, val kvsIPPort = getOneCCLIPPort(data) - val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => + val results = coalescedTables.barrier().mapPartitionsWithIndex { (rank, table) => + val context = BarrierTaskContext.get() val tableArr = table.next() - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, context.partitionId(), kvsIPPort) val computeStartTime = System.nanoTime() @@ -75,7 +76,7 @@ class SummarizerDALImpl(val executorNum: Int, logInfo(s"SummarizerDAL compute took ${durationCompute} secs") - val ret = if (rank == 0) { + val ret = if (context.partitionId() == 0) { val convResultStartTime = System.nanoTime() val meanVector = if (useDevice == "GPU") { @@ -117,6 +118,7 @@ class SummarizerDALImpl(val executorNum: Int, } else { Iterator.empty } + context.barrier() OneCCL.cleanup() ret }.collect() From 90292ce2e99da6130abfca020a2b7bb3e7fd30f7 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Tue, 22 Aug 2023 03:03:07 +0000 Subject: [PATCH 2/9] rollback Signed-off-by: minmingzhu --- .../src/main/scala/com/intel/oap/mllib/OneDAL.scala | 8 +++++--- 
.../oap/mllib/classification/NaiveBayesDALImpl.scala | 10 ++++------ .../RandomForestClassifierDALImpl.scala | 10 ++++------ .../intel/oap/mllib/clustering/KMeansDALImpl.scala | 11 +++++------ .../com/intel/oap/mllib/feature/PCADALImpl.scala | 10 ++++------ .../intel/oap/mllib/recommendation/ALSDALImpl.scala | 8 +++----- .../mllib/regression/LinearRegressionDALImpl.scala | 11 +++++------ .../regression/RandomForestRegressorDALImpl.scala | 12 ++++++------ .../intel/oap/mllib/stat/CorrelationDALImpl.scala | 10 ++++------ .../com/intel/oap/mllib/stat/SummarizerDALImpl.scala | 10 ++++------ 10 files changed, 44 insertions(+), 56 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala index be4b8aa00..e0e86e7e9 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala @@ -588,14 +588,16 @@ object OneDAL { logger.info(s"Processing partitions with $executorNum executors") val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1) + val barrierRDD = data.barrier()mapPartitions(iter => iter) + // Repartition to executorNum if not enough partitions - val dataForConversion = if (data.getNumPartitions < executorNum) { + val dataForConversion = if (barrierRDD.getNumPartitions < executorNum) { logger.info(s"Repartition to executorNum if not enough partitions") - val reData = data.repartition(executorNum).setName("RepartitionedRDD") + val reData = barrierRDD.repartition(executorNum).setName("RepartitionedRDD") reData.cache().count() reData } else { - data + barrierRDD } // Get dimensions for each partition diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala index 04807c51c..5a274cf7f 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala @@ -18,7 +18,6 @@ package com.intel.oap.mllib.classification import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL} -import org.apache.spark.BarrierTaskContext import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml.classification.NaiveBayesModel @@ -51,12 +50,11 @@ class NaiveBayesDALImpl(val uid: String, labelCol, featuresCol, executorNum) } - val results = labeledPointsTables.barrier().mapPartitionsWithIndex { + val results = labeledPointsTables.mapPartitionsWithIndex { case (rank: Int, tables: Iterator[(Long, Long)]) => - val context = BarrierTaskContext.get() val (featureTabAddr, lableTabAddr) = tables.next() - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort) val computeStartTime = System.nanoTime() @@ -70,7 +68,7 @@ class NaiveBayesDALImpl(val uid: String, println(s"NaiveBayesDAL compute took ${durationCompute} secs") - val ret = if (context.partitionId() == 0) { + val ret = if (OneCCL.isRoot()) { val convResultStartTime = System.nanoTime() val pi = OneDAL.numericTableNx1ToVector(OneDAL.makeNumericTable(result.getPiNumericTable)) @@ -88,7 +86,7 @@ class NaiveBayesDALImpl(val uid: String, } else { Iterator.empty } - context.barrier() + OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala 
b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index b8ea65b8f..cb3ef3e52 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -19,7 +19,7 @@ import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.annotation.Since -import org.apache.spark.{BarrierTaskContext, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.linalg.{Matrix, Vector} @@ -75,9 +75,8 @@ class RandomForestClassifierDALImpl(val uid: String, rfcTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - val results = labeledPointsTables.barrier().mapPartitionsWithIndex { + val results = labeledPointsTables.mapPartitionsWithIndex { (rank: Int, tables: Iterator[(Long, Long)]) => - val context = BarrierTaskContext.get() val (featureTabAddr, lableTabAddr) = tables.next() val gpuIndices = if (useDevice == "GPU") { @@ -90,7 +89,7 @@ class RandomForestClassifierDALImpl(val uid: String, } else { null } - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort) val computeStartTime = System.nanoTime() val result = new RandomForestResult val hashmap = cRFClassifierTrainDAL( @@ -118,12 +117,11 @@ class RandomForestClassifierDALImpl(val uid: String, logInfo(s"RandomForestClassifierDAL compute took ${durationCompute} secs") - val ret = if (context.partitionId() == 0) { + val ret = if (rank == 0) { Iterator(hashmap) } else { Iterator.empty } - context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 1122d4ceb..3a23a1d65 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -19,7 +19,7 @@ package com.intel.oap.mllib.clustering import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common -import org.apache.spark.{BarrierTaskContext, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.util._ @@ -51,8 +51,7 @@ class KMeansDALImpl(var nClusters: Int, kmeansTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(coalescedTables) - val results = coalescedTables.barrier().mapPartitionsWithIndex { (rank, table) => - val context = BarrierTaskContext.get() + val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => var cCentroids = 0L val result = new KMeansResult() val gpuIndices = if (useDevice == "GPU") { @@ -63,7 +62,7 @@ class KMeansDALImpl(var nClusters: Int, } val tableArr = table.next() - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort) val initCentroids = if (useDevice == "GPU") { OneDAL.makeHomogenTable(centers, computeDevice).getcObejct() } else { @@ -82,7 +81,7 @@ class KMeansDALImpl(var nClusters: Int, result ) - val ret = if (context.partitionId() == 0) { + val ret = if 
(rank == 0) { assert(cCentroids != 0) val centerVectors = if (useDevice == "GPU") { OneDAL.homogenTableToVectors(OneDAL.makeHomogenTable(cCentroids), @@ -94,10 +93,10 @@ class KMeansDALImpl(var nClusters: Int, } else { Iterator.empty } - context.barrier() OneCCL.cleanup() ret }.collect() + // Make sure there is only one result from rank 0 assert(results.length == 1) kmeansTimer.record("Training") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index b86cd6202..4f0807abd 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -20,7 +20,7 @@ import java.nio.DoubleBuffer import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable} import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Service, Utils} -import org.apache.spark.{BarrierTaskContext, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg._ @@ -59,10 +59,9 @@ class PCADALImpl(val k: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) pcaTimer.record("Data Convertion") - val results = coalescedTables.barrier().mapPartitionsWithIndex { (rank, table) => - val context = BarrierTaskContext.get() + val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort) val result = new PCAResult() val gpuIndices = if (useDevice == "GPU") { val resources = TaskContext.get().resources() @@ -79,7 +78,7 @@ class PCADALImpl(val k: Int, result ) - val ret = if (context.partitionId() == 0) { + val ret = if (rank == 0) { val principleComponents = if (useDevice == "GPU") { val pcNumericTable = OneDAL.makeHomogenTable(result.getPcNumericTable) getPrincipleComponentsFromOneAPI(pcNumericTable, k, computeDevice) @@ -103,7 +102,6 @@ class PCADALImpl(val k: Int, } else { Iterator.empty } - context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala index 2976a76d3..f8c781caf 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala @@ -20,7 +20,7 @@ import com.intel.daal.data_management.data.CSRNumericTable import com.intel.daal.services.DaalContext import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} -import org.apache.spark.{BarrierTaskContext, Partitioner} +import org.apache.spark.Partitioner import org.apache.spark.internal.Logging import org.apache.spark.ml.recommendation.ALS.Rating import org.apache.spark.rdd.RDD @@ -83,9 +83,8 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], .map { p => Rating(p.item, p.user, p.rating) } - .barrier().mapPartitionsWithIndex { (rank, iter) => - val context = BarrierTaskContext.get() - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + .mapPartitionsWithIndex { (rank, iter) => + OneCCL.init(executorNum, rank, kvsIPPort) val rankId = OneCCL.rankID() println("rankId", rankId, "nUsers", nVectors, "nItems", nFeatures) @@ -106,7 +105,6 @@ class 
ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], rankId, result ) - context.barrier() Iterator(result) }.cache() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index bbdd8909f..ee93892b2 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -19,7 +19,8 @@ package com.intel.oap.mllib.regression import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common -import org.apache.spark.{BarrierTaskContext, SparkException, TaskContext} +import org.apache.spark.SparkException +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.{DenseVector, Vector} import org.apache.spark.ml.util._ @@ -105,11 +106,10 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } lrTimer.record("Data Convertion") - val results = labeledPointsTables.barrier().mapPartitionsWithIndex { + val results = labeledPointsTables.mapPartitionsWithIndex { case (rank: Int, tables: Iterator[(Long, Long)]) => - val context = BarrierTaskContext.get() val (featureTabAddr, lableTabAddr) = tables.next() - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort) val result = new LiRResult() val gpuIndices = if (useDevice == "GPU") { @@ -138,7 +138,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, result ) - val ret = if (context.partitionId() == 0) { + val ret = if (rank == 0) { val coefficientArray = if (useDevice == "GPU") { OneDAL.homogenTableToVectors(OneDAL.makeHomogenTable(cbeta), computeDevice) @@ -149,7 +149,6 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } else { Iterator.empty } - context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index d02c3f655..a4e9a0f78 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -19,7 +19,7 @@ import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.classification.{LearningNode, RandomForestResult} import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common -import org.apache.spark.{BarrierTaskContext, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.linalg.Matrix @@ -69,10 +69,10 @@ class RandomForestRegressorDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - val results = labeledPointsTables.barrier().mapPartitionsWithIndex { + val results = labeledPointsTables.mapPartitionsWithIndex { (rank: Int, tables: Iterator[(Long, Long)]) => - val context = BarrierTaskContext.get() val (featureTabAddr, lableTabAddr) = tables.next() + val gpuIndices = if (useDevice == "GPU") { if (isTest) { Array(0) @@ -84,7 +84,7 @@ class RandomForestRegressorDALImpl(val uid: String, null } - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + 
OneCCL.init(executorNum, rank, kvsIPPort) val computeStartTime = System.nanoTime() val result = new RandomForestResult @@ -109,7 +109,7 @@ class RandomForestRegressorDALImpl(val uid: String, logInfo(s"RandomForestRegressorDALImpl compute took ${durationCompute} secs") - val ret = if (context.partitionId() == 0) { + val ret = if (rank == 0) { val convResultStartTime = System.nanoTime() val predictionNumericTable = OneDAL.homogenTableToMatrix( OneDAL.makeHomogenTable(result.getPredictionNumericTable), @@ -124,7 +124,7 @@ class RandomForestRegressorDALImpl(val uid: String, } else { Iterator.empty } - context.barrier() + ret }.collect() rfrTimer.record("Training") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 9fb34ff6c..20362b896 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -19,7 +19,7 @@ package com.intel.oap.mllib.stat import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common -import org.apache.spark.{BarrierTaskContext, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.{Matrix, Vector} import org.apache.spark.rdd.RDD @@ -46,10 +46,9 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(coalescedTables) - val results = coalescedTables.barrier().mapPartitionsWithIndex { (rank, table) => - val context = BarrierTaskContext.get() + val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort) val computeStartTime = System.nanoTime() @@ -75,7 +74,7 @@ class CorrelationDALImpl( logInfo(s"CorrelationDAL compute took ${durationCompute} secs") - val ret = if (context.partitionId() == 0) { + val ret = if (rank == 0) { val convResultStartTime = System.nanoTime() val correlationNumericTable = if (useDevice == "GPU") { OneDAL.homogenTableToMatrix(OneDAL.makeHomogenTable(result.getCorrelationNumericTable), @@ -93,7 +92,6 @@ class CorrelationDALImpl( } else { Iterator.empty } - context.barrier() OneCCL.cleanup() ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index acb55c871..3f108364c 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.stat import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} -import org.apache.spark.{BarrierTaskContext, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.Vector import org.apache.spark.mllib.linalg.{Vectors => OldVectors} @@ -47,10 +47,9 @@ class SummarizerDALImpl(val executorNum: Int, val kvsIPPort = getOneCCLIPPort(data) - val results = coalescedTables.barrier().mapPartitionsWithIndex { (rank, table) => - val context = BarrierTaskContext.get() + val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, context.partitionId(), kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort) val 
computeStartTime = System.nanoTime() @@ -76,7 +75,7 @@ class SummarizerDALImpl(val executorNum: Int, logInfo(s"SummarizerDAL compute took ${durationCompute} secs") - val ret = if (context.partitionId() == 0) { + val ret = if (rank == 0) { val convResultStartTime = System.nanoTime() val meanVector = if (useDevice == "GPU") { @@ -118,7 +117,6 @@ class SummarizerDALImpl(val executorNum: Int, } else { Iterator.empty } - context.barrier() OneCCL.cleanup() ret }.collect() From 999487e197b9aba420b3c5e202234976fd6c14a0 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Tue, 22 Aug 2023 03:27:23 +0000 Subject: [PATCH 3/9] add barrierRDD Signed-off-by: minmingzhu --- .../src/main/scala/com/intel/oap/mllib/OneDAL.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala index e0e86e7e9..14a79a045 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala @@ -430,7 +430,8 @@ object OneDAL { // Filter out empty partitions, if there is no such rdd, coalesce will report an error // "No partitions or no locations for partitions found". // TODO: ML-312: Improve ExecutorInProcessCoalescePartitioner - val nonEmptyPartitions = dataForConversion.select(labelCol, featuresCol).toDF().rdd.mapPartitionsWithIndex { + val nonEmptyPartitions = dataForConversion.select(labelCol, featuresCol).toDF().rdd. + barrier().mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) => Iterator(Tuple3(partitionDims(index)._1, index, it)) }.filter { _._1 > 0 @@ -680,12 +681,14 @@ object OneDAL { require(executorNum > 0) logger.info(s"Processing partitions with $executorNum executors") + val barrierRDD = vectors.barrier()mapPartitions(iter => iter) + // Repartition to executorNum if not enough partitions val dataForConversion = if (vectors.getNumPartitions < executorNum) { - vectors.repartition(executorNum).setName("Repartitioned for conversion").cache() + barrierRDD.repartition(executorNum).setName("Repartitioned for conversion").cache() } else { - vectors + barrierRDD } // Get dimensions for each partition From a84c33e4e1032cb8d7529dd616cfbafc438b87d8 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Tue, 22 Aug 2023 05:54:29 +0000 Subject: [PATCH 4/9] update Signed-off-by: minmingzhu --- .../main/scala/com/intel/oap/mllib/OneDAL.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala index 14a79a045..123e7f3e4 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala @@ -409,29 +409,29 @@ object OneDAL { val spark = SparkSession.active import spark.implicits._ - val labeledPointsRDD = labeledPoints.rdd + val labeledPointsRDD = labeledPoints.select(labelCol, featuresCol).toDF().rdd + .barrier().mapPartitions(iter => iter) // Repartition to executorNum if not enough partitions val dataForConversion = if (labeledPointsRDD.getNumPartitions < executorNum) { logger.info(s"Repartition to executorNum if not enough partitions") - val rePartitions = labeledPoints.repartition(executorNum).cache() + val rePartitions = labeledPointsRDD.repartition(executorNum).cache() rePartitions.count() rePartitions } else { - labeledPoints + labeledPointsRDD } // Get dimensions for each partition - val partitionDims = 
Utils.getPartitionDims(dataForConversion.select(featuresCol).rdd.map{ row => - val vector = row.getAs[Vector](0) + val partitionDims = Utils.getPartitionDims(dataForConversion.map{ row => + val vector = row.getAs[Vector](1) vector }) // Filter out empty partitions, if there is no such rdd, coalesce will report an error // "No partitions or no locations for partitions found". // TODO: ML-312: Improve ExecutorInProcessCoalescePartitioner - val nonEmptyPartitions = dataForConversion.select(labelCol, featuresCol).toDF().rdd. - barrier().mapPartitionsWithIndex { + val nonEmptyPartitions = dataForConversion.mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) => Iterator(Tuple3(partitionDims(index)._1, index, it)) }.filter { _._1 > 0 @@ -589,7 +589,7 @@ object OneDAL { logger.info(s"Processing partitions with $executorNum executors") val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1) - val barrierRDD = data.barrier()mapPartitions(iter => iter) + val barrierRDD = data.barrier().mapPartitions(iter => iter) // Repartition to executorNum if not enough partitions val dataForConversion = if (barrierRDD.getNumPartitions < executorNum) { From d99bb81bc011aec32c6c3b1ff073f9029ed67d21 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Tue, 22 Aug 2023 06:48:03 +0000 Subject: [PATCH 5/9] update Signed-off-by: minmingzhu --- mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala | 8 +++----- .../com/intel/oap/mllib/clustering/KMeansDALImpl.scala | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala index 123e7f3e4..e7f03f17d 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala @@ -589,16 +589,14 @@ object OneDAL { logger.info(s"Processing partitions with $executorNum executors") val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1) - val barrierRDD = data.barrier().mapPartitions(iter => iter) - // Repartition to executorNum if not enough partitions - val dataForConversion = if (barrierRDD.getNumPartitions < executorNum) { + val dataForConversion = if (data.getNumPartitions < executorNum) { logger.info(s"Repartition to executorNum if not enough partitions") - val reData = barrierRDD.repartition(executorNum).setName("RepartitionedRDD") + val reData = data.repartition(executorNum).setName("RepartitionedRDD") reData.cache().count() reData } else { - barrierRDD + data } // Get dimensions for each partition diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 3a23a1d65..4f84de262 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -95,7 +95,7 @@ class KMeansDALImpl(var nClusters: Int, } OneCCL.cleanup() ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() // Make sure there is only one result from rank 0 assert(results.length == 1) From d71fbc311d52793624f4783ec1a997cb17b412f0 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Wed, 23 Aug 2023 08:32:33 +0000 Subject: [PATCH 6/9] update barrier mode Signed-off-by: minmingzhu --- .../scala/com/intel/oap/mllib/OneDAL.scala | 19 ++++++++----------- .../oap/mllib/clustering/KMeansDALImpl.scala | 8 ++++---- 
.../intel/oap/mllib/feature/PCADALImpl.scala | 6 ++++-- .../oap/mllib/recommendation/ALSDALImpl.scala | 3 ++- .../oap/mllib/stat/CorrelationDALImpl.scala | 5 +++-- .../oap/mllib/stat/SummarizerDALImpl.scala | 5 +++-- .../ml/clustering/MLlibKMeansSuite.scala | 1 + .../spark/ml/feature/MLlibPCASuite.scala | 2 ++ 8 files changed, 27 insertions(+), 22 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala index e7f03f17d..be4b8aa00 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala @@ -409,29 +409,28 @@ object OneDAL { val spark = SparkSession.active import spark.implicits._ - val labeledPointsRDD = labeledPoints.select(labelCol, featuresCol).toDF().rdd - .barrier().mapPartitions(iter => iter) + val labeledPointsRDD = labeledPoints.rdd // Repartition to executorNum if not enough partitions val dataForConversion = if (labeledPointsRDD.getNumPartitions < executorNum) { logger.info(s"Repartition to executorNum if not enough partitions") - val rePartitions = labeledPointsRDD.repartition(executorNum).cache() + val rePartitions = labeledPoints.repartition(executorNum).cache() rePartitions.count() rePartitions } else { - labeledPointsRDD + labeledPoints } // Get dimensions for each partition - val partitionDims = Utils.getPartitionDims(dataForConversion.map{ row => - val vector = row.getAs[Vector](1) + val partitionDims = Utils.getPartitionDims(dataForConversion.select(featuresCol).rdd.map{ row => + val vector = row.getAs[Vector](0) vector }) // Filter out empty partitions, if there is no such rdd, coalesce will report an error // "No partitions or no locations for partitions found". // TODO: ML-312: Improve ExecutorInProcessCoalescePartitioner - val nonEmptyPartitions = dataForConversion.mapPartitionsWithIndex { + val nonEmptyPartitions = dataForConversion.select(labelCol, featuresCol).toDF().rdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) => Iterator(Tuple3(partitionDims(index)._1, index, it)) }.filter { _._1 > 0 @@ -679,14 +678,12 @@ object OneDAL { require(executorNum > 0) logger.info(s"Processing partitions with $executorNum executors") - val barrierRDD = vectors.barrier()mapPartitions(iter => iter) - // Repartition to executorNum if not enough partitions val dataForConversion = if (vectors.getNumPartitions < executorNum) { - barrierRDD.repartition(executorNum).setName("Repartitioned for conversion").cache() + vectors.repartition(executorNum).setName("Repartitioned for conversion").cache() } else { - barrierRDD + vectors } // Get dimensions for each partition diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 4f84de262..4ba42fb45 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -42,11 +42,11 @@ class KMeansDALImpl(var nClusters: Int, val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) kmeansTimer.record("Preprocessing") - + val barrierRDD = data.barrier().mapPartitions(iter => iter) val coalescedTables = if (useDevice == "GPU") { - OneDAL.coalesceVectorsToHomogenTables(data, executorNum, computeDevice) + OneDAL.coalesceVectorsToHomogenTables(barrierRDD, 
executorNum, computeDevice) } else { - OneDAL.coalesceVectorsToNumericTables(data, executorNum) + OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum) } kmeansTimer.record("Data Convertion") @@ -95,7 +95,7 @@ class KMeansDALImpl(var nClusters: Int, } OneCCL.cleanup() ret - }.barrier().mapPartitions(iter => iter).collect() + }.collect() // Make sure there is only one result from rank 0 assert(results.length == 1) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 4f0807abd..1fddc55e2 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -45,16 +45,18 @@ class PCADALImpl(val k: Int, def train(data: RDD[Vector]): PCADALModel = { val pcaTimer = new Utils.AlgoTimeMetrics("PCA") val normalizedData = normalizeData(data) + val barrierRDD = normalizedData.barrier().mapPartitions(iter => iter) + val sparkContext = normalizedData.sparkContext val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) pcaTimer.record("Preprocessing") val coalescedTables = if (useDevice == "GPU") { - OneDAL.coalesceVectorsToHomogenTables(normalizedData, executorNum, + OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum, computeDevice) } else { - OneDAL.coalesceVectorsToNumericTables(normalizedData, executorNum) + OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum) } val kvsIPPort = getOneCCLIPPort(coalescedTables) pcaTimer.record("Data Convertion") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala index f8c781caf..002afa5a4 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala @@ -72,8 +72,9 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], logInfo(s"ALSDAL fit using $executorNum Executors " + s"for $nVectors vectors and $nFeatures features") + val barrierRDD = data.barrier().mapPartitions(iter => iter) - val numericTables = data.repartition(executorNum) + val numericTables = barrierRDD.repartition(executorNum) .setName("Repartitioned for conversion").cache() val kvsIPPort = getOneCCLIPPort(numericTables) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 20362b896..5823c84e8 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -35,12 +35,13 @@ class CorrelationDALImpl( val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) corTimer.record("Preprocessing") + val barrierRDD = data.barrier().mapPartitions(iter => iter) val coalescedTables = if (useDevice == "GPU") { - OneDAL.coalesceVectorsToHomogenTables(data, executorNum, + OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum, computeDevice) } else { - OneDAL.coalesceVectorsToNumericTables(data, executorNum) + OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum) } corTimer.record("Data Convertion") 
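Note on the idiom above: Spark's barrier execution mode gang-schedules every task of a stage, which is what a oneCCL collective needs, since OneCCL.init blocks until all ranks have joined and a partially scheduled stage would hang. The identity mapPartitions over RDD.barrier() is the stock way to pull an ordinary RDD into a barrier stage. A minimal, self-contained sketch of the combined pattern from PATCH 1 and PATCH 6 (the compute callback and result type are illustrative assumptions, not part of this patch set):

import org.apache.spark.BarrierTaskContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object BarrierSketch {
  // Run `compute` once per partition inside a gang-scheduled stage and
  // return only rank 0's output, mirroring the collect-from-rank-0 idiom
  // used in the surrounding hunks.
  def runBarrier[T: ClassTag, R: ClassTag](data: RDD[T])
      (compute: (Int, Iterator[T]) => R): Array[R] = {
    // Identity pass-through: same data, but the stage is now all-or-nothing.
    val barrierRDD = data.barrier().mapPartitions(iter => iter)
    barrierRDD.mapPartitions { iter =>
      val context = BarrierTaskContext.get() // valid only inside a barrier stage
      val rank = context.partitionId()       // stable task index, reused as the CCL rank
      val result = compute(rank, iter)
      context.barrier()                      // all tasks sync here before returning
      if (rank == 0) Iterator(result) else Iterator.empty
    }.collect()
  }
}

Because only rank 0 emits an element, the driver-side assert(results.length == 1) used throughout these files holds by construction.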
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index 3f108364c..d74356c81 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -36,12 +36,13 @@ class SummarizerDALImpl(val executorNum: Int, val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) sumTimer.record("Preprocessing") + val barrierRDD = data.barrier().mapPartitions(iter => iter) val coalescedTables = if (useDevice == "GPU") { - OneDAL.coalesceVectorsToHomogenTables(data, executorNum, + OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum, computeDevice) } else { - OneDAL.coalesceVectorsToNumericTables(data, executorNum) + OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum) } sumTimer.record("Data Convertion") diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala index 5fac9df0d..198553fb5 100644 --- a/mllib-dal/src/test/scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala +++ b/mllib-dal/src/test/scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala @@ -47,6 +47,7 @@ class MLlibKMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWri override def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.oap.mllib.device", TestCommon.getComputeDevice.toString) + conf.set("spark.driver.bindAddress", "10.239.34.1"); } test("default parameters") { diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala index ee0990802..db92afd24 100644 --- a/mllib-dal/src/test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala +++ b/mllib-dal/src/test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala @@ -32,6 +32,8 @@ class MLlibPCASuite extends MLTest with DefaultReadWriteTest { override def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.oap.mllib.device", TestCommon.getComputeDevice.toString) + conf.set("spark.driver.bindAddress", "10.239.34.1"); + } test("params") { From 03b024633cb92a7c45573d5bdb3366b94a01fff4 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Fri, 25 Aug 2023 07:45:26 +0000 Subject: [PATCH 7/9] update Signed-off-by: minmingzhu --- .../oap/mllib/classification/NaiveBayesDALImpl.scala | 2 +- .../classification/RandomForestClassifierDALImpl.scala | 2 +- .../com/intel/oap/mllib/clustering/KMeansDALImpl.scala | 8 ++++---- .../scala/com/intel/oap/mllib/feature/PCADALImpl.scala | 7 +++---- .../com/intel/oap/mllib/recommendation/ALSDALImpl.scala | 7 +++---- .../oap/mllib/regression/LinearRegressionDALImpl.scala | 2 +- .../mllib/regression/RandomForestRegressorDALImpl.scala | 2 +- .../com/intel/oap/mllib/stat/CorrelationDALImpl.scala | 7 +++---- .../com/intel/oap/mllib/stat/SummarizerDALImpl.scala | 7 +++---- 9 files changed, 20 insertions(+), 24 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala index 5a274cf7f..b497ba1b5 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala +++ 
b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala @@ -89,7 +89,7 @@ class NaiveBayesDALImpl(val uid: String, OneCCL.cleanup() ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() // Make sure there is only one result from rank 0 assert(results.length == 1) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index cb3ef3e52..772b1b281 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -124,7 +124,7 @@ class RandomForestClassifierDALImpl(val uid: String, } OneCCL.cleanup() ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() rfcTimer.record("Training") rfcTimer.print() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 4ba42fb45..4f84de262 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -42,11 +42,11 @@ class KMeansDALImpl(var nClusters: Int, val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) kmeansTimer.record("Preprocessing") - val barrierRDD = data.barrier().mapPartitions(iter => iter) + val coalescedTables = if (useDevice == "GPU") { - OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum, computeDevice) + OneDAL.coalesceVectorsToHomogenTables(data, executorNum, computeDevice) } else { - OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum) + OneDAL.coalesceVectorsToNumericTables(data, executorNum) } kmeansTimer.record("Data Convertion") @@ -95,7 +95,7 @@ class KMeansDALImpl(var nClusters: Int, } OneCCL.cleanup() ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() // Make sure there is only one result from rank 0 assert(results.length == 1) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 1fddc55e2..2133e64e4 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -45,7 +45,6 @@ class PCADALImpl(val k: Int, def train(data: RDD[Vector]): PCADALModel = { val pcaTimer = new Utils.AlgoTimeMetrics("PCA") val normalizedData = normalizeData(data) - val barrierRDD = normalizedData.barrier().mapPartitions(iter => iter) val sparkContext = normalizedData.sparkContext val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) @@ -53,10 +52,10 @@ class PCADALImpl(val k: Int, pcaTimer.record("Preprocessing") val coalescedTables = if (useDevice == "GPU") { - OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum, + OneDAL.coalesceVectorsToHomogenTables(normalizedData, executorNum, computeDevice) } else { - OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum) + OneDAL.coalesceVectorsToNumericTables(normalizedData, executorNum) } val kvsIPPort = getOneCCLIPPort(coalescedTables) pcaTimer.record("Data Convertion") @@ -106,7 +105,7 @@ class PCADALImpl(val k: Int, } 
OneCCL.cleanup() ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() pcaTimer.record("Training") pcaTimer.print() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala index 002afa5a4..78783c566 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala @@ -72,9 +72,8 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], logInfo(s"ALSDAL fit using $executorNum Executors " + s"for $nVectors vectors and $nFeatures features") - val barrierRDD = data.barrier().mapPartitions(iter => iter) - val numericTables = barrierRDD.repartition(executorNum) + val numericTables = data.repartition(executorNum) .setName("Repartitioned for conversion").cache() val kvsIPPort = getOneCCLIPPort(numericTables) @@ -107,7 +106,7 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], result ) Iterator(result) - }.cache() + }.cache().barrier().mapPartitions(iter => iter) val usersFactorsRDD = results .mapPartitionsWithIndex { (index: Int, partiton: Iterator[ALSResult]) => @@ -128,7 +127,7 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], }.toIterator } ret - }.setName("userFactors").cache() + }.setName("userFactors").cache().barrier().mapPartitions(iter => iter) val itemsFactorsRDD = results .mapPartitionsWithIndex { (index: Int, partiton: Iterator[ALSResult]) => diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index ee93892b2..067f0e3dd 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -151,7 +151,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } OneCCL.cleanup() ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() // Make sure there is only one result from rank 0 assert(results.length == 1) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index a4e9a0f78..1de452cf8 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -126,7 +126,7 @@ class RandomForestRegressorDALImpl(val uid: String, } ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() rfrTimer.record("Training") rfrTimer.print() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 5823c84e8..96ea7a61b 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -35,13 +35,12 @@ class CorrelationDALImpl( val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) corTimer.record("Preprocessing") - val barrierRDD = data.barrier().mapPartitions(iter => iter) val coalescedTables = if (useDevice 
== "GPU") { - OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum, + OneDAL.coalesceVectorsToHomogenTables(data, executorNum, computeDevice) } else { - OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum) + OneDAL.coalesceVectorsToNumericTables(data, executorNum) } corTimer.record("Data Convertion") @@ -95,7 +94,7 @@ class CorrelationDALImpl( } OneCCL.cleanup() ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() corTimer.record("Training") corTimer.print() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index d74356c81..0b0d97520 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -36,13 +36,12 @@ class SummarizerDALImpl(val executorNum: Int, val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) sumTimer.record("Preprocessing") - val barrierRDD = data.barrier().mapPartitions(iter => iter) val coalescedTables = if (useDevice == "GPU") { - OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum, + OneDAL.coalesceVectorsToHomogenTables(data, executorNum, computeDevice) } else { - OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum) + OneDAL.coalesceVectorsToNumericTables(data, executorNum) } sumTimer.record("Data Convertion") @@ -120,7 +119,7 @@ class SummarizerDALImpl(val executorNum: Int, } OneCCL.cleanup() ret - }.collect() + }.barrier().mapPartitions(iter => iter).collect() sumTimer.record("Training") sumTimer.print() From 29d514e6928552f6409bc08ee4ace77e1e7d1663 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Fri, 25 Aug 2023 08:05:45 +0000 Subject: [PATCH 8/9] update Signed-off-by: minmingzhu --- .../scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala | 1 - .../test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala | 2 -- 2 files changed, 3 deletions(-) diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala index 198553fb5..5fac9df0d 100644 --- a/mllib-dal/src/test/scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala +++ b/mllib-dal/src/test/scala/org/apache/spark/ml/clustering/MLlibKMeansSuite.scala @@ -47,7 +47,6 @@ class MLlibKMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWri override def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.oap.mllib.device", TestCommon.getComputeDevice.toString) - conf.set("spark.driver.bindAddress", "10.239.34.1"); } test("default parameters") { diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala index db92afd24..ee0990802 100644 --- a/mllib-dal/src/test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala +++ b/mllib-dal/src/test/scala/org/apache/spark/ml/feature/MLlibPCASuite.scala @@ -32,8 +32,6 @@ class MLlibPCASuite extends MLTest with DefaultReadWriteTest { override def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.oap.mllib.device", TestCommon.getComputeDevice.toString) - conf.set("spark.driver.bindAddress", "10.239.34.1"); - } test("params") { From 8281bbd03d6b1b4472d4b5687486a3e57e7ebcb0 Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Fri, 25 Aug 2023 
From 8281bbd03d6b1b4472d4b5687486a3e57e7ebcb0 Mon Sep 17 00:00:00 2001
From: minmingzhu
Date: Fri, 25 Aug 2023 08:50:01 +0000
Subject: [PATCH 9/9] update

Signed-off-by: minmingzhu
---
 .../intel/oap/mllib/clustering/KMeansDALImpl.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
index 4f84de262..6f4b55547 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
@@ -95,16 +95,18 @@ class KMeansDALImpl(var nClusters: Int,
       }
       OneCCL.cleanup()
       ret
-    }.barrier().mapPartitions(iter => iter).collect()
+    }
+    results.count()
+    val barrierRDD = results.barrier().mapPartitions(iter => iter).collect()
 
     // Make sure there is only one result from rank 0
-    assert(results.length == 1)
+    assert(barrierRDD.length == 1)
 
     kmeansTimer.record("Training")
     kmeansTimer.print()
 
-    val centerVectors = results(0)._1
-    val totalCost = results(0)._2
-    val iterationNum = results(0)._3
+    val centerVectors = barrierRDD(0)._1
+    val totalCost = barrierRDD(0)._2
+    val iterationNum = barrierRDD(0)._3
 
    if (iterationNum == maxIterations) {
      logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
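Patch 9 splits the KMeans pipeline in two actions: `results.count()` materializes the training stage first, and a separate identity barrier pass then collects the single rank-0 result (note that `barrierRDD` is the collected Array, despite its name). A minimal sketch of that flow follows; `train` and `coalescedTables` are stand-ins for the real closure and input, and the `cache()` is my addition, not part of the patch — without it the second action would recompute the lineage and run training twice.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD

// Sketch of the materialize-then-barrier flow under the assumptions above.
def collectKMeansResult(
    coalescedTables: RDD[(Long, Long)],
    train: (Int, Iterator[(Long, Long)]) => Iterator[(Array[Vector], Double, Int)])
  : (Array[Vector], Double, Int) = {
  val results = coalescedTables.barrier().mapPartitionsWithIndex(train).cache()
  results.count() // run the training stage eagerly, exactly once
  // Identity barrier stage over the cached results; only rank 0 emitted a value.
  val collected = results.barrier().mapPartitions(iter => iter).collect()
  assert(collected.length == 1)
  collected(0) // (centerVectors, totalCost, iterationNum)
}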