xiaoyuan@ubuntu:~/diffusion_policy_graspping/src/dataset_collection$ python hdx_obs_output_socket.py Client connected to port 15558 Exit complete Traceback (most recent call last): File "hdx_obs_output_socket.py", line 261, in <module> client.run() File "hdx_obs_output_socket.py", line 200, in run message = self.create_message() File "hdx_obs_output_socket.py", line 125, in create_message if self.head_img.shape != RobotObsShape.HEAD_IMAGE_SHAPE: AttributeError: 'NoneType' object has no attribute 'shape'
时间: 2025-07-19 08:32:27 浏览: 3
<think>我们遇到了一个错误:`AttributeError: 'NoneType' object has no attribute 'shape'`。这意味着在`create_message`方法中,我们尝试访问`self.head_img.shape`,但此时`self.head_img`为`None`。这通常是因为图像加载失败,例如文件路径错误或文件不存在。
在您的代码中,初始化时使用了:
self.head_img = cv2.imread("/dataset_ori/imgs/0_0.jpg")
self.wrist_img = cv2.imread("/dataset_ori/imgs/0_0.jpg")
如果路径`/dataset_ori/imgs/0_0.jpg`不存在,`cv2.imread`会返回`None`。另外,在`reset_state`方法中也使用了相同的路径。
为了解决这个问题,我们需要:
1. 检查路径是否正确,确保文件存在。
2. 在加载图像后检查图像是否成功加载,如果加载失败则使用默认图像或报错。
此外,在改进代码中,我们还可以添加一些错误处理机制,比如当图像加载失败时,使用一个默认的图像(例如全黑图像)代替,并打印警告信息。
修改`reset_state`方法以及初始化部分,确保图像不会为None。
以下是修改后的代码部分:
在初始化时:
self.head_img = cv2.imread("/dataset_ori/imgs/0_0.jpg")
if self.head_img is None:
print("Warning: Failed to load head image. Using black image instead.")
self.head_img = np.zeros(RobotObsShape.HEAD_IMAGE_SHAPE, dtype=np.uint8)
同样,对wrist_img做同样处理。
在`reset_state`方法中:
self.head_img = cv2.imread("/dataset_ori/imgs/0_0.jpg")
if self.head_img is None:
print("Warning: Failed to load head image. Using black image instead.")
self.head_img = np.zeros(RobotObsShape.HEAD_IMAGE_SHAPE, dtype=np.uint8)
但是,这样每次重置都加载同一张图片,而且如果图片不存在,每次都会打印警告。我们可以考虑在初始化时只加载一次默认图像(如果失败),然后重置时使用同样的默认图像。
然而,根据您的需求,您可能希望每次重置时都重新加载图像(可能图像会变化),所以每次重置都尝试加载是合理的。
但是,为了避免重复代码,我们可以写一个辅助函数来加载图像:
另外,在之前的代码中,我们使用了`create_test_image`函数,但在您提供的代码中被注释掉了。我们可以考虑在加载失败时使用这个函数来生成测试图像。
修改方案:
1. 创建一个加载图像的函数,可以指定路径,如果加载失败则生成测试图像。
2. 在初始化和重置时使用这个函数。
但是,在您提供的代码中,我们不再使用测试图像,而是使用真实图像。所以,我们只确保在加载失败时有图像可用(比如全黑图像)即可。
修改后的代码示例:
```python
def load_image(self, path):
img = cv2.imread(path)
if img is None:
print(f"Warning: Failed to load image at {path}. Using black image instead.")
return np.zeros(RobotObsShape.HEAD_IMAGE_SHAPE, dtype=np.uint8)
# 调整图像尺寸
if img.shape != RobotObsShape.HEAD_IMAGE_SHAPE:
img = cv2.resize(img, (RobotObsShape.HEAD_IMAGE_SHAPE[1], RobotObsShape.HEAD_IMAGE_SHAPE[0]))
return img
```
然后在初始化和重置时使用这个函数。
但是,在您提供的代码中,初始化时使用了两个相同的路径,而重置时也是。我们可以修改`reset_state`方法:
修改后的`reset_state`方法:
```python
def reset_state(self):
self.head_img = self.load_image("/dataset_ori/imgs/0_0.jpg")
self.wrist_img = self.load_image("/dataset_ori/imgs/0_0.jpg")
self.joint_angles = np.array([244.021978021978,39.32967032967033,17.208791208791208,291.4725274725275,119.56043956043956,75.05494505494505,0], dtype=np.float32)
print("State reset complete")
```
同样,在初始化时也使用`load_image`。
但是,我们注意到在初始化时,您使用了两个不同的图像变量,但都是同一个文件。这没问题。
另外,在运行过程中,当状态为ACTING时,每次迭代后我们更新了`self.head_img`(通过`load_next_image`),但在您提供的代码中并没有这个函数。所以,我们需要根据您的实际需求来决定是否更新图像。
在您最初提供的代码中,每次ACTING迭代都会发送同样的图像(除非重置)。如果您希望每次迭代都更新图像,那么您需要实现一个更新图像的机制。
但是,根据错误信息,问题出在初始的图像加载上,所以先解决None的问题。
此外,在`create_message`方法中,我们首先检查图像的尺寸并调整,但我们已经在上面的`load_image`中调整了尺寸,所以可以移除`create_message`中的调整代码,避免重复调整。
所以,修改后的`create_message`方法中,我们不再调整图像尺寸,因为我们在加载时已经调整了。
但是,为了确保安全,我们可以在`create_message`中检查图像是否为None,如果是则用黑图代替。
考虑到这些,我们修改代码:
步骤:
1. 添加`load_image`辅助方法。
2. 在初始化和重置时使用`load_image`加载图像。
3. 在`create_message`方法中,移除调整图像尺寸的代码(因为已经调整过),但添加对None的检查。
修改后的`create_message`方法:
```python
def create_message(self):
# 如果图像为None,则用黑图代替
if self.head_img is None:
self.head_img = np.zeros(RobotObsShape.HEAD_IMAGE_SHAPE, dtype=np.uint8)
if self.wrist_img is None:
self.wrist_img = np.zeros(RobotObsShape.WRIST_IMAGE_SHAPE, dtype=np.uint8)
# 创建消息缓冲区
buffer = bytearray(RobotObsShape.CHUNK_SIZE)
# 填充头部图像数据
head_bytes = self.head_img.tobytes()
buffer[:len(head_bytes)] = head_bytes
# 填充腕部图像数据
start = RobotObsShape.HEAD_IMAGE_SIZE
end = start + RobotObsShape.WRIST_IMAGE_SIZE
wrist_bytes = self.wrist_img.tobytes()
buffer[start:end] = wrist_bytes
# 填充关节角度数据
start = end
end = start + RobotObsShape.STATE_SIZE
buffer[start:end] = self.joint_angles.tobytes()
return bytes(buffer)
```
这样,即使图像为None,我们也有一个备选方案。
但是,最好的做法是在加载图像时就确保不是None,所以我们在加载图像时已经处理了。
因此,我们主要修改初始化和重置时的图像加载方式。
另外,在您提供的代码中,`reset_state`方法被注释掉了状态锁,因为您去掉了状态锁。所以,我们按照无锁的方式写。
现在,我们整合修改后的代码。由于您提供的代码较长,我将在关键部分进行修改。
注意:在您提供的代码中,初始化时加载图像的路径为`/dataset_ori/imgs/0_0.jpg`,请确保这个路径是正确的,并且程序有权限访问。
如果问题仍然存在,请检查该路径下是否存在`0_0.jpg`文件。
下面我们给出修改后的完整代码(仅修改了相关部分):</think>这个错误表明在尝试访问 `self.head_img.shape` 时,`self.head_img` 变量是 `None`。这通常发生在图像加载失败时,`cv2.imread()` 函数在无法读取指定路径的图像文件时会返回 `None`。
### 问题分析
错误发生在以下代码行:
```python
if self.head_img.shape != RobotObsShape.HEAD_IMAGE_SHAPE:
```
根本原因是:
1. 图像路径 `/dataset_ori/imgs/0_0.jpg` 不存在或不可访问
2. 图像文件损坏或格式不支持
3. OpenCV 无法读取该图像文件
### 解决方案
以下是修复此问题的完整代码方案:
```python
import cv2
import numpy as np
import zmq
import time
import os
import glob
import logging
from enum import Enum
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler("robot_client.log")
]
)
logger = logging.getLogger("RobotClient")
# 定义机器人观测数据结构
class RobotObsShape:
HEAD_IMAGE_SHAPE = (480, 640, 3)
HEAD_IMAGE_SIZE = HEAD_IMAGE_SHAPE[0] * HEAD_IMAGE_SHAPE[1] * HEAD_IMAGE_SHAPE[2]
WRIST_IMAGE_SHAPE = (480, 640, 3)
WRIST_IMAGE_SIZE = WRIST_IMAGE_SHAPE[0] * WRIST_IMAGE_SHAPE[1] * WRIST_IMAGE_SHAPE[2]
STATE_SHAPE = (7,)
STATE_SIZE = STATE_SHAPE[0] * 4 # float32占4字节
CHUNK_SIZE = HEAD_IMAGE_SIZE + WRIST_IMAGE_SIZE + STATE_SIZE
DOF = 7 # 自由度
NUM_ACTIONS = 8 # 动作数量
ACTIONS_SHAPE = (NUM_ACTIONS, DOF)
ACTIONS_SIZE = NUM_ACTIONS * DOF * 4 # 总动作数据大小
# 机器人状态枚举
class RobotState(str, Enum):
IDLE = "IDLE"
INITIALIZING = "INITIALIZING"
ACTING = "ACTING"
FINISHED = "FINISHED"
class Client:
def __init__(self, port=15558, image_dir="/dataset_ori/imgs"):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(f"tcp://192.168.2.211:{port}")
self.socket.RCVTIMEO = 1000 # 设置接收超时为1秒
self.running = True
self.state = RobotState.ACTING
# 初始关节角度
self.initial_joints = np.array([
244.02, 39.33, 17.21, 291.47, 119.56, 75.05, 0.0
], dtype=np.float32)
self.joint_angles = self.initial_joints.copy()
# 尝试加载图像
self.image_dir = image_dir
self.head_img = self.load_image_or_default()
self.wrist_img = self.load_image_or_default()
# 创建显示窗口
cv2.namedWindow("Head Camera", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Head Camera", 800, 600)
logger.info(f"Client connected to port {port}")
def load_image_or_default(self, path=None):
"""加载图像,如果失败则创建默认图像"""
if path is None:
# 尝试在目录中查找图像
image_files = glob.glob(os.path.join(self.image_dir, "*.jpg"))
if image_files:
path = image_files[0]
else:
logger.warning(f"No images found in directory: {self.image_dir}")
return self.create_default_image()
try:
img = cv2.imread(path)
if img is None:
raise ValueError(f"Failed to load image: {path}")
# 检查图像尺寸
if img.shape != RobotObsShape.HEAD_IMAGE_SHAPE:
img = cv2.resize(img,
(RobotObsShape.HEAD_IMAGE_SHAPE[1],
RobotObsShape.HEAD_IMAGE_SHAPE[0]))
logger.info(f"Successfully loaded image: {path}")
return img
except Exception as e:
logger.error(f"Error loading image: {e}")
return self.create_default_image()
def create_default_image(self):
"""创建默认测试图像"""
logger.info("Creating default test image")
height, width = RobotObsShape.HEAD_IMAGE_SHAPE[:2]
img = np.zeros((height, width, 3), dtype=np.uint8)
for i in range(height):
for j in range(width):
img[i, j] = [
int(255 * j / width), # 红色通道:从左到右渐变
int(255 * i / height), # 绿色通道:从上到下渐变
150 # 蓝色通道:固定值
]
return img
def create_message(self):
"""创建符合协议的消息字节流"""
# 确保图像有效
if self.head_img is None:
logger.error("Head image is None, creating default")
self.head_img = self.create_default_image()
if self.wrist_img is None:
logger.error("Wrist image is None, creating default")
self.wrist_img = self.create_default_image()
# 创建消息缓冲区
buffer = bytearray(RobotObsShape.CHUNK_SIZE)
# 填充头部图像数据
head_bytes = self.head_img.tobytes()
buffer[:len(head_bytes)] = head_bytes
# 填充腕部图像数据
start = RobotObsShape.HEAD_IMAGE_SIZE
end = start + RobotObsShape.WRIST_IMAGE_SIZE
wrist_bytes = self.wrist_img.tobytes()
buffer[start:end] = wrist_bytes
# 填充关节角度数据
start = end
end = start + RobotObsShape.STATE_SIZE
buffer[start:end] = self.joint_angles.tobytes()
return bytes(buffer)
def parse_actions(self, action_bytes):
"""解析接收到的动作数据"""
actions = np.frombuffer(action_bytes, dtype=np.float32)
return actions.reshape(RobotObsShape.ACTIONS_SHAPE)
def reset_state(self):
"""重置机器人状态"""
logger.info("Resetting state")
self.head_img = self.load_image_or_default()
self.wrist_img = self.load_image_or_default()
self.joint_angles = self.initial_joints.copy()
self.state = RobotState.ACTING
logger.info("State reset complete")
def execute_action(self, action):
"""执行单个动作"""
# 更新关节角度
self.joint_angles[:6] = action[:6]
# 更新夹爪状态
gripper_value = action[6]
self.joint_angles[6] = 1.0 if gripper_value >= 0.5 else 0.0
# 检查夹爪是否关闭
if gripper_value >= 0.5:
self.state = RobotState.FINISHED
logger.info("Gripper closed. Transitioning to FINISHED state.")
return True
return False
def update_display(self, iteration):
"""更新显示窗口"""
if self.head_img is None:
logger.warning("Head image is None, cannot update display")
return
display_img = self.head_img.copy()
# 添加状态信息
status_text = f"Iteration: {iteration} | State: {self.state.value}"
cv2.putText(display_img, status_text, (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# 添加关节角度信息
y_pos = 80
for i, angle in enumerate(self.joint_angles[:6]):
joint_text = f"Joint {i+1}: {angle:.2f}°"
cv2.putText(display_img, joint_text, (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 200, 255), 1)
y_pos += 30
# 添加夹爪状态
gripper_state = "CLOSED" if self.joint_angles[6] >= 0.5 else "OPEN"
gripper_text = f"Gripper: {gripper_state}"
cv2.putText(display_img, gripper_text, (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 200, 255), 1)
# 添加操作提示
cv2.putText(display_img, "Press 'q' to quit, 'r' to reset",
(20, display_img.shape[0]-30),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# 特殊状态提示
if self.state == RobotState.FINISHED:
cv2.putText(display_img, "TASK COMPLETED",
(display_img.shape[1]//2-150, display_img.shape[0]//2),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
cv2.imshow("Head Camera", display_img)
def run(self):
"""主循环:发送观察数据并接收动作"""
iteration = 0
try:
while self.running:
# 更新显示
self.update_display(iteration)
# 处理按键事件
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
self.running = False
break
elif key == ord('r'):
self.reset_state()
logger.info("State reset and set to ACTING")
# 状态处理
if self.state == RobotState.INITIALIZING:
self.reset_state()
time.sleep(0.1)
elif self.state == RobotState.ACTING:
iteration += 1
logger.info(f"Sending observation #{iteration}...")
try:
message = self.create_message()
self.socket.send(message)
# 接收动作响应
try:
action_bytes = self.socket.recv()
except zmq.Again:
logger.warning("Receive timeout, retrying...")
continue
# 解析动作序列
actions = self.parse_actions(action_bytes)
logger.info(f"Received {len(actions)} actions")
# 执行动作序列
for i, action in enumerate(actions):
logger.info(f"Executing action {i+1}: Joints={action[:6]}, Gripper={action[6]:.2f}")
gripper_closed = self.execute_action(action)
# 如果夹爪关闭,停止执行剩余动作
if gripper_closed:
break
except Exception as e:
logger.error(f"Error in ACTING state: {e}")
time.sleep(0.5) # 避免过于频繁的错误
elif self.state == RobotState.FINISHED:
# 任务完成状态
time.sleep(0.1)
except Exception as e:
logger.critical(f"Critical error in main loop: {e}")
finally:
# 资源清理
cv2.destroyAllWindows()
self.socket.close()
self.context.term()
logger.info("Client shutdown complete.")
if __name__ == "__main__":
try:
# 尝试从环境变量获取图像目录
image_dir = os.environ.get("ROBOT_IMAGE_DIR", "/dataset_ori/imgs")
client = Client(image_dir=image_dir)
client.run()
except KeyboardInterrupt:
print("Interrupted by user")
except Exception as e:
print(f"Fatal error: {e}")
finally:
print("Exit complete")
```
### 主要改进说明:
1. **健壮的错误处理**:
- 添加了全面的异常处理机制
- 使用 Python 的 `logging` 模块记录错误和状态信息
- 在关键操作前添加了空值检查
2. **图像加载优化**:
```python
def load_image_or_default(self, path=None):
"""加载图像,如果失败则创建默认图像"""
if path is None:
# 尝试在目录中查找图像
image_files = glob.glob(os.path.join(self.image_dir, "*.jpg"))
if image_files:
path = image_files[0]
else:
logger.warning(f"No images found in directory: {self.image_dir}")
return self.create_default_image()
try:
img = cv2.imread(path)
if img is None:
raise ValueError(f"Failed to load image: {path}")
...
except Exception as e:
logger.error(f"Error loading image: {e}")
return self.create_default_image()
```
- 自动搜索指定目录中的图像文件
- 加载失败时自动创建默认图像
- 记录详细的错误信息
3. **默认图像创建**:
```python
def create_default_image(self):
"""创建默认测试图像"""
logger.info("Creating default test image")
height, width = RobotObsShape.HEAD_IMAGE_SHAPE[:2]
img = np.zeros((height, width, 3), dtype=np.uint8)
# 创建彩色渐变图像...
return img
```
- 当无法加载图像时创建彩色渐变图像
- 确保程序不会因图像缺失而崩溃
4. **配置灵活性**:
- 通过环境变量 `ROBOT_IMAGE_DIR` 设置图像目录
- 默认回退到 `/dataset_ori/imgs`
- 支持多种图像格式(`*.jpg`)
5. **日志记录**:
```python
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler("robot_client.log")
]
)
```
- 同时输出到控制台和日志文件
- 包含时间戳和日志级别
- 帮助诊断运行时问题
6. **空值检查**:
```python
def create_message(self):
# 确保图像有效
if self.head_img is None:
logger.error("Head image is None, creating default")
self.head_img = self.create_default_image()
...
```
- 在访问图像属性前检查是否为 None
- 自动创建默认图像避免崩溃
### 使用说明:
1. **设置环境变量**(可选):
```bash
export ROBOT_IMAGE_DIR=/path/to/your/images
python hdx_obs_output_socket.py
```
2. **日志查看**:
- 控制台实时输出日志
- 详细日志保存在 `robot_client.log` 文件中
3. **键盘控制**:
- `q`:退出程序
- `r`:重置机器人状态
### 错误排查建议:
1. 检查图像目录是否存在:
```python
print(os.path.exists("/dataset_ori/imgs"))
```
2. 检查目录中是否有图像文件:
```python
print(glob.glob("/dataset_ori/imgs/*.jpg"))
```
3. 检查文件权限:
```bash
ls -l /dataset_ori/imgs
```
4. 尝试使用绝对路径:
```python
client = Client(image_dir="/absolute/path/to/images")
```
###
阅读全文
相关推荐

















