Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use number of available CPUs instead of total number #158

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions losoto/lib_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,22 @@

# Some utilities for operations

import sys, multiprocessing
import multiprocessing
import os
import numpy as np
from losoto.h5parm import h5parm
from losoto._logging import logger as logging

def nproc():
"""
Return the number of CPU cores _available_ to the current process, similar
to what the Linux `nproc` command does. This can be less than the total
number of CPU cores in the machine, which is returned by, e.g.,
`multiprocessing.cpu_count()`
"""
return len(os.sched_getaffinity(0))


class multiprocManager(object):

class multiThread(multiprocessing.Process):
Expand Down Expand Up @@ -43,7 +54,7 @@ def __init__(self, procs=0, funct=None):
and it will be linked to the output queue
"""
if procs == 0:
procs = multiprocessing.cpu_count()
procs = nproc()
self.procs = procs
self._threads = []
self.inQueue = multiprocessing.JoinableQueue()
Expand Down
2 changes: 1 addition & 1 deletion losoto/operations/faraday.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def run( soltab, soltabOut='rotationmeasure000', refAnt='', maxResidual=1.,ncpu=

tuples = [(t,coord_rr,coord_ll,wt,vl,solType,coord,maxResidual) for t,wt,vl in zip(list(np.arange(len(times))), weightsliced, valsliced)]
if ncpu == 0:
ncpu = mp.cpu_count()
ncpu = nproc()
with mp.Pool(ncpu) as pool:
fitrm,fitweights = zip(*pool.starmap(_run_timestep,tuples))

Expand Down
2 changes: 1 addition & 1 deletion losoto/operations/interpolatedirections.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def run( soltab, interp_dirs, soltabOut=None, prefix='interp_', ncpu=0):
import sys
sys.exit()
# run the interpolation
ncpu = mp.cpu_count() if ncpu == 0 else ncpu # default use all cores
ncpu = nproc() if ncpu == 0 else ncpu # default use all cores
with mp.Pool(ncpu) as pool:
logging.info('Start interpolation.')
results = pool.starmap(interpolate_directions3d, args)
Expand Down
2 changes: 1 addition & 1 deletion losoto/operations/prefactor_bandpass.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ def run(soltab, chanWidth='', outSoltabName='bandpass', BadSBList = '', interpol
if autoFlag:
if ncpu == 0:
import multiprocessing
ncpu = multiprocessing.cpu_count()
ncpu = nproc()
mpm = multiprocManager(ncpu, _flag_amplitudes)
for s in range(nants):
mpm.put([soltab.freq[:], amplitude_arraytmp[:, s, :, :], weights_arraytmp[:, s, :, :],
Expand Down
2 changes: 1 addition & 1 deletion losoto/operations/tec.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def run( soltab, soltabOut, refAnt, maxResidualFlag, maxResidualProp, ncpu ):
selections.append(selection)

if ncpu == 0:
ncpu = mp.cpu_count()
ncpu = nproc()
with mp.Pool(ncpu) as pool:
logging.info('Start TEC fitting.')
results = pool.starmap(fit_tec_to_phases, args)
Expand Down