-
Notifications
You must be signed in to change notification settings - Fork 0
Columnar data format
Note: These are all early prototypes.
Femtocode data would usually not be represented as Python objects, but I define a mapping here just to be concrete about what the prototype shredding/assembly algorithms do. See [Type System](Type System) for an elaboration on Femtocode types.
| Femtocode schema | Python (3) type | Python example |
|---|---|---|
| Null | `NoneType` | `None` |
| Boolean | `bool` | `True`, `False` |
| Number | `int`, `float` | `3`, `3.14` |
| String with `charset="bytes"` | `bytes` | `b"hello"` |
| String with `charset="unicode"` | `str` | `u"hello"` |
| Collection | `list` | `[1, 2, 3]` |
| Record | `namedtuple` | `namedtuple("rec1", ["x", "y"])(3.14, b"hello")` |
| Union | any of the above (ambiguous) | `None`, `3.14`, `b"hello"` |
Turns Python objects into arrays similar to the ones Femtocode kernels will operate on.
First, some preliminaries (class definitions, found in `statementlist.py`).
class ColumnName(object):
    """Hierarchical name of a shredded column.

    A ColumnName is an immutable tuple of path components (`seq`): record
    field names plus the reserved suffixes that mark size ("@size") and
    union-tag ("@tag") columns, and integer positions for union possibilities.
    """
    sizeSuffix = "@size"
    tagSuffix = "@tag"

    def __init__(self, *seq):
        self.seq = seq

    def size(self):
        """Name of the size column associated with this column."""
        return ColumnName(*(self.seq + (self.sizeSuffix,)))

    def tag(self):
        """Name of the union-tag column associated with this column."""
        return ColumnName(*(self.seq + (self.tagSuffix,)))

    def rec(self, fieldName):
        """Name of record field `fieldName` nested under this column."""
        return ColumnName(*(self.seq + (fieldName,)))

    def pos(self, position):
        """Name of union possibility number `position` under this column."""
        return ColumnName(*(self.seq + (position,)))

    def startswith(self, prefix):
        """True if this name begins with `prefix` (a ColumnName or a single
        path component).

        Fix: `shred`/`assemble` call startswith/endswith, but `object`
        provides neither, so issize/istag and the size-column scans raised
        AttributeError; both are now defined over the component tuple.
        """
        if isinstance(prefix, ColumnName):
            return self.seq[:len(prefix.seq)] == prefix.seq
        return len(self.seq) > 0 and self.seq[0] == prefix

    def endswith(self, suffix):
        """True if this name ends with `suffix` (a ColumnName or a single
        path component, e.g. ColumnName.sizeSuffix)."""
        if isinstance(suffix, ColumnName):
            n = len(suffix.seq)
            return n <= len(self.seq) and self.seq[len(self.seq) - n:] == suffix.seq
        return len(self.seq) > 0 and self.seq[-1] == suffix

    def issize(self):
        """True if this names a size column."""
        return self.endswith(ColumnName.sizeSuffix)

    def istag(self):
        """True if this names a union-tag column."""
        return self.endswith(ColumnName.tagSuffix)

    def __eq__(self, other):
        return isinstance(other, ColumnName) and self.seq == other.seq

    def __ne__(self, other):
        # explicit for Python 2 compatibility (the file supports both)
        return not self.__eq__(other)

    def __hash__(self):
        return hash((ColumnName, self.seq))
class Column(object):
    """Base class for a named, typed column of shredded data.

    Attributes:
        name:    ColumnName (or string) identifying the column.
        schema:  Femtocode schema of the values stored here.
        data:    list of values -- a stand-in for a real array, just for testing.
        pointer: read cursor into `data`, advanced by `assemble`.
    """
    def __init__(self, name, schema):
        self.name = name
        self.schema = schema
        self.data = []        # the actual data, just for testing
        self.pointer = 0      # index into that list

    def __repr__(self):
        return "Column({0}, {1})".format(self.name, self.schema)

    def __eq__(self, other):
        # Fix: comparing against a non-Column used to raise AttributeError
        # (other.name); now it correctly defers/reports inequality.
        if not isinstance(other, Column):
            return NotImplemented
        return self.name == other.name and self.schema == other.schema

    def __ne__(self, other):
        # explicit for Python 2 compatibility (the file supports both)
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
class DataColumn(Column):
    """A Column subtype marking plain (leaf) data arrays; identical to
    Column apart from its type and repr."""

    def __repr__(self):
        return "DataColumn(%s, %s)" % (self.name, self.schema)
class RecursiveColumn(Column):
    """A Column in which objects are stored whole instead of being shredded
    (shred/assemble check for this type before dispatching on schema)."""

    def __repr__(self):
        return "RecursiveColumn(%s, %s)" % (self.name, self.schema)
class SizeColumn(Column):
    """A Column holding list lengths for a variable-size container at a
    given nesting depth."""

    def __init__(self, name, depth):
        # lengths are non-negative integers with no finite upper bound
        lengthSchema = integer(0, almost(inf))
        super(SizeColumn, self).__init__(name, lengthSchema)
        self.depth = depth

    def __repr__(self):
        return "SizeColumn(%s, %s)" % (self.name, self.depth)
class TagColumn(Column):
    """A Column of small integers recording which Union possibility each
    datum belongs to; tags index into `possibilities`."""

    def __init__(self, name, possibilities):
        tagSchema = integer(0, len(possibilities))
        super(TagColumn, self).__init__(name, tagSchema)
        self.possibilities = possibilities

    def __repr__(self):
        return "TagColumn(%s, %s)" % (self.name, self.possibilities)
Now the shredding algorithm.
def shred(obj, schema, columns, name):
    """Shred a Python object into flat columns according to `schema`.

    Appends to the `data` lists of the entries of `columns` (a dict mapping
    ColumnName to Column) reachable from `name`.  Inverse of `assemble`.

    Raises TypeError if `obj` does not fit the schema, and Exception for an
    unrecognized schema type.
    """
    if isinstance(name, string_types):
        name = ColumnName(name)

    if isinstance(columns.get(name), RecursiveColumn):
        # recursively-defined schemas are not flattened: store the object whole
        columns[name].data.append(obj)

    elif isinstance(schema, Null):
        pass  # Null carries no information; nothing to store

    elif isinstance(schema, (Boolean, Number)):
        columns[name].data.append(obj)

    elif isinstance(schema, String):
        columns[name].data.extend(list(obj))
        # fixed-size byte strings need no size column
        if schema.charset != "bytes" or schema.fewest != schema.most:
            sizeName = name.size()
            columns[sizeName].data.append(len(obj))

    elif isinstance(schema, Collection):
        if schema.fewest != schema.most:
            # variable-size: record this length in every size column under `name`
            size = len(obj)
            for n, c in columns.items():
                if n.startswith(name) and n.endswith(ColumnName.sizeSuffix):
                    c.data.append(size)
        items = schema.items
        for x in obj:
            shred(x, items, columns, name)

    elif isinstance(schema, Record):
        for n, t in schema.fields.items():
            # Fix: this line previously read `self.assertTrue(hasattr(obj, n))`,
            # a stray unittest call copied out of a test method; `self` is
            # undefined in this free function, so it raised NameError.
            if not hasattr(obj, n):
                raise TypeError("record object {0!r} is missing field {1!r}".format(obj, n))
            shred(getattr(obj, n), t, columns, name.rec(n))

    elif isinstance(schema, Union):
        tag = columns[name.tag()]
        for i, p in enumerate(tag.possibilities):
            if obj in p:
                tag.data.append(i)
                shred(obj, p, columns, name.pos(i))
                break
        else:
            # Fix: previously fell through silently when no possibility
            # matched, leaving the tag and data columns misaligned.
            raise TypeError("{0!r} does not match any possibility of {1}".format(obj, schema))

    else:
        raise Exception("unexpected type: {0} {1}".format(type(schema), schema))
Turns Femtocode columns back into the Python representation, for closed-loop tests.
def assemble(schema, columns, name):
    """Reconstruct one Python object from flat columns according to `schema`.

    Inverse of `shred`, for closed-loop tests: reads from the `data` lists of
    `columns` (a dict mapping ColumnName to Column) starting at `name`,
    advancing each column's `pointer` cursor as it goes.
    """
    if isinstance(name, string_types):
        name = ColumnName(name)

    if isinstance(columns.get(name), RecursiveColumn):
        # recursively-defined schemas store whole objects; return one as-is
        col = columns[name]
        out = col.data[col.pointer]
        col.pointer += 1
        return out

    elif isinstance(schema, Null):
        return None

    elif isinstance(schema, (Boolean, Number)):
        col = columns[name]
        out = col.data[col.pointer]
        col.pointer += 1
        return out

    elif isinstance(schema, String):
        col = columns[name]
        if schema.charset == "bytes" and schema.fewest == schema.most:
            size = schema.fewest          # fixed-size strings have no size column
        else:
            sizeCol = columns[name.size()]
            size = sizeCol.data[sizeCol.pointer]
            sizeCol.pointer += 1
        if schema.charset == "bytes":
            if sys.version_info[0] >= 3:
                out = bytes(col.data[col.pointer:col.pointer + size])
            else:
                out = b"".join(col.data[col.pointer:col.pointer + size])
        else:
            out = u"".join(col.data[col.pointer:col.pointer + size])
        col.pointer += size
        return out

    elif isinstance(schema, Collection):
        if schema.fewest == schema.most:
            size = schema.fewest          # fixed-size collection: no size column
        else:
            size = None
            # every size column under `name` holds the same length at this
            # point; read (and advance) all of them to keep cursors aligned,
            # mirroring how shred appended to all of them
            for n, c in columns.items():
                if n.startswith(name) and n.endswith(ColumnName.sizeSuffix):
                    size = c.data[c.pointer]
                    c.pointer += 1
            # Fix: was `self.assertIsNotNone(size)`, a stray unittest call in
            # a free function (`self` undefined -> NameError).
            if size is None:
                raise LookupError("no size column found for {0}".format(name))
        items = schema.items
        # Fix: `xrange` broke Python 3 even though this function otherwise
        # supports it (see the version check above); `range` behaves the same here.
        return [assemble(items, columns, name) for i in range(size)]

    elif isinstance(schema, Record):
        # field order is canonicalized by sorting, matching shred's columns
        names = sorted(schema.fields.keys())
        return namedtuple("tmp", names)(*[assemble(schema.fields[n], columns, name.rec(n)) for n in names])

    elif isinstance(schema, Union):
        tag = columns[name.tag()]
        pos = tag.data[tag.pointer]
        tag.pointer += 1
        return assemble(tag.possibilities[pos], columns, name.pos(pos))

    else:
        raise Exception("unexpected type: {0} {1}".format(type(schema), schema))
1. Schema: real
Input: [1.1, 2.2, 3.3, 4.4, 5.5]
Columns:
"root": [1.1, 2.2, 3.3, 4.4, 5.5]
-------------------------------------------------------------
2. Schema: collection(real)
Input: [[], [1.1], [2.2, 3.3], []]
Columns:
"root": [1.1, 2.2, 3.3]
"root@size": [0, 1, 2, 0]
-------------------------------------------------------------
3. Schema: collection(collection(real))
Input: [[], [[1.1]], [[], [2.2, 3.3]]]
Columns:
"root": [1.1, 2.2, 3.3]
"root@size": [0, 1, 1, 2, 0, 2]
-------------------------------------------------------------
4. Schema:
record(
a=real,
b=real
)
Input: [rec1(a=1.1, b=2.2), rec1(a=3.3, b=4.4)]
Columns:
"root.a": [1.1, 3.3]
"root.b": [2.2, 4.4]
-------------------------------------------------------------
5. Schema:
collection(
record(
a=real,
b=real
)
)
Input: [[], [rec1(a=1.1, b=2.2), rec1(a=3.3, b=4.4)]]
Columns:
"root.a": [1.1, 3.3]
"root.a@size": [0, 2]
"root.b": [2.2, 4.4]
"root.b@size": [0, 2]
-------------------------------------------------------------
6. Schema:
record(
x=real,
y=collection(
record(
a=real,
b=collection(
real
)
)
)
)
Input: [rec1(x=1, y=[rec2(a=2, b=[3, 4]), rec2(a=5, b=[])]), rec1(x=6, y=[rec2(a=9, b=[10, 11])])]
Columns:
"root.x": [1, 6]
"root.y.a": [2, 5, 9]
"root.y.a@size": [2, 1]
"root.y.b": [3, 4, 10, 11]
"root.y.b@size": [2, 2, 0, 1, 2]
-------------------------------------------------------------
7. Schema:
record("rec1",
a=real,
b=collection(
"rec1"
)
)
Input: [rec1(a=1.1, b=[]), rec1(a=2.2, b=[rec1(a=3.3, b=[])])]
Columns:
"root": [rec1(a=1.1, b=[]), rec1(a=2.2, b=[rec1(a=3.3, b=[])])]
Data at the same depth level (each one per event, one per track, etc.) can be operated on without reference to their size arrays (if they even have any). Before combining data at different levels of depth in a multiple-argument function, all arguments need to be brought up to the current level of depth.
This brings data with no size array (one per event) up to the level of some `modelsize` array, putting the output in `out`. After calling this function, `out` should be used in conjunction with `modelsize` and it has depth `modeldepth`.
Here's a Femtocode example of a situation where `x` needs to be brought up to the level of `y`:
x = event_level_value();
ys.map(y => y + x)
def explode(modeldepth, modelsize, data, out):
    """Raise event-level values (one per event, no size array) to the depth
    of a model size array.

    Each event's single value from `data` is written into `out` once per
    deepest-level item the event has according to `modelsize`.

    Parameters:
        modeldepth -- nesting depth of the model collection (>= 1)
        modelsize  -- flattened size array of the model collection
        data       -- one value per event
        out        -- preallocated output list, one slot per deepest item

    Bug fix: the original only advanced to the next event when it read a
    deepest-level size while the enclosing counters were *already* zero,
    which is one step too late for modeldepth >= 2 (e.g. modeldepth=2,
    modelsize=[1,2,1,1], data=[a,b] produced [a,a,a] instead of [a,a,b]),
    and it never cascaded out of empty intermediate lists.  The cascade
    below pops every finished level immediately after each deepest list
    (or empty sublist) is consumed.
    """
    countdown = [0] * modeldepth   # remaining sublists per nesting level
    cdi = 0      # current nesting level being filled
    datai = 0    # current event index into data
    outi = 0     # next free slot in out
    for ms in modelsize:
        countdown[cdi] = ms
        if cdi == modeldepth - 1:
            # deepest level: emit this event's value once per innermost item
            while countdown[cdi] != 0:
                out[outi] = data[datai]
                outi += 1
                countdown[cdi] -= 1
        elif ms != 0:
            # a non-empty intermediate list: descend into it
            cdi += 1
            continue
        # a list at level cdi just finished (deepest list consumed, or an
        # empty intermediate list): propagate completion upward
        cdi -= 1
        while cdi != -1:
            countdown[cdi] -= 1
            if countdown[cdi] != 0:
                break        # enclosing list still has sublists to come
            cdi -= 1
        if cdi == -1:
            datai += 1       # outermost list finished: move to next event
        cdi += 1
Here's a Femtocode example of a situation where the whole collection `xs` needs to be brought up to the level of `y`:
xs = event_level_collection();
ys.map(y => xs.map(x => x + y))
"In" has its own depth and size, which is brought up to the model by setting in's hierarchy on top of model's. That is, if model is a `collection(X)` and in is a `collection(real)`, out is a `collection(collection(real))`. This function overwrites `outsize` and `outdata`. In Python, we do this with `list.append` and `list.extend`, but in a real situation with C, we'd have to run the function twice: first to get the lengths of `outsize` and `outdata`, then to fill them.
def explode(modeldepth, modelsize, indepth, insize, indata, outsize, outdata):
    """Raise a whole collection ("in") to the depth of a model size array.

    For each outermost model entry (event), the event's entire "in"
    hierarchy -- its slices of `insize` and `indata` -- is replicated under
    every deepest-level model item, stacking the model's structure on top
    of "in"'s structure.

    Parameters (all flat Python lists / ints):
        modeldepth -- nesting depth of the model collection
        modelsize  -- flattened size array of the model collection
        indepth    -- nesting depth of the "in" collection
        insize     -- flattened size array of "in"
        indata     -- flattened data array of "in"
        outsize    -- output size array, appended to in place
        outdata    -- output data array, appended to in place

    NOTE(review): in Python this builds the output with append/extend; per
    the surrounding text, a C version would run twice -- once to measure
    the output lengths, once to fill.
    """
    # countdown state for walking the model's size array
    mocountdown = [0] * modeldepth
    mocdi = 0
    # countdown state for walking "in"'s size array
    incountdown = [0] * indepth
    incdi = 0
    # [insizebeg:insizeend] / [indatabeg:indataend] delimit the current
    # event's slices of insize / indata
    insizebeg = 0
    insizeend = 0
    indatabeg = 0
    indataend = 0
    for ms in modelsize:
        if mocdi == 0 and mocountdown[mocdi] == 0:
            # starting a new outermost model entry: advance the "in" window
            # to this event's slices by consuming one event's worth of insize
            insizebeg = insizeend
            indatabeg = indataend
            while True:
                incountdown[incdi] = insize[insizeend]
                insizeend += 1
                if incdi == indepth - 1:
                    # deepest "in" level: its size says how many data values to take
                    if incountdown[incdi] != 0:
                        indataend += incountdown[incdi]
                        incountdown[incdi] = 0
                    incdi -= 1
                    if incountdown[incdi] != 0:
                        incountdown[incdi] -= 1
                    else:
                        # one or more enclosing "in" lists finished: pop upward
                        while incdi != -1 and incountdown[incdi] == 0:
                            incdi -= 1
                incdi += 1
                if incdi == 0 and incountdown[incdi] == 0:
                    break  # this event's "in" hierarchy fully consumed
        mocountdown[mocdi] = ms
        outsize.append(ms)
        if mocdi == modeldepth - 1:
            # deepest model level: copy the event's whole "in" slice once
            # per deepest model item
            while mocountdown[mocdi] != 0:
                outsize.extend(insize[insizebeg:insizeend])
                outdata.extend(indata[indatabeg:indataend])
                mocountdown[mocdi] -= 1
            # NOTE(review): by analogy with the simpler `explode` above, a
            # `mocdi -= 1` appears to be missing right here (the counter was
            # just drained to 0, so this `if` can never fire as written) --
            # confirm against the original source; possible transcription loss.
            if mocountdown[mocdi] != 0:
                mocountdown[mocdi] -= 1
            else:
                while mocdi != -1 and mocountdown[mocdi] == 0:
                    mocdi -= 1
        mocdi += 1
Here's a Femtocode example of a situation where `x` needs to be brought up to the level of `y`:
xss.map(xs => ys = xs.map(x => x*2); xs.map(x => ys.map(y => x + y)))
TBD
Is this even a distinct case from case 2?
Are there any other cases? I don't think so.
For rolling up a size array to one of less depth, or no size array at all. Examples below will be for `sum`, but a similar algorithm would apply to every reducer, including `max`, `min`, etc.
TBD