Skip to content

Commit

Permalink
Merge pull request #822 from thehrh/propagate_profile
Browse files Browse the repository at this point in the history
Allow reporting timing data for several pipelines at once (from any `Detectors` or `DistributionMaker` instance) ...
  • Loading branch information
JanWeldert authored Sep 2, 2024
2 parents a35bb25 + 7327847 commit 7753dea
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 30 deletions.
16 changes: 12 additions & 4 deletions pisa/core/detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from collections import OrderedDict
import inspect
from itertools import product
import os
from tabulate import tabulate
from copy import deepcopy

import numpy as np

from pisa import ureg
from pisa.core.map import MapSet
from pisa.core.pipeline import Pipeline
from pisa.core.distribution_maker import DistributionMaker
from pisa.core.param import ParamSet, Param
Expand Down Expand Up @@ -66,7 +64,7 @@ def __init__(self, pipelines, label=None, set_livetime_from_data=True, profile=F
self._distribution_makers , self.det_names = [] , []
for pipeline in pipelines:
if not isinstance(pipeline, Pipeline):
pipeline = Pipeline(pipeline)
pipeline = Pipeline(pipeline, profile=profile)

name = pipeline.detector_name
if name in self.det_names:
Expand Down Expand Up @@ -111,7 +109,17 @@ def tabulate(self, tablefmt="plain"):

def __iter__(self):
return iter(self._distribution_makers)


def report_profile(self, detailed=False, format_num_kwargs=None):
"""Report timing information on contained distribution makers.
See `Pipeline.report_profile` for details.
"""
for distribution_maker in self._distribution_makers:
print(distribution_maker.detector_name + ':')
distribution_maker.report_profile(
detailed=detailed, format_num_kwargs=format_num_kwargs
)

@property
def profile(self):
return self._profile
Expand Down
35 changes: 32 additions & 3 deletions pisa/core/distribution_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from collections import OrderedDict
from collections.abc import Mapping
import inspect
from itertools import product
import os
from tabulate import tabulate

Expand Down Expand Up @@ -104,6 +103,11 @@ def __init__(self, pipelines, label=None, set_livetime_from_data=True, profile=F
for pipeline in pipelines:
if not isinstance(pipeline, Pipeline):
pipeline = Pipeline(pipeline, profile=profile)
else:
if profile:
# Only propagate if set to `True` (don't allow negative
# default to negate any original choice for the instance)
pipeline.profile = profile
self._pipelines.append(pipeline)

data_run_livetime = None
Expand Down Expand Up @@ -215,6 +219,15 @@ def tabulate(self, tablefmt="plain"):
def __iter__(self):
return iter(self._pipelines)

def report_profile(self, detailed=False, format_num_kwargs=None):
"""Report timing information on contained pipelines.
See `Pipeline.report_profile` for details.
"""
for pipeline in self.pipelines:
pipeline.report_profile(
detailed=detailed, format_num_kwargs=format_num_kwargs
)

@property
def profile(self):
return self._profile
Expand All @@ -225,8 +238,8 @@ def profile(self, value):
pipeline.profile = value
self._profile = value


def run(self):
"""Run all pipelines"""
for pipeline in self:
pipeline.run()

Expand Down Expand Up @@ -541,6 +554,22 @@ def test_DistributionMaker():
#current_hier = new_hier
#current_mat = new_mat

# test profile flag
p_cfg = 'settings/pipeline/example.cfg'
p = Pipeline(p_cfg, profile=True)
dm = DistributionMaker(pipelines=p)
# default init using Pipeline instance shouldn't negate
assert dm.pipelines[0].profile
# but explicit request should
dm.profile = False
assert not dm.pipelines[0].profile
# now init from cfg path and request profile
dm = DistributionMaker(pipelines=p_cfg, profile=True)
assert dm.pipelines[0].profile
# explicitly request no profile
dm = DistributionMaker(pipelines=p_cfg, profile=False)
assert not dm.pipelines[0].profile


def parse_args():
"""Get command line arguments"""
Expand Down Expand Up @@ -626,4 +655,4 @@ def main(return_outputs=False):


if __name__ == '__main__':
distribution_maker, outputs = main(return_outputs=True) # pylint: disable=invalid-name
distribution_maker, outputs = main(return_outputs=True)
81 changes: 78 additions & 3 deletions pisa/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@

from argparse import ArgumentParser
from collections import OrderedDict
from collections.abc import Mapping
from configparser import NoSectionError
from copy import deepcopy
from importlib import import_module
from itertools import product
from inspect import getsource
import os
from tabulate import tabulate
from time import time
import traceback

import numpy as np
Expand All @@ -30,6 +32,7 @@
from pisa.core.binning import MultiDimBinning
from pisa.utils.config_parser import PISAConfigParser, parse_pipeline_config
from pisa.utils.fileio import mkdir
from pisa.utils.format import format_times
from pisa.utils.hash import hash_obj
from pisa.utils.log import logging, set_verbosity
from pisa.utils.profiler import profile
Expand Down Expand Up @@ -107,6 +110,9 @@ def __init__(self, config, profile=False):
self.output_key = config['pipeline']['output_key']

self._profile = profile
self._setup_times = []
self._run_times = []
self._get_outputs_times = []

self._stages = []
self._config = config
Expand Down Expand Up @@ -140,9 +146,46 @@ def tabulate(self, tablefmt="plain"):
table[-1] += [len(s.params.fixed), len(s.params.free)]
return tabulate(table, headers, tablefmt=tablefmt, colalign=colalign)

def report_profile(self, detailed=False):
def report_profile(self, detailed=False, format_num_kwargs=None):
"""Report timing information on pipeline and contained services
Parameters
----------
detailed : bool, default False
Whether to increase level of detail
format_num_kwargs : dict, optional
Dictionary containing arguments passed to `utils.format.format_num`.
Will display each number with three decimal digits by default.
"""
if not self.profile:
# Report warning only at the pipeline level, which is what the
# typical user should come across. Assume that users calling
# `report_profile` on a `Stage` instance directly know what they're
# doing.
logging.warn(
'`profile` is set to False. Results may not show the expected '
'numbers of function calls.'
)
if format_num_kwargs is None:
format_num_kwargs = {
'precision': 1e-3, 'fmt': 'full', 'trailing_zeros': True
}
assert isinstance(format_num_kwargs, Mapping)
print(f'Pipeline: {self.name}')
for func_str, times in [
('- setup: ', self._setup_times),
('- run: ', self._run_times),
('- get_outputs: ', self._get_outputs_times)
]:
print(func_str,
format_times(times=times,
nindent_detailed=len(func_str) + 1,
detailed=detailed, **format_num_kwargs)
)
print('Individual services:')
for stage in self.stages:
stage.report_profile(detailed=detailed)
stage.report_profile(detailed=detailed, **format_num_kwargs)

@property
def profile(self):
Expand Down Expand Up @@ -315,7 +358,19 @@ def _init_stages(self):

self.setup()

def get_outputs(self, output_binning=None, output_key=None):
def get_outputs(self, **get_outputs_kwargs):
"""Wrapper around `_get_outputs`. The latter might
have quite some overhead compared to `run` alone"""
if self.profile:
start_t = time()
outputs = self._get_outputs(**get_outputs_kwargs)
end_t = time()
self._get_outputs_times.append(end_t - start_t)
else:
outputs = self._get_outputs(**get_outputs_kwargs)
return outputs

def _get_outputs(self, output_binning=None, output_key=None):
"""Get MapSet output"""


Expand Down Expand Up @@ -393,12 +448,32 @@ def _add_rotated(self, paramset:ParamSet, suppress_warning=False):
return success

def run(self):
"""Wrapper around `_run_function`"""
if self.profile:
start_t = time()
self._run_function()
end_t = time()
self._run_times.append(end_t - start_t)
else:
self._run_function()

def _run_function(self):
"""Run the pipeline to compute"""
for stage in self.stages:
logging.debug(f"Working on stage {stage.stage_name}.{stage.service_name}")
stage.run()

def setup(self):
"""Wrapper around `_setup_function`"""
if self.profile:
start_t = time()
self._setup_function()
end_t = time()
self._setup_times.append(end_t - start_t)
else:
self._setup_function()

def _setup_function(self):
"""Setup (reset) all stages"""
self.data = ContainerSet(self.name)
for stage in self.stages:
Expand Down
30 changes: 14 additions & 16 deletions pisa/core/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import numpy as np

from pisa.core.container import ContainerSet
from pisa.utils.format import format_times
from pisa.utils.log import logging
from pisa.core.param import ParamSelector
from pisa.utils.format import arg_str_seq_none
Expand Down Expand Up @@ -138,23 +139,20 @@ def __init__(
def __repr__(self):
return 'Stage "%s"'%(self.__class__.__name__)

def report_profile(self, detailed=False):
def format(times):
tot = np.sum(times)
n = len(times)
ave = 0. if n == 0 else tot/n
return 'Total time %.5f s, n calls: %i, time/call: %.5f s'%(tot, n, ave)

def report_profile(self, detailed=False, **format_num_kwargs):
"""Report timing information on calls to setup, compute, and apply
"""
print(self.stage_name, self.service_name)
print('- setup: ', format(self.setup_times))
if detailed:
print(' Individual runs: ', ', '.join(['%i: %.3f s' % (i, t) for i, t in enumerate(self.setup_times)]))
print('- calc: ', format(self.calc_times))
if detailed:
print(' Individual runs: ', ', '.join(['%i: %.3f s' % (i, t) for i, t in enumerate(self.calc_times)]))
print('- apply: ', format(self.apply_times))
if detailed:
print(' Individual runs: ', ', '.join(['%i: %.3f s' % (i, t) for i, t in enumerate(self.apply_times)]))
for func_str, times in [
('- setup: ', self.setup_times),
('- compute: ', self.calc_times),
('- apply: ', self.apply_times)
]:
print(func_str,
format_times(times=times,
nindent_detailed=len(func_str) + 1,
detailed=detailed, **format_num_kwargs)
)

def select_params(self, selections, error_on_missing=False):
"""Apply the `selections` to contained ParamSet.
Expand Down
19 changes: 15 additions & 4 deletions pisa/stages/utils/kde.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from copy import deepcopy

import numpy as np
from time import time

from pisa.core.stage import Stage
from pisa.core.binning import MultiDimBinning, OneDimBinning
Expand Down Expand Up @@ -132,11 +133,21 @@ def setup_function(self):

@profile
def apply(self):
# this is special, we want the actual event weights in the kde
# therefor we're overwritting the apply function
# normally in a stage you would implement the `apply_function` method
# and not the `apply` method!
"""This is special, we want the actual event weights in the kde
therefor we're overwritting the apply function
normally in a stage you would implement the `apply_function` method
and not the `apply` method! We also have to reimplement the profiling
functionality in apply of the Base class"""

if self.profile:
start_t = time()
self.apply_function()
end_t = time()
self.apply_times.append(end_t - start_t)
else:
self.apply_function()

def apply_function(self):
for container in self.data:

if self.stash_valid:
Expand Down
Loading

0 comments on commit 7753dea

Please sign in to comment.