Commit

Merge remote-tracking branch 'upstream/awkward2_dev' into awkward2_dev
ikrommyd committed Mar 25, 2023
2 parents 692cdbb + 192ee34 commit 373873e
Showing 5 changed files with 84 additions and 17 deletions.
2 changes: 2 additions & 0 deletions coffea/nanoevents/factory.py
@@ -167,6 +167,8 @@ def __call__(self, form):
column_source = {key: lza[key] for key in awkward.fields(lza)}

lform = PreloadedSourceMapping._extract_base_form(column_source)
lform["parameters"]["metadata"] = self.metadata

return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form)

def create_column_mapping_and_key(self, columns, start, stop, interp_options):
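For context on the factory.py change: the added line stores the user-supplied metadata under the form's "parameters" slot, so it travels with the Awkward form itself. A minimal, self-contained sketch of that mechanism (the "pt" field and the metadata payload are made up for illustration; this is not coffea code):

import awkward

# Hand-written form dictionary; "parameters" -> "metadata" is the slot the
# factory change above fills via lform["parameters"]["metadata"].
form_dict = {
    "class": "RecordArray",
    "fields": ["pt"],  # illustrative field name
    "contents": [{"class": "NumpyArray", "primitive": "float64", "form_key": "pt"}],
    "parameters": {"metadata": {"dataset": "ZJets"}},  # assumed metadata payload
    "form_key": "outer",
}
form = awkward.forms.form.from_dict(form_dict)
print(form.parameters["metadata"])  # {'dataset': 'ZJets'}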
83 changes: 74 additions & 9 deletions coffea/processor/executor.py
@@ -1412,7 +1412,7 @@ def _normalize_fileset(
yield FileMeta(dataset, filename, local_treename, user_meta)

@staticmethod
- def metadata_fetcher(
+ def metadata_fetcher_root(
xrootdtimeout: int, align_clusters: bool, item: FileMeta
) -> Accumulatable:
with uproot.open({item.filename: None}, timeout=xrootdtimeout) as file:
@@ -1432,7 +1432,21 @@ def metadata_fetcher(
)
return out

- def _preprocess_fileset(self, fileset: Dict) -> None:
+ @staticmethod
+ def metadata_fetcher_parquet(item: FileMeta):
+ with ParquetFileContext(item.filename) as file:
+ metadata = {}
+ if item.metadata:
+ metadata.update(item.metadata)
+ metadata.update(
+ {"numentries": file.num_entries, "uuid": b"NO_UUID_0000_000"}
+ )
+ out = set_accumulator(
+ [FileMeta(item.dataset, item.filename, item.treename, metadata)]
+ )
+ return out
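The new parquet fetcher only needs a row count and a placeholder UUID per file. ParquetFileContext is coffea's own file wrapper; roughly the same information can be read with plain pyarrow, as in this hedged sketch (the sample path is illustrative, and num_rows is assumed to play the role of file.num_entries above):

import pyarrow.parquet as pq

pf = pq.ParquetFile("tests/samples/nano_dy.parquet")
metadata = {
    "numentries": pf.metadata.num_rows,  # assumed analogue of file.num_entries
    "uuid": b"NO_UUID_0000_000",         # parquet files carry no ROOT-style UUID
}
print(metadata)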

+ def _preprocess_fileset_root(self, fileset: Dict) -> None:
# this is a bit of an abuse of map-reduce but ok
to_get = {
filemeta
@@ -1457,7 +1471,43 @@ def _preprocess_fileset(self, fileset: Dict) -> None:
self.automatic_retries,
self.retries,
self.skipbadfiles,
- partial(self.metadata_fetcher, self.xrootdtimeout, self.align_clusters),
+ partial(
+ self.metadata_fetcher_root, self.xrootdtimeout, self.align_clusters
+ ),
)
out, _ = pre_executor(to_get, closure, out)
while out:
item = out.pop()
self.metadata_cache[item] = item.metadata
for filemeta in fileset:
filemeta.maybe_populate(self.metadata_cache)
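Both preprocess variants build their work item the same way: functools.partial pre-binds the retry policy and the metadata fetcher, so the executor only ever calls a single-argument closure. A simplified stand-in of that pattern (retry_then_call and fetcher are hypothetical helpers, not coffea's automatic_retries):

from functools import partial

def retry_then_call(retries, skipbadfiles, func, item):
    # hypothetical, simplified retry policy for illustration only
    for attempt in range(retries + 1):
        try:
            return func(item)
        except OSError:
            if attempt == retries and not skipbadfiles:
                raise
    return None

def fetcher(item):
    return {"file": item, "numentries": 40}  # stand-in metadata fetcher

closure = partial(retry_then_call, 2, True, fetcher)
print(closure("nano_dy.parquet"))  # {'file': 'nano_dy.parquet', 'numentries': 40}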

+ def _preprocess_fileset_parquet(self, fileset: Dict) -> None:
+ # this is a bit of an abuse of map-reduce but ok
+ to_get = {
+ filemeta
+ for filemeta in fileset
+ if not filemeta.populated(clusters=self.align_clusters)
+ }
+ if len(to_get) > 0:
+ out = set_accumulator()
+ pre_arg_override = {
+ "function_name": "get_metadata",
+ "desc": "Preprocessing",
+ "unit": "file",
+ "compression": None,
+ }
+ if isinstance(self.pre_executor, (FuturesExecutor, ParslExecutor)):
+ pre_arg_override.update({"tailtimeout": None})
+ if isinstance(self.pre_executor, (DaskExecutor)):
+ self.pre_executor.heavy_input = None
+ pre_arg_override.update({"worker_affinity": False})
+ pre_executor = self.pre_executor.copy(**pre_arg_override)
+ closure = partial(
+ self.automatic_retries,
+ self.retries,
+ self.skipbadfiles,
+ self.metadata_fetcher_parquet,
+ )
+ out, _ = pre_executor(to_get, closure, out)
+ while out:
@@ -1481,7 +1531,7 @@ def _chunk_generator(self, fileset: Dict, treename: str) -> Generator:
config = None
if self.use_skyhook:
config = Runner.read_coffea_config()
- if self.format == "root":
+ if not self.use_skyhook and (self.format == "root" or self.format == "parquet"):
if self.maxchunks is None:
last_chunksize = self.chunksize
for filemeta in fileset:
@@ -1608,10 +1658,13 @@ def _work_function(
]
setattr(events, "metadata", metadata)
elif format == "parquet":
+ import dask_awkward
+
- tree = file
- events = LazyDataFrame(
- tree, item.entrystart, item.entrystop, metadata=metadata
- )
+ events = dask_awkward.from_parquet(item.filename)[
+ item.entrystart : item.entrystop
+ ]
setattr(events, "metadata", metadata)
else:
raise ValueError("Format can only be root or parquet!")
elif issubclass(schema, schemas.BaseSchema):
@@ -1647,8 +1700,9 @@ def _work_function(
schemaclass=schema,
metadata=metadata,
skyhook_options=skyhook_options,
+ permit_dask=True,
)
- events = factory.events()
+ events = factory.events()[item.entrystart : item.entrystop]
else:
raise ValueError(
"Expected schema to derive from nanoevents.BaseSchema, instead got %r"
@@ -1745,7 +1799,18 @@ def preprocess(
for filemeta in fileset:
filemeta.maybe_populate(self.metadata_cache)

- self._preprocess_fileset(fileset)
+ self._preprocess_fileset_root(fileset)
fileset = self._filter_badfiles(fileset)

# reverse fileset list to match the order of files as presented in version
# v0.7.4. This fixes tests using maxchunks.
fileset.reverse()
+ elif self.format == "parquet":
+ fileset = list(self._normalize_fileset(fileset, treename))
+ for filemeta in fileset:
+ filemeta.maybe_populate(self.metadata_cache)
+
+ self._preprocess_fileset_parquet(fileset)
+ fileset = self._filter_badfiles(fileset)
+
+ # reverse fileset list to match the order of files as presented in version
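Taken together, these executor changes let the Runner drive parquet inputs end to end. A hedged usage sketch in the spirit of the tests below (the schema choice, sample path, and treename are assumptions for illustration, not part of this diff):

from coffea import processor
from coffea.nanoevents import NanoAODSchema
from coffea.processor.test_items import NanoTestProcessor

run = processor.Runner(
    executor=processor.IterativeExecutor(),
    schema=NanoAODSchema,   # assumed NanoAOD-shaped parquet input
    format="parquet",       # exercises the new _preprocess_fileset_parquet path
)
out = run(
    {"ZJets": {"files": ["tests/samples/nano_dy.parquet"]}},
    treename="Events",      # kept for the common call signature
    processor_instance=NanoTestProcessor(),
)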
Binary file modified tests/samples/nano_dimuon.parquet
Binary file modified tests/samples/nano_dy.parquet
16 changes: 8 additions & 8 deletions tests/test_local_executors.py
@@ -11,7 +11,7 @@
pytest.skip("skipping tests that only function in linux", allow_module_level=True)


@pytest.mark.parametrize("filetype", ["root"]) # TODO re-enable parquet tests!
@pytest.mark.parametrize("filetype", ["root", "parquet"])
@pytest.mark.parametrize("skipbadfiles", [True, False])
@pytest.mark.parametrize("maxchunks", [1, None])
@pytest.mark.parametrize("chunksize", [100000, 5])
@@ -25,8 +25,8 @@ def test_dataframe_analysis(
from coffea.processor.test_items import NanoTestProcessor

filelist = {
"ZJets": [osp.abspath(f"tests/samples/nano_dy.{filetype}")],
"Data": [osp.abspath(f"tests/samples/nano_dimuon.{filetype}")],
"ZJets": {"files": [osp.abspath(f"tests/samples/nano_dy.{filetype}")]},
"Data": {"files": [osp.abspath(f"tests/samples/nano_dimuon.{filetype}")]},
}

executor = executor()
@@ -49,13 +49,13 @@ def test_dataframe_analysis(
else:
assert maxchunks == 1
print(hists["cutflow"]["ZJets_pt"])
assert hists["cutflow"]["ZJets_pt"] == 18 if chunksize == 100_000 else 2
assert hists["cutflow"]["ZJets_mass"] == 6 if chunksize == 100_000 else 1
assert hists["cutflow"]["Data_pt"] == 84 if chunksize == 100_000 else 13
assert hists["cutflow"]["Data_mass"] == 66 if chunksize == 100_000 else 12
assert hists["cutflow"]["ZJets_pt"] == (18 if chunksize == 100_000 else 2)
assert hists["cutflow"]["ZJets_mass"] == (6 if chunksize == 100_000 else 1)
assert hists["cutflow"]["Data_pt"] == (84 if chunksize == 100_000 else 13)
assert hists["cutflow"]["Data_mass"] == (66 if chunksize == 100_000 else 12)


@pytest.mark.parametrize("filetype", ["root"]) # TODO re-enable parquet tests!
@pytest.mark.parametrize("filetype", ["root", "parquet"])
@pytest.mark.parametrize("skipbadfiles", [True, False])
@pytest.mark.parametrize("maxchunks", [None, 1000])
@pytest.mark.parametrize("compression", [None, 0, 2])
