diff --git a/ocs/agent/aggregator.py b/ocs/agent/aggregator.py index 7ef229b8..b728a2ca 100644 --- a/ocs/agent/aggregator.py +++ b/ocs/agent/aggregator.py @@ -1,11 +1,8 @@ import os -import datetime import binascii import time - -from typing import Dict - import txaio +from typing import Dict from ocs import ocs_feed @@ -18,6 +15,60 @@ G3Module = object +HKAGG_VERSION = 1 +_g3_casts = { + str: core.G3String, int: core.G3Int, float: core.G3Double, +} +_g3_list_casts = { + str: core.G3VectorString, int: core.G3VectorInt, float: core.G3VectorDouble, +} + + +def g3_cast(data, time=False): + """ + Casts a generic datatype into a corresponding G3 type. With: + int -> G3Int + str -> G3String + float -> G3Double + and lists of type X will go to G3VectorX. If ``time`` is set to True, will + convert to G3Time or G3VectorTime with the assumption that ``data`` consists + of unix timestamps. + + Args: + data (int, str, float, or list): + Generic data to be converted to a corresponding G3Type. + time (bool, optional): + If True, will assume data contains unix timestamps and try to cast + to G3Time or G3VectorTime. + + Returns: + g3_data: + Corresponding G3 datatype. + """ + is_list = isinstance(data, list) + if is_list: + dtype = type(data[0]) + if not all(isinstance(d, dtype) for d in data): + raise TypeError("Data list contains varying types!") + else: + dtype = type(data) + if dtype not in _g3_casts.keys(): + raise TypeError("g3_cast does not support type {}. Type must" + "be one of {}".format(dtype, _g3_casts.keys())) + if is_list: + if time: + return core.G3VectorTime(list(map( + lambda t: core.G3Time(t * core.G3Units.s), data))) + else: + cast = _g3_list_casts[type(data[0])] + return cast(data) + else: + if time: + return core.G3Time(data * core.G3Units.s) + else: + cast = _g3_casts[type(data)] + return cast(data) + def generate_id(hksess): """ Generates a unique session id based on the start_time, process_id, @@ -206,28 +257,18 @@ def to_frame(self, hksess=None, clear=False): for block_name, block in self.blocks.items(): if not block.timestamps: continue - - hk = so3g.IrregBlockDouble() - hk.prefix = block.prefix - hk.t = block.timestamps - for key, ts in block.data.items(): - try: - hk.data[key] = ts - except TypeError: - all_types = set([type(x) for x in ts]) - self.log.error("datapoint passed from address " + - "{a} to the Provider feed is of " + - "invalid type. Types contained " + - "in the passed list are {t}", - a=self.address, t=all_types) - self.log.error("full data list for {k}: {d}", - k=key, d=ts) - - frame['blocks'].append(hk) - + try: + m = core.G3TimesampleMap() + m.times = g3_cast(block.timestamps, time=True) + for key, ts in block.data.items(): + m[key] = g3_cast(ts) + except Exception as e: + self.log.warn("Error received when casting timestream! {e}", + e=e) + continue + frame['blocks'].append(m) if clear: self.clear() - return frame @@ -291,7 +332,6 @@ def Process(self, frames): by `filename` function passed to constructor """ for frame in frames: - ftype = frame['hkagg_type'] if ftype == so3g.HKFrameType.session: @@ -362,7 +402,8 @@ class Aggregator: def __init__(self, incoming_data, time_per_file, data_dir, session=None): self.log = txaio.make_logger() - self.hksess = so3g.hk.HKSessionHelper(description="HK data") + self.hksess = so3g.hk.HKSessionHelper(description="HK data", + hkagg_version=HKAGG_VERSION) self.hksess.start_time = time.time() self.hksess.session_id = generate_id(self.hksess) diff --git a/tests/test_aggregator.py b/tests/test_aggregator.py index 02d371a4..bd29ffe2 100644 --- a/tests/test_aggregator.py +++ b/tests/test_aggregator.py @@ -2,8 +2,9 @@ import pytest import so3g +from spt3g import core -from ocs.agent.aggregator import Provider +from ocs.agent.aggregator import Provider, g3_cast def test_passing_float_in_provider_to_frame(): """Float is the expected type we should be passing. @@ -88,3 +89,25 @@ def test_data_type_in_provider_write(): 'prefix': ''} } provider.write(data) + + +def test_g3_cast(): + correct_tests = [ + ([1, 2, 3, 4], core.G3VectorInt), + ([1., 2., 3.], core.G3VectorDouble), + (["a", "b", "c"], core.G3VectorString), + (3, core.G3Int), + ("test", core.G3String) + ] + for x, t in correct_tests: + assert isinstance(g3_cast(x), t) + + assert isinstance(g3_cast(3, time=True), core.G3Time) + assert isinstance(g3_cast([1, 2, 3], time=True), core.G3VectorTime) + + incorrect_tests = [ + ['a', 'b', 1, 2], True, [1, 1.0, 2] + ] + for x in incorrect_tests: + with pytest.raises(TypeError) as e_info: + g3_cast(x)