写一个串口读数据,并用波形图显示的程序,基于pyqt5、pyqtgraph、python,包括串口打开、串口关闭、串口收发数据、按时间显示收到的数据的波形图
时间: 2025-04-18 19:53:46 浏览: 47
<think>好的,我现在需要帮助用户解决如何使用PyQt5和pyqtgraph编写一个实现串口通信和实时波形显示的程序。首先,我得理清用户的需求。用户希望有一个完整的示例程序,涵盖串口的打开、关闭、数据收发,以及用波形图实时展示数据。他们可能在做某种数据采集或监控系统,比如传感器数据的实时显示。
接下来,我需要考虑如何组织这个程序的结构。通常,这样的应用需要GUI界面,包括串口配置的输入框、按钮来控制打开和关闭,以及一个显示波形的区域。PyQt5用于构建界面,pyqtgraph负责绘图,而pyserial处理串口通信。
首先,界面设计部分。用户需要选择串口号、波特率等参数,所以需要QComboBox或QLineEdit来输入这些信息。然后,打开/关闭按钮可以用QPushButton实现,状态栏显示当前连接状态。数据接收显示可能用QTextEdit,而波形图部分则用pyqtgraph的PlotWidget。
然后是串口通信部分。使用pyserial库,当用户点击打开按钮时,程序根据输入的参数初始化串口连接。需要注意异常处理,比如无效的端口或波特率。串口数据的读取需要异步处理,避免阻塞主界面。可能要用QThread或者Qt的定时器来轮询串口数据。
数据接收和处理方面,串口返回的是字节数据,需要解码成字符串,可能还要根据协议解析出数值。例如,假设数据格式是“value1,value2\n”,需要分割字符串并转换为浮点数。如果数据不完整或格式错误,需要处理异常情况。
实时波形显示是关键部分。pyqtgraph适合高性能绘图。需要维护一个数据缓冲区,定期更新到图表上。可以使用PlotWidget的plot方法,或者更高效地使用setData方法更新数据。考虑到实时性,可能需要定时器定时刷新图表,比如每隔50毫秒更新一次。
线程处理很重要,因为串口读取是阻塞操作。如果在主线程中执行,会导致界面卡顿。所以应该创建一个工作线程来处理串口通信,通过信号槽机制与主界面通信。例如,当接收到数据时,线程发出信号,主界面更新数据和图表。
信号与槽的连接需要正确设置。比如,点击打开按钮触发连接串口的函数,关闭按钮断开连接。数据接收后,触发更新文本框和波形图的函数。还要处理窗口关闭事件,确保串口正确关闭。
异常处理也不能忽视。比如,串口无法打开时弹出警告框,数据解析错误时忽略或提示用户。此外,串口断开或意外错误时的处理,比如自动关闭连接并更新界面状态。
测试部分,用户可能需要发送特定格式的数据,比如逗号分隔的数值,以测试波形显示是否正确。可能需要模拟数据发送,比如使用虚拟串口工具,或者从文件读取数据测试。
最后,整合所有部分,确保代码结构清晰,注释充分,方便用户理解和修改。可能需要参考现有的示例代码,结合用户提供的引用内容中的片段,比如引用[1]中的串口连接和引用[2]中的模块导入。
现在,我需要检查是否有遗漏的部分,比如UI组件的布局、线程的安全退出、资源释放等。同时,确保代码符合Python和PyQt的最佳实践,比如使用面向对象的结构,将主窗口作为类,信号处理作为方法。
总结步骤:设计界面,初始化串口,处理数据,更新图表,处理线程,异常处理,测试验证。现在可以开始编写示例代码,并解释关键部分,确保用户能够理解并应用。</think>以下是基于PyQt5和pyqtgraph实现串口通信与波形显示的完整解决方案:
```python
import sys
import serial
import pyqtgraph as pg
from PyQt5.QtCore import QTimer, Qt, QThread, pyqtSignal
from PyQt5.QtWidgets import (QApplication, QWidget, QVBoxLayout,
QHBoxLayout, QPushButton, QLabel,
QComboBox, QLineEdit, QTextEdit)
class SerialThread(QThread):
data_received = pyqtSignal(bytes) # 原始数据信号
def __init__(self, port, baudrate):
super().__init__()
self.ser = serial.Serial()
self.ser.port = port
self.ser.baudrate = baudrate
self.running = True
def run(self):
try:
self.ser.open()
while self.running and self.ser.is_open:
if self.ser.in_waiting:
data = self.ser.read(self.ser.in_waiting)
self.data_received.emit(data)
except Exception as e:
print("Serial error:", str(e))
finally:
if self.ser.is_open:
self.ser.close()
def stop(self):
self.running = False
class MainWindow(QWidget):
def __init__(self):
super().__init__()
self.serial_thread = None
self.data_buffer = []
self.init_ui()
self.init_plot()
def init_ui(self):
# 串口配置区域
self.port_combo = QComboBox()
self.port_combo.addItems(['COM3', 'COM4', 'COM5']) # 常用端口
self.baudrate_combo = QComboBox()
self.baudrate_combo.addItems(['9600', '115200', '57600'])
self.open_btn = QPushButton("打开端口")
self.close_btn = QPushButton("关闭端口")
self.close_btn.setEnabled(False)
# 波形控制
self.plot_timer = QTimer()
self.plot_timer.setInterval(50) # 50ms刷新周期
# 数据显示区域
self.recv_text = QTextEdit()
self.recv_text.setReadOnly(True)
# 布局设置
config_layout = QHBoxLayout()
config_layout.addWidget(QLabel("端口:"))
config_layout.addWidget(self.port_combo)
config_layout.addWidget(QLabel("波特率:"))
config_layout.addWidget(self.baudrate_combo)
config_layout.addWidget(self.open_btn)
config_layout.addWidget(self.close_btn)
main_layout = QVBoxLayout()
main_layout.addLayout(config_layout)
main_layout.addWidget(self.recv_text)
# 信号连接
self.open_btn.clicked.connect(self.open_serial)
self.close_btn.clicked.connect(self.close_serial)
self.plot_timer.timeout.connect(self.update_plot)
self.setLayout(main_layout)
self.setWindowTitle("串口波形监控")
self.resize(800, 600)
def init_plot(self):
self.plot_widget = pg.PlotWidget()
self.plot_curve = self.plot_widget.plot(pen='y')
self.layout().addWidget(self.plot_widget)
def open_serial(self):
port = self.port_combo.currentText()
baudrate = int(self.baudrate_combo.currentText())
self.serial_thread = SerialThread(port, baudrate)
self.serial_thread.data_received.connect(self.handle_data)
self.serial_thread.start()
self.open_btn.setEnabled(False)
self.close_btn.setEnabled(True)
self.plot_timer.start()
def close_serial(self):
if self.serial_thread:
self.serial_thread.stop()
self.serial_thread.wait()
self.open_btn.setEnabled(True)
self.close_btn.setEnabled(False)
self.plot_timer.stop()
def handle_data(self, data):
try:
decoded = data.decode('utf-8')
self.recv_text.append(decoded.strip())
# 假设数据格式为: "value1,value2,...\n"
values = [float(x) for x in decoded.strip().split(',') if x]
self.data_buffer.extend(values)
# 保持最新500个数据点
if len(self.data_buffer) > 500:
self.data_buffer = self.data_buffer[-500:]
except Exception as e:
print("数据处理错误:", str(e))
def update_plot(self):
if len(self.data_buffer) > 0:
x = list(range(len(self.data_buffer)))
self.plot_curve.setData(x, self.data_buffer)
def closeEvent(self, event):
self.close_serial()
event.accept()
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
```
### 关键功能说明
1. **多线程架构**
- 使用`QThread`创建独立的串口线程,避免阻塞主界面[^1]
- 通过`pyqtSignal`实现线程间通信
2. **数据解析**
- 假设接收数据格式为逗号分隔的数值字符串
- 示例数据格式:`"23.5,18.7\n"`
- 支持动态扩展多个数据通道(需修改数据处理逻辑)
3. **波形显示优化**
- 使用`pyqtgraph`的`setData()`方法实现高效刷新
- 50ms定时刷新平衡性能与实时性
- 自动保持最新500个数据点
4. **异常处理**
- 包含串口打开失败保护
- 数据解码错误捕获
- 窗口关闭时自动释放资源
阅读全文
相关推荐


















