Skip to content

Commit

Permalink
Renaming ProcessList to PrepareFeatures (#992)
Browse files Browse the repository at this point in the history
* Fixed error that was causing the broadcasted context feature to have fixed size first dim in graph mode and not being compatible with the ragged sequential features

* Enforcing non-list (scalar) features to be 2D (batch size,1) if 1D or with last dim undefined (which happens in graph mode)

* Making Continuous support_masking=True (to cascade mask)

* Changing BroadcastToSequence to fix some issues and simplify the masking

* Fixed tests

* Fixed test

* Renaming ProcessList to PrepareFeatures, as it nows also prepares not only list features but also scalar features

* Fixed tests
  • Loading branch information
gabrielspmoreira authored and sararb committed Feb 28, 2023
1 parent f78bec7 commit 0d28de4
Show file tree
Hide file tree
Showing 14 changed files with 35 additions and 41 deletions.
6 changes: 3 additions & 3 deletions merlin/models/tf/core/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from merlin.models.tf.inputs.embedding import CombinerType, EmbeddingTable
from merlin.models.tf.models.base import BaseModel, get_output_schema
from merlin.models.tf.outputs.topk import TopKOutput
from merlin.models.tf.transforms.tensor import ProcessList
from merlin.models.tf.transforms.tensor import PrepareFeatures
from merlin.models.tf.utils import tf_utils
from merlin.schema import ColumnSchema, Schema, Tags

Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(
self.blocks = [input_block] + list(blocks) if blocks else [input_block]
self.pre = pre
self.post = post
self.process_list = ProcessList(self._schema)
self.prepare_features = PrepareFeatures(self._schema)

def encode(
self,
Expand Down Expand Up @@ -163,7 +163,7 @@ def call(self, inputs, training=False, testing=False, targets=None, **kwargs):
return combinators.call_sequentially(
list(self.to_call),
inputs=inputs,
features=self.process_list(inputs),
features=self.prepare_features(inputs),
targets=targets,
training=training,
testing=testing,
Expand Down
4 changes: 2 additions & 2 deletions merlin/models/tf/inputs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
Embeddings,
SequenceEmbeddingFeatures,
)
from merlin.models.tf.transforms.tensor import ListToDense, ProcessList
from merlin.models.tf.transforms.tensor import ListToDense, PrepareFeatures
from merlin.schema import Schema, Tags, TagsType

LOG = logging.getLogger("merlin-models")
Expand Down Expand Up @@ -325,7 +325,7 @@ def InputBlockV2(
if not parsed:
raise ValueError("No columns selected for the input block")

_pre = ProcessList(schema)
_pre = PrepareFeatures(schema)
if pre:
_pre = _pre.connect(pre)

Expand Down
13 changes: 9 additions & 4 deletions merlin/models/tf/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def sample_batch(
include_targets: bool = True,
to_ragged: bool = False,
to_dense: bool = False,
process_lists=False,
prepare_features=False,
):
"""Util function to generate a batch of input tensors from a merlin.io.Dataset instance
Expand All @@ -387,6 +387,11 @@ def sample_batch(
Whether to convert the tuple of sparse tensors into ragged tensors, by default False.
to_dense: bool
Whether to convert the tuple of sparse tensors into dense tensors, by default False.
prepare_features: bool
Whether to prepare features from dataloader for the model, by default False.
If enabled, it converts multi-hot/list features to dense or ragged based on the schema.
It also ensures that scalar features are converted to 2D (batch size, 1).
P.s. The features are automatically prepared by InputBlockV2 if it is used
Returns:
-------
batch: Dict[tf.tensor]
Expand All @@ -397,7 +402,7 @@ def sample_batch(
"Sparse values cannot be converted to both ragged tensors and dense tensors"
)

from merlin.models.tf.transforms.tensor import ListToDense, ListToRagged, ProcessList
from merlin.models.tf.transforms.tensor import ListToDense, ListToRagged, PrepareFeatures

if isinstance(dataset_or_loader, Dataset):
if not batch_size:
Expand All @@ -414,8 +419,8 @@ def sample_batch(
inputs = ListToRagged()(inputs)
elif to_dense:
inputs = ListToDense()(inputs)
if process_lists:
inputs = ProcessList(loader.schema)(inputs)
if prepare_features:
inputs = PrepareFeatures(loader.schema)(inputs)
if not include_targets:
return inputs
return inputs, targets
Expand Down
8 changes: 4 additions & 4 deletions merlin/models/tf/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1476,7 +1476,7 @@ def __init__(
]
self.schema = sum(input_block_schemas, Schema())

self.process_list = ProcessList(self.schema)
self.prepare_features = PrepareFeatures(self.schema)
self._frozen_blocks = set()

def save(
Expand Down Expand Up @@ -1543,7 +1543,7 @@ def _maybe_build(self, inputs):
f"\n\t{call_input_features.difference(model_input_features)}"
)

_ragged_inputs = self.process_list(inputs)
_ragged_inputs = self.prepare_features(inputs)
feature_shapes = {k: v.shape for k, v in _ragged_inputs.items()}
feature_dtypes = {k: v.dtype for k, v in _ragged_inputs.items()}

Expand All @@ -1565,7 +1565,7 @@ def build(self, input_shape=None):
"""
last_layer = None

input_shape = self.process_list.compute_output_shape(input_shape)
input_shape = self.prepare_features.compute_output_shape(input_shape)

if self.pre is not None:
self.pre.build(input_shape)
Expand All @@ -1592,7 +1592,7 @@ def build(self, input_shape=None):

def call(self, inputs, targets=None, training=False, testing=False, output_context=False):
context = self._create_context(
self.process_list(inputs),
self.prepare_features(inputs),
targets=targets,
training=training,
testing=testing,
Expand Down
2 changes: 1 addition & 1 deletion merlin/models/tf/transforms/tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def compute_call_output_shape(self, input_shapes):


@tf.keras.utils.register_keras_serializable(package="merlin.models")
class ProcessList(TabularBlock):
class PrepareFeatures(TabularBlock):
"""Process all list (multi-hot/sequential) features.add()
In NVTabular, list-columns are represented as a tuple of (values, offsets).
Expand Down
2 changes: 1 addition & 1 deletion merlin/models/tf/utils/testing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def model_test(

assert isinstance(loaded_model, type(model))

x, y = sample_batch(dataloader, batch_size=50, to_ragged=False, process_lists=False)
x, y = sample_batch(dataloader, batch_size=50, to_ragged=False, prepare_features=False)
batch = [(x, y)]

model_preds = model.predict(iter(batch))
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/datasets/test_synthetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_tf_tensors_generation_cpu():

from merlin.models.tf import sample_batch

tensors, _ = sample_batch(data, batch_size=100, process_lists=False)
tensors, _ = sample_batch(data, batch_size=100)
assert tensors["user_id"].shape == (100, 1)
assert tensors["user_age"].dtype == tf.float64
for name, val in filter_dict_by_schema(tensors, schema.select_by_tag(Tags.LIST)).items():
Expand All @@ -68,7 +68,7 @@ def test_sequence_data_length(generate_data_kwargs, expected_sequence_length):

from merlin.models.tf import sample_batch

tensors, y = sample_batch(data, batch_size=1, process_lists=False)
tensors, y = sample_batch(data, batch_size=1)

for col in ["item_id_seq", "categories"]:
assert all(tensors[col][1] == expected_sequence_length)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/tf/blocks/retrieval/test_two_tower.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def test_two_tower_block_with_l2_norm_on_towers_outputs(testing_data: Dataset):

def test_two_tower_block_tower_save(testing_data: Dataset, tmp_path):
two_tower = ml.TwoTowerBlock(testing_data.schema, query_tower=ml.MLPBlock([64, 128]))
features, _ = ml.sample_batch(testing_data, batch_size=100, process_lists=False)
features, _ = ml.sample_batch(testing_data, batch_size=100)
two_tower(features)

query_tower = two_tower.query_block()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/tf/blocks/test_interactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def test_fm_block_with_multi_hot_categ_features(testing_data: Dataset):
factors_dim=32,
)

batch, _ = mm.sample_batch(testing_data, batch_size=16, process_lists=False)
batch, _ = mm.sample_batch(testing_data, batch_size=16)

output = fm_block(batch)
assert output.shape.as_list() == [16, 1]
2 changes: 1 addition & 1 deletion tests/unit/tf/inputs/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_concat_sequence(sequence_testing_data):
),
)

inputs = mm.sample_batch(sequence_testing_data, 8, include_targets=False, process_lists=True)
inputs = mm.sample_batch(sequence_testing_data, 8, include_targets=False, prepare_features=True)

outputs = seq_inputs(inputs)

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/tf/inputs/test_continuous.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_continuous_features_ragged(sequence_testing_data: Dataset):
inputs = ml.ContinuousFeatures.from_schema(
schema, post=ml.BroadcastToSequence(context_schema, seq_schema), aggregation="concat"
)
features, _ = ml.sample_batch(sequence_testing_data, batch_size=100, process_lists=True)
features, _ = ml.sample_batch(sequence_testing_data, batch_size=100, prepare_features=True)
outputs = inputs(features)

assert outputs.to_tensor().shape == (100, 4, 6)
2 changes: 1 addition & 1 deletion tests/unit/tf/models/test_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def test_wide_deep_model_wide_onehot_multihot_feature_interaction(ecommerce_data
),
]

batch, _ = mm.sample_batch(ml_dataset, batch_size=100, process_lists=False)
batch, _ = mm.sample_batch(ml_dataset, batch_size=100)

output_wide_features = mm.ParallelBlock(wide_preprocessing_blocks)(batch)
assert set(output_wide_features.keys()) == set(
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/tf/transforms/test_negative_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def test_in_model(self, run_eagerly, music_streaming_data: Dataset, tf_random_se
testing_utils.model_test(model, dataset, run_eagerly=run_eagerly, reload_model=True)

batch_size = 10
features, targets = mm.sample_batch(dataset, batch_size=batch_size, process_lists=True)
features, targets = mm.sample_batch(dataset, batch_size=batch_size, prepare_features=True)

with_negatives = model(features, targets=targets, training=True)
assert with_negatives.predictions.shape[0] >= 50
Expand Down
25 changes: 7 additions & 18 deletions tests/unit/tf/transforms/test_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@
def test_seq_predict_next(sequence_testing_data: Dataset):
seq_schema = sequence_testing_data.schema.select_by_tag(Tags.SEQUENCE)
target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0]
predict_next = mm.SequencePredictNext(schema=seq_schema, target=target, pre=mm.ListToRagged())
predict_next = mm.SequencePredictNext(schema=seq_schema, target=target)

batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False)
batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, prepare_features=True)
output = predict_next(batch)
output_x, output_y = output
output_y = output_y[target]

as_ragged = mm.ListToRagged()
batch = as_ragged(batch)

# Checks if sequential input features were truncated in the last position
for k, v in batch.items():
if k in seq_schema.column_names:
Expand All @@ -55,14 +52,11 @@ def test_seq_predict_last(sequence_testing_data: Dataset):
target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0]
predict_last = mm.SequencePredictLast(schema=seq_schema, target=target)

batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False)
batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, prepare_features=True)
output = predict_last(batch)
output_x, output_y = output
output_y = output_y[target]

as_ragged = mm.ListToRagged()
batch = as_ragged(batch)

# Checks if sequential input features were truncated in the last position
for k, v in batch.items():
if k in seq_schema.column_names:
Expand All @@ -83,13 +77,11 @@ def test_seq_predict_random(sequence_testing_data: Dataset):
target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0]
predict_random = mm.SequencePredictRandom(schema=seq_schema, target=target)

batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False)
batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, prepare_features=True)
output = predict_random(batch)
output_x, output_y = output
output_y = output_y[target]

as_ragged = mm.ListToRagged()
batch = as_ragged(batch)
batch_size = batch[target].shape[0]

for k, v in batch.items():
Expand All @@ -108,7 +100,7 @@ def test_seq_predict_random(sequence_testing_data: Dataset):
tf.Assert(tf.reduce_all(output_x[k] == v), [output_x[k], v])

# Checks if the target has the right shape
tf.Assert(tf.reduce_all(tf.shape(output_y) == batch_size), [])
tf.Assert(tf.reduce_all(tf.shape(output_y) == tf.TensorShape((batch_size, 1))), [])


def test_seq_predict_next_output_shape(sequence_testing_data):
Expand Down Expand Up @@ -160,7 +152,7 @@ def test_seq_random_masking(sequence_testing_data: Dataset):
target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0]
predict_masked = mm.SequenceMaskRandom(schema=seq_schema, target=target, masking_prob=0.3)

batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False)
batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, prepare_features=True)

output_x, output_y = predict_masked(batch)
output_y = output_y[target]
Expand All @@ -171,9 +163,6 @@ def test_seq_random_masking(sequence_testing_data: Dataset):

asserts_mlm_target_mask(target_mask)

as_ragged = mm.ListToRagged()
batch = as_ragged(batch)

for k, v in batch.items():
# Checking if inputs values didn't change
tf.Assert(tf.reduce_all(output_x[k] == v), [output_x[k], v])
Expand Down Expand Up @@ -215,7 +204,7 @@ def test_seq_mask_random_replace_embeddings(
target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0]
predict_masked = mm.SequenceMaskRandom(schema=seq_schema, target=target, masking_prob=0.3)

batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False)
batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, prepare_features=False)

inputs, targets = predict_masked(batch)
targets = targets[target]
Expand Down

0 comments on commit 0d28de4

Please sign in to comment.