Skip to content

Commit

Permalink
Get rid of ThreadQueueInterface in Text interfaces #500
Browse files Browse the repository at this point in the history
  • Loading branch information
yozik04 committed Sep 17, 2024
1 parent 219e508 commit a1af889
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 170 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/asottile/pyupgrade
rev: v3.15.2
rev: v3.17.0
hooks:
- id: pyupgrade
args: ["--py37-plus"]

- repo: https://github.com/psf/black
rev: 24.4.0
rev: 24.8.0
hooks:
- id: black
args:
Expand All @@ -35,7 +35,7 @@ repos:
- id: isort

- repo: https://github.com/PyCQA/flake8
rev: 7.0.0
rev: 7.1.1
hooks:
- id: flake8
additional_dependencies: [flake8-bugbear]
Expand Down
11 changes: 3 additions & 8 deletions paradox/interfaces/text/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

from paradox.config import config as cfg
from paradox.event import Event, EventLevel, Notification
from paradox.interfaces import ThreadQueueInterface
from paradox.interfaces import AsyncInterface
from paradox.lib import ps
from paradox.lib.event_filter import EventFilter, EventTagFilter, LiveEventRegexpFilter

logger = logging.getLogger("PAI").getChild(__name__)


class AbstractTextInterface(ThreadQueueInterface):
class AbstractTextInterface(AsyncInterface):
"""Interface Class using any Text interface"""

def __init__(self, alarm, event_filter: EventFilter, min_level=EventLevel.INFO):
Expand All @@ -20,12 +20,7 @@ def __init__(self, alarm, event_filter: EventFilter, min_level=EventLevel.INFO):
self.min_level = min_level
self.alarm = alarm

def stop(self):
super().stop()

def _run(self):
super()._run()

async def run(self):
ps.subscribe(self.handle_panel_event, "events")
ps.subscribe(self.handle_notify, "notifications")

Expand Down
142 changes: 54 additions & 88 deletions paradox/interfaces/text/gsm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
# -*- coding: utf-8 -*-

import asyncio
import datetime
import json
import logging
import os
from concurrent import futures

import serial_asyncio

Expand All @@ -24,22 +20,16 @@

class SerialConnectionProtocol(ConnectionProtocol):
def __init__(self, handler: ConnectionHandler):
super(SerialConnectionProtocol, self).__init__(handler)
self.buffer = b""
self.loop = asyncio.get_event_loop()
super().__init__(handler)
self.last_message = b""

def connection_made(self, transport):
super(SerialConnectionProtocol, self).connection_made(transport)
self.handler.on_connection()

async def send_message(self, message):
self.last_message = message
self.transport.write(message + b"\r\n")

def data_received(self, recv_data):
self.buffer += recv_data
logger.debug("BUFFER: {}".format(self.buffer))
logger.debug(f"BUFFER: {self.buffer}")
while len(self.buffer) >= 0:
r = self.buffer.find(b"\r\n")
# not found
Expand All @@ -61,25 +51,22 @@ def data_received(self, recv_data):
if self.last_message == frame:
self.last_message = b""
elif len(frame) > 0:
self.loop.create_task(self.handler.on_message(frame)) # Callback
self.handler.on_message(frame) # Callback

def connection_lost(self, exc):
logger.error("The serial port was closed")
self.buffer = b""
self.last_message = b""
super(SerialConnectionProtocol, self).connection_lost(exc)
super().connection_lost(exc)


class SerialCommunication(ConnectionHandler):
def __init__(self, loop, port, baud=9600, timeout=5, recv_callback=None):
def __init__(self, port, baud=9600, timeout=5):
self.port_path = port
self.baud = baud
self.connected_future = None
self.recv_callback = recv_callback
self.loop = loop
self.recv_callback = None
self.connected = False
self.connection = None
asyncio.set_event_loop(loop)
self.queue = asyncio.Queue()

def clear(self):
Expand All @@ -96,14 +83,12 @@ def on_connection(self):
self.connected = True

def on_message(self, message: bytes):
logger.debug("M->I: {}".format(message))
logger.debug(f"M->I: {message}")

if self.recv_callback is not None:
return asyncio.get_event_loop().call_soon(
self.recv_callback(message)
) # Callback
self.recv_callback(message) # Callback
else:
return self.queue.put_nowait(message)
self.queue.put_nowait(message)

def set_recv_callback(self, callback):
self.recv_callback = callback
Expand All @@ -120,23 +105,23 @@ def make_protocol(self):
return SerialConnectionProtocol(self)

async def write(self, message, timeout=15):
logger.debug("I->M: {}".format(message))
logger.debug(f"I->M: {message}")
if self.connection is not None:
await self.connection.send_message(message)
return await asyncio.wait_for(self.queue.get(), timeout=5, loop=self.loop)
return await asyncio.wait_for(self.queue.get(), timeout=5)

async def read(self, timeout=5):
if self.connection is not None:
return await asyncio.wait_for(self.queue.get(), timeout=timeout)

async def connect(self):
logger.info("Connecting to serial port {}".format(self.port_path))
logger.info(f"Connecting to serial port {self.port_path}")

self.connected_future = self.loop.create_future()
self.loop.call_later(5, self.open_timeout)
self.connected_future = asyncio.get_event_loop().create_future()
asyncio.get_event_loop().call_later(5, self.open_timeout)

_, self.connection = await serial_asyncio.create_serial_connection(
self.loop, self.make_protocol, self.port_path, self.baud
asyncio.get_event_loop(), self.make_protocol, self.port_path, self.baud
)

return await self.connected_future
Expand All @@ -156,73 +141,64 @@ def __init__(self, alarm):

self.port = None
self.modem_connected = False
self.loop = asyncio.new_event_loop()
self.message_cmt = None

def stop(self):
""" Stops the GSM Interface Thread"""
self.stop_running.set()

self.loop.stop()
"""Stops the GSM Interface"""
super().stop()
logger.debug("GSM Stopped. TODO: Implement a proper stop")

logger.debug("GSM Stopped")

def write(self, message: str, expected: str = None) -> None:
async def write(self, message: str, expected: str = None) -> None:
r = b""
while r != expected:
r = self.loop.run_until_complete(self.port.write(message))
r = await self.port.write(message)
data = b""

if r == b"ERROR":
raise Exception("Got error from modem: {}".format(r))
raise Exception(f"Got error from modem: {r}")

while r != expected:
r = self.loop.run_until_complete(self.port.read())
r = await self.port.read()
data += r + b"\n"

def connect(self):
logger.info(
"Using {} at {} baud".format(cfg.GSM_MODEM_PORT, cfg.GSM_MODEM_BAUDRATE)
)
async def connect(self):
logger.info(f"Using {cfg.GSM_MODEM_PORT} at {cfg.GSM_MODEM_BAUDRATE} baud")
try:
if not os.path.exists(cfg.GSM_MODEM_PORT):
logger.error("Modem port ({}) not found".format(cfg.GSM_MODEM_PORT))
logger.error(f"Modem port ({cfg.GSM_MODEM_PORT}) not found")
return False

self.port = SerialCommunication(
self.loop, cfg.GSM_MODEM_PORT, cfg.GSM_MODEM_BAUDRATE, 5
cfg.GSM_MODEM_PORT, cfg.GSM_MODEM_BAUDRATE, 5
)

except:
logger.exception(
"Could not open port {} for GSM modem".format(cfg.GSM_MODEM_PORT)
)
except Exception:
logger.exception(f"Could not open port {cfg.GSM_MODEM_PORT} for GSM modem")
return False

self.port.set_recv_callback(None)
result = self.loop.run_until_complete(self.port.connect())
result = await self.port.connect()

if not result:
logger.exception("Could not connect to GSM modem")
return False

try:
self.write(b"AT", b"OK") # Init
self.write(b"ATE0", b"OK") # Disable Echo
self.write(b"AT+CMEE=2", b"OK") # Increase verbosity
self.write(b"AT+CMGF=1", b"OK") # SMS Text mode
self.write(b"AT+CFUN=1", b"OK") # Enable modem
self.write(
await self.write(b"AT", b"OK") # Init
await self.write(b"ATE0", b"OK") # Disable Echo
await self.write(b"AT+CMEE=2", b"OK") # Increase verbosity
await self.write(b"AT+CMGF=1", b"OK") # SMS Text mode
await self.write(b"AT+CFUN=1", b"OK") # Enable modem
await self.write(
b"AT+CNMI=1,2,0,0,0", b"OK"
) # SMS received only when modem enabled, Use +CMT with SMS, No Status Report,
self.write(b"AT+CUSD=1", b"OK") # Enable result code presentation
await self.write(b"AT+CUSD=1", b"OK") # Enable result code presentation

except futures.TimeoutError as e:
except asyncio.TimeoutError:
logger.error("No reply from modem")
return False

except:
except Exception:
logger.exception("Modem connect error")
return False

Expand All @@ -234,18 +210,14 @@ def connect(self):
self.modem_connected = True
return True

def _run(self):
super(GSMTextInterface, self)._run()
async def run(self):
await super().run()

while not self.modem_connected and not self.stop_running.isSet():
if not self.connect():
while not self.modem_connected:
if not await self.connect():
logger.warning("Could not connect to modem")

self.stop_running.wait(5)

self.loop.run_forever()

self.stop_running.wait()
await asyncio.sleep(5)

async def data_received(self, data: str) -> bool:
logger.debug(f"Data Received: {data}")
Expand All @@ -262,29 +234,26 @@ async def data_received(self, data: str) -> bool:

return True

def handle_message(self, timestamp: str, source: str, message: str) -> None:
""" Handle GSM message. It should be a command """
async def handle_message(self, timestamp: str, source: str, message: str) -> None:
"""Handle GSM message. It should be a command"""

logger.debug("Received: {} {} {}".format(timestamp, source, message))
logger.debug(f"Received: {timestamp} {source} {message}")

if source in cfg.GSM_CONTACTS:
future = asyncio.run_coroutine_threadsafe(
self.handle_command(message), self.alarm.work_loop
)
ret = future.result(10)
ret = await self.handle_command(message)

m = "GSM {}: {}".format(source, ret)
m = f"GSM {source}: {ret}"
logger.info(m)
else:
m = "GSM {} (UNK): {}".format(source, message)
m = f"GSM {source} (UNK): {message}"
logger.warning(m)

self.send_message(m, EventLevel.INFO)
ps.sendNotification(
Notification(sender=self.name, message=m, level=EventLevel.INFO)
)

def send_message(self, message: str, level: EventLevel) -> None:
async def send_message(self, message: str, level: EventLevel) -> None:
if self.port is None:
logger.warning("GSM not available when sending message")
return
Expand All @@ -293,12 +262,9 @@ def send_message(self, message: str, level: EventLevel) -> None:
data = b'AT+CMGS="%b"\x0d%b\x1a' % (dst.encode(), message.encode())

try:
future = asyncio.run_coroutine_threadsafe(
self.port.write(data), self.loop
)
result = future.result()
logger.debug("SMS result: {}".format(result))
except:
result = await self.port.write(data)
logger.debug(f"SMS result: {result}")
except Exception:
logger.exception("ERROR sending SMS")

def process_cmt(self, header: str, text: str) -> None:
Expand All @@ -308,8 +274,8 @@ def process_cmt(self, header: str, text: str) -> None:

tokens = json.loads(f"[{header[idx:]}]", strict=False)

logger.debug("On {}, {} sent {}".format(tokens[2], tokens[0], text))
self.handle_message(tokens[2], tokens[0], text)
logger.debug(f"On {tokens[2]}, {tokens[0]} sent {text}")
asyncio.create_task(self.handle_message(tokens[2], tokens[0], text))

def process_cusd(self, message: str) -> None:
idx = message.find(" ")
Expand Down
Loading

0 comments on commit a1af889

Please sign in to comment.