活动介绍
file-type

实现基于ROS的路径跟踪导航系统

ZIP文件

下载需积分: 41 | 7KB | 更新于2024-12-22 | 44 浏览量 | 22 下载量 举报 11 收藏
download 立即下载
具体操作包括读取一个文本文件中的坐标数据,将这些数据转换为ROS导航目标,并最终通过move_base节点实现机器人的移动和定位。这涉及到ROS的几个关键概念:文件读取、节点通信、目标发布以及路径规划。" 知识点详细说明: 1. ROS(Robot Operating System): ROS是一个为机器人软件开发提供服务的灵活框架。它具有大量已经实现的工具和库,使得创建复杂和健壮的机器人行为更加简单。ROS中的一切都是以“节点”来实现,节点是ROS软件应用中的一个进程,可以发布或订阅消息。 2. move_base/goal: move_base是一个ROS包,它结合了导航堆栈的全局路径规划和本地避障功能。其核心是一个名为base_local_planner的插件,用于处理来自传感器数据的实时避障。move_base/goal是move_base包中的一个概念,指的是机器人导航的目标位置,通常通过发布一个目标消息来指定。 3. 路径跟踪控制导航: 路径跟踪是指机器人根据预定路径点进行移动的过程。在这个过程中,机器人需要不断地接收到目标位置,并调整自己的运动状态以确保能够到达这些位置,同时避让障碍物。路径跟踪控制导航的关键在于路径规划、动态避障、路径跟踪算法的实现。 4. 文件读取: 在ROS中,文件读取通常使用标准的C++ I/O流库来实现。例如,在代码片段中使用的是fin.open("/home/~/point.txt")来打开一个文件。这里的“~”是一个ROS参数,代表用户的家目录。当执行程序时,ROS会自动将其替换为当前登录用户的家目录路径。 5. catkin_make: catkin是ROS的一种构建系统。catkin_make是catkin构建系统的封装,用于构建工作空间中的ROS包。它结合了cmake和make的步骤,能够自动找到所有依赖关系并编译源代码。 6. rosrun: rosrun是ROS中一个常用的运行命令行工具,用于运行一个特定的ROS节点。在描述中提到的"运行rosrun follow follow",意味着使用rosrun命令运行一个名为follow的ROS包中的follow节点。 7. 坐标数据的处理: 在路径跟踪中,需要从文本文件中读取坐标数据,这通常涉及到字符串处理、格式转换等问题。例如,点.txt文件可能包含多个坐标对,程序需要读取每对坐标并将其转换为ROS可识别的目标消息格式。 8. ROS话题(Topic): 在ROS中,节点之间的通信主要通过话题来实现。发布者(publisher)将消息发布到特定的话题上,而订阅者(subscriber)订阅该话题以接收消息。在路径跟踪导航中,坐标数据被发布到move_base/goal话题,move_base节点订阅该话题,并据此进行路径规划和机器人导航。 9. ROS导航堆栈: ROS导航堆栈是一组用于实现机器人自主导航的软件包。它包括路径规划、动态避障、传感器数据处理、全局定位系统(GPS)集成等功能。使用这些工具,机器人可以在未知环境中导航,到达指定的目标位置。 总结: 在本项目中,需要掌握如何读取和处理文本文件中的坐标数据,理解ROS的文件操作和节点通信机制,熟悉ROS导航堆栈和move_base/goal概念。通过ROS提供的构建和运行命令,实现机器人的路径跟踪控制导航功能。这些步骤的实现是机器人自动化和智能化的关键技术,广泛应用于工业、研究和教育领域。

相关推荐

filetype

#include <ros/ros.h> #include <geometry_msgs/PoseStamped.h> #include <actionlib_msgs/GoalStatusArray.h> #include <iostream> #include <vector> #include <geometry_msgs/Pose.h> #include <geometry_msgs/Point.h> #include <geometry_msgs/Quaternion.h> #include <move_base_msgs/MoveBaseAction.h> #include <nav_msgs/Path.h> #include <std_msgs/Int32.h> #include <xmlrpcpp/XmlRpcValue.h> bool go_flag=true; bool go_flagg = true; std::vector<std::vector<std::vector<double>>> param; float Num[2][4] = {{0.55,0.0,0.0,1.0},{1.6,1.4,0,0.9989}}; // float Num[4] = {7.0413,4.01,-0.0229,0.99999}; // float Num[4] = {7.164,5.456,0.99999,0.0089}; int i=0; int j = 0; void param_init(){ XmlRpc::XmlRpcValue xmlRpcParam; xmlRpcParam.setSize(param[0].size()); for (size_t i = 0; i < param[0].size(); ++i) { xmlRpcParam[i].setSize(param[0][i].size()); for (size_t j = 0; j < param[0][i].size(); ++j) { xmlRpcParam[i][j] = param[0][i][j]; } } ros::param::set("/static_path/segments", xmlRpcParam); } void goalStatusArrayCallback(const actionlib_msgs::GoalStatusArrayConstPtr &msg){ // if (msg->status_list.back().status == 3) // { // ROS_INFO("ok"); // } if ( !msg->status_list.empty()) // if (go_flag) { // go_flag=false; actionlib_msgs::GoalStatus status = msg->status_list.back(); if (status.status == 3) // 状态3表示目标已到达 { // ros::Duration(6).sleep(); go_flagg = false; ROS_INFO("ok"); i=1; ros::Duration(1).sleep(); } } } int main(int argc, char **argv) { ros::init(argc,argv,"goal"); ros::NodeHandle nh; geometry_msgs::PoseStamped msg; std_msgs::Int32 msgg; ros::Publisher pub = nh.advertise<geometry_msgs::PoseStamped>("/gerona/goal", 10); //发布 ros::Subscriber sub = nh.subscribe<actionlib_msgs::GoalStatusArray>("/navigate_to_goal/status",10,goalStatusArrayCallback); ros::Duration(1).sleep(); ros::Rate loop_rate(10); while (ros::ok()) { if (go_flagg&&go_flag) { go_flagg=false; msg.header.stamp = ros::Time::now(); msg.header.frame_id = "map"; msg.pose.position.x = Num[i][0]; msg.pose.position.y = Num[i][1]; msg.pose.orientation.z = Num[i][2]; msg.pose.orientation.w = Num[i][3]; pub.publish(msg); // i++; // printf("%d",i); // if (i==1) // go_flag = false; } if(i==1) { msg.header.stamp = ros::Time::now(); msg.header.frame_id = "map"; msg.pose.position.x = Num[i][0]; msg.pose.position.y = Num[i][1]; msg.pose.orientation.z = Num[i][2]; msg.pose.orientation.w = Num[i][3]; pub.publish(msg); } ros::spinOnce(); loop_rate.sleep(); } return 0; } 修改一下

filetype

namespace ucar_solution { Solution::Solution(ros::NodeHandle &nh) { ros::NodeHandle navi_nh(nh, "navigation"); //ros::NodeHandle speed_nh(nh, "speed"); mbf_client_ = std::make_unique<actionlib::SimpleActionClient<move_base_msgs::MoveBaseAction>>("move_base", true); if (!mbf_client_->waitForServer(ros::Duration(10.0))) { ROS_INFO("Waiting for the move_base action server to come up"); } target_sub = nh.subscribe("/target_msg", 100, &Solution::solutionCallback, this); start_sub = nh.subscribe("/ucar/is_start", 100, &Solution::startCallback, this); cmd_pub = nh.advertise<geometry_msgs::Twist>("/cmd_vel", 50); scan_sub_ = nh.subscribe("/scan", 1000, &Solution::scanCallback, this); rpy_sub_ = nh.subscribe("/rpy_topic", 100, &Solution::rpyCallback, this); scan_mode_pub_ = nh.advertise<std_msgs::Int32>("/scan_mode", 10); scan_start_pub_ = nh.advertise<std_msgs::Bool>("/scan_start", 10); detect_start_pub_ = nh.advertise<std_msgs::Bool>("/detect_start", 10); follow_start_pub_ = nh.advertise<std_msgs::Bool>("/follow_start", 10); pidInit(0.01, 0.003, 0.005, 960,-0.3); XmlRpc::XmlRpcValue patrol_list; navi_nh.getParam("speed", MAX_LIMIT); navi_nh.getParam("goal_list", patrol_list); for (int i = 0; i < patrol_list.size(); ++i) { ROS_ASSERT(patrol_list[i].getType() == XmlRpc::XmlRpcValue::TypeArray); geometry_msgs::PoseStamped pose_stamped; pose_stamped.header.frame_id = "map"; ROS_ASSERT(patrol_list[i][0].getType() == XmlRpc::XmlRpcValue::TypeDouble and patrol_list[i][1].getType() == XmlRpc::XmlRpcValue::TypeDouble and patrol_list[i][2].getType() == XmlRpc::XmlRpcValue::TypeDouble); pose_stamped.pose.position.x = static_cast<double>(patrol_list[i][0]); pose_stamped.pose.posit

filetype

import os import sys import math import heapq import time import numpy as np import matplotlib.pyplot as plt import scipy.spatial as kd import imageio.v2 as imageio import pandas as pd import openpyxl sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../MotionPlanning/") import astar as astar import reeds_shepp as rs class C: # Parameter config PI = np.pi XY_RESO = 2.0 # [m] YAW_RESO = np.deg2rad(15.0) # [rad] GOAL_YAW_ERROR = np.deg2rad(3.0) # [rad] MOVE_STEP = 0.2 # [m] path interporate resolution N_STEER = 20.0 # number of steer command COLLISION_CHECK_STEP = 10 # skip number for collision check EXTEND_AREA = 5.0 # [m] map extend length GEAR_COST = 100.0 # switch back penalty cost BACKWARD_COST = 5.0 # backward penalty cost STEER_CHANGE_COST = 5.0 # steer angle change penalty cost STEER_ANGLE_COST = 1.0 # steer angle penalty cost SCISSORS_COST = 200.0 # scissors cost H_COST = 10.0 # Heuristic cost W = 3.75 # [m] width of vehicle WB = 4.5 # [m] wheel base: rear to front steer WD = 2.1 # [m] distance between left-right wheels RF = 7.23 # [m] distance from rear to vehicle front end of vehicle RB = 0.77 # [m] distance from rear to vehicle back end of vehicle RTR = 15.596 # [m] rear to trailer wheel RTF = 4.35 # [m] distance from rear to vehicle front end of trailer RTB = 35.15 # [m] distance from rear to vehicle back end of trailer TR = 0.5 # [m] tyre radius TW = 0.285 # [m] tyre width TR1 = 0.447 # [m] tyre radius TW1 = 0.283 # [m] tyre width MAX_STEER = 0.52 # [rad] maximum steering angle TR2 = 0.565 TW2 = 0.419+0.8636 TR3 = 0.343 TW3 = 0.1968+0.4064 TR4 = 0.536 TW4 = 0.385 # 拖车参数 HINGE_DISTANCE = 1.45 # 铰接点距离牵引车后轴的距离 HINGE_DISTANCE_Front = 4.09 # 铰接点距离水平部分前端的距离 HINGE_DISTANCE_RW = 15.596 # 铰接点距离主起的距离 DISTANCE_MAIN_GEAR = 5.7146 # 主起的距离 TRAILER_VERTICAL_LENGTH = 16.65*2 # 垂直部分的总长度 TRAILER_VERTICAL_WIDTH = 5.0 # 垂直部分的宽度 TRAILER_HORIZONTAL_LENGTH = 39.5 # 水平部分的长度 TRAILER_HORIZONTAL_WIDTH = 3.759 # 水平部分的宽度 Length_hinge_end = TRAILER_HORIZONTAL_LENGTH - HINGE_DISTANCE_Front Length_hinge_front = HINGE_DISTANCE_Front Length_rear_main_gear = HINGE_DISTANCE_RW class Node: def __init__(self, xind, yind, yawind, direction, x, y, yaw, yawt, directions, steer, cost, pind): self.xind = xind self.yind = yind self.yawind = yawind self.direction = direction self.x = x self.y = y self.yaw = yaw self.yawt = yawt self.directions = directions self.steer = steer self.cost = cost self.pind = pind class Para: def __init__(self, minx, miny, minyaw, minyawt, maxx, maxy, maxyaw, maxyawt, xw, yw, yaww, yawtw, xyreso, yawreso, ox, oy, kdtree): self.minx = minx self.miny = miny self.minyaw = minyaw self.minyawt = minyawt self.maxx = maxx self.maxy = maxy self.maxyaw = maxyaw self.maxyawt = maxyawt self.xw = xw self.yw = yw self.yaww = yaww self.yawtw = yawtw self.xyreso = xyreso self.yawreso = yawreso self.ox = ox self.oy = oy self.kdtree = kdtree class Path: def __init__(self, x, y, yaw, yawt, direction, cost): self.x = x self.y = y self.yaw = yaw self.yawt = yawt self.direction = direction self.cost = cost class QueuePrior: def __init__(self): self.queue = [] def empty(self): return len(self.queue) == 0 # if Q is empty def put(self, item, priority): heapq.heappush(self.queue, (priority, item)) # reorder x using priority def get(self): return heapq.heappop(self.queue)[1] # pop out element with smallest priority def hybrid_astar_planning(sx, sy, syaw, syawt, gx, gy, gyaw, gyawt, ox, oy, xyreso, yawreso): """ planning hybrid A* path. :param sx: starting node x position [m] :param sy: starting node y position [m] :param syaw: starting node yaw angle [rad] :param syawt: starting node trailer yaw angle [rad] :param gx: goal node x position [m] :param gy: goal node y position [m] :param gyaw: goal node yaw angle [rad] :param gyawt: goal node trailer yaw angle [rad] :param ox: obstacle x positions [m] :param oy: obstacle y positions [m] :param xyreso: grid resolution [m] :param yawreso: yaw resolution [m] :return: hybrid A* path """ sxr, syr = round(sx / xyreso), round(sy / xyreso) gxr, gyr = round(gx / xyreso), round(gy / xyreso) syawr = round(rs.pi_2_pi(syaw) / yawreso) gyawr = round(rs.pi_2_pi(gyaw) / yawreso) nstart = Node(sxr, syr, syawr, 1, [sx], [sy], [syaw], [syawt], [1], 0.0, 0.0, -1) ngoal = Node(gxr, gyr, gyawr, 1, [gx], [gy], [gyaw], [gyawt], [1], 0.0, 0.0, -1) kdtree = kd.KDTree([[x, y] for x, y in zip(ox, oy)]) P = calc_parameters(ox, oy, xyreso, yawreso, kdtree) hmap = astar.calc_holonomic_heuristic_with_obstacle(ngoal, P.ox, P.oy, P.xyreso, 1.0) steer_set, direc_set = calc_motion_set() open_set, closed_set = {calc_index(nstart, P): nstart}, {} qp = QueuePrior() qp.put(calc_index(nstart, P), calc_hybrid_cost(nstart, hmap, P)) while True: if not open_set: return None ind = qp.get() n_curr = open_set[ind] closed_set[ind] = n_curr open_set.pop(ind) update, fpath = update_node_with_analystic_expantion(n_curr, ngoal, gyawt, P) if update: fnode = fpath break yawt0 = n_curr.yawt[0] for i in range(len(steer_set)): node = calc_next_node(n_curr, ind, steer_set[i], direc_set[i], P) if not is_index_ok(node, yawt0, P): continue node_ind = calc_index(node, P) if node_ind in closed_set: continue if node_ind not in open_set: open_set[node_ind] = node qp.put(node_ind, calc_hybrid_cost(node, hmap, P)) else: if open_set[node_ind].cost > node.cost: open_set[node_ind] = node print("final expand node: ", len(open_set) + len(closed_set)) return extract_path(closed_set, fnode, nstart) def extract_path(closed, ngoal, nstart): rx, ry, ryaw, ryawt, direc = [], [], [], [], [] cost = 0.0 node = ngoal while True: rx += node.x[::-1] ry += node.y[::-1] ryaw += node.yaw[::-1] ryawt += node.yawt[::-1] direc += node.directions[::-1] cost += node.cost if is_same_grid(node, nstart): break node = closed[node.pind] rx = rx[::-1] ry = ry[::-1] ryaw = ryaw[::-1] ryawt = ryawt[::-1] direc = direc[::-1] direc[0] = direc[1] path = Path(rx, ry, ryaw, ryawt, direc, cost) return path def update_node_with_analystic_expantion(n_curr, ngoal, gyawt, P): path = analystic_expantion(n_curr, ngoal, P) # rs path: n -> ngoal if not path: return False, None steps = [C.MOVE_STEP * d for d in path.directions] yawt = calc_trailer_yaw(path.yaw, n_curr.yawt[-1], steps) if abs(rs.pi_2_pi(yawt[-1] - gyawt)) >= C.GOAL_YAW_ERROR: return False, None fx = path.x[1:-1] fy = path.y[1:-1] fyaw = path.yaw[1:-1] fd = [] for d in path.directions[1:-1]: if d >= 0: fd.append(1.0) else: fd.append(-1.0) # fd = path.directions[1:-1] fcost = n_curr.cost + calc_rs_path_cost(path, yawt) fpind = calc_index(n_curr, P) fyawt = yawt[1:-1] fsteer = 0.0 fpath = Node(n_curr.xind, n_curr.yind, n_curr.yawind, n_curr.direction, fx, fy, fyaw, fyawt, fd, fsteer, fcost, fpind) return True, fpath def analystic_expantion(node, ngoal, P): sx, sy, syaw = node.x[-1], node.y[-1], node.yaw[-1] gx, gy, gyaw = ngoal.x[-1], ngoal.y[-1], ngoal.yaw[-1] maxc = math.tan(C.MAX_STEER) / C.WB paths = rs.calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=C.MOVE_STEP) if not paths: return None pq = QueuePrior() for path in paths: steps = [C.MOVE_STEP * d for d in path.directions] yawt = calc_trailer_yaw(path.yaw, node.yawt[-1], steps) pq.put(path, calc_rs_path_cost(path, yawt)) # while not pq.empty(): path = pq.get() steps = [C.MOVE_STEP * d for d in path.directions] yawt = calc_trailer_yaw(path.yaw, node.yawt[-1], steps) ind = range(0, len(path.x), C.COLLISION_CHECK_STEP) pathx = [path.x[k] for k in ind] pathy = [path.y[k] for k in ind] pathyaw = [path.yaw[k] for k in ind] pathyawt = [yawt[k] for k in ind] if not is_collision(pathx, pathy, pathyaw, pathyawt, P): return path return None def calc_next_node(n, ind, u, d, P): step = C.XY_RESO * 2.0 nlist = math.ceil(step / C.MOVE_STEP) xlist = [n.x[-1] + d * C.MOVE_STEP * math.cos(n.yaw[-1])] ylist = [n.y[-1] + d * C.MOVE_STEP * math.sin(n.yaw[-1])] yawlist = [rs.pi_2_pi(n.yaw[-1] + d * C.MOVE_STEP / C.WB * math.tan(u))] yawtlist = [rs.pi_2_pi(n.yawt[-1] + d * C.MOVE_STEP / C.RTR * math.sin(n.yaw[-1] - n.yawt[-1]))] for i in range(nlist - 1): xlist.append(xlist[i] + d * C.MOVE_STEP * math.cos(yawlist[i])) ylist.append(ylist[i] + d * C.MOVE_STEP * math.sin(yawlist[i])) yawlist.append(rs.pi_2_pi(yawlist[i] + d * C.MOVE_STEP / C.WB * math.tan(u))) yawtlist.append(rs.pi_2_pi(yawtlist[i] + d * C.MOVE_STEP / C.RTR * math.sin(yawlist[i] - yawtlist[i]))) xind = round(xlist[-1] / P.xyreso) yind = round(ylist[-1] / P.xyreso) yawind = round(yawlist[-1] / P.yawreso) cost = 0.0 if d > 0: direction = 1.0 cost += abs(step) else: direction = -1.0 cost += abs(step) * C.BACKWARD_COST if direction != n.direction: # switch back penalty cost += C.GEAR_COST cost += C.STEER_ANGLE_COST * abs(u) # steer penalyty cost += C.STEER_CHANGE_COST * abs(n.steer - u) # steer change penalty cost += C.SCISSORS_COST * sum([abs(rs.pi_2_pi(x - y)) for x, y in zip(yawlist, yawtlist)]) # jacknif cost cost = n.cost + cost directions = [direction for _ in range(len(xlist))] node = Node(xind, yind, yawind, direction, xlist, ylist, yawlist, yawtlist, directions, u, cost, ind) return node def is_collision(x, y, yaw, yawt, P): for ix, iy, iyaw, iyawt in zip(x, y, yaw, yawt): d = 1 deltal = (C.RF - C.RB) / 2.0 rc = (C.RF + C.RB) / 2.0 + d cx = ix + deltal * math.cos(iyaw) cy = iy + deltal * math.sin(iyaw) ids = P.kdtree.query_ball_point([cx, cy], rc) if ids: for i in ids: xo = P.ox[i] - cx yo = P.oy[i] - cy dx_car = xo * math.cos(iyaw) + yo * math.sin(iyaw) dy_car = -xo * math.sin(iyaw) + yo * math.cos(iyaw) if abs(dx_car) <= rc and \ abs(dy_car) <= C.W / 2.0 + d: return True d1 = 7.5 deltal = C.HINGE_DISTANCE_RW rt = C.TRAILER_VERTICAL_LENGTH / 2.0 + d1 ctx = ix + C.HINGE_DISTANCE * math.cos(iyaw) - deltal * math.cos(iyawt) cty = iy + C.HINGE_DISTANCE * math.sin(iyaw) - deltal * math.sin(iyawt) idst = P.kdtree.query_ball_point([ctx, cty], rt) if idst: for i in idst: xot = P.ox[i] - ctx yot = P.oy[i] - cty dx_trail = xot * math.cos(iyawt) + yot * math.sin(iyawt) dy_trail = -xot * math.sin(iyawt) + yot * math.cos(iyawt) if abs(dx_trail) <= rt and \ abs(dy_trail) <= rt: return True # 添加横摆角差值的判断 yaw_diff = abs(rs.pi_2_pi(iyaw - iyawt)) if np.rad2deg(yaw_diff) > 70: return True return False def calc_trailer_yaw(yaw, yawt0, steps): yawt = [0.0 for _ in range(len(yaw))] yawt[0] = yawt0 for i in range(1, len(yaw)): yawt[i] += yawt[i - 1] + steps[i - 1] / C.RTR * math.sin(yaw[i - 1] - yawt[i - 1]) return yawt def trailer_motion_model(x, y, yaw, yawt, D, d, L, delta): x += D * math.cos(yaw) y += D * math.sin(yaw) yaw += D / L * math.tan(delta) yawt += D / d * math.sin(yaw - yawt) return x, y, yaw, yawt def calc_rs_path_cost(rspath, yawt): cost = 0.0 for lr in rspath.lengths: if lr >= 0: cost += 1 else: cost += abs(lr) * C.BACKWARD_COST for i in range(len(rspath.lengths) - 1): if rspath.lengths[i] * rspath.lengths[i + 1] < 0.0: cost += C.GEAR_COST for ctype in rspath.ctypes: if ctype != "S": cost += C.STEER_ANGLE_COST * abs(C.MAX_STEER) nctypes = len(rspath.ctypes) ulist = [0.0 for _ in range(nctypes)] for i in range(nctypes): if rspath.ctypes[i] == "R": ulist[i] = -C.MAX_STEER elif rspath.ctypes[i] == "WB": ulist[i] = C.MAX_STEER for i in range(nctypes - 1): cost += C.STEER_CHANGE_COST * abs(ulist[i + 1] - ulist[i]) cost += C.SCISSORS_COST * sum([abs(rs.pi_2_pi(x - y)) for x, y in zip(rspath.yaw, yawt)]) return cost def calc_motion_set(): s = [i for i in np.arange(C.MAX_STEER / C.N_STEER, C.MAX_STEER, C.MAX_STEER / C.N_STEER)] steer = [0.0] + s + [-i for i in s] direc = [1.0 for _ in range(len(steer))] + [-1.0 for _ in range(len(steer))] steer = steer + steer return steer, direc def calc_hybrid_cost(node, hmap, P): cost = node.cost + \ C.H_COST * hmap[node.xind - P.minx][node.yind - P.miny] return cost def calc_index(node, P): ind = (node.yawind - P.minyaw) * P.xw * P.yw + \ (node.yind - P.miny) * P.xw + \ (node.xind - P.minx) yawt_ind = round(node.yawt[-1] / P.yawreso) ind += (yawt_ind - P.minyawt) * P.xw * P.yw * P.yaww return ind def is_index_ok(node, yawt0, P): if node.xind <= P.minx or \ node.xind >= P.maxx or \ node.yind <= P.miny or \ node.yind >= P.maxy: return False steps = [C.MOVE_STEP * d for d in node.directions] yawt = calc_trailer_yaw(node.yaw, yawt0, steps) ind = range(0, len(node.x), C.COLLISION_CHECK_STEP) x = [node.x[k] for k in ind] y = [node.y[k] for k in ind] yaw = [node.yaw[k] for k in ind] yawt = [yawt[k] for k in ind] if is_collision(x, y, yaw, yawt, P): return False return True def calc_parameters(ox, oy, xyreso, yawreso, kdtree): minxm = min(ox) - C.EXTEND_AREA minym = min(oy) - C.EXTEND_AREA maxxm = max(ox) + C.EXTEND_AREA maxym = max(oy) + C.EXTEND_AREA ox.append(minxm) oy.append(minym) ox.append(maxxm) oy.append(maxym) minx = round(minxm / xyreso) miny = round(minym / xyreso) maxx = round(maxxm / xyreso) maxy = round(maxym / xyreso) xw, yw = maxx - minx, maxy - miny minyaw = round(-C.PI / yawreso) - 1 maxyaw = round(C.PI / yawreso) yaww = maxyaw - minyaw minyawt, maxyawt, yawtw = minyaw, maxyaw, yaww P = Para(minx, miny, minyaw, minyawt, maxx, maxy, maxyaw, maxyawt, xw, yw, yaww, yawtw, xyreso, yawreso, ox, oy, kdtree) return P def is_same_grid(node1, node2): if node1.xind != node2.xind or \ node1.yind != node2.yind or \ node1.yawind != node2.yawind: return False return True def draw_arrow(x, y, yaw, length, car_color): plt.arrow(x, y, length * math.cos(yaw), length * math.sin(yaw), head_length=0.5, head_width=0.5, color=car_color) def draw_model(x, y, yaw, yawt, steer, car_color='blue', trailer_color='green'): car = np.array([[-C.RB, -C.RB, C.RF, C.RF, -C.RB], [C.W / 2, -C.W / 2, -C.W / 2, C.W / 2, C.W / 2]]) # 定义车轮的几何形状 wheel1 = np.array([[-C.TR1, -C.TR1, C.TR1, C.TR1, -C.TR1], [C.TW1 / 2, -C.TW1 / 2, -C.TW1 / 2, C.TW1 / 2, C.TW1 / 2]]) wheel3 = np.array([[-C.TR3, -C.TR3, C.TR3, C.TR3, -C.TR3], [C.TW3 / 2, -C.TW3 / 2, -C.TW3 / 2, C.TW3 / 2, C.TW3 / 2]]) wheel2 = np.array([[-C.TR2, -C.TR2, C.TR2, C.TR2, -C.TR2], [C.TW2 / 2, -C.TW2 / 2, -C.TW2 / 2, C.TW2 / 2, C.TW2 / 2]]) wheel4 = np.array([[-C.TR4, -C.TR4, C.TR4, C.TR4, -C.TR4], [C.TW4 / 2, -C.TW4 / 2, -C.TW4 / 2, C.TW4 / 2, C.TW4 / 2]]) # 复制车轮的几何形状 rlWheel = wheel4.copy() rrWheel = wheel4.copy() frWheel = wheel1.copy() flWheel = wheel1.copy() # 定义拖车的两个矩形 # 拖车主体(水平部分) trail_horizontal = np.array([[-C.Length_hinge_end, -C.Length_hinge_end, C.Length_hinge_front, C.Length_hinge_front, -C.Length_hinge_end], [-C.TRAILER_HORIZONTAL_WIDTH / 2, C.TRAILER_HORIZONTAL_WIDTH / 2, C.TRAILER_HORIZONTAL_WIDTH / 2, -C.TRAILER_HORIZONTAL_WIDTH / 2, -C.TRAILER_HORIZONTAL_WIDTH / 2]]) # 拖车垂直部分 trail_vertical = np.array([[-C.Length_rear_main_gear + C.TRAILER_VERTICAL_WIDTH / 2, -C.Length_rear_main_gear + C.TRAILER_VERTICAL_WIDTH / 2, -C.Length_rear_main_gear - C.TRAILER_VERTICAL_WIDTH / 2, -C.Length_rear_main_gear - C.TRAILER_VERTICAL_WIDTH / 2, -C.Length_rear_main_gear + C.TRAILER_VERTICAL_WIDTH / 2], [-C.TRAILER_VERTICAL_LENGTH / 2, C.TRAILER_VERTICAL_LENGTH / 2, C.TRAILER_VERTICAL_LENGTH / 2, -C.TRAILER_VERTICAL_LENGTH / 2, -C.TRAILER_VERTICAL_LENGTH / 2]]) # 复制拖车轮 rltWheel = wheel2.copy() rrtWheel = wheel2.copy() rltWheel[0, :] -= C.HINGE_DISTANCE_RW rltWheel[1, :] += C.DISTANCE_MAIN_GEAR / 2 rrtWheel[0, :] -= C.HINGE_DISTANCE_RW rrtWheel[1, :] += - C.DISTANCE_MAIN_GEAR / 2 fltWheel = wheel3.copy() # 定义旋转矩阵 Rot1 = np.array([[math.cos(yaw), -math.sin(yaw)], [math.sin(yaw), math.cos(yaw)]]) Rot2 = np.array([[math.cos(steer), -math.sin(steer)], [math.sin(steer), math.cos(steer)]]) Rot3 = np.array([[math.cos(yawt), -math.sin(yawt)], [math.sin(yawt), math.cos(yawt)]]) # 旋转前轮 frWheel = np.dot(Rot2, frWheel) flWheel = np.dot(Rot2, flWheel) # 调整车轮的位置 frWheel += np.array([[C.WB], [-C.WD / 2]]) flWheel += np.array([[C.WB], [C.WD / 2]]) rrWheel[1, :] -= C.WD / 2 rlWheel[1, :] += C.WD / 2 # 旋转车轮和车身 frWheel = np.dot(Rot1, frWheel) flWheel = np.dot(Rot1, flWheel) rrWheel = np.dot(Rot1, rrWheel) rlWheel = np.dot(Rot1, rlWheel) car = np.dot(Rot1, car) hinge_point = np.array([C.HINGE_DISTANCE * math.cos(yaw), C.HINGE_DISTANCE * math.sin(yaw)]) # 调整拖车轮和拖车的位置 fltWheel = np.dot(Rot3, fltWheel) rltWheel = np.dot(Rot3, rltWheel) rrtWheel = np.dot(Rot3, rrtWheel) fltWheel[0, :] += hinge_point[0] fltWheel[1, :] += hinge_point[1] rltWheel[0, :] += hinge_point[0] rltWheel[1, :] += hinge_point[1] rrtWheel[0, :] += hinge_point[0] rrtWheel[1, :] += hinge_point[1] # 旋转拖车主体 trail_horizontal = np.dot(Rot3, trail_horizontal) trail_horizontal[0, :] += hinge_point[0] trail_horizontal[1, :] += hinge_point[1] # 旋转拖车垂直部分90度 trail_vertical = np.dot(Rot3, trail_vertical) trail_vertical[0, :] += hinge_point[0] trail_vertical[1, :] += hinge_point[1] # 平移所有部件到车辆中心 frWheel += np.array([[x], [y]]) flWheel += np.array([[x], [y]]) rrWheel += np.array([[x], [y]]) rlWheel += np.array([[x], [y]]) rrtWheel += np.array([[x], [y]]) rltWheel += np.array([[x], [y]]) fltWheel += np.array([[x], [y]]) car += np.array([[x], [y]]) trail_horizontal += np.array([[x], [y]]) trail_vertical += np.array([[x], [y]]) # 绘制车辆和拖车 plt.plot(car[0, :], car[1, :], car_color) plt.plot(trail_horizontal[0, :], trail_horizontal[1, :], trailer_color) plt.plot(trail_vertical[0, :], trail_vertical[1, :], trailer_color) plt.plot(frWheel[0, :], frWheel[1, :], car_color) plt.plot(rrWheel[0, :], rrWheel[1, :], car_color) plt.plot(flWheel[0, :], flWheel[1, :], car_color) plt.plot(rlWheel[0, :], rlWheel[1, :], car_color) plt.plot(rrtWheel[0, :], rrtWheel[1, :], trailer_color) plt.plot(rltWheel[0, :], rltWheel[1, :], trailer_color) plt.plot(fltWheel[0, :], fltWheel[1, :], trailer_color) draw_arrow(x, y, yaw, C.WB * 0.8, car_color) def design_obstacles(): ox, oy = [], [] for i in range(40, 150): ox.append(i) oy.append(70) for i in range(40, 150): ox.append(i) oy.append(10) for j in range(-100, 160): ox.append(-50) oy.append(j) for j in range(-100, 10): ox.append(40) oy.append(j) for j in range(70, 160): ox.append(40) oy.append(j) for j in range(80, 90): ox.append(-20) oy.append(j) for j in range(80, 90): ox.append(-30) oy.append(j) for i in range(-30, -20): ox.append(i) oy.append(80) for i in range(-30, -20): ox.append(i) oy.append(90) return ox, oy def test(x, y, yaw, yawt, ox, oy): d = 0.5 deltal = (C.RTF - C.RTB) / 2.0 rt = (C.RTF + C.RTB) / 2.0 + d ctx = x + deltal * math.cos(yawt) cty = y + deltal * math.sin(yawt) deltal = (C.RF - C.RB) / 2.0 rc = (C.RF + C.RB) / 2.0 + d xot = ox - ctx yot = oy - cty dx_trail = xot * math.cos(yawt) + yot * math.sin(yawt) dy_trail = -xot * math.sin(yawt) + yot * math.cos(yawt) if abs(dx_trail) <= rt - d and \ abs(dy_trail) <= C.W / 2.0: print("test1: Collision") else: print("test1: No collision") # test 2 cx = x + deltal * math.cos(yaw) cy = y + deltal * math.sin(yaw) xo = ox - cx yo = oy - cy dx_car = xo * math.cos(yaw) + yo * math.sin(yaw) dy_car = -xo * math.sin(yaw) + yo * math.cos(yaw) if abs(dx_car) <= rc - d and \ abs(dy_car) <= C.W / 2.0: print("test2: Collision") else: print("test2: No collision") theta = np.linspace(0, 2 * np.pi, 200) x1 = ctx + np.cos(theta) * rt y1 = cty + np.sin(theta) * rt x2 = cx + np.cos(theta) * rc y2 = cy + np.sin(theta) * rc plt.plot(x1, y1, 'b') plt.plot(x2, y2, 'g') plt.plot(ox, oy, 'sr') plt.plot([-rc, -rc, rc, rc, -rc], [C.W / 2, -C.W / 2, -C.W / 2, C.W / 2, C.W / 2]) plt.plot([-rt, -rt, rt, rt, -rt], [C.W / 2, -C.W / 2, -C.W / 2, C.W / 2, C.W / 2]) plt.plot(dx_car, dy_car, 'sr') plt.plot(dx_trail, dy_trail, 'sg') def main(): print("start!") sx, sy = 70, 40 # [m] syaw0 = np.deg2rad(0.0) syawt = np.deg2rad(0.0) gx, gy = 10, 130 # [m] gyaw0 = np.deg2rad(90.0) gyawt = np.deg2rad(90.0) ox, oy = design_obstacles() plt.plot(ox, oy, 'sk') draw_model(sx, sy, syaw0, syawt, 0.0) draw_model(gx, gy, gyaw0, gyawt, 0.0) # test(sx, sy, syaw0, syawt, 3.5, 32) plt.xlabel('横坐标/米', fontdict={'family': 'SimSun', 'size': 18}) plt.ylabel('纵坐标/米', fontdict={'family': 'SimSun', 'size': 18}) plt.axis("equal") plt.show() # 保存初始图像 plt.savefig('frame_000.png') frames = ['frame_000.png'] oox, ooy = ox[:], oy[:] t0 = time.time() path = hybrid_astar_planning(sx, sy, syaw0, syawt, gx, gy, gyaw0, gyawt, oox, ooy, C.XY_RESO, C.YAW_RESO) t1 = time.time() print("running T: ", t1 - t0) x = path.x y = path.y yaw = path.yaw yawt = path.yawt direction = path.direction plt.pause(10) # 创建动图 with imageio.get_writer('gif/hybrid_astar_4.gif', mode='I') as writer: # 输出牵引车和挂车的横摆角 vehicle_yaws = [] trailer_yaws = [] HINGE_x = [] HINGE_y = [] rear_x = [] rear_y = [] wingtip_l_x = [] wingtip_l_y = [] wingtip_r_x = [] wingtip_r_y = [] hinge_angle = [] # 添加代码记录路径规划过程中的牵引车和挂车的横摆角 for k in range(len(x)): plt.cla() plt.plot(ox, oy, "sk") plt.xlabel('横坐标/米', fontdict={'family': 'SimSun', 'size': 18}) plt.ylabel('纵坐标/米', fontdict={'family': 'SimSun', 'size': 18}) plt.plot(x, y, linewidth=1.5, color='r') # 记录牵引车和飞机的横摆角 current_vehicle_yaw = yaw[k] current_trailer_yaw = yawt[k] vehicle_yaws.append(current_vehicle_yaw) trailer_yaws.append(current_trailer_yaw) hinge_angle.append(current_vehicle_yaw - current_trailer_yaw) hinge_x = x[k] + C.HINGE_DISTANCE * math.cos(yaw[k]) hinge_y = y[k] + C.HINGE_DISTANCE * math.sin(yaw[k]) rear_x.append(x[k]) rear_y.append(y[k]) HINGE_x.append(hinge_x) HINGE_y.append(hinge_y) wingtip_l_current_x = np.array(-C.HINGE_DISTANCE_RW) wingtip_l_current_y = np.array(C.TRAILER_VERTICAL_LENGTH / 2) wingtip_r_current_x = np.array(-C.HINGE_DISTANCE_RW) wingtip_r_current_y = np.array(-C.TRAILER_VERTICAL_LENGTH / 2) final_wingtip_l_current_x = wingtip_l_current_x * math.cos(yawt[k]) - \ math.sin(yawt[k])*wingtip_l_current_y + hinge_x final_wingtip_l_current_y = wingtip_l_current_x * math.sin(yawt[k]) + \ math.cos(yawt[k])*wingtip_l_current_y + hinge_y final_wingtip_r_current_x = wingtip_r_current_x * math.cos(yawt[k]) - \ math.sin(yawt[k]) * wingtip_r_current_y + hinge_x final_wingtip_r_current_y = wingtip_r_current_x * math.sin(yawt[k]) + \ math.cos(yawt[k]) * wingtip_r_current_y + hinge_y wingtip_l_x.append(final_wingtip_l_current_x) wingtip_l_y.append(final_wingtip_l_current_y) wingtip_r_x.append(final_wingtip_r_current_x) wingtip_r_y.append(final_wingtip_r_current_y) if k < len(x) - 2: dy = (yaw[k + 1] - yaw[k]) / C.MOVE_STEP steer = math.atan(C.WB * dy / direction[k]) else: steer = 0.0 draw_model(gx, gy, gyaw0, gyawt, 0.0, 'gray') draw_model(x[k], y[k], yaw[k], yawt[k], steer) plt.axis("equal") # 保存当前帧 frame_name = f'frame_{k + 1:03d}.png' plt.savefig(frame_name) frames.append(frame_name) # 将当前帧添加到GIF image = imageio.imread(frame_name) writer.append_data(image) plt.pause(0.0001) # 在路径规划完成后,输出牵引车和挂车的横摆角 time_steps = list(range(len(vehicle_yaws))) data = { 'rear_x': rear_x, 'rear_y': rear_y, 'vehicle_yaws': vehicle_yaws, 'trailer_yaws': trailer_yaws, 'HINGE_x': HINGE_x, 'HINGE_y': HINGE_y, 'hinge_angle': hinge_angle, 'wingtip_l_x': wingtip_l_x, 'wingtip_l_y': wingtip_l_y, 'wingtip_r_x': wingtip_r_x, 'wingtip_r_y': wingtip_r_y, } # 创建DataFrame df = pd.DataFrame(data) # 保存到Excel df.to_excel('vehicle_data3.xlsx', index=False) # 绘制牵引车横摆角 plt.figure(figsize=(10, 6)) plt.plot(time_steps, [np.rad2deg(vy) for vy in vehicle_yaws], 'b-', label='牵引车横摆角') plt.xlabel('时间步长', fontdict={'family': 'SimSun', 'size': 18}) plt.ylabel('横摆角/°', fontdict={'family': 'SimSun', 'size': 18}) plt.xlim(0, 900) plt.xticks(np.arange(0, 900, 100)) plt.legend(labels=['牵引车横摆角'], prop={'family': 'SimSun', 'size': 16}) plt.savefig('TLTV-yaw3.png') # 绘制挂车横摆角 plt.figure(figsize=(10, 6)) plt.plot(time_steps, [np.rad2deg(tr_y) for tr_y in trailer_yaws], 'r--', label='飞机横摆角') plt.xlabel('时间步长', fontdict={'family': 'SimSun', 'size': 18}) plt.ylabel('横摆角/°', fontdict={'family': 'SimSun', 'size': 18}) plt.xlim(0, 900) plt.xticks(np.arange(0, 900, 100)) plt.legend(labels=['飞机横摆角'], prop={'family': 'SimSun', 'size': 16}) # 保存图表为图片文件 plt.savefig('aircraft-yaw3.png') plt.figure(figsize=(10, 6)) plt.cla() plt.plot(time_steps, [np.rad2deg(hinge) for hinge in hinge_angle], 'k-', label='铰接角' ) plt.xlim(0, 900) plt.xticks(np.arange(0, 900, 100)) plt.ylim(-70, 70) plt.xlabel('时间步长', fontdict={'family': 'SimSun', 'size': 18}) plt.ylabel('铰接角/°', fontdict={'family': 'SimSun', 'size': 18}) plt.savefig('articulated-angle3.png') plt.figure(figsize=(10, 6)) plt.cla() plt.plot(ox, oy, "sk") plt.xlabel('横坐标/米', fontdict={'family': 'SimSun', 'size': 18}) plt.ylabel('纵坐标/米', fontdict={'family': 'SimSun', 'size': 18}) plt.plot(wingtip_l_x, wingtip_l_y, linewidth=1.5, color='r') plt.savefig('wingtip_left3.png') plt.figure(figsize=(10, 6)) plt.cla() plt.plot(ox, oy, "sk") plt.xlabel('横坐标/米', fontdict={'family': 'SimSun', 'size': 18}) plt.ylabel('纵坐标/米', fontdict={'family': 'SimSun', 'size': 18}) plt.plot(wingtip_r_x, wingtip_r_y, linewidth=1.5, color='r') plt.savefig('wingtip_right3.png') # 清理临时文件 #for frame in frames: # os.remove(frame) plt.show() print("Done") if __name__ == '__main__': main() 地图构建部分在哪

filetype

import logging import time from functools import cached_property from typing import Any from lerobot.cameras.utils import make_cameras_from_configs from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.motors import Motor, MotorCalibration, MotorNormMode from lerobot.motors.dynamixel import ( DynamixelMotorsBus, OperatingMode, ) from ..robot import Robot from ..utils import ensure_safe_goal_position from .config_koch_follower import KochFollowerConfig logger = logging.getLogger(__name__) class KochFollower(Robot): """ - [Koch v1.0](https://github.com/AlexanderKoch-Koch/low_cost_robot), with and without the wrist-to-elbow expansion, developed by Alexander Koch from [Tau Robotics](https://tau-robotics.com) - [Koch v1.1](https://github.com/jess-moss/koch-v1-1) developed by Jess Moss """ config_class = KochFollowerConfig name = "koch_follower" def __init__(self, config: KochFollowerConfig): super().__init__(config) self.config = config norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100 self.bus = DynamixelMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "xl430-w250", norm_mode_body), "shoulder_lift": Motor(2, "xl430-w250", norm_mode_body), "elbow_flex": Motor(3, "xl330-m288", norm_mode_body), "wrist_flex": Motor(4, "xl330-m288", norm_mode_body), "wrist_roll": Motor(5, "xl330-m288", norm_mode_body), "gripper": Motor(6, "xl330-m288", MotorNormMode.RANGE_0_100), }, calibration=self.calibration, ) self.cameras = make_cameras_from_configs(config.cameras) @property def _motors_ft(self) -> dict[str, type]: return {f"{motor}.pos": float for motor in self.bus.motors} @property def _cameras_ft(self) -> dict[str, tuple]: return { cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras } @cached_property def observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property def action_features(self) -> dict[str, type]: return self._motors_ft @property def is_connected(self) -> bool: return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) def connect(self, calibrate: bool = True) -> None: """ We assume that at connection time, arm is in a rest position, and torque can be safely disabled to run calibration. """ if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() for cam in self.cameras.values(): cam.connect() self.configure() logger.info(f"{self} connected.") @property def is_calibrated(self) -> bool: return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") self.bus.disable_torque() for motor in self.bus.motors: self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) input(f"Move {self} to the middle of its range of motion and press ENTER....") homing_offsets = self.bus.set_half_turn_homings() full_turn_motors = ["shoulder_pan", "wrist_roll"] unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors] print( f"Move all joints except {full_turn_motors} sequentially through their entire " "ranges of motion.\nRecording positions. Press ENTER to stop..." ) range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) for motor in full_turn_motors: range_mins[motor] = 0 range_maxes[motor] = 4095 self.calibration = {} for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, homing_offset=homing_offsets[motor], range_min=range_mins[motor], range_max=range_maxes[motor], ) self.bus.write_calibration(self.calibration) self._save_calibration() logger.info(f"Calibration saved to {self.calibration_fpath}") def configure(self) -> None: with self.bus.torque_disabled(): self.bus.configure_motors() # Use 'extended position mode' for all motors except gripper, because in joint mode the servos # can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling # the arm, you could end up with a servo with a position 0 or 4095 at a crucial point for motor in self.bus.motors: if motor != "gripper": self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value) # Use 'position control current based' for gripper to be limited by the limit of the current. For # the follower gripper, it means it can grasp an object without forcing too much even tho, its # goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch). # For the leader gripper, it means we can use it as a physical trigger, since we can force with # our finger to make it move, and it will move back to its original target position when we # release the force. self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) # Set better PID values to close the gap between recorded states and actions # TODO(rcadene): Implement an automatic procedure to set optimal PID values for each motor self.bus.write("Position_P_Gain", "elbow_flex", 1500) self.bus.write("Position_I_Gain", "elbow_flex", 0) self.bus.write("Position_D_Gain", "elbow_flex", 600) def setup_motors(self) -> None: for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") self.bus.setup_motor(motor) print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") def get_observation(self) -> dict[str, Any]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") # Read arm position start = time.perf_counter() obs_dict = self.bus.sync_read("Present_Position") obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): start = time.perf_counter() obs_dict[cam_key] = cam.async_read() dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict def send_action(self, action: dict[str, float]) -> dict[str, float]: """Command arm to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter `max_relative_target`. In this case, the action sent differs from original action. Thus, this function always returns the action actually sent. Args: action (dict[str, float]): The goal positions for the motors. Returns: dict[str, float]: The action sent to the motors, potentially clipped. """ if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: present_pos = self.bus.sync_read("Present_Position") goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the arm self.bus.sync_write("Goal_Position", goal_pos) return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") self.bus.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() logger.info(f"{self} disconnected.")