Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML-33] Optimize oneCCL port detection #34

Merged
merged 12 commits into from
Mar 8, 2021
79 changes: 66 additions & 13 deletions mllib-dal/src/main/native/OneCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#include <sys/socket.h>
#include <unistd.h>

#include <list>
#include <ifaddrs.h>
#include <netdb.h>

#include <oneapi/ccl.hpp>

#include "org_apache_spark_ml_util_OneCCL__.h"
Expand Down Expand Up @@ -112,30 +116,79 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_setEnv
return err;
}

#define GET_IP_CMD "hostname -I"
#define MAX_KVS_VAL_LENGTH 130
#define READ_ONLY "r"
static const int CCL_IP_LEN = 128;
// Cache of this host's non-loopback IPv4 addresses (numeric strings),
// refreshed by fill_local_host_ip().
std::list<std::string> local_host_ips;

// Enumerate this host's non-loopback IPv4 interface addresses into
// local_host_ips (replacing any previous contents).
// Returns 0 on success (list is non-empty), -1 on any failure.
// Fix: the original leaked the getifaddrs() list on the getnameinfo-error
// and empty-list return paths; freeifaddrs() now runs on every path after
// a successful getifaddrs().
static int fill_local_host_ip() {
    struct ifaddrs *ifaddr, *ifa;
    char local_ip[CCL_IP_LEN];

    if (getifaddrs(&ifaddr) < 0) {
        // Nothing allocated yet, safe to return directly.
        // LOG_ERROR("fill_local_host_ip: can not get host IP");
        return -1;
    }

    // Interfaces whose name contains "lo" (loopback) are skipped.
    const char iface_name[] = "lo";
    local_host_ips.clear();

    int ret = 0;
    for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
        if (ifa->ifa_addr == NULL)
            continue;
        if (strstr(ifa->ifa_name, iface_name) != NULL)
            continue;
        if (ifa->ifa_addr->sa_family != AF_INET)
            continue;

        memset(local_ip, 0, CCL_IP_LEN);
        // AF_INET only (checked above), so sockaddr_in is the right size.
        int res = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in),
                              local_ip, CCL_IP_LEN, NULL, 0, NI_NUMERICHOST);
        if (res != 0) {
            std::string s("fill_local_host_ip: getnameinfo error > ");
            s.append(gai_strerror(res));
            // LOG_ERROR(s.c_str());
            ret = -1;
            break;
        }
        local_host_ips.push_back(local_ip);
    }

    // Always release the interface list, success or failure.
    freeifaddrs(ifaddr);

    if (ret != 0)
        return -1;
    if (local_host_ips.empty()) {
        // LOG_ERROR("fill_local_host_ip: can't find interface to get host IP");
        return -1;
    }
    return 0;
}

// Return true iff `ip` exactly matches one of this host's non-loopback
// IPv4 addresses.
// The SOURCE span interleaved the removed popen("hostname -I") code with
// the new getifaddrs-based code (dangling FILE* fp, fgets/pclose spliced
// into the loop, mismatched braces); this is the coherent new-side
// implementation.
// Note: refreshes local_host_ips on every call; if enumeration fails we
// log and fall through, so the function degrades to consulting a stale or
// empty list and returning false.
static bool is_valid_ip(char ip[]) {
    if (fill_local_host_ip() == -1) {
        std::cerr << "fill_local_host_ip error" << std::endl;
    }
    for (std::list<std::string>::iterator it = local_host_ips.begin();
         it != local_host_ips.end(); ++it) {
        // Exact string comparison (the old popen version used substring
        // matching via strstr, which could false-positive on prefixes).
        if (*it == ip) {
            return true;
        }
    }
    return false;
}

/*
* Class: org_apache_spark_ml_util_OneCCL__
* Method: getAvailPort
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort
JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1getAvailPort
(JNIEnv *env, jobject obj, jstring localIP) {

// start from beginning of dynamic port
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,20 @@ class KMeansDALImpl (

instr.foreach(_.logInfo(s"Processing partitions with $executorNum executors"))

val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

// repartition to executorNum if not enough partitions
val dataForConversion = if (data.getNumPartitions < executorNum) {
data.repartition(executorNum).setName("Repartitioned for conversion").cache()
} else {
data
}

val executorIPAddress = Utils.sparkFirstExecutorIP(dataForConversion.sparkContext)
val kvsIP = dataForConversion.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val kvsPortDetected = Utils.checkExecutorAvailPort(dataForConversion, kvsIP)
val kvsPort = dataForConversion.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

val partitionDims = Utils.getPartitionDims(dataForConversion)

// filter the empty partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class PCADALImpl (

val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum)

val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val executorIPAddress = Utils.sparkFirstExecutorIP(coalescedTables.sparkContext)
val kvsIP = coalescedTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
val kvsPortDetected = Utils.checkExecutorAvailPort(coalescedTables, kvsIP)
val kvsPort = coalescedTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,16 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag](

logInfo(s"ALSDAL fit using $executorNum Executors for $nVectors vectors and $nFeatures features")

val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val numericTables = data.repartition(executorNum).setName("Repartitioned for conversion").cache()

val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
val executorIPAddress = Utils.sparkFirstExecutorIP(numericTables.sparkContext)
val kvsIP = numericTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsIPPort = kvsIP+"_"+kvsPort
val kvsPortDetected = Utils.checkExecutorAvailPort(numericTables, kvsIP)
val kvsPort = numericTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val numericTables = data.repartition(executorNum).setName("Repartitioned for conversion").cache()
val kvsIPPort = kvsIP+"_"+kvsPort

/*
val numericTables = if (data.getNumPartitions < executorNum) {
data.repartition(executorNum).setName("Repartitioned for conversion").cache()
} else {
data.coalesce(executorNum).setName("Coalesced for conversion").cache()
}
*/
val results = numericTables
// Transpose the dataset
.map { p =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ object OneCCL extends Logging {
c_cleanup()
}

/** Thread-safe wrapper around the native available-port probe.
  * Serializes calls to `c_getAvailPort` (object-level `synchronized`) so
  * concurrent callers on the same JVM do not race while detecting a free
  * oneCCL KVS port on `localIP`.
  */
def getAvailPort(localIP: String): Int = synchronized {
c_getAvailPort(localIP)
}

// JNI bindings implemented in mllib-dal/src/main/native/OneCCL.cpp.
@native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam) : Int
@native private def c_cleanup() : Unit

@native def isRoot() : Boolean
@native def rankID() : Int

@native def setEnv(key: String, value: String, overwrite: Boolean = true): Int
// Raw native probe; prefer the synchronized getAvailPort wrapper.
// (The stale duplicate `@native def getAvailPort` left over from the diff
// is removed: it collided with the concrete getAvailPort defined above.)
@native def c_getAvailPort(localIP: String): Int
}
6 changes: 3 additions & 3 deletions mllib-dal/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ export LD_PRELOAD=$JAVA_HOME/jre/lib/amd64/libjsig.so
# -Dtest=none to turn off the Java tests

# Test all (disabled: run individual suites below instead)
# mvn -Dtest=none -Dmaven.test.skip=false test

# Individual test
mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.clustering.IntelKMeansSuite test
mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.feature.IntelPCASuite test
# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.recommendation.IntelALSSuite test