Skip to content

Commit

Permalink
Fix multi-task models (#115, #125)
Browse files Browse the repository at this point in the history
  • Loading branch information
xpai committed Nov 6, 2024
1 parent 4892b6b commit 25536e3
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 112 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
[Doing] Add support for saving pb file, exporting embeddings
[Doing] Add support of multi-gpu training

**FuxiCTR v2.3.5, 2024-11-06**
+ [Fix] Fix get_inputs() bug ([#115](https://github.com/reczoo/FuxiCTR/issues/115))

**FuxiCTR v2.3.4, 2024-11-05**
+ [Feature] Add WuKong model
+ [Fix] Fix OOV token update ([#119](https://github.com/reczoo/FuxiCTR/issues/119))
Expand Down
1 change: 0 additions & 1 deletion experiment/run_expid.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import sys
import logging
import fuxictr_version
from fuxictr import datasets
from datetime import datetime
from fuxictr.utils import load_config, set_logger, print_to_json, print_to_list
from fuxictr.features import FeatureMap
Expand Down
2 changes: 1 addition & 1 deletion fuxictr/autotuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def enumerate_params(config_file, exclude_expid=[]):
dataset_params = dict(zip(dataset_para_keys, values))
if (dataset_params["data_format"] == "npz" or
(dataset_params["data_format"] == "parquet" and
dataset_params["rebuild_dataset"] == False)):
dataset_params.get("rebuild_dataset") == False)):
dataset_para_combs[dataset_id] = dataset_params
else:
hash_id = hashlib.md5("".join(sorted(print_to_json(dataset_params))).encode("utf-8")).hexdigest()[0:8]
Expand Down
37 changes: 20 additions & 17 deletions fuxictr/pytorch/models/multitask_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,29 @@ def __init__(self,
reduce_lr_on_plateau=True,
**kwargs):
super(MultiTaskModel, self).__init__(feature_map=feature_map,
model_id=model_id,
task="binary_classification",
gpu=gpu,
loss_weight=loss_weight,
monitor=monitor,
save_best_only=save_best_only,
monitor_mode=monitor_mode,
early_stop_patience=early_stop_patience,
eval_steps=eval_steps,
embedding_regularizer=embedding_regularizer,
net_regularizer=net_regularizer,
reduce_lr_on_plateau=reduce_lr_on_plateau,
**kwargs)
model_id=model_id,
task="binary_classification",
gpu=gpu,
loss_weight=loss_weight,
monitor=monitor,
save_best_only=save_best_only,
monitor_mode=monitor_mode,
early_stop_patience=early_stop_patience,
eval_steps=eval_steps,
embedding_regularizer=embedding_regularizer,
net_regularizer=net_regularizer,
reduce_lr_on_plateau=reduce_lr_on_plateau,
**kwargs)
self.device = get_device(gpu)
self.num_tasks = num_tasks
self.loss_weight = loss_weight
if isinstance(task, list):
assert len(task) == num_tasks, "the number of tasks must equal the length of \"task\""
self.output_activation = nn.ModuleList([self.get_output_activation(str(t)) for t in task])
else:
self.output_activation = nn.ModuleList([self.get_output_activation(task) for _ in range(num_tasks)])
self.output_activation = nn.ModuleList(
[self.get_output_activation(task) for _ in range(num_tasks)]
)

def compile(self, optimizer, loss, lr):
self.optimizer = get_optimizer(optimizer, self.parameters(), lr)
Expand All @@ -74,8 +76,9 @@ def compile(self, optimizer, loss, lr):
self.loss_fn = [get_loss(loss) for _ in range(self.num_tasks)]

def get_labels(self, inputs):
""" Override get_labels() to use multiple labels """
labels = self.feature_map.labels
y = [inputs[:, self.feature_map.get_column_index(labels[i])].to(self.device).float().view(-1, 1)
y = [inputs[labels[i]].to(self.device).float().view(-1, 1)
for i in range(len(labels))]
return y

Expand Down Expand Up @@ -140,7 +143,7 @@ def evaluate(self, data_generator, metrics=None):
val_logs = self.evaluate_metrics(y_true, y_pred, metrics, group_id)
else:
val_logs = self.evaluate_metrics(y_true, y_pred, self.validation_metrics, group_id)
logging.info('[Metrics] [Task: {}] '.format(labels[i]) + ' - '.join(
logging.info('[Task: {}][Metrics] '.format(labels[i]) + ' - '.join(
'{}: {:.6f}'.format(k, v) for k, v in val_logs.items()))
for k, v in val_logs.items():
all_val_logs['{}_{}'.format(labels[i], k)] = v
Expand All @@ -162,4 +165,4 @@ def predict(self, data_generator):
for i in range(len(labels)):
y_pred_all[labels[i]].extend(
return_dict["{}_pred".format(labels[i])].data.cpu().numpy().reshape(-1))
return y_pred_all
return y_pred_all
2 changes: 1 addition & 1 deletion fuxictr/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__="2.3.4"
__version__="2.3.5"
33 changes: 31 additions & 2 deletions model_zoo/multitask/MMoE/config/model_config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
### Base: This base setting will be inherited by all the expid configs.
Base:
model_root: './checkpoints/'
num_workers: 3
Expand All @@ -12,6 +13,34 @@ Base:
feature_specs: null
feature_config: null

### ModelName_default: This is a config template for hyper-tuning use
MMoE_default:
model: MMoE
dataset_id: TBD
loss: ['binary_crossentropy','binary_crossentropy']
metrics: ['logloss', 'AUC']
task: ['binary_classification','binary_classification']
num_tasks: 2
optimizer: adam
learning_rate: 1.e-3
num_experts: 8
expert_hidden_units: [512,256,128]
gate_hidden_units: [128, 64]
tower_hidden_units: [128, 64]
hidden_activations: relu
net_regularizer: 0
embedding_regularizer: 1.e-6
batch_norm: False
net_dropout: 0
batch_size: 128
embedding_dim: 128
epochs: 100
shuffle: True
seed: 2023
monitor: 'AUC'
monitor_mode: 'max'

### ModelName_test: This is a config for test only
MMoE_test:
model: MMoE
dataset_id: tiny_mtl
Expand All @@ -32,8 +61,8 @@ MMoE_test:
net_dropout: 0
batch_size: 128
embedding_dim: 128
epochs: 50
epochs: 1
shuffle: True
seed: 2023
monitor: 'AUC'
monitor_mode: 'max'
monitor_mode: 'max'
2 changes: 1 addition & 1 deletion model_zoo/multitask/MMoE/fuxictr_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# pip install -U fuxictr
import fuxictr
assert fuxictr.__version__ >= "2.2.0"
assert fuxictr.__version__ >= "2.3.4"
35 changes: 33 additions & 2 deletions model_zoo/multitask/PLE/config/model_config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
### Base: This base setting will be inherited by all the expid configs.
Base:
model_root: './checkpoints/'
num_workers: 3
Expand All @@ -12,6 +13,36 @@ Base:
feature_specs: null
feature_config: null

### ModelName_default: This is a config template for hyper-tuning use
PLE_default:
model: PLE
dataset_id: TBD
loss: ['binary_crossentropy','binary_crossentropy']
metrics: ['logloss', 'AUC']
task: ['binary_classification','binary_classification']
num_tasks: 2
optimizer: adam
learning_rate: 1.e-3
num_layers: 1
num_shared_experts: 8
num_specific_experts: 1
expert_hidden_units: [512,256,128]
gate_hidden_units: [128, 64]
tower_hidden_units: [128, 64]
hidden_activations: relu
net_regularizer: 0
embedding_regularizer: 1.e-6
batch_norm: False
net_dropout: 0
batch_size: 128
embedding_dim: 128
epochs: 50
shuffle: True
seed: 2023
monitor: 'AUC'
monitor_mode: 'max'

### ModelName_test: This is a config for test only
PLE_test:
model: PLE
dataset_id: tiny_mtl
Expand All @@ -34,8 +65,8 @@ PLE_test:
net_dropout: 0
batch_size: 128
embedding_dim: 128
epochs: 50
epochs: 1
shuffle: True
seed: 2023
monitor: 'AUC'
monitor_mode: 'max'
monitor_mode: 'max'
2 changes: 1 addition & 1 deletion model_zoo/multitask/PLE/fuxictr_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# pip install -U fuxictr
import fuxictr
assert fuxictr.__version__ >= "2.2.0"
assert fuxictr.__version__ >= "2.3.4"
71 changes: 41 additions & 30 deletions model_zoo/multitask/PLE/src/PLE.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,38 @@


class CGC_Layer(nn.Module):
def __init__(self, num_shared_experts, num_specific_experts, num_tasks, input_dim, expert_hidden_units, gate_hidden_units, hidden_activations,
def __init__(self, num_shared_experts, num_specific_experts, num_tasks, input_dim,
expert_hidden_units, gate_hidden_units, hidden_activations,
net_dropout, batch_norm):
super(CGC_Layer, self).__init__()
self.num_shared_experts = num_shared_experts
self.num_specific_experts = num_specific_experts
self.num_tasks = num_tasks
self.shared_experts = nn.ModuleList([MLP_Block(input_dim=input_dim,
hidden_units=expert_hidden_units,
hidden_activations=hidden_activations,
output_activation=None,
dropout_rates=net_dropout,
batch_norm=batch_norm) for _ in range(self.num_shared_experts)])
self.specific_experts = nn.ModuleList([nn.ModuleList([MLP_Block(input_dim=input_dim,
hidden_units=expert_hidden_units,
hidden_activations=hidden_activations,
output_activation=None,
dropout_rates=net_dropout,
batch_norm=batch_norm) for _ in range(self.num_specific_experts)]) for _ in range(num_tasks)])
self.gate = nn.ModuleList([MLP_Block(input_dim=input_dim,
output_dim=num_specific_experts+num_shared_experts if i < num_tasks else num_shared_experts,
hidden_units=gate_hidden_units,
hidden_activations=hidden_activations,
output_activation=None,
dropout_rates=net_dropout,
batch_norm=batch_norm) for i in range(self.num_tasks+1)])
self.shared_experts = nn.ModuleList(
[MLP_Block(input_dim=input_dim,
hidden_units=expert_hidden_units,
hidden_activations=hidden_activations,
output_activation=None,
dropout_rates=net_dropout,
batch_norm=batch_norm) for _ in range(self.num_shared_experts)]
)
self.specific_experts = nn.ModuleList(
[nn.ModuleList([MLP_Block(input_dim=input_dim,
hidden_units=expert_hidden_units,
hidden_activations=hidden_activations,
output_activation=None,
dropout_rates=net_dropout,
batch_norm=batch_norm) for _ in range(self.num_specific_experts)]) for _ in range(num_tasks)]
)
self.gate = nn.ModuleList(
[MLP_Block(input_dim=input_dim,
output_dim=num_specific_experts+num_shared_experts if i < num_tasks else num_shared_experts,
hidden_units=gate_hidden_units,
hidden_activations=hidden_activations,
output_activation=None,
dropout_rates=net_dropout,
batch_norm=batch_norm) for i in range(self.num_tasks+1)]
)
self.gate_activation = get_activation('softmax')
def forward(self, x, require_gate=False):
"""
Expand All @@ -69,7 +76,8 @@ def forward(self, x, require_gate=False):
for i in range(self.num_tasks+1):
if i < self.num_tasks:
# for specific experts
gate_input = torch.stack(specific_expert_outputs[i] + shared_expert_outputs, dim=1) # (?, num_specific_experts+num_shared_experts, dim)
# gate_input: (?, num_specific_experts+num_shared_experts, dim)
gate_input = torch.stack(specific_expert_outputs[i] + shared_expert_outputs, dim=1)
gate = self.gate_activation(self.gate[i](x[i])) # (?, num_specific_experts+num_shared_experts)
gates.append(gate.mean(0))
cgc_output = torch.sum(gate.unsqueeze(-1) * gate_input, dim=1) # (?, dim)
Expand All @@ -86,6 +94,7 @@ def forward(self, x, require_gate=False):
else:
return cgc_outputs


class PLE(MultiTaskModel):
def __init__(self,
feature_map,
Expand Down Expand Up @@ -117,15 +126,17 @@ def __init__(self,
**kwargs)
self.embedding_layer = FeatureEmbedding(feature_map, embedding_dim)
self.num_layers = num_layers
self.cgc_layers = nn.ModuleList([CGC_Layer(num_shared_experts,
num_specific_experts,
num_tasks,
input_dim= embedding_dim * feature_map.num_fields if i==0 else expert_hidden_units[-1],
expert_hidden_units= expert_hidden_units,
gate_hidden_units=gate_hidden_units,
hidden_activations=hidden_activations,
net_dropout=net_dropout,
batch_norm=batch_norm) for i in range(self.num_layers)])
self.cgc_layers = nn.ModuleList(
[CGC_Layer(num_shared_experts,
num_specific_experts,
num_tasks,
input_dim= embedding_dim * feature_map.num_fields if i==0 else expert_hidden_units[-1],
expert_hidden_units= expert_hidden_units,
gate_hidden_units=gate_hidden_units,
hidden_activations=hidden_activations,
net_dropout=net_dropout,
batch_norm=batch_norm) for i in range(self.num_layers)]
)
self.tower = nn.ModuleList([MLP_Block(input_dim=expert_hidden_units[-1],
output_dim=1,
hidden_units=tower_hidden_units,
Expand Down
29 changes: 28 additions & 1 deletion model_zoo/multitask/ShareBottom/config/model_config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
### Base: This base setting will be inherited by all the expid configs.
Base:
model_root: './checkpoints/'
num_workers: 3
Expand All @@ -12,8 +13,34 @@ Base:
feature_specs: null
feature_config: null

### ModelName_default: This is a config template for hyper-tuning use
ShareBottom_default:
model: ShareBottom
dataset_id: TBD
loss: ['binary_crossentropy', 'binary_crossentropy']
metrics: ['logloss', 'AUC']
task: ['binary_classification', 'binary_classification']
num_tasks: 2
optimizer: adam
learning_rate: 1.e-3
bottom_hidden_units: [512, 256, 128]
tower_hidden_units: [128, 64]
hidden_activations: relu
net_regularizer: 0
embedding_regularizer: 1.e-6
batch_norm: False
net_dropout: 0
batch_size: 128
embedding_dim: 128
epochs: 100
shuffle: True
seed: 2023
monitor: 'AUC'
monitor_mode: 'max'

### ModelName_test: This is a config for test only
ShareBottom_test:
model: SharedBottom
model: ShareBottom
dataset_id: tiny_mtl
loss: ['binary_crossentropy', 'binary_crossentropy']
metrics: ['logloss', 'AUC']
Expand Down
2 changes: 1 addition & 1 deletion model_zoo/multitask/ShareBottom/fuxictr_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# pip install -U fuxictr
import fuxictr
assert fuxictr.__version__ >= "2.2.0"
assert fuxictr.__version__ >= "2.3.4"
5 changes: 2 additions & 3 deletions model_zoo/multitask/ShareBottom/run_expid.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from fuxictr.pytorch.torch_utils import seed_everything
from fuxictr.pytorch.dataloaders import RankDataLoader
from fuxictr.preprocess import FeatureProcessor, build_dataset
import src as model_zoo
import src
import gc
import argparse
import os
Expand Down Expand Up @@ -61,7 +61,7 @@
feature_map.load(feature_map_json, params)
logging.info("Feature specs: " + print_to_json(feature_map.features))

model_class = getattr(model_zoo, params['model'])
model_class = getattr(src, params['model'])
model = model_class(feature_map, **params)
model.count_parameters() # print number of parameters used in model

Expand All @@ -85,4 +85,3 @@
.format(datetime.now().strftime('%Y%m%d-%H%M%S'),
' '.join(sys.argv), experiment_id, params['dataset_id'],
"N.A.", print_to_list(valid_result), print_to_list(test_result)))

2 changes: 1 addition & 1 deletion model_zoo/multitask/ShareBottom/src/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .ShareBottom import *
from .ShareBottom import ShareBottom
Loading

0 comments on commit 25536e3

Please sign in to comment.