-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #10 from FormantIO/ros-service
Ros service adapter
- Loading branch information
Showing
17 changed files
with
1,160 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
|
||
import json | ||
import logging | ||
import time | ||
from typing import List | ||
|
||
import rospy | ||
from formant.sdk.agent.v1.client import Client as FormantClient | ||
from std_msgs.msg import Bool | ||
|
||
from config import Config | ||
from input_to_ros_service_params import parse | ||
from services import ServiceChecker | ||
from utils import service_call | ||
|
||
logger = logging.getLogger() | ||
|
||
|
||
class Adapter: | ||
|
||
def __init__(self): | ||
"""Initialize the adapter""" | ||
|
||
self._fclient = FormantClient() | ||
self._config = Config().get_config() | ||
self._api_button_map = self._config["api-button-mapping"] | ||
self._ros_button_map = self._config["ros-button-mapping"] | ||
self._service_checker = ServiceChecker() | ||
rospy.init_node('service_call_adapter') | ||
|
||
def run(self): | ||
"""Run the adapter. This function will never return, but is non-blocking.""" | ||
|
||
self._service_checker.start() | ||
|
||
self._fclient.register_teleop_callback( | ||
self._handle_button_press, ["Buttons"]) | ||
self._fclient.register_command_request_callback( | ||
self._handle_command, self._config["service-commands"]) | ||
|
||
self.subscribers = [] | ||
for topic in self._ros_button_map: | ||
self.subscribers.append( | ||
rospy.Subscriber( | ||
topic, | ||
Bool, | ||
self._handle_ros_button_press, | ||
topic)) | ||
|
||
logger.info( | ||
"Callback's have successfully been registered with the Formant client.") | ||
|
||
while True: | ||
time.sleep(1) | ||
|
||
def _handle_ros_button_press(self, msg, topic): | ||
"""Handle a button press from a ROS topic.""" | ||
|
||
if not msg.data: | ||
return | ||
|
||
logger.info(f"ROS button press received from {topic}") | ||
|
||
if topic not in self._ros_button_map: | ||
logger.info(f"ROS topic {topic} has no service mapping") | ||
return | ||
|
||
service_json = json.dumps(self._ros_button_map.get(topic, {})) | ||
|
||
try: | ||
datum = parse(service_json) | ||
except: | ||
logger.warn(f"Failed parsing json for service call mapped to ROS topic {topic}") | ||
return | ||
|
||
service_name = datum[0] | ||
service_args = datum[1] | ||
self._handle_service_call(service_name, service_args) | ||
|
||
def _handle_button_press(self, button_press): | ||
"""Handle a button press along with the proper parameters.""" | ||
|
||
if not button_press.bitset.bits[0].value: | ||
return | ||
|
||
button_name = button_press.bitset.bits[0].key | ||
|
||
logger.info(f"Button press received from button {button_name}") | ||
|
||
if button_name not in self._api_button_map: | ||
logger.info(f"Button {button_name} has not service mapping") | ||
return | ||
|
||
service_json = json.dumps(self._api_button_map[button_name]) | ||
try: | ||
datum = parse(service_json) | ||
except: | ||
logger.warn(f"Failed parsing json for service call mapped to API button {button_name}") | ||
return | ||
|
||
service_name = datum[0] | ||
service_args = datum[1] | ||
|
||
self._handle_service_call(service_name, service_args) | ||
|
||
def _handle_command(self, data): | ||
"""Handles an incoming formant command as specified in service-commands in config.json""" | ||
logger.info( | ||
f"New command received. Parsing and executing: {data.text}") | ||
|
||
print(f"New command received. Parsing and executing: {data.text}") | ||
|
||
try: | ||
datum = parse(data.text) | ||
except Exception as e: | ||
print(f"Failed parsing {data.text}. Dropping command. Reason: {e}") | ||
|
||
return | ||
|
||
service_name = datum[0] | ||
service_args = datum[1] | ||
|
||
self._handle_service_call(service_name, service_args) | ||
|
||
def _handle_service_call(self, service_name: str, service_args: List[str]): | ||
"""This allows the adapter to call a ROS service, then post the response.""" | ||
|
||
logger.info( | ||
f"Sending service call to service {rospy.resolve_name(service_name)}") | ||
response = service_call( | ||
rospy.resolve_name(service_name), *service_args) | ||
|
||
if response and len(str(response)): | ||
self._post_service_data(service_name, str(response)) | ||
|
||
def _post_service_data(self, service_name: str, data: str): | ||
"""Post's a string to Formant given the service_name string.""" | ||
response_stream = f"ros.services.response" | ||
print(f"Posting {data} to {response_stream}") | ||
try: | ||
self._fclient.post_text(response_stream, str(data), tags={"ros_service":service_name}) | ||
except Exception: | ||
logger.info("Failed to post response") | ||
return | ||
|
||
logger.info( | ||
f"Successfully send '{str(data)}' to Formant stream '{response_stream}'") | ||
|
||
def shutdown(self): | ||
self._service_checker.shutdown() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
{ | ||
"service-commands": [ | ||
"rosservice" | ||
], | ||
"api-button-mapping": {}, | ||
"ros-button-mapping": { | ||
"/ros_button": { | ||
"/random": { | ||
"string": "sample-string", | ||
"int64_arr": [ | ||
1.0, | ||
2.0 | ||
], | ||
"float32": 3.0, | ||
"float64_arr": [ | ||
4.0 | ||
], | ||
"twist": { | ||
"twist.linear": { | ||
"twist.linear.x": 5.0, | ||
"twist.linear.y": 6.0, | ||
"twist.linear.z": 7.0 | ||
}, | ||
"twist.angular": { | ||
"twist.angular.x": 8.0, | ||
"twist.angular.y": 9.0, | ||
"twist.angular.z": 10.0 | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
|
||
from json import load as json_load | ||
|
||
|
||
class Config: | ||
"""The config class loads the config.json from memory.""" | ||
|
||
def __init__(self): | ||
with open("config.json", "r") as f: | ||
self.config_json = json_load(f) | ||
|
||
def get_config(self): | ||
return self.config_json |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
import services | ||
import json | ||
import rospy | ||
|
||
def main(): | ||
s_checker = services.ServiceChecker() | ||
|
||
current_services = json.loads(s_checker.get_services_json()) | ||
|
||
print("Current services: \n") | ||
for service in current_services: | ||
print(f"\t{service}") | ||
|
||
print("\n") | ||
|
||
has_service = False | ||
while(not has_service): | ||
service = rospy.resolve_name(input("Please enter service: ")) | ||
if service not in current_services: | ||
print("Service does not exist.. ") | ||
continue | ||
has_service = True | ||
|
||
call_params = CallCreator(current_services[service]) | ||
|
||
call_full = json.dumps({service:call_params.get_parsed()}) | ||
|
||
print("Successfully Generated service call...\n\n") | ||
|
||
print(json.dumps(json.loads(call_full))) | ||
print() | ||
|
||
class CallCreator: | ||
|
||
def __init__(self, call_desc): | ||
self._call = {} | ||
self._call_desc = call_desc | ||
self.parsed = self.parse() | ||
|
||
def get_parsed(self): | ||
return self.parsed | ||
|
||
def parse(self): | ||
return self._parse_object(self._call_desc) | ||
|
||
def _parse_object(self, obj): | ||
values = {} | ||
|
||
properties_dict = obj["properties"] | ||
|
||
for param in properties_dict: | ||
if CallCreator.is_primitive(properties_dict[param]): | ||
values[param] = self._parse_primitive(properties_dict[param]) | ||
|
||
elif CallCreator.is_array(properties_dict[param]): | ||
values[param] = self._parse_array(properties_dict[param]) | ||
|
||
else: | ||
values[param] = self._parse_object(properties_dict[param]) | ||
|
||
return values | ||
|
||
def _parse_primitive(self, obj): | ||
if obj["type"] == "number": | ||
return float(input(f"Please enter a number for parameter '{obj['title']}': ")) | ||
|
||
if obj["type"] == "string": | ||
return input(f"Please enter a value for parameter '{obj['title']}': ") | ||
|
||
if obj["type"] == "integer": | ||
return int(input(f"Please enter an integer for parameter '{obj['title']}': ")) | ||
|
||
if obj["type"] == "boolean": | ||
i = input(f"Please enter a boolean ('true' or 'false') for parameter '{obj['title']}': ") | ||
if i.lower() == "true": | ||
return True | ||
elif i.lower() == "false": | ||
return False | ||
else: | ||
return False | ||
|
||
def _parse_array(self, obj): | ||
if obj["items"]["type"] == "string": | ||
t = "values" | ||
else: | ||
t = "numbers" | ||
|
||
values = input(f"Please enter a comma separated array of {t} for parameter '{obj['title']}':\n\t") | ||
values_split = [v.rstrip().lstrip() for v in values.split(",")] | ||
|
||
if t == "numbers": | ||
return [float(n) for n in values_split] | ||
|
||
return values_split | ||
|
||
@staticmethod | ||
def is_primitive(desc): | ||
if desc["type"] in {"string", "number", "integer", "boolean"}: | ||
return True | ||
return False | ||
|
||
@staticmethod | ||
def is_array(desc): | ||
if desc["type"] == "array": | ||
return True | ||
return False | ||
|
||
if __name__ == "__main__": | ||
main() |
Oops, something went wrong.