Add pipeline step to detect NaNs in time traces #59

Merged
merged 9 commits into from Oct 13, 2023

Changes from all commits
1 change: 1 addition & 0 deletions mosaic/cli/clusters/pbs.py
@@ -78,6 +78,7 @@ def submission_script(name, num_nodes, num_workers, num_threads, node_memory):
 # use $(ppn) to use one worker per node and as many threads per worker as cores in the node
 export OMP_NUM_THREADS=$num_threads_per_worker
 export OMP_PLACES=cores
+export OMP_PROC_BIND=true
 
 # set any environment variables
 # for example:
1 change: 1 addition & 0 deletions mosaic/cli/clusters/sge.py
@@ -98,6 +98,7 @@ def submission_script(name, num_nodes, num_workers, num_threads, node_memory):
 # use $(ppn) to use one worker per node and as many threads per worker as cores in the node
 export OMP_NUM_THREADS=$num_workers_per_node \\* $num_threads_per_worker
 export OMP_PLACES=cores
+export OMP_PROC_BIND=true
 
 # set any environment variables
 # for example:
2 changes: 2 additions & 0 deletions mosaic/cli/clusters/slurm.py
@@ -64,6 +64,7 @@ def submission_script(name, num_nodes, num_workers, num_threads, node_memory):
 #SBATCH --account=<budget_allocation>
 #SBATCH --partition=<partition>
 #SBATCH --qos=<quality_of_service>
+#SBATCH --exclusive
 
 name={name}
 num_nodes={num_nodes}
@@ -81,6 +82,7 @@ def submission_script(name, num_nodes, num_workers, num_threads, node_memory):
 # use $(ppn) to use one worker per node and as many threads per worker as cores in the node
 export OMP_NUM_THREADS=$num_threads_per_worker
 export OMP_PLACES=cores
+export OMP_PROC_BIND=true
 
 # set any environment variables
 # for example:
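All three scheduler templates now pin OpenMP threads to physical cores. For reference, a minimal sketch of the same pinning applied from Python, assuming the variables are set before any OpenMP runtime initialises (the thread count is hypothetical):

import os

# Must run before importing any module that initialises OpenMP
# (e.g. numpy builds linked against an OpenMP runtime).
os.environ['OMP_NUM_THREADS'] = '4'   # hypothetical threads per worker
os.environ['OMP_PLACES'] = 'cores'    # one place per physical core
os.environ['OMP_PROC_BIND'] = 'true'  # keep each thread on its assigned place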
2 changes: 2 additions & 0 deletions mosaic/runtime/monitor.py
@@ -215,6 +215,8 @@ async def init_cluster(self, **kwargs):
         cmd = (f'srun {ssh_flags} --nodes=1 --ntasks=1 --tasks-per-node={num_cpus} '
                f'--cpu-bind=mask_cpu:{cpu_mask} '
                f'--oversubscribe '
+               f'--distribution=block:block '
+               f'--hint=nomultithread '
                f'--nodelist={node_address} '
                f'{remote_cmd}')

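The binding requested through --cpu-bind, --distribution and --hint can be verified from inside the launched process; a minimal sketch, Linux-only, since os.sched_getaffinity is not available on every platform:

import os

# CPUs the current process is allowed to run on; with --distribution=block:block
# and --hint=nomultithread this should be a contiguous block of physical cores.
print(sorted(os.sched_getaffinity(0)))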
41 changes: 30 additions & 11 deletions mosaic/runtime/node.py
@@ -82,6 +82,7 @@ async def init_workers(self, **kwargs):
         num_threads = num_threads or num_cpus // num_workers
         self._num_threads = num_threads
 
+        worker_cpus = {}
         if self.mode == 'cluster':
             if num_workers*num_threads > num_cpus:
                 raise ValueError('Requested number of CPUs per node (%d - num_workers*num_threads) '
@@ -97,15 +98,40 @@
             if numa_available:
                 available_cpus = numa.info.numa_hardware_info()['node_cpu_info']
             else:
-                available_cpus = {0: list(range(num_cpus))}
+                available_cpus = {worker_index: list(range(num_threads*worker_index,
+                                                           num_threads*(worker_index+1)))
+                                  for worker_index in range(self._num_workers)}
 
             # Eliminate cores corresponding to hyperthreading
             for node_index, node_cpus in available_cpus.items():
                 node_cpus = [each for each in node_cpus if each < num_cpus]
                 available_cpus[node_index] = node_cpus
 
-            available_cpus = sum(list(available_cpus.values()), [])
-            available_cpus.remove(num_cpus-1)
+            node_ids = list(available_cpus.keys())
+            num_nodes = len(available_cpus)
+            num_cpus_per_node = min([len(cpus) for cpus in available_cpus.values()])
+
+            # Distribute cores across workers
+            if num_nodes >= self._num_workers:
+                nodes_per_worker = num_nodes // self._num_workers
+                for worker_index in range(self._num_workers):
+                    node_s = worker_index*nodes_per_worker
+                    node_e = min((worker_index+1)*nodes_per_worker, num_nodes)
+                    worker_cpus[worker_index] = sum([available_cpus[node_index]
+                                                     for node_index in node_ids[node_s:node_e]], [])
+
+            else:
+                workers_per_node = self._num_workers // num_nodes
+                cpus_per_worker = num_cpus_per_node // workers_per_node
+                for node_index, node_cpus in available_cpus.items():
+                    worker_s = node_index*workers_per_node
+                    worker_e = min((node_index+1)*workers_per_node, self._num_workers)
+                    worker_chunk = {}
+                    for worker_index in range(worker_s, worker_e):
+                        cpu_s = worker_index*cpus_per_worker
+                        cpu_e = min((worker_index+1)*cpus_per_worker, len(node_cpus))
+                        worker_chunk[worker_index] = node_cpus[cpu_s:cpu_e]
+                    worker_cpus.update(worker_chunk)
 
         for worker_index in range(self._num_workers):
             indices = self.indices + (worker_index,)
@@ -118,17 +144,10 @@ def start_worker(*args, **extra_kwargs):

                 mosaic.init('worker', *args, **kwargs, wait=True)
 
-            worker_cpus = None
-            if self.mode == 'cluster':
-                start_cpu = worker_index * num_threads
-                end_cpu = min((worker_index + 1) * num_threads, len(available_cpus))
-
-                worker_cpus = available_cpus[start_cpu:end_cpu]
-
             worker_proxy = RuntimeProxy(name='worker', indices=indices)
             worker_subprocess = subprocess(start_worker)(name=worker_proxy.uid,
                                                          daemon=False,
-                                                         cpu_affinity=worker_cpus)
+                                                         cpu_affinity=worker_cpus.get(worker_index, None))
             worker_subprocess.start_process()
             worker_proxy.subprocess = worker_subprocess
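The new distribution logic can be exercised in isolation. Below is a minimal sketch under assumed inputs; the real code draws available_cpus from numa.info.numa_hardware_info(), and this sketch slices each node's CPU list with a per-node worker offset rather than the global worker index used above:

def distribute_cpus(available_cpus, num_workers):
    # available_cpus maps NUMA node index -> list of CPU ids
    node_ids = list(available_cpus.keys())
    num_nodes = len(available_cpus)
    num_cpus_per_node = min(len(cpus) for cpus in available_cpus.values())

    worker_cpus = {}
    if num_nodes >= num_workers:
        # Each worker takes one or more whole NUMA nodes
        nodes_per_worker = num_nodes // num_workers
        for w in range(num_workers):
            chunk = node_ids[w*nodes_per_worker:(w + 1)*nodes_per_worker]
            worker_cpus[w] = sum([available_cpus[n] for n in chunk], [])
    else:
        # Several workers share each NUMA node
        workers_per_node = num_workers // num_nodes
        cpus_per_worker = num_cpus_per_node // workers_per_node
        for n, node_cpus in available_cpus.items():
            first = n*workers_per_node
            for w in range(first, min(first + workers_per_node, num_workers)):
                local = w - first
                worker_cpus[w] = node_cpus[local*cpus_per_worker:(local + 1)*cpus_per_worker]
    return worker_cpus

# Two NUMA nodes, four workers: two workers per node, two cores each
print(distribute_cpus({0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}, 4))
# -> {0: [0, 1], 1: [2, 3], 2: [4, 5], 3: [6, 7]}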
3 changes: 3 additions & 0 deletions mosaic/types/struct.py
@@ -93,6 +93,9 @@ def __getattr__(self, item):
     def __getitem__(self, item):
         return self._get(item)
 
+    def __delitem__(self, item):
+        self.__dict__['_content'].__delitem__(item)
+
     def get(self, item, default=None):
         """
         Returns an item from the Struct or a default value if it is not found.
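With __delitem__ in place, entries can be removed from a Struct with the del statement; a minimal sketch, assuming an existing Struct instance s that holds a key 'freq':

del s['freq']               # dispatches to Struct.__delitem__
print(s.get('freq', None))  # -> None once the entry is gone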
2 changes: 1 addition & 1 deletion stride/__init__.py
@@ -301,7 +301,7 @@ async def loop(worker, shot_id):
         if dump:
             optimiser.variable.dump(path=problem.output_folder,
                                     project_name=problem.name,
-                                    version=iteration.abs_id)
+                                    version=iteration.abs_id+1)
 
         logger.perf('Done iteration %d (out of %d), '
                     'block %d (out of %d) - Total loss %e' %
6 changes: 6 additions & 0 deletions stride/optimisation/pipelines/default_pipelines.py
@@ -24,6 +24,9 @@ class ProcessWavelets(Pipeline):
     def __init__(self, steps=None, no_grad=False, **kwargs):
         steps = steps or []
 
+        if kwargs.pop('check_traces', True):
+            steps.append('check_traces')
+
         super().__init__(steps, no_grad=no_grad, **kwargs)


@@ -43,6 +46,9 @@ class ProcessTraces(Pipeline):
     def __init__(self, steps=None, no_grad=False, **kwargs):
         steps = steps or []
 
+        if kwargs.pop('check_traces', True):
+            steps.append('check_traces')
+
         if kwargs.pop('filter_offsets', False):
             steps.append(('filter_offsets', False))  # do not raise if not present
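Since check_traces is appended by default, callers opt out through the new constructor keyword; a minimal sketch (the module path follows the file location above):

from stride.optimisation.pipelines.default_pipelines import ProcessTraces

pipeline = ProcessTraces()                         # includes the check_traces step
bare_pipeline = ProcessTraces(check_traces=False)  # skips NaN/Inf checking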
4 changes: 3 additions & 1 deletion stride/optimisation/pipelines/steps/__init__.py
@@ -8,6 +8,7 @@
 from .mask import Mask
 from .mute_traces import MuteTraces
 from .clip import Clip
+from .check_traces import CheckTraces
 
 
 steps_registry = {
@@ -17,7 +18,8 @@
     'norm_per_trace': NormPerTrace,
     'norm_field': NormField,
     'smooth_field': SmoothField,
-    'mask' : Mask,
+    'mask': Mask,
     'mute_traces': MuteTraces,
     'clip': Clip,
+    'check_traces': CheckTraces,
 }
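Pipelines resolve step names through this registry, so the new step is reachable by its key; a minimal sketch (the keyword arguments follow the CheckTraces signature added below):

from stride.optimisation.pipelines.steps import steps_registry

step_cls = steps_registry['check_traces']
step = step_cls(raise_incorrect=False, filter_incorrect=True)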
82 changes: 82 additions & 0 deletions stride/optimisation/pipelines/steps/check_traces.py
@@ -0,0 +1,82 @@
import numpy as np

import mosaic

from ....core import Operator


class CheckTraces(Operator):
    """
    Check a set of time traces for NaNs, Inf, etc.

    Parameters
    ----------
    raise_incorrect : bool, optional
        Whether to raise an exception if there are incorrect traces.
        Defaults to True.
    filter_incorrect : bool, optional
        Whether to filter out traces that are incorrect. Defaults to False.

    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.raise_incorrect = kwargs.pop('raise_incorrect', True)
        self.filter_incorrect = kwargs.pop('filter_incorrect', False)

        self._num_traces = None

    def forward(self, *traces, **kwargs):
        self._num_traces = len(traces)

        filtered = []
        for each in traces:
            filtered.append(self._apply(each, **kwargs))

        if len(traces) > 1:
            return tuple(filtered)

        else:
            return filtered[0]

    def adjoint(self, *d_traces, **kwargs):
        d_traces = d_traces[:self._num_traces]

        self._num_traces = None

        if len(d_traces) > 1:
            return d_traces

        else:
            return d_traces[0]

    def _apply(self, traces, **kwargs):
        raise_incorrect = kwargs.pop('raise_incorrect', self.raise_incorrect)
        filter_incorrect = kwargs.pop('filter_incorrect', self.filter_incorrect)

        out_traces = traces.alike(name='checked_%s' % traces.name)
        filtered = traces.extended_data.copy()

        is_nan = np.any(np.isnan(filtered), axis=-1)
        is_inf = np.any(np.isinf(filtered), axis=-1)

        if np.any(is_nan) or np.any(is_inf):
            msg = 'Nan or inf detected in %s' % traces.name

            problem = kwargs.pop('problem', None)
            shot_id = problem.shot.id if problem is not None else kwargs.pop('shot_id', None)
            if shot_id is not None:
                msg = '(ShotID %d) ' % shot_id + msg

            if raise_incorrect:
                raise RuntimeError(msg)
            else:
                mosaic.logger().warn(msg)

            if filter_incorrect:
                filtered[is_nan | is_inf, :] = 0

        out_traces.extended_data[:] = filtered

        return out_traces
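A minimal sketch of the step in isolation, assuming a stride Traces object named observed is at hand (alike and extended_data above are the parts of the Traces API the step relies on, and the shot id is hypothetical):

step = CheckTraces(raise_incorrect=False,  # warn instead of raising
                   filter_incorrect=True)  # zero out offending traces

checked = step.forward(observed, shot_id=12)
# checked matches observed except that any trace containing
# NaN or Inf values has been zeroed out.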