Skip to content

Commit

Permalink
feat: send amiibo report
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemeowx2 committed Apr 23, 2020
1 parent e904993 commit be8dce7
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 32 deletions.
5 changes: 5 additions & 0 deletions joycontrol/controller_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class ControllerState:
def __init__(self, protocol, controller: Controller, spi_flash: FlashMemory = None):
self._protocol = protocol
self._controller = controller
self._nfc_content = None

self._spi_flash = spi_flash

Expand Down Expand Up @@ -198,6 +199,10 @@ async def button_push(controller_state, *buttons, sec=0.1):
await controller_state.send()


async def set_nfc(controller_state, nfc_content):
controller_state._nfc_content = nfc_content


class _StickCalibration:
def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center):
self.h_center = h_center
Expand Down
154 changes: 154 additions & 0 deletions joycontrol/mcu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from enum import Enum
from crc8 import crc8


class Action(Enum):
NON = 0
REQUEST_STATUS = 1
START_TAG_POLLING = 2
START_TAG_DISCOVERY = 3
READ_TAG = 4
READ_TAG_2 = 5
READ_FINISHED = 6


class McuState(Enum):
NOT_INITIALIZED = 0
IRC = 1
NFC = 2
STAND_BY = 3
BUSY = 4


def copyarray(dest, offset, src):
for i in range(len(src)):
dest[offset + i] = src[i]

class Mcu:
def __init__(self):
self._fw_major = [0, 3]
self._fw_minor = [0, 5]

self._bytes = [0] * 313

self._action = Action.NON
self._state = McuState.NOT_INITIALIZED

self._nfc_content = None

def get_fw_major(self):
return self._fw_major

def get_fw_minor(self):
return self._fw_minor

def set_action(self, v):
self._action = v

def get_action(self):
return self._action

def set_state(self, v):
self._state = v

def get_state(self):
return self._state

def _get_state_byte(self):
if self.get_state() == McuState.NFC:
return 4
elif self.get_state() == McuState.BUSY:
return 6
elif self.get_state() == McuState.NOT_INITIALIZED:
return 1
elif self.get_state() == McuState.STAND_BY:
return 1
else:
return 0

def update_status(self):
self._bytes[0] = 1
self._bytes[1] = 0
self._bytes[2] = 0
self._bytes[3] = self._fw_major[0]
self._bytes[4] = self._fw_major[1]
self._bytes[5] = self._fw_minor[0]
self._bytes[6] = self._fw_minor[1]
self._bytes[7] = self._get_state_byte()

def update_nfc_report(self):
self._bytes = [0] * 313
if self.get_action() == Action.REQUEST_STATUS:
self._bytes[0] = 1
self._bytes[1] = 0
self._bytes[2] = 0
self._bytes[3] = self._fw_major[0]
self._bytes[4] = self._fw_major[1]
self._bytes[5] = self._fw_minor[0]
self._bytes[6] = self._fw_minor[1]
self._bytes[7] = self._get_state_byte()
elif self.get_action() == Action.NON:
self._bytes[0] = 0xff
elif self.get_action() == Action.START_TAG_DISCOVERY:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
self._bytes[5] = 9
self._bytes[6] = 0x31
self._bytes[7] = 0
elif self.get_action() == Action.START_TAG_POLLING:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
if not self._nfc_content is None:
data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07]
copyarray(self._bytes, 5, data)
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
else:
print('nfc content is none')
self._bytes[5] = 9
self._bytes[6] = 0x31
self._bytes[7] = 0
elif self.get_action() == Action.READ_TAG or self.get_action() == Action.READ_TAG_2:
self._bytes[0] = 0x3a
self._bytes[1] = 0
self._bytes[2] = 7
if self.get_action() == Action.READ_TAG:
data1 = bytes.fromhex('010001310200000001020007')
copyarray(self._bytes, 3, data1)
copyarray(self._bytes, 3 + len(data1), self._nfc_content[0:3])
copyarray(self._bytes, 3 + len(data1) + 3, self._nfc_content[4:8])
data2 = bytes.fromhex('000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000')
copyarray(self._bytes, 3 + len(data1) + 3 + 4, data2)
copyarray(self._bytes, 3 + len(data1) + 3 + 4 + len(data2), self._nfc_content[0:245])
self.set_action(Action.READ_TAG_2)
else:
data = bytes.fromhex('02000927')
copyarray(self._bytes, 3, data)
copyarray(self._bytes, 3 + len(data), self._nfc_content[245:])
self.set_action(Action.READ_FINISHED)
elif self.get_action() == Action.READ_FINISHED:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
data = bytes.fromhex('0931040000000101020007')
copyarray(self._bytes, 5, data)
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])

crc = crc8()
crc.update(bytes(self._bytes[:-1]))
self._bytes[-1] = ord(crc.digest())

def set_nfc(self, nfc_content):
self._nfc_content = nfc_content

def __bytes__(self):
return bytes(self._bytes)
135 changes: 107 additions & 28 deletions joycontrol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from joycontrol.memory import FlashMemory
from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID
from joycontrol.transport import NotConnectedError
from joycontrol.mcu import Mcu, McuState, Action
from crc8 import crc8

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -39,6 +41,8 @@ def __init__(self, controller: Controller, spi_flash: FlashMemory = None):
self._controller_state = ControllerState(self, controller, spi_flash=spi_flash)
self._controller_state_sender = None

self._mcu = Mcu()

# None = Just answer to sub commands
self._input_report_mode = None

Expand Down Expand Up @@ -89,6 +93,11 @@ async def write(self, input_report: InputReport):
input_report.set_timer(self._input_report_timer)
self._input_report_timer = (self._input_report_timer + 1) % 0x100

if input_report.get_input_report_id() == 0x31:
self._mcu.set_nfc(self._controller_state._nfc_content)
self._mcu.update_nfc_report()
input_report.set_mcu(self._mcu)

await self.transport.write(input_report)

self._controller_state.sig_is_send.set()
Expand Down Expand Up @@ -120,22 +129,22 @@ def error_received(self, exc: Exception) -> None:
# TODO?
raise NotImplementedError()

async def input_report_mode_0x30(self):
async def input_report_mode_full(self):
"""
Continuously sends 0x30 input reports containing the controller state.
Continuously sends full input reports containing the controller state.
"""
if self.transport.is_reading():
raise ValueError('Transport must be paused in 0x30 input report mode')
raise ValueError('Transport must be paused in full input report mode')

input_report = InputReport()
input_report.set_input_report_id(0x30)
input_report.set_vibrator_input()
input_report.set_misc()

reader = asyncio.ensure_future(self.transport.read())

try:
while True:
input_report.set_input_report_id(self._input_report_mode)
# TODO: improve timing
if self.controller == Controller.PRO_CONTROLLER:
# send state at 120Hz
Expand All @@ -159,6 +168,10 @@ async def input_report_mode_0x30(self):
pass
elif output_report_id == OutputReportID.SUB_COMMAND:
reply_send = await self._reply_to_sub_command(report)
elif output_report_id == OutputReportID.REQUEST_MCU:
reply_send = await self._reply_to_mcu(report)
else:
logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE')
except ValueError as v_err:
logger.warning(f'Report parsing error "{v_err}" - IGNORE')
except NotImplementedError as err:
Expand Down Expand Up @@ -207,6 +220,47 @@ async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int])
else:
logger.warning(f'Output report {output_report_id} not implemented - ignoring')

async def _reply_to_mcu(self, report):
sub_command = report.data[11]
sub_command_data = report.data[12:]

# logging.info(f'received output report - Request MCU sub command {sub_command}')

if self._mcu.get_action() == Action.READ_TAG or self._mcu.get_action() == Action.READ_TAG_2 or self._mcu.get_action() == Action.READ_FINISHED:
return

# Request mcu state
if sub_command == 0x01:
# input_report = InputReport()
# input_report.set_input_report_id(0x21)
# input_report.set_misc()

# input_report.set_ack(0xA0)
# input_report.reply_to_subcommand_id(0x21)

self._mcu.set_action(Action.REQUEST_STATUS)
# input_report.set_mcu(self._mcu)

# await self.write(input_report)
# Send Start tag discovery
elif sub_command == 0x02:
# 0: Cancel all, 4: StartWaitingReceive
if sub_command_data[0] == 0x04:
self._mcu.set_action(Action.START_TAG_DISCOVERY)
# 1: Start polling
elif sub_command_data[0] == 0x01:
self._mcu.set_action(Action.START_TAG_POLLING)
# 2: stop polling
elif sub_command_data[0] == 0x02:
self._mcu.set_action(Action.NON)
elif sub_command_data[0] == 0x06:
self._mcu.set_action(Action.READ_TAG)
else:
logging.info(f'Unknown sub_command_data arg {sub_command_data}')
else:
logging.info(f'Unknown MCU sub command {sub_command}')


async def _reply_to_sub_command(self, report):
# classify sub command
try:
Expand Down Expand Up @@ -317,34 +371,40 @@ async def _command_spi_flash_read(self, sub_command_data):

async def _command_set_input_report_mode(self, sub_command_data):
if sub_command_data[0] == 0x30:
logger.info('Setting input report mode to 0x30...')
pass
elif sub_command_data[0] == 0x31:
pass
else:
logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request')
return

input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
logger.info(f'Setting input report mode to {hex(sub_command_data[0])}...')

input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(0x03)
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()

input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(0x03)

await self.write(input_report)
await self.write(input_report)

# start sending 0x30 input reports
if self._input_report_mode != 0x30:
self._input_report_mode = 0x30
# start sending input reports
if self._input_report_mode is None:

self.transport.pause_reading()
new_reader = asyncio.ensure_future(self.input_report_mode_0x30())
self.transport.pause_reading()
new_reader = asyncio.ensure_future(self.input_report_mode_full())

# We need to swap the reader in the future because this function was probably called by it
async def set_reader():
await self.transport.set_reader(new_reader)
self.transport.resume_reading()
# We need to swap the reader in the future because this function was probably called by it
async def set_reader():
await self.transport.set_reader(new_reader)
self.transport.resume_reading()

asyncio.ensure_future(set_reader()).add_done_callback(
utils.create_error_check_callback()
)
else:
logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request')
asyncio.ensure_future(set_reader()).add_done_callback(
utils.create_error_check_callback()
)

self._input_report_mode = sub_command_data[0]

async def _command_trigger_buttons_elapsed_time(self, sub_command_data):
input_report = InputReport()
Expand Down Expand Up @@ -392,10 +452,26 @@ async def _command_set_nfc_ir_mcu_config(self, sub_command_data):
input_report.set_ack(0xA0)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)

# TODO
data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200]
self._mcu.update_status()
data = list(bytes(self._mcu)[0:34])
crc = crc8()
crc.update(bytes(data[:-1]))
checksum = crc.digest()
data[-1] = ord(checksum)

for i in range(len(data)):
input_report.data[16 + i] = data[i]
input_report.data[16+i] = data[i]

# Set MCU mode cmd
if sub_command_data[1] == 0:
if sub_command_data[2] == 0:
self._mcu.set_state(McuState.STAND_BY)
elif sub_command_data[2] == 4:
self._mcu.set_state(McuState.NFC)
else:
logger.info(f"unknown mcu state {sub_command_data[2]}")
else:
logger.info(f"unknown mcu config command {sub_command_data}")

await self.write(input_report)

Expand All @@ -408,10 +484,13 @@ async def _command_set_nfc_ir_mcu_state(self, sub_command_data):
# 0x01 = Resume
input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
self._mcu.set_action(Action.NON)
self._mcu.set_state(McuState.STAND_BY)
elif sub_command_data[0] == 0x00:
# 0x00 = Suspend
input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
self._mcu.set_state(McuState.STAND_BY)
else:
raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} '
f'not implemented.')
Expand Down
Loading

0 comments on commit be8dce7

Please sign in to comment.