Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Runtimes and Executors to serve NVT Workflows #320

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion merlin/systems/dag/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from merlin.core.protocols import Transformable
from merlin.dag import Graph
from merlin.systems.dag.runtimes.base_runtime import Runtime
from merlin.systems.dag.runtimes.triton import TritonExecutorRuntime


Expand Down Expand Up @@ -74,7 +75,7 @@ def transform(self, transformable: Transformable, runtime=None):
Transformable
transformed data
"""
runtime = runtime or TritonExecutorRuntime()
runtime = runtime or Runtime()
return runtime.transform(self.graph, transformable)

def save(self, path):
Expand Down
2 changes: 1 addition & 1 deletion merlin/systems/dag/runtimes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# flake8: noqa

# flake8: noqa
from .base_runtime import Runtime
38 changes: 34 additions & 4 deletions merlin/systems/dag/runtimes/base_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
# limitations under the License.
#
from merlin.core.protocols import Transformable
from merlin.dag import Graph
from merlin.dag import Graph, postorder_iter_nodes
from merlin.dag.executors import LocalExecutor
from merlin.systems.dag.runtimes.op_table import OpTable


class Runtime:
"""A Systems Graph Runtime.
"""
A runtime for Merlin DAGs that supports using custom implementations of existing operators

This class can be used as a base class for custom runtimes.
"""
Expand All @@ -33,9 +35,32 @@ def __init__(self, executor=None):
The Graph Executor to use to use for the transform, by default None
"""
self.executor = executor or LocalExecutor()
self.op_table = {}
self.op_table = OpTable()

def convert(self, graph: Graph):
"""
Replace the operators in the supplied graph with ops from this runtime's op table

Parameters
----------
graph : Graph
Graph of nodes container operator chains for data manipulation.

def transform(self, graph: Graph, transformable: Transformable):
Returns
-------
Graph
Copy of the graph with operators converted to this runtime's versions
"""
if not self.op_table.empty:
nodes = list(postorder_iter_nodes(graph.output_node))

for node in nodes:
if self.op_table.has_impl(node.op):
node.op = self.op_table.replace(node.op)

return graph

def transform(self, graph: Graph, transformable: Transformable, convert=True):
"""Run the graph with the input data.

Parameters
Expand All @@ -44,12 +69,17 @@ def transform(self, graph: Graph, transformable: Transformable):
Graph of nodes container operator chains for data manipulation.
transformable : Transformable
Input data to transform in graph.
convert: bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the scenario where we'd want to pass convert=False?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The split between convert and transform allows you to do the conversion ahead of time manually and then pass convert=False in order to avoid doing it every time the transform runs as a small optimization. We put this in so that it was possible to maintain the previous behavior where the conversion only happens once, while also offering the convenience of doing it automatically in the cases where you're not worried about the perf implications.

If True, converts the operators in the graph to this runtime's versions

Returns
-------
Transformable
Input data after it has been transformed via graph.
"""
if convert:
graph = self.convert(graph)

return self.executor.transform(transformable, [graph.output_node])

def export(self):
Expand Down
15 changes: 15 additions & 0 deletions merlin/systems/dag/runtimes/nvtabular/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Loading