From a2bb68fddf8b2454935a370c94651c962a2f2def Mon Sep 17 00:00:00 2001 From: jbellister-slac Date: Mon, 3 Jan 2022 10:48:29 -0800 Subject: [PATCH] Include the necessary modifications to display archive data in timeplots --- pydm/data_plugins/archiver_plugin.py | 148 ++++++++++++++++++++++++--- pydm/tests/widgets/test_timeplot.py | 99 ++++++++++++++++++ pydm/widgets/multi_axis_plot.py | 10 +- pydm/widgets/multi_axis_viewbox.py | 3 + pydm/widgets/qtplugin_extensions.py | 6 ++ pydm/widgets/qtplugins.py | 22 +++- pydm/widgets/timeplot.py | 14 ++- requirements.txt | 2 +- 8 files changed, 273 insertions(+), 31 deletions(-) diff --git a/pydm/data_plugins/archiver_plugin.py b/pydm/data_plugins/archiver_plugin.py index fdb2b4a69..1df3bc4cd 100644 --- a/pydm/data_plugins/archiver_plugin.py +++ b/pydm/data_plugins/archiver_plugin.py @@ -1,25 +1,143 @@ -from pydm.data_plugins.plugin import PyDMPlugin, PyDMConnection -import requests -import numpy as np import os +import json +import logging +import numpy as np + +from datetime import datetime +from typing import Optional + +from pydm.widgets.channel import PyDMChannel +from qtpy.QtCore import Slot, QObject, QUrl +from qtpy.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply +from pydm.data_plugins.plugin import PyDMPlugin, PyDMConnection + +logger = logging.getLogger(__name__) class Connection(PyDMConnection): + """ + Manages the requests between the archiver data plugin and the archiver appliance itself. + """ - def __init__(self, channel, address, protocol=None, parent=None): + def __init__(self, channel: PyDMChannel, address: str, protocol: Optional[str] = None, + parent: Optional[QObject] = None): super(Connection, self).__init__(channel, address, protocol, parent) self.add_listener(channel) - base_url = os.getenv("PYDM_ARCHIVER_URL", "http://lcls-archapp.slac.stanford.edu") - url_string = "{base}/retrieval/data/getData.json?{params}".format(base=base_url, params=address) - r = requests.get(url_string) # blocking. BAD! - if r.status_code == 200 and r.headers['content-type'] == 'application/json': - self.connected = True - self.connection_state_signal.emit(True) - data_dict = r.json() - # x_data not used so commented out... maybe return it with y_data? - # x_data = np.array([point["secs"] for point in data_dict[0]["data"]]) - y_data = np.array([point["val"] for point in data_dict[0]["data"]]) - self.new_waveform_signal.emit(y_data) + self.address = address + self.network_manager = QNetworkAccessManager() + self.network_manager.finished[QNetworkReply].connect(self.data_request_finished) + + def add_listener(self, channel: PyDMChannel): + """ + Connects a channel's signal to the slot on this connection so that the channel has a way of requesting + and receiving data from the archiver. + + Parameters + ---------- + channel : PyDMChannel + The channel to connect + """ + super(Connection, self).add_listener(channel) + if channel.value_signal is not None: + channel.value_signal.connect(self.fetch_data) + + def fetch_data(self, from_date: float, to_date: float, processing_command: Optional[str] = None): + """ + Fetches data from the Archiver Appliance based on the input parameters. + + Parameters + ---------- + from_date : float + Timestamp for the oldest data point to retrieve + to_date : float + Timestamp for the newest data point to retrieve + processing_command : str + A string that will be added to the URL to request additional processing on the archiver side before + returning the data such as mean values or optimized. For a full list see: + https://slacmshankar.github.io/epicsarchiver_docs/userguide.html + Note: Due to the potential of additional valid options in the future, no validation is + done on this parameter. It is the responsibility of the caller to ensure it is valid + """ + if from_date >= to_date: + logger.error(f"Cannot fetch data for invalid data range, from date={from_date} and to date={to_date}") + return + + # Archiver expects timestamps to be in utc by default + from_dt = datetime.utcfromtimestamp(from_date) + to_dt = datetime.utcfromtimestamp(to_date) + + # Put the dates into the form expected by the archiver in the request url, see here for more details: + # http://joda-time.sourceforge.net/apidocs/org/joda/time/format/ISODateTimeFormat.html#dateTime() + from_date_str = from_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + to_date_str = to_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + + base_url = os.getenv("PYDM_ARCHIVER_URL") + if base_url is None: + logger.error("Environment variable: PYDM_ARCHIVER_URL must be defined to use the archiver plugin, for " + "example: http://lcls-archapp.slac.stanford.edu") + return + + url_string = f"{base_url}/retrieval/data/getData.json?{self.address}&from={from_date_str}&to={to_date_str}" + if processing_command: + url_string = url_string.replace("pv=", "pv=" + processing_command + "(", 1) + url_string = url_string.replace("&from=", ")&from=", 1) + + request = QNetworkRequest(QUrl(url_string)) + # This get call is non-blocking, can be made in parallel with others, and when the results are ready they + # will be delivered to the data_request_finished method below via the "finished" signal + self.network_manager.get(request) + + @Slot(QNetworkReply) + def data_request_finished(self, reply: QNetworkReply): + """ + Invoked when the request to the archiver appliance has been completed and the reply has been returned. Will + fire off the value signal with a 2D numpy array containing the x-values (timestamps) and y-values (PV data). + + Parameters + ---------- + reply: The response from the archiver appliance + """ + if reply.error() == QNetworkReply.NoError and reply.header(QNetworkRequest.ContentTypeHeader) == "application/json": + bytes_str = reply.readAll() + data_dict = json.loads(str(bytes_str, 'utf-8')) + + if "pv=optimized" in reply.url().url(): # From a url object to a string + self._send_optimized_data(data_dict) + else: + self._send_raw_data(data_dict) + else: + logger.debug(f"Request for data from archiver failed, request url: {reply.url()} retrieved header: " + f"{reply.header(QNetworkRequest.ContentTypeHeader)} error: {reply.error()}") + reply.deleteLater() + + def _send_raw_data(self, data_dict): + """ + Sends a numpy array of shape (2, data_length) containing the x-values (timestamps) and y-values (PV data) + via the new value signal + """ + data = np.array(([point["secs"] for point in data_dict[0]["data"]], + [point["val"] for point in data_dict[0]["data"]])) + self.new_value_signal[np.ndarray].emit(data) + + def _send_optimized_data(self, data_dict): + """ + Sends a numpy array of shape (5, data_length). Index 0 contains the timestamps, index 1 the mean values, + index 2 the standard deviations, index 3 the minimum values, and index 4 the maximum values. + """ + pv_data = [point["val"] for point in data_dict[0]["data"]] + try: + data = np.array(([point["secs"] for point in data_dict[0]["data"]], + [point[0] for point in pv_data], + [point[1] for point in pv_data], + [point[2] for point in pv_data], + [point[3] for point in pv_data])) + except TypeError: + # The archiver will fall back to sending raw data if the optimized request is for more data points + # than are in the bin + self._send_raw_data(data_dict) + return + + self.new_value_signal[np.ndarray].emit(data) class ArchiverPlugin(PyDMPlugin): diff --git a/pydm/tests/widgets/test_timeplot.py b/pydm/tests/widgets/test_timeplot.py index b6444f118..4ba376d11 100644 --- a/pydm/tests/widgets/test_timeplot.py +++ b/pydm/tests/widgets/test_timeplot.py @@ -1,4 +1,5 @@ import pytest +from pydm.widgets.archiver_time_plot import ArchivePlotCurveItem from pyqtgraph import AxisItem from ...widgets.timeplot import TimePlotCurveItem, PyDMTimePlot, TimeAxisItem, MINIMUM_BUFFER_SIZE, DEFAULT_BUFFER_SIZE @@ -128,6 +129,104 @@ def test_timeplotcurveitem_receive_value(qtbot, signals, async_update, new_data) assert pydm_timeplot_curve_item._update_mode == PyDMTimePlot.SynchronousMode +def test_receive_archive_data(qtbot, signals): + """ + Ensure data from archiver appliance is inserted into the archive buffer correctly + """ + curve_item = ArchivePlotCurveItem() + curve_item.setBufferSize(20) + curve_item.setArchiveBufferSize(20) + + # We start with a data buffer with some sample values like those generated by a running time plot + # The x-values are timestamps and the y-values are samples from a PV + example_live_data = np.array([[100, 101, 102, 103, 104, 105], + [2.15, 2.20, 2.25, 2.22, 2.20, 2.18]]) + starting_data = np.concatenate((np.zeros((2, curve_item.getBufferSize()-6)), example_live_data), axis=1) + curve_item.points_accumulated += 6 + curve_item.data_buffer = starting_data + + # A quick check to make sure we're starting off correctly + assert (2, 20) == curve_item.data_buffer.shape + + signals.new_value_signal[np.ndarray].connect(curve_item.receiveArchiveData) + + # First test the most basic case, where we've requested a bit of archive data right before the live data + mock_archive_data = np.array([[70, 75, 80, 85, 90, 95], + [2.05, 2.08, 2.07, 2.08, 2.12, 2.14]]) + signals.new_value_signal[np.ndarray].emit(mock_archive_data) + + expected_data = np.zeros((2, 14)) + expected_data = np.concatenate((expected_data, mock_archive_data), axis=1) + + # Confirm the archive data was inserted as expected + assert np.array_equal(curve_item.archive_data_buffer, expected_data) + + +def test_insert_archive_data(qtbot, signals): + """ When first receiving large amounts of data from the archiver appliance, it will be of the 'optimized' form + in which it is sampled across a fixed number of bins. Drawing a zoom box in this data will get more detailed + data which must be inserted into the archive data buffer. This tests that insertion is successful. """ + curve_item = ArchivePlotCurveItem() + curve_item.setBufferSize(10) + curve_item.setArchiveBufferSize(10) + + # Need some initial data in the live data buffer so that the archive data gets inserted into the proper place + curve_item.data_buffer = np.array([[130, 140], [8, 9]]) + curve_item.points_accumulated = 2 + + # Set up a sample archive buffer + curve_item.archive_data_buffer = np.array([[0, 0, 0, 0, 100, 105, 110, 115, 120, 125], + [0, 0, 0, 0, 2, 3, 4, 5, 6, 7]], + dtype=float) + + curve_item.archive_points_accumulated = 6 + curve_item.zoomed = True + + signals.new_value_signal[np.ndarray].connect(curve_item.receiveArchiveData) + + # Receive raw data that is more detailed than the two values it will be replacing + mock_archive_data = np.array([[104, 106, 108, 111], + [2.8, 3.1, 3.7, 3.95]]) + signals.new_value_signal[np.ndarray].emit(mock_archive_data) + + # The original average values for timestamps 105 and 106 should now be replace with the actual PV data + expected_data = np.array([[0, 0, 100, 104, 106, 108, 111, 115, 120, 125], + [0, 0, 2, 2.8, 3.1, 3.7, 3.95, 5, 6, 7]]) + + assert np.array_equal(curve_item.archive_data_buffer, expected_data) + + +def test_archive_buffer_full(qtbot, signals): + """ If we insert more data points than the archive buffer can hold, then the oldest points are + removed in favor of the new ones until the user requests further backfill data again """ + curve_item = ArchivePlotCurveItem() + curve_item.setBufferSize(6) + curve_item.setArchiveBufferSize(6) + curve_item.data_buffer = np.array([[130, 140], [8, 9]]) + curve_item.points_accumulated = 2 + + # Set up a sample archive buffer that is already full + curve_item.archive_data_buffer = np.array([[100, 105, 110, 115, 120, 125], + [2, 3, 4, 5, 6, 7]], + dtype=float) + curve_item.archive_points_accumulated = 6 + curve_item.zoomed = True + + signals.new_value_signal[np.ndarray].connect(curve_item.receiveArchiveData) + + # Receive data that will cause that will not fit in the buffer without deleting other data points + mock_archive_data = np.array([[104, 106, 108], + [2.8, 3.1, 3.7]]) + + signals.new_value_signal[np.ndarray].emit(mock_archive_data) + + # This is what is left over after the oldest data points have been trimmed + expected_data = np.array([[104, 106, 108, 115, 120, 125], + [2.8, 3.1, 3.7, 5, 6, 7]]) + + assert np.array_equal(curve_item.archive_data_buffer, expected_data) + + @pytest.mark.parametrize("async_update, new_data", [ (False, -10), (False, 10.2333), diff --git a/pydm/widgets/multi_axis_plot.py b/pydm/widgets/multi_axis_plot.py index 728e217dc..f524c2cf1 100644 --- a/pydm/widgets/multi_axis_plot.py +++ b/pydm/widgets/multi_axis_plot.py @@ -77,10 +77,7 @@ def addAxis(self, axis, name, plotDataItem=None, setXLink=False, enableAutoRange view.setYRange(minRange, maxRange) view.enableAutoRange(axis=ViewBox.XAxis, enable=enableAutoRangeX) view.enableAutoRange(axis=ViewBox.YAxis, enable=enableAutoRangeY) - if setXLink: - view.setXLink(self) # Link this view to the shared x-axis of this plot item - else: - self.axes['bottom']['item'].linkToView(view) # Ensure the x axis will update when the view does + self.axes['bottom']['item'].linkToView(view) view.setMouseMode(self.vb.state['mouseMode']) # Ensure that mouse behavior is consistent between stacked views axis.linkToView(view) @@ -93,7 +90,6 @@ def addAxis(self, axis, name, plotDataItem=None, setXLink=False, enableAutoRange # Rebuilding the layout of the plot item will put the new axis in the correct place self.rebuildLayout() - self.updateStackedViews() def addStackedView(self, view): """ @@ -243,6 +239,10 @@ def setYRange(self, minY, maxY, padding=0, update=True): view.setYRange(minY, maxY, padding=padding) super(MultiAxisPlot, self).setYRange(minY, maxY, padding=padding) + def disableXAutoRange(self): + for view in self.stackedViews: + view.enableAutoRange(x=False) + def clearAxes(self): """ Cleans up all axis related data from this plot. diff --git a/pydm/widgets/multi_axis_viewbox.py b/pydm/widgets/multi_axis_viewbox.py index 2b56a32a9..a21bb3ba1 100644 --- a/pydm/widgets/multi_axis_viewbox.py +++ b/pydm/widgets/multi_axis_viewbox.py @@ -18,6 +18,7 @@ class MultiAxisViewBox(ViewBox): # These signals will be emitted by the view when it handles these events, and will be connected # to the event handling code of the stacked views sigMouseDragged = Signal(object, object, object) + sigMouseDraggedDone = Signal() sigMouseWheelZoomed = Signal(object, object, object) sigHistoryChanged = Signal(object) @@ -61,6 +62,8 @@ def mouseDragEvent(self, ev, axis=None, fromSignal=False): if axis != ViewBox.YAxis and not fromSignal: # This event happened within the view box area itself or the x axis so propagate to any stacked view boxes self.sigMouseDragged.emit(self, ev, axis) + if ev.isFinish() and self.state['mouseMode'] == ViewBox.RectMode and axis is None: + self.sigMouseDraggedDone.emit() super(MultiAxisViewBox, self).mouseDragEvent(ev, axis) def keyPressEvent(self, ev): diff --git a/pydm/widgets/qtplugin_extensions.py b/pydm/widgets/qtplugin_extensions.py index 405764a88..f6b187310 100644 --- a/pydm/widgets/qtplugin_extensions.py +++ b/pydm/widgets/qtplugin_extensions.py @@ -4,6 +4,7 @@ from ..widgets.base import PyDMPrimitiveWidget from ..widgets.rules_editor import RulesEditor +from ..widgets.archiver_time_plot_editor import ArchiverTimePlotCurveEditorDialog from ..widgets.waveformplot_curve_editor import WaveformPlotCurveEditorDialog from ..widgets.timeplot_curve_editor import TimePlotCurveEditorDialog from ..widgets.scatterplot_curve_editor import ScatterPlotCurveEditorDialog @@ -117,6 +118,11 @@ def __init__(self, widget): super(WaveformCurveEditorExtension, self).__init__(widget, WaveformPlotCurveEditorDialog) +class ArchiveTimeCurveEditorExtension(BasePlotExtension): + def __init__(self, widget): + super(ArchiveTimeCurveEditorExtension, self).__init__(widget, ArchiverTimePlotCurveEditorDialog) + + class TimeCurveEditorExtension(BasePlotExtension): def __init__(self, widget): super(TimeCurveEditorExtension, self).__init__(widget, TimePlotCurveEditorDialog) diff --git a/pydm/widgets/qtplugins.py b/pydm/widgets/qtplugins.py index 47730ede5..38501ea82 100644 --- a/pydm/widgets/qtplugins.py +++ b/pydm/widgets/qtplugins.py @@ -1,8 +1,9 @@ import logging +import os from .qtplugin_base import qtplugin_factory, WidgetCategory from .qtplugin_extensions import (RulesExtension, WaveformCurveEditorExtension, - TimeCurveEditorExtension, + TimeCurveEditorExtension, ArchiveTimeCurveEditorExtension, ScatterCurveEditorExtension, SymbolExtension) from .tab_bar_qtplugin import TabWidgetPlugin from .byte import PyDMByteIndicator @@ -32,6 +33,7 @@ from .waveformtable import PyDMWaveformTable from .scale import PyDMScaleIndicator from .timeplot import PyDMTimePlot +from .archiver_time_plot import PyDMArchiverTimePlot from .waveformplot import PyDMWaveformPlot from .scatterplot import PyDMScatterPlot from .template_repeater import PyDMTemplateRepeater @@ -51,10 +53,20 @@ icon=ifont.icon("tag")) # Time Plot plugin -PyDMTimePlotPlugin = qtplugin_factory(PyDMTimePlot, group=WidgetCategory.PLOT, - extensions=[TimeCurveEditorExtension, - RulesExtension], - icon=ifont.icon("chart-line")) +# In order to keep the archiver functionality invisible to users who do not have access to an instance of the +# archiver appliance, choose which version of the time plot to load here based on the user's environment +if "PYDM_ARCHIVER_URL" not in os.environ: + PyDMTimePlotPlugin = qtplugin_factory(PyDMTimePlot, group=WidgetCategory.PLOT, + extensions=[TimeCurveEditorExtension, + RulesExtension], + icon=ifont.icon("chart-line")) +else: + # Time Plot with archiver appliance support plugin + PyDMTimePlotPlugin = qtplugin_factory(PyDMArchiverTimePlot, group=WidgetCategory.PLOT, + extensions=[ArchiveTimeCurveEditorExtension, + RulesExtension], + icon=ifont.icon("chart-line") + ) # Waveform Plot plugin PyDMWaveformPlotPlugin = qtplugin_factory(PyDMWaveformPlot, diff --git a/pydm/widgets/timeplot.py b/pydm/widgets/timeplot.py index 40a729bb7..7ad75544c 100644 --- a/pydm/widgets/timeplot.py +++ b/pydm/widgets/timeplot.py @@ -1,7 +1,7 @@ import time import json from collections import OrderedDict -from pyqtgraph import ViewBox, AxisItem +from pyqtgraph import ViewBox, AxisItem, DateAxisItem import numpy as np from qtpy.QtGui import QColor from qtpy.QtCore import Signal, Slot, Property, QTimer @@ -204,7 +204,7 @@ def asyncUpdate(self): def update_min_max_y_values(self, new_value): """ - Updte the min and max y-value as a new value is available. This is + Update the min and max y-value as a new value is available. This is useful for auto-scaling to a specific curve. Parameters @@ -392,7 +392,7 @@ def initialize_for_designer(self): def addYChannel(self, y_channel=None, name=None, color=None, lineStyle=None, lineWidth=None, symbol=None, - symbolSize=None, yAxisName=None): + symbolSize=None, yAxisName=None, useArchiveData=False): """ Adds a new curve to the current plot @@ -432,8 +432,8 @@ def addYChannel(self, y_channel=None, name=None, color=None, plot_opts['lineWidth'] = lineWidth # Add curve - new_curve = TimePlotCurveItem(y_channel, plot_by_timestamps=self._plot_by_timestamps, name=name, color=color, - yAxisName=yAxisName, **plot_opts) + new_curve = self.createCurveItem(y_channel, self._plot_by_timestamps, name, color=color, + yAxisName=yAxisName, useArchiveData=useArchiveData, **plot_opts) new_curve.setUpdatesAsynchronously(self.updatesAsynchronously) new_curve.setBufferSize(self._bufferSize) @@ -445,6 +445,10 @@ def addYChannel(self, y_channel=None, name=None, color=None, return new_curve + def createCurveItem(self, y_channel, plot_by_timestamps, name, color, yAxisName, useArchiveData, **plot_opts): + return TimePlotCurveItem(y_channel, plot_by_timestamps=plot_by_timestamps, + name=name, color=color, yAxisName=yAxisName, **plot_opts) + def removeYChannel(self, curve): """ Remove a curve from the graph. This also stops update the timer diff --git a/requirements.txt b/requirements.txt index 4cd70f907..fac00d39f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ numpy>=1.11.0 pyepics>=3.2.7 -pyqtgraph>=0.11.0 +pyqtgraph>=0.12.0 qtpy requests>=1.1.0 scipy>=0.12.0