Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Accelstepper support #106

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 165 additions & 4 deletions pyfirmata/pyfirmata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import time

import serial
import math

from .util import pin_list_to_board_dict, to_two_bytes, two_byte_iter_to_str
from .util import pin_list_to_board_dict, to_two_bytes, two_byte_iter_to_str, write_signed_long, read_signed_long, write_float

# Message command bytes (0x80(128) to 0xFF(255)) - straight from Firmata.h
DIGITAL_MESSAGE = 0x90 # send data for a digital pin
Expand Down Expand Up @@ -34,7 +35,20 @@
ANALOG_MAPPING_QUERY = 0x69 # ask for mapping of analog to pin numbers
ANALOG_MAPPING_RESPONSE = 0x6A # reply with mapping info

SERVO_CONFIG = 0x70 # set max angle, minPulse, maxPulse, freq
SERVO_CONFIG = 0x70 # set max angle, minPulse, maxPulse, freq

ACCELSTEPPER_CONFIG = 0x00 # start stepper configuration message
ACCELSTEPPER_DATA = 0x62 # send and receive accelstepper data
ACCELSTEPPER_ZERO = 0x01 # Zero the stepper position
ACCELSTEPPER_STEP = 0x02 # step the stepper, moving relative to current position
ACCELSTEPPER_TO = 0x03 # step the stepper to an absolute position
ACCELSTEPPER_ENABLE = 0x04 # enable the stepper motor.
ACCELSTEPPER_STOP = 0x05 # stop the stepper motor
ACCELSTEPPER_REPORT_POSITION = 0x06 # Send and receive stepper position
ACCELSTEPPER_SET_ACCELERATION = 0x08 # sets the stepper acceleration
ACCELSTEPPER_SET_SPEED = 0x09 # sets the stepper speed
ACCELSTEPPER_MOVE_COMPLETE = 0x0A # sent when stepper completes move, reports position

STRING_DATA = 0x71 # a string message with 14-bits per char
SHIFT_DATA = 0x75 # a bitstream to/from a shift register
I2C_REQUEST = 0x76 # send an I2C read/write request
Expand Down Expand Up @@ -94,6 +108,8 @@ def __init__(self, port, layout=None, baudrate=57600, name=None, timeout=None):
self.pass_time(BOARD_SETUP_WAIT_TIME)
self.name = name
self._layout = layout
self.stepper = []
self.num_steppers = 0
if not self.name:
self.name = port

Expand Down Expand Up @@ -162,6 +178,8 @@ def _set_default_handlers(self):
self.add_cmd_handler(DIGITAL_MESSAGE, self._handle_digital_message)
self.add_cmd_handler(REPORT_VERSION, self._handle_report_version)
self.add_cmd_handler(REPORT_FIRMWARE, self._handle_report_firmware)
self.add_cmd_handler(ACCELSTEPPER_REPORT_POSITION, self._handle_report_stepper_position)
self.add_cmd_handler(ACCELSTEPPER_MOVE_COMPLETE, self._handle_stepper_move_complete)

def auto_setup(self):
"""
Expand Down Expand Up @@ -272,7 +290,7 @@ def iterate(self):
received_data = []
handler = None
if data < START_SYSEX:
# These commands can have 'channel data' like a pin nummber appended.
# These commands can have 'channel data' like a pin number appended.
try:
handler = self._command_handlers[data & 0xF0]
except KeyError:
Expand All @@ -282,7 +300,11 @@ def iterate(self):
received_data.append(ord(self.sp.read()))
elif data == START_SYSEX:
data = ord(self.sp.read())
handler = self._command_handlers.get(data)
if data == ACCELSTEPPER_DATA:
data = ord(self.sp.read())
handler = self._command_handlers.get(data)
else:
handler = self._command_handlers.get(data)
if not handler:
return
data = ord(self.sp.read())
Expand Down Expand Up @@ -327,6 +349,31 @@ def servo_config(self, pin, min_pulse=544, max_pulse=2400, angle=0):
self.digital[pin]._mode = SERVO
self.digital[pin].write(angle)

def stepper_config(self, wire_count, step_type, pins, invert_pins, has_enable=True):
device_num = self.num_steppers
if device_num > 9:
raise IOError("Only 10 stepper motors are supported")
elif wire_count > 4:
raise IOError("Only stepper drivers or 2-4 wire steppers supported")
self.num_steppers += 1
data = bytearray([ACCELSTEPPER_CONFIG, device_num])
interface = 0 | (wire_count << 4)
step_size = int(math.log(step_type, 0.5)) << 1
interface |= step_size
if has_enable:
interface |= 1
data += bytearray([interface])
data += bytearray(pins)
invert = 0
for n, p in enumerate(invert_pins):
if p:
invert |= int(math.pow(2, n))
if invert > 0:
data += bytearray([invert])
self.send_sysex(ACCELSTEPPER_DATA, data)
self.stepper.append(StepperMotor(self, device_num, has_enable))
self.num_steppers += 1

def exit(self):
"""Call this to exit cleanly."""
# First detach all servo's, otherwise it somehow doesn't want to close...
Expand Down Expand Up @@ -383,6 +430,23 @@ def _handle_report_capability_response(self, *data):

self._layout = pin_list_to_board_dict(pin_spec_list)

def _handle_report_stepper_position(self, *data):
"""
Updates the stepper position from signed long returned by the board.
"""
device_num = data[0]
self.stepper[device_num].position = read_signed_long(data[1:])

def _handle_stepper_move_complete(self, *data):
"""
Updates the stepper position from signed long returned by the board
after movement complete. Toggles stepper moving flag to False.
"""
device_num = data[0]
stepper = self.stepper[device_num]
stepper.position = read_signed_long(data[1:])
stepper.moving = False


class Port(object):
"""An 8-bit port on the board."""
Expand Down Expand Up @@ -547,3 +611,100 @@ def write(self, value):
value = int(value)
msg = bytearray([ANALOG_MESSAGE + self.pin_number, value % 128, value >> 7])
self.board.sp.write(msg)


class StepperMotor:
def __init__(self, board, device_num, has_enable):
"""
Class to expose AccelStepper-specific methods.
"""
self.board = board
self.device_num = device_num
self.has_enable = has_enable
self.acceleration = 0.0
self.speed = 0.0
self.moving = False
self.enabled = True
self.position = 0

def zero(self):
"""
Sends sysex instruction to set stepper position to zero.
"""
data = bytearray([ACCELSTEPPER_ZERO, self.device_num])
self.board.send_sysex(ACCELSTEPPER_DATA, data)

def step(self, steps):
"""
Sends a sysex instruction to step the motor relative to the current position by steps.
Direction can be reversed using negative steps.
"""
data = bytearray([ACCELSTEPPER_STEP, self.device_num])
steps_sl = write_signed_long(steps)
if steps_sl is None:
return
data += steps_sl
self.board.send_sysex(ACCELSTEPPER_DATA, data)
self.moving = True

def move_to(self, position):
"""
Sends a sysex instruction to step the motor to reach the desired position.
"""
data = bytearray([ACCELSTEPPER_TO, self.device_num])
position_sl = write_signed_long(position)
if position_sl is None:
return
data += bytearray(position_sl)
self.board.send_sysex(ACCELSTEPPER_DATA, data)
self.moving = True

def enable(self, enable=False):
"""
Sends a sysex instruction to enable the motor. Can be used to save power
and reduce overheating.
"""
if enable != self.enabled and self.has_enable:
data = bytearray([ACCELSTEPPER_ENABLE, self.device_num, enable])
self.enabled = enable
self.board.send_sysex(ACCELSTEPPER_DATA, data)

def stop(self):
"""
Sends a sysex instruction to stop the motor. Motor moving flag will be updated
by Board._handle_move_complete
"""
data = bytearray([ACCELSTEPPER_STOP, self.device_num])
self.board.send_sysex(ACCELSTEPPER_DATA, data)

def request_position(self):
"""
Sends a sysex instruction to query the motor's current position.
"""
data = bytearray([ACCELSTEPPER_REPORT_POSITION, self.device_num])
self.board.send_sysex(ACCELSTEPPER_DATA, data)

def set_acceleration(self, acceleration):
"""
Sends a sysex instruction to set the acceleration.
"""
data = bytearray([ACCELSTEPPER_SET_ACCELERATION, self.device_num])
custom_float = write_float(acceleration)
if custom_float is None:
return
data += custom_float
self.board.send_sysex(ACCELSTEPPER_DATA, data)
self.acceleration = acceleration

def set_speed(self, speed):
"""
Sends a sysex instruction to set the motor speed. Should be called before
calling step or move_to, otherwise max speed will be 1.0 step/sec.
"""
data = bytearray([ACCELSTEPPER_SET_SPEED, self.device_num])
custom_float = write_float(speed)
if custom_float is None:
return
data += custom_float
self.board.send_sysex(ACCELSTEPPER_DATA, data)
self.speed = speed
72 changes: 72 additions & 0 deletions pyfirmata/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,78 @@ def break_to_bytes(value):
return (c, int(value / c))


def read_signed_long(data):
"""
Read a signed long from the board.
Signed long format is 31 bits for the number plus a sign bit.
Returns an integer.
"""
long = data[0] | data[1] << 7 | data[2] << 14 | data[3] << 21
long |= ((data[4] << 28) & 0x07)
if data[-1] >> 3 == 1:
long = -long
return long


def write_signed_long(number):
"""
Converts a signed long into a bytearray.
Format is 31 bits for the number plus a sign bit.
Returns a bytearray
"""
parsed_numbers = []
mask = 127
negative = False
if number > 0x7fffffff or number < -0x80000000:
return
if number < 0:
number *= -1
negative = True
for i in range(5):
byte = (number >> (i * 7)) & mask
parsed_numbers.append(byte)
if negative:
parsed_numbers[-1] |= 0x08
return bytearray(parsed_numbers)


def write_float(number):
"""
Converts a floating point number to the Firmata custom float format.
Returns None if number cannot be converted, otherwise returns bytearray.
"""
args = []
mask = 0x7f
i = 0
sign = 0
if number < 0:
sign = 1
number *= -1
if number < 1:
while number < 1 or i > -12:
number *= 10
i -= 1
if number % 10 == 0 or number > 8388608:
number /= 10
i += 1
break
else:
while number > 8388607 or number % 10 == 0:
number /= 10
i += 1
if i > 3:
break
if number > 8388607 or number < 1:
return
number = int(number)
exponent = i + 11
args.extend([number & mask,
(number >> 7) & mask,
(number >> 14) & mask,
(number >> 21) & 0x03 | (exponent & 0x0f) << 2 | (sign & 0x01) << 6])
return bytearray(args)


def pin_list_to_board_dict(pinlist):
"""
Capability Response codes:
Expand Down