diff --git a/labgrid/remote/client.py b/labgrid/remote/client.py index 6e2c92d5a..63c0cf859 100755 --- a/labgrid/remote/client.py +++ b/labgrid/remote/client.py @@ -1,5 +1,6 @@ """The remote.client module contains the functionality to connect to a coordinator, acquire a place and interact with the connected resources""" + import argparse import asyncio import contextlib @@ -20,11 +21,21 @@ from datetime import datetime from pprint import pformat import txaio + txaio.use_asyncio() from autobahn.asyncio.wamp import ApplicationSession -from .common import (ResourceEntry, ResourceMatch, Place, Reservation, ReservationState, TAG_KEY, - TAG_VAL, enable_tcp_nodelay, monkey_patch_max_msg_payload_size_ws_option) +from .common import ( + ResourceEntry, + ResourceMatch, + Place, + Reservation, + ReservationState, + TAG_KEY, + TAG_VAL, + enable_tcp_nodelay, + monkey_patch_max_msg_payload_size_ws_option, +) from .. import Environment, Target, target_factory from ..exceptions import NoDriverFoundError, NoResourceFoundError, InvalidConfigError from ..resource.remote import RemotePlaceManager, RemotePlace @@ -59,20 +70,20 @@ class ClientSession(ApplicationSession): the coordinator.""" def gethostname(self): - return os.environ.get('LG_HOSTNAME', gethostname()) + return os.environ.get("LG_HOSTNAME", gethostname()) def getuser(self): - return os.environ.get('LG_USERNAME', getuser()) + return os.environ.get("LG_USERNAME", getuser()) def onConnect(self): """Actions which are executed if a connection is successfully opened.""" - self.loop = self.config.extra['loop'] - self.connected = self.config.extra['connected'] - self.args = self.config.extra.get('args') - self.env = self.config.extra.get('env', None) - self.role = self.config.extra.get('role', None) - self.prog = self.config.extra.get('prog', os.path.basename(sys.argv[0])) - self.monitor = self.config.extra.get('monitor', False) + self.loop = self.config.extra["loop"] + self.connected = self.config.extra["connected"] + self.args = self.config.extra.get("args") + self.env = self.config.extra.get("env", None) + self.role = self.config.extra.get("role", None) + self.prog = self.config.extra.get("prog", os.path.basename(sys.argv[0])) + self.monitor = self.config.extra.get("monitor", False) enable_tcp_nodelay(self) self.join( self.config.realm, @@ -83,32 +94,31 @@ def onConnect(self): def onChallenge(self, challenge): import warnings - warnings.warn("Ticket authentication is deprecated. Please update your coordinator.", - DeprecationWarning) + + warnings.warn("Ticket authentication is deprecated. Please update your coordinator.", DeprecationWarning) logging.warning("Ticket authentication is deprecated. Please update your coordinator.") return "dummy-ticket" async def onJoin(self, details): # FIXME race condition? - resources = await self.call('org.labgrid.coordinator.get_resources') + resources = await self.call("org.labgrid.coordinator.get_resources") self.resources = {} for exporter, groups in resources.items(): for group_name, group in sorted(groups.items()): for resource_name, resource in sorted(group.items()): await self.on_resource_changed(exporter, group_name, resource_name, resource) - places = await self.call('org.labgrid.coordinator.get_places') + places = await self.call("org.labgrid.coordinator.get_places") self.places = {} for placename, config in places.items(): await self.on_place_changed(placename, config) - await self.subscribe(self.on_resource_changed, 'org.labgrid.coordinator.resource_changed') - await self.subscribe(self.on_place_changed, 'org.labgrid.coordinator.place_changed') + await self.subscribe(self.on_resource_changed, "org.labgrid.coordinator.resource_changed") + await self.subscribe(self.on_place_changed, "org.labgrid.coordinator.place_changed") await self.connected(self) async def on_resource_changed(self, exporter, group_name, resource_name, resource): - group = self.resources.setdefault(exporter, - {}).setdefault(group_name, {}) + group = self.resources.setdefault(exporter, {}).setdefault(group_name, {}) # Do not replace the ResourceEntry object, as other components may keep # a reference to it and want to see changes. if resource_name not in group: @@ -134,8 +144,8 @@ async def on_place_changed(self, name, config): print(f"Place {name} deleted") return config = config.copy() - config['name'] = name - config['matches'] = [ResourceMatch(**match) for match in config['matches']] + config["name"] = name + config["matches"] = [ResourceMatch(**match) for match in config["matches"]] config = filter_dict(config, Place, warn=True) if name not in self.places: place = Place(**config) @@ -158,19 +168,19 @@ async def do_monitor(self): await asyncio.sleep(3600.0) async def complete(self): - if self.args.type == 'resources': + if self.args.type == "resources": for exporter, groups in sorted(self.resources.items()): for group_name, group in sorted(groups.items()): for _, resource in sorted(group.items()): print(f"{exporter}/{group_name}/{resource.cls}") - elif self.args.type == 'places': + elif self.args.type == "places": for name in sorted(self.places.keys()): print(name) - elif self.args.type == 'matches': + elif self.args.type == "matches": place = self.get_place() for match in place.matches: print(repr(match)) - elif self.args.type == 'match-names': + elif self.args.type == "match-names": place = self.get_place() match_names = {match.rename for match in place.matches if match.rename is not None} print("\n".join(match_names)) @@ -198,8 +208,7 @@ async def print_resources(self): continue if self.args.acquired and resource.acquired is None: continue - if match and not match.ismatch((exporter, group_name, - resource.cls, resource_name)): + if match and not match.ismatch((exporter, group_name, resource.cls, resource_name)): continue filtered[exporter][group_name][resource_name] = resource @@ -211,9 +220,11 @@ async def print_resources(self): for group_name, group in sorted(groups.items()): print(f" Group '{group_name}' ({exporter}/{group_name}/*):") for resource_name, resource in sorted(group.items()): - print(" Resource '{res}' ({exporter}/{group}/{res_cls}[/{res}]):" - .format(res=resource_name, exporter=exporter, group=group_name, - res_cls=resource.cls)) + print( + " Resource '{res}' ({exporter}/{group}/{res_cls}[/{res}]):".format( + res=resource_name, exporter=exporter, group=group_name, res_cls=resource.cls + ) + ) print(indent(pformat(resource.asdict()), prefix=" ")) else: results = [] @@ -234,10 +245,7 @@ async def print_resources(self): for places, exporter, group_name, resource_cls in results: if self.args.sort_by_matched_place_change: - places_strs = [ - f"{p.name}: {datetime.fromtimestamp(p.changed):%Y-%m-%d}" - for p in places - ] + places_strs = [f"{p.name}: {datetime.fromtimestamp(p.changed):%Y-%m-%d}" for p in places] places_info = ", ".join(places_strs) if places_strs else "not used by any place" else: @@ -272,18 +280,18 @@ async def print_places(self): def print_who(self): """Print acquired places by user""" - result = ['User Host Place Changed'.split()] + result = ["User Host Place Changed".split()] if self.args.show_exporters: - result[0].append('Exporters') + result[0].append("Exporters") for name, place in self.places.items(): if place.acquired is None: continue - host, user = place.acquired.split('/') + host, user = place.acquired.split("/") result.append([user, host, name, str(datetime.fromtimestamp(place.changed))]) if self.args.show_exporters: exporters = {resource_path[0] for resource_path in place.acquired_resources} - result[-1].append(', '.join(sorted(exporters))) + result[-1].append(", ".join(sorted(exporters))) result.sort() widths = [max(map(len, c)) for c in zip(*result)] @@ -305,10 +313,10 @@ def _match_places(self, pattern): # reservation token lookup token = None - if pattern.startswith('+'): + if pattern.startswith("+"): token = pattern[1:] if not token: - token = os.environ.get('LG_TOKEN', None) + token = os.environ.get("LG_TOKEN", None) if not token: return [] for name, place in self.places.items(): @@ -323,8 +331,8 @@ def _match_places(self, pattern): if pattern in name: result.add(name) for alias in place.aliases: - if ':' in alias: - namespace, alias = alias.split(':', 1) + if ":" in alias: + namespace, alias = alias.split(":", 1) if namespace != self.getuser(): continue if alias == pattern: # prefer user namespace @@ -336,16 +344,12 @@ def _match_places(self, pattern): def _check_allowed(self, place): if not place.acquired: raise UserError(f"place {place.name} is not acquired") - if f'{self.gethostname()}/{self.getuser()}' not in place.allowed: - host, user = place.acquired.split('/') + if f"{self.gethostname()}/{self.getuser()}" not in place.allowed: + host, user = place.acquired.split("/") if user != self.getuser(): - raise UserError( - f"place {place.name} is not acquired by your user, acquired by {user}" - ) + raise UserError(f"place {place.name} is not acquired by your user, acquired by {user}") if host != self.gethostname(): - raise UserError( - f"place {place.name} is not acquired on this computer, acquired on {host}" - ) + raise UserError(f"place {place.name} is not acquired on this computer, acquired on {host}") def get_place(self, place=None): pattern = place or self.args.place @@ -408,7 +412,7 @@ async def add_place(self): raise UserError("missing place name. Set with -p or via env var $PLACE") if name in self.places: raise UserError(f"{name} already exists") - res = await self.call('org.labgrid.coordinator.add_place', name) + res = await self.call("org.labgrid.coordinator.add_place", name) if not res: raise ServerError(f"failed to add place {name}") return res @@ -426,7 +430,7 @@ async def del_place(self): raise UserError("missing place name. Set with -p or via env var $PLACE") if name not in self.places: raise UserError(f"{name} does not exist") - res = await self.call('org.labgrid.coordinator.del_place', name) + res = await self.call("org.labgrid.coordinator.del_place", name) if not res: raise ServerError(f"failed to delete place {name}") return res @@ -437,7 +441,7 @@ async def add_alias(self): alias = self.args.alias if alias in place.aliases: raise UserError(f"place {place.name} already has alias {alias}") - res = await self.call('org.labgrid.coordinator.add_place_alias', place.name, alias) + res = await self.call("org.labgrid.coordinator.add_place_alias", place.name, alias) if not res: raise ServerError(f"failed to add alias {alias} for place {place.name}") return res @@ -448,7 +452,7 @@ async def del_alias(self): alias = self.args.alias if alias not in place.aliases: raise UserError(f"place {place.name} has no alias {alias}") - res = await self.call('org.labgrid.coordinator.del_place_alias', place.name, alias) + res = await self.call("org.labgrid.coordinator.del_place_alias", place.name, alias) if not res: raise ServerError(f"failed to delete alias {alias} for place {place.name}") return res @@ -456,8 +460,8 @@ async def del_alias(self): async def set_comment(self): """Set the comment on a place""" place = self.get_place() - comment = ' '.join(self.args.comment) - res = await self.call('org.labgrid.coordinator.set_place_comment', place.name, comment) + comment = " ".join(self.args.comment) + res = await self.call("org.labgrid.coordinator.set_place_comment", place.name, comment) if not res: raise ServerError(f"failed to set comment {comment} for place {place.name}") return res @@ -468,7 +472,7 @@ async def set_tags(self): tags = {} for pair in self.args.tags: try: - k, v = pair.split('=') + k, v = pair.split("=") except ValueError: raise UserError(f"tag '{pair}' needs to match '='") if not TAG_KEY.match(k): @@ -476,11 +480,9 @@ async def set_tags(self): if not TAG_VAL.match(v): raise UserError(f"tag value '{v}' needs to match the rexex '{TAG_VAL.pattern}'") tags[k] = v - res = await self.call('org.labgrid.coordinator.set_place_tags', place.name, tags) + res = await self.call("org.labgrid.coordinator.set_place_tags", place.name, tags) if not res: - raise ServerError( - f"failed to set tags {' '.join(self.args.tags)} for place {place.name}" - ) + raise ServerError(f"failed to set tags {' '.join(self.args.tags)} for place {place.name}") return res async def add_match(self): @@ -491,13 +493,11 @@ async def add_match(self): raise UserError(f"can not change acquired place {place.name}") for pattern in self.args.patterns: if not 2 <= pattern.count("/") <= 3: - raise UserError( - f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')" - ) + raise UserError(f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')") if place.hasmatch(pattern.split("/")): print(f"pattern '{pattern}' exists, skipping", file=sys.stderr) continue - res = await self.call('org.labgrid.coordinator.add_place_match', place.name, pattern) + res = await self.call("org.labgrid.coordinator.add_place_match", place.name, pattern) if not res: raise ServerError(f"failed to add match {pattern} for place {place.name}") @@ -508,12 +508,10 @@ async def del_match(self): raise UserError(f"can not change acquired place {place.name}") for pattern in self.args.patterns: if not 2 <= pattern.count("/") <= 3: - raise UserError( - f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')" - ) + raise UserError(f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')") if not place.hasmatch(pattern.split("/")): print(f"pattern '{pattern}' not found, skipping", file=sys.stderr) - res = await self.call('org.labgrid.coordinator.del_place_match', place.name, pattern) + res = await self.call("org.labgrid.coordinator.del_place_match", place.name, pattern) if not res: raise ServerError(f"failed to delete match {pattern} for place {place.name}") @@ -530,11 +528,11 @@ async def add_named_match(self): raise UserError(f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')") if place.hasmatch(pattern.split("/")): raise UserError(f"pattern '{pattern}' exists") - if '*' in pattern: + if "*" in pattern: raise UserError(f"invalid pattern '{pattern}' ('*' not allowed for named matches)") if not name: raise UserError(f"invalid name '{name}'") - res = await self.call('org.labgrid.coordinator.add_place_match', place.name, pattern, name) + res = await self.call("org.labgrid.coordinator.add_place_match", place.name, pattern, name) if not res: raise ServerError(f"failed to add match {pattern} for place {place.name}") @@ -559,7 +557,7 @@ async def acquire(self): if not self.args.allow_unmatched: self.check_matches(place) - res = await self.call('org.labgrid.coordinator.acquire_place', place.name) + res = await self.call("org.labgrid.coordinator.acquire_place", place.name) if res: print(f"acquired place {place.name}") @@ -578,7 +576,9 @@ async def acquire(self): name = resource_name if match.rename: name = match.rename - print(f"Matching resource '{name}' ({exporter}/{group_name}/{resource.cls}/{resource_name}) already acquired by place '{resource.acquired}'") # pylint: disable=line-too-long + print( + f"Matching resource '{name}' ({exporter}/{group_name}/{resource.cls}/{resource_name}) already acquired by place '{resource.acquired}'" + ) # pylint: disable=line-too-long raise ServerError(f"failed to acquire place {place.name}") @@ -587,12 +587,14 @@ async def release(self): place = self.get_place() if not place.acquired: raise UserError(f"place {place.name} is not acquired") - _, user = place.acquired.split('/') + _, user = place.acquired.split("/") if user != self.getuser(): if not self.args.kick: - raise UserError(f"place {place.name} is acquired by a different user ({place.acquired}), use --kick if you are sure") # pylint: disable=line-too-long + raise UserError( + f"place {place.name} is acquired by a different user ({place.acquired}), use --kick if you are sure" + ) # pylint: disable=line-too-long print(f"warning: kicking user ({place.acquired})") - res = await self.call('org.labgrid.coordinator.release_place', place.name) + res = await self.call("org.labgrid.coordinator.release_place", place.name) if not res: raise ServerError(f"failed to release place {place.name}") @@ -602,7 +604,9 @@ async def release_from(self): """Release a place, but only if acquired by a specific user""" place = self.get_place() res = await self.call( - 'org.labgrid.coordinator.release_place_from', place.name, self.args.acquired, + "org.labgrid.coordinator.release_place_from", + place.name, + self.args.acquired, ) if not res: raise ServerError(f"failed to release place {place.name}") @@ -614,14 +618,12 @@ async def allow(self): place = self.get_place() if not place.acquired: raise UserError(f"place {place.name} is not acquired") - _, user = place.acquired.split('/') + _, user = place.acquired.split("/") if user != self.getuser(): - raise UserError( - f"place {place.name} is acquired by a different user ({place.acquired})" - ) - if '/' not in self.args.user: + raise UserError(f"place {place.name} is acquired by a different user ({place.acquired})") + if "/" not in self.args.user: raise UserError(f"user {self.args.user} must be in / format") - res = await self.call('org.labgrid.coordinator.allow_place', place.name, self.args.user) + res = await self.call("org.labgrid.coordinator.allow_place", place.name, self.args.user) if not res: raise ServerError(f"failed to allow {self.args.user} for place {place.name}") @@ -641,18 +643,18 @@ def get_target_resources(self, place): def get_target_config(self, place): config = {} - resources = config['resources'] = [] + resources = config["resources"] = [] for (name, _), resource in self.get_target_resources(place).items(): args = OrderedDict() if name != resource.cls: - args['name'] = name + args["name"] = name args.update(resource.args) resources.append({resource.cls: args}) return config def print_env(self): place = self.get_acquired_place() - env = {'targets': {place.name: self.get_target_config(place)}} + env = {"targets": {place.name: self.get_target_config(place)}} print(dump(env)) def _prepare_manager(self): @@ -756,7 +758,7 @@ def power(self): if delay is not None: drv.delay = delay res = getattr(drv, action)() - if action == 'get': + if action == "get": print(f"power{' ' + name if name else ''} for place {place.name} is {'on' if res else 'off'}") def digital_io(self): @@ -765,8 +767,7 @@ def digital_io(self): name = self.args.name target = self._get_target(place) from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput - from ..resource.remote import (NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, - NetworkHIDRelay) + from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay drv = None try: @@ -792,16 +793,17 @@ def digital_io(self): if not drv: raise UserError("target has no compatible resource available") - if action == 'get': + if action == "get": print(f"digital IO{' ' + name if name else ''} for place {place.name} is {'high' if drv.get() else 'low'}") - elif action == 'high': + elif action == "high": drv.set(True) - elif action == 'low': + elif action == "low": drv.set(False) async def _console(self, place, target, timeout, *, logfile=None, loop=False, listen_only=False): name = self.args.name from ..resource import NetworkSerialPort + resource = target.get_resource(NetworkSerialPort, name=name, wait_avail=False) # async await resources @@ -824,7 +826,7 @@ async def _console(self, place, target, timeout, *, logfile=None, loop=False, li # check for valid resources assert port is not None, "Port is not set" - call = ['microcom', '-s', str(resource.speed), '-t', f"{host}:{port}"] + call = ["microcom", "-s", str(resource.speed), "-t", f"{host}:{port}"] if listen_only: call.append("--listenonly") @@ -860,8 +862,9 @@ async def _console(self, place, target, timeout, *, logfile=None, loop=False, li async def console(self, place, target): while True: - res = await self._console(place, target, 10.0, logfile=self.args.logfile, - loop=self.args.loop, listen_only=self.args.listenonly) + res = await self._console( + place, target, 10.0, logfile=self.args.logfile, loop=self.args.loop, listen_only=self.args.listenonly + ) # place released if res == 255: break @@ -872,23 +875,24 @@ async def console(self, place, target): raise exc break await asyncio.sleep(1.0) + console.needs_target = True def dfu(self): place = self.get_acquired_place() target = self._get_target(place) name = self.args.name - if self.args.action == 'download' and not self.args.filename: - raise UserError('not enough arguments for dfu download') + if self.args.action == "download" and not self.args.filename: + raise UserError("not enough arguments for dfu download") drv = self._get_driver_or_new(target, "DFUDriver", activate=False, name=name) drv.dfu.timeout = self.args.wait target.activate(drv) - if self.args.action == 'download': + if self.args.action == "download": drv.download(self.args.altsetting, os.path.abspath(self.args.filename)) - if self.args.action == 'detach': + if self.args.action == "detach": drv.detach(self.args.altsetting) - if self.args.action == 'list': + if self.args.action == "list": drv.list() def fastboot(self): @@ -903,17 +907,17 @@ def fastboot(self): try: action = args[0] - if action == 'flash': + if action == "flash": drv.flash(args[1], os.path.abspath(args[2])) - elif action == 'boot': + elif action == "boot": args[1:] = map(os.path.abspath, args[1:]) drv.boot(args[1]) - elif action == 'oem' and args[1] == 'exec': - drv.run(' '.join(args[2:])) + elif action == "oem" and args[1] == "exec": + drv.run(" ".join(args[2:])) else: drv(*args) except IndexError: - raise UserError('not enough arguments for fastboot action') + raise UserError("not enough arguments for fastboot action") except subprocess.CalledProcessError as e: raise UserError(str(e)) @@ -929,24 +933,27 @@ def bootstrap(self): place = self.get_acquired_place() target = self._get_target(place) name = self.args.name - from ..resource.remote import (NetworkMXSUSBLoader, NetworkIMXUSBLoader, NetworkRKUSBLoader, - NetworkAlteraUSBBlaster) + from ..resource.remote import ( + NetworkMXSUSBLoader, + NetworkIMXUSBLoader, + NetworkRKUSBLoader, + NetworkAlteraUSBBlaster, + ) from ..driver import OpenOCDDriver + drv = None try: drv = target.get_driver("BootstrapProtocol", name=name) except NoDriverFoundError: for resource in target.resources: if isinstance(resource, NetworkIMXUSBLoader): - drv = self._get_driver_or_new(target, "IMXUSBDriver", activate=False, - name=name) + drv = self._get_driver_or_new(target, "IMXUSBDriver", activate=False, name=name) drv.loader.timeout = self.args.wait elif isinstance(resource, NetworkMXSUSBLoader): - drv = self._get_driver_or_new(target, "MXSUSBDriver", activate=False, - name=name) + drv = self._get_driver_or_new(target, "MXSUSBDriver", activate=False, name=name) drv.loader.timeout = self.args.wait elif isinstance(resource, NetworkAlteraUSBBlaster): - args = dict(arg.split('=', 1) for arg in self.args.bootstrap_args) + args = dict(arg.split("=", 1) for arg in self.args.bootstrap_args) try: drv = target.get_driver("OpenOCDDriver", activate=False, name=name) except NoDriverFoundError: @@ -981,7 +988,7 @@ def sd_mux(self): if not drv: raise UserError("target has no compatible resource available") - if action == 'get': + if action == "get": print(drv.get_mode()) else: try: @@ -993,10 +1000,10 @@ def usb_mux(self): place = self.get_acquired_place() name = self.args.name links = self.args.links - if links == 'off': + if links == "off": links = [] - elif links == 'host-dut+host-device': - links = ['host-dut', 'host-device'] + elif links == "host-dut+host-device": + links = ["host-dut", "host-device"] else: links = [links] target = self._get_target(place) @@ -1021,11 +1028,11 @@ def _get_ip(self, place): return resource.address matches = [] - for details in resource.extra.get('macs').values(): - ips = details.get('ips', []) + for details in resource.extra.get("macs").values(): + ips = details.get("ips", []) if not ips: continue - matches.append((details['timestamp'], ips)) + matches.append((details["timestamp"], ips)) matches.sort() newest = matches[-1][1] if len(ips) > 1: @@ -1042,13 +1049,14 @@ def _get_ssh(self): return drv except NoDriverFoundError: from ..resource import NetworkService + try: resource = target.get_resource(NetworkService, name=self.args.name) except NoResourceFoundError: ip = self._get_ip(place) if not ip: return - resource = NetworkService(target, address=str(ip), username='root') + resource = NetworkService(target, address=str(ip), username="root") drv = self._get_driver_or_new(target, "SSHDriver", name=resource.name) return drv @@ -1113,7 +1121,7 @@ def telnet(self): ip = self._get_ip(place) if not ip: return - args = ['telnet', str(ip)] + args = ["telnet", str(ip)] res = subprocess.call(args) if res: exc = InteractiveCommandError("telnet error") @@ -1129,6 +1137,7 @@ def video(self): from ..resource.httpvideostream import HTTPVideoStream from ..resource.udev import USBVideo from ..resource.remote import NetworkUSBVideo + drv = None try: drv = target.get_driver("VideoProtocol", name=name) @@ -1143,10 +1152,10 @@ def video(self): if not drv: raise UserError("target has no compatible resource available") - if quality == 'list': + if quality == "list": default, variants = drv.get_qualities() for name, caps in variants: - mark = '*' if default == name else ' ' + mark = "*" if default == name else " " print(f"{mark} {name:<10s} {caps:s}") else: res = drv.stream(quality, controls=controls) @@ -1175,10 +1184,10 @@ def _get_tmc(self): def tmc_command(self): drv = self._get_tmc() - command = ' '.join(self.args.command) + command = " ".join(self.args.command) if not command: raise UserError("no command given") - if '?' in command: + if "?" in command: result = drv.query(command) print(result) else: @@ -1186,7 +1195,7 @@ def tmc_command(self): def tmc_query(self): drv = self._get_tmc() - query = ' '.join(self.args.query) + query = " ".join(self.args.query) if not query: raise UserError("no query given") result = drv.query(query) @@ -1195,22 +1204,22 @@ def tmc_query(self): def tmc_screen(self): drv = self._get_tmc() action = self.args.action - if action in ['show', 'save']: + if action in ["show", "save"]: extension, data = drv.get_screenshot() - filename = 'tmc-screen_{0:%Y-%m-%d}_{0:%H:%M:%S}.{1}'.format(datetime.now(), extension) - with open(filename, 'wb') as f: + filename = "tmc-screen_{0:%Y-%m-%d}_{0:%H:%M:%S}.{1}".format(datetime.now(), extension) + with open(filename, "wb") as f: f.write(data) print(f"Saved as {filename}") - if action == 'show': - subprocess.call(['xdg-open', filename]) + if action == "show": + subprocess.call(["xdg-open", filename]) def tmc_channel(self): drv = self._get_tmc() channel = self.args.channel action = self.args.action - if action == 'info': + if action == "info": data = drv.get_channel_info(channel) - elif action == 'values': + elif action == "values": data = drv.get_channel_values(channel) else: raise ValueError(f"unknown action {action}") @@ -1234,11 +1243,13 @@ def write_files(self): if len(self.args.SOURCE) != 2: self.args.parser.error("the following arguments are required: SOURCE DEST") - drv.write_files([self.args.SOURCE[0]], self.args.SOURCE[1], - self.args.partition, target_is_directory=False) + drv.write_files( + [self.args.SOURCE[0]], self.args.SOURCE[1], self.args.partition, target_is_directory=False + ) else: - drv.write_files(self.args.SOURCE, self.args.target_directory, - self.args.partition, target_is_directory=True) + drv.write_files( + self.args.SOURCE, self.args.target_directory, self.args.partition, target_is_directory=True + ) except subprocess.CalledProcessError as e: raise UserError(f"could not copy files to network usb storage: {e}") except FileNotFoundError as e: @@ -1253,17 +1264,22 @@ def write_image(self): target.activate(drv) try: - drv.write_image(self.args.filename, partition=self.args.partition, skip=self.args.skip, - seek=self.args.seek, mode=self.args.write_mode) + drv.write_image( + self.args.filename, + partition=self.args.partition, + skip=self.args.skip, + seek=self.args.seek, + mode=self.args.write_mode, + ) except subprocess.CalledProcessError as e: raise UserError(f"could not write image to network usb storage: {e}") except FileNotFoundError as e: raise UserError(e) async def create_reservation(self): - filters = ' '.join(self.args.filters) + filters = " ".join(self.args.filters) prio = self.args.prio - res = await self.call('org.labgrid.coordinator.create_reservation', filters, prio=prio) + res = await self.call("org.labgrid.coordinator.create_reservation", filters, prio=prio) if res is None: raise ServerError("failed to create reservation") ((token, config),) = res.items() # we get a one-item dict @@ -1281,13 +1297,13 @@ async def create_reservation(self): async def cancel_reservation(self): token = self.args.token - res = await self.call('org.labgrid.coordinator.cancel_reservation', token) + res = await self.call("org.labgrid.coordinator.cancel_reservation", token) if not res: raise ServerError(f"failed to cancel reservation {token}") async def _wait_reservation(self, token, verbose=True): while True: - config = await self.call('org.labgrid.coordinator.poll_reservation', token) + config = await self.call("org.labgrid.coordinator.poll_reservation", token) if config is None: raise ServerError("reservation not found") config = filter_dict(config, Reservation, warn=True) @@ -1304,8 +1320,8 @@ async def wait_reservation(self): await self._wait_reservation(token) async def print_reservations(self): - reservations = await self.call('org.labgrid.coordinator.get_reservations') - for token, config in sorted(reservations.items(), key=lambda x: (-x[1]['prio'], x[1]['created'])): # pylint: disable=line-too-long + reservations = await self.call("org.labgrid.coordinator.get_reservations") + for token, config in sorted(reservations.items(), key=lambda x: (-x[1]["prio"], x[1]["created"])): # pylint: disable=line-too-long config = filter_dict(config, Reservation, warn=True) res = Reservation(token=token, **config) print(f"Reservation '{res.token}':") @@ -1340,6 +1356,7 @@ async def export(self, place, target): await asyncio.sleep(1.0) except GeneratorExit: print("Exiting...\n", file=sys.stderr) + export.needs_target = True def print_version(self): @@ -1357,8 +1374,8 @@ async def connected(session): # pylint: disable=unused-argument if not extra: extra = {} - extra['loop'] = loop - extra['connected'] = connected + extra["loop"] = loop + extra["connected"] = connected session = [None] @@ -1376,10 +1393,9 @@ def make(*args, **kwargs): # there is no other notification when the WAMP connection setup times out, # so we need to wait for one of these protocol futures to resolve - done, pending = loop.run_until_complete(asyncio.wait( - {protocol.is_open, protocol.is_closed}, - timeout=30, - return_when=asyncio.FIRST_COMPLETED)) + done, pending = loop.run_until_complete( + asyncio.wait({protocol.is_open, protocol.is_closed}, timeout=30, return_when=asyncio.FIRST_COMPLETED) + ) if protocol.is_closed in done: raise Error("connection closed during setup") if protocol.is_open in pending: @@ -1388,23 +1404,26 @@ def make(*args, **kwargs): loop.run_until_complete(ready.wait()) return session[0] + def find_role_by_place(config, place): for role, role_config in config.items(): resources, _ = target_factory.normalize_config(role_config) - remote_places = resources.get('RemotePlace', {}) + remote_places = resources.get("RemotePlace", {}) remote_place = remote_places.get(place) if remote_place: return role return None + def find_any_role_with_place(config): for role, role_config in config.items(): resources, _ = target_factory.normalize_config(role_config) - remote_places = resources.get('RemotePlace', {}) + remote_places = resources.get("RemotePlace", {}) for place in remote_places: return (role, place) return None, None + class LocalPort(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): if nargs is not None: @@ -1424,6 +1443,7 @@ def __call__(self, parser, namespace, value, option_string): v.append((local, remote)) setattr(namespace, self.dest, v) + class RemotePort(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): if nargs is not None: @@ -1459,411 +1479,371 @@ def main(): processwrapper.enable_logging() # Support both legacy variables and properly namespaced ones - place = os.environ.get('PLACE', None) - place = os.environ.get('LG_PLACE', place) - state = os.environ.get('STATE', None) - state = os.environ.get('LG_STATE', state) - initial_state = os.environ.get('LG_INITIAL_STATE', None) - token = os.environ.get('LG_TOKEN', None) + place = os.environ.get("PLACE", None) + place = os.environ.get("LG_PLACE", place) + state = os.environ.get("STATE", None) + state = os.environ.get("LG_STATE", state) + initial_state = os.environ.get("LG_INITIAL_STATE", None) + token = os.environ.get("LG_TOKEN", None) parser = argparse.ArgumentParser() parser.add_argument( - '-x', - '--crossbar', - metavar='URL', + "-x", + "--crossbar", + metavar="URL", type=str, - help="crossbar websocket URL (default: value from env variable LG_CROSSBAR, otherwise ws://127.0.0.1:20408/ws)" + help="crossbar websocket URL (default: value from env variable LG_CROSSBAR, otherwise ws://127.0.0.1:20408/ws)", ) + parser.add_argument("-c", "--config", type=str, default=os.environ.get("LG_ENV"), help="config file") + parser.add_argument("-p", "--place", type=str, default=place, help="place name/alias") + parser.add_argument("-s", "--state", type=str, default=state, help="strategy state to switch into before command") parser.add_argument( - '-c', - '--config', - type=str, - default=os.environ.get("LG_ENV"), - help="config file" - ) - parser.add_argument( - '-p', - '--place', - type=str, - default=place, - help="place name/alias" - ) - parser.add_argument( - '-s', - '--state', - type=str, - default=state, - help="strategy state to switch into before command" - ) - parser.add_argument( - '-i', - '--initial-state', + "-i", + "--initial-state", type=str, default=initial_state, - help="strategy state to force into before switching to desired state" - ) - parser.add_argument( - '-d', - '--debug', - action='store_true', - default=False, - help="enable debug mode (show python tracebacks)" - ) - parser.add_argument( - '-v', - '--verbose', - action='count', - default=0 + help="strategy state to force into before switching to desired state", ) parser.add_argument( - '-P', - '--proxy', - type=str, - help="proxy connections via given ssh host" + "-d", "--debug", action="store_true", default=False, help="enable debug mode (show python tracebacks)" ) + parser.add_argument("-v", "--verbose", action="count", default=0) + parser.add_argument("-P", "--proxy", type=str, help="proxy connections via given ssh host") subparsers = parser.add_subparsers( - dest='command', - title='available subcommands', + dest="command", + title="available subcommands", metavar="COMMAND", ) - subparser = subparsers.add_parser('help') + subparser = subparsers.add_parser("help") - subparser = subparsers.add_parser('complete') - subparser.add_argument('type', choices=['resources', 'places', 'matches', 'match-names']) + subparser = subparsers.add_parser("complete") + subparser.add_argument("type", choices=["resources", "places", "matches", "match-names"]) subparser.set_defaults(func=ClientSession.complete) - subparser = subparsers.add_parser('monitor', - help="monitor events from the coordinator") + subparser = subparsers.add_parser("monitor", help="monitor events from the coordinator") subparser.set_defaults(func=ClientSession.do_monitor) - subparser = subparsers.add_parser('resources', aliases=('r',), - help="list available resources") - subparser.add_argument('-a', '--acquired', action='store_true') - subparser.add_argument('-e', '--exporter') - subparser.add_argument('--sort-by-matched-place-change', action='store_true', - help="sort by matched place's changed date (oldest first) and show place and date") # pylint: disable=line-too-long - subparser.add_argument('match', nargs='?') + subparser = subparsers.add_parser("resources", aliases=("r",), help="list available resources") + subparser.add_argument("-a", "--acquired", action="store_true") + subparser.add_argument("-e", "--exporter") + subparser.add_argument( + "--sort-by-matched-place-change", + action="store_true", + help="sort by matched place's changed date (oldest first) and show place and date", + ) # pylint: disable=line-too-long + subparser.add_argument("match", nargs="?") subparser.set_defaults(func=ClientSession.print_resources) - subparser = subparsers.add_parser('places', aliases=('p',), - help="list available places") - subparser.add_argument('-a', '--acquired', action='store_true') - subparser.add_argument('--sort-last-changed', action='store_true', - help='sort by last changed date (oldest first)') + subparser = subparsers.add_parser("places", aliases=("p",), help="list available places") + subparser.add_argument("-a", "--acquired", action="store_true") + subparser.add_argument("--sort-last-changed", action="store_true", help="sort by last changed date (oldest first)") subparser.set_defaults(func=ClientSession.print_places) - subparser = subparsers.add_parser('who', - help="list acquired places by user") - subparser.add_argument('-e', '--show-exporters', action='store_true', - help='show exporters currently used by each place') + subparser = subparsers.add_parser("who", help="list acquired places by user") + subparser.add_argument( + "-e", "--show-exporters", action="store_true", help="show exporters currently used by each place" + ) subparser.set_defaults(func=ClientSession.print_who) - subparser = subparsers.add_parser('show', - help="show a place and related resources") + subparser = subparsers.add_parser("show", help="show a place and related resources") subparser.set_defaults(func=ClientSession.print_place) - subparser = subparsers.add_parser('create', help="add a new place") + subparser = subparsers.add_parser("create", help="add a new place") subparser.set_defaults(func=ClientSession.add_place) - subparser = subparsers.add_parser('delete', help="delete an existing place") + subparser = subparsers.add_parser("delete", help="delete an existing place") subparser.set_defaults(func=ClientSession.del_place) - subparser = subparsers.add_parser('add-alias', - help="add an alias to a place") - subparser.add_argument('alias') + subparser = subparsers.add_parser("add-alias", help="add an alias to a place") + subparser.add_argument("alias") subparser.set_defaults(func=ClientSession.add_alias) - subparser = subparsers.add_parser('del-alias', - help="delete an alias from a place") - subparser.add_argument('alias') + subparser = subparsers.add_parser("del-alias", help="delete an alias from a place") + subparser.add_argument("alias") subparser.set_defaults(func=ClientSession.del_alias) - subparser = subparsers.add_parser('set-comment', - help="update the place comment") - subparser.add_argument('comment', nargs='+') + subparser = subparsers.add_parser("set-comment", help="update the place comment") + subparser.add_argument("comment", nargs="+") subparser.set_defaults(func=ClientSession.set_comment) - subparser = subparsers.add_parser('set-tags', - help="update the place tags") - subparser.add_argument('tags', metavar='KEY=VALUE', nargs='+', - help="use an empty value for deletion") + subparser = subparsers.add_parser("set-tags", help="update the place tags") + subparser.add_argument("tags", metavar="KEY=VALUE", nargs="+", help="use an empty value for deletion") subparser.set_defaults(func=ClientSession.set_tags) - subparser = subparsers.add_parser('add-match', - help="add one (or multiple) match pattern(s) to a place") - subparser.add_argument('patterns', metavar='PATTERN', nargs='+') + subparser = subparsers.add_parser("add-match", help="add one (or multiple) match pattern(s) to a place") + subparser.add_argument("patterns", metavar="PATTERN", nargs="+") subparser.set_defaults(func=ClientSession.add_match) - subparser = subparsers.add_parser('del-match', - help="delete one (or multiple) match pattern(s) from a place") - subparser.add_argument('patterns', metavar='PATTERN', nargs='+') + subparser = subparsers.add_parser("del-match", help="delete one (or multiple) match pattern(s) from a place") + subparser.add_argument("patterns", metavar="PATTERN", nargs="+") subparser.set_defaults(func=ClientSession.del_match) - subparser = subparsers.add_parser('add-named-match', - help="add one match pattern with a name to a place") - subparser.add_argument('pattern', metavar='PATTERN') - subparser.add_argument('name', metavar='NAME') + subparser = subparsers.add_parser("add-named-match", help="add one match pattern with a name to a place") + subparser.add_argument("pattern", metavar="PATTERN") + subparser.add_argument("name", metavar="NAME") subparser.set_defaults(func=ClientSession.add_named_match) - subparser = subparsers.add_parser('acquire', - aliases=('lock',), - help="acquire a place") - subparser.add_argument('--allow-unmatched', action='store_true', - help="allow missing resources for matches when locking the place") + subparser = subparsers.add_parser("acquire", aliases=("lock",), help="acquire a place") + subparser.add_argument( + "--allow-unmatched", action="store_true", help="allow missing resources for matches when locking the place" + ) subparser.set_defaults(func=ClientSession.acquire) - subparser = subparsers.add_parser('release', - aliases=('unlock',), - help="release a place") - subparser.add_argument('-k', '--kick', action='store_true', - help="release a place even if it is acquired by a different user") + subparser = subparsers.add_parser("release", aliases=("unlock",), help="release a place") + subparser.add_argument( + "-k", "--kick", action="store_true", help="release a place even if it is acquired by a different user" + ) subparser.set_defaults(func=ClientSession.release) - subparser = subparsers.add_parser('release-from', - help="atomically release a place, but only if locked by a specific user") - subparser.add_argument("acquired", - metavar="HOST/USER", - help="User and host to match against when releasing") + subparser = subparsers.add_parser( + "release-from", help="atomically release a place, but only if locked by a specific user" + ) + subparser.add_argument("acquired", metavar="HOST/USER", help="User and host to match against when releasing") subparser.set_defaults(func=ClientSession.release_from) - subparser = subparsers.add_parser('allow', help="allow another user to access a place") - subparser.add_argument('user', help="/") + subparser = subparsers.add_parser("allow", help="allow another user to access a place") + subparser.add_argument("user", help="/") subparser.set_defaults(func=ClientSession.allow) - subparser = subparsers.add_parser('env', - help="generate a labgrid environment file for a place") + subparser = subparsers.add_parser("env", help="generate a labgrid environment file for a place") subparser.set_defaults(func=ClientSession.print_env) - subparser = subparsers.add_parser('power', - aliases=('pw',), - help="change (or get) a place's power status") - subparser.add_argument('action', choices=['on', 'off', 'cycle', 'get']) - subparser.add_argument('-t', '--delay', type=float, default=None, - help='wait time in seconds between off and on during cycle') - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("power", aliases=("pw",), help="change (or get) a place's power status") + subparser.add_argument("action", choices=["on", "off", "cycle", "get"]) + subparser.add_argument( + "-t", "--delay", type=float, default=None, help="wait time in seconds between off and on during cycle" + ) + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.power) - subparser = subparsers.add_parser('io', - help="change (or get) a digital IO status") - subparser.add_argument('action', choices=['high', 'low', 'get'], help="action") - subparser.add_argument('name', help="optional resource name", nargs='?') + subparser = subparsers.add_parser("io", help="change (or get) a digital IO status") + subparser.add_argument("action", choices=["high", "low", "get"], help="action") + subparser.add_argument("name", help="optional resource name", nargs="?") subparser.set_defaults(func=ClientSession.digital_io) - subparser = subparsers.add_parser('console', - aliases=('con',), - help="connect to the console") - subparser.add_argument('-l', '--loop', action='store_true', - help="keep trying to connect if the console is unavailable") - subparser.add_argument('-o', '--listenonly', action='store_true', - help="do not modify local terminal, do not send input from stdin") - subparser.add_argument('name', help="optional resource name", nargs='?') - subparser.add_argument('--logfile', metavar="FILE", help="Log output to FILE", default=None) + subparser = subparsers.add_parser("console", aliases=("con",), help="connect to the console") + subparser.add_argument( + "-l", "--loop", action="store_true", help="keep trying to connect if the console is unavailable" + ) + subparser.add_argument( + "-o", "--listenonly", action="store_true", help="do not modify local terminal, do not send input from stdin" + ) + subparser.add_argument("name", help="optional resource name", nargs="?") + subparser.add_argument("--logfile", metavar="FILE", help="Log output to FILE", default=None) subparser.set_defaults(func=ClientSession.console) - subparser = subparsers.add_parser('dfu', - help="communicate with device in DFU mode") - subparser.add_argument('action', choices=['download', 'detach', 'list'], help='action') - subparser.add_argument('altsetting', help='altsetting name or number (download, detach only)', - nargs='?') - subparser.add_argument('filename', help='file to write into device (download only)', nargs='?') - subparser.add_argument('--wait', type=float, default=10.0) - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("dfu", help="communicate with device in DFU mode") + subparser.add_argument("action", choices=["download", "detach", "list"], help="action") + subparser.add_argument("altsetting", help="altsetting name or number (download, detach only)", nargs="?") + subparser.add_argument("filename", help="file to write into device (download only)", nargs="?") + subparser.add_argument("--wait", type=float, default=10.0) + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.dfu) - subparser = subparsers.add_parser('fastboot', - help="run fastboot") - subparser.add_argument('fastboot_args', metavar='ARG', nargs=argparse.REMAINDER, - help='fastboot arguments') - subparser.add_argument('--wait', type=float, default=10.0) - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("fastboot", help="run fastboot") + subparser.add_argument("fastboot_args", metavar="ARG", nargs=argparse.REMAINDER, help="fastboot arguments") + subparser.add_argument("--wait", type=float, default=10.0) + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.fastboot) - subparser = subparsers.add_parser('flashscript', - help="run flash script") - subparser.add_argument('script', help="Flashing script") - subparser.add_argument('script_args', metavar='ARG', nargs=argparse.REMAINDER, - help='script arguments') - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("flashscript", help="run flash script") + subparser.add_argument("script", help="Flashing script") + subparser.add_argument("script_args", metavar="ARG", nargs=argparse.REMAINDER, help="script arguments") + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.flashscript) - subparser = subparsers.add_parser('bootstrap', - help="start a bootloader") - subparser.add_argument('-w', '--wait', type=float, default=10.0) - subparser.add_argument('filename', help='filename to boot on the target') - subparser.add_argument('bootstrap_args', metavar='ARG', nargs=argparse.REMAINDER, - help='extra bootstrap arguments') - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("bootstrap", help="start a bootloader") + subparser.add_argument("-w", "--wait", type=float, default=10.0) + subparser.add_argument("filename", help="filename to boot on the target") + subparser.add_argument("bootstrap_args", metavar="ARG", nargs=argparse.REMAINDER, help="extra bootstrap arguments") + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.bootstrap) - subparser = subparsers.add_parser('sd-mux', - help="switch USB SD Muxer or get current mode") - subparser.add_argument('action', choices=['dut', 'host', 'off', 'client', 'get']) - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("sd-mux", help="switch USB SD Muxer or get current mode") + subparser.add_argument("action", choices=["dut", "host", "off", "client", "get"]) + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.sd_mux) - subparser = subparsers.add_parser('usb-mux', - help="switch USB Muxer") - subparser.add_argument('links', choices=['off', 'dut-device', 'host-dut', 'host-device', 'host-dut+host-device']) - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("usb-mux", help="switch USB Muxer") + subparser.add_argument("links", choices=["off", "dut-device", "host-dut", "host-device", "host-dut+host-device"]) + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.usb_mux) - subparser = subparsers.add_parser('ssh', - help="connect via ssh (with optional arguments)", - epilog="Additional arguments are passed to the ssh subprocess.") - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser( + "ssh", + help="connect via ssh (with optional arguments)", + epilog="Additional arguments are passed to the ssh subprocess.", + ) + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.ssh) - subparser = subparsers.add_parser('scp', - help="transfer file via scp") - subparser.add_argument('--name', '-n', help="optional resource name") - subparser.add_argument('src', help='source path (use :dir/file for remote side)') - subparser.add_argument('dst', help='destination path (use :dir/file for remote side)') + subparser = subparsers.add_parser("scp", help="transfer file via scp") + subparser.add_argument("--name", "-n", help="optional resource name") + subparser.add_argument("src", help="source path (use :dir/file for remote side)") + subparser.add_argument("dst", help="destination path (use :dir/file for remote side)") subparser.set_defaults(func=ClientSession.scp) - subparser = subparsers.add_parser('rsync', - help="transfer files via rsync", - epilog="Additional arguments are passed to the rsync subprocess.") - subparser.add_argument('--name', '-n', help="optional resource name") - subparser.add_argument('src', help='source path (use :dir/file for remote side)') - subparser.add_argument('dst', help='destination path (use :dir/file for remote side)') + subparser = subparsers.add_parser( + "rsync", help="transfer files via rsync", epilog="Additional arguments are passed to the rsync subprocess." + ) + subparser.add_argument("--name", "-n", help="optional resource name") + subparser.add_argument("src", help="source path (use :dir/file for remote side)") + subparser.add_argument("dst", help="destination path (use :dir/file for remote side)") subparser.set_defaults(func=ClientSession.rsync) - subparser = subparsers.add_parser('sshfs', - help="mount via sshfs (blocking)") - subparser.add_argument('--name', '-n', help="optional resource name") - subparser.add_argument('path', help='remote path on the target') - subparser.add_argument('mountpoint', help='local path') + subparser = subparsers.add_parser("sshfs", help="mount via sshfs (blocking)") + subparser.add_argument("--name", "-n", help="optional resource name") + subparser.add_argument("path", help="remote path on the target") + subparser.add_argument("mountpoint", help="local path") subparser.set_defaults(func=ClientSession.sshfs) - subparser = subparsers.add_parser('forward', - help="forward local port to remote target") - subparser.add_argument('--name', '-n', help="optional resource name") - subparser.add_argument("--local", "-L", metavar="[LOCAL:]REMOTE", - action=LocalPort, - help="Forward local port LOCAL to remote port REMOTE. If LOCAL is unspecified, an arbitrary port will be chosen") - subparser.add_argument("--remote", "-R", metavar="REMOTE:LOCAL", - action=RemotePort, - help="Forward remote port REMOTE to local port LOCAL") + subparser = subparsers.add_parser("forward", help="forward local port to remote target") + subparser.add_argument("--name", "-n", help="optional resource name") + subparser.add_argument( + "--local", + "-L", + metavar="[LOCAL:]REMOTE", + action=LocalPort, + help="Forward local port LOCAL to remote port REMOTE. If LOCAL is unspecified, an arbitrary port will be chosen", + ) + subparser.add_argument( + "--remote", + "-R", + metavar="REMOTE:LOCAL", + action=RemotePort, + help="Forward remote port REMOTE to local port LOCAL", + ) subparser.set_defaults(func=ClientSession.forward) - subparser = subparsers.add_parser('telnet', - help="connect via telnet") + subparser = subparsers.add_parser("telnet", help="connect via telnet") subparser.set_defaults(func=ClientSession.telnet) - subparser = subparsers.add_parser('video', - help="start a video stream") - subparser.add_argument('-q', '--quality', type=str, - help="select a video quality (use 'list' to show options)") - subparser.add_argument('-c', '--controls', type=str, - help="configure v4l controls (such as 'focus_auto=0,focus_absolute=40')") - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("video", help="start a video stream") + subparser.add_argument("-q", "--quality", type=str, help="select a video quality (use 'list' to show options)") + subparser.add_argument( + "-c", "--controls", type=str, help="configure v4l controls (such as 'focus_auto=0,focus_absolute=40')" + ) + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.video) - subparser = subparsers.add_parser('audio', help="start a audio stream") - subparser.add_argument('--name', '-n', help="optional resource name") + subparser = subparsers.add_parser("audio", help="start a audio stream") + subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.audio) - tmc_parser = subparsers.add_parser('tmc', help="control a USB TMC device") - tmc_parser.add_argument('--name', '-n', help="optional resource name") + tmc_parser = subparsers.add_parser("tmc", help="control a USB TMC device") + tmc_parser.add_argument("--name", "-n", help="optional resource name") tmc_parser.set_defaults(func=lambda _: tmc_parser.print_help(file=sys.stderr)) tmc_subparsers = tmc_parser.add_subparsers( - dest='subcommand', - title='available subcommands', + dest="subcommand", + title="available subcommands", metavar="SUBCOMMAND", ) - tmc_subparser = tmc_subparsers.add_parser('cmd', - aliases=('c',), - help="execute raw command") - tmc_subparser.add_argument('command', nargs='+') + tmc_subparser = tmc_subparsers.add_parser("cmd", aliases=("c",), help="execute raw command") + tmc_subparser.add_argument("command", nargs="+") tmc_subparser.set_defaults(func=ClientSession.tmc_command) - tmc_subparser = tmc_subparsers.add_parser('query', - aliases=('q',), - help="execute raw query") - tmc_subparser.add_argument('query', nargs='+') + tmc_subparser = tmc_subparsers.add_parser("query", aliases=("q",), help="execute raw query") + tmc_subparser.add_argument("query", nargs="+") tmc_subparser.set_defaults(func=ClientSession.tmc_query) - tmc_subparser = tmc_subparsers.add_parser('screen', help="show or save a screenshot") - tmc_subparser.add_argument('action', choices=['show', 'save']) + tmc_subparser = tmc_subparsers.add_parser("screen", help="show or save a screenshot") + tmc_subparser.add_argument("action", choices=["show", "save"]) tmc_subparser.set_defaults(func=ClientSession.tmc_screen) - tmc_subparser = tmc_subparsers.add_parser('channel', help="use a channel") - tmc_subparser.add_argument('channel', type=int) - tmc_subparser.add_argument('action', choices=['info', 'values']) + tmc_subparser = tmc_subparsers.add_parser("channel", help="use a channel") + tmc_subparser.add_argument("channel", type=int) + tmc_subparser.add_argument("action", choices=["info", "values"]) tmc_subparser.set_defaults(func=ClientSession.tmc_channel) - subparser = subparsers.add_parser('write-files', help="copy files onto mass storage device", - usage="%(prog)s [OPTION]... -T SOURCE DEST\n" + - " %(prog)s [OPTION]... [-t DIRECTORY] SOURCE...") - subparser.add_argument('-w', '--wait', type=float, default=10.0, - help='storage poll timeout in seconds') - subparser.add_argument('-p', '--partition', type=int, choices=range(0, 256), - metavar='0-255', default=1, - help='partition number to mount or 0 to mount whole disk (default: %(default)s)') + subparser = subparsers.add_parser( + "write-files", + help="copy files onto mass storage device", + usage="%(prog)s [OPTION]... -T SOURCE DEST\n" + " %(prog)s [OPTION]... [-t DIRECTORY] SOURCE...", + ) + subparser.add_argument("-w", "--wait", type=float, default=10.0, help="storage poll timeout in seconds") + subparser.add_argument( + "-p", + "--partition", + type=int, + choices=range(0, 256), + metavar="0-255", + default=1, + help="partition number to mount or 0 to mount whole disk (default: %(default)s)", + ) group = subparser.add_mutually_exclusive_group() - group.add_argument('-t', '--target-directory', type=pathlib.PurePath, metavar='DIRECTORY', - default=pathlib.PurePath("/"), - help='copy all SOURCE files into DIRECTORY (default: partition root)') - group.add_argument('-T', action='store_true', dest='rename', - help='copy SOURCE file and rename to DEST') - subparser.add_argument('--name', '-n', help="optional resource name") - subparser.add_argument('SOURCE', type=pathlib.PurePath, nargs='+', - help='source file(s) to copy') - subparser.add_argument('DEST', type=pathlib.PurePath, nargs='?', - help='destination file name for SOURCE') + group.add_argument( + "-t", + "--target-directory", + type=pathlib.PurePath, + metavar="DIRECTORY", + default=pathlib.PurePath("/"), + help="copy all SOURCE files into DIRECTORY (default: partition root)", + ) + group.add_argument("-T", action="store_true", dest="rename", help="copy SOURCE file and rename to DEST") + subparser.add_argument("--name", "-n", help="optional resource name") + subparser.add_argument("SOURCE", type=pathlib.PurePath, nargs="+", help="source file(s) to copy") + subparser.add_argument("DEST", type=pathlib.PurePath, nargs="?", help="destination file name for SOURCE") subparser.set_defaults(func=ClientSession.write_files, parser=subparser) - subparser = subparsers.add_parser('write-image', help="write an image onto mass storage") - subparser.add_argument('-w', '--wait', type=float, default=10.0) - subparser.add_argument('-p', '--partition', type=int, help="partition number to write to") - subparser.add_argument('--skip', type=int, default=0, - help="skip n 512-sized blocks at start of input") - subparser.add_argument('--seek', type=int, default=0, - help="skip n 512-sized blocks at start of output") - subparser.add_argument('--mode', dest='write_mode', - type=Mode, choices=Mode, default=Mode.DD, - help="Choose tool for writing images (default: %(default)s)") - subparser.add_argument('--name', '-n', help="optional resource name") - subparser.add_argument('filename', help='filename to boot on the target') + subparser = subparsers.add_parser("write-image", help="write an image onto mass storage") + subparser.add_argument("-w", "--wait", type=float, default=10.0) + subparser.add_argument("-p", "--partition", type=int, help="partition number to write to") + subparser.add_argument("--skip", type=int, default=0, help="skip n 512-sized blocks at start of input") + subparser.add_argument("--seek", type=int, default=0, help="skip n 512-sized blocks at start of output") + subparser.add_argument( + "--mode", + dest="write_mode", + type=Mode, + choices=Mode, + default=Mode.DD, + help="Choose tool for writing images (default: %(default)s)", + ) + subparser.add_argument("--name", "-n", help="optional resource name") + subparser.add_argument("filename", help="filename to boot on the target") subparser.set_defaults(func=ClientSession.write_image) - subparser = subparsers.add_parser('reserve', help="create a reservation") - subparser.add_argument('--wait', action='store_true', - help="wait until the reservation is allocated") - subparser.add_argument('--shell', action='store_true', - help="format output as shell variables") - subparser.add_argument('--prio', type=float, default=0.0, - help="priority relative to other reservations (default 0)") - subparser.add_argument('filters', metavar='KEY=VALUE', nargs='+', - help="required tags") + subparser = subparsers.add_parser("reserve", help="create a reservation") + subparser.add_argument("--wait", action="store_true", help="wait until the reservation is allocated") + subparser.add_argument("--shell", action="store_true", help="format output as shell variables") + subparser.add_argument( + "--prio", type=float, default=0.0, help="priority relative to other reservations (default 0)" + ) + subparser.add_argument("filters", metavar="KEY=VALUE", nargs="+", help="required tags") subparser.set_defaults(func=ClientSession.create_reservation) - subparser = subparsers.add_parser('cancel-reservation', help="cancel a reservation") - subparser.add_argument('token', type=str, default=token, nargs='?' if token else None) + subparser = subparsers.add_parser("cancel-reservation", help="cancel a reservation") + subparser.add_argument("token", type=str, default=token, nargs="?" if token else None) subparser.set_defaults(func=ClientSession.cancel_reservation) - subparser = subparsers.add_parser('wait', help="wait for a reservation to be allocated") - subparser.add_argument('token', type=str, default=token, nargs='?' if token else None) + subparser = subparsers.add_parser("wait", help="wait for a reservation to be allocated") + subparser.add_argument("token", type=str, default=token, nargs="?" if token else None) subparser.set_defaults(func=ClientSession.wait_reservation) - subparser = subparsers.add_parser('reservations', help="list current reservations") + subparser = subparsers.add_parser("reservations", help="list current reservations") subparser.set_defaults(func=ClientSession.print_reservations) - subparser = subparsers.add_parser('export', help="export driver information to a file (needs environment with drivers)") - subparser.add_argument('--format', dest='format', - type=ExportFormat, choices=ExportFormat, default=ExportFormat.SHELL_EXPORT, - help="output format (default: %(default)s)") - subparser.add_argument('filename', help='output filename') + subparser = subparsers.add_parser( + "export", help="export driver information to a file (needs environment with drivers)" + ) + subparser.add_argument( + "--format", + dest="format", + type=ExportFormat, + choices=ExportFormat, + default=ExportFormat.SHELL_EXPORT, + help="output format (default: %(default)s)", + ) + subparser.add_argument("filename", help="output filename") subparser.set_defaults(func=ClientSession.export) - subparser = subparsers.add_parser('version', help="show version") + subparser = subparsers.add_parser("version", help="show version") subparser.set_defaults(func=ClientSession.print_version) # make any leftover arguments available for some commands args, leftover = parser.parse_known_args() - if args.command not in ['ssh', 'rsync', 'forward']: + if args.command not in ["ssh", "rsync", "forward"]: args = parser.parse_args() else: args.leftover = leftover @@ -1890,9 +1870,9 @@ def main(): env = Environment(config_file=args.config) role = None - if args.command != 'reserve' and env and env.config.get_targets(): + if args.command != "reserve" and env and env.config.get_targets(): if args.place: - if not args.place.startswith('+'): + if not args.place.startswith("+"): role = find_role_by_place(env.config.get_targets(), args.place) if not role: print(f"RemotePlace {args.place} not found in configuration file", file=sys.stderr) @@ -1906,36 +1886,35 @@ def main(): print(f"Selected role {role} and place {args.place} from configuration file") extra = { - 'args': args, - 'env': env, - 'role': role, - 'prog': parser.prog, + "args": args, + "env": env, + "role": role, + "prog": parser.prog, } - if args.command and args.command != 'help': + if args.command and args.command != "help": exitcode = 0 try: signal.signal(signal.SIGTERM, lambda *_: sys.exit(0)) try: - crossbar_url = args.crossbar or env.config.get_option('crossbar_url') + crossbar_url = args.crossbar or env.config.get_option("crossbar_url") except (AttributeError, KeyError): # in case of no env or not set, use LG_CROSSBAR env variable or default crossbar_url = os.environ.get("LG_CROSSBAR", "ws://127.0.0.1:20408/ws") try: - crossbar_realm = env.config.get_option('crossbar_realm') + crossbar_realm = env.config.get_option("crossbar_realm") except (AttributeError, KeyError): # in case of no env, use LG_CROSSBAR_REALM env variable or default crossbar_realm = os.environ.get("LG_CROSSBAR_REALM", "realm1") - logging.debug('Starting session with "%s", realm: "%s"', crossbar_url, - crossbar_realm) + logging.debug('Starting session with "%s", realm: "%s"', crossbar_url, crossbar_realm) session = start_session(crossbar_url, crossbar_realm, extra) try: if asyncio.iscoroutinefunction(args.func): - if getattr(args.func, 'needs_target', False): + if getattr(args.func, "needs_target", False): place = session.get_acquired_place() target = session._get_target(place) coro = args.func(session, place, target) @@ -1958,11 +1937,20 @@ def main(): for res in e.found: print(f"{res.name}", file=sys.stderr) else: - print("This may be caused by disconnected exporter or wrong match entries.\nYou can use the 'show' command to review all matching resources.", file=sys.stderr) # pylint: disable=line-too-long + print( + "This may be caused by disconnected exporter or wrong match entries.\nYou can use the 'show' command to review all matching resources.", + file=sys.stderr, + ) # pylint: disable=line-too-long elif isinstance(e, NoDriverFoundError): - print("This is likely caused by an error or missing driver in the environment configuration.", file=sys.stderr) # pylint: disable=line-too-long + print( + "This is likely caused by an error or missing driver in the environment configuration.", + file=sys.stderr, + ) # pylint: disable=line-too-long elif isinstance(e, InvalidConfigError): - print("This is likely caused by an error in the environment configuration or invalid\nresource information provided by the coordinator.", file=sys.stderr) # pylint: disable=line-too-long + print( + "This is likely caused by an error in the environment configuration or invalid\nresource information provided by the coordinator.", + file=sys.stderr, + ) # pylint: disable=line-too-long exitcode = 1 except ConnectionError as e: diff --git a/labgrid/remote/common.py b/labgrid/remote/common.py index 142455eb2..2ea1d2f1a 100644 --- a/labgrid/remote/common.py +++ b/labgrid/remote/common.py @@ -10,15 +10,15 @@ import attr __all__ = [ - 'TAG_KEY', - 'TAG_VAL', - 'ResourceEntry', - 'ResourceMatch', - 'Place', - 'ReservationState', - 'Reservation', - 'enable_tcp_nodelay', - 'monkey_patch_max_msg_payload_size_ws_option', + "TAG_KEY", + "TAG_VAL", + "ResourceEntry", + "ResourceMatch", + "Place", + "ReservationState", + "Reservation", + "enable_tcp_nodelay", + "monkey_patch_max_msg_payload_size_ws_option", ] TAG_KEY = re.compile(r"[a-z][a-z0-9_]+") @@ -30,59 +30,59 @@ class ResourceEntry: data = attr.ib() # cls, params def __attrs_post_init__(self): - self.data.setdefault('acquired', None) - self.data.setdefault('avail', False) + self.data.setdefault("acquired", None) + self.data.setdefault("avail", False) @property def acquired(self): - return self.data['acquired'] + return self.data["acquired"] @property def avail(self): - return self.data['avail'] + return self.data["avail"] @property def cls(self): - return self.data['cls'] + return self.data["cls"] @property def params(self): - return self.data['params'] + return self.data["params"] @property def args(self): """arguments for resource construction""" - args = self.data['params'].copy() - args.pop('extra', None) + args = self.data["params"].copy() + args.pop("extra", None) return args @property def extra(self): """extra resource information""" - return self.data['params'].get('extra', {}) + return self.data["params"].get("extra", {}) def asdict(self): return { - 'cls': self.cls, - 'params': self.params, - 'acquired': self.acquired, - 'avail': self.avail, + "cls": self.cls, + "params": self.params, + "acquired": self.acquired, + "avail": self.avail, } def update(self, data): """apply updated information from the exporter on the coordinator""" data = data.copy() - data.setdefault('acquired', None) - data.setdefault('avail', False) + data.setdefault("acquired", None) + data.setdefault("avail", False) self.data = data def acquire(self, place_name): - assert self.data['acquired'] is None - self.data['acquired'] = place_name + assert self.data["acquired"] is None + self.data["acquired"] = place_name def release(self): # ignore repeated releases - self.data['acquired'] = None + self.data["acquired"] = None @attr.s(eq=True, repr=False, str=False) @@ -99,9 +99,7 @@ class ResourceMatch: @classmethod def fromstr(cls, pattern): if not 2 <= pattern.count("/") <= 3: - raise ValueError( - f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')" - ) + raise ValueError(f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')") return cls(*pattern.split("/")) def __repr__(self): @@ -160,30 +158,30 @@ def asdict(self): acquired_resources.append(resource.path) return { - 'aliases': list(self.aliases), - 'comment': self.comment, - 'tags': self.tags, - 'matches': [attr.asdict(x) for x in self.matches], - 'acquired': self.acquired, - 'acquired_resources': acquired_resources, - 'allowed': list(self.allowed), - 'created': self.created, - 'changed': self.changed, - 'reservation': self.reservation, + "aliases": list(self.aliases), + "comment": self.comment, + "tags": self.tags, + "matches": [attr.asdict(x) for x in self.matches], + "acquired": self.acquired, + "acquired_resources": acquired_resources, + "allowed": list(self.allowed), + "created": self.created, + "changed": self.changed, + "reservation": self.reservation, } def update(self, config): fields = attr.fields_dict(type(self)) for k, v in config.items(): assert k in fields - if k == 'name': + if k == "name": # we cannot rename places assert v == self.name continue setattr(self, k, v) def show(self, level=0): - indent = ' ' * level + indent = " " * level if self.aliases: print(indent + f"aliases: {', '.join(sorted(self.aliases))}") if self.comment: @@ -240,7 +238,6 @@ def unmatched(self, resource_paths): if not any([match.ismatch(resource) for resource in resource_paths]): return match - def touch(self): self.changed = time.time() @@ -256,12 +253,14 @@ class ReservationState(enum.Enum): @attr.s(eq=False) class Reservation: owner = attr.ib(validator=attr.validators.instance_of(str)) - token = attr.ib(default=attr.Factory( - lambda: ''.join(random.choice(string.ascii_uppercase+string.digits) for i in range(10)))) + token = attr.ib( + default=attr.Factory(lambda: "".join(random.choice(string.ascii_uppercase + string.digits) for i in range(10))) + ) state = attr.ib( - default='waiting', + default="waiting", converter=lambda x: x if isinstance(x, ReservationState) else ReservationState[x], - validator=attr.validators.instance_of(ReservationState)) + validator=attr.validators.instance_of(ReservationState), + ) prio = attr.ib(default=0.0, validator=attr.validators.instance_of(float)) # a dictionary of name -> filter dicts filters = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict)) @@ -272,13 +271,13 @@ class Reservation: def asdict(self): return { - 'owner': self.owner, - 'state': self.state.name, - 'prio': self.prio, - 'filters': self.filters, - 'allocations': self.allocations, - 'created': self.created, - 'timeout': self.timeout, + "owner": self.owner, + "state": self.state.name, + "prio": self.prio, + "filters": self.filters, + "allocations": self.allocations, + "created": self.created, + "timeout": self.timeout, } def refresh(self, delta=60): @@ -289,7 +288,7 @@ def expired(self): return self.timeout < time.time() def show(self, level=0): - indent = ' ' * level + indent = " " * level print(indent + f"owner: {self.owner}") print(indent + f"token: {self.token}") print(indent + f"state: {self.state.name}") @@ -311,7 +310,7 @@ def enable_tcp_nodelay(session): asyncio/autobahn does not set TCP_NODELAY by default, so we need to do it like this for now. """ - s = session._transport.transport.get_extra_info('socket') + s = session._transport.transport.get_extra_info("socket") s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) diff --git a/labgrid/remote/config.py b/labgrid/remote/config.py index e519b6ae2..fa5e237cb 100644 --- a/labgrid/remote/config.py +++ b/labgrid/remote/config.py @@ -12,24 +12,20 @@ @attr.s(eq=False) class ResourceConfig: filename = attr.ib(validator=attr.validators.instance_of(str)) - template_env = attr.ib( - default=attr.Factory(dict), validator=attr.validators.instance_of(dict) - ) + template_env = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict)) def __attrs_post_init__(self): env = jinja2.Environment( loader=jinja2.FileSystemLoader(os.path.dirname(self.filename)), - line_statement_prefix='#', - line_comment_prefix='##', + line_statement_prefix="#", + line_comment_prefix="##", ) try: with open(self.filename) as file: template = env.from_string(file.read()) except FileNotFoundError: - raise NoConfigFoundError( - f"{self.filename} could not be found" - ) + raise NoConfigFoundError(f"{self.filename} could not be found") rendered = template.render(self.template_env) - pprint(('rendered', rendered)) + pprint(("rendered", rendered)) self.data = load(rendered) - pprint(('loaded', self.data)) + pprint(("loaded", self.data)) diff --git a/labgrid/remote/coordinator.py b/labgrid/remote/coordinator.py index 29b45d83b..e3ba8210f 100644 --- a/labgrid/remote/coordinator.py +++ b/labgrid/remote/coordinator.py @@ -1,4 +1,5 @@ """The coordinator module coordinates exported resources and clients accessing them.""" + # pylint: disable=no-member,unused-argument import asyncio import sys @@ -31,6 +32,7 @@ class Action(Enum): @attr.s(init=False, eq=False) class RemoteSession: """class encapsulating a session, used by ExporterSession and ClientSession""" + coordinator = attr.ib() session = attr.ib() authid = attr.ib() @@ -44,13 +46,14 @@ def key(self): @property def name(self): """Name of the session""" - return self.authid.split('/', 1)[1] + return self.authid.split("/", 1)[1] @attr.s(eq=False) class ExporterSession(RemoteSession): """An ExporterSession is opened for each Exporter connecting to the coordinator, allowing the Exporter to get and set resources""" + groups = attr.ib(default=attr.Factory(dict), init=False) def set_resource(self, groupname, resourcename, resourcedata): @@ -61,8 +64,7 @@ def set_resource(self, groupname, resourcename, resourcedata): new = old elif resourcedata and not old: new = group[resourcename] = ResourceImport( - resourcedata, - path=(self.name, groupname, resourcedata['cls'], resourcename) + resourcedata, path=(self.name, groupname, resourcedata["cls"], resourcename) ) elif not resourcedata and old: new = None @@ -72,8 +74,7 @@ def set_resource(self, groupname, resourcename, resourcedata): new = None self.coordinator.publish( - 'org.labgrid.coordinator.resource_changed', self.name, - groupname, resourcename, new.asdict() if new else {} + "org.labgrid.coordinator.resource_changed", self.name, groupname, resourcename, new.asdict() if new else {} ) if old and new: @@ -107,6 +108,7 @@ class ResourceImport(ResourceEntry): The ResourceEntry attributes contain the information for the client. """ + path = attr.ib(kw_only=True, validator=attr.validators.instance_of(tuple)) @@ -115,8 +117,10 @@ def locked(func): async def wrapper(self, *args, **kwargs): async with self.lock: return await func(self, *args, **kwargs) + return wrapper + class CoordinatorComponent(ApplicationSession): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -143,93 +147,60 @@ async def onConnect(self): @locked async def onJoin(self, details): - await self.subscribe(self.on_session_join, 'wamp.session.on_join') - await self.subscribe( - self.on_session_leave, 'wamp.session.on_leave' - ) + await self.subscribe(self.on_session_join, "wamp.session.on_join") + await self.subscribe(self.on_session_leave, "wamp.session.on_leave") await self.register( - self.attach, - 'org.labgrid.coordinator.attach', - options=RegisterOptions(details_arg='details') + self.attach, "org.labgrid.coordinator.attach", options=RegisterOptions(details_arg="details") ) # resources await self.register( - self.set_resource, - 'org.labgrid.coordinator.set_resource', - options=RegisterOptions(details_arg='details') - ) - await self.register( - self.get_resources, - 'org.labgrid.coordinator.get_resources' + self.set_resource, "org.labgrid.coordinator.set_resource", options=RegisterOptions(details_arg="details") ) + await self.register(self.get_resources, "org.labgrid.coordinator.get_resources") # places + await self.register(self.add_place, "org.labgrid.coordinator.add_place") + await self.register(self.del_place, "org.labgrid.coordinator.del_place") + await self.register(self.add_place_alias, "org.labgrid.coordinator.add_place_alias") + await self.register(self.del_place_alias, "org.labgrid.coordinator.del_place_alias") + await self.register(self.set_place_tags, "org.labgrid.coordinator.set_place_tags") + await self.register(self.set_place_comment, "org.labgrid.coordinator.set_place_comment") + await self.register(self.add_place_match, "org.labgrid.coordinator.add_place_match") + await self.register(self.del_place_match, "org.labgrid.coordinator.del_place_match") await self.register( - self.add_place, 'org.labgrid.coordinator.add_place' - ) - await self.register( - self.del_place, 'org.labgrid.coordinator.del_place' - ) - await self.register( - self.add_place_alias, 'org.labgrid.coordinator.add_place_alias' - ) - await self.register( - self.del_place_alias, 'org.labgrid.coordinator.del_place_alias' - ) - await self.register( - self.set_place_tags, 'org.labgrid.coordinator.set_place_tags' + self.acquire_place, "org.labgrid.coordinator.acquire_place", options=RegisterOptions(details_arg="details") ) await self.register( - self.set_place_comment, 'org.labgrid.coordinator.set_place_comment' - ) - await self.register( - self.add_place_match, 'org.labgrid.coordinator.add_place_match' - ) - await self.register( - self.del_place_match, 'org.labgrid.coordinator.del_place_match' - ) - await self.register( - self.acquire_place, - 'org.labgrid.coordinator.acquire_place', - options=RegisterOptions(details_arg='details') - ) - await self.register( - self.release_place, - 'org.labgrid.coordinator.release_place', - options=RegisterOptions(details_arg='details') + self.release_place, "org.labgrid.coordinator.release_place", options=RegisterOptions(details_arg="details") ) await self.register( self.release_place_from, - 'org.labgrid.coordinator.release_place_from', - options=RegisterOptions(details_arg='details') + "org.labgrid.coordinator.release_place_from", + options=RegisterOptions(details_arg="details"), ) await self.register( - self.allow_place, - 'org.labgrid.coordinator.allow_place', - options=RegisterOptions(details_arg='details') - ) - await self.register( - self.get_places, 'org.labgrid.coordinator.get_places' + self.allow_place, "org.labgrid.coordinator.allow_place", options=RegisterOptions(details_arg="details") ) + await self.register(self.get_places, "org.labgrid.coordinator.get_places") # reservations await self.register( self.create_reservation, - 'org.labgrid.coordinator.create_reservation', - options=RegisterOptions(details_arg='details'), + "org.labgrid.coordinator.create_reservation", + options=RegisterOptions(details_arg="details"), ) await self.register( self.cancel_reservation, - 'org.labgrid.coordinator.cancel_reservation', + "org.labgrid.coordinator.cancel_reservation", ) await self.register( self.poll_reservation, - 'org.labgrid.coordinator.poll_reservation', + "org.labgrid.coordinator.poll_reservation", ) await self.register( self.get_reservations, - 'org.labgrid.coordinator.get_reservations', + "org.labgrid.coordinator.get_reservations", ) self.poll_task = asyncio.get_event_loop().create_task(self.poll()) @@ -250,7 +221,7 @@ async def onDisconnect(self): if self.poll_task: self.poll_task.cancel() await asyncio.wait([self.poll_task]) - await asyncio.sleep(0.5) # give others a chance to clean up + await asyncio.sleep(0.5) # give others a chance to clean up async def _poll_step(self): # save changes @@ -259,26 +230,24 @@ async def _poll_step(self): # poll exporters for session in list(self.sessions.values()): if isinstance(session, ExporterSession): - fut = self.call( - f'org.labgrid.exporter.{session.name}.version' - ) + fut = self.call(f"org.labgrid.exporter.{session.name}.version") done, _ = await asyncio.wait([fut], timeout=5) if not done: - print(f'kicking exporter ({session.key}/{session.name})') - await self.call('wamp.session.kill', session.key, message="timeout detected by coordinator") - print(f'cleaning up exporter ({session.key}/{session.name})') + print(f"kicking exporter ({session.key}/{session.name})") + await self.call("wamp.session.kill", session.key, message="timeout detected by coordinator") + print(f"cleaning up exporter ({session.key}/{session.name})") await self.on_session_leave(session.key) - print(f'removed exporter ({session.key}/{session.name})') + print(f"removed exporter ({session.key}/{session.name})") continue try: session.version = done.pop().result() except wamp.exception.ApplicationError as e: if e.error == "wamp.error.no_such_procedure": - pass # old client + pass # old client elif e.error == "wamp.error.canceled": - pass # disconnected + pass # disconnected elif e.error == "wamp.error.no_such_session": - pass # client has already disconnected + pass # client has already disconnected else: raise # update reservations @@ -309,26 +278,26 @@ async def save(self): places = places.encode() loop = asyncio.get_event_loop() - await loop.run_in_executor(None, atomic_replace, 'resources.yaml', resources) - await loop.run_in_executor(None, atomic_replace, 'places.yaml', places) + await loop.run_in_executor(None, atomic_replace, "resources.yaml", resources) + await loop.run_in_executor(None, atomic_replace, "places.yaml", places) def load(self): try: self.places = {} - with open('places.yaml', 'r') as f: + with open("places.yaml", "r") as f: self.places = yaml.load(f.read()) for placename, config in self.places.items(): - config['name'] = placename + config["name"] = placename # FIXME maybe recover previously acquired places here? - if 'acquired' in config: - del config['acquired'] - if 'acquired_resources' in config: - del config['acquired_resources'] - if 'allowed' in config: - del config['allowed'] - if 'reservation' in config: - del config['reservation'] - config['matches'] = [ResourceMatch(**match) for match in config['matches']] + if "acquired" in config: + del config["acquired"] + if "acquired_resources" in config: + del config["acquired_resources"] + if "allowed" in config: + del config["allowed"] + if "reservation" in config: + del config["reservation"] + config["matches"] = [ResourceMatch(**match) for match in config["matches"]] place = Place(**config) self.places[placename] = place except FileNotFoundError: @@ -371,28 +340,26 @@ async def _update_acquired_places(self, action, resource, callback=True): self._publish_place(place) def _publish_place(self, place): - self.publish( - 'org.labgrid.coordinator.place_changed', place.name, place.asdict() - ) + self.publish("org.labgrid.coordinator.place_changed", place.name, place.asdict()) def _publish_resource(self, resource): self.publish( - 'org.labgrid.coordinator.resource_changed', - resource.path[0], # exporter name - resource.path[1], # group name - resource.path[3], # resource name + "org.labgrid.coordinator.resource_changed", + resource.path[0], # exporter name + resource.path[1], # group name + resource.path[3], # resource name resource.asdict(), ) @locked async def on_session_join(self, session_details): - print('join') + print("join") pprint(session_details) - session = session_details['session'] - authid = session_details['authextra'].get('authid') or session_details['authid'] - if authid.startswith('client/'): + session = session_details["session"] + authid = session_details["authextra"].get("authid") or session_details["authid"] + if authid.startswith("client/"): session = ClientSession(self, session, authid) - elif authid.startswith('exporter/'): + elif authid.startswith("exporter/"): session = ExporterSession(self, session, authid) else: return @@ -400,7 +367,7 @@ async def on_session_join(self, session_details): @locked async def on_session_leave(self, session_id): - print(f'leave ({session_id})') + print(f"leave ({session_id})") try: session = self.sessions.pop(session_id) except KeyError: @@ -417,7 +384,7 @@ async def attach(self, name, details=None): # TODO check if name is in use session = self.sessions[details.caller] session_details = self.sessions[session] - session_details['name'] = name + session_details["name"] = name self.exporters[name] = defaultdict(dict) # not @locked because set_resource my be triggered by a acquire() call to @@ -473,9 +440,7 @@ async def del_place(self, name, details=None): if name not in self.places: return False del self.places[name] - self.publish( - 'org.labgrid.coordinator.place_changed', name, {} - ) + self.publish("org.labgrid.coordinator.place_changed", name, {}) self.save_later() return True @@ -551,7 +516,7 @@ async def add_place_match(self, placename, pattern, rename=None, details=None): place = self.places[placename] except KeyError: return False - match = ResourceMatch(*pattern.split('/'), rename=rename) + match = ResourceMatch(*pattern.split("/"), rename=rename) if match in place.matches: return False place.matches.append(match) @@ -566,7 +531,7 @@ async def del_place_match(self, placename, pattern, rename=None, details=None): place = self.places[placename] except KeyError: return False - match = ResourceMatch(*pattern.split('/'), rename=rename) + match = ResourceMatch(*pattern.split("/"), rename=rename) try: place.matches.remove(match) except ValueError: @@ -577,7 +542,7 @@ async def del_place_match(self, placename, pattern, rename=None, details=None): return True async def _acquire_resources(self, place, resources): - resources = resources.copy() # we may modify the list + resources = resources.copy() # we may modify the list # all resources need to be free for resource in resources: if resource.acquired: @@ -589,8 +554,9 @@ async def _acquire_resources(self, place, resources): for resource in resources: # this triggers an update from the exporter which is published # to the clients - await self.call(f'org.labgrid.exporter.{resource.path[0]}.acquire', - resource.path[1], resource.path[3], place.name) + await self.call( + f"org.labgrid.exporter.{resource.path[0]}.acquire", resource.path[1], resource.path[3], place.name + ) acquired.append(resource) except: print(f"failed to acquire {resource}", file=sys.stderr) @@ -604,7 +570,7 @@ async def _acquire_resources(self, place, resources): return True async def _release_resources(self, place, resources, callback=True): - resources = resources.copy() # we may modify the list + resources = resources.copy() # we may modify the list for resource in resources: try: @@ -617,8 +583,9 @@ async def _release_resources(self, place, resources, callback=True): # this triggers an update from the exporter which is published # to the clients if callback: - await self.call(f'org.labgrid.exporter.{resource.path[0]}.release', - resource.path[1], resource.path[3]) + await self.call( + f"org.labgrid.exporter.{resource.path[0]}.release", resource.path[1], resource.path[3] + ) except: print(f"failed to release {resource}", file=sys.stderr) # at leaset try to notify the clients @@ -758,10 +725,10 @@ def schedule_reservations(self): res.state = ReservationState.expired res.allocations.clear() res.refresh() - print(f'reservation ({res.owner}/{res.token}) is now {res.state.name}') + print(f"reservation ({res.owner}/{res.token}) is now {res.state.name}") else: del self.reservations[res.token] - print(f'removed {res.state.name} reservation ({res.owner}/{res.token})') + print(f"removed {res.state.name} reservation ({res.owner}/{res.token})") # check which places are already allocated and handle state transitions allocated_places = set() @@ -775,7 +742,7 @@ def schedule_reservations(self): res.state = ReservationState.invalid res.allocations.clear() res.refresh(300) - print(f'reservation ({res.owner}/{res.token}) is now {res.state.name}') + print(f"reservation ({res.owner}/{res.token}) is now {res.state.name}") if place.acquired is not None: acquired_places.add(name) assert name not in allocated_places, "conflicting allocation" @@ -784,12 +751,12 @@ def schedule_reservations(self): # an allocated place was acquired res.state = ReservationState.acquired res.refresh() - print(f'reservation ({res.owner}/{res.token}) is now {res.state.name}') + print(f"reservation ({res.owner}/{res.token}) is now {res.state.name}") if not acquired_places and res.state is ReservationState.acquired: # all allocated places were released res.state = ReservationState.allocated res.refresh() - print(f'reservation ({res.owner}/{res.token}) is now {res.state.name}') + print(f"reservation ({res.owner}/{res.token}) is now {res.state.name}") # check which places are available for allocation available_places = set() @@ -811,21 +778,21 @@ def schedule_reservations(self): for name in available_places: tags = set(self.places[name].tags.items()) # support place names - tags |= {('name', name)} + tags |= {("name", name)} # support place aliases place_tagsets.append(TagSet(name, tags)) filter_tagsets = [] for res in pending_reservations: - filter_tagsets.append(TagSet(res.token, set(res.filters['main'].items()))) + filter_tagsets.append(TagSet(res.token, set(res.filters["main"].items()))) allocation = schedule(place_tagsets, filter_tagsets) # apply allocations for res_token, place_name in allocation.items(): res = self.reservations[res_token] - res.allocations = {'main': [place_name]} + res.allocations = {"main": [place_name]} res.state = ReservationState.allocated res.refresh() - print(f'reservation ({res.owner}/{res.token}) is now {res.state.name}') + print(f"reservation ({res.owner}/{res.token}) is now {res.state.name}") # update reservation property of each place and notify old_map = {} @@ -853,7 +820,7 @@ async def create_reservation(self, spec, prio=0.0, details=None): filter_ = {} for pair in spec.split(): try: - k, v = pair.split('=') + k, v = pair.split("=") except ValueError: return None if not TAG_KEY.match(k): @@ -862,7 +829,7 @@ async def create_reservation(self, spec, prio=0.0, details=None): return None filter_[k] = v - filters = {'main': filter_} # currently, only one group is implemented + filters = {"main": filter_} # currently, only one group is implemented owner = self.sessions[details.caller].name res = Reservation(owner=owner, prio=prio, filters=filters) @@ -893,7 +860,8 @@ async def poll_reservation(self, token, details=None): async def get_reservations(self, details=None): return {k: v.asdict() for k, v in self.reservations.items()} -if __name__ == '__main__': + +if __name__ == "__main__": runner = ApplicationRunner( url=environ.get("WS", "ws://127.0.0.1:20408/ws"), realm="realm1", diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index 577d3c926..03a8cd26c 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -1,5 +1,6 @@ """The remote.exporter module exports resources to the coordinator and makes them available to other clients on the same coordinator""" + import argparse import asyncio import logging @@ -28,6 +29,7 @@ exports: Dict[str, Type[ResourceEntry]] = {} reexec = False + class ExporterError(Exception): pass @@ -40,19 +42,21 @@ def log_subprocess_kernel_stack(logger, child): if child.poll() is not None: # nothing to check if no longer running return try: - with open(f'/proc/{child.pid}/stack', 'r') as f: + with open(f"/proc/{child.pid}/stack", "r") as f: stack = f.read() stack = stack.strip() except PermissionError: return logger.info("current kernel stack of %s is:\n%s", child.args, stack) + @attr.s(eq=False) class ResourceExport(ResourceEntry): """Represents a local resource exported via a specific protocol. The ResourceEntry attributes contain the information for the client. """ + host = attr.ib(default=gethostname(), validator=attr.validators.instance_of(str)) proxy = attr.ib(default=None) proxy_required = attr.ib(default=False) @@ -86,7 +90,7 @@ def broken(self, reason): # resource. For now, when trying to acquire a place with a match for # this resource, we get 'resource is already in used by ', # instead of an unspecific error. - self.data['acquired'] = '' + self.data["acquired"] = "" self.logger.error("marked as broken: %s", reason) def _get_start_params(self): # pylint: disable=no-self-use @@ -145,17 +149,17 @@ def poll(self): # check if resulting information has changed dirty = False if self.avail != (self.local.avail and not self.broken): - self.data['avail'] = self.local.avail and not self.broken + self.data["avail"] = self.local.avail and not self.broken dirty = True params = self._get_params() - if not params.get('extra'): - params['extra'] = {} - params['extra']['proxy_required'] = self.proxy_required - params['extra']['proxy'] = self.proxy + if not params.get("extra"): + params["extra"] = {} + params["extra"]["proxy_required"] = self.proxy_required + params["extra"]["proxy"] = self.proxy if self.broken: - params['extra']['broken'] = self.broken + params["extra"]["broken"] = self.broken if self.params != params: - self.data['params'].update(params) + self.data["params"].update(params) dirty = True return dirty @@ -181,11 +185,13 @@ def __attrs_post_init__(self): super().__attrs_post_init__() if self.cls == "RawSerialPort": from ..resource.serialport import RawSerialPort + self.local = RawSerialPort(target=None, name=None, **self.local_params) elif self.cls == "USBSerialPort": from ..resource.udev import USBSerialPort + self.local = USBSerialPort(target=None, name=None, **self.local_params) - self.data['cls'] = "NetworkSerialPort" + self.data["cls"] = "NetworkSerialPort" self.child = None self.port = None self.ser2net_bin = shutil.which("ser2net") @@ -203,31 +209,31 @@ def __del__(self): def _get_start_params(self): return { - 'path': self.local.port, + "path": self.local.port, } def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'port': self.port, - 'speed': self.local.speed, - 'extra': { - 'path': self.local.port, - } + "host": self.host, + "port": self.port, + "speed": self.local.speed, + "extra": { + "path": self.local.port, + }, } def _start(self, start_params): """Start ``ser2net`` subprocess""" assert self.local.avail assert self.child is None - assert start_params['path'].startswith('/dev/') + assert start_params["path"].startswith("/dev/") self.port = get_free_port() # Ser2net has switched to using YAML format at version 4.0.0. - result = subprocess.run([self.ser2net_bin,'-v'], capture_output=True, text=True) - _, _, version = str(result.stdout).split(' ') - major_version = version.split('.')[0] + result = subprocess.run([self.ser2net_bin, "-v"], capture_output=True, text=True) + _, _, version = str(result.stdout).split(" ") + major_version = version.split(".")[0] # There is a bug in ser2net between 4.4.0 and 4.6.1 where it # returns 1 on a successful call to 'ser2net -v'. We don't want @@ -239,20 +245,24 @@ def _start(self, start_params): if int(major_version) >= 4: cmd = [ self.ser2net_bin, - '-d', - '-n', - '-Y', f'connection: &con01# accepter: telnet(rfc2217,mode=server),{self.port}', - '-Y', f' connector: serialdev(nouucplock=true),{start_params["path"]},{self.local.speed}n81,local', # pylint: disable=line-too-long - '-Y', ' options:', - '-Y', ' max-connections: 10', + "-d", + "-n", + "-Y", + f"connection: &con01# accepter: telnet(rfc2217,mode=server),{self.port}", + "-Y", + f' connector: serialdev(nouucplock=true),{start_params["path"]},{self.local.speed}n81,local', # pylint: disable=line-too-long + "-Y", + " options:", + "-Y", + " max-connections: 10", ] else: cmd = [ self.ser2net_bin, - '-d', - '-n', - '-u', - '-C', + "-d", + "-n", + "-u", + "-C", f'{self.port}:telnet:0:{start_params["path"]}:{self.local.speed} NONE 8DATABITS 1STOPBIT LOCAL', # pylint: disable=line-too-long ] self.logger.info("Starting ser2net with: %s", " ".join(cmd)) @@ -263,7 +273,7 @@ def _start(self, start_params): except subprocess.TimeoutExpired: # good, ser2net didn't exit immediately pass - self.logger.info("started ser2net for %s on port %d", start_params['path'], self.port) + self.logger.info("started ser2net for %s on port %d", start_params["path"], self.port) def _stop(self, start_params): """Stop ``ser2net`` subprocess""" @@ -276,16 +286,17 @@ def _stop(self, start_params): try: child.wait(2.0) # ser2net takes about a second to react except subprocess.TimeoutExpired: - self.logger.warning("ser2net for %s still running after SIGTERM", start_params['path']) + self.logger.warning("ser2net for %s still running after SIGTERM", start_params["path"]) log_subprocess_kernel_stack(self.logger, child) child.kill() child.wait(1.0) - self.logger.info("stopped ser2net for %s on port %d", start_params['path'], port) + self.logger.info("stopped ser2net for %s on port %d", start_params["path"], port) exports["USBSerialPort"] = SerialPortExport exports["RawSerialPort"] = SerialPortExport + @attr.s(eq=False) class NetworkInterfaceExport(ResourceExport): """ResourceExport for a network interface""" @@ -294,21 +305,23 @@ def __attrs_post_init__(self): super().__attrs_post_init__() if self.cls == "NetworkInterface": from ..resource.base import NetworkInterface + self.local = NetworkInterface(target=None, name=None, **self.local_params) elif self.cls == "USBNetworkInterface": from ..resource.udev import USBNetworkInterface + self.local = USBNetworkInterface(target=None, name=None, **self.local_params) - self.data['cls'] = "RemoteNetworkInterface" + self.data["cls"] = "RemoteNetworkInterface" def _get_params(self): """Helper function to return parameters""" params = { - 'host': self.host, - 'ifname': self.local.ifname, + "host": self.host, + "ifname": self.local.ifname, } if self.cls == "USBNetworkInterface": - params['extra'] = { - 'state': self.local.if_state, + params["extra"] = { + "state": self.local.if_state, } return params @@ -317,6 +330,7 @@ def _get_params(self): exports["USBNetworkInterface"] = NetworkInterfaceExport exports["NetworkInterface"] = NetworkInterfaceExport + @attr.s(eq=False) class USBGenericExport(ResourceExport): """ResourceExport for USB devices accessed directly from userspace""" @@ -324,22 +338,24 @@ class USBGenericExport(ResourceExport): def __attrs_post_init__(self): super().__attrs_post_init__() local_cls_name = self.cls - self.data['cls'] = f"Network{self.cls}" + self.data["cls"] = f"Network{self.cls}" from ..resource import udev + local_cls = getattr(udev, local_cls_name) self.local = local_cls(target=None, name=None, **self.local_params) def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, } + @attr.s(eq=False) class USBSigrokExport(USBGenericExport): """ResourceExport for USB devices accessed directly from userspace""" @@ -350,16 +366,17 @@ def __attrs_post_init__(self): def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, - 'driver': self.local.driver, - 'channels': self.local.channels + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "driver": self.local.driver, + "channels": self.local.channels, } + @attr.s(eq=False) class USBSDMuxExport(USBGenericExport): """ResourceExport for USB devices accessed directly from userspace""" @@ -370,15 +387,16 @@ def __attrs_post_init__(self): def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, - 'control_path': self.local.control_path, + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "control_path": self.local.control_path, } + @attr.s(eq=False) class USBSDWireExport(USBGenericExport): """ResourceExport for USB devices accessed directly from userspace""" @@ -389,15 +407,16 @@ def __attrs_post_init__(self): def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, - 'control_serial': self.local.control_serial, + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "control_serial": self.local.control_serial, } + @attr.s(eq=False) class USBAudioInputExport(USBGenericExport): """ResourceExport for ports on switchable USB hubs""" @@ -408,16 +427,17 @@ def __attrs_post_init__(self): def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, - 'index': self.local.index, - 'alsa_name': self.local.alsa_name, + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "index": self.local.index, + "alsa_name": self.local.alsa_name, } + @attr.s(eq=False) class SiSPMPowerPortExport(USBGenericExport): """ResourceExport for ports on GEMBRID switches""" @@ -428,15 +448,16 @@ def __attrs_post_init__(self): def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, - 'index': self.local.index, + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "index": self.local.index, } + @attr.s(eq=False) class USBPowerPortExport(USBGenericExport): """ResourceExport for ports on switchable USB hubs""" @@ -447,15 +468,16 @@ def __attrs_post_init__(self): def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, - 'index': self.local.index, + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "index": self.local.index, } + @attr.s(eq=False) class USBDeditecRelaisExport(USBGenericExport): """ResourceExport for outputs on deditec relais""" @@ -466,15 +488,16 @@ def __attrs_post_init__(self): def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, - 'index': self.local.index, + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "index": self.local.index, } + @attr.s(eq=False) class USBHIDRelayExport(USBGenericExport): """ResourceExport for outputs on simple USB HID relays""" @@ -485,31 +508,35 @@ def __attrs_post_init__(self): def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'busnum': self.local.busnum, - 'devnum': self.local.devnum, - 'path': self.local.path, - 'vendor_id': self.local.vendor_id, - 'model_id': self.local.model_id, - 'index': self.local.index, + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "index": self.local.index, } + @attr.s(eq=False) class USBFlashableExport(USBGenericExport): """ResourceExport for Flashable USB devices""" + def __attrs_post_init__(self): super().__attrs_post_init__() def _get_params(self): p = super()._get_params() - p['devnode'] = self.local.devnode + p["devnode"] = self.local.devnode return p + @attr.s(eq=False) class USBGenericRemoteExport(USBGenericExport): def __attrs_post_init__(self): super().__attrs_post_init__() - self.data['cls'] = f"Remote{self.cls}".replace("Network", "") + self.data["cls"] = f"Remote{self.cls}".replace("Network", "") + exports["AndroidFastboot"] = USBGenericExport exports["AndroidUSBFastboot"] = USBGenericRemoteExport @@ -535,6 +562,7 @@ def __attrs_post_init__(self): exports["USBFlashableDevice"] = USBFlashableExport exports["LXAUSBMux"] = USBGenericExport + @attr.s(eq=False) class ProviderGenericExport(ResourceExport): """ResourceExport for Resources derived from BaseProvider""" @@ -542,23 +570,26 @@ class ProviderGenericExport(ResourceExport): def __attrs_post_init__(self): super().__attrs_post_init__() local_cls_name = self.cls - self.data['cls'] = f"Remote{self.cls}" + self.data["cls"] = f"Remote{self.cls}" from ..resource import provider + local_cls = getattr(provider, local_cls_name) self.local = local_cls(target=None, name=None, **self.local_params) def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'internal': self.local.internal, - 'external': self.local.external, + "host": self.host, + "internal": self.local.internal, + "external": self.local.external, } + exports["TFTPProvider"] = ProviderGenericExport exports["NFSProvider"] = ProviderGenericExport exports["HTTPProvider"] = ProviderGenericExport + @attr.s class EthernetPortExport(ResourceExport): """ResourceExport for a ethernet interface""" @@ -566,23 +597,21 @@ class EthernetPortExport(ResourceExport): def __attrs_post_init__(self): super().__attrs_post_init__() from ..resource.ethernetport import SNMPEthernetPort - self.data['cls'] = "EthernetPort" + + self.data["cls"] = "EthernetPort" self.local = SNMPEthernetPort(target=None, name=None, **self.local_params) def _get_params(self): """Helper function to return parameters""" - return { - 'switch': self.local.switch, - 'interface': self.local.interface, - 'extra': self.local.extra - } + return {"switch": self.local.switch, "interface": self.local.interface, "extra": self.local.extra} + exports["SNMPEthernetPort"] = EthernetPortExport @attr.s(eq=False) class GPIOSysFSExport(ResourceExport): - _gpio_sysfs_path_prefix = '/sys/class/gpio' + _gpio_sysfs_path_prefix = "/sys/class/gpio" """ResourceExport for GPIO lines accessed directly from userspace""" @@ -590,49 +619,51 @@ def __attrs_post_init__(self): super().__attrs_post_init__() if self.cls == "SysfsGPIO": from ..resource.base import SysfsGPIO + self.local = SysfsGPIO(target=None, name=None, **self.local_params) elif self.cls == "MatchedSysfsGPIO": from ..resource.udev import MatchedSysfsGPIO + self.local = MatchedSysfsGPIO(target=None, name=None, **self.local_params) - self.data['cls'] = "NetworkSysfsGPIO" - self.export_path = Path(GPIOSysFSExport._gpio_sysfs_path_prefix, - f'gpio{self.local.index}') + self.data["cls"] = "NetworkSysfsGPIO" + self.export_path = Path(GPIOSysFSExport._gpio_sysfs_path_prefix, f"gpio{self.local.index}") self.system_exported = False def _get_params(self): """Helper function to return parameters""" return { - 'host': self.host, - 'index': self.local.index, + "host": self.host, + "index": self.local.index, } def _get_start_params(self): return { - 'index': self.local.index, + "index": self.local.index, } def _start(self, start_params): """Start a GPIO export to userspace""" - index = start_params['index'] + index = start_params["index"] if self.export_path.exists(): self.system_exported = True return - export_sysfs_path = os.path.join(GPIOSysFSExport._gpio_sysfs_path_prefix, 'export') - with open(export_sysfs_path, mode='wb') as export: - export.write(str(index).encode('utf-8')) + export_sysfs_path = os.path.join(GPIOSysFSExport._gpio_sysfs_path_prefix, "export") + with open(export_sysfs_path, mode="wb") as export: + export.write(str(index).encode("utf-8")) def _stop(self, start_params): """Disable a GPIO export to userspace""" - index = start_params['index'] + index = start_params["index"] if self.system_exported: return - export_sysfs_path = os.path.join(GPIOSysFSExport._gpio_sysfs_path_prefix, 'unexport') - with open(export_sysfs_path, mode='wb') as unexport: - unexport.write(str(index).encode('utf-8')) + export_sysfs_path = os.path.join(GPIOSysFSExport._gpio_sysfs_path_prefix, "unexport") + with open(export_sysfs_path, mode="wb") as unexport: + unexport.write(str(index).encode("utf-8")) + exports["SysfsGPIO"] = GPIOSysFSExport exports["MatchedSysfsGPIO"] = GPIOSysFSExport @@ -649,9 +680,10 @@ class NetworkServiceExport(ResourceExport): def __attrs_post_init__(self): super().__attrs_post_init__() from ..resource.networkservice import NetworkService - self.data['cls'] = "NetworkService" + + self.data["cls"] = "NetworkService" self.local = NetworkService(target=None, name=None, **self.local_params) - if '%' in self.local_params['address']: + if "%" in self.local_params["address"]: self.proxy_required = True def _get_params(self): @@ -660,22 +692,28 @@ def _get_params(self): **self.local_params, } + exports["NetworkService"] = NetworkServiceExport + @attr.s class HTTPVideoStreamExport(ResourceExport): """ResourceExport for an HTTPVideoStream""" + def __attrs_post_init__(self): super().__attrs_post_init__() from ..resource.httpvideostream import HTTPVideoStream - self.data['cls'] = "HTTPVideoStream" + + self.data["cls"] = "HTTPVideoStream" self.local = HTTPVideoStream(target=None, name=None, **self.local_params) def _get_params(self): return self.local_params + exports["HTTPVideoStream"] = HTTPVideoStreamExport + @attr.s(eq=False) class LXAIOBusNodeExport(ResourceExport): """ResourceExport for LXAIOBusNode devices accessed via the HTTP API""" @@ -683,32 +721,38 @@ class LXAIOBusNodeExport(ResourceExport): def __attrs_post_init__(self): super().__attrs_post_init__() local_cls_name = self.cls - self.data['cls'] = f"Network{self.cls}" + self.data["cls"] = f"Network{self.cls}" from ..resource import lxaiobus + local_cls = getattr(lxaiobus, local_cls_name) self.local = local_cls(target=None, name=None, **self.local_params) def _get_params(self): return self.local_params + exports["LXAIOBusPIO"] = LXAIOBusNodeExport + @attr.s(eq=False) class AndroidNetFastbootExport(ResourceExport): def __attrs_post_init__(self): super().__attrs_post_init__() local_cls_name = self.cls - self.data['cls'] = f"Remote{self.cls}" + self.data["cls"] = f"Remote{self.cls}" from ..resource import fastboot + local_cls = getattr(fastboot, local_cls_name) self.local = local_cls(target=None, name=None, **self.local_params) def _get_params(self): """Helper function to return parameters""" - return {'host' : self.host, **self.local_params} + return {"host": self.host, **self.local_params} + exports["AndroidNetFastboot"] = AndroidNetFastbootExport + @attr.s(eq=False) class YKUSHPowerPortExport(ResourceExport): """ResourceExport for YKUSHPowerPort devices""" @@ -716,29 +760,29 @@ class YKUSHPowerPortExport(ResourceExport): def __attrs_post_init__(self): super().__attrs_post_init__() local_cls_name = self.cls - self.data['cls'] = f"Network{local_cls_name}" + self.data["cls"] = f"Network{local_cls_name}" from ..resource import ykushpowerport + local_cls = getattr(ykushpowerport, local_cls_name) self.local = local_cls(target=None, name=None, **self.local_params) def _get_params(self): - return { - "host": self.host, - **self.local_params - } + return {"host": self.host, **self.local_params} + exports["YKUSHPowerPort"] = YKUSHPowerPortExport + class ExporterSession(ApplicationSession): def onConnect(self): """Set up internal datastructures on successful connection: - Setup loop, name, authid and address - Join the coordinator as an exporter""" - self.loop = self.config.extra['loop'] - self.name = self.config.extra['name'] - self.hostname = self.config.extra['hostname'] - self.isolated = self.config.extra['isolated'] - self.address = self._transport.transport.get_extra_info('sockname')[0] + self.loop = self.config.extra["loop"] + self.name = self.config.extra["name"] + self.hostname = self.config.extra["hostname"] + self.isolated = self.config.extra["isolated"] + self.address = self._transport.transport.get_extra_info("sockname")[0] self.checkpoint = time.monotonic() self.poll_task = None @@ -765,35 +809,31 @@ async def onJoin(self, details): """ print(details) - prefix = f'org.labgrid.exporter.{self.name}' + prefix = f"org.labgrid.exporter.{self.name}" try: - await self.register(self.acquire, f'{prefix}.acquire') - await self.register(self.release, f'{prefix}.release') - await self.register(self.version, f'{prefix}.version') + await self.register(self.acquire, f"{prefix}.acquire") + await self.register(self.release, f"{prefix}.release") + await self.register(self.version, f"{prefix}.version") config_template_env = { - 'env': os.environ, - 'isolated': self.isolated, - 'hostname': self.hostname, - 'name': self.name, + "env": os.environ, + "isolated": self.isolated, + "hostname": self.hostname, + "name": self.name, } - resource_config = ResourceConfig( - self.config.extra['resources'], config_template_env - ) + resource_config = ResourceConfig(self.config.extra["resources"], config_template_env) for group_name, group in resource_config.data.items(): group_name = str(group_name) for resource_name, params in group.items(): resource_name = str(resource_name) - if resource_name == 'location': + if resource_name == "location": continue if params is None: continue - cls = params.pop('cls', resource_name) + cls = params.pop("cls", resource_name) # this may call back to acquire the resource immediately - await self.add_resource( - group_name, resource_name, cls, params - ) + await self.add_resource(group_name, resource_name, cls, params) self.checkpoint = time.monotonic() except Exception: # pylint: disable=broad-except @@ -817,7 +857,7 @@ async def onDisconnect(self): if self.poll_task: self.poll_task.cancel() await asyncio.wait([self.poll_task]) - await asyncio.sleep(0.5) # give others a chance to clean up + await asyncio.sleep(0.5) # give others a chance to clean up self.loop.stop() async def acquire(self, group_name, resource_name, place_name): @@ -871,25 +911,22 @@ async def poll(self): async def add_resource(self, group_name, resource_name, cls, params): """Add a resource to the exporter and update status on the coordinator""" - print( - f"add resource {group_name}/{resource_name}: {cls}/{params}" - ) + print(f"add resource {group_name}/{resource_name}: {cls}/{params}") group = self.groups.setdefault(group_name, {}) assert resource_name not in group export_cls = exports.get(cls, ResourceEntry) config = { - 'avail': export_cls is ResourceEntry, - 'cls': cls, - 'params': params, + "avail": export_cls is ResourceEntry, + "cls": cls, + "params": params, } proxy_req = self.isolated if issubclass(export_cls, ResourceExport): - group[resource_name] = export_cls(config, host=self.hostname, proxy=getfqdn(), - proxy_required=proxy_req) + group[resource_name] = export_cls(config, host=self.hostname, proxy=getfqdn(), proxy_required=proxy_req) else: - config['params']['extra'] = { - 'proxy': getfqdn(), - 'proxy_required': proxy_req, + config["params"]["extra"] = { + "proxy": getfqdn(), + "proxy_required": proxy_req, } group[resource_name] = export_cls(config) await self.update_resource(group_name, resource_name) @@ -899,73 +936,56 @@ async def update_resource(self, group_name, resource_name): resource = self.groups[group_name][resource_name] data = resource.asdict() print(data) - await self.call( - 'org.labgrid.coordinator.set_resource', group_name, resource_name, - data - ) + await self.call("org.labgrid.coordinator.set_resource", group_name, resource_name, data) def main(): parser = argparse.ArgumentParser() parser.add_argument( - '-x', - '--crossbar', - metavar='URL', + "-x", + "--crossbar", + metavar="URL", type=str, default=os.environ.get("LG_CROSSBAR", "ws://127.0.0.1:20408/ws"), - help="crossbar websocket URL" + help="crossbar websocket URL", ) parser.add_argument( - '-n', - '--name', - dest='name', + "-n", + "--name", + dest="name", type=str, default=None, - help='public name of this exporter (defaults to the system hostname)' + help="public name of this exporter (defaults to the system hostname)", ) parser.add_argument( - '--hostname', - dest='hostname', + "--hostname", + dest="hostname", type=str, default=None, - help='hostname (or IP) published for accessing resources (defaults to the system hostname)' + help="hostname (or IP) published for accessing resources (defaults to the system hostname)", ) parser.add_argument( - '--fqdn', - action='store_true', - default=False, - help='Use fully qualified domain name as default for hostname' + "--fqdn", action="store_true", default=False, help="Use fully qualified domain name as default for hostname" ) + parser.add_argument("-d", "--debug", action="store_true", default=False, help="enable debug mode") parser.add_argument( - '-d', - '--debug', - action='store_true', + "-i", + "--isolated", + action="store_true", default=False, - help="enable debug mode" - ) - parser.add_argument( - '-i', - '--isolated', - action='store_true', - default=False, - help="enable isolated mode (always request SSH forwards)" - ) - parser.add_argument( - 'resources', - metavar='RESOURCES', - type=str, - help='resource config file name' + help="enable isolated mode (always request SSH forwards)", ) + parser.add_argument("resources", metavar="RESOURCES", type=str, help="resource config file name") args = parser.parse_args() - level = 'debug' if args.debug else 'info' + level = "debug" if args.debug else "info" extra = { - 'name': args.name or gethostname(), - 'hostname': args.hostname or (getfqdn() if args.fqdn else gethostname()), - 'resources': args.resources, - 'isolated': args.isolated + "name": args.name or gethostname(), + "hostname": args.hostname or (getfqdn() if args.fqdn else gethostname()), + "resources": args.resources, + "isolated": args.isolated, } crossbar_url = args.crossbar @@ -977,7 +997,7 @@ def main(): print(f"exporter hostname: {extra['hostname']}") print(f"resource config file: {extra['resources']}") - extra['loop'] = loop = asyncio.get_event_loop() + extra["loop"] = loop = asyncio.get_event_loop() if args.debug: loop.set_debug(True) runner = ApplicationRunner(url=crossbar_url, realm=crossbar_realm, extra=extra)