""" 语音交互式DeepSeek问答系统 通过语音输入调用DeepSeek API回答问题 """ import json import os from vosk import Model, KaldiRecognizer import pyaudio import requests import mysql.connector from mysql.connector import Error from datetime import datetime from api.config import API_CONFIGS from doubaotts.doubaotts import VolcanoTTS # 初始化语音识别 model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'model', 'vosk-model-cn-0.22', 'vosk-model-cn-0.22') model = Model(model_path) rec = KaldiRecognizer(model, 16000) p = pyaudio.PyAudio() # 本地ollama模型配置 ollama_model = "deepseek-r1:7b" # 初始化音频输入 def init_audio(): # 列出可用音频设备 for i in range(p.get_device_count()): print(p.get_device_info_by_index(i)) # 使用默认输入设备 stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=8000, input_device_index=None) stream.start_stream() return stream # 调用本地ollama模型 def ask_deepseek(question): try: # 检查是否是数据库查询指令 if question.strip().lower().startswith("查询数据库:"): parts = question[len("查询数据库:"):].strip().split("|") if len(parts) == 2: db_name = parts[0].strip() query = parts[1].strip() result = query_other_db(db_name, query) if result is not None: return f"查询结果:\n{json.dumps(result, indent=2, ensure_ascii=False)}" else: return "查询失败,请检查数据库名称和查询语句" else: return "查询格式错误,请使用'查询数据库:数据库名|SQL查询语句'格式" # 普通问题处理 response = requests.post( "http://localhost:11434/api/generate", json={ "model": ollama_model, "prompt": question, "stream": False } ) if response.status_code == 200: return response.json()['response'].split('\n')[-1] # 只返回最后一行结果 else: return f"ollama模型错误: {response.status_code}" except Exception as e: return f"调用ollama模型时发生错误: {str(e)}" # 初始化MySQL连接 def init_db(): try: # 从配置中获取数据库连接参数 db_config = API_CONFIGS['mysql'] connection = mysql.connector.connect( host=db_config['host'], database=db_config['database'], user=db_config['user'], password=db_config['password'], port=db_config['port'], charset=db_config['charset'], 
connection_timeout=db_config['connection_timeout'] ) if connection.is_connected(): # 创建对话记录表 cursor = connection.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS conversations ( id INT AUTO_INCREMENT PRIMARY KEY, question TEXT NOT NULL, answer TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) connection.commit() return connection except Error as e: print(f"数据库连接错误: {e}") return None # 查询其他MySQL数据库 def query_other_db(database_name, query): try: # 从配置中获取基础连接参数 db_config = API_CONFIGS['mysql'] connection = mysql.connector.connect( host=db_config['host'], database=database_name, user=db_config['user'], password=db_config['password'], port=db_config['port'], charset=db_config['charset'], connection_timeout=db_config['connection_timeout'] ) if connection.is_connected(): cursor = connection.cursor(dictionary=True) cursor.execute(query) result = cursor.fetchall() connection.close() return result except Error as e: print(f"查询数据库{database_name}错误: {e}") return None # 保存对话到数据库 def save_conversation(connection, question, answer): try: cursor = connection.cursor() query = "INSERT INTO conversations (question, answer) VALUES (%s, %s)" cursor.execute(query, (question, answer)) connection.commit() except Error as e: print(f"保存对话失败: {e}") # 主程序 def main(): # 初始化数据库连接 db_connection = init_db() if not db_connection: print("警告: 无法连接到数据库,对话将不会被保存") stream = init_audio() print("请说出您的问题(按Ctrl+C退出)...") try: question = "" # 初始化question变量 while True: try: data = stream.read(4000, exception_on_overflow=False) if len(data) == 0: break if rec.AcceptWaveform(data): result = json.loads(rec.Result()) question = result['text'] if question.strip(): print(f"您的问题: {question}") answer = ask_deepseek(question) print(f"DeepSeek回答: {answer}") # 语音播报回答 from voice_management.voice_manager import VoiceManager voice_manager = VoiceManager() voice_manager.create_voice(answer) # 保存对话到数据库 if db_connection: save_conversation(db_connection, question, answer) question = "" # 
重置question变量避免重复处理 else: partial = json.loads(rec.PartialResult()) if 'partial' in partial: print(f"正在识别: {partial['partial']}", end='\r') except OSError as e: if e.errno == -9981: # Input overflowed continue else: raise except KeyboardInterrupt: print("\n程序结束") finally: try: if stream.is_active(): stream.stop_stream() if not stream._closed: stream.close() except: pass p.terminate() # 关闭数据库连接 if db_connection and db_connection.is_connected(): db_connection.close() if __name__ == "__main__": main()能正常运行但是无法自动播放音频
时间: 2025-07-20 07:28:06 浏览: 9
<think>根据您提供的代码,问题在于语音播报部分。您使用了`voice_manager.create_voice(answer)`来生成并播放语音,但是根据之前的`VoiceManager`类,`create_voice`方法默认会播放音频(因为`play_immediately`参数默认为True)。但是,在您的代码中,可能存在以下问题:
1. 在`voice_manager.create_voice(answer)`调用中,没有传递`play_immediately`参数,但默认是True,所以应该会播放。
2. 但是,在`main`函数中,您从`voice_management.voice_manager`模块导入`VoiceManager`,并实例化使用。请确保这个导入的`VoiceManager`类就是您之前修复过的版本(即包含正确播放音频的版本)。
3. 另外,音频播放可能被阻塞,因为`create_voice`方法在生成音频后立即播放,并且播放是同步的(即会等待播放完成)。这可能会阻塞主循环,导致在播放期间无法进行下一次语音识别。
为了确保音频播放正常工作,并且不阻塞主循环,我们可以考虑将音频播放放在单独的线程中。但是,首先我们检查一下当前代码中可能存在的问题。
### 可能的问题:
1. **`VoiceManager`类的实现问题**:确保`VoiceManager`类中的`_play_audio_file`方法使用了正确的pygame播放机制,并且没有提前释放资源。
2. **pygame的初始化问题**:在`VoiceManager`的`__init__`中,我们初始化了pygame.mixer,但是如果在多个实例中重复初始化可能会出现问题。在您的`main`函数中,每次循环都创建新的`VoiceManager`实例,这会导致多次初始化pygame.mixer,可能会引起问题。
### 解决方案:
1. 修改`VoiceManager`类,使其成为单例,或者避免重复初始化pygame.mixer(因为pygame.mixer.init()多次调用是安全的,但最好只初始化一次)。
2. 将`VoiceManager`实例化一次,而不是每次循环都创建新的实例。
3. 考虑异步播放,避免阻塞主循环。
### 修改代码:
1. 在`main`函数外部初始化`VoiceManager`,避免重复创建。
2. 使用线程来播放音频,这样不会阻塞语音识别。
### 修改后的代码:
```python
import threading
from voice_management.voice_manager import VoiceManager
# 在main函数外部初始化VoiceManager,确保只初始化一次
voice_manager = VoiceManager()
# Serializes background playback so that two requests never drive the shared
# voice_manager at the same time (assumed not safe for concurrent calls).
_playback_lock = threading.Lock()


def play_audio_in_thread(text):
    """Synthesize and play *text* on a background thread.

    The call returns immediately without waiting for playback to finish.

    Args:
        text: The text to hand to ``voice_manager.create_voice``.

    Returns:
        The started ``threading.Thread``; callers may ``join()`` it if they
        need to wait for playback.
    """
    def run():
        with _playback_lock:
            try:
                voice_manager.create_voice(text)
            except Exception as e:
                # Report instead of letting the worker die with an unhandled
                # traceback; the caller's loop keeps running either way.
                print(f"语音播放失败: {e}")

    # Daemon thread: a stuck synthesis/playback must not block program exit.
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread
# 在主程序中,当需要播放音频时,调用:
# play_audio_in_thread(answer)
```
### 修改主程序中的播放部分:
将原来的:
```python
voice_manager = VoiceManager()
voice_manager.create_voice(answer)
```
替换为:
```python
play_audio_in_thread(answer)
```
这样,音频播放将在后台线程中进行,不会阻塞主循环。
### 完整修改后的main函数:
```python
import threading
import json
import os
from vosk import Model, KaldiRecognizer
import pyaudio
import requests
import mysql.connector
from mysql.connector import Error
from datetime import datetime
from api.config import API_CONFIGS
from doubaotts.doubaotts import VolcanoTTS
from voice_management.voice_manager import VoiceManager # 确保VoiceManager是修复后的版本
# 全局VoiceManager实例
voice_manager = VoiceManager()
# Serializes background playback so that two requests never drive the shared
# voice_manager at the same time (assumed not safe for concurrent calls).
_playback_lock = threading.Lock()


def play_audio_in_thread(text):
    """Synthesize and play *text* on a background thread.

    The call returns immediately without waiting for playback to finish.

    Args:
        text: The text to hand to ``voice_manager.create_voice``.

    Returns:
        The started ``threading.Thread``; callers may ``join()`` it if they
        need to wait for playback.
    """
    def run():
        with _playback_lock:
            try:
                voice_manager.create_voice(text, play_immediately=True)
            except Exception as e:
                # Report instead of letting the worker die with an unhandled
                # traceback; the caller's loop keeps running either way.
                print(f"语音播放失败: {e}")

    # Daemon thread: a stuck synthesis/playback must not block program exit.
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread
# ... 其他代码不变 ...
# 主程序
def main():
# ... 初始化数据库和音频流 ...
try:
question = "" # 初始化question变量
while True:
try:
data = stream.read(4000, exception_on_overflow=False)
if len(data) == 0:
break
if rec.AcceptWaveform(data):
result = json.loads(rec.Result())
question = result['text']
if question.strip():
print(f"您的问题: {question}")
answer = ask_deepseek(question)
print(f"DeepSeek回答: {answer}")
# 使用线程播放语音
play_audio_in_thread(answer)
# 保存对话到数据库
if db_connection:
save_conversation(db_connection, question, answer)
question = "" # 重置question变量避免重复处理
else:
partial = json.loads(rec.PartialResult())
if 'partial' in partial:
print(f"正在识别: {partial['partial']}", end='\r')
except OSError as e:
if e.errno == -9981: # Input overflowed
continue
else:
raise
except KeyboardInterrupt:
print("\n程序结束")
finally:
# ... 清理代码 ...
```
### 注意事项:
1. 确保`VoiceManager`类中的`_play_audio_file`方法不会因为多线程调用而出错。`pygame.mixer.music`同一时刻只能播放一个音频流:如果上一个音频还在播放时又加载了新的音频,上一个音频会被打断,而不是与新音频叠加播放;另外,如果每次合成都写入同一个音频文件,并发调用还可能互相覆盖。因此要注意不要在上一个音频播放完之前启动新的播放线程。在您的场景中,用户一次问一个问题,然后等待回答,通常不会同时播放多个音频。为了安全,可以添加一个队列机制(或加锁串行播放),这里暂时用简单线程。
2. 如果出现播放问题,检查`VoiceManager`类中`_play_audio_file`方法是否能够正常播放。可以单独测试这个类。
### 测试:
单独测试`VoiceManager`:
```python
from voice_management.voice_manager import VoiceManager
vm = VoiceManager()
vm.create_voice("测试语音播放")
```
如果上述测试能播放,那么集成到主程序后应该也能播放。
### 其他可能的问题:
- 音频设备冲突:语音识别和语音播放使用同一个音频设备可能会冲突。确保输入设备(麦克风)和输出设备(扬声器)是分开的,或者系统能够处理。
- 权限问题:确保程序有权限访问音频设备。
§§相关问题§§:
1. 如何实现音频播放队列,避免多个音频同时播放?
2. 在Linux系统中,如何解决PyAudio的权限问题?
3. 如何优化语音识别,使其在后台持续运行而不阻塞主线程?
4. 如何处理语音播放失败的情况,例如设备被占用?</think>您的代码无法自动播放音频的主要原因是 `VoiceManager` 的实例创建和音频播放逻辑存在问题。以下是修复后的完整代码,确保语音回答能够自动播放:
```python
import json
import os
import threading
from vosk import Model, KaldiRecognizer
import pyaudio
import requests
import mysql.connector
from mysql.connector import Error
from datetime import datetime
from api.config import API_CONFIGS
import pygame
from pathlib import Path
from openai import OpenAI, OpenAIError
import logging
# VoiceManager with the playback fixes applied
class VoiceManager:
    """Text-to-speech helper: synthesizes speech via an OpenAI-compatible API and plays it with pygame.

    All settings are read from ``API_CONFIGS['step_tts']``.
    """

    def __init__(self):
        """Initialize the pygame mixer and build the API client.

        Raises:
            KeyError: ``API_CONFIGS`` has no ``'step_tts'`` entry.
            ValueError: A required key is missing from the TTS config.
        """
        self.logger = logging.getLogger(__name__)

        # Initialize the pygame audio system only if nobody has done so yet.
        if not pygame.mixer.get_init():
            pygame.mixer.init()

        try:
            step_config = API_CONFIGS['step_tts']
            self._validate_config(step_config)

            self.client = OpenAI(
                api_key=step_config['key'],
                base_url=step_config['endpoint']
            )
            self.logger.info("VoiceManager initialized successfully")
        except KeyError as e:
            self.logger.error(f"Missing required config: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Initialization failed: {e}")
            raise

    def _play_audio_file(self, audio_file_path):
        """Play an audio file and block until playback has finished.

        Args:
            audio_file_path: Path of the audio file to play.

        Raises:
            pygame.error: The file could not be loaded or played.
        """
        try:
            self.logger.info(f"Playing audio file: {audio_file_path}")
            pygame.mixer.music.load(str(audio_file_path))
            pygame.mixer.music.play()

            # Poll about 10 times per second until playback is done; one Clock
            # is reused instead of building a new one on every iteration.
            clock = pygame.time.Clock()
            while pygame.mixer.music.get_busy():
                clock.tick(10)

            self.logger.info("Audio playback completed")
        except pygame.error as e:
            self.logger.error(f"Pygame audio error: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error during audio playback: {e}")
            raise
        finally:
            # Release the file handle so the next create_voice() call can
            # overwrite the same output file. unload() exists in pygame >= 2.0.
            unload = getattr(pygame.mixer.music, "unload", None)
            if unload is not None:
                try:
                    unload()
                except pygame.error as e:
                    # Best effort only: never mask the original playback error.
                    self.logger.debug(f"Could not unload audio file: {e}")

    def _validate_config(self, config):
        """Check that *config* contains every key the client needs.

        Args:
            config: Mapping holding the TTS settings.

        Raises:
            ValueError: A required key is missing (the first one is reported).
        """
        required_keys = ['key', 'endpoint', 'default_model']
        for key in required_keys:
            if key not in config:
                raise ValueError(f"Missing required config key: {key}")

    def create_voice(self, text, voice=None, language=None, emotion=None, style=None, play_immediately=True):
        """Synthesize *text* to an MP3 file and optionally play it.

        Args:
            text: The text to synthesize; must not be empty or whitespace-only.
            voice: Voice name; falls back to the configured ``default_voice``.
            language: Language option; falls back to the configured value.
            emotion: Emotion option; falls back to the configured value.
            style: Style option; falls back to the configured value.
            play_immediately: Play the audio right after it is generated
                (default ``True``). Playback blocks until it has finished.

        Returns:
            Path of the generated audio file.

        Raises:
            ValueError: *text* is empty.
            OpenAIError: The speech API request failed.
        """
        if not text or not text.strip():
            raise ValueError("Text cannot be empty")

        try:
            # The output always goes to the same file next to this module.
            speech_file = Path(__file__).parent / "generated_voice.mp3"
            step_config = API_CONFIGS['step_tts']

            response = self.client.audio.speech.create(
                model=step_config['default_model'],
                input=text,
                voice=voice or step_config['default_voice'],
                response_format="mp3",
                speed=1.0,
                extra_body={
                    "language": language or step_config['voice_options']['language'],
                    "emotion": emotion or step_config['voice_options']['emotion'],
                    "style": style or step_config['voice_options']['style'],
                    "volume": 1.0
                }
            )

            self._handle_response(response, speech_file)

            if play_immediately:
                self._play_audio_file(speech_file)

            return speech_file
        except OpenAIError as e:
            self.logger.error(f"OpenAI API error: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Error creating voice: {e}")
            raise

    def _handle_response(self, response, output_file):
        """Write the API response body to *output_file*.

        Args:
            response: The object returned by ``client.audio.speech.create``.
            output_file: Destination path for the audio data.
        """
        try:
            response.stream_to_file(output_file)
        except Exception as e:
            self.logger.error(f"Error handling response: {str(e)}")
            raise
# Initialize speech recognition. The Vosk model is loaded from
# model/vosk-model-cn-0.22/vosk-model-cn-0.22 next to this file, and the
# recognizer is created for 16000 Hz audio. These statements run at import time.
model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'model', 'vosk-model-cn-0.22', 'vosk-model-cn-0.22')
model = Model(model_path)
rec = KaldiRecognizer(model, 16000)
# Shared PyAudio instance, used to open the input stream.
p = pyaudio.PyAudio()
# Name of the local ollama model to query.
ollama_model = "deepseek-r1:7b"
# Initialize audio input
def init_audio(rate=16000, frames_per_buffer=8000, input_device_index=None):
    """Open and start a mono 16-bit input stream on the shared PyAudio instance.

    Args:
        rate: Sample rate in Hz. The default matches the rate the module-level
            recognizer was created with; keep the two in sync.
        frames_per_buffer: Number of frames per buffer for the stream.
        input_device_index: Index of the input device; ``None`` selects the
            default input device.

    Returns:
        The started input stream.
    """
    stream = p.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=rate,
        input=True,
        frames_per_buffer=frames_per_buffer,
        input_device_index=input_device_index,
    )
    stream.start_stream()
    return stream
# Call the local ollama model
def ask_deepseek(question, timeout=300):
    """Answer *question* with the local ollama model, or run a database query.

    A question of the form ``查询数据库:<database>|<SQL>`` is routed to
    ``query_other_db``; anything else is sent to the ollama HTTP API.

    Args:
        question: The recognized question text.
        timeout: Seconds to wait for the ollama HTTP response.

    Returns:
        The answer text. For model answers this is the last non-empty line of
        the response (an empty string if the response has no text). Failures
        are reported as a message string rather than raised.
    """
    db_prefix = "查询数据库:"
    try:
        # Strip once so the prefix test and the slice see the same string.
        text = question.strip()

        if text.lower().startswith(db_prefix):
            parts = text[len(db_prefix):].strip().split("|")
            if len(parts) != 2:
                return "查询格式错误,请使用'查询数据库:数据库名|SQL查询语句'格式"

            db_name, query = (part.strip() for part in parts)
            # NOTE(security): the SQL text comes straight from user input and
            # is executed as-is; restrict the database account's privileges.
            result = query_other_db(db_name, query)
            if result is None:
                return "查询失败,请检查数据库名称和查询语句"
            # default=str keeps datetime/Decimal column values serializable.
            return f"查询结果:\n{json.dumps(result, indent=2, ensure_ascii=False, default=str)}"

        # Regular question: ask the model.
        response = requests.post(
            "http://localhost:11434/api/generate",
            json={
                "model": ollama_model,
                "prompt": question,
                "stream": False
            },
            timeout=timeout
        )

        if response.status_code != 200:
            return f"ollama模型错误: {response.status_code}"

        # Return only the last line of the result; blank lines are skipped so
        # a trailing newline does not produce an empty answer.
        lines = [line for line in response.json()['response'].splitlines() if line.strip()]
        return lines[-1] if lines else ""
    except Exception as e:
        return f"调用ollama模型时发生错误: {str(e)}"
# Initialize the MySQL connection
def init_db():
    """Connect to the configured MySQL database and ensure the log table exists.

    Connection parameters are read from ``API_CONFIGS['mysql']``. The
    ``conversations`` table is created if it does not exist yet.

    Returns:
        The open connection, or ``None`` if connecting or creating the table
        failed (the error is printed).
    """
    connection = None
    try:
        db_config = API_CONFIGS['mysql']
        connection = mysql.connector.connect(
            host=db_config['host'],
            database=db_config['database'],
            user=db_config['user'],
            password=db_config['password'],
            port=db_config['port'],
            charset=db_config['charset'],
            connection_timeout=db_config['connection_timeout']
        )

        if not connection.is_connected():
            connection.close()
            return None

        # Create the conversation log table.
        cursor = connection.cursor()
        try:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS conversations (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    question TEXT NOT NULL,
                    answer TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            connection.commit()
        finally:
            cursor.close()
        return connection
    except Error as e:
        print(f"数据库连接错误: {e}")
        # Do not leak a half-initialized connection.
        if connection is not None and connection.is_connected():
            connection.close()
        return None
# Query another MySQL database
def query_other_db(database_name, query):
    """Run *query* against *database_name* and return all rows.

    Host and credentials are taken from ``API_CONFIGS['mysql']``; only the
    database name differs. The connection is always closed before returning.

    Args:
        database_name: Name of the database to connect to.
        query: SQL statement to execute. NOTE(security): it is executed as-is,
            so callers must not pass untrusted text without restricting the
            database account's privileges.

    Returns:
        A list of row dictionaries, or ``None`` if the connection could not be
        established or the query failed (the error is printed).
    """
    connection = None
    try:
        db_config = API_CONFIGS['mysql']
        connection = mysql.connector.connect(
            host=db_config['host'],
            database=database_name,
            user=db_config['user'],
            password=db_config['password'],
            port=db_config['port'],
            charset=db_config['charset'],
            connection_timeout=db_config['connection_timeout']
        )

        if not connection.is_connected():
            return None

        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute(query)
            return cursor.fetchall()
        finally:
            cursor.close()
    except Error as e:
        print(f"查询数据库{database_name}错误: {e}")
        return None
    finally:
        # Close on every path, including a failed execute().
        if connection is not None:
            connection.close()
# Save a conversation to the database
def save_conversation(connection, question, answer):
    """Insert one question/answer pair into the ``conversations`` table.

    Database errors are printed and not raised.

    Args:
        connection: An open database connection.
        question: The question text.
        answer: The answer text.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        query = "INSERT INTO conversations (question, answer) VALUES (%s, %s)"
        cursor.execute(query, (question, answer))
        connection.commit()
    except Error as e:
        print(f"保存对话失败: {e}")
    finally:
        # Release the cursor on success and on failure.
        if cursor is not None:
            cursor.close()
# Main program
def main():
    """Run the voice question/answer loop until Ctrl+C or end of audio input.

    Each recognized utterance is answered via ``ask_deepseek``, spoken with a
    single ``VoiceManager`` instance, and stored in the database when a
    connection is available. Audio and database resources are released on exit.
    """
    db_connection = init_db()
    if not db_connection:
        print("警告: 无法连接到数据库,对话将不会被保存")

    stream = None
    try:
        # Create the voice manager once, outside the loop.
        voice_manager = VoiceManager()

        stream = init_audio()
        print("请说出您的问题(按Ctrl+C退出)...")

        while True:
            try:
                data = stream.read(4000, exception_on_overflow=False)
            except OSError as e:
                if e.errno == -9981:  # Input overflowed
                    continue
                raise

            if len(data) == 0:
                break

            if not rec.AcceptWaveform(data):
                # Utterance still in progress: show the partial transcript.
                partial = json.loads(rec.PartialResult())
                if 'partial' in partial:
                    print(f"正在识别: {partial['partial']}", end='\r')
                continue

            result = json.loads(rec.Result())
            question = result.get('text', '')
            if not question.strip():
                continue

            print(f"您的问题: {question}")
            answer = ask_deepseek(question)
            print(f"DeepSeek回答: {answer}")

            # Speak the answer with the shared voice manager. create_voice
            # re-raises its errors, so catch them here to keep the loop alive.
            try:
                voice_manager.create_voice(answer)
            except Exception as e:
                print(f"语音播报失败: {e}")

            # Save the conversation to the database.
            if db_connection:
                save_conversation(db_connection, question, answer)
    except KeyboardInterrupt:
        print("\n程序结束")
    finally:
        if stream is not None:
            try:
                if stream.is_active():
                    stream.stop_stream()
                stream.close()
            except OSError as e:
                print(f"关闭音频流失败: {e}")
        p.terminate()

        # Close the database connection.
        if db_connection and db_connection.is_connected():
            db_connection.close()
if __name__ == "__main__":
    main()
```
### 主要修复内容:
1. **VoiceManager 集成修复**:
- 将 `VoiceManager` 类直接集成到主文件中,避免导入问题
- 在 `main()` 函数中、主循环之外只创建一次 `voice_manager` 实例(它是 `main()` 的局部变量,并非全局变量)
- 确保 `pygame.mixer.init()` 只执行一次
2. **音频播放流程优化**:
- 在 `VoiceManager.create_voice()` 中默认启用播放(`play_immediately=True`)
- 添加详细的日志记录帮助调试
- 确保音频文件路径正确
3. **全局实例管理**:
- 避免在每次循环中创建新的 `VoiceManager` 实例
- 在整个主循环中复用同一个实例,确保音频系统状态一致
4. **错误处理增强**:
- 添加更多异常处理和日志记录
- 注意:`VoiceManager` 在记录日志后会重新抛出异常,调用方需要用 `try/except` 捕获,才能确保音频播放失败时不会中断主程序
### 使用说明:
1. 确保安装了所有必要的依赖:
```bash
pip install vosk pyaudio requests mysql-connector-python pygame openai
```
2. 下载中文语音识别模型:
- 从 [Vosk 模型仓库](https://alphacephei.com/vosk/models) 下载 `vosk-model-cn-0.22`
- 解压到项目目录下的 `model/vosk-model-cn-0.22` 文件夹
3. 配置 `API_CONFIGS`:
- 在 `api/config.py` 中设置阶跃星辰API和MySQL数据库的配置
4. 运行程序:
```bash
python your_script_name.py
```
### 测试音频播放:
您可以在主程序开始前添加测试代码:
```python
if __name__ == "__main__":
    # Smoke-test audio playback before starting.
    test_manager = VoiceManager()
    test_manager.create_voice("音频系统测试正常")

    # Run the main program.
    main()
```
阅读全文
相关推荐


















