# -*- coding: utf-8 -*-
import sys
from pyqtgraph.Qt import QtCore
import numpy as np
import pyqtgraph as pg
import threading, time
import serial
import crcmod
from collections import deque
import copy
# 创建一个图形布局小部件
win = pg.GraphicsLayoutWidget(show=True, size=(900, 600), title="谐振腔波形")
# label = pg.LabelItem(justify='center')
label = pg.LabelItem(justify='left')
win.addItem(label)
p = win.addPlot(title="谐振腔波形", row=1, col=0)
p.setXRange(0, 127, padding=0)
p.setYRange(0, 4100, padding=0)
p.showGrid(x=True, y=True, alpha=1)
# curve1 = p.plot(pen=pg.mkPen("r",width=2),name="y1") #设置pen 格式
curve = p.plot(pen=pg.mkPen("g", width=2))
l_curve = p.plot(pen=pg.mkPen("y", width=4), )
t_curve = p.plot(pen=pg.mkPen("y", width=4))
# x_data = np.arange(128)
x_data = []
y_data = []
ptr = 0
F0 = 0
U0 = 0
tp_scatter = pg.ScatterPlotItem(size=10, brush=pg.mkBrush(255, 255, 255, 255))
p.addItem(tp_scatter)
lp_scatter = pg.ScatterPlotItem(size=10, brush=pg.mkBrush(255, 255, 255, 255))
p.addItem(lp_scatter)
rp_scatter = pg.ScatterPlotItem(size=10, brush=pg.mkBrush(255, 255, 255, 255))
p.addItem(rp_scatter)
def find_nearest(array, value):
array = np.asarray(array)
idx = (np.abs(array - value)).argmin()
return array[idx], idx
def gaussian(x, *param):
return param[0] * np.exp(-np.power(x - param[2], 2.) / (2 * np.power(param[4], 2.))) + \
param[1] * np.exp(-np.power(x - param[3], 2.) / (2 * np.power(param[5], 2.)))
fit_f = 0;
fit_u = 0;
avg_f = 0;
fcnt = 0;
sum_f = 0
top_index = 0
def update():
global curve, data, ptr, p, y_data, l_curve, t_curve, scatter, fit_curve, fit_f, fit_u, avg_f, fcnt, top_index, U0
# curve.setData(data[ptr % 10])
if len(y_data) == 128:
curve.setData(y_data)
index = y_data.index(max(y_data))
U0 = y_data[index]
top_index = index
tx = []
tu = []
tx.append(index)
tu.append(U0)
print("跳频顶点:{} 拟合顶点f:{},u:{}".format(top_index, fit_f, fit_u))
tp_scatter.setData(tx, tu)
# print("tf={:.1f} tu={} lf={:.1f} lu={} rf={:.1f} ru={}".format(F0,U0,LF,LU,RF,RU))
# if fit_f != 0:
label.setText(
"<span style='font-size: 16pt'><span style='color: yellow'>TF= %.3d TU=%.4d \n</span>"
" <span style='font-size:22pt'><span style='color: yellow'>avg_f= %.4d fit_f = %.4d fit u=%.4d \n</span> "
% (
top_index, U0,
avg_f, fit_f , fit_u))
# 定时器相关配置
timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(1)
class SerThread:
def __init__(self, Port=0, Baudrate=115200):
# 初始化串口、blog文件名称
self.my_serial = serial.Serial()
self.my_serial.port = Port
self.my_serial.baudrate = Baudrate
self.my_serial.timeout = 1
self.alive = False
self.waitEnd = None
self.q_ser = deque()
def waiting(self):
# 等待event停止标志
if not self.waitEnd is None:
self.waitEnd.wait()
def start(self):
# 开串口以及blog文件
self.my_serial.open()
if self.my_serial.isOpen():
self.waitEnd = threading.Event()
self.alive = True
self.thread_decode = threading.Thread(target=self.decode_thread)
self.thread_decode.setDaemon(True)
self.thread_decode.start()
self.thread_read = threading.Thread(target=self.Reader)
self.thread_read.setDaemon(True)
self.thread_read.start()
return True
else:
return False
def Reader(self):
while self.alive:
try:
n = self.my_serial.inWaiting()
data = ''
if n:
data = self.my_serial.read(n)
self.q_ser.extend(data)
# l = [hex(int(i)) for i in data]
# print(" ".join(l))
except Exception as ex:
print(ex)
self.waitEnd.set()
self.alive = False
def decode_thread(self):
global x_data, y_data
while self.alive:
try:
while len(self.q_ser):
data1 = ''
data2 = ''
data1 = self.q_ser.popleft()
data2 = self.q_ser.popleft()
if data2 == 0xfc and data1 == 0xcf:
while len(self.q_ser) < 129 * 2:
pass
i = 0
ls_data = []
crc_data = []
while (i < 129 * 2):
i = i + 2
data1 = self.q_ser.popleft()
data2 = self.q_ser.popleft()
data = (data1 << 8) + data2;
ls_data.append(data)
crc_data.append(data1)
crc_data.append(data2)
assert (len(ls_data) == 129)
crc = crc_data[128 * 2]
crc2 = crc_data[128 * 2 +1]
p = bytes(crc_data[0:256])
crc16 = crcmod.predefined.Crc('crc-16-dnp')
crc16.update(bytes(p))
crc16v = crc16.crcValue
if crc2*256+crc == crc16v:
y_data.clear()
y_data = copy.deepcopy(ls_data[0:128])
elif data1 == 0xc3 or data2 == 0xc3:
crc_data = []
d1 = 0
if data1 == 0xc3:
d1 = data2
elif data2 == 0xc3:
d1 = self.q_ser.popleft()
crc_data.append(d1)
d2 = self.q_ser.popleft()
crc_data.append(d2)
d3 = self.q_ser.popleft()
crc_data.append(d3)
d4 = self.q_ser.popleft()
crc_data.append(d4)
d5 = self.q_ser.popleft()
p = bytes(crc_data[0:4])
crc8 = crcmod.predefined.Crc('crc-8-maxim')
crc8.update(bytes(p))
if d5 == d1 ^ d2 ^ d3 ^ d4:
# if d5 == 0xff:
global fit_f, fit_u, sum_f, avg_f, fcnt
fit_f = (d1 << 8) + d2
fit_u = (d3 << 8) + d4
sum_f = fit_f + sum_f
fcnt = fcnt + 1
if fcnt == 4:
fcnt = 0
avg_f = sum_f / 4
sum_f = 0
except Exception as ex:
print(ex)
self.waitEnd.set()
self.alive = False
def Sender(self):
while self.alive:
try:
snddata = input("input data:\n")
self.my_serial.write(snddata.encode('utf-8'))
print('sent' + ' ' + time.strftime("%Y-%m-%d %X"))
print(snddata, file=self.sfile)
except Exception as ex:
print(ex)
self.waitEnd.set()
self.alive = False
def stop(self):
self.alive = False
# self.thread_read.join()
# self.thread_send.join()
if self.my_serial.isOpen():
self.my_serial.close()
if __name__ == '__main__':
print(len(sys.argv))
if (len(sys.argv) != 3):
print("输入端口号和波特率")
else:
port = sys.argv[1]
baud = sys.argv[2]
# ser = SerThread('COM3',471153)