|
| 1 | +import threading, struct, queue |
| 2 | +from PPTypes import * |
| 3 | + |
| 4 | +FormatString = '@nnHHi' |
| 5 | +FormatSize = struct.calcsize(FormatString) |
| 6 | + |
| 7 | +class KernelInputEventStruct: |
| 8 | + Fields = ('tv_sec', 'tv_usec', 'type', 'code', 'value') |
| 9 | + |
| 10 | + def __init__(self, Input): |
| 11 | + Blob = struct.unpack(FormatString, Input) |
| 12 | + assert len(self.Fields) == len(Blob) |
| 13 | + |
| 14 | + for Inc, Field in enumerate(self.Fields): |
| 15 | + setattr(self, Field, Blob[Inc]) |
| 16 | + |
| 17 | + @property |
| 18 | + def MSTime(self): |
| 19 | + return ((self.tv_sec * 1_000_000) + self.tv_usec) // 1000 |
| 20 | + |
| 21 | +class ButtonMonitor: |
| 22 | + def __init__(self): |
| 23 | + self.__PowerThread = threading.Thread(target=self.__PowerThreadFunc) |
| 24 | + self.__VolThread = threading.Thread(target=self.__VolThreadFunc) |
| 25 | + self.__ShouldDie = False |
| 26 | + self.__DieLock = threading.Lock() |
| 27 | + self.__ButtonStates = { ButtonType(S) : ButtonState(S) for S in (ButtonType.POWER, ButtonType.VOLDOWN, ButtonType.VOLUP) } |
| 28 | + self.__ButtonEvent = queue.Queue() |
| 29 | + self.__PowerThread.start() |
| 30 | + self.__VolThread.start() |
| 31 | + |
| 32 | + @property |
| 33 | + def ShouldDie(self): |
| 34 | + with self.__DieLock: |
| 35 | + return bool(self.__ShouldDie) |
| 36 | + |
| 37 | + @ShouldDie.setter |
| 38 | + def ShouldDie(self, Value): |
| 39 | + with self.__DieLock: |
| 40 | + self.__ShouldDie = Value |
| 41 | + |
| 42 | + def WaitForChange(self, Timeout = None): |
| 43 | + try: |
| 44 | + return self.__ButtonEvent.get(timeout=Timeout) |
| 45 | + except queue.Empty: |
| 46 | + return None |
| 47 | + |
| 48 | + def __del__(self): |
| 49 | + return #Might be blocking on a read(), just forget about it and let Python deal with it. |
| 50 | + |
| 51 | + self.__ShouldDie = True |
| 52 | + self.__PowerThread.join() |
| 53 | + self.__VolThread.join() |
| 54 | + |
| 55 | + def __VolThreadFunc(self): |
| 56 | + with open('/dev/input/event1', 'rb') as Desc: |
| 57 | + while not self.ShouldDie: |
| 58 | + Blob = Desc.read(FormatSize) |
| 59 | + |
| 60 | + S = KernelInputEventStruct(Blob) |
| 61 | + |
| 62 | + if not S.type or not S.code: continue #Filter useless data |
| 63 | + |
| 64 | + print(f'Volume button event {S.__dict__}') |
| 65 | + |
| 66 | + with self.__ButtonStates[ButtonType(S.code)] as Button: |
| 67 | + Button.IsPressed = bool(S.value) |
| 68 | + Button.LastChangeTime = Button.ChangeTime |
| 69 | + Button.ChangeTime = S.MSTime |
| 70 | + |
| 71 | + self.__ButtonEvent.put(self.GetButtonStates()) |
| 72 | + |
| 73 | + |
| 74 | + def __PowerThreadFunc(self): |
| 75 | + with open('/dev/input/event0', 'rb') as Desc: |
| 76 | + while not self.ShouldDie: |
| 77 | + Blob = Desc.read(FormatSize) |
| 78 | + |
| 79 | + S = KernelInputEventStruct(Blob) |
| 80 | + |
| 81 | + if not S.code: continue #Filter useless data |
| 82 | + |
| 83 | + print(f'Power button event {S.__dict__}') |
| 84 | + |
| 85 | + with self.__ButtonStates[ButtonType.POWER] as Button: |
| 86 | + Button.IsPressed = bool(S.value) |
| 87 | + Button.LastChangeTime = Button.ChangeTime |
| 88 | + Button.ChangeTime = S.MSTime |
| 89 | + |
| 90 | + self.__ButtonEvent.put(self.GetButtonStates()) |
| 91 | + |
| 92 | + def GetButtonStates(self): |
| 93 | + return { S : self.__ButtonStates[S].Clone() for S in self.__ButtonStates } |
| 94 | + |
0 commit comments