Skip to content

Commit

Permalink
Include the necessary modifications to display archive data in timeplots
Browse files Browse the repository at this point in the history
  • Loading branch information
jbellister-slac committed Feb 16, 2022
1 parent 6bfca8a commit a2bb68f
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 31 deletions.
148 changes: 133 additions & 15 deletions pydm/data_plugins/archiver_plugin.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,143 @@
from pydm.data_plugins.plugin import PyDMPlugin, PyDMConnection
import requests
import numpy as np
import os
import json
import logging
import numpy as np

from datetime import datetime
from typing import Optional

from pydm.widgets.channel import PyDMChannel
from qtpy.QtCore import Slot, QObject, QUrl
from qtpy.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from pydm.data_plugins.plugin import PyDMPlugin, PyDMConnection

logger = logging.getLogger(__name__)


class Connection(PyDMConnection):
    """
    Manages the requests between the archiver data plugin and the archiver appliance itself.
    """

    def __init__(self, channel: PyDMChannel, address: str, protocol: Optional[str] = None,
                 parent: Optional[QObject] = None):
        super(Connection, self).__init__(channel, address, protocol, parent)
        self.add_listener(channel)
        self.address = address
        # All requests to the archiver go through this manager so they are non-blocking;
        # completed replies arrive in data_request_finished via the "finished" signal
        self.network_manager = QNetworkAccessManager()
        self.network_manager.finished[QNetworkReply].connect(self.data_request_finished)

    def add_listener(self, channel: PyDMChannel):
        """
        Connects a channel's signal to the slot on this connection so that the channel has a way of requesting
        and receiving data from the archiver.

        Parameters
        ----------
        channel : PyDMChannel
            The channel to connect
        """
        super(Connection, self).add_listener(channel)
        if channel.value_signal is not None:
            channel.value_signal.connect(self.fetch_data)

    def fetch_data(self, from_date: float, to_date: float, processing_command: Optional[str] = None):
        """
        Fetches data from the Archiver Appliance based on the input parameters.

        Parameters
        ----------
        from_date : float
            Timestamp for the oldest data point to retrieve
        to_date : float
            Timestamp for the newest data point to retrieve
        processing_command : str
            A string that will be added to the URL to request additional processing on the archiver side before
            returning the data such as mean values or optimized. For a full list see:
            https://slacmshankar.github.io/epicsarchiver_docs/userguide.html
            Note: Due to the potential of additional valid options in the future, no validation is
            done on this parameter. It is the responsibility of the caller to ensure it is valid
        """
        if from_date >= to_date:
            logger.error(f"Cannot fetch data for invalid data range, from date={from_date} and to date={to_date}")
            return

        # Archiver expects timestamps to be in utc by default
        from_dt = datetime.utcfromtimestamp(from_date)
        to_dt = datetime.utcfromtimestamp(to_date)

        # Put the dates into the form expected by the archiver in the request url, see here for more details:
        # http://joda-time.sourceforge.net/apidocs/org/joda/time/format/ISODateTimeFormat.html#dateTime()
        # (strftime gives microseconds; trim to milliseconds before appending the UTC designator)
        from_date_str = from_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
        to_date_str = to_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

        base_url = os.getenv("PYDM_ARCHIVER_URL")
        if base_url is None:
            logger.error("Environment variable: PYDM_ARCHIVER_URL must be defined to use the archiver plugin, for "
                         "example: http://lcls-archapp.slac.stanford.edu")
            return

        url_string = f"{base_url}/retrieval/data/getData.json?{self.address}&from={from_date_str}&to={to_date_str}"
        if processing_command:
            # Wrap the PV name in the processing function, e.g. pv=NAME -> pv=optimized_800(NAME)
            url_string = url_string.replace("pv=", "pv=" + processing_command + "(", 1)
            url_string = url_string.replace("&from=", ")&from=", 1)

        request = QNetworkRequest(QUrl(url_string))
        # This get call is non-blocking, can be made in parallel with others, and when the results are ready they
        # will be delivered to the data_request_finished method below via the "finished" signal
        self.network_manager.get(request)

    @Slot(QNetworkReply)
    def data_request_finished(self, reply: QNetworkReply):
        """
        Invoked when the request to the archiver appliance has been completed and the reply has been returned. Will
        fire off the value signal with a 2D numpy array containing the x-values (timestamps) and y-values (PV data).

        Parameters
        ----------
        reply: The response from the archiver appliance
        """
        if reply.error() == QNetworkReply.NoError and reply.header(QNetworkRequest.ContentTypeHeader) == "application/json":
            bytes_str = reply.readAll()
            data_dict = json.loads(str(bytes_str, 'utf-8'))

            if "pv=optimized" in reply.url().url():  # From a url object to a string
                self._send_optimized_data(data_dict)
            else:
                self._send_raw_data(data_dict)
        else:
            logger.debug(f"Request for data from archiver failed, request url: {reply.url()} retrieved header: "
                         f"{reply.header(QNetworkRequest.ContentTypeHeader)} error: {reply.error()}")
        reply.deleteLater()

    def _send_raw_data(self, data_dict):
        """
        Sends a numpy array of shape (2, data_length) containing the x-values (timestamps) and y-values (PV data)
        via the new value signal
        """
        data = np.array(([point["secs"] for point in data_dict[0]["data"]],
                         [point["val"] for point in data_dict[0]["data"]]))
        self.new_value_signal[np.ndarray].emit(data)

    def _send_optimized_data(self, data_dict):
        """
        Sends a numpy array of shape (5, data_length). Index 0 contains the timestamps, index 1 the mean values,
        index 2 the standard deviations, index 3 the minimum values, and index 4 the maximum values.
        """
        pv_data = [point["val"] for point in data_dict[0]["data"]]
        try:
            data = np.array(([point["secs"] for point in data_dict[0]["data"]],
                             [point[0] for point in pv_data],
                             [point[1] for point in pv_data],
                             [point[2] for point in pv_data],
                             [point[3] for point in pv_data]))
        except TypeError:
            # The archiver will fall back to sending raw data if the optimized request is for more data points
            # than are in the bin, in which case each "val" is a scalar and not subscriptable
            self._send_raw_data(data_dict)
            return

        self.new_value_signal[np.ndarray].emit(data)


class ArchiverPlugin(PyDMPlugin):
Expand Down
99 changes: 99 additions & 0 deletions pydm/tests/widgets/test_timeplot.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from pydm.widgets.archiver_time_plot import ArchivePlotCurveItem
from pyqtgraph import AxisItem
from ...widgets.timeplot import TimePlotCurveItem, PyDMTimePlot, TimeAxisItem, MINIMUM_BUFFER_SIZE, DEFAULT_BUFFER_SIZE

Expand Down Expand Up @@ -128,6 +129,104 @@ def test_timeplotcurveitem_receive_value(qtbot, signals, async_update, new_data)
assert pydm_timeplot_curve_item._update_mode == PyDMTimePlot.SynchronousMode


def test_receive_archive_data(qtbot, signals):
    """
    Ensure data from archiver appliance is inserted into the archive buffer correctly
    """
    curve = ArchivePlotCurveItem()
    curve.setBufferSize(20)
    curve.setArchiveBufferSize(20)

    # Seed the live-data buffer the way a running time plot would:
    # row 0 holds timestamps, row 1 holds samples from a PV
    live_samples = np.array([[100, 101, 102, 103, 104, 105],
                             [2.15, 2.20, 2.25, 2.22, 2.20, 2.18]])
    zero_padding = np.zeros((2, curve.getBufferSize() - 6))
    curve.points_accumulated += 6
    curve.data_buffer = np.concatenate((zero_padding, live_samples), axis=1)

    # A quick check to make sure we're starting off correctly
    assert curve.data_buffer.shape == (2, 20)

    signals.new_value_signal[np.ndarray].connect(curve.receiveArchiveData)

    # First test the most basic case, where we've requested a bit of archive data right before the live data
    archive_samples = np.array([[70, 75, 80, 85, 90, 95],
                                [2.05, 2.08, 2.07, 2.08, 2.12, 2.14]])
    signals.new_value_signal[np.ndarray].emit(archive_samples)

    expected = np.concatenate((np.zeros((2, 14)), archive_samples), axis=1)

    # Confirm the archive data was inserted as expected
    assert np.array_equal(curve.archive_data_buffer, expected)


def test_insert_archive_data(qtbot, signals):
    """ When first receiving large amounts of data from the archiver appliance, it will be of the 'optimized' form
    in which it is sampled across a fixed number of bins. Drawing a zoom box in this data will get more detailed
    data which must be inserted into the archive data buffer. This tests that insertion is successful. """
    curve = ArchivePlotCurveItem()
    curve.setBufferSize(10)
    curve.setArchiveBufferSize(10)

    # Need some initial data in the live data buffer so that the archive data gets inserted into the proper place
    curve.data_buffer = np.array([[130, 140], [8, 9]])
    curve.points_accumulated = 2

    # Set up a sample archive buffer (row 0 = timestamps, row 1 = binned averages)
    curve.archive_data_buffer = np.array([[0, 0, 0, 0, 100, 105, 110, 115, 120, 125],
                                          [0, 0, 0, 0, 2, 3, 4, 5, 6, 7]],
                                         dtype=float)
    curve.archive_points_accumulated = 6
    curve.zoomed = True

    signals.new_value_signal[np.ndarray].connect(curve.receiveArchiveData)

    # Receive raw data that is more detailed than the two values it will be replacing
    detailed_data = np.array([[104, 106, 108, 111],
                              [2.8, 3.1, 3.7, 3.95]])
    signals.new_value_signal[np.ndarray].emit(detailed_data)

    # The original average values for timestamps 105 and 110 should now be replaced with the actual PV data
    expected = np.array([[0, 0, 100, 104, 106, 108, 111, 115, 120, 125],
                         [0, 0, 2, 2.8, 3.1, 3.7, 3.95, 5, 6, 7]])

    assert np.array_equal(curve.archive_data_buffer, expected)


def test_archive_buffer_full(qtbot, signals):
    """ If we insert more data points than the archive buffer can hold, then the oldest points are
    removed in favor of the new ones until the user requests further backfill data again """
    curve = ArchivePlotCurveItem()
    curve.setBufferSize(6)
    curve.setArchiveBufferSize(6)
    curve.data_buffer = np.array([[130, 140], [8, 9]])
    curve.points_accumulated = 2

    # Set up a sample archive buffer that is already full
    curve.archive_data_buffer = np.array([[100, 105, 110, 115, 120, 125],
                                          [2, 3, 4, 5, 6, 7]],
                                         dtype=float)
    curve.archive_points_accumulated = 6
    curve.zoomed = True

    signals.new_value_signal[np.ndarray].connect(curve.receiveArchiveData)

    # Receive data that will not fit in the buffer without deleting other data points
    overflow_data = np.array([[104, 106, 108],
                              [2.8, 3.1, 3.7]])
    signals.new_value_signal[np.ndarray].emit(overflow_data)

    # This is what is left over after the oldest data points have been trimmed
    expected = np.array([[104, 106, 108, 115, 120, 125],
                         [2.8, 3.1, 3.7, 5, 6, 7]])

    assert np.array_equal(curve.archive_data_buffer, expected)


@pytest.mark.parametrize("async_update, new_data", [
(False, -10),
(False, 10.2333),
Expand Down
10 changes: 5 additions & 5 deletions pydm/widgets/multi_axis_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ def addAxis(self, axis, name, plotDataItem=None, setXLink=False, enableAutoRange
view.setYRange(minRange, maxRange)
view.enableAutoRange(axis=ViewBox.XAxis, enable=enableAutoRangeX)
view.enableAutoRange(axis=ViewBox.YAxis, enable=enableAutoRangeY)
if setXLink:
view.setXLink(self) # Link this view to the shared x-axis of this plot item
else:
self.axes['bottom']['item'].linkToView(view) # Ensure the x axis will update when the view does
self.axes['bottom']['item'].linkToView(view)

view.setMouseMode(self.vb.state['mouseMode']) # Ensure that mouse behavior is consistent between stacked views
axis.linkToView(view)
Expand All @@ -93,7 +90,6 @@ def addAxis(self, axis, name, plotDataItem=None, setXLink=False, enableAutoRange

# Rebuilding the layout of the plot item will put the new axis in the correct place
self.rebuildLayout()
self.updateStackedViews()

def addStackedView(self, view):
"""
Expand Down Expand Up @@ -243,6 +239,10 @@ def setYRange(self, minY, maxY, padding=0, update=True):
view.setYRange(minY, maxY, padding=padding)
super(MultiAxisPlot, self).setYRange(minY, maxY, padding=padding)

def disableXAutoRange(self):
    """Turn off x-axis auto-ranging on every stacked view of this plot."""
    for stacked_view in self.stackedViews:
        stacked_view.enableAutoRange(x=False)

def clearAxes(self):
"""
Cleans up all axis related data from this plot.
Expand Down
3 changes: 3 additions & 0 deletions pydm/widgets/multi_axis_viewbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class MultiAxisViewBox(ViewBox):
# These signals will be emitted by the view when it handles these events, and will be connected
# to the event handling code of the stacked views
sigMouseDragged = Signal(object, object, object)
sigMouseDraggedDone = Signal()
sigMouseWheelZoomed = Signal(object, object, object)
sigHistoryChanged = Signal(object)

Expand Down Expand Up @@ -61,6 +62,8 @@ def mouseDragEvent(self, ev, axis=None, fromSignal=False):
if axis != ViewBox.YAxis and not fromSignal:
# This event happened within the view box area itself or the x axis so propagate to any stacked view boxes
self.sigMouseDragged.emit(self, ev, axis)
if ev.isFinish() and self.state['mouseMode'] == ViewBox.RectMode and axis is None:
self.sigMouseDraggedDone.emit()
super(MultiAxisViewBox, self).mouseDragEvent(ev, axis)

def keyPressEvent(self, ev):
Expand Down
6 changes: 6 additions & 0 deletions pydm/widgets/qtplugin_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ..widgets.base import PyDMPrimitiveWidget

from ..widgets.rules_editor import RulesEditor
from ..widgets.archiver_time_plot_editor import ArchiverTimePlotCurveEditorDialog
from ..widgets.waveformplot_curve_editor import WaveformPlotCurveEditorDialog
from ..widgets.timeplot_curve_editor import TimePlotCurveEditorDialog
from ..widgets.scatterplot_curve_editor import ScatterPlotCurveEditorDialog
Expand Down Expand Up @@ -117,6 +118,11 @@ def __init__(self, widget):
super(WaveformCurveEditorExtension, self).__init__(widget, WaveformPlotCurveEditorDialog)


class ArchiveTimeCurveEditorExtension(BasePlotExtension):
    """Designer extension wiring the archiver-aware curve editor dialog to a plot widget."""

    def __init__(self, widget):
        super().__init__(widget, ArchiverTimePlotCurveEditorDialog)


class TimeCurveEditorExtension(BasePlotExtension):
    """Designer extension wiring the time plot curve editor dialog to a plot widget."""

    def __init__(self, widget):
        super().__init__(widget, TimePlotCurveEditorDialog)
Expand Down
22 changes: 17 additions & 5 deletions pydm/widgets/qtplugins.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import os

from .qtplugin_base import qtplugin_factory, WidgetCategory
from .qtplugin_extensions import (RulesExtension, WaveformCurveEditorExtension,
TimeCurveEditorExtension,
TimeCurveEditorExtension, ArchiveTimeCurveEditorExtension,
ScatterCurveEditorExtension, SymbolExtension)
from .tab_bar_qtplugin import TabWidgetPlugin
from .byte import PyDMByteIndicator
Expand Down Expand Up @@ -32,6 +33,7 @@
from .waveformtable import PyDMWaveformTable
from .scale import PyDMScaleIndicator
from .timeplot import PyDMTimePlot
from .archiver_time_plot import PyDMArchiverTimePlot
from .waveformplot import PyDMWaveformPlot
from .scatterplot import PyDMScatterPlot
from .template_repeater import PyDMTemplateRepeater
Expand All @@ -51,10 +53,20 @@
icon=ifont.icon("tag"))

# Time Plot plugin
# In order to keep the archiver functionality invisible to users who do not have access to an instance of the
# archiver appliance, choose which version of the time plot to load here based on the user's environment
if "PYDM_ARCHIVER_URL" not in os.environ:
    # Plain time plot plugin (no archiver backfill support)
    PyDMTimePlotPlugin = qtplugin_factory(PyDMTimePlot, group=WidgetCategory.PLOT,
                                          extensions=[TimeCurveEditorExtension,
                                                      RulesExtension],
                                          icon=ifont.icon("chart-line"))
else:
    # Time Plot with archiver appliance support plugin
    PyDMTimePlotPlugin = qtplugin_factory(PyDMArchiverTimePlot, group=WidgetCategory.PLOT,
                                          extensions=[ArchiveTimeCurveEditorExtension,
                                                      RulesExtension],
                                          icon=ifont.icon("chart-line"))

# Waveform Plot plugin
PyDMWaveformPlotPlugin = qtplugin_factory(PyDMWaveformPlot,
Expand Down
Loading

0 comments on commit a2bb68f

Please sign in to comment.