From 318cae1790316faf8d968aad2c006b51feba1b57 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Tue, 2 Feb 2021 17:39:50 +0800 Subject: [PATCH] use spark.oap.mllib.oneccl.kvs.ip to work around KVS IP hang issue --- .../scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala | 3 ++- .../main/scala/org/apache/spark/ml/feature/PCADALImpl.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala index d8829b2c9..31b7e7c75 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala @@ -41,6 +41,7 @@ class KMeansDALImpl ( instr.foreach(_.logInfo(s"Processing partitions with $executorNum executors")) val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext) + val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) // repartition to executorNum if not enough partitions val dataForConversion = if (data.getNumPartitions < executorNum) { @@ -113,7 +114,7 @@ class KMeansDALImpl ( val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, rank, executorIPAddress) + OneCCL.init(executorNum, rank, kvsIP) val initCentroids = OneDAL.makeNumericTable(centers) val result = new KMeansResult() diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala index 6f9aaa442..33dbe8349 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala @@ -47,10 +47,11 @@ class PCADALImpl ( val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum) val executorIPAddress = 
Utils.sparkFirstExecutorIP(input.sparkContext) + val kvsIP = input.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, rank, executorIPAddress) + OneCCL.init(executorNum, rank, kvsIP) val result = new PCAResult() cPCATrainDAL(