-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathireventd.py
445 lines (381 loc) · 14.2 KB
/
ireventd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
#!/usr/bin/env python3
#
# Raspberry Pi remote control daemon
# $Id: ireventd.py,v 1.39 2025/01/19 11:03:55 bob Exp $
#
# Author : Bob Rathbone
# Site : http://www.bobrathbone.com
#
# License: GNU V3, See https://www.gnu.org/copyleft/gpl.html
#
# Disclaimer: Software is provided as is and absolutely no warranties are implied or given.
# The authors shall not be liable for any loss or damage however caused.
#
# The important configuration files are
# /etc/rc_keymaps ir-keytable .toml configuration files
# <myremote>.toml ir-keytable definition file
#
# This is the Python 3 version for use on Bullseye
#
# This class calls the ir_daemon.py class to daemonize this process
# It is normally called from ireventd.service
import RPi.GPIO as GPIO
import configparser
import sys
import pwd
import os
import time
import signal
import socket
import errno
import re
import pdb
from evdev import *
from threading import Timer
# Radio project imports
from config_class import Configuration
from ir_daemon import Daemon
from log_class import Log
# Module-wide state shared by the signal handlers, the daemon class and the
# command-line helpers below.

# Logger used throughout this file (initialised with log.init('radio') in the daemon)
log = Log()
IR_LED=11 # GPIO 11 pin 23
# GPIO of the activity LED; replaced by config.remote_led in RemoteDaemon.run().
# A value of 0 or less disables all LED handling.
remote_led = IR_LED
# NOTE(review): 'muted' is not referenced anywhere else in the visible code
muted = False
# UDP defaults; both are overwritten from the configuration in RemoteDaemon.run()
udphost = 'localhost' # IR Listener UDP host default localhost
udpport = 5100 # IR Listener UDP port number default 5100
# Radio configuration object (remote_led, keytable, remote control host/port)
config = Configuration()
# PID file handed to the RemoteDaemon constructor in the main routine
pidfile = '/var/run/ireventd.pid'
# Directory holding the ir-keytable .toml key maps
key_maps = '/etc/rc_keymaps'
# sysfs directory scanned by get_ir_device() for rc0..rc6
sys_rc = '/sys/class/rc'
# Full sysfs path of the detected IR device; set by get_ir_device()
rc_device = ''
# Use Broadcom (BCM) GPIO numbering for all GPIO calls in this file
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False) # Disable warnings
# True while recording is active; toggled by sigUser() on SIGUSR1/SIGUSR2
recording = False
# Termination signal handler (registered for SIGHUP in RemoteDaemon.run)
def signalHandler(signal, frame):
    """Log that the remote control daemon has stopped and exit.

    Both parameters are the standard signal-handler arguments (signal
    number and current stack frame); neither is used.

    Raises:
        SystemExit: always, with exit status 0.
    """
    stop_message = "Remote control stopped, PID " + str(os.getpid())
    log.message(stop_message, log.INFO)
    sys.exit(0)
# User signal SIGUSR1/2 used to indicate if recording active
def sigUser(signal, frame):
    """Track the recording state and mirror it on the activity LED.

    SIGUSR1 marks recording as active (LED on), SIGUSR2 marks it as
    inactive (LED off). Nothing happens when the LED is disabled
    (remote_led <= 0) or for any other signal number.

    Args:
        signal: number of the signal that was delivered.
        frame: current stack frame (unused).
    """
    global recording
    # The 'signal' parameter shadows the signal module, so fetch the symbolic
    # constants locally instead of relying on the Linux-specific numbers 10/12.
    from signal import SIGUSR1, SIGUSR2

    if remote_led <= 0:
        return

    if signal == SIGUSR1:
        recording = True
        GPIO.output(remote_led, True)
    elif signal == SIGUSR2:
        recording = False
        GPIO.output(remote_led, False)
# Daemon class
class RemoteDaemon(Daemon):
    """IR remote control daemon.

    Reads key events from the "gpio_ir_recv" input device and forwards the
    key names over UDP to the radio program. Numeric keys are accumulated
    into a station/track number and sent as a single PLAY_<n> message.
    """

    # Default key table; run() replaces it with config.keytable before use
    keytable = 'myremote.toml'
    ir_device = 'rc0'  # Can be rc0, rc1, rc2, rc4 - Run ir-keytable to see actual
    play_number = 0         # Number being built from KEY_NUMERIC_x presses
    timer_running = False   # True while the PLAY_<n> timer is pending
    timer = None            # threading.Timer that fires timerTask()
    protocols_cmd = 'sudo ir-keytable -p rc-5,rc-5-sz,jvc,sony,nec,sanyo,mce_kbd,rc-6,sharp'

    def run(self):
        """Daemon entry point: set up logging, signals, LED and key table,
        then listen for IR events. Does not return while events are read.

        Raises:
            SystemExit: with status 1 if no "gpio_ir_recv" input device exists.
        """
        global remote_led
        global udpport
        global udphost

        log.init('radio')
        progcall = str(sys.argv)
        log.message(progcall, log.DEBUG)
        log.message('Remote control running pid ' + str(os.getpid()), log.INFO)

        signal.signal(signal.SIGHUP, signalHandler)
        signal.signal(signal.SIGUSR1, sigUser)
        signal.signal(signal.SIGUSR2, sigUser)

        msg = "Using IR kernel events architecture"
        print(msg)
        log.message(msg, log.DEBUG)

        remote_led = config.remote_led
        if remote_led > 0:
            print("Flashing LED on GPIO", remote_led)
            GPIO.setup(remote_led, GPIO.OUT)  # Output LED
            self.flash_led(remote_led)
        else:
            log.message("Remote control LED disabled", log.DEBUG)

        self.ir_device = self.get_ir_device('gpio_ir_recv')
        print("Using device /sys/class/rc/" + self.ir_device)

        udphost = config.remote_control_host
        udpport = config.remote_control_port
        log.message("UDP connect host " + udphost + " port " + str(udpport), log.DEBUG)

        devices = [InputDevice(path) for path in list_devices()]

        self.keytable = config.keytable
        self.loadKeyTable(self.keytable)

        # Define IR input as defined in /boot/config.txt
        irin = None
        for device in devices:
            print(device.path, device.name, device.phys)
            if device.name == "gpio_ir_recv":
                irin = device

        if irin is None:
            msg = "Unable to find IR input device, exiting"
            print(msg)
            # Also log it: stdout may not be visible when running as a daemon
            log.message(msg, log.ERROR)
            sys.exit(1)

        self.listener(irin)

    # Returns the device name for the "gpio_ir_recv" overlay (rc0...rc6)
    # Used to load ir_keytable
    def get_ir_device(self, sName):
        """Find the rc device whose input name equals sName.

        Scans <sys_rc>/rc0..rc6, each with input0..input6, and compares the
        stripped contents of each existing 'name' file with sName. On a
        match the module-level rc_device is set to the sysfs directory.

        Args:
            sName: input device name to look for, e.g. 'gpio_ir_recv'.

        Returns:
            'rcN' for the first match, or '' if nothing matched.
        """
        global rc_device

        for x in range(7):
            for y in range(7):
                name_file = sys_rc + '/rc' + str(x) + '/input' + str(y) + '/name'
                if not os.path.isfile(name_file):
                    continue
                try:
                    # 'with' closes the file on every path, including a match
                    with open(name_file, "r") as f:
                        name = f.read().strip()
                except (OSError, ValueError) as e:
                    print(str(e))
                    continue
                if sName == name:
                    rc_device = sys_rc + '/rc' + str(x)
                    return 'rc' + str(x)
        return ''

    # Used by KEY_NUMERIC_x entries
    def timerTask(self):
        """Timer callback: send the accumulated PLAY_<n> and reset the state."""
        msg = 'IR event PLAY_' + str(self.play_number)
        log.message(msg, log.DEBUG)
        if self.play_number > 0:
            print('PLAY_' + str(self.play_number))
            reply = self.udpSend('PLAY_' + str(self.play_number))
            print(reply)
        self.timer_running = False
        self.timer.cancel()
        self.play_number = 0

    # Load the specified key table into /etc/rc_keymaps/
    def loadKeyTable(self, keytable):
        """Enable the IR protocols and load a key table with ir-keytable.

        Args:
            keytable: file name of the .toml key map inside key_maps.
        """
        # Load protocols
        print(self.protocols_cmd)
        log.message(self.protocols_cmd, log.DEBUG)
        execCommand(self.protocols_cmd)

        # Report the table actually being loaded (the parameter)
        msg = "Loading " + key_maps + "/" + keytable
        log.message(msg, log.INFO)

        cmd = ("sudo /usr/bin/ir-keytable -c -w " + key_maps + "/" + keytable
               + " -s " + self.ir_device)
        print(cmd)
        log.message(cmd, log.DEBUG)
        execCommand(cmd)

    # Switch the activity LED, if one is configured
    @staticmethod
    def _set_led(state):
        if remote_led > 0:
            GPIO.output(remote_led, state)

    # Reduce an evdev keycode (string or list of alias names) to one name
    @staticmethod
    def _keycode_name(keycode):
        # evdev reports a list when several names share one key code; the
        # second entry is used, as the original two-name handling did.
        if isinstance(keycode, (list, tuple)):
            return keycode[1] if len(keycode) > 1 else keycode[0]
        return keycode

    # Return the number of a KEY_NUMERIC_<n> key name, or None
    @staticmethod
    def _numeric_value(keycode):
        # Names such as KEY_NUMERIC_STAR have no numeric suffix and must not
        # reach int(); they are treated as ordinary keys by the caller.
        if 'KEY_NUMERIC_' not in keycode:
            return None
        parts = keycode.split('_')
        if len(parts) > 2 and parts[2].isdecimal():
            return int(parts[2])
        return None

    # Handle the IR input event
    def readInputEvent(self, device):
        """Read events from the input device and act on key presses.

        Key-down and repeat events (keystate > 0) are handled. Numeric keys
        are accumulated in play_number and sent as PLAY_<n> by timerTask()
        two seconds after the first digit; every other key name is sent
        over UDP immediately.

        Args:
            device: the input device to read; must provide read_loop().
        """
        for event in device.read_loop():
            # Event returns sec, usec (combined with .), type, code, value
            # Type 01 or ecodes.EV_KEY is a keypress event
            # A value of 0 is key up, 1 is key down, 2 is repeat key
            # the code is the value of the keypress
            # Full details at https://python-evdev.readthedocs.io/en/latest/apidoc.html
            if event.type != ecodes.EV_KEY:
                continue

            # Invert the LED while the key is being handled
            self._set_led(not recording)

            # categorize gives a text representation of the key:
            # data.keycode - Text representation of the key
            # data.keystate - State of the key, may match .key_down or .key_up
            # data.scancode - Key code
            data = categorize(event)
            if data.keystate > 0:
                keycode = self._keycode_name(data.keycode)
                print(keycode, hex(data.scancode), data.keystate)

                playnum = self._numeric_value(keycode)
                if playnum is not None:
                    if self.play_number > 0:
                        self.play_number *= 10
                    self.play_number += playnum
                    if not self.timer_running:
                        self.timer = Timer(2, self.timerTask)
                        self.timer.start()
                        self.timer_running = True
                else:
                    print(keycode)
                    reply = self.udpSend(keycode)
                    print(reply)

                if 'KEY_RECORD' in keycode:
                    time.sleep(2)

            # Restore the LED to reflect the recording state
            self._set_led(recording)

    # Listener routine
    def listener(self, irin):
        """Process events from the IR input device forever."""
        print("Listening for IR events:")
        while True:
            self.readInputEvent(irin)

    # Status enquiry
    def status(self):
        """Print and log whether the daemon is running, based on the pid file."""
        # Get the pid from the pidfile
        try:
            with open(self.pidfile, 'r') as pf:
                pid = int(pf.read().strip())
        except (IOError, ValueError):
            # Missing, unreadable or non-numeric pid file: treat as not running
            pid = None

        if not pid:
            message = "Remote control status: not running"
        else:
            message = "Remote control running pid " + str(pid)
        log.message(message, log.INFO)
        print(message)

    # Test udp send
    def send(self, msg):
        """Send msg to the radio program and return its reply."""
        return self.udpSend(msg)

    # Flash the IR activity LED
    def flash_led(self, led):
        """Flash the LED on the given GPIO six times (0.3s on, 0.3s off).

        Args:
            led: GPIO number; must already be set up as an output.
        """
        delay = 0.3
        log.message("Flash LED on GPIO " + str(led), log.DEBUG)
        for _ in range(6):
            GPIO.output(led, True)
            time.sleep(delay)
            GPIO.output(led, False)
            time.sleep(delay)

    # Test the LED
    def flash(self):
        """Flash the configured activity LED, if one is enabled."""
        log.init('radio')
        led = config.remote_led
        if led > 0:
            GPIO.setup(led, GPIO.OUT)  # Output LED
            self.flash_led(led)

    # Send button data to radio program
    def udpSend(self, button):
        """Send a key name over UDP and wait up to 3 seconds for a reply.

        The datagram goes to config.remote_listen_host on the module-level
        udpport. Socket errors are printed and logged, not raised.

        Args:
            button: key name to send, e.g. 'KEY_OK' or 'PLAY_9'.

        Returns:
            The decoded reply, or '' if none was received.
        """
        data = ''
        log.message("Remote control daemon udpSend " + button, log.DEBUG)

        # The host to send to is either local host or the IP address of the remote server
        udphost = config.remote_listen_host

        try:
            # The context manager closes the socket even when send/recv fails
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as clientsocket:
                clientsocket.settimeout(3)
                clientsocket.sendto(button.encode('utf-8'), (udphost, udpport))
                data = clientsocket.recv(100).strip()
        except socket.error as e:
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                msg = "IR remote udpSend no data: " + str(e)
            else:
                # Errors such as timeout
                msg = "IR remote udpSend: " + str(e)
            print(msg)
            log.message(msg, log.ERROR)

        if len(data) > 0:
            data = data.decode('utf-8')
            log.message("IR daemon server sent: " + data, log.DEBUG)
        return data
# Execute system command
def execCommand(cmd):
    """Run a shell command and return the first line of its output.

    Args:
        cmd: command line passed to the shell via os.popen.

    Returns:
        The first line of standard output without its trailing newline,
        or '' if the command printed nothing.
    """
    # The context manager closes the pipe once the first line is read
    with os.popen(cmd) as p:
        return p.readline().rstrip('\n')
# Print usage
def usage():
    """Print the command-line synopsis and exit with status 2."""
    commands = "start|stop|status|nodaemon|flash|config|send <KEY>"
    print("Usage: sudo " + sys.argv[0] + " " + commands)
    sys.exit(2)
def usageSend():
    """Print help for the 'send' sub-command and exit with status 2."""
    help_lines = (
        "Usage: " + sys.argv[0] + " send <KEY>",
        "Where <KEY> is a valid IR_KEY",
        " KEY_VOLUMEUP,KEY_VOLUMEDOWN,KEY_CHANNELUP,KEY_CHANNELDOWN,KEY_MENU",
        " KEY_UP,KEY_DOWN,KEY_LEFT,KEY_RIGHT,KEY_OK,KEY_INFO,KEY_MUTE,KEY_RECORD",
        " PLAY_n Where n is the station or track number. E.g. PLAY_9 or PLAY_176 ",
    )
    for text in help_lines:
        print(text)
    sys.exit(2)
# get gpio-ir dtoverlay configuration and port configuration
def getBootConfig(str):
    """Return the last line of the boot configuration matching a pattern.

    /boot/firmware/config.txt is used when it exists, otherwise
    /boot/config.txt.

    Args:
        str: regular expression searched for in each line (the name
            shadows the builtin but is kept for existing callers).

    Returns:
        The last matching line, including its line ending, or a string
        starting with 'Warning:' if nothing matched or the file could not
        be read.
    """
    config_path = "/boot/config.txt"
    if os.path.exists("/boot/firmware/config.txt"):
        config_path = "/boot/firmware/config.txt"

    msg = 'Warning: dtoverlay gpio-ir not configured in ' + config_path
    pattern = re.compile(str)
    try:
        # 'with' closes the file; the original left it open
        with open(config_path, "r") as f:
            for line in f:
                if pattern.search(line):
                    msg = line
    except OSError as e:
        # Report an unreadable file rather than aborting the config display
        msg = 'Warning: unable to read ' + config_path + ' (' + e.__class__.__name__ + ')'
    return msg
# Display configuration
def displayConfiguration():
    """Print the remote control daemon configuration.

    Shows the LED GPIO and UDP settings, the gpio-ir overlay line from the
    boot configuration, whether the gpio_ir_recv kernel module is loaded,
    the detected sysfs device with its protocols, and the available
    .toml key maps.

    Relies on the module-level 'daemon' object created in the main routine.
    """
    print("Remote Control daemon configuration")
    print("-----------------------------------")
    config = Configuration()
    print("LED = GPIO " + str(config.remote_led))
    print("HOST = " + config.remote_control_host)
    print("PORT = " + str(config.remote_control_port))
    # The original printed remote_control_port here, duplicating PORT;
    # remote_listen_host is the host udpSend() sends to.
    # NOTE(review): confirm this is the intended LISTEN value
    print("LISTEN = " + str(config.remote_listen_host))

    line = getBootConfig("^dtoverlay=gpio-ir")
    if line is not None:
        print(line.rstrip())

    mods = execCommand("lsmod | grep -i gpio_ir_recv")
    if len(mods) > 0:
        x = mods.split(' ')
        print("Module %s loaded" % x[0])
    else:
        print("ERROR: Module gpio_ir_recv not loaded, missing gpio-ir overlay")

    # get_ir_device() sets the module-level rc_device when a device is found
    daemon.get_ir_device('gpio_ir_recv')
    print('Sysfs: ' + rc_device)
    # Only query the protocols when a device was found; with an empty
    # rc_device the command would try to read /protocols
    protos = execCommand("cat " + rc_device + '/protocols') if rc_device else ''
    print('Protocols ' + protos)

    for file in os.listdir(key_maps):
        if file.endswith(".toml"):
            print("Key map: " + os.path.join(key_maps, file))
### Main routine ###
if __name__ == "__main__":
    # Command-line entry point: sudo ireventd.py <command> [<KEY>]
    # Must run with an effective UID of 0; usage() exits with status 2.
    if os.geteuid() != 0:
        print("This program must be run with sudo or root permissions!")
        usage()

    # 'daemon' stays module-level: displayConfiguration() uses it
    daemon = RemoteDaemon(pidfile)

    if len(sys.argv) < 2:
        usage()

    command = sys.argv[1]
    if command == 'start':
        # The original also called daemon.flash_led() here with no argument,
        # which raises TypeError because flash_led() requires the GPIO number.
        # run() already flashes the LED at start-up, so the call is dropped.
        daemon.start()
    elif command == 'nodaemon':
        daemon.nodaemon()
    elif command == 'stop':
        daemon.stop()
    elif command == 'flash':
        daemon.flash()
    elif command == 'status':
        daemon.status()
    elif command == 'send':
        if len(sys.argv) > 2:
            # Key names are sent in upper case
            reply = daemon.send(sys.argv[2].upper())
            print(reply)
        else:
            usageSend()
    elif command == 'config':
        displayConfiguration()
    else:
        print("Unknown command: " + command)
        usage()
    sys.exit(0)
# set tabstop=4 shiftwidth=4 expandtab
# retab
# End of script