From df176c7ac3acda0ac17cf376a5b9c6686e597e7c Mon Sep 17 00:00:00 2001 From: shingjan Date: Fri, 20 Aug 2021 13:54:12 -0700 Subject: [PATCH 1/7] replaced multiprocessing.Pool with PopenPoolExecutor --- .../tvm/autotvm/tuner/xgboost_cost_model.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/python/tvm/autotvm/tuner/xgboost_cost_model.py b/python/tvm/autotvm/tuner/xgboost_cost_model.py index 81904354c5fd..fb884d048d98 100644 --- a/python/tvm/autotvm/tuner/xgboost_cost_model.py +++ b/python/tvm/autotvm/tuner/xgboost_cost_model.py @@ -23,6 +23,8 @@ import numpy as np +from tvm.contrib.popen_pool import PopenPoolExecutor + from .. import feature from ..utils import get_rank from .metric import max_curve, recall_curve, cover_curve @@ -153,20 +155,18 @@ def _reset_pool(self, space, target, task): self._close_pool() - # Use global variable to pass common arguments. This is only used when - # new processes are started with fork. We have to set the globals - # before we create the pool, so that processes in the pool get the - # correct globals. 
- global _extract_space, _extract_target, _extract_task - _extract_space = space - _extract_target = target - _extract_task = task - self.pool = multiprocessing.Pool(self.num_threads) + def initializer(space_arg, target_arg, task_arg): + global _extract_space, _extract_target, _extract_task + _extract_space = space_arg + _extract_target = target_arg + _extract_task = task_arg + + self.pool = PopenPoolExecutor( + max_workers=self.num_threads, initializer=initializer, initargs=(space, target, task) + ) def _close_pool(self): if self.pool: - self.pool.terminate() - self.pool.join() self.pool = None def _get_pool(self): @@ -247,7 +247,7 @@ def fit_log(self, records, plan_size, min_seed_records=500): feature_extract_func = _extract_curve_feature_log else: raise RuntimeError("Invalid feature type: " + self.fea_type) - res = pool.map(feature_extract_func, data) + res = pool.map_with_error_catching(feature_extract_func, data) # filter out feature with different shapes fea_len = len(self._get_feature([0])[0]) @@ -329,10 +329,10 @@ def _get_feature(self, indexes): pool = self._get_pool() # If we are forking, we can pass arguments in globals for better performance if multiprocessing.get_start_method(False) == "fork": - feas = pool.map(self.feature_extract_func, need_extract) + feas = pool.map_with_error_catching(self.feature_extract_func, need_extract) else: args = [(self.space.get(x), self.target, self.task) for x in need_extract] - feas = pool.map(self.feature_extract_func, args) + feas = pool.map_with_error_catching(self.feature_extract_func, args) for i, fea in zip(need_extract, feas): fea_cache[i] = fea From 169e23c54a820ee1a5361acea2bd1913fc9a7a69 Mon Sep 17 00:00:00 2001 From: shingjan Date: Fri, 20 Aug 2021 17:27:40 -0700 Subject: [PATCH 2/7] add initializer func --- .../tvm/autotvm/tuner/xgboost_cost_model.py | 59 ++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/python/tvm/autotvm/tuner/xgboost_cost_model.py 
b/python/tvm/autotvm/tuner/xgboost_cost_model.py index fb884d048d98..209838478d83 100644 --- a/python/tvm/autotvm/tuner/xgboost_cost_model.py +++ b/python/tvm/autotvm/tuner/xgboost_cost_model.py @@ -146,6 +146,12 @@ def __init__( self._sample_size = 0 self._reset_pool(self.space, self.target, self.task) + def _initializer(space, target, task): + global _extract_space, _extract_target, _extract_task + _extract_space = space + _extract_target = target + _extract_task = task + def _reset_pool(self, space, target, task): """reset processing pool for feature extraction""" @@ -155,14 +161,10 @@ def _reset_pool(self, space, target, task): self._close_pool() - def initializer(space_arg, target_arg, task_arg): - global _extract_space, _extract_target, _extract_task - _extract_space = space_arg - _extract_target = target_arg - _extract_task = task_arg - self.pool = PopenPoolExecutor( - max_workers=self.num_threads, initializer=initializer, initargs=(space, target, task) + max_workers=self.num_threads, + initializer=self._initializer, + initargs=(space, target, task), ) def _close_pool(self): @@ -327,14 +329,11 @@ def _get_feature(self, indexes): if need_extract: pool = self._get_pool() - # If we are forking, we can pass arguments in globals for better performance - if multiprocessing.get_start_method(False) == "fork": - feas = pool.map_with_error_catching(self.feature_extract_func, need_extract) - else: - args = [(self.space.get(x), self.target, self.task) for x in need_extract] - feas = pool.map_with_error_catching(self.feature_extract_func, args) + feas = pool.map_with_error_catching(self.feature_extract_func, need_extract) for i, fea in zip(need_extract, feas): - fea_cache[i] = fea + fea_cache[i] = fea.value + print(fea) + print("_______________________\n") feature_len = None for idx in indexes: @@ -361,14 +360,10 @@ def __del__(self): def _extract_itervar_feature_index(args): """extract iteration var feature for an index in extract_space""" try: - if 
multiprocessing.get_start_method(False) == "fork": - config = _extract_space.get(args) - with _extract_target: - sch, fargs = _extract_task.instantiate(config) - else: - config, target, task = args - with target: - sch, fargs = task.instantiate(config) + config = _extract_space.get(args) + with _extract_target: + sch, fargs = _extract_task.instantiate(config) + fea = feature.get_itervar_feature_flatten(sch, fargs, take_log=True) fea = np.concatenate((fea, list(config.get_other_option().values()))) return fea @@ -398,10 +393,9 @@ def _extract_itervar_feature_log(arg): def _extract_knob_feature_index(args): """extract knob feature for an index in extract_space""" try: - if multiprocessing.get_start_method(False) == "fork": - config = _extract_space.get(args) - else: - config = args[0] + + config = _extract_space.get(args) + return config.get_flatten_feature() except Exception: # pylint: disable=broad-except return None @@ -428,14 +422,11 @@ def _extract_knob_feature_log(arg): def _extract_curve_feature_index(args): """extract sampled curve feature for an index in extract_space""" try: - if multiprocessing.get_start_method(False) == "fork": - config = _extract_space.get(args) - with _extract_target: - sch, fargs = _extract_task.instantiate(config) - else: - config, target, task = args - with target: - sch, fargs = task.instantiate(config) + + config = _extract_space.get(args) + with _extract_target: + sch, fargs = _extract_task.instantiate(config) + fea = feature.get_buffer_curve_sample_flatten(sch, fargs, sample_n=20) fea = np.concatenate((fea, list(config.get_other_option().values()))) return np.array(fea) From fea379141dc525793c91abff63860958630700fd Mon Sep 17 00:00:00 2001 From: shingjan Date: Mon, 23 Aug 2021 14:00:09 -0700 Subject: [PATCH 3/7] static init func --- python/tvm/autotvm/tuner/xgboost_cost_model.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/tvm/autotvm/tuner/xgboost_cost_model.py 
b/python/tvm/autotvm/tuner/xgboost_cost_model.py index 209838478d83..55196082c512 100644 --- a/python/tvm/autotvm/tuner/xgboost_cost_model.py +++ b/python/tvm/autotvm/tuner/xgboost_cost_model.py @@ -163,7 +163,7 @@ def _reset_pool(self, space, target, task): self.pool = PopenPoolExecutor( max_workers=self.num_threads, - initializer=self._initializer, + initializer=XGBoostCostModel._initializer, initargs=(space, target, task), ) @@ -332,8 +332,6 @@ def _get_feature(self, indexes): feas = pool.map_with_error_catching(self.feature_extract_func, need_extract) for i, fea in zip(need_extract, feas): fea_cache[i] = fea.value - print(fea) - print("_______________________\n") feature_len = None for idx in indexes: From dd0d7bbf8bcfe0a455d1d04d3ddd26dbab05039b Mon Sep 17 00:00:00 2001 From: shingjan Date: Mon, 23 Aug 2021 14:50:03 -0700 Subject: [PATCH 4/7] address comments --- .../tvm/autotvm/tuner/xgboost_cost_model.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/python/tvm/autotvm/tuner/xgboost_cost_model.py b/python/tvm/autotvm/tuner/xgboost_cost_model.py index 55196082c512..95b7a9105249 100644 --- a/python/tvm/autotvm/tuner/xgboost_cost_model.py +++ b/python/tvm/autotvm/tuner/xgboost_cost_model.py @@ -23,7 +23,7 @@ import numpy as np -from tvm.contrib.popen_pool import PopenPoolExecutor +from tvm.contrib.popen_pool import PopenPoolExecutor, StatusKind from .. 
import feature from ..utils import get_rank @@ -146,12 +146,6 @@ def __init__( self._sample_size = 0 self._reset_pool(self.space, self.target, self.task) - def _initializer(space, target, task): - global _extract_space, _extract_target, _extract_task - _extract_space = space - _extract_target = target - _extract_task = task - def _reset_pool(self, space, target, task): """reset processing pool for feature extraction""" @@ -163,7 +157,7 @@ def _reset_pool(self, space, target, task): self.pool = PopenPoolExecutor( max_workers=self.num_threads, - initializer=XGBoostCostModel._initializer, + initializer=_extract_popen_initializer, initargs=(space, target, task), ) @@ -331,7 +325,7 @@ def _get_feature(self, indexes): pool = self._get_pool() feas = pool.map_with_error_catching(self.feature_extract_func, need_extract) for i, fea in zip(need_extract, feas): - fea_cache[i] = fea.value + fea_cache[i] = fea.value if fea.status == StatusKind.COMPLETE else None feature_len = None for idx in indexes: @@ -355,6 +349,13 @@ def __del__(self): _extract_task = None +def _extract_popen_initializer(space, target, task): + global _extract_space, _extract_target, _extract_task + _extract_space = space + _extract_target = target + _extract_task = task + + def _extract_itervar_feature_index(args): """extract iteration var feature for an index in extract_space""" try: From b699901ef81feb77dd115cf0ed9bfcfd21a6afb6 Mon Sep 17 00:00:00 2001 From: shingjan Date: Mon, 23 Aug 2021 14:54:19 -0700 Subject: [PATCH 5/7] linting --- python/tvm/autotvm/tuner/xgboost_cost_model.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/tvm/autotvm/tuner/xgboost_cost_model.py b/python/tvm/autotvm/tuner/xgboost_cost_model.py index 95b7a9105249..c43ccf047533 100644 --- a/python/tvm/autotvm/tuner/xgboost_cost_model.py +++ b/python/tvm/autotvm/tuner/xgboost_cost_model.py @@ -17,7 +17,6 @@ # pylint: disable=invalid-name """XGBoost as cost model""" -import multiprocessing import logging import time From 
e372ad75354e410feff0a7d76348e5a964a24506 Mon Sep 17 00:00:00 2001 From: shingjan Date: Mon, 23 Aug 2021 16:40:22 -0700 Subject: [PATCH 6/7] fix tests --- python/tvm/autotvm/tuner/xgboost_cost_model.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/tvm/autotvm/tuner/xgboost_cost_model.py b/python/tvm/autotvm/tuner/xgboost_cost_model.py index c43ccf047533..13764b02431b 100644 --- a/python/tvm/autotvm/tuner/xgboost_cost_model.py +++ b/python/tvm/autotvm/tuner/xgboost_cost_model.py @@ -242,13 +242,14 @@ def fit_log(self, records, plan_size, min_seed_records=500): feature_extract_func = _extract_curve_feature_log else: raise RuntimeError("Invalid feature type: " + self.fea_type) - res = pool.map_with_error_catching(feature_extract_func, data) + result = pool.map_with_error_catching(feature_extract_func, data) # filter out feature with different shapes fea_len = len(self._get_feature([0])[0]) xs, ys = [], [] - for x, y in res: + for res in result: + x, y = res.value if len(x) == fea_len: xs.append(x) ys.append(y) From 486dfc853de570ef7b085ee08e9cdcfbae892366 Mon Sep 17 00:00:00 2001 From: shingjan Date: Mon, 23 Aug 2021 17:55:55 -0700 Subject: [PATCH 7/7] address comments --- python/tvm/autotvm/tuner/xgboost_cost_model.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/tvm/autotvm/tuner/xgboost_cost_model.py b/python/tvm/autotvm/tuner/xgboost_cost_model.py index 13764b02431b..99972ee3d74e 100644 --- a/python/tvm/autotvm/tuner/xgboost_cost_model.py +++ b/python/tvm/autotvm/tuner/xgboost_cost_model.py @@ -249,6 +249,8 @@ def fit_log(self, records, plan_size, min_seed_records=500): xs, ys = [], [] for res in result: + if res.status != StatusKind.COMPLETE: + continue x, y = res.value if len(x) == fea_len: xs.append(x)