forked from micropython/micropython
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Open
Milestone
Description
Issue
I’m using CircuitPython on an nRF52840 with an RFM9x (LoRa) radio and an SD card sharing the same SPI bus; all the boards are stacked on top of one another.
My device behaves like a Class C gateway:
- LoRa is in continuous receive mode (`receive(timeout=0.1)`)
- Incoming sensor data is queued and logged to an SD card
- I tried to use separate SPI buses, but it gives the error "SPI in USE."
SD writes frequently fail with:
OSError: [Errno 5] Input/output error
This happens even when:
- Logs are queued in RAM
- SD writes are retried later
- `spi.try_lock()`/`spi.unlock()` are used
It appears that the LoRa radio accesses SPI so frequently that the SD card never gets a clean window.
Question
What is the recommended approach in CircuitPython to safely share SPI between:
- A continuously listening RFM9x
- An SD card used for logging?
Specifically:
- Should LoRa RX be temporarily stopped while writing to SD?
- Is batching SD writes required?
- Is there a known SPI access pattern or timing strategy for this case?
Context
This project is a LoRa controller/gateway that must:
- Always listen for packets (Class C–like behavior)
- Reliably log data to an SD card
Any guidance or best practices would be appreciated.
Below is my code:
import time
import board
import busio
import digitalio
import adafruit_rfm9x
import adafruit_ssd1306
import neopixel
import json
import storage
import adafruit_sdcard
import os
from adafruit_pcf8523.pcf8523 import PCF8523
from collections import deque
# ----------------------------
# Shared SPI bus (SD card + LoRa radio)
# ----------------------------
spi = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO)

# Claim the radio's control pins up front and park its chip-select high so
# the RFM9x stays deselected while the SD card is brought up on the same bus.
cs = digitalio.DigitalInOut(board.D11)
cs.switch_to_output(value=True)
reset = digitalio.DigitalInOut(board.D12)

# ----------------------------
# SD Card setup
# ----------------------------
# The SD card is initialised BEFORE any other device uses the bus.
# CircuitPython's SD card documentation warns that accessing another
# peripheral on a shared SPI bus first can leave the card unrecognised
# until it is power-cycled or re-inserted.
SD_CS = board.D10
cs_sd = digitalio.DigitalInOut(SD_CS)
sdcard = adafruit_sdcard.SDCard(spi, cs_sd)
vfs = storage.VfsFat(sdcard)
storage.mount(vfs, "/sd")

csv_file = "/sd/waste_controller_logs.csv"
# Create the log with its CSV header only when it does not exist yet.
if "waste_controller_logs.csv" not in os.listdir("/sd"):
    with open(csv_file, "w") as f:
        f.write("Timestamp,Fill,Temperature,Humidity,Period_Sent,ACK_Received\n")
    print(f"[INFO] Created log file: {csv_file}")

# ----------------------------
# LoRa setup
# ----------------------------
# Safe to touch the radio now that the SD card has been initialised.
rfm9x = adafruit_rfm9x.RFM9x(spi, cs, reset, 433.0)

# ----------------------------
# OLED setup
# ----------------------------
i2c = busio.I2C(board.SCL, board.SDA)
oled = adafruit_ssd1306.SSD1306_I2C(128, 32, i2c)

# ----------------------------
# NeoPixel setup
# ----------------------------
pixels = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.3)
pixels.fill((0, 0, 0))

# ----------------------------
# Buttons setup
# ----------------------------
# All three buttons use internal pull-ups, so a pressed button reads False.
button_a = digitalio.DigitalInOut(board.D9)
button_a.switch_to_input(pull=digitalio.Pull.UP)
button_b = digitalio.DigitalInOut(board.D6)
button_b.switch_to_input(pull=digitalio.Pull.UP)
button_c = digitalio.DigitalInOut(board.D5)
button_c.switch_to_input(pull=digitalio.Pull.UP)

# ----------------------------
# RTC setup
# ----------------------------
# The RTC shares the I2C bus with the OLED.
rtc = PCF8523(i2c)
def get_timestamp():
    """Return the RTC's current time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = rtc.datetime
    date_part = "{:04}-{:02}-{:02}".format(now.tm_year, now.tm_mon, now.tm_mday)
    clock_part = "{:02}:{:02}:{:02}".format(now.tm_hour, now.tm_min, now.tm_sec)
    return date_part + " " + clock_part
# ----------------------------
# Logging queue
# ----------------------------
# Upper bound on rows held in RAM while the SD card is unavailable.
LOG_QUEUE_MAX = 100
# Pending CSV rows, oldest first.
log_queue = []


def queue_log(period_sent=None, ack=None):
    """Queue one CSV row built from the current sensor readings.

    The row holds the RTC timestamp, the module-level ``fill``,
    ``temperature`` and ``humidity`` values, and the two optional fields.

    Args:
        period_sent: Period value of a SET_PERIOD command that was just
            sent, or ``None`` to leave the column empty.
        ack: Period value confirmed by an ACK, or ``None`` to leave the
            column empty.

    The queue never holds more than ``LOG_QUEUE_MAX`` rows. When it is full
    the oldest rows are discarded so a long SD outage cannot exhaust RAM.
    """
    ts = get_timestamp()
    # Compare against None so a legitimate value of 0 is still recorded.
    period_field = "" if period_sent is None else period_sent
    ack_field = "" if ack is None else ack
    log_line = f"{ts},{fill},{temperature},{humidity},{period_field},{ack_field}"

    # Make room for the new row before appending it.
    overflow = len(log_queue) + 1 - LOG_QUEUE_MAX
    if overflow > 0:
        del log_queue[:overflow]
        print(f"[QUEUE] Full, dropped {overflow} oldest entries")

    log_queue.append(log_line)
    print(f"[QUEUE] {log_line}")
    print(f"[QUEUE LENGTH] {len(log_queue)}")
def flush_log_queue():
    """Append every queued log row to the CSV file on the SD card.

    All rows are written through a single open/close of the file instead of
    reopening it per row, so each flush costs one burst of SD traffic.

    The queue is emptied only after the file has been closed without error.
    If an ``OSError`` is raised the queue is left untouched and the rows are
    retried on the next call. Rows already written before the failure may
    then appear twice in the file; a duplicate is preferred over a lost row.
    """
    if not log_queue:
        return

    try:
        with open(csv_file, "a") as f:
            for line in log_queue:
                f.write(line + "\n")
    except OSError as e:
        print(f"SD write failed, will retry: {e}")
        return

    for line in log_queue:
        print(f"[LOG] {line}")
    # Clear in place so other references to the list stay valid.
    del log_queue[:]
# ----------------------------
# State variables
# ----------------------------
# UI mode; button A toggles it between "MAIN" and "SET_PERIOD".
mode = "MAIN"

# Reporting period: the confirmed value and the value being edited/sent.
current_period = pending_period = 5

# SET_PERIOD handshake: resend every RETRY_INTERVAL seconds until ACKed.
waiting_for_ack = False
last_cmd_time = 0
RETRY_INTERVAL = 3

# Button polling: minimum seconds between two polls.
last_button_check = 0
BUTTON_INTERVAL = 0.1

# Latest sensor readings received over LoRa.
fill = temperature = humidity = 0

# Transient OLED status text, shown until time.monotonic() reaches the end time.
temp_message = ""
temp_message_end = 0
# ----------------------------
# Helper functions
# ----------------------------
def update_led(fill):
    """Set the NeoPixel colour from the fill level.

    Below 35 the pixel is green, below 80 it is amber, otherwise red.
    """
    green = (0, 255, 0)
    amber = (255, 150, 0)
    red = (255, 0, 0)

    if fill < 35:
        colour = green
    elif fill < 80:
        colour = amber
    else:
        colour = red
    pixels.fill(colour)
def update_oled():
    """Redraw the whole OLED from the current module-level state.

    Row 0 shows the active period (plus the pending one in SET_PERIOD
    mode), row 12 shows fill/temperature/humidity, and row 22 shows the
    temporary status message while it has not expired.
    """
    oled.fill(0)

    # Top row: confirmed period, and the value being edited if applicable.
    oled.text(f"P:{current_period}s", 0, 0, 1)
    if mode == "SET_PERIOD":
        oled.text(f"SET:{pending_period}s", 64, 0, 1)

    # Middle row: the three sensor readings side by side.
    readings = (
        (f"F:{fill}%", 0),
        (f"T:{temperature}C", 42),
        (f"H:{humidity}%", 84),
    )
    for label, x_pos in readings:
        oled.text(label, x_pos, 12, 1)

    # Bottom row: transient message, only until its deadline passes.
    if time.monotonic() < temp_message_end:
        oled.text(temp_message, 0, 22, 1)

    oled.show()
def send_period_command():
    """Transmit a SET_PERIOD command carrying ``pending_period``.

    Also sets the temporary OLED message (shown for 3 seconds), records the
    send time in ``last_cmd_time`` and queues a log row for the command.
    """
    global temp_message, temp_message_end, last_cmd_time

    payload = json.dumps({"type": "SET_PERIOD", "period": pending_period})
    rfm9x.send(payload.encode())
    print("SET_PERIOD sent:", pending_period)

    temp_message = f"SET_PERIOD sent: {pending_period}"
    temp_message_end = time.monotonic() + 3
    last_cmd_time = time.monotonic()

    queue_log(period_sent=pending_period)
# ----------------------------
# Main loop
# ----------------------------
# Seconds to leave the SD card alone after a flush that could not empty the
# queue, so a failing card is not retried on every ~0.1 s loop iteration.
SD_RETRY_INTERVAL = 2
next_flush_time = 0

try:
    while True:
        now = time.monotonic()

        # ---- Buttons (polled at most every BUTTON_INTERVAL seconds) ----
        # The 0.2 s sleeps are a simple debounce; they do block the loop.
        if now - last_button_check > BUTTON_INTERVAL:
            last_button_check = now
            if not button_a.value:
                mode = "SET_PERIOD" if mode == "MAIN" else "MAIN"
                time.sleep(0.2)
            if not button_b.value and mode == "SET_PERIOD":
                # Step 5 -> 10 -> ... -> 30, then wrap back to 5.
                pending_period += 5
                if pending_period > 30:
                    pending_period = 5
                time.sleep(0.2)
            if not button_c.value and mode == "SET_PERIOD":
                waiting_for_ack = True
                # send_period_command() records last_cmd_time itself.
                send_period_command()
                time.sleep(0.2)

        # ---- Retry SET_PERIOD if waiting for ACK ----
        if waiting_for_ack and (now - last_cmd_time > RETRY_INTERVAL):
            send_period_command()

        # ---- Receive LoRa packets ----
        packet = rfm9x.receive(timeout=0.1)
        if packet:
            try:
                data = json.loads(packet.decode())
                if data["type"] == "DATA":
                    # Read all three fields before assigning any of them so
                    # a malformed packet cannot leave a half-updated state.
                    new_fill = data["fill"]
                    new_temperature = data["temperature"]
                    new_humidity = data["humidity"]
                    fill = new_fill
                    temperature = new_temperature
                    humidity = new_humidity
                    update_led(fill)
                    queue_log()  # log regular sensor data
                elif data["type"] == "ACK":
                    # Only accept an ACK that confirms the pending value.
                    if data["period"] == pending_period:
                        current_period = pending_period
                        waiting_for_ack = False
                        temp_message = "ACK received"
                        temp_message_end = time.monotonic() + 3
                        print("ACK received")
                        queue_log(ack=current_period)
            except Exception as e:
                # Keep the gateway running whatever a bad packet triggers.
                print("Decode error:", e)

        # ---- Flush queued logs to SD ----
        if log_queue and time.monotonic() >= next_flush_time:
            flush_log_queue()
            if log_queue:
                # Rows are still queued, so the write failed: back off.
                next_flush_time = time.monotonic() + SD_RETRY_INTERVAL

        # ---- Update OLED ----
        update_oled()
except KeyboardInterrupt:
    oled.fill(0)
    oled.show()
    pixels.fill((0, 0, 0))
    print("Program stopped, OLED cleared.")
Metadata
Assignees
Labels
No labels