file-type

OpenMV在无人机中的应用:光流定点与直角转弯技术

ZIP文件

下载需积分: 5 | 16KB | 更新于2024-10-07 | 117 浏览量 | 0 下载量 举报 收藏
download 立即下载
教程中主要涉及到的技术和知识点包括光流传感器的原理、无人机的控制算法、定点、巡线以及直角转弯的实现方式,以及如何通过openmv实现这些功能。 光流定点(Optical Flow)是一种基于视觉的传感器,它可以测量传感器与地面上点的相对运动。在无人机领域,光流定点通常被用于实现无人机在飞行中对地面的相对定位,以维持稳定或者在特定位置进行悬停。 巡线(Line Following)技术则是通过检测和跟踪路径(例如地面上的线条或地标)来引导无人机沿着预定路径飞行。这项技术在自动导航和自主飞行中非常关键,它涉及到图像处理和模式识别的知识。 直角转弯(Right Angle Turn)是无人机导航中的一个特定动作,要求无人机能够精确控制转弯的角度,完成直角转弯,这对于执行复杂飞行任务或避免障碍物至关重要。 openmv是一个开源的机器视觉模块,它支持Python语言进行编程,非常适合用于实现无人机的视觉相关功能。openmv模块通常具备图像采集、处理和算法实现的功能,能够较为简单地集成到无人机系统中。 教程中涉及的文件目录名为optical_flow-master,表明这可能是一个版本控制(如Git)下的项目主分支(master),包含了所有核心代码、文档和资源文件。具体到本资源,读者可以期待在代码中找到实现光流定点、巡线和直角转弯功能的函数和类,以及对应的使用示例和说明文档。 本教程的目的是为了让无人机爱好者和开发者能够通过openmv和Python语言,快速上手无人机的高级功能开发,理解视觉传感器在无人机自主飞行中的作用,并能够应用这些技术解决实际问题。"

相关推荐

filetype

import rclpy from rclpy.node import Node from sensor_msgs.msg import CompressedImage from nav_msgs.msg import Odometry from geometry_msgs.msg import Twist from cv_bridge import CvBridge import cv2 import numpy as np import math import time from transforms3d.euler import quat2euler from test1_yolo_bin import YoloNV12Inferencer bridge = CvBridge() class ImageProcessor(Node): def __init__(self): super().__init__('image_processor_node') self.subscription = self.create_subscription( CompressedImage, '/image', self.image_callback, 10) self.odom_sub = self.create_subscription( Odometry, '/odom', self.odom_callback, 10) self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10) self.lost_lane_counter = 0 self.lost_lane_threshold = 5 self.inferencer = YoloNV12Inferencer('/root/dev_ws/3w/bin/yolov5s_672x672_nv12.bin') self.get_logger().info("✅ YOLO模型加载成功") self.image_cx = self.declare_parameter('image_cx', 320.0).value self.focal_length = self.declare_parameter('focal_length', 550.0).value self.target_height = self.declare_parameter('target_height', 0.3).value # 避障状态及阶段 self.in_avoidance = False self.avoid_stage = 0 # 0=正常巡线,1=避障转弯,2=回正转弯 self.avoid_direction = None # 'left' or 'right' self.avoid_start_time = None # 当前车头朝向 self.current_yaw = 0.0 # 保存最新图像,用于回正阶段检测车道线 self.latest_frame = None self.timer = self.create_timer(0.05, self.control_loop) # 20Hz执行控制 def odom_callback(self, msg): q = msg.pose.pose.orientation _, _, yaw = quat2euler([q.x, q.y, q.z, q.w]) self.current_yaw = yaw def control_speed(self, linear_speed, angular_z): msg = Twist() msg.linear.x = linear_speed msg.angular.z = angular_z self.publisher_.publish(msg) # 你的 img2alpha 和 detect_lane 函数保持不变 def img2alpha(self, frame): hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) lower_black = np.array([0, 0, 0]) upper_black = np.array([180, 255, 50]) black_mask = cv2.inRange(hsv, lower_black, upper_black) black_mask = cv2.dilate(black_mask, None, iterations=5) black_mask = cv2.erode(black_mask, None, iterations=5) detection_band = 50 x_coords = np.where(black_mask[-detection_band:] == 255)[1] overlay_color = np.full_like(frame[-detection_band:, :], (0, 255, 0)) frame[-detection_band:, :] = cv2.addWeighted( frame[-detection_band:, :], 0.7, overlay_color, 0.3, 0 ) if len(x_coords) != 0: self.lost_lane_counter = 0 mean_x = np.mean(x_coords) h, w = frame.shape[:2] alpha = float(-(mean_x - w / 2) / (w / 2)) v = 0.3 else: self.lost_lane_counter += 1 self.get_logger().info(f"🚫 未识别车道线: {self.lost_lane_counter} 帧") if self.lost_lane_counter >= self.lost_lane_threshold: self.get_logger().info("🛑 连续 5 帧未识别,停车") v = 0.0 alpha = 0.0 else: v = 0.2 alpha = 0 time.sleep(0.8) return v, alpha def detect_lane(self, frame): hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) lower_black = np.array([0, 0, 0]) upper_black = np.array([180, 255, 50]) black_mask = cv2.inRange(hsv, lower_black, upper_black) detection_band = 50 x_coords = np.where(black_mask[-detection_band:] == 255)[1] return len(x_coords) > 0 def detect_obstacle_on_lane(self, frame): detections = self.inferencer.infer(frame) h_img, w_img = frame.shape[:2] for class_id, score, (x1, y1, x2, y2) in detections: if class_id != 0: continue bbox_h = y2 - y1 if bbox_h <= 0: continue distance = (self.target_height * self.focal_length) / bbox_h if distance < 0.6: cx = (x1 + x2) // 2 offset = cx - self.image_cx if abs(offset) < w_img * 0.4: center_threshold = w_img * 0.1 if abs(offset) < center_threshold: direction = 'right' else: direction = 'left' if offset < 0 else 'right' self.get_logger().info(f"⚠️ 触发避障: 距离 {distance:.2f} m, 方位: {direction}") return True, direction return False, None def image_callback(self, msg): img = bridge.compressed_imgmsg_to_cv2(msg) h1, w = img.shape[:2] bili = 0.4 img_cropped = img[int(h1 * bili):, :] frame = cv2.resize(img_cropped, (160, 120)) self.latest_frame = frame # 保存最新帧供回正检测车道线用 if not self.in_avoidance: obstacle_detected, direction = self.detect_obstacle_on_lane(frame) if obstacle_detected: self.in_avoidance = True self.avoid_stage = 1 self.avoid_direction = direction self.avoid_start_time = self.get_clock().now() self.get_logger().info(f"🚗 避障开始,方向: {direction}") return if not self.in_avoidance: # 保留你原有巡线逻辑不变 v, alpha = self.img2alpha(frame.copy()) self.get_logger().info(f"当前速度和角度为: v={v:.2f}, alpha={alpha:.2f}") self.control_speed(v, alpha) def control_loop(self): if not self.in_avoidance: return if self.latest_frame is None: return now = self.get_clock().now() elapsed = (now - self.avoid_start_time).nanoseconds / 1e9 if self.avoid_stage == 1: angular_speed = 1.046 # 60度/1秒 if self.avoid_direction == 'left': angular_speed = -angular_speed else: angular_speed = angular_speed self.control_speed(0.25, angular_speed) if elapsed >= 1: self.avoid_stage = 2 self.avoid_start_time = now self.get_logger().info("进入回正阶段") elif self.avoid_stage == 2: # 回正时检测车道线,检测到立即切换巡线 if self.detect_lane(self.latest_frame): self.get_logger().info("✅ 回正阶段检测到车道线,立即恢复巡线") self.avoid_stage = 0 self.in_avoidance = False self.avoid_direction = None v, alpha = self.img2alpha(self.latest_frame.copy()) self.control_speed(v, alpha) return angular_speed = 2.093 # 120度/0.6秒 if self.avoid_direction == 'left': angular_speed = angular_speed else: angular_speed = -angular_speed self.control_speed(0.25, angular_speed) if elapsed >= 1: self.get_logger().info("回正完成,恢复巡线") self.avoid_stage = 0 self.in_avoidance = False self.avoid_direction = None v, alpha = self.img2alpha(self.latest_frame.copy()) self.control_speed(v, alpha) def main(args=None): rclpy.init(args=args) node = ImageProcessor() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() 请你帮我检查我的上述代码是否实现了边巡线边避障

filetype

import rclpy from rclpy.node import Node from sensor_msgs.msg import CompressedImage from geometry_msgs.msg import Twist from cv_bridge import CvBridge import cv2 import numpy as np import time # 假设YoloNV12Inferencer类在test1_yolo_bin模块中 from test1_yolo_bin import YoloNV12Inferencer class LineAndObstacleNode(Node): def __init__(self): super().__init__('line_and_obstacle_node') self.bridge = CvBridge() self.subscription = self.create_subscription( CompressedImage, '/image', self.image_callback, 10) self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10) # YOLO 模型加载 try: self.inferencer = YoloNV12Inferencer('/root/dev_ws/3w/bin/yolov5s_672x672_nv12.bin') self.get_logger().info("✅ YOLO模型加载成功") except Exception as e: self.get_logger().error(f"YOLO加载失败: {e}") self.inferencer = None # 避障参数 self.focal_length = 550.0 self.image_cx = 320.0 self.target_height = 0.3 self.repulsion_range = 0.5 self.state = 'normal' # normal / avoiding_turn / avoiding_straight / realignment self.avoid_start_time = 0 self.obstacle_list = [] # 车道线检测参数 self.lost_lane_counter = 0 self.lost_lane_threshold = 5 self.current_frame = None self.last_valid_alpha = 0.0 # 状态超时参数 self.REALIGN_TIMEOUT = 1.5 # 回正阶段最大超时时间 self.AVOID_TURN_DURATION = 0.6 # 避障转向持续时间 self.timer = self.create_timer(0.1, self.control_loop) def image_callback(self, msg): try: img = self.bridge.compressed_imgmsg_to_cv2(msg, desired_encoding='bgr8') # 保存原始图像用于YOLO检测 self.raw_image = img.copy() # --- 巡线区域处理 --- h1, width = img.shape[:2] crop = img[int(h1 * 0.4):, :] self.current_frame = cv2.resize(crop, (160, 120)) # --- YOLO处理 --- if self.inferencer: detections = self.inferencer.infer(self.raw_image) self.obstacle_list = [] for class_id, score, (x1, y1, x2, y2) in detections: h = y2 - y1 if h <= 0: continue distance = (self.target_height * self.focal_length) / h cx = (x1 + x2) / 2 offset_y = (cx - self.image_cx) * distance / self.focal_length if class_id == 0: # 假设0是障碍物类别 self.obstacle_list.append((distance, offset_y)) except Exception as e: self.get_logger().error(f"图像处理错误: {e}") def detect_lane_line(self, frame): """检测车道线并返回速度、转向角和检测状态""" if frame is None: return 0.0, 0.0, False try: hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # 黑色线条 HSV 阈值 lower_black = np.array([0, 0, 0]) upper_black = np.array([180, 255, 50]) black_mask = cv2.inRange(hsv, lower_black, upper_black) # 形态学操作增强检测 kernel = np.ones((5, 5), np.uint8) black_mask = cv2.morphologyEx(black_mask, cv2.MORPH_OPEN, kernel) black_mask = cv2.dilate(black_mask, kernel, iterations=2) # 检测区域高度 detection_band = 40 x_coords = np.where(black_mask[-detection_band:] == 255)[1] if len(x_coords) > 50: # 确保有足够的像素点 self.lost_lane_counter = 0 mean_x = np.mean(x_coords) h, w = frame.shape[:2] alpha = float(-(mean_x - w / 2) / (w / 2)) self.last_valid_alpha = alpha # 保存有效转向角 return 0.2, alpha, True else: self.lost_lane_counter += 1 if self.lost_lane_counter >= self.lost_lane_threshold: return 0.0, 0.0, False else: # 使用最后有效的转向角 return 0.10, self.last_valid_alpha, False except Exception as e: self.get_logger().error(f"车道线检测错误: {e}") return 0.0, 0.0, False def control_loop(self): now = time.time() elapsed_time = now - self.avoid_start_time # 障碍物检测(仅考虑前方0.5米内的障碍物) valid_obs = [o for o in self.obstacle_list if 0 < o[0] < self.repulsion_range and abs(o[1]) < 0.5] # 状态机实现 if self.state == 'avoiding_turn': # 右转避障阶段 if elapsed_time < self.AVOID_TURN_DURATION: self.publish_cmd(0.15, -0.8) # 右转避障 else: self.state = 'realignment' self.avoid_start_time = now self.get_logger().info("🔄 进入回正阶段") elif self.state == 'realignment': # 回正阶段(优先检测车道线) v, alpha, lane_detected = self.detect_lane_line(self.current_frame) if lane_detected: self.state = 'normal' self.get_logger().info("✅ 检测到车道线,恢复巡线") self.publish_cmd(v, alpha) elif elapsed_time > self.REALIGN_TIMEOUT: self.state = 'normal' self.get_logger().warn("⏱️ 回正超时,强制恢复巡线") else: # 缓慢直行并轻微右转寻找车道线 self.publish_cmd(0.1, -0.3) elif valid_obs and self.state == 'normal': # 检测到障碍物,开始避障. self.state = 'avoiding_turn' self.avoid_start_time = now self.get_logger().warn(f"🚧 检测到障碍物,距离={min(o[0] for o in valid_obs):.2f}m") else: # 正常巡线模式 v, alpha, lane_detected = self.detect_lane_line(self.current_frame) if lane_detected: self.publish_cmd(v, alpha) else: # 未检测到车道线时的安全策略 self.publish_cmd(0.05, self.last_valid_alpha) def publish_cmd(self, speed, steer): """发布控制指令,带平滑过渡""" msg = Twist() msg.linear.x = float(speed) msg.angular.z = float(steer) self.publisher_.publish(msg) def main(args=None): rclpy.init(args=args) node = LineAndObstacleNode() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() 在这个代码的基础上加

filetype

import rclpy from rclpy.node import Node from sensor_msgs.msg import CompressedImage from geometry_msgs.msg import Twist, Point from std_msgs.msg import Bool from cv_bridge import CvBridge import cv2 import numpy as np import math import time from racecar_description.msg import ObstacleArray from test1_yolo_bin import YoloNV12Inferencer class LineAndObstacleNode(Node): def __init__(self): super().__init__('line_and_obstacle_node') self.bridge = CvBridge() self.subscription = self.create_subscription( CompressedImage, '/image', self.image_callback, 10) self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10) # YOLO 模型加载 try: self.inferencer = YoloNV12Inferencer('/root/dev_ws/3w/bin/yolov5s_672x672_nv12.bin') self.get_logger().info("✅ YOLO模型加载成功") except Exception as e: self.get_logger().error(f"YOLO加载失败: {e}") self.inferencer = None # 避障参数 self.focal_length = 550.0 self.image_cx = 320.0 self.target_height = 0.3 self.repulsion_range = 0.5 self.state = 'normal' # normal / avoiding_turn / avoiding_straight self.avoid_start_time = 0 self.obstacle_list = [] self.lost_lane_counter = 0 self.lost_lane_threshold = 5 self.timer = self.create_timer(0.1, self.control_loop) def image_callback(self, msg): img = self.bridge.compressed_imgmsg_to_cv2(msg, desired_encoding='bgr8') # --- 巡线区域处理 --- h1, width = img.shape[:2] crop = img[int(h1 * 0.4):, :] self.line_frame = cv2.resize(crop, (160, 120)) v, alpha = self.img2alpha(self.line_frame.copy()) print("当前角度: ", alpha) self.publish_cmd(alpha, v) # --- YOLO处理 --- if self.inferencer: detections = self.inferencer.infer(img) self.obstacle_list = [] for class_id, score, (x1, y1, x2, y2) in detections: h = y2 - y1 if h <= 0: continue distance = (self.target_height * self.focal_length) / h cx = (x1 + x2) / 2 offset_y = (cx - self.image_cx) * distance / self.focal_length if class_id == 0: self.obstacle_list.append((distance, offset_y)) def img2alpha(self, frame): hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # 黑色线条 HSV 阈值 lower_black = np.array([0, 0, 0]) upper_black = np.array([180, 255, 50]) black_mask = cv2.inRange(hsv, lower_black, upper_black) black_mask = cv2.dilate(black_mask, None, iterations=5) black_mask = cv2.erode(black_mask, None, iterations=5) # ✅ 扩大检测区域高度 detection_band = 40 x_coords = np.where(black_mask[-detection_band:] == 255)[1] # ✅ 可视化检测区域(绿色透明覆盖,便于调试) overlay_color = np.full_like(frame[-detection_band:, :], (0, 255, 0)) frame[-detection_band:, :] = cv2.addWeighted( frame[-detection_band:, :], 0.7, overlay_color, 0.3, 0 ) if len(x_coords) != 0: self.lost_lane_counter = 0 mean_x = np.mean(x_coords) frame[:, int(mean_x)] = [0, 0, 255] h, w = frame.shape[:2] alpha = float(-(mean_x - w / 2) / (w / 2)) # 范围约 -1 到 1 v = 0.2 else: self.lost_lane_counter += 1 print(f"🚫 未识别车道线: {self.lost_lane_counter} 帧") if self.lost_lane_counter >= self.lost_lane_threshold: print("🛑 连续 5 帧未识别,停车") v = 0.0 alpha = 0.0 else: v = 0.10 alpha = 0.15 return v, alpha def control_loop(self): now = time.time() valid_obs = [o for o in self.obstacle_list if 0 < o[0] < self.repulsion_range and abs(o[1]) < 0.5] # 状态1:正在右转避障 if self.state == 'avoiding_turn': if now - self.avoid_start_time < 0.6: self.publish_cmd(0.15, -0.8) # 小半圆右转 return else: self.state = 'avoiding_straight' self.avoid_start_time = now # 进入下一阶段,重新计时 return # 状态2:避障后回正 elif self.state == 'avoiding_straight': if hasattr(self, 'line_frame'): v, alpha = self.img2alpha(self.line_frame) if self.lost_lane_counter == 0: self.get_logger().info("✅ 避障后检测到车道线,恢复巡线") self.state = 'normal' self.publish_cmd(v, alpha) return if now - self.avoid_start_time < 0.6: # 仍然朝障碍物方向轻微回正(使用障碍物偏移方向) if valid_obs: obs_y = valid_obs[0][1] steer = 0.4 if obs_y < 0 else -0.4 else: steer = 0.0 self.publish_cmd(0.1 , steer) return else: self.get_logger().info("⏱️ 避障结束,恢复巡线") self.state = 'normal' return # 状态3:检测到障碍物,开始避障 if valid_obs: self.get_logger().warn("🚧 检测到障碍物,执行右转避障") self.state = 'avoiding_turn' self.avoid_start_time = now return # 状态4:正常巡线 self.get_logger().info("🚗 当前状态:正常巡线") if hasattr(self, 'line_frame'): v, alpha = self.img2alpha(self.line_frame) self.publish_cmd(v, alpha) def publish_cmd(self, speed, steer): msg = Twist() msg.linear.x = speed msg.angular.z = steer self.publisher_.publish(msg) def main(args=None): rclpy.init(args=args) node = LineAndObstacleNode() try: rclpy.spin(node) except KeyboardInterrupt: pass node.destroy_node() rclpy.shutdown() 0 if __name__ == '__main__': main() 上述代码实现了巡线加上避障的功能,请你帮我检查检查看看上述代码转弯大小和逻辑是否有问题?小车只用躲避一个障碍物,重点检查小车在躲避完这个障碍物之后回正阶段要是检测到车道线一定要巡线!!

filetype
好家伙VCC
  • 粉丝: 4545
上传资源 快速赚钱