Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Refactor statistics reporter to support dynamically added metrics
* Add latency and per-PV graphite metrics
* Add containerfile
* Add support for streaming PV units (`un00` schema)

## v2.1.0

Expand Down
57 changes: 50 additions & 7 deletions forwarder/update_handlers/ca_update_handler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import time
from typing import List, Optional
from dataclasses import dataclass

from caproto import ReadNotifyResponse
from caproto import ReadNotifyResponse, timestamp_to_epics
from caproto.threading.client import PV
from caproto.threading.client import Context as CAContext

from forwarder.application_logger import get_logger
from forwarder.metrics import Counter, Summary, sanitise_metric_name
from forwarder.metrics.statistics_reporter import StatisticsReporter
from forwarder.update_handlers.serialiser_tracker import SerialiserTracker
from forwarder.update_handlers.un00_serialiser import un00_CASerialiser


class CAUpdateHandler:
Expand All @@ -34,6 +36,7 @@ def __init__(
self._processing_errors_metric = processing_errors_metric
self._processing_latency_metric = None
self._receive_latency_metric = None
self._last_update = 0
if self._statistics_reporter:
try:
self._processing_latency_metric = Summary(
Expand Down Expand Up @@ -66,19 +69,59 @@ def __init__(
ctrl_sub.add_callback(self._unit_callback)

def _unit_callback(self, sub, response: ReadNotifyResponse):
    """Handle a CTRL (metadata) update from caproto and forward units via un00.

    caproto sometimes delivers a unit callback before any monitor callback;
    to avoid dropping that unit update we approximate its timestamp with the
    current wall-clock time when no PV timestamp has been seen yet.
    """
    fallback_timestamp = time.time()

    self._logger.debug("CA Unit callback called for %s", self._pv_name)

    old_unit = self._current_unit
    try:
        new_unit = response.metadata.units.decode("utf-8")
        # We get a unit callback with a blank unit string if the value has
        # updated but the EGU field has not - ignore those so we keep the
        # last real unit.  (Fix: the previous check was `is not None`, which
        # never filtered anything because decode() cannot return None.)
        if new_unit:
            self._current_unit = new_unit
    except AttributeError:
        self._current_unit = None

    if old_unit != self._current_unit:
        self._logger.info(
            f'Display unit of (ca) PV with name "{self._pv_name}" changed from "{old_unit}" to "{self._current_unit}".'
        )
        # NOTE(review): a legitimate unit change is counted as a processing
        # error here - confirm this is intended now that unit streaming is
        # supported.
        if self._processing_errors_metric:
            self._processing_errors_metric.inc()

    # This is pretty hacky: the real ReadNotifyResponse metadata is
    # immutable/read-only, but we need to attach a timestamp, so a minimal
    # stand-in response object is built instead.  The classes are defined
    # once, outside the loop, rather than per iteration.
    @dataclass
    class StupidMetaData:
        timestamp: float
        units: str

    @dataclass
    class StupidResponse:
        metadata: StupidMetaData

    update_time = self._last_update if self._last_update > 0 else fallback_timestamp
    for serialiser_tracker in self.serialiser_tracker_list:
        # Only let the unit serialiser deal with this update - as it has no
        # value the other serialisers would fall over.
        if isinstance(serialiser_tracker.serialiser, un00_CASerialiser):
            self._logger.debug(
                "about to publish update. units: %s, timestamp: %s",
                self._current_unit,
                update_time,
            )
            # Use a fresh local name instead of rebinding the `response`
            # parameter, which the original code clobbered inside the loop.
            unit_response = StupidResponse(
                metadata=StupidMetaData(timestamp=update_time, units=self._current_unit)
            )
            serialiser_tracker.process_ca_message(unit_response)  # type: ignore


def _monitor_callback(self, sub, response: ReadNotifyResponse):
self._logger.debug("CA Monitor callback called for %s", self._pv_name)
try:
self._last_update = response.metadata.timestamp
except Exception:
self._logger.warning("Error getting timestamp for %s", sub.pv.name)

if self._receive_latency_metric:
try:
response_timestamp = response.metadata.timestamp.seconds + (
Expand Down
19 changes: 14 additions & 5 deletions forwarder/update_handlers/pva_update_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from forwarder.metrics import Counter, Summary, sanitise_metric_name
from forwarder.metrics.statistics_reporter import StatisticsReporter
from forwarder.update_handlers.serialiser_tracker import SerialiserTracker
from forwarder.update_handlers.un00_serialiser import un00_PVASerialiser


class PVAUpdateHandler:
Expand Down Expand Up @@ -54,7 +55,7 @@ def __init__(
except Exception as e:
self._logger.warning(f"Could not initialise metric for {pv_name}: {e}")

request = context.makeRequest("field()")
request = PVAContext.makeRequest("field()")
self._sub = context.monitor(
pv_name,
self._monitor_callback,
Expand All @@ -63,17 +64,19 @@ def __init__(
)

def _monitor_callback(self, response: Union[Value, Exception]):
self._logger.debug("PVA monitor callback called for %s", self._pv_name)
old_unit = self._unit
try:
self._unit = response.display.units # type: ignore
except AttributeError:
pass
if old_unit is not None and old_unit != self._unit:
self._logger.error(

units_changed = old_unit != self._unit
if units_changed:
self._logger.info(
f'Display unit of (pva) PV with name "{self._pv_name}" changed from "{old_unit}" to "{self._unit}".'
)
if self._processing_errors_metric:
self._processing_errors_metric.inc()

if self._receive_latency_metric and isinstance(response, Value):
try:
response_timestamp = response.timeStamp.secondsPastEpoch + (
Expand All @@ -86,9 +89,15 @@ def _monitor_callback(self, response: Union[Value, Exception]):
if self._processing_latency_metric:
with self._processing_latency_metric.time():
for serialiser_tracker in self.serialiser_tracker_list:
if isinstance(serialiser_tracker.serialiser, un00_PVASerialiser) and not units_changed:
# If units haven't changed, don't publish a unit update
continue
serialiser_tracker.process_pva_message(response)
else:
for serialiser_tracker in self.serialiser_tracker_list:
if isinstance(serialiser_tracker.serialiser, un00_PVASerialiser) and not units_changed:
# If units haven't changed, don't publish a unit update
continue
serialiser_tracker.process_pva_message(response)
except (RuntimeError, ValueError) as e:
self._logger.error(
Expand Down
4 changes: 4 additions & 0 deletions forwarder/update_handlers/schema_serialiser_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
tdct_CASerialiser,
tdct_PVASerialiser,
)
from forwarder.update_handlers.un00_serialiser import un00_PVASerialiser, un00_CASerialiser


class SerialiserFactory:
Expand All @@ -39,6 +40,7 @@ class SerialiserFactory:
"f144": f144_CASerialiser,
"no_op": no_op_CASerialiser,
"tdct": tdct_CASerialiser,
"un00": un00_CASerialiser,
},
EpicsProtocol.FAKE: {
"al00": al00_PVASerialiser,
Expand All @@ -49,6 +51,7 @@ class SerialiserFactory:
"nttable_se00": nttable_se00_PVASerialiser,
"nttable_senv": nttable_senv_PVASerialiser,
"tdct": tdct_PVASerialiser,
"un00": un00_PVASerialiser,
},
EpicsProtocol.PVA: {
"al00": al00_PVASerialiser,
Expand All @@ -59,6 +62,7 @@ class SerialiserFactory:
"nttable_se00": nttable_se00_PVASerialiser,
"nttable_senv": nttable_senv_PVASerialiser,
"tdct": tdct_PVASerialiser,
"un00": un00_PVASerialiser,
},
}

Expand Down
16 changes: 13 additions & 3 deletions forwarder/update_handlers/serialiser_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,18 @@ def set_new_message(self, message: bytes, timestamp_ns: Union[int, float]):
message_datetime = datetime.fromtimestamp(timestamp_ns / 1e9, tz=timezone.utc)
if message_datetime < self._last_timestamp:
self._logger.error(
f"Rejecting update on {self._pv_name} as its timestamp is older than the previous message timestamp from that PV ({message_datetime} vs {self._last_timestamp})."
f"Rejecting update on {self._pv_name} as its timestamp({message_datetime}) is older than the previous message timestamp from that PV ({message_datetime} vs {self._last_timestamp})."
)
return
current_datetime = datetime.now(tz=timezone.utc)
if message_datetime < current_datetime - LOWER_AGE_LIMIT:
self._logger.error(
f"Rejecting update on {self._pv_name} as its timestamp is older than allowed ({LOWER_AGE_LIMIT})."
f"Rejecting update on {self._pv_name} as its timestamp({message_datetime}) is older than allowed ({LOWER_AGE_LIMIT})."
)
return
if message_datetime > current_datetime + UPPER_AGE_LIMIT:
self._logger.error(
f"Rejecting update on {self._pv_name} as its timestamp is from further into the future than allowed ({UPPER_AGE_LIMIT})."
f"Rejecting update on {self._pv_name} as its timestamp({message_datetime}) is from further into the future than allowed ({UPPER_AGE_LIMIT})."
)
return
self._last_timestamp = message_datetime
Expand Down Expand Up @@ -182,4 +182,14 @@ def create_serialiser_list(
periodic_update_ms,
)
)
# Units serialiser
return_list.append(
SerialiserTracker(
SerialiserFactory.create_serialiser(protocol, "un00", pv_name),
producer,
pv_name,
output_topic,
periodic_update_ms,
)
)
return return_list
72 changes: 72 additions & 0 deletions forwarder/update_handlers/un00_serialiser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import Optional, Tuple, Union
import p4p
from caproto import Message as CA_Message

from forwarder.application_logger import get_logger
from streaming_data_types import serialise_un00

from forwarder.kafka.kafka_helpers import seconds_to_nanoseconds
from forwarder.update_handlers.schema_serialisers import CASerialiser, PVASerialiser

logger = get_logger()

def _serialise(
    source_name: str,
    timestamp_ns: Optional[int],
    units: Optional[str],
) -> Tuple[bytes, Optional[int]]:
    """Build a un00 flatbuffer message for *source_name*.

    Returns the serialised payload together with the timestamp (nanoseconds),
    which may be None when no PV timestamp was available.  The return
    annotation was `Tuple[bytes, int]`, which is wrong for that case, and
    `int | None` was inconsistent with the file's `Optional[...]` style.
    """
    return serialise_un00(source_name, timestamp_ns, units), timestamp_ns


class un00_CASerialiser(CASerialiser):
    """Serialises EPICS Channel Access unit (EGU) updates into un00 messages."""

    def __init__(self, source_name: str):
        self._source_name = source_name
        self._message: Optional[str] = None
        self._units: Optional[str] = None

    def serialise(
        self, update: CA_Message, **unused
    ) -> Union[Tuple[bytes, int], Tuple[None, None]]:
        """Serialise the units carried by a CA metadata update.

        Returns (payload, timestamp_ns); timestamp_ns may be None when the
        update has no timestamp.  Returns (None, None) when the update
        carries no units at all.
        """
        metadata = update.metadata
        try:
            timestamp = seconds_to_nanoseconds(metadata.timestamp)
        except AttributeError:
            logger.warning("No timestamp available for %s", self._source_name)
            timestamp = None
        try:
            units = metadata.units
        except AttributeError:
            logger.warning("No units available for %s", self._source_name)
            # Fix: return a tuple, not a list, to match the declared return
            # type and the tuple returned by every other code path.
            return None, None
        logger.debug(
            f"Source name: {self._source_name}, timestamp: {timestamp}, units: {units}"
        )
        return _serialise(self._source_name, timestamp, units)

    def conn_serialise(self, pv: str, state: str) -> Tuple[None, None]:
        # Connection-state changes carry no unit information.
        return None, None


class un00_PVASerialiser(PVASerialiser):
    """Serialises PVA display-unit updates into un00 messages."""

    def __init__(self, source_name: str):
        self._source_name = source_name
        self._message: Optional[str] = None
        self._units: Optional[str] = None

    def serialise(
        self, update: Union[p4p.Value, RuntimeError]
    ) -> Union[Tuple[bytes, int], Tuple[None, None]]:
        """Serialise the display units from a PVA update.

        Returns (payload, timestamp_ns), or (None, None) for error updates.
        """
        if isinstance(update, RuntimeError):
            return None, None

        stamp = update.timeStamp
        timestamp_ns = stamp.secondsPastEpoch * 1_000_000_000 + stamp.nanoseconds

        try:
            self._units = update.display.units
        except AttributeError:
            logger.warning("No units available for %s", self._source_name)
            self._units = None

        return _serialise(self._source_name, timestamp_ns, self._units)
1 change: 1 addition & 0 deletions tests/test_helpers/ca_fakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def get_pvs(
return [FakePV(pv_name, self.subscription) for pv_name in pv_names]

def call_monitor_callback_with_fake_pv_update(self, pv_update: ReadNotifyResponse):
    # Invokes every registered callback, so the monitor callback AND the
    # unit callback both fire for this fake update.
    for registered_callback in self.subscription.callback:
        registered_callback(self.subscription, pv_update)

Expand Down
Loading