diff --git a/invisible_cities/cities/components.py b/invisible_cities/cities/components.py index b3487087d..9ccc6e7ec 100644 --- a/invisible_cities/cities/components.py +++ b/invisible_cities/cities/components.py @@ -554,7 +554,7 @@ def wf_from_files(paths, wf_type): def pmap_from_files(paths): for path in paths: try: - pmaps = load_pmaps(path) + pmaps = load_pmaps(path, lazy=True) except tb.exceptions.NoSuchNodeError: continue @@ -567,11 +567,11 @@ def pmap_from_files(paths): except IndexError: continue - check_lengths(event_info, pmaps) - - for evtinfo in event_info: + for evtinfo, (evt, pmap) in zip(event_info, pmaps): event_number, timestamp = evtinfo.fetch_all_fields() - yield dict(pmap=pmaps[event_number], run_number=run_number, + if event_number != evt: + raise RuntimeError("Inconsistent data: event number mismatch") + yield dict(pmap=pmap, run_number=run_number, event_number=event_number, timestamp=timestamp) diff --git a/invisible_cities/io/pmaps_io.py b/invisible_cities/io/pmaps_io.py index bd9ae7582..aa41f7840 100644 --- a/invisible_cities/io/pmaps_io.py +++ b/invisible_cities/io/pmaps_io.py @@ -4,6 +4,8 @@ import tables as tb import pandas as pd +from .. core.exceptions import InvalidInputFileStructure + from .. evm .pmaps import PMTResponses from .. evm .pmaps import SiPMResponses from .. 
evm .pmaps import S1 @@ -78,8 +80,23 @@ def _make_tables(hdf5_file, compression): return pmp_tables -def load_pmaps_as_df(filename): +def check_file_integrity(file): + events_run = file.root.Run .events.read(field="evt_number") + events_pmaps_s1 = file.root.PMAPS.S1 .read(field="event") + events_pmaps_s2 = file.root.PMAPS.S2 .read(field="event") + if set(events_run) != set(events_pmaps_s1.tolist() + events_pmaps_s2.tolist()): + raise InvalidInputFileStructure("Inconsistent data: event number mismatch") + + +def load_pmaps_as_df(filename, lazy=False): + loader = load_pmaps_as_df_lazy if lazy else load_pmaps_as_df_eager + return loader(filename) + + +def load_pmaps_as_df_eager(filename): with tb.open_file(filename, 'r') as h5f: + check_file_integrity(h5f) + pmap = h5f.root.PMAPS to_df = pd.DataFrame.from_records return (to_df(pmap.S1 .read()), @@ -89,6 +106,24 @@ def load_pmaps_as_df(filename): to_df(pmap.S2Pmt.read()) if 'S2Pmt' in pmap else None) +def load_pmaps_as_df_lazy(filename, skip=0, n=None): + def read_event(table, event): + if table is None: return None + records = table.read_where(f"event=={event}") + return pd.DataFrame.from_records(records) + + tables = "S1 S2 S2Si S1Pmt S2Pmt".split() + with tb.open_file(filename, 'r') as h5f: + check_file_integrity(h5f) + + events = h5f.root.Run.events.read(field="evt_number") + tables = [getattr(h5f.root.PMAPS, table, None) for table in tables] + n = events.size if n is None else n + events = events[skip : skip+n] + for event in events: + yield tuple(read_event(table, event) for table in tables) + + # Hack fix to allow loading pmaps without individual pmts. 
Used in load_pmaps def _build_ipmtdf_from_sumdf(sumdf): ipmtdf = sumdf.copy() @@ -96,9 +131,15 @@ def _build_ipmtdf_from_sumdf(sumdf): ipmtdf['npmt'] = -1 return ipmtdf -def load_pmaps(filename): + +def load_pmaps(filename, lazy=False, **kwargs): + loader = load_pmaps_lazy if lazy else load_pmaps_eager + return loader(filename, **kwargs) + + +def load_pmaps_eager(filename): pmap_dict = {} - s1df, s2df, sidf, s1pmtdf, s2pmtdf = load_pmaps_as_df(filename) + s1df, s2df, sidf, s1pmtdf, s2pmtdf = load_pmaps_as_df_eager(filename) # Hack fix to allow loading pmaps without individual pmts if s1pmtdf is None: s1pmtdf = _build_ipmtdf_from_sumdf(s1df) if s2pmtdf is None: s2pmtdf = _build_ipmtdf_from_sumdf(s2df) @@ -141,6 +182,19 @@ def load_pmaps(filename): return pmap_dict +def load_pmaps_lazy(filename, skip=0, n=None): + for (s1df, s2df, sidf, s1pmtdf, s2pmtdf) in load_pmaps_as_df_lazy(filename, skip, n): + # Hack fix to allow loading pmaps without individual pmts + if s1pmtdf is None: s1pmtdf = _build_ipmtdf_from_sumdf(s1df) + if s2pmtdf is None: s2pmtdf = _build_ipmtdf_from_sumdf(s2df) + + event_number = np.concatenate([s1df.event, s2df.event])[0] + + s1s = s1s_from_df(s1df, s1pmtdf) + s2s = s2s_from_df(s2df, s2pmtdf, sidf) + yield event_number, PMap(s1s, s2s) + + def build_pmt_responses(pmtdf, ipmtdf): times = pmtdf.time.values try: diff --git a/invisible_cities/io/pmaps_io_test.py b/invisible_cities/io/pmaps_io_test.py index 18cb144e2..239aa4745 100644 --- a/invisible_cities/io/pmaps_io_test.py +++ b/invisible_cities/io/pmaps_io_test.py @@ -1,32 +1,132 @@ import os +import shutil from collections import namedtuple from collections import defaultdict from pytest import mark from pytest import approx - -from hypothesis import given -from hypothesis import strategies as stg +from pytest import fixture +from pytest import raises import tables as tb import numpy as np import pandas as pd +from ..core.exceptions import InvalidInputFileStructure from ..core.testing_utils 
import assert_PMap_equality from ..core.testing_utils import assert_dataframes_equal from ..core.testing_utils import exactly from ..evm .pmaps import S1 from ..evm .pmaps import S2 -from ..evm .pmaps_test import pmaps as pmap_gen +from ..evm .pmaps import PMTResponses +from ..evm .pmaps import SiPMResponses +from ..evm .pmaps import PMap from . import pmaps_io as pmpio +from . import run_and_event_io as reio + +from typing import Generator +from typing import Mapping pmaps_data = namedtuple("pmaps_data", """evt_numbers peak_numbers evt_numbers_pmt peak_numbers_pmt evt_numbers_sipm peak_numbers_sipm - times npmts nsipms + times bwidths npmts nsipms enes enes_pmt enes_sipm""") +@fixture(scope="session") +def two_pmaps_evm(): + """ + Generate two pmaps (two events) with random data. We fix the RNG + state for reproducibility. We make minimal changes to the sensor + ID and wfs between pmts and sipms to keep this fixture to a + minimum since the data values are irrelevant. We use different + number of S1s and S2s to cover more cases. 
+ """ + state = np.random.get_state() + np.random.seed(123456789) + + pmaps = {} + for i in range(2): + n_sensors = 5 + n_samples = 50 + n_s1 = 1 + i + n_s2 = 2 - i + + peaks = [] + for with_sipms in [False]*n_s1 + [True]*n_s2: + times = np.arange (n_samples, dtype=np.float32) + bwidths = np.ones (n_samples, dtype=np.float32) + ids = np.arange (n_sensors) * 3 + wfs = np.random.rand(n_sensors, n_samples).astype(np.float32) + pmts = PMTResponses(ids , wfs ) + if with_sipms: + sipms = SiPMResponses(ids+1000, wfs*2) + peak = S2 + else: + sipms = SiPMResponses.build_empty_instance() + peak = S1 + peaks.append(peak(times, bwidths, pmts, sipms)) + + s1s, s2s = peaks[:n_s1], peaks[n_s1:] + pmaps[i*5] = PMap(s1s, s2s) + + np.random.set_state(state) + return pmaps + + +@fixture(scope="session") +def two_pmaps_dfs(two_pmaps_evm): + """Same as `two_pmaps` but with DataFrames""" + s1_data = pmaps_to_arrays(two_pmaps_evm, list(two_pmaps_evm), "s1s") + s2_data = pmaps_to_arrays(two_pmaps_evm, list(two_pmaps_evm), "s2s") + + s1 = pd.DataFrame(dict( event = s1_data.evt_numbers + , time = s1_data.times + , peak = s1_data.peak_numbers.astype(np.uint8) + , bwidth = s1_data.bwidths + , ene = s1_data.enes + )) + s2 = pd.DataFrame(dict( event = s2_data.evt_numbers + , time = s2_data.times + , peak = s2_data.peak_numbers.astype(np.uint8) + , bwidth = s2_data.bwidths + , ene = s2_data.enes + )) + si = pd.DataFrame(dict( event = s2_data.evt_numbers_sipm + , peak = s2_data.peak_numbers_sipm.astype(np.uint8) + , nsipm = s2_data.nsipms.astype(np.int16) + , ene = s2_data.enes_sipm + )) + s2pmt = pd.DataFrame(dict( event = s2_data.evt_numbers_pmt + , peak = s2_data.peak_numbers_pmt.astype(np.uint8) + , npmt = s2_data.npmts.astype(np.uint8) + , ene = s2_data.enes_pmt + )) + s1pmt = pd.DataFrame(dict( event = s1_data.evt_numbers_pmt + , peak = s1_data.peak_numbers_pmt.astype(np.uint8) + , npmt = s1_data.npmts.astype(np.uint8) + , ene = s1_data.enes_pmt + )) + return s1, s2, si, s1pmt, s2pmt + + 
+@fixture(scope="session") +def two_pmaps(two_pmaps_evm, two_pmaps_dfs, output_tmpdir): + pmap_filename = os.path.join(output_tmpdir, "two_pmaps.h5") + run_number = 0 # irrelevant + timestamp = 0 # irrelevant + + with tb.open_file(pmap_filename, "w") as output_file: + write_pmap = pmpio.pmap_writer(output_file) + write_evtinfo = reio.run_and_event_writer(output_file) + for event_number, pmap in two_pmaps_evm.items(): + write_pmap (pmap, event_number) + write_evtinfo(run_number, event_number, timestamp) + + return pmap_filename, two_pmaps_evm, two_pmaps_dfs + + def pmaps_to_arrays(pmaps, evt_numbers, attr): data = defaultdict(list) for evt_number, pmap in zip(evt_numbers, pmaps.values()): @@ -42,6 +142,7 @@ def pmaps_to_arrays(pmaps, evt_numbers, attr): data["evt_numbers_sipm"] .extend([ evt_number] * size * nsipm) data["peak_numbers_sipm"].extend([peak_number] * size * nsipm) data["times"] .extend(peak.times) + data["bwidths"] .extend(peak.bin_widths) data["npmts"] .extend(np.repeat(peak. 
pmts.ids, size))          data["nsipms"] .extend(np.repeat(peak.sipms.ids, size))          data["enes"] .extend(peak.pmts.sum_over_sensors) @@ -181,16 +282,73 @@ def test_store_pmap(output_tmpdir, KrMC_pmaps_dict):          assert cols.ene [:] == approx (s2_data.enes_sipm)   -def test_load_pmaps_as_df(KrMC_pmaps_filename, KrMC_pmaps_dfs): -    true_dfs = KrMC_pmaps_dfs -    read_dfs = pmpio.load_pmaps_as_df(KrMC_pmaps_filename) +def test_check_file_integrity_ok(KrMC_pmaps_filename): +    """For a file with no problems (like the input here), this test should pass.""" +    # just check that it doesn't raise an exception +    with tb.open_file(KrMC_pmaps_filename) as file: +        pmpio.check_file_integrity(file) + + +def test_check_file_integrity_raises(KrMC_pmaps_filename, config_tmpdir): +    """Check that a file with a mismatch in the event number raises an exception.""" +    filename = os.path.join(config_tmpdir, "test_check_file_integrity_raises.h5") +    shutil.copy(KrMC_pmaps_filename, filename) +    with tb.open_file(filename, "r+") as file: +        file.root.Run.events.remove_rows(0, 1) # remove first row/event + +        # must check while the file is still open: `file` is unusable once the context exits +        with raises(InvalidInputFileStructure, match="Inconsistent data: event number mismatch"): +            pmpio.check_file_integrity(file) + + +def test_load_pmaps_as_df_eager(two_pmaps): +    filename, _, true_dfs = two_pmaps +    read_dfs = pmpio.load_pmaps_as_df_eager(filename)      for read_df, true_df in zip(read_dfs, true_dfs):          assert_dataframes_equal(read_df, true_df)   -def test_load_pmaps_as_df_without_ipmt(KrMC_pmaps_without_ipmt_filename, KrMC_pmaps_without_ipmt_dfs): +def test_load_pmaps_as_df_lazy(KrMC_pmaps_filename): +    """Ensure the lazy and non-lazy versions provide the same result""" +    dfs_eager = pmpio.load_pmaps_as_df_eager(KrMC_pmaps_filename) +    dfs_lazy = pmpio.load_pmaps_as_df_lazy (KrMC_pmaps_filename) + +    # concatenate all dfs from the same node +    dfs_lazy = [pd.concat(node_dfs, ignore_index=True) for node_dfs in zip(*dfs_lazy)] + +    assert len(dfs_eager) == len(dfs_lazy) +    for df_lazy, df_eager in zip(dfs_lazy,
dfs_eager): +        assert_dataframes_equal(df_lazy, df_eager) + + +@mark.parametrize("skip" , (0, 1, 2, 3)) +@mark.parametrize("nread", (0, 1, 2, 3)) +def test_load_pmaps_as_df_lazy_subset(KrMC_pmaps_filename, skip, nread): +    """Ensure the lazy version reads only the requested subset of events""" +    # concatenate all dfs from the same node +    dfs_lazy = pmpio.load_pmaps_as_df_lazy (KrMC_pmaps_filename, skip, nread) +    dfs_lazy = [pd.concat(node_dfs, ignore_index=True) for node_dfs in zip(*dfs_lazy)] + +    for df in dfs_lazy: +        assert df.event.nunique() == nread + + +def test_load_pmaps_as_df(KrMC_pmaps_filename): +    """Ensure the output of the function is the expected one""" +    eager = pmpio.load_pmaps_as_df(KrMC_pmaps_filename, lazy=False) +    lazy = pmpio.load_pmaps_as_df(KrMC_pmaps_filename, lazy=True ) +    assert len(eager) == 5 +    assert all(isinstance(item, pd.DataFrame) for item in eager) + +    assert isinstance(lazy, Generator) +    element = next(lazy) +    assert len(element) == 5 +    assert all(isinstance(item, pd.DataFrame) for item in element) + + +@mark.skip(reason="Deprecated feature. Plus this test makes no sense. Compares the output with itself.") +def test_load_pmaps_as_df_eager_without_ipmt(KrMC_pmaps_without_ipmt_filename, KrMC_pmaps_without_ipmt_dfs):      true_dfs = KrMC_pmaps_without_ipmt_dfs -    read_dfs = pmpio.load_pmaps_as_df(KrMC_pmaps_without_ipmt_filename) +    read_dfs = pmpio.load_pmaps_as_df_eager(KrMC_pmaps_without_ipmt_filename)      # Indices 0, 1 and 2 correspond to the S1sum, S2sum and Si      # dataframes.
Indices 3 and 4 are the S1pmt and S2pmt dataframes @@ -202,24 +360,52 @@ def test_load_pmaps_as_df_without_ipmt(KrMC_pmaps_without_ipmt_filename, KrMC_pm      assert read_dfs[4] is None   -@given(stg.data()) -def test_load_pmaps(output_tmpdir, data): -    pmap_filename = os.path.join(output_tmpdir, "test_pmap_file.h5") -    event_numbers = [2, 4, 6] -    pmt_ids = [1, 6, 8] -    pmap_generator = pmap_gen(pmt_ids) -    true_pmaps = [data.draw(pmap_generator)[1] for _ in event_numbers] +def test_load_pmaps_eager(two_pmaps, output_tmpdir): +    filename, true_pmaps, _ = two_pmaps +    read_pmaps = pmpio.load_pmaps_eager(filename) +    assert len(read_pmaps) == len(true_pmaps) +    assert sorted(read_pmaps) == sorted(true_pmaps) # compare keys +    for key, true_pmap in true_pmaps.items(): +        assert_PMap_equality(read_pmaps[key], true_pmap) - -    with tb.open_file(pmap_filename, "w") as output_file: -        write = pmpio.pmap_writer(output_file) -        list(map(write, true_pmaps, event_numbers)) -    read_pmaps = pmpio.load_pmaps(pmap_filename) +def test_load_pmaps_lazy(KrMC_pmaps_filename): +    """Ensure the lazy and non-lazy versions provide the same result""" +    pmaps_eager = pmpio.load_pmaps_eager(KrMC_pmaps_filename) +    pmaps_lazy = pmpio.load_pmaps_lazy (KrMC_pmaps_filename) + +    # combine all events in the same dictionary +    pmaps_lazy = dict(pmaps_lazy) + +    assert len(pmaps_lazy) == len(pmaps_eager) +    for evt, pmap_lazy in pmaps_lazy.items(): +        assert evt in pmaps_eager +        assert_PMap_equality(pmap_lazy, pmaps_eager[evt]) + + +@mark.parametrize("skip" , (0, 1, 2, 3)) +@mark.parametrize("nread", (0, 1, 2, 3)) +def test_load_pmaps_lazy_subset(KrMC_pmaps_filename, skip, nread): +    """Ensure the lazy version reads only the requested number of events""" +    # collect the yielded (event, pmap) pairs into a dictionary +    pmaps_lazy = pmpio.load_pmaps_lazy(KrMC_pmaps_filename, skip, nread) +    pmaps_lazy = dict(pmaps_lazy) +    assert len(pmaps_lazy) == nread + + +def test_load_pmaps(KrMC_pmaps_filename): +    """Ensure the output of the function is the expected
one""" + eager = pmpio.load_pmaps(KrMC_pmaps_filename, lazy=False) + lazy = pmpio.load_pmaps(KrMC_pmaps_filename, lazy=True ) + + assert isinstance(eager, Mapping) + element = next(iter(eager.values())) + assert isinstance(element, PMap) - assert len(read_pmaps) == len(true_pmaps) - assert np.all(list(read_pmaps.keys()) == event_numbers) - for true_pmap, read_pmap in zip(true_pmaps, read_pmaps.values()): - assert_PMap_equality(read_pmap, true_pmap) + assert isinstance(lazy, Generator) + element = next(lazy) + assert len(element) == 2 # event, pmap + assert isinstance(element[1], PMap) @mark.parametrize("signal_type", (S1, S2))