From 5da3cd12d9a356e0ec125b6d9912822ddf5359d3 Mon Sep 17 00:00:00 2001
From: Jack Harper
Date: Thu, 7 Aug 2025 17:09:20 +0100
Subject: [PATCH] Forward units using un00 schema

---
 changes.md                                    |   1 +
 .../update_handlers/ca_update_handler.py      |  57 +++++++--
 .../update_handlers/pva_update_handler.py     |  19 ++-
 .../schema_serialiser_factory.py              |   4 +
 .../update_handlers/serialiser_tracker.py     |  16 ++-
 forwarder/update_handlers/un00_serialiser.py  |  72 +++++++++++
 tests/test_helpers/ca_fakes.py                |   1 +
 .../update_handlers/ca_update_handler_test.py | 118 ++++++++++++++++++
 .../pva_update_handler_test.py                |  26 ++++
 9 files changed, 299 insertions(+), 15 deletions(-)
 create mode 100644 forwarder/update_handlers/un00_serialiser.py

diff --git a/changes.md b/changes.md
index aac27115..8f8b548f 100644
--- a/changes.md
+++ b/changes.md
@@ -7,6 +7,7 @@
 * Refactor statistics reporter to support dynamically added metrics
 * Add latency and per-PV graphite metrics
 * Add containerfile
+* Add support for streaming PV units (`un00` schema)
 
 ## v2.1.0
 
diff --git a/forwarder/update_handlers/ca_update_handler.py b/forwarder/update_handlers/ca_update_handler.py
index ab414bb7..014c4a96 100644
--- a/forwarder/update_handlers/ca_update_handler.py
+++ b/forwarder/update_handlers/ca_update_handler.py
@@ -1,7 +1,8 @@
 import time
+from dataclasses import dataclass
 from typing import List, Optional
 
 from caproto import ReadNotifyResponse
 from caproto.threading.client import PV
 from caproto.threading.client import Context as CAContext
 
@@ -9,6 +10,7 @@
 from forwarder.metrics import Counter, Summary, sanitise_metric_name
 from forwarder.metrics.statistics_reporter import StatisticsReporter
 from forwarder.update_handlers.serialiser_tracker import SerialiserTracker
+from forwarder.update_handlers.un00_serialiser import un00_CASerialiser
 
 
 class CAUpdateHandler:
@@ -34,6 +36,7 @@ def __init__(
         self._processing_errors_metric = processing_errors_metric
         self._processing_latency_metric = None
         self._receive_latency_metric = None
+        self._last_update = 0
         if self._statistics_reporter:
             try:
                 self._processing_latency_metric = Summary(
@@ -66,19 +69,58 @@ def __init__(
         ctrl_sub.add_callback(self._unit_callback)
 
     def _unit_callback(self, sub, response: ReadNotifyResponse):
+        # Caproto sometimes delivers a unit callback before any monitor
+        # callback. In that case, rather than dropping the unit update,
+        # approximate its timestamp with the current time.
+        fallback_timestamp = time.time()
+
+        self._logger.debug("CA Unit callback called for %s", self._pv_name)
+
         old_unit = self._current_unit
         try:
-            self._current_unit = response.metadata.units.decode("utf-8")
+            # A unit callback can carry blank units if the value has updated
+            # but the EGU field has not.
+            self._current_unit = response.metadata.units.decode("utf-8")
         except AttributeError:
-            return
-        if old_unit is not None and old_unit != self._current_unit:
-            self._logger.error(
+            self._current_unit = None
+
+        if old_unit != self._current_unit:
+            self._logger.info(
                 f'Display unit of (ca) PV with name "{self._pv_name}" changed from "{old_unit}" to "{self._current_unit}".'
             )
-            if self._processing_errors_metric:
-                self._processing_errors_metric.inc()
+            for serialiser_tracker in self.serialiser_tracker_list:
+                # Only let the unit serialiser handle this update: it has no
+                # value, so the other serialisers would fall over.
+                if isinstance(serialiser_tracker.serialiser, un00_CASerialiser):
+                    # The next bit is hacky: the metadata on a real
+                    # ReadNotifyResponse is read-only, so mock up a minimal
+                    # response object that lets us attach the timestamp.
+                    @dataclass
+                    class StupidMetaData:
+                        timestamp: float
+                        units: str
+
+                    @dataclass
+                    class StupidResponse:
+                        metadata: StupidMetaData
+
+                    update_time = (
+                        self._last_update if self._last_update > 0 else fallback_timestamp
+                    )
+                    self._logger.debug(
+                        f"About to publish unit update. units: {self._current_unit}, timestamp: {update_time}"
+                    )
+                    meta = StupidMetaData(timestamp=update_time, units=self._current_unit)
+                    fake_response = StupidResponse(metadata=meta)
+                    serialiser_tracker.process_ca_message(fake_response)  # type: ignore
 
     def _monitor_callback(self, sub, response: ReadNotifyResponse):
+        self._logger.debug("CA Monitor callback called for %s", self._pv_name)
+        try:
+            self._last_update = response.metadata.timestamp
+        except Exception:
+            self._logger.warning("Error getting timestamp for %s", sub.pv.name)
+
         if self._receive_latency_metric:
             try:
                 response_timestamp = response.metadata.timestamp.seconds + (
diff --git a/forwarder/update_handlers/pva_update_handler.py b/forwarder/update_handlers/pva_update_handler.py
index 8f015936..9bd13fdd 100644
--- a/forwarder/update_handlers/pva_update_handler.py
+++ b/forwarder/update_handlers/pva_update_handler.py
@@ -8,6 +8,7 @@
 from forwarder.metrics import Counter, Summary, sanitise_metric_name
 from forwarder.metrics.statistics_reporter import StatisticsReporter
 from forwarder.update_handlers.serialiser_tracker import SerialiserTracker
+from forwarder.update_handlers.un00_serialiser import un00_PVASerialiser
 
 
 class PVAUpdateHandler:
@@ -54,7 +55,7 @@ def __init__(
         except Exception as e:
             self._logger.warning(f"Could not initialise metric for {pv_name}: {e}")
 
-        request = context.makeRequest("field()")
+        request = PVAContext.makeRequest("field()")
         self._sub = context.monitor(
             pv_name,
             self._monitor_callback,
@@ -63,17 +64,19 @@ def __init__(
         )
 
     def _monitor_callback(self, response: Union[Value, Exception]):
+        self._logger.debug("PVA monitor callback called for %s", self._pv_name)
         old_unit = self._unit
         try:
             self._unit = response.display.units  # type: ignore
         except AttributeError:
             pass
-        if old_unit is not None and old_unit != self._unit:
-            self._logger.error(
+
+        units_changed = old_unit != self._unit
+        if units_changed:
+            self._logger.info(
                 f'Display unit of (pva) PV with name "{self._pv_name}" changed from "{old_unit}" to "{self._unit}".'
             )
-            if self._processing_errors_metric:
-                self._processing_errors_metric.inc()
+
         if self._receive_latency_metric and isinstance(response, Value):
             try:
                 response_timestamp = response.timeStamp.secondsPastEpoch + (
@@ -86,9 +89,15 @@ def _monitor_callback(self, response: Union[Value, Exception]):
             if self._processing_latency_metric:
                 with self._processing_latency_metric.time():
                     for serialiser_tracker in self.serialiser_tracker_list:
+                        if isinstance(serialiser_tracker.serialiser, un00_PVASerialiser) and not units_changed:
+                            # If the units haven't changed, don't publish a unit update.
+                            continue
                         serialiser_tracker.process_pva_message(response)
             else:
                 for serialiser_tracker in self.serialiser_tracker_list:
+                    if isinstance(serialiser_tracker.serialiser, un00_PVASerialiser) and not units_changed:
+                        # If the units haven't changed, don't publish a unit update.
+                        continue
                     serialiser_tracker.process_pva_message(response)
         except (RuntimeError, ValueError) as e:
             self._logger.error(
diff --git a/forwarder/update_handlers/schema_serialiser_factory.py b/forwarder/update_handlers/schema_serialiser_factory.py
index fe9d4777..91931e1e 100644
--- a/forwarder/update_handlers/schema_serialiser_factory.py
+++ b/forwarder/update_handlers/schema_serialiser_factory.py
@@ -28,6 +28,7 @@
     tdct_CASerialiser,
     tdct_PVASerialiser,
 )
+from forwarder.update_handlers.un00_serialiser import un00_CASerialiser, un00_PVASerialiser
 
 
 class SerialiserFactory:
@@ -39,6 +40,7 @@ class SerialiserFactory:
             "f144": f144_CASerialiser,
             "no_op": no_op_CASerialiser,
             "tdct": tdct_CASerialiser,
+            "un00": un00_CASerialiser,
         },
         EpicsProtocol.FAKE: {
             "al00": al00_PVASerialiser,
@@ -49,6 +51,7 @@ class SerialiserFactory:
             "nttable_se00": nttable_se00_PVASerialiser,
             "nttable_senv": nttable_senv_PVASerialiser,
             "tdct": tdct_PVASerialiser,
+            "un00": un00_PVASerialiser,
         },
         EpicsProtocol.PVA: {
             "al00": al00_PVASerialiser,
@@ -59,6 +62,7 @@ class SerialiserFactory:
             "nttable_se00": nttable_se00_PVASerialiser,
             "nttable_senv": nttable_senv_PVASerialiser,
             "tdct": tdct_PVASerialiser,
+            "un00": un00_PVASerialiser,
         },
     }
 
diff --git a/forwarder/update_handlers/serialiser_tracker.py b/forwarder/update_handlers/serialiser_tracker.py
index 7417f3ad..0df31f32 100644
--- a/forwarder/update_handlers/serialiser_tracker.py
+++ b/forwarder/update_handlers/serialiser_tracker.py
@@ -99,18 +99,18 @@ def set_new_message(self, message: bytes, timestamp_ns: Union[int, float]):
         message_datetime = datetime.fromtimestamp(timestamp_ns / 1e9, tz=timezone.utc)
         if message_datetime < self._last_timestamp:
             self._logger.error(
-                f"Rejecting update on {self._pv_name} as its timestamp is older than the previous message timestamp from that PV ({message_datetime} vs {self._last_timestamp})."
+                f"Rejecting update on {self._pv_name} as its timestamp ({message_datetime}) is older than the previous message timestamp from that PV ({self._last_timestamp})."
             )
             return
         current_datetime = datetime.now(tz=timezone.utc)
         if message_datetime < current_datetime - LOWER_AGE_LIMIT:
             self._logger.error(
-                f"Rejecting update on {self._pv_name} as its timestamp is older than allowed ({LOWER_AGE_LIMIT})."
+                f"Rejecting update on {self._pv_name} as its timestamp ({message_datetime}) is older than allowed ({LOWER_AGE_LIMIT})."
             )
             return
         if message_datetime > current_datetime + UPPER_AGE_LIMIT:
             self._logger.error(
-                f"Rejecting update on {self._pv_name} as its timestamp is from further into the future than allowed ({UPPER_AGE_LIMIT})."
+ f"Rejecting update on {self._pv_name} as its timestamp({message_datetime}) is from further into the future than allowed ({UPPER_AGE_LIMIT})." ) return self._last_timestamp = message_datetime @@ -182,4 +182,14 @@ def create_serialiser_list( periodic_update_ms, ) ) + # Units serialiser + return_list.append( + SerialiserTracker( + SerialiserFactory.create_serialiser(protocol, "un00", pv_name), + producer, + pv_name, + output_topic, + periodic_update_ms, + ) + ) return return_list diff --git a/forwarder/update_handlers/un00_serialiser.py b/forwarder/update_handlers/un00_serialiser.py new file mode 100644 index 00000000..07038962 --- /dev/null +++ b/forwarder/update_handlers/un00_serialiser.py @@ -0,0 +1,72 @@ +from typing import Optional, Tuple, Union +import p4p +from caproto import Message as CA_Message + +from forwarder.application_logger import get_logger +from streaming_data_types import serialise_un00 + +from forwarder.kafka.kafka_helpers import seconds_to_nanoseconds +from forwarder.update_handlers.schema_serialisers import CASerialiser, PVASerialiser + +logger = get_logger() + +def _serialise( + source_name: str, + timestamp_ns: int | None, + units: str | None, +) -> Tuple[bytes, int]: + return ( + serialise_un00(source_name, timestamp_ns, units), + timestamp_ns, + ) + + +class un00_CASerialiser(CASerialiser): + def __init__(self, source_name: str): + self._source_name = source_name + self._message: Optional[str] = None + self._units: Optional[str] = None + + def serialise( + self, update: CA_Message, **unused + ) -> Union[Tuple[bytes, int], Tuple[None, None]]: + metadata = update.metadata + try: + timestamp = seconds_to_nanoseconds(metadata.timestamp) + except AttributeError: + logger.warning("No timestamp available for %s", self._source_name) + timestamp = None + try: + units = metadata.units + except AttributeError: + logger.warning("No units available for %s", self._source_name) + return [None, None] + logger.debug(f"Source name: {self._source_name}, timestamp: {timestamp}, units: {units}") + return _serialise(self._source_name, timestamp, units) + + def conn_serialise(self, pv: str, state: str) -> Tuple[None, None]: + return None, None + + +class un00_PVASerialiser(PVASerialiser): + def __init__(self, source_name: str): + self._source_name = source_name + self._message: Optional[str] = None + self._units: Optional[str] = None + + def serialise( + self, update: Union[p4p.Value, RuntimeError] + ) -> Union[Tuple[bytes, int], Tuple[None, None]]: + if isinstance(update, RuntimeError): + return None, None + timestamp = ( + update.timeStamp.secondsPastEpoch * 1_000_000_000 + ) + update.timeStamp.nanoseconds + + try: + self._units = update.display.units + except AttributeError: + logger.warning("No units available for %s", self._source_name) + self._units = None + + return _serialise(self._source_name, timestamp, self._units) diff --git a/tests/test_helpers/ca_fakes.py b/tests/test_helpers/ca_fakes.py index a1fc7f7a..2e7d3b16 100644 --- a/tests/test_helpers/ca_fakes.py +++ b/tests/test_helpers/ca_fakes.py @@ -45,6 +45,7 @@ def get_pvs( return [FakePV(pv_name, self.subscription) for pv_name in pv_names] def call_monitor_callback_with_fake_pv_update(self, pv_update: ReadNotifyResponse): + # This actually calls both the monitor and unit callbacks. 
         for c in self.subscription.callback:
             c(self.subscription, pv_update)
 
diff --git a/tests/update_handlers/ca_update_handler_test.py b/tests/update_handlers/ca_update_handler_test.py
index 2294e256..96d248cf 100644
--- a/tests/update_handlers/ca_update_handler_test.py
+++ b/tests/update_handlers/ca_update_handler_test.py
@@ -22,6 +22,7 @@
 )
 from streaming_data_types.logdata_f142 import deserialise_f142
 from streaming_data_types.logdata_f144 import deserialise_f144
 from streaming_data_types.timestamps_tdct import deserialise_tdct
+from streaming_data_types.units_un00 import deserialise_un00
 from streaming_data_types.utils import get_schema
 
@@ -508,3 +509,125 @@ def test_handler_publishes_connection_state_change(
     connect_state_output = deserialise_ep01(producer.published_payloads[-1])
     assert connect_state_output.status == state_enum
     assert connect_state_output.source_name == pv_source_name
+
+
+@pytest.mark.schema("f144")
+def test_handler_does_not_publish_unit_update_if_egu_does_not_exist(
+    context, producer, pv_source_name
+):
+    metadata = (0, 0, TimeStamp(*epics_timestamp()))
+
+    context.call_monitor_callback_with_fake_pv_update(
+        ReadNotifyResponse(
+            np.array([1]).astype(np.int64),
+            ChannelType.TIME_ENUM,
+            1,
+            1,
+            1,
+            metadata=metadata,
+        )
+    )
+    # f144 and al00 are published, but NOT un00.
+    assert len(producer.published_payloads) == 2
+    un00_messages = [
+        msg for msg in producer.published_payloads if "un00" == get_schema(msg)
+    ]
+    assert len(un00_messages) == 0
+
+
+@pytest.mark.schema("f144")
+def test_handler_publishes_update_with_changed_unit(context, producer, pv_source_name):
+    # Initial monitor update.
+    context.call_monitor_callback_with_fake_pv_update(
+        ReadNotifyResponse(
+            np.array([1]).astype(np.int32),
+            ChannelType.TIME_INT,
+            1,
+            1,
+            1,
+            metadata=(0, 0, TimeStamp(*epics_timestamp())),
+        )
+    )
+    # Remove the monitor callback so that only the unit callback fires,
+    # as it would in real life.
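+    # (The monitor callback is registered first by CAUpdateHandler, so it is
+    # callback[0]; the one that remains is the unit callback.)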
+    del context.subscription.callback[0]
+
+    initial_units = b""
+    initial_metadata = (0, 0, initial_units)
+
+    context.call_monitor_callback_with_fake_pv_update(
+        ReadNotifyResponse(
+            np.array([1]).astype(np.int32),
+            ChannelType.CTRL_INT,
+            1,
+            1,
+            1,
+            metadata=initial_metadata,
+        )
+    )
+
+    updated_units = b"mm"
+    updated_metadata = (0, 0, updated_units)
+
+    context.call_monitor_callback_with_fake_pv_update(
+        ReadNotifyResponse(
+            np.array([1]).astype(np.int32),
+            ChannelType.CTRL_INT,
+            1,
+            1,
+            1,
+            metadata=updated_metadata,
+        )
+    )
+    assert len(producer.published_payloads) == 4
+    un00_messages = [
+        msg for msg in producer.published_payloads if "un00" == get_schema(msg)
+    ]
+    assert len(un00_messages) == 2
+    initial_un00_message = deserialise_un00(un00_messages[0])
+    assert initial_un00_message.units == initial_units.decode()
+    update_un00_message = deserialise_un00(un00_messages[1])
+    assert update_un00_message.units == updated_units.decode()
+
+
+@pytest.mark.schema("f144")
+def test_handler_publishes_blank_update_initially(context, producer, pv_source_name):
+    units = b""
+
+    # Initial monitor update.
+    context.call_monitor_callback_with_fake_pv_update(
+        ReadNotifyResponse(
+            np.array([1]).astype(np.int32),
+            ChannelType.TIME_INT,
+            1,
+            1,
+            1,
+            metadata=(0, 0, TimeStamp(*epics_timestamp())),
+        )
+    )
+
+    metadata = (0, 0, units)
+
+    # Remove the monitor callback so that only the unit callback fires,
+    # as it would in real life.
+    del context.subscription.callback[0]
+
+    context.call_monitor_callback_with_fake_pv_update(
+        ReadNotifyResponse(
+            np.array([1]).astype(np.int32),
+            ChannelType.CTRL_INT,
+            1,
+            1,
+            1,
+            metadata=metadata,
+        )
+    )
+    assert len(producer.published_payloads) == 3
+    un00_messages = [
+        msg for msg in producer.published_payloads if "un00" == get_schema(msg)
+    ]
+    assert len(un00_messages) == 1
+    un00_message = deserialise_un00(un00_messages[0])
+    assert un00_message.units == units.decode()
diff --git a/tests/update_handlers/pva_update_handler_test.py b/tests/update_handlers/pva_update_handler_test.py
index dfe8e6db..4ca7302d 100644
--- a/tests/update_handlers/pva_update_handler_test.py
+++ b/tests/update_handlers/pva_update_handler_test.py
@@ -16,6 +16,7 @@
 )
 from streaming_data_types.logdata_f142 import deserialise_f142
 from streaming_data_types.logdata_f144 import deserialise_f144
 from streaming_data_types.timestamps_tdct import deserialise_tdct
+from streaming_data_types.units_un00 import deserialise_un00
 from streaming_data_types.utils import get_schema
 
@@ -86,6 +87,11 @@ def test_update_handler_publishes_enum_update_f144(context, producer, pv_source_
     pv_update_output = deserialise_f144(producer.published_payloads[-3])
     assert np.allclose(pv_update_output.value, pv_index)
     assert pv_update_output.source_name == pv_source_name
+    # EGU does not exist on an enum, so make sure we haven't sent a unit update.
+    un00_messages = [
+        msg for msg in producer.published_payloads if "un00" == get_schema(msg)
+    ]
+    assert len(un00_messages) == 0
 
 
 @pytest.mark.schema("f142")
@@ -462,3 +468,28 @@ def test_handler_does_not_publish_if_never_connected(
     context.call_monitor_callback_with_fake_pv_update(exception)
 
     assert len(producer.published_payloads) == 0
+
+
+@pytest.mark.schema("f144")
+def test_handler_publishes_update_with_changed_unit(context, producer, pv_source_name):
+    expected_unit = "mm"
+    t = NTScalar("i", valueAlarm=True, display=True, control=True).wrap(
+        123, timestamp=time()
+    )
+    t.display.units = expected_unit
+
+    context.call_monitor_callback_with_fake_pv_update(t)
+    assert len(producer.published_payloads) == 4
+    assert deserialise_un00(producer.published_payloads[-1]).units == expected_unit
+
+
+@pytest.mark.schema("f144")
+def test_handler_publishes_blank_update_initially(context, producer, pv_source_name):
+    t = NTScalar("i", valueAlarm=True, display=True, control=True).wrap(
+        123, timestamp=time()
+    )
+    assert t.display.units == ""
+
+    context.call_monitor_callback_with_fake_pv_update(t)
+    assert len(producer.published_payloads) == 4
+    assert deserialise_un00(producer.published_payloads[-1]).units == ""
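--

A minimal round-trip sketch of the un00 payloads this patch produces, using
only the streaming_data_types calls that appear above (the PV name, unit
string, and timestamp are made up for illustration):

    import time

    from streaming_data_types import serialise_un00
    from streaming_data_types.units_un00 import deserialise_un00
    from streaming_data_types.utils import get_schema

    # Serialise a unit update, as the un00 serialisers do via _serialise().
    buffer = serialise_un00("SIM:PRESSURE", int(time.time() * 1e9), "mbar")

    # Consumers can pick un00 messages out of a mixed topic by schema id,
    # exactly as the tests above do.
    assert get_schema(buffer) == "un00"
    assert deserialise_un00(buffer).units == "mbar"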