Skip to content

Commit

Permalink
Merge pull request #59 from trustimaging/nan-detect
Browse files Browse the repository at this point in the history
Add pipeline step to detect NaNs in time traces
  • Loading branch information
ccuetom authored Oct 13, 2023
2 parents 31d52cf + bbd71fb commit a77a500
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 63 deletions.
1 change: 1 addition & 0 deletions mosaic/cli/clusters/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def submission_script(name, num_nodes, num_workers, num_threads, node_memory):
# use $(ppn) to use one worker per node and as many threads per worker as cores in the node
export OMP_NUM_THREADS=$num_threads_per_worker
export OMP_PLACES=cores
export OMP_PROC_BIND=true
# set any environment variables
# for example:
Expand Down
1 change: 1 addition & 0 deletions mosaic/cli/clusters/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def submission_script(name, num_nodes, num_workers, num_threads, node_memory):
# use $(ppn) to use one worker per node and as many threads per worker as cores in the node
export OMP_NUM_THREADS=$num_workers_per_node \\* $num_threads_per_worker
export OMP_PLACES=cores
export OMP_PROC_BIND=true
# set any environment variables
# for example:
Expand Down
2 changes: 2 additions & 0 deletions mosaic/cli/clusters/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def submission_script(name, num_nodes, num_workers, num_threads, node_memory):
#SBATCH --account=<budget_allocation>
#SBATCH --partition=<partition>
#SBATCH --qos=<quality_of_service>
#SBATCH --exclusive
name={name}
num_nodes={num_nodes}
Expand All @@ -81,6 +82,7 @@ def submission_script(name, num_nodes, num_workers, num_threads, node_memory):
# use $(ppn) to use one worker per node and as many threads per worker as cores in the node
export OMP_NUM_THREADS=$num_threads_per_worker
export OMP_PLACES=cores
export OMP_PROC_BIND=true
# set any environment variables
# for example:
Expand Down
2 changes: 2 additions & 0 deletions mosaic/runtime/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ async def init_cluster(self, **kwargs):
cmd = (f'srun {ssh_flags} --nodes=1 --ntasks=1 --tasks-per-node={num_cpus} '
f'--cpu-bind=mask_cpu:{cpu_mask} '
f'--oversubscribe '
f'--distribution=block:block '
f'--hint=nomultithread '
f'--nodelist={node_address} '
f'{remote_cmd}')

Expand Down
41 changes: 30 additions & 11 deletions mosaic/runtime/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ async def init_workers(self, **kwargs):
num_threads = num_threads or num_cpus // num_workers
self._num_threads = num_threads

worker_cpus = {}
if self.mode == 'cluster':
if num_workers*num_threads > num_cpus:
raise ValueError('Requested number of CPUs per node (%d - num_workers*num_threads) '
Expand All @@ -97,15 +98,40 @@ async def init_workers(self, **kwargs):
if numa_available:
available_cpus = numa.info.numa_hardware_info()['node_cpu_info']
else:
available_cpus = {0: list(range(num_cpus))}
available_cpus = {worker_index: list(range(num_threads*worker_index,
num_threads*(worker_index+1)))
for worker_index in range(self._num_workers)}

# Eliminate cores corresponding to hyperthreading
for node_index, node_cpus in available_cpus.items():
node_cpus = [each for each in node_cpus if each < num_cpus]
available_cpus[node_index] = node_cpus

available_cpus = sum(list(available_cpus.values()), [])
available_cpus.remove(num_cpus-1)
node_ids = list(available_cpus.keys())
num_nodes = len(available_cpus)
num_cpus_per_node = min([len(cpus) for cpus in available_cpus.values()])

# Distribute cores across workers
if num_nodes >= self._num_workers:
nodes_per_worker = num_nodes // self._num_workers
for worker_index in range(self._num_workers):
node_s = worker_index*nodes_per_worker
node_e = min((worker_index+1)*nodes_per_worker, num_nodes)
worker_cpus[worker_index] = sum([available_cpus[node_index]
for node_index in node_ids[node_s:node_e]], [])

else:
workers_per_node = self._num_workers // num_nodes
cpus_per_worker = num_cpus_per_node // workers_per_node
for node_index, node_cpus in available_cpus.items():
worker_s = node_index*workers_per_node
worker_e = min((node_index+1)*workers_per_node, self._num_workers)
worker_chunk = {}
for worker_index in range(worker_s, worker_e):
cpu_s = worker_index*cpus_per_worker
cpu_e = min((worker_index+1)*cpus_per_worker, len(node_cpus))
worker_chunk[worker_index] = node_cpus[cpu_s:cpu_e]
worker_cpus.update(worker_chunk)

for worker_index in range(self._num_workers):
indices = self.indices + (worker_index,)
Expand All @@ -118,17 +144,10 @@ def start_worker(*args, **extra_kwargs):

mosaic.init('worker', *args, **kwargs, wait=True)

worker_cpus = None
if self.mode == 'cluster':
start_cpu = worker_index * num_threads
end_cpu = min((worker_index + 1) * num_threads, len(available_cpus))

worker_cpus = available_cpus[start_cpu:end_cpu]

worker_proxy = RuntimeProxy(name='worker', indices=indices)
worker_subprocess = subprocess(start_worker)(name=worker_proxy.uid,
daemon=False,
cpu_affinity=worker_cpus)
cpu_affinity=worker_cpus.get(worker_index, None))
worker_subprocess.start_process()
worker_proxy.subprocess = worker_subprocess

Expand Down
3 changes: 3 additions & 0 deletions mosaic/types/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def __getattr__(self, item):
def __getitem__(self, item):
    """Dictionary-style access (``struct[key]``): delegate the lookup to ``_get``."""
    value = self._get(item)
    return value

def __delitem__(self, item):
    """Remove *item* from the underlying content mapping (``del struct[key]``)."""
    # Go through __dict__ to bypass any attribute interception on the Struct.
    del self.__dict__['_content'][item]

def get(self, item, default=None):
"""
Returns an item from the Struct or a default value if it is not found.
Expand Down
2 changes: 1 addition & 1 deletion stride/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ async def loop(worker, shot_id):
if dump:
optimiser.variable.dump(path=problem.output_folder,
project_name=problem.name,
version=iteration.abs_id)
version=iteration.abs_id+1)

logger.perf('Done iteration %d (out of %d), '
'block %d (out of %d) - Total loss %e' %
Expand Down
6 changes: 6 additions & 0 deletions stride/optimisation/pipelines/default_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class ProcessWavelets(Pipeline):
def __init__(self, steps=None, no_grad=False, **kwargs):
    """
    Build the default wavelet-processing pipeline.

    The NaN/Inf trace check is appended by default; pass
    ``check_traces=False`` to disable it.
    """
    pipeline_steps = steps or []

    # Consume the flag so it is not forwarded to the Pipeline base class.
    check_traces = kwargs.pop('check_traces', True)
    if check_traces:
        pipeline_steps.append('check_traces')

    super().__init__(pipeline_steps, no_grad=no_grad, **kwargs)


Expand All @@ -43,6 +46,9 @@ class ProcessTraces(Pipeline):
def __init__(self, steps=None, no_grad=False, **kwargs):
steps = steps or []

if kwargs.pop('check_traces', True):
steps.append('check_traces')

if kwargs.pop('filter_offsets', False):
steps.append(('filter_offsets', False)) # do not raise if not present

Expand Down
4 changes: 3 additions & 1 deletion stride/optimisation/pipelines/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .mask import Mask
from .mute_traces import MuteTraces
from .clip import Clip
from .check_traces import CheckTraces


steps_registry = {
Expand All @@ -17,7 +18,8 @@
'norm_per_trace': NormPerTrace,
'norm_field': NormField,
'smooth_field': SmoothField,
'mask' : Mask,
'mask': Mask,
'mute_traces': MuteTraces,
'clip': Clip,
'check_traces': CheckTraces,
}
82 changes: 82 additions & 0 deletions stride/optimisation/pipelines/steps/check_traces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import numpy as np

import mosaic

from ....core import Operator


class CheckTraces(Operator):
    """
    Check a set of time traces for NaNs, Inf, etc.

    Parameters
    ----------
    raise_incorrect : bool, optional
        Whether to raise an exception if there are incorrect traces.
        Defaults to True.
    filter_incorrect : bool, optional
        Whether to filter out traces that are incorrect (by zeroing them).
        Defaults to False.

    """

    def __init__(self, **kwargs):
        # Consume our own configuration flags before delegating, so that
        # they are not forwarded to the Operator base constructor.
        self.raise_incorrect = kwargs.pop('raise_incorrect', True)
        self.filter_incorrect = kwargs.pop('filter_incorrect', False)

        super().__init__(**kwargs)

        # Number of trace objects seen in the latest forward pass; adjoint
        # uses it to trim the incoming buffers back to that count.
        self._num_traces = None

    def forward(self, *traces, **kwargs):
        """
        Check each traces object, returning a checked copy of each.

        Accepts any number of trace objects; returns a tuple when more
        than one is given, otherwise the single checked object.
        """
        self._num_traces = len(traces)

        checked = [self._apply(each, **kwargs) for each in traces]

        if len(traces) > 1:
            return tuple(checked)

        else:
            return checked[0]

    def adjoint(self, *d_traces, **kwargs):
        """
        Pass gradients through unchanged, trimmed to the number of trace
        objects processed in the preceding forward pass.
        """
        d_traces = d_traces[:self._num_traces]

        self._num_traces = None

        if len(d_traces) > 1:
            return d_traces

        else:
            return d_traces[0]

    def _apply(self, traces, **kwargs):
        """
        Check a single traces object for NaN/Inf samples.

        Raises RuntimeError (or logs a warning, depending on
        ``raise_incorrect``) when any trace contains NaN or Inf, and
        optionally zeroes the offending traces when ``filter_incorrect``
        is set.
        """
        raise_incorrect = kwargs.pop('raise_incorrect', self.raise_incorrect)
        filter_incorrect = kwargs.pop('filter_incorrect', self.filter_incorrect)

        out_traces = traces.alike(name='checked_%s' % traces.name)
        filtered = traces.extended_data.copy()

        # Flag traces (rows) containing any non-finite sample.
        is_nan = np.any(np.isnan(filtered), axis=-1)
        is_inf = np.any(np.isinf(filtered), axis=-1)

        if np.any(is_nan) or np.any(is_inf):
            msg = 'Nan or inf detected in %s' % traces.name

            # Prefix the shot ID when one can be determined from the kwargs.
            problem = kwargs.pop('problem', None)
            shot_id = problem.shot.id if problem is not None else kwargs.pop('shot_id', None)
            if shot_id is not None:
                msg = '(ShotID %d) ' % shot_id + msg

            if raise_incorrect:
                raise RuntimeError(msg)
            else:
                mosaic.logger().warn(msg)

            if filter_incorrect:
                # "Filtering" zeroes the offending traces instead of removing
                # them, so array shapes stay unchanged.
                filtered[is_nan | is_inf, :] = 0

        out_traces.extended_data[:] = filtered

        return out_traces
Loading

0 comments on commit a77a500

Please sign in to comment.