Skip to content

Commit

Permalink
Revert "[ML-33] Optimize oneCCL port detecting (oap-project#34)"
Browse files Browse the repository at this point in the history
This reverts commit c07d70c.
  • Loading branch information
xwu99 committed Mar 19, 2021
1 parent c07d70c commit b6b6274
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 93 deletions.
79 changes: 13 additions & 66 deletions mllib-dal/src/main/native/OneCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
#include <sys/socket.h>
#include <unistd.h>

#include <list>
#include <ifaddrs.h>
#include <netdb.h>

#include <oneapi/ccl.hpp>

#include "org_apache_spark_ml_util_OneCCL__.h"
Expand Down Expand Up @@ -116,79 +112,30 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_setEnv
return err;
}

// Maximum length of a textual IP address (IPv4/IPv6 numeric form).
static const int CCL_IP_LEN = 128;

// Numeric IPv4 addresses of all non-loopback interfaces on this host;
// (re)populated by fill_local_host_ip().
std::list<std::string> local_host_ips;

/*
 * Enumerate the host's network interfaces and collect the numeric IPv4
 * address of every non-loopback interface into local_host_ips.
 *
 * Returns 0 on success (at least one address found), -1 on failure
 * (getifaddrs error, getnameinfo error, or no usable interface).
 */
static int fill_local_host_ip() {
    struct ifaddrs *ifaddr, *ifa;
    char local_ip[CCL_IP_LEN];

    if (getifaddrs(&ifaddr) < 0) {
        // LOG_ERROR("fill_local_host_ip: can not get host IP");
        return -1;
    }

    local_host_ips.clear();

    for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
        if (ifa->ifa_addr == NULL)
            continue;
        // Skip only the loopback interface ("lo" or an alias "lo:N").
        // Fix: the original used strstr(), which also skipped unrelated
        // interfaces whose names merely contain "lo" (e.g. "wlo1").
        if (strcmp(ifa->ifa_name, "lo") == 0 ||
            strncmp(ifa->ifa_name, "lo:", 3) == 0)
            continue;
        // Only IPv4 addresses are collected, so the address length is
        // always sizeof(struct sockaddr_in) (the original's sockaddr_in6
        // branch was unreachable).
        if (ifa->ifa_addr->sa_family != AF_INET)
            continue;

        memset(local_ip, 0, CCL_IP_LEN);
        int res = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in),
                              local_ip, CCL_IP_LEN, NULL, 0, NI_NUMERICHOST);
        if (res != 0) {
            std::string s("fill_local_host_ip: getnameinfo error > ");
            s.append(gai_strerror(res));
            // LOG_ERROR(s.c_str());
            freeifaddrs(ifaddr); // fix: don't leak the interface list on error
            return -1;
        }
        local_host_ips.push_back(local_ip);
    }

    // Fix: free before the empty-list early return as well (the original
    // leaked ifaddr when no interface was found).
    freeifaddrs(ifaddr);

    if (local_host_ips.empty()) {
        // LOG_ERROR("fill_local_host_ip: can't find interface to get host IP");
        return -1;
    }
    return 0;
}
#define GET_IP_CMD "hostname -I"
#define MAX_KVS_VAL_LENGTH 130
#define READ_ONLY "r"

/*
 * Check whether `ip` is one of this host's IP addresses.
 *
 * Runs `hostname -I` and compares `ip` against each whitespace-separated
 * address token. Returns true iff an exact match is found.
 *
 * NOTE(review): this span in the diff interleaved two implementations;
 * this is the post-revert popen-based version, with two fixes applied.
 */
static bool is_valid_ip(char ip[]) {
    FILE *fp;
    // TODO: use getifaddrs instead of popen
    if ((fp = popen(GET_IP_CMD, READ_ONLY)) == NULL) {
        printf("Can't get host IP\n");
        exit(1);
    }
    char host_ips[MAX_KVS_VAL_LENGTH];
    // Fix: check the fgets result; on failure the buffer stayed
    // uninitialized and scanning it was undefined behavior.
    if (fgets(host_ips, MAX_KVS_VAL_LENGTH, fp) == NULL) {
        pclose(fp);
        return false;
    }
    pclose(fp);
    // Fix: exact per-token comparison instead of strstr(); a substring
    // match wrongly accepted e.g. "10.0.0.1" when the host only has
    // the address "10.0.0.10".
    for (char *tok = strtok(host_ips, " \t\n"); tok != NULL;
         tok = strtok(NULL, " \t\n")) {
        if (strcmp(tok, ip) == 0)
            return true;
    }
    return false;
}

/*
* Class: org_apache_spark_ml_util_OneCCL__
* Method: getAvailPort
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1getAvailPort
JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort
(JNIEnv *env, jobject obj, jstring localIP) {

// start from beginning of dynamic port
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,21 @@ class KMeansDALImpl (

instr.foreach(_.logInfo(s"Processing partitions with $executorNum executors"))

val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

// repartition to executorNum if not enough partitions
val dataForConversion = if (data.getNumPartitions < executorNum) {
data.repartition(executorNum).setName("Repartitioned for conversion").cache()
} else {
data
}

val executorIPAddress = Utils.sparkFirstExecutorIP(dataForConversion.sparkContext)
val kvsIP = dataForConversion.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val kvsPortDetected = Utils.checkExecutorAvailPort(dataForConversion, kvsIP)
val kvsPort = dataForConversion.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

val partitionDims = Utils.getPartitionDims(dataForConversion)

// filter the empty partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class PCADALImpl (

val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum)

val executorIPAddress = Utils.sparkFirstExecutorIP(coalescedTables.sparkContext)
val kvsIP = coalescedTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsPortDetected = Utils.checkExecutorAvailPort(coalescedTables, kvsIP)
val kvsPort = coalescedTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,23 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag](

logInfo(s"ALSDAL fit using $executorNum Executors for $nVectors vectors and $nFeatures features")

val numericTables = data.repartition(executorNum).setName("Repartitioned for conversion").cache()

val executorIPAddress = Utils.sparkFirstExecutorIP(numericTables.sparkContext)
val kvsIP = numericTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsPortDetected = Utils.checkExecutorAvailPort(numericTables, kvsIP)
val kvsPort = numericTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

val numericTables = data.repartition(executorNum).setName("Repartitioned for conversion").cache()

/*
val numericTables = if (data.getNumPartitions < executorNum) {
data.repartition(executorNum).setName("Repartitioned for conversion").cache()
} else {
data.coalesce(executorNum).setName("Coalesced for conversion").cache()
}
*/
val results = numericTables
// Transpose the dataset
.map { p =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,12 @@ object OneCCL extends Logging {
c_cleanup()
}

def getAvailPort(localIP: String): Int = synchronized {
c_getAvailPort(localIP)
}

@native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam) : Int
@native private def c_cleanup() : Unit

@native def isRoot() : Boolean
@native def rankID() : Int

@native def setEnv(key: String, value: String, overwrite: Boolean = true): Int
@native def c_getAvailPort(localIP: String): Int
@native def getAvailPort(localIP: String): Int
}
6 changes: 3 additions & 3 deletions mllib-dal/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ export LD_PRELOAD=$JAVA_HOME/jre/lib/amd64/libjsig.so
# -Dtest=none to turn off the Java tests

# Test all
# mvn -Dtest=none -Dmaven.test.skip=false test
mvn -Dtest=none -Dmaven.test.skip=false test

# Individual test
mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.clustering.IntelKMeansSuite test
mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.feature.IntelPCASuite test
# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.clustering.IntelKMeansSuite test
# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.feature.IntelPCASuite test
# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.recommendation.IntelALSSuite test

0 comments on commit b6b6274

Please sign in to comment.