[feat] update hive predictor: load predict result to hive #215

Merged · 3 commits · Jun 16, 2022
6 changes: 3 additions & 3 deletions docs/source/feature/feature.rst
@@ -422,7 +422,7 @@ Configure the corresponding fields in the rank model:
- embedding\_wise\_variational\_dropout: whether the dimension of the variational dropout layer is the embedding dimension (true: embedding dimension; false: feature dimension; default false)
- `Start training <../train.md>`_

View feature importance:

.. code:: sql

@@ -433,12 +433,12 @@
-Dbuckets='oss://{oss_bucket}/'
-Darn='acs:ram::xxx:role/aliyunodpspaidefaultrole'
-DossHost='oss-{region}-internal.aliyuncs.com';

- extra_params:
- config_path: EasyRec config path
- output_dir: output directory
- topk: output the top_k most important features
- visualize: output a visualization of feature importance
- fg_path: `RTP-FG <./rtp_fg.md>`_ json config file, optional
- arn: `rolearn <https://ram.console.aliyun.com/roles/AliyunODPSPAIDefaultRole>`_ to access oss.
- version: EasyRec version, default stable
20 changes: 10 additions & 10 deletions docs/source/predict/MaxCompute 离线预测.md
@@ -52,11 +52,11 @@ pai -name easy_rec_ext
- other types: not supported yet
- binary classification model (num_class=1 required), exported fields: logits, probs, corresponding to the pre-sigmoid value / the probability
- regression model, exported field: y, corresponding to the predicted value
- multi-class model (num_class > 1), exported fields:
- logits: string(json), the vector before softmax, shape\[num_class\]
- probs: string(json), the vector after softmax, shape\[num_class\]
- logits_y: logits\[y\], float, the pre-softmax value of class y
- probs_y: probs\[y\], float, the probability of class y
- y: class id, = argmax(probs), int, the class with the highest probability
- Example:
```sql
@@ -70,7 +70,7 @@ pai -name easy_rec_ext
- Output:
```text
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['serving_default']:
The given SavedModel SignatureDef contains the following input(s):
inputs['adgroup_id'] tensor_info:
@@ -89,11 +89,11 @@ pai -name easy_rec_ext
dtype: DT_FLOAT
shape: (-1)
name: Sigmoid:0
Method name is: tensorflow/serving/predict
```
- You can see that the exported fields include:
- logits, float
- probs, float
- model_outputs: the model's output fields when exporting the saved_model; optional, defaults to the same as output_cols
- needs to be specified when output_cols and model_outputs differ, e.g.:
```sql
2 changes: 1 addition & 1 deletion docs/source/release.md
@@ -7,7 +7,7 @@ sh pai_jobs/deploy_ext.sh -V ${VERSION} -G
ls -lh pai_jobs/easy_rec_ext_${VERSION}_res.tar.gz
```

Upload the resource package pai_jobs/easy_rec_ext\_${VERSION}\_res.tar.gz to ODPS

![newresource.png](../images/release/newresource.png)

2 changes: 1 addition & 1 deletion easy_rec/python/feature_column/feature_column.py
@@ -484,7 +484,7 @@ def _build_partitioner(self, config):
else:
return None

def _add_shared_embedding_column(self, embedding_name, fc, deep=True):
if deep:
curr_id = len(self._deep_share_embed_columns[embedding_name])
self._deep_share_embed_columns[embedding_name].append(fc)
68 changes: 59 additions & 9 deletions easy_rec/python/inference/predictor.py
@@ -20,6 +20,8 @@
from tensorflow.python.saved_model import signature_constants

from easy_rec.python.protos.dataset_pb2 import DatasetConfig
from easy_rec.python.utils import numpy_utils
from easy_rec.python.utils import tf_utils
from easy_rec.python.utils.check_utils import check_split
from easy_rec.python.utils.config_util import get_configs_from_pipeline_file
from easy_rec.python.utils.config_util import get_input_name_from_fg_json
@@ -28,7 +30,6 @@
from easy_rec.python.utils.load_class import get_register_class_meta
from easy_rec.python.utils.odps_util import odps_type_to_input_type
from easy_rec.python.utils.tf_utils import get_tf_type
from easy_rec.python.utils import numpy_utils

if tf.__version__ >= '2.0':
tf = tf.compat.v1
@@ -394,6 +395,9 @@ def out_of_range_exception(self):
def _write_line(self, table_writer, outputs):
pass

def load_to_table(self, output_path):
pass

def _get_fg_json(self, fg_json_path, model_path):
if fg_json_path and gfile.Exists(fg_json_path):
logging.info('load fg_json_path: ', fg_json_path)
@@ -502,7 +506,10 @@ def _parse_value(all_vals):
if outputs[x].dtype == np.object:
outputs[x] = [val.decode('utf-8') for val in outputs[x]]
elif len(outputs[x].shape) > 1:
outputs[x] = [json.dumps(val, cls=numpy_utils.NumpyEncoder) for val in outputs[x]]
outputs[x] = [
json.dumps(val, cls=numpy_utils.NumpyEncoder)
for val in outputs[x]
]
for k in self._reserved_cols:
if all_vals[k].dtype == np.object:
all_vals[k] = [val.decode('utf-8') for val in all_vals[k]]
@@ -529,6 +536,7 @@ def _parse_value(all_vals):
logging.info('Final_time_stats: read: %.2f predict: %.2f write: %.2f' %
(sum_t0, sum_t1, sum_t2))
table_writer.close()
self.load_to_table(output_path)
logging.info('Predict %s done.' % input_path)

def predict(self, input_data_dict_list, output_names=None, batch_size=1):
@@ -710,7 +718,7 @@ def _get_dataset(self, input_path, num_parallel_calls, batch_size, slice_num,
def _get_writer(self, output_path, slice_id):
if not gfile.Exists(output_path):
gfile.MakeDirs(output_path)
res_path = os.path.join(output_path, 'slice_%d.csv' % slice_id)
res_path = os.path.join(output_path, 'part-%d.csv' % slice_id)
table_writer = gfile.GFile(res_path, 'w')
table_writer.write(
self._output_sep.join(self._output_cols + self._reserved_cols) + '\n')
@@ -859,16 +867,30 @@ def _get_dataset(self, input_path, num_parallel_calls, batch_size, slice_num,
output_types=list_type,
output_shapes=list_shapes,
args=(input_path,))

return dataset

def get_table_info(self, output_path):
partition_name, partition_val = None, None
if len(output_path.split('/')) == 2:
table_name, partition = output_path.split('/')
partition_name, partition_val = partition.split('=')
else:
table_name = output_path
return table_name, partition_name, partition_val

def _get_writer(self, output_path, slice_id):
if not gfile.Exists(output_path):
gfile.MakeDirs(output_path)
res_path = os.path.join(output_path, 'slice_%d.csv' % slice_id)
table_name, partition_name, partition_val = self.get_table_info(
output_path)
is_exist = self._hive_util.is_table_or_partition_exist(
table_name, partition_name, partition_val)
assert not is_exist, '%s already exists. Please drop it.' % output_path

output_path = output_path.replace('.', '/')
self._hdfs_path = 'hdfs://%s:9000/user/easy_rec/%s' % (self._hive_config.host, output_path)
if not gfile.Exists(self._hdfs_path):
gfile.MakeDirs(self._hdfs_path)
res_path = os.path.join(self._hdfs_path, 'part-%d.csv' % slice_id)
table_writer = gfile.GFile(res_path, 'w')
table_writer.write(
self._output_sep.join(self._output_cols + self._reserved_cols) + '\n')
return table_writer

def _write_line(self, table_writer, outputs):
@@ -881,6 +903,34 @@ def _get_reserve_vals(self, reserved_cols, output_cols, all_vals, outputs):
[all_vals[k] for k in reserved_cols]
return reserve_vals

def load_to_table(self, output_path):
schema = ''
for output_col_name in self._output_cols:
tf_type = self._predictor_impl._outputs_map[output_col_name].dtype
col_type = tf_utils.get_col_type(tf_type)
schema += output_col_name + ' ' + col_type + ','

for output_col_name in self._reserved_cols:
assert output_col_name in self._all_cols, 'Column: %s not exists.' % output_col_name
idx = self._all_cols.index(output_col_name)
output_col_types = self._all_col_types[idx]
schema += output_col_name + ' ' + output_col_types + ','
schema = schema.rstrip(',')

table_name, partition_name, partition_val = self.get_table_info(
output_path)
if partition_name and partition_val:
sql = "create table if not exists %s (%s) PARTITIONED BY (%s string)" % \
(table_name, schema, partition_name)
self._hive_util.run_sql(sql)
sql = "LOAD DATA INPATH '%s/*' INTO TABLE %s PARTITION (%s=%s)" % \
(self._hdfs_path, table_name, partition_name, partition_val)
self._hive_util.run_sql(sql)
else:
sql = "create external table if not exists %s (%s) location '%s'" % \
(table_name, schema, self._hdfs_path)
self._hive_util.run_sql(sql)

@property
def out_of_range_exception(self):
return (tf.errors.OutOfRangeError)
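
To make the new Hive export path concrete: `get_table_info` splits an `output_path` of the form `table/partition=value`, `_get_writer` stages part files under an HDFS staging directory, and `load_to_table` issues the CREATE/LOAD statements through `self._hive_util.run_sql`. Below is a minimal standalone sketch of that parsing and SQL construction; the table, partition, schema, and host values are hypothetical.

```python
def parse_output_path(output_path):
  # Sketch of get_table_info: 'db.table/partition=value' -> parts.
  partition_name, partition_val = None, None
  if len(output_path.split('/')) == 2:
    table_name, partition = output_path.split('/')
    partition_name, partition_val = partition.split('=')
  else:
    table_name = output_path
  return table_name, partition_name, partition_val


def build_load_sqls(output_path, schema, hive_host):
  # Sketch of the statements load_to_table would pass to _hive_util.run_sql.
  table_name, partition_name, partition_val = parse_output_path(output_path)
  hdfs_path = 'hdfs://%s:9000/user/easy_rec/%s' % (
      hive_host, output_path.replace('.', '/'))
  if partition_name and partition_val:
    return [
        'create table if not exists %s (%s) PARTITIONED BY (%s string)' %
        (table_name, schema, partition_name),
        "LOAD DATA INPATH '%s/*' INTO TABLE %s PARTITION (%s=%s)" %
        (hdfs_path, table_name, partition_name, partition_val)
    ]
  return [
      "create external table if not exists %s (%s) location '%s'" %
      (table_name, schema, hdfs_path)
  ]


# Hypothetical usage: load predictions into partition dt=20220616.
for sql in build_load_sqls('rec_db.ctr_predict/dt=20220616',
                           'probs double, user_id string', 'hive-server-host'):
  print(sql)
```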
14 changes: 8 additions & 6 deletions easy_rec/python/input/csv_input.py
@@ -81,7 +81,8 @@ def _build(self, mode, params):
file_paths.extend(tf.gfile.Glob(x))
assert len(file_paths) > 0, 'match no files with %s' % self._input_path

assert not file_paths[0].endswith('.tar.gz'), 'could only support .csv or .gz(not .tar.gz) files.'
assert not file_paths[0].endswith(
'.tar.gz'), 'could only support .csv or .gz(not .tar.gz) files.'

compression_type = 'GZIP' if file_paths[0].endswith('.gz') else ''
if compression_type:
@@ -100,7 +101,7 @@ def _build(self, mode, params):
logging.info('train files[%d]: %s' %
(len(file_paths), ','.join(file_paths)))
dataset = tf.data.Dataset.from_tensor_slices(file_paths)

if self._data_config.file_shard:
dataset = self._safe_shard(dataset)

@@ -112,9 +113,9 @@ def _build(self, mode, params):
# as the same data will be read multiple times
parallel_num = min(num_parallel_calls, len(file_paths))
dataset = dataset.interleave(
lambda x: tf.data.TextLineDataset(x,
compression_type=compression_type
).skip(int(self._with_header)),
lambda x: tf.data.TextLineDataset(
x, compression_type=compression_type).skip(
int(self._with_header)),
cycle_length=parallel_num,
num_parallel_calls=parallel_num)

@@ -130,7 +131,8 @@ def _build(self, mode, params):
else:
logging.info('eval files[%d]: %s' %
(len(file_paths), ','.join(file_paths)))
dataset = tf.data.TextLineDataset(file_paths,
dataset = tf.data.TextLineDataset(
file_paths,
compression_type=compression_type).skip(int(self._with_header))
dataset = dataset.repeat(1)

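
The reformatted reading logic above follows a standard tf.data pattern: interleave several (optionally gzip-compressed) CSV files and skip one header line per file. A minimal sketch of the same pattern, assuming TF 1.x / compat.v1 behavior and hypothetical file names:

```python
import tensorflow as tf

file_paths = ['data/part-0.csv.gz', 'data/part-1.csv.gz']  # hypothetical files
compression_type = 'GZIP' if file_paths[0].endswith('.gz') else ''
with_header = True
parallel_num = min(8, len(file_paths))

dataset = tf.data.Dataset.from_tensor_slices(file_paths)
dataset = dataset.interleave(
    # one TextLineDataset per file; skip the header line if present
    lambda x: tf.data.TextLineDataset(
        x, compression_type=compression_type).skip(int(with_header)),
    cycle_length=parallel_num,
    num_parallel_calls=parallel_num)
```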
4 changes: 2 additions & 2 deletions easy_rec/python/input/rtp_input.py
@@ -177,10 +177,10 @@ def _build(self, mode, params):
tf.data.TextLineDataset,
cycle_length=parallel_num,
num_parallel_calls=parallel_num)

if not self._data_config.file_shard:
dataset = self._safe_shard(dataset)

if self._data_config.shuffle:
dataset = dataset.shuffle(
self._data_config.shuffle_buffer_size,
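
For context on the file_shard toggle: csv_input applies `_safe_shard` to the file list when file_shard is true, while rtp_input (above) applies it to the parsed lines when file_shard is false. A generic tf.data sketch of the two granularities, using plain `dataset.shard` as a stand-in for `_safe_shard` and hypothetical worker settings:

```python
import tensorflow as tf

num_workers, worker_index = 4, 0  # hypothetical distributed setup
file_paths = ['part-0.csv', 'part-1.csv', 'part-2.csv', 'part-3.csv']

# Option 1: file-level sharding (each worker reads a subset of files).
files = tf.data.Dataset.from_tensor_slices(file_paths)
files = files.shard(num_workers, worker_index)

# Option 2: record-level sharding (every worker reads all files,
# then keeps every num_workers-th line).
lines = tf.data.Dataset.from_tensor_slices(file_paths).interleave(
    tf.data.TextLineDataset, cycle_length=len(file_paths))
lines = lines.shard(num_workers, worker_index)
```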
3 changes: 2 additions & 1 deletion easy_rec/python/main.py
@@ -693,7 +693,8 @@ def export(export_dir,
asset_file_dict = {}
for asset_file in asset_files.split(','):
asset_file = asset_file.strip()
if ':' not in asset_file or asset_file.startswith('oss:') or asset_file.startswith('hdfs:'):
if ':' not in asset_file or asset_file.startswith(
'oss:') or asset_file.startswith('hdfs:'):
_, asset_name = os.path.split(asset_file)
else:
asset_name, asset_file = asset_file.split(':', 1)
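
The asset_files handling above distinguishes name:path pairs from bare paths whose scheme itself contains a colon (oss:, hdfs:). A small sketch of that parsing with hypothetical inputs; the final dictionary assignment is assumed from context:

```python
import os

def parse_asset_files(asset_files):
  # Sketch of the asset_files parsing above; returns {asset_name: asset_file}.
  asset_file_dict = {}
  for asset_file in asset_files.split(','):
    asset_file = asset_file.strip()
    if ':' not in asset_file or asset_file.startswith(
        'oss:') or asset_file.startswith('hdfs:'):
      _, asset_name = os.path.split(asset_file)
    else:
      asset_name, asset_file = asset_file.split(':', 1)
    asset_file_dict[asset_name] = asset_file
  return asset_file_dict

# Hypothetical inputs: a bare hdfs path and an explicitly named local file.
print(parse_asset_files('hdfs://nn:9000/fg.json,vocab:data/vocab.txt'))
# {'fg.json': 'hdfs://nn:9000/fg.json', 'vocab': 'data/vocab.txt'}
```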
2 changes: 1 addition & 1 deletion easy_rec/python/model/easy_rec_estimator.py
@@ -16,9 +16,9 @@
from tensorflow.python.framework import ops
from tensorflow.python.framework.sparse_tensor import SparseTensor
from tensorflow.python.ops import variables
from tensorflow.python.platform import gfile
from tensorflow.python.saved_model import signature_constants
from tensorflow.python.training import saver
from tensorflow.python.platform import gfile

from easy_rec.python.builders import optimizer_builder
from easy_rec.python.compat import optimizers
19 changes: 11 additions & 8 deletions easy_rec/python/model/rank_model.py
@@ -46,9 +46,11 @@ def _output_to_prediction_impl(self,
else:
probs = tf.nn.softmax(output, axis=1)
prediction_dict['logits' + suffix] = output
prediction_dict['probs' + suffix] = probs
prediction_dict['logits' + suffix + '_y'] = math_ops.reduce_max(output, axis=1)
prediction_dict['probs' + suffix + '_y'] = math_ops.reduce_max(probs, axis=1)
prediction_dict['probs' + suffix] = probs
prediction_dict['logits' + suffix + '_y'] = math_ops.reduce_max(
output, axis=1)
prediction_dict['probs' + suffix + '_y'] = math_ops.reduce_max(
probs, axis=1)
prediction_dict['y' + suffix] = tf.argmax(output, axis=1)
elif loss_type == LossType.L2_LOSS:
output = tf.squeeze(output, axis=1)
@@ -177,9 +179,8 @@ def _build_metric_impl(self,
label = tf.to_int64(self._labels[label_name])
uids = self._feature_dict[metric.gauc.uid_field]
if isinstance(uids, tf.sparse.SparseTensor):
uids = tf.sparse_to_dense(uids.indices,
uids.dense_shape, uids.values,
default_value='')
uids = tf.sparse_to_dense(
uids.indices, uids.dense_shape, uids.values, default_value='')
uids = tf.reshape(uids, [-1])
metric_dict['gauc' + suffix] = metrics_lib.gauc(
label,
@@ -426,8 +427,10 @@ def _get_outputs_impl(self, loss_type, num_class=1, suffix=''):
if num_class == 1:
return ['probs' + suffix, 'logits' + suffix]
else:
return ['y' + suffix, 'probs' + suffix, 'logits' + suffix,
'probs' + suffix + '_y', 'logits' + suffix + '_y']
return [
'y' + suffix, 'probs' + suffix, 'logits' + suffix,
'probs' + suffix + '_y', 'logits' + suffix + '_y'
]
elif loss_type in [LossType.L2_LOSS, LossType.SIGMOID_L2_LOSS]:
return ['y' + suffix]
else:
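
In numpy terms, the multi-class branch of `_output_to_prediction_impl` produces the fields below, which are also the columns listed by `_get_outputs_impl` and serialized by the predictor (vectors as JSON strings). A small sketch with made-up logits:

```python
import json
import numpy as np

logits = np.array([[-0.5, 1.4, 0.1]])  # hypothetical batch of 1, num_class = 3
# softmax over the class dimension
probs = np.exp(logits) / np.exp(logits).sum(axis=1, keepdims=True)

y = logits.argmax(axis=1)        # predicted class id (same as probs.argmax)
logits_y = logits.max(axis=1)    # pre-softmax value of the top class
probs_y = probs.max(axis=1)      # probability of the top class

# Vector-valued outputs are written as JSON strings by the predictor.
probs_json = [json.dumps(v.tolist()) for v in probs]
```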
9 changes: 7 additions & 2 deletions easy_rec/python/predict.py
@@ -12,8 +12,8 @@
from easy_rec.python.inference.predictor import HivePredictor
from easy_rec.python.main import predict
from easy_rec.python.utils import config_util
from easy_rec.python.utils.hive_utils import HiveUtils
from easy_rec.python.utils import numpy_utils
from easy_rec.python.utils.hive_utils import HiveUtils

if tf.__version__ >= '2.0':
tf = tf.compat.v1
@@ -56,8 +56,13 @@ def main(argv):

if FLAGS.saved_model_dir:
logging.info('Predict by saved_model.')
if FLAGS.pipeline_config_path:
pipeline_config_path = FLAGS.pipeline_config_path
else:
pipeline_config_path = config_util.search_pipeline_config(
FLAGS.saved_model_dir)
pipeline_config = config_util.get_configs_from_pipeline_file(
FLAGS.pipeline_config_path, False)
pipeline_config_path, False)
if pipeline_config.WhichOneof('train_path') == 'hive_train_input':
all_cols, all_col_types = HiveUtils(
data_config=pipeline_config.data_config,
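
The change above lets saved_model prediction run without an explicit --pipeline_config_path by locating the config next to the exported model. A hedged sketch of what such a lookup might do; the real `config_util.search_pipeline_config` may behave differently:

```python
import os
import tensorflow as tf

def search_pipeline_config_sketch(saved_model_dir):
  # Look for a pipeline config file under the exported model directory.
  for pattern in ('*.config', 'assets/*.config'):
    matches = tf.gfile.Glob(os.path.join(saved_model_dir, pattern))
    if matches:
      return matches[0]
  raise ValueError('no pipeline config found in %s' % saved_model_dir)
```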
2 changes: 1 addition & 1 deletion easy_rec/python/protos/pipeline.proto
@@ -51,7 +51,7 @@ message EasyRecConfig {
// based on fg_json.
// * After generation, a prefix '!' is added:
// fg_json_path = '!' + fg_json_path
// indicates config update is already done, and should not
// be updated anymore. In this way, we make load_fg_json_to_config
// function reentrant.
// This step is done before edit_config_json to take effect.
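
The '!' prefix documented above is an idempotency marker: once feature configs have been generated from the fg json, the path is rewritten so a second call becomes a no-op. A sketch of the guard this enables; names follow the comment, and the actual load_fg_json_to_config implementation may differ:

```python
def load_fg_json_to_config(pipeline_config):
  fg_json_path = pipeline_config.fg_json_path
  if fg_json_path.startswith('!'):
    # feature configs were already generated from this fg json; do nothing
    return
  # ... generate feature_configs from the fg json here ...
  pipeline_config.fg_json_path = '!' + fg_json_path
```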
3 changes: 2 additions & 1 deletion easy_rec/python/test/export_test.py
@@ -160,7 +160,8 @@ def _post_check_func(pipeline_config):
post_check_func=_post_check_func))

def test_multi_class_predict(self):
self._export_test('samples/model_config/deepfm_multi_cls_on_avazu_ctr.config',
self._export_test(
'samples/model_config/deepfm_multi_cls_on_avazu_ctr.config',
extract_data_func=self._extract_data,
keys=['probs', 'logits', 'probs_y', 'logits_y', 'y'])
