From 0141ad4f77cd22afe872af83e35b4cd56f443cdb Mon Sep 17 00:00:00 2001 From: zhuwenjing Date: Fri, 4 Nov 2022 20:13:02 +0800 Subject: [PATCH 01/34] new sok class --- merlin/models/tf/inputs/embedding.py | 145 +++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/merlin/models/tf/inputs/embedding.py b/merlin/models/tf/inputs/embedding.py index a8a0b3d437..bacd4d030e 100644 --- a/merlin/models/tf/inputs/embedding.py +++ b/merlin/models/tf/inputs/embedding.py @@ -150,6 +150,151 @@ def from_config(cls, config): return cls(dim, *schema, **config) +class SOKEmbedding(tf.keras.layers.Layer): + """ + dim: int The last dimension of the variable + vocab_sizes: list, rows of the variable list + initializer: string, a list of dict {"indices":numpy.array, "value": numpy.array} = "uniform" + When it's string, it specifies the initializer used to generate initial values. + When it's list of numpy.array, its shape must be [vocab_size[i], embedding_vec_size], + and will be used as the initial indices and value. + use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable + localized: If set to None, use Distributed Variable, otherwise Localized Variable. where the list indicates which GPU you want to put this variable on. + Default is None + Examples + -------- + .. code-block:: python + + Notes + ----- + """ + def __init__(self, + dim: int, + *col_schemas: ColumnSchema, + vocab_sizes: list, + initializer: Union[str, tf.Tensor, list] = "uniform" + use_dynamic_variable = False, + localized = None, + **kwargs + ): + super(Embedding, self).__init__(**kwargs) + self._embedding_vec_size = dim + self._vocab_sizes = vocab_sizes + self._use_dynamic_variable = use_dynamic_variable + self._localized = localized + if self._localized is None and self._use_dynamic_variable == False : + prefix_sum = [] + offset = 0 + for i in range(len(vocab_sizes)): + prefix_sum.append(offset) + offset += self._vocab_sizes[i] + prefix_sum = np.array(prefix_sum, dtype=np.int64).reshape(1, -1) + self._vocab_prefix_sum = tf.constant(prefix_sum) + print("[Info] Total vocabulary size:", offset) + + if isinstance(initializer, str): + self._var = sok.Variable( + shape=[offset, self._embedding_vec_size], initializer=tf.keras.initializers.get(initializer), dtype=tf.float32 + ) + else: + self._var = sok.Variable(initializer) + else: + self._vars = [] + for i in range(len(vocab_sizes)): + if _use_dynamic_variable: + if isinstance(initializer, str): + v = sok.DynamicVariable(dimension=self._embedding_vec_size, initializer=initializer) + else: + indices = tf.convert_to_tensor(initializer[i][0]) + values = tf.convert_to_tensor(initializer[i][1]) + sok.assign(v, indices, values) + elif self._localized is not None: + if isinstance(initializer, str): + v = sok.Variable( + shape=[self._vocab_sizes[i], self._embedding_vec_size], + initializer=tf.keras.initializers.get(intializer), + dtype=tf.float32, + mode="localized:%d" % self._localized[i], + ) + else: + v = sok.Variable( + initializer[i], + mode="localized:%d" % self._localized[i], + ) + else: + raise ValueError("Wrong Configuration!!!") + self._trainable_weights.append(v) + self._vars.append(v) + + def call(self, inputs, training=True): + if self._localized is None and self._use_dynamic_variable == False : + fused_inputs = tf.add(inputs, self._vocab_prefix_sum) + fused_inputs = tf.reshape(fused_inputs, [-1]) + fused_inputs = tf.RaggedTensor.from_tensor(tf.reshape(fused_inputs, [-1, 1])) + + emb_vectors = sok.lookup_sparse(self._var, fused_inputs, 1, "sum") + 
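# A minimal sketch (plain TensorFlow, hypothetical sizes) of the prefix-sum
# trick the fused lookup above relies on: each feature's ids are shifted by
# the cumulative size of the preceding vocabularies, so a single
# model-parallel table can answer every feature in one lookup.
import numpy as np
import tensorflow as tf

vocab_sizes = [10, 20, 30]                                   # hypothetical per-feature vocabularies
offsets = np.cumsum([0] + vocab_sizes[:-1], dtype=np.int64)  # [0, 10, 30], the role of _vocab_prefix_sum
ids = tf.constant([[3, 5, 7]], dtype=tf.int64)               # one id per feature, batch of one
fused = ids + tf.constant(offsets.reshape(1, -1))            # [[3, 15, 37]]: rows of the fused table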
emb_vectors = tf.reshape( + emb_vectors, [-1, len(self._vocab_sizes), self._embedding_vec_size] + ) + return emb_vectors + # localized mode + else: + input_list = tf.split(inputs, num_or_size_splits=len(self._vocab_sizes), axis=1) + for i in range(len(self._vocab_sizes)): + input_list[i] = tf.RaggedTensor.from_tensor(tf.reshape(input_list[i], [-1, 1])) + emb_vectors = sok.lookup_sparse( + self._vars, + input_list, + [1 for _ in range(len(self._vocab_sizes))], + ["sum" for _ in range(len(self._vocab_sizes))], + ) + for i in range(len(self._vocab_sizes)): + emb_vectors[i] = tf.reshape(emb_vectors[i], [-1, 1, self._embedding_vec_size]) + emb_vectors = tf.concat(emb_vectors, axis=1) + return emb_vectors + + @classmethod + def from_pretrained( + cls, + data: list, + trainable=True, + name=None, + col_schema=None, + use_dynamic_variable = False, + localized = None, + **kwargs, + ) -> "SOKEmbedding": + """Create From pre-trained embeddings from a Dataset or DataFrame. + Parameters + ---------- + data : Union[Dataset, DataFrameType] + A dataset containing the pre-trained embedding weights + trainable : bool + Whether the layer should be trained or not. + name : str + The name of the layer. + """ + vocab_size=[] + weights = [] + for i, item in enumerate(data): + if use_dynamic_variable: + if isinstance(item, dict) and item.has_key("indice") and item.has_key("values"): + weights.append([item["indice"], item["values"]]) + else: + raise ValueError("DynamicVariable should be initialized with indice and values") + else: + weights.append(item) + + return cls( + dim, + col_schema, + name=name, + initializer=weights, + use_dynamic_variable = use_dynamic_variable, + localized = localized, + trainable=trainable, + **kwargs, + ) CombinerType = Union[str, tf.keras.layers.Layer] From d7ad9bad1ae35e312dfce1a16b702f4124b53c30 Mon Sep 17 00:00:00 2001 From: zhuwenjing Date: Fri, 4 Nov 2022 21:29:35 +0800 Subject: [PATCH 02/34] new sok class --- merlin/models/tf/inputs/embedding.py | 35 ++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/merlin/models/tf/inputs/embedding.py b/merlin/models/tf/inputs/embedding.py index bacd4d030e..1a4180151d 100644 --- a/merlin/models/tf/inputs/embedding.py +++ b/merlin/models/tf/inputs/embedding.py @@ -150,7 +150,8 @@ def from_config(cls, config): return cls(dim, *schema, **config) -class SOKEmbedding(tf.keras.layers.Layer): +@tf.keras.utils.register_keras_serializable(package="merlin.models") +class SOKEmbedding(EmbeddingTableBase): """ dim: int The last dimension of the variable vocab_sizes: list, rows of the variable list @@ -175,9 +176,16 @@ def __init__(self, initializer: Union[str, tf.Tensor, list] = "uniform" use_dynamic_variable = False, localized = None, + name = None, **kwargs ): - super(Embedding, self).__init__(**kwargs) + super(SOKEmbedding, self).__init__( + dim, + *col_schemas, + trainable=trainable, + name=name, + dtype=dtype, + **kwargs) self._embedding_vec_size = dim self._vocab_sizes = vocab_sizes self._use_dynamic_variable = use_dynamic_variable @@ -295,6 +303,29 @@ def from_pretrained( trainable=trainable, **kwargs, ) + def get_config(self): + config = super().get_config() + config["dim"] = self.dim + + schema = schema_to_tensorflow_metadata_json(self.schema) + config["schema"] = schema + config["vocab_sizes"] = self._vocab_sizes + config["initializer"] = self._initializer + config["use_dynamic_variable"] = self._use_dynamic_variable + config["localized"] = self._localized + + return config + + @classmethod + def 
from_config(cls, config): + dim = config.pop("dim") + schema = tensorflow_metadata_json_to_schema(config.pop("schema")) + vocab_size = config["vocab_sizes"] + initializer = config["initializer"] + use_dynamic_variable = config["use_dynamic_variable"] + localized = config["localized"] + + return cls(dim, *schema, vocab_size, initializer, use_dynamic_variable, localized, **config) CombinerType = Union[str, tf.keras.layers.Layer] From 7825963871866983a837bb2f85461864dbfc3bfa Mon Sep 17 00:00:00 2001 From: zhuwenjing Date: Mon, 7 Nov 2022 11:05:44 +0800 Subject: [PATCH 03/34] test sok dynamic variable --- tests/unit/tf/inputs/test_embedding.py | 54 ++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/unit/tf/inputs/test_embedding.py b/tests/unit/tf/inputs/test_embedding.py index aadfbb637e..39656967f9 100644 --- a/tests/unit/tf/inputs/test_embedding.py +++ b/tests/unit/tf/inputs/test_embedding.py @@ -316,6 +316,60 @@ def test_select_by_tag(self): assert sorted(item.features) == ["c"] assert item.table is embedding_table.table +class TestSOKEmbedding: + sample_column_schema = ColumnSchema( + "item_id", + dtype=np.int32, + properties={"domain": {"min": 0, "max": 10, "name": "item_id"}}, + tags=[Tags.CATEGORICAL], + ) + + def test_raises_with_invalid_schema(self): + column_schema = ColumnSchema("item_id") + with pytest.raises(ValueError) as exc_info: + mm.EmbeddingTable(16, column_schema) + assert "needs to have an int-domain" in str(exc_info.value) + def test_sok_variables(self, dim): + rows = [65536 * 10, 65536] + cols = [128, 4] + hotness = [10, 3] + combiners = ["sum", "sum"] + batch_size = 65536 + iters = 100 + initial_vals = [13, 17] + + # sok variables + sok_vars = [ + sok.DynamicVariable(dimension=cols[i], initializer=str(initial_vals[i])) + for i in range(len(cols)) + ] + local_indices = [] + for row in rows: + local_size = row // hvd.size() + if hvd.rank() < row % hvd.size(): + local_size += 1 + indices = np.arange(local_size) * hvd.size() + hvd.rank() + indices = tf.convert_to_tensor(indices, dtype=tf.int64) + local_indices.append(indices) + out1 = [] + for i in range(len(sok_vars)): + out1.append(tf.nn.embedding_lookup(sok_vars[i], local_indices[i])) + + tf_vars = [ + tf.Variable(tf.constant(initial_vals[i], shape=[rows[i], cols[i]], dtype=tf.float32)) + for i in range(len(rows)) + ] + out2 = [] + for i, v in enumerate(tf_vars): + out2.append(tf.nn.embedding_lookup(v, local_indices[i])) + + # Check results + diff = 0 + for i in range(len(out1)): + length = out1[i] ** 2 + out2[i] ** 2 + 1e-8 + diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) + print("[SOK INFO] diff:", diff) + assert diff < 1e-6 @pytest.mark.parametrize("trainable", [True, False]) def test_pretrained_from_InputBlockV2(trainable, music_streaming_data: Dataset): From 475145a583de8734ca3b2b03888459f149000c6a Mon Sep 17 00:00:00 2001 From: zhuwenjing Date: Mon, 7 Nov 2022 11:07:17 +0800 Subject: [PATCH 04/34] test sok dynamic variable --- tests/unit/tf/inputs/test_embedding.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/tf/inputs/test_embedding.py b/tests/unit/tf/inputs/test_embedding.py index 39656967f9..b535f9997f 100644 --- a/tests/unit/tf/inputs/test_embedding.py +++ b/tests/unit/tf/inputs/test_embedding.py @@ -329,7 +329,7 @@ def test_raises_with_invalid_schema(self): with pytest.raises(ValueError) as exc_info: mm.EmbeddingTable(16, column_schema) assert "needs to have an int-domain" in str(exc_info.value) - def test_sok_variables(self, dim): + 
def test_sok_dynamic_variables(self, dim): rows = [65536 * 10, 65536] cols = [128, 4] hotness = [10, 3] From 4ef7a9e281fdb8f84aae202ca3b2b90d1882ce6c Mon Sep 17 00:00:00 2001 From: zhuwenjing Date: Mon, 7 Nov 2022 13:47:42 +0800 Subject: [PATCH 05/34] bug fix comma --- merlin/models/tf/inputs/embedding.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/models/tf/inputs/embedding.py b/merlin/models/tf/inputs/embedding.py index 1a4180151d..8a54ed96fb 100644 --- a/merlin/models/tf/inputs/embedding.py +++ b/merlin/models/tf/inputs/embedding.py @@ -173,7 +173,7 @@ def __init__(self, dim: int, *col_schemas: ColumnSchema, vocab_sizes: list, - initializer: Union[str, tf.Tensor, list] = "uniform" + initializer: Union[str, tf.Tensor, list] = "uniform", use_dynamic_variable = False, localized = None, name = None, From 720eacf80e9b8ca049a742e2f070e70d45c0d41f Mon Sep 17 00:00:00 2001 From: zhuwenjing Date: Wed, 16 Nov 2022 20:40:27 +0800 Subject: [PATCH 06/34] add some comments and test distributed var --- merlin/models/tf/inputs/embedding.py | 14 +++--- tests/unit/tf/inputs/test_embedding.py | 59 +++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/merlin/models/tf/inputs/embedding.py b/merlin/models/tf/inputs/embedding.py index 8a54ed96fb..0227dcc0ce 100644 --- a/merlin/models/tf/inputs/embedding.py +++ b/merlin/models/tf/inputs/embedding.py @@ -153,15 +153,20 @@ def from_config(cls, config): @tf.keras.utils.register_keras_serializable(package="merlin.models") class SOKEmbedding(EmbeddingTableBase): """ + Wrap GPU accelerated opererations dedicated for sparse training / inference case. dim: int The last dimension of the variable vocab_sizes: list, rows of the variable list initializer: string, a list of dict {"indices":numpy.array, "value": numpy.array} = "uniform" When it's string, it specifies the initializer used to generate initial values. + For sok.DynamicVariable, currently, only support "random" or string of a float + value(meaning const initializer). + For sok.Variable, it is compatible with tf.Variable. + Default value is "uniform". When it's list of numpy.array, its shape must be [vocab_size[i], embedding_vec_size], and will be used as the initial indices and value. - use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable - localized: If set to None, use Distributed Variable, otherwise Localized Variable. where the list indicates which GPU you want to put this variable on. - Default is None + use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable. DynamicVariable can allocates memory dynamically. Variable is a model-parallel distributed variable + localized: When utilizing sok.Variable, we change choose two mode: distributed(Distributed Va riable) and localized(Localized Variable). If set to None, use Distributed Variable, otherwise Localized Variable. where the list indicates which GPU you want to put this variable on. + Default is None. Examples -------- .. 
code-block:: python @@ -240,7 +245,7 @@ def call(self, inputs, training=True): fused_inputs = tf.reshape(fused_inputs, [-1]) fused_inputs = tf.RaggedTensor.from_tensor(tf.reshape(fused_inputs, [-1, 1])) - emb_vectors = sok.lookup_sparse(self._var, fused_inputs, 1, "sum") + emb_vectors = sok.lookup_sparse(self._var, fused_inputs, "sum") emb_vectors = tf.reshape( emb_vectors, [-1, len(self._vocab_sizes), self._embedding_vec_size] ) @@ -253,7 +258,6 @@ def call(self, inputs, training=True): emb_vectors = sok.lookup_sparse( self._vars, input_list, - [1 for _ in range(len(self._vocab_sizes))], ["sum" for _ in range(len(self._vocab_sizes))], ) for i in range(len(self._vocab_sizes)): diff --git a/tests/unit/tf/inputs/test_embedding.py b/tests/unit/tf/inputs/test_embedding.py index b535f9997f..d3c1c47e9f 100644 --- a/tests/unit/tf/inputs/test_embedding.py +++ b/tests/unit/tf/inputs/test_embedding.py @@ -330,9 +330,16 @@ def test_raises_with_invalid_schema(self): mm.EmbeddingTable(16, column_schema) assert "needs to have an int-domain" in str(exc_info.value) def test_sok_dynamic_variables(self, dim): + hvd.init() + gpus = tf.config.experimental.list_physical_devices("GPU") + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + if gpus: + tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") + sok.init() + rows = [65536 * 10, 65536] cols = [128, 4] - hotness = [10, 3] combiners = ["sum", "sum"] batch_size = 65536 iters = 100 @@ -370,6 +377,56 @@ def test_sok_dynamic_variables(self, dim): diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) print("[SOK INFO] diff:", diff) assert diff < 1e-6 + def test_distributed_variables(self,dim): + hvd.init() + gpus = tf.config.experimental.list_physical_devices("GPU") + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + if gpus: + tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") + sok.init() + + rows = [65536 * 10, 65536] + cols = [128, 4] + hotness = [10, 3] + combiners = ["sum", "sum"] + batch_size = 65536 + iters = 100 + initial_vals = [13, 17] + + # initial value of embedding table + weights = [] + for i in range(len(rows)): + weight = np.random.rand(rows[i], cols[i]).astype(np.float32) + weight = tf.convert_to_tensor(weight, dtype=tf.float32) + # make sure the weight is same on each rank + weight = hvd.allreduce(weight) + weights.append(weight) + + # sok variables + sok_vars = [sok.Variable(w) for w in weights] + local_indices = [] + for row in rows: + local_size = row // hvd.size() + if hvd.rank() < row % hvd.size(): + local_size += 1 + indices = np.arange(local_size) * hvd.size() + hvd.rank() + indices = tf.convert_to_tensor(indices, dtype=tf.int64) + local_indices.append(indices) + + out1 = sok_vars + tf_vars = [tf.Variable(w) for w in weights] + out2 = [] + for i, v in enumerate(tf_vars): + out2.append(tf.nn.embedding_lookup(v, local_indices[i])) + + # Check results + diff = 0 + for i in range(len(out1)): + length = out1[i] ** 2 + out2[i] ** 2 + 1e-8 + diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) + print("[SOK INFO] diff:", diff) + assert diff < 1e-6 @pytest.mark.parametrize("trainable", [True, False]) def test_pretrained_from_InputBlockV2(trainable, music_streaming_data: Dataset): From 7a3a177a07522985a29e2d4875120e50e5f4295a Mon Sep 17 00:00:00 2001 From: zhuwenjing Date: Thu, 17 Nov 2022 13:45:27 +0800 Subject: [PATCH 07/34] format the comments --- merlin/models/tf/inputs/embedding.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 
deletions(-) diff --git a/merlin/models/tf/inputs/embedding.py b/merlin/models/tf/inputs/embedding.py index 0227dcc0ce..68032120cc 100644 --- a/merlin/models/tf/inputs/embedding.py +++ b/merlin/models/tf/inputs/embedding.py @@ -164,8 +164,12 @@ class SOKEmbedding(EmbeddingTableBase): Default value is "uniform". When it's list of numpy.array, its shape must be [vocab_size[i], embedding_vec_size], and will be used as the initial indices and value. - use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable. DynamicVariable can allocates memory dynamically. Variable is a model-parallel distributed variable - localized: When utilizing sok.Variable, we change choose two mode: distributed(Distributed Va riable) and localized(Localized Variable). If set to None, use Distributed Variable, otherwise Localized Variable. where the list indicates which GPU you want to put this variable on. + use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable. DynamicVariable + can allocates memory dynamically. Variable is a model-parallel distributed variable + localized: When utilizing sok.Variable, we change choose two mode: distributed(Distributed Va + riable) and localized(Localized Variable). If set to None, use Distributed Variable, + otherwise Localized Variable. where the list indicates which GPU you want to put this + variable on. Default is None. Examples -------- From 54f795b452ab6618546d9564425688a16ba4bfda Mon Sep 17 00:00:00 2001 From: wenjingz Date: Mon, 5 Dec 2022 23:59:04 -0800 Subject: [PATCH 08/34] assert condition in sok lookup sparse --- merlin/models/tf/inputs/embedding.py | 90 ++++++++++++++-------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/merlin/models/tf/inputs/embedding.py b/merlin/models/tf/inputs/embedding.py index 68032120cc..96ddf172b5 100644 --- a/merlin/models/tf/inputs/embedding.py +++ b/merlin/models/tf/inputs/embedding.py @@ -156,14 +156,17 @@ class SOKEmbedding(EmbeddingTableBase): Wrap GPU accelerated opererations dedicated for sparse training / inference case. dim: int The last dimension of the variable vocab_sizes: list, rows of the variable list - initializer: string, a list of dict {"indices":numpy.array, "value": numpy.array} = "uniform" + initializer: string, list = "uniform" When it's string, it specifies the initializer used to generate initial values. For sok.DynamicVariable, currently, only support "random" or string of a float value(meaning const initializer). For sok.Variable, it is compatible with tf.Variable. Default value is "uniform". - When it's list of numpy.array, its shape must be [vocab_size[i], embedding_vec_size], - and will be used as the initial indices and value. + When it's list, it specifies the values in the embedding table. + For sok.DynamicVariable, initializer[i] must be list of [index, value], + and will be used as the initial indices and value for i-th sok.DynamicVariable. + For sok.Variable, initializer[i] must be a numpy with shape [vocab_size[i], embedding_vec_size], + and will be used as the initial value for i-th sok.Variable. use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable. DynamicVariable can allocates memory dynamically. 
Variable is a model-parallel distributed variable localized: When utilizing sok.Variable, we change choose two mode: distributed(Distributed Va @@ -199,24 +202,16 @@ def __init__(self, self._vocab_sizes = vocab_sizes self._use_dynamic_variable = use_dynamic_variable self._localized = localized + self._vars = [] if self._localized is None and self._use_dynamic_variable == False : - prefix_sum = [] - offset = 0 for i in range(len(vocab_sizes)): - prefix_sum.append(offset) - offset += self._vocab_sizes[i] - prefix_sum = np.array(prefix_sum, dtype=np.int64).reshape(1, -1) - self._vocab_prefix_sum = tf.constant(prefix_sum) - print("[Info] Total vocabulary size:", offset) - - if isinstance(initializer, str): - self._var = sok.Variable( - shape=[offset, self._embedding_vec_size], initializer=tf.keras.initializers.get(initializer), dtype=tf.float32 - ) - else: - self._var = sok.Variable(initializer) + if isinstance(initializer, str): + v = sok.Variable( + shape=[self._vocab_sizes[i], self._embedding_vec_size], initializer=tf.keras.initializers.get(initializer), dtype=tf.float32 + ) + else: + v = sok.Variable(initializer[i]) else: - self._vars = [] for i in range(len(vocab_sizes)): if _use_dynamic_variable: if isinstance(initializer, str): @@ -240,34 +235,38 @@ def __init__(self, ) else: raise ValueError("Wrong Configuration!!!") - self._trainable_weights.append(v) - self._vars.append(v) + self._trainable_weights.append(v) + self._vars.append(v) - def call(self, inputs, training=True): - if self._localized is None and self._use_dynamic_variable == False : - fused_inputs = tf.add(inputs, self._vocab_prefix_sum) - fused_inputs = tf.reshape(fused_inputs, [-1]) - fused_inputs = tf.RaggedTensor.from_tensor(tf.reshape(fused_inputs, [-1, 1])) - - emb_vectors = sok.lookup_sparse(self._var, fused_inputs, "sum") - emb_vectors = tf.reshape( - emb_vectors, [-1, len(self._vocab_sizes), self._embedding_vec_size] - ) - return emb_vectors - # localized mode + def call(self, inputs, combiners, training=True): + """ + inputs: list, tuple + a list or tuple of tf.SparseTensor or tf.RaggedTensor. + combiners: list, tuple + a list or tuple of string to specify the combiner of each lookup. 
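A minimal usage sketch, assuming an already constructed layer
(``embedding`` and the feature ids below are hypothetical):

.. code-block:: python

    import tensorflow as tf

    # two ragged id features, rank 2 as required: [batch, hotness]
    ids_a = tf.ragged.constant([[0, 1], [2]], dtype=tf.int64)
    ids_b = tf.ragged.constant([[3], [4, 5]], dtype=tf.int64)
    # embedding = SOKEmbedding(...)
    # vectors = embedding([ids_a, ids_b], combiners=["sum", "sum"])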
+ """ + is_list = isinstance(inputs, list) or isinstance(inputs, tuple) + if is_list: + for cur_input in inputs: + if not isinstance(cur_input, tf.SparseTensor): + if not isinstance(cur_input, tf.RaggedTensor): + raise ValueError("The input must be a list of tf.SparseTensor or tf.RaggedTensor") + else: + if not len(cur_input.shape)==2: + raise ValueError ("The rank of input RaggedTensor must be 2") else: - input_list = tf.split(inputs, num_or_size_splits=len(self._vocab_sizes), axis=1) - for i in range(len(self._vocab_sizes)): - input_list[i] = tf.RaggedTensor.from_tensor(tf.reshape(input_list[i], [-1, 1])) - emb_vectors = sok.lookup_sparse( - self._vars, - input_list, - ["sum" for _ in range(len(self._vocab_sizes))], - ) - for i in range(len(self._vocab_sizes)): - emb_vectors[i] = tf.reshape(emb_vectors[i], [-1, 1, self._embedding_vec_size]) - emb_vectors = tf.concat(emb_vectors, axis=1) - return emb_vectors + if not isinstance(cur_input, tf.SparseTensor): + if not isinstance(cur_input, tf.RaggedTensor): + raise ValueError("The input must be a list of tf.SparseTensor or tf.RaggedTensor") + else: + if not len(cur_input.shape)==2: + raise ValueError ("The rank of input RaggedTensor must be 2") + emb_vectors = sok.lookup_sparse( + self._vars, + inputs, + combiners, + ) + return emb_vectors @classmethod def from_pretrained( @@ -283,8 +282,7 @@ def from_pretrained( """Create From pre-trained embeddings from a Dataset or DataFrame. Parameters ---------- - data : Union[Dataset, DataFrameType] - A dataset containing the pre-trained embedding weights + data : A list of numpy.array or A list of dict {"indice": numpy.array, "values": numpy.array} trainable : bool Whether the layer should be trained or not. name : str From 7555c6f8fbc42db7def23309b757a6c9e7fc7208 Mon Sep 17 00:00:00 2001 From: edknv Date: Wed, 14 Dec 2022 05:53:18 -0800 Subject: [PATCH 09/34] Move SOKEmbedding to a separate file --- merlin/models/tf/distributed/embedding.py | 193 ++++++++++++++++++++++ merlin/models/tf/inputs/embedding.py | 182 -------------------- tests/unit/tf/horovod/test_embedding.py | 124 ++++++++++++++ tests/unit/tf/inputs/test_embedding.py | 111 ------------- 4 files changed, 317 insertions(+), 293 deletions(-) create mode 100644 merlin/models/tf/distributed/embedding.py create mode 100644 tests/unit/tf/horovod/test_embedding.py diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py new file mode 100644 index 0000000000..c0de96110a --- /dev/null +++ b/merlin/models/tf/distributed/embedding.py @@ -0,0 +1,193 @@ +from typing import Union + +import tensorflow as tf + +from merlin.models.tf.inputs.embedding import EmbeddingTableBase +from merlin.models.utils.schema_utils import ( + schema_to_tensorflow_metadata_json, + tensorflow_metadata_json_to_schema, +) +from merlin.schema import ColumnSchema + + +@tf.keras.utils.register_keras_serializable(package="merlin.models") +class SOKEmbedding(EmbeddingTableBase): + """ + Wrap GPU accelerated opererations dedicated for sparse training / inference case. + dim: int The last dimension of the variable + vocab_sizes: list, rows of the variable list + initializer: string, list = "uniform" + When it's string, it specifies the initializer used to generate initial values. + For sok.DynamicVariable, currently, only support "random" or string of a float + value(meaning const initializer). + For sok.Variable, it is compatible with tf.Variable. + Default value is "uniform". + When it's list, it specifies the values in the embedding table. 
+ For sok.DynamicVariable, initializer[i] must be list of [index, value], + and will be used as the initial indices and value for i-th sok.DynamicVariable. + For sok.Variable, initializer[i] must be a numpy with shape [vocab_size[i], embedding_vec_size], + and will be used as the initial value for i-th sok.Variable. + use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable. DynamicVariable + can allocates memory dynamically. Variable is a model-parallel distributed variable + localized: When utilizing sok.Variable, we change choose two mode: distributed(Distributed Va + riable) and localized(Localized Variable). If set to None, use Distributed Variable, + otherwise Localized Variable. where the list indicates which GPU you want to put this + variable on. + Default is None. + Examples + -------- + .. code-block:: python + Notes + ----- + """ + def __init__(self, + dim: int, + *col_schemas: ColumnSchema, + vocab_sizes: list, + initializer: Union[str, tf.Tensor, list] = "uniform", + use_dynamic_variable = False, + localized = None, + name = None, + **kwargs + ): + super(SOKEmbedding, self).__init__( + dim, + *col_schemas, + trainable=trainable, + name=name, + dtype=dtype, + **kwargs) + self._embedding_vec_size = dim + self._vocab_sizes = vocab_sizes + self._use_dynamic_variable = use_dynamic_variable + self._localized = localized + self._vars = [] + if self._localized is None and self._use_dynamic_variable == False : + for i in range(len(vocab_sizes)): + if isinstance(initializer, str): + v = sok.Variable( + shape=[self._vocab_sizes[i], self._embedding_vec_size], initializer=tf.keras.initializers.get(initializer), dtype=tf.float32 + ) + else: + v = sok.Variable(initializer[i]) + else: + for i in range(len(vocab_sizes)): + if _use_dynamic_variable: + if isinstance(initializer, str): + v = sok.DynamicVariable(dimension=self._embedding_vec_size, initializer=initializer) + else: + indices = tf.convert_to_tensor(initializer[i][0]) + values = tf.convert_to_tensor(initializer[i][1]) + sok.assign(v, indices, values) + elif self._localized is not None: + if isinstance(initializer, str): + v = sok.Variable( + shape=[self._vocab_sizes[i], self._embedding_vec_size], + initializer=tf.keras.initializers.get(intializer), + dtype=tf.float32, + mode="localized:%d" % self._localized[i], + ) + else: + v = sok.Variable( + initializer[i], + mode="localized:%d" % self._localized[i], + ) + else: + raise ValueError("Wrong Configuration!!!") + self._trainable_weights.append(v) + self._vars.append(v) + + def call(self, inputs, combiners, training=True): + """ + inputs: list, tuple + a list or tuple of tf.SparseTensor or tf.RaggedTensor. + combiners: list, tuple + a list or tuple of string to specify the combiner of each lookup. 
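A sketch of inputs that pass the checks below, with illustrative shapes:
each lookup takes a tf.SparseTensor or a rank-2 tf.RaggedTensor.

.. code-block:: python

    import tensorflow as tf

    ragged = tf.ragged.constant([[1, 2, 3], [4]], dtype=tf.int64)  # [batch, hotness]
    sparse = tf.sparse.from_dense(tf.constant([[1, 2], [3, 0]], dtype=tf.int64))
    assert ragged.shape.rank == 2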
+ """ + is_list = isinstance(inputs, list) or isinstance(inputs, tuple) + if is_list: + for cur_input in inputs: + if not isinstance(cur_input, tf.SparseTensor): + if not isinstance(cur_input, tf.RaggedTensor): + raise ValueError("The input must be a list of tf.SparseTensor or tf.RaggedTensor") + else: + if not len(cur_input.shape)==2: + raise ValueError ("The rank of input RaggedTensor must be 2") + else: + if not isinstance(cur_input, tf.SparseTensor): + if not isinstance(cur_input, tf.RaggedTensor): + raise ValueError("The input must be a list of tf.SparseTensor or tf.RaggedTensor") + else: + if not len(cur_input.shape)==2: + raise ValueError ("The rank of input RaggedTensor must be 2") + emb_vectors = sok.lookup_sparse( + self._vars, + inputs, + combiners, + ) + return emb_vectors + + @classmethod + def from_pretrained( + cls, + data: list, + trainable=True, + name=None, + col_schema=None, + use_dynamic_variable = False, + localized = None, + **kwargs, + ) -> "SOKEmbedding": + """Create From pre-trained embeddings from a Dataset or DataFrame. + Parameters + ---------- + data : A list of numpy.array or A list of dict {"indice": numpy.array, "values": numpy.array} + trainable : bool + Whether the layer should be trained or not. + name : str + The name of the layer. + """ + vocab_size=[] + weights = [] + for i, item in enumerate(data): + if use_dynamic_variable: + if isinstance(item, dict) and item.has_key("indice") and item.has_key("values"): + weights.append([item["indice"], item["values"]]) + else: + raise ValueError("DynamicVariable should be initialized with indice and values") + else: + weights.append(item) + + return cls( + dim, + col_schema, + name=name, + initializer=weights, + use_dynamic_variable = use_dynamic_variable, + localized = localized, + trainable=trainable, + **kwargs, + ) + def get_config(self): + config = super().get_config() + config["dim"] = self.dim + + schema = schema_to_tensorflow_metadata_json(self.schema) + config["schema"] = schema + config["vocab_sizes"] = self._vocab_sizes + config["initializer"] = self._initializer + config["use_dynamic_variable"] = self._use_dynamic_variable + config["localized"] = self._localized + + return config + + @classmethod + def from_config(cls, config): + dim = config.pop("dim") + schema = tensorflow_metadata_json_to_schema(config.pop("schema")) + vocab_size = config["vocab_sizes"] + initializer = config["initializer"] + use_dynamic_variable = config["use_dynamic_variable"] + localized = config["localized"] + + return cls(dim, *schema, vocab_size, initializer, use_dynamic_variable, localized, **config) diff --git a/merlin/models/tf/inputs/embedding.py b/merlin/models/tf/inputs/embedding.py index e9fe93d201..cb59688658 100644 --- a/merlin/models/tf/inputs/embedding.py +++ b/merlin/models/tf/inputs/embedding.py @@ -150,188 +150,6 @@ def from_config(cls, config): return cls(dim, *schema, **config) -@tf.keras.utils.register_keras_serializable(package="merlin.models") -class SOKEmbedding(EmbeddingTableBase): - """ - Wrap GPU accelerated opererations dedicated for sparse training / inference case. - dim: int The last dimension of the variable - vocab_sizes: list, rows of the variable list - initializer: string, list = "uniform" - When it's string, it specifies the initializer used to generate initial values. - For sok.DynamicVariable, currently, only support "random" or string of a float - value(meaning const initializer). - For sok.Variable, it is compatible with tf.Variable. - Default value is "uniform". 
- When it's list, it specifies the values in the embedding table. - For sok.DynamicVariable, initializer[i] must be list of [index, value], - and will be used as the initial indices and value for i-th sok.DynamicVariable. - For sok.Variable, initializer[i] must be a numpy with shape [vocab_size[i], embedding_vec_size], - and will be used as the initial value for i-th sok.Variable. - use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable. DynamicVariable - can allocates memory dynamically. Variable is a model-parallel distributed variable - localized: When utilizing sok.Variable, we change choose two mode: distributed(Distributed Va - riable) and localized(Localized Variable). If set to None, use Distributed Variable, - otherwise Localized Variable. where the list indicates which GPU you want to put this - variable on. - Default is None. - Examples - -------- - .. code-block:: python - - Notes - ----- - """ - def __init__(self, - dim: int, - *col_schemas: ColumnSchema, - vocab_sizes: list, - initializer: Union[str, tf.Tensor, list] = "uniform", - use_dynamic_variable = False, - localized = None, - name = None, - **kwargs - ): - super(SOKEmbedding, self).__init__( - dim, - *col_schemas, - trainable=trainable, - name=name, - dtype=dtype, - **kwargs) - self._embedding_vec_size = dim - self._vocab_sizes = vocab_sizes - self._use_dynamic_variable = use_dynamic_variable - self._localized = localized - self._vars = [] - if self._localized is None and self._use_dynamic_variable == False : - for i in range(len(vocab_sizes)): - if isinstance(initializer, str): - v = sok.Variable( - shape=[self._vocab_sizes[i], self._embedding_vec_size], initializer=tf.keras.initializers.get(initializer), dtype=tf.float32 - ) - else: - v = sok.Variable(initializer[i]) - else: - for i in range(len(vocab_sizes)): - if _use_dynamic_variable: - if isinstance(initializer, str): - v = sok.DynamicVariable(dimension=self._embedding_vec_size, initializer=initializer) - else: - indices = tf.convert_to_tensor(initializer[i][0]) - values = tf.convert_to_tensor(initializer[i][1]) - sok.assign(v, indices, values) - elif self._localized is not None: - if isinstance(initializer, str): - v = sok.Variable( - shape=[self._vocab_sizes[i], self._embedding_vec_size], - initializer=tf.keras.initializers.get(intializer), - dtype=tf.float32, - mode="localized:%d" % self._localized[i], - ) - else: - v = sok.Variable( - initializer[i], - mode="localized:%d" % self._localized[i], - ) - else: - raise ValueError("Wrong Configuration!!!") - self._trainable_weights.append(v) - self._vars.append(v) - - def call(self, inputs, combiners, training=True): - """ - inputs: list, tuple - a list or tuple of tf.SparseTensor or tf.RaggedTensor. - combiners: list, tuple - a list or tuple of string to specify the combiner of each lookup. 
- """ - is_list = isinstance(inputs, list) or isinstance(inputs, tuple) - if is_list: - for cur_input in inputs: - if not isinstance(cur_input, tf.SparseTensor): - if not isinstance(cur_input, tf.RaggedTensor): - raise ValueError("The input must be a list of tf.SparseTensor or tf.RaggedTensor") - else: - if not len(cur_input.shape)==2: - raise ValueError ("The rank of input RaggedTensor must be 2") - else: - if not isinstance(cur_input, tf.SparseTensor): - if not isinstance(cur_input, tf.RaggedTensor): - raise ValueError("The input must be a list of tf.SparseTensor or tf.RaggedTensor") - else: - if not len(cur_input.shape)==2: - raise ValueError ("The rank of input RaggedTensor must be 2") - emb_vectors = sok.lookup_sparse( - self._vars, - inputs, - combiners, - ) - return emb_vectors - - @classmethod - def from_pretrained( - cls, - data: list, - trainable=True, - name=None, - col_schema=None, - use_dynamic_variable = False, - localized = None, - **kwargs, - ) -> "SOKEmbedding": - """Create From pre-trained embeddings from a Dataset or DataFrame. - Parameters - ---------- - data : A list of numpy.array or A list of dict {"indice": numpy.array, "values": numpy.array} - trainable : bool - Whether the layer should be trained or not. - name : str - The name of the layer. - """ - vocab_size=[] - weights = [] - for i, item in enumerate(data): - if use_dynamic_variable: - if isinstance(item, dict) and item.has_key("indice") and item.has_key("values"): - weights.append([item["indice"], item["values"]]) - else: - raise ValueError("DynamicVariable should be initialized with indice and values") - else: - weights.append(item) - - return cls( - dim, - col_schema, - name=name, - initializer=weights, - use_dynamic_variable = use_dynamic_variable, - localized = localized, - trainable=trainable, - **kwargs, - ) - def get_config(self): - config = super().get_config() - config["dim"] = self.dim - - schema = schema_to_tensorflow_metadata_json(self.schema) - config["schema"] = schema - config["vocab_sizes"] = self._vocab_sizes - config["initializer"] = self._initializer - config["use_dynamic_variable"] = self._use_dynamic_variable - config["localized"] = self._localized - - return config - - @classmethod - def from_config(cls, config): - dim = config.pop("dim") - schema = tensorflow_metadata_json_to_schema(config.pop("schema")) - vocab_size = config["vocab_sizes"] - initializer = config["initializer"] - use_dynamic_variable = config["use_dynamic_variable"] - localized = config["localized"] - - return cls(dim, *schema, vocab_size, initializer, use_dynamic_variable, localized, **config) CombinerType = Union[str, tf.keras.layers.Layer] diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py new file mode 100644 index 0000000000..ae4f463ad2 --- /dev/null +++ b/tests/unit/tf/horovod/test_embedding.py @@ -0,0 +1,124 @@ +import numpy as np +import pytest +import tensorflow as tf + +import merlin.models.tf as mm +from merlin.schema import ColumnSchema, Tags + +hvd = pytest.importorskip("horovod.tensorflow") +sok = pytest.importorskip("sparse_operation_kit") + + +class TestSOKEmbedding: + sample_column_schema = ColumnSchema( + "item_id", + dtype=np.int32, + properties={"domain": {"min": 0, "max": 10, "name": "item_id"}}, + tags=[Tags.CATEGORICAL], + ) + + def test_raises_with_invalid_schema(self): + column_schema = ColumnSchema("item_id") + with pytest.raises(ValueError) as exc_info: + mm.EmbeddingTable(16, column_schema) + assert "needs to have an int-domain" in 
str(exc_info.value) + + def test_sok_dynamic_variables(self, dim): + hvd.init() + gpus = tf.config.experimental.list_physical_devices("GPU") + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + if gpus: + tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") + sok.init() + + rows = [65536 * 10, 65536] + cols = [128, 4] + combiners = ["sum", "sum"] + batch_size = 65536 + iters = 100 + initial_vals = [13, 17] + + # sok variables + sok_vars = [ + sok.DynamicVariable(dimension=cols[i], initializer=str(initial_vals[i])) + for i in range(len(cols)) + ] + local_indices = [] + for row in rows: + local_size = row // hvd.size() + if hvd.rank() < row % hvd.size(): + local_size += 1 + indices = np.arange(local_size) * hvd.size() + hvd.rank() + indices = tf.convert_to_tensor(indices, dtype=tf.int64) + local_indices.append(indices) + out1 = [] + for i in range(len(sok_vars)): + out1.append(tf.nn.embedding_lookup(sok_vars[i], local_indices[i])) + + tf_vars = [ + tf.Variable(tf.constant(initial_vals[i], shape=[rows[i], cols[i]], dtype=tf.float32)) + for i in range(len(rows)) + ] + out2 = [] + for i, v in enumerate(tf_vars): + out2.append(tf.nn.embedding_lookup(v, local_indices[i])) + + # Check results + diff = 0 + for i in range(len(out1)): + length = out1[i] ** 2 + out2[i] ** 2 + 1e-8 + diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) + print("[SOK INFO] diff:", diff) + assert diff < 1e-6 + + def test_distributed_variables(self, dim): + hvd.init() + gpus = tf.config.experimental.list_physical_devices("GPU") + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + if gpus: + tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") + sok.init() + + rows = [65536 * 10, 65536] + cols = [128, 4] + hotness = [10, 3] + combiners = ["sum", "sum"] + batch_size = 65536 + iters = 100 + initial_vals = [13, 17] + + # initial value of embedding table + weights = [] + for i in range(len(rows)): + weight = np.random.rand(rows[i], cols[i]).astype(np.float32) + weight = tf.convert_to_tensor(weight, dtype=tf.float32) + # make sure the weight is same on each rank + weight = hvd.allreduce(weight) + weights.append(weight) + + # sok variables + sok_vars = [sok.Variable(w) for w in weights] + local_indices = [] + for row in rows: + local_size = row // hvd.size() + if hvd.rank() < row % hvd.size(): + local_size += 1 + indices = np.arange(local_size) * hvd.size() + hvd.rank() + indices = tf.convert_to_tensor(indices, dtype=tf.int64) + local_indices.append(indices) + + out1 = sok_vars + tf_vars = [tf.Variable(w) for w in weights] + out2 = [] + for i, v in enumerate(tf_vars): + out2.append(tf.nn.embedding_lookup(v, local_indices[i])) + + # Check results + diff = 0 + for i in range(len(out1)): + length = out1[i] ** 2 + out2[i] ** 2 + 1e-8 + diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) + print("[SOK INFO] diff:", diff) + assert diff < 1e-6 diff --git a/tests/unit/tf/inputs/test_embedding.py b/tests/unit/tf/inputs/test_embedding.py index de302b0362..fa11c61cb6 100644 --- a/tests/unit/tf/inputs/test_embedding.py +++ b/tests/unit/tf/inputs/test_embedding.py @@ -316,117 +316,6 @@ def test_select_by_tag(self): assert sorted(item.features) == ["c"] assert item.table is embedding_table.table -class TestSOKEmbedding: - sample_column_schema = ColumnSchema( - "item_id", - dtype=np.int32, - properties={"domain": {"min": 0, "max": 10, "name": "item_id"}}, - tags=[Tags.CATEGORICAL], - ) - - def test_raises_with_invalid_schema(self): - 
column_schema = ColumnSchema("item_id") - with pytest.raises(ValueError) as exc_info: - mm.EmbeddingTable(16, column_schema) - assert "needs to have an int-domain" in str(exc_info.value) - def test_sok_dynamic_variables(self, dim): - hvd.init() - gpus = tf.config.experimental.list_physical_devices("GPU") - for gpu in gpus: - tf.config.experimental.set_memory_growth(gpu, True) - if gpus: - tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") - sok.init() - - rows = [65536 * 10, 65536] - cols = [128, 4] - combiners = ["sum", "sum"] - batch_size = 65536 - iters = 100 - initial_vals = [13, 17] - - # sok variables - sok_vars = [ - sok.DynamicVariable(dimension=cols[i], initializer=str(initial_vals[i])) - for i in range(len(cols)) - ] - local_indices = [] - for row in rows: - local_size = row // hvd.size() - if hvd.rank() < row % hvd.size(): - local_size += 1 - indices = np.arange(local_size) * hvd.size() + hvd.rank() - indices = tf.convert_to_tensor(indices, dtype=tf.int64) - local_indices.append(indices) - out1 = [] - for i in range(len(sok_vars)): - out1.append(tf.nn.embedding_lookup(sok_vars[i], local_indices[i])) - - tf_vars = [ - tf.Variable(tf.constant(initial_vals[i], shape=[rows[i], cols[i]], dtype=tf.float32)) - for i in range(len(rows)) - ] - out2 = [] - for i, v in enumerate(tf_vars): - out2.append(tf.nn.embedding_lookup(v, local_indices[i])) - - # Check results - diff = 0 - for i in range(len(out1)): - length = out1[i] ** 2 + out2[i] ** 2 + 1e-8 - diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) - print("[SOK INFO] diff:", diff) - assert diff < 1e-6 - def test_distributed_variables(self,dim): - hvd.init() - gpus = tf.config.experimental.list_physical_devices("GPU") - for gpu in gpus: - tf.config.experimental.set_memory_growth(gpu, True) - if gpus: - tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") - sok.init() - - rows = [65536 * 10, 65536] - cols = [128, 4] - hotness = [10, 3] - combiners = ["sum", "sum"] - batch_size = 65536 - iters = 100 - initial_vals = [13, 17] - - # initial value of embedding table - weights = [] - for i in range(len(rows)): - weight = np.random.rand(rows[i], cols[i]).astype(np.float32) - weight = tf.convert_to_tensor(weight, dtype=tf.float32) - # make sure the weight is same on each rank - weight = hvd.allreduce(weight) - weights.append(weight) - - # sok variables - sok_vars = [sok.Variable(w) for w in weights] - local_indices = [] - for row in rows: - local_size = row // hvd.size() - if hvd.rank() < row % hvd.size(): - local_size += 1 - indices = np.arange(local_size) * hvd.size() + hvd.rank() - indices = tf.convert_to_tensor(indices, dtype=tf.int64) - local_indices.append(indices) - - out1 = sok_vars - tf_vars = [tf.Variable(w) for w in weights] - out2 = [] - for i, v in enumerate(tf_vars): - out2.append(tf.nn.embedding_lookup(v, local_indices[i])) - - # Check results - diff = 0 - for i in range(len(out1)): - length = out1[i] ** 2 + out2[i] ** 2 + 1e-8 - diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) - print("[SOK INFO] diff:", diff) - assert diff < 1e-6 @pytest.mark.parametrize("trainable", [True, False]) def test_pretrained_from_InputBlockV2(trainable, music_streaming_data: Dataset): From d0111b10495cdabf4ba3523c62b93aeac5565927 Mon Sep 17 00:00:00 2001 From: edknv Date: Wed, 14 Dec 2022 06:06:15 -0800 Subject: [PATCH 10/34] Clean up --- merlin/models/tf/distributed/embedding.py | 72 +++++++++++++---------- requirements/horovod.txt | 1 + tests/unit/tf/horovod/test_embedding.py | 
12 +--- 3 files changed, 43 insertions(+), 42 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index c0de96110a..5b0ec87e3b 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -1,5 +1,6 @@ from typing import Union +import sparse_operation_kit as sok import tensorflow as tf from merlin.models.tf.inputs.embedding import EmbeddingTableBase @@ -15,13 +16,13 @@ class SOKEmbedding(EmbeddingTableBase): """ Wrap GPU accelerated opererations dedicated for sparse training / inference case. dim: int The last dimension of the variable - vocab_sizes: list, rows of the variable list + vocab_sizes: list, rows of the variable list initializer: string, list = "uniform" When it's string, it specifies the initializer used to generate initial values. For sok.DynamicVariable, currently, only support "random" or string of a float value(meaning const initializer). For sok.Variable, it is compatible with tf.Variable. - Default value is "uniform". + Default value is "uniform". When it's list, it specifies the values in the embedding table. For sok.DynamicVariable, initializer[i] must be list of [index, value], and will be used as the initial indices and value for i-th sok.DynamicVariable. @@ -30,57 +31,59 @@ class SOKEmbedding(EmbeddingTableBase): use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable. DynamicVariable can allocates memory dynamically. Variable is a model-parallel distributed variable localized: When utilizing sok.Variable, we change choose two mode: distributed(Distributed Va - riable) and localized(Localized Variable). If set to None, use Distributed Variable, + riable) and localized(Localized Variable). If set to None, use Distributed Variable, otherwise Localized Variable. where the list indicates which GPU you want to put this variable on. - Default is None. + Default is None. Examples -------- .. 
code-block:: python Notes ----- """ - def __init__(self, + + def __init__( + self, dim: int, *col_schemas: ColumnSchema, vocab_sizes: list, initializer: Union[str, tf.Tensor, list] = "uniform", - use_dynamic_variable = False, - localized = None, - name = None, - **kwargs - ): + use_dynamic_variable=False, + localized=None, + name=None, + **kwargs, + ): super(SOKEmbedding, self).__init__( - dim, - *col_schemas, - trainable=trainable, - name=name, - dtype=dtype, - **kwargs) + dim, *col_schemas, trainable=trainable, name=name, dtype=dtype, **kwargs + ) self._embedding_vec_size = dim self._vocab_sizes = vocab_sizes self._use_dynamic_variable = use_dynamic_variable self._localized = localized self._vars = [] - if self._localized is None and self._use_dynamic_variable == False : + if self._localized is None and self._use_dynamic_variable == False: for i in range(len(vocab_sizes)): if isinstance(initializer, str): v = sok.Variable( - shape=[self._vocab_sizes[i], self._embedding_vec_size], initializer=tf.keras.initializers.get(initializer), dtype=tf.float32 + shape=[self._vocab_sizes[i], self._embedding_vec_size], + initializer=tf.keras.initializers.get(initializer), + dtype=tf.float32, ) else: v = sok.Variable(initializer[i]) else: for i in range(len(vocab_sizes)): if _use_dynamic_variable: - if isinstance(initializer, str): - v = sok.DynamicVariable(dimension=self._embedding_vec_size, initializer=initializer) + if isinstance(initializer, str): + v = sok.DynamicVariable( + dimension=self._embedding_vec_size, initializer=initializer + ) else: indices = tf.convert_to_tensor(initializer[i][0]) values = tf.convert_to_tensor(initializer[i][1]) sok.assign(v, indices, values) elif self._localized is not None: - if isinstance(initializer, str): + if isinstance(initializer, str): v = sok.Variable( shape=[self._vocab_sizes[i], self._embedding_vec_size], initializer=tf.keras.initializers.get(intializer), @@ -109,17 +112,21 @@ def call(self, inputs, combiners, training=True): for cur_input in inputs: if not isinstance(cur_input, tf.SparseTensor): if not isinstance(cur_input, tf.RaggedTensor): - raise ValueError("The input must be a list of tf.SparseTensor or tf.RaggedTensor") + raise ValueError( + "The input must be a list of tf.SparseTensor or tf.RaggedTensor" + ) else: - if not len(cur_input.shape)==2: - raise ValueError ("The rank of input RaggedTensor must be 2") + if not len(cur_input.shape) == 2: + raise ValueError("The rank of input RaggedTensor must be 2") else: if not isinstance(cur_input, tf.SparseTensor): if not isinstance(cur_input, tf.RaggedTensor): - raise ValueError("The input must be a list of tf.SparseTensor or tf.RaggedTensor") + raise ValueError( + "The input must be a list of tf.SparseTensor or tf.RaggedTensor" + ) else: - if not len(cur_input.shape)==2: - raise ValueError ("The rank of input RaggedTensor must be 2") + if not len(cur_input.shape) == 2: + raise ValueError("The rank of input RaggedTensor must be 2") emb_vectors = sok.lookup_sparse( self._vars, inputs, @@ -134,8 +141,8 @@ def from_pretrained( trainable=True, name=None, col_schema=None, - use_dynamic_variable = False, - localized = None, + use_dynamic_variable=False, + localized=None, **kwargs, ) -> "SOKEmbedding": """Create From pre-trained embeddings from a Dataset or DataFrame. @@ -147,7 +154,7 @@ def from_pretrained( name : str The name of the layer. 
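A sketch of the two accepted forms of ``data``, with hypothetical shapes:

.. code-block:: python

    import numpy as np

    # plain arrays initialize sok.Variable tables
    dense_data = [np.random.rand(100, 16).astype(np.float32)]
    # dicts with "indice" and "values" initialize sok.DynamicVariable tables
    dynamic_data = [{"indice": np.arange(5, dtype=np.int64),
                     "values": np.random.rand(5, 16).astype(np.float32)}]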
""" - vocab_size=[] + vocab_size = [] weights = [] for i, item in enumerate(data): if use_dynamic_variable: @@ -163,11 +170,12 @@ def from_pretrained( col_schema, name=name, initializer=weights, - use_dynamic_variable = use_dynamic_variable, - localized = localized, + use_dynamic_variable=use_dynamic_variable, + localized=localized, trainable=trainable, **kwargs, ) + def get_config(self): config = super().get_config() config["dim"] = self.dim diff --git a/requirements/horovod.txt b/requirements/horovod.txt index 8229a149aa..e342149c04 100644 --- a/requirements/horovod.txt +++ b/requirements/horovod.txt @@ -1 +1,2 @@ horovod +merlin-sok diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index ae4f463ad2..921eb0866f 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -23,13 +23,9 @@ def test_raises_with_invalid_schema(self): mm.EmbeddingTable(16, column_schema) assert "needs to have an int-domain" in str(exc_info.value) + @pytest.mark.parametrize("dim", [16, 32]) def test_sok_dynamic_variables(self, dim): hvd.init() - gpus = tf.config.experimental.list_physical_devices("GPU") - for gpu in gpus: - tf.config.experimental.set_memory_growth(gpu, True) - if gpus: - tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") sok.init() rows = [65536 * 10, 65536] @@ -72,13 +68,9 @@ def test_sok_dynamic_variables(self, dim): print("[SOK INFO] diff:", diff) assert diff < 1e-6 + @pytest.mark.parametrize("dim", [16, 32]) def test_distributed_variables(self, dim): hvd.init() - gpus = tf.config.experimental.list_physical_devices("GPU") - for gpu in gpus: - tf.config.experimental.set_memory_growth(gpu, True) - if gpus: - tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") sok.init() rows = [65536 * 10, 65536] From debb6e2ca04d0ff1995dccbc6a44cbe3ea4ec6ff Mon Sep 17 00:00:00 2001 From: edknv Date: Wed, 14 Dec 2022 06:11:07 -0800 Subject: [PATCH 11/34] Clean up --- merlin/models/tf/distributed/embedding.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 5b0ec87e3b..2999b98884 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -26,7 +26,8 @@ class SOKEmbedding(EmbeddingTableBase): When it's list, it specifies the values in the embedding table. For sok.DynamicVariable, initializer[i] must be list of [index, value], and will be used as the initial indices and value for i-th sok.DynamicVariable. - For sok.Variable, initializer[i] must be a numpy with shape [vocab_size[i], embedding_vec_size], + For sok.Variable, initializer[i] must be a numpy with shape + [vocab_size[i], embedding_vec_size], and will be used as the initial value for i-th sok.Variable. use_dynamic_variable: bool = "False" use sok.DynamicVariable or sok.Variable. DynamicVariable can allocates memory dynamically. 
Variable is a model-parallel distributed variable @@ -61,7 +62,7 @@ def __init__( self._use_dynamic_variable = use_dynamic_variable self._localized = localized self._vars = [] - if self._localized is None and self._use_dynamic_variable == False: + if self._localized is None and self._use_dynamic_variable is False: for i in range(len(vocab_sizes)): if isinstance(initializer, str): v = sok.Variable( @@ -86,7 +87,7 @@ def __init__( if isinstance(initializer, str): v = sok.Variable( shape=[self._vocab_sizes[i], self._embedding_vec_size], - initializer=tf.keras.initializers.get(intializer), + initializer=tf.keras.initializers.get(initializer), dtype=tf.float32, mode="localized:%d" % self._localized[i], ) @@ -148,17 +149,17 @@ def from_pretrained( """Create From pre-trained embeddings from a Dataset or DataFrame. Parameters ---------- - data : A list of numpy.array or A list of dict {"indice": numpy.array, "values": numpy.array} + data : + A list of numpy.array or A list of dict {"indice": numpy.array, "values": numpy.array} trainable : bool Whether the layer should be trained or not. name : str The name of the layer. """ - vocab_size = [] weights = [] for i, item in enumerate(data): if use_dynamic_variable: - if isinstance(item, dict) and item.has_key("indice") and item.has_key("values"): + if isinstance(item, dict) and "indice" in item and "values" in item: weights.append([item["indice"], item["values"]]) else: raise ValueError("DynamicVariable should be initialized with indice and values") From a4e3ffcb6f5738373a4902995bf2ba101890309e Mon Sep 17 00:00:00 2001 From: wenjingz Date: Tue, 27 Dec 2022 06:55:45 -0800 Subject: [PATCH 12/34] fix some import and param bug --- merlin/models/tf/distributed/embedding.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 2999b98884..14d0536077 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -1,6 +1,7 @@ from typing import Union -import sparse_operation_kit as sok +#import sparse_operation_kit as sok +from sparse_operation_kit import experiment as sok import tensorflow as tf from merlin.models.tf.inputs.embedding import EmbeddingTableBase @@ -51,7 +52,9 @@ def __init__( initializer: Union[str, tf.Tensor, list] = "uniform", use_dynamic_variable=False, localized=None, + trainable=True, name=None, + dtype=None, **kwargs, ): super(SOKEmbedding, self).__init__( @@ -74,7 +77,7 @@ def __init__( v = sok.Variable(initializer[i]) else: for i in range(len(vocab_sizes)): - if _use_dynamic_variable: + if self._use_dynamic_variable: if isinstance(initializer, str): v = sok.DynamicVariable( dimension=self._embedding_vec_size, initializer=initializer From 97d51f585f7f1d9f00f45012db169f8963bf6c81 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Tue, 27 Dec 2022 23:54:47 -0800 Subject: [PATCH 13/34] remove some unused variable --- merlin/models/tf/distributed/embedding.py | 1 + tests/unit/tf/horovod/test_embedding.py | 7 ------- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 14d0536077..9f984876b1 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -141,6 +141,7 @@ def call(self, inputs, combiners, training=True): @classmethod def from_pretrained( cls, + dim: int, data: list, trainable=True, name=None, diff --git 
a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index 921eb0866f..ef0be03342 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -30,9 +30,6 @@ def test_sok_dynamic_variables(self, dim): rows = [65536 * 10, 65536] cols = [128, 4] - combiners = ["sum", "sum"] - batch_size = 65536 - iters = 100 initial_vals = [13, 17] # sok variables @@ -75,10 +72,6 @@ def test_distributed_variables(self, dim): rows = [65536 * 10, 65536] cols = [128, 4] - hotness = [10, 3] - combiners = ["sum", "sum"] - batch_size = 65536 - iters = 100 initial_vals = [13, 17] # initial value of embedding table From b978afac5be48c17e1a18571de2616a5a5c27e1f Mon Sep 17 00:00:00 2001 From: wenjingz Date: Tue, 27 Dec 2022 23:59:33 -0800 Subject: [PATCH 14/34] remove intial vals --- tests/unit/tf/horovod/test_embedding.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index ef0be03342..7bf11aed52 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -72,7 +72,6 @@ def test_distributed_variables(self, dim): rows = [65536 * 10, 65536] cols = [128, 4] - initial_vals = [13, 17] # initial value of embedding table weights = [] From 4f22c0f8b7b59d08718c8a593eff2de43f22bab6 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Wed, 28 Dec 2022 00:07:48 -0800 Subject: [PATCH 15/34] fix import --- merlin/models/tf/distributed/embedding.py | 1 - 1 file changed, 1 deletion(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 9f984876b1..857c5dc038 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -1,6 +1,5 @@ from typing import Union -#import sparse_operation_kit as sok from sparse_operation_kit import experiment as sok import tensorflow as tf From 16fb41490d61ef56088c774e4a7351b650d51bcf Mon Sep 17 00:00:00 2001 From: wenjingz Date: Wed, 28 Dec 2022 00:12:18 -0800 Subject: [PATCH 16/34] reorder the import --- merlin/models/tf/distributed/embedding.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 857c5dc038..3186e7e9a7 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -1,7 +1,7 @@ from typing import Union -from sparse_operation_kit import experiment as sok import tensorflow as tf +from sparse_operation_kit import experiment as sok from merlin.models.tf.inputs.embedding import EmbeddingTableBase from merlin.models.utils.schema_utils import ( From f41f52fd2efa293ca5c0156d9304c56373afd705 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Thu, 12 Jan 2023 01:57:45 -0800 Subject: [PATCH 17/34] fix import error in test embedding --- tests/unit/tf/horovod/test_embedding.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index 7bf11aed52..a22c4369e9 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -6,9 +6,11 @@ from merlin.schema import ColumnSchema, Tags hvd = pytest.importorskip("horovod.tensorflow") -sok = pytest.importorskip("sparse_operation_kit") - +sparse_operation_kit = pytest.importorskip("sparse_operation_kit") +from sparse_operation_kit import experiment as sok +hvd.init() +sok.init() class 
TestSOKEmbedding: sample_column_schema = ColumnSchema( "item_id", @@ -25,8 +27,6 @@ def test_raises_with_invalid_schema(self): @pytest.mark.parametrize("dim", [16, 32]) def test_sok_dynamic_variables(self, dim): - hvd.init() - sok.init() rows = [65536 * 10, 65536] cols = [128, 4] @@ -67,9 +67,6 @@ def test_sok_dynamic_variables(self, dim): @pytest.mark.parametrize("dim", [16, 32]) def test_distributed_variables(self, dim): - hvd.init() - sok.init() - rows = [65536 * 10, 65536] cols = [128, 4] From fe34f9dc974c251df6357d068825fef15e1c6768 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Thu, 12 Jan 2023 18:55:41 -0800 Subject: [PATCH 18/34] format the code --- tests/unit/tf/horovod/test_embedding.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index a22c4369e9..18b3fe6c4c 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -11,6 +11,8 @@ hvd.init() sok.init() + + class TestSOKEmbedding: sample_column_schema = ColumnSchema( "item_id", From 98bb17b2caa619ee99bd90173bec455cf3588d35 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Thu, 12 Jan 2023 23:00:50 -0800 Subject: [PATCH 19/34] change the way of import --- tests/unit/tf/horovod/test_embedding.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index 18b3fe6c4c..1ecebeef29 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -6,8 +6,7 @@ from merlin.schema import ColumnSchema, Tags hvd = pytest.importorskip("horovod.tensorflow") -sparse_operation_kit = pytest.importorskip("sparse_operation_kit") -from sparse_operation_kit import experiment as sok +sok = pytest.importorskip("sparse_operation_kit.experiment") hvd.init() sok.init() From c3373e8ccb46b65bfe9c8fa780b11cfdffa25812 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Wed, 15 Feb 2023 06:59:25 -0800 Subject: [PATCH 20/34] support sp_weights in lookup --- merlin/models/tf/distributed/embedding.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 3186e7e9a7..303cfef6f3 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -131,9 +131,10 @@ def call(self, inputs, combiners, training=True): if not len(cur_input.shape) == 2: raise ValueError("The rank of input RaggedTensor must be 2") emb_vectors = sok.lookup_sparse( - self._vars, - inputs, - combiners, + params=self._vars, + sp_ids=inputs, + sp_weights=None, + combiners=combiners, ) return emb_vectors From d7531f6cfe70b0cfe0056ff8986ffba1826adb95 Mon Sep 17 00:00:00 2001 From: edknv <109497216+edknv@users.noreply.github.com> Date: Wed, 15 Feb 2023 09:34:40 -0800 Subject: [PATCH 21/34] Add unit tests for SOKEmbedding (#980) * Add unit tests for SOKEmbedding * pip install sparse_operation_kit --- merlin/models/tf/distributed/backend.py | 15 +++ merlin/models/tf/distributed/embedding.py | 8 +- tests/unit/tf/horovod/test_embedding.py | 112 ++++------------------ tox.ini | 1 + 4 files changed, 43 insertions(+), 93 deletions(-) diff --git a/merlin/models/tf/distributed/backend.py b/merlin/models/tf/distributed/backend.py index b696713d1e..b69a99252d 100644 --- a/merlin/models/tf/distributed/backend.py +++ b/merlin/models/tf/distributed/backend.py @@ -1,6 +1,10 @@ hvd = None hvd_installed = False +sok = 
None +sok_installed = False + + try: import horovod.tensorflow.keras as hvd # noqa: F401 @@ -11,3 +15,14 @@ if hvd_installed: hvd.init() + +try: + from sparse_operation_kit import experiment as sok # noqa: F401 + + sok_installed = True +except ImportError: + pass + + +if sok_installed: + sok.init() diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 303cfef6f3..271cf63698 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -1,8 +1,8 @@ from typing import Union import tensorflow as tf -from sparse_operation_kit import experiment as sok +from merlin.models.tf.distributed.backend import hvd_installed, sok, sok_installed from merlin.models.tf.inputs.embedding import EmbeddingTableBase from merlin.models.utils.schema_utils import ( schema_to_tensorflow_metadata_json, @@ -56,6 +56,12 @@ def __init__( dtype=None, **kwargs, ): + if not hvd_installed or not sok_installed: + raise ImportError( + "'horovod' and 'sparse_operation_kit' are required to use " + f"{self.__class__.__name__}." + ) + super(SOKEmbedding, self).__init__( dim, *col_schemas, trainable=trainable, name=name, dtype=dtype, **kwargs ) diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index 1ecebeef29..041fe2f8b9 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -1,16 +1,9 @@ import numpy as np -import pytest import tensorflow as tf -import merlin.models.tf as mm +from merlin.models.tf.distributed.embedding import SOKEmbedding from merlin.schema import ColumnSchema, Tags -hvd = pytest.importorskip("horovod.tensorflow") -sok = pytest.importorskip("sparse_operation_kit.experiment") - -hvd.init() -sok.init() - class TestSOKEmbedding: sample_column_schema = ColumnSchema( @@ -20,87 +13,22 @@ class TestSOKEmbedding: tags=[Tags.CATEGORICAL], ) - def test_raises_with_invalid_schema(self): - column_schema = ColumnSchema("item_id") - with pytest.raises(ValueError) as exc_info: - mm.EmbeddingTable(16, column_schema) - assert "needs to have an int-domain" in str(exc_info.value) - - @pytest.mark.parametrize("dim", [16, 32]) - def test_sok_dynamic_variables(self, dim): - - rows = [65536 * 10, 65536] - cols = [128, 4] - initial_vals = [13, 17] - - # sok variables - sok_vars = [ - sok.DynamicVariable(dimension=cols[i], initializer=str(initial_vals[i])) - for i in range(len(cols)) - ] - local_indices = [] - for row in rows: - local_size = row // hvd.size() - if hvd.rank() < row % hvd.size(): - local_size += 1 - indices = np.arange(local_size) * hvd.size() + hvd.rank() - indices = tf.convert_to_tensor(indices, dtype=tf.int64) - local_indices.append(indices) - out1 = [] - for i in range(len(sok_vars)): - out1.append(tf.nn.embedding_lookup(sok_vars[i], local_indices[i])) - - tf_vars = [ - tf.Variable(tf.constant(initial_vals[i], shape=[rows[i], cols[i]], dtype=tf.float32)) - for i in range(len(rows)) - ] - out2 = [] - for i, v in enumerate(tf_vars): - out2.append(tf.nn.embedding_lookup(v, local_indices[i])) - - # Check results - diff = 0 - for i in range(len(out1)): - length = out1[i] ** 2 + out2[i] ** 2 + 1e-8 - diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) - print("[SOK INFO] diff:", diff) - assert diff < 1e-6 - - @pytest.mark.parametrize("dim", [16, 32]) - def test_distributed_variables(self, dim): - rows = [65536 * 10, 65536] - cols = [128, 4] - - # initial value of embedding table - weights = [] - for i in range(len(rows)): - weight 
= np.random.rand(rows[i], cols[i]).astype(np.float32) - weight = tf.convert_to_tensor(weight, dtype=tf.float32) - # make sure the weight is same on each rank - weight = hvd.allreduce(weight) - weights.append(weight) - - # sok variables - sok_vars = [sok.Variable(w) for w in weights] - local_indices = [] - for row in rows: - local_size = row // hvd.size() - if hvd.rank() < row % hvd.size(): - local_size += 1 - indices = np.arange(local_size) * hvd.size() + hvd.rank() - indices = tf.convert_to_tensor(indices, dtype=tf.int64) - local_indices.append(indices) - - out1 = sok_vars - tf_vars = [tf.Variable(w) for w in weights] - out2 = [] - for i, v in enumerate(tf_vars): - out2.append(tf.nn.embedding_lookup(v, local_indices[i])) - - # Check results - diff = 0 - for i in range(len(out1)): - length = out1[i] ** 2 + out2[i] ** 2 + 1e-8 - diff = diff + tf.reduce_sum((out1[i] - out2[i]) ** 2 / length) - print("[SOK INFO] diff:", diff) - assert diff < 1e-6 + def test_sok_embedding_basic(self): + embedding = SOKEmbedding(16, self.sample_column_schema, vocab_sizes=[10]) + inputs = [tf.ragged.constant([[0, 1, 0], [1, 0]])] + combiners = ["sum"] + outputs = embedding(inputs, combiners) + assert outputs[0].shape == (2, 16) + + def test_sok_embedding_pretrained(self): + weights = np.random.rand(10, 16) + embedding = SOKEmbedding.from_pretrained(16, [weights]) + inputs = [tf.ragged.constant([[0, 1, 0], [1, 0]])] + combiners = ["sum"] + outputs = embedding(inputs, combiners) + assert outputs[0].shape == (2, 16) + + def test_sok_embedding_config(self): + embedding = SOKEmbedding(16, self.sample_column_schema, vocab_sizes=[10]) + config = embedding.get_config() + _ = SOKEmbedding.from_config(config) diff --git a/tox.ini b/tox.ini index c60664685c..b676ead690 100644 --- a/tox.ini +++ b/tox.ini @@ -35,6 +35,7 @@ commands = conda env create --prefix {envdir}/env --file requirements/horovod-cpu-environment.yml --force {envdir}/env/bin/python -m pip install horovod --no-cache-dir {envdir}/env/bin/horovodrun --check-build + {envdir}/env/bin/python -m pip install sparse_operation_kit {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git From 1e4628d77bdbb922fb8660298b553d41796fe0c9 Mon Sep 17 00:00:00 2001 From: edknv Date: Wed, 15 Feb 2023 13:25:43 -0800 Subject: [PATCH 22/34] remove sok from ci since no gpu --- merlin/models/tf/distributed/backend.py | 14 +++++++++----- requirements/horovod.txt | 2 +- tests/unit/tf/horovod/test_embedding.py | 3 +++ tox.ini | 1 - 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/merlin/models/tf/distributed/backend.py b/merlin/models/tf/distributed/backend.py index b69a99252d..ab6bad8ff2 100644 --- a/merlin/models/tf/distributed/backend.py +++ b/merlin/models/tf/distributed/backend.py @@ -1,3 +1,6 @@ +from merlin.core.dispatch import HAS_GPU + + hvd = None hvd_installed = False @@ -16,12 +19,13 @@ if hvd_installed: hvd.init() -try: - from sparse_operation_kit import experiment as sok # noqa: F401 +if HAS_GPU: + try: + from sparse_operation_kit import experiment as sok # noqa: F401 - sok_installed = True -except ImportError: - pass + sok_installed = True + except ImportError: + pass if sok_installed: diff --git a/requirements/horovod.txt b/requirements/horovod.txt index e342149c04..269e6b70d4 100644 --- a/requirements/horovod.txt +++ 
b/requirements/horovod.txt @@ -1,2 +1,2 @@ horovod -merlin-sok +sparse_operation_kit diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index 041fe2f8b9..4d80419076 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -1,10 +1,13 @@ import numpy as np +import pytest import tensorflow as tf +from merlin.core.dispatch import HAS_GPU from merlin.models.tf.distributed.embedding import SOKEmbedding from merlin.schema import ColumnSchema, Tags +@pytest.mark.skipif(not HAS_GPU, reason="No GPU available") class TestSOKEmbedding: sample_column_schema = ColumnSchema( "item_id", diff --git a/tox.ini b/tox.ini index b676ead690..c60664685c 100644 --- a/tox.ini +++ b/tox.ini @@ -35,7 +35,6 @@ commands = conda env create --prefix {envdir}/env --file requirements/horovod-cpu-environment.yml --force {envdir}/env/bin/python -m pip install horovod --no-cache-dir {envdir}/env/bin/horovodrun --check-build - {envdir}/env/bin/python -m pip install sparse_operation_kit {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git From 3c0104ed14824a5c7621abf20173f67c7f081e90 Mon Sep 17 00:00:00 2001 From: edknv Date: Wed, 15 Feb 2023 22:56:06 -0800 Subject: [PATCH 23/34] lint --- merlin/models/tf/distributed/backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/merlin/models/tf/distributed/backend.py b/merlin/models/tf/distributed/backend.py index ab6bad8ff2..6d8a499d0f 100644 --- a/merlin/models/tf/distributed/backend.py +++ b/merlin/models/tf/distributed/backend.py @@ -1,6 +1,5 @@ from merlin.core.dispatch import HAS_GPU - hvd = None hvd_installed = False From b716adc24ec3fc9ff1809b92de3cd1fa5a747c35 Mon Sep 17 00:00:00 2001 From: edknv Date: Wed, 8 Mar 2023 10:07:28 -0800 Subject: [PATCH 24/34] pip install sparse_operation_kit in tox.ini --- tox.ini | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tox.ini b/tox.ini index c05e574fef..decb05b552 100644 --- a/tox.ini +++ b/tox.ini @@ -29,8 +29,6 @@ commands = ; Runs GPU-based tests. 
allowlist_externals = horovodrun -deps = - -rrequirements/test.txt passenv = OPAL_PREFIX setenv = @@ -40,7 +38,11 @@ commands = python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git@{posargs:main} python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git@{posargs:main} python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git@{posargs:main} - horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh pytest -m horovod -rxs tests/unit + # TODO: Move SOK installtion to ci-runner dockerfile + # Install SOK + python -m pip install sparse_operation_kit + # Run multi-gpu tests marked with `horovod` marker + horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh python -m pytest -m horovod -rxs tests/unit [testenv:py38-horovod-cpu] setenv = From b1c1031799f83d91215063dd977ecc1ec3a221c2 Mon Sep 17 00:00:00 2001 From: edknv Date: Wed, 8 Mar 2023 10:10:52 -0800 Subject: [PATCH 25/34] fix spelling --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index decb05b552..cb1ed89d22 100644 --- a/tox.ini +++ b/tox.ini @@ -38,7 +38,7 @@ commands = python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git@{posargs:main} python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git@{posargs:main} python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git@{posargs:main} - # TODO: Move SOK installtion to ci-runner dockerfile + # TODO: Move SOK installation to ci-runner dockerfile # Install SOK python -m pip install sparse_operation_kit # Run multi-gpu tests marked with `horovod` marker From 91f25d18cf578eda83f776f13f99fe9b566e37b7 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Sun, 12 Mar 2023 07:10:58 -0700 Subject: [PATCH 26/34] resolve init method issue --- merlin/models/tf/distributed/embedding.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 271cf63698..d87d0357f0 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -69,6 +69,7 @@ def __init__( self._vocab_sizes = vocab_sizes self._use_dynamic_variable = use_dynamic_variable self._localized = localized + self._initializer = initialzier self._vars = [] if self._localized is None and self._use_dynamic_variable is False: for i in range(len(vocab_sizes)): @@ -148,6 +149,7 @@ def call(self, inputs, combiners, training=True): def from_pretrained( cls, dim: int, + vocab_sizes: list, data: list, trainable=True, name=None, @@ -156,7 +158,7 @@ def from_pretrained( localized=None, **kwargs, ) -> "SOKEmbedding": - """Create From pre-trained embeddings from a Dataset or DataFrame. + """Create From pre-trained embeddings from a Dataset. 
Parameters ---------- data : @@ -179,6 +181,7 @@ def from_pretrained( return cls( dim, col_schema, + vocab_sizes=vocab_size name=name, initializer=weights, use_dynamic_variable=use_dynamic_variable, From d0671c9a2b442eb66078b1cb0c3aadf54e0db917 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Sun, 12 Mar 2023 07:48:07 -0700 Subject: [PATCH 27/34] fix schema issue --- merlin/models/tf/distributed/embedding.py | 12 +++++++++--- tests/unit/tf/horovod/test_embedding.py | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index d87d0357f0..32f2f19e87 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -5,6 +5,7 @@ from merlin.models.tf.distributed.backend import hvd_installed, sok, sok_installed from merlin.models.tf.inputs.embedding import EmbeddingTableBase from merlin.models.utils.schema_utils import ( + create_categorical_column, schema_to_tensorflow_metadata_json, tensorflow_metadata_json_to_schema, ) @@ -69,7 +70,7 @@ def __init__( self._vocab_sizes = vocab_sizes self._use_dynamic_variable = use_dynamic_variable self._localized = localized - self._initializer = initialzier + self._initializer = initializer self._vars = [] if self._localized is None and self._use_dynamic_variable is False: for i in range(len(vocab_sizes)): @@ -140,7 +141,6 @@ def call(self, inputs, combiners, training=True): emb_vectors = sok.lookup_sparse( params=self._vars, sp_ids=inputs, - sp_weights=None, combiners=combiners, ) return emb_vectors @@ -168,6 +168,12 @@ def from_pretrained( name : str The name of the layer. """ + + if not col_schema: + if not name: + raise ValueError("`name` is required when not using a ColumnSchema") + col_schema = create_categorical_column(name, num_items - 1) + weights = [] for i, item in enumerate(data): if use_dynamic_variable: @@ -181,7 +187,7 @@ def from_pretrained( return cls( dim, col_schema, - vocab_sizes=vocab_size + vocab_sizes=vocab_sizes, name=name, initializer=weights, use_dynamic_variable=use_dynamic_variable, diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index 4d80419076..8651fe2572 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -25,7 +25,7 @@ def test_sok_embedding_basic(self): def test_sok_embedding_pretrained(self): weights = np.random.rand(10, 16) - embedding = SOKEmbedding.from_pretrained(16, [weights]) + embedding = SOKEmbedding.from_pretrained(16, vocab_sizes=[10], data=[weights]) inputs = [tf.ragged.constant([[0, 1, 0], [1, 0]])] combiners = ["sum"] outputs = embedding(inputs, combiners) From 0eb266d0aa477c11ae50ac3ae4d630bae5c87456 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Sun, 12 Mar 2023 08:40:24 -0700 Subject: [PATCH 28/34] add indices and weights in from_pretrained --- tests/unit/tf/horovod/test_embedding.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index 8651fe2572..cbe47ff9b5 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -24,14 +24,18 @@ def test_sok_embedding_basic(self): assert outputs[0].shape == (2, 16) def test_sok_embedding_pretrained(self): - weights = np.random.rand(10, 16) - embedding = SOKEmbedding.from_pretrained(16, vocab_sizes=[10], data=[weights]) + weights = {} + indices = np.array([0, 1, 2]) + values = np.arange(3 
* 16).reshape(3, 16) + weights["indice"]=indices + weight["values"]=values + embedding = SOKEmbedding.from_pretrained(16, vocab_sizes=[10], data=[weights], name='item_id') inputs = [tf.ragged.constant([[0, 1, 0], [1, 0]])] combiners = ["sum"] outputs = embedding(inputs, combiners) assert outputs[0].shape == (2, 16) def test_sok_embedding_config(self): - embedding = SOKEmbedding(16, self.sample_column_schema, vocab_sizes=[10]) + embedding = SOKEmbedding(16, self.sample_column_schema, vocab_sizes=[10], name= 'item_id') config = embedding.get_config() _ = SOKEmbedding.from_config(config) From e9cc754f0cd73a637367045435c51442dc55de55 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Sun, 12 Mar 2023 08:45:05 -0700 Subject: [PATCH 29/34] schema issue in from_pretrained --- merlin/models/tf/distributed/embedding.py | 2 +- tests/unit/tf/horovod/test_embedding.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 32f2f19e87..c6147f1592 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -172,7 +172,7 @@ def from_pretrained( if not col_schema: if not name: raise ValueError("`name` is required when not using a ColumnSchema") - col_schema = create_categorical_column(name, num_items - 1) + col_schema = create_categorical_column(name, sum(vocab_sizes)-1) weights = [] for i, item in enumerate(data): diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index cbe47ff9b5..d4dcff9fd6 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -28,7 +28,7 @@ def test_sok_embedding_pretrained(self): indices = np.array([0, 1, 2]) values = np.arange(3 * 16).reshape(3, 16) weights["indice"]=indices - weight["values"]=values + weights["values"]=values embedding = SOKEmbedding.from_pretrained(16, vocab_sizes=[10], data=[weights], name='item_id') inputs = [tf.ragged.constant([[0, 1, 0], [1, 0]])] combiners = ["sum"] From 5532ab75de1d9367ceceb9312c4e47bc3d6f0169 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Sun, 12 Mar 2023 08:52:51 -0700 Subject: [PATCH 30/34] init method in DET --- merlin/models/tf/distributed/embedding.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index c6147f1592..8bf5bfac40 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -90,6 +90,9 @@ def __init__( dimension=self._embedding_vec_size, initializer=initializer ) else: + v = sok.DynamicVariable( + dimension=self._embedding_vec_size, initializer='random' + ) indices = tf.convert_to_tensor(initializer[i][0]) values = tf.convert_to_tensor(initializer[i][1]) sok.assign(v, indices, values) @@ -154,7 +157,7 @@ def from_pretrained( trainable=True, name=None, col_schema=None, - use_dynamic_variable=False, + use_dynamic_variable=True, localized=None, **kwargs, ) -> "SOKEmbedding": From c9d3baf48950849779b67e3f3656835f860a1569 Mon Sep 17 00:00:00 2001 From: wenjingz Date: Sun, 12 Mar 2023 08:55:25 -0700 Subject: [PATCH 31/34] tensor type in sok assing --- merlin/models/tf/distributed/embedding.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 8bf5bfac40..c267d3cae6 100644 --- a/merlin/models/tf/distributed/embedding.py +++ 
b/merlin/models/tf/distributed/embedding.py @@ -93,8 +93,8 @@ def __init__( v = sok.DynamicVariable( dimension=self._embedding_vec_size, initializer='random' ) - indices = tf.convert_to_tensor(initializer[i][0]) - values = tf.convert_to_tensor(initializer[i][1]) + indices = tf.convert_to_tensor(initializer[i][0], dtype=tf.int64) + values = tf.convert_to_tensor(initializer[i][1], dtype=tf.float32) sok.assign(v, indices, values) elif self._localized is not None: if isinstance(initializer, str): From 33abba017fb319eda35bb5ecbaf94711dc89d6ae Mon Sep 17 00:00:00 2001 From: wenjingz Date: Wed, 15 Mar 2023 02:59:35 -0700 Subject: [PATCH 32/34] resolve config passing --- merlin/models/tf/distributed/embedding.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index c267d3cae6..0c4b6c2030 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -216,9 +216,17 @@ def get_config(self): def from_config(cls, config): dim = config.pop("dim") schema = tensorflow_metadata_json_to_schema(config.pop("schema")) - vocab_size = config["vocab_sizes"] - initializer = config["initializer"] - use_dynamic_variable = config["use_dynamic_variable"] - localized = config["localized"] + vocab_size = config.pop("vocab_sizes") + initializer = config.pop("initializer") + use_dynamic_variable = config.pop("use_dynamic_variable") + localized = config.pop("localized") - return cls(dim, *schema, vocab_size, initializer, use_dynamic_variable, localized, **config) + return cls( + dim, + *schema, + vocab_sizes=vocab_size, + initializer=initializer, + use_dynamic_variable=use_dynamic_variable, + localized=localized, + **config, + ) From b2a01c2c5e43885ca2cf0a6a575f6382be590015 Mon Sep 17 00:00:00 2001 From: edknv Date: Thu, 16 Mar 2023 23:42:30 -0700 Subject: [PATCH 33/34] lint --- merlin/models/tf/distributed/embedding.py | 6 +++--- tests/unit/tf/horovod/test_embedding.py | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py index 0c4b6c2030..c9d4244c93 100644 --- a/merlin/models/tf/distributed/embedding.py +++ b/merlin/models/tf/distributed/embedding.py @@ -91,7 +91,7 @@ def __init__( ) else: v = sok.DynamicVariable( - dimension=self._embedding_vec_size, initializer='random' + dimension=self._embedding_vec_size, initializer="random" ) indices = tf.convert_to_tensor(initializer[i][0], dtype=tf.int64) values = tf.convert_to_tensor(initializer[i][1], dtype=tf.float32) @@ -175,7 +175,7 @@ def from_pretrained( if not col_schema: if not name: raise ValueError("`name` is required when not using a ColumnSchema") - col_schema = create_categorical_column(name, sum(vocab_sizes)-1) + col_schema = create_categorical_column(name, sum(vocab_sizes) - 1) weights = [] for i, item in enumerate(data): @@ -226,7 +226,7 @@ def from_config(cls, config): *schema, vocab_sizes=vocab_size, initializer=initializer, - use_dynamic_variable=use_dynamic_variable, + use_dynamic_variable=use_dynamic_variable, localized=localized, **config, ) diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py index d4dcff9fd6..25cfe889e5 100644 --- a/tests/unit/tf/horovod/test_embedding.py +++ b/tests/unit/tf/horovod/test_embedding.py @@ -27,15 +27,17 @@ def test_sok_embedding_pretrained(self): weights = {} indices = np.array([0, 1, 2]) values = np.arange(3 * 
16).reshape(3, 16) - weights["indice"]=indices - weights["values"]=values - embedding = SOKEmbedding.from_pretrained(16, vocab_sizes=[10], data=[weights], name='item_id') + weights["indice"] = indices + weights["values"] = values + embedding = SOKEmbedding.from_pretrained( + 16, vocab_sizes=[10], data=[weights], name="item_id" + ) inputs = [tf.ragged.constant([[0, 1, 0], [1, 0]])] combiners = ["sum"] outputs = embedding(inputs, combiners) assert outputs[0].shape == (2, 16) def test_sok_embedding_config(self): - embedding = SOKEmbedding(16, self.sample_column_schema, vocab_sizes=[10], name= 'item_id') + embedding = SOKEmbedding(16, self.sample_column_schema, vocab_sizes=[10], name="item_id") config = embedding.get_config() _ = SOKEmbedding.from_config(config) From 0fd059a5e0a19ff81d0de896b8a5fec1d84d904f Mon Sep 17 00:00:00 2001 From: edknv Date: Fri, 17 Mar 2023 13:24:47 -0700 Subject: [PATCH 34/34] install sok in tox --- .../multi-gpu/install_sparse_operation_kit.sh | 16 ++++++++++++++++ merlin/models/tf/distributed/backend.py | 4 +++- tox.ini | 9 ++++++--- 3 files changed, 25 insertions(+), 4 deletions(-) create mode 100644 examples/usecases/multi-gpu/install_sparse_operation_kit.sh diff --git a/examples/usecases/multi-gpu/install_sparse_operation_kit.sh b/examples/usecases/multi-gpu/install_sparse_operation_kit.sh new file mode 100644 index 0000000000..c35143f3a0 --- /dev/null +++ b/examples/usecases/multi-gpu/install_sparse_operation_kit.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -e + +ROOT_DIR=$1 + +cd $ROOT_DIR + +rm -rf hugectr/ + +git clone -b release-23.02 https://github.com/NVIDIA-Merlin/HugeCTR.git hugectr + +cd hugectr/sparse_operation_kit/ +python setup.py install + +rm -rf ${HUGECTR_HOME} diff --git a/merlin/models/tf/distributed/backend.py b/merlin/models/tf/distributed/backend.py index 6d8a499d0f..ec953ed447 100644 --- a/merlin/models/tf/distributed/backend.py +++ b/merlin/models/tf/distributed/backend.py @@ -1,3 +1,5 @@ +import tensorflow as tf + from merlin.core.dispatch import HAS_GPU hvd = None @@ -23,7 +25,7 @@ from sparse_operation_kit import experiment as sok # noqa: F401 sok_installed = True - except ImportError: + except (ImportError, tf.errors.NotFoundError): pass diff --git a/tox.ini b/tox.ini index d7ca23de3f..9ba8649e6f 100644 --- a/tox.ini +++ b/tox.ini @@ -32,21 +32,24 @@ commands = ; Runs GPU-based tests. 
 allowlist_externals =
     horovodrun
+    sh
 #deps =
 #    -rrequirements/test.txt
 passenv =
     OPAL_PREFIX
 setenv =
     TF_GPU_ALLOCATOR=cuda_malloc_async
+    CPATH={env:CPATH}{:}{envdir}/hugectr/include
+    LD_LIBRARY_PATH={envdir}/hugectr/include/lib{:}/usr/local/lib/python3.8/dist-packages/tensorflow{:}{env:LD_LIBRARY_PATH}
+    LIBRARY_PATH={envdir}/hugectr/lib{:}{env:LIBRARY_PATH}
 sitepackages=true
 commands =
     python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git@{posargs:main}
-    #python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git@{posargs:main}
-    python -m pip install --upgrade git+https://github.com/bschifferer/dataloader.git@change_output
+    python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git@{posargs:main}
     python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git@{posargs:main}
     # TODO: Move SOK installation to ci-runner dockerfile
    # Install SOK
-    python -m pip install sparse_operation_kit
+    sh examples/usecases/multi-gpu/install_sparse_operation_kit.sh {envdir}
     # Run multi-gpu tests marked with `horovod` marker
     horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh python -m pytest -m horovod -rxs tests/unit
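
Usage sketch. The series converges on the call pattern exercised by the unit
tests above; the following is a minimal sketch distilled from those tests. It
assumes a GPU machine on which horovod and sparse_operation_kit import cleanly
(importing merlin.models.tf.distributed.backend then runs hvd.init() and
sok.init() automatically); on CPU-only machines the tests are skipped. The
schema values below are illustrative.

.. code-block:: python

    import numpy as np
    import tensorflow as tf

    from merlin.models.tf.distributed.embedding import SOKEmbedding
    from merlin.schema import ColumnSchema, Tags

    # A categorical column with an int domain (illustrative values).
    item_id = ColumnSchema(
        "item_id",
        dtype=np.int32,
        properties={"domain": {"min": 0, "max": 10, "name": "item_id"}},
        tags=[Tags.CATEGORICAL],
    )

    # One 16-wide embedding table backed by SOK variables.
    embedding = SOKEmbedding(16, item_id, vocab_sizes=[10])

    # One ragged id tensor and one combiner per table.
    inputs = [tf.ragged.constant([[0, 1, 0], [1, 0]])]
    outputs = embedding(inputs, ["sum"])

    # Two input rows, sum-combined into one 16-wide vector each.
    assert outputs[0].shape == (2, 16)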
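
Pre-trained weights follow the dynamic-variable path by default
(use_dynamic_variable=True after PATCH 30): each table is created with a
placeholder "random" initializer, and the supplied rows are then written with
sok.assign, with keys cast to int64 and values to float32 (PATCH 31). The rows
travel as {"indice": ..., "values": ...} dicts, as in the final unit test. A
sketch under the same assumptions as above, with toy values:

.. code-block:: python

    import numpy as np

    from merlin.models.tf.distributed.embedding import SOKEmbedding

    # Three pre-trained rows for a 16-wide dynamic table.
    pretrained = {
        "indice": np.array([0, 1, 2]),
        "values": np.arange(3 * 16).reshape(3, 16),
    }

    embedding = SOKEmbedding.from_pretrained(
        16, vocab_sizes=[10], data=[pretrained], name="item_id"
    )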