From 49a20f773b838bdd3e4b7a10887d9830b2a32e0a Mon Sep 17 00:00:00 2001 From: Nikolay Korotkiy Date: Thu, 12 Jan 2023 18:43:35 +0400 Subject: [PATCH] Fix formatting (#7) --- Jenkinsfile | 2 +- README.md | 1 + debian/changelog | 6 +++++ wb_common/adc.py | 34 ++++++++++++++++------------- wb_common/beeper.py | 17 ++++++++------- wb_common/can.py | 20 +++++++++-------- wb_common/gpio.py | 33 +++++++++++++++------------- wb_common/gsm.py | 6 ++++- wb_common/leds.py | 15 ++++++++----- wb_common/sysinfo.py | 3 ++- wb_common/wbmqtt.py | 52 +++++++++++++++++++++++--------------------- wb_common/wifi.py | 8 ++++--- 12 files changed, 113 insertions(+), 84 deletions(-) create mode 100644 README.md diff --git a/Jenkinsfile b/Jenkinsfile index 9732e2d..c38aa09 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1 +1 @@ -buildDebArchAll() +buildDebArchAll defaultRunPythonChecks: true diff --git a/README.md b/README.md new file mode 100644 index 0000000..0af9a82 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +## Wiren Board Python common library and helpers diff --git a/debian/changelog b/debian/changelog index eb4c27e..eb5f4d8 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +wb-common (2.0.1) stable; urgency=medium + + * Fix formatting + + -- Nikolay Korotkiy Thu, 12 Jan 2023 16:50:00 +0400 + wb-common (2.0.0) stable; urgency=medium * remove python2 package diff --git a/wb_common/adc.py b/wb_common/adc.py index df08b5b..199e85c 100644 --- a/wb_common/adc.py +++ b/wb_common/adc.py @@ -1,44 +1,48 @@ # coding: utf-8 -import unittest -import time import subprocess -from subprocess import Popen, PIPE +import time +import unittest +from subprocess import PIPE, Popen + class ADC(object): - N_SAMPLES=30 + N_SAMPLES = 30 + def setup(self): subprocess.call("killall -9 wb-homa-adc", shell=True) - def set_scale(self, channel, scale): - open('/sys/bus/iio/devices/iio:device0/in_voltage%d_scale' % channel, 'wt').write(scale + '\n') + open("/sys/bus/iio/devices/iio:device0/in_voltage%d_scale" % channel, "wt").write(scale + "\n") def get_available_scales(self, channel): - return open('/sys/bus/iio/devices/iio:device0/in_voltage%d_scale_available' % channel).read().strip().split() + return ( + open("/sys/bus/iio/devices/iio:device0/in_voltage%d_scale_available" % channel) + .read() + .strip() + .split() + ) def read_mux_value(self, mux_ch): subprocess.call("wb-adc-set-mux %d" % mux_ch, shell=True) - time.sleep(100E-3) + time.sleep(100e-3) return self.read_phys_ch_value(1) def read_mux_value_with_source(self, mux_ch, current): subprocess.call("wb-adc-set-mux %d" % mux_ch, shell=True) - time.sleep(100E-3) + time.sleep(100e-3) subprocess.call("lradc-set-current %duA" % current, shell=True) - time.sleep(10E-3) - + time.sleep(10e-3) value = self.read_phys_ch_value(1) - subprocess.call("lradc-set-current off" , shell=True) + subprocess.call("lradc-set-current off", shell=True) return value def read_phys_ch_value(self, channel): values = [] for i in range(self.N_SAMPLES): - v = int(open('/sys/bus/iio/devices/iio:device0/in_voltage%d_raw' % channel).read()) + v = int(open("/sys/bus/iio/devices/iio:device0/in_voltage%d_raw" % channel).read()) values.append(v) - #~ time.sleep(20) + # ~ time.sleep(20) return 1.0 * sum(values) / len(values) - diff --git a/wb_common/beeper.py b/wb_common/beeper.py index 7c1d42a..78c0670 100644 --- a/wb_common/beeper.py +++ b/wb_common/beeper.py @@ -1,25 +1,26 @@ import os import time + class Beeper(object): - PWM_DIR_TEMPLATE = '/sys/class/pwm/pwmchip0/' + PWM_DIR_TEMPLATE = "/sys/class/pwm/pwmchip0/" def __init__(self, pwm_num): self.pwm_num = pwm_num - self.pwm_dir = os.path.join(self.PWM_DIR_TEMPLATE, 'pwm%s' % str(pwm_num)) + self.pwm_dir = os.path.join(self.PWM_DIR_TEMPLATE, "pwm%s" % str(pwm_num)) def set(self, enabled): - open(os.path.join(self.pwm_dir, 'enable'), 'w').write(('1' if enabled else '0') + '\n') + open(os.path.join(self.pwm_dir, "enable"), "w").write(("1" if enabled else "0") + "\n") def setup(self, period=250000, duty_cycle=125000): if not os.path.exists(self.pwm_dir): - open(os.path.join(self.PWM_DIR_TEMPLATE, 'export'), 'w').write(str(self.pwm_num) + '\n') + open(os.path.join(self.PWM_DIR_TEMPLATE, "export"), "w").write(str(self.pwm_num) + "\n") self.set(0) - open(os.path.join(self.pwm_dir, 'period'), 'w').write('%d\n' % period) - open(os.path.join(self.pwm_dir, 'duty_cycle'), 'w').write('%d\n' % duty_cycle) + open(os.path.join(self.pwm_dir, "period"), "w").write("%d\n" % period) + open(os.path.join(self.pwm_dir, "duty_cycle"), "w").write("%d\n" % duty_cycle) def beep(self, duration, repeat=1): - try: #To prevent from stucking in '1' state + try: # To prevent from stucking in '1' state for i in range(repeat): if i != 0: time.sleep(duration) @@ -37,7 +38,7 @@ def test(self): Initiallizing beeper to call directly from imported module example: import beeper; beeper.beep(1, 2) """ -_beeper = Beeper(os.environ['WB_PWM_BUZZER']) +_beeper = Beeper(os.environ["WB_PWM_BUZZER"]) _beeper.setup() setup = _beeper.setup diff --git a/wb_common/can.py b/wb_common/can.py index 18abeb1..3138fc9 100644 --- a/wb_common/can.py +++ b/wb_common/can.py @@ -5,8 +5,9 @@ import subprocess import threading + class CanPort(object): - def __init__(self, iface = 'can0', bitrate=115200): + def __init__(self, iface="can0", bitrate=115200): self.iface = iface self.bitrate = bitrate @@ -22,23 +23,25 @@ def setup(self): def send(self, addr, data): addr_str = hex(addr)[2:][:3].zfill(3) data_str = binascii.hexlify(data) - subprocess.call("cansend %s %s#%s" % (self.iface , addr_str, data_str), shell=True) + subprocess.call("cansend %s %s#%s" % (self.iface, addr_str, data_str), shell=True) - def receive(self, timeout_ms = 1000): - proc = subprocess.Popen("candump %s -s0 -L -T %s" % (self.iface, timeout_ms) , shell=True, stdout=subprocess.PIPE) + def receive(self, timeout_ms=1000): + proc = subprocess.Popen( + "candump %s -s0 -L -T %s" % (self.iface, timeout_ms), shell=True, stdout=subprocess.PIPE + ) stdout, stderr = proc.communicate() if proc.returncode != 0: raise RuntimeError("candump failed") stdout_str = stdout.strip() frames = [] - for line in stdout_str.split('\n'): + for line in stdout_str.split("\n"): line = line.strip() if line: - parts = line.split(' ') + parts = line.split(" ") if len(parts) == 3: ts, iface, packet = parts - addr_str, data_str = packet.split('#') + addr_str, data_str = packet.split("#") addr = int(addr_str, 16) data = binascii.unhexlify(data_str) frames.append((addr, data)) @@ -48,11 +51,10 @@ def _receiver_work(self, timeout_ms): frames = self.receive(timeout_ms) self._frames = frames - def start_receive(self, timeout_ms = 1000): + def start_receive(self, timeout_ms=1000): self.receive_thread = threading.Thread(target=self._receiver_work, args=(timeout_ms,)) self.receive_thread.start() def get_received_data(self): self.receive_thread.join() return self._frames - diff --git a/wb_common/gpio.py b/wb_common/gpio.py index e7246ee..de1fd2c 100644 --- a/wb_common/gpio.py +++ b/wb_common/gpio.py @@ -1,10 +1,11 @@ from __future__ import print_function -from six import iteritems -import threading import select +import threading from collections import defaultdict +from six import iteritems + class GPIOHandler(object): IN = "in" @@ -36,7 +37,7 @@ def gpio_polling_thread(self): for gpio, fd in iteritems(self.gpio_fds): if fileno == fd.fileno(): if self.gpio_first_event_fired[gpio]: - #~ print "fire callback" + # ~ print "fire callback" cb = self.event_callbacks.get(gpio) if cb is not None: cb(gpio) @@ -44,17 +45,17 @@ def gpio_polling_thread(self): self.gpio_first_event_fired[gpio] = True def export(self, gpio): - open('/sys/class/gpio/export', 'wt').write("%d\n" % gpio) + open("/sys/class/gpio/export", "wt").write("%d\n" % gpio) def unexport(self, gpio): - open('/sys/class/gpio/unexport', 'wt').write("%d\n" % gpio) + open("/sys/class/gpio/unexport", "wt").write("%d\n" % gpio) def setup(self, gpio, direction): self.export(gpio) - open('/sys/class/gpio/gpio%d/direction' % gpio, 'wt').write("%s\n" % direction) + open("/sys/class/gpio/gpio%d/direction" % gpio, "wt").write("%s\n" % direction) def _open(self, gpio): - fd = open('/sys/class/gpio/gpio%d/value' % gpio, 'r+') + fd = open("/sys/class/gpio/gpio%d/value" % gpio, "r+") self.gpio_fds[gpio] = fd def _check_open(self, gpio): @@ -65,7 +66,7 @@ def output(self, gpio, value): self._check_open(gpio) self.gpio_fds[gpio].seek(0) - self.gpio_fds[gpio].write('1' if value else '0') + self.gpio_fds[gpio].write("1" if value else "0") self.gpio_fds[gpio].flush() def input(self, gpio): @@ -73,16 +74,16 @@ def input(self, gpio): self.gpio_fds[gpio].seek(0) val = self.gpio_fds[gpio].read().strip() - return False if val == '0' else True + return False if val == "0" else True def request_gpio_interrupt(self, gpio, edge): - open('/sys/class/gpio/gpio%d/edge' % gpio, 'wt').write("%s\n" % edge) + open("/sys/class/gpio/gpio%d/edge" % gpio, "wt").write("%s\n" % edge) self._check_open(gpio) def add_event_detect(self, gpio, edge, callback): self.request_gpio_interrupt(gpio, edge) - already_present = (gpio in self.event_callbacks) + already_present = gpio in self.event_callbacks self.event_callbacks[gpio] = callback if not already_present: self.gpio_first_event_fired[gpio] = False @@ -97,18 +98,20 @@ def remove_event_detect(self, gpio): def wait_for_edge(self, gpio, edge, timeout=None): if timeout is None: - timeout = 1E100 + timeout = 1e100 event = threading.Event() event.clear() self.add_event_detect(gpio, edge, lambda x: event.set()) - #~ print "wait for edge..." + # ~ print "wait for edge..." ret = event.wait(timeout) - #~ print "wait for edge done" + # ~ print "wait for edge done" self.remove_event_detect(gpio) return ret - #~ self.irq_gpio, GPIO.RISING, callback=self.interruptHandler) + # ~ self.irq_gpio, GPIO.RISING, callback=self.interruptHandler) + + GPIO = GPIOHandler() diff --git a/wb_common/gsm.py b/wb_common/gsm.py index 9f7daa0..c65d9d7 100644 --- a/wb_common/gsm.py +++ b/wb_common/gsm.py @@ -1,11 +1,12 @@ # coding: utf-8 from __future__ import print_function + import os import subprocess def gsm_decode(hexstr): - return os.popen('echo %s | xxd -r -ps | iconv -f=UTF-16BE -t=UTF-8' % hexstr).read() + return os.popen("echo %s | xxd -r -ps | iconv -f=UTF-16BE -t=UTF-8" % hexstr).read() def init_gsm(): @@ -13,11 +14,13 @@ def init_gsm(): if retcode != 0: raise RuntimeError("gsm init failed") + def init_baudrate(): retcode = subprocess.call("wb-gsm init_baud", shell=True) if retcode != 0: raise RuntimeError("gsm init baudrate failed") + def gsm_get_imei(): proc = subprocess.Popen("wb-gsm imei", shell=True, stdout=subprocess.PIPE) stdout, stderr = proc.communicate() @@ -26,6 +29,7 @@ def gsm_get_imei(): return stdout.strip() + def split_imei(imei): imei = str(imei) if not imei.isdigit(): diff --git a/wb_common/leds.py b/wb_common/leds.py index edc098a..13fca78 100644 --- a/wb_common/leds.py +++ b/wb_common/leds.py @@ -1,17 +1,20 @@ from __future__ import print_function + import os -SYS_PREFIX='/sys/class/leds/' +SYS_PREFIX = "/sys/class/leds/" + def set_brightness(led, val): - open(SYS_PREFIX + led + '/brightness', 'wt').write(str(val) + '\n') + open(SYS_PREFIX + led + "/brightness", "wt").write(str(val) + "\n") + def set_blink(led, delay_on=100, delay_off=100): - open(SYS_PREFIX + led + '/trigger', 'wt').write('timer\n') + open(SYS_PREFIX + led + "/trigger", "wt").write("timer\n") + + open(SYS_PREFIX + led + "/delay_on", "wt").write(str(delay_on) + "\n") + open(SYS_PREFIX + led + "/delay_off", "wt").write(str(delay_off) + "\n") - open(SYS_PREFIX + led + '/delay_on', 'wt').write(str(delay_on) + '\n') - open(SYS_PREFIX + led + '/delay_off', 'wt').write(str(delay_off) + '\n') def blink_fast(led): set_blink(led, 50, 50) - diff --git a/wb_common/sysinfo.py b/wb_common/sysinfo.py index 2e4b262..be769d3 100644 --- a/wb_common/sysinfo.py +++ b/wb_common/sysinfo.py @@ -1,4 +1,5 @@ from __future__ import print_function + import os @@ -10,4 +11,4 @@ def get_fw_version(): def get_wb_version(): - return os.environ['WB_VERSION'] + return os.environ["WB_VERSION"] diff --git a/wb_common/wbmqtt.py b/wb_common/wbmqtt.py index 99806db..b61ce68 100644 --- a/wb_common/wbmqtt.py +++ b/wb_common/wbmqtt.py @@ -1,38 +1,37 @@ #!/usr/bin/python from __future__ import print_function -import mosquitto -import time -from collections import defaultdict import logging +import time +from collections import defaultdict + +import mosquitto VALUES_MASK = "/devices/+/controls/+" ERRORS_MASK = "/devices/+/controls/+/meta/error" - - from functools import wraps + def timing(f): @wraps(f) def wrap(*args, **kw): ts = time.time() result = f(*args, **kw) te = time.time() - print('func:%r args:[%r, %r] took: %2.4f sec' % \ - (f.__name__, args, kw, te-ts)) + print("func:%r args:[%r, %r] took: %2.4f sec" % (f.__name__, args, kw, te - ts)) return result - return wrap - + return wrap class CellSpec(object): - def __init__(self, value = None, error = None): + def __init__(self, value=None, error=None): self.value = value self.error = error + class MQTTConnection(mosquitto.Mosquitto): def loop_forever(self, timeout=1.0, max_packets=1): mosquitto.Mosquitto.loop_forever(self, timeout=0.05) @@ -43,7 +42,7 @@ def __init__(self): self.control_values = defaultdict(lambda: CellSpec()) self.client = MQTTConnection() - self.client.connect('localhost', 1883) + self.client.connect("localhost", 1883) self.client.on_message = self.on_mqtt_message self.client.loop_start() @@ -55,15 +54,15 @@ def __init__(self): @staticmethod def _get_channel_topic(device_id, control_id): - return '/devices/%s/controls/%s' % (device_id, control_id) + return "/devices/%s/controls/%s" % (device_id, control_id) def watch_device(self, device_id): if device_id in self.device_subscriptions: return else: - topic = self._get_channel_topic(device_id, '+') + topic = self._get_channel_topic(device_id, "+") self.client.subscribe(topic) - self.client.subscribe(topic + '/meta/error') + self.client.subscribe(topic + "/meta/error") self.device_subscriptions.add(device_id) def watch_channel(self, device_id, control_id): @@ -74,17 +73,17 @@ def watch_channel(self, device_id, control_id): topic = self._get_channel_topic(device_id, control_id) self.client.subscribe(topic) - self.client.subscribe(topic + '/meta/error') + self.client.subscribe(topic + "/meta/error") self.channel_subscriptions.add((device_id, control_id)) def unwatch_channel(self, device_id, control_id): topic = self._get_channel_topic(device_id, control_id) self.client.unsubscribe(topic) - self.client.unsubscribe(topic + '/meta/error') + self.client.unsubscribe(topic + "/meta/error") @staticmethod def _get_channel(topic): - parts = topic.split('/') + parts = topic.split("/") device_id = parts[2] control_id = parts[4] return device_id, control_id @@ -105,6 +104,7 @@ def on_mqtt_message(self, arg0, arg1, arg2=None): self.control_values[self._get_channel(msg.topic)].error = msg.payload or None # print "on msg", msg.topic, msg.payload, "took %d ms" % ((time.time() - st)*1000) + def clear_values(self): for cell_spec in self.control_values.values(): cell_spec.value = None @@ -130,9 +130,10 @@ def get_last_or_next_value(self, device_id, control_id): return val else: return self.get_next_value(device_id, control_id) + # @timing def get_next_or_last_value(self, device_id, control_id, timeout=0.5): - """ wait for timeout for new value, return old one otherwise""" + """wait for timeout for new value, return old one otherwise""" val = self.get_next_value(device_id, control_id, timeout=timeout) if val is None: val = self.get_last_value(device_id, control_id) @@ -157,7 +158,7 @@ def get_next_value(self, device_id, control_id, timeout=10): time.sleep(0.01) - def get_stable_value(self, device_id, control_id, timeout=30, jitter=10): + def get_stable_value(self, device_id, control_id, timeout=30, jitter=10): start = time.time() last_val = None while time.time() - start < timeout: @@ -192,10 +193,10 @@ def get_average_value(self, device_id, control_id, interval=1): def send_value(self, device_id, control_id, new_value, retain=False): self.client.publish("/devices/%s/controls/%s/on" % (device_id, control_id), new_value, retain=retain) - def send_and_wait_for_value(self, device_id, control_id, new_value, retain=False, poll_interval=10E-3): - """ Sends the value to control/on topic, - then waits until control topic is updated by the corresponding - driver to the new value""" + def send_and_wait_for_value(self, device_id, control_id, new_value, retain=False, poll_interval=10e-3): + """Sends the value to control/on topic, + then waits until control topic is updated by the corresponding + driver to the new value""" self.send_value(device_id, control_id, new_value, retain) while self.get_last_or_next_value(device_id, control_id) != new_value: @@ -207,8 +208,9 @@ def close(self): def __del__(self): self.close() -if __name__ == '__main__': + +if __name__ == "__main__": time.sleep(1) - print(wbmqtt.get_last_value('wb-adc', 'A1')) + print(wbmqtt.get_last_value("wb-adc", "A1")) wbmqtt.close() diff --git a/wb_common/wifi.py b/wb_common/wifi.py index 11700ac..49424bc 100644 --- a/wb_common/wifi.py +++ b/wb_common/wifi.py @@ -2,16 +2,18 @@ import os + def get_wlan_ifaces(): - ifaces = os.listdir('/sys/class/net/') - wlan_ifaces = [x for x in ifaces if x.startswith('wlan')] + ifaces = os.listdir("/sys/class/net/") + wlan_ifaces = [x for x in ifaces if x.startswith("wlan")] return wlan_ifaces + def get_wlan_mac(): ifaces = get_wlan_ifaces() if ifaces: try: - return open('/sys/class/net/%s/address' % ifaces[0]).read().strip() + return open("/sys/class/net/%s/address" % ifaces[0]).read().strip() except: return None else: