diff --git a/pose_estimation/scripts/arm_keypoint_capture.py b/pose_estimation/scripts/arm_keypoint_capture.py
index 782138acc7b1aad393ac0e44039580daada6d303..8ec7883740efef45595e4b165517ed964d5d20cf 100755
--- a/pose_estimation/scripts/arm_keypoint_capture.py
+++ b/pose_estimation/scripts/arm_keypoint_capture.py
@@ -1,33 +1,30 @@
 ############################## TODO ################################
-# 1) the angle closer to the table not accurate
-
-
-
-# TESTS:
-# - y (depth) -> poor accuracy
-# - z (height) -> better accuracy but gets out of range
-# - x (width) -> the range is too small
-
-### VERIFY: spherical to coordinates
-
+#
+#
+### TEST: spherical to coordinates
+#
 # IMPROVEMENTS: How often do we update the arm? How long to move the arm? Which IKs won't be solved
-# GENERAL: Global variables, comments and structure
+# GENERAL: Global variables, comments and structure. Dependencies (mediapipe, vg)
 ####################################################################
-
 
 import mediapipe as mp
 import numpy as np
 import rospy
 import math
 import cv2
 import vg
-import logging
+import os
 
 from mediapipe.framework.formats import landmark_pb2
 
 ### Importing messages
 from pose_estimation.msg import SphericalCoordinates
 
+### Replaying a pre-recorded mp4 video or using the webcam
+video_replay = True
+test_video = "test_movement.mp4"
+VIDEO = f"{os.getcwd()}/../catkin_ws/src/pose_estimation/test/{test_video}"
+
 ### Mediapipe drawing
 mp_drawing = mp.solutions.drawing_utils
 mp_drawing_styles = mp.solutions.drawing_styles
@@ -45,7 +42,6 @@ z_axis = np.array([0, 0, 1])
 ### Ranges
 mp_distance_range = (0.0,0.53)
 robot_distance_range = (0.14, 0.46)
-# robot_distance_range = (0.14, 0.46)
 
 ### Threshold
 VISIBILITY_THRESHOLD = 0.2
@@ -125,12 +121,14 @@ def publish_keypoint(pub, results):
 
             # Scaling the angle which defines the height to mimic the robotic arm's range of movement
             phi = vg.angle(shifted_keypoint, y_axis)
+
+            ## TODO: Global variable
             if phi < 30 or phi > 150:
                 rospy.logwarn("Invalid angle")
                 break
 
-            phi = adjust_range(60, 140, 30, 80, phi)
+            phi = adjust_range(30, 150, 30, 80, phi)
 
             distance = euclid_dist(np.array([0,0,0]), shifted_keypoint)
@@ -140,9 +138,6 @@ def publish_keypoint(pub, results):
                                     robot_distance_range[0],
                                     robot_distance_range[1],
                                     distance)
-
-            print(f"Calculated distance: {distance}")
-
             spherical_coordinates = SphericalCoordinates()
             spherical_coordinates.theta = math.radians(theta)
             spherical_coordinates.phi = math.radians(phi)
@@ -158,7 +153,6 @@ def publish_keypoint(pub, results):
 
 def skeleton_estimation_pose(pub, img, pose, data_collection = True):
     results = pose.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
-    pose_landmarks = results.pose_landmarks
 
     try:
         arm = [results.pose_landmarks.landmark[arm_keypoint] for arm_keypoint in arm_landmarks]
@@ -181,19 +175,37 @@ def skeleton_estimation_pose(pub, img, pose, data_collection = True):
 def video_capture():
     node = 'arm_keypoint_capture'
     rospy.init_node(node)
+
     pub = rospy.Publisher(node, SphericalCoordinates, queue_size=50)
 
-    video = cv2.VideoCapture(0)
+    if (video_replay):
+        video = cv2.VideoCapture(VIDEO)
+    else:
+        video = cv2.VideoCapture(0)
+    data_collection = False
+    # Check if camera opened successfully
+    if (not video.isOpened()):
+        print("Error opening video stream or file")
+        return
+
+    if (not video):
+        print("Error - video is None")
+        return
+
     with mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, model_complexity=2, enable_segmentation=True) as pose:
-
+        print("Press 'E' to capture the keypoints")
 
         while not rospy.is_shutdown():
            _, capture = video.read()
 
-            skeleton_estimation_pose(pub, capture, pose, data_collection)
+            try:
+                skeleton_estimation_pose(pub, capture, pose, data_collection)
+            except Exception as e:
+                rospy.logwarn("Video finished")
+                break
 
            key = cv2.waitKey(1)
            if key == ord('e'):
diff --git a/pose_estimation/scripts/pose_estimation_control.py b/pose_estimation/scripts/pose_estimation_control.py
index 38579a59aa4a203653ff97dedf6fdbae2f55b5bf..7d39d4314a7073146da07f0ae9531c9b2319c968 100755
--- a/pose_estimation/scripts/pose_estimation_control.py
+++ b/pose_estimation/scripts/pose_estimation_control.py
@@ -10,12 +10,16 @@ from open_manipulator_msgs.srv import SetJointPosition
 from open_manipulator_msgs.msg import JointPosition
 from geometry_msgs.msg import Pose
+from geometry_msgs.msg import PointStamped
+
 from open_manipulator_msgs.srv import SetKinematicsPose
 from open_manipulator_msgs.msg import KinematicsPose
 
 ### Importing the messages for Spherical Coordinates
 from pose_estimation.msg import SphericalCoordinates
+from visualization_msgs.msg import Marker
+
 ### Defining available joints by name
 JOINT_NAMES = ["joint1", "joint2", "joint3", "joint4"]
@@ -24,13 +28,16 @@ JOINT_NAMES = ["joint1", "joint2", "joint3", "joint4"]
 END_EFFECTOR_NAME = "gripper"
 PATH_TIME = 2
 
-# ### Define starting joint positions
+### Starting joint positions
 START_LOCATION = {
     'x': 0.286,
     'y': 0.0,
     'z': 0.204,
 }
 
+### If set to true will publish the coordinates as PointStamped msg which can be displayed in RVIZ
+DEBUG = True
+
 # Calculating the distance between two points
 def dis(point1, point2):
     return math.sqrt((point2[0] - point1[0])**2 + (point2[1] - point1[1])**2)
@@ -38,25 +45,60 @@ def dis(point1, point2):
 
 # SOURCES: https://math.libretexts.org/Bookshelves/Calculus/Book%3A_Calculus_(OpenStax)/12%3A_Vectors_in_Space/12.07%3A_Cylindrical_and_Spherical_Coordinates
 def spherical_to_cartesian(d, theta, phi):
-    # TODO: Prove why change of cos and sin
+    global debug_pub
+
+    # TODO:
     # x = d * sin(phi) * cos(theta)
     # y = d * sin(phi) * sin(theta)
     # z = d * cos(phi)
-    x = d * sin(phi) * sin(theta)
+    x = d * cos(phi) * sin(theta)
     y = d * sin(phi) * cos(theta)
     z = d * sin(phi)
 
-    print(f"Distance: {d}")
     coordinates = {
-        "x" : START_LOCATION['x'],
+        "x" : x, #START_LOCATION['x'],
         "y" : START_LOCATION['y'],
         "z" : z
     }
+
+
+    if (DEBUG):
+        msg = Marker()
+
+        msg.header.frame_id = "my_frame"
+        msg.header.stamp = rospy.Time.now()
+
+        msg.pose.position.x = coordinates['x'] * 10
+        msg.pose.position.y = coordinates['y'] * 10
+        msg.pose.position.z = coordinates['z'] * 10
+
+        msg.pose.orientation.x = 0
+        msg.pose.orientation.y = 0
+        msg.pose.orientation.z = 0
+        msg.pose.orientation.w = 0
+
+        msg.scale.x = 0.2 * 10
+        msg.scale.y = 0.01 * 10
+        msg.scale.z = 0.05 * 10
+
+        msg.color.r = 199
+        msg.color.g = 21
+        msg.color.b = 133
+        msg.color.a = 1.0
+
+        debug_pub.publish(msg)
+
+    print(f"Distance {d}")
+    print(coordinates)
+    print()
 
     return coordinates
 
 
 def set_position(target_location):
+    # TODO: Exception and stop program if connections fails
+    rospy.wait_for_service('goal_task_space_path_position_only', 2)
+
     set_kinematics_pose = rospy.ServiceProxy('goal_task_space_path_position_only', SetKinematicsPose)
 
     pose = Pose()
@@ -75,27 +117,26 @@ def set_position(target_location):
 
 
 def video_capture(coordinate):
-    # TODO: Exception and stop program if connections fails
-    rospy.wait_for_service('goal_task_space_path_position_only', 2)
-
     # Converting the spherical coordiantes received from pose estimation to cartesian coordinates
     target_location = spherical_to_cartesian(coordinate.distance, coordinate.theta, coordinate.phi)
 
-    try:
-        # Setting the location of the robotic arm
-        set_position(target_location)
-    except rospy.ServiceException as e:
-        print("Service call failed %s"%e)
+    if(not DEBUG):
+        try:
+            # Setting the location of the robotic arm
+            set_position(target_location)
 
-    # Sleep equal to the time needed for the arm to move to the desired location
-    rospy.sleep(PATH_TIME)
+        except rospy.ServiceException as e:
+            rospy.logerr(f"Service call failed {e}")
 
-def pose_estimation():
-    rospy.init_node('pose_estimation_control')
+        # Sleep equal to the time needed for the arm to move to the desired location
+        rospy.sleep(PATH_TIME)
 
+def pose_estimation():
     # Moving the arm to the starting position
     rospy.loginfo("Moving the arm to the starting location")
-    set_position(START_LOCATION)
+
+    if (not DEBUG):
+        set_position(START_LOCATION)
 
     print("Subscribing to `arm_keypoint_capture`")
     rospy.Subscriber("arm_keypoint_capture", SphericalCoordinates, video_capture, queue_size = 1)
@@ -104,6 +145,11 @@ if __name__ == '__main__':
+    node = 'pose_estimation_control'
+    rospy.init_node(node)
+
+    debug_pub = rospy.Publisher(node, Marker, queue_size=50)
+
     try:
         pose_estimation()
     except rospy.ROSInterruptException as e:
diff --git a/pose_estimation/test/test_movement.mp4 b/pose_estimation/test/test_movement.mp4
new file mode 100644
index 0000000000000000000000000000000000000000..7bbdb2211eb91ccf2ff725a6f3481199dd756e24
Binary files /dev/null and b/pose_estimation/test/test_movement.mp4 differ
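A note on the adjust_range change in arm_keypoint_capture.py: the input interval passed for phi is widened from (60, 140) to (30, 150), so it now matches the validity check "if phi < 30 or phi > 150" directly above it. adjust_range itself is not part of this diff; assuming it is a plain linear remap from one interval onto another, with the argument order (old_min, old_max, new_min, new_max, value) implied by both call sites, a sketch of that behaviour would look like this:

# Sketch of the assumed adjust_range behaviour; not the repository's actual code.
# Linearly remap value from [old_min, old_max] into [new_min, new_max].
def adjust_range(old_min, old_max, new_min, new_max, value):
    scaled = (value - old_min) / (old_max - old_min)   # normalise to 0.0 .. 1.0
    return new_min + scaled * (new_max - new_min)

# With the new arguments, a measured angle of 30 deg maps to 30 deg,
# 150 deg maps to 80 deg, and the midpoint 90 deg maps to 55 deg:
print(adjust_range(30, 150, 30, 80, 90))   # -> 55.0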
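On spherical_to_cartesian in pose_estimation_control.py: the commented-out formulas follow the textbook convention from the linked source, with phi measured from the z-axis, while the code below them assigns the axes differently (x = d * cos(phi) * sin(theta), y = d * sin(phi) * cos(theta), z = d * sin(phi)), presumably to match the arm's frame; the hunk replaces the old "# TODO: Prove why change of cos and sin" with a bare "# TODO:". For comparison while testing, here is a minimal standalone sketch of the textbook conversion; the helper name is illustrative only and not part of the patch:

import math

# Reference conversion only: standard convention with phi measured from the z-axis,
# as in the commented formulas. This is not the mapping the patch actually uses.
def spherical_to_cartesian_reference(d, theta, phi):
    x = d * math.sin(phi) * math.cos(theta)
    y = d * math.sin(phi) * math.sin(theta)
    z = d * math.cos(phi)
    return x, y, z

print(spherical_to_cartesian_reference(0.3, math.radians(45), math.radians(60)))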
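With DEBUG = True the converted target is not sent to the arm at all; instead spherical_to_cartesian publishes it as a visualization_msgs/Marker on the node's own topic (/pose_estimation_control) with frame_id "my_frame", with position and scale multiplied by 10 for visibility. One way to inspect it is to run "rostopic echo /pose_estimation_control", or to add a Marker display in RViz subscribed to that topic with the Fixed Frame set to my_frame. Note that RViz expects a normalised orientation quaternion, so w = 1 (identity) is the usual value rather than all zeros.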