Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 26 additions & 48 deletions src/wokwi_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@
from pathlib import Path
from typing import Any, Optional, Union, cast

from wokwi_client.exceptions import ProtocolError
from wokwi_client.framebuffer import (
read_framebuffer_png_bytes,
save_framebuffer_png,
)

from .__version__ import get_version
from .constants import DEFAULT_WS_URL
from .control import set_control
from .event_queue import EventQueue
from .exceptions import ProtocolError
from .file_ops import download, upload, upload_file
from .pins import gpio_list, pin_listen, pin_read
from .protocol_types import EventMessage, ResponseMessage
from .framebuffer import (
read_framebuffer_png_bytes,
save_framebuffer_png,
)
from .pins import PinReadMessage, gpio_list, pin_listen, pin_read
from .protocol_types import EventMessage
from .serial import monitor_lines, write_serial
from .simulation import pause, restart, resume, start
from .transport import Transport
Expand Down Expand Up @@ -65,33 +64,25 @@ async def disconnect(self) -> None:
"""
await self._transport.close()

async def upload(self, name: str, content: bytes) -> ResponseMessage:
async def upload(self, name: str, content: bytes) -> None:
"""
Upload a file to the simulator from bytes content.

Args:
name: The name to use for the uploaded file.
content: The file content as bytes.

Returns:
The response message from the server.
"""
return await upload(self._transport, name, content)
await upload(self._transport, name, content)

async def upload_file(
self, filename: str, local_path: Optional[Path] = None
) -> ResponseMessage:
async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> None:
"""
Upload a local file to the simulator.

Args:
filename: The name to use for the uploaded file.
local_path: Optional path to the local file. If not provided, uses filename as the path.

Returns:
The response message from the server.
"""
return await upload_file(self._transport, filename, local_path)
await upload_file(self._transport, filename, local_path)

async def download(self, name: str) -> bytes:
"""
Expand Down Expand Up @@ -127,7 +118,7 @@ async def start_simulation(
elf: Optional[str] = None,
pause: bool = False,
chips: list[str] = [],
) -> ResponseMessage:
) -> None:
"""
Start a new simulation with the given parameters.

Expand All @@ -150,38 +141,29 @@ async def start_simulation(
elf: The ELF file filename (optional).
pause: Whether to start the simulation paused (default: False).
chips: List of custom chips to load into the simulation (default: empty list).

Returns:
The response message from the server.
"""
return await start(
await start(
self._transport,
firmware=firmware,
elf=elf,
pause=pause,
chips=chips,
)

async def pause_simulation(self) -> ResponseMessage:
async def pause_simulation(self) -> None:
"""
Pause the running simulation.

Returns:
The response message from the server.
"""
return await pause(self._transport)
await pause(self._transport)

async def resume_simulation(self, pause_after: Optional[int] = None) -> ResponseMessage:
async def resume_simulation(self, pause_after: Optional[int] = None) -> None:
"""
Resume the simulation, optionally pausing after a given number of nanoseconds.

Args:
pause_after: Number of nanoseconds to run before pausing again (optional).

Returns:
The response message from the server.
"""
return await resume(self._transport, pause_after)
await resume(self._transport, pause_after)

async def wait_until_simulation_time(self, seconds: float) -> None:
"""
Expand All @@ -197,17 +179,14 @@ async def wait_until_simulation_time(self, seconds: float) -> None:
await resume(self._transport, int(remaining_nanos))
await self._pause_queue.get()

async def restart_simulation(self, pause: bool = False) -> ResponseMessage:
async def restart_simulation(self, pause: bool = False) -> None:
"""
Restart the simulation, optionally starting paused.

Args:
pause: Whether to start the simulation paused (default: False).

Returns:
The response message from the server.
"""
return await restart(self._transport, pause)
await restart(self._transport, pause)

async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None:
"""
Expand Down Expand Up @@ -235,16 +214,17 @@ async def serial_write(self, data: Union[bytes, str, list[int]]) -> None:
def _on_pause(self, event: EventMessage) -> None:
self.last_pause_nanos = int(event["nanos"])

async def read_pin(self, part: str, pin: str) -> ResponseMessage:
async def read_pin(self, part: str, pin: str) -> PinReadMessage:
"""Read the current state of a pin.

Args:
part: The part id (e.g. "uno").
pin: The pin name (e.g. "A2").
"""
return await pin_read(self._transport, part=part, pin=pin)
pin_data = await pin_read(self._transport, part=part, pin=pin)
return cast(PinReadMessage, pin_data["result"])

async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage:
async def listen_pin(self, part: str, pin: str, listen: bool = True) -> None:
"""Start or stop listening for changes on a pin.

When enabled, "pin:change" events will be delivered via the transport's
Expand All @@ -255,7 +235,7 @@ async def listen_pin(self, part: str, pin: str, listen: bool = True) -> Response
pin: The pin name.
listen: True to start listening, False to stop.
"""
return await pin_listen(self._transport, part=part, pin=pin, listen=listen)
await pin_listen(self._transport, part=part, pin=pin, listen=listen)

async def gpio_list(self) -> list[str]:
"""Get a list of all GPIO pins available in the simulation.
Expand All @@ -269,17 +249,15 @@ async def gpio_list(self) -> list[str]:
raise ProtocolError("Malformed gpio:list response: expected result.pins: list[str]")
return cast(list[str], pins_val)

async def set_control(
self, part: str, control: str, value: Union[int, bool, float]
) -> ResponseMessage:
async def set_control(self, part: str, control: str, value: Union[int, bool, float]) -> None:
"""Set a control value (e.g. simulate button press).

Args:
part: Part id (e.g. "btn1").
control: Control name (e.g. "pressed").
value: Control value to set (float).
"""
return await set_control(self._transport, part=part, control=control, value=value)
await set_control(self._transport, part=part, control=control, value=value)

async def read_framebuffer_png_bytes(self, id: str) -> bytes:
"""Return the current framebuffer as PNG bytes."""
Expand Down
28 changes: 23 additions & 5 deletions src/wokwi_client/pins.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,30 @@
#
# SPDX-License-Identifier: MIT

from .protocol_types import ResponseMessage
from typing import TypedDict, Union

from wokwi_client.protocol_types import ResponseMessage

from .transport import Transport


class PinReadMessage(TypedDict):
pin: str
direction: str
value: Union[float, int, bool]
pullUp: bool
pullDown: bool


class PinListenEvent(TypedDict):
part: str
pin: str
direction: str
value: Union[float, int, bool]
pullUp: bool
pullDown: bool


async def pin_read(transport: Transport, *, part: str, pin: str) -> ResponseMessage:
"""Read the state of a pin.

Expand All @@ -27,9 +47,7 @@ async def pin_read(transport: Transport, *, part: str, pin: str) -> ResponseMess
return await transport.request("pin:read", {"part": part, "pin": pin})


async def pin_listen(
transport: Transport, *, part: str, pin: str, listen: bool = True
) -> ResponseMessage:
async def pin_listen(transport: Transport, *, part: str, pin: str, listen: bool = True) -> None:
"""Enable or disable listening for changes on a pin.

When listening is enabled, "pin:change" events will be emitted with the
Expand All @@ -42,7 +60,7 @@ async def pin_listen(
listen: True to start listening, False to stop.
"""

return await transport.request("pin:listen", {"part": part, "pin": pin, "listen": listen})
await transport.request("pin:listen", {"part": part, "pin": pin, "listen": listen})


async def gpio_list(transport: Transport) -> ResponseMessage:
Expand Down