import time from typing import Any, Optional import numpy as np import zmq from agent.schema import RobotObsShape from dataclasses import dataclass, field class LowLevelRobotConnectionConfig: host: str = field(default="localhost") port: str = field(default="15558") timeout: int = field(default=1000) max_retries: int = field(default=5) delay: float = field(default=0.5) controller: Optional[Any] = field(default_factory=lambda: None) DOC = ''' controller 需要实现 get_obs 和 act 方法。例如: class Controller: def __init__(self, rgb, right_cam_img, right_state): self.rgb = rgb self.right_cam_img = right_cam_img self.right_state = right_state def get_obs(self) -> dict: """ { "rgb" np.ndarray (480, 640, 3), np.uint8 "right_cam_img" np.ndarray (360, 480, 3), np.uint8 "right_state" np.ndarray (7,), np.uint8 } Returns: _type_: _description_ """ return { "rgb": self.rgb, "right_cam_img": self.right_cam_img, "right_state": self.right_state, } def act(self, actions: np.ndarray) -> None: print(f"Executing...") print(f"Executed action: {actions}") ''' class LowLevelRobotConnection: def __init__(self, config): self.host = config.host self.port = config.port self.addr = f"tcp://{self.host}:{self.port}" # self.timeout = config.timeout self.max_retries = config.max_retries self.delay = config.delay self.is_connected = False self.controller = config.controller def _connect(self): print(f"INFO: Robot service connected to server at port {self.port}") self.context = zmq.Context() # 新建上下文 self.socket = self.context.socket(zmq.REQ) # 新建套接字 self.socket.connect(self.addr) self.is_connected = True def _close(self): # if self.context is not None: # self.context.term() print("INFO: context terminated") if self.socket is not None: self.socket.close() print("INFO: socket closed") self.is_connected = False def send_obs(self, obs: bytes) -> bool: fired = False for _ in range(self.max_retries): try: if not self.is_connected: self._connect() fired = True print("INFO: send observation") self.socket.send(obs) break except zmq.Again: print("ERROR: Timeout") self._close() if not fired: print("ERROR: Failed to fire observation to server") self._close() return False print("INFO: observation fired") return True def get_actions(self) -> Optional[np.ndarray]: for _ in range(self.max_retries): try: if not self.is_connected: self._connect() print("INFO: send action request") message = self.socket.recv(copy=False) print(f"INFO: recerved msg size: {len(message)}") if len(message) != RobotObsShape.TOTAL_ACTIONS_SIZE: print( f"ERROR: Invalid message size as {len(message)}, required {RobotObsShape.TOTAL_ACTIONS_SIZE} bytes" ) continue actions = np.frombuffer(message.buffer, dtype=np.float32).reshape( (RobotObsShape.ACTIONS_SHAPE) ) print("INFO: received action") break except zmq.Again: print("ERROR: Timeout") self._close() if actions is None: print("ERROR: Failed to retrieve action from server") return None return actions def run(self) -> None: while True: user_input = input("Press <Enter> to start, <q> to quit.") obs = self.get_obs() if not self.send_obs(obs): print("ERROR: Failed to send observation") continue actions = self.get_actions() if actions is None: print("ERROR: Failed to retrieve action from server") continue self.act(actions) time.sleep(self.delay) def step(self) -> None: obs = self.get_obs() if obs is None: print("ERROR: Failed to retrieve image") return if not self.send_obs(obs): print("ERROR: Failed to send observation") return actions = self.get_actions() if actions is None: print("ERROR: Failed to retrieve action from server") return self.act(actions) def get_obs(self) -> bytes: """获取观察的内容。头部、腕部图像、关节角度和灵巧手状态。 Returns: bytes: 字节流 """ obs = self.controller.get_obs() # head_image = np.zeros(RobotObsShape.HEAD_IMAGE_SHAPE, dtype=np.uint8) # wrist_image = np.zeros(RobotObsShape.WRIST_IMAGE_SHAPE, dtype=np.uint8) # state = np.zeros(RobotObsShape.STATE_SHAPE, dtype=np.float32) obs = ( obs["rgb"].tobytes() + obs["right_cam_img"].tobytes() + obs["right_state"].tobytes() ) return obs def act(self, actions: np.ndarray) -> None: """执行六个策略预测的动作。 Args: actions (np.ndarray): 形状为 (6, 7) """ assert actions.shape == RobotObsShape.ACTIONS_SHAPE, ( f"Expected actions shape {RobotObsShape.ACTIONS_SHAPE}, got {actions.shape}" ) print(f"INFO: actions: {actions}") if __name__ == "__main__": class Controller: def get_obs(self) -> dict: return { "rgb": np.zeros(RobotObsShape.HEAD_IMAGE_SHAPE, dtype=np.uint8), "right_cam_img": np.zeros( RobotObsShape.WRIST_IMAGE_SHAPE, dtype=np.uint8 ), "right_state": np.zeros(RobotObsShape.STATE_SHAPE, dtype=np.float32), } config = dict( host="wujingdp.xyz", port=15558, timeout=2, max_retries=5, delay=0.5, controller=Controller(), ) r = LowLevelRobotConnection(config) r.run() 全文注释
时间: 2025-07-19 11:33:52 浏览: 1
<think>我们正在处理一个低级机器人连接类,它使用ZeroMQ进行通信。这个类负责发送观察数据(图像和状态)并接收动作指令。
主要功能:
1. 初始化连接配置(主机、端口、超时、重试次数、延迟和控制器)
2. 建立和关闭ZeroMQ连接
3. 发送观察数据(obs)到服务器
4. 从服务器接收动作(actions)
5. 运行循环:获取观察数据 -> 发送 -> 接收动作 -> 执行动作
6. 单步执行:step()方法
7. 获取观察数据:get_obs()方法,调用控制器的get_obs方法并拼接成字节流
8. 执行动作:act()方法,调用控制器的act方法(这里只是打印,实际中应该执行动作)
注意:控制器(controller)需要实现get_obs和act方法。
我们将为代码添加详细注释,并稍作改进以提高健壮性和可读性。</think>以下是对您提供的低级机器人连接代码的详细注释和优化版本。我将逐部分解释代码功能,并提供改进建议:
```python
import time
from typing import Any, Optional, Dict
import numpy as np
import zmq
import logging
from dataclasses import dataclass, field
# 设置日志
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
@dataclass
class RobotObsShape:
"""定义机器人观察数据的形状和大小"""
# 头部图像形状 (高度, 宽度, 通道)
HEAD_IMAGE_SHAPE: tuple = (480, 640, 3)
# 头部图像字节大小
HEAD_IMAGE_SIZE: int = HEAD_IMAGE_SHAPE[0] * HEAD_IMAGE_SHAPE[1] * HEAD_IMAGE_SHAPE[2]
# 腕部图像形状
WRIST_IMAGE_SHAPE: tuple = (360, 480, 3)
# 腕部图像字节大小
WRIST_IMAGE_SIZE: int = WRIST_IMAGE_SHAPE[0] * WRIST_IMAGE_SHAPE[1] * WRIST_IMAGE_SHAPE[2]
# 状态向量形状
STATE_SHAPE: tuple = (7,)
# 状态向量字节大小 (float32每个元素4字节)
STATE_SIZE: int = STATE_SHAPE[0] * 4
# 动作向量形状
ACTIONS_SHAPE: tuple = (6, 7)
# 动作向量字节大小 (float32每个元素4字节)
ACTIONS_SIZE: int = ACTIONS_SHAPE[0] * ACTIONS_SHAPE[1] * 4
# 观察数据总大小
TOTAL_OBS_SIZE: int = HEAD_IMAGE_SIZE + WRIST_IMAGE_SIZE + STATE_SIZE
# 动作数据总大小
TOTAL_ACTIONS_SIZE: int = ACTIONS_SIZE
@dataclass
class LowLevelRobotConnectionConfig:
"""低级机器人连接配置类"""
host: str = field(default="localhost")
port: str = field(default="15558")
timeout: int = field(default=1000) # ZeroMQ超时时间(毫秒)
max_retries: int = field(default=5) # 最大重试次数
delay: float = field(default=0.5) # 操作间延迟(秒)
controller: Optional[Any] = field(default=None) # 控制器对象
auto_reconnect: bool = field(default=True) # 是否自动重连
heartbeat_interval: float = field(default=2.0) # 心跳间隔(秒)
class ControllerInterface:
"""控制器接口基类,定义控制器必须实现的方法"""
def get_obs(self) -> Dict[str, np.ndarray]:
"""
获取观察数据
返回:
dict: 包含以下键值对:
"rgb": 头部图像 (np.uint8, 形状 HEAD_IMAGE_SHAPE)
"right_cam_img": 腕部图像 (np.uint8, 形状 WRIST_IMAGE_SHAPE)
"right_state": 机器人状态 (np.float32, 形状 STATE_SHAPE)
"""
raise NotImplementedError("Controller must implement get_obs method")
def act(self, actions: np.ndarray) -> None:
"""
执行动作
参数:
actions: 动作数组 (np.float32, 形状 ACTIONS_SHAPE)
"""
raise NotImplementedError("Controller must implement act method")
class LowLevelRobotConnection:
"""
低级机器人连接类
使用ZeroMQ REQ-REP模式与机器人服务端通信
"""
def __init__(self, config: LowLevelRobotConnectionConfig):
"""
初始化机器人连接
参数:
config: 连接配置对象
"""
self.config = config
self.host = config.host
self.port = config.port
self.addr = f"tcp://{self.host}:{self.port}"
self.max_retries = config.max_retries
self.delay = config.delay
self.auto_reconnect = config.auto_reconnect
self.heartbeat_interval = config.heartbeat_interval
# 控制器验证
if config.controller is None:
logger.warning("No controller provided, using dummy controller")
self.controller = self._create_dummy_controller()
else:
self.controller = config.controller
# 验证控制器是否实现必要方法
if not (hasattr(self.controller, "get_obs") and hasattr(self.controller, "act")):
logger.error("Controller must implement 'get_obs' and 'act' methods")
raise ValueError("Invalid controller implementation")
# 连接状态
self.is_connected = False
self.context = None
self.socket = None
# 心跳相关
self.last_heartbeat_time = time.time()
self.heartbeat_thread = None
self.running = False
logger.info(f"Initialized robot connection to {self.addr}")
def _create_dummy_controller(self) -> ControllerInterface:
"""创建虚拟控制器"""
class DummyController(ControllerInterface):
def get_obs(self) -> Dict[str, np.ndarray]:
return {
"rgb": np.zeros(RobotObsShape.HEAD_IMAGE_SHAPE, dtype=np.uint8),
"right_cam_img": np.zeros(RobotObsShape.WRIST_IMAGE_SHAPE, dtype=np.uint8),
"right_state": np.zeros(RobotObsShape.STATE_SHAPE, dtype=np.float32),
}
def act(self, actions: np.ndarray) -> None:
logger.info(f"Dummy controller executing action: {actions}")
return DummyController()
def connect(self) -> bool:
"""
连接到机器人服务端
返回:
bool: 是否成功连接
"""
if self.is_connected:
logger.warning("Already connected")
return True
try:
# 创建ZeroMQ上下文
self.context = zmq.Context()
# 创建REQ套接字
self.socket = self.context.socket(zmq.REQ)
# 设置超时
self.socket.setsockopt(zmq.RCVTIMEO, self.config.timeout)
# 连接到服务端
self.socket.connect(self.addr)
self.is_connected = True
logger.info(f"Connected to robot server at {self.addr}")
# 启动心跳线程
self.running = True
self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
self.heartbeat_thread.start()
return True
except zmq.ZMQError as e:
logger.error(f"Connection failed: {str(e)}")
self._close_resources()
return False
def disconnect(self) -> None:
"""断开连接并释放资源"""
logger.info("Disconnecting from robot server")
self.running = False
# 等待心跳线程结束
if self.heartbeat_thread and self.heartbeat_thread.is_alive():
self.heartbeat_thread.join(timeout=1.0)
# 关闭连接
self._close_resources()
logger.info("Disconnected")
def _close_resources(self) -> None:
"""关闭ZeroMQ资源"""
if self.socket:
try:
self.socket.close(linger=0)
logger.debug("Socket closed")
except zmq.ZMQError as e:
logger.error(f"Error closing socket: {str(e)}")
if self.context:
try:
self.context.term()
logger.debug("Context terminated")
except zmq.ZMQError as e:
logger.error(f"Error terminating context: {str(e)}")
self.socket = None
self.context = None
self.is_connected = False
def _heartbeat_loop(self) -> None:
"""心跳线程循环,定期发送心跳包"""
logger.info("Heartbeat thread started")
while self.running:
current_time = time.time()
if current_time - self.last_heartbeat_time > self.heartbeat_interval:
try:
if not self.is_connected:
if self.auto_reconnect:
logger.warning("Connection lost, attempting reconnect")
self.connect()
else:
logger.error("Connection lost, auto-reconnect disabled")
break
# 发送心跳包
self._send_heartbeat()
self.last_heartbeat_time = current_time
except Exception as e:
logger.error(f"Heartbeat error: {str(e)}")
self.is_connected = False
time.sleep(0.1)
logger.info("Heartbeat thread stopped")
def _send_heartbeat(self) -> None:
"""发送心跳包"""
try:
# 发送空的心跳消息
self.socket.send(b"HEARTBEAT")
# 等待响应
response = self.socket.recv()
if response == b"ALIVE":
logger.debug("Heartbeat acknowledged")
else:
logger.warning(f"Unexpected heartbeat response: {response}")
except zmq.Again:
logger.warning("Heartbeat timeout")
self.is_connected = False
except zmq.ZMQError as e:
logger.error(f"Heartbeat failed: {str(e)}")
self.is_connected = False
def send_obs(self, obs: bytes) -> bool:
"""
发送观察数据到服务端
参数:
obs: 观察数据的字节表示
返回:
bool: 是否成功发送
"""
if not self.is_connected:
logger.warning("Not connected, cannot send observation")
if self.auto_reconnect:
logger.info("Attempting reconnect")
if not self.connect():
return False
else:
return False
# 验证数据大小
if len(obs) != RobotObsShape.TOTAL_OBS_SIZE:
logger.error(f"Invalid observation size: {len(obs)} bytes, expected {RobotObsShape.TOTAL_OBS_SIZE}")
return False
# 尝试发送数据
for attempt in range(self.max_retries):
try:
logger.debug(f"Sending observation (attempt {attempt+1}/{self.max_retries})")
self.socket.send(obs)
# 等待响应
response = self.socket.recv()
if response == b"ACK":
logger.debug("Observation acknowledged")
return True
else:
logger.warning(f"Unexpected response: {response}")
except zmq.Again:
logger.warning("Observation send timeout")
except zmq.ZMQError as e:
logger.error(f"Send error: {str(e)}")
self.is_connected = False
break
logger.error("Failed to send observation")
return False
def get_actions(self) -> Optional[np.ndarray]:
"""
从服务端获取动作数据
返回:
Optional[np.ndarray]: 动作数组,获取失败返回None
"""
if not self.is_connected:
logger.warning("Not connected, cannot get actions")
if self.auto_reconnect:
logger.info("Attempting reconnect")
if not self.connect():
return None
else:
return None
# 尝试获取动作
for attempt in range(self.max_retries):
try:
logger.debug(f"Requesting actions (attempt {attempt+1}/{self.max_retries})")
self.socket.send(b"GET_ACTIONS")
# 接收动作数据
message = self.socket.recv(copy=False)
logger.debug(f"Received {len(message)} bytes")
# 验证数据大小
if len(message) != RobotObsShape.TOTAL_ACTIONS_SIZE:
logger.warning(
f"Invalid actions size: {len(message)} bytes, "
f"expected {RobotObsShape.TOTAL_ACTIONS_SIZE}"
)
continue
# 解析动作数据
actions = np.frombuffer(message, dtype=np.float32).reshape(
RobotObsShape.ACTIONS_SHAPE
)
logger.debug("Actions received")
return actions
except zmq.Again:
logger.warning("Actions request timeout")
except zmq.ZMQError as e:
logger.error(f"Receive error: {str(e)}")
self.is_connected = False
break
except Exception as e:
logger.error(f"Error parsing actions: {str(e)}")
logger.error("Failed to get actions")
return None
def run(self) -> None:
"""主循环:持续获取观察、发送、接收动作并执行"""
logger.info("Starting main control loop")
# 确保连接
if not self.connect():
logger.error("Failed to connect, exiting")
return
try:
while self.running:
# 获取观察数据
obs_data = self.get_obs()
if obs_data is None:
logger.error("Failed to get observation data")
time.sleep(self.delay)
continue
# 发送观察数据
if not self.send_obs(obs_data):
logger.error("Failed to send observation")
time.sleep(self.delay)
continue
# 获取动作
actions = self.get_actions()
if actions is None:
logger.error("Failed to get actions")
time.sleep(self.delay)
continue
# 执行动作
self.act(actions)
# 延迟
time.sleep(self.delay)
except KeyboardInterrupt:
logger.info("Control loop interrupted by user")
except Exception as e:
logger.error(f"Unexpected error in control loop: {str(e)}")
finally:
self.disconnect()
logger.info("Control loop stopped")
def step(self) -> bool:
"""
执行单步控制循环:
1. 获取观察数据
2. 发送观察数据
3. 获取动作
4. 执行动作
返回:
bool: 是否成功完成所有步骤
"""
logger.debug("Performing single step")
# 获取观察数据
obs_data = self.get_obs()
if obs_data is None:
logger.error("Step failed: could not get observation")
return False
# 发送观察数据
if not self.send_obs(obs_data):
logger.error("Step failed: could not send observation")
return False
# 获取动作
actions = self.get_actions()
if actions is None:
logger.error("Step failed: could not get actions")
return False
# 执行动作
self.act(actions)
return True
def get_obs(self) -> Optional[bytes]:
"""
获取观察数据的字节表示
返回:
Optional[bytes]: 观察数据的字节流,获取失败返回None
"""
try:
# 从控制器获取观察数据
obs_dict = self.controller.get_obs()
# 验证数据
if not self._validate_obs(obs_dict):
return None
# 拼接字节流
obs_bytes = (
obs_dict["rgb"].tobytes()
+ obs_dict["right_cam_img"].tobytes()
+ obs_dict["right_state"].tobytes()
)
return obs_bytes
except Exception as e:
logger.error(f"Error getting observation: {str(e)}")
return None
def _validate_obs(self, obs_dict: Dict) -> bool:
"""验证观察数据格式"""
if "rgb" not in obs_dict or "right_cam_img" not in obs_dict or "right_state" not in obs_dict:
logger.error("Observation dictionary missing required keys")
return False
# 验证头部图像
if obs_dict["rgb"].shape != RobotObsShape.HEAD_IMAGE_SHAPE:
logger.error(
f"Invalid head image shape: {obs_dict['rgb'].shape}, "
f"expected {RobotObsShape.HEAD_IMAGE_SHAPE}"
)
return False
# 验证腕部图像
if obs_dict["right_cam_img"].shape != RobotObsShape.WRIST_IMAGE_SHAPE:
logger.error(
f"Invalid wrist image shape: {obs_dict['right_cam_img'].shape}, "
f"expected {RobotObsShape.WRIST_IMAGE_SHAPE}"
)
return False
# 验证状态
if obs_dict["right_state"].shape != RobotObsShape.STATE_SHAPE:
logger.error(
f"Invalid state shape: {obs_dict['right_state'].shape}, "
f"expected {RobotObsShape.STATE_SHAPE}"
)
return False
return True
def act(self, actions: np.ndarray) -> None:
"""
执行动作
参数:
actions: 动作数组 (np.float32, 形状 ACTIONS_SHAPE)
"""
# 验证动作格式
if actions.shape != RobotObsShape.ACTIONS_SHAPE:
logger.error(
f"Invalid actions shape: {actions.shape}, "
f"expected {RobotObsShape.ACTIONS_SHAPE}"
)
return
try:
# 调用控制器执行动作
self.controller.act(actions)
logger.debug(f"Executed action: {actions}")
except Exception as e:
logger.error(f"Error executing action: {str(e)}")
if __name__ == "__main__":
# 示例控制器实现
class TestController(ControllerInterface):
def get_obs(self) -> Dict[str, np.ndarray]:
return {
"rgb": np.random.randint(0, 256, RobotObsShape.HEAD_IMAGE_SHAPE, dtype=np.uint8),
"right_cam_img": np.random.randint(0, 256, RobotObsShape.WRIST_IMAGE_SHAPE, dtype=np.uint8),
"right_state": np.random.rand(*RobotObsShape.STATE_SHAPE).astype(np.float32),
}
def act(self, actions: np.ndarray) -> None:
print(f"Executing action: {actions}")
# 模拟实际执行延迟
time.sleep(0.1)
# 创建配置
config = LowLevelRobotConnectionConfig(
host="wujingdp.xyz",
port="15558",
timeout=2000, # 2秒超时
max_retries=3,
delay=0.1,
controller=TestController(),
auto_reconnect=True,
heartbeat_interval=1.0
)
# 创建并运行机器人连接
robot_conn = LowLevelRobotConnection(config)
try:
# 运行主控制循环
robot_conn.run()
except KeyboardInterrupt:
robot_conn.disconnect()
logger.info("Program exited by user")
```
### 主要改进和优化说明
1. **日志系统增强**:
- 添加了完整的日志记录,包括时间戳、日志级别和来源
- 在关键操作点添加了详细的日志信息
- 区分调试、信息和错误级别日志
2. **健壮的错误处理**:
- 所有关键操作添加了try-except块
- 区分ZeroMQ特定错误和通用异常
- 提供详细的错误上下文信息
3. **连接管理优化**:
- 添加了心跳机制,定期检查连接状态
- 实现了自动重连功能
- 添加了优雅的连接关闭和资源释放
4. **数据验证增强**:
- 添加了观察数据和动作数据的格式验证
- 验证图像形状和数据类型
- 验证状态和动作向量的维度
5. **线程管理**:
- 使用独立线程处理心跳检测
- 添加了线程停止机制
- 设置守护线程避免程序无法退出
6. **接口设计改进**:
- 定义了ControllerInterface基类,强制实现必要方法
- 添加了虚拟控制器实现,用于测试和默认情况
- 改进了方法参数和返回值的类型提示
7. **配置管理**:
- 使用dataclass简化配置管理
- 添加了更多配置选项(自动重连、心跳间隔等)
- 提供了默认值,简化使用
8. **主循环优化**:
- 添加了优雅的退出机制(支持Ctrl+C)
- 改进了错误处理和重试逻辑
- 分离了单步执行(step)和持续运行(run)方法
### 关键组件详细说明
1. **心跳机制**:
```python
def _heartbeat_loop(self) -> None:
"""心跳线程循环,定期发送心跳包"""
while self.running:
# 检查是否需要发送心跳
if time.time() - self.last_heartbeat_time > self.heartbeat_interval:
try:
# 连接状态检查
if not self.is_connected and self.auto_reconnect:
self.connect()
# 发送心跳
self._send_heartbeat()
self.last_heartbeat_time = time.time()
except Exception:
logger.error("Heartbeat failed")
time.sleep(0.1)
```
2. **自动重连机制**:
```python
def send_obs(self, obs: bytes) -> bool:
"""发送观察数据"""
if not self.is_connected:
if self.auto_reconnect:
if not self.connect(): # 尝试重连
return False
else:
return False
# 发送数据...
```
3. **数据验证**:
```python
def _validate_obs(self, obs_dict: Dict) -> bool:
"""验证观察数据格式"""
# 检查必需字段
if "rgb" not in obs_dict or ...:
return False
# 验证图像形状
if obs_dict["rgb"].shape != RobotObsShape.HEAD_IMAGE_SHAPE:
return False
# 验证数据类型
if obs_dict["rgb"].dtype != np.uint8:
return False
# ...其他验证
return True
```
4. **资源安全释放**:
```python
def _close_resources(self) -> None:
"""关闭ZeroMQ资源"""
if self.socket:
try:
self.socket.close(linger=0) # 立即关闭
except zmq.ZMQError:
logger.error("Error closing socket")
if self.context:
try:
self.context.term() # 终止上下文
except zmq.ZMQError:
logger.error("Error terminating context")
self.is_connected = False
```
5. **控制器接口**:
```python
class ControllerInterface:
"""控制器接口基类"""
def get_obs(self) -> Dict[str, np.ndarray]:
"""必须实现:获取观察数据"""
raise NotImplementedError()
def act(self, actions: np.ndarray) -> None:
"""必须实现:执行动作"""
raise NotImplementedError()
```
### 使用示例
```python
# 创建自定义控制器
class MyController(ControllerInterface):
def get_obs(self) -> Dict[str, np.ndarray]:
# 实际获取机器人传感器数据
return {
"rgb": camera.get_head_image(),
"right_cam_img": camera.get_wrist_image(),
"right_state": robot.get_joint_state()
}
def act(self, actions: np.ndarray) -> None:
# 实际执行机器人动作
robot.execute_actions(actions)
# 创建配置
config = LowLevelRobotConnectionConfig(
host="robot-server.example.com",
port="15558",
controller=MyController(),
auto_reconnect=True
)
# 创建并运行连接
robot_conn = LowLevelRobotConnection(config)
robot_conn.run() # 持续运行
# 或
robot_conn.step() # 单步执行
```
### 优化点总结
1. **健壮性**:
- 全面的错误处理和恢复机制
- 数据格式验证
- 连接状态监控
2. **可维护性**:
- 清晰的接口定义
- 模块化设计
- 详细的日志记录
3. **可扩展性**:
- 易于添加新功能(如加密、压缩)
- 支持不同类型的控制器
- 配置驱动设计
4. **安全性**:
- 资源安全释放
- 线程安全设计
- 输入验证
5. **用户体验**:
- 详细的日志和错误信息
- 优雅的退出机制
- 灵活的配置选项
阅读全文
相关推荐



















