Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lazy PMap reader #917

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions invisible_cities/cities/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def wf_from_files(paths, wf_type):
def pmap_from_files(paths):
for path in paths:
try:
pmaps = load_pmaps(path)
pmaps = load_pmaps(path, lazy=True)
except tb.exceptions.NoSuchNodeError:
continue

Expand All @@ -567,11 +567,11 @@ def pmap_from_files(paths):
except IndexError:
continue

check_lengths(event_info, pmaps)

for evtinfo in event_info:
for evtinfo, (evt, pmap) in zip(event_info, pmaps):
event_number, timestamp = evtinfo.fetch_all_fields()
yield dict(pmap=pmaps[event_number], run_number=run_number,
if event_number != evt:
raise RuntimeError("Inconsistent data: event number mismatch")
yield dict(pmap=pmap, run_number=run_number,
event_number=event_number, timestamp=timestamp)


Expand Down
58 changes: 55 additions & 3 deletions invisible_cities/io/pmaps_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import tables as tb
import pandas as pd

from .. core.exceptions import InvalidInputFileStructure

from .. evm .pmaps import PMTResponses
from .. evm .pmaps import SiPMResponses
from .. evm .pmaps import S1
Expand Down Expand Up @@ -78,8 +80,23 @@ def _make_tables(hdf5_file, compression):
return pmp_tables


def load_pmaps_as_df(filename):
def check_file_integrity(file):
    """
    Verify that the event numbers recorded in the Run metadata table
    agree with the union of event numbers present in the S1 and S2
    pmap tables of an open pmap file.

    Raises InvalidInputFileStructure on any mismatch.
    """
    run_events = file.root.Run  .events.read(field="evt_number")
    s1_events  = file.root.PMAPS.S1    .read(field="event")
    s2_events  = file.root.PMAPS.S2    .read(field="event")

    pmap_events = set(s1_events.tolist())
    pmap_events.update(s2_events.tolist())
    if pmap_events != set(run_events):
        raise InvalidInputFileStructure("Inconsistent data: event number mismatch")
jwaiton marked this conversation as resolved.
Show resolved Hide resolved


def load_pmaps_as_df(filename, lazy=False):
    """
    Load the pmap tables of `filename` as pandas dataframes.

    With lazy=False (default) the whole file is read at once and a
    5-tuple of dataframes is returned. With lazy=True a generator is
    returned, yielding one such tuple per event.
    """
    if lazy:
        return load_pmaps_as_df_lazy(filename)
    return load_pmaps_as_df_eager(filename)


def load_pmaps_as_df_eager(filename):
with tb.open_file(filename, 'r') as h5f:
check_file_integrity(h5f)

pmap = h5f.root.PMAPS
to_df = pd.DataFrame.from_records
return (to_df(pmap.S1 .read()),
Expand All @@ -89,16 +106,38 @@ def load_pmaps_as_df(filename):
to_df(pmap.S2Pmt.read()) if 'S2Pmt' in pmap else None)


def load_pmaps_as_df_lazy(filename):
    """
    Generator yielding, for each event in `filename`, a 5-tuple of
    dataframes (S1, S2, S2Si, S1Pmt, S2Pmt). A node absent from the
    file yields None in its slot.

    NOTE: the HDF5 file stays open while the generator is being
    consumed and is closed when it is exhausted (or garbage-collected).
    """
    node_names = ("S1", "S2", "S2Si", "S1Pmt", "S2Pmt")

    def event_df(node, event_number):
        # None marks a node that does not exist in this file
        if node is None:
            return None
        return pd.DataFrame.from_records(node.read_where(f"event=={event_number}"))

    with tb.open_file(filename, 'r') as h5f:
        check_file_integrity(h5f)

        nodes = [getattr(h5f.root.PMAPS, name, None) for name in node_names]
        for event_number in h5f.root.Run.events.read(field="evt_number"):
            yield tuple(event_df(node, event_number) for node in nodes)


# Hack fix to allow loading pmaps without individual pmts. Used in load_pmaps
def _build_ipmtdf_from_sumdf(sumdf):
    """
    Fabricate a per-pmt dataframe from a summed-waveform dataframe by
    turning the `time` column into an `npmt` column filled with -1.

    The input dataframe is left untouched. Note that the output index
    labels are converted to strings (str is applied to every label),
    mirroring the historical behaviour of this hack.
    """
    fake_ipmt = sumdf.rename(index=str, columns={'time': 'npmt'})
    fake_ipmt = fake_ipmt.assign(npmt=-1)
    return fake_ipmt

def load_pmaps(filename):

def load_pmaps(filename, lazy=False):
    """
    Read pmaps from `filename`.

    With lazy=False (default) return a {event_number: PMap} dictionary
    covering the whole file. With lazy=True return a generator of
    (event_number, PMap) pairs, one event at a time.
    """
    if lazy:
        return load_pmaps_lazy(filename)
    return load_pmaps_eager(filename)


def load_pmaps_eager(filename):
pmap_dict = {}
s1df, s2df, sidf, s1pmtdf, s2pmtdf = load_pmaps_as_df(filename)
s1df, s2df, sidf, s1pmtdf, s2pmtdf = load_pmaps_as_df_eager(filename)
# Hack fix to allow loading pmaps without individual pmts
if s1pmtdf is None: s1pmtdf = _build_ipmtdf_from_sumdf(s1df)
if s2pmtdf is None: s2pmtdf = _build_ipmtdf_from_sumdf(s2df)
Expand Down Expand Up @@ -141,6 +180,19 @@ def load_pmaps(filename):
return pmap_dict


def load_pmaps_lazy(filename):
    """
    Generator of (event_number, PMap) pairs, one per event in
    `filename`. Events are materialized one at a time, keeping the
    memory footprint low compared with `load_pmaps_eager`.
    """
    for s1df, s2df, sidf, s1pmtdf, s2pmtdf in load_pmaps_as_df_lazy(filename):
        # Files written without individual-pmt tables get fabricated
        # placeholder tables (hack fix, same as the eager loader).
        if s1pmtdf is None:
            s1pmtdf = _build_ipmtdf_from_sumdf(s1df)
        if s2pmtdf is None:
            s2pmtdf = _build_ipmtdf_from_sumdf(s2df)

        # Every row of a lazily-read chunk belongs to the same event,
        # so taking the first entry is enough. At least one of
        # s1df/s2df is non-empty for a well-formed file (enforced by
        # check_file_integrity), so the concatenation is never empty.
        event_number = np.concatenate([s1df.event, s2df.event])[0]

        yield event_number, PMap(s1s_from_df(s1df, s1pmtdf),
                                 s2s_from_df(s2df, s2pmtdf, sidf))


def build_pmt_responses(pmtdf, ipmtdf):
times = pmtdf.time.values
try:
Expand Down
214 changes: 189 additions & 25 deletions invisible_cities/io/pmaps_io_test.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,132 @@
import os
import shutil
from collections import namedtuple
from collections import defaultdict

from pytest import mark
from pytest import approx

from hypothesis import given
from hypothesis import strategies as stg
from pytest import fixture
from pytest import raises

import tables as tb
import numpy as np
import pandas as pd

from ..core.exceptions import InvalidInputFileStructure
from ..core.testing_utils import assert_PMap_equality
from ..core.testing_utils import assert_dataframes_equal
from ..core.testing_utils import exactly
from ..evm .pmaps import S1
from ..evm .pmaps import S2
from ..evm .pmaps_test import pmaps as pmap_gen
from ..evm .pmaps import PMTResponses
from ..evm .pmaps import SiPMResponses
from ..evm .pmaps import PMap
from . import pmaps_io as pmpio
from . import run_and_event_io as reio

from typing import Generator
from typing import Mapping


pmaps_data = namedtuple("pmaps_data", """evt_numbers peak_numbers
evt_numbers_pmt peak_numbers_pmt
evt_numbers_sipm peak_numbers_sipm
times npmts nsipms
times bwidths npmts nsipms
enes enes_pmt enes_sipm""")

@fixture(scope="session")
def two_pmaps_evm():
    """
    Generate two pmaps (two events) with random data. We fix the RNG
    state for reproducibility. We make minimal changes to the sensor
    ID and wfs between pmts and sipms to keep this fixture to a
    minimum since the data values are irrelevant. We use different
    number of S1s and S2s to cover more cases.

    Returns a {event_number: PMap} dictionary with keys 0 and 5.
    """
    # Save the global RNG state and restore it at the end so this
    # fixture does not perturb other (possibly randomized) tests.
    state = np.random.get_state()
    np.random.seed(123456789)

    pmaps = {}
    for i in range(2):
        n_sensors = 5
        n_samples = 50
        # event 0: 1 S1 + 2 S2; event 1: 2 S1 + 1 S2
        n_s1 = 1 + i
        n_s2 = 2 - i

        peaks = []
        # S1 peaks carry no sipm response; S2 peaks do
        for with_sipms in [False]*n_s1 + [True]*n_s2:
            times = np.arange (n_samples, dtype=np.float32)
            bwidths = np.ones (n_samples, dtype=np.float32)
            ids = np.arange (n_sensors) * 3
            wfs = np.random.rand(n_sensors, n_samples).astype(np.float32)
            pmts = PMTResponses(ids , wfs )
            if with_sipms:
                # sipm ids/wfs derived from the pmt ones with minimal changes
                sipms = SiPMResponses(ids+1000, wfs*2)
                peak = S2
            else:
                sipms = SiPMResponses.build_empty_instance()
                peak = S1
            peaks.append(peak(times, bwidths, pmts, sipms))

        s1s, s2s = peaks[:n_s1], peaks[n_s1:]
        # event numbers 0 and 5 (deliberately non-consecutive)
        pmaps[i*5] = PMap(s1s, s2s)

    np.random.set_state(state)
    return pmaps


@fixture(scope="session")
def two_pmaps_dfs(two_pmaps_evm):
    """Same as `two_pmaps` but with DataFrames"""
    s1_data = pmaps_to_arrays(two_pmaps_evm, list(two_pmaps_evm), "s1s")
    s2_data = pmaps_to_arrays(two_pmaps_evm, list(two_pmaps_evm), "s2s")

    def sum_df(data):
        # layout of the summed-waveform (S1/S2) tables
        return pd.DataFrame(dict( event  = data.evt_numbers
                                , time   = data.times
                                , peak   = data.peak_numbers.astype(np.uint8)
                                , bwidth = data.bwidths
                                , ene    = data.enes
                                ))

    def pmt_df(data):
        # layout of the per-pmt (S1Pmt/S2Pmt) tables
        return pd.DataFrame(dict( event = data.evt_numbers_pmt
                                , peak  = data.peak_numbers_pmt.astype(np.uint8)
                                , npmt  = data.npmts.astype(np.uint8)
                                , ene   = data.enes_pmt
                                ))

    s1    = sum_df(s1_data)
    s2    = sum_df(s2_data)
    # layout of the per-sipm (S2Si) table
    si    = pd.DataFrame(dict( event = s2_data.evt_numbers_sipm
                             , peak  = s2_data.peak_numbers_sipm.astype(np.uint8)
                             , nsipm = s2_data.nsipms.astype(np.int16)
                             , ene   = s2_data.enes_sipm
                             ))
    s1pmt = pmt_df(s1_data)
    s2pmt = pmt_df(s2_data)
    return s1, s2, si, s1pmt, s2pmt


@fixture(scope="session")
def two_pmaps(two_pmaps_evm, two_pmaps_dfs, output_tmpdir):
    """
    Write the `two_pmaps_evm` pmaps (plus Run/events metadata) to a
    temporary HDF5 file and hand back (filename, pmaps, dataframes).
    """
    pmap_filename = os.path.join(output_tmpdir, "two_pmaps.h5")
    run_number = 0 # irrelevant
    timestamp  = 0 # irrelevant

    with tb.open_file(pmap_filename, "w") as output_file:
        write_pmap    = pmpio.pmap_writer          (output_file)
        write_evtinfo = reio .run_and_event_writer (output_file)
        for event_number, pmap in two_pmaps_evm.items():
            write_pmap   (pmap, event_number)
            write_evtinfo(run_number, event_number, timestamp)

    return pmap_filename, two_pmaps_evm, two_pmaps_dfs


def pmaps_to_arrays(pmaps, evt_numbers, attr):
data = defaultdict(list)
for evt_number, pmap in zip(evt_numbers, pmaps.values()):
Expand All @@ -42,6 +142,7 @@ def pmaps_to_arrays(pmaps, evt_numbers, attr):
data["evt_numbers_sipm"] .extend([ evt_number] * size * nsipm)
data["peak_numbers_sipm"].extend([peak_number] * size * nsipm)
data["times"] .extend(peak.times)
data["bwidths"] .extend(peak.bin_widths)
data["npmts"] .extend(np.repeat(peak. pmts.ids, size))
data["nsipms"] .extend(np.repeat(peak.sipms.ids, size))
data["enes"] .extend(peak.pmts.sum_over_sensors)
Expand Down Expand Up @@ -181,16 +282,61 @@ def test_store_pmap(output_tmpdir, KrMC_pmaps_dict):
assert cols.ene [:] == approx (s2_data.enes_sipm)


def test_load_pmaps_as_df(KrMC_pmaps_filename, KrMC_pmaps_dfs):
true_dfs = KrMC_pmaps_dfs
read_dfs = pmpio.load_pmaps_as_df(KrMC_pmaps_filename)
def test_check_file_integrity_ok(KrMC_pmaps_filename):
    """A consistent file (like the input here) must pass the check silently."""
    with tb.open_file(KrMC_pmaps_filename) as file:
        # simply ensure no exception is raised
        pmpio.check_file_integrity(file)


def test_check_file_integrity_raises(KrMC_pmaps_filename, config_tmpdir):
    """Check that a file with a mismatch in the event number raises an exception."""
    filename = os.path.join(config_tmpdir, "test_check_file_integrity_raises.h5")
    shutil.copy(KrMC_pmaps_filename, filename)
    with tb.open_file(filename, "r+") as file:
        file.root.Run.events.remove_rows(0, 1) # remove first row/event

        # The check must run while the handle is still open: `file`
        # becomes unusable once the `with` block exits, and the closed
        # handle would raise a PyTables error instead of the expected
        # InvalidInputFileStructure.
        with raises(InvalidInputFileStructure, match="Inconsistent data: event number mismatch"):
            pmpio.check_file_integrity(file)


def test_load_pmaps_as_df_eager(two_pmaps):
    """The dataframes read back must match those used to build the file."""
    filename, _, expected_dfs = two_pmaps
    actual_dfs = pmpio.load_pmaps_as_df_eager(filename)
    for actual_df, expected_df in zip(actual_dfs, expected_dfs):
        assert_dataframes_equal(actual_df, expected_df)


def test_load_pmaps_as_df_without_ipmt(KrMC_pmaps_without_ipmt_filename, KrMC_pmaps_without_ipmt_dfs):
def test_load_pmaps_as_df_lazy(KrMC_pmaps_filename):
    """Ensure the lazy and non-lazy versions provide the same result"""
    dfs_eager = pmpio.load_pmaps_as_df_eager(KrMC_pmaps_filename)
    per_event = pmpio.load_pmaps_as_df_lazy (KrMC_pmaps_filename)

    # stitch the per-event pieces of each node back into one dataframe
    stitched = [pd.concat(pieces, ignore_index=True) for pieces in zip(*per_event)]

    assert len(dfs_eager) == len(stitched)
    for df_lazy, df_eager in zip(stitched, dfs_eager):
        assert_dataframes_equal(df_lazy, df_eager)


def test_load_pmaps_as_df(KrMC_pmaps_filename):
    """Ensure the output of the function is the expected one"""
    eager = pmpio.load_pmaps_as_df(KrMC_pmaps_filename, lazy=False)
    lazy  = pmpio.load_pmaps_as_df(KrMC_pmaps_filename, lazy=True )

    # eager: one dataframe per pmap node
    assert len(eager) == 5
    for item in eager:
        assert isinstance(item, pd.DataFrame)

    # lazy: a generator yielding 5-tuples of dataframes
    assert isinstance(lazy, Generator)
    first = next(lazy)
    assert len(first) == 5
    for item in first:
        assert isinstance(item, pd.DataFrame)


@mark.skip(reason="Deprecated feature. Plus this test makes no sense. Compares the output with itself.")
def test_load_pmaps_as_df_eager_without_ipmt(KrMC_pmaps_without_ipmt_filename, KrMC_pmaps_without_ipmt_dfs):
true_dfs = KrMC_pmaps_without_ipmt_dfs
read_dfs = pmpio.load_pmaps_as_df(KrMC_pmaps_without_ipmt_filename)
read_dfs = pmpio.load_pmaps_as_df_eager(KrMC_pmaps_without_ipmt_filename)

# Indices 0, 1 and 2 correspond to the S1sum, S2sum and Si
# dataframes. Indices 3 and 4 are the S1pmt and S2pmt dataframes
Expand All @@ -202,24 +348,42 @@ def test_load_pmaps_as_df_without_ipmt(KrMC_pmaps_without_ipmt_filename, KrMC_pm
assert read_dfs[4] is None


@given(stg.data())
def test_load_pmaps(output_tmpdir, data):
pmap_filename = os.path.join(output_tmpdir, "test_pmap_file.h5")
event_numbers = [2, 4, 6]
pmt_ids = [1, 6, 8]
pmap_generator = pmap_gen(pmt_ids)
true_pmaps = [data.draw(pmap_generator)[1] for _ in event_numbers]
def test_load_pmaps_eager(two_pmaps):
    """Pmaps read back eagerly must match the ones written to the file.

    The unused `output_tmpdir` fixture parameter was dropped: it made
    pytest build a fixture the test never touched.
    """
    filename, true_pmaps, _ = two_pmaps
    read_pmaps = pmpio.load_pmaps_eager(filename)
    assert len(read_pmaps) == len(true_pmaps)
    assert sorted(read_pmaps) == sorted(true_pmaps) # compare keys
    for key, true_pmap in true_pmaps.items():
        assert_PMap_equality(read_pmaps[key], true_pmap)

with tb.open_file(pmap_filename, "w") as output_file:
write = pmpio.pmap_writer(output_file)
list(map(write, true_pmaps, event_numbers))

read_pmaps = pmpio.load_pmaps(pmap_filename)
def test_load_pmaps_lazy(KrMC_pmaps_filename):
    """Ensure the lazy and non-lazy versions provide the same result"""
    pmaps_eager = pmpio.load_pmaps_eager(KrMC_pmaps_filename)
    # combine all lazily-read (event, pmap) pairs into one dictionary
    pmaps_lazy  = dict(pmpio.load_pmaps_lazy(KrMC_pmaps_filename))

    assert len(pmaps_lazy) == len(pmaps_eager)
    for evt, pmap_lazy in pmaps_lazy.items():
        assert evt in pmaps_eager
        assert_PMap_equality(pmap_lazy, pmaps_eager[evt])


def test_load_pmaps(KrMC_pmaps_filename):
    """Ensure the output of the function is the expected one.

    (Stale leftover assertions referencing undefined `read_pmaps` /
    `true_pmaps` were removed from the body.)
    """
    eager = pmpio.load_pmaps(KrMC_pmaps_filename, lazy=False)
    lazy  = pmpio.load_pmaps(KrMC_pmaps_filename, lazy=True )

    # eager: mapping of event number -> PMap
    assert isinstance(eager, Mapping)
    element = next(iter(eager.values()))
    assert isinstance(element, PMap)

    # lazy: generator of (event_number, PMap) pairs
    assert isinstance(lazy, Generator)
    element = next(lazy)
    assert len(element) == 2 # event, pmap
    assert isinstance(element[1], PMap)


@mark.parametrize("signal_type", (S1, S2))
Expand Down