-
-
Notifications
You must be signed in to change notification settings - Fork 32.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create a binary_sensor using the
gpiozero
library
The `gpiozero` library provides a unified interface for both local GPIO pins and remote GPIO pins (via `pigpiod`). This allows it to replace the standard `rpi_gpio` component while also adding new capabilities to easily interact with GPIO pins on networked Raspberry Pis.
- Loading branch information
Showing
4 changed files
with
349 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
""" | ||
Support for binary sensor using the RPi GPIO Zero library. | ||
This component can interact with both local GPIO pins and remote GPIO pins (via | ||
(pigpio)[http://abyz.me.uk/rpi/pigpio]. Local pin configuration is identical | ||
to the standard rpi_gpio component. To connect to a remote `pigpio` daemon | ||
use the `host` and `port` options, for example: | ||
binary_sensor: | ||
- platform: rpi_gpiozero | ||
host: 192.168.1.254 | ||
ports: | ||
18: Front Door | ||
19: Rear Door | ||
For more details about this platform, please refer to the documentation at | ||
https://home-assistant.io/components/binary_sensor.rpi_gpiozero/ | ||
""" | ||
import logging | ||
import threading | ||
|
||
import voluptuous as vol | ||
|
||
import homeassistant.components.rpi_gpiozero as rpi_gpiozero | ||
from homeassistant.components.binary_sensor import ( | ||
BinarySensorDevice, PLATFORM_SCHEMA) | ||
from homeassistant.const import DEVICE_DEFAULT_NAME | ||
import homeassistant.helpers.config_validation as cv | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
CONF_BOUNCETIME = 'bouncetime' | ||
CONF_INVERT_LOGIC = 'invert_logic' | ||
CONF_PORTS = 'ports' | ||
CONF_PULL_MODE = 'pull_mode' | ||
CONF_HOST = 'host' | ||
CONF_PORT = 'port' | ||
|
||
DEFAULT_BOUNCETIME = 50 | ||
DEFAULT_INVERT_LOGIC = False | ||
DEFAULT_PULL_MODE = 'UP' | ||
DEFAULT_HOST = '' | ||
DEFAULT_PORT = 8888 | ||
|
||
DEPENDENCIES = ['rpi_gpiozero'] | ||
|
||
_SENSORS_SCHEMA = vol.Schema({ | ||
cv.positive_int: cv.string, | ||
}) | ||
|
||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ | ||
vol.Required(CONF_PORTS): _SENSORS_SCHEMA, | ||
vol.Optional(CONF_BOUNCETIME, default=DEFAULT_BOUNCETIME): cv.positive_int, | ||
vol.Optional(CONF_INVERT_LOGIC, default=DEFAULT_INVERT_LOGIC): cv.boolean, | ||
vol.Optional(CONF_PULL_MODE, default=DEFAULT_PULL_MODE): cv.string, | ||
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string, | ||
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.positive_int, | ||
}) | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def setup_platform(hass, config, add_devices, discovery_info=None): | ||
"""Set up the Raspberry PI GPIO devices.""" | ||
pull_mode = config.get(CONF_PULL_MODE) | ||
bouncetime = config.get(CONF_BOUNCETIME) | ||
invert_logic = config.get(CONF_INVERT_LOGIC) | ||
host = config.get(CONF_HOST) | ||
port = config.get(CONF_PORT) | ||
|
||
binary_sensors = [] | ||
ports = config.get('ports') | ||
for port_num, port_name in ports.items(): | ||
binary_sensors.append(RPiGPIOZeroBinarySensor( | ||
port_name, | ||
port_num, | ||
pull_mode, | ||
bouncetime, | ||
invert_logic, | ||
(host, port) | ||
)) | ||
add_devices(binary_sensors, True) | ||
|
||
|
||
class RPiGPIOZeroBinarySensor(BinarySensorDevice): | ||
"""Represent a binary sensor that uses Raspberry Pi GPIO via gpiozero""" | ||
|
||
def __init__(self, name, port, pull_mode, bouncetime, invert_logic, | ||
hostport): | ||
"""Initialize the RPi gpiozero binary sensor.""" | ||
# pylint: disable=no-member | ||
self._name = name or DEVICE_DEFAULT_NAME | ||
self._port = port | ||
self._pull_mode = pull_mode | ||
self._bouncetime = bouncetime | ||
self._invert_logic = invert_logic | ||
self._hostport = hostport | ||
self._state = None | ||
self._btn = None | ||
self._btn_lock = threading.Lock() | ||
|
||
@property | ||
def btn(self): | ||
self._btn_lock.acquire() | ||
try: | ||
if self._btn is None and self._hostport: | ||
|
||
_LOGGER.debug("creating button %s on port %s", | ||
self._name, self._port) | ||
self._btn = rpi_gpiozero.setup_button( | ||
self._port, | ||
self._pull_mode, | ||
self._bouncetime, | ||
self._hostport | ||
) | ||
|
||
if self._btn is None: | ||
_LOGGER.error("failed to create button %s on port %s", | ||
self._name, self._port) | ||
else: | ||
def on_change(device): | ||
"""Read state from GPIO.""" | ||
self._state = device.is_pressed | ||
_LOGGER.info("%s has changed to %s", | ||
self._name, self._state) | ||
self.schedule_update_ha_state() | ||
|
||
self._btn.when_pressed = on_change | ||
self._btn.when_released = on_change | ||
finally: | ||
self._btn_lock.release() | ||
|
||
return self._btn | ||
|
||
@property | ||
def should_poll(self): | ||
""" | ||
Polling isn't required for state changes, but it useful for tracking | ||
and restoring connectivity | ||
""" | ||
return True | ||
|
||
@property | ||
def name(self): | ||
"""Return the name of the sensor.""" | ||
return self._name | ||
|
||
@property | ||
def is_on(self): | ||
"""Return the state of the entity.""" | ||
return self._state != self._invert_logic | ||
|
||
@property | ||
def available(self): | ||
return self.btn is not None | ||
|
||
def _reset(self): | ||
self._btn = None | ||
return self.btn | ||
|
||
def update(self): | ||
"""Update the GPIO state.""" | ||
_LOGGER.info("Updating %s", self._name) | ||
if self.btn: | ||
try: | ||
if self.btn.closed: | ||
_LOGGER.exception("%s has been closed", self._name) | ||
self._reset() | ||
else: | ||
self._state = self.btn.is_pressed | ||
except: | ||
# If there are any errors during checking is_pressed | ||
# reset the _btn | ||
_LOGGER.exception("%s has failed to update", self._name) | ||
self._reset() | ||
else: | ||
self._state = False | ||
|
||
_LOGGER.info("%s has been updated to state %s", | ||
self._name, self._state) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
""" | ||
Support for controlling GPIO pins of a Raspberry Pi with gpiozero | ||
""" | ||
# pylint: disable=import-error | ||
import logging | ||
|
||
from homeassistant.const import ( | ||
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP) | ||
|
||
REQUIREMENTS = ['gpiozero==1.4.0', 'pigpio==1.38', 'RPi.GPIO==0.6.1'] | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
DOMAIN = 'rpi_gpiozero' | ||
|
||
_DEVICES = set() | ||
_REMOTE_FACTORY = {} | ||
_LOCAL_FACTORY = None | ||
|
||
|
||
# pylint: disable=no-member | ||
def setup(hass, config): | ||
"""Set up the Raspberry PI GPIO component.""" | ||
import os | ||
# Make the default pin factory 'mock' so that | ||
# it other pin factories can be loaded after import | ||
os.environ['GPIOZERO_PIN_FACTORY'] = 'mock' | ||
|
||
def cleanup_gpiozero(event): | ||
"""Stuff to do before stopping.""" | ||
for dev in _DEVICES: | ||
try: | ||
_LOGGER.info("closing device %s", dev) | ||
dev.close() | ||
except: | ||
_LOGGER.exception("unexpected error closing device %s", dev) | ||
_DEVICES.clear() | ||
|
||
def prepare_gpiozero(event): | ||
"""Stuff to do when home assistant starts.""" | ||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpiozero) | ||
|
||
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, prepare_gpiozero) | ||
return True | ||
|
||
|
||
def close_remote_pinfactory(hostport): | ||
global _REMOTE_FACTORY | ||
|
||
_LOGGER.info("closing pin_factory for %s", hostport) | ||
# Remove the pin_factory from our stored list | ||
pin_factory = _REMOTE_FACTORY.pop(hostport, None) | ||
if not pin_factory: | ||
return | ||
|
||
# Close and remove all devices associated with this pin factory | ||
for dev in list(_DEVICES): | ||
if dev.pin_factory == pin_factory: | ||
try: | ||
dev.close() | ||
except: | ||
_LOGGER.exception("error closing device") | ||
|
||
_DEVICES.remove(dev) | ||
|
||
# Close the pin_factory itself | ||
try: | ||
pin_factory.close() | ||
except: | ||
_LOGGER.exception("error closing pin factory") | ||
|
||
|
||
def get_remote_pinfactory(hostport, timeout=1): | ||
global _REMOTE_FACTORY | ||
|
||
pin_factory = _REMOTE_FACTORY.get(hostport) | ||
|
||
if pin_factory: | ||
try: | ||
tick = pin_factory._connection.get_current_tick() | ||
_LOGGER.info("checked pin_factory for %s : %s", hostport, tick) | ||
except Exception as e: | ||
_LOGGER.error("error checking pin_factory for %s due to", | ||
hostport, e) | ||
close_remote_pinfactory(hostport) | ||
pin_factory = None | ||
|
||
return pin_factory | ||
|
||
|
||
def get_pinfactory(hostport=None, timeout=1): | ||
""" | ||
Get the pinfactory for the configured hostport. | ||
:param hostport: the host/port tuple, when None local GPIO is used | ||
""" | ||
global _LOCAL_FACTORY, _REMOTE_FACTORY | ||
|
||
# TODO do we need any thread safety here? | ||
pin_factory = None | ||
|
||
if hostport and hostport[0]: | ||
from gpiozero.pins.pigpio import PiGPIOFactory | ||
pin_factory = get_remote_pinfactory(hostport, timeout) | ||
# if we don't have a pin_factory, create a new one | ||
if pin_factory is None: | ||
_LOGGER.info( | ||
"Creating pigpiod connection to %s:%s", | ||
hostport[0], | ||
hostport[1] | ||
) | ||
|
||
try: | ||
pin_factory = PiGPIOFactory( | ||
host=hostport[0], | ||
port=hostport[1] | ||
) | ||
# We set a timeout so that we can determine if the | ||
# connection dies | ||
pin_factory._connection.sl.s.settimeout(timeout) | ||
_REMOTE_FACTORY[hostport] = pin_factory | ||
except IOError as e: | ||
_LOGGER.error("error connecting to pigpio due to: %s", e) | ||
pin_factory = None | ||
else: | ||
from gpiozero.pins.rpigpio import RPiGPIOFactory | ||
if _LOCAL_FACTORY is None: | ||
_LOCAL_FACTORY = RPiGPIOFactory() | ||
pin_factory = _LOCAL_FACTORY | ||
return pin_factory | ||
|
||
|
||
def setup_button(port, pull_mode, bouncetime, hostport): | ||
""" | ||
Set up a GPIO as input (a.k.a Button in Gpiozero. | ||
:param port: the GPIO port using BCM numbering. | ||
:param pull_mode: 'UP' or 'DOWN' to pull the GPIO pin high or low. | ||
:param bouncetime: the software bounce compensation in msec. | ||
:param hostport: the remote host/port, None for local. | ||
""" | ||
from gpiozero import Button | ||
|
||
if pull_mode.upper() not in ('UP', 'DOWN'): | ||
raise ValueError("invalid pull_mode %s", pull_mode) | ||
|
||
if bouncetime < 0: | ||
raise ValueError("invalid bouncetime %s", bouncetime) | ||
|
||
pin_factory = get_pinfactory(hostport) | ||
if pin_factory is None: | ||
return None | ||
|
||
btn = Button( | ||
port, | ||
pull_up=(pull_mode.upper() == 'UP'), | ||
bounce_time=float(bouncetime) / 1e3, | ||
pin_factory=pin_factory | ||
) | ||
|
||
# add the button to the _DEVICES list so we can cleanup on shutdown | ||
_DEVICES.add(btn) | ||
|
||
return btn |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters