Programming
<Face Tracking>
The findface_publisher.py
is a publisher that detects people's face and transfer the x,y coordinates of people's face to the face_tracks.py script.
Directory: facedetects/nodes/findface_publisher.py
- | findface_publisher.py
#!/usr/bin/env python import cv2 as cv # opencv library name to cv import rospy #ros python from sensor_msgs.msg import Image from geometry_msgs.msg import Point rec_point=Point() rospy.init_node("furo_find") coordinates_pub = rospy.Publisher("target_coordinate", Point, queue_size=10) #publish x,y coordinate to the node ## Trained XML file for detecting face with its path face_cascade = cv.CascadeClassifier('/home/dasl/opencv/opencv-4.2.0/data/haarcascades/haarcascade_frontalface_alt.xml') # Trained XML file for detecting eyes eye_cascade = cv.CascadeClassifier('/home/dasl/opencv/opencv-4.2.0/data/haarcascades/haarcascade_eye.xml') # Capture frames from a camera cap = cv.VideoCapture(0,cv.CAP_V4L) while True: ret, img = cap.read() # Convert to gray scale of each frames gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY) # Detects faces of different sizes in the input image faces = face_cascade.detectMultiScale(gray, 1.3, 5) for (x,y,w,h) in faces: # To draw a rectangle in a face cv.rectangle(img,(x,y),(x+w,y+h),(255,255,0),2) roi_gray = gray[y:y+h, x:x+w] roi_color = img[y:y+h, x:x+w] target_x=(w/2.0)+x target_y=(h/2.0)+y target_direction_x = target_x/640 target_direction_y = target_y/480 target_direction_x -= 0.5 target_direction_y -= 0.5 target_direction_x *= 2.0 target_direction_y *= 2.0 rec_point.x = target_direction_x rec_point.y = target_direction_y # Detects eyes of different sizes in the input image # eyes = eye_cascade.detectMultiScale(roi_gray) #To draw a rectangle in eyes #for (ex,ey,ew,eh) in eyes: # cv.rectangle(roi_color,(ex,ey),(ex+ew,ey+eh),(0,127,255),2) ##Use the below comment if you need to display an image in a window screen_res=2560,1440 window_width=2560 window_height=1440 cv.namedWindow('img',cv.WINDOW_NORMAL) resized=cv.resize(img,screen_res) cv.resizeWindow('img',window_width, window_height) flipped =cv.flip(resized,1) cv.imshow('img', flipped) h,w,c=img.shape #print(w) #print('width:',img.width) #print('height:',img.height) coordinates_pub.publish(rec_point) rospy.loginfo(rec_point) rate = rospy.Rate(200) #rate.sleep() # Wait for Esc key to stop if cv.waitKey(5) ==27: break
The face_tracks.py
script is a subscriber that receives the x,y coordinate of a person's face and enable Furo to move toward the person.
Directory: head_motor/src/face_tracks.py
- | face_tracks.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy from geometry_msgs.msg import Twist from std_msgs.msg import Float32 from geometry_msgs.msg import Point import RPi.GPIO as GPIO from time import sleep GPIO.setmode(GPIO.BOARD) GPIO.setwarnings(False) AN2 =33 #pitch AN1 = 32 #roll DIG2 = 18 #pitch DIG1 = 37 #roll GPIO.setup(AN2,GPIO.OUT) GPIO.setup(AN1, GPIO.OUT) GPIO.setup(DIG2, GPIO.OUT) GPIO.setup(DIG1, GPIO.OUT) sleep(1) p1=GPIO.PWM(AN1,100) p2=GPIO.PWM(AN2,100) __author__= "4dimentional@kau.kr" target_x = 0; target_y = 0; error_x = 0; error_y = 0; diff_x = 0; diff_y = 0; max_output_x = 76.7; min_output_x = -76.7; max_output_y = 76.7; min_output_y = -76.7; def head_motion_callback(rec_point): global target_y,target_x global error_x,error_y global diff_x,diff_y global max_output_x, min_output_x global max_output_y, min_output_y Px,Dx =20, 30 Py,Dy = 20 , 30 prev_x = 0 prev_y = 0 target_x = rec_point.x target_y = rec_point.y error_x = 0 - target_x error_y = 0 - target_y diff_x = prev_x - error_x diff_y = prev_y - error_y prev_x = error_x prev_y = error_y output_x = Px*error_x + Dx*diff_x output_y = Py*error_y + Dy*diff_y if output_x > max_output_x: output_x = max_output_x elif output_x < min_output_x: output_x = min_output_x if output_x < 0: left_speed = abs(output_x) GPIO.output(DIG2,GPIO.LOW) p2.start(left_speed) elif output_x > 0: right_speed = abs(output_x) GPIO.output(DIG2,GPIO.HIGH) p2.start(right_speed) if output_y > max_output_y: output_y = max_output_y elif output_y < min_output_y: output_y = min_output_y if output_y < 0: upward_speed = abs(output_y) GPIO.output(DIG1,GPIO.LOW) p1.start(upward_speed) elif output_y > 0: downward_speed = abs(output_y) GPIO.output(DIG1,GPIO.HIGH) p1.start(downward_speed) print(output_y) def head_vel_callback(headCTR): head_roll = headCTR.data if head_roll==4: rospy.loginfo("Head downward") GPIO.output(DIG2, GPIO.HIGH) p1.start(0) # set AN2 as HIGH, M2B will turn ON p2.start(30) # set Direction for M2 #delay for 2 second if head_roll==2: rospy.loginfo("Head upward") GPIO.output(DIG2, GPIO.LOW) p1.start(0) p2.start(30) if head_roll==5: rospy.loginfo("STOP") p1.start(0) # Direction can ignore p2.start(0) # Direction can ignore #delay for 3 second def listener(): rospy.init_node("furo_head") rospy.Subscriber("cmd_head",Float32, head_vel_callback) rospy.Subscriber("target_coordinate",Point, head_motion_callback) rate = rospy.Rate(200) while not rospy.is_shutdown(): rate.sleep() if __name__ == '__main__': listener()
<Furo's Mobility using Joystick>
The joy_node
script is to convert joystick value to digital value.
Directory: joy/src/joy_node.cpp
The main.cpp
script is a topic that is both a publisher and a subscriber at the same time. It obtains value from the joystick, converts it to float data type and passes it to the motor controller.
Directory: learning_joy/src/main.cpp
CMakeList Directory: learning_joy/CMakeLists.txt
- | CMakeLists.txt
cmake_minimum_required(VERSION 2.8.3) project(learning_joy) ## Compile as C++11, supported in ROS Kinetic and newer # add_compile_options(-std=c++11) ## Find catkin macros and libraries ## if COMPONENTS list like find_package(catkin REQUIRED COMPONENTS xyz) ## is used, also find other catkin packages find_package(catkin REQUIRED COMPONENTS joy roscpp turtlesim ) ## System dependencies are found with CMake's conventions # find_package(Boost REQUIRED COMPONENTS system) ## Uncomment this if the package has a setup.py. This macro ensures ## modules and global scripts declared therein get installed ## See http://ros.org/doc/api/catkin/html/user_guide/setup_dot_py.html # catkin_python_setup() ########### ## Build ## ########### ## Specify additional locations of header files ## Your package locations should be listed before other locations include_directories( # include ${catkin_INCLUDE_DIRS} ) ############# ## Testing ## ############# ## Add gtest based cpp test target and link libraries # catkin_add_gtest(${PROJECT_NAME}-test test/test_learning_joy.cpp) # if(TARGET ${PROJECT_NAME}-test) # target_link_libraries(${PROJECT_NAME}-test ${PROJECT_NAME}) # endif() ## Add folders to be run by python nosetests # catkin_add_nosetests(test) add_executable(furo src/main.cpp) #Calls furo instead of main.cpp when launching the script from ROS target_link_libraries(furo ${catkin_LIBRARIES})
The roboclaw.launch
script is to control the motor controller that is connected to the wheels of FURO.
Directory: roboclaw_ros/roboclaw_node/launch/roboclaw.launch
- | roboclaw.launch
<?xml version="1.0"?> <launch> <arg name="dev" default="/dev/ttyACM0"/> <arg name="baud" default="38400"/> <arg name="address" default="128"/> <arg name="max_speed" default="1"/> <arg name="ticks_per_meter" default="265957.447"/> <arg name="base_width" default="0.315"/> <arg name="run_diag" default="true"/> #launches furo.py script from roboclaw_node <node if="$(arg run_diag)" pkg="roboclaw_node" type="furo.py" name="furo" output="screen">> <param name="~dev" value="$(arg dev)"/> <param name="~baud" value="$(arg baud)"/> <param name="~address" value="$(arg address)"/> <param name="~max_speed" value="$(arg max_speed)"/> <param name="~ticks_per_meter" value="$(arg ticks_per_meter)"/> <param name="~base_width" value="$(arg base_width)"/> </node> <node pkg="diagnostic_aggregator" type="aggregator_node" name="diagnostic_aggregator"> <rosparam command="load" file="$(find roboclaw_node)/config/roboclaw_diag.yaml"/> </node> </launch>
The furo.py
from roboclaw_node directory controls the motor controller of Furo's wheels.
Directory: roboclaw_ros/roboclaw_node/nodes/furo.py
- | furo.py
\\ #!/usr/bin/env python from math import pi, cos, sin import diagnostic_msgs import diagnostic_updater import roboclaw_driver.roboclaw_driver as roboclaw import rospy import tf from geometry_msgs.msg import Quaternion, Twist from nav_msgs.msg import Odometry __author__ = "phamquyenanh.qb@gmail.com modifine" # TODO need to find some better was of handling OSerror 11 or preventing it, any ideas? class EncoderOdom: def __init__(self, ticks_per_meter, base_width): self.TICKS_PER_METER = ticks_per_meter self.BASE_WIDTH = base_width self.odom_pub = rospy.Publisher('/odom', Odometry, queue_size=10) self.cur_x = 0 self.cur_y = 0 self.cur_theta = 0.0 self.last_enc_left = 0 self.last_enc_right = 0 self.last_enc_time = rospy.Time.now() @staticmethod def normalize_angle(angle): while angle > pi: angle -= 2.0 * pi while angle < -pi: angle += 2.0 * pi return angle def update(self, enc_left, enc_right): left_ticks = enc_left - self.last_enc_left right_ticks = enc_right - self.last_enc_right self.last_enc_left = enc_left self.last_enc_right = enc_right dist_left = left_ticks / self.TICKS_PER_METER dist_right = right_ticks / self.TICKS_PER_METER dist = (dist_right + dist_left) / 2.0 current_time = rospy.Time.now() d_time = (current_time - self.last_enc_time).to_sec() self.last_enc_time = current_time # TODO find better what to determine going straight, this means slight deviation is accounted if left_ticks == right_ticks: d_theta = 0.0 self.cur_x += dist * cos(self.cur_theta) self.cur_y += dist * sin(self.cur_theta) else: d_theta = (dist_right - dist_left) / self.BASE_WIDTH r = dist / d_theta self.cur_x += r * (sin(d_theta + self.cur_theta) - sin(self.cur_theta)) self.cur_y -= r * (cos(d_theta + self.cur_theta) - cos(self.cur_theta)) self.cur_theta = self.normalize_angle(self.cur_theta + d_theta) if abs(d_time) < 0.000001: vel_x = 0.0 vel_theta = 0.0 else: vel_x = dist / d_time vel_theta = d_theta / d_time return vel_x, vel_theta def update_publish(self, enc_left, enc_right): # 2106 per 0.1 seconds is max speed, error in the 16th bit is 32768 # TODO lets find a better way to deal with this error #print('Left Encoder' , enc_left) #print('Right Encoder' , enc_right) if abs(enc_left - self.last_enc_left) > 20000: rospy.logerr("Ignoring left encoder jump: cur %d, last %d" % (enc_left, self.last_enc_left)) elif abs(enc_right - self.last_enc_right) > 20000: rospy.logerr("Ignoring right encoder jump: cur %d, last %d" % (enc_right, self.last_enc_right)) else: vel_x, vel_theta = self.update(enc_left, enc_right) self.publish_odom(self.cur_x, self.cur_y, self.cur_theta, vel_x, vel_theta) def publish_odom(self, cur_x, cur_y, cur_theta, vx, vth): quat = tf.transformations.quaternion_from_euler(0, 0, cur_theta) current_time = rospy.Time.now() br = tf.TransformBroadcaster() br.sendTransform((cur_x, cur_y, 0), tf.transformations.quaternion_from_euler(0, 0, -cur_theta), current_time, "base_link", "odom") odom = Odometry() odom.header.stamp = current_time odom.header.frame_id = 'odom' odom.pose.pose.position.x = cur_x odom.pose.pose.position.y = cur_y odom.pose.pose.position.z = 0.0 odom.pose.pose.orientation = Quaternion(*quat) odom.pose.covariance[0] = 0.01 odom.pose.covariance[7] = 0.01 odom.pose.covariance[14] = 99999 odom.pose.covariance[21] = 99999 odom.pose.covariance[28] = 99999 odom.pose.covariance[35] = 0.01 odom.child_frame_id = 'base_link' odom.twist.twist.linear.x = vx odom.twist.twist.linear.y = 0 odom.twist.twist.angular.z = vth odom.twist.covariance = odom.pose.covariance self.odom_pub.publish(odom) class Node: def __init__(self): self.ERRORS = {0x0000: (diagnostic_msgs.msg.DiagnosticStatus.OK, "Normal"), 0x0001: (diagnostic_msgs.msg.DiagnosticStatus.WARN, "M1 over current"), 0x0002: (diagnostic_msgs.msg.DiagnosticStatus.WARN, "M2 over current"), 0x0004: (diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Emergency Stop"), 0x0008: (diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Temperature1"), 0x0010: (diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Temperature2"), 0x0020: (diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Main batt voltage high"), 0x0040: (diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Logic batt voltage high"), 0x0080: (diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Logic batt voltage low"), 0x0100: (diagnostic_msgs.msg.DiagnosticStatus.WARN, "M1 driver fault"), 0x0200: (diagnostic_msgs.msg.DiagnosticStatus.WARN, "M2 driver fault"), 0x0400: (diagnostic_msgs.msg.DiagnosticStatus.WARN, "Main batt voltage high"), 0x0800: (diagnostic_msgs.msg.DiagnosticStatus.WARN, "Main batt voltage low"), 0x1000: (diagnostic_msgs.msg.DiagnosticStatus.WARN, "Temperature1"), 0x2000: (diagnostic_msgs.msg.DiagnosticStatus.WARN, "Temperature2"), 0x4000: (diagnostic_msgs.msg.DiagnosticStatus.OK, "M1 home"), 0x8000: (diagnostic_msgs.msg.DiagnosticStatus.OK, "M2 home")} rospy.init_node("roboclaw_node") rospy.on_shutdown(self.shutdown) rospy.loginfo("Connecting to roboclaw") dev_name = rospy.get_param("~dev", "/dev/ttyACM1") baud_rate = int(rospy.get_param("~baud", "38400")) self.address = int(rospy.get_param("~address", "128")) if self.address > 0x87 or self.address < 0x80: rospy.logfatal("Address out of range") rospy.signal_shutdown("Address out of range") # TODO need someway to check if address is correct try: roboclaw.Open(dev_name, baud_rate) except Exception as e: rospy.logfatal("Could not connect to Roboclaw") rospy.logdebug(e) rospy.signal_shutdown("Could not connect to Roboclaw") self.updater = diagnostic_updater.Updater() self.updater.setHardwareID("Roboclaw") self.updater.add(diagnostic_updater. FunctionDiagnosticTask("Vitals", self.check_vitals)) try: version = roboclaw.ReadVersion(self.address) except Exception as e: rospy.logwarn("Problem getting roboclaw version") rospy.logdebug(e) pass if not version[0]: rospy.logwarn("Could not get version from roboclaw") else: rospy.logdebug(repr(version[1])) roboclaw.SpeedM1M2(self.address, 0, 0) roboclaw.ResetEncoders(self.address) self.MAX_SPEED = float(rospy.get_param("~max_speed", "1.0")) self.TICKS_PER_METER = float(rospy.get_param("~ticks_per_meter", "265957.447")) self.BASE_WIDTH = float(rospy.get_param("~base_width", "0.315")) self.encodm = EncoderOdom(self.TICKS_PER_METER, self.BASE_WIDTH) self.last_set_speed_time = rospy.get_rostime() rospy.Subscriber("cmd_vel", Twist, self.cmd_vel_callback) rospy.sleep(1) rospy.logdebug("dev %s", dev_name) rospy.logdebug("baud %d", baud_rate) rospy.logdebug("address %d", self.address) rospy.logdebug("max_speed %f", self.MAX_SPEED) rospy.logdebug("ticks_per_meter %f", self.TICKS_PER_METER) rospy.logdebug("base_width %f", self.BASE_WIDTH) def run(self): rospy.loginfo("Starting motor drive") r_time = rospy.Rate(10) while not rospy.is_shutdown(): # TODO need find solution to the OSError11 looks like sync problem with serial status1, enc1, crc1 = None, None, None status2, enc2, crc2 = None, None, None try: status1, enc1, crc1 = roboclaw.ReadEncM1(self.address) except ValueError: pass except OSError as e: rospy.logwarn("ReadEncM1 OSError: %d", e.errno) rospy.logdebug(e) try: status2, enc2, crc2 = roboclaw.ReadEncM2(self.address) except ValueError: pass except OSError as e: rospy.logwarn("ReadEncM2 OSError: %d", e.errno) rospy.logdebug(e) if (enc1 != None) & (enc2 != None): rospy.logdebug(" Encoders %d %d" % (enc1, enc2)) self.encodm.update_publish(enc1, enc2) self.updater.update() r_time.sleep() def cmd_vel_callback(self, twist): self.last_set_speed_time = rospy.get_rostime() linear_x = twist.linear.x if linear_x==10: roboclaw.ResetEncoders(self.address) linear_x=0 if linear_x > self.MAX_SPEED: linear_x = self.MAX_SPEED if linear_x < -self.MAX_SPEED: linear_x = -self.MAX_SPEED vr = linear_x - twist.angular.z * self.BASE_WIDTH / 2.0 # m/s vl = linear_x + twist.angular.z * self.BASE_WIDTH / 2.0 vr_ticks = int(vr * 60) # ticks/s vl_ticks = int(vl * 60) rospy.loginfo("vr_ticks:%d vl_ticks: %d", vr_ticks, vl_ticks) try: # This is a hack way to keep a poorly tuned PID from making noise at speed 0 if vr_ticks is 0 and vl_ticks is 0: roboclaw.ForwardM1(self.address, 0) roboclaw.ForwardM2(self.address, 0) else: if vr_ticks>0: roboclaw.BackwardM1(self.address, vr_ticks) else: roboclaw.ForwardM1(self.address, abs(vr_ticks)) if vl_ticks>0: roboclaw.BackwardM2(self.address, vl_ticks) else: roboclaw.ForwardM2(self.address, abs(vl_ticks)) except OSError as e: rospy.logwarn("SpeedM1M2 OSError: %d", e.errno) rospy.logdebug(e) # TODO: Need to make this work when more than one error is raised def check_vitals(self, stat): try: status = roboclaw.ReadError(self.address)[1] except OSError as e: rospy.logwarn("Diagnostics OSError: %d", e.errno) rospy.logdebug(e) return state, message = self.ERRORS[status] stat.summary(state, message) try: stat.add("Main Batt V:", float(roboclaw.ReadMainBatteryVoltage(self.address)[1] / 10)) stat.add("Logic Batt V:", float(roboclaw.ReadLogicBatteryVoltage(self.address)[1] / 10)) stat.add("Temp1 C:", float(roboclaw.ReadTemp(self.address)[1] / 10)) stat.add("Temp2 C:", float(roboclaw.ReadTemp2(self.address)[1] / 10)) except OSError as e: rospy.logwarn("Diagnostics OSError: %d", e.errno) rospy.logdebug(e) return stat # TODO: need clean shutdown so motors stop even if new msgs are arriving def shutdown(self): rospy.loginfo("Shutting down") try: roboclaw.ForwardM1(self.address, 0) roboclaw.ForwardM2(self.address, 0) except OSError: rospy.logerr("Shutdown did not work trying again") try: roboclaw.ForwardM1(self.address, 0) roboclaw.ForwardM2(self.address, 0) except OSError as e: rospy.logerr("Could not shutdown motors!!!!") rospy.logdebug(e) if __name__ == "__main__": try: node = Node() node.run() except rospy.ROSInterruptException: pass rospy.loginfo("Exiting")
The finalheadmotor.py
controls the motor controller of Furo's head.
Directory: head_motor/src/finalheadmotor.py
- | finalheadmotor.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy from geometry_msgs.msg import Twist from std_msgs.msg import Float32 import RPi.GPIO as GPIO from time import sleep GPIO.setmode(GPIO.BOARD) GPIO.setwarnings(False) AN2 =33 AN1 = 32 DIG2 = 18 DIG1 = 37 GPIO.setup(AN2,GPIO.OUT) #speed of the pitch movement GPIO.setup(AN1, GPIO.OUT) #speed of the roll movement GPIO.setup(DIG2, GPIO.OUT) #direction of the pitch movement GPIO.setup(DIG1, GPIO.OUT) #direction of the roll movement sleep(1) p1=GPIO.PWM(AN1,100) #GPIO.PWM(pin,scale) p2=GPIO.PWM(AN2,100) __author__= "4dimentional@kau.kr" class callback: def head_vel_callback(self,headCTR): head_roll = headCTR.data if head_roll==1:#1=A(Joystick) rospy.loginfo("Head downward") GPIO.output(DIG1, GPIO.HIGH) #downward direction p1.start(60) #speed 60 out of 100 p2.start(0) elif head_roll==5: #5=LB(Joystick) rospy.loginfo("Head upward") GPIO.output(DIG1, GPIO.LOW) #upward direction p1.start(60) p2.start(0) elif head_roll==3: #3=Y(Joystick) rospy.loginfo("Head left") GPIO.output(DIG2,GPIO.HIGH) #left direction p1.start(0) p2.start(60) elif head_roll==2: #2=B(Joystick) rospy.loginfo("Head right") GPIO.output(DIG2,GPIO.LOW) #right direction p1.start(0) p2.start(60) elif head_roll==4: #LB rospy.loginfo("STOP") p1.start(0) #speed 0 out of 100 p2.start(0) def __init__(self): rospy.Subscriber("cmd_head",Float32, self.head_vel_callback) rate = rospy.Rate(10) while not rospy.is_shutdown(): rate.sleep() print(1) if __name__ == '__main__': rospy.init_node('head_roll',anonymous=True) try: wow = callback() except rospy.ROSInitException: pass