file-type

使用social-auth模块管理OAuth 2.0 access_token和refresh_token

ZIP文件

下载需积分: 50 | 3KB | 更新于2025-04-22 | 102 浏览量 | 0 下载量 举报 收藏
download 立即下载
### 知识点详述 #### 标题解读 标题“social-auth:验证access_token的模块,从refresh_token获取access_token,将它们存储在db等中”表明,此处所探讨的为一个名为`social-auth`的JavaScript模块,它主要负责处理社交平台(例如Google、Facebook等)的用户认证过程。该模块不仅可以验证`access_token`的有效性,还能利用`refresh_token`来获取新的`access_token`。此外,验证和刷新令牌的过程所涉及的用户认证信息,最终会被存储在数据库(db)或其他存储系统中。 #### 描述解读 描述中提供的代码段是`social-auth`模块在Node.js环境中的使用示例。该代码段通过几个关键步骤完成用户认证: 1. **初始化模块**:通过`require`语句加载`social-auth`模块中的`google`策略,然后使用`client_id`、`client_secret`和`callback`这三个参数初始化`Google`认证对象`social`。 - `client_id`是应用在社交平台注册时获得的标识符,用于标识你的应用。 - `client_secret`是应用在社交平台注册时获得的一个密钥,用于保护你的应用不被未授权使用。 - `callback`是用户授权完成后的回调地址,用于接收社交平台返回的授权码。 2. **使用CODE获取刷新令牌**:代码中的`code`是一个授权码,用户在完成社交平台的认证流程后会获得这样的一个授权码。通过该授权码,可以向社交平台的API请求`refresh_token`。 - `getRefreshTokenFromCode`方法是异步的,使用`code`和回调函数作为参数,回调函数处理可能的错误`err`和数据`data`。返回的`data`中包含`refresh_token`。 这个过程符合OAuth2.0协议,是Web应用中最常用的授权方式之一。通过`client_id`和`client_secret`获取授权码,再用授权码交换`access_token`和`refresh_token`,其中`access_token`用于访问用户的受保护资源,`refresh_token`则用于在`access_token`过期后获取新的访问令牌。 #### 标签解读 标签“JavaScript”表明,`social-auth`模块是用JavaScript语言编写的,能够运行在遵循Node.js环境的应用程序中。Node.js是一个基于Chrome V8引擎的JavaScript运行环境,它让JavaScript能够运行在服务器端,适合处理高并发的场景。 #### 压缩包子文件名称解读 文件名称列表中的“social-auth-master”表明这个模块的源代码是存放在一个名为`social-auth`的压缩包文件中,`master`通常意味着该压缩包包含了该模块的主分支代码,是最稳定和最新的版本。 #### 补充知识点 - **OAuth2.0认证流程**:OAuth2.0认证流程包含四种角色:资源拥有者(用户)、资源服务器、客户端(应用)和认证服务器。其中,客户端通过向资源拥有者发起认证请求开始流程,资源拥有者授权后,客户端从认证服务器获得`access_token`和`refresh_token`,用于之后访问资源拥有者的资源。 - **access_token与refresh_token**:`access_token`是访问令牌,用于访问受保护的资源,通常具有较短的有效期。`refresh_token`则是刷新令牌,用于在`access_token`失效时重新获取新的`access_token`,有效期较长。 - **数据库存储**:存储认证信息,如`access_token`、`refresh_token`等,需要使用数据库。常见的数据库有MySQL、MongoDB等,它们可以提供数据持久化存储功能。 综上所述,`social-auth`模块为Node.js环境下的JavaScript应用程序提供了方便快捷的社交平台用户认证解决方案,通过遵循OAuth2.0协议的流程,使得开发者能够轻松地集成社交认证功能,并有效管理和存储认证过程中的相关数据。

相关推荐

filetype

import requests import json from datetime import datetime import pandas as pd from sqlalchemy import create_engine # ======== 1. 请务必替换下面这些值为你自己的配置 ========= # (1)开放平台应用信息 APP_ID = "1833892432352315" # 在巨量引擎开放平台·我的应用里看到的 App ID APP_SECRET = "12c4b768e07d6b83813b56e7efd354f504f3e283" # 对应的 App Secret CALLBACK_URL = "https://example.com/callback" # 回调地址(Redirect URI),须与平台后台填写的一模一样 # (2)SQL Server 连接信息 DB_USER = "sa" DB_PASSWORD = "YourStrong!Pass" # 请替换成你自己的 SQL Server 密码 DB_HOST = "192.168.1.100" # SQL Server 的 IP 或主机名 DB_PORT = 1433 # 默认 1433,如有改动请写对应端口 DB_NAME = "LiveAnalytics" # 目标数据库名称 ODBC_DRIVER = "ODBC+Driver+17+for+SQL+Server" # 已在 Windows 上安装的 ODBC 驱动名称 # 把上面拼成 SQLAlchemy 的连接字符串 # “mssql+pyodbc://<用户>:<密码>@<主机>:<端口>/<数据库名>?driver=<ODBC_DRIVER>” DATABASE_URL = ( f"mssql+pyodbc://{DB_USER}:{DB_PASSWORD}" f"@{DB_HOST}:{DB_PORT}/{DB_NAME}" f"?driver={ODBC_DRIVER}" ) # ========================================================== def get_access_token(auth_code: str) -> str: """ 用 auth_code 换取 access_token。 返回 access_token 字符串,若失败则抛异常。 """ url = "https://open.oceanengine.com/open_api/oauth2/access_token/" payload = { "app_id": APP_ID, "secret": APP_SECRET, "grant_type": "auth_code", "auth_code": auth_code } resp = requests.post(url, data=payload, timeout=10) if resp.status_code != 200: raise RuntimeError(f"换取 access_token HTTP 错误:{resp.status_code},响应文本:{resp.text}") data = resp.json() if data.get("code") != 0: raise RuntimeError(f"换取 access_token 失败:{data.get('message')} (request_id={data.get('request_id')})") return data["data"]["access_token"] def get_advertiser_ids(access_token: str) -> list[int]: """ 用 access_token 调 /advertiser/info/ 接口拿到该 token 可访问的 advertiser_id 列表。 返回一个非空的整型列表;若为空或失败则抛异常。 """ url = "https://open.oceanengine.com/open_api/qianchuan/advertiser/info/" headers = { "Access-Token": access_token, "Content-Type": "application/json" } body = { "page": 1, "page_size": 50 } resp = requests.post(url, headers=headers, json=body, timeout=10) if resp.status_code != 200: raise RuntimeError(f"获取广告主列表 HTTP 错误:{resp.status_code},响应文本:{resp.text}") data = resp.json() if data.get("code") != 0: raise RuntimeError(f"/advertiser/info/ 失败:{data.get('message')} (request_id={data.get('request_id')})") lst = data["data"].get("list", []) ids = [item["advertiser_id"] for item in lst if "advertiser_id" in item] if not ids: raise RuntimeError("advertiser_ids 为空:请检查“授权流程”是否在“选择授权账号”里至少勾选了一个真实广告主,\n" "并且在应用权限里打了“千川账户管理 → 广告主管理 → 获取广告主信息”这一项。") return ids def get_today_live_data(access_token: str, advertiser_id: int, date_str: str) -> list[dict]: """ 用 access_token 和 advertiser_id 调 /today_live/room/get/ 接口,拉指定日期的直播数据。 date_str 格式必须是 "YYYY-MM-DD"。 返回一个字典列表,每条包含 room_id、room_title、watch_cnt、gmv 等字段; 若接口失败则抛异常。 """ url = "https://open.oceanengine.com/open_api/qianchuan/today_live/room/get/" headers = { "Access-Token": access_token, "Content-Type": "application/json" } body = { "advertiser_id": advertiser_id, "date": date_str } resp = requests.post(url, headers=headers, json=body, timeout=10) if resp.status_code != 200: raise RuntimeError(f"拉今日直播 HTTP 错误:{resp.status_code},响应文本:{resp.text}") data = resp.json() if data.get("code") != 0: raise RuntimeError(f"/today_live/room/get/ 失败:{data.get('message')} (request_id={data.get('request_id')})") return data["data"].get("list", []) def write_to_sqlserver(rows: list[dict], table_name: str = "live_data") -> None: """ 把一个字典列表写入 SQL Server 表。如果表不存在则自动建表,否则追加写入。 rows 中的每个 dict 键名对应数据库表的列名。 """ if not rows: print("⚠️ 没有数据,跳过写库。") return df = pd.DataFrame(rows) engine = create_engine(DATABASE_URL, echo=False) # pandas.to_sql 会自动建表(如果不存在),并追加数据 df.to_sql( name = table_name, con = engine, if_exists = "append", index = False, chunksize = 1000 ) print(f"✅ 已成功写入 {len(rows)} 条记录到 SQL Server 表 [{table_name}]") def main(): print("▶️ 请输入在浏览器“授权回调 URL”里 ?auth_code= 后面的值,然后回车:") auth_code = input().strip() if not auth_code: print("❌ auth_code 为空。请先完成「授权→同意授权→回调」流程,拿到回调地址里的 auth_code。") return # Step 1:换取 access_token try: access_token = get_access_token(auth_code) except Exception as e: print(f"❌ 换取 access_token 出错:\n {e}") return print("\n✅ 成功获取 access_token:", access_token) # Step 2:拿到 advertiser_ids 列表 try: advertiser_ids = get_advertiser_ids(access_token) except Exception as e: print(f"\n❌ 获取 advertiser_ids 出错:\n {e}") return print("\n✅ 可用的 advertiser_ids:", advertiser_ids) # 这里只取第一个广告主,若需要批量可循环 chosen_adv = advertiser_ids[0] print(f"\n👉 选中的第一个 advertiser_id:{chosen_adv}") # Step 3:拉今日直播数据 today_str = datetime.now().strftime("%Y-%m-%d") try: live_list = get_today_live_data(access_token, chosen_adv, today_str) except Exception as e: print(f"\n❌ 拉今日直播数据出错:\n {e}") return print(f"\n📊 今日直播总条数:{len(live_list)}") for item in live_list: print(f" • room_id={item.get('room_id')} title={item.get('room_title')} " f"watch_cnt={item.get('watch_cnt')} gmv={item.get('gmv')}") # Step 4:写入 SQL Server # 将字段筛选并拼成适合 DataFrame 的格式 rows = [] for rec in live_list: rows.append({ "room_id": rec.get("room_id"), "room_title": rec.get("room_title"), "watch_count": rec.get("watch_cnt"), "gmv": rec.get("gmv"), "fetch_date": today_str }) try: write_to_sqlserver(rows, table_name="live_data") except Exception as e: print(f"\n❌ 写入 SQL Server 出错:\n {e}") return print("\n▶️ 全流程执行完毕。") if __name__ == "__main__": main() 帮我修改这些错误,主要是API的URL有误

filetype

import json import requests import os import time from datetime import datetime, timedelta import pytz from mutagen.mp3 import MP3 from mutagen.mp4 import MP4 import subprocess import platform # 全局变量,用于缓存 tenant_access_token 和上次更新的时间 tenant_access_token = None token_last_updated = None def get_broadcast_data(): """ 获取并提取播报数据 """ # 获取 tenant_access_token global tenant_access_token, token_last_updated # 检查 token 是否过期,过期则重新获取 if tenant_access_token is None or token_last_updated is None or (datetime.now() - token_last_updated) > timedelta(hours=2): tenant_access_token = get_auth_token() if tenant_access_token: token_last_updated = datetime.now() if not tenant_access_token: print("获取 tenant_access_token 失败!") return [] # 获取 Feishu Bitable 数据 url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/E1zybPqiqa0TaesZjKKch5ZcnJd/tables/tblwFY4k3pmrV5WK/records/search' headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {tenant_access_token}' # 使用获取到的 token } data = {} # 如果需要传递查询条件,可以在这里添加 try: response = requests.post(url, headers=headers, json=data) response.raise_for_status() # 如果响应失败,将抛出异常 response_dict = response.json() # 将返回的 JSON 数据转换为字典 items = response_dict.get("data", {}).get("items", []) data = [] for item in items: fields = item.get("fields", {}) data.append({ "播音日期": extract_broadcast_date(fields, '播音日期'), "时间段": extract_time_segment(fields, '时间段'), "开播音乐file_token": extract_file_token(fields, '开播音乐'), "开场白-播报file_token": extract_file_token(fields, '开场白-播报'), "需更新文案-播报file_token": extract_file_token(fields, '需更新文案-播报'), "壹首歌file_token": extract_file_token(fields, '壹首歌'), "需更新文案2-播报file_token": extract_file_token(fields, '需更新文案2-播报'), "贰首歌file_token": extract_file_token(fields, '贰首歌'), "结束语-播报file_token": extract_file_token(fields, '结束语-播报'), "结束音乐file_token": extract_file_token(fields, '结束音乐') }) return data except requests.exceptions.HTTPError as http_err: print(f"HTTP 错误发生: {http_err}") except Exception as err: print(f"其他错误发生: {err}") return [] def extract_file_token(fields, field_name): """提取 file_token""" field_data = fields.get(field_name, []) if isinstance(field_data, list) and len(field_data) > 0: value = field_data[0] if isinstance(value, dict): return value.get("file_token", "") return '' def extract_time_segment(fields, field_name): """提取时间段字段""" field_data = fields.get(field_name, []) if isinstance(field_data, list) and len(field_data) > 0: value = field_data[0] if isinstance(value, dict): return value.get("text", "") return None def extract_broadcast_date(fields, field_name): """提取播音日期字段""" field_data = fields.get(field_name, 0) if isinstance(field_data, int): try: timestamp = field_data / 1000 # 时间戳转化为秒 parsed_date = datetime.fromtimestamp(timestamp, tz=pytz.utc).astimezone(pytz.timezone('Asia/Shanghai')) return parsed_date.strftime("%Y-%m-%d") # 转换为 "YYYY-MM-DD" 格式 except (ValueError, OverflowError): pass return None def get_auth_token(): """获取认证 token""" url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" headers = {"Content-Type": "application/json; charset=utf-8"} payload = {"app_id": "cli_a882683e8779d00c", "app_secret": "3NKkALA7vyMRVnpKJinmrb1LJ7YuK4H0"} try: response = requests.post(url, json=payload, headers=headers) response.raise_for_status() data = response.json() if data["code"] == 0: return data["tenant_access_token"] else: print(f"请求失败:{data['msg']}(错误码:{data['code']})") except requests.exceptions.RequestException as e: print(f"请求异常:{e}") return None def create_folder(folder_name): """创建文件夹""" if not os.path.exists(folder_name): os.makedirs(folder_name) def download_file(file_token, save_path, authorization): """下载文件""" url = f"https://open.feishu.cn/open-apis/drive/v1/medias/{file_token}/download" headers = {"Authorization": "Bearer " + authorization} try: response = requests.get(url, headers=headers, stream=True) if response.status_code == 200: with open(save_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f"文件已成功下载到: {save_path}") return True else: print(f"请求失败,状态码: {response.status_code}") print(f"错误信息: {response.text}") except Exception as e: print(f"发生异常: {str(e)}") return False def get_audio_duration(file_path): """获取音频时长""" try: if file_path.endswith(".mp3"): audio = MP3(file_path) elif file_path.endswith(".mp4"): audio = MP4(file_path) else: print(f"不支持的文件格式: {file_path}") return 0 return audio.info.length except Exception as e: print(f"获取音频时长失败: {e}") return 0 def kill_previous_players(): """清理之前残留的播放器进程""" system = platform.system() # 获取当前操作系统类型 try: if system == "Windows": subprocess.run(['taskkill', '/F', '/IM', 'wmplayer.exe'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) subprocess.run(['taskkill', '/F', '/IM', 'vlc.exe'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) elif system == "Darwin": # macOS subprocess.run(['killall', 'afplay'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) elif system == "Linux": subprocess.run(['pkill', 'mpg123'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) print("已清理之前残留的播放器进程") except Exception as e: print(f"清理播放器进程时发生错误: {e}") def play_music_in_folder(folder_path): """播放文件夹中的音频文件,并在播放完成后关闭播放器""" audio_files = [f for f in os.listdir(folder_path) if f.endswith((".mp3", ".mp4"))] processes = [] # 用于存储播放器进程 for file_name in audio_files: full_file_path = os.path.join(folder_path, file_name) try: duration = get_audio_duration(full_file_path) if duration <= 0: print(f"无法获取 {file_name} 的时长,跳过播放") continue print(f"播放 {file_name},预计播放时长:{duration} 秒") if os.name == 'nt': # Windows 系统 process = subprocess.Popen(['start', '', full_file_path], shell=True) elif os.name == 'posix': # MacOS 或 Linux 系统 process = subprocess.Popen(['afplay', full_file_path]) # 对于 MacOS 使用 afplay else: print(f"不支持的操作系统类型: {os.name}") continue processes.append(process) # 保存进程对象 time.sleep(duration) # 等待音频播放完成 except Exception as e: print(f"无法播放 {full_file_path}: {e}") # 关闭所有播放器进程 for process in processes: try: if process.poll() is None: # 检查进程是否仍在运行 process.kill() # 强制终止进程 print("播放器已强制关闭") except Exception as e: print(f"关闭播放器失败: {e}") def process_time_segment(segment, download_offset, play_offset, data, authorization, folder_name): """处理一个时间段的下载和播放""" target_data = next((entry for entry in data if entry["时间段"] == segment), None) if not target_data: print(f"未找到时间段 {segment} 的文件数据!") return current_time = datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%H:%M') segment_start_time = segment.split("-")[0] # 如果当前时间已经超过该时间段的开始时间,则跳过 if current_time > segment_start_time: print(f"当前时间已超过时间段 {segment} 的开始时间,跳过该时间段的处理") return # 等待到达下载时间 download_time = (datetime.strptime(segment_start_time, "%H:%M") - timedelta(minutes=download_offset)).strftime("%H:%M") while current_time != download_time: current_time = datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%H:%M') time.sleep(1) print(f"开始下载 {segment} 的文件") files = [] file_tokens = [ ("开播音乐file_token", "mp3"), ("开场白-播报file_token", "mp4"), ("需更新文案-播报file_token", "mp4"), ("壹首歌file_token", "mp3"), ("需更新文案2-播报file_token", "mp4"), ("贰首歌file_token", "mp3"), ("结束语-播报file_token", "mp4"), ("结束音乐file_token", "mp3") ] for i, (key, file_format) in enumerate(file_tokens): token = target_data.get(key) if token: save_path = os.path.join(folder_name, f"file_{i+1}.{file_format}") if download_file(token, save_path, authorization): files.append(save_path) # 清理之前残留的播放器进程(调整到这里) kill_previous_players() # 等待到达播放时间 play_time = (datetime.strptime(segment_start_time, "%H:%M") - timedelta(minutes=play_offset)).strftime("%H:%M") while current_time != play_time: current_time = datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%H:%M') time.sleep(1) print(f"开始播放 {segment} 的文件") play_music_in_folder(folder_name) # 播放结束后再次清理播放器进程 kill_previous_players() # 删除下载的文件 for file in files: os.remove(file) print(f"已删除文件: {file}") def main(): """主函数""" data = get_broadcast_data() if not data: print("未获取到有效的数据!") return global tenant_access_token authorization = tenant_access_token if not authorization: return folder_name = "bobao" create_folder(folder_name) segments = [ ("08:10-08:15", 10, 0), # 提前10分钟下载文件,0-准点播放 ("10:30-10:40", 10, 0), ("13:00-13:10", 10, 0), ("15:00-15:10", 10, 0), ("16:30-16:35", 10, 0), ("19:00-19:14", 10, 0) ] for segment, download_offset, play_offset in segments: process_time_segment(segment, download_offset, play_offset, data, authorization, folder_name) # 主程序结束后,再清理一次播放器进程 kill_previous_players() if __name__ == "__main__": main() 上述代码可以实现tenant_access_token过期后,自动获取tenant_access_token吗?

filetype

[E 2025-06-09 09:43:01.897 ServerApp] Uncaught exception GET /aext_core_server/feature_flag/init?1749433381438 (::1) HTTPServerRequest(protocol='http', host='localhost:8888', method='GET', uri='/aext_core_server/feature_flag/init?1749433381438', version='HTTP/1.1', remote_ip='::1') Traceback (most recent call last): File "E:\AAAshixunruanjian\Lib\site-packages\tornado\web.py", line 1790, in _execute result = await result ^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_core_server\handlers.py", line 40, in get [account, organizations, account_notebooks] = await asyncio.gather( ^^^^^^^^^^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_shared\handler.py", line 119, in anaconda_cloud_proxy user_access_credentials = await self.get_user_access_credentials(auth_optional=auth_optional) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_shared\handler.py", line 96, in get_user_access_credentials raise e File "E:\AAAshixunruanjian\Lib\site-packages\aext_shared\handler.py", line 93, in get_user_access_credentials new_access_token, expires_in = await get_access_token(self.request) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_shared\handler.py", line 182, in get_access_token raise UnauthorizedError({"reason": "missing refresh_token"}) aext_shared.errors.UnauthorizedError: BackendError 403: missing refresh_token [W 2025-06-09 09:43:01.905 ServerApp] 403 GET /aext_core_server/feature_flag/init?1749433381438 (90f71a62e1044316b870c0c31be2b0ee@::1) 327.22ms referer=http://localhost:8888/tree [W 2025-06-09 09:43:03.271 ServerApp] 404 GET /aext_profile_manager_server/account_details?1749433383151 (90f71a62e1044316b870c0c31be2b0ee@::1) 118.93ms referer=http://localhost:8888/tree [E 2025-06-09 09:43:06.372 ServerApp] Uncaught exception GET /aext_core_server/feature_flag/init?1749433385069 (::1) HTTPServerRequest(protocol='http', host='localhost:8888', method='GET', uri='/aext_core_server/feature_flag/init?1749433385069', version='HTTP/1.1', remote_ip='::1') Traceback (most recent call last): File "E:\AAAshixunruanjian\Lib\site-packages\tornado\web.py", line 1790, in _execute result = await result ^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_core_server\handlers.py", line 40, in get [account, organizations, account_notebooks] = await asyncio.gather( ^^^^^^^^^^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_shared\handler.py", line 119, in anaconda_cloud_proxy user_access_credentials = await self.get_user_access_credentials(auth_optional=auth_optional) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_shared\handler.py", line 96, in get_user_access_credentials raise e File "E:\AAAshixunruanjian\Lib\site-packages\aext_shared\handler.py", line 93, in get_user_access_credentials new_access_token, expires_in = await get_access_token(self.request) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_shared\handler.py", line 182, in get_access_token raise UnauthorizedError({"reason": "missing refresh_token"}) aext_shared.errors.UnauthorizedError: BackendError 403: missing refresh_token [W 2025-06-09 09:43:06.375 ServerApp] 403 GET /aext_core_server/feature_flag/init?1749433385069 (90f71a62e1044316b870c0c31be2b0ee@::1) 19.23ms referer=http://localhost:8888/tree [W 2025-06-09 09:43:06.893 ServerApp] wrote error: 'Forbidden' Traceback (most recent call last): File "E:\AAAshixunruanjian\Lib\site-packages\tornado\web.py", line 1790, in _execute result = await result ^^^^^^^^^^^^ File "E:\AAAshixunruanjian\Lib\site-packages\aext_assistant_server\handlers.py", line 124, in get raise HTTPError(403, reason="missing nucleus_token") tornado.web.HTTPError: HTTP 403: missing nucleus_token [W 2025-06-09 09:43:06.906 ServerApp] 403 GET /aext_assistant_server/nucleus_token?1749433386647 (90f71a62e1044316b870c0c31be2b0ee@::1) 34.55ms referer=http://localhost:8888/tree [W 2025-06-09 09:43:06.949 ServerApp] 404 GET /aext_profile_manager_server/account_details?1749433386706 (90f71a62e1044316b870c0c31be2b0ee@::1) 2.17ms referer=http://

filetype
filetype

from fastapi import FastAPI, HTTPException, Depends, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from typing import List, Optional, Dict import json import os import uuid from datetime import datetime, timedelta import jwt import hashlib # 初始化FastAPI应用 app = FastAPI(title="诗词库管理系统") # 配置信息 SECRET_KEY = "your-secret-key" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 # JSON文件路径 POETRY_FILE_PATH = os.path.join(os.path.dirname(__file__), "poetry", "poetry.json") USERS_FILE_PATH = os.path.join(os.path.dirname(__file__), "users.json") # 确保用户文件存在 if not os.path.exists(USERS_FILE_PATH): with open(USERS_FILE_PATH, "w", encoding="utf-8") as f: json.dump([], f, ensure_ascii=False, indent=2) # 数据模型 class Poetry(BaseModel): title: str author: str dynasty: str content: str explanation: Optional[str] = None emotion_metaphor: Optional[str] = None background_story: Optional[str] = None appreciation: Optional[str] = None class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str class UserCreate(BaseModel): username: str password: str email: Optional[str] = None full_name: Optional[str] = None # 工具函数 def get_poetry_data() -> List[Dict]: with open(POETRY_FILE_PATH, "r", encoding="utf-8") as f: data = json.load(f) return data def save_poetry_data(data: List[Dict]): with open(POETRY_FILE_PATH, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) def get_users_db() -> List[Dict]: with open(USERS_FILE_PATH, "r", encoding="utf-8") as f: return json.load(f) def save_users_db(data: List[Dict]): with open(USERS_FILE_PATH, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) def verify_password(plain_password: str, hashed_password: str) -> bool: return plain_password == hashed_password def get_password_hash(password: str) -> str: return password def get_user(db: List[Dict], username: str) -> Optional[UserInDB]: for user in db: if user["username"] == username: return UserInDB(**user) return None def authenticate_user(db: List[Dict], username: str, password: str) -> Optional[UserInDB]: user = get_user(db, username) if not user or not verify_password(password, user.hashed_password): return None return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt # OAuth2依赖 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except jwt.PyJWTError: raise credentials_exception users_db = get_users_db() user = get_user(users_db, username) if user is None: raise credentials_exception return user async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user # API路由 @app.post("/token") async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): users_db = get_users_db() user = authenticate_user(users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.post("/users/", response_model=User) async def create_user(user: UserCreate): users_db = get_users_db() if get_user(users_db, user.username): raise HTTPException(status_code=400, detail="Username already registered") hashed_password = get_password_hash(user.password) user_dict = user.dict() user_dict["hashed_password"] = hashed_password del user_dict["password"] user_dict["disabled"] = False users_db.append(user_dict) save_users_db(users_db) return User(**user_dict) @app.get("/users/me/") async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user # 诗词相关路由 @app.get("/poetry/", response_model=List[Poetry]) async def read_poetry(skip: int = 0, limit: int = 100): poetry_data = get_poetry_data() return poetry_data[skip : skip + limit] @app.post("/poetry/") async def create_poetry(poetry: Poetry, current_user: User = Depends(get_current_active_user)): poetry_data = get_poetry_data() poetry_dict = poetry.dict() # 移除生成id的代码 poetry_data.append(poetry_dict) save_poetry_data(poetry_data) return poetry_dict @app.get("/poetry/search/") async def search_poetry(query: str): poetry_data = get_poetry_data() results = [] query = query.lower() for item in poetry_data: if ( query in item["title"].lower() or query in item["author"].lower() or query in item["dynasty"].lower() or query in item["content"].lower() ): results.append(item) return results # 运行应用 if __name__ == "__main__": import uvicorn uvicorn.run("main:app", port=8080,reload=True)解释代码

filetype

kettle 要实现的逻辑如下: 1. 发送get请求 url = "http://dmp.hbxytc.cn/open_api/authentication/get_access_token" "key": "20250430167419173945385954918401012", "secret": "2e637ad668c8fae556b75eca98416a684af75825" 携带key和secret参数,如果成功返回如下: {'code': 10000, 'message': 'ok', 'description': 'api请求成功', 'uuid': 'ee5b643c8a304287bb03acc094a578f3', 'result': {'access_token': '44369011-6050-4733-bf8d-fd8bfc889533', 'token_type': 'bearer', 'refresh_token': '3128f3d5-a4df-4f43-8592-fee8f0165dd2', 'expires_in': 2661, 'scope': 'server', 'license': 'made by pangu'}} result--{'access_token': '44369011-6050-4733-bf8d-fd8bfc889533', 'token_type': 'bearer', 'refresh_token': '3128f3d5-a4df-4f43-8592-fee8f0165dd2', 'expires_in': 2661, 'scope': 'server', 'license': 'made by pangu'} 获取里面的access_token和expires_in(过期时间单位秒,过期后重新获取) 2. 发送get 请求 url = "http://dmp.hbxytc.cn/open_api/customization/odsxbsjzxsjjcqksj/full" headers = { "Authorization": f"Bearer {access_token}" } 成功后返回的数据如下: { "code": 10000, "message": "ok", "description": "api请求成功", "uuid": "f75993870f7a48f7a86330c3787f7dc9", "result": { "page": 1, "per": 10, "total": 853, "max_page": 86, "data_struct": { "ZJSJWYXBS": "主键数据唯一性标识", "XXDM": "学校代码", "SJKMC": "数据库名称", "SJBMC": "数据表名称", "SJBSM": "数据表说明", "SJBGXPLM": "数据表更新频率码", "XXHXTBH": "信息化系统编号", "SJHJL": "数据汇聚量", "SJJKFWDYL": "数据接口服务调用量", "SJBGXFSM": "数据表更新方式码", "SJBGXRQ": "数据表更新日期", "SJCJSJ": "数据采集时间" }, "data": [ { "ZJSJWYXBS": "17078564", "XXDM": "4142012354", "SJKMC": "主数据库", "SJBMC": "T_ODS_ZHJSSBYXSJ", "SJBSM": "智慧教室设备运行数据子类表", "SJBGXPLM": "5", "XXHXTBH": "3", "SJHJL": 0, "SJJKFWDYL": 0, "SJBGXFSM": "2", "SJBGXRQ": "20250529", "SJCJSJ": "20250529 234014" }, { "ZJSJWYXBS": "17078566", "XXDM": "4142012354", "SJKMC": "主数据库", "SJBMC": "T_ODS_XSXFSJ", "SJBSM": "学生消费数据子类表", "SJBGXPLM": "5", "XXHXTBH": "3", "SJHJL": 0, "SJJKFWDYL": 0, "SJBGXFSM": "2", "SJBGXRQ": "20250529", "SJCJSJ": "20250529 234014" } } page 是页数,max_page是总页数 如果获取第二页携带参数page 获取,data中的列表数据要插入到数据库中,循环获取直到max_page 也为止 请根据上述逻辑给出完整详细的kettle步骤,具体到每一个参数的设置每一个步骤的操作等

咣荀
  • 粉丝: 36
上传资源 快速赚钱