Skip to content

Commit

Permalink
now devices.py is directly accessing device w create()
Browse files Browse the repository at this point in the history
ykush and acroname are now inherting from usb_relay
attempting to support functionality even when the hub is connected to other usb hub
  • Loading branch information
AviaAv committed Jan 9, 2024
1 parent a8386d9 commit 3cedf76
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 141 deletions.
32 changes: 16 additions & 16 deletions unit-tests/py/rspy/acroname.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
https://acroname.com/reference/api/python/index.html
"""

from rspy import log
from rspy import log, usb_relay
import time


Expand Down Expand Up @@ -45,16 +45,15 @@ class NoneFoundError( RuntimeError ):
def __init__( self, message = None ):
super().__init__( self, message or 'no Acroname module found' )

class Acroname:
class Acroname(usb_relay.usb_relay):

hub = None

def __init__(self):
self.discover()

def __del__(self):
self.disconnect()

super().__init__()
acroname = self.discover()
if acroname is None:
raise NoneFoundError()

def discover(self, retries = 0):
"""
Expand Down Expand Up @@ -121,6 +120,7 @@ def connect(self, reset = False, req_spec = None ):
return
raise RuntimeError("failed to reconnect to Acroname (result={})".format(result))

self._set_connected(self, usb_relay.ACRONAME)

def find_all_hubs(self):
"""
Expand Down Expand Up @@ -367,17 +367,17 @@ def get_port_from_usb(self, first_usb_index, second_usb_index ):


if __name__ == '__main__':
device = Acroname()
acroname = Acroname()
for opt,arg in opts:
if opt in ('--enable'):
device.connect()
device.enable_ports() # so ports() will return all
acroname.connect()
acroname.enable_ports() # so ports() will return all
elif opt in ('--disable'):
device.connect()
device.disable_ports()
acroname.connect()
acroname.disable_ports()
elif opt in ('--recycle'):
device.connect()
device.enable_ports() # so ports() will return all
device.recycle_ports()
acroname.connect()
acroname.enable_ports() # so ports() will return all
acroname.recycle_ports()
elif opt in ('--reset'):
device.connect( reset = True )
acroname.connect( reset = True )
31 changes: 16 additions & 15 deletions unit-tests/py/rspy/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def usage():
try:
import pyrealsense2 as rs
log.d( rs )
relay = usb_relay.usb_relay()
relay = usb_relay.create()
sys.path = sys.path[:-1] # remove what we added
except ModuleNotFoundError:
log.w( 'No pyrealsense2 library is available! Running as if no cameras available...' )
Expand Down Expand Up @@ -71,7 +71,7 @@ def __init__( self, sn, dev ):
self._product_line = dev.get_info( rs.camera_info.product_line )
self._physical_port = dev.supports( rs.camera_info.physical_port ) and dev.get_info( rs.camera_info.physical_port ) or None

self._usb_location, self._port = relay.get_usb_and_port_location(self._physical_port, _hubs)
self._usb_location, self._port = usb_relay.get_usb_and_port_location(relay, self._physical_port, _hubs)

self._removed = False

Expand Down Expand Up @@ -125,7 +125,7 @@ def map_unknown_ports():
Fill in unknown ports in devices by enabling one port at a time, finding out which device
is there.
"""
if relay.not_supports_port_mapping:
if not relay or not relay.supports_port_mapping():
return
global _device_by_sn
devices_with_unknown_ports = [device for device in _device_by_sn.values() if device.port is None]
Expand Down Expand Up @@ -201,14 +201,15 @@ def query( monitor_changes=True, hub_reset=False, recycle_ports=True ):
#
# Before we can start a context and query devices, we need to enable all the ports
# on the relay, if any:
if not relay.is_connected():
relay.connect(hub_reset)
relay.disable_ports( sleep_on_change = 5 )
relay.enable_ports( sleep_on_change = MAX_ENUMERATION_TIME )

if platform.system() == 'Linux':
global _hubs
_hubs = set(relay.find_all_hubs())
if relay:
if not relay.is_connected():
relay.connect(hub_reset)
relay.disable_ports( sleep_on_change = 5 )
relay.enable_ports( sleep_on_change = MAX_ENUMERATION_TIME )

if platform.system() == 'Linux':
global _hubs
_hubs = set(relay.find_all_hubs())
#
# Get all devices, and store by serial-number
global _device_by_sn, _context, _port_to_sn
Expand Down Expand Up @@ -498,7 +499,7 @@ def enable_only( serial_numbers, recycle = False, timeout = MAX_ENUMERATION_TIME
re-enabling
:param timeout: The maximum seconds to wait to make sure the devices are indeed online
"""
if relay.has_relay:
if relay and relay.has_relay():
#
ports = [ get( sn ).port for sn in serial_numbers ]
#
Expand Down Expand Up @@ -653,7 +654,7 @@ def get_phys_port(dev):
if opt in ('--list'):
action = 'list'
elif opt in ('--port'):
if relay.has_relay:
if relay.has_relay():
log.f( 'No relay available' )
all_ports = relay.all_ports()
str_ports = arg.split(',')
Expand All @@ -665,12 +666,12 @@ def get_phys_port(dev):
elif opt in ('--ports'):
printer = get_phys_port
elif opt in ('--all'):
if relay.has_relay:
if relay.has_relay():
log.f( 'No relay available' )
relay.enable_ports()
action = 'none'
elif opt in ('--none'):
if relay.has_relay:
if relay.has_relay():
log.f( 'No relay available' )
relay.disable_ports()
action = 'none'
Expand Down
138 changes: 40 additions & 98 deletions unit-tests/py/rspy/usb_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,126 +16,64 @@ def __init__(self):
self._relay_type = NO_RELAY
self._is_connected = False

def __del__(self):
self.disable_ports()
self.disconnect()

def connect(self, reset = False):
if self.is_connected():
log.d("Already connected to", self._relay, "disconnecting...")
self.disconnect()
self._relay_type, self._relay = _find_active_relay()
if self._relay:
self._relay.connect(reset)
self._is_connected = True
log.d("Now connected to:", self._relay)

def disconnect(self):
if self._relay:
self._relay.disconnect()
del self._relay
self._relay = None
self._relay_type = NO_RELAY
self._is_connected = False

def is_connected(self):
return self._is_connected

def ports(self):
if self._relay_type != NO_RELAY:
return self._relay.ports()

def all_ports(self):
if self._relay_type != NO_RELAY:
return self._relay.all_ports()

def enable_ports(self, ports=None, disable_other_ports=False, sleep_on_change=0):
def _set_connected(self, relay, relay_type):
"""
Set enable state to provided ports
:param ports: List of port numbers; if not provided, enable all ports
:param disable_other_ports: if True, the ports not in the list will be disabled
:param sleep_on_change: Number of seconds to sleep if any change is made
:return: True if no errors found, False otherwise
helper function used by the inheriting classes to set the relay and its type
"""
if self._relay_type != NO_RELAY:
return self._relay.enable_ports(ports, disable_other_ports, sleep_on_change)
return False
self._relay = relay
self._relay_type = relay_type
self._is_connected = True

def disable_ports(self, ports=None, sleep_on_change=0):
"""
:param ports: List of port numbers; if not provided, disable all ports
:param sleep_on_change: Number of seconds to sleep if any change is made
:return: True if no errors found, False otherwise
"""
if self._relay_type != NO_RELAY:
return self._relay.disable_ports(ports, sleep_on_change)
return False

def recycle_ports(self, portlist=None, timeout=2):
"""
Disable and enable a port
:param portlist: List of port numbers; if not provided, recycle all ports
:param timeout: how long to wait before re-enabling
:return: True if everything OK, False otherwise
"""
if self._relay_type != NO_RELAY:
return self._relay.recycle_ports(portlist=portlist, timeout=timeout)
return False

def find_all_hubs(self):
"""
Yields all hub port numbers
"""
if self._relay_type != NO_RELAY:
return self._relay.find_all_hubs()

@property
def has_relay(self):
return self._relay_type != NO_RELAY

@property
def not_supports_port_mapping(self):
def supports_port_mapping(self):
"""
:return: whether the connected relay supports port mapping, currently only the acroname supports it
"""
return self._relay_type != ACRONAME
return self._relay_type == ACRONAME

def get_usb_and_port_location(relay, physical_port, hubs):
usb_location = None
port = None
try:
usb_location = _get_usb_location( physical_port )
except Exception as e:
log.e( 'Failed to get usb location:', e )

def get_usb_and_port_location(self, physical_port, hubs):
usb_location = None
port = None
if relay and relay.has_relay():
try:
usb_location = _get_usb_location( physical_port )
port = _get_port_by_loc( usb_location, hubs, relay._relay, relay._relay_type )
except Exception as e:
log.e( 'Failed to get usb location:', e )

if self.has_relay:
try:
port = _get_port_by_loc( usb_location, hubs, self._relay, self._relay_type )
except Exception as e:
log.e( 'Failed to get device port:', e )
log.d( ' physical port is', physical_port )
log.d( ' USB location is', usb_location )
log.e( 'Failed to get device port:', e )
log.d( ' physical port is', physical_port )
log.d( ' USB location is', usb_location )

return usb_location, port
return usb_location, port


#################################
def create():
return _find_active_relay()[1]


def _find_active_relay():
"""
Function finds an available relay to connect to and returns it
"""
acroname_relay = _is_there_acroname()
acroname_relay = _create_acroname()
if acroname_relay:
return ACRONAME, acroname_relay

ykush_relay = _is_there_ykush()
ykush_relay = _create_ykush()
if ykush_relay:
return YKUSH, ykush_relay

import sys
log.d('sys.path=', sys.path)
return NO_RELAY, None


def _is_there_acroname():
def _create_acroname():
try:
from rspy import acroname
return acroname.Acroname()
Expand All @@ -147,7 +85,7 @@ def _is_there_acroname():
return None


def _is_there_ykush():
def _create_ykush():
try:
from rspy import ykush
return ykush.Ykush()
Expand Down Expand Up @@ -199,7 +137,7 @@ def _get_usb_location( physical_port ):
# and, for T265: Port_#0002.Hub_#0006
return result[0]
#
def _get_port_by_loc( usb_location, hubs, acroname_relay, relay_type):
def _get_port_by_loc( usb_location, hubs, relay, relay_type):
"""
"""
if usb_location:
Expand All @@ -211,8 +149,12 @@ def _get_port_by_loc( usb_location, hubs, acroname_relay, relay_type):
return None #int(match.group(2))
else:
split_location = [int(x) for x in usb_location.split('.')]
# lambda helper to return the last 2 non-zero numbers, used when connecting using an additional hub
# ex: laptop -> hub -> ykush
get_last_two_digits = lambda array: tuple(reversed(list(reversed([i for i in array if i != 0]))[:2]))
# only the last two digits are necessary
return acroname_relay.get_port_from_usb( split_location[-5], split_location[-4] )
first_index, second_index = get_last_two_digits(split_location)
return relay.get_port_from_usb(first_index, second_index)
#
else:
#
Expand All @@ -236,11 +178,11 @@ def _get_usb_location( physical_port ):
return port_location
#

def _get_port_by_loc( usb_location, hubs, acroname_relay, relay_type ):
def _get_port_by_loc( usb_location, hubs, relay, relay_type ):
"""
"""
if relay_type == YKUSH:
return acroname_relay.get_port_from_usb(int(usb_location.split(".")[1]), 0)
return relay.get_port_from_usb(int(usb_location.split(".")[1]), 0)
if usb_location:
#
# Devices connected through an acroname will be in one of two sub-hubs under the acroname main
Expand All @@ -265,4 +207,4 @@ def _get_port_by_loc( usb_location, hubs, acroname_relay, relay_type ):
if usb_location.startswith( port + '.' ):
match = re.search( r'^(\d+)\.(\d+)', usb_location[len(port)+1:] )
if match:
return acroname_relay.get_port_from_usb( int(match.group(1)), int(match.group(2)) )
return relay.get_port_from_usb(int(match.group(1)), int(match.group(2)))
Loading

0 comments on commit 3cedf76

Please sign in to comment.