-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpymsgq.py
169 lines (157 loc) · 6.25 KB
/
pymsgq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#-*- coding:utf-8 -*-
# Copyright 2017 jj4jj <resc@vip.qq.com>
#
# Licensed under the MIT License;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at https://opensource.org/licenses/MIT
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" A simple wrapper around the System V message queue (msgq) interface.
Makes it easy to communicate with native programs on the Linux platform. :D
"""
from __future__ import unicode_literals
import ctypes
import os
import errno
import logging
# Load the C library with errno capture enabled, so that a failing call can
# be diagnosed afterwards through ctypes.get_errno().
libc = ctypes.CDLL('libc.so.6', use_errno=True)

# Raw C entry points used by this module (no argtypes/restype are declared
# here, so callers are responsible for passing correctly typed arguments).
_msgget, _msgsnd, _msgrcv, _msgctl, _ftok = (
    libc.msgget,
    libc.msgsnd,
    libc.msgrcv,
    libc.msgctl,
    libc.ftok,
)

###########################################
# Numeric values of the C macros this module needs; they are hard-coded
# because ctypes cannot read preprocessor definitions.
IPC_CREAT = 0o1000      # msgget flag
IPC_NOWAIT = 0o4000     # msgsnd/msgrcv flag
IPC_STAT = 2            # msgctl command
IPC_SET = 1             # msgctl command
MSG_NOERROR = 0o10000   # msgrcv flag
#msgqbuf
def _msgbuf(size):
    """Return a fresh message buffer able to hold `size` payload bytes.

    The buffer is an instance of a ctypes structure made of a C ``long``
    field ``mtype`` followed immediately (byte-packed) by a ``size``-element
    ``c_byte`` array ``mtext``. A new structure class is built on every call
    because the array length is part of the ctypes type.

    ``mtype`` is declared as a plain ``c_long`` rather than a 64-bit
    bit-field: the layout is the same where ``long`` is 8 bytes, and the
    declaration no longer fails where ``long`` is narrower.
    """
    class _MsgBuf(ctypes.Structure):
        _pack_ = 1
        _fields_ = [
            ('mtype', ctypes.c_long),
            ('mtext', ctypes.c_byte * size),
        ]
    return _MsgBuf()
#msgdsbuf
# Hand-written layout of the C ``struct msqid_ds``: total size in bytes and
# the byte offset of its ``msg_qbytes`` member.
# NOTE(review): these numbers look like the x86-64 Linux/glibc layout; they
# must be re-checked before using this module on any other ABI.
DS_STRUCT_SIZE = 120
DS_STRUCT_DUMMY_OFFSET_SIZE = 88
# Bytes left after msg_qbytes so the whole structure is DS_STRUCT_SIZE long.
DS_STRUCT_DUMMY_PADDING_SIZE = (
    DS_STRUCT_SIZE - DS_STRUCT_DUMMY_OFFSET_SIZE - ctypes.sizeof(ctypes.c_ulong)
)


class _msgdsbuf(ctypes.Structure):
    """Opaque view of ``struct msqid_ds`` exposing only ``msg_qbytes``.

    Every other member is covered by the two dummy byte arrays, so a value
    filled in by ``msgctl(IPC_STAT)`` can be passed back unchanged to
    ``msgctl(IPC_SET)`` after adjusting ``msg_qbytes``.
    """
    _pack_ = 1
    _fields_ = [
        ('dummy_1', ctypes.c_byte * DS_STRUCT_DUMMY_OFFSET_SIZE),
        # msglen_t is an unsigned long in C, so use an unsigned field.
        ('msg_qbytes', ctypes.c_ulong),
        ('dummy_2', ctypes.c_byte * DS_STRUCT_DUMMY_PADDING_SIZE),
    ]
class Msgq(object):
    """Wrapper around one System V message queue.

    The queue is opened (and optionally created) in the constructor;
    `send` and `recv` then move byte payloads through a single reusable
    buffer of `max_msg_buff_sz` bytes owned by the instance.
    """

    # 438 == 0o666
    def __init__(self, key, create=False, max_msg_buff_sz=512*1024, max_msgq_buff_total_sz=1024*1024*16, perms=438, passive=True):
        """Open the queue identified by `key`.

        key: either an integer key handed to msgget() as is, or the path of
            an existing file, which is turned into a key with ftok().
        create: when true, IPC_CREAT is added to the msgget() flags and the
            queue's msg_qbytes limit is raised to at least
            `max_msgq_buff_total_sz`.
        max_msg_buff_sz: capacity in bytes of the send/receive buffer.
        max_msgq_buff_total_sz: minimum msg_qbytes wanted when creating.
        perms: permission bits passed to msgget().
        passive: selects the ftok() project id for path keys
            (1 when true, 2 when false); unused for integer keys.

        Raises Exception when the path does not exist or when ftok(),
        msgget() or msgctl() fails.
        """
        flags = perms
        if create:
            flags |= IPC_CREAT
        #########################
        mqkey = key
        if self._is_path_key(key):
            if not os.path.exists(key):
                raise Exception("create msgq error path key:%s" % key)
            proj_id = 1 if passive else 2
            # ftok() takes a char*; passing text would be marshalled by
            # ctypes as wchar_t* on Python 3.
            mqkey = _ftok(self._path_to_bytes(key), proj_id)
            if mqkey == -1:
                raise Exception('create msgq error:%s key:%s ftok:%d passive:%s' % (os.strerror(ctypes.get_errno()), str(key), mqkey, str(passive)))
            logging.debug('key path:%s passive:%s msgq key:%d proj:%d', key, str(passive), mqkey, proj_id)
        self.mqid = _msgget(mqkey, flags)
        if self.mqid < 0:
            raise Exception('create msgq error:%s key:%s ftok:%d passive:%s' % (os.strerror(ctypes.get_errno()), str(key), mqkey, str(passive)))
        if create:
            self._ensure_queue_capacity(max_msgq_buff_total_sz)
        self.msgbuf = _msgbuf(max_msg_buff_sz)
        self.max_msg_size = max_msg_buff_sz

    @staticmethod
    def _is_path_key(key):
        """Tell whether `key` is a path (text or bytes) rather than a number."""
        path_types = (str, bytes)
        try:
            path_types += (unicode,)  # noqa: F821 - only exists on Python 2
        except NameError:
            pass
        return isinstance(key, path_types)

    @staticmethod
    def _path_to_bytes(path):
        """Return `path` as the byte string expected by a C char* parameter."""
        if isinstance(path, bytes):
            return path
        import sys
        return path.encode(sys.getfilesystemencoding() or 'utf-8')

    @staticmethod
    def _to_payload(buff):
        """Return `buff` as bytes, one byte per element.

        bytes/bytearray are taken verbatim; for text each character
        contributes the low 8 bits of its code point.
        """
        if isinstance(buff, (bytes, bytearray)):
            return bytes(buff)
        return bytes(bytearray(ord(ch) & 0xFF for ch in buff))

    def _ensure_queue_capacity(self, wanted):
        """Raise the queue's msg_qbytes to `wanted` if it is currently lower."""
        ds = _msgdsbuf()
        if _msgctl(self.mqid, IPC_STAT, ctypes.byref(ds)) < 0:
            raise Exception('get msgq stat error:%s' % os.strerror(ctypes.get_errno()))
        if ds.msg_qbytes < wanted:
            ds.msg_qbytes = wanted
            if _msgctl(self.mqid, IPC_SET, ctypes.byref(ds)) < 0:
                raise Exception('set msgq buffer error:%s' % os.strerror(ctypes.get_errno()))

    def send(self, buff, mtype=0, flags=IPC_NOWAIT):
        """Send `buff` as one message.

        buff: bytes, bytearray or text, at most `max_msg_size` long.
        mtype: message type; 0 means "use id(self)".
        flags: msgsnd() flags.

        Returns the msgsnd() result on success, -1 if the call failed with
        EAGAIN and -2 if it failed with EINTR. Raises Exception when the
        payload is too long or on any other msgsnd() error.
        """
        if mtype == 0:
            mtype = id(self)
        self.msgbuf.mtype = mtype
        if len(buff) > self.max_msg_size:
            raise Exception('send msgq buff too big (>%d) !' % self.max_msg_size)
        payload = self._to_payload(buff)
        size = len(payload)
        # One bulk copy instead of a per-byte Python loop.
        ctypes.memmove(self.msgbuf.mtext, payload, size)
        # size_t argument: wrap it so ctypes does not marshal it as C int.
        err = _msgsnd(self.mqid, ctypes.byref(self.msgbuf), ctypes.c_size_t(size), flags)
        if err < 0:
            eno = ctypes.get_errno()
            if eno == errno.EAGAIN:
                return -1
            if eno == errno.EINTR:
                return -2
            raise Exception('send msgq error:%s' % os.strerror(eno))
        return err

    def recv(self, mtype=0, flags=IPC_NOWAIT):
        """Receive one message.

        mtype: message type selector passed to msgrcv().
        flags: msgrcv() flags.

        Returns (message type, payload bytes). Returns (0, None) when the
        call failed with ENOMSG, EAGAIN or EINTR, or when the pending
        message was larger than the buffer (that message is discarded).
        Raises Exception on any other msgrcv() error.
        """
        capacity = ctypes.c_size_t(ctypes.sizeof(self.msgbuf.mtext))
        # long argument: wrap it so large types are not truncated to C int.
        wanted = ctypes.c_long(mtype)
        ntx = _msgrcv(self.mqid, ctypes.byref(self.msgbuf), capacity, wanted, flags)
        if ntx < 0:
            eno = ctypes.get_errno()
            if eno in (errno.ENOMSG, errno.EAGAIN, errno.EINTR):
                return 0, None
            if eno == errno.E2BIG:
                # Read again with MSG_NOERROR so the oversized message is
                # taken off the queue (truncated); its content is dropped.
                _msgrcv(self.mqid, ctypes.byref(self.msgbuf), capacity, wanted, flags | MSG_NOERROR)
                return 0, None
            raise Exception('recv msgq error:%s' % os.strerror(eno))
        return self.msgbuf.mtype, ctypes.string_at(self.msgbuf.mtext, ntx)
if __name__ == '__main__':
    # Small manual test driver: run one of send / recv / key_send /
    # key_recv / ftok from the command line.
    import sys

    def _demo_send(queue):
        """Send the demo payload twice: with an explicit type, then the default."""
        msg = 'hello[\x92\xE5]\0\x56 world!'
        print('case1: send msg(%s) (sz:%d) ret (0/-1):' % (msg,len(msg)), queue.send(msg, 54321))
        print('case2: send msg(%s) (sz:%d) ret (0/-1):' % (msg,len(msg)), queue.send(msg))

    def _demo_recv(queue):
        """Try to receive two messages and print what came back."""
        for label in ('case1', 'case2'):
            mtype, mbuff = queue.recv()
            sz = len(mbuff) if mbuff else 0
            print('%s: recv ret (msg type,msg buff,sz):' % label, mtype, mbuff, sz)

    if len(sys.argv) == 1:
        print('Usage:%s <send|recv|key_send|key_recv|ftok>' % sys.argv[0])
        print('testing send and recv interface')
        sys.exit(-1)

    command = sys.argv[1]
    if command == 'ftok':
        if len(sys.argv) < 4:
            print('Usage:%s ftok <path> <proj_id>' % sys.argv[0])
            sys.exit(-1)
        path = sys.argv[2]
        # ftok() takes a char*, so hand it bytes rather than text.
        if not isinstance(path, bytes):
            path = path.encode(sys.getfilesystemencoding() or 'utf-8')
        print(_ftok(path, int(sys.argv[3])))
    elif command == 'send':
        _demo_send(Msgq(123456, True))
    elif command == 'key_send':
        _demo_send(Msgq('/tmp', True))
    elif command == 'key_recv':
        _demo_recv(Msgq('/tmp', True))
    else:
        # Any other word (including 'recv') reads from the numeric-key queue.
        _demo_recv(Msgq(123456, False))