Skip to content

Commit

Permalink
Merge pull request #128 from simonsobs/aggregate_G3Timesample
Browse files Browse the repository at this point in the history
Switch from IrregBlockDouble to G3Timesample

CI tests ran in PR, skipping CI to prevent docker image build.

[skip ci]
  • Loading branch information
BrianJKoopman authored Jun 17, 2020
2 parents 786c8ea + beed2b2 commit 7ac2984
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 27 deletions.
93 changes: 67 additions & 26 deletions ocs/agent/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import os
import datetime
import binascii
import time

from typing import Dict

import txaio
from typing import Dict

from ocs import ocs_feed

Expand All @@ -18,6 +15,60 @@
G3Module = object


HKAGG_VERSION = 1
_g3_casts = {
str: core.G3String, int: core.G3Int, float: core.G3Double,
}
_g3_list_casts = {
str: core.G3VectorString, int: core.G3VectorInt, float: core.G3VectorDouble,
}


def g3_cast(data, time=False):
"""
Casts a generic datatype into a corresponding G3 type. With:
int -> G3Int
str -> G3String
float -> G3Double
and lists of type X will go to G3VectorX. If ``time`` is set to True, will
convert to G3Time or G3VectorTime with the assumption that ``data`` consists
of unix timestamps.
Args:
data (int, str, float, or list):
Generic data to be converted to a corresponding G3Type.
time (bool, optional):
If True, will assume data contains unix timestamps and try to cast
to G3Time or G3VectorTime.
Returns:
g3_data:
Corresponding G3 datatype.
"""
is_list = isinstance(data, list)
if is_list:
dtype = type(data[0])
if not all(isinstance(d, dtype) for d in data):
raise TypeError("Data list contains varying types!")
else:
dtype = type(data)
if dtype not in _g3_casts.keys():
raise TypeError("g3_cast does not support type {}. Type must"
"be one of {}".format(dtype, _g3_casts.keys()))
if is_list:
if time:
return core.G3VectorTime(list(map(
lambda t: core.G3Time(t * core.G3Units.s), data)))
else:
cast = _g3_list_casts[type(data[0])]
return cast(data)
else:
if time:
return core.G3Time(data * core.G3Units.s)
else:
cast = _g3_casts[type(data)]
return cast(data)

def generate_id(hksess):
"""
Generates a unique session id based on the start_time, process_id,
Expand Down Expand Up @@ -206,28 +257,18 @@ def to_frame(self, hksess=None, clear=False):
for block_name, block in self.blocks.items():
if not block.timestamps:
continue

hk = so3g.IrregBlockDouble()
hk.prefix = block.prefix
hk.t = block.timestamps
for key, ts in block.data.items():
try:
hk.data[key] = ts
except TypeError:
all_types = set([type(x) for x in ts])
self.log.error("datapoint passed from address " +
"{a} to the Provider feed is of " +
"invalid type. Types contained " +
"in the passed list are {t}",
a=self.address, t=all_types)
self.log.error("full data list for {k}: {d}",
k=key, d=ts)

frame['blocks'].append(hk)

try:
m = core.G3TimesampleMap()
m.times = g3_cast(block.timestamps, time=True)
for key, ts in block.data.items():
m[key] = g3_cast(ts)
except Exception as e:
self.log.warn("Error received when casting timestream! {e}",
e=e)
continue
frame['blocks'].append(m)
if clear:
self.clear()

return frame


Expand Down Expand Up @@ -291,7 +332,6 @@ def Process(self, frames):
by `filename` function passed to constructor
"""
for frame in frames:

ftype = frame['hkagg_type']

if ftype == so3g.HKFrameType.session:
Expand Down Expand Up @@ -362,7 +402,8 @@ class Aggregator:
def __init__(self, incoming_data, time_per_file, data_dir, session=None):
self.log = txaio.make_logger()

self.hksess = so3g.hk.HKSessionHelper(description="HK data")
self.hksess = so3g.hk.HKSessionHelper(description="HK data",
hkagg_version=HKAGG_VERSION)
self.hksess.start_time = time.time()
self.hksess.session_id = generate_id(self.hksess)

Expand Down
25 changes: 24 additions & 1 deletion tests/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import pytest

import so3g
from spt3g import core

from ocs.agent.aggregator import Provider
from ocs.agent.aggregator import Provider, g3_cast

def test_passing_float_in_provider_to_frame():
"""Float is the expected type we should be passing.
Expand Down Expand Up @@ -88,3 +89,25 @@ def test_data_type_in_provider_write():
'prefix': ''}
}
provider.write(data)


def test_g3_cast():
correct_tests = [
([1, 2, 3, 4], core.G3VectorInt),
([1., 2., 3.], core.G3VectorDouble),
(["a", "b", "c"], core.G3VectorString),
(3, core.G3Int),
("test", core.G3String)
]
for x, t in correct_tests:
assert isinstance(g3_cast(x), t)

assert isinstance(g3_cast(3, time=True), core.G3Time)
assert isinstance(g3_cast([1, 2, 3], time=True), core.G3VectorTime)

incorrect_tests = [
['a', 'b', 1, 2], True, [1, 1.0, 2]
]
for x in incorrect_tests:
with pytest.raises(TypeError) as e_info:
g3_cast(x)

0 comments on commit 7ac2984

Please sign in to comment.