Skip to content

Commit

Permalink
refactor(cluster): rename num_threads to jobs in WorkloadParams class…
Browse files Browse the repository at this point in the history
… and related methods
  • Loading branch information
realSAH committed Jun 26, 2023
1 parent 5443342 commit 3713c38
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion myresources/crocodile/cluster/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_workload_params(self, machines_specs: list[MachineSpecs], threads_per_ma
print(f"All values: {tmp=}, {idx_so_far=}, {idx1=}, {idx2=}, {self.max_num=}, {a_threads_per_machine=}, {machine_index=}, {machine_specs=}, {load_value=}, {self.load_ratios=}, {self.load_ratios_repr=}")
raise ValueError(f"idx2 ({idx2}) > max_num ({self.max_num})")
idx_so_far = idx2
tmp.append(WorkloadParams(idx_start=idx1, idx_end=idx2, idx_max=self.max_num, num_threads=a_threads_per_machine))
tmp.append(WorkloadParams(idx_start=idx1, idx_end=idx2, idx_max=self.max_num, jobs=a_threads_per_machine))
return tmp


Expand Down
6 changes: 3 additions & 3 deletions myresources/crocodile/cluster/meta_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ def get_execution_line(func_name, func_class, rel_full_path, workload_params: Wo
final_func = f"""module{('.' + func_class) if func_class is not None else ''}.{func_name}"""
if parallelize:
assert workload_params is not None
kwargs_split = tb.L(range(workload_params.idx_start, workload_params.idx_end, 1)).split(to=workload_params.num_threads).apply(lambda sub_list: WorkloadParams(idx_start=sub_list[0], idx_end=sub_list[-1] + 1, idx_max=workload_params.idx_max, num_threads=workload_params.num_threads))
kwargs_split = tb.L(range(workload_params.idx_start, workload_params.idx_end, 1)).split(to=workload_params.jobs).apply(lambda sub_list: WorkloadParams(idx_start=sub_list[0], idx_end=sub_list[-1] + 1, idx_max=workload_params.idx_max, jobs=workload_params.jobs))
# Note: like MachineLoadCalculator get_kwargs, the behaviour is to include the edge cases on both ends of subsequent intervals.
base_func = f"""
print(f"This machine will execute ({(workload_params.idx_end - workload_params.idx_start) / workload_params.idx_max * 100:.2f}%) of total job workload.")
print(f"This share of workload will be split among {workload_params.num_threads} of threads on this machine.")
print(f"This share of workload will be split among {workload_params.jobs} of threads on this machine.")
kwargs_workload = {list(kwargs_split.apply(lambda a_kwargs: a_kwargs.__dict__))}
workload_params = []
for idx, x in enumerate(kwargs_workload):
tb.S(x).print(as_config=True, title=f"Instance {{idx}}")
workload_params.append(WorkloadParams(**x))
print("\\n" * 2)
res = tb.L(workload_params).apply(lambda a_workload_params: {final_func}(workload_params=a_workload_params, **func_kwargs), jobs={workload_params.num_threads})
res = tb.L(workload_params).apply(lambda a_workload_params: {final_func}(workload_params=a_workload_params, **func_kwargs), jobs={workload_params.jobs})
# res = tb.P(res[0]).parent if type(res[0]) is str else res
# res = {final_func}(workload_params=workload_params, **func_kwargs)
"""
Expand Down
2 changes: 1 addition & 1 deletion myresources/crocodile/cluster/remote_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class WorkloadParams:
idx_start: int = 0
idx_end: int = 1000
idx_max: int = 1000
num_threads: int = 3
jobs: int = 3
@property
def save_suffix(self) -> str: return f"machine_{self.idx_start}_{self.idx_end}"

Expand Down
4 changes: 2 additions & 2 deletions myresources/crocodile/cluster/template_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ def func_single_job(workload_params: WorkloadParams, *args, **kwargs) -> tb.P:

@staticmethod
def func(workload_params: WorkloadParams, *args, **kwargs) -> tb.P:
per_job_workload_params = tb.L(range(workload_params.idx_start, workload_params.idx_end, 1)).split(to=workload_params.num_threads).apply(lambda sub_list: WorkloadParams(idx_start=sub_list[0], idx_end=sub_list[-1] + 1, idx_max=workload_params.idx_max, num_threads=workload_params.num_threads))
res = tb.L(per_job_workload_params).apply(lambda a_workload_params: ExpensiveComputation.func_single_job(*args, workload_params=a_workload_params, **kwargs), jobs=workload_params.num_threads)
per_job_workload_params = tb.L(range(workload_params.idx_start, workload_params.idx_end, 1)).split(to=workload_params.jobs).apply(lambda sub_list: WorkloadParams(idx_start=sub_list[0], idx_end=sub_list[-1] + 1, idx_max=workload_params.idx_max, jobs=workload_params.jobs))
res = tb.L(per_job_workload_params).apply(lambda a_workload_params: ExpensiveComputation.func_single_job(*args, workload_params=a_workload_params, **kwargs), jobs=workload_params.jobs)
return res[0] if len(res) == 1 else res

@staticmethod
Expand Down

0 comments on commit 3713c38

Please sign in to comment.