Skip to content

Commit

Permalink
Add tests for batched HFP commands/responses; reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
ypomortsev committed Oct 21, 2024
1 parent e1714c1 commit 654030e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 22 deletions.
47 changes: 25 additions & 22 deletions bumble/hfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,9 @@ def _read_at(self, data: bytes):
elif response.code in UNSOLICITED_CODES:
self.unsolicited_queue.put_nowait(response)
else:
logger.warning(f"dropping unexpected response with code '{response.code}'")
logger.warning(
f"dropping unexpected response with code '{response.code}'"
)

async def execute_command(
self,
Expand Down Expand Up @@ -1245,31 +1247,32 @@ def _read_at(self, data: bytes):
# Append to the read buffer.
self.read_buffer.extend(data)

# Locate the trailer.
trailer = self.read_buffer.find(b'\r')
if trailer == -1:
return
while self.read_buffer:
# Locate the trailer.
trailer = self.read_buffer.find(b'\r')
if trailer == -1:
return

# Isolate the AT response code and parameters.
raw_command = self.read_buffer[:trailer]
command = AtCommand.parse_from(raw_command)
logger.debug(f"<<< {raw_command.decode()}")
# Isolate the AT response code and parameters.
raw_command = self.read_buffer[:trailer]
command = AtCommand.parse_from(raw_command)
logger.debug(f"<<< {raw_command.decode()}")

# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 1 :]
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 1 :]

if command.sub_code == AtCommand.SubCode.TEST:
handler_name = f'_on_{command.code.lower()}_test'
elif command.sub_code == AtCommand.SubCode.READ:
handler_name = f'_on_{command.code.lower()}_read'
else:
handler_name = f'_on_{command.code.lower()}'
if command.sub_code == AtCommand.SubCode.TEST:
handler_name = f'_on_{command.code.lower()}_test'
elif command.sub_code == AtCommand.SubCode.READ:
handler_name = f'_on_{command.code.lower()}_read'
else:
handler_name = f'_on_{command.code.lower()}'

if handler := getattr(self, handler_name, None):
handler(*command.parameters)
else:
logger.warning('Handler %s not found', handler_name)
self.send_response('ERROR')
if handler := getattr(self, handler_name, None):
handler(*command.parameters)
else:
logger.warning('Handler %s not found', handler_name)
self.send_response('ERROR')

def send_response(self, response: str) -> None:
"""Sends an AT response."""
Expand Down
32 changes: 32 additions & 0 deletions tests/hfp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,38 @@ def on_sco_request(_connection, _link_type: int):
await asyncio.gather(*sco_disconnection_futures)


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_batched_response(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections

ag.send_response = lambda str: None
ag.dlc.write(b'\r\n+BIND: (1,2)\r\n\r\nOK\r\n')

await hf.execute_command("AT+BIND=?", response_type=hfp.AtResponseType.SINGLE)


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_batched_commands(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections

answer_future = asyncio.get_running_loop().create_future()
ag.on('answer', lambda: answer_future.set_result(None))

hang_up_future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: hang_up_future.set_result(None))

hf.dlc.write(b'ATA\rAT+CHUP\r')

await answer_future
await hang_up_future


# -----------------------------------------------------------------------------
async def run():
await test_slc()
Expand Down

0 comments on commit 654030e

Please sign in to comment.