python串口绘图CUSTOM协议
时间: 2025-04-17 19:51:21 浏览: 25
### 实现Python串口通信接收CUSTOM协议数据并绘制图形
为了实现这一目标,可以采用`pyserial`库处理串口通信,并利用`matplotlib`库实现实时绘图。考虑到串口大量数据传输可能导致的丢包问题[^1],建议优化串口读取机制。
#### 选择合适的串口读取方法
对于串口通信中的大量数据流,在Qt环境中使用`readyRead()`信号配合`readAll()`函数可能会因为频繁触发而影响性能和准确性。而在Python环境下,则可以通过设置合理的超时时间和缓冲区大小来缓解此问题:
```python
import serial
ser = serial.Serial(
port='COM3', # 替换成实际使用的端口号
baudrate=9600, # 设置波特率
timeout=0.5 # 设定非阻塞模式下的等待时间
)
```
#### 解析CUSTOM协议
假设CUSTOM协议具有特定帧结构(起始位、长度字段、命令字节等),则需编写解析逻辑以提取有效负载部分用于后续处理。这里提供了一个简化版的例子说明如何识别完整的消息帧:
```python
def parse_custom_protocol(data_chunk):
frames = []
buffer = bytearray()
for byte in data_chunk:
buffer.append(byte)
if len(buffer) >= MIN_FRAME_SIZE and is_valid_frame(buffer): # 自定义校验函数is_valid_frame
frame = extract_payload_from_buffer(buffer) # 提取消息体
frames.append(frame)
# 清除已处理过的字节数组
del buffer[:len_of_consumed_bytes]
return frames
```
#### 数据存储与更新策略
为了避免因连续不断的写入操作造成文件I/O瓶颈或内存溢出风险,推荐采取循环队列的方式保存最新N条记录供图表展示之用;当新样本到来时自动覆盖最早一条旧纪录的位置即可。
#### 动态绘图方案
针对实时性较高的应用场景,可考虑借助于`matplotlib.animation.FuncAnimation`类创建动画效果,它能够按照指定的时间间隔调用给定的方法刷新视窗内的图像内容。下面给出了一段基于上述思路构建的完整代码片段作为参考:
```python
from collections import deque
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
class RealtimePlotter(object):
def __init__(self, max_points=100):
self.max_points = max_points
self.x_data = deque([None]*max_points, maxlen=max_points)
self.y_data = deque([None]*max_points, maxlen=max_points)
fig, ax = plt.subplots(figsize=(8, 4))
line, = ax.plot([], [], lw=2)
self.line = line
self.ax = ax
ani = FuncAnimation(fig, self.update_line, interval=UPDATE_INTERVAL_MS,
init_func=self.init_plotting_area)
plt.show()
def update_line(self, i):
try:
raw_data = ser.read_all() # 非阻塞性读取可用字符
parsed_frames = parse_custom_protocol(raw_data)
latest_value = get_latest_measurement(parsed_frames)[-1] # 假设返回的是一个数值列表
current_time = time.time()
self.x_data.append(current_time)
self.y_data.append(latest_value)
self.line.set_xdata(list(self.x_data))
self.line.set_ydata(list(self.y_data))
self.adjust_viewport_limits()
except Exception as e:
print(f"Error during updating plot: {e}")
def adjust_viewport_limits(self):
"""调整坐标轴范围"""
xmin, xmax = min(self.x_data), max(self.x_data)
ymin, ymax = min(filter(None.__ne__, self.y_data)), \
max(filter(None.__ne__, self.y_data))
margin_factor = 0.1
self.ax.set_xlim(xmin-(xmax-xmin)*margin_factor,
xmax+(xmax-xmin)*margin_factor)
self.ax.set_ylim(ymin-(ymax-ymin)*margin_factor,
ymax+(ymax-ymin)*margin_factor)
if __name__ == '__main__':
from datetime import timedelta
UPDATE_INTERVAL_MS = int(timedelta(seconds=.1).total_seconds()*1000)
rtp = RealtimePlotter(max_points=MAX_POINTS_TO_DISPLAY)
```
阅读全文
相关推荐









