sqlalchemy app.app_context()

时间: 2025-03-30 13:05:57 浏览: 48
### 使用 Flask 的 `app.app_context()` 方法与 SQLAlchemy 集成的最佳实践 在 Flask 中,`app.app_context()` 是一种用于创建应用程序上下文的方法。这种上下文对于访问依赖于当前应用实例的对象(如配置变量或数据库连接)至关重要[^1]。 当使用 SQLAlchemy 作为 ORM 工具时,通常会通过 Flask-SQLAlchemy 扩展来集成两者。在这种情况下,确保正确管理应用上下文可以防止运行时错误并优化性能。以下是关于如何最佳实践地使用 `app.app_context()` 和 SQLAlchemy 的说明: #### 应用上下文的作用 Flask 提供了一个全局代理对象 `current_app` 来表示活动的应用程序实例。然而,在某些场景下(例如后台线程或 CLI 脚本),可能不存在默认的请求上下文。此时可以通过显式调用来激活应用上下文: ```python with app.app_context(): # 此处可安全访问 current_app 或 db 对象 pass ``` #### 数据库初始化 为了使 SQLAlchemy 可以正常工作,需先完成其初始化过程并与 Flask 实例绑定在一起。这一步骤一般发生在项目启动阶段: ```python from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db' db = SQLAlchemy(app) class User(db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) ``` 上述代码片段展示了如何设置基本模型以及关联到 SQLite 数据源[^2]。 #### 查询操作中的上下文管理 执行任何涉及数据库的操作之前,请确认已处于有效的应用上下文中。如果是在视图函数内部,则无需额外处理;但如果位于独立脚本或者异步任务里,则应手动推入上下文环境: ```python def run_query_in_background(): with app.app_context(): users_count = User.query.count() print(f'Total number of users: {users_count}') ``` #### 延伸讨论:单元测试中的考虑事项 编写针对数据库交互逻辑的自动化测试案例时,同样需要注意维护恰当的应用上下文状态。推荐做法如下所示: ```python import unittest from your_flask_project import create_app, db class TestDatabaseOperations(unittest.TestCase): def setUp(self): self.app = create_app('testing') # 加载特定配置文件 self.app_context = self.app.app_context() self.app_context.push() db.create_all() def tearDown(self): db.session.remove() db.drop_all() self.app_context.pop() def test_user_creation(self): new_user = User(username='testuser') db.session.add(new_user) db.session.commit() retrieved_user = User.query.filter_by(username='testuser').first() assert retrieved_user is not None ```
阅读全文

相关推荐

项目结构为flask_project/ ├── app.py ├── models.py ├── routes.py ├── config.py ├── templates/ │ └── index.html ├── static/ └── tests.py代码如下from flask import Flask, render_template from flask_project.models import db from flask_project.routes import api from flask_project.config import Config app = Flask(__name__) app.config.from_object(Config) db.init_app(app) app.register_blueprint(api, url_prefix='/api') # 初始化数据库 with app.app_context(): db.create_all() @app.route('/') def index(): return render_template('index.html') if __name__ == '__main__': app.run(debug=True) from flask import Blueprint, request, jsonify from flask_project.models import db, User api = Blueprint('api', __name__) @api.route('/users', methods=['GET']) def get_users(): users = User.query.all() return jsonify([user.to_dict() for user in users]) @api.route('/users', methods=['POST']) def create_user(): data = request.json user = User(name=data['name'], email=data['email']) db.session.add(user) db.session.commit() return jsonify(user.to_dict()), 201 @api.route('/users/<int:user_id>', methods=['PUT']) def update_user(user_id): user = User.query.get_or_404(user_id) data = request.json user.name = data.get('name', user.name) user.email = data.get('email', user.email) db.session.commit() return jsonify(user.to_dict()) @api.route('/users/<int:user_id>', methods=['DELETE']) def delete_user(user_id): try: user = User.query.get(user_id) if not user: return jsonify({"error": "用户不存在"}), 404 db.session.delete(user) db.session.commit() return jsonify({"message": f"用户{user_id}删除成功"}), 200 except Exception as e: db.session.rollback() return jsonify({"error": str(e)}), 500 import unittest import requests import sys import os sys.path.append(os.path.dirname(os.path.abspath(__file__))) from flask_project.app import app from flask_project.models import db, User from flask_project.config import Config class UserApiTestCase(unittest.TestCase): base_url = "http://localhost:5000/api/users" test_data = { "name": "张三", "email": "[email protected]" } @classmethod def setUpClass(cls): """初始化测试环境""" cls.app = app cls.app.config.from_object(Config) cls.client = cls.app.test_client() with cls.app.app_context(): db.create_all() @classmethod def tearDownClass(cls): """清理测试环境""" with cls.app.app_context(): db.session.remove() db.drop_all() def setUp(self): """每个测试用例前清理数据""" with self.app.app_context(): User.query.delete() db.session.commit() def test_01_create_user(self): """测试用户创建接口""" with self.app.app_context(): # 正常创建 response = self.client.post('/api/users', json=self.test_data) self.assertEqual(response.status_code, 201) self.assertIn("id", response.json()) # 验证数据库 user = User.query.filter_by(email=self.test_data["email"]).first() self.assertEqual(user.name, self.test_data["name"]) # 重复创建测试 dup_response = self.client.post('/api/users', json=self.test_data) self.assertEqual(dup_response.status_code, 409) def test_02_get_users(self): """测试用户查询接口""" with self.app.app_context(): # 添加测试数据 user = User(name=self.test_data["name"], email=self.test_data["email"]) db.session.add(user) db.session.commit() # 正常查询 response = self.client.get('/api/users') self.assertEqual(response.status_code, 200) users = response.json() self.assertIsInstance(users, list) self.assertGreater(len(users), 0) def test_03_get_single_user(self): """测试单个用户查询接口""" with self.app.app_context(): # 添加测试数据 user = User(name=self.test_data["name"], email=self.test_data["email"]) db.session.add(user) db.session.commit() # 正常查询 response = self.client.get(f'/api/users/{user.id}') self.assertEqual(response.status_code, 200) self.assertEqual(response.json()["email"], self.test_data["email"]) # 无效ID查询 invalid_response = self.client.get('/api/users/9999') self.assertEqual(invalid_response.status_code, 404) def test_04_update_user(self): """测试用户更新接口""" with self.app.app_context(): # 创建测试数据 user = User(name=self.test_data["name"], email=self.test_data["email"]) db.session.add(user) db.session.commit() # 更新操作 update_data = {"name": "李四", "email": "[email protected]"} response = self.client.put(f'/api/users/{user.id}', json=update_data) self.assertEqual(response.status_code, 200) # 验证更新结果 updated_user = User.query.get(user.id) self.assertEqual(updated_user.name, update_data["name"]) self.assertEqual(updated_user.email, update_data["email"]) def test_05_delete_user(self): """测试用户删除接口""" with self.app.app_context(): # 创建测试数据 user = User(name=self.test_data["name"], email=self.test_data["email"]) db.session.add(user) db.session.commit() # 删除操作 response = self.client.delete(f'/api/users/{user.id}') self.assertEqual(response.status_code, 204) # 验证删除结果 deleted_user = User.query.get(user.id) self.assertIsNone(deleted_user) def test_06_invalid_create(self): """测试异常创建请求""" with self.app.app_context(): # 缺少必填字段 invalid_data = {"email": "[email protected]"} response = self.client.post('/api/users', json=invalid_data) self.assertEqual(response.status_code, 400) self.assertIn("name", response.json().get("message", {})) if __name__ == '__main__': unittest.main(verbosity=2) import os class Config: SQLALCHEMY_DATABASE_URI = 'sqlite:///users.db' SQLALCHEMY_TRACK_MODIFICATIONS = False DEBUG = True from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() class User(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(80), nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) def to_dict(self): return {'id': self.id, 'name': self.name, 'email': self.email}修改代码实现用Unittest+requests对上面代码中开发的接口进行自动化测试。要求出现个人信息

def create_app(config_name='default'): """应用工厂函数""" # 在应用启动时立即配置日志 # 这确保了所有后续的日志记录都使用我们定义的滚动文件处理器 from tools.log_config import setup_logging setup_logging() app = Flask(__name__, instance_relative_config=True) CORS(app) # 加载配置 # 处理 Worker 子进程中可能出现的 "No module named 'config'" 问题 try: from config import config except ModuleNotFoundError: # 动态将项目根目录加入 sys.path 后重试 current_dir = os.path.abspath(os.path.dirname(__file__)) project_root_retry = os.path.abspath(os.path.join(current_dir, os.pardir)) if project_root_retry not in sys.path: sys.path.insert(0, project_root_retry) from config import config # 再次尝试导入 app.config.from_object(config[config_name]) # 确保JSON序列化配置正确,解决中文乱码问题 - 适用于Flask 3.x app.config['JSON_AS_ASCII'] = False app.config['JSONIFY_PRETTYPRINT_REGULAR'] = True app.config['JSON_SORT_KEYS'] = False # Flask 3.x的正确配置方式 app.json.ensure_ascii = False app.json.sort_keys = False # 初始化数据库 db.init_app(app) # 为 gevent 环境配置数据库线程安全 with app.app_context(): # 确保数据库引擎使用线程安全模式 from sqlalchemy import event @event.listens_for(db.engine, "connect") def set_pragma(dbapi_connection, connection_record): # 只在数据库是 SQLite 时执行 PRAGMA 指令 if db.engine.name == 'sqlite': cursor = dbapi_connection.cursor() cursor.execute("PRAGMA journal_mode=WAL") cursor.execute("PRAGMA synchronous=NORMAL") cursor.execute("PRAGMA temp_store=memory") cursor.execute("PRAGMA mmap_size=268435456") # 256MB cursor.close() # 配置Swagger UI - 简化配置 swagger_template = { "swagger": "2.0", "info": { "title": "异步调度服务器", "description": "基于Flask和Celery的异步任务调度服务", "version": "1.0.0" }, "host": "127.0.0.1:5000", "basePath": "/", "schemes": ["http"] } swagger_config = { "headers": [], "specs": [ { "endpoint": 'apispec_1', "route": '/apispec_1.json', "rule_filter": lambda rule: True, # all in "model_filter": lambda tag: True, # all in } ], "static_url_path": "/flasgger_static", "swagger_ui": True, "specs_route": "/apidocs/", "uiversion": 3 } Swagger(app, template=swagger_template, config=swagger_config) # 添加全局响应处理器,确保中文编码正确 @app.after_request def after_request(response): # 确保JSON响应使用UTF-8编码 if response.content_type and response.content_type.startswith('application/json'): response.headers['Content-Type'] = 'application/json; charset=utf-8' return response # 注册teardown函数 @app.teardown_appcontext def shutdown_session(exception=None): db.session.remove() # 在应用上下文中创建所有数据表 with app.app_context(): # 导入所有模型,以便 SQLAlchemy 能够正确创建它们 from app.models import task, log # noqa db.create_all() return app def create_celery_app(app): # todo 从app.config读取 """创建独立的Celery应用""" # 在 Celery app 创建时,也配置一次日志 # 这对 beat 调度器或直接调用任务的场景很重要 from tools.log_config import setup_logging setup_logging() # 创建任务上下文 class ContextTask(Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery = Celery(app.import_name, task_cls=ContextTask) # 基础配置 celery.conf.update( broker_url='redis://127.0.0.1:6379/0', result_backend='redis://127.0.0.1:6379/1', # broker_url=app.config['CELERY_BROKER_URL'], # result_backend=app.config['CELERY_RESULT_BACKEND'], task_serializer='json', result_serializer='json', accept_content=['json'], timezone='UTC', enable_utc=True, # Windows兼容性配置 - 使用gevent # worker_pool='gevent', # 移除此行 worker_concurrency=20, # 添加任务发现配置 task_always_eager=False, # 确保任务异步执行 result_expires=3600, # 结果保存1小时 # 添加队列定义,确保只使用celery队列 task_default_queue='celery', task_create_missing_queues=True, # 让 Celery 不要覆盖我们自定义的 root logger,直接复用现有 handler worker_hijack_root_logger=False, # 将标准输出/错误也重定向到 logging,确保 print() 内容写入日志 worker_redirect_stdouts=True, worker_redirect_stdouts_level='INFO', # 可靠性配置:防止任务丢失 task_acks_late=True, # 任务完成后才确认,防止 Worker 崩溃导致任务丢失 worker_prefetch_multiplier=1, # 减少预取,确保任务均匀分配 task_reject_on_worker_lost=True, # Worker 丢失时拒绝任务,重新排队 ) celery.set_default() return celery # 创建全局Celery实例 flask_app = create_app() celery_app = create_celery_app(flask_app) @celery_app.task def long_task(data): sleep(10) return len(data) 优化以上代码便于flask编写蓝图和路由,可以拆分为几个python文件

现在我要删除指定的表活着创建指定的表,我设置好了相应的数据模型,如下:from DB import db, ORMOperationBase from sqlalchemy import create_engine, Column, Integer, String, DateTime from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class ETrainEvaluationsTemplate(db.Model, ORMOperationBase): __tablename__ = 'ETrain_evaluations_template' __table_args__ = { 'comment': '评定表模版表', 'mysql_collate': 'utf8mb3_unicode_ci' # 字符集配置 } # 主键字段 id = db.Column(db.BigInteger, primary_key=True, autoincrement=True, comment='自增主键') # 业务字段 name = db.Column(db.String(128), nullable=False, comment='模板名称') # 审计字段 create_time = db.Column(db.TIMESTAMP, server_default=db.func.current_timestamp(), nullable=False, comment='创建时间') last_update_time = db.Column(db.TIMESTAMP, server_default=db.func.current_timestamp(), onupdate=db.func.current_timestamp(), comment='最后更新时间') is_deleted = db.Column(db.Boolean, default=False, server_default='0', comment='软删除标记(0:正常 1:删除)') # 外键字段 create_by_user_id = db.Column(db.BigInteger, db.ForeignKey('ETrain_user.id'), nullable=True, comment='创建人ID') last_update_user_id = db.Column(db.BigInteger, db.ForeignKey('ETrain_user.id'), nullable=True, comment='最后更新人ID') # 关系定义 created_by = db.relationship('ETrainUser', foreign_keys=[create_by_user_id], backref='created_templates', lazy='select') last_updated_by = db.relationship('ETrainUser', foreign_keys=[last_update_user_id], backref='updated_templates', lazy='select') # 索引配置(已自动继承原表索引) # # 创建和删除表的函数,应该放在模型外部 # def init_db(): # app = Flask(__name__) # app.config.from_object(Config) # app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # db.init_app(app) # # with app.app_context(): # ETrainEvaluationsTemplate.__table__.drop(db.engine) # # db.drop_all() # 删除所有表 # # db.create_all() # 创建所有表 if __name__ == '__main__': pass

2025-03-16 12:38:46,235 INFO sqlalchemy.engine.Engine SELECT DATABASE() 2025-03-16 12:38:46,236 INFO sqlalchemy.engine.Engine [raw sql] () 2025-03-16 12:38:46,237 INFO sqlalchemy.engine.Engine SELECT @@sql_mode 2025-03-16 12:38:46,237 INFO sqlalchemy.engine.Engine [raw sql] () 2025-03-16 12:38:46,238 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names 2025-03-16 12:38:46,238 INFO sqlalchemy.engine.Engine [raw sql] () 2025-03-16 12:38:46,239 INFO sqlalchemy.engine.Engine BEGIN (implicit) 2025-03-16 12:38:46,239 INFO sqlalchemy.engine.Engine DESCRIBE testormsqlalchemy.student 2025-03-16 12:38:46,239 INFO sqlalchemy.engine.Engine [raw sql] () 2025-03-16 12:38:46,240 INFO sqlalchemy.engine.Engine DESCRIBE testormsqlalchemy.computer 2025-03-16 12:38:46,241 INFO sqlalchemy.engine.Engine [raw sql] () 2025-03-16 12:38:46,242 INFO sqlalchemy.engine.Engine COMMIT 2025-03-16 12:38:46,246 INFO sqlalchemy.engine.Engine BEGIN (implicit) 2025-03-16 12:38:46,248 INFO sqlalchemy.engine.Engine INSERT INTO computer (name) VALUES (%s) 2025-03-16 12:38:46,249 INFO sqlalchemy.engine.Engine [generated in 0.00067s] ('华硕 ',) 2025-03-16 12:38:46,250 INFO sqlalchemy.engine.Engine INSERT INTO computer (name) VALUES (%s) 2025-03-16 12:38:46,251 INFO sqlalchemy.engine.Engine [cached since 0.002766s ago] ('戴尔',) 2025-03-16 12:38:46,251 INFO sqlalchemy.engine.Engine INSERT INTO computer (name) VALUES (%s) 2025-03-16 12:38:46,252 INFO sqlalchemy.engine.Engine [cached since 0.003822s ago] ('华为',) 2025-03-16 12:38:46,253 INFO sqlalchemy.engine.Engine INSERT INTO student (name, computer_id) VALUES (%s, %s) 2025-03-16 12:38:46,253 INFO sqlalchemy.engine.Engine [generated in 0.00042s] ('张三 ', 1) 2025-03-16 12:38:46,254 INFO sqlalchemy.engine.Engine ROLLBACK Traceback (most recent call last): File "D:\Python\lib\site-packages\sqlalchemy\engine\base.py", line 1964, in _exec_single_context self.dialect.do_execute( File "D:\Python\lib\site-packages\sqlalchemy\engine\default.py", li

import uuid import os from flask import Flask, request, jsonify from flask_executor import Executor from sqlalchemy import text from tasks.validate_contract_tasks import validate_contract as validate from models.contract import Contract from services.utils import download_file_from_url from tasks.celery_method import extract_json_from_model_output from tasks.upload_contract_tasks import process_file_async, process_contract_task from tasks.validate_contract_tasks import validate_contract_task # 新增db.py导入(需配合新建文件) from conf.db import db from utils.images_utils import convert_pdf_to_png from utils.re_utils import extract_if_contract_matches app = Flask(__name__, static_folder=None) app.config.from_object('conf.config.DevelopmentConfig') db.init_app(app) # 先完成db实例与app的绑定 # 设置 MySQL 时区(在连接后执行 SQL) with app.app_context(): with db.engine.connect() as conn: conn.execute(text("SET time_zone = '+08:00'")) # 延迟导入模型类,避免循环依赖 app.debug = True # Initialize Flask-Executor executor = Executor(app) executor.init_app(app) # 移除 max_workers 参数 UPLOAD_DIR = "/tmp/uploads" # 临时保存上传文件路径 os.makedirs(UPLOAD_DIR, exist_ok=True) # 异步处理:表格提取、图像增强、OCR @app.route("/parser/file_by_stream", methods=['POST']) def get_file_by_stream(): file = request.files.get('file') callback_url = request.form.get('callBackUrl') template_type = request.form.get('type') if not file: return jsonify({"error": "请上传文件"}), 400 # 为这个文件生成唯一 ID task_id = str(uuid.uuid4()) try: # 保存文件到临时目录 file_ext = os.path.splitext(file.filename)[-1] saved_path = os.path.join(UPLOAD_DIR, f"{task_id}{file_ext}") file.save(saved_path) # 调用异步任务处理(传入文件路径和 task_id) process_file_async(saved_path, callback_url, task_id, template_type) # 同步测试 except Exception as e: return jsonify({"error": f"文件处理失败: {str(e)}"}), 500 return jsonify({ "message": "任务已分发,后台处理中", "task_id": task_id }), 200 @app.route("/parser/file", methods=['POST']) def get_file_by_ids(): ids = request.json.get('ids') callback_url = request.json.get('callBackUrl') template_type = request.json.get('type') if not ids or not isinstance(ids, list): return jsonify({"error": "错误的参数"}), 400 # 每次分发的最大任务数 max_batch_size = 10 task_ids = [] # 分批处理 for i in range(0, len(ids), max_batch_size): batch_ids = ids[i:i + max_batch_size] # 获取当前批次的ids batch_task_ids = [str(uuid.uuid4()) for _ in batch_ids] task_ids.extend(batch_task_ids) # 分发当前批次的任务 for contract_id, task_id in zip(batch_ids, batch_task_ids): # process_file_async.delay(contract_id, callback_url, task_id, template_type) process_file_async(contract_id, callback_url, task_id, template_type) # 同步测试 return jsonify({ "message": "任务已分发,后台处理中", "task_ids": task_ids }), 200 @app.route('/validate/contract', methods=['POST']) def validate_contract_view(): # 避免和逻辑函数重名 data = request.get_json() contract_id = data.get('contractId') if not contract_id: return jsonify({"error": "缺少合同ID"}), 400 # 异步任务调用 validate(contract_id) # celery任务 return jsonify({"message": "任务已提交", "contract_id": contract_id}), 202 @app.route('/ocr/contract', methods=['POST']) def ocr_contract(): data = request.get_json() contract_id = data.get('contract_id') template_type = data.get('template_type') if not contract_id: return jsonify({"error": "缺少合同ID"}), 400 contract = Contract.query.get(contract_id) if not contract: return jsonify({"error": "合同不存在"}), 404 task_id = str(uuid.uuid4()) # 异步执行任务(传入任务ID即可) from tasks.upload_contract_tasks import process_contract_task process_contract_task.delay(contract,template_type,task_id) # from tests.openai_test import process_contract_task as test_contract_test # test_contract_test(contract, template_type,task_id) return jsonify({"message": "OCR任务已提交", "task_id": task_id}), 202 @app.route('/contracts/test', methods=['GET']) def test_contract(): pass @app.route("/", methods=['GET']) def hello(): return "Hello flask!" if __name__ == '__main__': app.run(debug=True, host="127.0.0.1", port=5090) 以上代码怎么打断点

我在开发一个适老化服务平台, 现在正在开发健康板块下的一个在线问诊功能, 我引进了gpt-2模型进行回复功能, 但现在我在前端, 客户端界面发起问诊时, 后端无法正常调用模型生成回复现在应该怎么做 D:\Soft\app\anaconda\python.exe "D:\GDUT\projectdata\python\pythonProject\Eldly service platform\main.py" 2025-05-11 14:57:35,717 - INFO - Received Socket.IO status: {'message': 'Connected to server'} 2025-05-11 14:57:35,718 - INFO - Socket.IO connection opened. 2025-05-11 14:57:35,724 - WARNING - 无法找到背景图片:Data/Background.png 无法加载背景图片 2025-05-11 14:57:49,961 - INFO - Consult message sent successfully. 2025-05-11 14:57:50,228 - INFO - Received Socket.IO message: {'from': '张医生', 'message': '迷是�'} 2025-05-11 14:58:40,478 - INFO - Attempting to book exam: 常规体检 on 2025-05-11 2025-05-11 14:58:42,546 - ERROR - Booking request failed: 500 Server Error: INTERNAL SERVER ERROR for url: http://localhost:5000/api/book_exam 2025-05-11 14:58:56,119 - INFO - Attempting to book exam: 常规体检 on 2025-05-11 2025-05-11 14:58:58,170 - ERROR - Booking request failed: 500 Server Error: INTERNAL SERVER ERROR for url: http://localhost:5000/api/book_exam 进程已结束,退出代码为 -1 前端输出日志 D:\Soft\app\anaconda\python.exe "D:\GDUT\projectdata\python\pythonProject\Eldly service platform\Community_process.py" Server initialized for threading. 2025-05-11 14:57:11,013 - INFO - Server initialized for threading. 2025-05-11 14:57:11,015 - INFO - Database initialized successfully. 2025-05-11 14:57:11,015 - INFO - Loading GPT-2 model... Database tables created successfully! Device set to use cpu 2025-05-11 14:57:11,938 - INFO - GPT-2 model loaded successfully. 2025-05-11 14:57:11,938 - INFO - Starting Flask server on http://localhost:5000... 2025-05-11 14:57:11,939 - WARNING - Werkzeug appears to be used in a production deployment. Consider switching to a production web server instead. 2025-05-11 14:57:11,948 - INFO - WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Serving Flask app 'Community_process' * Debug mode: off * Running on http://localhost:5000 2025-05-11 14:57:11,948 - INFO - Press CTRL+C to quit 2025-05-11 14:57:18,192 - INFO - Health check request received. 2025-05-11 14:57:18,192 - INFO - 127.0.0.1 - - [11/May/2025 14:57:18] "GET / HTTP/1.1" 200 - fR7RxP4HZVl2KOItAAAA: Sending packet OPEN data {'sid': 'fR7RxP4HZVl2KOItAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000, 'maxPayload': 1000000} 2025-05-11 14:57:33,658 - INFO - fR7RxP4HZVl2KOItAAAA: Sending packet OPEN data {'sid': 'fR7RxP4HZVl2KOItAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000, 'maxPayload': 1000000} 2025-05-11 14:57:33,660 - INFO - 127.0.0.1 - - [11/May/2025 14:57:33] "GET /socket.io/?transport=polling&EIO=4&t=1746946651.6086588 HTTP/1.1" 200 - fR7RxP4HZVl2KOItAAAA: Received request to upgrade to websocket 2025-05-11 14:57:35,713 - INFO - fR7RxP4HZVl2KOItAAAA: Received request to upgrade to websocket fR7RxP4HZVl2KOItAAAA: Upgrade to websocket successful 2025-05-11 14:57:35,716 - INFO - fR7RxP4HZVl2KOItAAAA: Upgrade to websocket successful fR7RxP4HZVl2KOItAAAA: Received packet MESSAGE data 0{} 2025-05-11 14:57:35,716 - INFO - fR7RxP4HZVl2KOItAAAA: Received packet MESSAGE data 0{} 2025-05-11 14:57:35,717 - INFO - Client connected to WebSocket emitting event "status" to hPVrGdOiqB-_O4AEAAAB [/] 2025-05-11 14:57:35,717 - INFO - emitting event "status" to hPVrGdOiqB-_O4AEAAAB [/] fR7RxP4HZVl2KOItAAAA: Sending packet MESSAGE data 2["status",{"message":"Connected to server"}] 2025-05-11 14:57:35,717 - INFO - fR7RxP4HZVl2KOItAAAA: Sending packet MESSAGE data 2["status",{"message":"Connected to server"}] fR7RxP4HZVl2KOItAAAA: Sending packet MESSAGE data 0{"sid":"hPVrGdOiqB-_O4AEAAAB"} 2025-05-11 14:57:35,717 - INFO - fR7RxP4HZVl2KOItAAAA: Sending packet MESSAGE data 0{"sid":"hPVrGdOiqB-_O4AEAAAB"} fR7RxP4HZVl2KOItAAAA: Received packet MESSAGE data 2["message",{"message":"\u6211\u5934\u75db","doctor_name":"\u5f20\u533b\u751f","user_id":"elder_user_001"}] 2025-05-11 14:57:49,962 - INFO - fR7RxP4HZVl2KOItAAAA: Received packet MESSAGE data 2["message",{"message":"\u6211\u5934\u75db","doctor_name":"\u5f20\u533b\u751f","user_id":"elder_user_001"}] received event "message" from hPVrGdOiqB-_O4AEAAAB [/] 2025-05-11 14:57:49,962 - INFO - received event "message" from hPVrGdOiqB-_O4AEAAAB [/] 2025-05-11 14:57:49,963 - INFO - Received WebSocket message: {'message': '我头痛', 'doctor_name': '张医生', 'user_id': 'elder_user_001'} Truncation was not explicitly activated but max_length is provided a specific value, please use truncation=True to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to truncation. Setting pad_token_id to eos_token_id:50256 for open-end generation. 2025-05-11 14:57:50,210 - INFO - Generated response for 张医生: 迷是�... 2025-05-11 14:57:50,228 - ERROR - Database error during consultation storage: (sqlite3.OperationalError) no such column: users.age [SQL: SELECT users.id AS users_id, users.username AS users_username, users.age AS users_age, users.gender AS users_gender, users.phone AS users_phone, users.address AS users_address, users.emergency_contact AS users_emergency_contact, users.emergency_phone AS users_emergency_phone, users.created_at AS users_created_at FROM users WHERE users.id = ?] [parameters: ('elder_user_001',)] (Background on this error at: https://sqlalche.me/e/20/e3q8) emitting event "message" to hPVrGdOiqB-_O4AEAAAB [/] 2025-05-11 14:57:50,228 - INFO - emitting event "message" to hPVrGdOiqB-_O4AEAAAB [/] fR7RxP4HZVl2KOItAAAA: Sending packet MESSAGE data 2["message",{"from":"\u5f20\u533b\u751f","message":"\u8ff7\u662f\ufffd"}] 2025-05-11 14:57:50,228 - INFO - fR7RxP4HZVl2KOItAAAA: Sending packet MESSAGE data 2["message",{"from":"\u5f20\u533b\u751f","message":"\u8ff7\u662f\ufffd"}] fR7RxP4HZVl2KOItAAAA: Sending packet PING data None 2025-05-11 14:57:58,660 - INFO - fR7RxP4HZVl2KOItAAAA: Sending packet PING data None fR7RxP4HZVl2KOItAAAA: Received packet PONG data 2025-05-11 14:57:58,661 - INFO - fR7RxP4HZVl2KOItAAAA: Received packet PONG data fR7RxP4HZVl2KOItAAAA: Sending packet PING data None 2025-05-11 14:58:23,662 - INFO - fR7RxP4HZVl2KOItAAAA: Sending packet PING data None fR7RxP4HZVl2KOItAAAA: Received packet PONG data 2025-05-11 14:58:23,662 - INFO - fR7RxP4HZVl2KOItAAAA: Received packet PONG data 2025-05-11 14:58:42,538 - INFO - Received exam booking request. 2025-05-11 14:58:42,538 - ERROR - Exception on /api/book_exam [POST] Traceback (most recent call last): File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1967, in _exec_single_context self.dialect.do_execute( File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\default.py", line 924, in do_execute cursor.execute(statement, parameters) sqlite3.OperationalError: no such column: users.age The above exception was the direct cause of the following exception: Traceback (most recent call last): File "D:\Soft\app\anaconda\Lib\site-packages\flask\app.py", line 1473, in wsgi_app response = self.full_dispatch_request() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\flask\app.py", line 882, in full_dispatch_request rv = self.handle_user_exception(e) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\flask\app.py", line 880, in full_dispatch_request rv = self.dispatch_request() ^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\flask\app.py", line 865, in dispatch_request return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args) # type: ignore[no-any-return] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\GDUT\projectdata\python\pythonProject\Eldly service platform\Community_process.py", line 62, in book_exam user = session.get(User, user_id) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\session.py", line 3682, in get return self._get_impl( ^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\session.py", line 3862, in _get_impl return db_load_fn( ^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\loading.py", line 694, in load_on_pk_identity session.execute( File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\session.py", line 2351, in execute return self._execute_internal( ^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\session.py", line 2236, in _execute_internal result: Result[Any] = compile_state_cls.orm_execute_statement( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\context.py", line 293, in orm_execute_statement result = conn.execute( ^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1418, in execute return meth( ^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\sql\elements.py", line 515, in _execute_on_connection return connection._execute_clauseelement( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1640, in _execute_clauseelement ret = self._execute_context( ^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1846, in _execute_context return self._exec_single_context( ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1986, in _exec_single_context self._handle_dbapi_exception( File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 2353, in _handle_dbapi_exception raise sqlalchemy_exception.with_traceback(exc_info[2]) from e File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1967, in _exec_single_context self.dialect.do_execute( File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\default.py", line 924, in do_execute cursor.execute(statement, parameters) sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such column: users.age [SQL: SELECT users.id AS users_id, users.username AS users_username, users.age AS users_age, users.gender AS users_gender, users.phone AS users_phone, users.address AS users_address, users.emergency_contact AS users_emergency_contact, users.emergency_phone AS users_emergency_phone, users.created_at AS users_created_at FROM users WHERE users.id = ?] [parameters: ('elder_user_001',)] (Background on this error at: https://sqlalche.me/e/20/e3q8) 2025-05-11 14:58:42,546 - INFO - 127.0.0.1 - - [11/May/2025 14:58:42] "POST /api/book_exam HTTP/1.1" 500 - fR7RxP4HZVl2KOItAAAA: Sending packet PING data None 2025-05-11 14:58:48,664 - INFO - fR7RxP4HZVl2KOItAAAA: Sending packet PING data None fR7RxP4HZVl2KOItAAAA: Received packet PONG data 2025-05-11 14:58:48,664 - INFO - fR7RxP4HZVl2KOItAAAA: Received packet PONG data 2025-05-11 14:58:58,165 - INFO - Received exam booking request. 2025-05-11 14:58:58,167 - ERROR - Exception on /api/book_exam [POST] Traceback (most recent call last): File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1967, in _exec_single_context self.dialect.do_execute( File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\default.py", line 924, in do_execute cursor.execute(statement, parameters) sqlite3.OperationalError: no such column: users.age The above exception was the direct cause of the following exception: Traceback (most recent call last): File "D:\Soft\app\anaconda\Lib\site-packages\flask\app.py", line 1473, in wsgi_app response = self.full_dispatch_request() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\flask\app.py", line 882, in full_dispatch_request rv = self.handle_user_exception(e) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\flask\app.py", line 880, in full_dispatch_request rv = self.dispatch_request() ^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\flask\app.py", line 865, in dispatch_request return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args) # type: ignore[no-any-return] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\GDUT\projectdata\python\pythonProject\Eldly service platform\Community_process.py", line 62, in book_exam user = session.get(User, user_id) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\session.py", line 3682, in get return self._get_impl( ^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\session.py", line 3862, in _get_impl return db_load_fn( ^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\loading.py", line 694, in load_on_pk_identity session.execute( File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\session.py", line 2351, in execute return self._execute_internal( ^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\session.py", line 2236, in _execute_internal result: Result[Any] = compile_state_cls.orm_execute_statement( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\orm\context.py", line 293, in orm_execute_statement result = conn.execute( ^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1418, in execute return meth( ^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\sql\elements.py", line 515, in _execute_on_connection return connection._execute_clauseelement( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1640, in _execute_clauseelement ret = self._execute_context( ^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1846, in _execute_context return self._exec_single_context( ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1986, in _exec_single_context self._handle_dbapi_exception( File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 2353, in _handle_dbapi_exception raise sqlalchemy_exception.with_traceback(exc_info[2]) from e File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\base.py", line 1967, in _exec_single_context self.dialect.do_execute( File "D:\Soft\app\anaconda\Lib\site-packages\sqlalchemy\engine\default.py", line 924, in do_execute cursor.execute(statement, parameters) sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such column: users.age [SQL: SELECT users.id AS users_id, users.username AS users_username, users.age AS users_age, users.gender AS users_gender, users.phone AS users_phone, users.address AS users_address, users.emergency_contact AS users_emergency_contact, users.emergency_phone AS users_emergency_phone, users.created_at AS users_created_at FROM users WHERE users.id = ?] [parameters: ('elder_user_001',)] (Background on this error at: https://sqlalche.me/e/20/e3q8) 2025-05-11 14:58:58,170 - INFO - 127.0.0.1 - - [11/May/2025 14:58:58] "POST /api/book_exam HTTP/1.1" 500 - fR7RxP4HZVl2KOItAAAA: Sending packet PING data None 2025-05-11 14:59:13,665 - INFO - fR7RxP4HZVl2KOItAAAA: Sending packet PING data None fR7RxP4HZVl2KOItAAAA: Received packet PONG data 2025-05-11 14:59:13,665 - INFO - fR7RxP4HZVl2KOItAAAA: Received packet PONG data 2025-05-11 14:59:23,613 - INFO - Client disconnected from WebSocket 2025-05-11 14:59:23,613 - INFO - 127.0.0.1 - - [11/May/2025 14:59:23] "GET /socket.io/?transport=websocket&EIO=4&sid=fR7RxP4HZVl2KOItAAAA&t=1746946653.6612737 HTTP/1.1" 200 - 进程已结束,退出代码为 -1 后端输出日志

from flask import Flask, render_template, redirect, url_for, request, flash import paho.mqtt.client as mqtt import json from threading import Thread from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash from flask_socketio import SocketIO from datetime import datetime from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives import padding from cryptography.hazmat.backends import default_backend import base64 from flask import request, jsonify from vosk import Model, KaldiRecognizer import wave import os from paddleocr import PaddleOCR from paddlehub.module.module import Module import cv2 # 初始化 Flask 和扩展 app = Flask(__name__) socketio = SocketIO(app) # 初始化 Flask-SocketIO app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///iot_130.db' app.config['SECRET_KEY'] = 'your_secret_key' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.login_view = 'login' # AES 配置 SECRET_KEY = b"your_secret_key".ljust(32, b'\0') # AES-256密钥(32字节) IV = b"16_byte_iv_12345" # 16字节的初始化向量 # AES解密函数 def decrypt_data(encrypted_data, key): backend = default_backend() cipher = Cipher(algorithms.AES(key), modes.CBC(IV), backend=backend) decryptor = cipher.decryptor() unpadder = padding.PKCS7(128).unpadder() decrypted = decryptor.update(base64.b64decode(encrypted_data)) + decryptor.finalize() unpadded_data = unpadder.update(decrypted) + unpadder.finalize() return unpadded_data.decode() # AES加密函数 def encrypt_data(data, key): backend = default_backend() cipher = Cipher(algorithms.AES(key), modes.CBC(IV), backend=backend) encryptor = cipher.encryptor() padder = padding.PKCS7(128).padder() padded_data = padder.update(data) + padder.finalize() encrypted = encryptor.update(padded_data) + encryptor.finalize() return base64.b64encode(encrypted).decode() # User 表 class User(UserMixin, db.Model): __tablename__ = 'User' id = db.Column(db.Integer, primary_key=True, autoincrement=True) username = db.Column(db.String(150), unique=True, nullable=False) password = db.Column(db.String(150), nullable=False) role = db.Column(db.String(50), default='user') # Device 表 class Device(db.Model): __tablename__ = 'Device' id = db.Column(db.Integer, primary_key=True, autoincrement=True) name = db.Column(db.String(150), nullable=False) type = db.Column(db.String(150), nullable=False) status = db.Column(db.String(50), default='offline') last_seen = db.Column(db.DateTime, default=None) # SensorData 表 class SensorData(db.Model): __tablename__ = 'SensorData' id = db.Column(db.Integer, primary_key=True, autoincrement=True) device_id = db.Column(db.Integer, db.ForeignKey('Device.id'), nullable=False) value = db.Column(db.Float, nullable=False) timestamp = db.Column(db.DateTime, default=datetime.utcnow) # Command 表 class Command(db.Model): __tablename__ = 'Command' id = db.Column(db.Integer, primary_key=True, autoincrement=True) device_id = db.Column(db.Integer, db.ForeignKey('Device.id'), nullable=False) command = db.Column(db.String(150), nullable=False) status = db.Column(db.String(50), default='pending') timestamp = db.Column(db.DateTime, default=datetime.utcnow) # 初始化数据库 with app.app_context(): db.create_all() @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] hashed_password = generate_password_hash(password) # 检查用户名是否已存在 if User.query.filter_by(username=username).first(): flash('用户名已存在!') return redirect(url_for('register')) # 创建新用户 new_user = User(username=username, password=hashed_password) db.session.add(new_user) db.session.commit() flash('注册成功!请登录。') return redirect(url_for('login')) return render_template('register.html') @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] user = User.query.filter_by(username=username).first() if user and check_password_hash(user.password, password): login_user(user) return redirect(url_for('index')) flash('用户名或密码错误!') return render_template('login.html') @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('login')) # 上传页面 @app.route('/upload', methods=['GET', 'POST']) @login_required def upload(): if request.method == 'POST': # 检查是否有文件上传 if 'file' not in request.files: flash('没有选择文件!') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('没有选择文件!') return redirect(request.url) # 保存文件到 wav 目录 if file and file.filename.endswith('.wav'): filepath = os.path.join('wav', file.filename) file.save(filepath) # 使用 Vosk 模型进行语音识别 text = transcribe_audio(filepath) # 返回识别结果 return render_template('upload.html', text=text) return render_template('upload.html', text=None) @app.route('/image_upload', methods=['GET', 'POST']) @login_required def image_upload(): if request.method == 'POST': if 'image' not in request.files: flash('没有选择图像文件!') return redirect(request.url) image_file = request.files['image'] if image_file.filename == '': flash('没有选择图像文件!') return redirect(request.url) if image_file and image_file.filename.lower().endswith(('.png', '.jpg', '.jpeg')): image_path = os.path.join('static/uploads/images', image_file.filename) image_file.save(image_path) # 使用 PP-OCRv5 模型进行文字识别和图像检测 recognized_text = recognize_text(image_path) return render_template('image_upload.html', text=recognized_text) return render_template('image_upload.html', text=None) @app.route('/video_upload', methods=['GET', 'POST']) @login_required def video_upload(): if request.method == 'POST': if 'video' not in request.files: flash('没有选择视频文件!') return redirect(request.url) video_file = request.files['video'] if video_file.filename == '': flash('没有选择视频文件!') return redirect(request.url) if video_file and video_file.filename.lower().endswith(('.mp4', '.avi', '.mov')): video_path = os.path.join('static/uploads/videos', video_file.filename) video_file.save(video_path) # 使用 PaddleHub 模型进行宠物分类 classification_result = classify_pets_in_video(video_path) return render_template('video_upload.html', result=classification_result) return render_template('video_upload.html', result=None) def classify_pets_in_video(video_path): """使用 PaddleHub 模型对视频中的宠物进行分类""" try: # 加载 PaddleHub 的宠物分类模型 module = Module(name="resnet50_vd_animals") except Exception as e: print(f"模型加载失败: {e}") return # 打开视频文件 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"无法打开视频文件: {video_path}") return frame_count = 0 results = [] while cap.isOpened(): ret, frame = cap.read() if not ret: break # 每隔一定帧数进行分类 if frame_count % 30 == 0: # 每30帧处理一次 print(f"正在处理第 {frame_count} 帧...") try: # 转换帧为 RGB 格式(PaddleHub 模型需要 RGB 格式) frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # 使用 PaddleHub 模型进行分类 result = module.classification(images=[frame_rgb]) results.append(result) except Exception as e: print(f"处理第 {frame_count} 帧时出错: {e}") frame_count += 1 cap.release() print("视频处理完成!") return results def recognize_text(image_path): """使用 PP-OCRv5 模型识别图像中的文字并检测图像""" ocr = PaddleOCR( text_detection_model_name="PP-OCRv5_mobile_det", text_recognition_model_name="PP-OCRv5_mobile_rec", use_doc_orientation_classify=False, use_doc_unwarping=False, use_textline_orientation=False, ) result = ocr.predict(image_path) print("result:", result) # 打印结果以调试 return result[0]['rec_texts'] def transcribe_audio(filepath): """使用 Vosk 模型将音频转换为文本""" model_path = "models/vosk-model-small-cn-0.22" if not os.path.exists(model_path): raise FileNotFoundError("Vosk 模型未找到,请检查路径!") model = Model(model_path) wf = wave.open(filepath, "rb") rec = KaldiRecognizer(model, wf.getframerate()) result_text = "" while True: data = wf.readframes(4000) if len(data) == 0: break if rec.AcceptWaveform(data): result = rec.Result() result_text += json.loads(result).get("text", "") wf.close() return result_text # MQTT配置 MQTT_BROKER = "localhost" # 或EMQX服务器地址 MQTT_PORT = 1883 # 存储最新温度和风扇状态 mytemp = None myfan = "off" def init_device(device_name, device_type): """初始化设备到数据库""" with app.app_context(): device = Device.query.filter_by(name=device_name).first() if not device: device = Device(name=device_name, type=device_type, status="offline") db.session.add(device) db.session.commit() def save_sensor_data(device_name, value): """保存传感器数据到 SensorData 表""" with app.app_context(): device = Device.query.filter_by(name=device_name).first() if device: sensor_data = SensorData(device_id=device.id, value=value) db.session.add(sensor_data) db.session.commit() def save_command(device_name, command): """保存控制指令到 Command 表""" with app.app_context(): device = Device.query.filter_by(name=device_name).first() if device: cmd = Command(device_id=device.id, command=command) db.session.add(cmd) db.session.commit() def on_connect(client, userdata, flags, rc): print(f"MQTT连接结果: {rc}") client.subscribe("topic/temp") # 订阅温度主题 def on_message(client, userdata, msg): global mytemp, myfan try: if msg.topic == "topic/temp": encrypted_payload = msg.payload.decode() decrypted_payload = decrypt_data(encrypted_payload, SECRET_KEY) payload = json.loads(decrypted_payload) mytemp = payload["temp"] print(f"解密温度数据: {mytemp}°C") # 保存传感器数据到数据库 save_sensor_data("temp", mytemp) # 根据温度控制风扇 if mytemp >= 30: control_fan("on") myfan = "on" else: control_fan("off") myfan = "off" # 实时推送温度数据到前端 socketio.emit('sensor_data', {'temp': mytemp, 'fan': myfan}) except Exception as e: print(f"解密失败或处理异常: {e}") def control_fan(command): """发送加密控制指令给风扇并保存到数据库""" payload = json.dumps({"fan": command}) encrypted_payload = encrypt_data(payload.encode(), SECRET_KEY) mqtt_client.publish("topic/fan", encrypted_payload) print(f"发送加密控制指令: {encrypted_payload}") # 保存控制指令到数据库 save_command("fan", command) def run_mqtt_client(): global mqtt_client mqtt_client = mqtt.Client() mqtt_client.on_connect = on_connect mqtt_client.on_message = on_message mqtt_client.username_pw_set("admin", "admin") # 账号密码验证 mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) mqtt_client.loop_forever() @app.route('/') @login_required # 保护 chart.html def index(): return render_template('chart.html') # 渲染前端页面 if __name__ == "__main__": # 初始化设备 with app.app_context(): init_device("temp", "sensor") init_device("fan", "actuator") # 启动MQTT客户端线程 mqtt_thread = Thread(target=run_mqtt_client) mqtt_thread.daemon = True mqtt_thread.start() # 启动Flask-SocketIO应用,启用TLS socketio.run(app, host="0.0.0.0", port=9000, debug=False, ssl_context=("ca/server.crt", "ca/server.key"))怎么改使用一个页面上传按钮,可同时上传图像、音频、视频,并将识别结果显示在该页面。

大家在看

recommend-type

NR 5G考试等级考考试基础试题(含答案已核实).pdf

。。。
recommend-type

小游戏源码-端午节龙舟大赛.rar

小游戏源码-端午节龙舟大赛.rar
recommend-type

fonteditorV1.3.2.zip 字体工具

FontEditor为一款功能强大的字体编辑和字体格式转换工具,支持制作32*32的全字库。能将TTF,OTF矢量字库转换为BDF字符,可以很方便的将Windows已经安装到系统的矢量字库转换为BDF字库,并能将TTF,OTF文件直接转换成BDF格式,并支持BDF,FNT,FNB文件格式的互转换,随心所欲将windows字体应用于各种嵌入式系统中。并支持将GB2312,BIG5,GBK转换为UCS2,UTF8,并支持UCS2,UTF8编码的互转换   V1.2  运行环境:Win2003, WinXP, Win2000, NT, WinME   (1)BDF,FNT,FNB字体格式的互转换   (2)将Windows已经安装TTF转换为BDF格式   (3)将TTF文件导入转为BDF,FNT,FNB格式   (4)汉字查码   V1.25   (1)Windows已经安装TTF字体导入为BDF时,剔除无效字符   (2)将BDF,FNT,FNB导出为bitmap文件和字符宽度索引   (3)GB2312,BIG5,GBK转换为UCS2,UTF8,以及UCS2,UTF8互转换   V1.25.03   (1)将单个字符导出为Bitmap文件   (2)解决导出字库bitmap时,字符少于256个导出文件不正确的问题   (3)解决导出选择中字符实际上是导出所有字符的问题   V1.26   (1)增加修正字符点阵的功能,可对所有字符或者当前页字符的点阵大小和位移进行调整   (2)修正V1.25.03中导出位图文件选择取消无法退出程序的问题   V1.3   (1)增加导出全字库bitmap方式,同时支持二进制导出和ASCII码导出   (2)增强读取BDF文件的兼容性   (3)增加手动剔除无效字符功能   V1.3.2   (1)增加TTF文件导入调整字符点阵大小,控制位图的精度和导入位图的效果   运行环境:Win2003, WinXP, Win2000, NT, WinME
recommend-type

1-99分钟倒计时Multisim仿真实例源文件.zip

1-99分钟倒计时Multisim仿真实例源文件,1-99分钟倒计时Multisim仿真实例源文件,可供学习及设计参考。
recommend-type

HCIE-Storage实验手册06---Oracle主备容灾方案实验手册.docx

HCIE-Storage实验手册06---Oracle主备容灾方案实验手册.docx

最新推荐

recommend-type

信捷XC系列PLC主从通讯程序设计与实现——工业自动化控制核心技术

信捷XC系列PLC主从通讯程序的设计与实现方法。信捷XC系列PLC是一款高性能、高可靠性的可编程逻辑控制器,在工业自动化领域广泛应用。文中阐述了主从通讯的基本概念及其重要性,具体讲解了配置网络参数、编写程序、数据交换以及调试与测试四个主要步骤。此外,还探讨了该技术在生产线控制、仓储物流、智能交通等多个领域的应用实例,强调了其对系统效率和稳定性的提升作用。 适合人群:从事工业自动化控制的技术人员、工程师及相关专业学生。 使用场景及目标:适用于需要多台PLC协同工作的复杂工业控制系统,旨在提高系统的效率和稳定性,确保各设备间的数据交换顺畅无误。 其他说明:随着工业自动化的快速发展,掌握此类通信协议和技术对于优化生产流程至关重要。
recommend-type

Qt 5.12.4与Halcon构建视觉流程框架:编译与测试的成功实践

如何将Halcon视觉技术和Qt框架相结合,构建一个强大的视觉处理流程框架。文中首先阐述了选择Qt 5.12.4的原因及其优势,接着描述了框架的具体构建方法,包括利用Qt的跨平台特性和界面设计工具创建用户界面,以及用Halcon进行图像处理与识别。随后,文章讲解了编译过程中需要注意的关键步骤,如正确引入Halcon的头文件和库文件,并提供了一个简单的代码示例。最后,作者分享了测试阶段的经验,强调了确保系统在各种情况下都能正常工作的必要性。通过这次实践,证明了两者结合可以带来更高效、更稳定的视觉处理解决方案。 适合人群:具有一定编程经验的技术人员,尤其是对计算机视觉和图形界面开发感兴趣的开发者。 使用场景及目标:适用于希望深入了解Qt与Halcon集成应用的开发者,旨在帮助他们掌握从框架搭建到实际部署的全过程,从而提升自身技能水平。 阅读建议:读者可以在阅读过程中跟随作者的步伐逐步尝试相关操作,以便更好地理解和吸收所介绍的知识点和技术细节。
recommend-type

基于Debian Jessie的Kibana Docker容器部署指南

Docker是一种开源的容器化平台,它允许开发者将应用及其依赖打包进一个可移植的容器中。Kibana则是由Elastic公司开发的一款开源数据可视化插件,主要用于对Elasticsearch中的数据进行可视化分析。Kibana与Elasticsearch以及Logstash一起通常被称为“ELK Stack”,广泛应用于日志管理和数据分析领域。 在本篇文档中,我们看到了关于Kibana的Docker容器化部署方案。文档提到的“Docker-kibana:Kibana 作为基于 Debian Jessie 的Docker 容器”实际上涉及了两个版本的Kibana,即Kibana 3和Kibana 4,并且重点介绍了它们如何被部署在Docker容器中。 Kibana 3 Kibana 3是一个基于HTML和JavaScript构建的前端应用,这意味着它不需要复杂的服务器后端支持。在Docker容器中运行Kibana 3时,容器实际上充当了一个nginx服务器的角色,用以服务Kibana 3的静态资源。在文档中提及的配置选项,建议用户将自定义的config.js文件挂载到容器的/kibana/config.js路径。这一步骤使得用户能够将修改后的配置文件应用到容器中,以便根据自己的需求调整Kibana 3的行为。 Kibana 4 Kibana 4相较于Kibana 3,有了一个质的飞跃,它基于Java服务器应用程序。这使得Kibana 4能够处理更复杂的请求和任务。文档中指出,要通过挂载自定义的kibana.yml文件到容器的/kibana/config/kibana.yml路径来配置Kibana 4。kibana.yml是Kibana的主要配置文件,它允许用户配置各种参数,比如Elasticsearch服务器的地址,数据索引名称等等。通过Docker容器部署Kibana 4,用户可以很轻松地利用Docker提供的环境隔离和可复制性特点,使得Kibana应用的部署和运维更为简洁高效。 Docker容器化的优势 使用Docker容器化技术部署Kibana,有几个显著的优势: - **一致性**:Docker容器确保应用在开发、测试和生产环境中的行为保持一致。 - **轻量级**:相比传统虚拟机,Docker容器更加轻量,启动快速,资源占用更少。 - **隔离性**:容器之间的环境隔离,确保应用之间互不干扰。 - **可移植性**:容器可以在任何支持Docker的环境中运行,提高了应用的可移植性。 - **易于维护**:通过Dockerfile可以轻松构建和分发应用镜像,便于维护和升级。 在文档中,我们还看到了文件名“docker-kibana-master”。这个名称很可能是指向了存放Docker相关文件的源代码仓库,其中可能包含Dockerfile、构建和运行脚本以及可能的配置模板文件等。开发者可以从这个仓库中克隆或下载所需的Docker相关文件,并根据这些文件来构建和部署Kibana的Docker容器。 根据以上信息,对于希望利用Docker容器部署和管理Kibana应用的用户,需要掌握Docker的基本使用方法,包括Docker的安装、镜像管理、容器的创建和配置等。同时,还需要了解Kibana的基本原理和配置方法,这样才能充分利用Docker容器化带来的便利,优化Kibana应用的部署和管理流程。
recommend-type

Coze智能体工作流:打造钦天监视频内容的创新与实践

# 1. Coze智能体工作流概述 Coze智能体工作流是业界领先的智能工作流程管理系统,它通过整合最新的智能技术与人工智能算法,旨在为视频内容创造与优化带来革命性的变革。本章将对Coze智能体工作流进行全面的概述,揭示其在IT行业中的应用范围、流程结构以及技术基础。通过了解Coze智能体工作流,读者将能够把握它在当前和未
recommend-type

使用git仓库的利与弊

<think>好的,我需要分析Git仓库的优缺点。首先,根据用户提供的引用,看看有哪些点需要覆盖。引用[1]提到了Git有本地和远程仓库,需要push命令来更新远程,这应该属于优点还是缺点呢?可能优点在于本地操作灵活,缺点是需要额外命令。 引用[2]指出Git的权限管理薄弱,学习曲线陡峭,这显然是缺点。另外,引用[4]提到Git使用SHA-1哈希,内容完整性更好,属于优点。引用[5]比较了Git的分支管理简单,而缺少全局版本号,这可以作为优点和缺点分别说明。 现在需要将这些信息整理成结构化的优缺点,可能分点列出。同时,用户要求回答结构清晰,逐步解决问题,所以可能需要先介绍Git的基本概念,
recommend-type

TextWorld:基于文本游戏的强化学习环境沙箱

在给出的文件信息中,我们可以提取到以下IT知识点: ### 知识点一:TextWorld环境沙箱 **标题**中提到的“TextWorld”是一个专用的学习环境沙箱,专为强化学习(Reinforcement Learning,简称RL)代理的训练和测试而设计。在IT领域中,尤其是在机器学习的子领域中,环境沙箱是指一个受控的计算环境,允许实验者在隔离的条件下进行软件开发和测试。强化学习是一种机器学习方法,其中智能体(agent)通过与环境进行交互来学习如何在某个特定环境中执行任务,以最大化某种累积奖励。 ### 知识点二:基于文本的游戏生成器 **描述**中说明了TextWorld是一个基于文本的游戏生成器。在计算机科学中,基于文本的游戏(通常被称为文字冒险游戏)是一种游戏类型,玩家通过在文本界面输入文字指令来与游戏世界互动。TextWorld生成器能够创建这类游戏环境,为RL代理提供训练和测试的场景。 ### 知识点三:强化学习(RL) 强化学习是**描述**中提及的关键词,这是一种机器学习范式,用于训练智能体通过尝试和错误来学习在给定环境中如何采取行动。在强化学习中,智能体在环境中探索并执行动作,环境对每个动作做出响应并提供一个奖励或惩罚,智能体的目标是学习一个策略,以最大化长期累积奖励。 ### 知识点四:安装与支持的操作系统 **描述**提到TextWorld的安装需要Python 3,并且当前仅支持Linux和macOS系统。对于Windows用户,提供了使用Docker作为解决方案的信息。这里涉及几个IT知识点: - **Python 3**:一种广泛使用的高级编程语言,适用于快速开发,是进行机器学习研究和开发的常用语言。 - **Linux**和**macOS**:两种流行的操作系统,分别基于Unix系统和类Unix系统。 - **Windows**:另一种广泛使用的操作系统,具有不同的软件兼容性。 - **Docker**:一个开源的应用容器引擎,允许开发者打包应用及其依赖环境为一个轻量级、可移植的容器,使得在任何支持Docker的平台上一致地运行。 ### 知识点五:系统库和依赖 **描述**提到在基于Debian/Ubuntu的系统上,可以安装一些系统库来支持TextWorld的本机组件。这里涉及的知识点包括: - **Debian/Ubuntu**:基于Debian的Linux发行版,是目前最流行的Linux发行版之一。 - **系统库**:操作系统中包含的一系列预编译的软件包和库,供应用程序在运行时使用。 - **包管理工具**,如**apt**(Advanced Package Tool),它是一个在Debian及其衍生系统中用于安装、删除和管理软件包的命令行工具。 ### 知识点六:与创建者联系方式 **描述**提供了与TextWorld创建者的联系方式,包括电子邮件地址和一个Gitter频道。这说明了如何与开源项目的维护者进行沟通与反馈: - **电子邮件**是常见的沟通方式,允许用户与开发者直接交流。 - **Gitter**是一个基于GitHub的即时消息工具,通常用于开源项目中的实时协作和交流。 ### 结语 综合以上信息,我们可以了解到TextWorld是一个专为强化学习设计的学习环境沙箱,它通过创建基于文本的游戏环境,让研究者和开发者训练和测试RL代理。它主要针对Linux和macOS系统,不过也有适合Windows用户的替代方案。此外,了解如何安装和配置TextWorld,以及如何与创建者沟通,对于开发者来说是十分重要的基础技能。
recommend-type

Coze智能体工作流全攻略

# 1. Coze智能体工作流概述 在现代企业中,工作流管理不仅是提高效率的关键因素,而且
recommend-type

64位小端转大端c语言函数起名

<think>好的,用户需要找一个C语言函数的名字,用来将64位小端格式转换为大端格式。首先,我得回顾一下C语言中字节序转换函数的命名惯例。通常,像htonl和htons这样的函数用于将主机字节序转换到网络字节序(大端)。其中,htonl处理32位,htons处理16位。不过,对于64位的情况,可能没有标准函数,所以需要自己实现,并且命名也要符合惯例。 接下来,用户提到的是64位的小端转大端。这时候,可能需要类似htonll这样的名字,因为htonl是host to network long(32位),那么htonll可能就是host to network long long(64位)。不过
recommend-type

upReveal.js: 利用鼠标移动揭示图像的创新技术

根据提供的文件信息,我们可以分析并生成以下知识点: ### upReveal.js技术知识点 #### 标题分析 标题 "upReveal.js:upReveal.js 通过鼠标在图像上的移动来显示图像!" 明确告诉我们,该技术是一个JavaScript库,它的核心功能是允许用户通过在图像上移动鼠标来揭示隐藏在图像下面的其他图像或内容。这样的功能特别适合用于创建富有互动性的网页设计。 #### 描述分析 描述中提到的“向上揭示 upReveal 效果”表明upReveal.js使用了一种特定的视觉效果来显示图像。这种效果可以让用户感觉到图像好像是从底层“向上”显现出来的,从而产生一种动态和引人入胜的视觉体验。描述还提到了版权信息,指出upReveal.js拥有版权所有,且该许可证伴随源代码提供。这表明开发者或公司可以使用这个库,但需要注意其许可证条款,以确保合法合规使用。 #### 标签分析 标签“HTML”意味着这个JavaScript库需要与HTML配合使用,具体可能涉及对HTML的img标签或其他元素进行操作,以实现图像揭示的效果。HTML是构建网页内容的基础,而JavaScript则是用来增加交互性和动态效果的脚本语言,upReveal.js正是在这个层面上发挥作用。 #### 压缩包子文件的文件名称列表分析 文件名称列表 "upReveal.js-master" 表明该JavaScript库可以通过一个名为“upReveal.js”的主文件来引入和使用。文件名中的“master”通常意味着这是主版本或主要代码分支,用户可以使用该文件作为起点来集成和应用这个效果。 ### upReveal.js的具体知识点 1. **图像揭示技术:** upReveal.js利用鼠标悬停(hover)事件来实现图像揭示效果。当用户将鼠标移动到指定图像上时,底层图像或内容会被逐渐显示出来。 2. **CSS和JavaScript交互:** 要实现这种效果,upReveal.js可能会结合使用CSS来设计图像覆盖层和动画效果,同时利用JavaScript来监听鼠标事件并控制图像的显示逻辑。 3. **跨浏览器兼容性:** 一个成功的JavaScript库应该能够在不同的浏览器上一致地工作。upReveal.js可能包含跨浏览器兼容性的代码,确保所有用户都能体验到相同的效果。 4. **许可证使用:** 虽然upReveal.js允许用户使用,但开发者需要阅读并理解伴随源代码提供的许可证条款。通常这会涉及对源代码的使用、修改和重新分发的限制。 5. **HTML集成:** 为了使用upReveal.js,开发者需要在HTML文件中通过脚本标签引入JavaScript文件。同时,可能需要准备相应的HTML结构来展示图像。 6. **自定义和配置:** upReveal.js很可能会提供一些配置选项,允许开发者调整效果的动画速度、触发区域大小等,以适应不同的设计需求。 7. **性能和优化:** 在设计交互式图像效果时,性能优化是一个关键考虑因素。upReveal.js可能会通过优化代码和资源使用,减少对页面加载和交互性能的影响。 8. **可访问性考虑:** 虽然描述中未提及,但在开发类似JavaScript库时,考虑可访问性是一个好的实践,确保所有用户,包括那些有视觉障碍的用户,都能够受益于这种技术。 通过上述分析,我们可以看到upReveal.js作为一个JavaScript库,不仅提供了动态的交互效果,还涉及到了前端开发的多个方面,包括但不限于HTML结构设计、CSS样式应用、JavaScript事件处理、跨浏览器兼容性、性能优化以及许可证协议的遵守等。开发者在使用upReveal.js时,应该综合考虑这些知识点,以实现最佳的用户体验。
recommend-type

金融服务中AI Agent的崛起:智能投资顾问与风险管理

# 1. 金融服务中的AI Agent概述 金融服务行业正经历数字化转型,其中AI Agent(人工智能代理)扮演着越来越重要的角色。AI Agent,一种能够通过学习和适应来执行复杂任务的软件代理,已经广泛应用于金融服务的多个领域,如智能投资顾问、风险管理和合规性监控等。 在这一章,