From b9efeeac9b9e40d6f1b8a1e1fa70ff12a7d6ce5a Mon Sep 17 00:00:00 2001 From: William Benton Date: Wed, 21 Dec 2022 14:59:59 -0600 Subject: [PATCH 1/6] Allow users to specify module serialization hints This commit adds an optional parameter to Workflow.save so that users can indicate that certain modules should be serialized by value. This is necessary if a LambdaOp in a workflow depends on a module whose source file will not be available in the deployment environment. Related to #1737. --- nvtabular/workflow/workflow.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/nvtabular/workflow/workflow.py b/nvtabular/workflow/workflow.py index 312c9e8cdbd..a9a5da9c20f 100755 --- a/nvtabular/workflow/workflow.py +++ b/nvtabular/workflow/workflow.py @@ -255,13 +255,17 @@ def _transform_df(self, df): return LocalExecutor().transform(df, self.output_node, self.output_dtypes) - def save(self, path): + def save(self, path, modules_byvalue=[]): """Save this workflow to disk Parameters ---------- path: str The path to save the workflow to + modules_byvalue: + A list of modules that should be serialized by value — this + should include any modules that will not be available on + the host where this workflow is ultimately deserialized. """ # avoid a circular import getting the version from nvtabular import __version__ as nvt_version @@ -290,6 +294,10 @@ def save(self, path): o, ) + # direct cloudpickle to serialize selected modules by value + for m in modules_byvalue: + cloudpickle.register_pickle_by_value(m) + # dump out the full workflow (graph/stats/operators etc) using cloudpickle with fs.open(fs.sep.join([path, "workflow.pkl"]), "wb") as o: cloudpickle.dump(self, o) From 0e0bf20081feaf64b1e3d78438223c34747850c8 Mon Sep 17 00:00:00 2001 From: William Benton Date: Thu, 22 Dec 2022 22:25:10 -0600 Subject: [PATCH 2/6] Adds automatic inference of LambdaOp module dependencies This commit adds code to automatically infer LambdaOp module dependencies in several common cases: 1. in which a function is passed to LambdaOp by name, 2. in which the argument to LambdaOp is a lambda expression that refers to a function by a fully-qualified name, and 3. in which the argument to LambdaOp is a lambda expression that refers to a function via another variable The current implementation does not inspect the bodies of any function passed to LambdaOp, and many corner cases are (necessarily) omitted. However, this support should be complete enough to be useful for many users. Automatic inference is optional (via a parameter to Workflow.save) but it could be the default in the future. Related to issue #1737. --- nvtabular/workflow/workflow.py | 80 +++++++++++++++++++++++++++++++--- 1 file changed, 75 insertions(+), 5 deletions(-) diff --git a/nvtabular/workflow/workflow.py b/nvtabular/workflow/workflow.py index a9a5da9c20f..56f24b6a976 100755 --- a/nvtabular/workflow/workflow.py +++ b/nvtabular/workflow/workflow.py @@ -13,11 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +import inspect import json import logging import sys import time import warnings +import types from typing import TYPE_CHECKING, Optional import cloudpickle @@ -30,10 +33,11 @@ import pandas as pd from merlin.dag import Graph +from merlin.dag.node import iter_nodes from merlin.dag.executors import DaskExecutor, LocalExecutor from merlin.io import Dataset from merlin.schema import Schema -from nvtabular.ops import StatOperator +from nvtabular.ops import StatOperator, LambdaOp from nvtabular.workflow.node import WorkflowNode LOG = logging.getLogger("nvtabular") @@ -255,7 +259,52 @@ def _transform_df(self, df): return LocalExecutor().transform(df, self.output_node, self.output_dtypes) - def save(self, path, modules_byvalue=[]): + @classmethod + def _getmodules(cls, fs): + """ + Returns an imprecise but useful approximation of the list of modules + necessary to execute a given list of functions. This approximation is + sound (all modules listed are required by the supplied functions) but not + necessarily complete (not all modules required will necessarily be returned). + + For function literals (lambda expressions), this returns + 1. the names of every module referenced in the lambda expression, e.g., + `m` for `lambda x: m.f(x)` and + 2. the names of the declaring module for every function referenced in + the lambda expression, e.g. `m` for `import m.f; lambda x: f(x)` + + For declared functions, this returns the names of their declaring modules. + + The return value will exclude all built-in modules and (on Python 3.10 or later) + all standard library modules. + """ + result = set() + + exclusions = set(sys.builtin_module_names) + if hasattr(sys, "stdlib_module_names"): + # sys.stdlib_module_names is only available in Python 3.10 and beyond + exclusions = exclusions | sys.stdlib_module_names + + for f in fs: + if f.__name__ == '': + closurevars = inspect.getclosurevars(f) + for vars in [inspect.getclosurevars(f).globals, inspect.getclosurevars(f).nonlocals]: + for name, val in vars.items(): + print(f"{name} = {val}") + if isinstance(val, types.ModuleType): + result.add(val) + elif isinstance(val, types.FunctionType): + mod = inspect.getmodule(val) + if mod is not None: + result.add(mod) + else: + mod = inspect.getmodule(f) + if mod is not None: + result.add(mod) + + return [mod for mod in result if mod.__name__ not in exclusions] + + def save(self, path, modules_byvalue=None): """Save this workflow to disk Parameters @@ -263,9 +312,13 @@ def save(self, path, modules_byvalue=[]): path: str The path to save the workflow to modules_byvalue: - A list of modules that should be serialized by value — this + A list of modules that should be serialized by value. This should include any modules that will not be available on the host where this workflow is ultimately deserialized. + + In lieu of an explicit list, pass None to serialize all modules + by reference or pass "auto" to use a heuristic to infer which + modules to serialize by value. """ # avoid a circular import getting the version from nvtabular import __version__ as nvt_version @@ -295,8 +348,25 @@ def save(self, path, modules_byvalue=[]): ) # direct cloudpickle to serialize selected modules by value - for m in modules_byvalue: - cloudpickle.register_pickle_by_value(m) + if modules_byvalue is None: + modules_byvalue = [] + elif modules_byvalue == "auto": + l_nodes = self.graph.get_nodes_by_op_type( + list(iter_nodes([self.graph.output_node])), LambdaOp) + + try: + modules_byvalue = Workflow._getmodules([ln.op.f for ln in l_nodes]) + except RuntimeError as ex: + warnings.warn(f"Failed to automatically infer modules to serialize by value. Reason given was \"{str(ex)}\"") + + try: + for m in modules_byvalue: + if isinstance(m, types.ModuleType): + cloudpickle.register_pickle_by_value(m) + elif isinstance(m, str) and m in sys.modules: + cloudpickle.register_pickle_by_value(sys.modules[m]) + except RuntimeError as ex: + warnings.warn(f"Failed to register modules to serialize by value. Reason given was \"{str(ex)}\"") # dump out the full workflow (graph/stats/operators etc) using cloudpickle with fs.open(fs.sep.join([path, "workflow.pkl"]), "wb") as o: From 8e04cdc4f89cd92423a2376ac041dc426394f722 Mon Sep 17 00:00:00 2001 From: William Benton Date: Thu, 22 Dec 2022 22:28:10 -0600 Subject: [PATCH 3/6] Added tests related to issue #1737 --- tests/unit/workflow/test_workflow.py | 68 ++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/tests/unit/workflow/test_workflow.py b/tests/unit/workflow/test_workflow.py index 136fd3eaecf..866cec1e268 100755 --- a/tests/unit/workflow/test_workflow.py +++ b/tests/unit/workflow/test_workflow.py @@ -18,6 +18,7 @@ import math import os import shutil +import sys try: import cudf @@ -667,3 +668,70 @@ def test_workflow_saved_schema(tmpdir): for node in postorder_iter_nodes(workflow2.output_node): assert node.input_schema is not None assert node.output_schema is not None + +def test_workflow_infer_modules_byvalue(tmp_path): + module_fn = tmp_path / "not_a_real_module.py" + sys.path.append(str(tmp_path)) + + with open(module_fn, "w") as module_f: + module_f.write("def identity(col):\n return col") + + import not_a_real_module + + f_0 = not_a_real_module.identity + f_1 = lambda x: not_a_real_module.identity(x) + f_2 = lambda x: f_0(x) + + try: + for fn, f in {"not_a_real_module.identity" : f_0, + "lambda x: not_a_real_module.identity(x)" : f_1, + "lambda x: f_0(x)" : f_2}.items(): + assert(not_a_real_module in Workflow._getmodules([f]), f"inferred module dependencies from {fn}") + + finally: + sys.path.pop() + del sys.modules["not_a_real_module"] + + +def test_workflow_explicit_modules_byvalue(tmp_path): + module_fn = tmp_path / "not_a_real_module.py" + sys.path.append(str(tmp_path)) + + with open(module_fn, "w") as module_f: + module_f.write("def identity(col):\n return col") + + import not_a_real_module + + wf = nvt.Workflow( + ["col_a"] >> nvt.ops.LambdaOp(not_a_real_module.identity) + ) + + wf.save(str(tmp_path / "identity-workflow"), modules_byvalue=[not_a_real_module]) + + del not_a_real_module + del sys.modules["not_a_real_module"] + os.unlink(str(tmp_path / "not_a_real_module.py")) + + Workflow.load(str(tmp_path / "identity-workflow")) + +def test_workflow_auto_infer_modules_byvalue(tmp_path): + module_fn = tmp_path / "not_a_real_module.py" + sys.path.append(str(tmp_path)) + + with open(module_fn, "w") as module_f: + module_f.write("def identity(col):\n return col") + + import not_a_real_module + + wf = nvt.Workflow( + ["col_a"] >> nvt.ops.LambdaOp(not_a_real_module.identity) + ) + + wf.save(str(tmp_path / "identity-workflow"), modules_byvalue="auto") + + del not_a_real_module + del sys.modules["not_a_real_module"] + os.unlink(str(tmp_path / "not_a_real_module.py")) + + Workflow.load(str(tmp_path / "identity-workflow")) + From ae23ba3e36df2fc8d10ab4f1b43cf6f4b7233829 Mon Sep 17 00:00:00 2001 From: Karl Higley Date: Wed, 28 Dec 2022 13:28:33 -0500 Subject: [PATCH 4/6] Fix linter errors --- nvtabular/workflow/workflow.py | 46 ++++++++++++++++------------ tests/unit/workflow/test_workflow.py | 35 +++++++++++---------- 2 files changed, 45 insertions(+), 36 deletions(-) diff --git a/nvtabular/workflow/workflow.py b/nvtabular/workflow/workflow.py index 56f24b6a976..3b4457960c7 100755 --- a/nvtabular/workflow/workflow.py +++ b/nvtabular/workflow/workflow.py @@ -19,8 +19,8 @@ import logging import sys import time -import warnings import types +import warnings from typing import TYPE_CHECKING, Optional import cloudpickle @@ -33,11 +33,11 @@ import pandas as pd from merlin.dag import Graph -from merlin.dag.node import iter_nodes from merlin.dag.executors import DaskExecutor, LocalExecutor +from merlin.dag.node import iter_nodes from merlin.io import Dataset from merlin.schema import Schema -from nvtabular.ops import StatOperator, LambdaOp +from nvtabular.ops import LambdaOp, StatOperator from nvtabular.workflow.node import WorkflowNode LOG = logging.getLogger("nvtabular") @@ -262,12 +262,12 @@ def _transform_df(self, df): @classmethod def _getmodules(cls, fs): """ - Returns an imprecise but useful approximation of the list of modules - necessary to execute a given list of functions. This approximation is + Returns an imprecise but useful approximation of the list of modules + necessary to execute a given list of functions. This approximation is sound (all modules listed are required by the supplied functions) but not necessarily complete (not all modules required will necessarily be returned). - For function literals (lambda expressions), this returns + For function literals (lambda expressions), this returns 1. the names of every module referenced in the lambda expression, e.g., `m` for `lambda x: m.f(x)` and 2. the names of the declaring module for every function referenced in @@ -286,10 +286,12 @@ def _getmodules(cls, fs): exclusions = exclusions | sys.stdlib_module_names for f in fs: - if f.__name__ == '': - closurevars = inspect.getclosurevars(f) - for vars in [inspect.getclosurevars(f).globals, inspect.getclosurevars(f).nonlocals]: - for name, val in vars.items(): + if f.__name__ == "": + for closurevars in [ + inspect.getclosurevars(f).globals, + inspect.getclosurevars(f).nonlocals, + ]: + for name, val in closurevars.items(): print(f"{name} = {val}") if isinstance(val, types.ModuleType): result.add(val) @@ -312,12 +314,12 @@ def save(self, path, modules_byvalue=None): path: str The path to save the workflow to modules_byvalue: - A list of modules that should be serialized by value. This + A list of modules that should be serialized by value. This should include any modules that will not be available on the host where this workflow is ultimately deserialized. - - In lieu of an explicit list, pass None to serialize all modules - by reference or pass "auto" to use a heuristic to infer which + + In lieu of an explicit list, pass None to serialize all modules + by reference or pass "auto" to use a heuristic to infer which modules to serialize by value. """ # avoid a circular import getting the version @@ -352,13 +354,17 @@ def save(self, path, modules_byvalue=None): modules_byvalue = [] elif modules_byvalue == "auto": l_nodes = self.graph.get_nodes_by_op_type( - list(iter_nodes([self.graph.output_node])), LambdaOp) - + list(iter_nodes([self.graph.output_node])), LambdaOp + ) + try: modules_byvalue = Workflow._getmodules([ln.op.f for ln in l_nodes]) except RuntimeError as ex: - warnings.warn(f"Failed to automatically infer modules to serialize by value. Reason given was \"{str(ex)}\"") - + warnings.warn( + "Failed to automatically infer modules to serialize by value. " + f'Reason given was "{str(ex)}"' + ) + try: for m in modules_byvalue: if isinstance(m, types.ModuleType): @@ -366,7 +372,9 @@ def save(self, path, modules_byvalue=None): elif isinstance(m, str) and m in sys.modules: cloudpickle.register_pickle_by_value(sys.modules[m]) except RuntimeError as ex: - warnings.warn(f"Failed to register modules to serialize by value. Reason given was \"{str(ex)}\"") + warnings.warn( + f'Failed to register modules to serialize by value. Reason given was "{str(ex)}"' + ) # dump out the full workflow (graph/stats/operators etc) using cloudpickle with fs.open(fs.sep.join([path, "workflow.pkl"]), "wb") as o: diff --git a/tests/unit/workflow/test_workflow.py b/tests/unit/workflow/test_workflow.py index 866cec1e268..591256fa09a 100755 --- a/tests/unit/workflow/test_workflow.py +++ b/tests/unit/workflow/test_workflow.py @@ -669,25 +669,30 @@ def test_workflow_saved_schema(tmpdir): assert node.input_schema is not None assert node.output_schema is not None + def test_workflow_infer_modules_byvalue(tmp_path): module_fn = tmp_path / "not_a_real_module.py" sys.path.append(str(tmp_path)) with open(module_fn, "w") as module_f: module_f.write("def identity(col):\n return col") - + import not_a_real_module f_0 = not_a_real_module.identity - f_1 = lambda x: not_a_real_module.identity(x) - f_2 = lambda x: f_0(x) + f_1 = lambda x: not_a_real_module.identity(x) # noqa + f_2 = lambda x: f_0(x) # noqa try: - for fn, f in {"not_a_real_module.identity" : f_0, - "lambda x: not_a_real_module.identity(x)" : f_1, - "lambda x: f_0(x)" : f_2}.items(): - assert(not_a_real_module in Workflow._getmodules([f]), f"inferred module dependencies from {fn}") - + for fn, f in { + "not_a_real_module.identity": f_0, + "lambda x: not_a_real_module.identity(x)": f_1, + "lambda x: f_0(x)": f_2, + }.items(): + assert not_a_real_module in Workflow._getmodules( + [f] + ), f"inferred module dependencies from {fn}" + finally: sys.path.pop() del sys.modules["not_a_real_module"] @@ -699,12 +704,10 @@ def test_workflow_explicit_modules_byvalue(tmp_path): with open(module_fn, "w") as module_f: module_f.write("def identity(col):\n return col") - + import not_a_real_module - wf = nvt.Workflow( - ["col_a"] >> nvt.ops.LambdaOp(not_a_real_module.identity) - ) + wf = nvt.Workflow(["col_a"] >> nvt.ops.LambdaOp(not_a_real_module.identity)) wf.save(str(tmp_path / "identity-workflow"), modules_byvalue=[not_a_real_module]) @@ -714,18 +717,17 @@ def test_workflow_explicit_modules_byvalue(tmp_path): Workflow.load(str(tmp_path / "identity-workflow")) + def test_workflow_auto_infer_modules_byvalue(tmp_path): module_fn = tmp_path / "not_a_real_module.py" sys.path.append(str(tmp_path)) with open(module_fn, "w") as module_f: module_f.write("def identity(col):\n return col") - + import not_a_real_module - wf = nvt.Workflow( - ["col_a"] >> nvt.ops.LambdaOp(not_a_real_module.identity) - ) + wf = nvt.Workflow(["col_a"] >> nvt.ops.LambdaOp(not_a_real_module.identity)) wf.save(str(tmp_path / "identity-workflow"), modules_byvalue="auto") @@ -734,4 +736,3 @@ def test_workflow_auto_infer_modules_byvalue(tmp_path): os.unlink(str(tmp_path / "not_a_real_module.py")) Workflow.load(str(tmp_path / "identity-workflow")) - From 742393f49588bb75284c979151649b9b725021b3 Mon Sep 17 00:00:00 2001 From: William Benton Date: Wed, 4 Jan 2023 20:06:35 -0600 Subject: [PATCH 5/6] Workflow.save: reset cloudpickle behavior changes on return --- nvtabular/workflow/workflow.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/nvtabular/workflow/workflow.py b/nvtabular/workflow/workflow.py index 3b4457960c7..f50cc8c9676 100755 --- a/nvtabular/workflow/workflow.py +++ b/nvtabular/workflow/workflow.py @@ -349,6 +349,9 @@ def save(self, path, modules_byvalue=None): o, ) + # track existing by-value modules + preexisting_modules_byvalue = set(cloudpickle.list_registry_pickle_by_value()) + # direct cloudpickle to serialize selected modules by value if modules_byvalue is None: modules_byvalue = [] @@ -376,9 +379,22 @@ def save(self, path, modules_byvalue=None): f'Failed to register modules to serialize by value. Reason given was "{str(ex)}"' ) - # dump out the full workflow (graph/stats/operators etc) using cloudpickle - with fs.open(fs.sep.join([path, "workflow.pkl"]), "wb") as o: - cloudpickle.dump(self, o) + try: + # dump out the full workflow (graph/stats/operators etc) using cloudpickle + with fs.open(fs.sep.join([path, "workflow.pkl"]), "wb") as o: + cloudpickle.dump(self, o) + finally: + # return all modules that we set to serialize by value to by-reference + # (i.e., retain modules that were set to serialize by value before this invocation) + + for m in modules_byvalue: + if isinstance(m, types.ModuleType): + if m.__name__ not in preexisting_modules_byvalue: + cloudpickle.unregister_pickle_by_value(m) + elif isinstance(m, str) and m in sys.modules: + if m not in preexisting_modules_byvalue: + cloudpickle.unregister_pickle_by_value(sys.modules[m]) + @classmethod def load(cls, path, client=None) -> "Workflow": From cb625fb9c8ceee4be1a336d92bfa35a66b6f9932 Mon Sep 17 00:00:00 2001 From: William Benton Date: Thu, 19 Jan 2023 11:15:39 -0600 Subject: [PATCH 6/6] aligned formatting with black's expectations --- nvtabular/workflow/workflow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nvtabular/workflow/workflow.py b/nvtabular/workflow/workflow.py index f50cc8c9676..2d0d5f9125d 100755 --- a/nvtabular/workflow/workflow.py +++ b/nvtabular/workflow/workflow.py @@ -390,12 +390,11 @@ def save(self, path, modules_byvalue=None): for m in modules_byvalue: if isinstance(m, types.ModuleType): if m.__name__ not in preexisting_modules_byvalue: - cloudpickle.unregister_pickle_by_value(m) + cloudpickle.unregister_pickle_by_value(m) elif isinstance(m, str) and m in sys.modules: if m not in preexisting_modules_byvalue: cloudpickle.unregister_pickle_by_value(sys.modules[m]) - @classmethod def load(cls, path, client=None) -> "Workflow": """Load up a saved workflow object from disk