Merge pull request #613 from leleyu/develope
[DOCS] FTRL
paynie authored Jan 11, 2019
2 parents a834b46 + 7ec9196 commit 68467a4
Showing 2 changed files with 98 additions and 229 deletions.
185 changes: 49 additions & 136 deletions docs/algo/ftrl_lr_spark.md
@@ -1,149 +1,62 @@
# [Spark Streaming on Angel] FTRL
# Training Logistic Regression with FTRL on Spark on Angel

> With the surge of online learning in recent years, the FTRL optimization algorithm has proven not only to scale to massive data, but also to learn an effective and sparse model with relative ease; since its introduction it has drawn wide attention and praise in both academia and industry. Accordingly, we have implemented on the Spark on Angel platform a distributed LR algorithm optimized with FTRL, in both online and offline modes. The following introduces the principle and usage of the algorithm.
FTRL (Follow-the-regularized-leader) is an optimization algorithm widely used in online learning. Employing FTRL is easy on Spark on Angel, and you can train a model with billions, or even tens of billions, of dimensions once you have enough machines.

If you are not familiar with programming on Spark on Angel, please first refer to the [Programming Guide for Spark-on-Angel](https://github.com/Angel-ML/angel/blob/master/docs/programmers_guide/spark_on_angel_programing_guide_en.md).

## 1. Algorithm Introduction
## Using the FTRL Optimizer

The `FTRL` algorithm combines the strengths of the `FOBOS` and `RDA` algorithms: it attains comparatively high precision like FOBOS, while producing better sparsity at the cost of a small loss in precision.

The feature-weight update formula of the algorithm (see Reference 1) is:

![](../img/ftrl_lr_w.png)

where

* the G function denotes the gradient of the loss function:

![](../img/ftrl_lr_g.png)

* the update of w is decomposed, over the dimensions of the feature weights, into N independent scalar minimization problems:

![](../img/ftrl_lr_w_update.png)

* considering an individual learning rate for each dimension, the update of w becomes:

![](../img/ftrl_lr_d_t.png)
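
The update rules above can be sketched per coordinate as follows. This is a minimal single-machine sketch for illustration only; the class and variable names are ours, not Angel's API:

```scala
import scala.math.{abs, signum, sqrt}

// Minimal single-machine sketch of the per-coordinate FTRL-Proximal update
// (following McMahan et al., Reference 1). z and n are the accumulated state
// vectors; g is the gradient of the loss for the current sample.
class FtrlState(dim: Int, alpha: Double, beta: Double,
                lambda1: Double, lambda2: Double) {
  val z = new Array[Double](dim)
  val n = new Array[Double](dim)
  val w = new Array[Double](dim)

  def update(g: Array[Double]): Unit = {
    var i = 0
    while (i < dim) {
      // sigma corrects z so the per-coordinate learning rate decays over time
      val sigma = (sqrt(n(i) + g(i) * g(i)) - sqrt(n(i))) / alpha
      z(i) += g(i) - sigma * w(i)
      n(i) += g(i) * g(i)
      // closed-form solution of the per-coordinate minimization:
      // the L1 truncation is what produces the sparsity FTRL is known for
      w(i) = if (abs(z(i)) <= lambda1) 0.0
             else -(z(i) - signum(z(i)) * lambda1) /
                   ((beta + sqrt(n(i))) / alpha + lambda2)
      i += 1
    }
  }
}
```

Note how a coordinate whose accumulated |z| stays below lambda1 keeps an exactly-zero weight, which is how the model remains sparse.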


## 2. Distributed Implementation

Google's engineering implementation of FTRL-optimized logistic regression with L1 and L2 regularization terms:

![](../img/ftrl_lr_project.png)

To speed up convergence, the algorithm also provides a variance-reduced FTRL based on SVRG, which applies variance reduction to the gradient during the gradient update.

* The general procedure of SVRG is:

![](../img/svrg.png)

To this end, the algorithm adds an SVRG-based update step at the gradient g of the loss function; to conform to the principle of SVRG, two parameters, rho1 and rho2, are introduced to approximate the weights and gradients of each stage (see Reference 2). The general procedure of the SVRG-based FTRL algorithm (hereafter "FTRL_VRG") is:

![](../img/ftrl_vrg.png)
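
The variance-reduction step can be sketched as below. This is our own illustrative code, not Angel's implementation; in FTRL_VRG the snapshot quantities are maintained approximately via the moving-average coefficients rho1 (weights) and rho2 (gradients) rather than recomputed exactly:

```scala
// Illustrative sketch of the SVRG-style variance-reduced gradient that
// FTRL_VRG feeds into the FTRL update. gradAt computes the per-sample
// gradient; wSnap and muSnap are the snapshot weights and the (approximate)
// full gradient at the snapshot.
def vrGradient(gradAt: (Array[Double], Array[Double]) => Array[Double],
               sample: Array[Double],
               w: Array[Double],
               wSnap: Array[Double],
               muSnap: Array[Double]): Array[Double] = {
  val g  = gradAt(w, sample)      // noisy gradient at the current weights
  val g0 = gradAt(wSnap, sample)  // same sample, evaluated at the snapshot
  // g - g0 + muSnap has the same expectation as g but lower variance
  g.indices.map(i => g(i) - g0(i) + muSnap(i)).toArray
}
```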

Following this reference implementation, and combining the characteristics of Spark Streaming and Angel, the framework of the distributed FTRL implementation is shown below:

![](../img/ftrl_lr_framework.png)

The framework of the distributed FTRL_VRG implementation is:

![](../img/ftrl_vrg_framework_new.png)

## 3. Execution & Performance

Two data-access modes are provided: **online and offline**. For details of the **offline** mode, see [here](./sona/sparselr_ftrl.md).

**<Online Mode>**

### **Description**
The online mode uses Kafka as the message-delivery mechanism, so the Kafka configuration must be filled in. The optimization methods include FTRL and FTRL_VRG.

### **Input Format**
* Messages support only the standard ["libsvm"](./data_format.md) data format or the ["Dummy"](./data_format.md) format
* For model accuracy, the algorithm internally adds a feature with index 0 and value 1 to every sample to serve as the bias term; feature indices in the input data therefore start from 1
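
For example, a libsvm-format message might look like the following (hypothetical sample lines: a label first, then index:value pairs, with indices starting from 1 because index 0 is reserved for the bias):

```
1 1:0.5 3:1.2 7:0.3
0 2:1.0 5:0.8
```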

### **Parameters**

* **Algorithm Parameters**
    * alpha: alpha in the w update formula
    * beta: beta in the w update formula
    * lambda1: lambda1 in the w update formula
    * lambda2: lambda2 in the w update formula
    * rho1: weight-update coefficient in FTRL_VRG
    * rho2: gradient-update coefficient in FTRL_VRG

* **I/O Parameters**
    * checkPointPath: checkpoint path for the streaming data
    * zkQuorum: Zookeeper configuration, in the format "hostname:port"
    * topic: Kafka topic
    * group: Kafka group
    * dim: dimension of the input data; feature IDs are counted from 0 by default
    * isOneHot: whether the data format is one-hot; set to true if so
    * receiverNum: number of Kafka receivers
    * streamingWindow: duration of each batch in the Spark Streaming stream
    * modelPath: save path for the model during training
    * logPath: output path for the average loss of each batch
    * partitionNum: number of partitions in streaming
    * optMethod: whether to optimize with ftrl or ftrlVRG
    * isIncrementLearn: whether to use incremental learning
    * batch2Save: number of batches between two model saves

```scala
import com.tencent.angel.ml.matrix.RowType
import com.tencent.angel.spark.ml.online_learning.FTRL

// allocate an FTRL optimizer with (lambda1, lambda2, alpha, beta)
val optim = new FTRL(lambda1, lambda2, alpha, beta)
// initialize the model
optim.init(dim)
```

* **Resource Parameters**
    * num-executors: number of executors
    * executor-cores: number of cores per executor
    * executor-memory: memory per executor
    * driver-memory: driver memory
    * spark.ps.instances: number of Angel PS nodes
    * spark.ps.cores: number of cores per PS node
    * spark.ps.memory: memory per PS node
The FTRL optimizer has four hyper-parameters: lambda1, lambda2, alpha and beta, and we allocate the optimizer with them. The next step is to initialize the FTRL model. FTRL maintains three vectors, z, n and w; in the code above, we allocate a sparse distributed matrix with 3 rows and dim columns to hold them.

### **Submission Command**
### set the dimension
In the scenario of online learning, feature indices can range over (Long.MinValue, Long.MaxValue) and are usually generated by a hash function. In Spark on Angel, you can set dim=-1 when your feature indices range over (Long.MinValue, Long.MaxValue). If the feature indices range over [0, n), you can set dim=n.
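
For instance, when raw features are strings, a hash can map them onto the full Long range. This is an illustrative sketch; the hash choice is ours, not part of the library:

```scala
import scala.util.hashing.MurmurHash3

// Illustrative: map raw string features onto Long indices with a hash.
// The resulting indices can be negative, i.e. they span the full
// (Long.MinValue, Long.MaxValue) range, so the model must be created with
// dim = -1. With pre-numbered features in [0, n), use dim = n instead.
def featureIndex(feature: String): Long =
  MurmurHash3.stringHash(feature).toLong

val idx = featureIndex("user_city=shenzhen") // may be negative
```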

You can submit an FTRL_SparseLR training job to a Yarn cluster with the following command:
## Training with Spark

```shell
./bin/spark-submit \
--master yarn-cluster \
--conf spark.hadoop.angel.ps.ha.replication.number=2 \
--conf fs.default.name=$defaultFS \
--conf spark.yarn.allocation.am.maxMemory=55g \
--conf spark.yarn.allocation.executor.maxMemory=55g \
--conf spark.ps.jars=$SONA_ANGEL_JARS \
--conf spark.ps.instances=20 \
--conf spark.ps.cores=2 \
--conf spark.ps.memory=6g \
--jars $SONA_SPARK_JARS \
--name $name \
--driver-memory 5g \
--num-executors 10 \
--executor-cores 2 \
--executor-memory 12g \
--class com.tencent.angel.spark.ml.online_learning.FTRLRunner \
spark-on-angel-mllib-<version>.jar \
partitionNum:10 \
modelPath:$modelPath \
checkPointPath:$checkPointPath \
logPath:$logPath \
zkQuorum:<zookeeper IP> \
group:<kafka group> \
topic:<kafka topic> \
rho1:0.2 \
rho2:0.2 \
alpha:0.1 \
isIncrementLearn:false \
lambda1:0.3 \
lambda2:0.3 \
dim:175835 \
streamingWindow:10 \
receiverNum:10 \
batch2Save:10 \
optMethod:ftrlVRG
```
### loading data
Use the RDD interface to load the data and parse it into labeled vectors.
```scala
val data = sc.textFile(input).repartition(partNum)
.map(s => (DataLoader.parseLongDouble(s, dim), DataLoader.parseLabel(s, false)))
.map {
f =>
f._1.setY(f._2)
f._1
}.map(point => DataLoader.appendBias(point))
```
### training model
```scala
val size = data.count()
for (epoch <- 1 to numEpoch) {
val totalLoss = data.mapPartitions {
case iterator =>
// for each partition
val loss = iterator.map(f => (f.getX, f.getY))
.sliding(batchSize, batchSize)
.map(f => optim.optimize(f.toArray, calcGradientLoss)).sum
Iterator.single(loss)
}.sum()
println(s"epoch=$epoch loss=${totalLoss / size}")
}
```


### saving model
```scala
val output = "hdfs://xxx"
// materialize the weight vector, then save it to the output path
optim.weight
optim.saveWeight(output)
```

## 4. References
1. H. Brendan McMahan, Gary Holt, D. Sculley, Michael Young. Ad Click Prediction: a View from the Trenches. KDD '13, August 11-14, 2013.
2. Tencent Big Data Technology Summit 2017: Big Data and Machine Learning in Advertising.
The example code can be found at https://github.com/Angel-ML/angel/blob/master/spark-on-angel/examples/src/main/scala/com/tencent/angel/spark/examples/cluster/FTRLExample.scala

142 changes: 49 additions & 93 deletions docs/algo/ftrl_lr_spark_en.md
@@ -1,106 +1,62 @@
# [Spark Streaming on Angel] FTRL
# Training Logistic Regression with FTRL on Spark on Angel

>FTRL is a common online-learning optimization method with demonstrated good results in practice. Traditionally implemented in Storm, FTRL is widely used for online CTR-prediction models. In practice, data are often high-dimensional and a sparse solution is desired; in such cases, implementing FTRL in Spark Streaming on Angel can achieve better results with robust performance in just a few lines of code.
FTRL (Follow-the-regularized-leader) is an optimization algorithm widely used in online learning. Employing FTRL is easy on Spark on Angel, and you can train a model with billions, or even tens of billions, of dimensions once you have enough machines.

If you are not familiar with programming on Spark on Angel, please first refer to the [Programming Guide for Spark-on-Angel](https://github.com/Angel-ML/angel/blob/master/docs/programmers_guide/spark_on_angel_programing_guide_en.md).

## 1. Introduction to FTRL

`FTRL` blends the benefits of `FOBOS` and `RDA`: it guarantees comparatively high precision as FOBOS does, and can yield better sparsity in the result with a reasonable loss in precision.

The equation for updating weights of features by FTRL is:

![](../img/ftrl_lr_w.png)

where

* G function is the gradient of the loss function:

![](../img/ftrl_lr_g_en.png)

* w's update is separated into N independent scalar minimization problems, one for each dimension of the feature weights:

![](../img/ftrl_lr_w_update.png)

* considering individual learning rate for each dimension, w's updating equation becomes:

![](../img/ftrl_lr_d_t.png)



## 2. Distributed Implementation

Google has provided the implementation of Logistic Regression with L1/L2 terms using FTRL:

![](../img/ftrl_lr_project.png)

Integrating the characteristics of Spark Streaming and Angel into above reference, the distributed implementation has the following framework:

![](../img/ftrl_lr_framework.png)


## 3. Execution & Performance
## Using the FTRL Optimizer
```scala

import com.tencent.angel.ml.matrix.RowType
import com.tencent.angel.spark.ml.online_learning.FTRL

// allocate an FTRL optimizer with (lambda1, lambda2, alpha, beta)
val optim = new FTRL(lambda1, lambda2, alpha, beta)
// initialize the model
optim.init(dim)
```

### **Input Format**
* dim: dimension of the input data
* Only the standard ["libsvm"](./data_format_en.md) format is supported for messages
* Uses the Kafka messaging mechanism, so Kafka must be configured
The FTRL optimizer has four hyper-parameters: lambda1, lambda2, alpha and beta, and we allocate the optimizer with them. The next step is to initialize the FTRL model. FTRL maintains three vectors, z, n and w; in the code above, we allocate a sparse distributed matrix with 3 rows and dim columns to hold them.

### **Parameters**
### set the dimension
In the scenario of online learning, feature indices can range over (Long.MinValue, Long.MaxValue) and are usually generated by a hash function. In Spark on Angel, you can set dim=-1 when your feature indices range over (Long.MinValue, Long.MaxValue). If the feature indices range over [0, n), you can set dim=n.

* **Algorithm Parameters**
    * alpha: alpha in w's updating equation
    * beta: beta in w's updating equation
    * lambda1: lambda1 in w's updating equation
    * lambda2: lambda2 in w's updating equation
## Training with Spark

* **I/O Parameters**
    * checkPointPath: checkpoint path for the data stream
    * modelPath: save path for the model (trained by each batch)
    * actionType: "train" or "predict"
    * sampleRate: sampling rate for the samples used in "predict"
    * zkQuorum: Zookeeper configuration, in the format "hostname:port"
    * topic: Kafka topic
    * group: Kafka group
    * streamingWindow: size of each Spark Streaming batch
### loading data
Use the RDD interface to load the data and parse it into labeled vectors.
```scala
val data = sc.textFile(input).repartition(partNum)
.map(s => (DataLoader.parseLongDouble(s, dim), DataLoader.parseLabel(s, false)))
.map {
f =>
f._1.setY(f._2)
f._1
}.map(point => DataLoader.appendBias(point))
```
### training model
```scala
val size = data.count()
for (epoch <- 1 to numEpoch) {
val totalLoss = data.mapPartitions {
case iterator =>
// for each partition
val loss = iterator.map(f => (f.getX, f.getY))
.sliding(batchSize, batchSize)
.map(f => optim.optimize(f.toArray, calcGradientLoss)).sum
Iterator.single(loss)
}.sum()
println(s"epoch=$epoch loss=${totalLoss / size}")
}
```
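
The `sliding(batchSize, batchSize)` call above simply cuts each partition's iterator into non-overlapping mini-batches, which `optim.optimize` then consumes one at a time. A small standalone illustration:

```scala
// sliding(size, step) with size == step cuts an iterator into consecutive,
// non-overlapping mini-batches; a shorter trailing batch is kept as-is
val batches = (1 to 7).iterator.sliding(3, 3).map(_.toList).toList
// batches == List(List(1, 2, 3), List(4, 5, 6), List(7))
```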

* **Resource Parameters**
    * num-executors: number of executors
    * executor-cores: number of cores per executor
    * executor-memory: executor memory
    * driver-memory: driver memory
    * spark.ps.instances: number of Angel PS nodes
    * spark.ps.cores: number of cores per PS node
    * spark.ps.memory: PS node memory

### **Submission Command**
### saving model
```scala
val output = "hdfs://xxx"
// materialize the weight vector, then save it to the output path
optim.weight
optim.saveWeight(output)
```

Submit the FTRL_SparseLR training job to Yarn using the following sample command:
The example code can be found at https://github.com/Angel-ML/angel/blob/master/spark-on-angel/examples/src/main/scala/com/tencent/angel/spark/examples/cluster/FTRLExample.scala

```shell
./bin/spark-submit \
--master yarn-cluster \
--conf spark.yarn.allocation.am.maxMemory=55g \
--conf spark.yarn.allocation.executor.maxMemory=55g \
--conf spark.ps.jars=$SONA_ANGEL_JARS \
--conf spark.ps.instances=2 \
--conf spark.ps.cores=2 \
--conf spark.ps.memory=10g \
--jars $SONA_SPARK_JARS \
--name "spark-on-angel-sparse-ftrl" \
--driver-memory 1g \
--num-executors 5 \
--executor-cores 2 \
--executor-memory 2g \
--class com.tencent.angel.spark.ml.classification.SparseLRWithFTRL \
spark-on-angel-mllib-2.1.0.jar \
partitionNum:3 \
actionType:train \
sampleRate:1.0 \
modelPath:$modelPath \
checkPointPath:$checkpoint \
group:$group \
zkquorum:$zkquorum \
topic:$topic
```
