Add timeout option on Client #46

Merged
merged 9 commits into from
Mar 1, 2019
40 changes: 34 additions & 6 deletions karabo_bridge/client.py
@@ -10,9 +10,10 @@
 """
 
 from functools import partial
+import pickle
 
 import msgpack
 import numpy as np
-import pickle
 import zmq


@@ -39,7 +40,16 @@ class Client:
         socket type - supported: REQ, SUB.
     ser : str, optional
         Serialization protocol to use to decode the incoming message (default
-        is msgpack) - supported: msgpack,pickle.
+        is msgpack) - supported: msgpack, pickle.
+    timeout : int
+        Timeout on :method:`next` (in seconds)
+
+        Data transfered at the EuXFEL for Mega-pixels detectors can be very
+        large. Setting a too small timeout might end in never getting data.
+        Some example of transfer timing for 1Mpix detector (AGIPD, LPD):
+            32 pulses per train (125 MB): ~0.1 s
+            128 pulses per train (500 MB): ~0.4 s
+            350 pulses per train (1.37 GB): ~1 s
 
     Raises
     ------
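The timings quoted in the docstring imply a roughly constant transfer rate, which gives a rough way to pick a timeout for a given train size. A small sketch of that arithmetic, assuming 1.37 GB means 1370 MB; `suggest_timeout`, its default rate, and its safety margin are hypothetical and not part of karabo_bridge:

```python
# (pulses per train, size in MB, transfer time in s), as quoted in the docstring
timings = [(32, 125, 0.1), (128, 500, 0.4), (350, 1370, 1.0)]

# Implied transfer rate for each row, in MB/s
rates = [size_mb / seconds for _, size_mb, seconds in timings]
for (pulses, _, _), rate in zip(timings, rates):
    print(f'{pulses:>3} pulses per train: {rate:.0f} MB/s')


def suggest_timeout(size_mb, rate_mb_s=1250, margin=5):
    """Hypothetical helper: expected transfer time times a safety margin.

    The default rate and margin are illustrative assumptions only.
    """
    return margin * size_mb / rate_mb_s


timeout = suggest_timeout(500)  # seconds, for a 500 MB train
```

The resulting value could be passed as `timeout=` to `Client`; the margin should be tuned to the actual network and detector setup.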
@@ -48,7 +58,7 @@ class Client:
     ZMQError
         if provided endpoint is not valid.
     """
-    def __init__(self, endpoint, sock='REQ', ser='msgpack'):
+    def __init__(self, endpoint, sock='REQ', ser='msgpack', timeout=None):
 
         self._context = zmq.Context()
         self._socket = None
@@ -66,10 +76,15 @@ def __init__(self, endpoint, sock='REQ', ser='msgpack'):
         else:
             raise NotImplementedError('socket is not supported:', str(sock))
 
+        if timeout is not None:
+            self._socket.setsockopt(zmq.RCVTIMEO, int(timeout * 1000))
+        self._recv_ready = False
Member

I'm a bit wary of this extra state variable. I haven't thought through in detail, but I think it might end up with both server and client thinking it's their turn to listen for a message, and it won't be obvious why nothing is getting sent.

I don't have a better idea in the short term, but maybe this is the real reason to use another socket type like PUSH/PULL - it avoids having client & server state which can get out of step.
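To make the state concern concrete: a REQ socket enforces strict send/receive alternation, so once a receive has timed out, a second `send` is refused until a reply arrives. That is the situation the `_recv_ready` flag in this patch works around. A minimal sketch with pyzmq, assuming nothing is listening on the placeholder endpoint below:

```python
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REQ)
sock.setsockopt(zmq.LINGER, 0)         # do not wait for unsent data on close
sock.setsockopt(zmq.RCVTIMEO, 100)     # receive timeout, in milliseconds
sock.connect('tcp://127.0.0.1:45999')  # placeholder endpoint, no server assumed

sock.send(b'next')                     # the request is queued without a peer

timed_out = False
try:
    sock.recv_multipart()
except zmq.error.Again:
    timed_out = True                   # no reply within RCVTIMEO

# The socket is still waiting for a reply, so sending again is refused (EFSM).
resend_refused = False
try:
    sock.send(b'next')
except zmq.ZMQError as err:
    resend_refused = (err.errno == zmq.EFSM)

sock.close()
ctx.term()
```

Skipping the second `send` after a timeout, as the patch does, keeps the client in the "waiting for reply" state and avoids this error.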

Member Author

I also don't like it so much... But it has proven reliable, at least in use with karaboFAI.
I should spend more time playing with PUSH/PULL and other patterns, but PUSH/PULL also seems to bring some more complications:

  • handling message queues properly: since messages are very large, a growing queue can easily crash client applications
  • PUSH/PULL doesn't allow load balancing

Member

I think we can constrain the memory usage easily enough in our client code by setting a HWM. Not sure what you mean about the load balancing. I'm thinking about how the bridge can work most efficiently; maybe this is something we can work on when I'm there next week.

Member Author

The PUSH socket does not allow load balancing if you have several clients (PULL sockets): messages are shared in a round-robin manner, so if a client is slow or not ready, it will still receive data, which will be queued. This is generally bad if you have a client that is only used once in a while to monitor what is in the data: it will consume one train in n even if it is not using anything. (I also wonder what happens if one client is not consuming data and its input queue is full: does this block the whole interface?) Also, for tools like onda, I believe the master process waits for processed trains/pulses to arrive in order, so if a slow worker still queues data at its input, it can increase the delay quite a bit.
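The round-robin behaviour described here can be illustrated with a toy model in plain Python. This is not ZeroMQ code: the `round_robin` function and the client names are hypothetical, and the model ignores high-water marks entirely.

```python
from collections import deque
from itertools import cycle


def round_robin(train_ids, clients):
    """Toy model of PUSH distribution: each train goes to the next client in turn."""
    queues = {name: deque() for name in clients}
    for train_id, name in zip(train_ids, cycle(clients)):
        queues[name].append(train_id)
    return queues


queues = round_robin(range(12), ['fast', 'slow', 'monitor'])

# The fast worker drains its queue; the monitor only looks at a single train.
queues['fast'].clear()
queues['monitor'].popleft()

backlog = {name: len(q) for name, q in queues.items()}
print(backlog)  # {'fast': 0, 'slow': 4, 'monitor': 3}
```

Every client is handed a third of the trains regardless of how quickly it consumes them, so the slow worker and the occasional monitor both accumulate a backlog.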

Member Author

And I never managed to make use of the HWM; if you have an example of it working, I'm highly interested!
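For reference, a minimal sketch of a high-water mark on a PUSH/PULL pair. It is not taken from this thread; it assumes pyzmq and uses a made-up `inproc` endpoint so that both sockets can live in one process:

```python
import zmq

ctx = zmq.Context()

pull = ctx.socket(zmq.PULL)
pull.setsockopt(zmq.RCVHWM, 1)     # set before bind so it applies to the connection
pull.bind('inproc://hwm-demo')     # hypothetical endpoint name

push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 1)     # set before connect
push.setsockopt(zmq.LINGER, 0)     # drop unsent messages on close
push.connect('inproc://hwm-demo')

sent = 0
for i in range(100):
    try:
        push.send(str(i).encode(), zmq.NOBLOCK)
    except zmq.Again:
        break                      # queue is full: the sender has to wait or drop
    sent += 1

first = pull.recv()                # oldest queued message

push.close()
pull.close()
ctx.term()
```

With the high-water marks set, only a small number of the 100 messages are accepted before the non-blocking send raises `zmq.Again`, which is what bounds memory use on the client side. Over TCP, operating-system buffers add to what is in flight, so the effective limit is less strict there.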

Member Author

@tmichela tmichela Dec 7, 2018

Thanks,

I tried it quickly, and it does seem to work! However, I hit something weird:
exiting a PULL socket while it was receiving data could crash the PUSH socket:

sent 177
Assertion failed: !more (src/lb.cpp:110)
zsh: abort (core dumped)  python pushpull.py push

The bug is known and fixed in libzmq 4.2.x:
zeromq/libzmq#1588

I had to update to pyzmq>=17.1.0 to get a build against libzmq 4.2. The weird behavior then is: when closing the PULL socket while it is receiving data, the PUSH socket no longer crashes, but the next data received (after restarting a PULL) starts from the last part that was not sent:

 % python pushpull.py pull
RCVHWM 1
RCVBUF -1
# parts: 2
# parts: 2
# parts: 2
^C%
tmichela@machine ~/projects/tmp/pushpull
 % python pushpull.py pull
RCVHWM 1
RCVBUF -1
# parts: 1
# parts: 2
# parts: 2
# parts: 2

you can find the code for the example here

Member Author

@tmichela tmichela Dec 7, 2018

It also seems that closing a PULL socket affects other PULL sockets:

 % python pushpull.py pull
RCVHWM 1
RCVBUF -1
9 - # parts: 2
10 - # parts: 3
11 - # parts: 3
12 - # parts: 3
13 - # parts: 3
14 - # parts: 3
16 - # parts: 3
19 - # parts: 3
20 - # parts: 3
23 - # parts: 3
25 - # parts: 3
26 - # parts: 2
27 - # parts: 3
29 - # parts: 3
31 - # parts: 2
32 - # parts: 3
33 - # parts: 3
34 - # parts: 3
36 - # parts: 3
38 - # parts: 2
39 - # parts: 3

Here I receive partial messages when another PULL socket is terminated.

Member

Shall we move this discussion to a separate issue? I think it's possible that's a bug with ZeroMQ or pyzmq.

Member

Opened an issue on pyzmq for it: zeromq/pyzmq#1244

Member Author

Thanks!


         self._pattern = self._socket.TYPE
 
         if ser == 'msgpack':
-            self._deserializer = partial(msgpack.loads, raw=False)
+            self._deserializer = partial(msgpack.loads, raw=False,
+                                         max_bin_len=0x7fffffff)
         elif ser == 'pickle':
             self._deserializer = pickle.loads
         else:
@@ -90,10 +105,23 @@ def next(self):
             This dictionary is populated for protocol version 1.0 and 2.2.
             For other protocol versions, metadata information is available in
             `data` dict.
+
+        Raises
+        ------
+        TimeoutError
+            If timeout is reached before receiving data.
         """
-        if self._pattern == zmq.REQ:
+        if self._pattern == zmq.REQ and not self._recv_ready:
             self._socket.send(b'next')
-        msg = self._socket.recv_multipart(copy=False)
+            self._recv_ready = True
+        try:
+            msg = self._socket.recv_multipart(copy=False)
+        except zmq.error.Again:
+            raise TimeoutError(
+                'No data received from {} in the last {} ms'.format(
+                    self._socket.getsockopt_string(zmq.LAST_ENDPOINT),
+                    self._socket.getsockopt(zmq.RCVTIMEO)))
+        self._recv_ready = False
         return self._deserialize(msg)
 
     def _deserialize(self, msg):
10 changes: 10 additions & 0 deletions karabo_bridge/tests/test_client.py
@@ -48,3 +48,13 @@ def test_iterator(sim_server):
     for i, (data, metadata) in enumerate(islice(c, 3)):
         trainId = metadata['SPB_DET_AGIPD1M-1/DET/0CH0:xtdf']['timestamp.tid']
         assert trainId == 10000000000 + i
+
+
+def test_timeout():
+    no_server = 'ipc://nodata'
+    with Client(no_server, timeout=0.2) as c:
+        for _ in range(3):
+            with pytest.raises(TimeoutError) as info:
+                tid, data = c.next()
+
+        assert 'No data received from ipc://nodata in the last 200 ms' in str(info.value)