Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Freemem #160

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion tpot2/evolvers/steady_state_evolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ def optimize(self):
print("Cancelled future (likely memory related)")
scores = [np.nan for _ in range(len(self.objective_names))]
eval_error = "INVALID"
client.run(gc.collect)
else: #if the future is done and did not throw an error, get the scores
try:
scores = completed_future.result()
Expand All @@ -466,13 +467,14 @@ def optimize(self):
print("cancelld ", completed_future.cancelled())
scores = [np.nan for _ in range(len(self.objective_names))]
eval_error = "INVALID"
completed_future.release() #release the future
else: #if future is not done

if self.max_eval_time_mins is not None:
#if the future has been running for too long, cancel it
if time.time() - submitted_futures[completed_future]["time"] > self.max_eval_time_mins*1.25*60:
completed_future.cancel()

completed_future.release() #release the future
if self.verbose >= 4:
print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n')

Expand Down Expand Up @@ -506,6 +508,8 @@ def optimize(self):
self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="INVALID")
self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT")

#I am not entirely sure this is necessary; I believe that calling release on the futures should be enough to free up memory. If memory issues persist, enabling the garbage collection call below is a good first thing to try.
#client.run(gc.collect) #run garbage collection to free up memory

###############################
# Step 2: Early Stopping
Expand Down Expand Up @@ -717,6 +721,10 @@ def optimize(self):
#done, cleanup futures
for future in submitted_futures.keys():
future.cancel()
future.release() #release the future

#I am not entirely sure this is necessary; I believe that calling release on the futures should be enough to free up memory. If memory issues persist, enabling the garbage collection call below is a good first thing to try.
#client.run(gc.collect) #run garbage collection to free up memory

#checkpoint
if self.population_file is not None:
Expand Down
15 changes: 12 additions & 3 deletions tpot2/utils/eval_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from dask.distributed import progress
import distributed
import func_timeout
import gc

def process_scores(scores, n):
'''
Expand Down Expand Up @@ -163,6 +164,7 @@ def parallel_eval_objective_list(individual_list,
print("Cancelled future (likely memory related)")
scores = [np.nan for _ in range(n_expected_columns)]
eval_error = "INVALID"
client.run(gc.collect)
else: #if the future is done and did not throw an error, get the scores
try:
scores = completed_future.result()
Expand All @@ -186,20 +188,23 @@ def parallel_eval_objective_list(individual_list,
print("cancelld ", completed_future.cancelled())
scores = [np.nan for _ in range(n_expected_columns)]
eval_error = "INVALID"

completed_future.release() #release the future
else: #if future is not done

# if the future has been running for too long, cancel it
# the limit is max_eval_time_mins multiplied by 1.25 because the objective function running in the future should be able to cancel itself at max_eval_time_mins; this check is only a backup in case it doesn't.
if max_eval_time_mins is not None and time.time() - submitted_futures[completed_future]["time"] > max_eval_time_mins*1.25*60:
completed_future.cancel()

completed_future.release()
if verbose >= 4:
print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n')

scores = [np.nan for _ in range(n_expected_columns)]
eval_error = "TIMEOUT"
elif global_timeout_triggered:
completed_future.cancel()
completed_future.release()

if verbose >= 4:
print(f'WARNING AN INDIVIDUAL TIMED OUT (max_time_mins): \n {submitted_futures[completed_future]} \n')
Expand All @@ -222,6 +227,10 @@ def parallel_eval_objective_list(individual_list,
#update submitted futures
submitted_futures.pop(completed_future)


#I am not entirely sure this is necessary; I believe that calling release on the futures should be enough to free up memory. If memory issues persist, enabling the garbage collection call below is a good first thing to try.
#client.run(gc.collect) #run garbage collection to free up memory

#break if timeout
if global_timeout_triggered:
while len(individual_stack) > 0:
Expand All @@ -243,10 +252,10 @@ def parallel_eval_objective_list(individual_list,

submitted_inds.add(individual.unique_id())

#I am not entirely sure this is necessary; I believe that calling release on the futures should be enough to free up memory. If memory issues persist, enabling the garbage collection call below is a good first thing to try.
#client.run(gc.collect) #run garbage collection to free up memory

#collect remaining futures


final_scores = [scores_dict[individual]["scores"] for individual in individual_list]
final_start_times = [scores_dict[individual]["start_time"] for individual in individual_list]
final_end_times = [scores_dict[individual]["end_time"] for individual in individual_list]
Expand Down
Loading