Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.9.1 Release #126

Merged
merged 3 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 11 additions & 62 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,29 @@
"""Setup module for zigpy-zigate"""
import os

import pathlib

from setuptools import find_packages, setup
from zigpy_zigate import __version__


# extracted from https://raspberrypi.stackexchange.com/questions/5100/detect-that-a-python-program-is-running-on-the-pi
def is_raspberry_pi(raise_on_errors=False):
"""Checks if Raspberry PI.

:return:
"""
try:
with open('/proc/cpuinfo', 'r') as cpuinfo:
found = False
for line in cpuinfo:
if line.startswith('Hardware'):
found = True
label, value = line.strip().split(':', 1)
value = value.strip()
if value not in (
'BCM2708',
'BCM2709',
'BCM2835',
'BCM2836'
):
if raise_on_errors:
raise ValueError(
'This system does not appear to be a '
'Raspberry Pi.'
)
else:
return False
if not found:
if raise_on_errors:
raise ValueError(
'Unable to determine if this system is a Raspberry Pi.'
)
else:
return False
except IOError:
if raise_on_errors:
raise ValueError('Unable to open `/proc/cpuinfo`.')
else:
return False

return True


requires = [
'pyserial>=3.5',
'pyserial-asyncio>=0.5; platform_system!="Windows"',
'pyserial-asyncio!=0.5; platform_system=="Windows"', # 0.5 broke writesv
'pyusb>=1.1.0',
'zigpy>=0.47.0',
]

if is_raspberry_pi():
requires.append('RPi.GPIO')

this_directory = os.path.join(os.path.abspath(os.path.dirname(__file__)))
with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f:
long_description = f.read()

setup(
name="zigpy-zigate",
version=__version__,
description="A library which communicates with ZiGate radios for zigpy",
long_description=long_description,
long_description=(pathlib.Path(__file__).parent / "README.md").read_text(),
long_description_content_type="text/markdown",
url="http://github.com/zigpy/zigpy-zigate",
author="Sébastien RAMAGE",
author_email="sebatien.ramage@gmail.com",
license="GPL-3.0",
packages=find_packages(exclude=['tests']),
install_requires=requires,
install_requires=[
'pyserial>=3.5',
'pyserial-asyncio>=0.5; platform_system!="Windows"',
'pyserial-asyncio!=0.5; platform_system=="Windows"', # 0.5 broke writes
'pyusb>=1.1.0',
'zigpy>=0.47.0',
'gpiozero',
],
tests_require=[
'pytest',
'pytest-asyncio',
Expand Down
2 changes: 0 additions & 2 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ async def mock_conn(loop, protocol_factory, **kwargs):
DEVICE_CONFIG = zigpy_zigate.config.SCHEMA_DEVICE(
{zigpy_zigate.config.CONF_DEVICE_PATH: port}
)
sys.modules['RPi'] = MagicMock()
sys.modules['RPi.GPIO'] = MagicMock()
res = await zigate_api.ZiGate.probe(DEVICE_CONFIG)
assert res is True
assert mock_raw_mode.call_count == 1
Expand Down
2 changes: 1 addition & 1 deletion zigpy_zigate/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
MAJOR_VERSION = 0
MINOR_VERSION = 9
PATCH_VERSION = '0'
PATCH_VERSION = '1'
__short_version__ = '{}.{}'.format(MAJOR_VERSION, MINOR_VERSION)
__version__ = '{}.{}'.format(__short_version__, PATCH_VERSION)
179 changes: 108 additions & 71 deletions zigpy_zigate/common.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,38 @@
import re
import time
import os.path
import serial.tools.list_ports
import serial
import usb
import logging
import asyncio

from gpiozero import OutputDevice


LOGGER = logging.getLogger(__name__)

GPIO_PIN0 = 17
GPIO_PIN2 = 27


class UnclosableOutputDevice(OutputDevice):
"""
`OutputDevice` that never closes its pins. Allows for the last-written pin state to
be retained even after the `OutputDevice` is garbage collected.
"""

def __init__(
self, pin=None, *, active_high=True, initial_value=False, pin_factory=None
):
super().__init__(
pin,
active_high=active_high,
initial_value=initial_value,
pin_factory=pin_factory,
)
self._pin.close = lambda *args, **kwargs: None
self.pin_factory.close = lambda *args, **kwargs: None


def discover_port():
""" discover zigate port """
Expand Down Expand Up @@ -53,46 +78,44 @@ def is_zigate_wifi(port):
return port.startswith('socket://')


async def set_pizigate_running_mode():
try:
import RPi.GPIO as GPIO
LOGGER.info('Put PiZiGate in running mode')
GPIO.setmode(GPIO.BCM)
GPIO.setup(17, GPIO.OUT) # GPIO0
GPIO.setup(27, GPIO.OUT) # GPIO2
GPIO.output(27, GPIO.HIGH)
await asyncio.sleep(0.5)
GPIO.output(17, GPIO.LOW)
await asyncio.sleep(0.5)
GPIO.output(17, GPIO.HIGH)
await asyncio.sleep(0.5)
except Exception as e:
LOGGER.error('Unable to set PiZiGate GPIO, please check configuration')
LOGGER.error(str(e))


async def set_pizigate_flashing_mode():
try:
import RPi.GPIO as GPIO
LOGGER.info('Put PiZiGate in flashing mode')
GPIO.setmode(GPIO.BCM)
GPIO.setup(17, GPIO.OUT) # GPIO0
GPIO.setup(27, GPIO.OUT) # GPIO2
GPIO.output(27, GPIO.LOW)
await asyncio.sleep(0.5)
GPIO.output(17, GPIO.LOW)
await asyncio.sleep(0.5)
GPIO.output(17, GPIO.HIGH)
await asyncio.sleep(0.5)
except Exception as e:
LOGGER.error('Unable to set PiZiGate GPIO, please check configuration')
LOGGER.error(str(e))
def set_pizigate_running_mode():
LOGGER.info('Put PiZiGate in running mode')

gpio0 = UnclosableOutputDevice(pin=GPIO_PIN0, initial_state=None)
gpio2 = UnclosableOutputDevice(pin=GPIO_PIN2, initial_state=None)

gpio2.on()
time.sleep(0.5)

gpio0.off()
time.sleep(0.5)

gpio0.on()
time.sleep(0.5)


def set_pizigate_flashing_mode():
LOGGER.info('Put PiZiGate in flashing mode')

gpio0 = UnclosableOutputDevice(pin=GPIO_PIN0, initial_state=None)
gpio2 = UnclosableOutputDevice(pin=GPIO_PIN2, initial_state=None)

gpio2.off()
time.sleep(0.5)

gpio0.off()
time.sleep(0.5)

gpio0.on()
time.sleep(0.5)


def ftdi_set_bitmode(dev, bitmask):
'''
Set mode for ZiGate DIN module
'''
import usb

BITMODE_CBUS = 0x20
SIO_SET_BITMODE_REQUEST = 0x0b
bmRequestType = usb.util.build_request_type(usb.util.CTRL_OUT,
Expand All @@ -102,39 +125,53 @@ def ftdi_set_bitmode(dev, bitmask):
dev.ctrl_transfer(bmRequestType, SIO_SET_BITMODE_REQUEST, wValue)


async def set_zigatedin_running_mode():
try:
dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
LOGGER.error('ZiGate DIN not found.')
return
LOGGER.info('Put ZiGate DIN in running mode')
ftdi_set_bitmode(dev, 0xC8)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
await asyncio.sleep(0.5)
except Exception as e:
LOGGER.error('Unable to set FTDI bitmode, please check configuration')
LOGGER.error(str(e))


async def set_zigatedin_flashing_mode():
try:
dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
LOGGER.error('ZiGate DIN not found.')
return
LOGGER.info('Put ZiGate DIN in flashing mode')
ftdi_set_bitmode(dev, 0x00)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xC0)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xC4)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
await asyncio.sleep(0.5)
except Exception as e:
LOGGER.error('Unable to set FTDI bitmode, please check configuration')
LOGGER.error(str(e))
def set_zigatedin_running_mode():
import usb

dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
raise RuntimeError('ZiGate DIN not found.')

LOGGER.info('Put ZiGate DIN in running mode')
ftdi_set_bitmode(dev, 0xC8)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)


def set_zigatedin_flashing_mode():
import usb

dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
raise RuntimeError('ZiGate DIN not found.')

LOGGER.info('Put ZiGate DIN in flashing mode')
ftdi_set_bitmode(dev, 0x00)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xC0)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xC4)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)


def async_run_in_executor(function):
"""Decorator to make a sync function async."""

async def replacement(*args):
return asyncio.get_running_loop().run_in_executor(None, function, *args)

replacement._sync_func = function

return replacement


# Create async version of all of the above functions
async_set_pizigate_running_mode = async_run_in_executor(set_pizigate_running_mode)
async_set_pizigate_flashing_mode = async_run_in_executor(set_pizigate_flashing_mode)
async_set_zigatedin_running_mode = async_run_in_executor(set_zigatedin_running_mode)
async_set_zigatedin_flashing_mode = async_run_in_executor(set_zigatedin_flashing_mode)
Loading