活动介绍

详细解释total_up_1 = np.array([0, 130, 80]) total_down_1 = np.array([179, 255, 255]) total_mask_1 = cv2.inRange(hsv_roi, total_up_1, total_down_1)

时间: 2024-03-18 10:44:57 浏览: 76
这段代码的作用是基于HSV颜色空间,通过设定颜色的上下阈值来创建一个掩膜(mask)。 - np.array([0, 130, 80]) 表示一个包含3个元素的数组,分别是色相(H)、饱和度(S)和亮度(V)的阈值上限。 - np.array([179, 255, 255]) 表示一个包含3个元素的数组,分别是色相(H)、饱和度(S)和亮度(V)的阈值下限。 - cv2.inRange() 函数用于根据设定的上下阈值,创建一个掩膜。 hsv_roi 是输入图像,它是在前面对原始图像进行 cv2.cvtColor() 函数调用后得到的 HSV 彩色图像。total_up_1 和 total_down_1 是设定的颜色阈值,用于确定需要提取的颜色范围。 函数 cv2.inRange() 根据上下颜色阈值,创建一个掩膜,其中像素值为 0 的地方表示不属于设定的颜色范围,像素值为 255 的地方表示属于设定的颜色范围。这个掩膜可以用于去除图像中不需要的颜色区域,或者仅保留特定颜色区域。在这个例子中,生成的掩膜图像 total_mask_1 将只包含HSV颜色空间中满足上述阈值条件的像素点。
相关问题

解释# HSV范围1 total_up_1 = np.array([0, 130, 80]) total_down_1 = np.array([179, 255, 255]) total_mask_1 = cv2.inRange(hsv_roi, total_up_1, total_down_1)

这段代码是使用 OpenCV 库中的 cv2.inRange() 函数创建了一个掩膜(mask),用于过滤掉一张图像中不在特定颜色范围内的像素。这里的颜色范围是基于 HSV(色相、饱和度、亮度)颜色空间的。HSV 颜色空间将颜色表示为三个维度的值:色相、饱和度和亮度。 在这里,total_up_1 和 total_down_1 是两个包含了 HSV 范围的 numpy 数组,它们定义了所需的颜色范围。具体来说,total_up_1 包含了最高的 H(色相)值、中等的 S(饱和度)值和低的 V(亮度)值,而 total_down_1 包含了最低的 H 值、最高的 S 值和最高的 V 值。这个颜色范围包括了从 0 到 130 的所有 H 值,从 80 到 255 的所有 S 值和从 0 到 179 的所有 V 值。 cv2.inRange() 函数将使用这个颜色范围来创建一个掩膜(total_mask_1),其中所有像素的值都是 0 或 255。具体来说,如果某个像素的 HSV 值落在所需的颜色范围内,那么它在掩膜中的值就是 255,否则就是 0。这个掩膜可以与原始图像进行按位与操作来过滤掉不需要的像素。

解释 total_up_1 = np.array([0, 130, 80]) total_down_1 = np.array([179, 255, 255]) total_mask_1 = cv2.inRange(hsv_roi, total_up_1, total_down_1)中的三个数值

这三个数值是用来定义颜色范围的。 total_up_1和total_down_1是颜色的上下限,它们被定义为包含三个数字的numpy数组。这些数字是HSV颜色空间中的值,分别表示色调(H),饱和度(S)和亮度(V)。在这里,total_up_1 = np.array([0, 130, 80])表示最低的色调为0,最低的饱和度为130,最低的亮度为80。total_down_1 = np.array([179, 255, 255])表示最高的色调为179,最高的饱和度为255,最高的亮度为255。 total_mask_1是通过使用cv2.inRange函数,将hsv_roi图像中的像素值与total_up_1和total_down_1进行比较,得到的二进制掩模。在这个掩模中,所有符合颜色范围的像素点被标记为白色(255),不符合颜色范围的像素点被标记为黑色(0)。
阅读全文

相关推荐

def read_data(pkl_path, loc): with open(pkl_path, 'rb') as file: data = pickle.load(file) pos_data, neg_data = [], [] for d in data: if d['label'] == 0: pos_data.append(d[loc]) else: neg_data.append(d[loc]) pos_data = torch.stack(pos_data).float() neg_data = torch.stack(neg_data).float() # assert pos_data.shape[0] == neg_data.shape[0] return pos_data, neg_data def read_data_all(pkl_path): with open(pkl_path, 'rb') as file: data = pickle.load(file) pos_data, neg_data = [], [] for d in data: if d['label'] == 0: pos_data.append(d) else: neg_data.append(d) return pos_data, neg_data def train_val_split(pos_data, neg_data, num_shot=10): total_num = pos_data.shape[0] train_num = int(num_shot) if num_shot != -1 else total_num val_num = total_num - train_num train_idx = random.sample(range(total_num), train_num) val_idx = list(set(range(total_num)) - set(train_idx)) x_train = torch.cat([pos_data[train_idx], neg_data[train_idx]], dim=0) y_train = torch.cat([torch.zeros(train_num), torch.ones(train_num)]) x_val = torch.cat([pos_data[val_idx], neg_data[val_idx]], dim=0) y_val = torch.cat([torch.zeros(val_num), torch.ones(val_num)]) return x_train, y_train, x_val, y_val def mix_data_fitting(x_train, y_train, model_name='lr'): model_list = [] for head in range(x_train.shape[1]): # 32 * 32 = 1024 if model_name == 'lda': model = LinearDiscriminantAnalysis() elif model_name == 'lr': model = LogisticRegression(penalty='l2') elif model_name == 'sgd': model = SGDClassifier(loss='log_loss') else: raise NotImplementedError model.fit(x_train[:, head], y_train) model_list.append(model) return model_list # def data_fitting_opt(x_train): # solutions = [] # pos_data = x_train[:len(x_train) // 2] # neg_data = x_train[len(x_train) // 2:] # for head in range(x_train.shape[1]): # def objective(x,u1,u2): # s12 = np.dot(np.transpose(u1-u2),u1-u2) # s1 = np.dot(np.transpose(u1),u1) # s2 = np.dot(np.transpose(u2),u2) # np12 = np.dot(np.dot(x,s12),np.transpose(x)) # np1 = np.sqrt(np.dot(np.dot(x,s1),np.transpose(x))) # np2 = np.sqrt(np.dot(np.dot(x,s2),np.transpose(x))) # down = np12 + (np1 + np2)**2 # return -np12/down import numpy as np def objective(x, u1, u2): # Number of samples n = u1.shape[0] u1 = np.array(u1) u2 = np.array(u2) # Compute means mu_x = np.mean(u1, axis=0) # Shape: (128,) mu_y = np.mean(u2, axis=0) # Shape: (128,) # Numerator: |x^T (mu_x - mu_y)|^2 diff_mean = mu_x - mu_y numerator = (np.dot(x, diff_mean))**2 #/ (n**2) # Scalar # Compute covariance matrices with 1/(n-1) u1_centered = u1 - mu_x # Shape: (17, 128) u2_centered = u2 - mu_y # Shape: (17, 128) Sigma_x = np.dot(u1_centered.T, u1_centered) #/ (n - 1) # Shape: (128, 128) Sigma_y = np.dot(u2_centered.T, u2_centered) #/ (n - 1) # Shape: (128, 128),样本数较少时(1%) # Compute denominator terms s_x = np.dot(np.dot(x, Sigma_x), x) # x^T Sigma_x x, scalar s_y = np.dot(np.dot(x, Sigma_y), x) # x^T Sigma_y x, scalar denominator = (np.sqrt(s_x) + np.sqrt(s_y))**2 # Compute alpha, avoiding division by zero alpha = numerator / (numerator + denominator) # if (numerator + denominator) > 0 else 0 return -alpha def model_opt(pos, neg): import scipy.optimize as opt u1 = pos # Shape (n, 128) u2 = neg # Shape (n, 128) x0 = np.random.random(u1.shape[1]) # (128,) result = opt.minimize(objective, x0, args=(u1, u2)) return result.x, -result.fun 这是第二段代码

class KeyWordSpotter(torch.nn.Module): def __init__( self, ckpt_path, config_path, token_path, lexicon_path, threshold, min_frames=5, max_frames=250, interval_frames=50, score_beam=3, path_beam=20, gpu=-1, is_jit_model=False, ): super().__init__() os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu) with open(config_path, 'r') as fin: configs = yaml.load(fin, Loader=yaml.FullLoader) dataset_conf = configs['dataset_conf'] # feature related self.sample_rate = 16000 self.wave_remained = np.array([]) self.num_mel_bins = dataset_conf['feature_extraction_conf'][ 'num_mel_bins'] self.frame_length = dataset_conf['feature_extraction_conf'][ 'frame_length'] # in ms self.frame_shift = dataset_conf['feature_extraction_conf'][ 'frame_shift'] # in ms self.downsampling = dataset_conf.get('frame_skip', 1) self.resolution = self.frame_shift / 1000 # in second # fsmn splice operation self.context_expansion = dataset_conf.get('context_expansion', False) self.left_context = 0 self.right_context = 0 if self.context_expansion: self.left_context = dataset_conf['context_expansion_conf']['left'] self.right_context = dataset_conf['context_expansion_conf'][ 'right'] self.feature_remained = None self.feats_ctx_offset = 0 # after downsample, offset exist. # model related if is_jit_model: model = torch.jit.load(ckpt_path) # For script model, only cpu is supported. device = torch.device('cpu') else: # Init model from configs model = init_model(configs['model']) load_checkpoint(model, ckpt_path) use_cuda = gpu >= 0 and torch.cuda.is_available() device = torch.device('cuda' if use_cuda else 'cpu') self.device = device self.model = model.to(device) self.model.eval() logging.info(f'model {ckpt_path} loaded.') self.token_table = read_token(token_path) logging.info(f'tokens {token_path} with ' f'{len(self.token_table)} units loaded.') self.lexicon_table = read_lexicon(lexicon_path) logging.info(f'lexicons {lexicon_path} with ' f'{len(self.lexicon_table)} units loaded.') self.in_cache = torch.zeros(0, 0, 0, dtype=torch.float) # decoding and detection related self.score_beam = score_beam self.path_beam = path_beam self.threshold = threshold self.min_frames = min_frames self.max_frames = max_frames self.interval_frames = interval_frames self.cur_hyps = [(tuple(), (1.0, 0.0, []))] self.hit_score = 1.0 self.hit_keyword = None self.activated = False self.total_frames = 0 # frame offset, for absolute time self.last_active_pos = -1 # the last frame of being activated self.result = {} def set_keywords(self, keywords): # 4. parse keywords tokens assert keywords is not None, \ 'at least one keyword is needed, ' \ 'multiple keywords should be splitted with comma(,)' keywords_str = keywords keywords_list = keywords_str.strip().replace(' ', '').split(',') keywords_token = {} keywords_idxset = {0} keywords_strset = {'<blk>'} keywords_tokenmap = {'<blk>': 0} for keyword in keywords_list: strs, indexes = query_token_set(keyword, self.token_table, self.lexicon_table) keywords_token[keyword] = {} keywords_token[keyword]['token_id'] = indexes keywords_token[keyword]['token_str'] = ''.join('%s ' % str(i) for i in indexes) [keywords_strset.add(i) for i in strs] [keywords_idxset.add(i) for i in indexes] for txt, idx in zip(strs, indexes): if keywords_tokenmap.get(txt, None) is None: keywords_tokenmap[txt] = idx token_print = '' for txt, idx in keywords_tokenmap.items(): token_print += f'{txt}({idx}) ' logging.info(f'Token set is: {token_print}') self.keywords_idxset = keywords_idxset self.keywords_token = keywords_token def accept_wave(self, wave): assert isinstance(wave, bytes), \ "please make sure the input format is bytes(raw PCM)" # convert bytes into float32 data = [] for i in range(0, len(wave), 2): value = struct.unpack('<h', wave[i:i + 2])[0] data.append(value) # here we don't divide 32768.0, # because kaldi.fbank accept original input wave = np.array(data) wave = np.append(self.wave_remained, wave) if wave.size < (self.frame_length * self.sample_rate / 1000) \ * self.right_context : self.wave_remained = wave return None wave_tensor = torch.from_numpy(wave).float().to(self.device) wave_tensor = wave_tensor.unsqueeze(0) # add a channel dimension feats = kaldi.fbank(wave_tensor, num_mel_bins=self.num_mel_bins, frame_length=self.frame_length, frame_shift=self.frame_shift, dither=0, energy_floor=0.0, sample_frequency=self.sample_rate) # update wave remained feat_len = len(feats) frame_shift = int(self.frame_shift / 1000 * self.sample_rate) self.wave_remained = wave[feat_len * frame_shift:] if self.context_expansion: assert feat_len > self.right_context, \ "make sure each chunk feat length is large than right context." # pad feats with remained feature from last chunk if self.feature_remained is None: # first chunk # pad first frame at the beginning, # replicate just support last dimension, so we do transpose. feats_pad = F.pad(feats.T, (self.left_context, 0), mode='replicate').T else: feats_pad = torch.cat((self.feature_remained, feats)) ctx_frm = feats_pad.shape[0] - (self.right_context + self.right_context) ctx_win = (self.left_context + self.right_context + 1) ctx_dim = feats.shape[1] * ctx_win feats_ctx = torch.zeros(ctx_frm, ctx_dim, dtype=torch.float32) for i in range(ctx_frm): feats_ctx[i] = torch.cat(tuple( feats_pad[i:i + ctx_win])).unsqueeze(0) # update feature remained, and feats self.feature_remained = \ feats[-(self.left_context + self.right_context):] feats = feats_ctx.to(self.device) if self.downsampling > 1: last_remainder = 0 if self.feats_ctx_offset == 0 \ else self.downsampling - self.feats_ctx_offset remainder = (feats.size(0) + last_remainder) % self.downsampling feats = feats[self.feats_ctx_offset::self.downsampling, :] self.feats_ctx_offset = remainder \ if remainder == 0 else self.downsampling - remainder return feats def decode_keywords(self, t, probs): absolute_time = t + self.total_frames # search next_hyps depend on current probs and hyps. next_hyps = ctc_prefix_beam_search(absolute_time, probs, self.cur_hyps, self.keywords_idxset, self.score_beam) # update cur_hyps. note: the hyps is sort by path score(pnb+pb), # not the keywords' probabilities. cur_hyps = next_hyps[:self.path_beam] self.cur_hyps = cur_hyps def execute_detection(self, t): absolute_time = t + self.total_frames hit_keyword = None start = 0 end = 0 # hyps for detection hyps = [(y[0], y[1][0] + y[1][1], y[1][2]) for y in self.cur_hyps] # detect keywords in decoding paths. for one_hyp in hyps: prefix_ids = one_hyp[0] # path_score = one_hyp[1] prefix_nodes = one_hyp[2] assert len(prefix_ids) == len(prefix_nodes) for word in self.keywords_token.keys(): lab = self.keywords_token[word]['token_id'] offset = is_sublist(prefix_ids, lab) if offset != -1: hit_keyword = word start = prefix_nodes[offset]['frame'] end = prefix_nodes[offset + len(lab) - 1]['frame'] for idx in range(offset, offset + len(lab)): self.hit_score *= prefix_nodes[idx]['prob'] break if hit_keyword is not None: self.hit_score = math.sqrt(self.hit_score) break duration = end - start if hit_keyword is not None: if self.hit_score >= self.threshold and \ self.min_frames <= duration <= self.max_frames \ and (self.last_active_pos == -1 or end - self.last_active_pos >= self.interval_frames): self.activated = True self.last_active_pos = end logging.info( f"Frame {absolute_time} detect {hit_keyword} " f"from {start} to {end} frame. " f"duration {duration}, score {self.hit_score}, Activated.") elif self.last_active_pos > 0 and \ end - self.last_active_pos < self.interval_frames: logging.info( f"Frame {absolute_time} detect {hit_keyword} " f"from {start} to {end} frame. " f"but interval {end-self.last_active_pos} " f"is lower than {self.interval_frames}, Deactivated. ") elif self.hit_score < self.threshold: logging.info(f"Frame {absolute_time} detect {hit_keyword} " f"from {start} to {end} frame. " f"but {self.hit_score} " f"is lower than {self.threshold}, Deactivated. ") elif self.min_frames > duration or duration > self.max_frames: logging.info( f"Frame {absolute_time} detect {hit_keyword} " f"from {start} to {end} frame. " f"but {duration} beyond range" f"({self.min_frames}~{self.max_frames}), Deactivated. ") self.result = { "state": 1 if self.activated else 0, "keyword": hit_keyword if self.activated else None, "start": start * self.resolution if self.activated else None, "end": end * self.resolution if self.activated else None, "score": self.hit_score if self.activated else None } def forward(self, wave_chunk): feature = self.accept_wave(wave_chunk) if feature is None or feature.size(0) < 1: return {} # # the feature is not enough to get result. feature = feature.unsqueeze(0) # add a batch dimension logits, self.in_cache = self.model(feature, self.in_cache) probs = logits.softmax(2) # (batch_size, maxlen, vocab_size) probs = probs[0].cpu() # remove batch dimension for (t, prob) in enumerate(probs): t *= self.downsampling self.decode_keywords(t, prob) self.execute_detection(t) if self.activated: self.reset() # since a chunk include about 30 frames, # once activated, we can jump the latter frames. # TODO: there should give another method to update result, # avoiding self.result being cleared. break # update frame offset self.total_frames += len(probs) * self.downsampling # For streaming kws, the cur_hyps should be reset if the time of # a possible keyword last over the max_frames value you set. # see this issue:https://github.com/duj12/kws_demo/issues/2 if len(self.cur_hyps) > 0 and len(self.cur_hyps[0][0]) > 0: keyword_may_start = int(self.cur_hyps[0][1][2][0]['frame']) if (self.total_frames - keyword_may_start) > self.max_frames: self.reset() return self.result def reset(self): self.cur_hyps = [(tuple(), (1.0, 0.0, []))] self.activated = False self.hit_score = 1.0 def reset_all(self): self.reset() self.wave_remained = np.array([]) self.feature_remained = None self.feats_ctx_offset = 0 # after downsample, offset exist. self.in_cache = torch.zeros(0, 0, 0, dtype=torch.float) self.total_frames = 0 # frame offset, for absolute time self.last_active_pos = -1 # the last frame of being activated self.result = {}请帮我缕清整个脉络

class ResidualBlock(nn.Module): def init(self, in_channels, out_channels, dilation): super(ResidualBlock, self).init() self.conv = nn.Sequential( nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=dilation, dilation=dilation), nn.BatchNorm1d(out_channels), nn.ReLU(), nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=dilation, dilation=dilation), nn.BatchNorm1d(out_channels), nn.ReLU() ) self.attention = nn.Sequential( nn.Conv1d(out_channels, out_channels, kernel_size=1), nn.Sigmoid() ) self.downsample = nn.Conv1d(in_channels, out_channels, kernel_size=1) if in_channels != out_channels else None def forward(self, x): residual = x out = self.conv(x) attention = self.attention(out) out = out * attention if self.downsample: residual = self.downsample(residual) out += residual return out class VMD_TCN(nn.Module): def init(self, input_size, output_size, n_k=1, num_channels=16, dropout=0.2): super(VMD_TCN, self).init() self.input_size = input_size self.nk = n_k if isinstance(num_channels, int): num_channels = [num_channels*(2**i) for i in range(4)] self.layers = nn.ModuleList() self.layers.append(nn.utils.weight_norm(nn.Conv1d(input_size, num_channels[0], kernel_size=1))) for i in range(len(num_channels)): dilation_size = 2 ** i in_channels = num_channels[i-1] if i > 0 else num_channels[0] out_channels = num_channels[i] self.layers.append(ResidualBlock(in_channels, out_channels, dilation_size)) self.pool = nn.AdaptiveMaxPool1d(1) self.fc = nn.Linear(num_channels[-1], output_size) self.w = nn.Sequential(nn.Conv1d(num_channels[-1], num_channels[-1], kernel_size=1), nn.Sigmoid()) # 特征融合 门控系统 # self.fc1 = nn.Linear(output_size * (n_k + 1), output_size) # 全部融合 self.fc1 = nn.Linear(output_size * 2, output_size) # 只选择其中两个融合 self.dropout = nn.Dropout(dropout) # self.weight_fc = nn.Linear(num_channels[-1] * (n_k + 1), n_k + 1) # 置信度系数,对各个结果加权平均 软投票思路 def vmd(self, x): x_imfs = [] signal = np.array(x).flatten() # flatten()必须加上 否则最后一个batch报错size不匹配! u, u_hat, omega = VMD(signal, alpha=512, tau=0, K=self.nk, DC=0, init=1, tol=1e-7) for i in range(u.shape[0]): imf = torch.tensor(u[i], dtype=torch.float32) imf = imf.reshape(-1, 1, self.input_size) x_imfs.append(imf) x_imfs.append(x) return x_imfs def forward(self, x): x_imfs = self.vmd(x) total_out = [] # for data in x_imfs: for data in [x_imfs[0], x_imfs[-1]]: out = data.transpose(1, 2) for layer in self.layers: out = layer(out) out = self.pool(out) # torch.Size([96, 56, 1]) w = self.w(out) out = w * out # torch.Size([96, 56, 1]) out = out.view(out.size(0), -1) out = self.dropout(out) out = self.fc(out) total_out.append(out) total_out = torch.cat(total_out, dim=1) # 考虑w1total_out[0]+ w2total_out[1],在第一维,权重相加得到最终结果,不用cat total_out = self.dropout(total_out) output = self.fc1(total_out) return output优化代码

# data/dataset.py import os import numpy as np import torch from torch.utils.data import Dataset from plyfile import PlyData import open3d as o3d def read_ply_points(file_path): """ 使用 plyfile 解析 .ply 文件,提取 x, y, z 字段 返回一个 numpy 数组 (N, 3) """ ply = PlyData.read(file_path) # 提取 x, y, z 字段 x = ply['vertex']['x'] y = ply['vertex']['y'] z = ply['vertex']['z'] # 拼接为点云数组 points = np.vstack([x, y, z]).T points = points.astype(np.float32) return points class UnifiedPointCloudDataset(Dataset): def __init__(self, root_dirs, file_exts=['.ply', '.stl', '.obj'], num_points=1024): """ :param root_dirs: 包含所有数据文件夹的列表 :param file_exts: 支持的点云格式 :param num_points: 每个点云采样点数 """ self.file_list = [] self.num_points = num_points # 收集所有点云文件路径 for root_dir in root_dirs: if not os.path.exists(root_dir): raise FileNotFoundError(f"❌ 数据目录不存在: {root_dir}") for root, _, files in os.walk(root_dir): for file in files: if any(file.lower().endswith(ext) for ext in file_exts): full_path = os.path.join(root, file) if os.path.exists(full_path): # 确保文件真实存在 self.file_list.append(full_path) print(f"✅ 共发现 {len(self.file_list)} 个点云文件,用于训练") if len(self.file_list) == 0: raise ValueError("⚠️ 没有发现任何点云文件,请检查路径和文件格式") def __len__(self): return len(self.file_list) def __getitem__(self, idx): path = self.file_list[idx] ext = os.path.splitext(path)[1].lower() try: if ext == '.ply': # 使用 plyfile 读取 .ply 文件 points = read_ply_points(path) elif ext == '.stl': # 使用 open3d 读取 STL 文件并采样成点云 mesh = o3d.io.read_triangle_mesh(path) pcd = mesh.sample_points_uniformly(number_of_points=100000) points = np.asarray(pcd.points) elif ext == '.obj': # 使用 open3d 读取 OBJ 文件 pcd = o3d.io.read_point_cloud(path) if not pcd.has_points(): raise ValueError(f"点云为空或损坏: {path}") points = np.asarray(pcd.points) else: raise ValueError(f"不支持的格式: {ext}") # 检查点云是否为空 if len(points) < 10: raise ValueError(f"点云为空或损坏: {path}") # 固定点数采样 if len(points) < self.num_points: indices = np.random.choice(len(points), self.num_points, replace=True) else: indices = np.random.choice(len(points), self.num_points, replace=False) points = points[indices] return torch.tensor(points, dtype=torch.float32) except Exception as e: print(f"❌ 读取失败: {path},错误: {str(e)}") return self.__getitem__((idx + 1) % len(self.file_list)) # models/ballmamba.py import torch import torch.nn as nn from torch_geometric.nn import knn_graph, radius_graph from models.mamba_block import MambaBlock from utils.pointcloud_utils import farthest_point_sample class FPS_Causal(nn.Module): def __init__(self, in_channels, hidden_channels, k=512): super(FPS_Causal, self).__init__() self.k = k self.downsample = nn.Linear(in_channels, hidden_channels) self.mamba = MambaBlock(hidden_channels) def forward(self, x, pos): batch_size, num_points, _ = pos.shape idxs = [] for i in range(batch_size): idx = farthest_point_sample(pos[i], self.k) # 现在支持 (N, 3) idxs.append(idx) idx = torch.stack(idxs, dim=0) # (B, k) # 使用 gather 正确采样 x_sampled = torch.gather(pos, 1, idx.unsqueeze(-1).expand(-1, -1, 3)) # (B, k, 3) x_sampled = self.downsample(x_sampled) # (B, k, hidden_channels) x_sampled = self.mamba(x_sampled) # (B, k, hidden_channels) return x_sampled class BallQuery_Sort(nn.Module): def __init__(self, radius=0.3, k=64): super(BallQuery_Sort, self).__init__() self.radius = radius self.k = k def forward(self, pos, x): edge_index = radius_graph(pos, r=self.radius) row, col = edge_index neighbor_pos = pos[col] neighbor_x = x[col] dist = torch.norm(neighbor_pos - pos[row], dim=1) sorted_indices = torch.argsort(dist.view(-1, self.k), dim=1) neighbors = neighbor_x.view(-1, self.k, neighbor_x.shape[1]) return neighbors.gather(1, sorted_indices.unsqueeze(-1).expand(-1, -1, neighbor_x.shape[1])) class BallMambaModel(nn.Module): def __init__(self, in_channels=3, num_keypoints=1024): super(BallMambaModel, self).__init__() self.fps = FPS_Causal(in_channels, 64, k=num_keypoints) self.ball_query = BallQuery_Sort(radius=0.1, k=64) self.mamba = MambaBlock(64) self.decoder = nn.Linear(64, 3) def forward(self, pos): print("pos shape:", pos.shape) # 添加此行,查看 pos 的形状 x = self.fps(None, pos) x = self.ball_query(pos, x) x = x.mean(dim=1) x = self.mamba(x.unsqueeze(0)).squeeze(0) out = self.decoder(x) return out import torch import torch.nn as nn import torch.nn.functional as F class MambaBlock(nn.Module): def __init__(self, d_model: int, d_state: int = 16, d_conv: int = 4, expand: int = 2, dt_rank: int or str = "auto"): """ 完整的 MambaBlock 实现(无 LSTM,基于状态空间模型 SSM) :param d_model: 输入特征维度(通道数) :param d_state: 状态维度(SSM 中的 N) :param d_conv: 卷积核大小(用于局部依赖建模) :param expand: 扩展因子,中间维度 = d_model * expand :param dt_rank: 离散时间步秩,控制参数量,若为 "auto" 则自动计算 """ super(MambaBlock, self).__init__() self.d_model = d_model self.d_state = d_state self.d_inner = int(expand * d_model) self.dt_rank = dt_rank if dt_rank != "auto" else max(1, self.d_model // 16) # 输入投影:将输入特征升维 self.in_proj = nn.Linear(d_model, self.d_inner * 2, bias=False) # 卷积分支(局部建模) self.conv = nn.Conv1d( in_channels=self.d_inner, out_channels=self.d_inner, kernel_size=d_conv, bias=True, groups=self.d_inner, padding=d_conv - 1, # 保证 causal ) # x_proj 将 x 映射到 dt、B、C self.x_proj = nn.Linear(self.d_inner, self.dt_rank + 2 * d_state, bias=False) # dt_proj 将 dt 映射到 d_inner self.dt_proj = nn.Linear(self.dt_rank, self.d_inner, bias=True) # A 和 D 参数(状态矩阵和跳跃连接) A = torch.arange(1, d_state + 1, dtype=torch.float32)[None, :].repeat(self.d_inner, 1) self.A_log = nn.Parameter(torch.log(A)) self.D = nn.Parameter(torch.ones(self.d_inner)) # (d_inner, ) # 输出投影 self.out_proj = nn.Linear(self.d_inner, d_model, bias=False) def forward(self, x: torch.Tensor) -> torch.Tensor: """ 前向传播 :param x: 输入张量,形状 (B, L, d_model) :return: 输出张量,形状 (B, L, d_model) """ batch, seqlen, dim = x.shape # 1. 输入投影 + 拆分 xz = self.in_proj(x) # (B, L, 2 * d_inner) x, z = torch.split(xz, [self.d_inner, self.d_inner], dim=-1) # (B, L, d_inner) # 2. 卷积处理 x = x.transpose(1, 2) # (B, d_inner, L) x = self.conv(x)[:, :, :seqlen] # (B, d_inner, L) x = x.transpose(1, 2) # (B, L, d_inner) # 3. x_proj 分割出 dt、B、C x_dbl = self.x_proj(x) # (B, L, dt_rank + 2 * d_state) dt, B, C = torch.split( x_dbl, [self.dt_rank, self.d_state, self.d_state], dim=-1 ) # 4. dt_proj 映射 dt dt = F.softplus(self.dt_proj(dt)) # (B, L, d_inner) # 5. 获取 A A = -torch.exp(self.A_log) # (d_inner, d_state) A = A.unsqueeze(0).unsqueeze(0) # (1, 1, d_inner, d_state) B = B.unsqueeze(dim=2) # (B, L, 1, d_state) C = C.unsqueeze(dim=2) # (B, L, 1, d_state) # 调试信息 print(f"[MambaBlock] A.shape = {A.shape}") print(f"[MambaBlock] B.shape = {B.shape}") print(f"[MambaBlock] C.shape = {C.shape}") print(f"[MambaBlock] dt.shape = {dt.shape}") states = torch.zeros(batch, self.d_inner, self.d_state, device=x.device) outputs = [] for t in range(seqlen): # 更新状态 states = states + x[:, t:t+1, :, None] * B[:, t:t+1, :, :] states = states * torch.exp(A * dt[:, t:t+1, :, None]) # 添加 dt 到状态更新 # 获取当前时间步的 C 并进行 einsum current_C = C[:, t] # (B, 1, d_state) current_C = current_C.squeeze(1) # (B, d_state) # 使用广播机制 y = torch.einsum("binc,bc->bin", states, current_C) # bc 会广播为 binc outputs.append(y) y = torch.stack(outputs, dim=1) # (B, L, d_inner) # ✅ 修复:self.D 扩展为 (1, 1, d_inner) 以便广播 y = y + x * self.D.view(1, 1, -1) # 加上跳跃连接 # 激活 + 输出 y = y * F.silu(z) out = self.out_proj(y) # 调试信息 print(f"[MambaBlock] y.shape = {y.shape}") print(f"[MambaBlock] out.shape = {out.shape}") return out # train.py import os import torch import torch.nn as nn from torch.utils.data import DataLoader from data.dataset import UnifiedPointCloudDataset from models.ballmamba import BallMambaModel from utils.loss import ChamferLoss # 👇 Windows 多进程训练必须放在 if __name__ == '__main__' 里面 if __name__ == '__main__': # 设置多进程启动方式(Windows 下推荐 spawn) torch.multiprocessing.set_start_method('spawn') # ✅ 修改为你自己的路径 ROOT_DIRS = [ r"D:\桌面\point\data1\part1", r"D:\桌面\point\data1\part2", r"D:\桌面\point\data1\part3", r"D:\桌面\point\data1\part4", r"D:\桌面\point\data1\part5", r"D:\桌面\point\data1\part6", r"D:\桌面\point\data1\part7", r"D:\桌面\point\data1\part8", r"D:\桌面\point\data1\part9", r"D:\桌面\point\data1\part10", r"D:\桌面\point\data1\part11", r"D:\桌面\point\data1\part12", r"D:\桌面\point\data1\part13", r"D:\桌面\point\data1\part14", r"D:\桌面\point\data1\part15", r"D:\桌面\point\data1\part16", r"D:\桌面\point\data1\part17", r"D:\桌面\point\data1\part18", r"D:\桌面\point\data1\part19", r"D:\桌面\point\data1\part20", ] # ✅ 创建 Dataset dataset = UnifiedPointCloudDataset( root_dirs=ROOT_DIRS, file_exts=['.ply', '.stl'], num_points=1024 ) print(f"✅ 共发现 {len(dataset)} 个点云文件,用于训练") if len(dataset) == 0: raise ValueError("⚠️ 没有发现任何点云文件,请检查路径和文件格式") # ✅ 创建 DataLoader(num_workers=0 可临时绕过问题) loader = DataLoader( dataset, batch_size=16, shuffle=True, num_workers=0, # 👈 Windows 下训练先设置为 0,后续再尝试 4 pin_memory=True ) # ✅ 模型初始化 device = torch.device("cpu") model = BallMambaModel(in_channels=3, num_keypoints=512).to(device) optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) criterion = ChamferLoss().to(device) # ✅ 训练循环 for epoch in range(50): model.train() total_loss = 0 for i, points in enumerate(loader): points = points.to(device) # 输入输出一致(重构任务) recon_points = model(points) loss = criterion(recon_points, points) optimizer.zero_grad() loss.backward() optimizer.step() total_loss += loss.item() if i % 10 == 0: print(f"Epoch [{epoch+1}/50], Batch [{i+1}/{len(loader)}], Loss: {loss.item():.4f}") print(f"Epoch [{epoch+1}/50] 完成,平均 Loss: {total_loss / len(loader):.4f}") torch.save(model.state_dict(), f"models/ballmamba_epoch_{epoch+1}.pth") 上述是我的项目模型训练的代码,现在运行后出现问题C:\ProgramData\miniconda3\envs\torch\python.exe D:\桌面\point\scripts\train_model.py ✅ 共发现 907 个点云文件,用于训练 ✅ 共发现 907 个点云文件,用于训练 pos shape: torch.Size([16, 1024, 3]) [MambaBlock] A.shape = torch.Size([1, 1, 128, 16]) [MambaBlock] B.shape = torch.Size([16, 512, 1, 16]) [MambaBlock] C.shape = torch.Size([16, 512, 1, 16]) [MambaBlock] dt.shape = torch.Size([16, 512, 128]) Traceback (most recent call last): File "D:\桌面\point\scripts\train_model.py", line 76, in <module> recon_points = model(points) File "C:\ProgramData\miniconda3\envs\torch\lib\site-packages\torch\nn\modules\module.py", line 727, in _call_impl result = self.forward(*input, **kwargs) File "D:\桌面\point\models\ballmamba.py", line 63, in forward x = self.fps(None, pos) File "C:\ProgramData\miniconda3\envs\torch\lib\site-packages\torch\nn\modules\module.py", line 727, in _call_impl result = self.forward(*input, **kwargs) File "D:\桌面\point\models\ballmamba.py", line 30, in forward x_sampled = self.mamba(x_sampled) # (B, k, hidden_channels) File "C:\ProgramData\miniconda3\envs\torch\lib\site-packages\torch\nn\modules\module.py", line 727, in _call_impl result = self.forward(*input, **kwargs) File "D:\桌面\point\models\mamba_block.py", line 111, in forward y = y + x * self.D.view(1, 1, -1) # 加上跳跃连接 RuntimeError: The size of tensor a (16) must match the size of tensor b (512) at non-singleton dimension 2 进程已结束,退出代码为 1 应该如何修改代码?给我修改后的完整代码

Traceback (most recent call last): File "d:\海康\MVS\Development\Samples\Python\MvImport\two.py", line 702, in <module> class MainWindow(QMainWindow): File "d:\海康\MVS\Development\Samples\Python\MvImport\two.py", line 820, in MainWindow brightness = np.mean(gray) NameError: name 'gray' is not defined这个错误是下面这个代码的 # -*- coding: utf-8 -*- import sys import os import cv2 import numpy as np import math import time import logging import threading from collections import deque from PyQt5.QtWidgets import ( QApplication, QMainWindow, QPushButton, QWidget, QVBoxLayout, QHBoxLayout, QMessageBox, QLabel, QFileDialog, QToolBox, QComboBox, QStatusBar, QGroupBox, QSlider, QDockWidget, QProgressDialog, QLineEdit, QRadioButton, QGridLayout, QSpinBox, QCheckBox, QDialog, QDialogButtonBox, QDoubleSpinBox, QProgressBar, ) from PyQt5.QtCore import QRect, Qt, QSettings, QThread, pyqtSignal, QTimer, QMetaObject, pyqtSlot,Q_ARG from PyQt5.QtGui import QImage, QPixmap from CamOperation_class import CameraOperation from MvCameraControl_class import * import ctypes from ctypes import cast, POINTER from datetime import datetime import skimage import platform from CameraParams_header import ( MV_GIGE_DEVICE, MV_USB_DEVICE, MV_GENTL_CAMERALINK_DEVICE, MV_GENTL_CXP_DEVICE, MV_GENTL_XOF_DEVICE ) # ===== 全局配置 ===== # 模板匹配参数 MATCH_THRESHOLD = 0.75 # 降低匹配置信度阈值以提高灵敏度 MIN_MATCH_COUNT = 10 # 最小匹配特征点数量 MIN_FRAME_INTERVAL = 0.1 # 最小检测间隔(秒) # ===== 全局变量 ===== current_sample_path = "" detection_history = [] isGrabbing = False isOpen = False obj_cam_operation = None frame_monitor_thread = None template_matcher_thread = None MV_OK = 0 MV_E_CALLORDER = -2147483647 # ==================== 优化后的质量检测算法 ==================== def enhanced_check_print_quality(sample_image_path, test_image, threshold=0.05): # 不再使用传感器数据调整阈值 adjusted_threshold = threshold try: sample_img_data = np.fromfile(sample_image_path, dtype=np.uint8) sample_image = cv2.imdecode(sample_img_data, cv2.IMREAD_GRAYSCALE) if sample_image is None: logging.error(f"无法解码样本图像: {sample_image_path}") return None, None, None except Exception as e: logging.exception(f"样本图像读取异常: {str(e)}") return None, None, None if len(test_image.shape) == 3: test_image_gray = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY) else: test_image_gray = test_image.copy() sample_image = cv2.GaussianBlur(sample_image, (5, 5), 0) test_image_gray = cv2.GaussianBlur(test_image_gray, (5, 5), 0) try: # 使用更鲁棒的SIFT特征检测器 sift = cv2.SIFT_create() keypoints1, descriptors1 = sift.detectAndCompute(sample_image, None) keypoints2, descriptors2 = sift.detectAndCompute(test_image_gray, None) if descriptors1 is None or descriptors2 is None: logging.warning("无法提取特征描述符,跳过配准") aligned_sample = sample_image else: # 使用FLANN匹配器提高匹配精度 FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) search_params = dict(checks=50) flann = cv2.FlannBasedMatcher(index_params, search_params) matches = flann.knnMatch(descriptors1, descriptors2, k=2) # 应用Lowe's比率测试筛选优质匹配 good_matches = [] for m, n in matches: if m.distance < 0.7 * n.distance: good_matches.append(m) if len(good_matches) > MIN_MATCH_COUNT: src_pts = np.float32([keypoints1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2) dst_pts = np.float32([keypoints2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2) H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0) if H is not None: aligned_sample = cv2.warpPerspective( sample_image, H, (test_image_gray.shape[1], test_image_gray.shape[0]) ) logging.info("图像配准成功,使用配准后样本") else: aligned_sample = sample_image logging.warning("无法计算单应性矩阵,使用原始样本") else: aligned_sample = sample_image logging.warning(f"特征点匹配不足({len(good_matches)}/{MIN_MATCH_COUNT}),跳过图像配准") except Exception as e: logging.error(f"图像配准失败: {str(e)}") aligned_sample = sample_image try: if aligned_sample.shape != test_image_gray.shape: test_image_gray = cv2.resize(test_image_gray, (aligned_sample.shape[1], aligned_sample.shape[0])) except Exception as e: logging.error(f"图像调整大小失败: {str(e)}") return None, None, None try: from skimage.metrics import structural_similarity as compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True, data_range=255 ) except ImportError: from skimage.measure import compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True ) except Exception as e: logging.error(f"SSIM计算失败: {str(e)}") abs_diff = cv2.absdiff(aligned_sample, test_image_gray) ssim_diff = abs_diff.astype(np.float32) / 255.0 ssim_score = 1.0 - np.mean(ssim_diff) ssim_diff = (1 - ssim_diff) * 255 abs_diff = cv2.absdiff(aligned_sample, test_image_gray) combined_diff = cv2.addWeighted(ssim_diff.astype(np.uint8), 0.7, abs_diff, 0.3, 0) _, thresholded = cv2.threshold(combined_diff, 30, 255, cv2.THRESH_BINARY) kernel = np.ones((3, 3), np.uint8) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_OPEN, kernel) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_CLOSE, kernel) diff_pixels = np.count_nonzero(thresholded) total_pixels = aligned_sample.size diff_ratio = diff_pixels / total_pixels is_qualified = diff_ratio <= adjusted_threshold marked_image = cv2.cvtColor(test_image_gray, cv2.COLOR_GRAY2BGR) marked_image[thresholded == 255] = [0, 0, 255] # 放大缺陷标记 scale_factor = 2.0 # 放大2倍 marked_image = cv2.resize(marked_image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR) labels = skimage.measure.label(thresholded) properties = skimage.measure.regionprops(labels) for prop in properties: if prop.area > 50: y, x = prop.centroid # 根据放大比例调整坐标 x_scaled = int(x * scale_factor) y_scaled = int(y * scale_factor) cv2.putText(marked_image, f"Defect", (x_scaled, y_scaled), cv2.FONT_HERSHEY_SIMPLEX, 0.5 * scale_factor, (0, 255, 255), int(scale_factor)) return is_qualified, diff_ratio, marked_image # ==================== 视觉触发的质量检测流程 ==================== def vision_controlled_check(capture_image=None, match_score=0.0): """修改为接受图像帧和匹配分数""" global current_sample_path, detection_history logging.info("视觉触发质量检测启动") # 如果没有提供图像,使用当前帧 if capture_image is None: frame = obj_cam_operation.get_current_frame() else: frame = capture_image if frame is None: QMessageBox.warning(mainWindow, "错误", "无法获取当前帧图像!", QMessageBox.Ok) return progress = QProgressDialog("正在检测...", "取消", 0, 100, mainWindow) progress.setWindowModality(Qt.WindowModal) progress.setValue(10) try: diff_threshold = mainWindow.sliderDiffThreshold.value() / 100.0 logging.info(f"使用差异度阈值: {diff_threshold}") progress.setValue(30) is_qualified, diff_ratio, marked_image = enhanced_check_print_quality( current_sample_path, frame, threshold=diff_threshold ) progress.setValue(70) if is_qualified is None: QMessageBox.critical(mainWindow, "检测错误", "检测失败,请检查日志", QMessageBox.Ok) return logging.info(f"检测结果: 合格={is_qualified}, 差异={diff_ratio}") progress.setValue(90) update_diff_display(diff_ratio, is_qualified) result_text = f"印花是否合格: {'合格' if is_qualified else '不合格'}\n差异占比: {diff_ratio*100:.2f}%\n阈值: {diff_threshold*100:.2f}%" QMessageBox.information(mainWindow, "检测结果", result_text, QMessageBox.Ok) if marked_image is not None: # 创建可调整大小的窗口 cv2.namedWindow("缺陷标记结果", cv2.WINDOW_NORMAL) cv2.resizeWindow("缺陷标记结果", 800, 600) # 初始大小 cv2.imshow("缺陷标记结果", marked_image) cv2.waitKey(0) cv2.destroyAllWindows() detection_result = { 'timestamp': datetime.now(), 'qualified': is_qualified, 'diff_ratio': diff_ratio, 'threshold': diff_threshold, 'trigger_type': 'vision' if capture_image else 'manual' } detection_history.append(detection_result) update_history_display() progress.setValue(100) except Exception as e: logging.exception("印花检测失败") QMessageBox.critical(mainWindow, "检测错误", f"检测过程中发生错误: {str(e)}", QMessageBox.Ok) finally: progress.close() # ==================== 相机操作函数 ==================== def open_device(): global deviceList, nSelCamIndex, obj_cam_operation, isOpen, frame_monitor_thread, mainWindow if isOpen: QMessageBox.warning(mainWindow, "Error", '相机已打开!', QMessageBox.Ok) return MV_E_CALLORDER nSelCamIndex = mainWindow.ComboDevices.currentIndex() if nSelCamIndex < 0: QMessageBox.warning(mainWindow, "Error", '请选择相机!', QMessageBox.Ok) return MV_E_CALLORDER # 创建相机控制对象 cam = MvCamera() # 初始化相机操作对象 obj_cam_operation = CameraOperation(cam, deviceList, nSelCamIndex) ret = obj_cam_operation.open_device() if 0 != ret: strError = "打开设备失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) isOpen = False else: set_continue_mode() get_param() isOpen = True enable_controls() # 创建并启动帧监控线程 frame_monitor_thread = FrameMonitorThread(obj_cam_operation) frame_monitor_thread.frame_status.connect(mainWindow.statusBar().showMessage) frame_monitor_thread.start() def start_grabbing(): global obj_cam_operation, isGrabbing, template_matcher_thread ret = obj_cam_operation.start_grabbing(mainWindow.widgetDisplay.winId()) if ret != 0: strError = "开始取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = True enable_controls() # 等待第一帧到达 QThread.msleep(500) if not obj_cam_operation.is_frame_available(): QMessageBox.warning(mainWindow, "警告", "开始取流后未接收到帧,请检查相机连接!", QMessageBox.Ok) # 如果启用了自动检测,启动检测线程 if mainWindow.chkContinuousMatch.isChecked(): toggle_template_matching(True) def stop_grabbing(): global obj_cam_operation, isGrabbing, template_matcher_thread ret = obj_cam_operation.Stop_grabbing() if ret != 0: strError = "停止取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = False enable_controls() # 停止模板匹配线程 if template_matcher_thread and template_matcher_thread.isRunning(): template_matcher_thread.stop() def close_device(): global isOpen, isGrabbing, obj_cam_operation, frame_monitor_thread, template_matcher_thread if frame_monitor_thread and frame_monitor_thread.isRunning(): frame_monitor_thread.stop() frame_monitor_thread.wait(2000) # 停止模板匹配线程 if template_matcher_thread and template_matcher_thread.isRunning(): template_matcher_thread.stop() template_matcher_thread.wait(2000) template_matcher_thread = None if isOpen and obj_cam_operation: obj_cam_operation.close_device() isOpen = False isGrabbing = False enable_controls() # ==================== 连续帧匹配检测器 ==================== class ContinuousFrameMatcher(QThread): frame_processed = pyqtSignal(np.ndarray, float, bool) # 处理后的帧, 匹配分数, 是否匹配 match_score_updated = pyqtSignal(float) # 匹配分数更新信号 match_success = pyqtSignal(np.ndarray, float) # 匹配成功信号 (帧, 匹配分数) def __init__(self, cam_operation, parent=None): super().__init__(parent) self.cam_operation = cam_operation self.running = True self.sample_template = None self.min_match_count = MIN_MATCH_COUNT self.match_threshold = MATCH_THRESHOLD self.sample_kp = None self.sample_des = None self.current_match_score = 0.0 self.last_match_time = 0 self.frame_counter = 0 self.consecutive_fail_count = 0 self.last_trigger_time = 0 # 上次触发时间 self.cool_down = 0.2 # 冷却时间(秒) # 特征检测器 - 使用SIFT self.sift = cv2.SIFT_create() # 特征匹配器 - 使用FLANN提高匹配精度 FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) search_params = dict(checks=50) self.flann = cv2.FlannBasedMatcher(index_params, search_params) # 性能监控 self.processing_times = deque(maxlen=100) self.frame_rates = deque(maxlen=100) # 黑白相机优化 self.clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) def preprocess_image(self, image): """增强黑白图像特征提取""" # 如果是单通道图像,转换为三通道 if len(image.shape) == 2: image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) # 对比度增强 (LAB空间) lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) l, a, b = cv2.split(lab) cl = self.clahe.apply(l) limg = cv2.merge((cl, a, b)) enhanced = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR) # 边缘增强 gray = cv2.cvtColor(enhanced, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(gray, 50, 150) # 组合特征 return cv2.addWeighted(enhanced, 0.7, cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR), 0.3, 0) def set_sample(self, sample_img): """设置标准样本并提取特征""" # 保存样本图像 self.sample_img = sample_img # 预处理增强特征 processed_sample = self.preprocess_image(sample_img) # 提取样本特征点 self.sample_kp, self.sample_des = self.sift.detectAndCompute(processed_sample, None) if self.sample_des is None or len(self.sample_kp) < self.min_match_count: logging.warning("样本特征点不足") return False logging.info(f"样本特征提取成功: {len(self.sample_kp)}个关键点") return True def process_frame(self, frame): """处理帧:特征提取、匹配和可视化""" is_matched = False match_score = 0.0 processed_frame = frame.copy() # 检查是否已设置样本 if self.sample_kp is None or self.sample_des is None: return processed_frame, match_score, is_matched # 预处理当前帧 processed_frame = self.preprocess_image(frame) # 转换为灰度图像用于特征提取 gray_frame = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2GRAY) try: # 提取当前帧的特征点 kp, des = self.sift.detectAndCompute(gray_frame, None) if des is None or len(kp) < 10: # 特征点不足 return processed_frame, match_score, is_matched # 匹配特征点 matches = self.flann.knnMatch(self.sample_des, des, k=2) # 应用Lowe's比率测试 good_matches = [] for m, n in matches: if m.distance < 0.7 * n.distance: good_matches.append(m) # 计算匹配分数(匹配点数量占样本特征点数量的比例) if len(self.sample_kp) > 0: match_score = len(good_matches) / len(self.sample_kp) match_score = min(1.0, max(0.0, match_score)) # 确保在0-1范围内 else: match_score = 0.0 # 判断是否匹配成功 if len(good_matches) >= self.min_match_count and match_score >= self.match_threshold: is_matched = True # 在图像上绘制匹配结果 if len(gray_frame.shape) == 2: processed_frame = cv2.cvtColor(gray_frame, cv2.COLOR_GRAY2BGR) # 绘制匹配点 processed_frame = cv2.drawMatches( self.sample_img, self.sample_kp, processed_frame, kp, good_matches, None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS ) # 在图像上显示匹配分数 cv2.putText(processed_frame, f"Match Score: {match_score:.2f}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) # 更新当前匹配分数 self.current_match_score = match_score self.match_score_updated.emit(match_score) # 检查是否匹配成功且超过冷却时间 current_time = time.time() if is_matched and (current_time - self.last_trigger_time) > self.cool_down: self.last_trigger_time = current_time logging.info(f"匹配成功! 分数: {match_score:.2f}, 触发质量检测") # 发出匹配成功信号 (传递当前帧) self.match_success.emit(frame.copy(), match_score) except Exception as e: logging.error(f"帧处理错误: {str(e)}") return processed_frame, match_score, is_matched def set_threshold(self, threshold): """更新匹配阈值""" self.match_threshold = max(0.0, min(1.0, threshold)) logging.info(f"更新匹配阈值: {self.match_threshold:.2f}") def run(self): """主处理循环 - 连续处理每一帧""" logging.info("连续帧匹配线程启动") self.last_match_time = time.time() self.consecutive_fail_count = 0 while self.running: start_time = time.time() # 检查相机状态 if not self.cam_operation or not self.cam_operation.is_grabbing: if self.consecutive_fail_count % 10 == 0: logging.debug("相机未取流,等待...") time.sleep(0.1) self.consecutive_fail_count += 1 continue # 获取当前帧 frame = self.cam_operation.get_current_frame() if frame is None: self.consecutive_fail_count += 1 if self.consecutive_fail_count % 10 == 0: logging.warning(f"连续{self.consecutive_fail_count}次获取帧失败") time.sleep(0.05) continue self.consecutive_fail_count = 0 try: # 处理帧 processed_frame, match_score, is_matched = self.process_frame(frame) # 发送处理结果 self.frame_processed.emit(processed_frame, match_score, is_matched) except Exception as e: logging.error(f"帧处理错误: {str(e)}") # 控制处理频率 processing_time = time.time() - start_time sleep_time = max(0.01, MIN_FRAME_INTERVAL - processing_time) time.sleep(sleep_time) logging.info("连续帧匹配线程退出") # ==================== 模板匹配控制函数 ==================== def toggle_template_matching(state): global template_matcher_thread, current_sample_path logging.debug(f"切换连续匹配状态: {state}") if state == Qt.Checked and isGrabbing: # 确保已设置样本 if not current_sample_path: logging.warning("尝试启动连续匹配但未设置样本") QMessageBox.warning(mainWindow, "错误", "请先设置标准样本", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if template_matcher_thread is None: logging.info("创建新的连续帧匹配线程") template_matcher_thread = ContinuousFrameMatcher(obj_cam_operation) template_matcher_thread.frame_processed.connect(update_frame_display) template_matcher_thread.match_score_updated.connect(update_match_score_display) # 正确连接匹配成功信号到质量检测函数 template_matcher_thread.match_success.connect( lambda frame, score: vision_controlled_check(frame, score) ) # 加载样本图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: logging.error("无法加载标准样本图像") QMessageBox.warning(mainWindow, "错误", "无法加载标准样本图像", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if not template_matcher_thread.set_sample(sample_img): logging.warning("标准样本特征不足") QMessageBox.warning(mainWindow, "错误", "标准样本特征不足", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if not template_matcher_thread.isRunning(): logging.info("启动连续帧匹配线程") template_matcher_thread.start() elif template_matcher_thread and template_matcher_thread.isRunning(): logging.info("停止连续帧匹配线程") template_matcher_thread.stop() # 重置匹配分数显示 update_match_score_display(0.0) # 重置帧显示 if obj_cam_operation and obj_cam_operation.is_frame_available(): frame = obj_cam_operation.get_current_frame() if frame is not None: display_frame = frame.copy() # 添加状态信息 cv2.putText(display_frame, "Continuous Matching Disabled", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) update_frame_display(display_frame, 0.0, False) # 添加类型转换函数 def numpy_to_qimage(np_array): """将 numpy 数组转换为 QImage""" if np_array is None: return QImage() height, width, channel = np_array.shape bytes_per_line = 3 * width # 确保数据是连续的 if not np_array.flags['C_CONTIGUOUS']: np_array = np.ascontiguousarray(np_array) # 转换 BGR 到 RGB rgb_image = cv2.cvtColor(np_array, cv2.COLOR_BGR2RGB) # 创建 QImage qimg = QImage( rgb_image.data, width, height, bytes_per_line, QImage.Format_RGB888 ) # 复制数据以避免内存问题 return qimg.copy() # 修改 update_frame_display 函数 def update_frame_display(frame, match_score, is_matched): """更新主显示窗口(线程安全)""" # 确保在GUI线程中执行 if QThread.currentThread() != QApplication.instance().thread(): # 转换为 QImage 再传递 qimg = numpy_to_qimage(frame) QMetaObject.invokeMethod( mainWindow, "updateDisplay", Qt.QueuedConnection, Q_ARG(QImage, qimg), Q_ARG(float, match_score), Q_ARG(bool, is_matched) ) return # 如果已经在主线程,直接调用主窗口的更新方法 mainWindow.updateDisplay(frame, match_score, is_matched) def update_match_score_display(score): """更新匹配分数显示""" # 将分数转换为百分比显示 score_percent = score * 100 mainWindow.lblMatchScoreValue.setText(f"{score_percent:.1f}%") # 根据分数设置颜色 if score > 0.8: # 高于80%显示绿色 color = "green" elif score > 0.6: # 60%-80%显示黄色 color = "orange" else: # 低于60%显示红色 color = "red" mainWindow.lblMatchScoreValue.setStyleSheet(f"color: {color}; font-weight: bold;") def update_diff_display(diff_ratio, is_qualified): mainWindow.lblCurrentDiff.setText(f"当前差异度: {diff_ratio*100:.2f}%") if is_qualified: mainWindow.lblDiffStatus.setText("状态: 合格") mainWindow.lblDiffStatus.setStyleSheet("color: green; font-size: 12px;") else: mainWindow.lblDiffStatus.setText("状态: 不合格") mainWindow.lblDiffStatus.setStyleSheet("color: red; font-size: 12px;") def update_diff_threshold(value): mainWindow.lblDiffValue.setText(f"{value}%") def update_sample_display(): global current_sample_path if current_sample_path: mainWindow.lblSamplePath.setText(f"当前样本: {os.path.basename(current_sample_path)}") mainWindow.lblSamplePath.setToolTip(current_sample_path) mainWindow.bnPreviewSample.setEnabled(True) else: mainWindow.lblSamplePath.setText("当前样本: 未设置样本") mainWindow.bnPreviewSample.setEnabled(False) def update_history_display(): global detection_history mainWindow.cbHistory.clear() for i, result in enumerate(detection_history[-10:]): timestamp = result['timestamp'].strftime("%H:%M:%S") status = "合格" if result['qualified'] else "不合格" ratio = f"{result['diff_ratio']*100:.2f}%" trigger = "视觉" if result['trigger_type'] == 'vision' else "手动" mainWindow.cbHistory.addItem(f"[{trigger} {timestamp}] {status} - 差异: {ratio}") def update_match_threshold(value): """更新匹配阈值显示并应用到匹配器""" global template_matcher_thread # 更新UI显示 if mainWindow: mainWindow.lblThresholdValue.setText(f"{value}%") # 如果匹配线程存在,更新其匹配阈值 if template_matcher_thread: # 转换为0-1范围的浮点数 threshold = value / 100.0 template_matcher_thread.set_threshold(threshold) logging.debug(f"更新匹配阈值: {threshold:.2f}") # ==================== 主窗口类 ==================== class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("布料印花检测系统 - 连续匹配版") self.resize(1200, 800) central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # 设备枚举区域 device_layout = QHBoxLayout() self.ComboDevices = QComboBox() self.bnEnum = QPushButton("枚举设备") self.bnOpen = QPushButton("打开设备") self.bnClose = QPushButton("关闭设备") device_layout.addWidget(self.ComboDevices) device_layout.addWidget(self.bnEnum) device_layout.addWidget(self.bnOpen) device_layout.addWidget(self.bnClose) main_layout.addLayout(device_layout) # 取流控制组 self.groupGrab = QGroupBox("取流控制") grab_layout = QHBoxLayout(self.groupGrab) self.bnStart = QPushButton("开始取流") self.bnStop = QPushButton("停止取流") self.radioContinueMode = QRadioButton("连续模式") self.radioTriggerMode = QRadioButton("触发模式") self.bnSoftwareTrigger = QPushButton("软触发") grab_layout.addWidget(self.bnStart) grab_layout.addWidget(self.bnStop) grab_layout.addWidget(self.radioContinueMode) grab_layout.addWidget(self.radioTriggerMode) grab_layout.addWidget(self.bnSoftwareTrigger) main_layout.addWidget(self.groupGrab) # 参数设置组 self.paramgroup = QGroupBox("相机参数") param_layout = QGridLayout(self.paramgroup) self.edtExposureTime = QLineEdit() self.edtGain = QLineEdit() self.edtFrameRate = QLineEdit() self.bnGetParam = QPushButton("获取参数") self.bnSetParam = QPushButton("设置参数") self.bnSaveImage = QPushButton("保存图像") param_layout.addWidget(QLabel("曝光时间:"), 0, 0) param_layout.addWidget(self.edtExposureTime, 0, 1) param_layout.addWidget(self.bnGetParam, 0, 2) param_layout.addWidget(QLabel("增益:"), 1, 0) param_layout.addWidget(self.edtGain, 1, 1) param_layout.addWidget(self.bnSetParam, 1, 2) param_layout.addWidget(QLabel("帧率:"), 2, 0) param_layout.addWidget(self.edtFrameRate, 2, 1) param_layout.addWidget(self.bnSaveImage, 2, 2) main_layout.addWidget(self.paramgroup) # 图像显示区域 self.widgetDisplay = QLabel() self.widgetDisplay.setMinimumSize(640, 480) self.widgetDisplay.setStyleSheet("background-color: black;") self.widgetDisplay.setAlignment(Qt.AlignCenter) self.widgetDisplay.setText("相机预览区域") main_layout.addWidget(self.widgetDisplay, 1) # 创建自定义UI组件 self.setup_custom_ui() # 添加阈值自适应定时器 self.threshold_timer = QTimer() self.threshold_timer.timeout.connect(self.auto_adjust_threshold) self.threshold_timer.start(2000) # 每2秒调整一次 def auto_adjust_threshold(self): """根据环境亮度自动调整匹配阈值""" if not obj_cam_operation or not isGrabbing: return # 获取当前帧并计算平均亮度 frame = obj_cam_operation.get_current_frame() if frame is None: return gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) brightness = np.mean(gray) # 根据亮度动态调整阈值 (亮度低时降低阈值要求) if brightness < 50: # 暗环境 new_threshold = 40 # 40% elif brightness > 200: # 亮环境 new_threshold = 65 # 65% else: # 正常环境 new_threshold = 55 # 55% # 更新UI self.sliderThreshold.setValue(new_threshold) self.lblThresholdValue.setText(f"{new_threshold}%") # 更新匹配器阈值 update_match_threshold(new_threshold) # 状态栏显示调整信息 self.statusBar().showMessage(f"亮度: {brightness:.1f}, 自动调整阈值至: {new_threshold}%", 3000) # 处理不同通道数的图像 if len(frame.shape) == 2 or frame.shape[2] == 1: # 已经是灰度图 gray = frame elif frame.shape[2] == 3: # 三通道彩色图 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) elif frame.shape[2] == 4: # 四通道图(带alpha) gray = cv2.cvtColor(frame, cv2.COLOR_BGRA2GRAY) else: # 其他通道数,无法处理 logging.warning(f"无法处理的图像格式: {frame.shape}") return # 计算平均亮度 brightness = np.mean(gray) def setup_custom_ui(self): # 工具栏 toolbar = self.addToolBar("检测工具") self.bnCheckPrint = QPushButton("手动检测") self.bnSaveSample = QPushButton("保存标准样本") self.bnPreviewSample = QPushButton("预览样本") self.cbHistory = QComboBox() self.cbHistory.setMinimumWidth(300) toolbar.addWidget(self.bnCheckPrint) toolbar.addWidget(self.bnSaveSample) toolbar.addWidget(self.bnPreviewSample) toolbar.addWidget(QLabel("历史记录:")) toolbar.addWidget(self.cbHistory) # 状态栏样本路径 self.lblSamplePath = QLabel("当前样本: 未设置样本") self.statusBar().addPermanentWidget(self.lblSamplePath) # 右侧面板 right_panel = QWidget() right_layout = QVBoxLayout(right_panel) right_layout.setContentsMargins(10, 10, 10, 10) # 差异度调整组 diff_group = QGroupBox("差异度调整") diff_layout = QVBoxLayout(diff_group) self.lblDiffThreshold = QLabel("差异度阈值 (0-100%):") self.sliderDiffThreshold = QSlider(Qt.Horizontal) self.sliderDiffThreshold.setRange(0, 100) self.sliderDiffThreshold.setValue(5) self.lblDiffValue = QLabel("5%") self.lblCurrentDiff = QLabel("当前差异度: -") self.lblCurrentDiff.setStyleSheet("font-size: 14px; font-weight: bold;") self.lblDiffStatus = QLabel("状态: 未检测") self.lblDiffStatus.setStyleSheet("font-size: 12px;") diff_layout.addWidget(self.lblDiffThreshold) diff_layout.addWidget(self.sliderDiffThreshold) diff_layout.addWidget(self.lblDiffValue) diff_layout.addWidget(self.lblCurrentDiff) diff_layout.addWidget(self.lblDiffStatus) right_layout.addWidget(diff_group) # ===== 连续匹配面板 ===== match_group = QGroupBox("连续帧匹配") match_layout = QVBoxLayout(match_group) # 样本设置 sample_layout = QHBoxLayout() self.bnSetSample = QPushButton("设置标准样本") self.bnPreviewSample = QPushButton("预览样本") self.lblSampleStatus = QLabel("状态: 未设置样本") sample_layout.addWidget(self.bnSetSample) sample_layout.addWidget(self.bnPreviewSample) sample_layout.addWidget(self.lblSampleStatus) match_layout.addLayout(sample_layout) # 匹配参数 param_layout = QHBoxLayout() self.lblMatchThreshold = QLabel("匹配阈值:") self.sliderThreshold = QSlider(Qt.Horizontal) self.sliderThreshold.setRange(50, 100) self.sliderThreshold.setValue(75) # 降低默认阈值 self.lblThresholdValue = QLabel("75%") param_layout.addWidget(self.lblMatchThreshold) param_layout.addWidget(self.sliderThreshold) param_layout.addWidget(self.lblThresholdValue) match_layout.addLayout(param_layout) # 匹配分数显示 match_score_layout = QHBoxLayout() self.lblMatchScore = QLabel("实时匹配分数:") self.lblMatchScoreValue = QLabel("0.0%") self.lblMatchScoreValue.setStyleSheet("font-weight: bold;") match_score_layout.addWidget(self.lblMatchScore) match_score_layout.addWidget(self.lblMatchScoreValue) match_layout.addLayout(match_score_layout) # 连续匹配开关 self.chkContinuousMatch = QCheckBox("启用连续帧匹配") self.chkContinuousMatch.setChecked(False) match_layout.addWidget(self.chkContinuousMatch) right_layout.addWidget(match_group) right_layout.addStretch(1) # 停靠窗口 dock = QDockWidget("检测控制面板", self) dock.setWidget(right_panel) dock.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) self.addDockWidget(Qt.RightDockWidgetArea, dock) @pyqtSlot(QImage, float, bool) def updateDisplay(self, qimg, match_score, is_matched): """线程安全的显示更新方法(只接收 QImage)""" if qimg.isNull(): return # 创建QPixmap并缩放 pixmap = QPixmap.fromImage(qimg) scaled_pixmap = pixmap.scaled( self.widgetDisplay.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation ) # 更新显示 self.widgetDisplay.setPixmap(scaled_pixmap) self.widgetDisplay.setAlignment(Qt.AlignCenter) # 更新显示 self.widgetDisplay.setPixmap(scaled_pixmap) self.widgetDisplay.setAlignment(Qt.AlignCenter) def closeEvent(self, event): logging.info("主窗口关闭,执行清理...") close_device() event.accept() # ===== 辅助函数 ===== def ToHexStr(num): if not isinstance(num, int): try: num = int(num) except: return f"<非整数:{type(num)}>" chaDic = {10: 'a', 11: 'b', 12: 'c', 13: 'd', 14: 'e', 15: 'f'} hexStr = "" if num < 0: num = num + 2 ** 32 while num >= 16: digit = num % 16 hexStr = chaDic.get(digit, str(digit)) + hexStr num //= 16 hexStr = chaDic.get(num, str(num)) + hexStr return "0x" + hexStr def enum_devices(): global deviceList, obj_cam_operation n_layer_type = ( MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE ) # 创建设备列表 deviceList = MV_CC_DEVICE_INFO_LIST() # 枚举设备 ret = MvCamera.MV_CC_EnumDevices(n_layer_type, deviceList) if ret != MV_OK: error_msg = f"枚举设备失败! 错误码: 0x{ret:x}" logging.error(error_msg) QMessageBox.warning(mainWindow, "错误", error_msg, QMessageBox.Ok) return ret if deviceList.nDeviceNum == 0: QMessageBox.warning(mainWindow, "提示", "未找到任何设备", QMessageBox.Ok) return MV_OK logging.info(f"找到 {deviceList.nDeviceNum} 个设备") # 处理设备信息 devList = [] for i in range(deviceList.nDeviceNum): # 获取设备信息 mvcc_dev_info = ctypes.cast( deviceList.pDeviceInfo[i], ctypes.POINTER(MV_CC_DEVICE_INFO) ).contents # 根据设备类型提取信息 if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: st_gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo ip_addr = ( f"{(st_gige_info.nCurrentIp >> 24) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 16) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 8) & 0xFF}." f"{st_gige_info.nCurrentIp & 0xFF}" ) # 修复:将c_ubyte_Array_16转换为字节串再解码 user_defined_bytes = bytes(st_gige_info.chUserDefinedName) dev_name = f"GigE: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} ({ip_addr})") elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: st_usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo serial = bytes(st_usb_info.chSerialNumber).decode('ascii', 'ignore').rstrip('\x00') # 修复:同样处理用户自定义名称 user_defined_bytes = bytes(st_usb_info.chUserDefinedName) dev_name = f"USB: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} (SN: {serial})") else: devList.append(f"[{i}] 未知设备类型: {mvcc_dev_info.nTLayerType}") # 更新UI mainWindow.ComboDevices.clear() mainWindow.ComboDevices.addItems(devList) if devList: mainWindow.ComboDevices.setCurrentIndex(0) mainWindow.statusBar().showMessage(f"找到 {deviceList.nDeviceNum} 个设备", 3000) return MV_OK def set_continue_mode(): ret = obj_cam_operation.set_trigger_mode(False) if ret != 0: strError = "设置连续模式失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(True) mainWindow.radioTriggerMode.setChecked(False) mainWindow.bnSoftwareTrigger.setEnabled(False) def set_software_trigger_mode(): ret = obj_cam_operation.set_trigger_mode(True) if ret != 0: strError = "设置触发模式失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(False) mainWindow.radioTriggerMode.setChecked(True) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing) def trigger_once(): ret = obj_cam_operation.trigger_once() if ret != 0: strError = "软触发失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) def save_sample_image(): global isGrabbing, obj_cam_operation, current_sample_path if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return # 尝试捕获当前帧 frame = obj_cam_operation.capture_frame() if frame is None: QMessageBox.warning(mainWindow, "无有效图像", "未捕获到有效图像,请检查相机状态!", QMessageBox.Ok) return # 确保图像有效 if frame.size == 0 or frame.shape[0] == 0 or frame.shape[1] == 0: QMessageBox.warning(mainWindow, "无效图像", "捕获的图像无效,请检查相机设置!", QMessageBox.Ok) return settings = QSettings("ClothInspection", "CameraApp") last_dir = settings.value("last_save_dir", os.path.join(os.getcwd(), "captures")) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") default_filename = f"sample_{timestamp}" file_path, selected_filter = QFileDialog.getSaveFileName( mainWindow, "保存标准样本图像", os.path.join(last_dir, default_filename), "BMP Files (*.bmp);;PNG Files (*.png);;JPEG Files (*.jpg);;所有文件 (*)", options=QFileDialog.DontUseNativeDialog ) if not file_path: return # 确保文件扩展名正确 file_extension = os.path.splitext(file_path)[1].lower() if not file_extension: if "BMP" in selected_filter: file_path += ".bmp" elif "PNG" in selected_filter: file_path += ".png" elif "JPEG" in selected_filter or "JPG" in selected_filter: file_path += ".jpg" else: file_path += ".bmp" file_extension = os.path.splitext(file_path)[1].lower() # 创建目录(如果不存在) directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): try: os.makedirs(directory, exist_ok=True) except OSError as e: QMessageBox.critical(mainWindow, "目录创建错误", f"无法创建目录 {directory}: {str(e)}", QMessageBox.Ok) return # 保存图像 try: # 使用OpenCV保存图像 if not cv2.imwrite(file_path, frame): raise Exception("OpenCV保存失败") # 更新状态 current_sample_path = file_path update_sample_display() settings.setValue("last_save_dir", os.path.dirname(file_path)) # 显示成功消息 QMessageBox.information(mainWindow, "成功", f"标准样本已保存至:\n{file_path}", QMessageBox.Ok) # 更新样本状态 mainWindow.lblSampleStatus.setText("状态: 样本已设置") mainWindow.lblSampleStatus.setStyleSheet("color: green;") except Exception as e: logging.error(f"保存图像失败: {str(e)}") QMessageBox.critical(mainWindow, "保存错误", f"保存图像时发生错误:\n{str(e)}", QMessageBox.Ok) def preview_sample(): global current_sample_path if not current_sample_path or not os.path.exists(current_sample_path): QMessageBox.warning(mainWindow, "错误", "请先设置有效的标准样本图像!", QMessageBox.Ok) return try: # 直接使用OpenCV加载图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: raise Exception("无法加载图像") # 显示图像 cv2.namedWindow("标准样本预览", cv2.WINDOW_NORMAL) cv2.resizeWindow("标准样本预览", 800, 600) cv2.imshow("标准样本预览", sample_img) cv2.waitKey(0) cv2.destroyAllWindows() except Exception as e: QMessageBox.warning(mainWindow, "错误", f"预览样本失败: {str(e)}", QMessageBox.Ok) def is_float(str): try: float(str) return True except ValueError: return False def get_param(): try: ret = obj_cam_operation.get_parameters() if ret != MV_OK: strError = "获取参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) else: mainWindow.edtExposureTime.setText("{0:.2f}".format(obj_cam_operation.exposure_time)) mainWindow.edtGain.setText("{0:.2f}".format(obj_cam_operation.gain)) mainWindow.edtFrameRate.setText("{0:.2f}".format(obj_cam_operation.frame_rate)) except Exception as e: error_msg = f"获取参数时发生错误: {str(e)}" QMessageBox.critical(mainWindow, "严重错误", error_msg, QMessageBox.Ok) def set_param(): frame_rate = mainWindow.edtFrameRate.text() exposure = mainWindow.edtExposureTime.text() gain = mainWindow.edtGain.text() if not (is_float(frame_rate) and is_float(exposure) and is_float(gain)): strError = "设置参数失败: 参数必须是有效的浮点数" QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) return MV_E_PARAMETER try: ret = obj_cam_operation.set_param( frame_rate=float(frame_rate), exposure_time=float(exposure), gain=float(gain) ) if ret != MV_OK: strError = "设置参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, "错误", strError, QMessageBox.Ok) except Exception as e: error_msg = f"设置参数时发生错误: {str(e)}" QMessageBox.critical(mainWindow, "严重错误", error_msg, QMessageBox.Ok) def enable_controls(): global isGrabbing, isOpen mainWindow.groupGrab.setEnabled(isOpen) mainWindow.paramgroup.setEnabled(isOpen) mainWindow.bnOpen.setEnabled(not isOpen) mainWindow.bnClose.setEnabled(isOpen) mainWindow.bnStart.setEnabled(isOpen and (not isGrabbing)) mainWindow.bnStop.setEnabled(isOpen and isGrabbing) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing and mainWindow.radioTriggerMode.isChecked()) mainWindow.bnSaveImage.setEnabled(isOpen and isGrabbing) mainWindow.bnCheckPrint.setEnabled(isOpen and isGrabbing) mainWindow.bnSaveSample.setEnabled(isOpen and isGrabbing) mainWindow.bnPreviewSample.setEnabled(bool(current_sample_path)) # 连续匹配控制 mainWindow.chkContinuousMatch.setEnabled(bool(current_sample_path) and isGrabbing) # ===== 相机帧监控线程 ===== class FrameMonitorThread(QThread): frame_status = pyqtSignal(str) # 用于发送状态消息的信号 def __init__(self, cam_operation): super().__init__() self.cam_operation = cam_operation self.running = True self.frame_count = 0 self.last_time = time.time() def run(self): """监控相机帧状态的主循环""" while self.running: try: if self.cam_operation and self.cam_operation.is_grabbing: # 获取帧统计信息 frame_info = self.get_frame_info() if frame_info: fps = frame_info.get('fps', 0) dropped = frame_info.get('dropped', 0) status = f"FPS: {fps:.1f} | 丢帧: {dropped}" self.frame_status.emit(status) else: self.frame_status.emit("取流中...") else: self.frame_status.emit("相机未取流") except Exception as e: self.frame_status.emit(f"监控错误: {str(e)}") # 每500ms检查一次 QThread.msleep(500) def stop(self): """停止监控线程""" self.running = False self.wait(1000) # 等待线程结束 def calculate_fps(self): """计算当前帧率""" current_time = time.time() elapsed = current_time - self.last_time if elapsed > 0: fps = self.frame_count / elapsed self.frame_count = 0 self.last_time = current_time return fps return 0 def get_frame_info(self): """获取帧信息""" try: # 更新帧计数 self.frame_count += 1 # 返回帧信息 return { 'fps': self.calculate_fps(), 'dropped': 0 # 实际应用中需要从相机获取真实丢帧数 } except Exception as e: logging.error(f"获取帧信息失败: {str(e)}") return None # ===== 主程序入口 ===== if __name__ == "__main__": # 配置日志系统 logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("cloth_inspection_continuous.log"), logging.StreamHandler() ] ) logging.info("布料印花检测系统(连续匹配版)启动") app = QApplication(sys.argv) mainWindow = MainWindow() mainWindow.sliderThreshold.valueChanged.connect( lambda value: update_match_threshold(value) ) # 信号连接 mainWindow.sliderDiffThreshold.valueChanged.connect(update_diff_threshold) mainWindow.bnCheckPrint.clicked.connect(lambda: vision_controlled_check(None)) mainWindow.bnSaveSample.clicked.connect(save_sample_image) mainWindow.bnPreviewSample.clicked.connect(preview_sample) mainWindow.bnEnum.clicked.connect(enum_devices) mainWindow.bnOpen.clicked.connect(open_device) mainWindow.bnClose.clicked.connect(close_device) mainWindow.bnStart.clicked.connect(start_grabbing) mainWindow.bnStop.clicked.connect(stop_grabbing) mainWindow.bnSoftwareTrigger.clicked.connect(trigger_once) mainWindow.radioTriggerMode.clicked.connect(set_software_trigger_mode) mainWindow.radioContinueMode.clicked.connect(set_continue_mode) mainWindow.bnGetParam.clicked.connect(get_param) mainWindow.bnSetParam.clicked.connect(set_param) mainWindow.bnSaveImage.clicked.connect(save_sample_image) # 连续匹配信号连接 mainWindow.sliderThreshold.valueChanged.connect(update_match_score_display) mainWindow.chkContinuousMatch.stateChanged.connect(toggle_template_matching) mainWindow.show() app.exec_() close_device() sys.exit()

这个程序运行后也保存过样本后还是没有实现每一帧都与标准样本进行匹配比对(我是已经点了自动连续帧匹配后),是这个程序里的每一帧比对里有什么错误和问题吗?在运行过程中每一次画面变化界面里的匹配度数值也一直没有变化。 -- coding: utf-8 -- import sys import os import cv2 import numpy as np import math import time import logging from collections import deque from PyQt5.QtWidgets import ( QApplication, QMainWindow, QPushButton, QWidget, QVBoxLayout, QHBoxLayout, QMessageBox, QLabel, QFileDialog, QToolBox, QComboBox, QStatusBar, QGroupBox, QSlider, QDockWidget, QProgressDialog, QLineEdit, QRadioButton, QGridLayout, QSpinBox, QCheckBox, QDialog, QDialogButtonBox, QDoubleSpinBox, QProgressBar ) from PyQt5.QtCore import QRect, Qt, QSettings, QThread, pyqtSignal, QTimer,QMetaObject,pyqtSlot,QObject from PyQt5.QtGui import QImage, QPixmap from CamOperation_class import CameraOperation from MvCameraControl_class import * import ctypes from ctypes import cast, POINTER from datetime import datetime import skimage import platform from CameraParams_header import ( MV_GIGE_DEVICE, MV_USB_DEVICE, MV_GENTL_CAMERALINK_DEVICE, MV_GENTL_CXP_DEVICE, MV_GENTL_XOF_DEVICE ) ===== 全局配置 ===== 模板匹配参数 MATCH_THRESHOLD = 0.75 # 降低匹配置信度阈值以提高灵敏度 MIN_MATCH_COUNT = 10 # 最小匹配特征点数量 MIN_FRAME_INTERVAL = 0.1 # 最小检测间隔(秒) ===== 全局变量 ===== current_sample_path = “” detection_history = [] isGrabbing = False isOpen = False obj_cam_operation = None frame_monitor_thread = None template_matcher_thread = None MV_OK = 0 MV_E_CALLORDER = -2147483647 ==================== 优化后的质量检测算法 ==================== def enhanced_check_print_quality(sample_image_path, test_image, threshold=0.05): # 不再使用传感器数据调整阈值 adjusted_threshold = threshold try: sample_img_data = np.fromfile(sample_image_path, dtype=np.uint8) sample_image = cv2.imdecode(sample_img_data, cv2.IMREAD_GRAYSCALE) if sample_image is None: logging.error(f"无法解码样本图像: {sample_image_path}") return None, None, None except Exception as e: logging.exception(f"样本图像读取异常: {str(e)}") return None, None, None if len(test_image.shape) == 3: test_image_gray = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY) else: test_image_gray = test_image.copy() sample_image = cv2.GaussianBlur(sample_image, (5, 5), 0) test_image_gray = cv2.GaussianBlur(test_image_gray, (5, 5), 0) try: # 使用更鲁棒的SIFT特征检测器 sift = cv2.SIFT_create() keypoints1, descriptors1 = sift.detectAndCompute(sample_image, None) keypoints2, descriptors2 = sift.detectAndCompute(test_image_gray, None) if descriptors1 is None or descriptors2 is None: logging.warning("无法提取特征描述符,跳过配准") aligned_sample = sample_image else: # 使用FLANN匹配器提高匹配精度 FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) search_params = dict(checks=50) flann = cv2.FlannBasedMatcher(index_params, search_params) matches = flann.knnMatch(descriptors1, descriptors2, k=2) # 应用Lowe's比率测试筛选优质匹配 good_matches = [] for m, n in matches: if m.distance < 0.7 * n.distance: good_matches.append(m) if len(good_matches) > MIN_MATCH_COUNT: src_pts = np.float32([keypoints1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2) dst_pts = np.float32([keypoints2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2) H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0) if H is not None: aligned_sample = cv2.warpPerspective( sample_image, H, (test_image_gray.shape[1], test_image_gray.shape[0]) ) logging.info("图像配准成功,使用配准后样本") else: aligned_sample = sample_image logging.warning("无法计算单应性矩阵,使用原始样本") else: aligned_sample = sample_image logging.warning(f"特征点匹配不足({len(good_matches)}/{MIN_MATCH_COUNT}),跳过图像配准") except Exception as e: logging.error(f"图像配准失败: {str(e)}") aligned_sample = sample_image try: if aligned_sample.shape != test_image_gray.shape: test_image_gray = cv2.resize(test_image_gray, (aligned_sample.shape[1], aligned_sample.shape[0])) except Exception as e: logging.error(f"图像调整大小失败: {str(e)}") return None, None, None try: from skimage.metrics import structural_similarity as compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True, data_range=255 ) except ImportError: from skimage.measure import compare_ssim ssim_score, ssim_diff = compare_ssim( aligned_sample, test_image_gray, full=True, gaussian_weights=True ) except Exception as e: logging.error(f"SSIM计算失败: {str(e)}") abs_diff = cv2.absdiff(aligned_sample, test_image_gray) ssim_diff = abs_diff.astype(np.float32) / 255.0 ssim_score = 1.0 - np.mean(ssim_diff) ssim_diff = (1 - ssim_diff) * 255 abs_diff = cv2.absdiff(aligned_sample, test_image_gray) combined_diff = cv2.addWeighted(ssim_diff.astype(np.uint8), 0.7, abs_diff, 0.3, 0) _, thresholded = cv2.threshold(combined_diff, 30, 255, cv2.THRESH_BINARY) kernel = np.ones((3, 3), np.uint8) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_OPEN, kernel) thresholded = cv2.morphologyEx(thresholded, cv2.MORPH_CLOSE, kernel) diff_pixels = np.count_nonzero(thresholded) total_pixels = aligned_sample.size diff_ratio = diff_pixels / total_pixels is_qualified = diff_ratio <= adjusted_threshold marked_image = cv2.cvtColor(test_image_gray, cv2.COLOR_GRAY2BGR) marked_image[thresholded == 255] = [0, 0, 255] # 放大缺陷标记 scale_factor = 2.0 # 放大2倍 marked_image = cv2.resize(marked_image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR) labels = skimage.measure.label(thresholded) properties = skimage.measure.regionprops(labels) for prop in properties: if prop.area > 50: y, x = prop.centroid # 根据放大比例调整坐标 x_scaled = int(x * scale_factor) y_scaled = int(y * scale_factor) cv2.putText(marked_image, f"Defect", (x_scaled, y_scaled), cv2.FONT_HERSHEY_SIMPLEX, 0.5 * scale_factor, (0, 255, 255), int(scale_factor)) return is_qualified, diff_ratio, marked_image ==================== 视觉触发的质量检测流程 ==================== def vision_controlled_check(capture_path=None): global current_sample_path, detection_history logging.info(“视觉触发质量检测启动”) # 如果没有提供抓拍路径,使用当前帧 if capture_path is None: frame = obj_cam_operation.get_current_frame() else: # 从文件加载抓拍的图像 frame = cv2.imread(capture_path) if frame is None: logging.error(f"无法加载抓拍图像: {capture_path}") frame = obj_cam_operation.get_current_frame() if frame is None: QMessageBox.warning(mainWindow, "错误", "无法获取当前帧图像!", QMessageBox.Ok) return progress = QProgressDialog("正在检测...", "取消", 0, 100, mainWindow) progress.setWindowModality(Qt.WindowModal) progress.setValue(10) try: diff_threshold = mainWindow.sliderDiffThreshold.value() / 100.0 logging.info(f"使用差异度阈值: {diff_threshold}") progress.setValue(30) is_qualified, diff_ratio, marked_image = enhanced_check_print_quality( current_sample_path, frame, threshold=diff_threshold ) progress.setValue(70) if is_qualified is None: QMessageBox.critical(mainWindow, "检测错误", "检测失败,请检查日志", QMessageBox.Ok) return logging.info(f"检测结果: 合格={is_qualified}, 差异={diff_ratio}") progress.setValue(90) update_diff_display(diff_ratio, is_qualified) result_text = f"印花是否合格: {'合格' if is_qualified else '不合格'}\n差异占比: {diff_ratio*100:.2f}%\n阈值: {diff_threshold*100:.2f}%" QMessageBox.information(mainWindow, "检测结果", result_text, QMessageBox.Ok) if marked_image is not None: # 创建可调整大小的窗口 cv2.namedWindow("缺陷标记结果", cv2.WINDOW_NORMAL) cv2.resizeWindow("缺陷标记结果", 800, 600) # 初始大小 cv2.imshow("缺陷标记结果", marked_image) cv2.waitKey(0) cv2.destroyAllWindows() detection_result = { 'timestamp': datetime.now(), 'qualified': is_qualified, 'diff_ratio': diff_ratio, 'threshold': diff_threshold, 'trigger_type': 'vision' if capture_path else 'manual' } detection_history.append(detection_result) update_history_display() progress.setValue(100) except Exception as e: logging.exception("印花检测失败") QMessageBox.critical(mainWindow, "检测错误", f"检测过程中发生错误: {str(e)}", QMessageBox.Ok) finally: progress.close() ==================== 相机操作函数 ==================== def open_device(): global deviceList, nSelCamIndex, obj_cam_operation, isOpen, frame_monitor_thread, mainWindow if isOpen: QMessageBox.warning(mainWindow, “Error”, ‘相机已打开!’, QMessageBox.Ok) return MV_E_CALLORDER nSelCamIndex = mainWindow.ComboDevices.currentIndex() if nSelCamIndex < 0: QMessageBox.warning(mainWindow, "Error", '请选择相机!', QMessageBox.Ok) return MV_E_CALLORDER # 创建相机控制对象 cam = MvCamera() # 初始化相机操作对象 obj_cam_operation = CameraOperation(cam, deviceList, nSelCamIndex) ret = obj_cam_operation.open_device() if 0 != ret: strError = "打开设备失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) isOpen = False else: set_continue_mode() get_param() isOpen = True enable_controls() # 创建并启动帧监控线程 frame_monitor_thread = FrameMonitorThread(obj_cam_operation) frame_monitor_thread.frame_status.connect(mainWindow.statusBar().showMessage) frame_monitor_thread.start() def start_grabbing(): global obj_cam_operation, isGrabbing, template_matcher_thread ret = obj_cam_operation.start_grabbing(mainWindow.widgetDisplay.winId()) if ret != 0: strError = "开始取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = True enable_controls() # 等待第一帧到达 QThread.msleep(500) if not obj_cam_operation.is_frame_available(): QMessageBox.warning(mainWindow, "警告", "开始取流后未接收到帧,请检查相机连接!", QMessageBox.Ok) # 如果启用了自动检测,启动检测线程 if mainWindow.chkContinuousMatch.isChecked(): toggle_template_matching(True) def stop_grabbing(): global obj_cam_operation, isGrabbing, template_matcher_thread ret = obj_cam_operation.Stop_grabbing() if ret != 0: strError = "停止取流失败 ret:" + ToHexStr(ret) QMessageBox.warning(mainWindow, "Error", strError, QMessageBox.Ok) else: isGrabbing = False enable_controls() # 停止模板匹配线程 if template_matcher_thread and template_matcher_thread.isRunning(): template_matcher_thread.stop() def close_device(): global isOpen, isGrabbing, obj_cam_operation, frame_monitor_thread, template_matcher_thread if frame_monitor_thread and frame_monitor_thread.isRunning(): frame_monitor_thread.stop() frame_monitor_thread.wait(2000) # 停止模板匹配线程 if template_matcher_thread and template_matcher_thread.isRunning(): template_matcher_thread.stop() template_matcher_thread.wait(2000) template_matcher_thread = None if isOpen and obj_cam_operation: obj_cam_operation.close_device() isOpen = False isGrabbing = False enable_controls() ==================== 连续帧匹配检测器 ==================== class ContinuousFrameMatcher(QThread): frame_processed = pyqtSignal(np.ndarray, float, bool) # 处理后的帧, 匹配分数, 是否匹配 match_score_updated = pyqtSignal(float) # 匹配分数更新信号 match_success = pyqtSignal(np.ndarray, float) # 新增: 匹配成功信号 (帧, 匹配分数) def __init__(self, cam_operation, parent=None): super().__init__(parent) self.cam_operation = cam_operation self.running = True self.sample_template = None self.min_match_count = MIN_MATCH_COUNT self.match_threshold = MATCH_THRESHOLD self.sample_kp = None self.sample_des = None self.current_match_score = 0.0 self.last_match_time = 0 self.frame_counter = 0 self.consecutive_fail_count = 0 self.last_trigger_time = 0 # 上次触发时间 self.cool_down = 0.2 # 冷却时间(秒) # 特征检测器 - 使用SIFT self.sift = cv2.SIFT_create() # 特征匹配器 - 使用FLANN提高匹配精度 FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) search_params = dict(checks=50) self.flann = cv2.FlannBasedMatcher(index_params, search_params) # 性能监控 self.processing_times = deque(maxlen=100) self.frame_rates = deque(maxlen=100) def set_sample(self, sample_img): """设置标准样本""" # ...现有代码不变... def process_frame(self, frame): is_matched=False match_score=0.0 processed_frame = frame.copy() # 默认返回原始帧 """处理帧:特征提取、匹配和可视化""" # 检查是否匹配成功且超过冷却时间 current_time = time.time() if is_matched and (current_time - self.last_trigger_time) > self.cool_down: self.last_trigger_time = current_time logging.info(f"匹配成功! 分数: {match_score:.2f}, 触发质量检测") # 发出匹配成功信号 self.match_success.emit(frame.copy(), match_score) return processed_frame, match_score, is_matched def run(self): """主处理循环 - 连续处理每一帧""" logging.info("连续帧匹配线程启动") self.last_match_time = time.time() self.consecutive_fail_count = 0 while self.running: start_time = time.time() # 检查相机状态 if not self.cam_operation or not self.cam_operation.is_grabbing: if self.consecutive_fail_count % 10 == 0: logging.debug("相机未取流,等待...") time.sleep(0.1) self.consecutive_fail_count += 1 continue # 获取当前帧 frame = self.cam_operation.get_current_frame() if frame is None: self.consecutive_fail_count += 1 if self.consecutive_fail_count % 10 == 0: logging.warning(f"连续{self.consecutive_fail_count}次获取帧失败") time.sleep(0.05) continue self.consecutive_fail_count = 0 try: # 处理帧 processed_frame, match_score, is_matched = self.process_frame(frame) # 发送处理结果 self.frame_processed.emit(processed_frame, match_score, is_matched) except Exception as e: logging.error(f"帧处理错误: {str(e)}") # 控制处理频率 processing_time = time.time() - start_time sleep_time = max(0.01, MIN_FRAME_INTERVAL - processing_time) time.sleep(sleep_time) logging.info("连续帧匹配线程退出") ==================== 模板匹配控制函数 ==================== def toggle_template_matching(state): global template_matcher_thread, current_sample_path logging.debug(f"切换连续匹配状态: {state}") if state == Qt.Checked and isGrabbing: # 确保已设置样本 if not current_sample_path: logging.warning("尝试启动连续匹配但未设置样本") QMessageBox.warning(mainWindow, "错误", "请先设置标准样本", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if template_matcher_thread is None: logging.info("创建新的连续帧匹配线程") template_matcher_thread = ContinuousFrameMatcher(obj_cam_operation) template_matcher_thread.frame_processed.connect(update_frame_display) template_matcher_thread.match_score_updated.connect(update_match_score_display) # 连接匹配成功信号 template_matcher_thread.match_success.connect( lambda frame, score: vision_controlled_check(frame, score) ) # 加载样本图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: logging.error("无法加载标准样本图像") QMessageBox.warning(mainWindow, "错误", "无法加载标准样本图像", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if not template_matcher_thread.set_sample(sample_img): logging.warning("标准样本特征不足") QMessageBox.warning(mainWindow, "错误", "标准样本特征不足", QMessageBox.Ok) mainWindow.chkContinuousMatch.setChecked(False) return if not template_matcher_thread.isRunning(): logging.info("启动连续帧匹配线程") template_matcher_thread.start() elif template_matcher_thread and template_matcher_thread.isRunning(): logging.info("停止连续帧匹配线程") template_matcher_thread.stop() # 重置匹配分数显示 update_match_score_display(0.0) # 重置帧显示 if obj_cam_operation and obj_cam_operation.is_frame_available(): frame = obj_cam_operation.get_current_frame() if frame is not None: display_frame = frame.copy() # 添加状态信息 cv2.putText(display_frame, "Continuous Matching Disabled", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) update_frame_display(display_frame, 0.0, False) ==================== UI更新函数 ==================== def update_frame_display(frame, match_score, is_matched): “”“更新主显示窗口(线程安全)”“” # 确保在GUI线程中执行 if QThread.currentThread() != QApplication.instance().thread(): QMetaObject.invokeMethod(mainWindow, "updateDisplay", Qt.QueuedConnection, Q_ARG(np.ndarray, frame), Q_ARG(float, match_score), Q_ARG(bool, is_matched)) return # 原始显示逻辑 if len(frame.shape) == 3: # 彩色图像 h, w, ch = frame.shape bytes_per_line = ch * w q_img = QImage(frame.data, w, h, bytes_per_line, QImage.Format_RGB888).rgbSwapped() else: # 灰度图像 h, w = frame.shape bytes_per_line = w q_img = QImage(frame.data, w, h, bytes_per_line, QImage.Format_Grayscale8) # 创建QPixmap并缩放以适应显示区域 pixmap = QPixmap.fromImage(q_img) scaled_pixmap = pixmap.scaled( mainWindow.widgetDisplay.width(), mainWindow.widgetDisplay.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation ) # 更新显示 mainWindow.widgetDisplay.setPixmap(scaled_pixmap) # 更新匹配分数显示 update_match_score_display(match_score) 在MainWindow类中添加 class MainWindow(QMainWindow): # …现有代码… @pyqtSlot(np.ndarray, float, bool) def updateDisplay(self, frame, match_score, is_matched): """线程安全的显示更新方法""" # 将OpenCV图像转换为Qt图像 if len(frame.shape) == 3: # 彩色图像 h, w, ch = frame.shape bytes_per_line = ch * w q_img = QImage(frame.data, w, h, bytes_per_line, QImage.Format_RGB888).rgbSwapped() else: # 灰度图像 h, w = frame.shape bytes_per_line = w q_img = QImage(frame.data, w, h, bytes_per_line, QImage.Format_Grayscale8) # 创建QPixmap并缩放 pixmap = QPixmap.fromImage(q_img) scaled_pixmap = pixmap.scaled( self.widgetDisplay.width(), self.widgetDisplay.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation ) # 更新显示 self.widgetDisplay.setPixmap(scaled_pixmap) update_match_score_display(match_score) ==================== UI更新函数 ==================== def update_frame_display(frame, match_score, is_matched): “”“更新主显示窗口”“” # 将OpenCV图像转换为Qt图像 if len(frame.shape) == 3: # 彩色图像 h, w, ch = frame.shape bytes_per_line = ch * w q_img = QImage(frame.data, w, h, bytes_per_line, QImage.Format_RGB888).rgbSwapped() else: # 灰度图像 h, w = frame.shape bytes_per_line = w q_img = QImage(frame.data, w, h, bytes_per_line, QImage.Format_Grayscale8) # 创建QPixmap并缩放以适应显示区域 pixmap = QPixmap.fromImage(q_img) scaled_pixmap = pixmap.scaled( mainWindow.widgetDisplay.width(), mainWindow.widgetDisplay.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation ) # 更新显示 mainWindow.widgetDisplay.setPixmap(scaled_pixmap) # 更新匹配分数显示 update_match_score_display(match_score) def update_match_score_display(score): “”“更新匹配分数显示”“” # 将分数转换为百分比显示 score_percent = score * 100 mainWindow.lblMatchScoreValue.setText(f"{score_percent:.1f}%") # 根据分数设置颜色 if score > 0.8: # 高于80%显示绿色 color = "green" elif score > 0.6: # 60%-80%显示黄色 color = "orange" else: # 低于60%显示红色 color = "red" mainWindow.lblMatchScoreValue.setStyleSheet(f"color: {color}; font-weight: bold;") def update_diff_display(diff_ratio, is_qualified): mainWindow.lblCurrentDiff.setText(f"当前差异度: {diff_ratio*100:.2f}%") if is_qualified: mainWindow.lblDiffStatus.setText(“状态: 合格”) mainWindow.lblDiffStatus.setStyleSheet(“color: green; font-size: 12px;”) else: mainWindow.lblDiffStatus.setText(“状态: 不合格”) mainWindow.lblDiffStatus.setStyleSheet(“color: red; font-size: 12px;”) def update_diff_threshold(value): mainWindow.lblDiffValue.setText(f"{value}%") def update_sample_display(): global current_sample_path if current_sample_path: mainWindow.lblSamplePath.setText(f"当前样本: {os.path.basename(current_sample_path)}") mainWindow.lblSamplePath.setToolTip(current_sample_path) mainWindow.bnPreviewSample.setEnabled(True) else: mainWindow.lblSamplePath.setText(“当前样本: 未设置样本”) mainWindow.bnPreviewSample.setEnabled(False) def update_history_display(): global detection_history mainWindow.cbHistory.clear() for i, result in enumerate(detection_history[-10:]): timestamp = result[‘timestamp’].strftime(“%H:%M:%S”) status = “合格” if result[‘qualified’] else “不合格” ratio = f"{result[‘diff_ratio’]*100:.2f}%" trigger = “视觉” if result[‘trigger_type’] == ‘vision’ else “手动” mainWindow.cbHistory.addItem(f"[{trigger} {timestamp}] {status} - 差异: {ratio}") ==================== 主窗口类 ==================== class MainWindow(QMainWindow): def init(self): super().init() self.setWindowTitle(“布料印花检测系统 - 连续匹配版”) self.resize(1200, 800) central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # 设备枚举区域 device_layout = QHBoxLayout() self.ComboDevices = QComboBox() self.bnEnum = QPushButton("枚举设备") self.bnOpen = QPushButton("打开设备") self.bnClose = QPushButton("关闭设备") device_layout.addWidget(self.ComboDevices) device_layout.addWidget(self.bnEnum) device_layout.addWidget(self.bnOpen) device_layout.addWidget(self.bnClose) main_layout.addLayout(device_layout) # 取流控制组 self.groupGrab = QGroupBox("取流控制") grab_layout = QHBoxLayout(self.groupGrab) self.bnStart = QPushButton("开始取流") self.bnStop = QPushButton("停止取流") self.radioContinueMode = QRadioButton("连续模式") self.radioTriggerMode = QRadioButton("触发模式") self.bnSoftwareTrigger = QPushButton("软触发") grab_layout.addWidget(self.bnStart) grab_layout.addWidget(self.bnStop) grab_layout.addWidget(self.radioContinueMode) grab_layout.addWidget(self.radioTriggerMode) grab_layout.addWidget(self.bnSoftwareTrigger) main_layout.addWidget(self.groupGrab) # 参数设置组 self.paramgroup = QGroupBox("相机参数") param_layout = QGridLayout(self.paramgroup) self.edtExposureTime = QLineEdit() self.edtGain = QLineEdit() self.edtFrameRate = QLineEdit() self.bnGetParam = QPushButton("获取参数") self.bnSetParam = QPushButton("设置参数") self.bnSaveImage = QPushButton("保存图像") param_layout.addWidget(QLabel("曝光时间:"), 0, 0) param_layout.addWidget(self.edtExposureTime, 0, 1) param_layout.addWidget(self.bnGetParam, 0, 2) param_layout.addWidget(QLabel("增益:"), 1, 0) param_layout.addWidget(self.edtGain, 1, 1) param_layout.addWidget(self.bnSetParam, 1, 2) param_layout.addWidget(QLabel("帧率:"), 2, 0) param_layout.addWidget(self.edtFrameRate, 2, 1) param_layout.addWidget(self.bnSaveImage, 2, 2) main_layout.addWidget(self.paramgroup) # 图像显示区域 self.widgetDisplay = QLabel() self.widgetDisplay.setMinimumSize(640, 480) self.widgetDisplay.setStyleSheet("background-color: black;") self.widgetDisplay.setAlignment(Qt.AlignCenter) self.widgetDisplay.setText("相机预览区域") main_layout.addWidget(self.widgetDisplay, 1) # 创建自定义UI组件 self.setup_custom_ui() def setup_custom_ui(self): # 工具栏 toolbar = self.addToolBar("检测工具") self.bnCheckPrint = QPushButton("手动检测") self.bnSaveSample = QPushButton("保存标准样本") self.bnPreviewSample = QPushButton("预览样本") self.cbHistory = QComboBox() self.cbHistory.setMinimumWidth(300) toolbar.addWidget(self.bnCheckPrint) toolbar.addWidget(self.bnSaveSample) toolbar.addWidget(self.bnPreviewSample) toolbar.addWidget(QLabel("历史记录:")) toolbar.addWidget(self.cbHistory) # 状态栏样本路径 self.lblSamplePath = QLabel("当前样本: 未设置样本") self.statusBar().addPermanentWidget(self.lblSamplePath) # 右侧面板 right_panel = QWidget() right_layout = QVBoxLayout(right_panel) right_layout.setContentsMargins(10, 10, 10, 10) # 差异度调整组 diff_group = QGroupBox("差异度调整") diff_layout = QVBoxLayout(diff_group) self.lblDiffThreshold = QLabel("差异度阈值 (0-100%):") self.sliderDiffThreshold = QSlider(Qt.Horizontal) self.sliderDiffThreshold.setRange(0, 100) self.sliderDiffThreshold.setValue(5) self.lblDiffValue = QLabel("5%") self.lblCurrentDiff = QLabel("当前差异度: -") self.lblCurrentDiff.setStyleSheet("font-size: 14px; font-weight: bold;") self.lblDiffStatus = QLabel("状态: 未检测") self.lblDiffStatus.setStyleSheet("font-size: 12px;") diff_layout.addWidget(self.lblDiffThreshold) diff_layout.addWidget(self.sliderDiffThreshold) diff_layout.addWidget(self.lblDiffValue) diff_layout.addWidget(self.lblCurrentDiff) diff_layout.addWidget(self.lblDiffStatus) right_layout.addWidget(diff_group) # ===== 连续匹配面板 ===== match_group = QGroupBox("连续帧匹配") match_layout = QVBoxLayout(match_group) # 样本设置 sample_layout = QHBoxLayout() self.bnSetSample = QPushButton("设置标准样本") self.bnPreviewSample = QPushButton("预览样本") self.lblSampleStatus = QLabel("状态: 未设置样本") sample_layout.addWidget(self.bnSetSample) sample_layout.addWidget(self.bnPreviewSample) sample_layout.addWidget(self.lblSampleStatus) match_layout.addLayout(sample_layout) # 匹配参数 param_layout = QHBoxLayout() self.lblMatchThreshold = QLabel("匹配阈值:") self.sliderThreshold = QSlider(Qt.Horizontal) self.sliderThreshold.setRange(50, 100) self.sliderThreshold.setValue(75) # 降低默认阈值 self.lblThresholdValue = QLabel("75%") param_layout.addWidget(self.lblMatchThreshold) param_layout.addWidget(self.sliderThreshold) param_layout.addWidget(self.lblThresholdValue) match_layout.addLayout(param_layout) # 匹配分数显示 match_score_layout = QHBoxLayout() self.lblMatchScore = QLabel("实时匹配分数:") self.lblMatchScoreValue = QLabel("0.0%") self.lblMatchScoreValue.setStyleSheet("font-weight: bold;") match_score_layout.addWidget(self.lblMatchScore) match_score_layout.addWidget(self.lblMatchScoreValue) match_layout.addLayout(match_score_layout) # 连续匹配开关 self.chkContinuousMatch = QCheckBox("启用连续帧匹配") self.chkContinuousMatch.setChecked(False) match_layout.addWidget(self.chkContinuousMatch) right_layout.addWidget(match_group) right_layout.addStretch(1) # 停靠窗口 dock = QDockWidget("检测控制面板", self) dock.setWidget(right_panel) dock.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) self.addDockWidget(Qt.RightDockWidgetArea, dock) def setup_connections(self): # 连接匹配成功信号到质量检测 if template_matcher_thread: template_matcher_thread.match_success.connect( lambda frame, score: vision_controlled_check(frame, score) ) def closeEvent(self, event): logging.info("主窗口关闭,执行清理...") close_device() event.accept() ===== 辅助函数 ===== def ToHexStr(num): if not isinstance(num, int): try: num = int(num) except: return f"<非整数:{type(num)}>" chaDic = {10: ‘a’, 11: ‘b’, 12: ‘c’, 13: ‘d’, 14: ‘e’, 15: ‘f’} hexStr = “” if num < 0: num = num + 2 ** 32 while num >= 16: digit = num % 16 hexStr = chaDic.get(digit, str(digit)) + hexStr num //= 16 hexStr = chaDic.get(num, str(num)) + hexStr return “0x” + hexStr def enum_devices(): global deviceList, obj_cam_operation n_layer_type = ( MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE ) # 创建设备列表 deviceList = MV_CC_DEVICE_INFO_LIST() # 枚举设备 ret = MvCamera.MV_CC_EnumDevices(n_layer_type, deviceList) if ret != MV_OK: error_msg = f"枚举设备失败! 错误码: 0x{ret:x}" logging.error(error_msg) QMessageBox.warning(mainWindow, "错误", error_msg, QMessageBox.Ok) return ret if deviceList.nDeviceNum == 0: QMessageBox.warning(mainWindow, "提示", "未找到任何设备", QMessageBox.Ok) return MV_OK logging.info(f"找到 {deviceList.nDeviceNum} 个设备") # 处理设备信息 devList = [] for i in range(deviceList.nDeviceNum): # 获取设备信息 mvcc_dev_info = ctypes.cast( deviceList.pDeviceInfo[i], ctypes.POINTER(MV_CC_DEVICE_INFO) ).contents # 根据设备类型提取信息 if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: st_gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo ip_addr = ( f"{(st_gige_info.nCurrentIp >> 24) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 16) & 0xFF}." f"{(st_gige_info.nCurrentIp >> 8) & 0xFF}." f"{st_gige_info.nCurrentIp & 0xFF}" ) # 修复:将c_ubyte_Array_16转换为字节串再解码 user_defined_bytes = bytes(st_gige_info.chUserDefinedName) dev_name = f"GigE: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} ({ip_addr})") elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: st_usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo serial = bytes(st_usb_info.chSerialNumber).decode('ascii', 'ignore').rstrip('\x00') # 修复:同样处理用户自定义名称 user_defined_bytes = bytes(st_usb_info.chUserDefinedName) dev_name = f"USB: {user_defined_bytes.decode('gbk', 'ignore')}" devList.append(f"[{i}] {dev_name} (SN: {serial})") else: devList.append(f"[{i}] 未知设备类型: {mvcc_dev_info.nTLayerType}") # 更新UI mainWindow.ComboDevices.clear() mainWindow.ComboDevices.addItems(devList) if devList: mainWindow.ComboDevices.setCurrentIndex(0) mainWindow.statusBar().showMessage(f"找到 {deviceList.nDeviceNum} 个设备", 3000) return MV_OK def set_continue_mode(): ret = obj_cam_operation.set_trigger_mode(False) if ret != 0: strError = “设置连续模式失败 ret:” + ToHexStr(ret) QMessageBox.warning(mainWindow, “Error”, strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(True) mainWindow.radioTriggerMode.setChecked(False) mainWindow.bnSoftwareTrigger.setEnabled(False) def set_software_trigger_mode(): ret = obj_cam_operation.set_trigger_mode(True) if ret != 0: strError = “设置触发模式失败 ret:” + ToHexStr(ret) QMessageBox.warning(mainWindow, “Error”, strError, QMessageBox.Ok) else: mainWindow.radioContinueMode.setChecked(False) mainWindow.radioTriggerMode.setChecked(True) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing) def trigger_once(): ret = obj_cam_operation.trigger_once() if ret != 0: strError = “软触发失败 ret:” + ToHexStr(ret) QMessageBox.warning(mainWindow, “Error”, strError, QMessageBox.Ok) def save_sample_image(): global isGrabbing, obj_cam_operation, current_sample_path if not isGrabbing: QMessageBox.warning(mainWindow, "错误", "请先开始取流并捕获图像!", QMessageBox.Ok) return # 尝试捕获当前帧 frame = obj_cam_operation.capture_frame() if frame is None: QMessageBox.warning(mainWindow, "无有效图像", "未捕获到有效图像,请检查相机状态!", QMessageBox.Ok) return # 确保图像有效 if frame.size == 0 or frame.shape[0] == 0 or frame.shape[1] == 0: QMessageBox.warning(mainWindow, "无效图像", "捕获的图像无效,请检查相机设置!", QMessageBox.Ok) return settings = QSettings("ClothInspection", "CameraApp") last_dir = settings.value("last_save_dir", os.path.join(os.getcwd(), "captures")) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") default_filename = f"sample_{timestamp}" file_path, selected_filter = QFileDialog.getSaveFileName( mainWindow, "保存标准样本图像", os.path.join(last_dir, default_filename), "BMP Files (*.bmp);;PNG Files (*.png);;JPEG Files (*.jpg);;所有文件 (*)", options=QFileDialog.DontUseNativeDialog ) if not file_path: return # 确保文件扩展名正确 file_extension = os.path.splitext(file_path)[1].lower() if not file_extension: if "BMP" in selected_filter: file_path += ".bmp" elif "PNG" in selected_filter: file_path += ".png" elif "JPEG" in selected_filter or "JPG" in selected_filter: file_path += ".jpg" else: file_path += ".bmp" file_extension = os.path.splitext(file_path)[1].lower() # 创建目录(如果不存在) directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): try: os.makedirs(directory, exist_ok=True) except OSError as e: QMessageBox.critical(mainWindow, "目录创建错误", f"无法创建目录 {directory}: {str(e)}", QMessageBox.Ok) return # 保存图像 try: # 使用OpenCV保存图像 if not cv2.imwrite(file_path, frame): raise Exception("OpenCV保存失败") # 更新状态 current_sample_path = file_path update_sample_display() settings.setValue("last_save_dir", os.path.dirname(file_path)) # 显示成功消息 QMessageBox.information(mainWindow, "成功", f"标准样本已保存至:\n{file_path}", QMessageBox.Ok) # 更新样本状态 mainWindow.lblSampleStatus.setText("状态: 样本已设置") mainWindow.lblSampleStatus.setStyleSheet("color: green;") except Exception as e: logging.error(f"保存图像失败: {str(e)}") QMessageBox.critical(mainWindow, "保存错误", f"保存图像时发生错误:\n{str(e)}", QMessageBox.Ok) def preview_sample(): global current_sample_path if not current_sample_path or not os.path.exists(current_sample_path): QMessageBox.warning(mainWindow, “错误”, “请先设置有效的标准样本图像!”, QMessageBox.Ok) return try: # 直接使用OpenCV加载图像 sample_img = cv2.imread(current_sample_path) if sample_img is None: raise Exception("无法加载图像") # 显示图像 cv2.namedWindow("标准样本预览", cv2.WINDOW_NORMAL) cv2.resizeWindow("标准样本预览", 800, 600) cv2.imshow("标准样本预览", sample_img) cv2.waitKey(0) cv2.destroyAllWindows() except Exception as e: QMessageBox.warning(mainWindow, "错误", f"预览样本失败: {str(e)}", QMessageBox.Ok) def is_float(str): try: float(str) return True except ValueError: return False def get_param(): try: ret = obj_cam_operation.get_parameters() if ret != MV_OK: strError = “获取参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, “错误”, strError, QMessageBox.Ok) else: mainWindow.edtExposureTime.setText(”{0:.2f}“.format(obj_cam_operation.exposure_time)) mainWindow.edtGain.setText(”{0:.2f}“.format(obj_cam_operation.gain)) mainWindow.edtFrameRate.setText(”{0:.2f}“.format(obj_cam_operation.frame_rate)) except Exception as e: error_msg = f"获取参数时发生错误: {str(e)}” QMessageBox.critical(mainWindow, “严重错误”, error_msg, QMessageBox.Ok) def set_param(): frame_rate = mainWindow.edtFrameRate.text() exposure = mainWindow.edtExposureTime.text() gain = mainWindow.edtGain.text() if not (is_float(frame_rate) and is_float(exposure) and is_float(gain)): strError = “设置参数失败: 参数必须是有效的浮点数” QMessageBox.warning(mainWindow, “错误”, strError, QMessageBox.Ok) return MV_E_PARAMETER try: ret = obj_cam_operation.set_param( frame_rate=float(frame_rate), exposure_time=float(exposure), gain=float(gain) ) if ret != MV_OK: strError = “设置参数失败,错误码: " + ToHexStr(ret) QMessageBox.warning(mainWindow, “错误”, strError, QMessageBox.Ok) except Exception as e: error_msg = f"设置参数时发生错误: {str(e)}” QMessageBox.critical(mainWindow, “严重错误”, error_msg, QMessageBox.Ok) def enable_controls(): global isGrabbing, isOpen mainWindow.groupGrab.setEnabled(isOpen) mainWindow.paramgroup.setEnabled(isOpen) mainWindow.bnOpen.setEnabled(not isOpen) mainWindow.bnClose.setEnabled(isOpen) mainWindow.bnStart.setEnabled(isOpen and (not isGrabbing)) mainWindow.bnStop.setEnabled(isOpen and isGrabbing) mainWindow.bnSoftwareTrigger.setEnabled(isGrabbing and mainWindow.radioTriggerMode.isChecked()) mainWindow.bnSaveImage.setEnabled(isOpen and isGrabbing) mainWindow.bnCheckPrint.setEnabled(isOpen and isGrabbing) mainWindow.bnSaveSample.setEnabled(isOpen and isGrabbing) mainWindow.bnPreviewSample.setEnabled(bool(current_sample_path)) # 连续匹配控制 mainWindow.chkContinuousMatch.setEnabled(bool(current_sample_path) and isGrabbing) ===== 相机帧监控线程 ===== class FrameMonitorThread(QThread): frame_status = pyqtSignal(str) # 用于发送状态消息的信号 def __init__(self, cam_operation): super().__init__() self.cam_operation = cam_operation self.running = True self.frame_count = 0 self.last_time = time.time() def run(self): """监控相机帧状态的主循环""" while self.running: try: if self.cam_operation and self.cam_operation.is_grabbing: # 获取帧统计信息 frame_info = self.get_frame_info() if frame_info: fps = frame_info.get('fps', 0) dropped = frame_info.get('dropped', 0) status = f"FPS: {fps:.1f} | 丢帧: {dropped}" self.frame_status.emit(status) else: self.frame_status.emit("取流中...") else: self.frame_status.emit("相机未取流") except Exception as e: self.frame_status.emit(f"监控错误: {str(e)}") # 每500ms检查一次 QThread.msleep(500) def stop(self): """停止监控线程""" self.running = False self.wait(1000) # 等待线程结束 def calculate_fps(self): """计算当前帧率""" current_time = time.time() elapsed = current_time - self.last_time if elapsed > 0: fps = self.frame_count / elapsed self.frame_count = 0 self.last_time = current_time return fps return 0 def get_frame_info(self): """获取帧信息""" try: # 更新帧计数 self.frame_count += 1 # 返回帧信息 return { 'fps': self.calculate_fps(), 'dropped': 0 # 实际应用中需要从相机获取真实丢帧数 } except Exception as e: logging.error(f"获取帧信息失败: {str(e)}") return None ===== 主程序入口 ===== if name == “main”: # 配置日志系统 logging.basicConfig( level=logging.DEBUG, format=‘%(asctime)s - %(name)s - %(levelname)s - %(message)s’, handlers=[ logging.FileHandler(“cloth_inspection_continuous.log”), logging.StreamHandler() ] ) logging.info(“布料印花检测系统(连续匹配版)启动”) app = QApplication(sys.argv) mainWindow = MainWindow() # 信号连接 mainWindow.sliderDiffThreshold.valueChanged.connect(update_diff_threshold) mainWindow.bnCheckPrint.clicked.connect(lambda: vision_controlled_check(None)) mainWindow.bnSaveSample.clicked.connect(save_sample_image) mainWindow.bnPreviewSample.clicked.connect(preview_sample) mainWindow.bnEnum.clicked.connect(enum_devices) mainWindow.bnOpen.clicked.connect(open_device) mainWindow.bnClose.clicked.connect(close_device) mainWindow.bnStart.clicked.connect(start_grabbing) mainWindow.bnStop.clicked.connect(stop_grabbing) mainWindow.bnSoftwareTrigger.clicked.connect(trigger_once) mainWindow.radioTriggerMode.clicked.connect(set_software_trigger_mode) mainWindow.radioContinueMode.clicked.connect(set_continue_mode) mainWindow.bnGetParam.clicked.connect(get_param) mainWindow.bnSetParam.clicked.connect(set_param) mainWindow.bnSaveImage.clicked.connect(save_sample_image) # 连续匹配信号连接 mainWindow.sliderThreshold.valueChanged.connect(update_match_score_display) mainWindow.chkContinuousMatch.stateChanged.connect(toggle_template_matching) mainWindow.show() app.exec_() close_device() sys.exit()

import os import pandas as pd import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D import numpy as np # ----------------------------- # 参数设定 # ----------------------------- # 文件夹路径 data_dir = 'D:/MyPythonProjects/ev_supplychain_sim/data/遗忘率创新性实验' # 根据你的路径调整 # 策略组合标签(与文件名中的 ll, lm 等对应) strategies = ['ll', 'lm', 'lh', 'ml', 'mm', 'mh', 'hl', 'hm', 'hh'] # 生成文件路径字典 file_map = {s: os.path.join(data_dir, f'knowledge_evolution{s}.csv') for s in strategies} # ----------------------------- # 读取数据并组织成表格 # ----------------------------- all_data = [] for strat in strategies: df = pd.read_csv(file_map[strat], sep='\t' if '.tsv' in file_map[strat] else ',') for _, row in df.iterrows(): all_data.append({ 'Step': row['Step'], 'Strategy': strat, 'Total_Average': row['Total_Average'] }) # 转为 DataFrame df_all = pd.DataFrame(all_data) # ----------------------------- # 创建网格数据用于3D Surface绘图 # ----------------------------- # 将策略标签映射为数字以用于坐标轴 strategy_to_num = {s: i for i, s in enumerate(strategies)} df_all['Strategy_Num'] = df_all['Strategy'].map(strategy_to_num) # 构建 meshgrid step_vals = sorted(df_all['Step'].unique()) strategy_vals = list(range(len(strategies))) # 创建Z值矩阵 Z = np.zeros((len(step_vals), len(strategy_vals))) for i, step in enumerate(step_vals): for j, s in enumerate(strategies): val = df_all[(df_all['Step'] == step) & (df_all['Strategy'] == s)]['Total_Average'].values Z[i, j] = val[0] if len(val) > 0 else np.nan X, Y = np.meshgrid(strategy_vals, step_vals) # ----------------------------- # 绘制3D图 # ----------------------------- fig = plt.figure(figsize=(12, 8)) ax = fig.add_subplot(111, projection='3d') surf = ax.plot_surface(X, Y, Z, cmap='viridis', edgecolor='k', linewidth=0.3, antialiased=True) # 设置坐标轴标签 ax.set_xlabel('Strategy') ax.set_ylabel('Step') ax.set_zlabel('Total_Average') # 设置策略标签顺序 ax.set_xticks(range(len(strategies))) # 用 0 到 8 的索引 ax.set_xticklabels(strategies) # 对应标签按指定顺序排列 # # 设置策略标签 # ax.set_xticks(strategy_vals) # ax.set_xticklabels(strategies) # 标题和颜色条 ax.set_title('Total_Average over Steps and Strategies') fig.colorbar(surf, shrink=0.5, aspect=10) plt.tight_layout() plt.show()

zip

最新推荐

recommend-type

langchain4j-anthropic-spring-boot-starter-0.31.0.jar中文文档.zip

1、压缩文件中包含: 中文文档、jar包下载地址、Maven依赖、Gradle依赖、源代码下载地址。 2、使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 3、特殊说明: (1)本文档为人性化翻译,精心制作,请放心使用; (2)只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; (3)不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 4、温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件。 5、本文件关键字: jar中文文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册。
recommend-type

TMS320F28335电机控制程序详解:BLDC、PMSM无感有感及异步VF源代码与开发资料

TMS320F28335这款高性能数字信号处理器(DSP)在电机控制领域的应用,涵盖了BLDC(无刷直流电机)、PMSM(永磁同步电机)的无感有感控制以及异步VF(变频调速)程序。文章不仅解释了各类型的电机控制原理,还提供了完整的开发资料,包括源代码、原理图和说明文档,帮助读者深入了解其工作原理和编程技巧。 适合人群:从事电机控制系统开发的技术人员,尤其是对TMS320F28335感兴趣的工程师。 使用场景及目标:适用于需要掌握TMS320F28335在不同电机控制应用场景下具体实现方法的专业人士,旨在提高他们对该微控制器的理解和实际操作能力。 其他说明:文中提供的开发资料为读者提供了从硬件到软件的全面支持,有助于加速项目开发进程并提升系统性能。
recommend-type

基于爬山搜索法的风力发电MPPT控制Simulink仿真:定步长与变步长算法性能对比 - 爬山搜索法 最新版

基于爬山搜索法的风力发电最大功率点追踪(MPPT)控制的Simulink仿真模型,重点比较了定步长和变步长算法在不同风速条件下的表现。文中展示了两种算法的具体实现方法及其优缺点。定步长算法虽然结构简单、计算量小,但在风速突变时响应较慢,存在明显的稳态振荡。相比之下,变步长算法能够根据功率变化动态调整步长,表现出更快的响应速度和更高的精度,尤其在风速突变时优势明显。实验数据显示,变步长算法在风速从8m/s突增至10m/s的情况下,仅用0.3秒即可稳定,功率波动范围仅为±15W,而定步长算法则需要0.8秒,功率波动达到±35W。 适合人群:从事风力发电研究的技术人员、对MPPT控制感兴趣的工程技术人员以及相关专业的高校师生。 使用场景及目标:适用于风力发电系统的设计与优化,特别是需要提高系统响应速度和精度的场合。目标是在不同风速条件下,选择合适的MPPT算法以最大化风能利用率。 其他说明:文章还讨论了定步长算法在风速平稳情况下的优势,提出了根据不同应用场景灵活选择或组合使用这两种算法的建议。
recommend-type

基于MatlabSimulink的风电场调频策略研究:虚拟惯性、超速减载与下垂控制的协调优化

内容概要:本文详细探讨了在Matlab/Simulink环境下,针对风电场调频的研究,尤其是双馈风机调频策略的应用及其与其他调频策略的协调工作。文中介绍了三种主要的调频策略——虚拟惯性、超速减载和下垂控制,并基于三机九节点系统进行了改进,模拟了四组风电机组的协同调频过程。研究指出,虚拟惯性的应用虽然可以提供短期频率支持,但也可能导致频率二次跌落的问题。因此,需要通过超速减载和下垂控制来进行补偿,以维持电网的稳定。此外,文章还展示了在变风速条件下,风电机组对电网频率支撑能力的显著提升,尤其是在高比例风电并网渗透的情况下,频率最低点提高了50%,验证了调频策略的有效性。 适合人群:从事电力系统、风电场运营管理和调频技术研发的专业人士,以及对风电调频感兴趣的科研人员和技术爱好者。 使用场景及目标:适用于希望深入理解风电场调频机制及其优化方法的人群。目标是掌握不同调频策略的工作原理及其协调工作的关键点,提高风电场的运行效率和稳定性。 其他说明:本文通过具体的案例研究和仿真数据,展示了调频策略的实际效果,强调了合理运用调频策略对于风电场稳定运行的重要意义。同时,也为未来的风电调频技术创新提供了理论依据和实践经验。
recommend-type

三菱QL系列PLC在3C-FPC组装机中的定位与伺服控制及触摸屏应用解析

三菱Q系列和L系列PLC在3C-FPC组装机中的具体应用,涵盖硬件架构、软件编程以及实际操作技巧。主要内容包括:使用QX42数字输入模块和QY42P晶体管输出模块进行高效信号处理;采用JE系列伺服控制系统实现高精度四轴联动;利用SFC(顺序功能图)和梯形图编程方法构建稳定可靠的控制系统;通过触摸屏实现多用户管理和权限控制;并分享了一些实用的调试和维护技巧,如流水线节拍控制和工程师模式进入方法。最终,该系统的设备综合效率(OEE)达到了92%以上。 适合人群:从事自动化控制领域的工程师和技术人员,尤其是对三菱PLC有初步了解并希望深入了解其高级应用的人群。 使用场景及目标:适用于需要高精度、高效能的工业生产设备控制场合,旨在帮助用户掌握三菱PLC及其相关组件的应用技能,提高生产效率和产品质量。 其他说明:文中提供了详细的编程实例和操作指南,有助于读者更好地理解和实践。同时提醒使用者在调试过程中应注意伺服刚性参数调整,避免不必要的机械损伤。
recommend-type

Visual C++.NET编程技术实战指南

根据提供的文件信息,可以生成以下知识点: ### Visual C++.NET编程技术体验 #### 第2章 定制窗口 - **设置窗口风格**:介绍了如何通过编程自定义窗口的外观和行为。包括改变窗口的标题栏、边框样式、大小和位置等。这通常涉及到Windows API中的`SetWindowLong`和`SetClassLong`函数。 - **创建六边形窗口**:展示了如何创建一个具有特殊形状边界的窗口,这类窗口不遵循标准的矩形形状。它需要使用`SetWindowRgn`函数设置窗口的区域。 - **创建异形窗口**:扩展了定制窗口的内容,提供了创建非标准形状窗口的方法。这可能需要创建一个不规则的窗口区域,并将其应用到窗口上。 #### 第3章 菜单和控制条高级应用 - **菜单编程**:讲解了如何创建和修改菜单项,处理用户与菜单的交互事件,以及动态地添加或删除菜单项。 - **工具栏编程**:阐述了如何使用工具栏,包括如何创建工具栏按钮、分配事件处理函数,并实现工具栏按钮的响应逻辑。 - **状态栏编程**:介绍了状态栏的创建、添加不同类型的指示器(如文本、进度条等)以及状态信息的显示更新。 - **为工具栏添加皮肤**:展示了如何为工具栏提供更加丰富的视觉效果,通常涉及到第三方的控件库或是自定义的绘图代码。 #### 第5章 系统编程 - **操作注册表**:解释了Windows注册表的结构和如何通过程序对其进行读写操作,这对于配置软件和管理软件设置非常关键。 - **系统托盘编程**:讲解了如何在系统托盘区域创建图标,并实现最小化到托盘、从托盘恢复窗口的功能。 - **鼠标钩子程序**:介绍了钩子(Hook)技术,特别是鼠标钩子,如何拦截和处理系统中的鼠标事件。 - **文件分割器**:提供了如何将文件分割成多个部分,并且能够重新组合文件的技术示例。 #### 第6章 多文档/多视图编程 - **单文档多视**:展示了如何在同一个文档中创建多个视图,这在文档编辑软件中非常常见。 #### 第7章 对话框高级应用 - **实现无模式对话框**:介绍了无模式对话框的概念及其应用场景,以及如何实现和管理无模式对话框。 - **使用模式属性表及向导属性表**:讲解了属性表的创建和使用方法,以及如何通过向导性质的对话框引导用户完成多步骤的任务。 - **鼠标敏感文字**:提供了如何实现点击文字触发特定事件的功能,这在阅读器和编辑器应用中很有用。 #### 第8章 GDI+图形编程 - **图像浏览器**:通过图像浏览器示例,展示了GDI+在图像处理和展示中的应用,包括图像的加载、显示以及基本的图像操作。 #### 第9章 多线程编程 - **使用全局变量通信**:介绍了在多线程环境下使用全局变量进行线程间通信的方法和注意事项。 - **使用Windows消息通信**:讲解了通过消息队列在不同线程间传递信息的技术,包括发送消息和处理消息。 - **使用CriticalSection对象**:阐述了如何使用临界区(CriticalSection)对象防止多个线程同时访问同一资源。 - **使用Mutex对象**:介绍了互斥锁(Mutex)的使用,用以同步线程对共享资源的访问,保证资源的安全。 - **使用Semaphore对象**:解释了信号量(Semaphore)对象的使用,它允许一个资源由指定数量的线程同时访问。 #### 第10章 DLL编程 - **创建和使用Win32 DLL**:介绍了如何创建和链接Win32动态链接库(DLL),以及如何在其他程序中使用这些DLL。 - **创建和使用MFC DLL**:详细说明了如何创建和使用基于MFC的动态链接库,适用于需要使用MFC类库的场景。 #### 第11章 ATL编程 - **简单的非属性化ATL项目**:讲解了ATL(Active Template Library)的基础使用方法,创建一个不使用属性化组件的简单项目。 - **使用ATL开发COM组件**:详细阐述了使用ATL开发COM组件的步骤,包括创建接口、实现类以及注册组件。 #### 第12章 STL编程 - **list编程**:介绍了STL(标准模板库)中的list容器的使用,讲解了如何使用list实现复杂数据结构的管理。 #### 第13章 网络编程 - **网上聊天应用程序**:提供了实现基本聊天功能的示例代码,包括客户端和服务器的通信逻辑。 - **简单的网页浏览器**:演示了如何创建一个简单的Web浏览器程序,涉及到网络通信和HTML解析。 - **ISAPI服务器扩展编程**:介绍了如何开发ISAPI(Internet Server API)服务器扩展来扩展IIS(Internet Information Services)的功能。 #### 第14章 数据库编程 - **ODBC数据库编程**:解释了ODBC(开放数据库互联)的概念,并提供了使用ODBC API进行数据库访问的示例。 - **ADO编程**:介绍了ADO(ActiveX Data Objects)技术,讲解了如何使用ADO进行数据库的增删改查等操作。 #### 第15章 HTML帮助系统编程 - **创建HTML帮助文件**:讲解了如何创建HTML帮助文件(.chm),这是微软帮助系统的一种形式。 - **区分上下文帮助**:提供了在应用程序中实现上下文敏感帮助的技术示例。 #### 第16章 安装和部署项目 - **用安装程序向导创建安装项目**:介绍了如何使用Visual Studio中的安装和部署项目向导,快速创建安装程序。 - **部署应用程序**:阐述了部署应用程序到目标系统的过程,以及如何确保应用程序的正确安装和运行。 以上内容涵盖了Visual C++.NET编程技术体验一书中涉及的各个技术点,并对每个章节中的示例做了详细的说明。这些知识点对于理解和掌握Visual C++.NET编程非常重要,并能够帮助读者在实际项目开发中应用这些技术。
recommend-type

HarmonyOS内核深度探秘:优化自由行旅游系统的策略

# 1. HarmonyOS内核概述与旅游系统需求分析 ## 1.1 HarmonyOS内核概述 HarmonyOS(鸿蒙操作系统)是由华为开发的一个分布式操作系统,旨在提供跨多种设备的无缝体验。其内核设计理念强调高效、安全和模块化,以微内核架构和分布式能力
recommend-type

tkinter模块所有控件

<think>好的,我现在需要回答用户关于Tkinter模块中所有控件列表的问题。首先,我需要回忆Tkinter的基础知识,确保自己了解各个控件的名称和用途。用户希望得到所有支持的控件列表,所以必须全面覆盖,不能遗漏。 首先,我应该从Tkinter的标准控件开始。常见的控件包括Label、Button、Entry这些基础部件。然后是Frame,用于布局,还有Canvas用于绘图。接下来是Checkbutton、Radiobutton,这些属于选择类控件。Listbox和Scrollbar通常一起使用,处理滚动内容。还有Scale(滑块)、Spinbox、Menu、Menubutton这些可能
recommend-type

局域网五子棋游戏:娱乐与聊天的完美结合

标题“网络五子棋”和描述“适合于局域网之间娱乐和聊天!”以及标签“五子棋 网络”所涉及的知识点主要围绕着五子棋游戏的网络版本及其在局域网中的应用。以下是详细的知识点: 1. 五子棋游戏概述: 五子棋是一种两人对弈的纯策略型棋类游戏,又称为连珠、五子连线等。游戏的目标是在一个15x15的棋盘上,通过先后放置黑白棋子,使得任意一方先形成连续五个同色棋子的一方获胜。五子棋的规则简单,但策略丰富,适合各年龄段的玩家。 2. 网络五子棋的意义: 网络五子棋是指可以在互联网或局域网中连接进行对弈的五子棋游戏版本。通过网络版本,玩家不必在同一地点即可进行游戏,突破了空间限制,满足了现代人们快节奏生活的需求,同时也为玩家们提供了与不同对手切磋交流的机会。 3. 局域网通信原理: 局域网(Local Area Network,LAN)是一种覆盖较小范围如家庭、学校、实验室或单一建筑内的计算机网络。它通过有线或无线的方式连接网络内的设备,允许用户共享资源如打印机和文件,以及进行游戏和通信。局域网内的计算机之间可以通过网络协议进行通信。 4. 网络五子棋的工作方式: 在局域网中玩五子棋,通常需要一个客户端程序(如五子棋.exe)和一个服务器程序。客户端负责显示游戏界面、接受用户输入、发送落子请求给服务器,而服务器负责维护游戏状态、处理玩家的游戏逻辑和落子请求。当一方玩家落子时,客户端将该信息发送到服务器,服务器确认无误后将更新后的棋盘状态传回给所有客户端,更新显示。 5. 五子棋.exe程序: 五子棋.exe是一个可执行程序,它使得用户可以在个人计算机上安装并运行五子棋游戏。该程序可能包含了游戏的图形界面、人工智能算法(如果支持单机对战AI的话)、网络通信模块以及游戏规则的实现。 6. put.wav文件: put.wav是一个声音文件,很可能用于在游戏进行时提供声音反馈,比如落子声。在网络环境中,声音文件可能被用于提升玩家的游戏体验,尤其是在局域网多人游戏场景中。当玩家落子时,系统会播放.wav文件中的声音,为游戏增添互动性和趣味性。 7. 网络五子棋的技术要求: 为了确保多人在线游戏的顺利进行,网络五子棋需要具备一些基本的技术要求,包括但不限于稳定的网络连接、高效的数据传输协议(如TCP/IP)、以及安全的数据加密措施(如果需要的话)。此外,还需要有一个良好的用户界面设计来提供直观和舒适的用户体验。 8. 社交与娱乐: 网络五子棋除了是一个娱乐游戏外,它还具有社交功能。玩家可以通过游戏内的聊天系统进行交流,分享经验和策略,甚至通过网络寻找新的朋友。这使得网络五子棋不仅是一个个人娱乐工具,同时也是一种社交活动。 总结来说,网络五子棋结合了五子棋游戏的传统魅力和现代网络技术,使得不同地区的玩家能够在局域网内进行娱乐和聊天,既丰富了人们的娱乐生活,又加强了人际交流。而实现这一切的基础在于客户端程序的设计、服务器端的稳定运行、局域网的高效通信,以及音效文件增强的游戏体验。
recommend-type

自由行旅游新篇章:HarmonyOS技术融合与系统架构深度解析

# 1. HarmonyOS技术概述 ## 1.1 HarmonyOS的起源与发展 HarmonyOS(鸿蒙操作系统)由华为公司开发,旨在构建全场景分布式OS,以应对不同设备间的互联问题。自从2019年首次发布以来,HarmonyOS迅速成长,并迅速应用于智能手机、平板、智能穿戴、车载设备等多种平台。该系