Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Note: Python API v0.0.4 is designed for use with SimpleFOC v2.3.3 or later
Features:

- Serial connections to SimpleFOC drivers
- **CAN bus connections via CANCommander protocol**
> Important: CAN support requires python-can package and a USB dongle that supports it (ex. SocketCAN).
- Control motors via Commander protocol
- Telemetry based on SimpleFOC monitoring abstraction
- Control motors via packet based protocol based on SimpleFOC Drivers Registers abstraction
Expand All @@ -34,11 +36,12 @@ Features:
1. Install python dependencies:

```
pip install pyserial rx
pip install pyserial rx python-can
```
or, on Debian:
```
sudo apt install python3-serial python3-rx
pip install python-can
```

2. Install PySimpleFOC:
Expand All @@ -51,7 +54,14 @@ pip install simplefoc

3. Set up your serial connection to the driver.

4. Decide on protocol to use: Commander or Packets, and if Packets, Text or Binary
**OR** set up your CAN bus connection (requires CAN hardware interface):
```bash
# Linux SocketCAN setup
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up
```

4. Decide on protocol to use: Commander, Packets (Text/Binary), or **CAN**

5. Write and run some python code (see our [examples](./examples/))

Expand Down Expand Up @@ -139,6 +149,8 @@ options:

## Library usage

### Serial Communication (Commander protocol)

Using the python API is quite simple, the following examples show use of the 'commander' protocol:

```python
Expand All @@ -151,6 +163,36 @@ motor = commander.full_control('M')
motor.set_target(10)
```

### CAN Bus Communication (CANCommander protocol)

For CAN bus communication using the CANCommander protocol:

```python
import simplefoc.can as can
from simplefoc import MotionControlType, TorqueControlType

# Connect to CAN bus
motors = can.can_bus(
interface='can0', # CAN interface name
target_address=0x05 # SimpleFOC driver's address
)
motors.connect()

# Get motor and control it
motor = motors.motor(0)
motor.set_mode(MotionControlType.torque, TorqueControlType.voltage)
motor.enable()
motor.set_target(2.0)

# Read motor state
velocity = motor.get_velocity()
angle = motor.get_angle()
```

See [examples/can/README.md](examples/can/README.md) for detailed CAN setup and usage.

### Common Motor Control

Various easy to use methods are pre-defined to interact with the motor:

```python
Expand Down
47 changes: 47 additions & 0 deletions examples/can/minimal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""Minimal CAN example for SimpleFOC"""

import simplefoc.packets as packets
from simplefoc import motors
import time

# Connect to CAN
motors = motors.can('can0', target_address=0x01, bitrate=1000000)
motors.connect()

# Get motor
motor = motors.motor(0)

# Control
motor.enable()
motor.set_target(2.0)

angle = motor.get_angle()
print(f"Angle: {angle}")

velocity = motor.get_velocity()
print(f"Velocity: {velocity}")

torque_mode = motor.get_torque_mode()
print(f"Torque Control: {torque_mode}")
motion_mode = motor.get_motion_control_type()
print(f"Motion Control: {motion_mode}")

velocity_pid_params = motor.get_velocity_pid()
print(f"Velocity PID params: {velocity_pid_params}")

time.sleep(1)

print("Setting Angle PID to P:10.0, I:2.0, D:0.01")
motor.set_angle_pid(10.0, 2.0, 0.01)

time.sleep(1)
angle_pid_params = motor.get_angle_pid()
print(f"Angle PID params: {angle_pid_params}")

time.sleep(1)
motor_params = motor.get_motor_parameters()
print(f"Motor parameters: {motor_params}")

# Cleanup
motors.disconnect()
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
author='Richard Unger',
author_email="runger@simplefoc.com",
install_requires=['serial', 'rx'],
extras_require={
'can': ['python-can>=4.0.0'],
},
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
Expand Down
38 changes: 38 additions & 0 deletions simplefoc/motor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

from .registers import SimpleFOCRegisters
from rx import operators as ops
from simplefoc import TorqueControlType, MotionControlType

class Motor:
""" SimpleFOC Motor class
Expand Down Expand Up @@ -75,3 +76,40 @@ def enable(self, enable:bool=True):

def get_angle(self, timeout:float=1.0):
return self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_ANGLE, timeout)

def get_velocity(self, timeout:float=1.0):
return self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_VELOCITY, timeout)

def get_torque(self, timeout:float=1.0):
return self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_CURRENT_Q, timeout)

def get_torque_mode(self, timeout:float=1.0):
return TorqueControlType(self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_TORQUE_MODE, timeout))

def get_motion_control_type(self, timeout:float=1.0):
return MotionControlType(self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_CONTROL_MODE, timeout))

def get_angle_pid(self, timeout:float=1.0):
p = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_ANG_PID_P, timeout)
i = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_ANG_PID_I, timeout)
d = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_ANG_PID_D, timeout)
return (p,i,d)

def get_velocity_pid(self, timeout:float=1.0):
p = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_VEL_PID_P, timeout)
i = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_VEL_PID_I, timeout)
d = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_VEL_PID_D, timeout)
return (p,i,d)


def get_motor_parameters(self, timeout:float=1.0):
resistance = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_PHASE_RESISTANCE, timeout)
kv = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_KV, timeout)
inductance = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_PHASE_INDUCTANCE, timeout)
pole_pairs = self.motors.get_register(self.motor_id, SimpleFOCRegisters.REG_POLE_PAIRS, timeout)
return {
"pole_pairs": pole_pairs if pole_pairs != -1 else None,
"resistance": resistance if resistance != -12345.0 else None,
"kv": kv if kv != -12345.0 else None,
"inductance": inductance if inductance != -12345.0 else None
}
112 changes: 104 additions & 8 deletions simplefoc/motors.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,58 @@

from enum import Enum
import time
from .registers import parse_register, Register, SimpleFOCRegisters
from .motor import Motor
from .telemetry import Telemetry
import serial as ser
from rx import operators as ops, Observable
from simplefoc import FrameType, Frame
from .packets import ASCIIComms, BinaryComms, CANComms, ProtocolType


try:
import can as python_can
except ImportError:
print("WARNING: python-can not installed. Install with: pip install python-can if useing CAN communication.")
python_can = None



def serial(port, baud, protocol=ProtocolType.binary):
""" Create a serial connection to a SimpleFOC driver, and return a Motors instance to interact with it.
The connection is packet-based, and uses either ASCII or binary protocol.

Note that the serial connection is not opened until you call motors.connect().

@param port: the serial port to connect to
@param baud: the baud rate to use
@param protocol: the protocol to use (binary or ascii)
"""
ser_conn = ser.Serial()
ser_conn.port = port
ser_conn.baudrate = baud
comms = None
if protocol == ProtocolType.binary:
comms = BinaryComms(ser_conn)
elif protocol == ProtocolType.ascii:
comms = ASCIIComms(ser_conn)
else:
raise ValueError("Unknown protocol type")
return Motors(comms)


def can(channel, target_address, bitrate=1000000, interface='socketcan'):
""" Create a CAN connection to a SimpleFOC driver using CANCommander protocol.

@param channel: CAN interface (e.g., 'can0')
@param target_address: Target node address (0x00-0xFF)
@param bitrate: CAN bitrate in bps (default 1000000)
@param bustype: CAN bus type (default 'socketcan')
"""

can_bus = python_can.interface.Bus(channel=channel, interface=interface, bitrate=bitrate)
return Motors(CANComms(can_bus, target_address))


class Motors(object):
""" SimpleFOC Motors class
Expand Down Expand Up @@ -50,7 +97,7 @@ def set_register(self, motor_id:int, reg:int|str|Register, values):
elif self.current_motor != motor_id:
self.connection.send_frame(Frame(frame_type=FrameType.REGISTER, register=SimpleFOCRegisters.REG_MOTOR_ADDRESS, values=[motor_id]))
self.current_motor = motor_id # optimistic
self.connection.send_frame(Frame(frame_type=FrameType.REGISTER, register=reg, values=values))
self.connection.send_frame(Frame(frame_type=FrameType.REGISTER, register=reg, values=values, motor_id=motor_id))

def set_register_with_response(self, motor_id:int, reg:int|str|Register, values, timeout=1.0):
self.set_register(motor_id, reg, values)
Expand All @@ -59,14 +106,48 @@ def set_register_with_response(self, motor_id:int, reg:int|str|Register, values,
def get_register(self, motor_id:int, reg:int|str|Register, timeout=None):
if not isinstance(reg, Register):
reg = parse_register(reg)
if self.current_motor != motor_id:
self.connection.send_frame(Frame(frame_type=FrameType.REGISTER, register=SimpleFOCRegisters.REG_MOTOR_ADDRESS, values=[motor_id]))
self.current_motor = motor_id
self.connection.send_frame(Frame(frame_type=FrameType.REGISTER, register=reg, values=[]))
if timeout is not None:
return self.get_response(motor_id, reg, timeout)

if type(self.connection) is CANComms:

if timeout is None:
timeout = 1.0

result = {"value": None}
def on_next(v):
result["value"] = v
# subscribe FIRST so we don't miss the response
exp = self.expect_response(motor_id, reg, timeout)
sub = exp.subscribe(on_next)

try:
# send the request
self.connection.send_frame(Frame(frame_type=FrameType.REGISTER, register=reg, values=[], motor_id=motor_id))

# wait for response
start = time.time()
while result["value"] is None:
if (time.time() - start) > timeout:
raise TimeoutError(
f"Timeout waiting for response for register {reg} on motor {motor_id}"
)
time.sleep(0.001)

return result["value"]

finally:
sub.dispose()

else:
if self.current_motor != motor_id:
self.connection.send_frame(Frame(frame_type=FrameType.REGISTER, register=SimpleFOCRegisters.REG_MOTOR_ADDRESS, values=[motor_id]))
self.current_motor = motor_id
self.connection.send_frame(Frame(frame_type=FrameType.REGISTER, register=reg, values=[]))
if timeout is not None:
return self.get_response(motor_id, reg, timeout)


def get_response(self, motor_id:int, reg:int|str|Register, timeout=1.0):
self.reg_val = None
return self.observable().pipe(
ops.filter(lambda p: p.frame_type == FrameType.RESPONSE and p.motor_id == motor_id and p.register == reg),
ops.filter(lambda p: p.values is not None and len(p.values) > 0),
Expand All @@ -75,6 +156,17 @@ def get_response(self, motor_id:int, reg:int|str|Register, timeout=1.0):
ops.timeout(float(timeout))
).run()


def expect_response(self, motor_id:int, reg:int|str|Register, timeout=1.0) -> Observable:
return self.observable().pipe(
ops.filter(lambda p: p.frame_type == FrameType.RESPONSE and p.motor_id == motor_id and p.register == reg),
ops.filter(lambda p: p.values is not None and len(p.values) > 0),
ops.first(),
ops.map(lambda p: p.values if len(p.values) > 1 else p.values[0]),
ops.timeout(float(timeout))
)


def telemetry(self) -> Telemetry:
return Telemetry(self)

Expand All @@ -93,5 +185,9 @@ def __set_motor_id(self, packet):
if packet.frame_type == FrameType.RESPONSE:
if packet.register == SimpleFOCRegisters.REG_MOTOR_ADDRESS:
self.current_motor = packet.values[0]
packet.motor_id = self.current_motor

# Only override motor_id if it's ot already set (serial/binary canse)
if not hasattr(packet, "motor_id"):
packet.motor_id = self.current_motor

return packet
Loading