Skip to content

Commit

Permalink
fix linting issues and test example test files
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkimsour committed Oct 16, 2024
1 parent 9625a6f commit 6b3590d
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 29 deletions.
5 changes: 4 additions & 1 deletion free_fleet_adapter/free_fleet_adapter/nav2_robot_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from tf_transformations import quaternion_from_euler
import zenoh


class Nav2RobotAdapter:
def __init__(
self,
Expand Down Expand Up @@ -104,7 +105,9 @@ def _tf_callback(sample: zenoh.Sample):
)

def _battery_state_callback(sample: zenoh.Sample):
battery_state = SensorMsgs_BatteryState.deserialize(sample.payload.to_bytes())
battery_state = SensorMsgs_BatteryState.deserialize(
sample.payload.to_bytes()
)
self.battery_soc = battery_state.percentage

self.battery_state_sub = self.zenoh_session.declare_subscriber(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def main(argv=sys.argv):
# 'Service Servers' using the same name)
for reply in replies:
# Deserialize the response
rep = ActionMsgs_CancelGoal_Response.deserialize(reply.ok.payload.to_bytes())
rep = ActionMsgs_CancelGoal_Response.deserialize(
reply.ok.payload.to_bytes()
)
print('Return code: %d' % rep.return_code)

session.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ def main(argv=sys.argv):
continue
print('handling a reply!')
# Deserialize the response
rep = NavigateToPose_SendGoal_Response.deserialize(reply.ok.payload.to_bytes())
rep = NavigateToPose_SendGoal_Response.deserialize(
reply.ok.payload.to_bytes()
)
if not rep.accepted:
print('Goal rejected')
return
Expand Down Expand Up @@ -131,7 +133,9 @@ def main(argv=sys.argv):
# print("Result: {0}".format(rep.sequence))
print(f"Result: {rep.status}")
if rep.status == GoalStatus.STATUS_ABORTED:
print("Received (ERROR: 'Plan aborted by planner_server')")
print(
"Received (ERROR: 'Plan aborted by planner_server')"
)
break
if rep.status == GoalStatus.STATUS_SUCCEEDED:
break
Expand Down
50 changes: 25 additions & 25 deletions free_fleet_examples/free_fleet_examples/tests/test_tf.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,31 +64,31 @@ def main(argv=sys.argv):
print(f"routers: {info.routers_zid()}")
print(f"peers: {info.peers_zid()}")

# Subscribe to TF
pub = session.declare_subscriber(
namespacify('tf', args.namespace),
tf_callback
)

try:
while True:
try:
transform = tf_buffer.lookup_transform(
args.base_footprint_frame,
args.map_frame,
Time()
)
print(transform)
except Exception as err:
print(f'Unable to get transform between base_footprint and '
f'map: {type(err)}: {err}')

time.sleep(1)
except (KeyboardInterrupt):
pass
finally:
pub.undeclare()
session.close()
# Subscribe to TF
pub = session.declare_subscriber(
namespacify('tf', args.namespace),
tf_callback
)

try:
while True:
try:
transform = tf_buffer.lookup_transform(
args.base_footprint_frame,
args.map_frame,
Time()
)
print(transform)
except Exception as err:
print(f'Unable to get transform between base_footprint and '
f'map: {type(err)}: {err}')

time.sleep(1)
except (KeyboardInterrupt):
pass
finally:
pub.undeclare()
session.close()


if __name__ == '__main__':
Expand Down

0 comments on commit 6b3590d

Please sign in to comment.