Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: ArchivePlotCurveItem Connection Status #1108

Merged
merged 9 commits into from
Sep 18, 2024
19 changes: 15 additions & 4 deletions pydm/data_plugins/archiver_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from typing import Optional

from pydm.widgets.channel import PyDMChannel
from qtpy.QtCore import Slot, QObject, QUrl
from qtpy import sip
from qtpy.QtCore import Slot, QObject, QUrl, QTimer
from qtpy.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from pydm.data_plugins.plugin import PyDMPlugin, PyDMConnection

Expand Down Expand Up @@ -78,6 +79,7 @@ def fetch_data(self, from_date: float, to_date: float, processing_command: Optio
"Environment variable: PYDM_ARCHIVER_URL must be defined to use the archiver plugin, for "
"example: http://lcls-archapp.slac.stanford.edu"
)
self.connection_state_signal.emit(False)
return

url_string = f"{base_url}/retrieval/data/getData.json?{self.address}&from={from_date_str}&to={to_date_str}"
Expand All @@ -88,7 +90,14 @@ def fetch_data(self, from_date: float, to_date: float, processing_command: Optio
request = QNetworkRequest(QUrl(url_string))
# This get call is non-blocking, can be made in parallel with others, and when the results are ready they
# will be delivered to the data_request_finished method below via the "finished" signal
self.network_manager.get(request)
reply = self.network_manager.get(request)

def timeout():
if not isinstance(reply, QNetworkReply) or sip.isdeleted(reply):
return
reply.abort()

QTimer.singleShot(7500, timeout)

@Slot(QNetworkReply)
def data_request_finished(self, reply: QNetworkReply) -> None:
Expand All @@ -100,10 +109,12 @@ def data_request_finished(self, reply: QNetworkReply) -> None:
----------
reply: The response from the archiver appliance
"""
if (
success = (
reply.error() == QNetworkReply.NoError
and reply.header(QNetworkRequest.ContentTypeHeader) == "application/json"
):
)
self.connection_state_signal.emit(success)
if success:
bytes_str = reply.readAll()
data_dict = json.loads(str(bytes_str, "utf-8"))

Expand Down
34 changes: 26 additions & 8 deletions pydm/widgets/archiver_time_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,24 @@ class ArchivePlotCurveItem(TimePlotCurveItem):
use_archive_data : bool
If True, requests will be made to archiver appliance for archived data when
the plot is zoomed or scrolled to the left.
liveData : bool
If True, the curve will gather data in real time.
**kws : dict[str: any]
Additional parameters supported by pyqtgraph.PlotDataItem.
"""

# Used to request data from archiver appliance (starting timestamp, ending timestamp, processing command)
archive_data_request_signal = Signal(float, float, str)
archive_data_received_signal = Signal()
archive_channel_connection = Signal(bool)
prompt_archive_request = Signal()

def __init__(
self, channel_address: Optional[str] = None, use_archive_data: bool = True, liveData: bool = True, **kws
):
self.archive_channel = None
super(ArchivePlotCurveItem, self).__init__(**kws)
self.use_archive_data = use_archive_data
self.archive_channel = None
self.archive_points_accumulated = 0
self._archiveBufferSize = DEFAULT_ARCHIVE_BUFFER_SIZE
self.archive_data_buffer = np.zeros((2, self._archiveBufferSize), order="f", dtype=float)
Expand Down Expand Up @@ -81,23 +85,33 @@ def address(self, new_address: str) -> None:
"""Creates the channel for the input address for communicating with the archiver appliance plugin."""
TimePlotCurveItem.address.__set__(self, new_address)

if self.archive_channel:
if new_address == self.archive_channel.address:
return
self.archive_channel.disconnect()

if not new_address:
self.archive_channel = None
return
elif self.archive_channel and new_address == self.archive_channel.address:
return

# Prepare new address to use the archiver plugin and create the new channel
archive_address = "archiver://pv=" + remove_protocol(new_address.strip())
self.archive_channel = PyDMChannel(
address=archive_address, value_slot=self.receiveArchiveData, value_signal=self.archive_data_request_signal
address=archive_address,
value_slot=self.receiveArchiveData,
value_signal=self.archive_data_request_signal,
connection_slot=self.archive_channel_connection.emit,
)
self.archive_channel.connect()

# Clear the archive data of the previous channel and redraw the curve
if self.archive_points_accumulated:
self.initializeArchiveBuffer()
self.redrawCurve()

# Prompt the curve's associated plot to fetch archive data
self.prompt_archive_request.emit()

@property
def liveData(self):
return self._liveData
Expand Down Expand Up @@ -704,6 +718,7 @@ def __init__(
self._prev_x = None # Holds the minimum x-value of the previous update of the plot
self._starting_timestamp = time.time() # The timestamp at which the plot was first rendered
self._archive_request_queued = False
self.setTimeSpan(DEFAULT_TIME_SPAN)

def updateXAxis(self, update_immediately: bool = False) -> None:
"""Manages the requests to archiver appliance. When the user pans or zooms the x axis to the left,
Expand All @@ -715,9 +730,9 @@ def updateXAxis(self, update_immediately: bool = False) -> None:
max_x = self.plotItem.getAxis("bottom").range[1]
max_point = max([curve.max_x() for curve in self._curves])
if min_x == 0: # This is zero when the plot first renders
min_x = time.time()
self._min_x = min_x
self._starting_timestamp = min_x - DEFAULT_TIME_SPAN # A bit of a buffer so we don't overwrite live data
self._max_x = time.time()
self._min_x = self._max_x - DEFAULT_TIME_SPAN
self._starting_timestamp = self._max_x
if self.getTimeSpan() != DEFAULT_TIME_SPAN:
zdomke marked this conversation as resolved.
Show resolved Hide resolved
# Initialize x-axis based on the time span as well as trigger a call to the archiver below
self._min_x = self._min_x - self.getTimeSpan()
Expand Down Expand Up @@ -816,6 +831,7 @@ def createCurveItem(self, *args, **kwargs) -> ArchivePlotCurveItem:
"""Create and return a curve item to be plotted"""
curve_item = ArchivePlotCurveItem(*args, **kwargs)
curve_item.archive_data_received_signal.connect(self.archive_data_received)
curve_item.prompt_archive_request.connect(self.requestDataFromArchiver)
zdomke marked this conversation as resolved.
Show resolved Hide resolved
return curve_item

@Slot()
Expand Down Expand Up @@ -909,7 +925,7 @@ def addYChannel(
"""
Overrides timeplot addYChannel method to be able to pass the liveData flag.
"""
return super().addYChannel(
curve = super().addYChannel(
y_channel=y_channel,
plot_style=plot_style,
name=name,
Expand All @@ -926,6 +942,8 @@ def addYChannel(
useArchiveData=useArchiveData,
liveData=liveData,
)
self.requestDataFromArchiver()
zdomke marked this conversation as resolved.
Show resolved Hide resolved
return curve

def addFormulaChannel(self, yAxisName: str, **kwargs) -> FormulaCurveItem:
"""Creates a FormulaCurveItem and links it to the given y axis"""
Expand Down
13 changes: 10 additions & 3 deletions pydm/widgets/timeplot.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class TimePlotCurveItem(BasePlotCurveItem):

_channels = ("channel",)
unitSignal = Signal(str)
live_channel_connection = Signal(bool)

def __init__(self, channel_address=None, plot_by_timestamps=True, plot_style="Line", **kws):
"""
Expand Down Expand Up @@ -101,9 +102,10 @@ def __init__(self, channel_address=None, plot_by_timestamps=True, plot_style="Li
self.points_accumulated = 0
self.latest_value = None
self.channel = None
self.address = channel_address
self.units = ""

super(TimePlotCurveItem, self).__init__(**kws)
self.address = channel_address

def to_dict(self):
dic_ = OrderedDict([("channel", self.address), ("plot_style", self.plot_style)])
Expand All @@ -119,18 +121,22 @@ def address(self):
@address.setter
def address(self, new_address: str):
"""Creates the channel for the input address for communicating with the address' plugin."""
if self.channel:
if new_address == self.channel.address:
return
self.channel.disconnect()

if not new_address:
self.channel = None
return
elif self.channel and new_address == self.channel.address:
return

self.channel = PyDMChannel(
address=new_address,
connection_slot=self.connectionStateChanged,
value_slot=self.receiveNewValue,
unit_slot=self.unitsChanged,
)
self.channel.connect()
zdomke marked this conversation as resolved.
Show resolved Hide resolved

# Clear the data from the previous channel and redraw the curve
if self.points_accumulated:
Expand Down Expand Up @@ -181,6 +187,7 @@ def unitsChanged(self, units: str):
@Slot(bool)
def connectionStateChanged(self, connected):
# Maybe change pen stroke?
self.live_channel_connection.emit(connected)
self.connected = connected
if not self.connected:
self.latest_value = np.nan
Expand Down
Loading