-
Notifications
You must be signed in to change notification settings - Fork 0
/
zhelpers.py
58 lines (46 loc) · 1.51 KB
/
zhelpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# encoding: utf-8
"""
Helper module for example applications. Mimics ZeroMQ Guide's zhelpers.h.
"""
import binascii
import os
import socket
from random import randint
import zmq
def socket_set_hwm(socket, hwm=-1):
"""libzmq 2/3/4 compatible sethwm"""
try:
socket.sndhwm = socket.rcvhwm = hwm
except AttributeError:
socket.hwm = hwm
def dump(msg_or_socket):
"""Receives all message parts from socket, printing each frame neatly"""
if isinstance(msg_or_socket, zmq.Socket):
# it's a socket, call on current message
msg = msg_or_socket.recv_multipart()
else:
msg = msg_or_socket
print("----------------------------------------")
for part in msg:
print("[%03d]" % len(part), end=' ')
is_text = True
try:
print(part.decode('ascii'))
except UnicodeDecodeError:
print(r"0x%s" % (binascii.hexlify(part).decode('ascii')))
def set_id(zsocket):
"""Set simple random printable identity on socket"""
identity = "%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000))
zsocket.setsockopt_string(zmq.IDENTITY, identity)
def zpipe(ctx_A, ctx_B, A, B, queue_size=10):
"""build inproc pipe for talking to threads
Returns a pair of sockets connected via inproc
"""
a = ctx_A.socket(A)
b = ctx_B.socket(B)
a.linger = b.linger = 0
a.hwm = b.hwm = queue_size
interface = "inproc://%s" % binascii.hexlify(os.urandom(8))
b.bind(interface)
a.connect(interface)
return a, b