-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use classes for each connection method
- Loading branch information
Showing
10 changed files
with
214 additions
and
110 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import atexit | ||
from PIL import ImageDraw | ||
from multiprocessing import Process | ||
from time import sleep | ||
|
||
from pitop import Pitop | ||
|
||
from .connection.methods import ConnectionMethod | ||
from .connection.monitor import ConnectionMonitor | ||
from .helpers import ( | ||
FIRST_LINE_Y, | ||
MARGIN_X, | ||
SECOND_LINE_Y, | ||
THIRD_LINE_Y, | ||
draw_text, | ||
play_animated_image_file, | ||
) | ||
|
||
|
||
class OnboardingApp: | ||
def __init__(self): | ||
self.miniscreen = Pitop().miniscreen | ||
self.__auto_play_thread = None | ||
self.__stop_thread = False | ||
atexit.register(self.stop) | ||
|
||
def start(self): | ||
self.__auto_play_thread = Process(target=self.__run_in_background, args=()) | ||
self.__auto_play_thread.start() | ||
|
||
def stop(self): | ||
self.__stop_thread = True | ||
if self.__auto_play_thread and self.__auto_play_thread.is_alive(): | ||
self.__auto_play_thread.join() | ||
|
||
def play_state_animation(self, connection_state): | ||
if connection_state.is_connected(): | ||
# final image shouldn't be cleared after animation finishes | ||
play_animated_image_file(self.miniscreen, connection_state.path_to_image) | ||
else: | ||
self.miniscreen.play_animated_image_file(connection_state.path_to_image, | ||
loop=True, | ||
background=True) | ||
|
||
def display_connection_data(self, connection_state): | ||
image = self.miniscreen.image.copy() | ||
canvas = ImageDraw.Draw(image) | ||
if connection_state.connection_method == ConnectionMethod.AP: | ||
draw_text(canvas, text=str(connection_state.ssid), xy=(MARGIN_X, FIRST_LINE_Y),) | ||
draw_text(canvas, text=str(connection_state.passphrase), xy=(MARGIN_X, SECOND_LINE_Y),) | ||
draw_text(canvas, text=str(connection_state.ip), xy=(MARGIN_X, THIRD_LINE_Y),) | ||
elif connection_state.connection_method == ConnectionMethod.USB: | ||
draw_text(canvas, text=str(connection_state.username), xy=(MARGIN_X, FIRST_LINE_Y),) | ||
draw_text(canvas, text=str(connection_state.password), xy=(MARGIN_X, SECOND_LINE_Y),) | ||
draw_text(canvas, text=str(connection_state.ip), xy=(MARGIN_X, THIRD_LINE_Y),) | ||
elif connection_state.connection_method == ConnectionMethod.ETHERNET: | ||
draw_text(canvas, text=str(connection_state.ip), xy=(MARGIN_X, SECOND_LINE_Y),) | ||
self.miniscreen.display_image(image) | ||
|
||
def __run_in_background(self): | ||
connection = ConnectionMonitor() | ||
previous_state = None | ||
while self.__stop_thread is False: | ||
current_state = connection.state | ||
if current_state != previous_state: | ||
previous_state = current_state | ||
self.play_state_animation(current_state) | ||
if current_state.is_connected(): | ||
self.display_connection_data(current_state) | ||
sleep(0.5) | ||
self.miniscreen.stop_animated_image() | ||
|
||
|
||
if __name__ == '__main__': | ||
try: | ||
app = OnboardingApp() | ||
app.start() | ||
except KeyboardInterrupt: | ||
pass |
Empty file.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
from enum import Enum, auto | ||
from getpass import getuser | ||
from ipaddress import ip_address | ||
from os import path | ||
|
||
from pitopcommon.sys_info import ( | ||
get_address_for_ptusb_connected_device, | ||
get_ap_mode_status, | ||
get_internal_ip, | ||
) | ||
from pitopcommon.pt_os import is_pi_using_default_password | ||
|
||
|
||
class ConnectionMethod(Enum): | ||
AP = auto() | ||
USB = auto() | ||
ETHERNET = auto() | ||
NONE = auto() | ||
|
||
|
||
class ConnectionMethodBase: | ||
def __init__(self, connection_method): | ||
self.connection_method = connection_method | ||
self.ip = "" | ||
self.path_to_image = "" | ||
self.interface_name = "" | ||
|
||
def is_connected(self): | ||
return False | ||
|
||
def get_image_file_path(self, relative_file_name): | ||
return path.abspath( | ||
path.join(path.dirname(path.abspath(__file__)) + "/../images", relative_file_name) | ||
) | ||
|
||
def __eq__(self, other): | ||
return isinstance(other, ConnectionMethodBase) \ | ||
and self.connection_method == other.connection_method \ | ||
and self.ip == other.ip \ | ||
and self.is_connected() == other.is_connected() | ||
|
||
|
||
class NoConnection(ConnectionMethodBase): | ||
def __init__(self): | ||
self.connection_method = ConnectionMethod.NONE | ||
self.ip = "" | ||
self.path_to_image = self.get_image_file_path("first_time_connect.gif") | ||
self.interface_name = "" | ||
|
||
def is_connected(self): | ||
return False | ||
|
||
|
||
class UsbConnection(ConnectionMethodBase): | ||
def __init__(self): | ||
self.connection_method = ConnectionMethod.USB | ||
self.username = "pi" if getuser() == "root" else getuser() | ||
self.password = "pi-top" if is_pi_using_default_password() is True else "********" | ||
self.path_to_image = self.get_image_file_path("usb.gif") | ||
self.interface_name = "ptusb0" | ||
|
||
try: | ||
self.ip = ip_address(get_internal_ip(iface=self.interface_name)) | ||
except Exception: | ||
self.ip = "" | ||
|
||
def is_connected(self): | ||
return get_address_for_ptusb_connected_device() != "" | ||
|
||
|
||
class ApConnection(ConnectionMethodBase): | ||
def __init__(self): | ||
self.connection_method = ConnectionMethod.AP | ||
connection_status = get_ap_mode_status() | ||
self.ssid = connection_status.get("ssid") | ||
self.passphrase = connection_status.get("passphrase") | ||
self.path_to_image = self.get_image_file_path("ap.gif") | ||
self.interface_name = "wlan_ap0" | ||
|
||
try: | ||
self.ip = ip_address(get_internal_ip(iface=self.interface_name)) | ||
except Exception: | ||
self.ip = "" | ||
|
||
def is_connected(self): | ||
try: | ||
return get_ap_mode_status().get("state", "") == "active" | ||
except Exception: | ||
return False | ||
|
||
|
||
class EthernetConnection(ConnectionMethodBase): | ||
def __init__(self): | ||
self.connection_method = ConnectionMethod.ETHERNET | ||
self.path_to_image = self.get_image_file_path("lan.gif") | ||
self.interface_name = "eth0" | ||
|
||
try: | ||
self.ip = ip_address(get_internal_ip(iface=self.interface_name)) | ||
except Exception: | ||
self.ip = "" | ||
|
||
def is_connected(self): | ||
return self.ip != "" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,47 +1,39 @@ | ||
from getpass import getuser | ||
from ipaddress import ip_address | ||
from threading import Thread | ||
from time import sleep | ||
from threading import Thread | ||
|
||
from pitopcommon.sys_info import ( | ||
get_internal_ip, | ||
get_address_for_ptusb_connected_device | ||
from .methods import ( | ||
ConnectionMethod, | ||
NoConnection, | ||
ApConnection, | ||
UsbConnection, | ||
EthernetConnection, | ||
) | ||
from pitopcommon.pt_os import is_pi_using_default_password | ||
|
||
from .state import ConnectionState | ||
|
||
|
||
class ConnectionMonitor: | ||
def __init__(self): | ||
self.username = "pi" if getuser() == "root" else getuser() | ||
self.password = "pi-top" if is_pi_using_default_password() is True else "********" | ||
self.state = ConnectionState() | ||
self.connections = { | ||
ConnectionMethod.AP: ApConnection(), | ||
ConnectionMethod.USB: UsbConnection(), | ||
ConnectionMethod.ETHERNET: EthernetConnection(), | ||
ConnectionMethod.NONE: NoConnection(), | ||
} | ||
|
||
# initial state | ||
self.state = self.connections.get(ConnectionMethod.NONE) | ||
|
||
self.stop_thread = False | ||
self.__update_state_thread = Thread(target=self.__update_state, args=()) | ||
self.__update_state_thread.start() | ||
|
||
def __update_state(self): | ||
while self.stop_thread is False: | ||
ptusb0_ip = "" | ||
eth0_ip = "" | ||
connected_device_ip = "" | ||
if not self.state.ethernet_is_connected(): | ||
try: | ||
ptusb0_ip = ip_address(get_internal_ip(iface="ptusb0")) | ||
except ValueError: | ||
pass | ||
connected_device_ip = get_address_for_ptusb_connected_device() | ||
|
||
if not self.state.usb_is_connected(): | ||
try: | ||
eth0_ip = ip_address(get_internal_ip(iface="eth0")) | ||
except ValueError: | ||
pass | ||
|
||
self.state = ConnectionState( | ||
eth0_ip=eth0_ip, | ||
ptusb0_ip=ptusb0_ip, | ||
connected_device_ip=connected_device_ip) | ||
for connection_name, connection_data in self.connections.items(): | ||
if connection_data.is_connected(): | ||
self.state = connection_data | ||
break | ||
|
||
if not self.state.is_connected(): | ||
self.state = self.connections.get(ConnectionMethod.NONE) | ||
|
||
sleep(0.5) |
This file was deleted.
Oops, something went wrong.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters