Skip to content

Commit

Permalink
ressurect more tests, awkward is dropping behaviors in some cases
Browse files Browse the repository at this point in the history
  • Loading branch information
lgray committed Nov 4, 2022
1 parent 2bf8809 commit 18765f6
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 55 deletions.
12 changes: 9 additions & 3 deletions coffea/nanoevents/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@


def _key_formatter(prefix, form_key, form, attribute):
print(form)
return prefix + f"/{form_key}/{attribute}"
if attribute == "offsets":
form_key += "%2C%21offsets"
return prefix + f"/{attribute}/{form_key}"


class NanoEventsFactory:
Expand Down Expand Up @@ -117,6 +118,8 @@ def from_root(
uuidpfn = {partition_key[0]: tree.file.file_path}
mapping = UprootSourceMapping(
TrivialUprootOpener(uuidpfn, uproot_options),
entry_start,
entry_stop,
cache={},
access_log=access_log,
)
Expand Down Expand Up @@ -220,7 +223,10 @@ def from_parquet(
)
uuidpfn = {partition_key[0]: pqobj_path}
mapping = ParquetSourceMapping(
TrivialParquetOpener(uuidpfn, parquet_options), access_log=access_log
TrivialParquetOpener(uuidpfn, parquet_options),
entry_start,
entry_stop,
access_log=access_log,
)

format_ = "parquet"
Expand Down
6 changes: 4 additions & 2 deletions coffea/nanoevents/mapping/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ def open_uuid(self, uuid):
class BaseSourceMapping(Mapping):
_debug = False

def __init__(self, fileopener, cache=None, access_log=None):
def __init__(self, fileopener, start, stop, cache=None, access_log=None):
self._fileopener = fileopener
self._cache = cache
self._access_log = access_log
self._start = start
self._stop = stop
self.setup()

def setup(self):
Expand Down Expand Up @@ -70,7 +72,7 @@ def interpret_key(cls, key):
def __getitem__(self, key):
uuid, treepath, start, stop, nodes = self.interpret_key(key)
if self._debug:
print("Gettting:", uuid, treepath, start, stop, nodes)
print("Gettting (", key, ") :", uuid, treepath, start, stop, nodes)
stack = []
skip = False
for node in nodes:
Expand Down
32 changes: 14 additions & 18 deletions coffea/nanoevents/mapping/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,14 @@ def arrow_schema_to_awkward_form(schema):
offsets="i64",
content=awkward.forms.NumpyForm(
inner_shape=[],
itemsize=dtype.dtype.itemsize,
format=dtype.dtype.char,
primitive=dtype.dtype.name,
),
)
elif isinstance(schema, pa.lib.DataType):
dtype = schema.to_pandas_dtype()()
return awkward.forms.NumpyForm(
inner_shape=[],
itemsize=dtype.dtype.itemsize,
format=dtype.dtype.char,
primitive=dtype.dtype.name,
)
else:
raise Exception("Unrecognized pyarrow array type")
Expand Down Expand Up @@ -118,28 +116,25 @@ def array(self, entry_start, entry_stop):
: len(aspa) + 1
]
offsets = offsets.astype(numpy.int64)
offsets = awkward.layout.Index64(offsets)
offsets = awkward.index.Index64(offsets)

if not isinstance(value_type, pa.lib.DataType):
raise Exception(
"arrow only accepts single jagged arrays for now..."
)
dtype = value_type.to_pandas_dtype()
flat = aspa.flatten()
content = numpy.frombuffer(flat.buffers()[1], dtype=dtype)[: len(flat)]
content = awkward.layout.NumpyArray(content)
out = awkward.layout.ListOffsetArray64(offsets, content)
elif isinstance(aspa, pa.lib.NumericArray):
out = numpy.frombuffer(
aspa.buffers()[1], dtype=aspa.type.to_pandas_dtype()
)[: len(aspa)]
out = awkward.layout.NumpyArray(out)
content = awkward.contents.NumpyArray(flat)
out = awkward.contents.ListOffsetArray(offsets, content)
elif isinstance(aspa, (pa.lib.NumericArray, pa.lib.BooleanArray)):
out = awkward.contents.NumpyArray(aspa)
else:
raise Exception("array is not flat array or jagged list")
return awkward.Array(out)

def __init__(self, fileopener, cache=None, access_log=None):
super(ParquetSourceMapping, self).__init__(fileopener, cache, access_log)
def __init__(self, fileopener, start, stop, cache=None, access_log=None):
super(ParquetSourceMapping, self).__init__(
fileopener, start, stop, cache, access_log
)

@classmethod
def _extract_base_form(cls, arrow_schema):
Expand Down Expand Up @@ -188,7 +183,8 @@ def _extract_base_form(cls, arrow_schema):
column_forms[key] = form
return {
"class": "RecordArray",
"contents": column_forms,
"contents": [item for item in column_forms.values()],
"fields": [key for key in column_forms.keys()],
"parameters": {"__doc__": "parquetfile"},
"form_key": "",
}
Expand All @@ -208,7 +204,7 @@ def extract_column(self, columnhandle, start, stop):
return columnhandle.array(entry_start=start, entry_stop=stop)

def __len__(self):
raise NotImplementedError
return self._stop - self._start

def __iter__(self):
raise NotImplementedError
15 changes: 9 additions & 6 deletions coffea/nanoevents/mapping/uproot.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ def _lazify_parameters(form_parameters, docstr=None):


class UprootSourceMapping(BaseSourceMapping):
_debug = True
_debug = False
_fix_awkward_form_of_iter = False

def __init__(self, fileopener, cache=None, access_log=None):
super(UprootSourceMapping, self).__init__(fileopener, cache, access_log)
def __init__(self, fileopener, start, stop, cache=None, access_log=None):
super(UprootSourceMapping, self).__init__(
fileopener, start, stop, cache, access_log
)

@classmethod
def _extract_base_form(cls, tree, iteritems_options={}):
Expand Down Expand Up @@ -129,9 +131,10 @@ def _extract_base_form(cls, tree, iteritems_options={}):

return {
"class": "RecordArray",
"contents": branch_forms,
"contents": [item for item in branch_forms.values()],
"fields": [key for key in branch_forms.keys()],
"parameters": {"__doc__": tree.title},
"form_key": "",
"form_key": None,
}

def key_root(self):
Expand All @@ -155,7 +158,7 @@ def extract_column(self, columnhandle, start, stop):
)

def __len__(self):
raise NotImplementedError
return self._stop - self._start

def __iter__(self):
raise NotImplementedError
14 changes: 7 additions & 7 deletions coffea/nanoevents/methods/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ def _collection_name(self):
def _getlistarray(self):
"""Do some digging to find the initial listarray"""

def descend(layout, depth):
def descend(layout, depth, **kwargs):
islistarray = isinstance(
layout,
(awkward.layout.ListOffsetArray32, awkward.layout.ListOffsetArray64),
awkward.contents.ListOffsetArray,
)
if islistarray and layout.content.parameter("collection_name") is not None:
return lambda: layout
return layout

return awkward._util.recursively_apply(self.layout, descend)
return awkward.transform(descend, self.layout, highlevel=False)

def _content(self):
"""Internal method to get jagged collection content
Expand All @@ -210,12 +210,12 @@ def flat_take(layout):
idx = awkward.Array(layout)
return self._content()[idx.mask[idx >= 0]]

def descend(layout, depth):
def descend(layout, depth, **kwargs):
if layout.purelist_depth == 1:
return lambda: flat_take(layout)
return flat_take(layout)

(index,) = awkward.broadcast_arrays(index)
out = awkward._util.recursively_apply(index.layout, descend)
out = awkward.transform(descend, index.layout, highlevel=False)
return awkward.Array(out, behavior=self.behavior)

def _events(self):
Expand Down
10 changes: 6 additions & 4 deletions coffea/nanoevents/schemas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ def listarray_form(content, offsets):
if offsets["class"] != "NumpyArray":
raise ValueError
if offsets["primitive"] == "int32":
arrayclass = "ListOffsetArray32"
arrayclass = "ListOffsetArray"
offsetstype = "i32"
elif offsets["primitive"] == "int64":
arrayclass = "ListOffsetArray64"
arrayclass = "ListOffsetArray"
offsetstype = "i64"
else:
raise ValueError("Unrecognized offsets data type")
Expand All @@ -32,7 +32,8 @@ def zip_forms(forms, name, record_name=None, offsets=None, bypass=False):
raise ValueError
record = {
"class": "RecordArray",
"contents": {k: form["content"] for k, form in forms.items()},
"fields": [k for k in forms.keys()],
"contents": [form["content"] for form in forms.values()],
"form_key": quote("!invalid," + name),
}
if record_name is not None:
Expand Down Expand Up @@ -99,9 +100,10 @@ def __init__(self, base_form):
params.setdefault("metadata", {})
self._form = {
"class": "RecordArray",
"fields": base_form["fields"],
"contents": base_form["contents"],
"parameters": params,
"form_key": "",
"form_key": None,
}

@property
Expand Down
9 changes: 6 additions & 3 deletions coffea/nanoevents/schemas/nanoaod.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ def __init__(self, base_form, version="latest"):
if int(version) < 6:
del self.cross_references["FsrPhoton_muonIdx"]
del self.cross_references["Muon_fsrPhotonIdx"]
self._form["contents"] = self._build_collections(self._form["contents"])
self._form["fields"], self._form["contents"] = self._build_collections(
self._form["fields"], self._form["contents"]
)
self._form["parameters"]["metadata"]["version"] = self._version

@classmethod
Expand All @@ -169,7 +171,8 @@ def v5(cls, base_form):
"""Build the NanoEvents assuming NanoAODv5"""
return cls(base_form, version="5")

def _build_collections(self, branch_forms):
def _build_collections(self, field_names, input_contents):
branch_forms = {k: v for k, v in zip(field_names, input_contents)}
# parse into high-level records (collections, list collections, and singletons)
collections = set(k.split("_")[0] for k in branch_forms)
collections -= set(
Expand Down Expand Up @@ -269,7 +272,7 @@ def _build_collections(self, branch_forms):
output[name].setdefault("parameters", {})
output[name]["parameters"].update({"collection_name": name})

return output
return output.keys(), output.values()

@property
def behavior(self):
Expand Down
24 changes: 12 additions & 12 deletions coffea/nanoevents/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


def to_layout(array):
if isinstance(array, awkward.layout.Content):
if isinstance(array, awkward.contents.Content):
return array
return array.layout

Expand Down Expand Up @@ -137,7 +137,7 @@ def counts2nestedindex_form(local_counts, target_offsets):
if not target_offsets["class"] == "NumpyArray":
raise RuntimeError
form = {
"class": "ListOffsetArray64",
"class": "ListOffsetArray",
"offsets": "i64",
"content": copy.deepcopy(local_counts),
}
Expand Down Expand Up @@ -191,7 +191,7 @@ def distinctParent_form(parents, pdg):
if not pdg["class"].startswith("ListOffset"):
raise RuntimeError
form = {
"class": "ListOffsetArray64",
"class": "ListOffsetArray",
"offsets": "i64",
"content": {
"class": "NumpyArray",
Expand Down Expand Up @@ -250,10 +250,10 @@ def children_form(offsets, globalparents):
if not globalparents["class"].startswith("ListOffset"):
raise RuntimeError
form = {
"class": "ListOffsetArray64",
"class": "ListOffsetArray",
"offsets": "i64",
"content": {
"class": "ListOffsetArray64",
"class": "ListOffsetArray",
"offsets": "i64",
"content": {
"class": "NumpyArray",
Expand All @@ -280,9 +280,9 @@ def children(stack):
offsets = stack.pop()
coffsets, ccontent = _children_kernel(offsets, parents)
out = awkward.Array(
awkward.layout.ListOffsetArray64(
awkward.layout.Index64(coffsets),
awkward.layout.NumpyArray(ccontent),
awkward.contents.ListOffsetArray(
awkward.index.Index64(coffsets),
awkward.contents.NumpyArray(ccontent),
)
)
stack.append(out)
Expand All @@ -292,7 +292,7 @@ def nestedindex_form(indices):
if not all(index["class"].startswith("ListOffset") for index in indices):
raise RuntimeError
form = {
"class": "ListOffsetArray64",
"class": "ListOffsetArray",
"offsets": "i64",
"content": copy.deepcopy(indices[0]),
}
Expand Down Expand Up @@ -324,9 +324,9 @@ def nestedindex(stack):
out[i::n] = idx
offsets = numpy.arange(0, len(out) + 1, n, dtype=numpy.int64)
out = awkward.Array(
awkward.layout.ListOffsetArray64(
awkward.layout.Index64(offsets),
awkward.layout.NumpyArray(out),
awkward.contents.ListOffsetArray(
awkward.index.Index64(offsets),
awkward.contents.NumpyArray(out),
)
)
stack.append(out)
Expand Down

0 comments on commit 18765f6

Please sign in to comment.