diff --git a/docs/reference/rosbridge.md b/docs/reference/rosbridge.md index 6e19acbc51..d0c155e4d7 100755 --- a/docs/reference/rosbridge.md +++ b/docs/reference/rosbridge.md @@ -59,25 +59,28 @@ ROSBridge.from_ros_pose(self, ros_pose) ``` -Converts a ROS pose into an OpenDR pose. +Converts an OpenDRPose2D message into an OpenDR Pose. Parameters: -- **message**: *ros_bridge.msg.Pose*\ - ROS pose to be converted into an OpenDR pose. +- **ros_pose**: *ros_bridge.msg.OpenDRPose2D*\ + ROS pose to be converted into an OpenDR Pose. #### `ROSBridge.to_ros_pose` ```python ROSBridge.to_ros_pose(self, - ros_pose) + pose) ``` -Converts an OpenDR pose into a ROS pose. +Converts an OpenDR Pose into an OpenDRPose2D message that can carry the same information, i.e. a list of keypoints, +the pose detection confidence and the pose id. +Each keypoint is represented as an OpenDRPose2DKeypoint with its x, y pixel position on the input image, with (0, 0) +being the top-left corner. Parameters: -- **message**: *engine.target.Pose*\ - OpenDR pose to be converted to ROS pose. +- **pose**: *engine.target.Pose*\ + OpenDR Pose to be converted to ROS OpenDRPose2D. #### `ROSBridge.to_ros_category` diff --git a/projects/opendr_ws/src/perception/README.md b/projects/opendr_ws/src/perception/README.md index ba0ab81059..7f0df49566 100755 --- a/projects/opendr_ws/src/perception/README.md +++ b/projects/opendr_ws/src/perception/README.md @@ -31,14 +31,25 @@ Assuming that you have already [activated the OpenDR environment](../../../../do rosrun usb_cam usb_cam_node ``` -2. You are then ready to start the pose detection node +2. You are then ready to start the pose detection node (use `-h` to print out help for the various arguments) ```shell rosrun perception pose_estimation.py ``` 3. You can examine the annotated image stream using `rqt_image_view` (select the topic `/opendr/image_pose_annotated`) or - `rostopic echo /opendr/poses` + `rostopic echo /opendr/poses`.
+
+Note that to make proper use of the pose messages, you need to create an appropriate subscriber that converts the ROS pose messages back to OpenDR poses, which you can then access as described in the [documentation](https://github.com/opendr-eu/opendr/blob/master/docs/reference/engine-target.md#posekeypoints-confidence):
```python + ... + rospy.Subscriber("opendr/poses", OpenDRPose2D, self.callback) + ... + def callback(self, data): + opendr_pose = self.bridge.from_ros_pose(data) + print(opendr_pose) + print(opendr_pose['r_eye']) +``` ## Fall Detection ROS Node Assuming that you have already [activated the OpenDR environment](../../../../docs/reference/installation.md), [built your workspace](../../README.md) and started roscore (i.e., just run `roscore`), then you can diff --git a/projects/opendr_ws/src/perception/scripts/face_detection_retinaface.py b/projects/opendr_ws/src/perception/scripts/face_detection_retinaface.py index 7227951b17..72ecbc94cb 100755 --- a/projects/opendr_ws/src/perception/scripts/face_detection_retinaface.py +++ b/projects/opendr_ws/src/perception/scripts/face_detection_retinaface.py @@ -13,115 +13,130 @@ # See the License for the specific language governing permissions and # limitations under the License.
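As an aside to the README change above: the subscriber snippet can be fleshed out into a complete, runnable listener. The following is only an illustrative sketch, not part of the patch; it assumes the `ros_bridge` package exports the `OpenDRPose2D` message and that `ROSBridge.from_ros_pose` accepts it, as the bridge documentation above describes:

```python
import rospy
from ros_bridge.msg import OpenDRPose2D
from opendr_bridge import ROSBridge


class PoseListener:
    def __init__(self):
        self.bridge = ROSBridge()
        rospy.Subscriber("opendr/poses", OpenDRPose2D, self.callback, queue_size=1)

    def callback(self, data):
        # Convert the ROS message back into an OpenDR Pose
        opendr_pose = self.bridge.from_ros_pose(data)
        print(opendr_pose)  # all keypoints of the pose
        print(opendr_pose['r_eye'])  # a single keypoint, accessed by name


if __name__ == '__main__':
    rospy.init_node('pose_listener', anonymous=True)
    PoseListener()
    rospy.spin()
```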
-import rospy +import argparse import mxnet as mx + +import rospy from vision_msgs.msg import Detection2DArray from sensor_msgs.msg import Image as ROS_Image from opendr_bridge import ROSBridge + +from opendr.engine.data import Image from opendr.perception.object_detection_2d import RetinaFaceLearner from opendr.perception.object_detection_2d import draw_bounding_boxes -from opendr.engine.data import Image class FaceDetectionNode: - def __init__(self, input_image_topic="/usb_cam/image_raw", output_image_topic="/opendr/image_boxes_annotated", - face_detections_topic="/opendr/faces", device="cuda", backbone="resnet"): + + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", + output_rgb_image_topic="/opendr/image_faces_annotated", detections_topic="/opendr/faces", + device="cuda", backbone="resnet"): """ - Creates a ROS Node for face detection - :param input_image_topic: Topic from which we are reading the input image - :type input_image_topic: str - :param output_image_topic: Topic to which we are publishing the annotated image (if None, we are not publishing - annotated image) - :type output_image_topic: str - :param face_detections_topic: Topic to which we are publishing the annotations (if None, we are not publishing - annotated pose annotations) - :type face_detections_topic: str + Creates a ROS Node for face detection with Retinaface. + :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the annotated image (if None, no annotated + image is published) + :type output_rgb_image_topic: str + :param detections_topic: Topic to which we are publishing the annotations (if None, no face detection message + is published) + :type detections_topic: str :param device: device on which we are running inference ('cpu' or 'cuda') :type device: str - :param backbone: retinaface backbone, options are ('mnet' and 'resnet'), where 'mnet' detects masked faces as well + :param backbone: retinaface backbone, options are either 'mnet' or 'resnet', + where 'mnet' detects masked faces as well :type backbone: str """ + self.input_rgb_image_topic = input_rgb_image_topic - # Initialize the face detector - self.face_detector = RetinaFaceLearner(backbone=backbone, device=device) - self.face_detector.download(path=".", verbose=True) - self.face_detector.load("retinaface_{}".format(backbone)) - self.class_names = ["face", "masked_face"] - - # Initialize OpenDR ROSBridge object - self.bridge = ROSBridge() - - # setup communications - if output_image_topic is not None: - self.image_publisher = rospy.Publisher(output_image_topic, ROS_Image, queue_size=10) + if output_rgb_image_topic is not None: + self.image_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) else: self.image_publisher = None - if face_detections_topic is not None: - self.face_publisher = rospy.Publisher(face_detections_topic, Detection2DArray, queue_size=10) + if detections_topic is not None: + self.face_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=1) else: self.face_publisher = None - rospy.Subscriber(input_image_topic, ROS_Image, self.callback) + self.bridge = ROSBridge() + + # Initialize the face detector + self.face_detector = RetinaFaceLearner(backbone=backbone, device=device) + self.face_detector.download(path=".", verbose=True) + self.face_detector.load("retinaface_{}".format(backbone)) + self.class_names = ["face", "masked_face"] + + def listen(self): + """ 
+        Start the node and begin processing input data. + """ + rospy.init_node('face_detection_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Face detection RetinaFace node started.") + rospy.spin() def callback(self, data): """ - Callback that process the input data and publishes to the corresponding topics - :param data: input message + Callback that processes the input data and publishes to the corresponding topics. + :param data: Input image message :type data: sensor_msgs.msg.Image """ - # Convert sensor_msgs.msg.Image into OpenDR Image image = self.bridge.from_ros_image(data, encoding='bgr8') - # Run pose estimation + # Run face detection boxes = self.face_detector.infer(image) # Get an OpenCV image back image = image.opencv() - - # Convert detected boxes to ROS type and publish - ros_boxes = self.bridge.to_ros_boxes(boxes) + # Publish detections in ROS message + ros_boxes = self.bridge.to_ros_boxes(boxes) # Convert to ROS boxes if self.face_publisher is not None: self.face_publisher.publish(ros_boxes) - rospy.loginfo("Published face boxes") - # Annotate image and publish result - # NOTE: converting back to OpenDR BoundingBoxList is unnecessary here, - # only used to test the corresponding bridge methods - odr_boxes = self.bridge.from_ros_boxes(ros_boxes) - image = draw_bounding_boxes(image, odr_boxes, class_names=self.class_names) if self.image_publisher is not None: - message = self.bridge.to_ros_image(Image(image), encoding='bgr8') - self.image_publisher.publish(message) - rospy.loginfo("Published annotated image") + # Annotate image with face detection boxes + image = draw_bounding_boxes(image, boxes, class_names=self.class_names) + # Convert the annotated OpenDR image to ROS image message using bridge and publish it + self.image_publisher.publish(self.bridge.to_ros_image(Image(image), encoding='bgr8')) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image", + type=str, default="/usb_cam/image_raw") + parser.add_argument("-o", "--output_rgb_image_topic", help="Topic name for output annotated rgb image", + type=str, default="/opendr/image_faces_annotated") + parser.add_argument("-d", "--detections_topic", help="Topic name for detection messages", + type=str, default="/opendr/faces") + parser.add_argument("--device", help="Device to use, either \"cpu\" or \"cuda\", defaults to \"cuda\"", + type=str, default="cuda", choices=["cuda", "cpu"]) + parser.add_argument("--backbone", + help="Retinaface backbone, options are either 'mnet' or 'resnet', where 'mnet' detects " + "masked faces as well", + type=str, default="resnet", choices=["resnet", "mnet"]) + args = parser.parse_args() - -if __name__ == '__main__': - # Automatically run on GPU/CPU try: - if mx.context.num_gpus() > 0: - print("GPU found.") - device = 'cuda' - else: + if args.device == "cuda" and mx.context.num_gpus() > 0: + device = "cuda" + elif args.device == "cuda": print("GPU not found. 
Using CPU instead.") - device = 'cpu' + device = "cpu" + else: + print("Using CPU.") + device = "cpu" except: - device = 'cpu' + print("Using CPU.") + device = "cpu" - # initialize ROS node - rospy.init_node('opendr_face_detection', anonymous=True) - rospy.loginfo("Face detection node started!") + face_detection_node = FaceDetectionNode(device=device, backbone=args.backbone, + input_rgb_image_topic=args.input_rgb_image_topic, + output_rgb_image_topic=args.output_rgb_image_topic, + detections_topic=args.detections_topic) + face_detection_node.listen() - # get network backbone ("mnet" detects masked faces as well) - backbone = rospy.get_param("~backbone", "resnet") - input_image_topic = rospy.get_param("~input_image_topic", "/videofile/image_raw") - rospy.loginfo("Using backbone: {}".format(backbone)) - assert backbone in ["resnet", "mnet"], "backbone should be one of ['resnet', 'mnet']" - - # created node object - face_detection_node = FaceDetectionNode(device=device, backbone=backbone, - input_image_topic=input_image_topic) - # begin ROS communications - rospy.spin() +if __name__ == '__main__': + main() diff --git a/projects/opendr_ws/src/perception/scripts/face_recognition.py b/projects/opendr_ws/src/perception/scripts/face_recognition.py index 9bbe783f33..34e97e10fc 100755 --- a/projects/opendr_ws/src/perception/scripts/face_recognition.py +++ b/projects/opendr_ws/src/perception/scripts/face_recognition.py @@ -13,14 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import argparse +import cv2 +import torch import rospy -import torch -from vision_msgs.msg import ObjectHypothesis from std_msgs.msg import String +from vision_msgs.msg import ObjectHypothesis from sensor_msgs.msg import Image as ROS_Image from opendr_bridge import ROSBridge +from opendr.engine.data import Image from opendr.perception.face_recognition import FaceRecognitionLearner from opendr.perception.object_detection_2d import RetinaFaceLearner from opendr.perception.object_detection_2d.datasets.transforms import BoundingBoxListToNumpyArray @@ -28,24 +31,48 @@ class FaceRecognitionNode: - def __init__(self, input_image_topic="/usb_cam/image_raw", - face_recognition_topic="/opendr/face_recognition", - face_id_topic="/opendr/face_recognition_id", - database_path="./database", device="cuda", - backbone='mobilefacenet'): + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", + output_rgb_image_topic="/opendr/image_face_reco_annotated", + detections_topic="/opendr/face_recognition", detections_id_topic="/opendr/face_recognition_id", + database_path="./database", device="cuda", backbone="mobilefacenet"): """ - Creates a ROS Node for face recognition - :param input_image_topic: Topic from which we are reading the input image - :type input_image_topic: str - :param face_recognition_topic: Topic to which we are publishing the recognized face info - (if None, we are not publishing the info) - :type face_recognition_topic: str - :param face_id_topic: Topic to which we are publishing the ID of the recognized person - (if None, we are not publishing the ID) - :type face_id_topic: str - :param device: device on which we are running inference ('cpu' or 'cuda') + Creates a ROS Node for face recognition. 
+ :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the annotated image (if None, no annotated + image is published) + :type output_rgb_image_topic: str + :param detections_topic: Topic to which we are publishing the recognized face information (if None, + no face recognition message is published) + :type detections_topic: str + :param detections_id_topic: Topic to which we are publishing the ID of the recognized person (if None, + no ID message is published) + :type detections_id_topic: str + :param device: Device on which we are running inference ('cpu' or 'cuda') :type device: str + :param backbone: Backbone network + :type backbone: str + :param database_path: Path of the directory where the images of the faces to be recognized are stored + :type database_path: str """ + self.input_rgb_image_topic = input_rgb_image_topic + + if output_rgb_image_topic is not None: + self.image_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) + else: + self.image_publisher = None + + if detections_topic is not None: + self.face_publisher = rospy.Publisher(detections_topic, ObjectHypothesis, queue_size=1) + else: + self.face_publisher = None + + if detections_id_topic is not None: + self.face_id_publisher = rospy.Publisher(detections_id_topic, String, queue_size=1) + else: + self.face_id_publisher = None + + self.bridge = ROSBridge() # Initialize the face recognizer self.recognizer = FaceRecognitionLearner(device=device, mode='backbone_only', backbone=backbone) @@ -59,27 +86,24 @@ def __init__(self, input_image_topic="/usb_cam/image_raw", self.face_detector.load("retinaface_{}".format('mnet')) self.class_names = ["face", "masked_face"] - if face_recognition_topic is not None: - self.face_publisher = rospy.Publisher(face_recognition_topic, ObjectHypothesis, queue_size=10) - else: - self.face_publisher = None - - if face_id_topic is not None: - self.face_id_publisher = rospy.Publisher(face_id_topic, String, queue_size=10) - else: - self.face_id_publisher = None - - self.bridge = ROSBridge() - rospy.Subscriber(input_image_topic, ROS_Image, self.callback) + def listen(self): + """ + Start the node and begin processing input data. + """ + rospy.init_node('face_recognition_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Face recognition node started.") + rospy.spin() def callback(self, data): """ - Callback that process the input data and publishes to the corresponding topics - :param data: input message + Callback that processes the input data and publishes to the corresponding topics. 
+        :param data: Input image message :type data: sensor_msgs.msg.Image """ # Convert sensor_msgs.msg.Image into OpenDR Image - image = self.bridge.from_ros_image(data) + image = self.bridge.from_ros_image(data, encoding='bgr8') + # Get an OpenCV image back image = image.opencv() # Run face detection and recognition @@ -90,59 +114,71 @@ def callback(self, data): boxes = bounding_boxes[:, :4] for idx, box in enumerate(boxes): (startX, startY, endX, endY) = int(box[0]), int(box[1]), int(box[2]), int(box[3]) - img = image[startY:endY, startX:endX] - result = self.recognizer.infer(img) - - if result.data is not None: - if self.face_publisher is not None: - ros_face = self.bridge.to_ros_face(result) - self.face_publisher.publish(ros_face) - - if self.face_id_publisher is not None: - ros_face_id = self.bridge.to_ros_face_id(result) - self.face_id_publisher.publish(ros_face_id.data) - - else: - result.description = "Unknown" - if self.face_publisher is not None: - ros_face = self.bridge.to_ros_face(result) - self.face_publisher.publish(ros_face) + frame = image[startY:endY, startX:endX] + result = self.recognizer.infer(frame) + + # Publish face information and ID + if self.face_publisher is not None: + self.face_publisher.publish(self.bridge.to_ros_face(result)) + + if self.face_id_publisher is not None: + self.face_id_publisher.publish(self.bridge.to_ros_face_id(result)) + + if self.image_publisher is not None: + if result.description != 'Not found': + color = (0, 255, 0) + else: + color = (0, 0, 255) + # Annotate image with face detection/recognition boxes + cv2.rectangle(image, (startX, startY), (endX, endY), color, thickness=2) + cv2.putText(image, result.description, (startX, endY - 10), cv2.FONT_HERSHEY_SIMPLEX, + 1, color, 2, cv2.LINE_AA) + + if self.image_publisher is not None: + # Convert the annotated OpenDR image to ROS image message using bridge and publish it + self.image_publisher.publish(self.bridge.to_ros_image(Image(image), encoding='bgr8')) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image", + type=str, default="/usb_cam/image_raw") + parser.add_argument("-o", "--output_rgb_image_topic", help="Topic name for output annotated rgb image", + type=str, default="/opendr/image_face_reco_annotated") + parser.add_argument("-d", "--detections_topic", help="Topic name for detection messages", + type=str, default="/opendr/face_recognition") + parser.add_argument("-id", "--detections_id_topic", help="Topic name for detection ID messages", + type=str, default="/opendr/face_recognition_id") + parser.add_argument("--device", help="Device to use, either \"cpu\" or \"cuda\", defaults to \"cuda\"", + type=str, default="cuda", choices=["cuda", "cpu"]) + parser.add_argument("--backbone", help="Backbone network, defaults to mobilefacenet", + type=str, default="mobilefacenet", choices=["mobilefacenet"]) + parser.add_argument("--dataset_path", + help="Path of the directory where the images of the faces to be recognized are stored, " + "defaults to \"./database\"", + type=str, default="./database") + args = parser.parse_args() - if self.face_id_publisher is not None: - ros_face_id = self.bridge.to_ros_face_id(result) - self.face_id_publisher.publish(ros_face_id.data) + try: + if args.device == "cuda" and torch.cuda.is_available(): + device = "cuda" + elif args.device == "cuda": + print("GPU not found. 
Using CPU instead.") + device = "cpu" + else: + print("Using CPU.") + device = "cpu" + except: + print("Using CPU.") + device = "cpu" - # We get can the data back using self.bridge.from_ros_face(ros_face) - # e.g. - # face = self.bridge.from_ros_face(ros_face) - # face.description = self.recognizer.database[face.id][0] + face_recognition_node = FaceRecognitionNode(device=device, backbone=args.backbone, database_path=args.dataset_path, + input_rgb_image_topic=args.input_rgb_image_topic, + output_rgb_image_topic=args.output_rgb_image_topic, + detections_topic=args.detections_topic, + detections_id_topic=args.detections_id_topic) + face_recognition_node.listen() if __name__ == '__main__': - # Select the device for running the - try: - if torch.cuda.is_available(): - print("GPU found.") - device = 'cuda' - else: - print("GPU not found. Using CPU instead.") - device = 'cpu' - except: - device = 'cpu' - - # initialize ROS node - rospy.init_node('opendr_face_recognition', anonymous=True) - rospy.loginfo("Face recognition node started!") - - # get network backbone - backbone = rospy.get_param("~backbone", "mobilefacenet") - input_image_topic = rospy.get_param("~input_image_topic", "/usb_cam/image_raw") - database_path = rospy.get_param('~database_path', './') - rospy.loginfo("Using backbone: {}".format(backbone)) - assert backbone in ["mobilefacenet", "ir_50"], "backbone should be one of ['mobilefacenet', 'ir_50']" - - face_recognition_node = FaceRecognitionNode(device=device, backbone=backbone, - input_image_topic=input_image_topic, - database_path=database_path) - # begin ROS communications - rospy.spin() + main() diff --git a/projects/opendr_ws/src/perception/scripts/fall_detection.py b/projects/opendr_ws/src/perception/scripts/fall_detection.py index ef456d2ec8..077e4edad4 100644 --- a/projects/opendr_ws/src/perception/scripts/fall_detection.py +++ b/projects/opendr_ws/src/perception/scripts/fall_detection.py @@ -13,76 +13,89 @@ # See the License for the specific language governing permissions and # limitations under the License. 
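The fall detector that follows publishes standard `Detection2DArray` messages, so a consumer can convert them back with `ROSBridge.from_ros_boxes`, the counterpart of the `to_ros_boxes` call the node uses. A minimal sketch (illustrative only, not part of the patch), assuming the node's default `/opendr/fallen` topic:

```python
import rospy
from vision_msgs.msg import Detection2DArray
from opendr_bridge import ROSBridge

bridge = ROSBridge()


def on_fallen(msg):
    # Convert the ROS detections back into an OpenDR BoundingBoxList
    boxes = bridge.from_ros_boxes(msg)
    for box in boxes.data:
        rospy.loginfo("Fallen person at (%d, %d), size %dx%d",
                      box.left, box.top, box.width, box.height)


rospy.init_node('fallen_listener', anonymous=True)
rospy.Subscriber("/opendr/fallen", Detection2DArray, on_fallen, queue_size=1)
rospy.spin()
```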
+import cv2 +import argparse +import torch import rospy -import torch -import cv2 from vision_msgs.msg import Detection2DArray from sensor_msgs.msg import Image as ROS_Image from opendr_bridge import ROSBridge + +from opendr.engine.data import Image +from opendr.engine.target import BoundingBox, BoundingBoxList from opendr.perception.pose_estimation import get_bbox from opendr.perception.pose_estimation import LightweightOpenPoseLearner from opendr.perception.fall_detection import FallDetectorLearner -from opendr.engine.data import Image -from opendr.engine.target import BoundingBox, BoundingBoxList class FallDetectionNode: - def __init__(self, input_image_topic="/usb_cam/image_raw", output_image_topic="/opendr/image_fall_annotated", - fall_annotations_topic="/opendr/falls", device="cuda"): + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", + output_rgb_image_topic="/opendr/image_fallen_annotated", detections_topic="/opendr/fallen", + device="cuda", num_refinement_stages=2, use_stride=False, half_precision=False): """ - Creates a ROS Node for fall detection - :param input_image_topic: Topic from which we are reading the input image - :type input_image_topic: str - :param output_image_topic: Topic to which we are publishing the annotated image (if None, we are not publishing - annotated image) - :type output_image_topic: str - :param fall_annotations_topic: Topic to which we are publishing the annotations (if None, we are not publishing - annotated fall annotations) - :type fall_annotations_topic: str + Creates a ROS Node for rule-based fall detection using Lightweight OpenPose. + :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the annotated image (if None, no annotated + image is published) + :type output_rgb_image_topic: str + :param detections_topic: Topic to which we are publishing the annotations (if None, no fall detection message + is published) + :type detections_topic: str :param device: device on which we are running inference ('cpu' or 'cuda') :type device: str + :param num_refinement_stages: Specifies the number of pose estimation refinement stages that are added to the + model's head, including the initial stage. Can be 0, 1 or 2, with more stages meaning slower and more accurate + inference + :type num_refinement_stages: int + :param use_stride: Whether to add a stride value in the model, which reduces accuracy but increases + inference speed + :type use_stride: bool + :param half_precision: Enables inference using half (fp16) precision instead of single (fp32) precision. 
+ Valid only for GPU-based inference + :type half_precision: bool """ - if output_image_topic is not None: - self.image_publisher = rospy.Publisher(output_image_topic, ROS_Image, queue_size=10) + self.input_rgb_image_topic = input_rgb_image_topic + + if output_rgb_image_topic is not None: + self.image_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) else: self.image_publisher = None - if fall_annotations_topic is not None: - self.fall_publisher = rospy.Publisher(fall_annotations_topic, Detection2DArray, queue_size=10) + if detections_topic is not None: + self.fall_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=1) else: self.fall_publisher = None - self.input_image_topic = input_image_topic - self.bridge = ROSBridge() - # Initialize the pose estimation - self.pose_estimator = LightweightOpenPoseLearner(device=device, num_refinement_stages=2, - mobilenet_use_stride=False, - half_precision=False) + # Initialize the pose estimation learner + self.pose_estimator = LightweightOpenPoseLearner(device=device, num_refinement_stages=num_refinement_stages, + mobilenet_use_stride=use_stride, + half_precision=half_precision) self.pose_estimator.download(path=".", verbose=True) self.pose_estimator.load("openpose_default") + # Initialize the fall detection learner self.fall_detector = FallDetectorLearner(self.pose_estimator) def listen(self): """ - Start the node and begin processing input data + Start the node and begin processing input data. """ - rospy.init_node('opendr_fall_detection', anonymous=True) - rospy.Subscriber(self.input_image_topic, ROS_Image, self.callback) - rospy.loginfo("Fall detection node started!") + rospy.init_node('fall_detection_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Fall detection node started.") rospy.spin() def callback(self, data): """ - Callback that process the input data and publishes to the corresponding topics - :param data: input message + Callback that processes the input data and publishes to the corresponding topics. 
+ :param data: Input image message :type data: sensor_msgs.msg.Image """ - # Convert sensor_msgs.msg.Image into OpenDR Image image = self.bridge.from_ros_image(data, encoding='bgr8') @@ -100,34 +113,64 @@ def callback(self, data): if fallen == 1: color = (0, 0, 255) x, y, w, h = get_bbox(pose) - bbox = BoundingBox(left=x, top=y, width=w, height=h, name=0) - bboxes.data.append(bbox) cv2.rectangle(image, (x, y), (x + w, y + h), color, 2) - cv2.putText(image, "Detected fallen person", (5, 55), cv2.FONT_HERSHEY_SIMPLEX, - 0.75, color, 1, cv2.LINE_AA) + cv2.putText(image, "Fallen person", (x, y + h - 10), cv2.FONT_HERSHEY_SIMPLEX, + 1, color, 2, cv2.LINE_AA) # Convert detected boxes to ROS type and publish + bboxes.data.append(BoundingBox(left=x, top=y, width=w, height=h, name="fallen person")) ros_boxes = self.bridge.to_ros_boxes(bboxes) if self.fall_publisher is not None: self.fall_publisher.publish(ros_boxes) if self.image_publisher is not None: - message = self.bridge.to_ros_image(Image(image), encoding='bgr8') - self.image_publisher.publish(message) - + self.image_publisher.publish(self.bridge.to_ros_image(Image(image), encoding='bgr8')) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image", + type=str, default="/usb_cam/image_raw") + parser.add_argument("-o", "--output_rgb_image_topic", help="Topic name for output annotated rgb image", + type=str, default="/opendr/image_fallen_annotated") + parser.add_argument("-d", "--detections_topic", help="Topic name for detection messages", + type=str, default="/opendr/fallen") + parser.add_argument("--device", help="Device to use, either \"cpu\" or \"cuda\", defaults to \"cuda\"", + type=str, default="cuda", choices=["cuda", "cpu"]) + parser.add_argument("--accelerate", help="Enables acceleration flags (e.g., stride)", default=False, + action="store_true") + args = parser.parse_args() -if __name__ == '__main__': - # Select the device for running the try: - if torch.cuda.is_available(): - print("GPU found.") - device = 'cuda' - else: + if args.device == "cuda" and torch.cuda.is_available(): + device = "cuda" + elif args.device == "cuda": print("GPU not found. Using CPU instead.") - device = 'cpu' + device = "cpu" + else: + print("Using CPU.") + device = "cpu" except: - device = 'cpu' - - fall_detection_node = FallDetectionNode(device=device) + print("Using CPU.") + device = "cpu" + + if args.accelerate: + stride = True + stages = 0 + half_prec = True + else: + stride = False + stages = 2 + half_prec = False + + fall_detection_node = FallDetectionNode(device=device, + input_rgb_image_topic=args.input_rgb_image_topic, + output_rgb_image_topic=args.output_rgb_image_topic, + detections_topic=args.detections_topic, + num_refinement_stages=stages, use_stride=stride, half_precision=half_prec) fall_detection_node.listen() + + +if __name__ == '__main__': + main() diff --git a/projects/opendr_ws/src/perception/scripts/object_detection_2d_centernet.py b/projects/opendr_ws/src/perception/scripts/object_detection_2d_centernet.py index c1615f99a7..ffbbb0054d 100755 --- a/projects/opendr_ws/src/perception/scripts/object_detection_2d_centernet.py +++ b/projects/opendr_ws/src/perception/scripts/object_detection_2d_centernet.py @@ -13,110 +13,126 @@ # See the License for the specific language governing permissions and # limitations under the License. 
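For readers who want to sanity-check the detector outside ROS, the same pipeline that the CenterNet node below wraps can be run offline. A sketch, assuming an OpenDR installation and mirroring the calls visible in the diff; `test.jpg` is a placeholder input file:

```python
import cv2

from opendr.engine.data import Image
from opendr.perception.object_detection_2d import CenterNetDetectorLearner
from opendr.perception.object_detection_2d import draw_bounding_boxes

# Same initialization sequence as the node's __init__
learner = CenterNetDetectorLearner(backbone="resnet50_v1b", device="cpu")
learner.download(path=".", verbose=True)
learner.load("centernet_default")

# Same inference/annotation sequence as the node's callback
image = Image(cv2.imread("test.jpg"))
boxes = learner.infer(image, threshold=0.45, keep_size=False)
annotated = draw_bounding_boxes(image.opencv(), boxes, class_names=learner.classes)
cv2.imwrite("test_annotated.jpg", annotated)
```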
-import rospy +import argparse import mxnet as mx -import numpy as np + +import rospy from vision_msgs.msg import Detection2DArray from sensor_msgs.msg import Image as ROS_Image -from opendr.engine.data import Image from opendr_bridge import ROSBridge + +from opendr.engine.data import Image from opendr.perception.object_detection_2d import CenterNetDetectorLearner from opendr.perception.object_detection_2d import draw_bounding_boxes class ObjectDetectionCenterNetNode: - def __init__(self, input_image_topic="/usb_cam/image_raw", output_image_topic="/opendr/image_boxes_annotated", - detections_topic="/opendr/objects", device="cuda", backbone="resnet50_v1b"): + + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", + output_rgb_image_topic="/opendr/image_objects_annotated", detections_topic="/opendr/objects", + device="cuda", backbone="resnet50_v1b"): """ - Creates a ROS Node for face detection - :param input_image_topic: Topic from which we are reading the input image - :type input_image_topic: str - :param output_image_topic: Topic to which we are publishing the annotated image (if None, we are not publishing - annotated image) - :type output_image_topic: str - :param detections_topic: Topic to which we are publishing the annotations (if None, we are not publishing - annotated pose annotations) + Creates a ROS Node for object detection with Centernet. + :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the annotated image (if None, no annotated + image is published) + :type output_rgb_image_topic: str + :param detections_topic: Topic to which we are publishing the annotations (if None, no object detection message + is published) :type detections_topic: str :param device: device on which we are running inference ('cpu' or 'cuda') :type device: str :param backbone: backbone network :type backbone: str """ + self.input_rgb_image_topic = input_rgb_image_topic - # Initialize the face detector - self.object_detector = CenterNetDetectorLearner(backbone=backbone, device=device) - self.object_detector.download(path=".", verbose=True) - self.object_detector.load("centernet_default") - self.class_names = self.object_detector.classes - - # Initialize OpenDR ROSBridge object - self.bridge = ROSBridge() - - # setup communications - if output_image_topic is not None: - self.image_publisher = rospy.Publisher(output_image_topic, ROS_Image, queue_size=10) + if output_rgb_image_topic is not None: + self.image_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) else: self.image_publisher = None if detections_topic is not None: - self.bbox_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=10) + self.object_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=1) else: - self.bbox_publisher = None + self.object_publisher = None - rospy.Subscriber(input_image_topic, ROS_Image, self.callback) + self.bridge = ROSBridge() + + # Initialize the object detector + self.object_detector = CenterNetDetectorLearner(backbone=backbone, device=device) + self.object_detector.download(path=".", verbose=True) + self.object_detector.load("centernet_default") + + def listen(self): + """ + Start the node and begin processing input data. 
+ """ + rospy.init_node('object_detection_centernet_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Object detection Centernet node started.") + rospy.spin() def callback(self, data): """ - Callback that process the input data and publishes to the corresponding topics + Callback that processes the input data and publishes to the corresponding topics. :param data: input message :type data: sensor_msgs.msg.Image """ - # Convert sensor_msgs.msg.Image into OpenDR Image image = self.bridge.from_ros_image(data, encoding='bgr8') - # Run pose estimation + # Run object detection boxes = self.object_detector.infer(image, threshold=0.45, keep_size=False) # Get an OpenCV image back - image = np.float32(image.opencv()) - - # Convert detected boxes to ROS type and publish - ros_boxes = self.bridge.to_ros_boxes(boxes) - if self.bbox_publisher is not None: - self.bbox_publisher.publish(ros_boxes) - rospy.loginfo("Published face boxes") - - # Annotate image and publish result - # NOTE: converting back to OpenDR BoundingBoxList is unnecessary here, - # only used to test the corresponding bridge methods - odr_boxes = self.bridge.from_ros_boxes(ros_boxes) - image = draw_bounding_boxes(image, odr_boxes, class_names=self.class_names) - if self.image_publisher is not None: - message = self.bridge.to_ros_image(Image(image), encoding='bgr8') - self.image_publisher.publish(message) - rospy.loginfo("Published annotated image") + image = image.opencv() + # Publish detections in ROS message + ros_boxes = self.bridge.to_ros_boxes(boxes) # Convert to ROS boxes + if self.object_publisher is not None: + self.object_publisher.publish(ros_boxes) + + if self.image_publisher is not None: + # Annotate image with object detection boxes + image = draw_bounding_boxes(image, boxes, class_names=self.object_detector.classes) + # Convert the annotated OpenDR image to ROS2 image message using bridge and publish it + self.image_publisher.publish(self.bridge.to_ros_image(Image(image), encoding='bgr8')) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image", + type=str, default="/usb_cam/image_raw") + parser.add_argument("-o", "--output_rgb_image_topic", help="Topic name for output annotated rgb image", + type=str, default="/opendr/image_objects_annotated") + parser.add_argument("-d", "--detections_topic", help="Topic name for detection messages", + type=str, default="/opendr/objects") + parser.add_argument("--device", help="Device to use (cpu, cuda)", type=str, default="cuda", choices=["cuda", "cpu"]) + parser.add_argument("--backbone", help="Backbone network, defaults to \"resnet50_v1b\"", + type=str, default="resnet50_v1b", choices=["resnet50_v1b"]) + args = parser.parse_args() -if __name__ == '__main__': - # Automatically run on GPU/CPU try: - if mx.context.num_gpus() > 0: - print("GPU found.") - device = 'cuda' - else: + if args.device == "cuda" and mx.context.num_gpus() > 0: + device = "cuda" + elif args.device == "cuda": print("GPU not found. 
Using CPU instead.") - device = 'cpu' + device = "cpu" + else: + print("Using CPU.") + device = "cpu" except: - device = 'cpu' + print("Using CPU.") + device = "cpu" - # initialize ROS node - rospy.init_node('opendr_object_detection', anonymous=True) - rospy.loginfo("Object detection node started!") + object_detection_centernet_node = ObjectDetectionCenterNetNode(device=device, backbone=args.backbone, + input_rgb_image_topic=args.input_rgb_image_topic, + output_rgb_image_topic=args.output_rgb_image_topic, + detections_topic=args.detections_topic) + object_detection_centernet_node.listen() - input_image_topic = rospy.get_param("~input_image_topic", "/videofile/image_raw") - # created node object - object_detection_node = ObjectDetectionCenterNetNode(device=device, input_image_topic=input_image_topic) - # begin ROS communications - rospy.spin() +if __name__ == '__main__': + main() diff --git a/projects/opendr_ws/src/perception/scripts/object_detection_2d_detr.py b/projects/opendr_ws/src/perception/scripts/object_detection_2d_detr.py index ec98c4ddf0..42f7000966 100644 --- a/projects/opendr_ws/src/perception/scripts/object_detection_2d_detr.py +++ b/projects/opendr_ws/src/perception/scripts/object_detection_2d_detr.py @@ -14,46 +14,48 @@ # limitations under the License. -import rospy +import argparse import torch -import numpy as np + +import rospy from vision_msgs.msg import Detection2DArray from sensor_msgs.msg import Image as ROS_Image -from opendr.engine.data import Image from opendr_bridge import ROSBridge -from opendr.perception.object_detection_2d.detr.algorithm.util.draw import draw + +from opendr.engine.data import Image from opendr.perception.object_detection_2d import DetrLearner +from opendr.perception.object_detection_2d.detr.algorithm.util.draw import draw -class DetrNode: +class ObjectDetectionDetrNode: - def __init__(self, input_image_topic="/usb_cam/image_raw", output_image_topic="/opendr/image_boxes_annotated", - detection_annotations_topic="/opendr/objects", device="cuda"): + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", + output_rgb_image_topic="/opendr/image_objects_annotated", detections_topic="/opendr/objects", + device="cuda"): """ - Creates a ROS Node for object detection with DETR - :param input_image_topic: Topic from which we are reading the input image - :type input_image_topic: str - :param output_image_topic: Topic to which we are publishing the annotated image (if None, we are not publishing - annotated image) - :type output_image_topic: str - :param detection_annotations_topic: Topic to which we are publishing the annotations (if None, we are not publishing - annotations) - :type detection_annotations_topic: str + Creates a ROS Node for object detection with DETR. 
+ :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the annotated image (if None, no annotated + image is published) + :type output_rgb_image_topic: str + :param detections_topic: Topic to which we are publishing the annotations (if None, no object detection message + is published) + :type detections_topic: str :param device: device on which we are running inference ('cpu' or 'cuda') :type device: str """ + self.input_rgb_image_topic = input_rgb_image_topic - if output_image_topic is not None: - self.image_publisher = rospy.Publisher(output_image_topic, ROS_Image, queue_size=10) + if output_rgb_image_topic is not None: + self.image_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) else: self.image_publisher = None - if detection_annotations_topic is not None: - self.detection_publisher = rospy.Publisher(detection_annotations_topic, Detection2DArray, queue_size=10) + if detections_topic is not None: + self.object_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=1) else: - self.detection_publisher = None - - rospy.Subscriber(input_image_topic, ROS_Image, self.callback) + self.object_publisher = None self.bridge = ROSBridge() @@ -63,52 +65,71 @@ def __init__(self, input_image_topic="/usb_cam/image_raw", output_image_topic="/ def listen(self): """ - Start the node and begin processing input data + Start the node and begin processing input data. """ - rospy.init_node('detr', anonymous=True) - rospy.loginfo("DETR node started!") + rospy.init_node('object_detection_detr_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Object detection DETR node started.") rospy.spin() def callback(self, data): """ - Callback that process the input data and publishes to the corresponding topics + Callback that processes the input data and publishes to the corresponding topics. 
:param data: input message :type data: sensor_msgs.msg.Image """ - # Convert sensor_msgs.msg.Image into OpenDR Image image = self.bridge.from_ros_image(data, encoding='bgr8') - # Run detection estimation + # Run object detection boxes = self.detr_learner.infer(image) # Get an OpenCV image back - image = np.float32(image.opencv()) + image = image.opencv() - # Annotate image and publish results: - if self.detection_publisher is not None: - ros_detection = self.bridge.to_ros_bounding_box_list(boxes) - self.detection_publisher.publish(ros_detection) - # We get can the data back using self.bridge.from_ros_bounding_box_list(ros_detection) - # e.g., opendr_detection = self.bridge.from_ros_bounding_box_list(ros_detection) + # Publish detections in ROS message + ros_boxes = self.bridge.to_ros_bounding_box_list(boxes) # Convert to ROS bounding_box_list + if self.object_publisher is not None: + self.object_publisher.publish(ros_boxes) if self.image_publisher is not None: + # Annotate image with object detection boxes image = draw(image, boxes) - message = self.bridge.to_ros_image(Image(image), encoding='bgr8') - self.image_publisher.publish(message) - + # Convert the annotated OpenDR image to ROS image message using bridge and publish it + self.image_publisher.publish(self.bridge.to_ros_image(Image(image), encoding='bgr8')) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image", + type=str, default="/usb_cam/image_raw") + parser.add_argument("-o", "--output_rgb_image_topic", help="Topic name for output annotated rgb image", + type=str, default="/opendr/image_objects_annotated") + parser.add_argument("-d", "--detections_topic", help="Topic name for detection messages", + type=str, default="/opendr/objects") + parser.add_argument("--device", help="Device to use, either \"cpu\" or \"cuda\", defaults to \"cuda\"", + type=str, default="cuda", choices=["cuda", "cpu"]) + args = parser.parse_args() -if __name__ == '__main__': - # Select the device for running the try: - if torch.cuda.is_available(): - print("GPU found.") - device = 'cuda' - else: + if args.device == "cuda" and torch.cuda.is_available(): + device = "cuda" + elif args.device == "cuda": print("GPU not found. Using CPU instead.") - device = 'cpu' + device = "cpu" + else: + print("Using CPU.") + device = "cpu" except: - device = 'cpu' + print("Using CPU.") + device = "cpu" + + object_detection_detr_node = ObjectDetectionDetrNode(device=device, + input_rgb_image_topic=args.input_rgb_image_topic, + output_rgb_image_topic=args.output_rgb_image_topic, + detections_topic=args.detections_topic) + object_detection_detr_node.listen() - detection_estimation_node = DetrNode(device=device) - detection_estimation_node.listen() + +if __name__ == '__main__': + main() diff --git a/projects/opendr_ws/src/perception/scripts/object_detection_2d_ssd.py b/projects/opendr_ws/src/perception/scripts/object_detection_2d_ssd.py index f0dd7ca1d3..062ae739a9 100755 --- a/projects/opendr_ws/src/perception/scripts/object_detection_2d_ssd.py +++ b/projects/opendr_ws/src/perception/scripts/object_detection_2d_ssd.py @@ -13,12 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License.
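The notable part of the SSD node below is its pluggable NMS. Outside ROS, the wiring reduces to passing a custom suppressor to `infer` via `custom_nms`, as in this sketch. The constructor and checkpoint arguments are copied from the node's `__init__`; the `...object_detection_2d.nms` import path is an assumption, since the diff shows the constructor calls but not the corresponding imports:

```python
import cv2

from opendr.engine.data import Image
from opendr.perception.object_detection_2d import SingleShotDetectorLearner
# Import path assumed; the node gets these names from imports outside the visible hunks
from opendr.perception.object_detection_2d.nms import Seq2SeqNMSLearner

detector = SingleShotDetectorLearner(backbone="vgg16_atrous", device="cpu")
detector.download(path=".", verbose=True)
detector.load("ssd_default_person")

# Same arguments as the node's seq2seq-nms branch
seq2seq_nms = Seq2SeqNMSLearner(fmod_map_type='EDGEMAP', iou_filtering=0.8,
                                app_feats='fmod', device="cpu")
seq2seq_nms.download(model_name='seq2seq_pets_jpd_fmod', path='.')
seq2seq_nms.load('./seq2seq_pets_jpd_fmod/', verbose=True)

# The learner applies the custom suppressor instead of its default NMS
image = Image(cv2.imread("test.jpg"))  # placeholder input
boxes = detector.infer(image, threshold=0.45, keep_size=False, custom_nms=seq2seq_nms)
```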
-import rospy +import argparse import mxnet as mx -import numpy as np + +import rospy from vision_msgs.msg import Detection2DArray from sensor_msgs.msg import Image as ROS_Image from opendr_bridge import ROSBridge + from opendr.engine.data import Image from opendr.perception.object_detection_2d import SingleShotDetectorLearner from opendr.perception.object_detection_2d import draw_bounding_boxes @@ -26,114 +28,138 @@ class ObjectDetectionSSDNode: - def __init__(self, input_image_topic="/usb_cam/image_raw", output_image_topic="/opendr/image_boxes_annotated", - detections_topic="/opendr/objects", device="cuda", backbone="vgg16_atrous", nms_type='default'): + + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", + output_rgb_image_topic="/opendr/image_objects_annotated", detections_topic="/opendr/objects", + device="cuda", backbone="vgg16_atrous", nms_type='default'): """ - Creates a ROS Node for face detection - :param input_image_topic: Topic from which we are reading the input image - :type input_image_topic: str - :param output_image_topic: Topic to which we are publishing the annotated image (if None, we are not publishing - annotated image) - :type output_image_topic: str - :param detections_topic: Topic to which we are publishing the annotations (if None, we are not publishing - annotated pose annotations) + Creates a ROS Node for object detection with SSD. + :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the annotated image (if None, no annotated + image is published) + :type output_rgb_image_topic: str + :param detections_topic: Topic to which we are publishing the annotations (if None, no object detection message + is published) :type detections_topic: str :param device: device on which we are running inference ('cpu' or 'cuda') :type device: str :param backbone: backbone network :type backbone: str - :param ms_type: type of NMS method + :param nms_type: type of NMS method :type nms_type: str """ + self.input_rgb_image_topic = input_rgb_image_topic + + if output_rgb_image_topic is not None: + self.image_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) + else: + self.image_publisher = None - # Initialize the face detector + if detections_topic is not None: + self.object_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=1) + else: + self.object_publisher = None + + self.bridge = ROSBridge() + + # Initialize the object detector self.object_detector = SingleShotDetectorLearner(backbone=backbone, device=device) self.object_detector.download(path=".", verbose=True) self.object_detector.load("ssd_default_person") - self.class_names = self.object_detector.classes self.custom_nms = None - # Initialize Seq2Seq-NMS if selected + # Initialize NMS if selected if nms_type == 'seq2seq-nms': self.custom_nms = Seq2SeqNMSLearner(fmod_map_type='EDGEMAP', iou_filtering=0.8, - app_feats='fmod', device=self.device) - self.custom_nms.download(model_name='seq2seq_pets_jpd', path='.') - self.custom_nms.load('./seq2seq_pets_jpd/', verbose=True) + app_feats='fmod', device=device) + self.custom_nms.download(model_name='seq2seq_pets_jpd_fmod', path='.') + self.custom_nms.load('./seq2seq_pets_jpd_fmod/', verbose=True) + rospy.loginfo("Object Detection 2D SSD node seq2seq-nms initialized.") elif nms_type == 'soft-nms': - self.custom_nms = SoftNMS(nms_thres=0.45, device=self.device) + self.custom_nms = 
SoftNMS(nms_thres=0.45, device=device) + rospy.loginfo("Object Detection 2D SSD node soft-nms initialized.") elif nms_type == 'fast-nms': - self.custom_nms = FastNMS(nms_thres=0.45, device=self.device) + self.custom_nms = FastNMS(device=device) + rospy.loginfo("Object Detection 2D SSD node fast-nms initialized.") elif nms_type == 'cluster-nms': - self.custom_nms = ClusterNMS(nms_thres=0.45, device=self.device) - - # Initialize OpenDR ROSBridge object - self.bridge = ROSBridge() - - # setup communications - if output_image_topic is not None: - self.image_publisher = rospy.Publisher(output_image_topic, ROS_Image, queue_size=10) - else: - self.image_publisher = None - - if detections_topic is not None: - self.bbox_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=10) + self.custom_nms = ClusterNMS(device=device) + rospy.loginfo("Object Detection 2D SSD node cluster-nms initialized.") else: - self.bbox_publisher = None + rospy.loginfo("Object Detection 2D SSD node using default NMS.") - rospy.Subscriber(input_image_topic, ROS_Image, self.callback) + def listen(self): + """ + Start the node and begin processing input data. + """ + rospy.init_node('object_detection_ssd_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Object detection SSD node started.") + rospy.spin() def callback(self, data): """ - Callback that process the input data and publishes to the corresponding topics + Callback that processes the input data and publishes to the corresponding topics. :param data: input message :type data: sensor_msgs.msg.Image """ - # Convert sensor_msgs.msg.Image into OpenDR Image image = self.bridge.from_ros_image(data, encoding='bgr8') - # Run pose estimation + # Run object detection boxes = self.object_detector.infer(image, threshold=0.45, keep_size=False, custom_nms=self.custom_nms) # Get an OpenCV image back - image = np.float32(image.opencv()) - - # Convert detected boxes to ROS type and publish - ros_boxes = self.bridge.to_ros_boxes(boxes) - if self.bbox_publisher is not None: - self.bbox_publisher.publish(ros_boxes) - rospy.loginfo("Published face boxes") - - # Annotate image and publish result - # NOTE: converting back to OpenDR BoundingBoxList is unnecessary here, - # only used to test the corresponding bridge methods - odr_boxes = self.bridge.from_ros_boxes(ros_boxes) - image = draw_bounding_boxes(image, odr_boxes, class_names=self.class_names) - if self.image_publisher is not None: - message = self.bridge.to_ros_image(Image(image), encoding='bgr8') - self.image_publisher.publish(message) - rospy.loginfo("Published annotated image") + image = image.opencv() + # Publish detections in ROS message + ros_boxes = self.bridge.to_ros_boxes(boxes) # Convert to ROS boxes + if self.object_publisher is not None: + self.object_publisher.publish(ros_boxes) + + if self.image_publisher is not None: + # Annotate image with object detection boxes + image = draw_bounding_boxes(image, boxes, class_names=self.object_detector.classes) + # Convert the annotated OpenDR image to ROS image message using bridge and publish it + self.image_publisher.publish(self.bridge.to_ros_image(Image(image), encoding='bgr8')) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image", + type=str, default="/usb_cam/image_raw") + parser.add_argument("-o", "--output_rgb_image_topic", help="Topic name for output annotated rgb 
image", + type=str, default="/opendr/image_objects_annotated") + parser.add_argument("-d", "--detections_topic", help="Topic name for detection messages", + type=str, default="/opendr/objects") + parser.add_argument("--device", help="Device to use (cpu, cuda)", type=str, default="cuda", choices=["cuda", "cpu"]) + parser.add_argument("--backbone", help="Backbone network, defaults to vgg16_atrous", + type=str, default="vgg16_atrous", choices=["vgg16_atrous"]) + parser.add_argument("--nms_type", help="Non-Maximum Suppression type, defaults to \"default\", options are " + "\"seq2seq-nms\", \"soft-nms\", \"fast-nms\", \"cluster-nms\"", + type=str, default="default", + choices=["default", "seq2seq-nms", "soft-nms", "fast-nms", "cluster-nms"]) + args = parser.parse_args() -if __name__ == '__main__': - # Automatically run on GPU/CPU try: - if mx.context.num_gpus() > 0: - print("GPU found.") - device = 'cuda' - else: + if args.device == "cuda" and mx.context.num_gpus() > 0: + device = "cuda" + elif args.device == "cuda": print("GPU not found. Using CPU instead.") - device = 'cpu' + device = "cpu" + else: + print("Using CPU.") + device = "cpu" except: - device = 'cpu' + print("Using CPU.") + device = "cpu" - # initialize ROS node - rospy.init_node('opendr_object_detection', anonymous=True) - rospy.loginfo("Object detection node started!") + object_detection_ssd_node = ObjectDetectionSSDNode(device=device, backbone=args.backbone, nms_type=args.nms_type, + input_rgb_image_topic=args.input_rgb_image_topic, + output_rgb_image_topic=args.output_rgb_image_topic, + detections_topic=args.detections_topic) + object_detection_ssd_node.listen() - input_image_topic = rospy.get_param("~input_image_topic", "/videofile/image_raw") - # created node object - object_detection_node = ObjectDetectionSSDNode(device=device, input_image_topic=input_image_topic) - # begin ROS communications - rospy.spin() +if __name__ == '__main__': + main() diff --git a/projects/opendr_ws/src/perception/scripts/object_detection_2d_yolov3.py b/projects/opendr_ws/src/perception/scripts/object_detection_2d_yolov3.py index 93155f148b..546d7f11fd 100755 --- a/projects/opendr_ws/src/perception/scripts/object_detection_2d_yolov3.py +++ b/projects/opendr_ws/src/perception/scripts/object_detection_2d_yolov3.py @@ -13,111 +13,127 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import rospy +import argparse import mxnet as mx -import numpy as np + +import rospy from vision_msgs.msg import Detection2DArray from sensor_msgs.msg import Image as ROS_Image from opendr_bridge import ROSBridge + from opendr.engine.data import Image from opendr.perception.object_detection_2d import YOLOv3DetectorLearner from opendr.perception.object_detection_2d import draw_bounding_boxes class ObjectDetectionYOLONode: - def __init__(self, input_image_topic="/usb_cam/image_raw", output_image_topic="/opendr/image_boxes_annotated", - detections_topic="/opendr/objects", device="cuda", backbone="darknet53"): + + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", + output_rgb_image_topic="/opendr/image_objects_annotated", detections_topic="/opendr/objects", + device="cuda", backbone="darknet53"): """ - Creates a ROS Node for face detection - :param input_image_topic: Topic from which we are reading the input image - :type input_image_topic: str - :param output_image_topic: Topic to which we are publishing the annotated image (if None, we are not publishing - annotated image) - :type output_image_topic: str - :param detections_topic: Topic to which we are publishing the annotations (if None, we are not publishing - annotated pose annotations) + Creates a ROS Node for object detection with YOLOV3. + :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the annotated image (if None, no annotated + image is published) + :type output_rgb_image_topic: str + :param detections_topic: Topic to which we are publishing the annotations (if None, no object detection message + is published) :type detections_topic: str :param device: device on which we are running inference ('cpu' or 'cuda') :type device: str :param backbone: backbone network :type backbone: str """ + self.input_rgb_image_topic = input_rgb_image_topic - # Initialize the face detector - self.object_detector = YOLOv3DetectorLearner(backbone=backbone, device=device) - self.object_detector.download(path=".", verbose=True) - self.object_detector.load("yolo_default") - self.class_names = self.object_detector.classes - - # Initialize OpenDR ROSBridge object - self.bridge = ROSBridge() - - # setup communications - if output_image_topic is not None: - self.image_publisher = rospy.Publisher(output_image_topic, ROS_Image, queue_size=10) + if output_rgb_image_topic is not None: + self.image_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) else: self.image_publisher = None if detections_topic is not None: - self.bbox_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=10) + self.object_publisher = rospy.Publisher(detections_topic, Detection2DArray, queue_size=1) else: - self.bbox_publisher = None + self.object_publisher = None - rospy.Subscriber(input_image_topic, ROS_Image, self.callback) + self.bridge = ROSBridge() + + # Initialize the object detector + self.object_detector = YOLOv3DetectorLearner(backbone=backbone, device=device) + self.object_detector.download(path=".", verbose=True) + self.object_detector.load("yolo_default") + + def listen(self): + """ + Start the node and begin processing input data. 
+ """ + rospy.init_node('object_detection_yolov3_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Object detection YOLOV3 node started.") + rospy.spin() def callback(self, data): """ - Callback that process the input data and publishes to the corresponding topics + Callback that processes the input data and publishes to the corresponding topics. :param data: input message :type data: sensor_msgs.msg.Image """ - # Convert sensor_msgs.msg.Image into OpenDR Image image = self.bridge.from_ros_image(data, encoding='bgr8') - rospy.loginfo("image info: {}".format(image.numpy().shape)) - # Run pose estimation + # Run object detection boxes = self.object_detector.infer(image, threshold=0.1, keep_size=False) # Get an OpenCV image back - image = np.float32(image.opencv()) - - # Convert detected boxes to ROS type and publish - ros_boxes = self.bridge.to_ros_boxes(boxes) - if self.bbox_publisher is not None: - self.bbox_publisher.publish(ros_boxes) - rospy.loginfo("Published face boxes") - - # Annotate image and publish result - # NOTE: converting back to OpenDR BoundingBoxList is unnecessary here, - # only used to test the corresponding bridge methods - odr_boxes = self.bridge.from_ros_boxes(ros_boxes) - image = draw_bounding_boxes(image, odr_boxes, class_names=self.class_names) - if self.image_publisher is not None: - message = self.bridge.to_ros_image(Image(image), encoding='bgr8') - self.image_publisher.publish(message) - rospy.loginfo("Published annotated image") + image = image.opencv() + # Publish detections in ROS message + ros_boxes = self.bridge.to_ros_bounding_box_list(boxes) # Convert to ROS bounding_box_list + if self.object_publisher is not None: + self.object_publisher.publish(ros_boxes) + + if self.image_publisher is not None: + # Annotate image with object detection boxes + image = draw_bounding_boxes(image, boxes, class_names=self.object_detector.classes) + # Convert the annotated OpenDR image to ROS2 image message using bridge and publish it + self.image_publisher.publish(self.bridge.to_ros_image(Image(image), encoding='bgr8')) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image", + type=str, default="/usb_cam/image_raw") + parser.add_argument("-o", "--output_rgb_image_topic", help="Topic name for output annotated rgb image", + type=str, default="/opendr/image_objects_annotated") + parser.add_argument("-d", "--detections_topic", help="Topic name for detection messages", + type=str, default="/opendr/objects") + parser.add_argument("--device", help="Device to use, either \"cpu\" or \"cuda\", defaults to \"cuda\"", + type=str, default="cuda", choices=["cuda", "cpu"]) + parser.add_argument("--backbone", help="Backbone network, defaults to \"darknet53\"", + type=str, default="darknet53", choices=["darknet53"]) + args = parser.parse_args() -if __name__ == '__main__': - # Automatically run on GPU/CPU try: - if mx.context.num_gpus() > 0: - print("GPU found.") - device = 'cuda' - else: + if args.device == "cuda" and mx.context.num_gpus() > 0: + device = "cuda" + elif args.device == "cuda": print("GPU not found. 
Using CPU instead.") - device = 'cpu' + device = "cpu" + else: + print("Using CPU.") + device = "cpu" except: - device = 'cpu' + print("Using CPU.") + device = "cpu" - # initialize ROS node - rospy.init_node('opendr_object_detection', anonymous=True) - rospy.loginfo("Object detection node started!") + object_detection_yolov3_node = ObjectDetectionYOLONode(device=device, backbone=args.backbone, + input_rgb_image_topic=args.input_rgb_image_topic, + output_rgb_image_topic=args.output_rgb_image_topic, + detections_topic=args.detections_topic) + object_detection_yolov3_node.listen() - input_image_topic = rospy.get_param("~input_image_topic", "/videofile/image_raw") - # created node object - object_detection_node = ObjectDetectionYOLONode(device=device, input_image_topic=input_image_topic) - # begin ROS communications - rospy.spin() +if __name__ == '__main__': + main() diff --git a/projects/opendr_ws/src/perception/scripts/pose_estimation.py b/projects/opendr_ws/src/perception/scripts/pose_estimation.py index 855ada40cf..34171afad4 100644 --- a/projects/opendr_ws/src/perception/scripts/pose_estimation.py +++ b/projects/opendr_ws/src/perception/scripts/pose_estimation.py @@ -13,71 +13,83 @@ # See the License for the specific language governing permissions and # limitations under the License. +import argparse +import torch import rospy -import torch -from vision_msgs.msg import Detection2DArray from sensor_msgs.msg import Image as ROS_Image +from ros_bridge.msg import OpenDRPose2D from opendr_bridge import ROSBridge + +from opendr.engine.data import Image from opendr.perception.pose_estimation import draw from opendr.perception.pose_estimation import LightweightOpenPoseLearner -from opendr.engine.data import Image class PoseEstimationNode: - def __init__(self, input_image_topic="/usb_cam/image_raw", output_image_topic="/opendr/image_pose_annotated", - pose_annotations_topic="/opendr/poses", device="cuda"): + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", + output_rgb_image_topic="/opendr/image_pose_annotated", detections_topic="/opendr/poses", device="cuda", + num_refinement_stages=2, use_stride=False, half_precision=False): """ - Creates a ROS Node for pose detection - :param input_image_topic: Topic from which we are reading the input image - :type input_image_topic: str - :param output_image_topic: Topic to which we are publishing the annotated image (if None, we are not publishing - annotated image) - :type output_image_topic: str - :param pose_annotations_topic: Topic to which we are publishing the annotations (if None, we are not publishing - annotated pose annotations) - :type pose_annotations_topic: str + Creates a ROS Node for pose estimation with Lightweight OpenPose. + :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the annotated image (if None, no annotated + image is published) + :type output_rgb_image_topic: str + :param detections_topic: Topic to which we are publishing the annotations (if None, no pose detection message + is published) + :type detections_topic: str :param device: device on which we are running inference ('cpu' or 'cuda') :type device: str + :param num_refinement_stages: Specifies the number of pose estimation refinement stages are added on the + model's head, including the initial stage. 
Can be 0, 1 or 2, with more stages meaning slower and more accurate + inference + :type num_refinement_stages: int + :param use_stride: Whether to add a stride value in the model, which reduces accuracy but increases + inference speed + :type use_stride: bool + :param half_precision: Enables inference using half (fp16) precision instead of single (fp32) precision. + Valid only for GPU-based inference + :type half_precision: bool """ - if output_image_topic is not None: - self.image_publisher = rospy.Publisher(output_image_topic, ROS_Image, queue_size=10) + self.input_rgb_image_topic = input_rgb_image_topic + + if output_rgb_image_topic is not None: + self.image_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) else: self.image_publisher = None - if pose_annotations_topic is not None: - self.pose_publisher = rospy.Publisher(pose_annotations_topic, Detection2DArray, queue_size=10) + if detections_topic is not None: + self.pose_publisher = rospy.Publisher(detections_topic, OpenDRPose2D, queue_size=1) else: self.pose_publisher = None - self.input_image_topic = input_image_topic - self.bridge = ROSBridge() - # Initialize the pose estimation - self.pose_estimator = LightweightOpenPoseLearner(device=device, num_refinement_stages=0, - mobilenet_use_stride=False, - half_precision=False) + # Initialize the pose estimation learner + self.pose_estimator = LightweightOpenPoseLearner(device=device, num_refinement_stages=num_refinement_stages, + mobilenet_use_stride=use_stride, + half_precision=half_precision) self.pose_estimator.download(path=".", verbose=True) self.pose_estimator.load("openpose_default") def listen(self): """ - Start the node and begin processing input data + Start the node and begin processing input data. """ - rospy.init_node('opendr_pose_estimation', anonymous=True) - rospy.Subscriber(self.input_image_topic, ROS_Image, self.callback) - rospy.loginfo("Pose estimation node started!") + rospy.init_node('pose_estimation_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Pose estimation node started.") rospy.spin() def callback(self, data): """ - Callback that process the input data and publishes to the corresponding topics - :param data: input message + Callback that processes the input data and publishes to the corresponding topics. 
+        :param data: Input image message
         :type data: sensor_msgs.msg.Image
         """
-        # Convert sensor_msgs.msg.Image into OpenDR Image
         image = self.bridge.from_ros_image(data, encoding='bgr8')
@@ -86,31 +98,63 @@ def callback(self, data):

         # Get an OpenCV image back
         image = image.opencv()

-        # Annotate image and publish results
-        for pose in poses:
-            if self.pose_publisher is not None:
-                ros_pose = self.bridge.to_ros_pose(pose)
-                self.pose_publisher.publish(ros_pose)
-                # We get can the data back using self.bridge.from_ros_pose(ros_pose)
-                # e.g., opendr_pose = self.bridge.from_ros_pose(ros_pose)
-            draw(image, pose)
+        # Publish detections in ROS message
+        if self.pose_publisher is not None:
+            for pose in poses:
+                # Convert OpenDR pose to a ROS pose message using the bridge and publish it
+                self.pose_publisher.publish(self.bridge.to_ros_pose(pose))

         if self.image_publisher is not None:
-            message = self.bridge.to_ros_image(Image(image), encoding='bgr8')
-            self.image_publisher.publish(message)
-
+            # Annotate image with poses
+            for pose in poses:
+                draw(image, pose)
+            # Convert the annotated OpenDR image to a ROS image message using the bridge and publish it
+            self.image_publisher.publish(self.bridge.to_ros_image(Image(image), encoding='bgr8'))
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image",
+                        type=str, default="/usb_cam/image_raw")
+    parser.add_argument("-o", "--output_rgb_image_topic", help="Topic name for output annotated rgb image",
+                        type=str, default="/opendr/image_pose_annotated")
+    parser.add_argument("-d", "--detections_topic", help="Topic name for detection messages",
+                        type=str, default="/opendr/poses")
+    parser.add_argument("--device", help="Device to use, either \"cpu\" or \"cuda\", defaults to \"cuda\"",
+                        type=str, default="cuda", choices=["cuda", "cpu"])
+    parser.add_argument("--accelerate", help="Enables acceleration flags (e.g., stride)", default=False,
+                        action="store_true")
+    args = parser.parse_args()

-if __name__ == '__main__':
-    # Select the device for running the
     try:
-        if torch.cuda.is_available():
-            print("GPU found.")
-            device = 'cuda'
-        else:
+        if args.device == "cuda" and torch.cuda.is_available():
+            device = "cuda"
+        elif args.device == "cuda":
             print("GPU not found. Using CPU instead.")
-            device = 'cpu'
+            device = "cpu"
+        else:
+            print("Using CPU.")
+            device = "cpu"
     except:
-        device = 'cpu'
+        print("Using CPU.")
+        device = "cpu"
+
+    if args.accelerate:
+        stride = True
+        stages = 0
+        half_prec = True
+    else:
+        stride = False
+        stages = 2
+        half_prec = False
+
+    pose_estimator_node = PoseEstimationNode(device=device,
+                                             input_rgb_image_topic=args.input_rgb_image_topic,
+                                             output_rgb_image_topic=args.output_rgb_image_topic,
+                                             detections_topic=args.detections_topic,
+                                             num_refinement_stages=stages, use_stride=stride, half_precision=half_prec)
+    pose_estimator_node.listen()

-    pose_estimation_node = PoseEstimationNode(device=device)
-    pose_estimation_node.listen()
+
+if __name__ == '__main__':
+    main()
diff --git a/projects/opendr_ws/src/perception/scripts/semantic_segmentation_bisenet.py b/projects/opendr_ws/src/perception/scripts/semantic_segmentation_bisenet.py
index 32390c9157..e28e81139e 100644
--- a/projects/opendr_ws/src/perception/scripts/semantic_segmentation_bisenet.py
+++ b/projects/opendr_ws/src/perception/scripts/semantic_segmentation_bisenet.py
@@ -14,98 +14,177 @@
 # limitations under the License.
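
The BiSeNet node below publishes its raw heatmap as a ROS image whose pixels are `uint8` class ids. A minimal consumer sketch (a hypothetical listener node; assumes `cv_bridge` is available and the node's default `/opendr/heatmap` topic):

```python
# Hypothetical listener that recovers the class-id map published by the BiSeNet node below.
import numpy as np
import rospy
from cv_bridge import CvBridge
from sensor_msgs.msg import Image as ROS_Image

cv_bridge = CvBridge()

def callback(msg):
    class_ids = cv_bridge.imgmsg_to_cv2(msg)  # uint8 array with one class id per pixel
    rospy.loginfo("Class ids present in frame: %s" % np.unique(class_ids))

if __name__ == '__main__':
    rospy.init_node('heatmap_listener', anonymous=True)
    rospy.Subscriber("/opendr/heatmap", ROS_Image, callback, queue_size=1)
    rospy.spin()
```
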
import argparse +import numpy as np import torch +import cv2 +import colorsys + import rospy from sensor_msgs.msg import Image as ROS_Image from opendr_bridge import ROSBridge + from opendr.engine.data import Image +from opendr.engine.target import Heatmap from opendr.perception.semantic_segmentation import BisenetLearner -import numpy as np -import cv2 class BisenetNode: - def __init__(self, - input_image_topic, - output_heatmap_topic=None, - device="cuda" - ): + + def __init__(self, input_rgb_image_topic="/usb_cam/image_raw", output_heatmap_topic="/opendr/heatmap", + output_rgb_image_topic="/opendr/heatmap_visualization", device="cuda"): """ - Initialize the Bisenet ROS node and create an instance of the respective learner class. - :param input_image_topic: ROS topic for the input image - :type input_image_topic: str - :param output_heatmap_topic: ROS topic for the predicted heatmap + Creates a ROS Node for semantic segmentation with Bisenet. + :param input_rgb_image_topic: Topic from which we are reading the input image + :type input_rgb_image_topic: str + :param output_heatmap_topic: Topic to which we are publishing the heatmap in the form of a ROS image containing + class ids :type output_heatmap_topic: str + :param output_rgb_image_topic: Topic to which we are publishing the heatmap image blended with the + input image and a class legend for visualization purposes + :type output_rgb_image_topic: str :param device: device on which we are running inference ('cpu' or 'cuda') :type device: str """ - self.input_image_topic = input_image_topic - self.output_heatmap_topic = output_heatmap_topic + self.input_rgb_image_topic = input_rgb_image_topic - if self.output_heatmap_topic is not None: - self._heatmap_publisher = rospy.Publisher(f'{self.output_heatmap_topic}/semantic', ROS_Image, queue_size=10) + if output_heatmap_topic is not None: + self.heatmap_publisher = rospy.Publisher(output_heatmap_topic, ROS_Image, queue_size=1) else: - self._heatmap_publisher = None + self.heatmap_publisher = None - rospy.Subscriber(self.input_image_topic, ROS_Image, self.callback) + if output_rgb_image_topic is not None: + self.visualization_publisher = rospy.Publisher(output_rgb_image_topic, ROS_Image, queue_size=1) + else: + self.visualization_publisher = None - # Initialize OpenDR ROSBridge object - self._bridge = ROSBridge() + self.bridge = ROSBridge() # Initialize the semantic segmentation model - self._learner = BisenetLearner(device=device) - self._learner.download(path="bisenet_camvid") - self._learner.load("bisenet_camvid") + self.learner = BisenetLearner(device=device) + self.learner.download(path="bisenet_camvid") + self.learner.load("bisenet_camvid") - self._colors = np.random.randint(0, 256, (256, 3), dtype=np.uint8) + self.class_names = ["Bicyclist", "Building", "Car", "Column Pole", "Fence", "Pedestrian", "Road", "Sidewalk", + "Sign Symbol", "Sky", "Tree", "Unknown"] + self.colors = self.getDistinctColors(len(self.class_names)) # Generate n distinct colors def listen(self): """ - Start the node and begin processing input data + Start the node and begin processing input data. 
""" - rospy.init_node('bisenet', anonymous=True) - rospy.loginfo("Bisenet node started!") + rospy.init_node('semantic_segmentation_bisenet_node', anonymous=True) + rospy.Subscriber(self.input_rgb_image_topic, ROS_Image, self.callback, queue_size=1, buff_size=10000000) + rospy.loginfo("Semantic segmentation BiSeNet node started.") rospy.spin() - def callback(self, data: ROS_Image): + def callback(self, data): """ - Predict the heatmap from the input image and publish the results. + Callback that processes the input data and publishes to the corresponding topics. :param data: Input image message :type data: sensor_msgs.msg.Image """ - # Convert sensor_msgs.msg.Image to OpenDR Image - image = self._bridge.from_ros_image(data) + # Convert sensor_msgs.msg.Image into OpenDR Image + image = self.bridge.from_ros_image(data, encoding='bgr8') try: - # Retrieve the OpenDR heatmap - prediction = self._learner.infer(image) - - if self._heatmap_publisher is not None and self._heatmap_publisher.get_num_connections() > 0: - heatmap_np = prediction.numpy() - heatmap_o = self._colors[heatmap_np] - heatmap_o = cv2.resize(np.uint8(heatmap_o), (960, 720)) - self._heatmap_publisher.publish(self._bridge.to_ros_image(Image(heatmap_o), encoding='bgr8')) - + # Run semantic segmentation to retrieve the OpenDR heatmap + heatmap = self.learner.infer(image) + + # Publish heatmap in the form of an image containing class ids + if self.heatmap_publisher is not None: + heatmap = Heatmap(heatmap.data.astype(np.uint8)) # Convert to uint8 + self.heatmap_publisher.publish(self.bridge.to_ros_image(heatmap)) + + # Publish heatmap color visualization blended with the input image and a class color legend + if self.visualization_publisher is not None: + heatmap_colors = Image(self.colors[heatmap.numpy()]) + image = Image(cv2.resize(image.convert("channels_last", "bgr"), (960, 720))) + alpha = 0.4 # 1.0 means full input image, 0.0 means full heatmap + beta = (1.0 - alpha) + image_blended = cv2.addWeighted(image.opencv(), alpha, heatmap_colors.opencv(), beta, 0.0) + # Add a legend + image_blended = self.addLegend(image_blended, np.unique(heatmap.data)) + + self.visualization_publisher.publish(self.bridge.to_ros_image(Image(image_blended), + encoding='bgr8')) except Exception: rospy.logwarn('Failed to generate prediction.') + def addLegend(self, image, unique_class_ints): + # Text setup + origin_x, origin_y = 5, 5 # Text origin x, y + color_rectangle_size = 25 + font_size = 1.0 + font_thickness = 2 + w_max = 0 + for i in range(len(unique_class_ints)): + text = self.class_names[unique_class_ints[i]] # Class name + x, y = origin_x, origin_y + i * color_rectangle_size # Text position + # Determine class color and convert to regular integers + color = (int(self.colors[unique_class_ints[i]][0]), + int(self.colors[unique_class_ints[i]][1]), + int(self.colors[unique_class_ints[i]][2])) + # Get text width and height + (w, h), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_size, font_thickness) + if w >= w_max: + w_max = w + # Draw partial background rectangle + image = cv2.rectangle(image, (x - origin_x, y), + (x + origin_x + color_rectangle_size + w_max, + y + color_rectangle_size), + (255, 255, 255, 0.5), -1) + # Draw color rectangle + image = cv2.rectangle(image, (x, y), + (x + color_rectangle_size, y + color_rectangle_size), color, -1) + # Draw class name text + image = cv2.putText(image, text, (x + color_rectangle_size + 2, y + h), + cv2.FONT_HERSHEY_SIMPLEX, font_size, (0, 0, 0), font_thickness) + return image + + 
@staticmethod + def HSVToRGB(h, s, v): + (r, g, b) = colorsys.hsv_to_rgb(h, s, v) + return np.array([int(255 * r), int(255 * g), int(255 * b)]) + + def getDistinctColors(self, n): + huePartition = 1.0 / (n + 1) + return np.array([self.HSVToRGB(huePartition * value, 1.0, 1.0) for value in range(0, n)]).astype(np.uint8) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input_rgb_image_topic", help="Topic name for input rgb image", + type=str, default="/usb_cam/image_raw") + parser.add_argument("-o", "--output_heatmap_topic", help="Topic to which we are publishing the heatmap in the form " + "of a ROS image containing class ids", + type=str, default="/opendr/heatmap") + parser.add_argument("-v", "--output_rgb_image_topic", help="Topic to which we are publishing the heatmap image " + "blended with the input image and a class legend for " + "visualization purposes", + type=str, default="/opendr/heatmap_visualization") + parser.add_argument("--device", help="Device to use, either \"cpu\" or \"cuda\", defaults to \"cuda\"", + type=str, default="cuda", choices=["cuda", "cpu"]) + args = parser.parse_args() -if __name__ == '__main__': - # Select the device for running the try: - if torch.cuda.is_available(): - print("GPU found.") + if args.device == "cuda" and torch.cuda.is_available(): device = "cuda" - else: + elif args.device == "cuda": print("GPU not found. Using CPU instead.") device = "cpu" + else: + print("Using CPU.") + device = "cpu" except: + print("Using CPU.") device = "cpu" - parser = argparse.ArgumentParser() - parser.add_argument('image_topic', type=str, help='listen to images on this topic') - parser.add_argument('--heatmap_topic', type=str, help='publish the heatmap on this topic') - args = parser.parse_args() - - bisenet_node = BisenetNode(device=device, input_image_topic=args.image_topic, output_heatmap_topic=args.heatmap_topic) + bisenet_node = BisenetNode(device=device, + input_rgb_image_topic=args.input_rgb_image_topic, + output_heatmap_topic=args.output_heatmap_topic, + output_rgb_image_topic=args.output_rgb_image_topic) bisenet_node.listen() + + +if __name__ == '__main__': + main() diff --git a/projects/opendr_ws/src/ros_bridge/CMakeLists.txt b/projects/opendr_ws/src/ros_bridge/CMakeLists.txt index b7ed470ae0..f66066c41f 100644 --- a/projects/opendr_ws/src/ros_bridge/CMakeLists.txt +++ b/projects/opendr_ws/src/ros_bridge/CMakeLists.txt @@ -14,6 +14,12 @@ catkin_python_setup() ################################################ ## Declare ROS messages, services and actions ## ################################################ +add_message_files( + DIRECTORY msg + FILES + OpenDRPose2DKeypoint.msg + OpenDRPose2D.msg +) generate_messages( DEPENDENCIES diff --git a/projects/opendr_ws/src/ros_bridge/msg/.keep b/projects/opendr_ws/src/ros_bridge/msg/.keep deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/projects/opendr_ws/src/ros_bridge/msg/OpenDRPose2D.msg b/projects/opendr_ws/src/ros_bridge/msg/OpenDRPose2D.msg new file mode 100644 index 0000000000..09b1443027 --- /dev/null +++ b/projects/opendr_ws/src/ros_bridge/msg/OpenDRPose2D.msg @@ -0,0 +1,26 @@ +# Copyright 2020-2022 OpenDR European Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This message represents a full OpenDR human pose 2D as a list of keypoints
+
+Header header
+
+# The id of the pose
+int32 pose_id
+
+# The pose detection confidence of the model
+float32 conf
+
+# The list of the human 2D pose keypoints
+OpenDRPose2DKeypoint[] keypoint_list
diff --git a/projects/opendr_ws/src/ros_bridge/msg/OpenDRPose2DKeypoint.msg b/projects/opendr_ws/src/ros_bridge/msg/OpenDRPose2DKeypoint.msg
new file mode 100644
index 0000000000..72d14a19f2
--- /dev/null
+++ b/projects/opendr_ws/src/ros_bridge/msg/OpenDRPose2DKeypoint.msg
@@ -0,0 +1,22 @@
+# Copyright 2020-2022 OpenDR European Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This message contains all relevant information for an OpenDR human pose 2D keypoint
+
+# The kpt_name according to https://github.com/opendr-eu/opendr/blob/master/docs/reference/lightweight-open-pose.md#notes
+string kpt_name
+
+# x and y pixel position on the input image, (0, 0) is the top-left corner of the image
+int32 x
+int32 y
diff --git a/projects/opendr_ws/src/ros_bridge/src/opendr_bridge/bridge.py b/projects/opendr_ws/src/ros_bridge/src/opendr_bridge/bridge.py
index fe7e4171f2..d9f8efdb9d 100755
--- a/projects/opendr_ws/src/ros_bridge/src/opendr_bridge/bridge.py
+++ b/projects/opendr_ws/src/ros_bridge/src/opendr_bridge/bridge.py
@@ -28,6 +28,7 @@
 from sensor_msgs.msg import Image as ImageMsg, PointCloud as PointCloudMsg, ChannelFloat32 as ChannelFloat32Msg
 import rospy
 from geometry_msgs.msg import Point32 as Point32Msg, Quaternion as QuaternionMsg
+from ros_bridge.msg import OpenDRPose2D, OpenDRPose2DKeypoint


 class ROSBridge:
@@ -69,51 +70,50 @@ def to_ros_image(self, image: Image, encoding: str='passthrough') -> ImageMsg:
         message = self._cv_bridge.cv2_to_imgmsg(image.opencv(), encoding=encoding)
         return message

-    def to_ros_pose(self, pose):
+    def to_ros_pose(self, pose: Pose):
         """
-        Converts an OpenDR pose into a Detection2DArray msg that can carry the same information
-        Each keypoint is represented as a bbox centered at the keypoint with zero width/height. The subject id is also
-        embedded on each keypoint (stored in ObjectHypothesisWithPose).
-        :param pose: OpenDR pose to be converted
+        Converts an OpenDR Pose into an OpenDRPose2D msg that can carry the same information, i.e. a list of keypoints,
+        the pose detection confidence and the pose id.
+        Each keypoint is represented as an OpenDRPose2DKeypoint with x, y pixel position on the input image with (0, 0)
+        being the top-left corner.
+ :param pose: OpenDR Pose to be converted to OpenDRPose2D :type pose: engine.target.Pose :return: ROS message with the pose - :rtype: vision_msgs.msg.Detection2DArray + :rtype: ros_bridge.msg.OpenDRPose2D """ data = pose.data - keypoints = Detection2DArray() - for i in range(data.shape[0]): - keypoint = Detection2D() - keypoint.bbox = BoundingBox2D() - keypoint.results.append(ObjectHypothesisWithPose()) - keypoint.bbox.center = Pose2D() - keypoint.bbox.center.x = data[i][0] - keypoint.bbox.center.y = data[i][1] - keypoint.bbox.size_x = 0 - keypoint.bbox.size_y = 0 - keypoint.results[0].id = pose.id - if pose.confidence: - keypoint.results[0].score = pose.confidence - keypoints.detections.append(keypoint) - return keypoints + # Setup ros pose + ros_pose = OpenDRPose2D() + ros_pose.pose_id = int(pose.id) + if pose.confidence: + ros_pose.conf = pose.confidence - def from_ros_pose(self, ros_pose): - """ - Converts a ROS message with pose payload into an OpenDR pose - :param ros_pose: the pose to be converted (represented as vision_msgs.msg.Detection2DArray) - :type ros_pose: vision_msgs.msg.Detection2DArray - :return: an OpenDR pose + # Add keypoints to pose + for i in range(data.shape[0]): + ros_keypoint = OpenDRPose2DKeypoint() + ros_keypoint.kpt_name = pose.kpt_names[i] + ros_keypoint.x = data[i][0] + ros_keypoint.y = data[i][1] + # Add keypoint to pose + ros_pose.keypoint_list.append(ros_keypoint) + return ros_pose + + def from_ros_pose(self, ros_pose: OpenDRPose2D): + """ + Converts an OpenDRPose2D message into an OpenDR Pose. + :param ros_pose: the ROS pose to be converted + :type ros_pose: ros_bridge.msg.OpenDRPose2D + :return: an OpenDR Pose :rtype: engine.target.Pose """ - keypoints = ros_pose.detections - data = [] - pose_id, confidence = None, None + ros_keypoints = ros_pose.keypoint_list + keypoints = [] + pose_id, confidence = ros_pose.pose_id, ros_pose.conf - for keypoint in keypoints: - data.append(keypoint.bbox.center.x) - data.append(keypoint.bbox.center.y) - confidence = keypoint.results[0].score - pose_id = keypoint.results[0].id - data = np.asarray(data).reshape((-1, 2)) + for ros_keypoint in ros_keypoints: + keypoints.append(int(ros_keypoint.x)) + keypoints.append(int(ros_keypoint.y)) + data = np.asarray(keypoints).reshape((-1, 2)) pose = Pose(data, confidence) pose.id = pose_id @@ -213,7 +213,7 @@ def to_ros_boxes(self, box_list): ros_box.bbox.center.y = box.top + box.height / 2. ros_box.bbox.size_x = box.width ros_box.bbox.size_y = box.height - ros_box.results[0].id = box.name + ros_box.results[0].id = int(box.name) if box.confidence: ros_box.results[0].score = box.confidence ros_boxes.detections.append(ros_box) @@ -235,8 +235,8 @@ def from_ros_boxes(self, ros_detections): height = box.bbox.size_y left = box.bbox.center.x - width / 2. top = box.bbox.center.y - height / 2. 
-            id = box.results[0].id
-            bbox = BoundingBox(top=top, left=left, width=width, height=height, name=id)
+            _id = int(box.results[0].id)
+            bbox = BoundingBox(top=top, left=left, width=width, height=height, name=_id)
             bboxes.data.append(bbox)

         return bboxes
diff --git a/projects/opendr_ws/src/simulation/scripts/human_model_generation_client.py b/projects/opendr_ws/src/simulation/scripts/human_model_generation_client.py
index 1f9470f9c6..375f9c9f0b 100644
--- a/projects/opendr_ws/src/simulation/scripts/human_model_generation_client.py
+++ b/projects/opendr_ws/src/simulation/scripts/human_model_generation_client.py
@@ -46,6 +46,6 @@
         human_model = Model_3D(vertices, triangles, vertex_colors)
         human_model.save_obj_mesh('./human_model.obj')
         [out_imgs, human_pose_2D] = human_model.get_img_views(rotations=[30, 120], human_pose_3D=pose, plot_kps=True)
-        cv2.imwrite('./rendering.png', out_imgs[0].numpy())
+        cv2.imwrite('./rendering.png', out_imgs[0].opencv())
     except rospy.ServiceException as e:
         print("Service call failed: %s" % e)
diff --git a/src/opendr/engine/target.py b/src/opendr/engine/target.py
index 9bc9dbacf1..54a7c4931b 100644
--- a/src/opendr/engine/target.py
+++ b/src/opendr/engine/target.py
@@ -296,9 +296,13 @@ def data(self, data):
             raise ValueError("Pose expects either NumPy arrays or lists as data")

     def __str__(self):
-        """Matches kpt_names and keypoints x,y to get the best human-readable format for pose."""
+        """
+        Returns the pose in a human-readable format that contains the pose ID, the detection confidence and
+        the matched kpt_names and keypoints x,y positions.
+        """

-        out_string = ""
+        out_string = "Pose ID: " + str(self.id)
+        out_string += "\nDetection confidence: " + str(self.confidence) + "\nKeypoints name-position:\n"
         # noinspection PyUnresolvedReferences
         for name, kpt in zip(Pose.kpt_names, self.data.tolist()):
             out_string += name + ": " + str(kpt) + "\n"
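
For reference, a short standalone sketch of what the updated `__str__` prints (assuming the OpenDR engine is installed; the keypoint rows and the confidence are illustrative values, and the first two entries of `Pose.kpt_names` are assumed to be `nose` and `neck`, per the Lightweight OpenPose naming):

```python
import numpy as np
from opendr.engine.target import Pose

# Illustrative data: each row is an (x, y) pixel position, matched in order against Pose.kpt_names.
data = np.array([[250, 120], [255, 160]])
pose = Pose(data, confidence=0.91)
pose.id = 0

print(pose)
# Pose ID: 0
# Detection confidence: 0.91
# Keypoints name-position:
# nose: [250, 120]
# neck: [255, 160]
```
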