From abd5babbbb15362c0d2146541ccfdff260937644 Mon Sep 17 00:00:00 2001
From: chengmengli06
Date: Fri, 8 Jul 2022 19:51:35 +0800
Subject: [PATCH] add support for frequency filter and steps_to_live for
 embedding variable

---
 .../compat/feature_column/feature_column.py   | 12 +++--
 .../feature_column/feature_column_v2.py       | 20 ++++----
 .../python/feature_column/feature_column.py   | 46 +++++++++++++------
 easy_rec/python/layers/input_layer.py         |  6 +--
 easy_rec/python/layers/seq_input_layer.py     |  4 +-
 easy_rec/python/model/easy_rec_model.py       |  7 ++-
 easy_rec/python/model/multi_tower_bst.py      |  3 +-
 easy_rec/python/model/multi_tower_din.py      |  3 +-
 easy_rec/python/protos/easy_rec_model.proto   |  2 +-
 easy_rec/python/protos/feature_config.proto   |  9 +++-
 samples/model_config/fg_ev_v2.config          |  5 +-
 samples/model_config/taobao_fg_ev.config      |  4 +-
 samples/model_config/taobao_fg_ev_v2.config   |  5 +-
 13 files changed, 85 insertions(+), 41 deletions(-)
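Notes:

`ev_params` replaces the boolean `use_embedding_variable` switch with a
structured message. On pai-tf, `filter_freq` enables counter-based feature
filtering (an id is admitted into the embedding table only after it has been
seen that many times) and `steps_to_live` enables feature eviction (ids not
updated within that many global steps are removed); leaving either field at 0
disables the corresponding behavior. A model-level configuration, abridged
from the sample configs at the end of this patch, looks like:

    model_config {
      ev_params {
        filter_freq: 5        # admit an id after 5 occurrences
        steps_to_live: 1000   # evict ids untouched for 1000 global steps
      }
    }

A feature-level `ev_params` block in `FeatureConfig` (see feature_config.proto
below) overrides the model-level default.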
diff --git a/easy_rec/python/compat/feature_column/feature_column.py b/easy_rec/python/compat/feature_column/feature_column.py
index 6860abcaa..72942d6ab 100644
--- a/easy_rec/python/compat/feature_column/feature_column.py
+++ b/easy_rec/python/compat/feature_column/feature_column.py
@@ -2521,7 +2521,7 @@ class _SharedEmbeddingColumn(
         ('categorical_column', 'dimension', 'combiner', 'initializer',
          'shared_embedding_collection_name', 'ckpt_to_load_from',
          'tensor_name_in_ckpt', 'max_norm', 'trainable', 'partitioner',
-         'use_embedding_variable'))):
+         'ev_params'))):
   """See `embedding_column`."""
 
   @property
@@ -2581,7 +2581,7 @@ def _get_dense_tensor_internal(self,
               'hood.'.format(shared_embedding_collection))
       embedding_weights = shared_embedding_collection[0]
       if embedding_weights.get_shape(
-      ) != embedding_shape and not self.use_embedding_variable:
+      ) != embedding_shape and self.ev_params is None:
         raise ValueError(
             'Shared embedding collection {} contains variable {} of '
             'unexpected shape {}. Expected shape is {}. '
@@ -2592,7 +2592,7 @@ def _get_dense_tensor_internal(self,
                 embedding_weights.name, embedding_weights.get_shape(),
                 embedding_shape))
     else:
-      if not self.use_embedding_variable:
+      if self.ev_params is None:
         embedding_weights = variable_scope.get_variable(
             name='embedding_weights',
             shape=embedding_shape,
@@ -2617,7 +2617,9 @@ def _get_dense_tensor_internal(self,
             initializer=initializer,
             trainable=self.trainable and trainable,
             partitioner=self.partitioner,
-            collections=weight_collections)
+            collections=weight_collections,
+            steps_to_live=self.ev_params.steps_to_live,
+            filter_options=variables.CounterFilterOptions(self.ev_params.filter_freq))
 
       ops.add_to_collection(self.shared_embedding_collection_name,
                             embedding_weights)
@@ -2629,7 +2631,7 @@ def _get_dense_tensor_internal(self,
           self.ckpt_to_load_from,
           {self.tensor_name_in_ckpt: to_restore})
     # Return embedding lookup result.
-    if self.use_embedding_variable:
+    if self.ev_params is not None:
       return ev_embedding_ops.safe_embedding_lookup_sparse(
           embedding_weights=embedding_weights,
           sparse_ids=sparse_ids,
diff --git a/easy_rec/python/compat/feature_column/feature_column_v2.py b/easy_rec/python/compat/feature_column/feature_column_v2.py
index 6aa34d6cb..152ecbee7 100644
--- a/easy_rec/python/compat/feature_column/feature_column_v2.py
+++ b/easy_rec/python/compat/feature_column/feature_column_v2.py
@@ -817,7 +817,7 @@ def embedding_column(categorical_column,
                      max_norm=None,
                      trainable=True,
                      partitioner=None,
-                     use_embedding_variable=False):
+                     ev_params=None):
   """`DenseColumn` that converts from sparse, categorical input.
 
   Use this when your inputs are sparse, but you want to convert them to a dense
@@ -913,7 +913,7 @@ def model_fn(features, ...):
       max_norm=max_norm,
       trainable=trainable,
       partitioner=partitioner,
-      use_embedding_variable=use_embedding_variable)
+      ev_params=ev_params)
 
 
 def shared_embedding_columns(categorical_columns,
@@ -926,7 +926,7 @@ def shared_embedding_columns(categorical_columns,
                              max_norm=None,
                              trainable=True,
                              partitioner=None,
-                             use_embedding_variable=False):
+                             ev_params=None):
   """List of dense columns that convert from sparse, categorical input.
 
   This is similar to `embedding_column`, except that it produces a list of
@@ -1081,7 +1081,7 @@ def model_fn(features, ...):
             max_norm=max_norm,
             trainable=trainable,
             partitioner=partitioner,
-            use_embedding_variable=use_embedding_variable))
+            ev_params=ev_params))
 
   return result
 
@@ -3433,7 +3433,7 @@ class EmbeddingColumn(
         'EmbeddingColumn',
         ('categorical_column', 'dimension', 'combiner', 'initializer',
          'ckpt_to_load_from', 'tensor_name_in_ckpt', 'max_norm', 'trainable',
-         'partitioner', 'use_embedding_variable'))):
+         'partitioner', 'ev_params'))):
   """See `embedding_column`."""
 
   @property
@@ -3507,7 +3507,7 @@ def _get_dense_tensor_internal_helper(self, sparse_tensors,
           self.ckpt_to_load_from,
           {self.tensor_name_in_ckpt: to_restore})
     # Return embedding lookup result.
-    if not self.use_embedding_variable:
+    if self.ev_params is None:
       return embedding_ops.safe_embedding_lookup_sparse(
           embedding_weights=embedding_weights,
           sparse_ids=sparse_ids,
@@ -3538,7 +3538,7 @@ def _old_get_dense_tensor_internal(self, sparse_tensors, weight_collections,
     if (weight_collections and
         ops.GraphKeys.GLOBAL_VARIABLES not in weight_collections):
       weight_collections.append(ops.GraphKeys.GLOBAL_VARIABLES)
-    if not self.use_embedding_variable:
+    if self.ev_params is None:
       embedding_weights = variable_scope.get_variable(
           name='embedding_weights',
           shape=embedding_shape,
@@ -3563,7 +3563,9 @@ def _old_get_dense_tensor_internal(self, sparse_tensors, weight_collections,
           initializer=initializer,
           trainable=self.trainable and trainable,
           partitioner=self.partitioner,
-          collections=weight_collections)
+          collections=weight_collections,
+          steps_to_live=self.ev_params.steps_to_live,
+          filter_options=variables.CounterFilterOptions(self.ev_params.filter_freq))
 
     # Write the embedding configuration to RTP-specified collections. This will inform RTP to
     # optimize this embedding operation.
@@ -3572,7 +3574,7 @@ def _old_get_dense_tensor_internal(self, sparse_tensors, weight_collections,
         variable=embedding_weights,
         bucket_size=self.categorical_column._num_buckets,
         combiner=self.combiner,
-        is_embedding_var=self.use_embedding_variable)
+        is_embedding_var=(self.ev_params is not None))
     embedding_attrs['name'] = layer_utils.unique_name_in_collection(
         compat_ops.GraphKeys.RANK_SERVICE_EMBEDDING, embedding_attrs['name'])
     layer_utils.update_attr_to_collection(
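For orientation, the weight-creation branch that both compat files now share
reduces to the sketch below. This is illustrative only, not code from the
patch: `get_embedding_variable` and `CounterFilterOptions` are the pai-tf APIs
the patched code calls (unavailable on stock TensorFlow), and the
`embedding_dim` keyword is assumed from the pai-tf interface.

    # Sketch: ev_params is None on stock TensorFlow paths; otherwise it is
    # the EVParams namedtuple built by FeatureColumnParser._build_ev_params.
    def _create_embedding_weights(ev_params, embedding_shape, initializer,
                                  trainable, partitioner, collections):
      if ev_params is None:
        # Plain dense variable, identical to the old behavior.
        return variable_scope.get_variable(
            name='embedding_weights',
            shape=embedding_shape,
            initializer=initializer,
            trainable=trainable,
            partitioner=partitioner,
            collections=collections)
      # pai-tf EmbeddingVariable: no static vocabulary dimension; ids are
      # admitted by frequency and evicted by age instead.
      return variable_scope.get_embedding_variable(
          name='embedding_weights',
          embedding_dim=embedding_shape[-1],
          initializer=initializer,
          trainable=trainable,
          partitioner=partitioner,
          collections=collections,
          steps_to_live=ev_params.steps_to_live,
          filter_options=variables.CounterFilterOptions(ev_params.filter_freq))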
diff --git a/easy_rec/python/feature_column/feature_column.py b/easy_rec/python/feature_column/feature_column.py
index 3129dc0f5..481d4972c 100644
--- a/easy_rec/python/feature_column/feature_column.py
+++ b/easy_rec/python/feature_column/feature_column.py
@@ -1,6 +1,7 @@
 # -*- encoding:utf-8 -*-
 # Copyright (c) Alibaba, Inc. and its affiliates.
+import collections
 import logging
 
 import tensorflow as tf
 
@@ -32,6 +33,7 @@ def __init__(self, embedding_name, index, sequence_combiner=None):
     self.index = index
     self.sequence_combiner = sequence_combiner
 
+EVParams = collections.namedtuple('EVParams', ['filter_freq', 'steps_to_live'])
 
 class FeatureColumnParser(object):
   """Parse and generate feature columns."""
@@ -40,7 +42,7 @@ def __init__(self,
                feature_configs,
                wide_deep_dict={},
                wide_output_dim=-1,
-               use_embedding_variable=False):
+               ev_params=None):
     """Initializes a `FeatureColumnParser`.
 
     Args:
@@ -51,7 +53,7 @@ def __init__(self,
         easy_rec.python.layers.input_layer.InputLayer, it is defined in
         easy_rec.python.protos.easy_rec_model_pb2.EasyRecModel.feature_groups
       wide_output_dim: output dimension for wide columns
-      use_embedding_variable: use EmbeddingVariable, which is provided by pai-tf
+      ev_params: params used by EmbeddingVariable, which is provided by pai-tf
     """
     self._feature_configs = feature_configs
     self._wide_output_dim = wide_output_dim
@@ -63,13 +65,16 @@ def __init__(self,
     self._share_embed_names = {}
     self._share_embed_infos = {}
 
-    self._use_embedding_variable = use_embedding_variable
     self._vocab_size = {}
 
+    self._global_ev_params = None
+    if ev_params is not None:
+      self._global_ev_params = self._build_ev_params(ev_params)
+
     def _cmp_embed_config(a, b):
       return a.embedding_dim == b.embedding_dim and a.combiner == b.combiner and\
           a.initializer == b.initializer and a.max_partitions == b.max_partitions and\
-          a.use_embedding_variable == b.use_embedding_variable
+          a.ev_params == b.ev_params
 
     for config in self._feature_configs:
       if not config.HasField('embedding_name'):
@@ -133,8 +138,12 @@ def _cmp_embed_config(a, b):
       initializer = hyperparams_builder.build_initializer(
           self._share_embed_infos[embed_name].initializer)
       partitioner = self._build_partitioner(self._share_embed_infos[embed_name])
-      use_ev = self._use_embedding_variable or \
-          self._share_embed_infos[embed_name].use_embedding_variable
+
+      if self._share_embed_infos[embed_name].HasField('ev_params'):
+        ev_params = self._build_ev_params(self._share_embed_infos[embed_name].ev_params)
+      else:
+        ev_params = self._global_ev_params
+
       # for handling share embedding columns
       share_embed_fcs = feature_column.shared_embedding_columns(
           self._deep_share_embed_columns[embed_name],
@@ -143,7 +152,7 @@ def _cmp_embed_config(a, b):
           shared_embedding_collection_name=embed_name,
           combiner=self._share_embed_infos[embed_name].combiner,
           partitioner=partitioner,
-          use_embedding_variable=use_ev)
+          ev_params=ev_params)
       self._deep_share_embed_columns[embed_name] = share_embed_fcs
       # for handling wide share embedding columns
       if len(self._wide_share_embed_columns[embed_name]) == 0:
@@ -155,7 +164,7 @@ def _cmp_embed_config(a, b):
           shared_embedding_collection_name=embed_name + '_wide',
           combiner='sum',
           partitioner=partitioner,
-          use_embedding_variable=use_ev)
+          ev_params=ev_params)
       self._wide_share_embed_columns[embed_name] = share_embed_fcs
 
     for fc_name in self._deep_columns:
@@ -474,7 +483,7 @@ def parse_sequence_feature(self, config):
 
   def _build_partitioner(self, config):
     if config.max_partitions > 1:
-      if self._use_embedding_variable or config.use_embedding_variable:
+      if self._global_ev_params is not None or config.HasField('ev_params'):
         # pai embedding_variable should use fixed_size_partitioner
         return tf.fixed_size_partitioner(num_shards=config.max_partitions)
       else:
@@ -517,14 +526,17 @@ def _add_wide_embedding_column(self, fc, config):
     initializer = None
     if config.HasField('initializer'):
       initializer = hyperparams_builder.build_initializer(config.initializer)
+    if config.HasField('ev_params'):
+      ev_params = self._build_ev_params(config.ev_params)
+    else:
+      ev_params = self._global_ev_params
     wide_fc = feature_column.embedding_column(
         fc,
         self._wide_output_dim,
         combiner='sum',
         initializer=initializer,
         partitioner=self._build_partitioner(config),
-        use_embedding_variable=self._use_embedding_variable or
-        config.use_embedding_variable)
+        ev_params=ev_params)
     self._wide_columns[feature_name] = wide_fc
 
   def _add_deep_embedding_column(self, fc, config):
@@ -538,14 +550,17 @@ def _add_deep_embedding_column(self, fc, config):
     initializer = None
     if config.HasField('initializer'):
       initializer = hyperparams_builder.build_initializer(config.initializer)
+    if config.HasField('ev_params'):
+      ev_params = self._build_ev_params(config.ev_params)
+    else:
+      ev_params = self._global_ev_params
     fc = feature_column.embedding_column(
         fc,
         config.embedding_dim,
         combiner=config.combiner,
         initializer=initializer,
         partitioner=self._build_partitioner(config),
-        use_embedding_variable=self._use_embedding_variable or
-        config.use_embedding_variable)
+        ev_params=ev_params)
 
     fc.max_seq_length = config.max_seq_len if config.HasField(
         'max_seq_len') else -1
@@ -555,3 +570,8 @@ def _add_deep_embedding_column(self, fc, config):
     if config.HasField('sequence_combiner'):
       fc.sequence_combiner = config.sequence_combiner
     self._sequence_columns[feature_name] = fc
+
+  def _build_ev_params(self, ev_params):
+    """Build EmbeddingVariable params."""
+    return EVParams(ev_params.filter_freq,
+                    ev_params.steps_to_live if ev_params.steps_to_live > 0 else None)
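The parser resolves ev_params inline; as a self-contained sketch (names follow
the protos in this patch), the precedence it implements is:

    import collections

    EVParams = collections.namedtuple('EVParams',
                                      ['filter_freq', 'steps_to_live'])

    def resolve_ev_params(feature_config, global_ev_params):
      # A feature-level ev_params block wins over model_config.ev_params;
      # steps_to_live == 0 means "never evict" and is normalized to None.
      if feature_config.HasField('ev_params'):
        ev = feature_config.ev_params
        return EVParams(ev.filter_freq,
                        ev.steps_to_live if ev.steps_to_live > 0 else None)
      return global_ev_params

When neither level sets ev_params, the result is None and the parser falls
back to ordinary tf.get_variable embeddings.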
diff --git a/easy_rec/python/layers/input_layer.py b/easy_rec/python/layers/input_layer.py
index 52f972b85..47833aaec 100644
--- a/easy_rec/python/layers/input_layer.py
+++ b/easy_rec/python/layers/input_layer.py
@@ -33,7 +33,7 @@ def __init__(self,
                feature_groups_config,
                variational_dropout_config=None,
                wide_output_dim=-1,
-               use_embedding_variable=False,
+               ev_params=None,
                embedding_regularizer=None,
                kernel_regularizer=None,
                is_training=False):
@@ -54,13 +54,13 @@ def __init__(self,
     self._seq_input_layer = seq_input_layer.SeqInputLayer(
         feature_configs,
         self._seq_feature_groups_config,
-        use_embedding_variable=use_embedding_variable)
+        ev_params=ev_params)
     wide_and_deep_dict = self.get_wide_deep_dict()
     self._fc_parser = FeatureColumnParser(
         feature_configs,
         wide_and_deep_dict,
         wide_output_dim,
-        use_embedding_variable=use_embedding_variable)
+        ev_params=ev_params)
 
     self._embedding_regularizer = embedding_regularizer
     self._kernel_regularizer = kernel_regularizer
diff --git a/easy_rec/python/layers/seq_input_layer.py b/easy_rec/python/layers/seq_input_layer.py
index 4c2150535..bdb8dd326 100644
--- a/easy_rec/python/layers/seq_input_layer.py
+++ b/easy_rec/python/layers/seq_input_layer.py
@@ -18,7 +18,7 @@ class SeqInputLayer(object):
   def __init__(self,
                feature_configs,
                feature_groups_config,
-               use_embedding_variable=False):
+               ev_params=None):
     self._feature_groups_config = {
         x.group_name: x for x in feature_groups_config
     }
@@ -26,7 +26,7 @@ def __init__(self,
     self._fc_parser = FeatureColumnParser(
         feature_configs,
         wide_and_deep_dict,
-        use_embedding_variable=use_embedding_variable)
+        ev_params=ev_params)
 
   def __call__(self,
                features,
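Both input layers now accept `ev_params` in place of `use_embedding_variable`,
so a model threads the value through once (sketch only; mirrors
easy_rec_model.py and the tower models below):

    # model_config.ev_params is an optional message; HasField distinguishes
    # "unset" (stay on plain variables) from "set to all defaults".
    global_ev_params = (model_config.ev_params
                        if model_config.HasField('ev_params') else None)
    seq_layer = seq_input_layer.SeqInputLayer(
        feature_configs, model_config.seq_att_groups,
        ev_params=global_ev_params)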
diff --git a/easy_rec/python/model/easy_rec_model.py b/easy_rec/python/model/easy_rec_model.py
index a43904dd4..3f4654853 100644
--- a/easy_rec/python/model/easy_rec_model.py
+++ b/easy_rec/python/model/easy_rec_model.py
@@ -38,6 +38,11 @@ def __init__(self,
     self._is_training = is_training
     self._feature_dict = features
 
+    # embedding variable parameters
+    self._global_ev_params = None
+    if model_config.HasField('ev_params'):
+      self._global_ev_params = model_config.ev_params
+
     self._emb_reg = regularizers.l2_regularizer(self.embedding_regularization)
     self._l2_reg = regularizers.l2_regularizer(self.l2_regularization)
     # only used by model with wide feature groups, e.g. WideAndDeep
@@ -83,7 +88,7 @@ def build_input_layer(self, model_config, feature_configs):
         feature_configs,
         model_config.feature_groups,
         wide_output_dim=self._wide_output_dim,
-        use_embedding_variable=model_config.use_embedding_variable,
+        ev_params=self._global_ev_params,
         embedding_regularizer=self._emb_reg,
         kernel_regularizer=self._l2_reg,
         variational_dropout_config=model_config.variational_dropout
diff --git a/easy_rec/python/model/multi_tower_bst.py b/easy_rec/python/model/multi_tower_bst.py
index 30c8874b3..dcbdc7091 100644
--- a/easy_rec/python/model/multi_tower_bst.py
+++ b/easy_rec/python/model/multi_tower_bst.py
@@ -28,7 +28,8 @@ def __init__(self,
     super(MultiTowerBST, self).__init__(model_config, feature_configs, features,
                                         labels, is_training)
     self._seq_input_layer = seq_input_layer.SeqInputLayer(
-        feature_configs, model_config.seq_att_groups)
+        feature_configs, model_config.seq_att_groups,
+        ev_params=self._global_ev_params)
     assert self._model_config.WhichOneof('model') == 'multi_tower', \
         'invalid model config: %s' % self._model_config.WhichOneof('model')
     self._model_config = self._model_config.multi_tower
diff --git a/easy_rec/python/model/multi_tower_din.py b/easy_rec/python/model/multi_tower_din.py
index afd473f3d..2b66ac098 100644
--- a/easy_rec/python/model/multi_tower_din.py
+++ b/easy_rec/python/model/multi_tower_din.py
@@ -26,7 +26,8 @@ def __init__(self,
     super(MultiTowerDIN, self).__init__(model_config, feature_configs, features,
                                         labels, is_training)
     self._seq_input_layer = seq_input_layer.SeqInputLayer(
-        feature_configs, model_config.seq_att_groups)
+        feature_configs, model_config.seq_att_groups,
+        ev_params=self._global_ev_params)
     assert self._model_config.WhichOneof('model') == 'multi_tower', \
         'invalid model config: %s' % self._model_config.WhichOneof('model')
     self._model_config = self._model_config.multi_tower
diff --git a/easy_rec/python/protos/easy_rec_model.proto b/easy_rec/python/protos/easy_rec_model.proto
index e2a724980..d0df557d7 100644
--- a/easy_rec/python/protos/easy_rec_model.proto
+++ b/easy_rec/python/protos/easy_rec_model.proto
@@ -86,7 +86,7 @@ message EasyRecModel {
 
   optional uint32 num_class = 10 [default = 1];
 
-  optional bool use_embedding_variable = 11 [default=false];
+  optional EVParams ev_params = 11;
 
   repeated KD kd = 12;
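Migration note: field 11 changes type from bool to EVParams, so existing
text-format configs that set `use_embedding_variable: true` will no longer
parse. The minimal replacement is an empty block, which keeps EmbeddingVariable
enabled with no frequency filter (filter_freq defaults to 0) and no eviction
(steps_to_live 0 is normalized to None):

    model_config {
      ev_params {
      }
    }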
diff --git a/easy_rec/python/protos/feature_config.proto b/easy_rec/python/protos/feature_config.proto
index 2f322d16c..0ed6580a1 100644
--- a/easy_rec/python/protos/feature_config.proto
+++ b/easy_rec/python/protos/feature_config.proto
@@ -28,6 +28,11 @@ message SequenceCombiner {
   }
 }
 
+message EVParams {
+  optional uint64 filter_freq = 1 [default=0];
+  optional uint64 steps_to_live = 2 [default=0];
+}
+
 message FeatureConfig {
   enum FeatureType {
     IdFeature = 0;
@@ -116,8 +121,8 @@ message FeatureConfig {
   // for expr feature
   optional string expression = 30;
 
-  // use embedding variables
-  optional bool use_embedding_variable = 31 [default=false];
+  // embedding variable params
+  optional EVParams ev_params = 31;
 }
 
 message FeatureConfigV2 {
diff --git a/samples/model_config/fg_ev_v2.config b/samples/model_config/fg_ev_v2.config
index cde4f279d..19c7b61c8 100644
--- a/samples/model_config/fg_ev_v2.config
+++ b/samples/model_config/fg_ev_v2.config
@@ -3392,7 +3392,10 @@ model_config {
     }
     l2_regularization: 9.9999997e-05
   }
-  use_embedding_variable: true
+  ev_params {
+    filter_freq: 5
+    steps_to_live: 1000
+  }
 }
 export_config{
   multi_placeholder: false
diff --git a/samples/model_config/taobao_fg_ev.config b/samples/model_config/taobao_fg_ev.config
index 3f8b1bf38..f490aa6d2 100644
--- a/samples/model_config/taobao_fg_ev.config
+++ b/samples/model_config/taobao_fg_ev.config
@@ -290,7 +290,9 @@ model_config {
     }
     l2_regularization: 0.0001
   }
-  use_embedding_variable: true
+  ev_params {
+    filter_freq: 5
+  }
 }
 export_config {
   multi_placeholder: false
diff --git a/samples/model_config/taobao_fg_ev_v2.config b/samples/model_config/taobao_fg_ev_v2.config
index 58e1ca9fa..3a2f03b77 100644
--- a/samples/model_config/taobao_fg_ev_v2.config
+++ b/samples/model_config/taobao_fg_ev_v2.config
@@ -291,7 +291,10 @@ model_config {
     }
     l2_regularization: 0.0001
   }
-  use_embedding_variable: true
+  ev_params {
+    filter_freq: 5
+    steps_to_live: 1000
+  }
 }
 export_config {
   multi_placeholder: false
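For completeness, a feature-level override as allowed by the new
FeatureConfig.ev_params field; the feature shown is hypothetical, since no
sample config in this patch sets a per-feature block:

    feature_config {
      features {
        input_names: 'user_id'
        feature_type: IdFeature
        embedding_dim: 16
        ev_params {
          filter_freq: 10   # overrides the model-level ev_params for this id
        }
      }
    }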