
[feature] add incremental update for odl #257

Merged: 116 commits (Sep 11, 2022)

Commits
58c4b2f
update datahub input to save offsets.
Mar 15, 2022
44fed7f
fix continue train bug
Mar 16, 2022
f40d3bb
add third_party code package: datahub, cprotobuf, lz4
Mar 17, 2022
ce2b6b9
fix kafka.so bug
Mar 19, 2022
12021fd
add support for online train with kafka input
Mar 23, 2022
bf1b43b
fix merge master conflicts
Mar 31, 2022
b05c373
add support for increment checkpoint
Apr 1, 2022
1f3cb8f
fix rtp fg convert bug
chengmengli06 Apr 6, 2022
d0454f8
update export big model graph and ops
Apr 6, 2022
02b6264
merge incr ckpt
Apr 6, 2022
bf3dbb3
add normalizer support for raw feature
chengmengli06 Apr 7, 2022
b705672
add normalizer_fn support in fg.json and easy_rec feature config
chengmengli06 Apr 7, 2022
3eeb0fa
temporary save
Apr 14, 2022
3fb9682
add kafka python package
Apr 14, 2022
22ff3fc
Merge branch 'merge_kafka' of https://github.com/AlibabaPAI/EasyRec i…
Apr 14, 2022
3358d6e
succeed
Apr 15, 2022
58e0558
add datahub train eval input
Apr 16, 2022
6d056c8
enable oss stop signal
Apr 17, 2022
a8513ed
fix pai kafka dataset ops bug
Apr 18, 2022
3236c1c
add pai ops specially, as some of the headers maybe modified in pai-tf
Apr 18, 2022
8230c87
add easy_rec estimator
Apr 18, 2022
0fcff00
fix format
Apr 18, 2022
14cd4f2
add support for offset info restore
Apr 19, 2022
672fb7c
add support for offset info restore
Apr 19, 2022
d15b156
add kafka read write
Apr 19, 2022
0fe3c15
update ops
Apr 19, 2022
97068b3
fix code style
Apr 20, 2022
36c6d2f
add support for asset_files from config, and fix latest_checkpoint bug
Apr 20, 2022
94db4c7
add for timestamp based offset for kafka
Apr 24, 2022
364e954
fix oss stop signal bug
Apr 24, 2022
4fcc95e
add oss stop hook
Apr 24, 2022
afad0e1
add api timeout setting, which would otherwise timeout inside a vpc
Apr 24, 2022
eb217bf
add incr update test
Apr 25, 2022
b244a8d
merge kafka updates
Apr 25, 2022
6032d76
add online train docs
Apr 26, 2022
fe90101
update auth pic
Apr 26, 2022
a5ed861
update auth pic
Apr 26, 2022
73e0b1d
update requirements
Apr 26, 2022
032362b
update requirements
Apr 26, 2022
8205274
update requirements.txt
Apr 26, 2022
fc868d8
update requirements.txt
Apr 26, 2022
2204c12
fix traceback bug
Apr 26, 2022
3a48089
update docs
Apr 26, 2022
e647e20
update docs
Apr 26, 2022
8ab6b6b
add summary for labels and predictions
Apr 26, 2022
eab6004
add summary for labels and predictions
Apr 26, 2022
be45bea
add support for evaluate offset restore
May 6, 2022
87b286c
add support for inputs
May 6, 2022
bea3121
fix chief redundant bug
May 6, 2022
faf88f2
update log func
May 7, 2022
5396bb6
optimize kafka train data loader
May 11, 2022
6c93ae8
update restore function
May 11, 2022
fd56a53
add dead_line stop hook
May 17, 2022
834ef70
add support for save increment updates to model_dir
Jun 6, 2022
26e4de1
fix fg convert bug
chengmengli06 Jun 6, 2022
9f42891
merge master
chengmengli06 Jun 8, 2022
66a19f8
add support for embedding variable in config convert
chengmengli06 Jun 8, 2022
eb36d62
fix ut bug
chengmengli06 Jun 8, 2022
6063ef1
update default combiner
Jun 19, 2022
13f031f
update datahub input
Jul 1, 2022
f630cf2
fix shared embedding not used bug
Jul 2, 2022
1c6face
add shuffle for kafka_input
Jul 5, 2022
97d280a
udpate kafka ops to filter empty feature samples
Jul 6, 2022
8653e45
merge filter_freq params for embedding variable
chengmengli06 Jul 12, 2022
848eaaf
add support for filter_freq and steps_to_live in fg.json
chengmengli06 Jul 12, 2022
30c3493
fix bug
chengmengli06 Jul 12, 2022
dbd22bf
set ev hash_bucket_size to MAX_HASH_BUCKET_SIZE
chengmengli06 Jul 13, 2022
d9ade24
fix bug for restore embedding variable
chengmengli06 Jul 15, 2022
e9252ec
add skip empty func
chengmengli06 Jul 17, 2022
27d04ed
update check script
chengmengli06 Jul 17, 2022
ec0794a
fix bug
chengmengli06 Jul 19, 2022
4fa3c3a
add save kafka
chengmengli06 Jul 20, 2022
d9d9c6a
merge kafka input
chengmengli06 Jul 20, 2022
bd0ef84
update processor to recent version
chengmengli06 Jul 20, 2022
93106e4
debug updates
chengmengli06 Jul 22, 2022
ba5fbaa
update incr_update so
chengmengli06 Jul 23, 2022
2f34172
fix read the doc bug
chengmengli06 Jul 24, 2022
35ea9c2
update docs
chengmengli06 Jul 25, 2022
e7ef1ac
refactor processor test and fix incr_record bug
chengmengli06 Jul 27, 2022
16d4de6
fix merge conflicts
chengmengli06 Jul 27, 2022
307d4ca
make small the hash_bucket_size
chengmengli06 Jul 27, 2022
374223b
fix dense update bug
chengmengli06 Jul 28, 2022
0b4af0c
support local incr_save
chengmengli06 Jul 29, 2022
ade25aa
update online train document
chengmengli06 Jul 29, 2022
8b1991f
add wait ckpt function in run.py
chengmengli06 Jul 30, 2022
d9284a9
fix typo
chengmengli06 Jul 30, 2022
73c199f
fix typo
chengmengli06 Jul 30, 2022
a33ae9a
fix export bug for share embedding
chengmengli06 Jul 30, 2022
6a91a08
break up lookup ops into small ops
chengmengli06 Aug 2, 2022
c506c11
fix incr update bug for ev
chengmengli06 Aug 8, 2022
f70f26e
add incr save
chengmengli06 Aug 9, 2022
5cb4646
update processor version
chengmengli06 Aug 10, 2022
092b881
fix ev not work bug
chengmengli06 Aug 23, 2022
247b40f
reset xml version update
chengmengli06 Sep 1, 2022
30877e9
remove duplicate code
chengmengli06 Sep 1, 2022
8e584ca
delete backup file
chengmengli06 Sep 2, 2022
926a43e
remove duplicate entry
chengmengli06 Sep 2, 2022
6b46199
reset odps test config changes
chengmengli06 Sep 2, 2022
0da779d
add online sample prepartion document
chengmengli06 Sep 6, 2022
9461048
update document format
chengmengli06 Sep 6, 2022
225518b
Update odl_sample.md
chengmengli06 Sep 6, 2022
fbb298e
Update odl_sample.md
chengmengli06 Sep 6, 2022
c6d439f
Update online_train.md
chengmengli06 Sep 6, 2022
fa37c60
update default value
chengmengli06 Sep 6, 2022
1b66662
add export scripts
chengmengli06 Sep 6, 2022
3b4ef53
update docs
chengmengli06 Sep 7, 2022
1fab603
add data science reference
chengmengli06 Sep 7, 2022
bd2de46
add more details for online sample
chengmengli06 Sep 7, 2022
e0a7605
update odl document
chengmengli06 Sep 8, 2022
64c5f92
update table format
chengmengli06 Sep 8, 2022
b23ce43
update table format
chengmengli06 Sep 8, 2022
771079c
update odl document
chengmengli06 Sep 8, 2022
830fe71
fix merge conflicts
chengmengli06 Sep 10, 2022
e8af096
fix test case merge bug
chengmengli06 Sep 10, 2022
ade6b5a
fix test case merge bug
chengmengli06 Sep 11, 2022
8e76e53
remove gpu requirement for odps test
chengmengli06 Sep 11, 2022
2 changes: 2 additions & 0 deletions MANIFEST.in
@@ -0,0 +1,2 @@
include easy_rec/python/ops/1.12/*.so*
include easy_rec/python/ops/1.15/*.so*
Binary file added docs/images/odl_events_aggr.png
Binary file added docs/images/odl_kafka_sample.png
Binary file added docs/images/odl_label_gen.png
Binary file added docs/images/odl_label_sum.png
Binary file added docs/images/other/online_auth.png
Binary file added docs/images/other/online_train.png
8 changes: 5 additions & 3 deletions docs/source/emr_tensorboard.md
@@ -1,4 +1,6 @@
# Start tensorboard on the header node
# EMR tensorboard

1. Start tensorboard on the header node

```bash
ssh root@39.104.103.119 # login to header
@@ -7,7 +9,7 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server
CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) tensorboard --logdir=hdfs:///user/experiments/mnist_train_v2 --port 6006
```

# Access the header node through an SSH tunnel
2. Set up a proxy through an SSH tunnel

- For details, see [Accessing the Web UIs of open-source components through an SSH tunnel](https://help.aliyun.com/document_detail/169151.html?spm=a2c4g.11186623.6.598.658d727beowT5O)

@@ -17,4 +19,4 @@ ssh -N -D 8157 root@39.104.103.119
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --proxy-server="socks5://localhost:8157" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp/
```

Open [http://emr-header-1:6006/](http://emr-header-1:6006/) in the browser
3. Open [http://emr-header-1:6006/](http://emr-header-1:6006/) in the browser
2 changes: 1 addition & 1 deletion docs/source/feature/data.md
@@ -1,4 +1,4 @@
# Data
# Data formats

As the recommendation algorithm package of Alibaba Cloud PAI, EasyRec connects seamlessly to MaxCompute data tables, can read large files from OSS, supports HDFS files in E-MapReduce environments, and also supports csv files in local environments.

3 changes: 1 addition & 2 deletions docs/source/feature/feature.rst
@@ -3,8 +3,7 @@

The previous section introduced the input data: MaxCompute tables, csv files, hdfs files, OSS files, etc. One column of a table or file corresponds to one feature.

The data may contain one or more label fields. The features are richer; the supported types include IdFeature, RawFeature, TagFeature, SequenceFeature,
ComboFeature.
The data may contain one or more label fields. The features are richer; the supported types include IdFeature, RawFeature, TagFeature, SequenceFeature, ComboFeature.

Fields shared by all feature types
----------------------------------------------------------------
224 changes: 224 additions & 0 deletions docs/source/feature/odl_sample.md
@@ -0,0 +1,224 @@
# Samples

## Offline samples

- Offline samples can be built with SQL on MaxCompute or on Hive/Spark.
- The [recommendation algorithm customization](https://pairec.yuque.com/books/share/72cb101c-e89d-453b-be81-0fadf09db4dd) service can generate the offline feature and offline sample pipelines automatically.

## Realtime samples

### Prerequisites

- Service activation:
  - Besides MaxCompute, OSS, Dataworks and Hologres, you additionally need to activate Flink and Datahub (or Kafka).
  - For the detailed activation steps of each product, see the [activation guide](https://pairec.yuque.com/staff-nodpws/kwr84w/wz2og0) in the PAI-REC best practices.

### Data preparation

- Realtime user behavior stream
  - Ingested through datahub
  - Contains the fields:
    - event_type, the event type: exposure / click / buy / like / play / ...
    - event_time, the time the event happened
    - duration, how long the event lasted, optional
    - request_id, the request id
    - user_id, the user id
    - item_id, the item id
    - other information, optional
- Feature logging (callback)
  - An EAS callback service must be deployed; its configuration is the same as the [EAS scoring service](./rtp_fg.html#id9)

    - The callback service is deployed separately so that it does not affect the performance of the EAS scoring service
    - The response-time (rt) requirement on the EAS callback service is lower than on the EAS scoring service

  - Features are written to Datahub through the [PAI-REC recommendation engine](http://pai-vision-data-hz.oss-cn-zhangjiakou.aliyuncs.com/pairec/docs/pairec/html/intro/callback_api.html)

    - PAI-REC [configuration](./pai_rec_callback_conf.md)
    - The features are stored in the Datahub topic odl_callback_log, schema:

<table class="docutils" border=1>
<tr><th>request_id</th><th>request_time</th><th>module</th><th>user_id</th><th>item_id</th><th>scene</th><th>context_features</th><th>generate_features</th><th>raw_features</th><th>request_info</th><th>item_features</th><th>user_features</th><th>callback_log</th></tr>
<tr><td>string</td><td>bigint</td><td>string</td><td>string</td><td>string</td><td>string</td><td>string</td><td>string</td><td>string</td><td>string</td><td>string</td><td>string</td><td>string</td></tr>
<tr><td>request id</td><td>request time</td><td>algorithm id</td><td>user id</td><td>item id</td><td>scene</td><td>context features</td><td>features generated by FG</td><td>raw features</td><td>request info</td><td>item features</td><td>user features</td><td>callback log</td></tr>
</table>

- request_id, user_id, item_id, request_time and generate_features are the fields needed later

- Custom recommendation engine:

  - Call the EAS service to obtain the logged features; for the call convention see the [reference](./rtp_fg.html#id10)
  - The item list in the request is the delivered list, not the candidate list of the ranking stage
  - Call EasyrecRequest.setDebugLevel(3) to generate the features needed for EasyRec training
  - Fetch the generated features through PBResponse.getGenerateFeaturesMap
  - Write the features to the Datahub topic: odl_callback_log

### Sample generation

1. Aggregate the sample events (OnlineSampleAggr):

- Upload the resource package: [rec-realtime-0.8-SNAPSHOT.jar](http://easyrec.oss-cn-beijing.aliyuncs.com/deploy/rec-realtime-0.8-SNAPSHOT.jar)
  ![image.png](../../images/odl_events_aggr.png)

- Flink configuration:

```yaml
datahub.endpoint: 'http://dh-cn-beijing-int-vpc.aliyuncs.com/'
datahub.accessId: xxx
datahub.accessKey: xxx
datahub.inputTopic: user_behavior_log
datahub.sinkTopic: odl_sample_aggr
datahub.projectName: odl_sample
datahub.startInMs: '1655571600'

input.userid: user_id
input.itemid: item_id
input.request-id: request_id
input.event-type: event
input.event-duration: play_time
input.event-ts: ts
input.expose-event: exposure
input.event-extra: 'scene'
input.wait-positive-secs: '900'
```

- datahub parameter settings
  - accessId: authentication id
  - accessKey: authentication secret
  - projectName: project name
  - endpoint: use the VPC endpoint
  - inputTopic: the datahub topic to read from
  - sinkTopic: the datahub topic to write to
  - startInSecs: the position (offset) to start reading from, in seconds
- input: datahub schema settings
  - userid: name of the userid field
  - itemid: name of the itemid field
  - request-id: name of the request_id field
  - event-duration: field holding the event duration
  - event-type: field holding the event type
  - event-ts: field holding the event timestamp (seconds)
  - expose-event: the event type of exposure events
    - exposure events that arrive late are not emitted again
    - other events that arrive late are emitted later as a supplement
  - event-extra: other event-related fields, multiple fields separated by ","
  - wait-positive-secs: how long to wait for positive samples, in seconds
- datahub topic schema:
- inputTopic: user_behavior_log
<table class="docutils" border=1>
<tr><th> request_id </th><th> user_id </th><th> item_id </th><th> play_time </th><th> event </th><th> ts </th><th> scene </th><th> ... </th>
<tr><td> string </td><td> string </td><td> string </td><td> double </td><td> string </td><td> bigint </td><td> string </td><td> ... </td>
</table>
- sinkTopic: odl_sample_aggr
<table class="docutils" border=1>
<tr> <th>request_id </th><th> user_id </th><th> item_id </th><th> events </th></tr>
<tr> <td> string </td><td> string </td><td> string </td><td> string </td></tr>
</table>
- events data format:
```json
[
{"duration":6493,"eventTime":1659667790,"eventType":"play","properties":{"scene":"main"}},
{"duration":6259,"eventTime":1659667796,"eventType":"play","properties":{"scene":"main"}}
]
```
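The aggregation step above can be sketched in plain Python. This is only an illustration of the grouping that OnlineSampleAggr performs (field names taken from the schemas above), not the jar's actual implementation:

```python
import json
from collections import defaultdict

def aggregate_events(rows):
    """Group behavior-log rows by (request_id, user_id, item_id) and emit
    one row per key, with the events serialized as a JSON array matching
    the odl_sample_aggr `events` format shown above."""
    grouped = defaultdict(list)
    for r in rows:
        key = (r["request_id"], r["user_id"], r["item_id"])
        grouped[key].append({
            "duration": r.get("play_time", 0),
            "eventTime": r["ts"],
            "eventType": r["event"],
            "properties": {"scene": r.get("scene", "")},
        })
    return [
        {"request_id": rid, "user_id": uid, "item_id": iid,
         # order events by time, as in the sample output above
         "events": json.dumps(sorted(evts, key=lambda e: e["eventTime"]))}
        for (rid, uid, iid), evts in grouped.items()
    ]
```

The real job additionally handles watermarks and the wait-positive-secs window, which a batch sketch cannot show.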

1. Label generation; three [udf](http://easyrec.oss-cn-beijing.aliyuncs.com/deploy/label_gen.zip)s are currently provided:

   - playtime: sum_over(events, 'playtime')
   - click: has_event(events, 'click')
   - min_over / max_over: min_over(events, 'eventTime')
   - Arbitrary custom udfs can also be written in python, see the [reference](https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html)
   - Uploading udfs ([vvp-console](https://vvp.console.aliyun.com/)):
     ![image.png](../../images/odl_label_gen.png)
   - Example:
```sql
insert into odl_sample_with_lbl
select request_id, user_id, item_id,
ln(if(playtime < 0, 0, playtime) + 1) as ln_play_time,
cast((playtime > 10 or is_like > 0) as bigint) as is_valid_play,
is_like, ts
from (
select *, sum_over(events, 'duration', TRUE) / 1000.0 as playtime,
has_event(events, 'likes') as is_like,
min_over(events, 'eventTime') as ts
from odl_sample_aggr
);
```
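The exact semantics of the packaged UDFs are not spelled out here; as a rough pure-Python stand-in for reasoning about the labels, they might look like the following (the boolean third argument of `sum_over` is assumed to filter to positive values):

```python
import json

def sum_over(events, field, positive_only=False):
    # Sum a numeric field over all events in the JSON array.
    vals = [e.get(field, 0) for e in json.loads(events)]
    if positive_only:
        vals = [v for v in vals if v > 0]
    return sum(vals)

def has_event(events, event_type):
    # 1 if any event has the given type, else 0.
    return int(any(e.get("eventType") == event_type
                   for e in json.loads(events)))

def min_over(events, field):
    return min(e[field] for e in json.loads(events))

def max_over(events, field):
    return max(e[field] for e in json.loads(events))
```

With the sample events array shown earlier, `sum_over(events, 'duration', True) / 1000.0` would give the playtime in seconds, matching how the SQL example uses it.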

1. Join the samples with the logged callback features

```sql
create temporary view sample_view as
select a.request_id, a.user_id, a.item_id, a.ln_play_time, a.is_valid_play, feature, b.request_time
from odl_sample_with_lbl a
inner join (
select * from (
select request_id, item_id, request_time, generate_features as feature, ts,
row_number() over(partition by request_id, item_id order by proctime() asc) as rn
from odl_callback_log
where `module` = 'item' and (generate_features is not null and generate_features <> '')
) where rn = 1
) b
on a.request_id = b.request_id and a.item_id = b.item_id
where a.ts between b.ts - INTERVAL '30' SECONDS and b.ts + INTERVAL '30' MINUTE;
```

- odl_callback_log must be deduplicated to prevent duplicate samples caused by repeated calls
- Enable state TTL (milliseconds) in the Flink configuration to bound the state size:
  ```yaml
  table.exec.state.ttl: '2400000'
  ```
- Choosing the TTL (milliseconds) involves two factors:
  - The delay of odl_sample_with_lbl relative to the request time request_time
    - if ttl \< the relative delay, samples will be lost
  - Measuring the relative delay:
    - dump odl_sample_with_lbl / odl_callback_log to MaxCompute
    - join on request_id and compute the difference of ts
  - A larger ttl means larger state, longer checkpoint saving time and lower performance
- Enable Gemini KV separation in the state backend (the generate_features values are large):
  ```yaml
  state.backend.gemini.kv.separate.mode: GLOBAL_ENABLE
  state.backend.gemini.kv.separate.value.size.threshold: '500'
  ```
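The delay measurement described above can be sketched as a small offline script. A minimal sketch, assuming both tables were dumped to MaxCompute and loaded as Python dicts keyed by (request_id, item_id) with their ts values; the quantile and margin are illustrative choices, not prescribed values:

```python
def relative_delays(sample_ts, callback_ts):
    """sample_ts / callback_ts: dicts mapping (request_id, item_id) -> ts
    in seconds. Returns the sorted delays of odl_sample_with_lbl relative
    to odl_callback_log, for keys present in both tables."""
    common = sample_ts.keys() & callback_ts.keys()
    return sorted(sample_ts[k] - callback_ts[k] for k in common)

def pick_ttl_ms(delays, quantile=0.99, margin=1.2):
    # The TTL must exceed the relative delay or samples are lost, so take
    # a high quantile plus a safety margin, converted to milliseconds.
    idx = min(len(delays) - 1, int(quantile * len(delays)))
    return int(delays[idx] * margin * 1000)
```

Keys missing from `callback_ts` also indicate join losses, which show up as the volume drop discussed in the diagnosis section.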

1. Write the realtime samples to Datahub / Kafka

```sql
create temporary table odl_sample_with_fea_and_lbl(
`request_id` string,
`user_id` string,
`item_id` string,
`ln_play_time` double,
`is_valid_play` bigint,
`feature` string,
`request_time` bigint
) WITH (
'connector' = 'datahub',
'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/',
'project' = 'odl_sample',
'topic' = 'odl_sample_with_fea_and_lbl',
'subId' = '1656xxxxxx',
'accessId' = 'LTAIxxxxxxx',
'accessKey' = 'Q82Mxxxxxxxx'
);
insert into odl_sample_with_fea_and_lbl
select * from sample_view;
```

- subId: datahub subscription id

### Data diagnosis

- Check that the following statistics of the realtime samples are consistent with the offline ones:

  - Total sample volume: samples that arrive late fail to join the logged features, which lowers the sample volume; increase the interval-join range and the state ttl
  - Positive/negative ratio: positives that arrive late are emitted late, which lowers the online positive ratio; increase wait-positive-secs
  - Feature consistency: whether the EAS callback service and the EAS scoring engine use the same configuration.

- How to verify:

  - Dump the realtime samples to maxcompute and compare them with the offline data
  - Check the positive/negative label ratio in the EasyRec training summary
    ![image.png](../../images/odl_label_sum.png)
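The two symptoms above can be checked with a few lines of Python. A minimal sketch, assuming both sample sets have been dumped and their labels loaded as lists; the tolerances are hypothetical and should be tuned per scenario:

```python
def label_stats(labels):
    # Total count and share of positive labels.
    n = len(labels)
    pos = sum(1 for x in labels if x > 0)
    return {"total": n, "pos_ratio": pos / n if n else 0.0}

def compare_online_offline(online, offline, volume_tol=0.10, ratio_tol=0.05):
    """Flag the two issues described above: sample loss (volume drop after
    the join) and a depressed online positive ratio."""
    on, off = label_stats(online), label_stats(offline)
    issues = []
    if off["total"] and on["total"] < (1 - volume_tol) * off["total"]:
        issues.append("volume drop: widen the interval join / raise state ttl")
    if on["pos_ratio"] < off["pos_ratio"] - ratio_tol:
        issues.append("low positive ratio: increase wait-positive-secs")
    return issues
```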

### Realtime training

- Start training: [documentation](../online_train.md)