diff --git a/coffea/nanoevents/factory.py b/coffea/nanoevents/factory.py
index 1dbb60dec5..8b355540fe 100644
--- a/coffea/nanoevents/factory.py
+++ b/coffea/nanoevents/factory.py
@@ -167,6 +167,8 @@ def __call__(self, form):
         column_source = {key: lza[key] for key in awkward.fields(lza)}
         lform = PreloadedSourceMapping._extract_base_form(column_source)
 
+        lform["parameters"]["metadata"] = self.metadata
+
         return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form)
 
     def create_column_mapping_and_key(self, columns, start, stop, interp_options):
diff --git a/coffea/processor/executor.py b/coffea/processor/executor.py
index f47b4401fd..618b1c741d 100644
--- a/coffea/processor/executor.py
+++ b/coffea/processor/executor.py
@@ -1412,7 +1412,7 @@ def _normalize_fileset(
             yield FileMeta(dataset, filename, local_treename, user_meta)
 
     @staticmethod
-    def metadata_fetcher(
+    def metadata_fetcher_root(
         xrootdtimeout: int, align_clusters: bool, item: FileMeta
     ) -> Accumulatable:
         with uproot.open({item.filename: None}, timeout=xrootdtimeout) as file:
@@ -1432,7 +1432,21 @@ def metadata_fetcher(
         )
         return out
 
-    def _preprocess_fileset(self, fileset: Dict) -> None:
+    @staticmethod
+    def metadata_fetcher_parquet(item: FileMeta):
+        with ParquetFileContext(item.filename) as file:
+            metadata = {}
+            if item.metadata:
+                metadata.update(item.metadata)
+            metadata.update(
+                {"numentries": file.num_entries, "uuid": b"NO_UUID_0000_000"}
+            )
+            out = set_accumulator(
+                [FileMeta(item.dataset, item.filename, item.treename, metadata)]
+            )
+        return out
+
+    def _preprocess_fileset_root(self, fileset: Dict) -> None:
         # this is a bit of an abuse of map-reduce but ok
         to_get = {
             filemeta
@@ -1457,7 +1471,43 @@ def _preprocess_fileset(self, fileset: Dict) -> None:
                 self.automatic_retries,
                 self.retries,
                 self.skipbadfiles,
-                partial(self.metadata_fetcher, self.xrootdtimeout, self.align_clusters),
+                partial(
+                    self.metadata_fetcher_root, self.xrootdtimeout, self.align_clusters
+                ),
             )
             out, _ = pre_executor(to_get, closure, out)
             while out:
                 item = out.pop()
                 self.metadata_cache[item] = item.metadata
         for filemeta in fileset:
             filemeta.maybe_populate(self.metadata_cache)
+
+    def _preprocess_fileset_parquet(self, fileset: Dict) -> None:
+        # this is a bit of an abuse of map-reduce but ok
+        to_get = {
+            filemeta
+            for filemeta in fileset
+            if not filemeta.populated(clusters=self.align_clusters)
+        }
+        if len(to_get) > 0:
+            out = set_accumulator()
+            pre_arg_override = {
+                "function_name": "get_metadata",
+                "desc": "Preprocessing",
+                "unit": "file",
+                "compression": None,
+            }
+            if isinstance(self.pre_executor, (FuturesExecutor, ParslExecutor)):
+                pre_arg_override.update({"tailtimeout": None})
+            if isinstance(self.pre_executor, (DaskExecutor)):
+                self.pre_executor.heavy_input = None
+                pre_arg_override.update({"worker_affinity": False})
+            pre_executor = self.pre_executor.copy(**pre_arg_override)
+            closure = partial(
+                self.automatic_retries,
+                self.retries,
+                self.skipbadfiles,
+                self.metadata_fetcher_parquet,
+            )
+            out, _ = pre_executor(to_get, closure, out)
+            while out:
+                item = out.pop()
+                self.metadata_cache[item] = item.metadata
+        for filemeta in fileset:
+            filemeta.maybe_populate(self.metadata_cache)
@@ -1481,7 +1531,7 @@ def _chunk_generator(self, fileset: Dict, treename: str) -> Generator:
         config = None
         if self.use_skyhook:
             config = Runner.read_coffea_config()
-        if self.format == "root":
+        if not self.use_skyhook and (self.format == "root" or self.format == "parquet"):
             if self.maxchunks is None:
                 last_chunksize = self.chunksize
                 for filemeta in fileset:
@@ -1608,10 +1658,13 @@ def _work_function(
                     ]
                     setattr(events, "metadata", metadata)
                 elif format == "parquet":
+                    import dask_awkward
+
                     tree = file
-                    events = LazyDataFrame(
-                        tree, item.entrystart, item.entrystop, metadata=metadata
-                    )
+                    events = dask_awkward.from_parquet(item.filename)[
+                        item.entrystart : item.entrystop
+                    ]
+                    setattr(events, "metadata", metadata)
                 else:
                     raise ValueError("Format can only be root or parquet!")
             elif issubclass(schema, schemas.BaseSchema):
@@ -1647,8 +1700,9 @@ def _work_function(
                     schemaclass=schema,
                     metadata=metadata,
                     skyhook_options=skyhook_options,
+                    permit_dask=True,
                 )
-                events = factory.events()
+                events = factory.events()[item.entrystart : item.entrystop]
             else:
                 raise ValueError(
                     "Expected schema to derive from nanoevents.BaseSchema, instead got %r"
@@ -1745,7 +1799,18 @@ def preprocess(
             for filemeta in fileset:
                 filemeta.maybe_populate(self.metadata_cache)
 
-            self._preprocess_fileset(fileset)
+            self._preprocess_fileset_root(fileset)
+            fileset = self._filter_badfiles(fileset)
+
+            # reverse fileset list to match the order of files as presented in version
+            # v0.7.4. This fixes tests using maxchunks.
+            fileset.reverse()
+        elif self.format == "parquet":
+            fileset = list(self._normalize_fileset(fileset, treename))
+            for filemeta in fileset:
+                filemeta.maybe_populate(self.metadata_cache)
+
+            self._preprocess_fileset_parquet(fileset)
             fileset = self._filter_badfiles(fileset)
 
             # reverse fileset list to match the order of files as presented in version
diff --git a/tests/samples/nano_dimuon.parquet b/tests/samples/nano_dimuon.parquet
index 9a205e0c44..7ceb0e22d5 100644
Binary files a/tests/samples/nano_dimuon.parquet and b/tests/samples/nano_dimuon.parquet differ
diff --git a/tests/samples/nano_dy.parquet b/tests/samples/nano_dy.parquet
index 59b6f9cb93..5de5990c80 100644
Binary files a/tests/samples/nano_dy.parquet and b/tests/samples/nano_dy.parquet differ
diff --git a/tests/test_local_executors.py b/tests/test_local_executors.py
index b4a7d577d1..e7a815275a 100644
--- a/tests/test_local_executors.py
+++ b/tests/test_local_executors.py
@@ -11,7 +11,7 @@
     pytest.skip("skipping tests that only function in linux", allow_module_level=True)
 
 
-@pytest.mark.parametrize("filetype", ["root"])  # TODO re-enable parquet tests!
+@pytest.mark.parametrize("filetype", ["root", "parquet"])
 @pytest.mark.parametrize("skipbadfiles", [True, False])
 @pytest.mark.parametrize("maxchunks", [1, None])
 @pytest.mark.parametrize("chunksize", [100000, 5])
@@ -25,8 +25,8 @@ def test_dataframe_analysis(
     from coffea.processor.test_items import NanoTestProcessor
 
     filelist = {
-        "ZJets": [osp.abspath(f"tests/samples/nano_dy.{filetype}")],
-        "Data": [osp.abspath(f"tests/samples/nano_dimuon.{filetype}")],
+        "ZJets": {"files": [osp.abspath(f"tests/samples/nano_dy.{filetype}")]},
+        "Data": {"files": [osp.abspath(f"tests/samples/nano_dimuon.{filetype}")]},
     }
 
     executor = executor()
@@ -49,13 +49,13 @@ def test_dataframe_analysis(
     else:
         assert maxchunks == 1
         print(hists["cutflow"]["ZJets_pt"])
-        assert hists["cutflow"]["ZJets_pt"] == 18 if chunksize == 100_000 else 2
-        assert hists["cutflow"]["ZJets_mass"] == 6 if chunksize == 100_000 else 1
-        assert hists["cutflow"]["Data_pt"] == 84 if chunksize == 100_000 else 13
-        assert hists["cutflow"]["Data_mass"] == 66 if chunksize == 100_000 else 12
+        assert hists["cutflow"]["ZJets_pt"] == (18 if chunksize == 100_000 else 2)
+        assert hists["cutflow"]["ZJets_mass"] == (6 if chunksize == 100_000 else 1)
+        assert hists["cutflow"]["Data_pt"] == (84 if chunksize == 100_000 else 13)
+        assert hists["cutflow"]["Data_mass"] == (66 if chunksize == 100_000 else 12)
 
 
-@pytest.mark.parametrize("filetype", ["root"])  # TODO re-enable parquet tests!
+@pytest.mark.parametrize("filetype", ["root", "parquet"])
 @pytest.mark.parametrize("skipbadfiles", [True, False])
 @pytest.mark.parametrize("maxchunks", [None, 1000])
 @pytest.mark.parametrize("compression", [None, 0, 2])
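Note on the new read path in _work_function: a parquet chunk is now produced by opening the whole file lazily with dask_awkward.from_parquet and slicing out the entry range, instead of wrapping the file in a LazyDataFrame. Below is a standalone sketch of just that step, using one of the test samples touched by this patch; the 0:5 bounds stand in for item.entrystart:item.entrystop and are illustrative.

import dask_awkward

# Open the parquet file lazily; no column data is read at this point.
events = dask_awkward.from_parquet("tests/samples/nano_dy.parquet")

# Select one chunk's entry range, as the patch does with
# item.entrystart:item.entrystop.
chunk = events[0:5]  # illustrative bounds

# compute() materializes only the selected chunk.
print(len(chunk.compute()))  # 5, assuming the file holds at least five events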
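End to end, the re-enabled parquet path is driven the same way as root, per the updated tests: a fileset whose datasets map to {"files": [...]} dicts, and a Runner constructed with format="parquet". A minimal sketch follows, reusing the test fixtures from this patch; the executor choice, chunksize, and NanoAODSchema are illustrative assumptions, not part of this diff.

import os.path as osp

from coffea import processor
from coffea.nanoevents import NanoAODSchema
from coffea.processor.test_items import NanoTestProcessor

# Fileset layout mirrors the updated tests: each dataset maps to a dict
# with a "files" list instead of a bare list of paths.
filelist = {
    "ZJets": {"files": [osp.abspath("tests/samples/nano_dy.parquet")]},
    "Data": {"files": [osp.abspath("tests/samples/nano_dimuon.parquet")]},
}

run = processor.Runner(
    executor=processor.IterativeExecutor(),  # illustrative choice of executor
    schema=NanoAODSchema,  # illustrative; any BaseSchema subclass should do
    format="parquet",  # routes preprocessing to _preprocess_fileset_parquet
    chunksize=100_000,
)

# The treename argument is still required and carried through FileMeta,
# even though parquet files have no TTree.
hists = run(filelist, "Events", processor_instance=NanoTestProcessor())
print(hists["cutflow"])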