Skip to content

Commit

Permalink
Fix formatting (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
sikmir authored Jan 12, 2023
1 parent c42a1e1 commit 49a20f7
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 84 deletions.
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
@@ -1 +1 @@
buildDebArchAll()
buildDebArchAll defaultRunPythonChecks: true
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
## Wiren Board Python common library and helpers
6 changes: 6 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
wb-common (2.0.1) stable; urgency=medium

* Fix formatting

-- Nikolay Korotkiy <nikolay.korotkiy@wirenboard.com> Thu, 12 Jan 2023 16:50:00 +0400

wb-common (2.0.0) stable; urgency=medium

* remove python2 package
Expand Down
34 changes: 19 additions & 15 deletions wb_common/adc.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,48 @@
# coding: utf-8
import unittest
import time
import subprocess
from subprocess import Popen, PIPE
import time
import unittest
from subprocess import PIPE, Popen


class ADC(object):
N_SAMPLES=30
N_SAMPLES = 30

def setup(self):
subprocess.call("killall -9 wb-homa-adc", shell=True)


def set_scale(self, channel, scale):
open('/sys/bus/iio/devices/iio:device0/in_voltage%d_scale' % channel, 'wt').write(scale + '\n')
open("/sys/bus/iio/devices/iio:device0/in_voltage%d_scale" % channel, "wt").write(scale + "\n")

def get_available_scales(self, channel):
return open('/sys/bus/iio/devices/iio:device0/in_voltage%d_scale_available' % channel).read().strip().split()
return (
open("/sys/bus/iio/devices/iio:device0/in_voltage%d_scale_available" % channel)
.read()
.strip()
.split()
)

def read_mux_value(self, mux_ch):
subprocess.call("wb-adc-set-mux %d" % mux_ch, shell=True)
time.sleep(100E-3)
time.sleep(100e-3)
return self.read_phys_ch_value(1)

def read_mux_value_with_source(self, mux_ch, current):

subprocess.call("wb-adc-set-mux %d" % mux_ch, shell=True)
time.sleep(100E-3)
time.sleep(100e-3)
subprocess.call("lradc-set-current %duA" % current, shell=True)
time.sleep(10E-3)

time.sleep(10e-3)

value = self.read_phys_ch_value(1)
subprocess.call("lradc-set-current off" , shell=True)
subprocess.call("lradc-set-current off", shell=True)

return value

def read_phys_ch_value(self, channel):
values = []
for i in range(self.N_SAMPLES):
v = int(open('/sys/bus/iio/devices/iio:device0/in_voltage%d_raw' % channel).read())
v = int(open("/sys/bus/iio/devices/iio:device0/in_voltage%d_raw" % channel).read())
values.append(v)
#~ time.sleep(20)
# ~ time.sleep(20)
return 1.0 * sum(values) / len(values)

17 changes: 9 additions & 8 deletions wb_common/beeper.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import os
import time


class Beeper(object):
PWM_DIR_TEMPLATE = '/sys/class/pwm/pwmchip0/'
PWM_DIR_TEMPLATE = "/sys/class/pwm/pwmchip0/"

def __init__(self, pwm_num):
self.pwm_num = pwm_num
self.pwm_dir = os.path.join(self.PWM_DIR_TEMPLATE, 'pwm%s' % str(pwm_num))
self.pwm_dir = os.path.join(self.PWM_DIR_TEMPLATE, "pwm%s" % str(pwm_num))

def set(self, enabled):
open(os.path.join(self.pwm_dir, 'enable'), 'w').write(('1' if enabled else '0') + '\n')
open(os.path.join(self.pwm_dir, "enable"), "w").write(("1" if enabled else "0") + "\n")

def setup(self, period=250000, duty_cycle=125000):
if not os.path.exists(self.pwm_dir):
open(os.path.join(self.PWM_DIR_TEMPLATE, 'export'), 'w').write(str(self.pwm_num) + '\n')
open(os.path.join(self.PWM_DIR_TEMPLATE, "export"), "w").write(str(self.pwm_num) + "\n")
self.set(0)
open(os.path.join(self.pwm_dir, 'period'), 'w').write('%d\n' % period)
open(os.path.join(self.pwm_dir, 'duty_cycle'), 'w').write('%d\n' % duty_cycle)
open(os.path.join(self.pwm_dir, "period"), "w").write("%d\n" % period)
open(os.path.join(self.pwm_dir, "duty_cycle"), "w").write("%d\n" % duty_cycle)

def beep(self, duration, repeat=1):
try: #To prevent from stucking in '1' state
try: # To prevent from stucking in '1' state
for i in range(repeat):
if i != 0:
time.sleep(duration)
Expand All @@ -37,7 +38,7 @@ def test(self):
Initiallizing beeper to call directly from imported module
example: import beeper; beeper.beep(1, 2)
"""
_beeper = Beeper(os.environ['WB_PWM_BUZZER'])
_beeper = Beeper(os.environ["WB_PWM_BUZZER"])
_beeper.setup()

setup = _beeper.setup
Expand Down
20 changes: 11 additions & 9 deletions wb_common/can.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import subprocess
import threading


class CanPort(object):
def __init__(self, iface = 'can0', bitrate=115200):
def __init__(self, iface="can0", bitrate=115200):
self.iface = iface
self.bitrate = bitrate

Expand All @@ -22,23 +23,25 @@ def setup(self):
def send(self, addr, data):
addr_str = hex(addr)[2:][:3].zfill(3)
data_str = binascii.hexlify(data)
subprocess.call("cansend %s %s#%s" % (self.iface , addr_str, data_str), shell=True)
subprocess.call("cansend %s %s#%s" % (self.iface, addr_str, data_str), shell=True)

def receive(self, timeout_ms = 1000):
proc = subprocess.Popen("candump %s -s0 -L -T %s" % (self.iface, timeout_ms) , shell=True, stdout=subprocess.PIPE)
def receive(self, timeout_ms=1000):
proc = subprocess.Popen(
"candump %s -s0 -L -T %s" % (self.iface, timeout_ms), shell=True, stdout=subprocess.PIPE
)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
raise RuntimeError("candump failed")

stdout_str = stdout.strip()
frames = []
for line in stdout_str.split('\n'):
for line in stdout_str.split("\n"):
line = line.strip()
if line:
parts = line.split(' ')
parts = line.split(" ")
if len(parts) == 3:
ts, iface, packet = parts
addr_str, data_str = packet.split('#')
addr_str, data_str = packet.split("#")
addr = int(addr_str, 16)
data = binascii.unhexlify(data_str)
frames.append((addr, data))
Expand All @@ -48,11 +51,10 @@ def _receiver_work(self, timeout_ms):
frames = self.receive(timeout_ms)
self._frames = frames

def start_receive(self, timeout_ms = 1000):
def start_receive(self, timeout_ms=1000):
self.receive_thread = threading.Thread(target=self._receiver_work, args=(timeout_ms,))
self.receive_thread.start()

def get_received_data(self):
self.receive_thread.join()
return self._frames

33 changes: 18 additions & 15 deletions wb_common/gpio.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import print_function
from six import iteritems

import threading
import select
import threading
from collections import defaultdict

from six import iteritems


class GPIOHandler(object):
IN = "in"
Expand Down Expand Up @@ -36,25 +37,25 @@ def gpio_polling_thread(self):
for gpio, fd in iteritems(self.gpio_fds):
if fileno == fd.fileno():
if self.gpio_first_event_fired[gpio]:
#~ print "fire callback"
# ~ print "fire callback"
cb = self.event_callbacks.get(gpio)
if cb is not None:
cb(gpio)
else:
self.gpio_first_event_fired[gpio] = True

def export(self, gpio):
open('/sys/class/gpio/export', 'wt').write("%d\n" % gpio)
open("/sys/class/gpio/export", "wt").write("%d\n" % gpio)

def unexport(self, gpio):
open('/sys/class/gpio/unexport', 'wt').write("%d\n" % gpio)
open("/sys/class/gpio/unexport", "wt").write("%d\n" % gpio)

def setup(self, gpio, direction):
self.export(gpio)
open('/sys/class/gpio/gpio%d/direction' % gpio, 'wt').write("%s\n" % direction)
open("/sys/class/gpio/gpio%d/direction" % gpio, "wt").write("%s\n" % direction)

def _open(self, gpio):
fd = open('/sys/class/gpio/gpio%d/value' % gpio, 'r+')
fd = open("/sys/class/gpio/gpio%d/value" % gpio, "r+")
self.gpio_fds[gpio] = fd

def _check_open(self, gpio):
Expand All @@ -65,24 +66,24 @@ def output(self, gpio, value):
self._check_open(gpio)

self.gpio_fds[gpio].seek(0)
self.gpio_fds[gpio].write('1' if value else '0')
self.gpio_fds[gpio].write("1" if value else "0")
self.gpio_fds[gpio].flush()

def input(self, gpio):
self._check_open(gpio)

self.gpio_fds[gpio].seek(0)
val = self.gpio_fds[gpio].read().strip()
return False if val == '0' else True
return False if val == "0" else True

def request_gpio_interrupt(self, gpio, edge):
open('/sys/class/gpio/gpio%d/edge' % gpio, 'wt').write("%s\n" % edge)
open("/sys/class/gpio/gpio%d/edge" % gpio, "wt").write("%s\n" % edge)
self._check_open(gpio)

def add_event_detect(self, gpio, edge, callback):
self.request_gpio_interrupt(gpio, edge)

already_present = (gpio in self.event_callbacks)
already_present = gpio in self.event_callbacks
self.event_callbacks[gpio] = callback
if not already_present:
self.gpio_first_event_fired[gpio] = False
Expand All @@ -97,18 +98,20 @@ def remove_event_detect(self, gpio):

def wait_for_edge(self, gpio, edge, timeout=None):
if timeout is None:
timeout = 1E100
timeout = 1e100

event = threading.Event()
event.clear()

self.add_event_detect(gpio, edge, lambda x: event.set())
#~ print "wait for edge..."
# ~ print "wait for edge..."
ret = event.wait(timeout)
#~ print "wait for edge done"
# ~ print "wait for edge done"
self.remove_event_detect(gpio)

return ret

#~ self.irq_gpio, GPIO.RISING, callback=self.interruptHandler)
# ~ self.irq_gpio, GPIO.RISING, callback=self.interruptHandler)


GPIO = GPIOHandler()
6 changes: 5 additions & 1 deletion wb_common/gsm.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
# coding: utf-8
from __future__ import print_function

import os
import subprocess


def gsm_decode(hexstr):
return os.popen('echo %s | xxd -r -ps | iconv -f=UTF-16BE -t=UTF-8' % hexstr).read()
return os.popen("echo %s | xxd -r -ps | iconv -f=UTF-16BE -t=UTF-8" % hexstr).read()


def init_gsm():
retcode = subprocess.call("wb-gsm restart_if_broken", shell=True)
if retcode != 0:
raise RuntimeError("gsm init failed")


def init_baudrate():
retcode = subprocess.call("wb-gsm init_baud", shell=True)
if retcode != 0:
raise RuntimeError("gsm init baudrate failed")


def gsm_get_imei():
proc = subprocess.Popen("wb-gsm imei", shell=True, stdout=subprocess.PIPE)
stdout, stderr = proc.communicate()
Expand All @@ -26,6 +29,7 @@ def gsm_get_imei():

return stdout.strip()


def split_imei(imei):
imei = str(imei)
if not imei.isdigit():
Expand Down
15 changes: 9 additions & 6 deletions wb_common/leds.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
from __future__ import print_function

import os

SYS_PREFIX='/sys/class/leds/'
SYS_PREFIX = "/sys/class/leds/"


def set_brightness(led, val):
open(SYS_PREFIX + led + '/brightness', 'wt').write(str(val) + '\n')
open(SYS_PREFIX + led + "/brightness", "wt").write(str(val) + "\n")


def set_blink(led, delay_on=100, delay_off=100):
open(SYS_PREFIX + led + '/trigger', 'wt').write('timer\n')
open(SYS_PREFIX + led + "/trigger", "wt").write("timer\n")

open(SYS_PREFIX + led + "/delay_on", "wt").write(str(delay_on) + "\n")
open(SYS_PREFIX + led + "/delay_off", "wt").write(str(delay_off) + "\n")

open(SYS_PREFIX + led + '/delay_on', 'wt').write(str(delay_on) + '\n')
open(SYS_PREFIX + led + '/delay_off', 'wt').write(str(delay_off) + '\n')

def blink_fast(led):
set_blink(led, 50, 50)

3 changes: 2 additions & 1 deletion wb_common/sysinfo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import print_function

import os


Expand All @@ -10,4 +11,4 @@ def get_fw_version():


def get_wb_version():
return os.environ['WB_VERSION']
return os.environ["WB_VERSION"]
Loading

0 comments on commit 49a20f7

Please sign in to comment.