diff --git a/easy_rec/python/compat/estimator_train.py b/easy_rec/python/compat/estimator_train.py index 99f65380f..1ec71e491 100644 --- a/easy_rec/python/compat/estimator_train.py +++ b/easy_rec/python/compat/estimator_train.py @@ -109,3 +109,8 @@ def train_and_evaluate(estimator, train_spec, eval_spec): fout.write('Train Done.') return result + + +def estimator_train_done(estimator): + train_done_file = os.path.join(estimator.model_dir, 'ESTIMATOR_TRAIN_DONE') + return gfile.Exists(train_done_file) diff --git a/easy_rec/python/compat/sync_replicas_optimizer.py b/easy_rec/python/compat/sync_replicas_optimizer.py index 4f1d97d4b..24c2921ba 100644 --- a/easy_rec/python/compat/sync_replicas_optimizer.py +++ b/easy_rec/python/compat/sync_replicas_optimizer.py @@ -135,6 +135,8 @@ class SyncReplicasOptimizer(optimizer.Optimizer): ``` """ + sync_que_id = -1 + def __init__(self, opt, replicas_to_aggregate, @@ -299,15 +301,24 @@ def apply_gradients(self, grads_and_vars, global_step=None, name=None): update_op = self._opt.apply_gradients(aggregated_grads_and_vars, global_step) + def _get_token_qname(): + SyncReplicasOptimizer.sync_que_id += 1 + if SyncReplicasOptimizer.sync_que_id == 0: + return 'sync_token_q' + else: + return 'sync_token_q_' + str(SyncReplicasOptimizer.sync_que_id) + # Create token queue. + token_qname = _get_token_qname() + logging.info('create sync_token_queue[%s]' % token_qname) with ops.device(global_step.device), ops.name_scope(''): sync_token_queue = ( data_flow_ops.FIFOQueue( -1, global_step.dtype.base_dtype, shapes=(), - name='sync_token_q', - shared_name='sync_token_q')) + name=token_qname, + shared_name=token_qname)) self._sync_token_queue = sync_token_queue self._is_sync_que_closed = sync_token_queue.is_closed() self._close_sync_que = sync_token_queue.close( @@ -342,6 +353,8 @@ def apply_gradients(self, grads_and_vars, global_step=None, name=None): self._chief_queue_runner = queue_runner.QueueRunner( dummy_queue, [sync_op]) + ops.add_to_collection(ops.GraphKeys.QUEUE_RUNNERS, + self._chief_queue_runner) for accum, dev in self._accumulator_list: with ops.device(dev): chief_init_ops.append( @@ -479,14 +492,12 @@ def begin(self): self._local_init_op = self._sync_optimizer.chief_init_op self._ready_for_local_init_op = ( self._sync_optimizer.ready_for_local_init_op) - self._q_runner = self._sync_optimizer.get_chief_queue_runner() self._init_tokens_op = self._sync_optimizer.get_init_tokens_op( self._num_tokens) else: self._local_init_op = self._sync_optimizer.local_step_init_op self._ready_for_local_init_op = ( self._sync_optimizer.ready_for_local_init_op) - self._q_runner = None self._init_tokens_op = None def after_create_session(self, session, coord): @@ -500,11 +511,10 @@ def after_create_session(self, session, coord): 'local_init. Init op: %s, error: %s' % (self._local_init_op.name, msg)) session.run(self._local_init_op) + is_closed = session.run(self._sync_optimizer._is_sync_que_closed) + assert not is_closed, 'sync_que is closed' if self._init_tokens_op is not None: session.run(self._init_tokens_op) - if self._q_runner is not None: - self._q_runner.create_threads( - session, coord=coord, daemon=True, start=True) def end(self, session): try: diff --git a/easy_rec/python/main.py b/easy_rec/python/main.py index 218452e79..f2b1ad719 100644 --- a/easy_rec/python/main.py +++ b/easy_rec/python/main.py @@ -9,6 +9,7 @@ import logging import math import os +import time import six import tensorflow as tf @@ -279,7 +280,9 @@ def train_and_evaluate(pipeline_config_path, continue_train=False): def _train_and_evaluate_impl(pipeline_config, continue_train=False, - check_mode=False): + check_mode=False, + fit_on_eval=False, + fit_on_eval_steps=None): train_config = pipeline_config.train_config data_config = pipeline_config.data_config feature_configs = config_util.get_compatible_feature_configs(pipeline_config) @@ -301,18 +304,15 @@ def _train_and_evaluate_impl(pipeline_config, estimator, run_config = _create_estimator( pipeline_config, distribution=distribution, params=params) - master_stat_file = os.path.join(pipeline_config.model_dir, 'master.stat') version_file = os.path.join(pipeline_config.model_dir, 'version') if estimator_utils.is_chief(): _check_model_dir(pipeline_config.model_dir, continue_train) config_util.save_pipeline_config(pipeline_config, pipeline_config.model_dir) with gfile.GFile(version_file, 'w') as f: f.write(easy_rec.__version__ + '\n') - if gfile.Exists(master_stat_file): - gfile.Remove(master_stat_file) train_steps = None - if train_config.HasField('num_steps'): + if train_config.HasField('num_steps') and train_config.num_steps > 0: train_steps = train_config.num_steps assert train_steps is not None or data_config.num_epochs > 0, ( 'either num_steps and num_epochs must be set to an integer > 0.') @@ -347,6 +347,30 @@ def _train_and_evaluate_impl(pipeline_config, from easy_rec.python.compat import estimator_train estimator_train.train_and_evaluate(estimator, train_spec, eval_spec) logging.info('Train and evaluate finish') + if fit_on_eval and (not estimator_utils.is_evaluator()): + tf.reset_default_graph() + logging.info('Start continue training on eval data') + eval_input_fn = _get_input_fn(data_config, feature_configs, eval_data, + **input_fn_kwargs) + if fit_on_eval_steps is not None: + # wait estimator train done to get the correct train_steps + while not estimator_train.estimator_train_done(estimator): + time.sleep(1) + train_steps = estimator_utils.get_trained_steps(estimator.model_dir) + logging.info('\ttrain_steps=%d fit_on_eval_steps=%d' % + (train_steps, fit_on_eval_steps)) + fit_on_eval_steps += train_steps + # Do not use estimator_train.train_and_evaluate as it starts tf.Server, + # which is redundant and reports port not available error. + estimator.train( + input_fn=eval_input_fn, + max_steps=fit_on_eval_steps, + hooks=list(train_spec.hooks), + saving_listeners=train_spec.saving_listeners if hasattr( + train_spec, 'saving_listeners') else None) + logging.info('Finished training on eval data') + # return estimator for custom training using estimator.train + return estimator def evaluate(pipeline_config, diff --git a/easy_rec/python/test/train_eval_test.py b/easy_rec/python/test/train_eval_test.py index d594c7dfe..2ae51751f 100644 --- a/easy_rec/python/test/train_eval_test.py +++ b/easy_rec/python/test/train_eval_test.py @@ -756,6 +756,23 @@ def test_train_with_ps_worker(self): 'samples/model_config/multi_tower_on_taobao.config', self._test_dir) self.assertTrue(self._success) + def test_fit_on_eval(self): + self._success = test_utils.test_distributed_train_eval( + 'samples/model_config/multi_tower_on_taobao.config', + self._test_dir, + num_evaluator=1, + fit_on_eval=True) + self.assertTrue(self._success) + + def test_unbalance_data(self): + self._success = test_utils.test_distributed_train_eval( + 'samples/model_config/multi_tower_on_taobao_unblanace.config', + self._test_dir, + total_steps=0, + num_epoch=1, + num_evaluator=1) + self.assertTrue(self._success) + def test_train_with_ps_worker_with_evaluator(self): self._success = test_utils.test_distributed_train_eval( 'samples/model_config/multi_tower_on_taobao.config', diff --git a/easy_rec/python/train_eval.py b/easy_rec/python/train_eval.py index daaf3d572..a7ebca5db 100644 --- a/easy_rec/python/train_eval.py +++ b/easy_rec/python/train_eval.py @@ -67,6 +67,16 @@ nargs='*', default=None, help='eval data input path') + parser.add_argument( + '--fit_on_eval', + action='store_true', + default=False, + help='Fit evaluation data after fitting and evaluating train data') + parser.add_argument( + '--fit_on_eval_steps', + type=int, + default=None, + help='Fit evaluation data steps') parser.add_argument( '--fine_tune_checkpoint', type=str, @@ -169,7 +179,11 @@ has_evaluator=False) else: config_util.auto_expand_share_feature_configs(pipeline_config) - _train_and_evaluate_impl(pipeline_config, args.continue_train, - args.check_mode) + _train_and_evaluate_impl( + pipeline_config, + args.continue_train, + args.check_mode, + fit_on_eval=args.fit_on_eval, + fit_on_eval_steps=args.fit_on_eval_steps) else: raise ValueError('pipeline_config_path should not be empty when training!') diff --git a/easy_rec/python/utils/estimator_utils.py b/easy_rec/python/utils/estimator_utils.py index 0525f84e6..3d42b410a 100644 --- a/easy_rec/python/utils/estimator_utils.py +++ b/easy_rec/python/utils/estimator_utils.py @@ -922,6 +922,14 @@ def latest_checkpoint(model_dir): return None +def get_trained_steps(model_dir): + ckpt_path = latest_checkpoint(model_dir) + if ckpt_path is not None: + return int(ckpt_path.split('-')[-1]) + else: + return 0 + + def master_to_chief(): if 'TF_CONFIG' in os.environ: tf_config = json.loads(os.environ['TF_CONFIG']) diff --git a/easy_rec/python/utils/test_utils.py b/easy_rec/python/utils/test_utils.py index 46e47e520..a54591ce5 100644 --- a/easy_rec/python/utils/test_utils.py +++ b/easy_rec/python/utils/test_utils.py @@ -159,7 +159,10 @@ def _replace_data_for_test(data_path): return data_path -def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50): +def _load_config_for_test(pipeline_config_path, + test_dir, + total_steps=50, + num_epochs=0): pipeline_config = config_util.get_configs_from_pipeline_file( pipeline_config_path) train_config = pipeline_config.train_config @@ -171,7 +174,7 @@ def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50): pipeline_config.model_dir = os.path.join(test_dir, 'train') logging.info('test_model_dir %s' % pipeline_config.model_dir) eval_config.num_examples = max(10, data_config.batch_size) - data_config.num_epochs = 0 + data_config.num_epochs = num_epochs return pipeline_config @@ -529,7 +532,9 @@ def _get_ports(num_worker): def _ps_worker_train(pipeline_config_path, test_dir, num_worker, - num_evaluator=0): + num_evaluator=0, + fit_on_eval=False, + fit_on_eval_steps=None): gpus = get_available_gpus() # not enough gpus, run on cpu only if len(gpus) < num_worker: @@ -547,6 +552,10 @@ def _ps_worker_train(pipeline_config_path, os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id(gpus[0]) train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % pipeline_config_path + if fit_on_eval: + train_cmd += ' --fit_on_eval' + if fit_on_eval_steps is not None: + train_cmd += ' --fit_on_eval_steps ' + str(int(fit_on_eval_steps)) procs[chief_or_master] = run_cmd( train_cmd, '%s/log_%s.txt' % (test_dir, chief_or_master)) tf_config['task'] = {'type': 'ps', 'index': 0} @@ -665,10 +674,12 @@ def test_distributed_train_eval(pipeline_config_path, total_steps=50, num_evaluator=0, edit_config_json=None, - use_hvd=False): + use_hvd=False, + fit_on_eval=False, + num_epoch=0): logging.info('testing pipeline config %s' % pipeline_config_path) pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, - total_steps) + total_steps, num_epoch) if edit_config_json is not None: config_util.edit_config(pipeline_config, edit_config_json) @@ -687,8 +698,13 @@ def test_distributed_train_eval(pipeline_config_path, return _multi_worker_hvd_train(test_pipeline_config_path, test_dir, 2) if train_config.train_distribute == DistributionStrategy.NoStrategy: num_worker = 2 - procs = _ps_worker_train(test_pipeline_config_path, test_dir, num_worker, - num_evaluator) + procs = _ps_worker_train( + test_pipeline_config_path, + test_dir, + num_worker, + num_evaluator, + fit_on_eval, + fit_on_eval_steps=int(total_steps // 2)) elif train_config.train_distribute == DistributionStrategy.MultiWorkerMirroredStrategy: num_worker = 2 procs = _multi_worker_mirror_train(test_pipeline_config_path, test_dir, diff --git a/samples/model_config/multi_tower_on_taobao_unblanace.config b/samples/model_config/multi_tower_on_taobao_unblanace.config new file mode 100644 index 000000000..8105483c2 --- /dev/null +++ b/samples/model_config/multi_tower_on_taobao_unblanace.config @@ -0,0 +1,304 @@ +train_input_path: "data/test/tb_data/taobao_train_data,data/test/tb_data/taobao_test_data" +eval_input_path: "data/test/tb_data/taobao_test_data" +model_dir: "experiments/multi_tower_taobao_ckpt" + +train_config { + log_step_count_steps: 100 + optimizer_config: { + adam_optimizer: { + learning_rate: { + exponential_decay_learning_rate { + initial_learning_rate: 0.001 + decay_steps: 1000 + decay_factor: 0.5 + min_learning_rate: 0.00001 + } + } + } + use_moving_average: false + } + save_checkpoints_steps: 100 + sync_replicas: True + num_steps: 200 +} + +eval_config { + metrics_set: { + auc {} + } +} + +data_config { + file_shard: true + input_fields { + input_name:'clk' + input_type: INT32 + } + input_fields { + input_name:'buy' + input_type: INT32 + } + input_fields { + input_name: 'pid' + input_type: STRING + } + input_fields { + input_name: 'adgroup_id' + input_type: STRING + } + input_fields { + input_name: 'cate_id' + input_type: STRING + } + input_fields { + input_name: 'campaign_id' + input_type: STRING + } + input_fields { + input_name: 'customer' + input_type: STRING + } + input_fields { + input_name: 'brand' + input_type: STRING + } + input_fields { + input_name: 'user_id' + input_type: STRING + } + input_fields { + input_name: 'cms_segid' + input_type: STRING + } + input_fields { + input_name: 'cms_group_id' + input_type: STRING + } + input_fields { + input_name: 'final_gender_code' + input_type: STRING + } + input_fields { + input_name: 'age_level' + input_type: STRING + } + input_fields { + input_name: 'pvalue_level' + input_type: STRING + } + input_fields { + input_name: 'shopping_level' + input_type: STRING + } + input_fields { + input_name: 'occupation' + input_type: STRING + } + input_fields { + input_name: 'new_user_class_level' + input_type: STRING + } + input_fields { + input_name: 'tag_category_list' + input_type: STRING + } + input_fields { + input_name: 'tag_brand_list' + input_type: STRING + } + input_fields { + input_name: 'price' + input_type: INT32 + } + + label_fields: 'clk' + batch_size: 4096 + num_epochs: 1 + prefetch_size: 32 + input_type: CSVInput +} + +feature_config: { + features: { + input_names: 'pid' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'adgroup_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'cate_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10000 + } + features: { + input_names: 'campaign_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'customer' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'brand' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'user_id' + feature_type: IdFeature + embedding_dim: 32 + hash_bucket_size: 100000 + } + features: { + input_names: 'cms_segid' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: 'cms_group_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: 'final_gender_code' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'age_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'pvalue_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'shopping_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'occupation' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'new_user_class_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'tag_category_list' + feature_type: TagFeature + separator: '|' + hash_bucket_size: 100000 + embedding_dim: 16 + } + features: { + input_names: 'tag_brand_list' + feature_type: TagFeature + separator: '|' + hash_bucket_size: 100000 + embedding_dim: 16 + } + features: { + input_names: 'price' + feature_type: IdFeature + embedding_dim: 16 + num_buckets: 50 + } +} +model_config: { + model_class: 'MultiTower' + feature_groups: { + group_name: 'user' + feature_names: 'user_id' + feature_names: 'cms_segid' + feature_names: 'cms_group_id' + feature_names: 'age_level' + feature_names: 'pvalue_level' + feature_names: 'shopping_level' + feature_names: 'occupation' + feature_names: 'new_user_class_level' + wide_deep: DEEP + } + feature_groups: { + group_name: 'item' + feature_names: 'adgroup_id' + feature_names: 'cate_id' + feature_names: 'campaign_id' + feature_names: 'customer' + feature_names: 'brand' + feature_names: 'price' + wide_deep: DEEP + } + feature_groups: { + group_name: 'combo' + feature_names: 'pid' + feature_names: 'tag_category_list' + feature_names: 'tag_brand_list' + wide_deep: DEEP + } + losses { + loss_type: F1_REWEIGHTED_LOSS + weight: 1.0 + f1_reweighted_loss { + f1_beta_square: 2.25 + } + } + losses { + loss_type: PAIR_WISE_LOSS + weight: 1.0 + } + multi_tower { + towers { + input: "user" + dnn { + hidden_units: [256, 128, 96, 64] + } + } + towers { + input: "item" + dnn { + hidden_units: [256, 128, 96, 64] + } + } + towers { + input: "combo" + dnn { + hidden_units: [128, 96, 64, 32] + } + } + final_dnn { + hidden_units: [128, 96, 64, 32, 16] + } + l2_regularization: 1e-6 + } + embedding_regularization: 1e-4 +} + +export_config { + multi_placeholder: false +}