From a0a4f91e76d0e7ac58dfe6cc20776e867c5317d2 Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Tue, 20 Aug 2024 09:44:49 -0700 Subject: [PATCH 1/9] ENH: ArchivePlotCurveItem address validation --- pydm/widgets/archiver_time_plot.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pydm/widgets/archiver_time_plot.py b/pydm/widgets/archiver_time_plot.py index ecbc47c67..f98842614 100644 --- a/pydm/widgets/archiver_time_plot.py +++ b/pydm/widgets/archiver_time_plot.py @@ -22,6 +22,7 @@ logger = logging.getLogger(__name__) DEFAULT_ARCHIVE_BUFFER_SIZE = 18000 +DEFAULT_VALIDITY_TIMEOUT = 7500 DEFAULT_TIME_SPAN = 3600.0 MIN_TIME_SPAN = 5.0 APPROX_SECONDS_300_YEARS = 10000000000 @@ -40,6 +41,11 @@ class ArchivePlotCurveItem(TimePlotCurveItem): use_archive_data : bool If True, requests will be made to archiver appliance for archived data when the plot is zoomed or scrolled to the left. + liveData : bool + If True, the curve will gather data in real time. + validity_timeout : int + The time waited between setting a new address and determining if the + address is in the archiver. Measured in milliseconds. Default is 7500. **kws : dict[str: any] Additional parameters supported by pyqtgraph.PlotDataItem. """ @@ -47,9 +53,15 @@ class ArchivePlotCurveItem(TimePlotCurveItem): # Used to request data from archiver appliance (starting timestamp, ending timestamp, processing command) archive_data_request_signal = Signal(float, float, str) archive_data_received_signal = Signal() + invalid_archive_channel = Signal() def __init__( - self, channel_address: Optional[str] = None, use_archive_data: bool = True, liveData: bool = True, **kws + self, + channel_address: Optional[str] = None, + use_archive_data: bool = True, + liveData: bool = True, + validity_timeout: int = DEFAULT_VALIDITY_TIMEOUT, + **kws ): super(ArchivePlotCurveItem, self).__init__(**kws) self.use_archive_data = use_archive_data @@ -64,6 +76,12 @@ def __init__( self.error_bar_item = ErrorBarItem() self.error_bar_needs_set = True + # Create a timer for checking if the channel is accessible + self.validity_timer = QTimer(self) + self.validity_timer.setInterval(validity_timeout) + self.validity_timer.setSingleShot(True) + self.validity_timer.timeout.connect(self.invalid_archive_channel.emit) + self.address = channel_address def to_dict(self) -> OrderedDict: @@ -92,6 +110,7 @@ def address(self, new_address: str) -> None: self.archive_channel = PyDMChannel( address=archive_address, value_slot=self.receiveArchiveData, value_signal=self.archive_data_request_signal ) + self.validity_timer.start() # Clear the archive data of the previous channel and redraw the curve if self.archive_points_accumulated: @@ -130,6 +149,7 @@ def receiveArchiveData(self, data: np.ndarray) -> None: Additional indices may be used as well based on the type of request made to the archiver appliance. For example optimized data will include standard deviations, minimums, and maximums """ + self.validity_timer.stop() archive_data_length = len(data[0]) max_x = data[0][archive_data_length - 1] From 2273ac1315ce46ab77fe95f334bcc00ed4c54830 Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Mon, 26 Aug 2024 13:52:41 -0700 Subject: [PATCH 2/9] ENH: Archiver Plugin connection state --- pydm/data_plugins/archiver_plugin.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pydm/data_plugins/archiver_plugin.py b/pydm/data_plugins/archiver_plugin.py index 3eb43c69e..cca0a7f5a 100644 --- a/pydm/data_plugins/archiver_plugin.py +++ b/pydm/data_plugins/archiver_plugin.py @@ -78,6 +78,7 @@ def fetch_data(self, from_date: float, to_date: float, processing_command: Optio "Environment variable: PYDM_ARCHIVER_URL must be defined to use the archiver plugin, for " "example: http://lcls-archapp.slac.stanford.edu" ) + self.connection_state_signal.emit(False) return url_string = f"{base_url}/retrieval/data/getData.json?{self.address}&from={from_date_str}&to={to_date_str}" @@ -100,10 +101,10 @@ def data_request_finished(self, reply: QNetworkReply) -> None: ---------- reply: The response from the archiver appliance """ - if ( - reply.error() == QNetworkReply.NoError - and reply.header(QNetworkRequest.ContentTypeHeader) == "application/json" - ): + success = (reply.error() == QNetworkReply.NoError + and reply.header(QNetworkRequest.ContentTypeHeader) == "application/json") + self.connection_state_signal.emit(success) + if success: bytes_str = reply.readAll() data_dict = json.loads(str(bytes_str, "utf-8")) From 35428ee842cf4c393337fd8f0965dc8b5b4e43a2 Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Mon, 26 Aug 2024 13:56:10 -0700 Subject: [PATCH 3/9] ENH: TimePlotCurves emit connection status --- pydm/widgets/archiver_time_plot.py | 15 +++++---------- pydm/widgets/timeplot.py | 2 ++ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/pydm/widgets/archiver_time_plot.py b/pydm/widgets/archiver_time_plot.py index f98842614..bc96342b5 100644 --- a/pydm/widgets/archiver_time_plot.py +++ b/pydm/widgets/archiver_time_plot.py @@ -53,14 +53,13 @@ class ArchivePlotCurveItem(TimePlotCurveItem): # Used to request data from archiver appliance (starting timestamp, ending timestamp, processing command) archive_data_request_signal = Signal(float, float, str) archive_data_received_signal = Signal() - invalid_archive_channel = Signal() + archive_channel_connection = Signal(bool) def __init__( self, channel_address: Optional[str] = None, use_archive_data: bool = True, liveData: bool = True, - validity_timeout: int = DEFAULT_VALIDITY_TIMEOUT, **kws ): super(ArchivePlotCurveItem, self).__init__(**kws) @@ -76,12 +75,6 @@ def __init__( self.error_bar_item = ErrorBarItem() self.error_bar_needs_set = True - # Create a timer for checking if the channel is accessible - self.validity_timer = QTimer(self) - self.validity_timer.setInterval(validity_timeout) - self.validity_timer.setSingleShot(True) - self.validity_timer.timeout.connect(self.invalid_archive_channel.emit) - self.address = channel_address def to_dict(self) -> OrderedDict: @@ -108,7 +101,10 @@ def address(self, new_address: str) -> None: # Prepare new address to use the archiver plugin and create the new channel archive_address = "archiver://pv=" + remove_protocol(new_address.strip()) self.archive_channel = PyDMChannel( - address=archive_address, value_slot=self.receiveArchiveData, value_signal=self.archive_data_request_signal + address=archive_address, + value_slot=self.receiveArchiveData, + value_signal=self.archive_data_request_signal, + connection_slot=self.archive_channel_connection.emit ) self.validity_timer.start() @@ -149,7 +145,6 @@ def receiveArchiveData(self, data: np.ndarray) -> None: Additional indices may be used as well based on the type of request made to the archiver appliance. For example optimized data will include standard deviations, minimums, and maximums """ - self.validity_timer.stop() archive_data_length = len(data[0]) max_x = data[0][archive_data_length - 1] diff --git a/pydm/widgets/timeplot.py b/pydm/widgets/timeplot.py index 5cd1e96fd..e9cb4c18d 100644 --- a/pydm/widgets/timeplot.py +++ b/pydm/widgets/timeplot.py @@ -65,6 +65,7 @@ class TimePlotCurveItem(BasePlotCurveItem): _channels = ("channel",) unitSignal = Signal(str) + live_channel_connection = Signal(bool) def __init__(self, channel_address=None, plot_by_timestamps=True, plot_style="Line", **kws): """ @@ -181,6 +182,7 @@ def unitsChanged(self, units: str): @Slot(bool) def connectionStateChanged(self, connected): # Maybe change pen stroke? + self.live_channel_connection.emit(connected) self.connected = connected if not self.connected: self.latest_value = np.nan From ef95375cbeea8cc1aa056999cce032c34b66b61b Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Mon, 26 Aug 2024 13:57:23 -0700 Subject: [PATCH 4/9] ENH: TimePlotCurves (dis)connect channel in address setter --- pydm/data_plugins/archiver_plugin.py | 6 +++-- pydm/widgets/archiver_time_plot.py | 35 ++++++++++++++-------------- pydm/widgets/timeplot.py | 8 +++++-- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/pydm/data_plugins/archiver_plugin.py b/pydm/data_plugins/archiver_plugin.py index cca0a7f5a..1338dd5f0 100644 --- a/pydm/data_plugins/archiver_plugin.py +++ b/pydm/data_plugins/archiver_plugin.py @@ -101,8 +101,10 @@ def data_request_finished(self, reply: QNetworkReply) -> None: ---------- reply: The response from the archiver appliance """ - success = (reply.error() == QNetworkReply.NoError - and reply.header(QNetworkRequest.ContentTypeHeader) == "application/json") + success = ( + reply.error() == QNetworkReply.NoError + and reply.header(QNetworkRequest.ContentTypeHeader) == "application/json" + ) self.connection_state_signal.emit(success) if success: bytes_str = reply.readAll() diff --git a/pydm/widgets/archiver_time_plot.py b/pydm/widgets/archiver_time_plot.py index bc96342b5..611825f08 100644 --- a/pydm/widgets/archiver_time_plot.py +++ b/pydm/widgets/archiver_time_plot.py @@ -22,7 +22,6 @@ logger = logging.getLogger(__name__) DEFAULT_ARCHIVE_BUFFER_SIZE = 18000 -DEFAULT_VALIDITY_TIMEOUT = 7500 DEFAULT_TIME_SPAN = 3600.0 MIN_TIME_SPAN = 5.0 APPROX_SECONDS_300_YEARS = 10000000000 @@ -43,9 +42,6 @@ class ArchivePlotCurveItem(TimePlotCurveItem): the plot is zoomed or scrolled to the left. liveData : bool If True, the curve will gather data in real time. - validity_timeout : int - The time waited between setting a new address and determining if the - address is in the archiver. Measured in milliseconds. Default is 7500. **kws : dict[str: any] Additional parameters supported by pyqtgraph.PlotDataItem. """ @@ -54,17 +50,14 @@ class ArchivePlotCurveItem(TimePlotCurveItem): archive_data_request_signal = Signal(float, float, str) archive_data_received_signal = Signal() archive_channel_connection = Signal(bool) + prompt_archive_request = Signal() def __init__( - self, - channel_address: Optional[str] = None, - use_archive_data: bool = True, - liveData: bool = True, - **kws + self, channel_address: Optional[str] = None, use_archive_data: bool = True, liveData: bool = True, **kws ): + self.archive_channel = None super(ArchivePlotCurveItem, self).__init__(**kws) self.use_archive_data = use_archive_data - self.archive_channel = None self.archive_points_accumulated = 0 self._archiveBufferSize = DEFAULT_ARCHIVE_BUFFER_SIZE self.archive_data_buffer = np.zeros((2, self._archiveBufferSize), order="f", dtype=float) @@ -92,11 +85,14 @@ def address(self, new_address: str) -> None: """Creates the channel for the input address for communicating with the archiver appliance plugin.""" TimePlotCurveItem.address.__set__(self, new_address) + if self.archive_channel: + if new_address == self.archive_channel.address: + return + self.archive_channel.disconnect() + if not new_address: self.archive_channel = None return - elif self.archive_channel and new_address == self.archive_channel.address: - return # Prepare new address to use the archiver plugin and create the new channel archive_address = "archiver://pv=" + remove_protocol(new_address.strip()) @@ -104,15 +100,18 @@ def address(self, new_address: str) -> None: address=archive_address, value_slot=self.receiveArchiveData, value_signal=self.archive_data_request_signal, - connection_slot=self.archive_channel_connection.emit + connection_slot=self.archive_channel_connection.emit, ) - self.validity_timer.start() + self.archive_channel.connect() # Clear the archive data of the previous channel and redraw the curve if self.archive_points_accumulated: self.initializeArchiveBuffer() self.redrawCurve() + # Prompt the curve's associated plot to fetch archive data + self.prompt_archive_request.emit() + @property def liveData(self): return self._liveData @@ -719,6 +718,7 @@ def __init__( self._prev_x = None # Holds the minimum x-value of the previous update of the plot self._starting_timestamp = time.time() # The timestamp at which the plot was first rendered self._archive_request_queued = False + self.setTimeSpan(DEFAULT_TIME_SPAN) def updateXAxis(self, update_immediately: bool = False) -> None: """Manages the requests to archiver appliance. When the user pans or zooms the x axis to the left, @@ -730,9 +730,9 @@ def updateXAxis(self, update_immediately: bool = False) -> None: max_x = self.plotItem.getAxis("bottom").range[1] max_point = max([curve.max_x() for curve in self._curves]) if min_x == 0: # This is zero when the plot first renders - min_x = time.time() - self._min_x = min_x - self._starting_timestamp = min_x - DEFAULT_TIME_SPAN # A bit of a buffer so we don't overwrite live data + self._max_x = time.time() + self._min_x = self._max_x - DEFAULT_TIME_SPAN + self._starting_timestamp = self._max_x if self.getTimeSpan() != DEFAULT_TIME_SPAN: # Initialize x-axis based on the time span as well as trigger a call to the archiver below self._min_x = self._min_x - self.getTimeSpan() @@ -831,6 +831,7 @@ def createCurveItem(self, *args, **kwargs) -> ArchivePlotCurveItem: """Create and return a curve item to be plotted""" curve_item = ArchivePlotCurveItem(*args, **kwargs) curve_item.archive_data_received_signal.connect(self.archive_data_received) + curve_item.prompt_archive_request.connect(self.requestDataFromArchiver) return curve_item @Slot() diff --git a/pydm/widgets/timeplot.py b/pydm/widgets/timeplot.py index e9cb4c18d..5a1559213 100644 --- a/pydm/widgets/timeplot.py +++ b/pydm/widgets/timeplot.py @@ -120,11 +120,14 @@ def address(self): @address.setter def address(self, new_address: str): """Creates the channel for the input address for communicating with the address' plugin.""" + if self.channel: + if new_address == self.channel.address: + return + self.channel.disconnect() + if not new_address: self.channel = None return - elif self.channel and new_address == self.channel.address: - return self.channel = PyDMChannel( address=new_address, @@ -132,6 +135,7 @@ def address(self, new_address: str): value_slot=self.receiveNewValue, unit_slot=self.unitsChanged, ) + self.channel.connect() # Clear the data from the previous channel and redraw the curve if self.points_accumulated: From 7248de005526f38c47f3ea3c57fe05d6575b2d74 Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Wed, 28 Aug 2024 14:15:58 -0700 Subject: [PATCH 5/9] ENH: Archiver Plugin request timeout (30s -> 7.5s) --- pydm/data_plugins/archiver_plugin.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pydm/data_plugins/archiver_plugin.py b/pydm/data_plugins/archiver_plugin.py index 1338dd5f0..447aa5195 100644 --- a/pydm/data_plugins/archiver_plugin.py +++ b/pydm/data_plugins/archiver_plugin.py @@ -7,7 +7,7 @@ from typing import Optional from pydm.widgets.channel import PyDMChannel -from qtpy.QtCore import Slot, QObject, QUrl +from qtpy.QtCore import Slot, QObject, QUrl, QTimer from qtpy.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply from pydm.data_plugins.plugin import PyDMPlugin, PyDMConnection @@ -89,7 +89,9 @@ def fetch_data(self, from_date: float, to_date: float, processing_command: Optio request = QNetworkRequest(QUrl(url_string)) # This get call is non-blocking, can be made in parallel with others, and when the results are ready they # will be delivered to the data_request_finished method below via the "finished" signal - self.network_manager.get(request) + self.connection_state_signal.emit(False) + reply = self.network_manager.get(request) + QTimer.singleShot(7500, reply.abort) @Slot(QNetworkReply) def data_request_finished(self, reply: QNetworkReply) -> None: From 8fb67a413199fbc621326ff472ef258c1b4b37a2 Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Thu, 29 Aug 2024 09:46:19 -0700 Subject: [PATCH 6/9] FIX: Archiver Plugin timeout fix --- pydm/data_plugins/archiver_plugin.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pydm/data_plugins/archiver_plugin.py b/pydm/data_plugins/archiver_plugin.py index 447aa5195..488971aa1 100644 --- a/pydm/data_plugins/archiver_plugin.py +++ b/pydm/data_plugins/archiver_plugin.py @@ -91,7 +91,12 @@ def fetch_data(self, from_date: float, to_date: float, processing_command: Optio # will be delivered to the data_request_finished method below via the "finished" signal self.connection_state_signal.emit(False) reply = self.network_manager.get(request) - QTimer.singleShot(7500, reply.abort) + + def timeout(): + if isinstance(reply, QNetworkReply): + reply.abort() + + QTimer.singleShot(7500, timeout) @Slot(QNetworkReply) def data_request_finished(self, reply: QNetworkReply) -> None: From 19e74f68eb94a98885e9f54b8f7e1c6c1e7b35f8 Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Wed, 4 Sep 2024 16:55:07 -0700 Subject: [PATCH 7/9] FIX: Check if QNetworkReply is deleted --- pydm/data_plugins/archiver_plugin.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pydm/data_plugins/archiver_plugin.py b/pydm/data_plugins/archiver_plugin.py index 488971aa1..0689c3b63 100644 --- a/pydm/data_plugins/archiver_plugin.py +++ b/pydm/data_plugins/archiver_plugin.py @@ -7,6 +7,7 @@ from typing import Optional from pydm.widgets.channel import PyDMChannel +from qtpy import sip from qtpy.QtCore import Slot, QObject, QUrl, QTimer from qtpy.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply from pydm.data_plugins.plugin import PyDMPlugin, PyDMConnection @@ -89,12 +90,12 @@ def fetch_data(self, from_date: float, to_date: float, processing_command: Optio request = QNetworkRequest(QUrl(url_string)) # This get call is non-blocking, can be made in parallel with others, and when the results are ready they # will be delivered to the data_request_finished method below via the "finished" signal - self.connection_state_signal.emit(False) reply = self.network_manager.get(request) def timeout(): - if isinstance(reply, QNetworkReply): - reply.abort() + if not isinstance(reply, QNetworkReply) or sip.isdeleted(reply): + return + reply.abort() QTimer.singleShot(7500, timeout) From 714340747e0cdb747d3dca2de2768235ddee713a Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Fri, 13 Sep 2024 11:32:09 -0700 Subject: [PATCH 8/9] FIX: Fixing issues in PR #1108 --- pydm/widgets/archiver_time_plot.py | 4 +++- pydm/widgets/timeplot.py | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pydm/widgets/archiver_time_plot.py b/pydm/widgets/archiver_time_plot.py index 611825f08..6be9f6fe2 100644 --- a/pydm/widgets/archiver_time_plot.py +++ b/pydm/widgets/archiver_time_plot.py @@ -925,7 +925,7 @@ def addYChannel( """ Overrides timeplot addYChannel method to be able to pass the liveData flag. """ - return super().addYChannel( + curve = super().addYChannel( y_channel=y_channel, plot_style=plot_style, name=name, @@ -942,6 +942,8 @@ def addYChannel( useArchiveData=useArchiveData, liveData=liveData, ) + self.requestDataFromArchiver() + return curve def addFormulaChannel(self, yAxisName: str, **kwargs) -> FormulaCurveItem: """Creates a FormulaCurveItem and links it to the given y axis""" diff --git a/pydm/widgets/timeplot.py b/pydm/widgets/timeplot.py index 5a1559213..25d76ed41 100644 --- a/pydm/widgets/timeplot.py +++ b/pydm/widgets/timeplot.py @@ -102,9 +102,10 @@ def __init__(self, channel_address=None, plot_by_timestamps=True, plot_style="Li self.points_accumulated = 0 self.latest_value = None self.channel = None - self.address = channel_address self.units = "" + super(TimePlotCurveItem, self).__init__(**kws) + self.address = channel_address def to_dict(self): dic_ = OrderedDict([("channel", self.address), ("plot_style", self.plot_style)]) From 1a7dcb8d192e8399ce314023bb2427dd6acfcd2e Mon Sep 17 00:00:00 2001 From: Zach Domke Date: Tue, 17 Sep 2024 10:11:48 -0700 Subject: [PATCH 9/9] FIX: More issues in PR #1108 --- pydm/widgets/archiver_time_plot.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pydm/widgets/archiver_time_plot.py b/pydm/widgets/archiver_time_plot.py index 6be9f6fe2..1c0fcb1ff 100644 --- a/pydm/widgets/archiver_time_plot.py +++ b/pydm/widgets/archiver_time_plot.py @@ -5,7 +5,7 @@ from collections import OrderedDict from typing import List, Optional from pyqtgraph import DateAxisItem, ErrorBarItem -from pydm.utilities import remove_protocol +from pydm.utilities import remove_protocol, is_qt_designer from pydm.widgets.channel import PyDMChannel from pydm.widgets.timeplot import TimePlotCurveItem from pydm.widgets import PyDMTimePlot @@ -714,9 +714,9 @@ def __init__( bottom_axis=DateAxisItem("bottom"), ) self.optimized_data_bins = optimized_data_bins - self._min_x = None - self._prev_x = None # Holds the minimum x-value of the previous update of the plot self._starting_timestamp = time.time() # The timestamp at which the plot was first rendered + self._min_x = self._starting_timestamp - DEFAULT_TIME_SPAN + self._prev_x = self._min_x # Holds the minimum x-value of the previous update of the plot self._archive_request_queued = False self.setTimeSpan(DEFAULT_TIME_SPAN) @@ -733,7 +733,7 @@ def updateXAxis(self, update_immediately: bool = False) -> None: self._max_x = time.time() self._min_x = self._max_x - DEFAULT_TIME_SPAN self._starting_timestamp = self._max_x - if self.getTimeSpan() != DEFAULT_TIME_SPAN: + if self.getTimeSpan() != MIN_TIME_SPAN: # Initialize x-axis based on the time span as well as trigger a call to the archiver below self._min_x = self._min_x - self.getTimeSpan() self._archive_request_queued = True @@ -942,7 +942,8 @@ def addYChannel( useArchiveData=useArchiveData, liveData=liveData, ) - self.requestDataFromArchiver() + if not is_qt_designer(): + self.requestDataFromArchiver() return curve def addFormulaChannel(self, yAxisName: str, **kwargs) -> FormulaCurveItem: