Skip to content

Commit

Permalink
Board setup - better pins and modes handling
Browse files Browse the repository at this point in the history
* Enhanced autoconfig, which handles capabilities query with mixed modes
  (fixes issue #51)
* Pins can be both digital and analog pin (analog pins could be often
  used as digital pins)
* All pins are saved into global board.pins list, each pins exists in
  one instance and in board.digital/board.analog are references to the
  same objects
  • Loading branch information
setnicka committed Dec 15, 2016
1 parent 7a9ef22 commit baaed16
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 130 deletions.
1 change: 1 addition & 0 deletions pyfirmata/mockup.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def start(self):
def stop(self):
pass


if __name__ == '__main__':
import doctest
doctest.testmod()
Expand Down
194 changes: 135 additions & 59 deletions pyfirmata/pyfirmata.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Module for interaction over Firmata protocol."""
from __future__ import division, unicode_literals

import inspect
import time

import serial

from .util import pin_list_to_board_dict, to_two_bytes, two_byte_iter_to_str
from .util import to_two_bytes, two_byte_iter_to_str

# Message command bytes (0x80(128) to 0xFF(255)) - straight from Firmata.h
DIGITAL_MESSAGE = 0x90 # send data for a digital pin
Expand Down Expand Up @@ -54,6 +55,16 @@
ANALOG = 2 # analog pin in analogInput mode
PWM = 3 # digital pin in PWM output mode
SERVO = 4 # digital pin in SERVO mode
I2C = 6 # pin belonging to I2C pair

modes_names = {
INPUT: "INPUT",
OUTPUT: "OUTPUT",
ANALOG: "ANALOG",
PWM: "PWM",
SERVO: "SERVO",
I2C: "I2C"
}

# Pin types
DIGITAL = OUTPUT # same as OUTPUT below
Expand Down Expand Up @@ -122,37 +133,67 @@ def __del__(self):
def send_as_two_bytes(self, val):
self.sp.write(bytearray([val % 128, val >> 7]))

def _get_pin(self, pin_number):
while len(self.pins) <= pin_number:
self.pins.append(None)
if self.pins[pin_number] is None:
self.pins[pin_number] = Pin(self, pin_number)
return self.pins[pin_number]

def _setup_pins(self, pins, mode, append_to=None, default_res=1):
for pin_def in pins:
if type(pin_def) == tuple:
(pin_number, res) = pin_def
else:
# TODO: Use resolution
# (pin_number, res) = (pin_def, default_res)
pin_number = pin_def

pin = self._get_pin(pin_number)
pin.supported_modes.append(mode)
if append_to is not None:
append_to.append(pin)

def setup_layout(self, board_layout):
"""
Setup the Pin instances based on the given board layout.
"""
# Create pin instances based on board layout

self.pins = []
# Analog:
self.analog = []
for i in board_layout['analog']:
self.analog.append(Pin(self, i))
self._setup_pins(board_layout['analog'], ANALOG, append_to=self.analog, default_res=10)
for i, a in enumerate(self.analog):
a.analog_pin_number = i

# Digital input:
if 'digital_input' not in board_layout:
board_layout['digital_input'] = board_layout['digital']
self._setup_pins(board_layout['digital_input'], INPUT)

# Digital output:
if 'digital_output' not in board_layout:
board_layout['digital_output'] = board_layout['digital']
self._setup_pins(board_layout['digital_output'], OUTPUT)

self.digital = []
for p in sorted(set(board_layout['digital_input'] + board_layout['digital_output'])):
self.digital.append(self.pins[p])

# Setup pins into ports
self.digital_ports = []
for i in range(0, len(board_layout['digital']), 8):
num_pins = len(board_layout['digital'][i:i + 8])
for i in range(0, len(self.digital), 8):
port_number = int(i / 8)
self.digital_ports.append(Port(self, port_number, num_pins))
self.digital_ports.append(Port(self, port_number, self.digital[i:i + 8]))

# Allow to access the Pin instances directly
for port in self.digital_ports:
self.digital += port.pins

# Setup PWM pins
for i in board_layout['pwm']:
self.digital[i].PWM_CAPABLE = True
self._setup_pins(board_layout['pwm'], PWM, default_res=8)
self._setup_pins(board_layout['servo'], SERVO, default_res=14)

# Disable certain ports like Rx/Tx and crystal ports
for i in board_layout['disabled']:
self.digital[i].mode = UNAVAILABLE

# Create a dictionary of 'taken' pins. Used by the get_pin method
self.taken = {'analog': dict(map(lambda p: (p.pin_number, False), self.analog)),
'digital': dict(map(lambda p: (p.pin_number, False), self.digital))}
if i < len(self.pins):
self.pins[i].mode = UNAVAILABLE

self._set_default_handlers()

Expand Down Expand Up @@ -221,21 +262,27 @@ def get_pin(self, pin_def):
raise InvalidPinDefError('Invalid pin definition: '
'UNAVAILABLE pin {0} at position on {1}'
.format(pin_def, self.name))
if self.taken[a_d][pin_nr]:
pin = part[pin_nr]
if pin.taken:
raise PinAlreadyTakenError('{0} pin {1} is already taken on {2}'
.format(a_d, bits[1], self.name))
# ok, should be available
pin = part[pin_nr]
self.taken[a_d][pin_nr] = True
if pin.type is DIGITAL:
pin.taken = True
if a_d == 'analog':
pin.mode = ANALOG
pin.enable_reporting()
else:
if bits[2] == 'p':
pin.mode = PWM
elif bits[2] == 's':
pin.mode = SERVO
elif bits[2] != 'o':
elif bits[2] == 'o':
pin.mode = OUTPUT
elif bits[2] == 'i':
pin.mode = INPUT
else:
pin.enable_reporting()
else:
raise InvalidPinDefError('Invalid pin definition: {0} at position 3 on {1}'
.format(pin_def, self.name))
return pin

def pass_time(self, t):
Expand Down Expand Up @@ -368,33 +415,60 @@ def _handle_report_firmware(self, *data):
self.firmware = two_byte_iter_to_str(data[2:])

def _handle_report_capability_response(self, *data):
charbuffer = []
pin_spec_list = []

capabilities = []

board_dict = {
'digital_input': [],
'digital_output': [],
'analog': [],
'pwm': [],
'servo': [], # 2.2 specs
'i2c': [], # 2.3 specs
'disabled': [],
}
pin = 0
for c in data:
if c == CAPABILITY_RESPONSE:
continue

charbuffer.append(c)
if c == 0x7F:
# A copy of charbuffer
pin_spec_list.append(charbuffer[:])
charbuffer = []

self._layout = pin_list_to_board_dict(pin_spec_list)
if c != 0x7F:
capabilities.append(c)
continue
# Append new pin definiton and free capabilities for next pin
if len(capabilities) == 0:
board_dict['disabled'].append(pin)
for j, val in enumerate(capabilities):
if j % 2 == 0:
mode = val
else:
if mode == INPUT and val == 1:
board_dict['digital_input'].append(pin)
elif mode == OUTPUT and val == 1:
board_dict['digital_output'].append(pin)
elif mode == ANALOG:
board_dict['analog'].append((pin, val))
elif mode == PWM:
board_dict['pwm'].append((pin, val))
elif mode == SERVO:
board_dict['servo'].append((pin, val))
elif mode == I2C and val == 1:
board_dict['i2c'].append(pin)
pin += 1
capabilities = []

self._layout = board_dict
self.setup_layout(board_dict)


class Port(object):
"""An 8-bit port on the board."""
def __init__(self, board, port_number, num_pins=8):
def __init__(self, board, port_number, pins):
self.board = board
self.port_number = port_number
self.reporting = False

self.pins = []
for i in range(num_pins):
pin_nr = i + self.port_number * 8
self.pins.append(Pin(self.board, pin_nr, type=DIGITAL, port=self))
self.pins = pins
for pin in pins:
pin.port = self

def __str__(self):
return "Digital Port {0.port_number} on {0.board}".format(self)
Expand Down Expand Up @@ -440,33 +514,32 @@ def _update(self, mask):

class Pin(object):
"""A Pin representation"""
def __init__(self, board, pin_number, type=ANALOG, port=None):
def __init__(self, board, pin_number, port=None, analog_pin_number=None, supported_modes=[]):
self.board = board
self.pin_number = pin_number
self.type = type
self.analog_pin_number = analog_pin_number
self.port = port
self.PWM_CAPABLE = False
self._mode = (type == DIGITAL and OUTPUT or INPUT)
self.supported_modes = supported_modes
self._mode = None # Until first usage
self.reporting = False
self.taken = False
self.value = None

def __str__(self):
type = {ANALOG: 'Analog', DIGITAL: 'Digital'}[self.type]
return "{0} pin {1}".format(type, self.pin_number)
if self.analog_pin_number is not None:
return "Pin {0} (analog pin A{1})".format(self.pin_number, self.analog_pin_number)
else:
return "Digital pin {0}".format(self.pin_number)

def _set_mode(self, mode):
if mode is UNAVAILABLE:
self._mode = UNAVAILABLE
return
if self._mode is UNAVAILABLE:
if self.mode is UNAVAILABLE:
raise IOError("{0} can not be used through Firmata".format(self))
if mode is PWM and not self.PWM_CAPABLE:
raise IOError("{0} does not have PWM capabilities".format(self))
if mode not in self.supported_modes:
raise IOError("{0} does not support {0} mode".format(modes_names[mode]))
if mode == SERVO:
if self.type != DIGITAL:
raise IOError("Only digital pins can drive servos! {0} is not"
"digital".format(self))
self._mode = SERVO
self.board.servo_config(self.pin_number)
return

Expand All @@ -487,19 +560,19 @@ def _get_mode(self):

def enable_reporting(self):
"""Set an input pin to report values."""
if self.mode is not INPUT:
raise IOError("{0} is not an input and can therefore not report".format(self))
if self.type == ANALOG:
if self.mode == ANALOG:
self.reporting = True
msg = bytearray([REPORT_ANALOG + self.pin_number, 1])
self.board.sp.write(msg)
else:
elif self.mode == INPUT:
self.port.enable_reporting()
# TODO This is not going to work for non-optimized boards like Mega
else:
raise IOError("{0} is not an input and can therefore not report".format(self))

def disable_reporting(self):
"""Disable the reporting of an input pin."""
if self.type == ANALOG:
if self.mode == ANALOG:
self.reporting = False
msg = bytearray([REPORT_ANALOG + self.pin_number, 0])
self.board.sp.write(msg)
Expand Down Expand Up @@ -531,6 +604,9 @@ def write(self, value):
if self.mode is INPUT:
raise IOError("{0} is set up as an INPUT and can therefore not be written to"
.format(self))
if self.mode is ANALOG:
raise IOError("{0} is set up as an ANALOG and can therefore not be written to"
.format(self))
if value is not self.value:
self.value = value
if self.mode is OUTPUT:
Expand Down
71 changes: 0 additions & 71 deletions pyfirmata/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,74 +150,3 @@ def break_to_bytes(value):
if rest < least[1]:
least = (c, rest)
return (c, int(value / c))


def pin_list_to_board_dict(pinlist):
"""
Capability Response codes:
INPUT: 0, 1
OUTPUT: 1, 1
ANALOG: 2, 10
PWM: 3, 8
SERV0: 4, 14
I2C: 6, 1
"""

board_dict = {
'digital': [],
'analog': [],
'pwm': [],
'servo': [], # 2.2 specs
# 'i2c': [], # 2.3 specs
'disabled': [],
}
for i, pin in enumerate(pinlist):
pin.pop() # removes the 0x79 on end
if not pin:
board_dict['disabled'] += [i]
board_dict['digital'] += [i]
continue

for j, _ in enumerate(pin):
# Iterate over evens
if j % 2 == 0:
# This is safe. try: range(10)[5:50]
if pin[j:j + 4] == [0, 1, 1, 1]:
board_dict['digital'] += [i]

if pin[j:j + 2] == [2, 10]:
board_dict['analog'] += [i]

if pin[j:j + 2] == [3, 8]:
board_dict['pwm'] += [i]

if pin[j:j + 2] == [4, 14]:
board_dict['servo'] += [i]

# Desable I2C
if pin[j:j + 2] == [6, 1]:
pass

# We have to deal with analog pins:
# - (14, 15, 16, 17, 18, 19)
# + (0, 1, 2, 3, 4, 5)
diff = set(board_dict['digital']) - set(board_dict['analog'])
board_dict['analog'] = [n for n, _ in enumerate(board_dict['analog'])]

# Digital pin problems:
# - (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
# + (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)

board_dict['digital'] = [n for n, _ in enumerate(diff)]
# Based on lib Arduino 0017
board_dict['servo'] = board_dict['digital']

# Turn lists into tuples
# Using dict for Python 2.6 compatibility
board_dict = dict([
(key, tuple(value))
for key, value
in board_dict.items()
])

return board_dict

0 comments on commit baaed16

Please sign in to comment.