活动介绍

#!/usr/local/bin/python3 # -*- coding: utf-8 -*- import asyncio import argparse import os import shutil import sys import time import signal from state import ProcessManagerState from queue_runner import QueueRunner from comm_interface import CommInterface import ipaddress import logging from dotenv import load_dotenv load_dotenv() if __name__ == '__main__': PROCESSMANAGER_DATA_PATH = '/data' PROCESSMANAGER_LOG_PATH = os.path.join(PROCESSMANAGER_DATA_PATH, 'logs') PROCESSMANAGER_LOG_ARCHIVE_PATH = os.path.join(PROCESSMANAGER_LOG_PATH, 'archive') if not os.path.exists(PROCESSMANAGER_LOG_PATH): os.makedirs(PROCESSMANAGER_LOG_PATH) if not os.path.exists(PROCESSMANAGER_LOG_ARCHIVE_PATH): os.makedirs(PROCESSMANAGER_LOG_ARCHIVE_PATH) parser = argparse.ArgumentParser(description='Processmanager for Robot Test Executions') parser.add_argument('--robot_ip', type=str, help="Robot vacuum IP") args = parser.parse_args() logfile_path = os.path.join(PROCESSMANAGER_LOG_PATH, f"processmanager_{args.robot_ip}.log") # in case an old log does exist: keep and rename it if os.path.exists(logfile_path) and os.path.isfile(logfile_path): archive_timestamp = time.time() archived_logfile_path = os.path.join(PROCESSMANAGER_LOG_ARCHIVE_PATH, f"processmanager_{args.robot_ip}_{archive_timestamp}.log") shutil.move(logfile_path, archived_logfile_path) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(logfile_path), logging.StreamHandler() # when you want to see log print in console ] ) logging.log(logging.INFO, "Booting Processmanager..") print("Starting Processmanager", flush=True) try: _ = ipaddress.ip_address(args.robot_ip) except ValueError: print(f"Error: Invalid IP Address provided: {args.robot_ip}") logging.error(f"Error: Invalid IP Address provided: {args.robot_ip}") raise ValueError loop = asyncio.get_event_loop() async_lock = asyncio.Lock() processmanager_state = ProcessManagerState(args.robot_ip, loop) queue_runner = QueueRunner(args.robot_ip, processmanager_state, async_lock) comm_interface = CommInterface(args.robot_ip, processmanager_state, async_lock) asyncio.ensure_future(queue_runner.queue_item_execution()) asyncio.ensure_future(comm_interface.fetch_rabbitmq()) def handle_sigterm(): loop.stop() logging.log(logging.INFO, "Received SIGTERM: Shutting down Processmanager..") processmanager_state.set_queue_offline() sys.exit(0) loop.add_signal_handler(signal.SIGTERM, handle_sigterm) loop.add_signal_handler(signal.SIGINT, handle_sigterm) print("State, Queue Runner and Comm Interface initialized, starting loops..", flush=True) loop.run_forever()

时间: 2025-07-16 16:13:48 浏览: 4
<think>嗯,我现在需要分析用户提供的这段Python脚本,并按照他们的要求用中文解释清楚。首先,我得仔细看一下用户给出的系统级指令,确保在回答过程中不违反任何格式要求。比如,所有行内数学表达式要用$...$,独立公式用$$...$$,但看起来这个脚本里可能没有数学公式,不过还是要注意其他地方是否符合要求。另外,回答结构要清晰,分步骤帮助用户理解代码。 用户提供的代码看起来是一个处理机器人测试执行的进程管理工具。首先,我会从代码的结构入手,分部分解析。脚本开头有shebang和编码声明,然后是各种库的导入,比如asyncio、argparse、os等,这些库都是用于异步处理、参数解析、文件操作等。接下来,从自定义模块导入ProcessManagerState、QueueRunner、CommInterface,还有加载环境变量的部分。 主程序部分,首先检查日志目录是否存在,不存在则创建。然后使用argparse处理命令行参数,特别是机器人IP地址。接下来是日志文件的配置,包括检查旧日志文件并归档,设置日志格式和处理器。这里需要注意用户可能想了解日志轮转的逻辑,比如如何归档旧日志。 然后验证IP地址的有效性,使用ipaddress模块,如果无效则报错。接下来是异步事件循环的初始化,创建ProcessManagerState、QueueRunner和CommInterface的实例,并启动两个异步任务:queue_item_execution和fetch_rabbitmq。这部分可能涉及异步编程的概念,需要解释asyncio的作用。 信号处理部分,处理SIGTERM和SIGINT信号,优雅关闭进程管理,停止事件循环并设置队列离线。最后运行事件循环直到结束。 用户可能的疑问点包括:各个模块的作用(如ProcessManagerState管理状态,QueueRunner处理队列任务,CommInterface负责通信),异步任务如何协同工作,日志归档机制,信号处理的重要性,以及整个程序的工作流程。 需要确保解释清晰每个部分的功能,以及它们如何组合在一起实现进程管理。同时要注意代码中的异常处理和资源管理,比如目录创建、日志移动、信号处理等,这些都是保证程序健壮性的关键点。 可能还需要提到代码中使用的设计模式,比如依赖注入(将processmanager_state传递给QueueRunner和CommInterface),或者单例模式,但不确定是否有必要深入。重点还是放在功能模块的交互上。 最后,总结整个脚本的作用:作为一个进程管理器,负责协调测试任务的执行,处理来自RabbitMQ的消息,维护状态,并确保在收到终止信号时能优雅关闭。</think>以下是对该Python脚本的逐步分析: $$注意:这是一个基于异步IO的进程管理工具,用于机器人测试执行$$ 1. **基础配置** - 第1-2行:声明Python3解释器路径和UTF-8编码 - 第4-14行:导入依赖库 - `asyncio` 用于异步任务处理 - `argparse` 处理命令行参数 - `os/sys` 系统操作 - `shutil` 文件操作 2. **环境初始化** ```python PROCESSMANAGER_DATA_PATH = '/data' PROCESSMANAGER_LOG_PATH = os.path.join(...) ``` - 创建日志目录结构 `/data/logs/archive` - 使用`os.makedirs()`确保目录存在 3. **参数解析** ```python parser = argparse.ArgumentParser(description='...') parser.add_argument('--robot_ip', type=str, ...) ``` - 必须参数 `--robot_ip` 验证格式后用于日志命名 4. **日志管理** ```python if os.path.exists(logfile_path): shutil.move(...) # 归档旧日志 logging.basicConfig(...) # 同时输出到文件和终端 ``` - 使用时间戳实现日志轮转 - 日志格式:`时间 - 名称 - 级别 - 消息` 5. **核心组件** - `ProcessManagerState`:状态管理(单例模式) - `QueueRunner`:测试任务队列执行器 - `CommInterface`:通过RabbitMQ通信的接口 ```python processmanager_state = ProcessManagerState(...) queue_runner = QueueRunner(...) comm_interface = CommInterface(...) ``` 6. **异步任务启动** ```python asyncio.ensure_future(queue_runner.queue_item_execution()) asyncio.ensure_future(comm_interface.fetch_rabbitmq()) ``` - 并行运行两个协程: 1. 持续执行队列中的测试任务 2. 监听消息队列获取新任务 7. **信号处理** ```python def handle_sigterm(): loop.stop() processmanager_state.set_queue_offline() ``` - 捕获`SIGTERM/SIGINT`信号时: 1. 停止事件循环 2. 设置队列离线状态 3. 优雅退出进程 8. **运行架构** ```mermaid graph LR A[主线程] --> B[事件循环] B --> C[QueueRunner] B --> D[CommInterface] C -->|任务状态更新| E[ProcessManagerState] D -->|接收新任务| E ``` **典型工作流程**: 1. 通过命令行启动:`python3 processmanager.py --robot_ip=192.168.1.100` 2. 初始化日志系统和状态管理器 3. 开始监听RabbitMQ消息队列 4. 当收到新测试任务时,加入执行队列 5. QueueRunner按顺序执行队列中的任务 6. 收到终止信号时保存状态并清理资源 $$注意:该代码需要配合state.py/queue_runner.py/comm_interface.py等模块才能完整运行$$
阅读全文

相关推荐

#!/usr/local/bin/python3 # -*- coding: utf-8 -*- # @Time : 2023/6/11 14:19 # @Author : 志在星空 # @File : jietu12.py # @Software: PyCharm import base64 import urllib import requests import json API_KEY = "jHxdMDf2bhEPxcG6zFoRUejo" SECRET_KEY = "QF5GO9eLQncG2Zr1MKrxLOWvKAkAtVfI" def main(): # 发送 OCR 请求并获取响应 url = "https://aip.baidubce.com/rest/2.0/ocr/v1/general?access_token=" + get_access_token() # payload = 'image=%2F9j%2F4AAQ' headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json' } image_path = "11.jpg" with open(image_path, "rb") as image_file: image_data = image_file.read() response = requests.post(url, headers=headers, data=image_data) # response = requests.request("POST", url, headers=headers, data=payload) # 解析响应并提取 words_result 中的所有 words 字段 result = json.loads(response.text) print(result) # words_list = [item['words'] for item in result['words_result']] # print(words_list) # # # 打印所有提取到的内容 # for words in words_list: # print(words) def get_file_content_as_base64(path, urlencoded=False): """ 获取文件base64编码 :param path: 文件路径 :param urlencoded: 是否对结果进行urlencoded :return: base64编码信息 """ with open(path, "rb") as f: content = base64.b64encode(f.read()).decode("utf8") if urlencoded: content = urllib.parse.quote_plus(content) return content def get_access_token(): """ 使用 AK,SK 生成鉴权签名(Access Token) :return: access_token,或是None(如果错误) """ url = "https://aip.baidubce.com/oauth/2.0/token" params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY} return str(requests.post(url, params=params).json().get("access_token")) if __name__ == '__main__': main()运行出现{'log_id': 1667825949995168966, 'error_msg': 'param image not exist', 'error_code': 216101},怎么办

#!/usr/local/bin/python3 # -- coding: utf-8 -- # @Time : 2023/6/11 14:19 # @Author : 志在星空 # @File : jietu12.py # @Software: PyCharm import base64 import urllib import requests import json API_KEY = "jHxdMDf2bhEPxcG6zFoRUejo" SECRET_KEY = "QF5GO9eLQncG2Zr1MKrxLOWvKAkAtVfI" def main(): # 发送 OCR 请求并获取响应 url = "https://aip.baidubce.com/rest/2.0/ocr/v1/general?access_token=" + get_access_token() # payload = 'image=%2F9j%2F4AAQ' headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json' } image_path = "11.jpg" with open(image_path, "rb") as image_file: image_data = image_file.read() response = requests.post(url, headers=headers, data=image_data) # response = requests.request("POST", url, headers=headers, data=payload) # 解析响应并提取 words_result 中的所有 words 字段 result = json.loads(response.text) print(result) # words_list = [item['words'] for item in result['words_result']] # print(words_list) # # # 打印所有提取到的内容 # for words in words_list: # print(words) def get_file_content_as_base64(path, urlencoded=False): """ 获取文件base64编码 :param path: 文件路径 :param urlencoded: 是否对结果进行urlencoded :return: base64编码信息 """ with open(path, "rb") as f: content = base64.b64encode(f.read()).decode("utf8") if urlencoded: content = urllib.parse.quote_plus(content) return content def get_access_token(): """ 使用 AK,SK 生成鉴权签名(Access Token) :return: access_token,或是None(如果错误) """ url = "https://aip.baidubce.com/oauth/2.0/token" params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY} return str(requests.post(url, params=params).json().get("access_token")) if name == 'main': main()运行出现{'log_id': 1667825949995168966, 'error_msg': 'param image not exist', 'error_code': 216101},请修改一下

#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy import tf import matplotlib.pyplot as plt from nav_msgs.msg import Path from sensor_msgs.msg import LaserScan import numpy as np # 存储路径和障碍物的全局变量 path_points = [] obstacles = [] # 初始化 tf.TransformListener rospy.init_node('plotter_node') listener = tf.TransformListener() def path_callback(msg): global path_points path_points = [] # 直接读取map坐标系下的路径点 for pose in msg.poses: x = pose.pose.position.x y = pose.pose.position.y path_points.append((x, y)) def scan_callback(msg): global obstacles obstacles = [] try: # 获取当前雷达数据时间戳对应的坐标变换 (trans, rot) = listener.lookupTransform("map", msg.header.frame_id, msg.header.stamp) # 将雷达数据转换到map坐标系 rotation_matrix = tf.transformations.quaternion_matrix(rot) translation = np.array(trans) angle_min = msg.angle_min angle_increment = msg.angle_increment for i, r in enumerate(msg.ranges): if msg.range_min < r < msg.range_max: # 计算极坐标转笛卡尔坐标(相对于雷达坐标系) angle = angle_min + i * angle_increment x = r * np.cos(angle) y = r * np.sin(angle) # 转换到map坐标系 point = np.dot(rotation_matrix, [x, y, 0, 1]) # 齐次坐标 point = point[:3] + translation obstacles.append((point[0], point[1])) except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e: rospy.logwarn("坐标转换失败: %s", e) def main(): rospy.init_node('path_obstacle_visualizer') rospy.Subscriber('/move_base/GlobalPlanner/plan', Path, path_callback) rospy.Subscriber('/scan', LaserScan, scan_callback) # 初始化可视化界面 plt.ion() fig, ax = plt.subplots(figsize=(8, 8)) ax.set_title("Real-time Path and Obstacles") ax.set_xlabel("X (m)") ax.set_ylabel("Y (m)") ax.grid(True) path_line, = ax.plot([], [], 'b-', label="Path") obstacles_scatter = ax.scatter([], [], c='r', s=10, label="Obstacles") ax.legend() # 动态调整坐标轴范围 ax.set_autoscale_on(True) while not rospy.is_shutdown(): # 更新路径数据 if path_points: x, y = zip(*path_points) path_line.set_data(x, y) ax.relim() ax.autoscale_view() # 更新障碍物数据 if obstacles: x, y = zip(*obstacles) obstacles_scatter.set_offsets(np.c_[x, y]) # 刷新图像 fig.canvas.draw_idle() plt.pause(0.05) if __name__ == '__main__': try: main() except rospy.ROSInterruptException: pass怎么修改

#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import re import sys import argparse from collections import defaultdict # 配置路径 NGINX_CONF_DIR = "/usr/local/tengine/conf" SERVER_DIR = os.path.join(NGINX_CONF_DIR, "vhosts/servername") UPSTREAM_DIR = os.path.join(NGINX_CONF_DIR, "vhosts/upstream") def parse_config_file(file_path): """ 解析配置文件,提取非注释内容 :param file_path: 配置文件路径 :return: 解析后的配置内容列表 """ config_lines = [] try: with open(file_path, 'r', encoding='utf-8') as f: in_block_comment = False for line in f: # 处理块注释 if '/*' in line: in_block_comment = True # 跳过注释行和空行 stripped = line.strip() if not stripped or stripped.startswith('#') or in_block_comment: if '*/' in line: in_block_comment = False continue # 移除行尾注释 clean_line = re.sub(r'#.*$', '', line).strip() if clean_line: config_lines.append(clean_line) except Exception as e: print(f"Error reading {file_path}: {str(e)}") return config_lines def find_upstreams(upstream_dir): """ 查找所有 upstream 配置 :param upstream_dir: upstream 配置目录 :return: upstream 名称到后端主机的映射字典 """ upstreams = defaultdict(list) for filename in os.listdir(upstream_dir): if not filename.endswith('.conf'): continue file_path = os.path.join(upstream_dir, filename) lines = parse_config_file(file_path) current_upstream = None for line in lines: # 匹配 upstream 定义 upstream_match = re.match(r'upstream\s+(\S+)\s*{', line) if upstream_match: current_upstream = upstream_match.group(1) continue # 匹配 server 定义 if current_upstream and line.startswith('server'): server_match = re.search(r'server\s+(\S+);', line) if server_match: upstreams[current_upstream].append(server_match.group(1)) return upstreams def find_server_backends(server_dir, upstreams): """ 查找所有 server 配置及其对应的后端主机 :param server_dir: server 配置目录 :param upstreams: upstream 名称到后端主机的映射 :return: 站点到后端主机的映射字典 """ site_backends = {} for filename in os.listdir(server_dir): if not filename.endswith('.conf'): continue file_path = os.path.join(server_dir, filename) lines = parse_config_file(file_path) server_name = None upstream_name = None for line in lines: # 提取 server_name server_match = re.match(r'server_name\s+(.*?);', line) if server_match: server_name = server_match.group(1).split()[0] # 取第一个域名 continue # 提取 proxy_pass 中的 upstream proxy_match = re.search(r'proxy_pass\s+http://(\w+);', line) if proxy_match: upstream_name = proxy_match.group(1) continue # 如果找到匹配的 server 和 upstream if server_name and upstream_name: if upstream_name in upstreams: site_backends[server_name] = { 'config_file': filename, 'upstream': upstream_name, 'backends': upstreams[upstream_name] } else: site_backends[server_name] = { 'config_file': filename, 'error': f"Upstream '{upstream_name}' not found" } return site_backends def format_output(site_backends): """ 格式化输出结果 :param site_backends: 站点到后端主机的映射字典 """ print("\nNginx Site Backend Mapping Report") print("=" * 50) for site, info in site_backends.items(): print(f"\nSite: {site}") print(f"Config File: {info['config_file']}") if 'error' in info: print(f" Error: {info['error']}") else: print(f" Upstream: {info['upstream']}") print(" Backend Servers:") for i, backend in enumerate(info['backends'], 1): print(f" {i}. {backend}") print("\n" + "=" * 50) print(f"Total sites found: {len(site_backends)}") def main(): parser = argparse.ArgumentParser(description='Find Nginx site backend servers') parser.add_argument('--site', help='Specific site to search for') args = parser.parse_args() # 检查目录是否存在 for path in [NGINX_CONF_DIR, SERVER_DIR, UPSTREAM_DIR]: if not os.path.exists(path): print(f"Error: Directory not found: {path}") sys.exit(1) # 查找所有 upstream upstreams = find_upstreams(UPSTREAM_DIR) print(f"Found {len(upstreams)} upstream configurations") # 查找所有 server 配置 site_backends = find_server_backends(SERVER_DIR, upstreams) # 处理特定站点查询 if args.site: if args.site in site_backends: print(f"\nBackend for site '{args.site}':") info = site_backends[args.site] if 'backends' in info: for backend in info['backends']: print(f" - {backend}") else: print(f" Error: {info.get('error', 'Unknown error')}") else: print(f"\nSite '{args.site}' not found in configurations") else: format_output(site_backends) if __name__ == "__main__": main() ---------------------------- nginx的配置在nginx.conf中

#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy import matplotlib.pyplot as plt from nav_msgs.msg import OccupancyGrid, Odometry from matplotlib.animation import FuncAnimation import math # 添加数学库 class TrajectoryPlotter: def __init__(self): self.fig, self.ax = plt.subplots() self.map_data = None self.traj_x, self.traj_y = [], [] self.current_pose = None self.traj_length = 0.0 # 新增轨迹长度变量 # ROS初始化 rospy.init_node('trajectory_visualizer') self.map_sub = rospy.Subscriber('/map', OccupancyGrid, self.map_cb) self.odom_sub = rospy.Subscriber('/odom', Odometry, self.odom_cb) # 初始化文本对象 self.length_text = self.ax.text(0.05, 0.95, '', transform=self.ax.transAxes, bbox=dict(facecolor='white', alpha=0.8)) # 新增文本框 self.ani = FuncAnimation(self.fig, self.update_plot, interval=200) def map_cb(self, msg): """ 地图数据处理 """ self.map_data = { 'resolution': msg.info.resolution, 'width': msg.info.width, 'height': msg.info.height, 'origin': msg.info.origin, 'data': msg.data } self.plot_map() def odom_cb(self, msg): """ 里程计数据处理 """ self.current_pose = msg.pose.pose x = msg.pose.pose.position.x y = msg.pose.pose.position.y # 计算轨迹长度 if len(self.traj_x) > 0: # 当有历史点时计算距离 dx = x - self.traj_x[-1] dy = y - self.traj_y[-1] self.traj_length += math.hypot(dx, dy) # 累加欧式距离 self.traj_x.append(x) self.traj_y.append(y) def plot_map(self): """ 绘制静态地图 """ if not self.map_data: return # 创建网格坐标系 grid = [[0]*self.map_data['width'] for _ in range(self.map_data['height'])] for i in range(self.map_data['height']): for j in range(self.map_data['width']): grid[i][j] = self.map_data['data'][i*self.map_data['width']+j] # 转换为物理坐标系 origin_x = self.map_data['origin'].position.x origin_y = self.map_data['origin'].position.y self.ax.imshow(grid, cmap='gray', origin='lower', extent=[origin_x, origin_x + self.map_data['width']*self.map_data['resolution'], origin_y, origin_y + self.map_data['height']*self.map_data['resolution']]) def update_plot(self, frame): """ 动态更新轨迹 """ if self.current_pose: # 更新轨迹绘图 self.ax.plot(self.traj_x, self.traj_y, 'r-', linewidth=2) self.ax.plot(self.traj_x[-1], self.traj_y[-1], 'bo', markersize=8) # 更新轨迹长度显示(保留两位小数) self.length_text.set_text(f'Trajectory Length: {self.traj_length:.2f} m') # 更新文本内容 # 设置坐标轴和标题 self.ax.set_xlabel('X (m)') self.ax.set_ylabel('Y (m)') self.ax.set_title('Real-time Trajectory') plt.draw() if __name__ == '__main__': tp = TrajectoryPlotter() plt.show()在这段代码的基础上我还想订阅/move_base/local_costmap/costmap来得到局部障碍物的地图

#!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse import tf import threading import rospy import sys import math from sensor_msgs.msg import LaserScan import actionlib from actionlib_msgs.msg import * from geometry_msgs.msg import Pose, PoseWithCovarianceStamped, Point, Quaternion, Twist from std_msgs.msg import Int32 from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal import roslib import tf2_ros from geometry_msgs.msg import TransformStamped import numpy as np from std_msgs.msg import String, Float32MultiArray import ctypes from ctypes.util import find_library import os from playsound import playsound from ar_track_alvar_msgs.msg import AlvarMarkers #from robot_voice.msg import Targettt # 自定义消息类型 key = 0 F_list = [] global xy global task_xy global qtn_list_xy global qtn_list_task_xy global pose_num_xy global pose_num_task_xy global yaw global now_pose_X global now_pose_Y #xy =[[0.16,-0.30,3.14], [3.0,-0.32,0], [3.03,-3.02,-0], [0.01, -2.95, 3.14],[0.5, -2.55, 3.14]] xy = [[0.21,-0.81,3.12],[0.82,-0.28,1.54], [0.821,-0.80,1.491], [2.95,-2.44,0.02],[2.37,-3.15,-1.574]] #task_xy =[[0.17,-1.22,3.14],[1.08,-0.25,3.14],[2.17,-0.27,0], [2.93,-1.14,0], [2.96,-2.15,0],[2.10,-3.04,0],[1.09,-3.0,3.14],[0.23,-2.10,3.14]] task_xy=[[0.821,-0.80,1.491],[2.55,-0.17,1.54], [2.99,-0.85,0.05], [2.95,-2.44,0.02],[2.37,-3.15,-1.574]] pid_stop_xy =[[0.395,-1.06,0],[0.416,-0.856,-90],[-0.99,-0.826,90], [0.52,-0.49,180], [0.52,0.50,180],[-0.54,-0.49,180],[-0.50,0.49,0],[0.51,-0.49,0]] qtn_list_xy = [] #四元数列表 qtn_list_task_xy = [] #任务点四元数列表 pose_num_xy= len(xy) pose_num_task_xy=len(task_xy) yaw = 0 global move_base now_pose_X=0 now_pose_Y=0 # -----------pid函数及其参数------------ # 注: # 1. global w_kp global w_ki global w_kd global w_target global w_e_all global w_last_e w_kp = 2 #2 w_ki = 0.001 w_kd = 0.005 #0 w_e_all = 0 w_last_e = 0 global x_f,x_b,y_l,y_r x_f=0.0 x_b=0.0 y_l=0.0 y_r=0.0 def w_pid_cal(pid_target,dis): global w_kp global w_ki global w_kd global w_e_all global w_last_e e = dis -pid_target #if e>-0.1 and e<0.1: #e = 0 w_e_all = w_e_all+e pid = w_kp*e+w_ki*w_e_all+w_kd*(e-w_last_e) w_last_e = e return pid global p_kp global p_ki global p_kd global p_e_all global p_last_e global p_pid p_kp = -7 p_ki = 0 p_kd = -3 p_e_all = 0 p_last_e = 0 p_pid = 0 def p_pid_cal(pid_target,pose): global p_kp global p_ki global p_kd global p_e_all global p_last_e ture_pose = (pose/3.14159265359*180.0+180.0)%360 if pid_target==0: if ture_pose>0 and ture_pose<180: pid_target=0 if ture_pose>180 and ture_pose<360: pid_target=360 e = ture_pose -pid_target # print(e) p_e_all = p_e_all+e pid = p_kp*e+p_ki*p_e_all+p_kd*(e-p_last_e) #rospy.loginfo("e %f",e) p_last_e = e return pid global point_kp global point_ki global point_kd global point_e_all global point_last_e global point_pid point_kp = -3 point_ki = 0 point_kd = 0 point_e_all = 0 point_last_e = 0 point_pid = 0 def point_pid(pid_target_x,ture): global point_kp global point_ki global point_kd global point_e_all global point_last_e e = ture -pid_target_x point_e_all = point_e_all+e pid = point_kp*e+point_ki*point_e_all+point_kd*(e-point_last_e) point_last_e = e return pid def limt(limt,target): if limt>target: limt=target if limt<-target: limt=-target return limt # ----------pid停车------- def pid_stop2(target_x,target_y,target_yaw): global w_kp,w_ki,w_kd,w_e_all w_kp = 1.5 #1.5 w_ki = 0.005 #0.005 w_kd = 0.006 #0.01 w_e_all=0 count =0 pid_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=7) rate_loop_pid=rospy.Rate(7) speed = Twist() wich_x = 0 wich_y = 0 time = 0 while not rospy.is_shutdown(): rate_loop_pid.sleep() time+=1 if target_x>0: pid_x = w_pid_cal(target_x,x_f) wich_x=x_f if target_x<0: pid_x = w_pid_cal(target_x,-x_b) wich_x=-x_b if target_y>0: pid_y = w_pid_cal(target_y,y_l) wich_y = y_l if target_y<0: pid_y = w_pid_cal(target_y,-y_r) wich_y = -y_r p_pid = p_pid_cal(target_yaw,yaw) #if abs(wich_x)<0.8: #speed.linear.y = 0 #else: speed.linear.y = pid_y speed.linear.x = pid_x speed.angular.z = p_pid/180.0*3.14159265359 w_e_all=limt(w_e_all,5) print("w_e_all:",w_e_all) print("wich_x:",wich_x,"wich_y:",wich_y) if time>=150: w_e_all=0 break #if abs(wich_x-target_x)<=0.05 and abs(wich_y-target_y)<=0.07 and abs(target_yaw-(yaw/3.1415926*180+180))<=5: #w_e_all=5 #w_ki = 0.001 if abs(wich_x-target_x)<=0.03 and abs(wich_y-target_y)<=0.03 and abs(target_yaw-(yaw/3.1415926*180+180))<=3: w_e_all=0 count+=1 if count>=6: speed.linear.x = 0 speed.linear.y = 0 speed.linear.z = 0 pid_vel_pub.publish(speed) #rospy.sleep(0.5) w_e_all=0 break pid_vel_pub.publish(speed) def pid_stop(target_x,target_y,target_yaw): global w_kp,w_ki,w_kd,w_e_all w_kp = 1.5 #1.5 w_ki = 0.005 #0.005 w_kd = 0.006 #0.01 w_e_all=0 count =0 pid_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=7) rate_loop_pid=rospy.Rate(7) speed = Twist() wich_x = 0 wich_y = 0 time = 0 while not rospy.is_shutdown(): rate_loop_pid.sleep() time+=1 if target_x>0: pid_x = w_pid_cal(target_x,x_f) wich_x=x_f if target_x<0: pid_x = w_pid_cal(target_x,-x_b) wich_x=-x_b if target_y>0: pid_y = w_pid_cal(target_y,y_l) wich_y = y_l if target_y<0: pid_y = w_pid_cal(target_y,-y_r) wich_y = -y_r p_pid = p_pid_cal(target_yaw,yaw) #if abs(wich_x)<0.8: #speed.linear.y = 0 #else: speed.linear.y = pid_y speed.linear.x = pid_x speed.angular.z = p_pid/180.0*3.14159265359 w_e_all=limt(w_e_all,5) print("w_e_all:",w_e_all) print("wich_x:",wich_x,"wich_y:",wich_y) if time>=150: w_e_all=0 break #if abs(wich_x-target_x)<=0.05 and abs(wich_y-target_y)<=0.07 and abs(target_yaw-(yaw/3.1415926*180+180))<=5: #w_e_all=5 #w_ki = 0.001 if abs(wich_x-target_x)<=0.08 and abs(wich_y-target_y)<=0.08 and abs(target_yaw-(yaw/3.1415926*180+180))<=5: w_e_all=0 count+=1 if count>=3: speed.linear.x = 0 speed.linear.y = 0 speed.linear.z = 0 pid_vel_pub.publish(speed) #rospy.sleep(0.5) w_e_all=0 break pid_vel_pub.publish(speed) def pid_go(target_x,target_y,target_yaw): global point_kp, vision_result, dis_trun_off,point_ki global w_kp,w_ki,w_kd w_kp = 2 #2 w_ki = 0 w_kd = 0.008#0 count =0 pid_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=7) rate_loop_pid=rospy.Rate(7) speed = Twist() wich_x = 0 wich_y = 0 while not rospy.is_shutdown(): if target_x>0: pid_x = w_pid_cal(target_x,x_f) wich_x=x_f if target_x<0: pid_x = w_pid_cal(target_x,-x_b) wich_x=-x_b if target_y>0: pid_y = w_pid_cal(target_y,y_l) wich_y = y_l if target_y<0: pid_y = w_pid_cal(target_y,-y_r) wich_y = -y_r p_pid = p_pid_cal(target_yaw,yaw) if abs(target_x-wich_x)<0.2 and abs(target_y-wich_y)<0.2: speed.linear.x = 0 speed.linear.y = 0 else: speed.linear.y = pid_y speed.linear.x = pid_x speed.angular.z = p_pid/180.0*3.14159265359 pid_vel_pub.publish(speed) rate_loop_pid.sleep() print("x_f:",x_f,"y_l:",y_l) #print(abs(target_yaw-yaw/3.1415926*180)) if vision_result != 0: # rospy.sleep(0.3) # thread_dis.join() break def pid_go2(target_x,target_y,target_yaw): global vision_result global w_kp,w_ki,w_kd,w_e_all print("--------开始pid_go2--------") w_kp = 2 #2 w_ki = 0 w_kd = 0.01 #0 count =0 w_e_all=0 pid_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=7) rate_loop_pid=rospy.Rate(7) speed = Twist() wich_x = 0 wich_y = 0 while not rospy.is_shutdown(): if target_x>0: pid_x = w_pid_cal(target_x,x_f) wich_x=x_f if target_x<0: pid_x = w_pid_cal(target_x,-x_b) wich_x=-x_b if target_y>0: pid_y = w_pid_cal(target_y,y_l) wich_y = y_l if target_y<0: pid_y = w_pid_cal(target_y,-y_r) wich_y = -y_r p_pid = p_pid_cal(target_yaw,yaw) if abs(target_x-wich_x)<0.2 and abs(target_y-wich_y)<0.2: speed.linear.x = 0 speed.linear.y = 0 else: if abs(wich_x)>0.55: speed.linear.y = 0.03*pid_y else: speed.linear.y = pid_y speed.linear.x = pid_x speed.angular.z = p_pid/180.0*3.14159265359 pid_vel_pub.publish(speed) rate_loop_pid.sleep() if vision_result != 0: w_e_all=0 # rospy.sleep(0.3) # thread_dis.join() break print("--------结束pid_go2--------") def pid_turn(target_x,target_y,target_yaw): global point_kp pid_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=7) rate_loop_pid=rospy.Rate(7.2) speed = Twist() while not rospy.is_shutdown(): map_pid_x = point_pid(target_x,now_pose_X) map_pid_y = point_pid(target_y,now_pose_Y) p_pid = p_pid_cal(target_yaw,yaw) if target_yaw!=180: speed.linear.x = 0 speed.linear.y = 0 else: speed.linear.x = 0 speed.linear.y = 0 speed.angular.z = 1.5*p_pid/180.0*3.14159265359 pid_vel_pub.publish(speed) rate_loop_pid.sleep() # print(abs(target_yaw-yaw/3.1415926*180)) if abs(target_yaw-(yaw/3.1415926*180+180))<=50: #rospy.sleep(0.3) break # -----------pid函数及其参数------------ # -----------获取视觉消息--------------- global object_vision object_vision = 0 def vision_callback(msg): global object_vision if msg.data: print(msg.data) object_vision = msg.data def vision_listen(): rospy.Subscriber('/detected_label',Int32,vision_callback,queue_size=10) #订阅视觉话题 rospy.spin() global ar_sign global task_vision ar_sign = 0 def ar_callback(msg): global object_vision , task_vision ar_sign = 0 if len(msg.markers): print("----------------------------------ar") print(msg.markers[0].id) task_vision = msg.markers[0].id ar_sign = 1 def vision_ar(): rospy.Subscriber('/ar_pose_marker',AlvarMarkers,ar_callback,queue_size=10) #订阅视觉话题 rospy.spin() global ai_vision ai_vision = 0 global ai_sign ai_sign = 0 def ai_callback(msg): global object_vision , task_vision if (msg.data): print("++++++++++++++++++++++++++++++++++++ai") print("ai回复",msg.data) task_vision = msg.data ai_sign = 1 def vision_ai(): rospy.Subscriber('/ai_result',Int32,ai_callback,queue_size=10) #订阅视觉话题 rospy.spin() # -----------获取视觉消息--------------- #------------------- 雷达话题订阅线程 ---------------- global scan_data scan_data = [] def get_valid_distance(scan, start_index, direction, angle_resolution): """ 从指定的起始角度开始,以给定的方向和角度分辨率寻找有效的激光数据, 并计算修正后的距离。 参数: - scan: 激光雷达扫描数据 - start_index: 起始角度索引 - direction: 搜索方向(1 表示顺时针,-1 表示逆时针) - angle_resolution: 角度分辨率(每个索引代表的角度) 返回值: - 修正后的有效距离 """ max_angle = len(scan.ranges) for i in range(max_angle): index = (start_index + i * direction) % max_angle if scan.ranges[index] != float('inf'): distance = scan.ranges[index] angle = np.radians((index - start_index) * angle_resolution) distance_corrected = distance * np.cos(angle) return distance_corrected return float('inf') def get_laserscan(scan): """ 激光雷达回调函数,计算前后左右方向的有效激光距离并进行修正。 参数: - scan: 激光雷达扫描数据 """ global x_f, x_b, y_l, y_r, yaw, scan_data scan_data = scan.ranges front_index = 360 # 前方角度索引 angle_resolution = 0.5 # 每个索引代表的角度(假设为0.5度) # 找到有效距离并进行修正 x_f = get_valid_distance(scan, front_index, 1, angle_resolution) # 从前方开始向右搜索 x_b = get_valid_distance(scan, 0, 1, angle_resolution) # 从后方开始向右搜索 y_l = get_valid_distance(scan, 540, -1, angle_resolution) # 从左侧开始向左搜索 y_r = get_valid_distance(scan, 180, 1, angle_resolution) # 从右侧开始向右搜索 #print("x_f:", x_f, "x_b:", x_b, "y_l:", y_l, "y_r:", y_r) def laser_listen(): rospy.Subscriber('/scan_filtered', LaserScan,get_laserscan,queue_size=7) rospy.spin() #------------------- 雷达话题订阅线程 ---------------- # -----------------------pid回家-------------------------- def pid_loop(): print("---------启动回家--------") global yaw global w_kp,w_ki,w_kd,w_e_all w_e_all=0 w_kp = 0.8 #2 w_ki = 0 w_kd = 0.00 #0 pid_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=7) rate_loop_pid=rospy.Rate(7) speed = Twist() while not rospy.is_shutdown(): pid_x = w_pid_cal(0.155,x_f) #pid_y = w_pid_cal(-0.35,-y_r) pid_y = w_pid_cal(0.155,y_l) p_pid = p_pid_cal(0,yaw) print("x_f",x_f) print("y_l",y_l) print("yaw",yaw) speed.linear.x = pid_x speed.linear.y = pid_y speed.angular.z = p_pid/180.0*3.14159265359 pid_vel_pub.publish(speed) rate_loop_pid.sleep() if abs(x_f-0.1)<0.18 and abs(y_l-0.1)<0.18 and abs(0-(yaw/3.1415926*180+180))<=5: speed.linear.x = 0 speed.linear.y = 0 pid_vel_pub.publish(speed) break print("---------结束回家--------") # -----------------------pid回家-------------------------- # ---------------vioce 语音播报----------------- def play_voice(number): playsound(str(number)+".mp3") def play_voice_begin(numbers): print("获取的任务目标为:"+str(target_point_arr)) numbers=sorted(numbers) playsound(str(numbers[0])+"_"+str(numbers[1])+"_"+str(numbers[2])+".mp3") print("播放文件为:"+str(numbers[0])+"_"+str(numbers[1])+"_"+str(numbers[2])+".mp3") # ---------------vioce 语音播报----------------- # -----------------导航点---------------------- # 1.本区域共三个函数导航实时距离获取,发布导航点,控制导航点发布 # 2.控制导航点发布部分默认是逐一发布点列表中的点,需要请注释并自行更改 # 发布目标点 # 控制导航点发布 global count_dis_times count_dis_times=0 def dis_func(x, y,min_dis): global dis_trun_off,now_pose_X,now_pose_Y,distance, vision_result,target_point_arr,times_voice_play,count_dis_times print("dis_func函数已启动:"+str(count_dis_times)+"次") count_dis_times+=1 # lock = threading.RLock() dis_fun_rate=rospy.Rate(10) while not rospy.is_shutdown(): dis_fun_rate.sleep() car_to_map_x=now_pose_X car_to_map_y=now_pose_Y # 计算小车到目标点的距离 distance = pow(pow( car_to_map_x - x, 2) + pow(car_to_map_y - y, 2), 0.5) #print("distance:"+str(distance)) if distance<min_dis: print("reach_goal") rospy.sleep(1) break def goals(x, y, i): min_dis=0.07 global qtn_list_xy,move_base,dis_trun_off,now_pose_X,pid_stop_vision_stop_flag,goal_vision_weight_flag target_point = Pose(Point(x, y, 0), Quaternion(qtn_list_xy[i][0], qtn_list_xy[i][1], qtn_list_xy[i][2], qtn_list_xy[i][3])) goal = MoveBaseGoal() goal.target_pose.pose = target_point goal.target_pose.header.frame_id = 'map' goal.target_pose.header.stamp = rospy.Time.now() rospy.loginfo("Going to: " + str(target_point)) print("goal") move_base.send_goal(goal) def goals_task(x, y, i): min_dis=0.07 global qtn_list_task_xy,move_base,dis_trun_off,now_pose_X target_point = Pose(Point(x, y, 0), Quaternion(qtn_list_task_xy[i][0], qtn_list_task_xy[i][1], qtn_list_task_xy[i][2], qtn_list_task_xy[i][3])) goal = MoveBaseGoal() goal.target_pose.pose = target_point goal.target_pose.header.frame_id = 'map' goal.target_pose.header.stamp = rospy.Time.now() rospy.loginfo("Going to: " + str(target_point)) print("goal") move_base.send_goal(goal) def send_None_goal(): global move_base goal = MoveBaseGoal() move_base.send_goal(goal) # ---------------vioce 语音播报----------------- global times_voice_play times_voice_play=0 def play_voice(number): global times_voice_play playsound(str(number)+".mp3") # ---------------vioce 语音播报----------------- # ----------------- init --------------------- # 1.处理点列表的四元数并放在新的列表 # 2.连接move_base # -------------- def now_pose_xy(): global now_pose_X,now_pose_Y,yaw now_pose=rospy.Rate(10) listener = tf.TransformListener() while not rospy.is_shutdown(): now_pose.sleep() try: (trans, rot) = listener.lookupTransform("map", "base_link", rospy.Time(0)) # 小车坐标 now_pose_X=trans[0] now_pose_Y=trans[1] euler = tf.transformations.euler_from_quaternion(rot) yaw = euler[2] # 第三个元素是yaw角 # print("x",now_pose_X,"y",now_pose_Y,"yaw",yaw) except Exception as e: print(".........") # -------------- def init_fun(): #转换点 global qtn_list_xy,qtn_list_task_xy,pose_num_xy,pose_num_task_xy,move_base for i in range(pose_num_xy): qtn = tf.transformations.quaternion_from_euler(0,0,xy[i][2]) qtn_list_xy.append(qtn) i=0 for i in range(pose_num_task_xy): qtn = tf.transformations.quaternion_from_euler(0,0,task_xy[i][2]) qtn_list_task_xy.append(qtn) #连接move_base—actition move_base = actionlib.SimpleActionClient("move_base", MoveBaseAction) rospy.loginfo("Waiting for move_base action server...") while move_base.wait_for_server(rospy.Duration(5.0)) == 0: rospy.loginfo("Request to connect to move_base server") rospy.loginfo("Be connected successfully") thread_lidar = threading.Thread(target=laser_listen) thread_lidar.start() thread_vision = threading.Thread(target=vision_listen) thread_vision.start() thread_ar = threading.Thread(target=vision_ar) thread_ar.start() thread_ai = threading.Thread(target=vision_ai) thread_ai.start() thread_now_pose = threading.Thread(target=now_pose_xy) thread_now_pose.start() # 初始化 history 变量 history = {"current_value": None, "stable_count": 0, "values": []} required_stable_count = 20 def get_stable_value(new_value, required_stable_count, history, max_history_size=50): # 更新历史记录列表,保留最近的 max_history_size 个值 history["values"].append(new_value) if len(history["values"]) > max_history_size: history["values"].pop(0) # 超过最大限制时移除最早的值 # 检查稳定计数 if new_value == history["current_value"]: history["stable_count"] += 1 else: history["current_value"] = new_value history["stable_count"] = 1 # 如果达到所需的稳定次数,返回稳定的值 if history["stable_count"] >= required_stable_count: return history["current_value"] else: return None def vision_to_get_task(t_long): global object_vision , ar_sign , task_vision,ai_sign task_vision = 0 object_vision = 0 ar_sign=0 ai_sign = 0 t = 0 sure_required = 0 rate_loop_pid=rospy.Rate(10) rospy.set_param('/top_view_shot_node/im_flag', 1) rospy.sleep(3) while not rospy.is_shutdown(): print("find.............",object_vision) if object_vision and not task_vision: sure_required = get_stable_value(object_vision, 50, history) if sure_required is not None: print("sure_required",sure_required) task_vision = sure_required t+=1 if t>=t_long: print("超时") task_vision=999 break if task_vision: print("========",task_vision) oject_vision = 0 break rate_loop_pid.sleep() return task_vision def get_voice(voice): global target_2 global target_3 if (target_2==0 or target_3==0): target_2 = voice.er target_3 = voice.san rospy.loginfo("收到/target话题的反馈: %d %d",target_2, target_3) # 访问自定义消息的字段 def robot_voice(): rospy.Subscriber('/target',Targettt,get_voice,queue_size=1) rospy.spin() def voice_wakeup_publisher(): pub = rospy.Publisher('/voiceWakeup', String, queue_size=10) user_input = input("请输入1开始启动: ") if user_input == "1": msg = String() msg.data = "1" pub.publish(msg) rospy.loginfo("已发布消息: %s", msg.data) # ----------------- init --------------------- if __name__ == '__main__': # 初始化节点 rospy.init_node('move_test', anonymous=True) init_fun() a = input() rospy.loginfo("节点初始化完成,开始执行任务") # 发布唤醒信号 #voice_wakeup_publisher() goals(xy[0][0], xy[0][1], 0) dis_func(xy[0][0], xy[0][1], 0.05) send_None_goal() pid_stop2(pid_stop_xy[0][0], pid_stop_xy[0][1], pid_stop_xy[0][2]) rospy.sleep(5) '''goals(xy[1][0], xy[1][1], 1) dis_func(xy[0][0], xy[0][1], 0.05) send_None_goal() pid_stop2(pid_stop_xy[1][0], pid_stop_xy[1][1], pid_stop_xy[1][2]) #rospy.loginfo("任务2完成") rospy.sleep(5)''' '''goals_task(task_xy[0][0], task_xy[0][1], 0) dis_func(task_xy[0][0], task_xy[0][1], 0.05) # 等待到达目标点附近 send_None_goal() pid_stop2(pid_stop_xy[2][0], pid_stop_xy[2][1], pid_stop_xy[2][2]) rospy.sleep(4)''' rospy.loginfo("任务序列执行完成") 我完整代码是这个应该怎么修改

#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function import os os.environ['QT_X11_NO_MITSHM'] = '1' os.environ['DISPLAY'] = ':0' import rospy import cv2 import numpy as np import time import yaml import logging import threading import signal import sys from sensor_msgs.msg import Image, CameraInfo from cv_bridge import CvBridge, CvBridgeError from robot_package.msg import TR_Arm_Msg # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger('robust_hand_eye_calibration') class RobustCalibrator: def __init__(self): rospy.init_node('robust_hand_eye_calibration', anonymous=True) # 参数配置 self.pattern_size = rospy.get_param('~pattern_size', (6, 8)) self.square_size = rospy.get_param('~square_size', 0.02) self.min_poses = rospy.get_param('~min_poses', 15) self.max_poses = rospy.get_param('~max_poses', 20) # 数据存储 self.gripper_poses = [] self.target_poses = [] self.images = [] self.camera_info = None self.T_cam_end = None # ROS工具 self.bridge = CvBridge() self.current_image = None self.current_arm_pose = None self.last_corners = None # 生成世界坐标系点 self.objp = np.zeros((self.pattern_size[0]*self.pattern_size[1], 3), np.float32) self.objp[:, :2] = np.mgrid[0:self.pattern_size[0], 0:self.pattern_size[1]].T.reshape(-1, 2) * self.square_size logger.info("鲁棒手眼标定系统已启动") # 订阅者 rospy.Subscriber("/ascamera/rgb0/image", Image, self.image_callback) rospy.Subscriber("/ascamera/rgb0/camera_info", CameraInfo, self.camera_info_callback) rospy.Subscriber("/TR_Arm_topic", TR_Arm_Msg, self.arm_pose_callback) # 创建调试图像发布者 self.debug_pub = rospy.Publisher("/calibration/debug_image", Image, queue_size=1) # 设置信号处理 signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) # 状态监控线程 self.monitor_thread = threading.Thread(target=self.monitor_system) self.monitor_thread.daemon = True self.monitor_thread.start() def signal_handler(self, signum, frame): """处理中断信号""" logger.warning("收到中断信号 %d,正在安全关闭...", signum) self.cleanup_resources() rospy.signal_shutdown("外部中断") sys.exit(0) def monitor_system(self): """系统监控线程""" while not rospy.is_shutdown(): # 检查系统资源 try: # 监控GPU内存(适用于NVIDIA Jetson) if os.path.exists('/sys/devices/gpu.0'): with open('/sys/devices/gpu.0/load', 'r') as f: gpu_load = int(f.read().strip()) if gpu_load > 90: logger.warning("GPU负载过高: %d%%,考虑降低图像分辨率", gpu_load) # 监控CPU温度 if os.path.exists('/sys/class/thermal/thermal_zone0/temp'): with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f: temp = int(f.read().strip()) / 1000.0 if temp > 75: logger.warning("CPU温度过高: %.1f°C,暂停处理10秒", temp) rospy.sleep(10.0) rospy.sleep(5.0) # 每5秒检查一次 except Exception as e: logger.error("监控错误: %s", e) rospy.sleep(10.0) def cleanup_resources(self): """清理资源""" logger.info("清理系统资源...") try: # 关闭所有OpenCV窗口 cv2.destroyAllWindows() # 保存当前进度 if len(self.gripper_poses) > 0: self.save_progress() logger.info("资源清理完成") except Exception as e: logger.error("清理资源时出错: %s", e) def save_progress(self, filename="calibration_progress.yaml"): """保存当前进度""" try: data = { 'gripper_poses': [pose.tolist() for pose in self.gripper_poses], 'target_poses': [pose.tolist() for pose in self.target_poses], 'num_poses': len(self.gripper_poses), 'last_update': time.strftime("%Y-%m-%d %H:%M:%S") } with open(filename, 'w') as f: yaml.dump(data, f, default_flow_style=False) logger.info("标定进度已保存至: %s", filename) return True except Exception as e: logger.error("保存进度失败: %s", e) return False def load_progress(self, filename="calibration_progress.yaml"): """加载之前保存的进度""" try: if not os.path.exists(filename): logger.warning("进度文件不存在: %s", filename) return False with open(filename, 'r') as f: data = yaml.safe_load(f) self.gripper_poses = [np.array(pose) for pose in data['gripper_poses']] self.target_poses = [np.array(pose) for pose in data['target_poses']] logger.info("已从 %s 加载进度: %d 个位姿", filename, len(self.gripper_poses)) return True except Exception as e: logger.error("加载进度失败: %s", e) return False def image_callback(self, msg): try: # 使用独立线程处理图像,避免阻塞主线程 threading.Thread(target=self.process_image, args=(msg,)).start() except Exception as e: logger.error("图像处理线程错误: %s", e) def process_image(self, msg): """在独立线程中处理图像""" try: self.current_image = self.bridge.imgmsg_to_cv2(msg, "bgr8") self.detect_corners() except CvBridgeError as e: logger.error("图像转换错误: %s", e) def detect_corners(self): """检测角点并发布调试图像""" if self.current_image is None or self.camera_info is None: return try: gray = cv2.cvtColor(self.current_image, cv2.COLOR_BGR2GRAY) ret, corners = cv2.findChessboardCorners(gray, self.pattern_size, None) if ret: criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001) corners_refined = cv2.cornerSubPix(gray, corners, (11,11), (-1,-1), criteria) self.last_corners = corners_refined # 创建调试图像 debug_img = self.current_image.copy() cv2.drawChessboardCorners(debug_img, self.pattern_size, corners_refined, ret) # 添加状态信息 status_text = f"Corners Detected [{len(self.gripper_poses)}/{self.max_poses}]" cv2.putText(debug_img, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) # 发布调试图像 try: debug_msg = self.bridge.cv2_to_imgmsg(debug_img, "bgr8") self.debug_pub.publish(debug_msg) except CvBridgeError as e: logger.error("发布调试图像错误: %s", e) except Exception as e: logger.error("角点检测错误: %s", e) def camera_info_callback(self, msg): if self.camera_info is None: self.camera_info = { 'K': np.array(msg.K).reshape(3,3), 'D': np.array(msg.D), 'width': msg.width, 'height': msg.height } logger.info("相机内参已获取") def arm_pose_callback(self, msg): if len(msg.homogeneousMatrix) == 16: self.current_arm_pose = np.array(msg.homogeneousMatrix).reshape(4,4).astype(np.float64) def capture_data(self): if not all([self.current_image, self.current_arm_pose, self.camera_info]): logger.error("数据不完整,无法采集") return False try: gray = cv2.cvtColor(self.current_image, cv2.COLOR_BGR2GRAY) ret, corners = cv2.findChessboardCorners(gray, self.pattern_size, None) if not ret: logger.warning("未检测到棋盘格") return False criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001) corners_refined = cv2.cornerSubPix(gray, corners, (11,11), (-1,-1), criteria) ret, rvec, tvec = cv2.solvePnP(self.objp, corners_refined, self.camera_info['K'], self.camera_info['D']) if not ret: logger.error("solvePnP失败") return False R_board_cam, _ = cv2.Rodrigues(rvec) T_board_cam = np.eye(4) T_board_cam[:3, :3] = R_board_cam T_board_cam[:3, 3] = tvec.flatten() self.gripper_poses.append(self.current_arm_pose.copy()) self.target_poses.append(T_board_cam.copy()) self.images.append(self.current_image.copy()) logger.info("成功采集位姿数据: %d/%d", len(self.gripper_poses), self.max_poses) # 定期保存进度 if len(self.gripper_poses) % 5 == 0: self.save_progress() return True except Exception as e: logger.exception("数据采集失败: %s", e) return False def calibrate(self): if len(self.gripper_poses) < self.min_poses: logger.error("需要至少%d个位姿数据, 当前: %d", self.min_poses, len(self.gripper_poses)) return None try: R_gripper2base, t_gripper2base = [], [] R_target2cam, t_target2cam = [], [] for i in range(len(self.gripper_poses)-1): inv_pose = np.linalg.inv(self.gripper_poses[i]) A = np.dot(inv_pose, self.gripper_poses[i+1]) R_gripper2base.append(A[:3, :3]) t_gripper2base.append(A[:3, 3]) inv_target = np.linalg.inv(self.target_poses[i]) B = np.dot(inv_target, self.target_poses[i+1]) R_target2cam.append(B[:3, :3]) t_target2cam.append(B[:3, 3]) R_cam2gripper = np.eye(3) t_cam2gripper = np.zeros(3) # 尝试不同的标定方法 methods = [ cv2.CALIB_HAND_EYE_TSAI, cv2.CALIB_HAND_EYE_PARK, cv2.CALIB_HAND_EYE_HORAUD ] best_error = float('inf') best_result = None for method in methods: try: R, t = cv2.calibrateHandEye( R_gripper2base, t_gripper2base, R_target2cam, t_target2cam, R_cam2gripper, t_cam2gripper, method=method ) # 计算误差 error = self.calculate_error(R, t) logger.info("方法 %d 标定误差: %.6f", method, error) if error < best_error: best_error = error best_result = (R, t) except Exception as e: logger.warning("标定方法 %d 失败: %s", method, str(e)) if best_result is None: logger.error("所有标定方法均失败") return None R_cam2gripper, t_cam2gripper = best_result self.T_cam_end = np.eye(4) self.T_cam_end[:3, :3] = R_cam2gripper self.T_cam_end[:3, 3] = t_cam2gripper logger.info("最佳标定误差: %.6f", best_error) logger.info("相机到机械臂末端的变换矩阵 T_cam_end:\n%s", self.T_cam_end) return self.T_cam_end except Exception as e: logger.exception("标定失败: %s", e) return None def calculate_error(self, R, t): """计算标定误差""" errors = [] for i in range(len(self.gripper_poses)): # 计算预测的标定板位姿 T_cam_end = np.eye(4) T_cam_end[:3, :3] = R T_cam_end[:3, 3] = t.flatten() predicted_target = T_cam_end.dot(self.gripper_poses[i]).dot(np.linalg.inv(T_cam_end)) # 计算与实测位姿的差异 error = np.linalg.norm(predicted_target[:3, 3] - self.target_poses[i][:3, 3]) errors.append(error) return np.mean(errors) def save_calibration(self, filename="hand_eye_calibration.yaml"): if self.T_cam_end is None: logger.error("尚未标定,无法保存结果") return False try: # 计算最终误差 final_error = self.calculate_error(self.T_cam_end[:3, :3], self.T_cam_end[:3, 3]) data = { 'T_cam_end': self.T_cam_end.tolist(), 'camera_matrix': self.camera_info['K'].tolist(), 'distortion_coefficients': self.camera_info['D'].tolist(), 'calibration_date': time.strftime("%Y-%m-%d %H:%M:%S"), 'num_poses': len(self.gripper_poses), 'pattern_size': list(self.pattern_size), 'square_size': self.square_size, 'calibration_error': float(final_error) } with open(filename, 'w') as f: yaml.dump(data, f, default_flow_style=False) logger.info("标定结果已保存至: %s (误差: %.6f)", filename, final_error) # 保存采集的图像 self.save_calibration_images() # 清理进度文件 if os.path.exists("calibration_progress.yaml"): os.remove("calibration_progress.yaml") return True except Exception as e: logger.exception("保存失败: %s", e) return False def save_calibration_images(self): """保存采集的图像用于验证""" try: save_dir = "calibration_images" os.makedirs(save_dir, exist_ok=True) for i, img in enumerate(self.images): filename = os.path.join(save_dir, f"pose_{i:02d}.png") cv2.imwrite(filename, img) logger.info("保存了%d张标定图像到目录: %s", len(self.images), save_dir) return True except Exception as e: logger.error("保存图像失败: %s", e) return False def main(): # 修复X11环境 os.system('xhost +local:') # 允许本地连接 os.system('export DISPLAY=:0') # 确保显示设置 # 初始化校准器 calibrator = RobustCalibrator() # 尝试加载之前的进度 calibrator.load_progress() rospy.sleep(2.0) # 等待初始数据 logger.info("\n===== 鲁棒标定系统操作指南 =====") logger.info("1. 移动机械臂使棋盘格在相机视野中央") logger.info("2. 按回车键采集当前位姿 (需要至少%d个不同位姿)", calibrator.min_poses) logger.info("3. 采集完成后输入 'c' 开始标定") logger.info("4. 标定完成后输入 's' 保存结果") logger.info("5. 输入 'q' 退出程序") logger.info("6. 系统会自动保存进度,意外中断后可恢复") try: while not rospy.is_shutdown() and len(calibrator.gripper_poses) < calibrator.max_poses: cmd = raw_input("等待命令 (回车采集/'c'标定/'s'保存/'q'退出): ").strip().lower() if cmd == '': if calibrator.capture_data(): logger.info("数据采集成功") elif cmd == 'c': if len(calibrator.gripper_poses) < calibrator.min_poses: logger.warning("需要至少%d个位姿,当前只有%d个", calibrator.min_poses, len(calibrator.gripper_poses)) else: result = calibrator.calibrate() if result is not None: logger.info("标定成功") elif cmd == 's': if calibrator.T_cam_end is None: logger.warning("请先执行标定 ('c')") else: calibrator.save_calibration() elif cmd == 'q': logger.info("程序退出") calibrator.cleanup_resources() break except rospy.ROSInterruptException: calibrator.cleanup_resources() except Exception as e: logger.exception("主循环错误: %s", e) calibrator.cleanup_resources() if __name__ == "__main__": main() 修改

最新推荐

recommend-type

微软解决方案面向服务的架构.doc

微软解决方案面向服务的架构.doc
recommend-type

Huawei S6780-H-V600R024SPH120

Huawei S6780-H_V600R024SPH120,里面包含补丁说明书和补丁安装指导书,该补丁支持哪些型号,支持哪些版本可以安装当前补丁,请参考补丁说明书和补丁安装指导书。
recommend-type

VC图像编程全面资料及程序汇总

【标题】:"精通VC图像编程资料全览" 【知识点】: VC即Visual C++,是微软公司推出的一个集成开发环境(IDE),专门用于C++语言的开发。VC图像编程涉及到如何在VC++开发环境中处理和操作图像。在VC图像编程中,开发者通常会使用到Windows API中的GDI(图形设备接口)或GDI+来进行图形绘制,以及DirectX中的Direct2D或DirectDraw进行更高级的图形处理。 1. GDI(图形设备接口): - GDI是Windows操作系统提供的一套应用程序接口,它允许应用程序通过设备无关的方式绘制图形。 - 在VC图像编程中,主要使用CDC类(设备上下文类)来调用GDI函数进行绘制,比如绘制线条、填充颜色、显示文本等。 - CDC类提供了很多函数,比如`MoveTo`、`LineTo`、`Rectangle`、`Ellipse`、`Polygon`等,用于绘制基本的图形。 - 对于图像处理,可以使用`StretchBlt`、`BitBlt`、`TransparentBlt`等函数进行图像的位块传输。 2. GDI+: - GDI+是GDI的后继技术,提供了更丰富的图形处理功能。 - GDI+通过使用`Graphics`类来提供图像的绘制、文本的渲染、图像的处理和颜色管理等功能。 - GDI+引入了对矢量图形、渐变色、复杂的文本格式和坐标空间等更高级的图形处理功能。 - `Image`类是GDI+中用于图像操作的基础类,通过它可以进行图像的加载、保存、旋转、缩放等操作。 3. DirectX: - DirectX是微软推出的一系列API集合,用于在Windows平台上进行高性能多媒体编程。 - DirectX中的Direct2D是用于硬件加速的二维图形API,专门用于UI元素和简单的图形渲染。 - DirectDraw主要用于硬件加速的位图操作,比如全屏游戏开发中的画面渲染。 4. 位图操作: - 在VC图像编程中,位图操作是一个重要的部分。需要了解如何加载、保存和处理位图(BMP)文件。 - 可以使用位图文件格式的解析,来访问位图的像素数据,进行像素级别的图像处理和修改。 5. 高级图像处理技术: - 包括图像滤镜、图像转换、图像压缩和解压缩技术。 - 需要掌握一些图像处理算法,比如卷积、FFT(快速傅里叶变换)、DCT(离散余弦变换)等。 - 了解图像的色彩空间转换,比如RGB到YUV的转换,这在视频处理中非常重要。 6. 图像库的使用: - 除了直接使用API进行图像处理之外,还可以使用开源的图像处理库,如OpenCV。 - OpenCV是一个跨平台的计算机视觉和机器学习软件库,它提供了很多高级的图像处理功能。 【压缩包子文件的文件名称列表】: VC++ 文件名称列表显示为"VC++",这表明文件包中可能包含了Visual C++的项目、源代码文件、动态链接库(DLLs)、编译器设置、链接器设置等。如果要使用这些文件进行VC图像编程,需要确保Visual C++开发环境已经安装并且配置好,同时需要安装好所有依赖的库和工具。 要开始VC图像编程,开发者需要具备C++编程基础,熟悉Windows编程概念,并且对图形学有一定的了解。掌握VC图像编程技巧,对于进行桌面应用程序开发、图像处理软件开发以及游戏开发等都是至关重要的。
recommend-type

Pokemmo响应速度翻倍:多线程处理的高级技巧

# 摘要 多线程处理是提高软件并发性能和响应速度的关键技术,本文从基础概念与优势出发,深入探讨了多线程的实现机制,包括线程模型、调度算法、编程关键技术、线程安全性最佳实践、死锁预防与处理等方面。通
recommend-type

人名列表滚动抽奖

<think>嗯,用户想实现一个滚动抽奖功能,需要前端动画效果和随机选择逻辑。首先,我得考虑前端怎么实现滚动效果。可能用CSS动画或者JavaScript的requestAnimationFrame?比如,使用CSS的@keyframes来控制位移,或者用JS动态更新样式。然后,随机选择算法,可能需要确保公平性,比如用Fisher-Yates洗牌算法,或者用Math.random()来生成随机索引。然后,用户可能需要平滑的滚动动画,比如先快速滚动,然后逐渐减速,最后停在选中的人名上。这可能需要设置定时器,逐步改变位置,或者使用CSS过渡效果。另外,还要考虑性能,避免页面卡顿,可能需要使用硬件加
recommend-type

一站式JSF开发环境:即解压即用JAR包

标题:“jsf开发完整JAR包”所指的知识点: 1. JSF全称JavaServer Faces,是Java EE(现EE4J)规范之一,用于简化Java Web应用中基于组件的用户界面构建。JSF提供了一种模型-视图-控制器(MVC)架构的实现,使得开发者可以将业务逻辑与页面表示分离。 2. “开发完整包”意味着这个JAR包包含了JSF开发所需的所有类库和资源文件。通常来说,一个完整的JSF包会包含核心的JSF库,以及一些可选的扩展库,例如PrimeFaces、RichFaces等,这些扩展库提供了额外的用户界面组件。 3. 在一个项目中使用JSF,开发者无需单独添加每个必要的JAR文件到项目的构建路径中。因为打包成一个完整的JAR包后,所有这些依赖都被整合在一起,极大地方便了开发者的部署工作。 4. “解压之后就可以直接导入工程中使用”表明这个JAR包是一个可执行的归档文件,可能是一个EAR包或者一个可直接部署的Java应用包。解压后,开发者只需将其内容导入到他们的IDE(如Eclipse或IntelliJ IDEA)中,或者将其放置在Web应用服务器的正确目录下,就可以立即进行开发。 描述中所指的知识点: 1. “解压之后就可以直接导入工程中使用”说明这个JAR包是预先配置好的,它可能包含了所有必要的配置文件,例如web.xml、faces-config.xml等,这些文件是JSF项目运行所必需的。 2. 直接使用意味着减少了开发者配置环境和处理依赖的时间,有助于提高开发效率。 标签“jsf jar包”所指的知识点: 1. 标签指明了JAR包的内容是专门针对JSF框架的。因此,这个JAR包包含了JSF规范所定义的API以及可能包含的具体实现,比如Mojarra或MyFaces。 2. “jar包”是一种Java平台的归档文件格式,用于聚合多个文件到一个文件中。在JSF开发中,JAR文件经常被用来打包和分发库或应用程序。 文件名称列表“jsf”所指的知识点: 1. “jsf”文件名可能意味着这是JSF开发的核心库,它应该包含了所有核心的JavaServer Faces类文件以及资源文件。 2. 如果是使用特定版本的JSF,例如“jsf-2.2.jar”,则表明文件内包含了对应版本的JSF实现。这种情况下,开发者必须确认他们所使用的Web服务器或应用程序服务器支持该版本的JSF。 3. 文件名称也可能是“jsf-components.jar”、“jsf-impl.jar”等,表明这个JAR包是JSF的一个子模块或特定功能组件。例如,“jsf-components.jar”可能包含了一系列用于在JSF应用中使用的自定义组件。 4. 对于开发者而言,了解文件名称中所蕴含的信息非常重要,因为这将决定他们需要下载哪些JAR包来满足特定项目的需求。 综合以上信息,开发者在使用JSF进行Java Web应用开发时,会通过一个预先配置好的JAR包来快速地搭建和启动项目。这样做不仅简化了项目初始化的过程,也使得开发者能够更加聚焦于业务逻辑的实现和界面设计,而不必深究底层框架配置的细节。
recommend-type

Pokemmo内存优化揭秘:专家教你如何降低50%资源消耗

# 摘要 本文综述了Pokemmo游戏的内存优化方法,从内存管理基础出发,探讨内存使用效率的影响因素,并介绍了性能监控与分析工具。在内存优化实践技巧章节中,详细讨论了代码层面的优化、数据结构和算法选择对内存效率的影响,并通过案例分析展示了实际的优化过程。针对Pokemmo游戏特点,分析了内存消耗特性并提出了特定优化技术。最后,本文展望了未来内存管理技术的发展方向,以及游戏开发中面临的新挑战,为Pokemmo及类似游戏提供了优化建议。 # 关键字 内存优化;内存管理;性能监控;数据结构;算法效率;游戏开发 参考资源链接:[Pokemmo必备资源包:四种ROM与汉化补丁](https://we
recommend-type

直接访问子路由是吧

<think>我们正在使用Vue2,因此需要参考VueRouterv3的文档(因为Vue2对应VueRouterv3,Vue3对应VueRouterv4)。用户的问题是:如何配置路由以支持直接访问子路由?即,在Vue2中,我们希望直接通过URL访问嵌套的子路由(例如:/parent/child),而不仅仅是先访问父路由再导航到子路由。根据之前的回答和引用,我们已经知道:1.在父路由的配置中,使用`children`数组来定义子路由。2.子路由的`path`不能以斜杠开头(例如:'child'而不是'/child'),这样它就会基于父路由的路径进行拼接。3.在父组件的模板中放置`<router-
recommend-type

C++函数库查询辞典使用指南与功能介绍

标题中提到的“C++函数库查询辞典”指的是一个参考工具书或者是一个软件应用,专门用来查询C++编程语言中提供的标准库中的函数。C++是一种静态类型、编译式、通用编程语言,它支持多种编程范式,包括过程化、面向对象和泛型编程。C++标准库是一组包含函数、类、迭代器和模板的库,它为C++程序员提供标准算法和数据结构。 描述中提供的内容并没有给出实际的知识点,只是重复了标题的内容,并且有一串无关的字符“sdfsdfsdffffffffffffffffff”,因此这部分内容无法提供有价值的信息。 标签“C++ 函数库 查询辞典”强调了该工具的用途,即帮助开发者查询C++的标准库函数。它可能包含每个函数的详细说明、语法、使用方法、参数说明以及示例代码等,是学习和开发过程中不可或缺的参考资源。 文件名称“c++函数库查询辞典.exe”表明这是一个可执行程序。在Windows操作系统中,以“.exe”结尾的文件通常是可执行程序。这意味着用户可以通过双击或者命令行工具来运行这个程序,进而使用其中的查询功能查找C++标准库中各类函数的详细信息。 详细知识点如下: 1. C++标准库的组成: C++标准库由多个组件构成,包括输入输出流(iostream)、算法(algorithm)、容器(container)、迭代器(iterator)、字符串处理(string)、数值计算(numeric)、本地化(locale)等。 2. 输入输出流(iostream)库: 提供输入输出操作的基本功能。使用诸如iostream、fstream、sstream等头文件中的类和对象(如cin, cout, cerr等)来实现基本的输入输出操作。 3. 算法(algorithm)库: 包含对容器进行操作的大量模板函数,如排序(sort)、查找(find)、拷贝(copy)等。 4. 容器(container)库: 提供各种数据结构,如向量(vector)、列表(list)、队列(queue)、映射(map)等。 5. 迭代器(iterator): 迭代器提供了一种方法来访问容器中的元素,同时隐藏了容器的内部结构。 6. 字符串处理(string)库: C++标准库中的字符串类提供了丰富的功能用于处理字符串。 7. 数值计算(numeric)库: 提供数值计算所需的函数和类,比如对复数的支持和数值算法。 8. 本地化(locale)库: 提供本地化相关的功能,比如日期、时间的格式化显示以及字符的本地化比较。 9. 错误处理和异常: C++通过throw、try、catch关键字和标准异常类提供了一套异常处理机制。 10. 智能指针: C++11及其后续版本提供了智能指针(如unique_ptr、shared_ptr、weak_ptr)来自动管理动态分配的内存。 11. lambda表达式: 在C++11中引入,允许临时创建匿名函数对象。 12. C++11新特性: 包括范围for循环、移动语义、类内初始化器、auto类型推导等。 使用C++函数库查询辞典的用户可能需要对C++的基础知识有一定的掌握,例如变量、数据类型、控制结构、函数以及面向对象的概念等。了解C++标准库的结构和内容能够帮助程序员有效地利用库函数进行软件开发,提高编程效率并减少重复造轮子的工作。 总结来说,一个C++函数库查询辞典工具对于C++程序员来说是一个非常有用的资源,它能够提供快速查找标准库函数的能力,帮助程序员更高效地解决问题和进行学习。同时,随着C++标准的不断更新,例如C++11、C++14、C++17和C++20,函数库查询辞典也会不断地更新以包含新的特性,这对于紧跟技术发展的开发者来说尤为重要。
recommend-type

【bat脚本安全最佳实践】:保护你的系统与脚本安全的黄金法则

# 摘要 本文旨在全面阐述BAT脚本的基础知识、安全编写原则、审查与优化方法以及在企业环境中的安全应用。通过深入分析脚本安全基础、常见安全陷阱及脚本的权限管理,文章提出了安全编写的具体实践和预防措施。本文详细介绍了脚本安全审查流程、代码优化、错误处理和安全更新维护策略。在企业应用方面,探讨了企业安全政策制定、脚本审计和版本控制以及外部威胁的防范措施。通过案例分析,总结了脚本