Skip to content

Commit

Permalink
Merge pull request #46 from European-XFEL/timeout
Browse files Browse the repository at this point in the history
Add timeout option on Client
  • Loading branch information
tmichela authored Mar 1, 2019
2 parents ef58006 + f978e7d commit 6889339
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
40 changes: 34 additions & 6 deletions karabo_bridge/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
"""

from functools import partial
import pickle

import msgpack
import numpy as np
import pickle
import zmq


Expand All @@ -39,7 +40,16 @@ class Client:
socket type - supported: REQ, SUB.
ser : str, optional
Serialization protocol to use to decode the incoming message (default
is msgpack) - supported: msgpack,pickle.
is msgpack) - supported: msgpack, pickle.
timeout : int
Timeout on :method:`next` (in seconds)
Data transfered at the EuXFEL for Mega-pixels detectors can be very
large. Setting a too small timeout might end in never getting data.
Some example of transfer timing for 1Mpix detector (AGIPD, LPD):
32 pulses per train (125 MB): ~0.1 s
128 pulses per train (500 MB): ~0.4 s
350 pulses per train (1.37 GB): ~1 s
Raises
------
Expand All @@ -48,7 +58,7 @@ class Client:
ZMQError
if provided endpoint is not valid.
"""
def __init__(self, endpoint, sock='REQ', ser='msgpack'):
def __init__(self, endpoint, sock='REQ', ser='msgpack', timeout=None):

self._context = zmq.Context()
self._socket = None
Expand All @@ -66,10 +76,15 @@ def __init__(self, endpoint, sock='REQ', ser='msgpack'):
else:
raise NotImplementedError('socket is not supported:', str(sock))

if timeout is not None:
self._socket.setsockopt(zmq.RCVTIMEO, int(timeout * 1000))
self._recv_ready = False

self._pattern = self._socket.TYPE

if ser == 'msgpack':
self._deserializer = partial(msgpack.loads, raw=False)
self._deserializer = partial(msgpack.loads, raw=False,
max_bin_len=0x7fffffff)
elif ser == 'pickle':
self._deserializer = pickle.loads
else:
Expand All @@ -90,10 +105,23 @@ def next(self):
This dictionary is populated for protocol version 1.0 and 2.2.
For other protocol versions, metadata information is available in
`data` dict.
Raises
------
TimeoutError
If timeout is reached before receiving data.
"""
if self._pattern == zmq.REQ:
if self._pattern == zmq.REQ and not self._recv_ready:
self._socket.send(b'next')
msg = self._socket.recv_multipart(copy=False)
self._recv_ready = True
try:
msg = self._socket.recv_multipart(copy=False)
except zmq.error.Again:
raise TimeoutError(
'No data received from {} in the last {} ms'.format(
self._socket.getsockopt_string(zmq.LAST_ENDPOINT),
self._socket.getsockopt(zmq.RCVTIMEO)))
self._recv_ready = False
return self._deserialize(msg)

def _deserialize(self, msg):
Expand Down
10 changes: 10 additions & 0 deletions karabo_bridge/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,13 @@ def test_iterator(sim_server):
for i, (data, metadata) in enumerate(islice(c, 3)):
trainId = metadata['SPB_DET_AGIPD1M-1/DET/0CH0:xtdf']['timestamp.tid']
assert trainId == 10000000000 + i


def test_timeout():
no_server = 'ipc://nodata'
with Client(no_server, timeout=0.2) as c:
for _ in range(3):
with pytest.raises(TimeoutError) as info:
tid, data = c.next()

assert 'No data received from ipc://nodata in the last 200 ms' in str(info.value)

0 comments on commit 6889339

Please sign in to comment.