Commit
added walltime parameter
raptor419 committed May 21, 2024
1 parent 7b04f70 commit 84b1a75
Showing 16 changed files with 58 additions and 34 deletions.
File renamed without changes.
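In short: this commit threads a new walltime keyword (wall-clock hours, default 24) through every phase-runner constructor and into get_cluster, replacing the previously hard-coded 24-hour limit. Each runner follows the same pattern; a condensed sketch (the class name SomePhaseRunner is illustrative, not a class in the repository):

    from streamline.utils.cluster import get_cluster

    class SomePhaseRunner:  # stand-in for the runner classes touched below
        def __init__(self, output_path, experiment_name,
                     run_cluster=False, queue='defq', reserved_memory=4, walltime=24):
            self.output_path = output_path
            self.experiment_name = experiment_name
            self.run_cluster = run_cluster
            self.queue = queue
            self.reserved_memory = reserved_memory
            self.walltime = walltime  # new: wall-clock hours forwarded to the scheduler

        def run(self, run_parallel=False):
            # As in each runner's run(): start a dask cluster with the stored limits.
            if self.run_cluster and "Old" not in self.run_cluster:
                get_cluster(self.run_cluster,
                            self.output_path + '/' + self.experiment_name,
                            self.queue, self.reserved_memory, self.walltime)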
30 changes: 20 additions & 10 deletions streamline/runner.py
@@ -227,7 +227,8 @@ def run(self):
random_state=self.params['random_state'],
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])

self.run_phase(eda, 1)
self.params['outcome_type'] = eda.outcome_type
@@ -241,7 +242,8 @@ def run(self):
instance_label=self.params['instance_label'], random_state=self.params['random_state'],
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])
self.run_phase(dpr, 2)

if self.params['do_feat_imp']:
@@ -255,7 +257,8 @@ def run(self):
random_state=self.params['random_state'], n_jobs=self.params['n_jobs'],
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])
self.run_phase(f_imp, 3)

if self.params['do_feat_sel']:
@@ -271,7 +274,8 @@ def run(self):
n_jobs=self.params['n_jobs'],
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])
self.run_phase(f_sel, 4)

if self.params['do_model']:
@@ -292,7 +296,8 @@ def run(self):
random_state=self.params['random_state'], n_jobs=self.params['n_jobs'],
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])

self.run_phase(model, 5)

@@ -308,7 +313,8 @@ def run(self):
exclude_plots=self.params['exclude_plots'], show_plots=False,
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])
self.run_phase(stats, 6)

if self.params['do_compare_dataset']:
@@ -320,7 +326,8 @@ def run(self):
show_plots=False,
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])
self.run_phase(compare, 7)

if self.params['do_report']:
@@ -329,7 +336,8 @@ def run(self):
experiment_path=None,
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])
self.run_phase(report, 8)

if self.params['do_replicate']:
@@ -341,7 +349,8 @@ def run(self):
exclude_plots=self.params['exclude_rep_plots'],
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])
self.run_phase(replicate, 9)

if self.params['do_rep_report']:
@@ -352,7 +361,8 @@ def run(self):
dataset_for_rep=self.params['dataset_for_rep'],
run_cluster=self.params['run_cluster'],
queue=self.params['queue'],
- reserved_memory=self.params['reserved_memory'])
+ reserved_memory=self.params['reserved_memory'],
+ walltime=self.params['walltime'])
self.run_phase(report, 10)

if self.params['do_cleanup']:
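Note the repetition above: the same four scheduler settings (run_cluster, queue, reserved_memory, walltime) are forwarded at all ten call sites. A possible follow-up cleanup, not part of this commit, would collect them once and unpack with **:

    # Hypothetical refactor (not in this commit): build the shared scheduler
    # kwargs once, then pass **cluster_kwargs to each phase-runner constructor.
    cluster_kwargs = {key: self.params[key]
                      for key in ('run_cluster', 'queue', 'reserved_memory', 'walltime')}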
3 changes: 2 additions & 1 deletion streamline/runners/compare_runner.py
@@ -17,7 +17,7 @@ class CompareRunner:
def __init__(self, output_path, experiment_name, experiment_path=None,
outcome_label="Class", outcome_type="Binary", instance_label=None, sig_cutoff=0.05,
show_plots=False,
- run_cluster=False, queue='defq', reserved_memory=4):
+ run_cluster=False, queue='defq', reserved_memory=4, walltime=24):
"""
Args:
output_path: path to output directory
@@ -55,6 +55,7 @@ def __init__(self, output_path, experiment_name, experiment_path=None,
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime

# Argument checks
if not os.path.exists(self.output_path):
5 changes: 3 additions & 2 deletions streamline/runners/dataprocess_runner.py
@@ -50,7 +50,7 @@ def __init__(self, data_path, output_path, experiment_name, exclude_eda_output=N
ignore_features=None, categorical_features=None, quantitative_features=None, top_features=20,
categorical_cutoff=10, sig_cutoff=0.05, featureeng_missingness=0.5, cleaning_missingness=0.5,
correlation_removal_threshold=1.0,
- random_state=None, run_cluster=False, queue='defq', reserved_memory=4, show_plots=False):
+ random_state=None, run_cluster=False, queue='defq', reserved_memory=4, walltime=24, show_plots=False):
"""
Initializer for a runner class for Exploratory Data Analysis Jobs
@@ -136,6 +136,7 @@ def __init__(self, data_path, output_path, experiment_name, exclude_eda_output=N
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime
self.show_plots = show_plots
self.random_state = random_state
self.sig_cutoff = sig_cutoff
@@ -207,7 +208,7 @@ def run(self, run_parallel=False):

if self.run_cluster and "Old" not in self.run_cluster:
get_cluster(self.run_cluster,
- self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory)
+ self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory, self.walltime)
dask.compute([dask.delayed(
parallel_eda_call
)(job_obj, {'top_features': self.top_features}) for job_obj in job_obj_list])
10 changes: 6 additions & 4 deletions streamline/runners/feature_runner.py
@@ -22,7 +22,7 @@ class FeatureImportanceRunner:
def __init__(self, output_path, experiment_name, outcome_label="Class", instance_label=None,
instance_subset=None, algorithms=("MI", "MS"), use_turf=True, turf_pct=True,
random_state=None, n_jobs=None,
- run_cluster=False, queue='defq', reserved_memory=4):
+ run_cluster=False, queue='defq', reserved_memory=4, walltime=24):
"""
Args:
@@ -57,6 +57,7 @@ def __init__(self, output_path, experiment_name, outcome_label="Class", instance
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime

if self.turf_pct == 'False' or self.turf_pct == False:
self.turf_pct = False
@@ -125,7 +126,7 @@ def run(self, run_parallel=False):
Parallel(n_jobs=num_cores)(delayed(runner_fn)(job_obj) for job_obj in job_list)
if self.run_cluster and "Old" not in self.run_cluster:
get_cluster(self.run_cluster,
- self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory)
+ self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory, self.walltime)
dask.compute([dask.delayed(runner_fn)(job_obj) for job_obj in job_list])

def save_metadata(self):
@@ -210,7 +211,7 @@ class FeatureSelectionRunner:
def __init__(self, output_path, experiment_name, algorithms, outcome_label="Class", instance_label=None,
max_features_to_keep=2000, filter_poor_features=True, top_features=40, export_scores=True,
overwrite_cv=True, random_state=None, n_jobs=None,
- run_cluster=False, queue='defq', reserved_memory=4, show_plots=False):
+ run_cluster=False, queue='defq', reserved_memory=4, walltime=24, show_plots=False):
"""
Args:
@@ -248,6 +249,7 @@ def __init__(self, output_path, experiment_name, algorithms, outcome_label="Clas
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime
self.show_plots = show_plots

if self.filter_poor_features == 'False' or self.filter_poor_features is False:
@@ -312,7 +314,7 @@ def run(self, run_parallel=False):
Parallel(n_jobs=num_cores)(delayed(runner_fn)(job_obj) for job_obj in job_list)
if self.run_cluster and "Old" not in self.run_cluster:
get_cluster(self.run_cluster,
- self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory)
+ self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory, self.walltime)
dask.compute([dask.delayed(runner_fn)(job_obj) for job_obj in job_list])

def save_metadata(self):
5 changes: 3 additions & 2 deletions streamline/runners/imputation_runner.py
@@ -17,7 +17,7 @@ class ImputationRunner:

def __init__(self, output_path, experiment_name, scale_data=True, impute_data=True,
multi_impute=True, overwrite_cv=True, outcome_label="Class", instance_label=None, random_state=None,
- run_cluster=False, queue='defq', reserved_memory=4):
+ run_cluster=False, queue='defq', reserved_memory=4, walltime=24):
"""
Args:
@@ -41,6 +41,7 @@ def __init__(self, output_path, experiment_name, scale_data=True, impute_data=Tr
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime

# Argument checks-------------------------------------------------------------
if not os.path.exists(self.output_path):
@@ -97,7 +98,7 @@ def run(self, run_parallel=False):
Parallel(n_jobs=num_cores)(delayed(runner_fn)(job_obj) for job_obj in job_list)
if self.run_cluster and "Old" not in self.run_cluster:
get_cluster(self.run_cluster,
- self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory)
+ self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory, self.walltime)
dask.compute([dask.delayed(runner_fn)(job_obj) for job_obj in job_list])

def save_metadata(self):
5 changes: 3 additions & 2 deletions streamline/runners/model_runner.py
@@ -40,7 +40,7 @@ def __init__(self, output_path, experiment_name, algorithms=None, exclude=("XCS"
timeout=900, save_plots=False, do_lcs_sweep=False, lcs_nu=1, lcs_n=2000, lcs_iterations=200000,
lcs_timeout=1200, resubmit=False, random_state=None, n_jobs=None,
run_cluster=False,
- queue='defq', reserved_memory=4):
+ queue='defq', reserved_memory=4, walltime=24):

"""
Args:
@@ -163,6 +163,7 @@ def __init__(self, output_path, experiment_name, algorithms=None, exclude=("XCS"
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime

# Argument checks
if not os.path.exists(self.output_path):
@@ -294,7 +295,7 @@ def run(self, run_parallel=False):
) for job_obj, model in tqdm(job_list))
if self.run_cluster and "Old" not in self.run_cluster:
get_cluster(self.run_cluster,
- self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory)
+ self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory, self.walltime)
dask.compute([dask.delayed(model_runner_fn)(job_obj, model
) for job_obj, model in job_list])

3 changes: 2 additions & 1 deletion streamline/runners/replicate_runner.py
@@ -20,7 +20,7 @@ class ReplicationRunner:

def __init__(self, rep_data_path, dataset_for_rep, output_path, experiment_name,
outcome_label=None, instance_label=None, match_label=None,
- exclude_plots=None, run_cluster=False, queue='defq', reserved_memory=4, show_plots=False):
+ exclude_plots=None, run_cluster=False, queue='defq', reserved_memory=4, walltime=24, show_plots=False):
"""
Args:
@@ -104,6 +104,7 @@ def __init__(self, rep_data_path, dataset_for_rep, output_path, experiment_name,
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime

# Argument checks
if not os.path.exists(self.output_path):
5 changes: 3 additions & 2 deletions streamline/runners/report_runner.py
@@ -4,7 +4,7 @@
import dask
from pathlib import Path
from joblib import Parallel, delayed
- from streamline.postanalysis.gererate_report import ReportJob
+ from streamline.postanalysis.generate_report import ReportJob
from streamline.utils.runners import runner_fn
from streamline.utils.cluster import get_cluster

@@ -16,7 +16,7 @@ class ReportRunner:

def __init__(self, output_path=None, experiment_name=None, experiment_path=None,
training=True, rep_data_path=None, dataset_for_rep=None,
- run_cluster=False, queue='defq', reserved_memory=4):
+ run_cluster=False, queue='defq', reserved_memory=4, walltime=24):
"""
Args:
output_path: path to output directory
@@ -46,6 +46,7 @@ def __init__(self, output_path=None, experiment_name=None, experiment_path=None,
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime
self.algorithms = None
self.get_algorithms()

5 changes: 3 additions & 2 deletions streamline/runners/stats_runner.py
@@ -21,7 +21,7 @@ def __init__(self, output_path, experiment_name,
scoring_metric='balanced_accuracy',
top_features=40, sig_cutoff=0.05, metric_weight='balanced_accuracy', scale_data=True,
exclude_plots=None, show_plots=False,
- run_cluster=False, queue='defq', reserved_memory=4):
+ run_cluster=False, queue='defq', reserved_memory=4, walltime=24):
"""
Args:
output_path: path to output directory
@@ -81,6 +81,7 @@ def __init__(self, output_path, experiment_name,
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
+ self.walltime = walltime

# Argument checks
if not os.path.exists(self.output_path):
@@ -138,7 +139,7 @@ def run(self, run_parallel=False):
Parallel(n_jobs=num_cores)(delayed(runner_fn)(job_obj) for job_obj in job_list)
if self.run_cluster and "Old" not in self.run_cluster:
get_cluster(self.run_cluster,
- self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory)
+ self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory, self.walltime)
dask.compute([dask.delayed(runner_fn)(job_obj) for job_obj in job_list])

def save_metadata(self):
10 changes: 5 additions & 5 deletions streamline/utils/cluster.py
@@ -13,30 +13,30 @@
}


- def get_cluster(cluster_type='SLURM', output_path=".", queue='defq', memory=4):
+ def get_cluster(cluster_type='SLURM', output_path=".", queue='defq', memory=4, walltime=24):
client = None
try:
if cluster_type == 'SLURM':
cluster = SLURMCluster(queue=queue,
cores=1,
memory=str(memory) + "G",
walltime="24:00:00",
walltime=str(walltime)+":00:00",
log_directory=output_path + "/dask_logs/")
cluster.adapt(maximum_jobs=400)
elif cluster_type == "LSF":
cluster = LSFCluster(queue=queue,
cores=1,
mem=memory * 1000000000,
memory=str(memory) + "G",
walltime="24:00",
walltime=str(walltime)+":00",
log_directory=output_path + "/dask_logs/")
cluster.adapt(maximum_jobs=400)
elif cluster_type == 'UGE':
cluster = SGECluster(queue=queue,
cores=1,
memory=str(memory) + "G",
resource_spec="mem_free=" + str(memory) + "G",
walltime="24:00:00",
walltime=str(walltime)+":00:00",
log_directory=output_path + "/dask_logs/")
cluster.adapt(maximum_jobs=400)
elif cluster_type == 'HTCondor':
@@ -53,7 +53,7 @@ def get_cluster(cluster_type='SLURM', output_path=".", queue='defq', memory=4):
cluster = cluster_dict[cluster_type](queue=queue,
cores=1,
memory=str(memory) + "G",
walltime="24:00:00",
walltime=str(walltime)+":00:00",
log_directory=output_path + "/dask_logs/")
cluster.adapt(maximum_jobs=400)
except KeyError:
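With this change, get_cluster accepts whole hours and formats them per scheduler: "48:00:00" (hh:mm:ss) for SLURM, UGE, and the generic fallback, and "48:00" (hh:mm) for LSF. A minimal usage sketch against the signature shown above (argument values are illustrative; only whole-hour walltimes are representable):

    from streamline.utils.cluster import get_cluster

    # Ask for a 48-hour limit instead of the previously hard-coded 24 hours.
    get_cluster(cluster_type='SLURM',
                output_path='experiments/demo',  # dask logs go to <output_path>/dask_logs/
                queue='defq',
                memory=4,      # GB reserved per single-core worker
                walltime=48)   # hours -> walltime="48:00:00" in the SLURM job script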
1 change: 1 addition & 0 deletions streamline/utils/default.cfg
@@ -105,6 +105,7 @@ run_parallel = True
run_cluster = "SLURM"
reserved_memory = 4
queue = 'defq'
+ walltime = 24

[logging]
logging_level = 'INFO'
3 changes: 2 additions & 1 deletion streamline/utils/parser.py
@@ -52,7 +52,8 @@ def process_params(params):
params['run_parallel'] = False
if params['run_parallel'] == "True":
params['run_parallel'] = True

+ if type(params['walltime']) == str:
+     params['walltime'] = int(params['walltime'])
return params


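Values read from default.cfg arrive as strings, so process_params now coerces walltime to int before the runners splice it into a scheduler walltime string (isinstance(params['walltime'], str) would be the more idiomatic spelling of the same check). A self-contained illustration of the added coercion:

    # Sketch of the coercion added above; configparser delivers option values as strings.
    params = {'walltime': '48'}
    if type(params['walltime']) == str:
        params['walltime'] = int(params['walltime'])
    assert params['walltime'] == 48
    assert str(params['walltime']) + ":00:00" == "48:00:00"  # what get_cluster builds for SLURM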
