-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoscserver.py
146 lines (112 loc) · 3.91 KB
/
oscserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# -*- coding: utf-8 -*-
#
# uosc/server.py
#
"""A minimal OSC UDP server."""
import socket
try:
from ustruct import unpack
except ImportError:
from struct import unpack
from common import Impulse, to_time
MAX_DGRAM_SIZE = 1472
def split_oscstr(msg, offset):
end = msg.find(b'\0', offset)
return msg[offset:end].decode('utf-8'), (end + 4) & ~0x03
def split_oscblob(msg, offset):
start = offset + 4
size = unpack('>I', msg[offset:start])[0]
return msg[start:start + size], (start + size + 4) & ~0x03
def parse_timetag(msg, offset):
"""Parse an OSC timetag from msg at offset."""
return to_time(unpack('>II', msg[offset:offset + 4]))
def parse_message(msg, strict=False):
args = []
addr, ofs = split_oscstr(msg, 0)
if not addr.startswith('/'):
raise ValueError("OSC address pattern must start with a slash.")
# type tag string must start with comma (ASCII 44)
if ofs < len(msg) and msg[ofs:ofs + 1] == b',':
tags, ofs = split_oscstr(msg, ofs)
tags = tags[1:]
else:
errmsg = "Missing/invalid OSC type tag string."
if strict:
raise ValueError(errmsg)
else:
tags = ''
for typetag in tags:
size = 0
if typetag in 'ifd':
size = 8 if typetag == 'd' else 4
args.append(unpack('>' + typetag, msg[ofs:ofs + size])[0])
elif typetag in 'sS':
s, ofs = split_oscstr(msg, ofs)
args.append(s)
elif typetag == 'b':
s, ofs = split_oscblob(msg, ofs)
args.append(s)
elif typetag in 'rm':
size = 4
args.append(unpack('BBBB', msg[ofs:ofs + size]))
elif typetag == 'c':
size = 4
args.append(chr(unpack('>I', msg[ofs:ofs + size])[0]))
elif typetag == 'h':
size = 8
args.append(unpack('>q', msg[ofs:ofs + size])[0])
elif typetag == 't':
size = 8
args.append(parse_timetag(msg, ofs))
elif typetag in 'TFNI':
args.append({'T': True, 'F': False, 'I': Impulse}.get(typetag))
else:
raise ValueError("Type tag '%s' not supported." % typetag)
ofs += size
return (addr, tags, tuple(args))
def parse_bundle(bundle, strict=False):
"""Parse a binary OSC bundle.
Returns a generator which walks over all contained messages and bundles
recursively, depth-first. Each item yielded is a (timetag, message) tuple.
"""
if not bundle.startswith(b'#bundle\0'):
raise TypeError("Bundle must start with b'#bundle\\0'.")
ofs = 16
timetag = to_time(*unpack('>II', bundle[8:ofs]))
while True:
if ofs >= len(bundle):
break
size = unpack('>I', bundle[ofs:ofs + 4])[0]
element = bundle[ofs + 4:ofs + 4 + size]
ofs += size + 4
if element.startswith(b'#bundle'):
for el in parse_bundle(element):
yield el
else:
yield timetag, parse_message(element, strict)
def handle_osc(data, src, dispatch=None, strict=False):
try:
head, _ = split_oscstr(data, 0)
if head.startswith('/'):
messages = [(-1, parse_message(data, strict))]
elif head == '#bundle':
messages = parse_bundle(data, strict)
except Exception as exc:
return
try:
for timetag, (oscaddr, tags, args) in messages:
if dispatch:
dispatch(timetag, (oscaddr, tags, args, src))
except Exception as exc:
print("Error")
def run_server(saddr, port, handler=handle_osc):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((saddr, port))
try:
while True:
data, caddr = sock.recvfrom(MAX_DGRAM_SIZE)
handler(data, caddr)
finally:
sock.close()
log.info("Bye!")