10 changes: 0 additions & 10 deletions assembly/pom.xml
@@ -114,16 +114,6 @@
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<!-- Exclude libgfortran, libgcc for license issues -->
<artifact>org.jblas:jblas</artifact>
<excludes>
<!-- Linux amd64 is OK; not statically linked -->
<exclude>lib/static/Linux/i386/**</exclude>
<exclude>lib/static/Mac OS X/**</exclude>
<exclude>lib/static/Windows/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
5 changes: 0 additions & 5 deletions docs/mllib-guide.md
@@ -80,11 +80,6 @@ include `netlib-java`'s native proxies by default. To configure
[netlib-java](https://github.com/fommil/netlib-java) documentation for
your platform's additional installation instructions.

MLlib also uses [jblas](https://github.com/mikiobraun/jblas) which
will require you to install the
[gfortran runtime library](https://github.com/mikiobraun/jblas/wiki/Missing-Libraries)
if it is not already present on your nodes.

To use MLlib in Python, you will need [NumPy](http://www.numpy.org)
version 1.4 or newer.

11 changes: 8 additions & 3 deletions graphx/pom.xml
@@ -45,9 +45,14 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>core</artifactId>
<version>${netlib.java.version}</version>
</dependency>
<dependency>
<groupId>net.sourceforge.f2j</groupId>
<artifactId>arpack_combined_all</artifactId>
<version>0.1</version>
Member

Should this be runtime-only scope? Is this basically a pure Java backend to netlib? I checked the .jar and it seems to be all Java. I double-checked and this is BSD licensed. Since license issues are front of mind in this change, could you add a line for this new lib in the LICENSE file under the other BSD dependencies?

Contributor Author

Some netlib-java BLAS routines reference org.netlib.intW, which is not part of com.github.fommil.netlib but of arpack_combined_all. In the netlib:core pom file (https://repo1.maven.org/maven2/com/github/fommil/netlib/core/1.1.2/core-1.1.2.pom), arpack_combined_all is specified as a dependency, but I cannot run the tests without declaring it explicitly in our pom. It may be because the pom also includes arpack_combined_all:javadoc as a dependency, and sbt/maven doesn't resolve that dependency correctly.

Breeze already depends on netlib-java and arpack_combined_all. Should we already have something under LICENSE?
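To make the intW point concrete, here is a minimal sketch (invented 2x2 matrix, not code from this PR) of a netlib-java LAPACK call: the info status comes back through an org.netlib.util.intW out-parameter, which lives in arpack_combined_all rather than in netlib core, so this does not compile without that artifact on the classpath.

```scala
import com.github.fommil.netlib.LAPACK.{getInstance => lapack}
import org.netlib.util.intW // provided by arpack_combined_all, not netlib core

object IntWSketch {
  def main(args: Array[String]): Unit = {
    // Solve A x = b for a 2x2 SPD matrix stored as a packed upper triangle.
    val ap = Array(4.0, 1.0, 3.0) // column-major packed: a11, a12, a22
    val b = Array(1.0, 2.0)       // overwritten with the solution x
    val info = new intW(0)        // LAPACK writes its status code here
    lapack.dppsv("U", 2, 1, ap, b, 2, info)
    require(info.`val` == 0, s"dppsv failed: info=${info.`val`}")
    println(b.mkString(", "))
  }
}
```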

</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
96 changes: 59 additions & 37 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -18,7 +18,9 @@
package org.apache.spark.graphx.lib

import scala.util.Random
import org.jblas.DoubleMatrix

import com.github.fommil.netlib.BLAS.{getInstance => blas}

import org.apache.spark.rdd._
import org.apache.spark.graphx._

@@ -53,7 +55,7 @@ object SVDPlusPlus {
* a Multifaceted Collaborative Filtering Model",
* available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
*
* The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)),
* The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^^-0.5^^*sum(y)),
* see the details on page 6.
*
* @param edges edges for constructing the graph
@@ -66,13 +68,10 @@
: (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) =
{
// Generate default vertex attribute
def defaultF(rank: Int): (DoubleMatrix, DoubleMatrix, Double, Double) = {
val v1 = new DoubleMatrix(rank)
val v2 = new DoubleMatrix(rank)
for (i <- 0 until rank) {
v1.put(i, Random.nextDouble())
v2.put(i, Random.nextDouble())
}
def defaultF(rank: Int): (Array[Double], Array[Double], Double, Double) = {
// TODO: use a fixed random seed
val v1 = Array.fill(rank)(Random.nextDouble())
val v2 = Array.fill(rank)(Random.nextDouble())
(v1, v2, 0.0, 0.0)
}

@@ -92,7 +91,7 @@
(g1, g2) => (g1._1 + g2._1, g1._2 + g2._2))

val gJoinT0 = g.outerJoinVertices(t0) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
(vid: VertexId, vd: (Array[Double], Array[Double], Double, Double),
msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}.cache()
@@ -102,39 +101,52 @@

def sendMsgTrainF(conf: Conf, u: Double)
(ctx: EdgeContext[
(DoubleMatrix, DoubleMatrix, Double, Double),
(Array[Double], Array[Double], Double, Double),
Double,
(DoubleMatrix, DoubleMatrix, Double)]) {
(Array[Double], Array[Double], Double)]) {
val (usr, itm) = (ctx.srcAttr, ctx.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dot(usr._2)
val rank = p.length
var pred = u + usr._3 + itm._3 + blas.ddot(rank, q, 1, usr._2, 1)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = ctx.attr - pred
val updateP = q.mul(err)
.subColumnVector(p.mul(conf.gamma7))
.mul(conf.gamma2)
val updateQ = usr._2.mul(err)
.subColumnVector(q.mul(conf.gamma7))
.mul(conf.gamma2)
val updateY = q.mul(err * usr._4)
.subColumnVector(itm._2.mul(conf.gamma7))
.mul(conf.gamma2)
// updateP = (err * q - conf.gamma7 * p) * conf.gamma2
val updateP = q.clone()
blas.dscal(rank, err * conf.gamma2, updateP, 1)
blas.daxpy(rank, -conf.gamma7 * conf.gamma2, p, 1, updateP, 1)
// updateQ = (err * usr._2 - conf.gamma7 * q) * conf.gamma2
val updateQ = usr._2.clone()
blas.dscal(rank, err * conf.gamma2, updateQ, 1)
blas.daxpy(rank, -conf.gamma7 * conf.gamma2, q, 1, updateQ, 1)
// updateY = (err * usr._4 * q - conf.gamma7 * itm._2) * conf.gamma2
val updateY = q.clone()
blas.dscal(rank, err * usr._4 * conf.gamma2, updateY, 1)
blas.daxpy(rank, -conf.gamma7 * conf.gamma2, itm._2, 1, updateY, 1)
ctx.sendToSrc((updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1))
ctx.sendToDst((updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1))
}

for (i <- 0 until conf.maxIters) {
// Phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
g.cache()
val t1 = g.aggregateMessages[DoubleMatrix](
val t1 = g.aggregateMessages[Array[Double]](
ctx => ctx.sendToSrc(ctx.dstAttr._2),
(g1, g2) => g1.addColumnVector(g2))
(g1, g2) => {
val out = g1.clone()
blas.daxpy(out.length, 1.0, g2, 1, out, 1)
out
})
val gJoinT1 = g.outerJoinVertices(t1) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
msg: Option[DoubleMatrix]) =>
if (msg.isDefined) (vd._1, vd._1
.addColumnVector(msg.get.mul(vd._4)), vd._3, vd._4) else vd
(vid: VertexId, vd: (Array[Double], Array[Double], Double, Double),
msg: Option[Array[Double]]) =>
if (msg.isDefined) {
val out = vd._1.clone()
blas.daxpy(out.length, vd._4, msg.get, 1, out, 1)
(vd._1, out, vd._3, vd._4)
} else {
vd
}
}.cache()
materialize(gJoinT1)
g.unpersist()
@@ -144,14 +156,24 @@
g.cache()
val t2 = g.aggregateMessages(
sendMsgTrainF(conf, u),
(g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, DoubleMatrix, Double)) =>
(g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))
(g1: (Array[Double], Array[Double], Double), g2: (Array[Double], Array[Double], Double)) =>
{
val out1 = g1._1.clone()
blas.daxpy(out1.length, 1.0, g2._1, 1, out1, 1)
val out2 = g1._2.clone()
blas.daxpy(out2.length, 1.0, g2._2, 1, out2, 1)
(out1, out2, g1._3 + g2._3)
})
val gJoinT2 = g.outerJoinVertices(t2) {
(vid: VertexId,
vd: (DoubleMatrix, DoubleMatrix, Double, Double),
msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
(vd._1.addColumnVector(msg.get._1), vd._2.addColumnVector(msg.get._2),
vd._3 + msg.get._3, vd._4)
vd: (Array[Double], Array[Double], Double, Double),
msg: Option[(Array[Double], Array[Double], Double)]) => {
val out1 = vd._1.clone()
blas.daxpy(out1.length, 1.0, msg.get._1, 1, out1, 1)
val out2 = vd._2.clone()
blas.daxpy(out2.length, 1.0, msg.get._2, 1, out2, 1)
(out1, out2, vd._3 + msg.get._3, vd._4)
}
}.cache()
materialize(gJoinT2)
g.unpersist()
@@ -160,10 +182,10 @@

// calculate error on training set
def sendMsgTestF(conf: Conf, u: Double)
(ctx: EdgeContext[(DoubleMatrix, DoubleMatrix, Double, Double), Double, Double]) {
(ctx: EdgeContext[(Array[Double], Array[Double], Double, Double), Double, Double]) {
val (usr, itm) = (ctx.srcAttr, ctx.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dot(usr._2)
var pred = u + usr._3 + itm._3 + blas.ddot(q.length, q, 1, usr._2, 1)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = (ctx.attr - pred) * (ctx.attr - pred)
Expand All @@ -173,7 +195,7 @@ object SVDPlusPlus {
g.cache()
val t3 = g.aggregateMessages[Double](sendMsgTestF(conf, u), _ + _)
val gJoinT3 = g.outerJoinVertices(t3) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: Option[Double]) =>
(vid: VertexId, vd: (Array[Double], Array[Double], Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}.cache()
materialize(gJoinT3)
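The rewrite above maps jblas column-vector operations onto level-1 BLAS calls over plain arrays: ddot for dot products, dscal for in-place scaling (x <- a*x), and daxpy for in-place accumulation (y <- a*x + y), with an explicit clone() wherever jblas used to allocate a fresh vector. A minimal sketch of the mapping, using invented vectors and scalars rather than anything from this PR:

```scala
import com.github.fommil.netlib.BLAS.{getInstance => blas}

object JblasToNetlib {
  def main(args: Array[String]): Unit = {
    val x = Array(1.0, 2.0, 3.0)
    val y = Array(4.0, 5.0, 6.0)
    val n = x.length
    val (err, gamma) = (0.5, 0.1) // invented scalars

    // jblas: x.dot(y)
    val dot = blas.ddot(n, x, 1, y, 1) // 1*4 + 2*5 + 3*6 = 32.0

    // jblas: x.addColumnVector(y) -- clone first, since daxpy accumulates in place
    val sum = x.clone()
    blas.daxpy(n, 1.0, y, 1, sum, 1) // sum = x + y

    // jblas: y.mul(err).subColumnVector(x.mul(gamma)) -- one dscal plus one daxpy
    val update = y.clone()
    blas.dscal(n, err, update, 1)          // update = err * y
    blas.daxpy(n, -gamma, x, 1, update, 1) // update = err * y - gamma * x

    println(s"dot=$dot sum=${sum.mkString(",")} update=${update.mkString(",")}")
  }
}
```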
@@ -32,11 +32,11 @@ class SVDPlusPlusSuite extends FunSuite with LocalSparkContext {
Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
}
val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
var (graph, u) = SVDPlusPlus.runSVDPlusPlus(edges, conf)
val (graph, _) = SVDPlusPlus.run(edges, conf)
graph.cache()
val err = graph.vertices.collect().map{ case (vid, vd) =>
val err = graph.vertices.map { case (vid, vd) =>
if (vid % 2 == 1) vd._4 else 0.0
}.reduce(_ + _) / graph.triplets.collect().size
}.reduce(_ + _) / graph.numEdges
assert(err <= svdppErr)
}
}
3 changes: 2 additions & 1 deletion mllib/pom.xml
@@ -59,6 +59,7 @@
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
<scope>test</scope>
Member

Right, so for "phase 1", leave the tests as-is, to break this up.

</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
@@ -116,7 +117,7 @@
<dependency>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>all</artifactId>
<version>1.1.2</version>
<version>${netlib.java.version}</version>
<type>pom</type>
</dependency>
</dependencies>
@@ -26,7 +26,6 @@ import scala.util.hashing.byteswap64

import com.github.fommil.netlib.BLAS.{getInstance => blas}
import com.github.fommil.netlib.LAPACK.{getInstance => lapack}
import org.jblas.DoubleMatrix
import org.netlib.util.intW

import org.apache.spark.{Logging, Partitioner}
@@ -361,14 +360,14 @@ object ALS extends Logging {
private[recommendation] class NNLSSolver extends LeastSquaresNESolver {
private var rank: Int = -1
private var workspace: NNLS.Workspace = _
private var ata: DoubleMatrix = _
private var ata: Array[Double] = _
private var initialized: Boolean = false

private def initialize(rank: Int): Unit = {
if (!initialized) {
this.rank = rank
workspace = NNLS.createWorkspace(rank)
ata = new DoubleMatrix(rank, rank)
ata = new Array[Double](rank * rank)
initialized = true
} else {
require(this.rank == rank)
@@ -385,7 +384,7 @@
val rank = ne.k
initialize(rank)
fillAtA(ne.ata, lambda * ne.n)
val x = NNLS.solve(ata, new DoubleMatrix(rank, 1, ne.atb: _*), workspace)
val x = NNLS.solve(ata, ne.atb, workspace)
ne.reset()
x.map(x => x.toFloat)
}
@@ -398,17 +397,16 @@
var i = 0
var pos = 0
var a = 0.0
val data = ata.data
while (i < rank) {
var j = 0
while (j <= i) {
a = triAtA(pos)
data(i * rank + j) = a
data(j * rank + i) = a
ata(i * rank + j) = a
ata(j * rank + i) = a
pos += 1
j += 1
}
data(i * rank + i) += lambda
ata(i * rank + i) += lambda
i += 1
}
}
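For context on fillAtA: triAtA arrives as the upper triangle of the Gram matrix packed column-major, and the loop above unpacks it into the full rank-by-rank symmetric array while adding the regularization term lambda to the diagonal. A worked sketch with invented numbers (rank 2), standalone rather than taken from this PR:

```scala
object FillAtASketch {
  def main(args: Array[String]): Unit = {
    val rank = 2
    val lambda = 0.1
    val triAtA = Array(4.0, 1.0, 3.0) // packed upper triangle: a11, a12, a22
    val ata = new Array[Double](rank * rank)
    var i = 0
    var pos = 0
    while (i < rank) {
      var j = 0
      while (j <= i) {
        val a = triAtA(pos)
        ata(i * rank + j) = a // fill lower half
        ata(j * rank + i) = a // mirror into upper half
        pos += 1
        j += 1
      }
      ata(i * rank + i) += lambda // regularize the diagonal
      i += 1
    }
    println(ata.mkString(", ")) // 4.1, 1.0, 1.0, 3.1
  }
}
```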