[feature]: add support for frequency filter and steps_to_live for embedding variable #231

Merged: 1 commit, Jul 12, 2022
12 changes: 7 additions & 5 deletions easy_rec/python/compat/feature_column/feature_column.py
@@ -2521,7 +2521,7 @@ class _SharedEmbeddingColumn(
('categorical_column', 'dimension', 'combiner', 'initializer',
'shared_embedding_collection_name', 'ckpt_to_load_from',
'tensor_name_in_ckpt', 'max_norm', 'trainable', 'partitioner',
'use_embedding_variable'))):
'ev_params'))):
"""See `embedding_column`."""

@property
@@ -2581,7 +2581,7 @@ def _get_dense_tensor_internal(self,
'hood.'.format(shared_embedding_collection))
embedding_weights = shared_embedding_collection[0]
if embedding_weights.get_shape(
) != embedding_shape and not self.use_embedding_variable:
) != embedding_shape and self.ev_params is None:
raise ValueError(
'Shared embedding collection {} contains variable {} of '
'unexpected shape {}. Expected shape is {}. '
@@ -2592,7 +2592,7 @@
embedding_weights.name,
embedding_weights.get_shape(), embedding_shape))
else:
if not self.use_embedding_variable:
if self.ev_params is None:
embedding_weights = variable_scope.get_variable(
name='embedding_weights',
shape=embedding_shape,
@@ -2617,7 +2617,9 @@ def _get_dense_tensor_internal(self,
initializer=initializer,
trainable=self.trainable and trainable,
partitioner=self.partitioner,
collections=weight_collections)
collections=weight_collections,
steps_to_live=self.ev_params.steps_to_live if self.ev_params is not None else None,
filter_options=variables.CounterFilterOptions(self.ev_params.filter_freq))

ops.add_to_collection(self.shared_embedding_collection_name,
embedding_weights)
@@ -2629,7 +2631,7 @@ def _get_dense_tensor_internal(self,
self.ckpt_to_load_from, {self.tensor_name_in_ckpt: to_restore})

# Return embedding lookup result.
if self.use_embedding_variable:
if self.ev_params is not None:
return ev_embedding_ops.safe_embedding_lookup_sparse(
embedding_weights=embedding_weights,
sparse_ids=sparse_ids,
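For orientation, the hunks above reduce to one branch: without `ev_params` the column builds a plain dense variable and uses the stock lookup op; with `ev_params` it builds a PAI-TF EmbeddingVariable carrying the two new knobs and looks it up through `ev_embedding_ops`. Below is a condensed sketch of that logic, not the exact method: the names `variable_scope`, `embedding_ops`, `ev_embedding_ops`, and `variables` refer to the module's existing imports, and the `get_embedding_variable` argument list is simplified from what PAI-TF actually expects.

```python
# Condensed, illustrative sketch of the branching in _get_dense_tensor_internal.
def _lookup(self, embedding_shape, initializer, sparse_ids, sparse_weights):
  if self.ev_params is None:
    # Static embedding table, looked up with the stock TF op.
    embedding_weights = variable_scope.get_variable(
        name='embedding_weights',
        shape=embedding_shape,
        initializer=initializer,
        trainable=self.trainable,
        partitioner=self.partitioner)
    return embedding_ops.safe_embedding_lookup_sparse(
        embedding_weights, sparse_ids, sparse_weights, combiner=self.combiner)
  # EmbeddingVariable: an id is admitted once it has been seen filter_freq
  # times and evicted after steps_to_live steps without an update.
  embedding_weights = variable_scope.get_embedding_variable(
      name='embedding_weights',
      embedding_dim=embedding_shape[-1],
      initializer=initializer,
      trainable=self.trainable,
      partitioner=self.partitioner,
      steps_to_live=self.ev_params.steps_to_live,
      filter_options=variables.CounterFilterOptions(self.ev_params.filter_freq))
  return ev_embedding_ops.safe_embedding_lookup_sparse(
      embedding_weights, sparse_ids, sparse_weights, combiner=self.combiner)
```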
20 changes: 11 additions & 9 deletions easy_rec/python/compat/feature_column/feature_column_v2.py
@@ -817,7 +817,7 @@ def embedding_column(categorical_column,
max_norm=None,
trainable=True,
partitioner=None,
use_embedding_variable=False):
ev_params=None):
"""`DenseColumn` that converts from sparse, categorical input.

Use this when your inputs are sparse, but you want to convert them to a dense
@@ -913,7 +913,7 @@ def model_fn(features, ...):
max_norm=max_norm,
trainable=trainable,
partitioner=partitioner,
use_embedding_variable=use_embedding_variable)
ev_params=ev_params)


def shared_embedding_columns(categorical_columns,
@@ -926,7 +926,7 @@ def shared_embedding_columns(categorical_columns,
max_norm=None,
trainable=True,
partitioner=None,
use_embedding_variable=False):
ev_params=None):
"""List of dense columns that convert from sparse, categorical input.

This is similar to `embedding_column`, except that it produces a list of
@@ -1081,7 +1081,7 @@ def model_fn(features, ...):
max_norm=max_norm,
trainable=trainable,
partitioner=partitioner,
use_embedding_variable=use_embedding_variable))
ev_params=ev_params))

return result

@@ -3433,7 +3433,7 @@ class EmbeddingColumn(
'EmbeddingColumn',
('categorical_column', 'dimension', 'combiner', 'initializer',
'ckpt_to_load_from', 'tensor_name_in_ckpt', 'max_norm', 'trainable',
'partitioner', 'use_embedding_variable'))):
'partitioner', 'ev_params'))):
"""See `embedding_column`."""

@property
@@ -3507,7 +3507,7 @@ def _get_dense_tensor_internal_helper(self, sparse_tensors,
self.ckpt_to_load_from, {self.tensor_name_in_ckpt: to_restore})

# Return embedding lookup result.
if not self.use_embedding_variable:
if self.ev_params is None:
return embedding_ops.safe_embedding_lookup_sparse(
embedding_weights=embedding_weights,
sparse_ids=sparse_ids,
@@ -3538,7 +3538,7 @@ def _old_get_dense_tensor_internal(self, sparse_tensors, weight_collections,
if (weight_collections and
ops.GraphKeys.GLOBAL_VARIABLES not in weight_collections):
weight_collections.append(ops.GraphKeys.GLOBAL_VARIABLES)
if not self.use_embedding_variable:
if self.ev_params is None:
embedding_weights = variable_scope.get_variable(
name='embedding_weights',
shape=embedding_shape,
@@ -3563,7 +3563,9 @@ def _old_get_dense_tensor_internal(self, sparse_tensors, weight_collections,
initializer=initializer,
trainable=self.trainable and trainable,
partitioner=self.partitioner,
collections=weight_collections)
collections=weight_collections,
steps_to_live=self.ev_params.steps_to_live if self.ev_params is not None else None,
filter_options=variables.CounterFilterOptions(self.ev_params.filter_freq))

# Write the embedding configuration to RTP-specified collections. This will inform RTP to
# optimize this embedding operation.
@@ -3572,7 +3574,7 @@ def _old_get_dense_tensor_internal(self, sparse_tensors, weight_collections,
variable=embedding_weights,
bucket_size=self.categorical_column._num_buckets,
combiner=self.combiner,
is_embedding_var=self.use_embedding_variable)
is_embedding_var=(self.ev_params is not None))
embedding_attrs['name'] = layer_utils.unique_name_in_collection(
compat_ops.GraphKeys.RANK_SERVICE_EMBEDDING, embedding_attrs['name'])
layer_utils.update_attr_to_collection(
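From the caller's side, the change replaces the old `use_embedding_variable` flag with an `EVParams` tuple (defined in the parser changes further down). A usage sketch follows; the `categorical_column_with_hash_bucket` helper and the exact import paths are assumptions for illustration, and EmbeddingVariable support still requires running under PAI-TF.

```python
# Illustrative only: passing EVParams instead of the old boolean flag.
from easy_rec.python.compat.feature_column import feature_column_v2 as fc
from easy_rec.python.feature_column.feature_column import EVParams

user_id = fc.categorical_column_with_hash_bucket(
    'user_id', hash_bucket_size=1000000)
user_emb = fc.embedding_column(
    user_id,
    dimension=16,
    combiner='mean',
    # Admit an id after it has appeared twice; drop it after 4000 idle steps.
    ev_params=EVParams(filter_freq=2, steps_to_live=4000))
```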
46 changes: 33 additions & 13 deletions easy_rec/python/feature_column/feature_column.py
@@ -1,6 +1,7 @@
# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import logging
import collections

import tensorflow as tf

@@ -32,6 +33,7 @@ def __init__(self, embedding_name, index, sequence_combiner=None):
self.index = index
self.sequence_combiner = sequence_combiner

EVParams = collections.namedtuple("EVParams", ["filter_freq", "steps_to_live"])

class FeatureColumnParser(object):
"""Parse and generate feature columns."""
@@ -40,7 +42,7 @@ def __init__(self,
feature_configs,
wide_deep_dict={},
wide_output_dim=-1,
use_embedding_variable=False):
ev_params=None):
"""Initializes a `FeatureColumnParser`.

Args:
@@ -51,7 +53,7 @@ def __init__(self,
easy_rec.python.layers.input_layer.InputLayer, it is defined in
easy_rec.python.protos.easy_rec_model_pb2.EasyRecModel.feature_groups
wide_output_dim: output dimension for wide columns
use_embedding_variable: use EmbeddingVariable, which is provided by pai-tf
ev_params: params used by EmbeddingVariable, which is provided by pai-tf
"""
self._feature_configs = feature_configs
self._wide_output_dim = wide_output_dim
@@ -63,13 +65,16 @@ def __init__(self,
self._share_embed_names = {}
self._share_embed_infos = {}

self._use_embedding_variable = use_embedding_variable
self._vocab_size = {}

self._global_ev_params = None
if ev_params is not None:
self._global_ev_params = self._build_ev_params(ev_params)

def _cmp_embed_config(a, b):
return a.embedding_dim == b.embedding_dim and a.combiner == b.combiner and\
a.initializer == b.initializer and a.max_partitions == b.max_partitions and\
a.use_embedding_variable == b.use_embedding_variable
a.embedding_name == b.embedding_name

for config in self._feature_configs:
if not config.HasField('embedding_name'):
@@ -133,8 +138,12 @@ def _cmp_embed_config(a, b):
initializer = hyperparams_builder.build_initializer(
self._share_embed_infos[embed_name].initializer)
partitioner = self._build_partitioner(self._share_embed_infos[embed_name])
use_ev = self._use_embedding_variable or \
self._share_embed_infos[embed_name].use_embedding_variable

if self._share_embed_infos[embed_name].HasField('ev_params'):
ev_params = self._build_ev_params(self._share_embed_infos[embed_name].ev_params)
else:
ev_params = self._global_ev_params

# for handling share embedding columns
share_embed_fcs = feature_column.shared_embedding_columns(
self._deep_share_embed_columns[embed_name],
Expand All @@ -143,7 +152,7 @@ def _cmp_embed_config(a, b):
shared_embedding_collection_name=embed_name,
combiner=self._share_embed_infos[embed_name].combiner,
partitioner=partitioner,
use_embedding_variable=use_ev)
ev_params=ev_params)
self._deep_share_embed_columns[embed_name] = share_embed_fcs
# for handling wide share embedding columns
if len(self._wide_share_embed_columns[embed_name]) == 0:
@@ -155,7 +164,7 @@ def _cmp_embed_config(a, b):
shared_embedding_collection_name=embed_name + '_wide',
combiner='sum',
partitioner=partitioner,
use_embedding_variable=use_ev)
ev_params=ev_params)
self._wide_share_embed_columns[embed_name] = share_embed_fcs

for fc_name in self._deep_columns:
@@ -474,7 +483,7 @@ def parse_sequence_feature(self, config):

def _build_partitioner(self, config):
if config.max_partitions > 1:
if self._use_embedding_variable or config.use_embedding_variable:
if self._global_ev_params is not None or config.HasField('ev_params'):
# pai embedding_variable should use fixed_size_partitioner
return tf.fixed_size_partitioner(num_shards=config.max_partitions)
else:
@@ -517,14 +526,17 @@ def _add_wide_embedding_column(self, fc, config):
initializer = None
if config.HasField('initializer'):
initializer = hyperparams_builder.build_initializer(config.initializer)
if config.HasField('ev_params'):
ev_params = self._build_ev_params(config.ev_params)
else:
ev_params = self._global_ev_params
wide_fc = feature_column.embedding_column(
fc,
self._wide_output_dim,
combiner='sum',
initializer=initializer,
partitioner=self._build_partitioner(config),
use_embedding_variable=self._use_embedding_variable or
config.use_embedding_variable)
ev_params=ev_params)
self._wide_columns[feature_name] = wide_fc

def _add_deep_embedding_column(self, fc, config):
@@ -538,14 +550,17 @@ def _add_deep_embedding_column(self, fc, config):
initializer = None
if config.HasField('initializer'):
initializer = hyperparams_builder.build_initializer(config.initializer)
if config.HasField('ev_params'):
ev_params = self._build_ev_params(config.ev_params)
else:
ev_params = self._global_ev_params
fc = feature_column.embedding_column(
fc,
config.embedding_dim,
combiner=config.combiner,
initializer=initializer,
partitioner=self._build_partitioner(config),
use_embedding_variable=self._use_embedding_variable or
config.use_embedding_variable)
ev_params=ev_params)
fc.max_seq_length = config.max_seq_len if config.HasField(
'max_seq_len') else -1

Expand All @@ -555,3 +570,8 @@ def _add_deep_embedding_column(self, fc, config):
if config.HasField('sequence_combiner'):
fc.sequence_combiner = config.sequence_combiner
self._sequence_columns[feature_name] = fc

def _build_ev_params(self, ev_params):
"""Build embedding_variables params."""
ev_params = EVParams(ev_params.filter_freq, ev_params.steps_to_live if ev_params.steps_to_live > 0 else None)
return ev_params
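
Putting the parser changes together, EVParams resolution follows a simple precedence rule: a feature-level `ev_params` overrides the model-level one, and `steps_to_live = 0` is treated as "never expire". A condensed sketch (the helpers below mirror `_build_ev_params` and the `HasField` checks above; the surrounding class scaffolding is omitted):

```python
import collections

EVParams = collections.namedtuple('EVParams', ['filter_freq', 'steps_to_live'])

def build_ev_params(ev_params_pb):
  # steps_to_live == 0 (the proto default) means entries never expire.
  return EVParams(
      ev_params_pb.filter_freq,
      ev_params_pb.steps_to_live if ev_params_pb.steps_to_live > 0 else None)

def resolve_ev_params(feature_config, global_ev_params):
  # Per-feature setting wins; otherwise fall back to the model-level default,
  # which may itself be None (plain embedding variables).
  if feature_config.HasField('ev_params'):
    return build_ev_params(feature_config.ev_params)
  return global_ev_params
```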
6 changes: 3 additions & 3 deletions easy_rec/python/layers/input_layer.py
@@ -33,7 +33,7 @@ def __init__(self,
feature_groups_config,
variational_dropout_config=None,
wide_output_dim=-1,
use_embedding_variable=False,
ev_params=None,
embedding_regularizer=None,
kernel_regularizer=None,
is_training=False):
@@ -54,13 +54,13 @@ def __init__(self,
self._seq_input_layer = seq_input_layer.SeqInputLayer(
feature_configs,
self._seq_feature_groups_config,
use_embedding_variable=use_embedding_variable)
ev_params=ev_params)
wide_and_deep_dict = self.get_wide_deep_dict()
self._fc_parser = FeatureColumnParser(
feature_configs,
wide_and_deep_dict,
wide_output_dim,
use_embedding_variable=use_embedding_variable)
ev_params=ev_params)

self._embedding_regularizer = embedding_regularizer
self._kernel_regularizer = kernel_regularizer
4 changes: 2 additions & 2 deletions easy_rec/python/layers/seq_input_layer.py
@@ -18,15 +18,15 @@ class SeqInputLayer(object):
def __init__(self,
feature_configs,
feature_groups_config,
use_embedding_variable=False):
ev_params=None):
self._feature_groups_config = {
x.group_name: x for x in feature_groups_config
}
wide_and_deep_dict = self.get_wide_deep_dict()
self._fc_parser = FeatureColumnParser(
feature_configs,
wide_and_deep_dict,
use_embedding_variable=use_embedding_variable)
ev_params=ev_params)

def __call__(self,
features,
7 changes: 6 additions & 1 deletion easy_rec/python/model/easy_rec_model.py
@@ -38,6 +38,11 @@ def __init__(self,
self._is_training = is_training
self._feature_dict = features

# embedding variable parameters
self._global_ev_params = None
if model_config.HasField('ev_params'):
self._global_ev_params = model_config.ev_params

self._emb_reg = regularizers.l2_regularizer(self.embedding_regularization)
self._l2_reg = regularizers.l2_regularizer(self.l2_regularization)
# only used by model with wide feature groups, e.g. WideAndDeep
@@ -83,7 +88,7 @@ def build_input_layer(self, model_config, feature_configs):
feature_configs,
model_config.feature_groups,
wide_output_dim=self._wide_output_dim,
use_embedding_variable=model_config.use_embedding_variable,
ev_params=self._global_ev_params,
embedding_regularizer=self._emb_reg,
kernel_regularizer=self._l2_reg,
variational_dropout_config=model_config.variational_dropout
3 changes: 2 additions & 1 deletion easy_rec/python/model/multi_tower_bst.py
@@ -28,7 +28,8 @@ def __init__(self,
super(MultiTowerBST, self).__init__(model_config, feature_configs, features,
labels, is_training)
self._seq_input_layer = seq_input_layer.SeqInputLayer(
feature_configs, model_config.seq_att_groups)
feature_configs, model_config.seq_att_groups,
ev_params=self._global_ev_params)
assert self._model_config.WhichOneof('model') == 'multi_tower', \
'invalid model config: %s' % self._model_config.WhichOneof('model')
self._model_config = self._model_config.multi_tower
3 changes: 2 additions & 1 deletion easy_rec/python/model/multi_tower_din.py
@@ -26,7 +26,8 @@ def __init__(self,
super(MultiTowerDIN, self).__init__(model_config, feature_configs, features,
labels, is_training)
self._seq_input_layer = seq_input_layer.SeqInputLayer(
feature_configs, model_config.seq_att_groups)
feature_configs, model_config.seq_att_groups,
ev_params=self._global_ev_params)
assert self._model_config.WhichOneof('model') == 'multi_tower', \
'invalid model config: %s' % self._model_config.WhichOneof('model')
self._model_config = self._model_config.multi_tower
2 changes: 1 addition & 1 deletion easy_rec/python/protos/easy_rec_model.proto
@@ -86,7 +86,7 @@ message EasyRecModel {

optional uint32 num_class = 10 [default = 1];

optional bool use_embedding_variable = 11 [default=false];
optional EVParams ev_params = 11;

repeated KD kd = 12;

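At the model level the boolean switch becomes a message. A hypothetical before/after from Python; the generated module path follows the `easy_rec.python.protos.easy_rec_model_pb2` convention cited in the docstrings above.

```python
# Hypothetical illustration of the field change in EasyRecModel.
from easy_rec.python.protos.easy_rec_model_pb2 import EasyRecModel

model_config = EasyRecModel()
# Before this PR:  model_config.use_embedding_variable = True
# After this PR:
model_config.ev_params.filter_freq = 2       # admit ids seen at least twice
model_config.ev_params.steps_to_live = 4000  # evict ids idle for 4000 steps
```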
9 changes: 7 additions & 2 deletions easy_rec/python/protos/feature_config.proto
@@ -28,6 +28,11 @@ message SequenceCombiner {
}
}

message EVParams {
optional uint64 filter_freq = 1 [default=0];
optional uint64 steps_to_live = 2 [default=0];
}

message FeatureConfig {
enum FeatureType {
IdFeature = 0;
@@ -116,8 +121,8 @@ message FeatureConfig {
// for expr feature
optional string expression = 30;

// use embedding variables
optional bool use_embedding_variable = 31 [default=false];
// embedding variable params
optional EVParams ev_params = 31;
}

message FeatureConfigV2 {
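The same message is also available per feature and overrides the model-level default (see the parser sketch above). A hypothetical example; only `embedding_dim` and `ev_params` are field names that appear in this diff, and the `feature_config_pb2` module path is an assumption.

```python
# Hypothetical per-feature override of the embedding-variable settings.
from easy_rec.python.protos.feature_config_pb2 import FeatureConfig

feat = FeatureConfig()
feat.embedding_dim = 16
feat.ev_params.filter_freq = 5    # stricter admission threshold for this feature
feat.ev_params.steps_to_live = 0  # proto default 0 maps to "never expire"
```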