Skip to content

Commit

Permalink
Added Offline and Busy to robot_status
Browse files Browse the repository at this point in the history
  • Loading branch information
andchiind committed Feb 22, 2024
1 parent 0fec005 commit 48c5e43
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
32 changes: 31 additions & 1 deletion src/isar_exr/api/energy_robotics_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def wake_up_robot(
wake_up_robot_mutation.variable_definitions = variable_definitions_graphql

try:
result: Dict[str, Any] = self.client.query(
result: Dict[str, Any] = self.client.query( # TODO: consider checking if request was accepted
dsl_gql(wake_up_robot_mutation), params
)
except Exception:
Expand Down Expand Up @@ -547,6 +547,36 @@ def get_battery_level(self, exr_robot_id: str) -> Optional[float]:
"percentage"
]
return battery_level

def is_connected(self, exr_robot_id: str) -> bool:
params: dict = {"robotID": exr_robot_id}

variable_definitions_graphql: DSLVariableDefinitions = DSLVariableDefinitions()

check_is_connected_query: DSLQuery = DSLQuery(
self.schema.Query.currentRobotStatus.args(
robotID=variable_definitions_graphql.robotID
).select(
self.schema.RobotStatusType.isConnected,
)
)

check_is_connected_query.variable_definitions = variable_definitions_graphql

try:
result: Dict[str, Any] = self.client.query(
dsl_gql(check_is_connected_query), params
)
except TimeoutError:
return False
except Exception:
message: str = "Could not check robot battery level"
self.logger.error(message)
raise RobotMissionStatusException(
error_description=message,
)

return result["currentRobotStatus"]["isConnected"]

def create_mission_definition(
self, site_id: str, mission_name: str, robot_id: str
Expand Down
9 changes: 7 additions & 2 deletions src/isar_exr/robotinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,14 @@ def get_telemetry_publishers(
return publisher_threads

def robot_status(self) -> RobotStatus:
# TODO: check if robot is running a task, or check if it is awake?
# TODO: use currentMissionExecution to see if Busy
# TODO: find endpoint to check if it is stuck, maybe MissionExecutionStatusEnum.PAUSED
if not self.api.is_connected():
return RobotStatus.Offline

mission_status: MissionStatus = self.api.get_mission_status(settings.ROBOT_EXR_ID)
if mission_status == MissionStatus.Paused or mission_status == MissionStatus.NotStarted or \
mission_status == MissionStatus.InProgress:
return RobotStatus.Busy
return RobotStatus.Available

def _get_pose_telemetry(self, isar_id: str, robot_name: str) -> str:
Expand Down

0 comments on commit 48c5e43

Please sign in to comment.