diff --git a/free_fleet_adapter/free_fleet_adapter/nav2_robot_adapter.py b/free_fleet_adapter/free_fleet_adapter/nav2_robot_adapter.py index f778eabe..52f83e28 100644 --- a/free_fleet_adapter/free_fleet_adapter/nav2_robot_adapter.py +++ b/free_fleet_adapter/free_fleet_adapter/nav2_robot_adapter.py @@ -42,6 +42,7 @@ from tf_transformations import quaternion_from_euler import zenoh + class Nav2RobotAdapter: def __init__( self, @@ -104,7 +105,9 @@ def _tf_callback(sample: zenoh.Sample): ) def _battery_state_callback(sample: zenoh.Sample): - battery_state = SensorMsgs_BatteryState.deserialize(sample.payload.to_bytes()) + battery_state = SensorMsgs_BatteryState.deserialize( + sample.payload.to_bytes() + ) self.battery_soc = battery_state.percentage self.battery_state_sub = self.zenoh_session.declare_subscriber( diff --git a/free_fleet_examples/free_fleet_examples/tests/test_cancel_all_goals.py b/free_fleet_examples/free_fleet_examples/tests/test_cancel_all_goals.py index 301c01ce..a1840602 100755 --- a/free_fleet_examples/free_fleet_examples/tests/test_cancel_all_goals.py +++ b/free_fleet_examples/free_fleet_examples/tests/test_cancel_all_goals.py @@ -53,7 +53,9 @@ def main(argv=sys.argv): # 'Service Servers' using the same name) for reply in replies: # Deserialize the response - rep = ActionMsgs_CancelGoal_Response.deserialize(reply.ok.payload.to_bytes()) + rep = ActionMsgs_CancelGoal_Response.deserialize( + reply.ok.payload.to_bytes() + ) print('Return code: %d' % rep.return_code) session.close() diff --git a/free_fleet_examples/free_fleet_examples/tests/test_navigate_to_pose.py b/free_fleet_examples/free_fleet_examples/tests/test_navigate_to_pose.py index 1c3050ff..f6efe1d9 100755 --- a/free_fleet_examples/free_fleet_examples/tests/test_navigate_to_pose.py +++ b/free_fleet_examples/free_fleet_examples/tests/test_navigate_to_pose.py @@ -101,7 +101,9 @@ def main(argv=sys.argv): continue print('handling a reply!') # Deserialize the response - rep = NavigateToPose_SendGoal_Response.deserialize(reply.ok.payload.to_bytes()) + rep = NavigateToPose_SendGoal_Response.deserialize( + reply.ok.payload.to_bytes() + ) if not rep.accepted: print('Goal rejected') return @@ -131,7 +133,9 @@ def main(argv=sys.argv): # print("Result: {0}".format(rep.sequence)) print(f"Result: {rep.status}") if rep.status == GoalStatus.STATUS_ABORTED: - print("Received (ERROR: 'Plan aborted by planner_server')") + print( + "Received (ERROR: 'Plan aborted by planner_server')" + ) break if rep.status == GoalStatus.STATUS_SUCCEEDED: break diff --git a/free_fleet_examples/free_fleet_examples/tests/test_tf.py b/free_fleet_examples/free_fleet_examples/tests/test_tf.py index 812cc03b..33f2fd81 100755 --- a/free_fleet_examples/free_fleet_examples/tests/test_tf.py +++ b/free_fleet_examples/free_fleet_examples/tests/test_tf.py @@ -64,31 +64,31 @@ def main(argv=sys.argv): print(f"routers: {info.routers_zid()}") print(f"peers: {info.peers_zid()}") - # Subscribe to TF - pub = session.declare_subscriber( - namespacify('tf', args.namespace), - tf_callback - ) - - try: - while True: - try: - transform = tf_buffer.lookup_transform( - args.base_footprint_frame, - args.map_frame, - Time() - ) - print(transform) - except Exception as err: - print(f'Unable to get transform between base_footprint and ' - f'map: {type(err)}: {err}') - - time.sleep(1) - except (KeyboardInterrupt): - pass - finally: - pub.undeclare() - session.close() + # Subscribe to TF + pub = session.declare_subscriber( + namespacify('tf', args.namespace), + tf_callback + ) + + try: + while True: + try: + transform = tf_buffer.lookup_transform( + args.base_footprint_frame, + args.map_frame, + Time() + ) + print(transform) + except Exception as err: + print(f'Unable to get transform between base_footprint and ' + f'map: {type(err)}: {err}') + + time.sleep(1) + except (KeyboardInterrupt): + pass + finally: + pub.undeclare() + session.close() if __name__ == '__main__':