diff --git a/tpot2/evolvers/steady_state_evolver.py b/tpot2/evolvers/steady_state_evolver.py
index 9a9f5708..f077b05f 100644
--- a/tpot2/evolvers/steady_state_evolver.py
+++ b/tpot2/evolvers/steady_state_evolver.py
@@ -444,6 +444,7 @@ def optimize(self):
                                 print("Cancelled future (likely memory related)")
                         scores = [np.nan for _ in range(len(self.objective_names))]
                         eval_error = "INVALID"
+                        client.run(gc.collect)
                     else: #if the future is done and did not throw an error, get the scores
                         try:
                             scores = completed_future.result()
@@ -466,13 +467,14 @@ def optimize(self):
                                 print("cancelld ", completed_future.cancelled())
                             scores = [np.nan for _ in range(len(self.objective_names))]
                             eval_error = "INVALID"
+                    completed_future.release() #release the future
                 else: #if future is not done

                     if self.max_eval_time_mins is not None: #check if the future has been running for too long, cancel the future
                         if time.time() - submitted_futures[completed_future]["time"] > self.max_eval_time_mins*1.25*60:
                             completed_future.cancel()
-
+                            completed_future.release() #release the future

                             if self.verbose >= 4:
                                 print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n')
@@ -506,6 +508,8 @@ def optimize(self):
            self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="INVALID")
            self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT")

+            #I am not entirely sure if this is necessary. I believe that calling release on the futures should be enough to free up memory. If memory issues persist, this may be a good place to start.
+            #client.run(gc.collect) #run garbage collection to free up memory

            ###############################
            # Step 2: Early Stopping
@@ -717,6 +721,10 @@ def optimize(self):
        #done, cleanup futures
        for future in submitted_futures.keys():
            future.cancel()
+            future.release() #release the future
+
+        #I am not entirely sure if this is necessary. I believe that calling release on the futures should be enough to free up memory. If memory issues persist, this may be a good place to start.
+        #client.run(gc.collect) #run garbage collection to free up memory

        #checkpoint
        if self.population_file is not None:
diff --git a/tpot2/utils/eval_utils.py b/tpot2/utils/eval_utils.py
index 7f93cef4..bc308164 100644
--- a/tpot2/utils/eval_utils.py
+++ b/tpot2/utils/eval_utils.py
@@ -49,6 +49,7 @@ from dask.distributed import progress
 import distributed
 import func_timeout
+import gc


 def process_scores(scores, n):
     '''
@@ -163,6 +164,7 @@ def parallel_eval_objective_list(individual_list,
                        print("Cancelled future (likely memory related)")
                    scores = [np.nan for _ in range(n_expected_columns)]
                    eval_error = "INVALID"
+                    client.run(gc.collect)
                else: #if the future is done and did not throw an error, get the scores
                    try:
                        scores = completed_future.result()
@@ -186,13 +188,15 @@ def parallel_eval_objective_list(individual_list,
                        print("cancelld ", completed_future.cancelled())
                        scores = [np.nan for _ in range(n_expected_columns)]
                        eval_error = "INVALID"
+
+                completed_future.release() #release the future
            else: #if future is not done

                # check if the future has been running for too long, cancel the future
                # we multiply max_eval_time_mins by 1.25 since the objective function in the future should be able to cancel itself. This is a backup in case it doesn't.
                if max_eval_time_mins is not None and time.time() - submitted_futures[completed_future]["time"] > max_eval_time_mins*1.25*60:
                    completed_future.cancel()
-
+                    completed_future.release()

                    if verbose >= 4:
                        print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n')
@@ -200,6 +204,7 @@ def parallel_eval_objective_list(individual_list,
                    eval_error = "TIMEOUT"
                elif global_timeout_triggered:
                    completed_future.cancel()
+                    completed_future.release()

                    if verbose >= 4:
                        print(f'WARNING AN INDIVIDUAL TIMED OUT (max_time_mins): \n {submitted_futures[completed_future]} \n')
@@ -222,6 +227,10 @@ def parallel_eval_objective_list(individual_list,
            #update submitted futures
            submitted_futures.pop(completed_future)
+
+            #I am not entirely sure if this is necessary. I believe that calling release on the futures should be enough to free up memory. If memory issues persist, this may be a good place to start.
+            #client.run(gc.collect) #run garbage collection to free up memory
+
        #break if timeout
        if global_timeout_triggered:
            while len(individual_stack) > 0:
@@ -243,10 +252,10 @@ def parallel_eval_objective_list(individual_list,

            submitted_inds.add(individual.unique_id())

+        #I am not entirely sure if this is necessary. I believe that calling release on the futures should be enough to free up memory. If memory issues persist, this may be a good place to start.
+        #client.run(gc.collect) #run garbage collection to free up memory

    #collect remaining futures
-
-
    final_scores = [scores_dict[individual]["scores"] for individual in individual_list]
    final_start_times = [scores_dict[individual]["start_time"] for individual in individual_list]
    final_end_times = [scores_dict[individual]["end_time"] for individual in individual_list]
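Note (not part of the patch): the change boils down to a three-step cleanup pattern on dask.distributed futures, which is cancel a future that has timed out or errored, release the local handle so the scheduler can forget the key once no client references it, and optionally force a garbage-collection pass on the worker processes with client.run(gc.collect). The standalone sketch below illustrates that pattern under assumed conditions; slow_eval, the worker count, and the sleep duration are made up for illustration and are not tpot2 code.

# Minimal sketch of the cancel/release/gc cleanup pattern applied in this diff.
# Assumes a local dask.distributed cluster; slow_eval is a hypothetical task.
import gc
import time
from dask.distributed import Client

def slow_eval(x):
    time.sleep(60)   # stand-in for a long-running pipeline evaluation
    return x * 2

if __name__ == "__main__":
    client = Client(n_workers=2, threads_per_worker=1)
    future = client.submit(slow_eval, 1)

    # Treat the task as timed out: cancel it so the scheduler stops/skips it...
    future.cancel()
    # ...and release the local handle so the key (and any result memory)
    # can be dropped once no client holds a reference to it.
    future.release()

    # Optional belt-and-braces step from the diff: run a garbage-collection
    # pass on every worker process.
    client.run(gc.collect)

    client.close()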