Skip to content
This repository has been archived by the owner on May 24, 2018. It is now read-only.

add kmeans.py and wrapper of blockdata_iter for python #9

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .gitignore
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
dmlc-core
*.txt
*.model

# User-specific config
config.mk

# VC
*.pdb
*.tlog
*.opensdf
*.sdf
*.log
**/ipch/

Empty file modified LICENSE
100644 → 100755
Empty file.
Empty file modified Makefile
100644 → 100755
Empty file.
Empty file modified README.md
100644 → 100755
Empty file.
Empty file modified learn/data/README.md
100644 → 100755
Empty file.
Empty file modified learn/data/agaricus.txt.test
100644 → 100755
Empty file.
Empty file modified learn/data/agaricus.txt.train
100644 → 100755
Empty file.
Empty file modified learn/data/featmap.txt
100644 → 100755
Empty file.
Empty file modified learn/kmeans/Makefile
100644 → 100755
Empty file.
70 changes: 65 additions & 5 deletions learn/kmeans/kmeans.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
* \file kmeans.cc
* \brief kmeans using rabit allreduce
*/
#define OMP_DBG
#define DMLC_USE_CXX11 1
#include <algorithm>
#include <vector>
#include <cmath>
#include <rabit.h>
#include <dmlc/io.h>
#include <dmlc/data.h>
#include <dmlc/logging.h>
#include <omp.h>

using namespace rabit;
using namespace dmlc;
Expand Down Expand Up @@ -117,7 +120,7 @@ inline double Cos(const float *row,
}
// get cluster of a certain vector
inline size_t GetCluster(const Matrix &centroids,
const Row<unsigned> &v) {
const Row<unsigned> &v, double* out_dist = NULL) {
size_t imin = 0;
double dmin = Cos(centroids[0], v);
for (size_t k = 1; k < centroids.nrow; ++k) {
Expand All @@ -126,9 +129,11 @@ inline size_t GetCluster(const Matrix &centroids,
dmin = dist; imin = k;
}
}
if (out_dist)
*out_dist = dmin;
return imin;
}

int main(int argc, char *argv[]) {
if (argc < 5) {
// intialize rabit engine
Expand All @@ -143,6 +148,7 @@ int main(int argc, char *argv[]) {
// set the parameters
int num_cluster = atoi(argv[2]);
int max_iter = atoi(argv[3]);
int omp_num = atoi(argv[5]);
// intialize rabit engine
rabit::Init(argc, argv);

Expand Down Expand Up @@ -172,12 +178,15 @@ int main(int argc, char *argv[]) {
{
// lambda function used to calculate the data if necessary
// this function may not be called when the result can be directly recovered
double dist_sum = 0;
double dist_tmp;
data->BeforeFirst();
while (data->Next()) {
const auto &batch = data->Value();
for (size_t i = 0; i < batch.size; ++i) {
auto v = batch[i];
size_t k = GetCluster(model.centroids, v);
size_t k = GetCluster(model.centroids, v, &dist_tmp);
dist_sum += dist_tmp;
// temp[k] += v
for (size_t j = 0; j < v.length; ++j) {
temp[k][v.index[j]] += v.get_value(j);
Expand All @@ -186,8 +195,59 @@ int main(int argc, char *argv[]) {
temp[k][num_feat] += 1.0f;
}
}
printf("total dist = %lf\n", dist_sum);
};
auto omp_get_centroid = [&]()
{
// lambda function used to calculate the data if necessary
// this function may not be called when the result can be directly recovered
std::vector<int> cid;
data->BeforeFirst();
while (data->Next()) {
const auto &batch = data->Value();
size_t batch_size = static_cast<int>(batch.size);
cid.resize(batch_size);

// get cluster_id for each instance, write to vector cid
#pragma omp parallel num_threads(omp_num)
{
#pragma omp for
for (int i = 0; i < batch_size; ++i) {
if (i >= batch_size)
continue;
int k = GetCluster(model.centroids, batch[i]);
cid[i] = k;
}
}
/*
for (int i=0; i<num_cluster; i++)
std::cout << instances_cid[i].size() << " ";
std::cout <<"\n";
*/

// compute centroid for each cluster
#pragma omp parallel for num_threads(omp_num) schedule(dynamic)
for (int k = 0; k < num_cluster; k++) {
if (k >= num_cluster)
continue;
for (size_t idx = 0; idx < batch_size; idx++) {
if (cid[idx] != k)
continue;
auto v = batch[idx];
// temp[k] += v
for (size_t j = 0; j < v.length; ++j) {
temp[k][v.index[j]] += v.get_value(j);
}
// use last column to record counts
temp[k][num_feat] += 1.0f;
}
}
}
//printf("total dist = %lf\n", dist_sum);
};
rabit::Allreduce<op::Sum>(&temp.data[0], temp.data.size(), lazy_get_centroid);
auto get_centroid = omp_get_centroid;
rabit::Allreduce<op::Sum>(&temp.data[0], temp.data.size(), get_centroid);
//printf("num_cluster = %d\n", num_cluster);
// set number
for (int k = 0; k < num_cluster; ++k) {
float cnt = temp[k][num_feat];
Expand All @@ -210,7 +270,7 @@ int main(int argc, char *argv[]) {

// output the model file to somewhere
if (rabit::GetRank() == 0) {
auto *fo = Stream::Create(argv[4], "w");
dmlc::Stream *fo = dmlc::Stream::Create(argv[4], "w");
model.centroids.Print(fo);
delete fo;
rabit::TrackerPrintf("All iteration finished, centroids saved to %s\n", argv[4]);
Expand Down
87 changes: 87 additions & 0 deletions learn/kmeans/kmeans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/usr/bin/python
"""
kmeans:
"""
import os
import sys
import numpy as np
from scipy import sparse
# import rabit, the tracker script will setup the lib path correctly
# for normal run without tracker script, add following line
sys.path.append('../../../rabit/wrapper')
sys.path.append('../../dmlc-core/wrapper')
import rabit
import dmlc_core

class Model(object):
    """k-means model: K centroids of dimension F (dense rows)."""

    def __init__(self, nbr_cluster, fdim):
        # centroid[k] holds the current center of cluster k
        self.centroid = np.zeros((nbr_cluster, fdim))
        self.K = nbr_cluster
        self.F = fdim

    def InitCentroids(self, data):
        """Seed each centroid with a random row of the first data batch,
        then broadcast rank 0's choice so every worker starts identically.
        """
        data.BeforeFirst()
        if not data.Next():
            print('error empty data')
        spmat = data.ValueCSR()
        for i in range(self.K):
            # BUGFIX: np.random.randint's upper bound is exclusive, so the
            # original (0, data.length - 1) could never select the last row.
            # NOTE(review): assumes data.length is the row count of this
            # batch -- confirm against RowBlockIter
            rowid = np.random.randint(0, data.length)
            # densify the 1xN sparse row before adding into the dense centroid
            self.centroid[i] += spmat.getrow(rowid).toarray().ravel()
        # BUGFIX: rabit.broadcast returns the broadcast result rather than
        # guaranteeing an in-place update -- assign it back
        self.centroid = rabit.broadcast(self.centroid, 0)

    def normalize(self):
        """Scale every centroid to unit L2 norm."""
        # BUGFIX: the L2 norm is sqrt(sum of squares); the original took
        # sqrt(sum of raw values), which is wrong and NaN for negative sums
        norm = np.sqrt((self.centroid * self.centroid).sum(axis=1))
        self.centroid /= norm.reshape((norm.size, 1))

def main():

num_cluster = int(sys.argv[2])
max_iter = int(sys.argv[3])
rabit.init(sys.argv)
world_size = rabit.get_world_size()
rank = rabit.get_rank()
data_iter = dmlc_core.RowBlockIter()
data_iter.CreateFromUri(sys.argv[1], rank, world_size, 'libsvm')
iter_cnt = 0
fdim_array = np.array([0])
fdim_array[0] = data_iter.NumCol()
#print fdim_array
if iter_cnt == 0:
fdim_array = rabit.allreduce(fdim_array, rabit.MAX)
model = Model(num_cluster, int(fdim_array[0]))
model.InitCentroids(data_iter)
#model.normalize()
num_feat = fdim_array[0]
data_iter.setNumFeat(num_feat)
for it in range(iter_cnt, max_iter):
if rabit.get_rank() == 0:
print 'iter = ', it

temp = np.zeros((num_cluster, num_feat + 1), dtype=np.float32)

def preparefun(temp):
nbrline = 0
data_iter.BeforeFirst()
while data_iter.Next():
spmat = data_iter.ValueCSR()
nbrline += spmat.shape[0]
num_row = spmat.shape[0]

vnorm = np.sqrt(spmat.multiply(spmat).sum(axis = 1))
dotp = spmat.dot(model.centroid.T)
dist = dotp / vnorm
max_id = np.argmax(dist, axis = 1)
for k in range(num_cluster):
temp[:,num_feat] += np.where(max_id == k)[0].shape[1]
data_iter.CSRReduceSum(max_id, temp)
#print 'processed %d lines = ' % (nbrline)

rabit.allreduce(temp, rabit.SUM, preparefun)
model.centroid = temp[:,0:num_feat]
#print temp
for k in range(num_cluster):
assert(temp[k,num_feat] > 0)
model.centroid[k,:] /= temp[k,num_feat]
#print model.centroid
#dist /=
#model.normalize()
rabit.finalize()
main()
2 changes: 2 additions & 0 deletions learn/kmeans/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
# Smoke-test two local configurations: <num_workers> <omp_threads_per_worker>.
# BUGFIX: invoke through bash explicitly -- the bare "run_local.sh" form only
# resolves when '.' is on PATH and the script has its exec bit set.
bash run_local.sh 4 4
bash run_local.sh 1 16
14 changes: 14 additions & 0 deletions learn/kmeans/run_local.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
# Run kmeans locally: ./run_local.sh <num_workers> <num_omp_threads>
# Downloads and unpacks the rcv1 test set on first use.
nbr_node=$1
datan=rcv1_test.binary
#datan=agaricus.txt.train
datap=../data/$datan
nomp=$2
make
if [ ! -f "$datap" ]
then
    # fetch the dataset inside a subshell so a failed cd/download cannot
    # leave the script running in the wrong directory (the original chained
    # cd ../data ... cd ../kmeans with no error checks)
    (
        cd ../data || exit 1
        wget http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/$datan.bz2 &&
            bzip2 -d $datan.bz2
    )
fi
# kmeans.dmlc args: <data> <num_cluster=40> <max_iter=5> <output> <omp_threads>
../../dmlc-core/tracker/dmlc_local.py -n $nbr_node kmeans.dmlc $datap 40 5 $datap.out $nomp
15 changes: 15 additions & 0 deletions learn/kmeans/run_yarn.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash
# Run kmeans on YARN: ./run_yarn.sh <num_workers> <num_omp_threads>
# A worker count of 0 runs a single local process instead of submitting.
# BUGFIX: shebang added -- `==` inside [ ] below is a bash-ism and is not
# guaranteed to work under a POSIX /bin/sh.
nbr_node=$1
datan=rcv1_test.binary
#datan=a3.txt
#datan=agaricus.txt.train
export fsprefix=hdfs://localhost:9000
# NOTE(review): user-specific HDFS paths -- parameterize before sharing
datap=$fsprefix/user/wttian/libsvmdata/$datan
datapout=$fsprefix/user/wttian/libsvmdata/$datan.out
nomp=$2
make
if [ "$nbr_node" == "0" ]
then
    # single-process run: <data> <num_cluster=40> <max_iter=5> <out> <omp>
    ./kmeans.dmlc $datap 40 5 $datapout $nomp
else
    # YARN submission (note: uses max_iter=10, unlike the local branch's 5)
    ../../dmlc-core/tracker/dmlc_yarn.py -mem 4096 --log-level DEBUG --log-file log -n $nbr_node --vcores $nomp kmeans.dmlc $datap 40 10 $datapout $nomp
fi
Empty file modified learn/lbfgs-linear/Makefile
100644 → 100755
Empty file.
Empty file modified learn/lbfgs-linear/README.md
100644 → 100755
Empty file.
Empty file modified learn/lbfgs-linear/linear.cc
100644 → 100755
Empty file.
Empty file modified learn/lbfgs-linear/linear.h
100644 → 100755
Empty file.
Empty file modified learn/linear/.gitignore
100644 → 100755
Empty file.
Empty file modified learn/linear/Makefile
100644 → 100755
Empty file.
Empty file modified learn/linear/base/README.md
100644 → 100755
Empty file.
Empty file modified learn/linear/base/adfea_parser.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/adfea_rec_parser.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/arg_parser.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/crc32.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/criteo_parser.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/criteo_rec_parser.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/dist_monitor.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/evaluation.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/localizer.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/loss.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/minibatch_iter.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/monitor.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/parallel_sort.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/penalty.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/spmv.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/utils.h
100644 → 100755
Empty file.
Empty file modified learn/linear/base/workload_pool.h
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/README.md
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/build.md
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/conf.md
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/criteo.conf
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/criteo.md
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/criteo_s3.conf
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/demo.conf
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/dev.md
100644 → 100755
Empty file.
Empty file modified learn/linear/guide/download_criteo.sh
100644 → 100755
Empty file.
Empty file modified learn/linear/proto/config.proto
100644 → 100755
Empty file.
Empty file modified learn/linear/proto/data_format.proto
100644 → 100755
Empty file.
Empty file modified learn/linear/proto/workload.proto
100644 → 100755
Empty file.
Empty file modified learn/linear/sgd/async_sgd.cc
100644 → 100755
Empty file.
Empty file modified learn/linear/sgd/async_sgd.h
100644 → 100755
Empty file.
Empty file modified learn/linear/sgd/delay_tol_handle.h
100644 → 100755
Empty file.
Empty file modified learn/linear/sgd/sgd_server_handle.h
100644 → 100755
Empty file.
Empty file modified learn/linear/test/README.md
100644 → 100755
Empty file.
Empty file modified learn/linear/test/arg_parser.cc
100644 → 100755
Empty file.
Empty file modified learn/linear/test/ftrl.cc
100644 → 100755
Empty file.
Empty file modified learn/linear/test/localizer_test.cc
100644 → 100755
Empty file.
Empty file modified learn/linear/test/minibatch_iter_test.cc
100644 → 100755
Empty file.
Empty file modified learn/linear/test/spmv_test.cc
100644 → 100755
Empty file.
Empty file modified learn/linear/tool/print_rec.cc
100644 → 100755
Empty file.
Empty file modified learn/linear/tool/text2rec.cc
100644 → 100755
Empty file.
Empty file modified learn/solver/lbfgs.h
100644 → 100755
Empty file.
Empty file modified learn/xgboost/README.md
100644 → 100755
Empty file.
Empty file modified learn/xgboost/mushroom.hadoop.conf
100644 → 100755
Empty file.
7 changes: 5 additions & 2 deletions make/config.mk
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
export CC = gcc
export CXX = g++
export MPICXX = mpicxx

export HADOOP_HOME = /data/wttian/dev/hadoop
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79-2.5.5.1.el7_1.x86_64
# whether use HDFS support during compile
USE_HDFS = 1

# whether use AWS S3 support during compile
USE_S3 = 1
USE_S3 = 0

# path to libjvm.so
LIBJVM=$(JAVA_HOME)/jre/lib/amd64/server

DMLC_CFLAGS=-L/data/wttian/dev/hadoop/lib/native
Loading