Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No pool #79

Merged
merged 21 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@ jobs:
if: success() || failure()
run: |
make compare test print

- name: Run Aggregate
if: success() || failure()
run: |
make aggregate test print
29 changes: 22 additions & 7 deletions CRISPResso2/CRISPRessoAggregateCORE.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def main():
crispresso2_info = {'running_info': {}, 'results': {'alignment_stats': {}, 'general_plots': {}}} #keep track of all information for this run to be pickled and saved at the end of the run
crispresso2_info['running_info']['version'] = CRISPRessoShared.__version__
crispresso2_info['running_info']['args'] = deepcopy(args)
crispresso2_info['running_info']['command_used'] = ' '.join(sys.argv)

crispresso2_info['running_info']['log_filename'] = os.path.basename(log_filename)

Expand Down Expand Up @@ -227,7 +228,7 @@ def main():

if successfully_imported_count > 0:

crispresso2_folders = crispresso2_folder_infos.keys()
crispresso2_folders = list(sorted(crispresso2_folder_infos.keys()))
crispresso2_folder_names = {}
crispresso2_folder_htmls = {}#file_loc->html folder loc
quilt_plots_to_show = {} # name->{'href':path to report, 'img': png}
Expand Down Expand Up @@ -515,8 +516,10 @@ def main():
'fig_filename_root': this_window_nuc_pct_quilt_plot_name,
'save_also_png': save_png,
'sgRNA_intervals': sub_sgRNA_intervals,
'sgRNA_sequences': consensus_guides,
'quantification_window_idxs': include_idxs,
'group_column': 'Folder',
'custom_colors': None,
}
plot(
CRISPRessoPlot.plot_nucleotide_quilt,
Expand Down Expand Up @@ -550,8 +553,10 @@ def main():
'fig_filename_root': this_nuc_pct_quilt_plot_name,
'save_also_png': save_png,
'sgRNA_intervals': consensus_sgRNA_intervals,
'sgRNA_sequences': consensus_guides,
'quantification_window_idxs': include_idxs,
'group_column': 'Folder',
'custom_colors': None,
}
plot(
CRISPRessoPlot.plot_nucleotide_quilt,
Expand Down Expand Up @@ -589,8 +594,10 @@ def main():
'fig_filename_root': this_nuc_pct_quilt_plot_name,
'save_also_png': save_png,
'sgRNA_intervals': consensus_sgRNA_intervals,
'sgRNA_sequences': consensus_guides,
'quantification_window_idxs': consensus_include_idxs,
'group_column': 'Folder',
'custom_colors': None,
}
plot(
CRISPRessoPlot.plot_nucleotide_quilt,
Expand Down Expand Up @@ -654,6 +661,7 @@ def main():
'plot_path': plot_path,
'title': modification_type,
'div_id': heatmap_div_id,
'amplicon_name': amplicon_name,
}
plot(
CRISPRessoPlot.plot_allele_modification_heatmap,
Expand Down Expand Up @@ -687,6 +695,7 @@ def main():
'plot_path': plot_path,
'title': modification_type,
'div_id': line_div_id,
'amplicon_name': amplicon_name,
}
plot(
CRISPRessoPlot.plot_allele_modification_line,
Expand Down Expand Up @@ -779,7 +788,7 @@ def main():

header = 'Name\tUnmodified%\tModified%\tReads_total\tReads_aligned\tUnmodified\tModified\tDiscarded\tInsertions\tDeletions\tSubstitutions\tOnly Insertions\tOnly Deletions\tOnly Substitutions\tInsertions and Deletions\tInsertions and Substitutions\tDeletions and Substitutions\tInsertions Deletions and Substitutions'
header_els = header.split("\t")
df_summary_quantification=pd.DataFrame(quantification_summary, columns=header_els)
df_summary_quantification=pd.DataFrame(quantification_summary, columns=header_els).sort_values(by=['Name'])
samples_quantification_summary_filename = _jp('CRISPRessoAggregate_quantification_of_editing_frequency.txt') #this file has one line for each run (sum of all amplicons)
df_summary_quantification.fillna('NA').to_csv(samples_quantification_summary_filename, sep='\t', index=None)
crispresso2_info['results']['alignment_stats']['samples_quantification_summary_filename'] = os.path.basename(samples_quantification_summary_filename)
Expand Down Expand Up @@ -841,11 +850,17 @@ def main():
report_filename = OUTPUT_DIRECTORY+'.html'
if (args.place_report_in_output_folder):
report_filename = _jp("CRISPResso2Aggregate_report.html")
CRISPRessoReport.make_aggregate_report(crispresso2_info, args.name,
report_filename, OUTPUT_DIRECTORY,
_ROOT, crispresso2_folders,
crispresso2_folder_htmls,
quilt_plots_to_show)
CRISPRessoReport.make_aggregate_report(
crispresso2_info,
args.name,
report_filename,
OUTPUT_DIRECTORY,
_ROOT,
crispresso2_folders,
crispresso2_folder_htmls,
logger,
compact_plots_to_show=quilt_plots_to_show,
)
crispresso2_info['running_info']['report_location'] = report_filename
crispresso2_info['running_info']['report_filename'] = os.path.basename(report_filename)
else: #no files successfully imported
Expand Down
115 changes: 73 additions & 42 deletions CRISPResso2/CRISPRessoMultiProcessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ def run_crispresso(crispresso_cmds, descriptor, idx):
idx: index of the command to run
"""
crispresso_cmd=crispresso_cmds[idx]
logger = logging.getLogger(getmodule(stack()[1][0]).__name__)

logging.info('Running CRISPResso on %s #%d/%d: %s' % (descriptor, idx, len(crispresso_cmds), crispresso_cmd))
logger.info('Running CRISPResso on %s #%d/%d: %s' % (descriptor, idx, len(crispresso_cmds), crispresso_cmd))

return_value = sb.call(crispresso_cmd, shell=True)

if return_value == 137:
logging.warn('CRISPResso was killed by your system (return value %d) on %s #%d: "%s"\nPlease reduce the number of processes (-p) and run again.'%(return_value, descriptor, idx, crispresso_cmd))
logger.warn('CRISPResso was killed by your system (return value %d) on %s #%d: "%s"\nPlease reduce the number of processes (-p) and run again.'%(return_value, descriptor, idx, crispresso_cmd))
elif return_value != 0:
logging.warn('CRISPResso command failed (return value %d) on %s #%d: "%s"'%(return_value, descriptor, idx, crispresso_cmd))
logger.warn('CRISPResso command failed (return value %d) on %s #%d: "%s"'%(return_value, descriptor, idx, crispresso_cmd))
else:
logging.info('Finished CRISPResso %s #%d' %(descriptor, idx))
logger.info('Finished CRISPResso %s #%d' %(descriptor, idx))
return return_value


Expand Down Expand Up @@ -91,11 +92,12 @@ def run_crispresso_cmds(crispresso_cmds, n_processes="1", descriptor = 'region',
int_n_processes = int(n_processes)

logger.info("Running CRISPResso with %d processes" % int_n_processes)
pool = mp.Pool(processes=int_n_processes)
if int_n_processes > 1:
pool = mp.Pool(processes=int_n_processes)
pFunc = partial(run_crispresso, crispresso_cmds, descriptor)
p_wrapper = partial(wrapper, pFunc)
idxs = range(len(crispresso_cmds))
ret_vals = [None] * len(crispresso_cmds)
pFunc = partial(run_crispresso, crispresso_cmds, descriptor)
p_wrapper = partial(wrapper, pFunc)
if start_end_percent is not None:
percent_complete_increment = start_end_percent[1] - start_end_percent[0]
percent_complete_step = percent_complete_increment / len(crispresso_cmds)
Expand All @@ -109,14 +111,24 @@ def run_crispresso_cmds(crispresso_cmds, n_processes="1", descriptor = 'region',
signal.signal(signal.SIGINT, original_sigint_handler)
try:
completed = 0
for idx, res in pool.imap_unordered(p_wrapper, enumerate(idxs)):
ret_vals[idx] = res
completed += 1
percent_complete += percent_complete_step
logger.info(
"Completed {0}/{1} runs".format(completed, len(crispresso_cmds)),
{'percent_complete': percent_complete},
)
if int_n_processes == 1:
for idx, cmd in enumerate(crispresso_cmds):
ret_vals[idx] = run_crispresso(crispresso_cmds, descriptor, idx)
completed += 1
percent_complete += percent_complete_step
logger.info(
"Completed {0}/{1} runs".format(completed, len(crispresso_cmds)),
{'percent_complete': percent_complete},
)
else:
for idx, res in pool.imap_unordered(p_wrapper, enumerate(idxs)):
ret_vals[idx] = res
completed += 1
percent_complete += percent_complete_step
logger.info(
"Completed {0}/{1} runs".format(completed, len(crispresso_cmds)),
{'percent_complete': percent_complete},
)
for idx, ret in enumerate(ret_vals):
if ret == 137:
raise Exception('CRISPResso %s #%d was killed by your system. Please decrease the number of processes (-p) and run again.'%(descriptor, idx))
Expand All @@ -135,8 +147,10 @@ def run_crispresso_cmds(crispresso_cmds, n_processes="1", descriptor = 'region',
if descriptor.endswith("ch") or descriptor.endswith("sh"):
plural = descriptor+"es"
logger.info("Finished all " + plural)
pool.close()
pool.join()
if int_n_processes > 1:
pool.close()
if int_n_processes > 1:
pool.join()

def run_pandas_apply_parallel(input_df, input_function_chunk, n_processes=1):
"""
Expand All @@ -163,7 +177,10 @@ def input_function_chunk(df):
#shuffle the dataset to avoid finishing all the ones on top while leaving the ones on the bottom unfinished
n_splits = min(n_processes, len(input_df))
df_split = np.array_split(input_df.sample(frac=1), n_splits)
pool = mp.Pool(processes = n_splits)
if n_processes > 1:
pool = mp.Pool(processes = n_splits)
else:
return input_function_chunk(input_df)

#handle signals -- bug in python 2.7 (https://stackoverflow.com/questions/11312525/catch-ctrlc-sigint-and-exit-multiprocesses-gracefully-in-python)
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
Expand Down Expand Up @@ -193,40 +210,55 @@ def run_function_on_array_chunk_parallel(input_array, input_function, n_processe
input_function: function to run on chunks of the array
input_function should take in a smaller array of objects
"""
pool = mp.Pool(processes = n_processes)

#handle signals -- bug in python 2.7 (https://stackoverflow.com/questions/11312525/catch-ctrlc-sigint-and-exit-multiprocesses-gracefully-in-python)
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGINT, original_sigint_handler)
try:
n = int(max(10, len(input_array)/n_processes)) #don't parallelize unless at least 10 tasks
input_chunks = [input_array[i * n:(i + 1) * n] for i in range((len(input_array) + n - 1) // n )]
r = pool.map_async(input_function, input_chunks)
results = r.get(60*60*60) # Without the timeout this blocking call ignores all signals.
except KeyboardInterrupt:
pool.terminate()
logging.warn('Caught SIGINT. Program Terminated')
raise Exception('CRISPResso2 Terminated')
exit (0)
except Exception as e:
print('CRISPResso2 failed')
raise e
if n_processes == 1:
try:
results = input_function(input_array)
except Exception as e:
print('CRISPResso2 failed')
raise e
return results
else:
pool.close()
pool.join()
return [y for x in results for y in x]
pool = mp.Pool(processes = n_processes)

#handle signals -- bug in python 2.7 (https://stackoverflow.com/questions/11312525/catch-ctrlc-sigint-and-exit-multiprocesses-gracefully-in-python)
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGINT, original_sigint_handler)
try:
n = int(max(10, len(input_array)/n_processes)) #don't parallelize unless at least 10 tasks
input_chunks = [input_array[i * n:(i + 1) * n] for i in range((len(input_array) + n - 1) // n )]
r = pool.map_async(input_function, input_chunks)
results = r.get(60*60*60) # Without the timeout this blocking call ignores all signals.
except KeyboardInterrupt:
pool.terminate()
logging.warn('Caught SIGINT. Program Terminated')
raise Exception('CRISPResso2 Terminated')
exit (0)
except Exception as e:
print('CRISPResso2 failed')
raise e
else:
pool.close()
pool.join()
return [y for x in results for y in x]



# Run a single command string and hand back the result of sb.call.
#   cmd: the command to execute; it is passed to sb.call unchanged.
#   returns: whatever sb.call returns (presumably the command's exit status,
#            assuming sb is the subprocess module -- confirm against the
#            file's imports, which are not visible here).
# NOTE(review): shell=True means cmd is handed over as one shell string, so
# callers should only pass command lines they built from trusted input.
def run_subprocess(cmd):
return sb.call(cmd, shell=True)

def run_parallel_commands(commands_arr,n_processes=1,descriptor='CRISPResso2',continue_on_fail=False):
def run_parallel_commands(commands_arr, n_processes=1, descriptor='CRISPResso2', continue_on_fail=False):
"""
input: commands_arr: list of shell commands to run
descriptor: string to print out to user describing run
"""
pool = mp.Pool(processes = n_processes)
if n_processes > 1:
pool = mp.Pool(processes = n_processes)
else:
for idx, command in enumerate(commands_arr):
return_value = run_subprocess(command)
if return_value != 0 and not continue_on_fail:
raise Exception(f'{descriptor} #{idx} was failed')
return

#handle signals -- bug in python 2.7 (https://stackoverflow.com/questions/11312525/catch-ctrlc-sigint-and-exit-multiprocesses-gracefully-in-python)
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
Expand Down Expand Up @@ -281,4 +313,3 @@ def run_plot(plot_func, plot_args, num_processes, process_futures, process_pool)
except Exception as e:
logger.warn(f"Plot error {e}, skipping plot \n")
logger.debug(traceback.format_exc())

4 changes: 4 additions & 0 deletions CRISPResso2/CRISPRessoReports/CRISPRessoReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ def make_aggregate_report(
_ROOT,
folder_arr,
crispresso_html_reports,
logger,
compact_plots_to_show=None,
display_names=None,
):
Expand All @@ -660,6 +661,7 @@ def make_aggregate_report(
_ROOT (string): location of crispresso assets (images, templates, etc)
folder_arr (arr of strings): paths to the aggregated crispresso folders
crispresso_html_reports (dict): folder->html_path; Paths to the aggregated crispresso run html reports
logger (logging.Logger): logger to log messages
compact_plots_to_show (dict): name=>{'href': path to target(report) when user clicks on image, 'img': path to png image to show}
display_names (dict): folder->display_name; Titles to be shown for crispresso runs
(if different from names_arr, e.g. if display_names have spaces or bad chars, they won't be the same as names_arr)
Expand Down Expand Up @@ -778,6 +780,8 @@ def make_aggregate_report(
crispresso_report_folder,
_ROOT,
report_name,
'aggregate',
logger,
window_nuc_pct_quilts=window_nuc_pct_quilts,
nuc_pct_quilts=nuc_pct_quilts,
summary_plots=summary_plots,
Expand Down
Loading