906 Fix non-interpolated deconvolution
#906

[author: gonzaponte]

The output of the deconvolution without interpolation produces an
image of a different size than the output obtained with interpolation.
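
For context, a minimal numpy sketch of the underlying pitfall (not code from this PR; the grid and values below are invented for illustration): np.histogramdd interprets a bins array as bin edges, so passing it N grid points (bin centers) produces an image with N-1 pixels per axis, whereas building N+1 edges from those centers produces N pixels, one per grid point.

    import numpy as np

    # Hypothetical 1 mm grid of 10 sensor positions (bin centers); values invented.
    centers = np.arange(0.0, 10.0)           # 10 bin centers
    coords  = np.array([2.2, 2.4, 7.1])      # a few hit coordinates
    charge  = np.array([1.0, 2.0, 3.0])      # their charges

    # Passing the centers directly: histogramdd treats them as edges -> 9 pixels.
    h_centers, _ = np.histogramdd(coords[:, None], bins=[centers], weights=charge)

    # Converting the centers to 11 edges (midpoints plus the two ends) -> 10 pixels,
    # i.e. one pixel per grid point.
    edges = np.concatenate(([centers[0] - 0.5], centers[:-1] + 0.5, [centers[-1] + 0.5]))
    h_edges, _ = np.histogramdd(coords[:, None], bins=[edges], weights=charge)

    print(h_centers.shape, h_edges.shape)     # (9,) (10,)

The fix below converts the detector grid points to proper bin edges with binedges_from_bincenters before histogramming.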

Fixes #905.

[reviewer: jwaiton]

This PR resolves a difficult-to-spot issue in the interpolation
code and improves the clarity of the interpolation process in
beersheba. Good work!
jwaiton authored and carhc committed Nov 13, 2024
2 parents bec62bd + 850f8b5 commit f2e52af
Showing 2 changed files with 62 additions and 18 deletions.
15 changes: 9 additions & 6 deletions invisible_cities/reco/deconv_functions.py
@@ -14,6 +14,7 @@
 from scipy import ndimage as ndi

 from .. core.core_functions import shift_to_bin_centers
+from .. core.core_functions import binedges_from_bincenters
 from .. core.core_functions import in_range
 from .. core.configure      import check_annotations

@@ -210,15 +211,17 @@ def deconvolution_input(data : Tuple[np.ndarray, ...],
                             weight : np.ndarray
                             ) -> Tuple[np.ndarray, Tuple[np.ndarray, ...]]:

-        ranges = [[coord.min() - 1.5 * sw, coord.max() + 1.5 * sw] for coord, sw in zip(data, sample_width)]
-        if inter_method in (InterpolationMethod.linear, InterpolationMethod.cubic, InterpolationMethod.nearest):
-            allbins = [np.arange(rang[0], rang[1] + np.finfo(np.float32).eps, sw) for rang, sw in zip(ranges, sample_width)]
-            Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight)
-        elif inter_method is InterpolationMethod.nointerpolation:
+        eps = np.finfo(np.float32).eps
+        ranges = [ [ coord.min() - 1.5 * sw
+                   , coord.max() + 1.5 * sw + eps]
+                  for coord, sw in zip(data, sample_width) ]
+        if inter_method is InterpolationMethod.nointerpolation:
             allbins = [grid[in_range(grid, *rang)] for rang, grid in zip(ranges, det_grid)]
+            allbins = [binedges_from_bincenters(bins) for bins in allbins]
             Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight)
         else:
-            raise ValueError(f'inter_method {inter_method} is not a valid interpolatin mode.')
+            allbins = [np.arange(*rang, sw) for rang, sw in zip(ranges, sample_width)]
+            Hs, edges = np.histogramdd(data, bins=allbins, normed=False, weights=weight)

         inter_points = np.meshgrid(*(shift_to_bin_centers(edge) for edge in edges), indexing='ij')
         inter_points = tuple (inter_p.flatten() for inter_p in inter_points)
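
A side note on the `+ eps` folded into `ranges` in the new branch above: np.arange excludes its stop value, so when the span happens to be an exact multiple of the sample width the topmost bin edge would silently be dropped. A minimal illustrative sketch (the numbers are made up, not detector values):

    import numpy as np

    eps = np.finfo(np.float32).eps
    lo, hi, sw = 0.0, 30.0, 10.0             # span is an exact multiple of the sample width

    print(np.arange(lo, hi,       sw))       # [ 0. 10. 20.]      -> the 30.0 edge is excluded
    print(np.arange(lo, hi + eps, sw))       # [ 0. 10. 20. 30.]  -> stop nudged up, edge kept
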
65 changes: 53 additions & 12 deletions invisible_cities/reco/deconv_functions_test.py
@@ -6,6 +6,7 @@

 from pytest import mark
 from pytest import raises
+from pytest import fixture

 from hypothesis import given
 from hypothesis.strategies import floats
@@ -140,31 +141,71 @@ def test_interpolate_signal():
     assert np.allclose(grid , sorted(set(inter_position[1])))


-def test_deconvolution_input(data_hdst, data_hdst_deconvolved):
+@fixture(scope="session")
+def new_grid_1mm():
+    bin_sizes = (1., 1.)
+    det_db    = DataSiPM('new', 0)
+    det_grid  = [ np.arange( det_db[var].min() - bs/2
+                           , det_db[var].max() + bs/2 + np.finfo(np.float32).eps, bs)
+                for var, bs in zip("X Y".split(), bin_sizes)]
+    return det_grid
+
+
+@fixture(scope="session")
+def data_hdst_first_peak(data_hdst):
+    event = 3021916
+    peak  = 0
+    hdst  = ( load_dst(data_hdst, 'RECO', 'Events')
+             .groupby("event npeak".split())
+             .get_group((event, peak))
+             .groupby(['X', 'Y']).Q.sum().reset_index()
+             .loc[lambda df: df.Q > 40]
+            )
+    return hdst
+
+
+def test_deconvolution_input_exact_result(data_hdst_first_peak, data_hdst_deconvolved, new_grid_1mm):
+    """
+    Compare the output of the deconvolution with a reference output
+    stored in a file.
+    """
+    hdst = data_hdst_first_peak
     ref_interpolation = np.load(data_hdst_deconvolved)

-    hdst = load_dst(data_hdst, 'RECO', 'Events')
-    h = hdst[(hdst.event == 3021916) & (hdst.npeak == 0)]
-    h = h.groupby(['X', 'Y']).Q.sum().reset_index()
-    h = h[h.Q > 40]
-
-    det_db = DataSiPM('new', 0)
-    det_grid = [np.arange(det_db[var].min() + bs/2, det_db[var].max() - bs/2 + np.finfo(np.float32).eps, bs)
-                for var, bs in zip(['X', 'Y'], [1., 1.])]
-    interpolator = deconvolution_input([10., 10.], det_grid, InterpolationMethod.cubic)
-    inter = interpolator((h.X, h.Y), h.Q)
+    interpolator = deconvolution_input([10., 10.], new_grid_1mm, InterpolationMethod.cubic)
+    inter = interpolator((hdst.X, hdst.Y), hdst.Q)

     assert np.allclose(ref_interpolation['e_inter'], inter[0])
     assert np.allclose(ref_interpolation['x_inter'], inter[1][0])
     assert np.allclose(ref_interpolation['y_inter'], inter[1][1])


+@mark.parametrize("interp_method", InterpolationMethod)
+def test_deconvolution_input_interpolation_method(data_hdst_first_peak, new_grid_1mm, interp_method):
+    """
+    Check that it runs with all interpolation methods.
+    Select one event/peak from input and apply a cut to obtain an
+    input image that results in round numbers.
+    """
+    hdst = data_hdst_first_peak
+    interpolator = deconvolution_input([10., 10.], new_grid_1mm, interp_method)
+    output = interpolator((hdst.X, hdst.Y), hdst.Q)
+
+    shape = 120, 120
+    nelements = shape[0] * shape[1]
+    assert output[0].shape == shape
+    assert len(output[1]) == 2
+    assert output[1][0].shape == (nelements,)
+    assert output[1][1].shape == (nelements,)
+
+
 @mark.parametrize("interp_method", InterpolationMethod.__members__)
-def test_deconvolution_input_interpolation_method(data_hdst, data_hdst_deconvolved, interp_method):
+def test_deconvolution_input_wrong_interpolation_method_raises(interp_method):
     with raises(ValueError):
         deconvolution_input([10., 10.], [1., 1.], interp_method)


+#TODO: use data_hdst_first_peak and new_grid_1mm here too, if possible
 def test_deconvolve(data_hdst, data_hdst_deconvolved):
     ref_interpolation = np.load (data_hdst_deconvolved)

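One detail worth noting in the test parametrisations above: iterating the InterpolationMethod class yields its members, while iterating InterpolationMethod.__members__ yields the member names as plain strings, which is presumably why the raises-test keeps the old __members__ parametrisation to feed in invalid (string) methods. A simplified stand-in sketch (the real enum lives in deconv_functions.py; the member values here are assumed):

    from enum import Enum, auto

    class InterpolationMethod(Enum):          # stand-in; member values are assumed
        linear          = auto()
        cubic           = auto()
        nearest         = auto()
        nointerpolation = auto()

    print(list(InterpolationMethod))              # enum members -> valid inter_method values
    print(list(InterpolationMethod.__members__))  # name strings -> what the raises-test feeds in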
