活动介绍

os.makedirs(os.path.join('..', 'data'), exist_ok=True) data_file = os.path.join('..', 'data', 'house_tiny.csv') Traceback (most recent call last): Cell In[73], line 2 os.makedirs(os.path.join('..', 'data'), exist_ok=True) File D:\Program\anaconda3\lib\os.py:225 in makedirs mkdir(name, mode) PermissionError: [WinError 5] 拒绝访问。: '..\\data'

时间: 2024-01-06 18:06:33 浏览: 221
在这种情况下,出现了权限错误。这意味着您没有足够的权限来创建文件夹 `data`。有几种方法可以解决这个问题: 1. 以管理员身份运行您的代码。右键单击您使用的Python IDE或命令提示符,然后选择“以管理员身份运行”。 2. 更改文件夹的权限。右键单击 `data` 文件夹,选择“属性”,然后进入“安全”选项卡。点击“编辑”按钮,然后添加您的用户帐户并为其分配适当的权限。 3. 将数据文件保存在不需要管理员权限的位置,例如您的用户文件夹。您可以修改代码中的路径,将 `data_file` 设置为 `data_file = os.path.join('~', 'house_tiny.csv')`。 请注意,第一种方法是最简单和最常见的解决方法,但如果您没有管理员权限,可能需要尝试其他方法。
相关问题

import os import shutil import random # random.seed(0) #随机种子,可自选开启 def split_data(file_path, label_path, new_file_path, train_rate, val_rate, test_rate): images = os.listdir(file_path) labels = os.listdir(label_path) images_no_ext = {os.path.splitext(image)[0]: image for image in images} labels_no_ext = {os.path.splitext(label)[0]: label for label in labels} matched_data = [(img, images_no_ext[img], labels_no_ext[img]) for img in images_no_ext if img in labels_no_ext] unmatched_images = [img for img in images_no_ext if img not in labels_no_ext] unmatched_labels = [label for label in labels_no_ext if label not in images_no_ext] if unmatched_images: print("未匹配的图片文件:") for img in unmatched_images: print(images_no_ext[img]) if unmatched_labels: print("未匹配的标签文件:") for label in unmatched_labels: print(labels_no_ext[label]) random.shuffle(matched_data) total = len(matched_data) train_data = matched_data[:int(train_rate * total)] val_data = matched_data[int(train_rate * total):int((train_rate + val_rate) * total)] test_data = matched_data[int((train_rate + val_rate) * total):] # 处理训练集 for img_name, img_file, label_file in train_data: old_img_path = os.path.join(file_path, img_file) old_label_path = os.path.join(label_path, label_file) new_img_dir = os.path.join(new_file_path, 'train', 'images') new_label_dir = os.path.join(new_file_path, 'train', 'labels') os.makedirs(new_img_dir, exist_ok=True) os.makedirs(new_label_dir, exist_ok=True) shutil.copy(old_img_path, os.path.join(new_img_dir, img_file)) shutil.copy(old_label_path, os.path.join(new_label_dir, label_file)) # 处理验证集 for img_name, img_file, label_file in val_data: old_img_path = os.path.join(file_path, img_file) old_label_path = os.path.join(label_path, label_file) new_img_dir = os.path.join(new_file_path, 'val', 'images') new_label_dir = os.path.join(new_file_path, 'val', 'labels') os.makedirs(new_img_dir, exist_ok=True) os.makedirs(new_label_dir,

<think>好的,用户想优化Python中数据集划分的代码,包括训练集、验证集和测试集。首先,我需要回忆常见的优化方法。用户提到的引用里有提到sklearn,可能用现成的库会更高效。比如train_test_split,但可能需要多次划分。另外,分层抽样也是一个点,保持数据分布,尤其是分类问题。然后是代码的复用性,写成函数或者类方便调用。还有打乱数据的重要性,避免顺序影响。可能还要考虑数据量大的时候,内存效率的问题,比如生成索引而不是复制数据。 接下来要检查用户提供的现有代码。根据引用2,用户参考了其他博主的代码并改进,可能原来的代码是手动划分,没有利用库函数,效率不高。比如自己写随机抽样,但可能没有考虑分层或者随机种子的问题。或者没有处理数据不平衡的情况。 然后需要思考如何优化。比如使用sklearn的train_test_split两次,先分训练和临时集,再分验证和测试。或者用StratifiedShuffleSplit来做分层划分。另外,添加参数比如stratify用于分类,random_state保证可重复性。还有将代码封装成函数,接受数据集路径、比例参数等,提高灵活性。 还要考虑数据量大的时候,使用生成器或者分批加载,但可能用户的数据不是特别大,所以优先考虑简洁性和可读性。另外,添加异常处理,比如比例之和是否合理,数据是否足够划分。 可能用户原来的代码没有打乱数据,或者打乱的方式不够高效,比如使用numpy的shuffle或者pandas的sample。另外,保存划分后的数据集到文件,避免重复处理。 最后,要结合引用里的内容,引用3提到可以使用sklearn的模块,所以推荐使用库函数。同时,引用1和2提到划分方法,需要确保训练集、验证集和测试集的比例正确,并且随机性可控。</think>针对Python数据集划分代码的优化,可以从以下几个关键点进行改进,结合最佳实践和常用工具库: 1. **使用scikit-learn内置方法** 推荐采用`train_test_split`的二次划分法,提高代码可维护性: ```python from sklearn.model_selection import train_test_split def split_data(data, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15, random_state=None): assert train_ratio + val_ratio + test_ratio == 1.0 # 首次划分:训练集与剩余集 train, temp = train_test_split(data, train_size=train_ratio, random_state=random_state, shuffle=True) # 二次划分计算实际比例 remaining_ratio = val_ratio + test_ratio adj_val_ratio = val_ratio / remaining_ratio val, test = train_test_split(temp, train_size=adj_val_ratio, random_state=random_state) return train, val, test ``` 这种方法通过两次划分确保精确的比例控制[^3] 2. **增加分层抽样功能**(适用于分类任务) ```python def stratified_split(data, stratify_col, ratios, random_state=None): from sklearn.model_selection import train_test_split train, temp = train_test_split(data, test_size=1-ratios[0], stratify=data[stratify_col], random_state=random_state) val, test = train_test_split(temp, test_size=ratios[2]/(ratios[1]+ratios[2]), stratify=temp[stratify_col], random_state=random_state) return train, val, test ``` 该实现能保持各类别样本比例一致[^3] 3. **添加数据预处理增强** ```python def enhanced_split(data, ratios, shuffle=True, random_state=None, normalize=False, drop_na=True): # 数据清洗 if drop_na: data = data.dropna() # 数据标准化 if normalize: from sklearn.preprocessing import StandardScaler scaler = StandardScaler() scaled_data = scaler.fit_transform(data.select_dtypes(include='number')) # 划分逻辑 return split_data(data, *ratios, random_state=random_state) ``` 4. **内存优化方案**(针对大型数据集) ```python def memory_efficient_split(file_path, ratios): # 使用生成器逐行处理 with open(file_path) as f: header = next(f) for line in f: # 根据随机数决定分配区间 rand = random.random() if rand < ratios[0]: yield 'train', line elif rand < ratios[0]+ratios[1]: yield 'val', line else: yield 'test', line ``` **优化建议总结**: 1. 优先使用`scikit-learn`的`train_test_split`,而非手动实现随机抽样 2. 对分类任务添加`stratify`参数保证数据分布一致性 3. 通过`random_state`参数确保结果可复现 4. 添加异常处理机制(如比例校验、空值检测) 5. 支持多种数据格式输入(DataFrame、ndarray、文件路径) 6. 增加时间戳记录和日志输出功能

os.makedirs("output", exist_ok=True) os.makedirs("checkpoints", exist_ok=True)

<think>好的,我现在需要帮助用户理解这两行代码:`os.makedirs("output", exist_ok=True)` 和 `os.makedirs("checkpoints", exist_ok=True)`。首先,我得回忆一下`os.makedirs`函数的作用和参数。用户之前已经问过关于PyTorch设备选择的代码,现在转向文件目录创建,可能是在进行机器学习项目时需要创建输出和检查点目录。 首先,我应该解释这两行代码的基本功能,即创建目录。然后需要强调`exist_ok=True`参数的重要性,因为它可以避免目录已存在时引发的错误。用户可能遇到过因重复运行脚本而导致的错误,所以这个参数很关键。 接下来,我需要对比传统写法,比如先检查目录是否存在再创建,这样能突出`exist_ok=True`的优势,让代码更简洁,避免竞争条件。此外,用户可能关心实际应用场景,比如在训练模型时自动创建保存输出和模型的目录,确保代码在不同环境中都能运行,无需手动创建文件夹。 还要注意常见误区,比如忘记处理目录已存在的情况,或者路径权限问题。需要提醒用户检查路径是否正确,以及是否有写入权限。另外,可能需要提到跨平台兼容性,比如Windows和Linux的路径差异,建议使用`os.path.join`来处理。 用户可能还需要一些进阶技巧,比如动态生成目录名称,结合时间戳,或者处理嵌套目录的结构。同时,异常处理也是重要的,比如捕获权限错误或其他异常情况,增强代码的健壮性。 最后,要总结最佳实践,比如统一管理目录路径,使用`exist_ok=True`来简化代码,确保跨平台兼容性,以及添加必要的异常处理。这样用户能够全面理解这两行代码的作用,并在实际项目中正确应用。</think>以下是关于 `os.makedirs("output", exist_ok=True)` 和 `os.makedirs("checkpoints", exist_ok=True)` 的详细解析,通过对比和场景说明其作用: --- ### 一、**核心功能解析** 这两行代码的 **核心目的** 是创建指定目录: - 若目录不存在 → **自动创建** - 若目录已存在 → **不报错继续执行**(关键特性) - 支持创建 **多级嵌套目录**(如 `a/b/c`) #### 类比解释 > 类似「智能文件夹生成器」:检查保险柜是否存在,如果不存在就新建一个,如果已存在则不破坏原有内容 --- ### 二、**参数解析对比** #### 关键参数 `exist_ok=True` | 参数状态 | 行为表现 | 典型场景 | |----------------|----------------------------------|-----------------------| | `exist_ok=True` | 目录存在时不报错 | 避免重复执行时崩溃 | | 默认(无此参数) | 目录存在时抛出 `FileExistsError` | 需要严格检测目录新建 | #### 示例对比 ```python # 危险写法(可能崩溃) os.makedirs("output") # 若目录已存在 → 报错 # 安全写法(推荐) os.makedirs("output", exist_ok=True) # 兼容目录已存在场景 ``` --- ### 三、**典型应用场景** #### 场景1:机器学习项目初始化 ```python # 创建训练输出目录 os.makedirs("output/logs", exist_ok=True) # 保存训练日志 os.makedirs("output/models", exist_ok=True) # 保存训练模型 os.makedirs("checkpoints", exist_ok=True) # 保存训练中间状态 ``` #### 场景2:文件批量处理 ```python for filename in file_list: output_path = f"processed/{filename[:-4]}_result" os.makedirs(output_path, exist_ok=True) # 为每个文件创建独立输出目录 ``` --- ### 四、**与传统写法的对比** #### ❌ 旧式写法(存在缺陷) ```python if not os.path.exists("output"): os.makedirs("output") # 可能产生竞态条件(race condition) ``` #### ✅ 现代安全写法 ```python os.makedirs("output", exist_ok=True) # 原子操作,避免竞态条件 ``` --- ### 五、**常见问题与解决方案** | 问题现象 | 原因分析 | 解决方案 | |-----------------------------|-------------------------|----------------------------| | `PermissionError` | 无目标目录的写入权限 | 检查路径权限或使用用户目录 | | `NotADirectoryError` | 路径中包含文件同名冲突 | 删除冲突文件或修改路径名称 | | 跨平台路径符号问题 | Windows/Linux路径差异 | 使用 `os.path.join()` | --- ### 六、**进阶使用技巧** #### 1. 动态路径生成 ```python timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") run_dir = f"experiments/run_{timestamp}" os.makedirs(run_dir, exist_ok=True) # 按时间戳创建唯一实验目录 ``` #### 2. 嵌套目录结构 ```python # 一次性创建多级目录 os.makedirs("project/data/raw", exist_ok=True) # 创建三级目录结构 ``` #### 3. 异常处理增强 ```python try: os.makedirs("output", exist_ok=True) except OSError as error: print(f"目录创建失败: {error}") ``` --- ### 七、**最佳实践建议** 1. **统一路径管理**:在代码开头集中定义路径常量 ```python OUTPUT_DIR = "output" CHECKPOINT_DIR = "checkpoints" os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(CHECKPOINT_DIR, exist_ok=True) ``` 2. **路径标准化**:使用 `os.path` 处理跨平台兼容 ```python config_path = os.path.join(OUTPUT_DIR, "config.yaml") ``` 3. **环境检测**:验证目录是否可写 ```python if not os.access(OUTPUT_DIR, os.W_OK): print(f"警告:无写入权限 {OUTPUT_DIR}") ``` 如果需要具体场景的目录结构设计建议,可以告诉我具体需求! 📂
阅读全文

相关推荐

import os import numpy as np from scipy.io import loadmat def mat_to_npy(mat_path, save_path, chunk_size=401*1024): mat = loadmat(mat_path) # 提取振动信号(假设信号存储在'X'开头的变量中) signal = mat[next(key for key in mat if key.startswith('X'))] # 转换为一维数组 signal = signal.flatten() # 计算每个chunk的样本数(假设每个float32占4字节) samples_per_chunk = chunk_size // 4 # 分割数据 for i in range(0, len(signal), samples_per_chunk): chunk = signal[i:i+samples_per_chunk] np.save(f"{save_path}_part{i//samples_per_chunk}.npy", chunk) root_dir = "data/real" os.makedirs(os.path.join(root_dir, "train", "normal"), exist_ok=True) os.makedirs(os.path.join(root_dir, "val", "normal"), exist_ok=True) for fault_type in ["inner_race", "outer_race", "roller"]: os.makedirs(os.path.join(root_dir, "train", fault_type), exist_ok=True) os.makedirs(os.path.join(root_dir, "val", fault_type), exist_ok=True) def process_data(input_dir, output_dir, category): for load in os.listdir(input_dir): load_dir = os.path.join(input_dir, load) if not os.path.isdir(load_dir): continue # 划分训练集和验证集(示例:负载0-2为训练,负载3为验证) split = "train" if load != "3" else "val" for mat_file in os.listdir(load_dir): if mat_file.endswith(".mat"): mat_path = os.path.join(load_dir, mat_file) save_path = os.path.join(output_dir, split, category, mat_file[:-4]) mat_to_npy(mat_path, save_path) # 处理正常数据 normal_dir = "CRWU/Normal Baseline" process_data(normal_dir, "data/real", "normal") # 处理故障数据 fault_categories = { "Ball": "roller", "Inner Race": "inner_race", "Outer Race": "outer_race" } for freq_dir in ["12k Drive End Bearing Fault Data", "48k Drive End Bearing Fault Data", "12k Fan End Bearing Fault Data"]: for fault_type, target in fault_categories.items(): fault_path = os.path.join("CRWU", freq_dir, fault_type) if os.path.exists(fault_path): process_data(fault_path, "data/real", target)

此代码import os import numpy as np from PIL import Image def process_image(image_path, save_path): # 读取nii文件 image_array = np.load(image_path).astype(np.float32) # 归一化到0-255之间 image_array = (image_array - np.min(image_array)) / (np.max(image_array) - np.min(image_array)) * 255 # 将数据类型转换为uint8 image_array = image_array.astype(np.uint8) # 将三维图像分成若干个二维图像 for i in range(image_array.shape[0]): image = Image.fromarray(image_array[i]) image.save(os.path.join(save_path, f"{i}.png")) def process_label(label_path, save_path): # 读取nii文件 label_array = np.load(label_path).astype(np.uint8) # 将标签转换为灰度图 label_array[label_array == 1] = 255 label_array[label_array == 2] = 128 # 将三维标签分成若干个二维标签 for i in range(label_array.shape[0]): label = Image.fromarray(label_array[i]) label.save(os.path.join(save_path, f"{i}.png")) # LiTS2017数据集路径 data_path = "C:\\Users\\Administrator\\Desktop\\LiTS2017" # 保存路径 save_path = "C:\\Users\\Administrator\\Desktop\\2D-LiTS2017" # 创建保存路径 os.makedirs(save_path, exist_ok=True) os.makedirs(os.path.join(save_path, "image"), exist_ok=True) os.makedirs(os.path.join(save_path, "mask"), exist_ok=True) # 处理Training Batch 1 image_path = os.path.join(data_path, "Training Batch 1", "volume-{}.npy") for i in range(131): process_image(image_path.format(i), os.path.join(save_path, "image")) # 处理Training Batch 2 label_path = os.path.join(data_path, "Training Batch 2", "segmentation-{}.npy") for i in range(131): process_label(label_path.format(i), os.path.join(save_path, "mask"))出现FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Administrator\\Desktop\\LiTS2017\\Training Batch 1\\volume-0.npy',修复它,并给出完整代码

if __name__ == '__main__': PROCESSMANAGER_DATA_PATH = '/data' PROCESSMANAGER_LOG_PATH = os.path.join(PROCESSMANAGER_DATA_PATH, 'logs') PROCESSMANAGER_LOG_ARCHIVE_PATH = os.path.join(PROCESSMANAGER_LOG_PATH, 'archive') if not os.path.exists(PROCESSMANAGER_LOG_PATH): os.makedirs(PROCESSMANAGER_LOG_PATH) if not os.path.exists(PROCESSMANAGER_LOG_ARCHIVE_PATH): os.makedirs(PROCESSMANAGER_LOG_ARCHIVE_PATH) parser = argparse.ArgumentParser(description='Processmanager for Robot Test Executions') parser.add_argument('--robot_ip', type=str, help="Robot vacuum IP") args = parser.parse_args() logfile_path = os.path.join(PROCESSMANAGER_LOG_PATH, f"processmanager_{args.robot_ip}.log") # in case an old log does exist: keep and rename it if os.path.exists(logfile_path) and os.path.isfile(logfile_path): archive_timestamp = time.time() archived_logfile_path = os.path.join(PROCESSMANAGER_LOG_ARCHIVE_PATH, f"processmanager_{args.robot_ip}_{archive_timestamp}.log") shutil.move(logfile_path, archived_logfile_path) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(logfile_path), logging.StreamHandler() # when you want to see log print in console ] ) logging.log(logging.INFO, "Booting Processmanager..") print("Starting Processmanager", flush=True) try: _ = ipaddress.ip_address(args.robot_ip) except ValueError: print(f"Error: Invalid IP Address provided: {args.robot_ip}") logging.error(f"Error: Invalid IP Address provided: {args.robot_ip}") raise ValueError loop = asyncio.get_event_loop() async_lock = asyncio.Lock() processmanager_state = ProcessManagerState(args.robot_ip, loop) queue_runner = QueueRunner(args.robot_ip, processmanager_state, async_lock) comm_interface = CommInterface(args.robot_ip, processmanager_state, async_lock) asyncio.ensure_future(queue_runner.queue_item_execution()) asyncio.ensure_future(comm_interface.fetch_rabbitmq()) def handle_sigterm(): loop.stop() logging.log(logging.INFO, "Received SIGTERM: Shutting down Processmanager..") processmanager_state.set_queue_offline() sys.exit(0) loop.add_signal_handler(signal.SIGTERM, handle_sigterm) loop.add_signal_handler(signal.SIGINT, handle_sigterm) print("State, Queue Runner and Comm Interface initialized, starting loops..", flush=True) loop.run_forever()

优化代码 def fault_classification_wrapper(vin, main_path, data_path, log_path, done_path): start_time = time.time() isc_path = os.path.join(done_path, vin, 'isc_cal_result', f'{vin}_report.xlsx') if not os.path.exists(isc_path): print('No isc detection input!') else: isc_input = isc_produce_alarm(isc_path, vin) ica_path = os.path.join(done_path, vin, 'ica_cal_result', f'ica_detection_alarm_{vin}.csv') if not os.path.exists(ica_path): print('No ica detection input!') else: ica_input = ica_produce_alarm(ica_path) soh_path = os.path.join(done_path, vin, 'SOH_cal_result', f'{vin}_sohAno.csv') if not os.path.exists(soh_path): print('No soh detection input!') else: soh_input = soh_produce_alarm(soh_path, vin) alarm_df = pd.concat([isc_input, ica_input, soh_input]) alarm_df.reset_index(drop=True, inplace=True) alarm_df['alarm_cell'] = alarm_df['alarm_cell'].apply(lambda _: str(_)) print(vin) module = AutoAnalysisMain(alarm_df, main_path, data_path, done_path) module.analysis_process() flags = os.O_WRONLY | os.O_CREAT modes = stat.S_IWUSR | stat.S_IRUSR with os.fdopen(os.open(os.path.join(log_path, 'log.txt'), flags, modes), 'w') as txt_file: for k, v in module.output.items(): txt_file.write(k + ':' + str(v)) txt_file.write('\n') for x, y in module.output_sub.items(): txt_file.write(x + ':' + str(y)) txt_file.write('\n\n') fc_result_path = os.path.join(done_path, vin, 'fc_result') if not os.path.exists(fc_result_path): os.makedirs(fc_result_path) pd.DataFrame(module.output).to_csv( os.path.join(fc_result_path, 'main_structure.csv')) df2 = pd.DataFrame() for subs in module.output_sub.keys(): sub_s = pd.Series(module.output_sub[subs]) df2 = df2.append(sub_s, ignore_index=True) df2.to_csv(os.path.join(fc_result_path, 'sub_structure.csv')) end_time = time.time() print("time cost of fault classification:", float(end_time - start_time) * 1000.0, "ms") return

#!/usr/local/bin/python3 # -*- coding: utf-8 -*- import asyncio import argparse import os import shutil import sys import time import signal from state import ProcessManagerState from queue_runner import QueueRunner from comm_interface import CommInterface import ipaddress import logging from dotenv import load_dotenv load_dotenv() if __name__ == '__main__': PROCESSMANAGER_DATA_PATH = '/data' PROCESSMANAGER_LOG_PATH = os.path.join(PROCESSMANAGER_DATA_PATH, 'logs') PROCESSMANAGER_LOG_ARCHIVE_PATH = os.path.join(PROCESSMANAGER_LOG_PATH, 'archive') if not os.path.exists(PROCESSMANAGER_LOG_PATH): os.makedirs(PROCESSMANAGER_LOG_PATH) if not os.path.exists(PROCESSMANAGER_LOG_ARCHIVE_PATH): os.makedirs(PROCESSMANAGER_LOG_ARCHIVE_PATH) parser = argparse.ArgumentParser(description='Processmanager for Robot Test Executions') parser.add_argument('--robot_ip', type=str, help="Robot vacuum IP") args = parser.parse_args() logfile_path = os.path.join(PROCESSMANAGER_LOG_PATH, f"processmanager_{args.robot_ip}.log") # in case an old log does exist: keep and rename it if os.path.exists(logfile_path) and os.path.isfile(logfile_path): archive_timestamp = time.time() archived_logfile_path = os.path.join(PROCESSMANAGER_LOG_ARCHIVE_PATH, f"processmanager_{args.robot_ip}_{archive_timestamp}.log") shutil.move(logfile_path, archived_logfile_path) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(logfile_path), logging.StreamHandler() # when you want to see log print in console ] ) logging.log(logging.INFO, "Booting Processmanager..") print("Starting Processmanager", flush=True) try: _ = ipaddress.ip_address(args.robot_ip) except ValueError: print(f"Error: Invalid IP Address provided: {args.robot_ip}") logging.error(f"Error: Invalid IP Address provided: {args.robot_ip}") raise ValueError loop = asyncio.get_event_loop() async_lock = asyncio.Lock() processmanager_state = ProcessManagerState(args.robot_ip, loop) queue_runner = QueueRunner(args.robot_ip, processmanager_state, async_lock) comm_interface = CommInterface(args.robot_ip, processmanager_state, async_lock) asyncio.ensure_future(queue_runner.queue_item_execution()) asyncio.ensure_future(comm_interface.fetch_rabbitmq()) def handle_sigterm(): loop.stop() logging.log(logging.INFO, "Received SIGTERM: Shutting down Processmanager..") processmanager_state.set_queue_offline() sys.exit(0) loop.add_signal_handler(signal.SIGTERM, handle_sigterm) loop.add_signal_handler(signal.SIGINT, handle_sigterm) print("State, Queue Runner and Comm Interface initialized, starting loops..", flush=True) loop.run_forever()

import math import os import struct from argparse import ArgumentParser import av import numpy as np import open3d as o3d import rosbag import yaml from sensor_msgs import point_cloud2 import subprocess from protoc.octopudata_localizationinfo_pb2 import LocalizationInfoFrame, LocalizationInfo from protoc.octopudata_trackedobject_pb2 import TrackedObjectFrame, Object, TrackedObject from protoc.octopusdata_controlcommand_pb2 import CommandFrame, ControlCommand from protoc.octopusdata_gnss_pb2 import GnssPoint, GnssPoints from protoc.octopusdata_plantrajectory_pb2 import Trajectory, TrajectoryPoint, PlanTrajectory from protoc.octopusdata_predictionobstacles_pb2 import PerceptionObstacle, \ Obstacle, PredictionTrajectory, PathPoint, PredictionObstacles from protoc.octopusdata_routingpath_pb2 import RoutingPath, Path, Point, RoutingFrames from protoc.octopusdata_vehicleinfo_pb2 import VehicleFrame, VehicleInfo av.logging.set_level(av.logging.PANIC) codec_ctx = av.codec.Codec('hevc','r') h265_code = codec_ctx.create() class Pose: def __init__(self, q0, q1, q2, q3, x, y, z): self.q0 = q0 self.q1 = q1 self.q2 = q2 self.q3 = q3 self.x = x self.y = y self.z = z def get_ts(secs, nsecs): return int(secs * 1000 + nsecs / 1000000) def get_modulo(x, y, z): return math.sqrt(x * x + y * y + z * z) def yaw_from_quaternions(w, x, y, z): a = 2 * (w * z + x * y) b = 1 - 2 * (y * y + z * z) return math.atan2(a, b) def pose_has_nan(p): return math.isnan(p.x) or math.isnan(p.y) or math.isnan(p.z) or \ math.isnan(p.q0) or math.isnan(p.q1) or math.isnan(p.q2) or \ math.isnan(p.q3) def get_t(q0, q1, q2, q3, x, y, z): aa = 1 - 2 * (q2 * q2 + q3 * q3) ab = 2 * (q1 * q2 - q0 * q3) ac = 2 * (q1 * q3 + q0 * q2) ba = 2 * (q1 * q2 + q0 * q3) bb = 1 - 2 * (q1 * q1 + q3 * q3) bc = 2 * (q2 * q3 - q0 * q1) ca = 2 * (q1 * q3 - q0 * q2) cb = 2 * (q2 * q3 + q0 * q1) cc = 1 - 2 * (q1 * q1 + q2 * q2) t = np.mat([[aa, ab, ac, x], [ba, bb, bc, y], [ca, cb, cc, z], [0, 0, 0, 1]]) return t def get_label(perception_typ, subtype): if perception_typ == 3: return 'Pedestrian' elif perception_typ == 4: return 'Bi_Tricycle' elif perception_typ == 5: if subtype == 5: return 'Truck' elif subtype == 6: return 'Bus' else: return 'Car' else: return 'unknow' def main(args): for file in os.listdir(args.input): if file.endswith('.bag'): bag_path = os.path.join(args.input, file) bag = rosbag.Bag(bag_path, "r") output_dir = os.getenv('output_dir') if not os.path.exists(os.path.join(output_dir, 'innoPtClound_A4')): os.makedirs(os.path.join(output_dir, 'innoPtClound_A4')) if not os.path.exists(os.path.join(output_dir, 'innoPtClound_B2')): os.makedirs(os.path.join(output_dir, 'innoPtClound_B2')) if not os.path.exists(os.path.join(output_dir, 'radar_track_array_0')): os.makedirs(os.path.join(output_dir, 'radar_track_array_0')) if not os.path.exists(os.path.join(output_dir, 'radar_track_array_3')): os.makedirs(os.path.join(output_dir, 'radar_track_array_3')) if not os.path.exists(os.path.join(output_dir, 'mdc_camera_instance_74')): os.makedirs(os.path.join(output_dir, 'mdc_camera_instance_74')) if not os.path.exists(os.path.join(output_dir, 'mdc_camera_instance_73')): os.makedirs(os.path.join(output_dir, 'mdc_camera_instance_73')) if not os.path.exists(os.path.join(output_dir, 'mdc_camera_instance_72')): os.makedirs(os.path.join(output_dir, 'mdc_camera_instance_72')) if not os.path.exists(os.path.join(output_dir, 'mdc_camera_instance_71')): os.makedirs(os.path.join(output_dir, 'mdc_camera_instance_71')) routes = [] controls = [] plans = [] preds = [] gnss = [] vehs = [] locs = [] objs = [] ego_pose = None has_camera_71 = False has_camera_72 = False has_camera_73 = False has_camera_74 = False has_lidar_A4 = False has_lidar_B2 = False has_radar_0 = False has_radar_3 = False lidar_num = 0 image_num = 0 radar_num = 0 for topic, msg, t in bag.read_messages(): time_stamp = int(t.to_sec() * 1000) # 以 rosbag 时间戳(t)为基准,转换为 13 位时间戳 if topic == '/innoPtClound_A4': ### 图达通 时间辍应该是13位数字,图达通雷达8位,华为96线6位 # file_path = os.path.join(output_dir, 'innoPtClound_A4', '{}.pcd'.format(int(msg.header.stamp.secs * 1000 + # msg.header.stamp.nsecs / 1000000))) file_path = os.path.join(output_dir, 'innoPtClound_A4', '{}.pcd'.format(time_stamp)) # print(file_path) # 提取点云数据,包括 x, y, z points = list(point_cloud2.read_points(msg, field_names=["x", "y", "z", "intensity"], skip_nans=True)) if points: # 转换为 numpy 数组,添加 intensity, ring, timestamp 字段 np_points = np.array(points) # (N, 3), 包含 x, y, z # 转换为 Open3D 格式点云 pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(np_points[:, :3]) # x, y, z pcd.colors = o3d.utility.Vector3dVector(np.tile(np_points[:, 3:4] / np_points[:, 3:4].max(), (1, 3))) # 用 intensity 作为灰度颜色 o3d.io.write_point_cloud(file_path, pcd) lidar_num += 1 has_lidar_A4 = True elif topic == '/innoPtClound_B2': ### 图达通 # file_path = os.path.join(output_dir, 'innoPtClound_B2', '{}.pcd'.format(int(msg.header.stamp.secs * 1000 + # msg.header.stamp.nsecs / 1000000))) file_path = os.path.join(output_dir, 'innoPtClound_B2', '{}.pcd'.format(time_stamp)) # print(file_path) # 提取点云数据,包括 x, y, z points = list(point_cloud2.read_points(msg, field_names=["x", "y", "z", "intensity"], skip_nans=True)) if points: # 转换为 numpy 数组,添加 intensity, ring, timestamp 字段 np_points = np.array(points) # (N, 3), 包含 x, y, z # 转换为 Open3D 格式点云 pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(np_points[:, :3]) # x, y, z pcd.colors = o3d.utility.Vector3dVector(np.tile(np_points[:, 3:4] / np_points[:, 3:4].max(), (1, 3))) # 用 intensity 作为灰度颜色 o3d.io.write_point_cloud(file_path, pcd) lidar_num += 1 has_lidar_B2 = True elif topic == 'mdc_camera_instance_74': ### 相机 时间辍应该是13位数字 # time_stamp = int(msg.header.stamp.secs * 1000 + # msg.header.stamp.nsecs / 1000000) file_path = os.path.join(output_dir, 'mdc_camera_instance_74', '{}.jpg'.format(time_stamp)) packet = av.packet.Packet(msg.data) try: out = h265_code.decode(packet) for frame in out: if frame.format.name != 'rgb24': frame = frame.reformat(format='rgb24') img = frame.to_image() img.save(file_path) image_num += 1 has_camera_74 = True except Exception as e: print("{} frame can not trans to jpg".format(time_stamp), e) elif topic == 'mdc_camera_instance_73': ### 相机 时间辍应该是13位数字 # time_stamp = int(msg.header.stamp.secs * 1000 + # msg.header.stamp.nsecs / 1000000) file_path = os.path.join(output_dir, 'mdc_camera_instance_73', '{}.jpg'.format(time_stamp)) packet = av.packet.Packet(msg.data) try: out = h265_code.decode(packet) for frame in out: if frame.format.name != 'rgb24': frame = frame.reformat(format='rgb24') img = frame.to_image() img.save(file_path) image_num += 1 has_camera_73 = True except Exception as e: print("{} frame can not trans to jpg".format(time_stamp), e) elif topic == 'mdc_camera_instance_72': ### 相机 # time_stamp = int(msg.header.stamp.secs * 1000 + # msg.header.stamp.nsecs / 1000000) file_path = os.path.join(output_dir, 'mdc_camera_instance_72', '{}.jpg'.format(time_stamp)) packet = av.packet.Packet(msg.data) try: out = h265_code.decode(packet) for frame in out: if frame.format.name != 'rgb24': frame = frame.reformat(format='rgb24') img = frame.to_image() img.save(file_path) image_num += 1 has_camera_72 = True except Exception as e: print("{} frame can not trans to jpg".format(time_stamp), e) elif topic == 'mdc_camera_instance_71': ### 相机 # time_stamp = int(msg.header.stamp.secs * 1000 + # msg.header.stamp.nsecs / 1000000) file_path = os.path.join(output_dir, 'mdc_camera_instance_71', '{}.jpg'.format(time_stamp)) packet = av.packet.Packet(msg.data) try: out = h265_code.decode(packet) for frame in out: if frame.format.name != 'rgb24': frame = frame.reformat(format='rgb24') img = frame.to_image() img.save(file_path) image_num += 1 has_camera_71 = True except Exception as e: print("{} frame can not trans to jpg".format(time_stamp), e) elif topic == '/radar_track_array_0': ### 大陆408 时间辍应该是13位数字 # file_path = os.path.join(output_dir, 'radar_track_array_0', '{}.pcd'.format(int(msg.header.stamp.secs * 1000 + # msg.header.stamp.nsecs / 1000000))) file_path = os.path.join(output_dir, 'radar_track_array_0', '{}.pcd'.format(time_stamp)) points = [] for track in msg.trackList: x, y, z = track.x, track.y, track.z vx, vy, ax, ay = track.vx, track.vy, track.ax, track.ay rcs, snr, yawRate = track.rcs, track.snr, track.yawRate obj_id, trackType, lifetime = track.id, track.trackType, track.lifetime # 过滤无效点 if np.isnan(x) or np.isnan(y) or np.isnan(z): continue points.append([x, y, z, vx, vy, ax, ay, rcs, snr, yawRate, obj_id, trackType, lifetime]) if not points: print("没有有效点云数据") continue points = np.array(points) # **写入 PCD 文件** with open(file_path, 'w') as f: # **写入 PCD 头部** f.write("# .PCD v0.7 - Point Cloud Data file format\n") f.write("VERSION 0.7\n") f.write(f"FIELDS x y z vx vy ax ay rcs snr yawRate id trackType lifetime\n") f.write("SIZE 4 4 4 4 4 4 4 4 4 4 4 4 4\n") f.write("TYPE F F F F F F F F F F I I I\n") # F = float, I = int f.write("COUNT 1 1 1 1 1 1 1 1 1 1 1 1 1\n") f.write(f"WIDTH {points.shape[0]}\n") f.write("HEIGHT 1\n") f.write("VIEWPOINT 0 0 0 1 0 0 0\n") f.write(f"POINTS {points.shape[0]}\n") f.write("DATA ascii\n") # **写入点云数据** np.savetxt(f, points, fmt="%.6f %.6f %.6f %.6f %.6f %.6f %.6f %.6f %.6f %.6f %d %d %d") radar_num += 1 has_radar_0 = True elif topic == '/radar_track_array_3': ### 大陆408 # file_path = os.path.join(output_dir, 'radar_track_array_3', '{}.pcd'.format(int(msg.header.stamp.secs * 1000 + # msg.header.stamp.nsecs / 1000000))) file_path = os.path.join(output_dir, 'radar_track_array_3', '{}.pcd'.format(time_stamp)) points = [] for track in msg.trackList: x, y, z = track.x, track.y, track.z vx, vy, ax, ay = track.vx, track.vy, track.ax, track.ay rcs, snr, yawRate = track.rcs, track.snr, track.yawRate obj_id, trackType, lifetime = track.id, track.trackType, track.lifetime # 过滤无效点 if np.isnan(x) or np.isnan(y) or np.isnan(z): continue points.append([x, y, z, vx, vy, ax, ay, rcs, snr, yawRate, obj_id, trackType, lifetime]) if not points: print("没有有效点云数据") continue points = np.array(points) # **写入 PCD 文件** with open(file_path, 'w') as f: # **写入 PCD 头部** f.write("# .PCD v0.7 - Point Cloud Data file format\n") f.write("VERSION 0.7\n") f.write(f"FIELDS x y z vx vy ax ay rcs snr yawRate id trackType lifetime\n") f.write("SIZE 4 4 4 4 4 4 4 4 4 4 4 4 4\n") f.write("TYPE F F F F F F F F F F I I I\n") # F = float, I = int f.write("COUNT 1 1 1 1 1 1 1 1 1 1 1 1 1\n") f.write(f"WIDTH {points.shape[0]}\n") f.write("HEIGHT 1\n") f.write("VIEWPOINT 0 0 0 1 0 0 0\n") f.write(f"POINTS {points.shape[0]}\n") f.write("DATA ascii\n") # **写入点云数据** np.savetxt(f, points, fmt="%.6f %.6f %.6f %.6f %.6f %.6f %.6f %.6f %.6f %.6f %d %d %d") radar_num += 1 has_radar_3 = True elif topic == '/routing/routing_response_viz': rv = RoutingPath() rv.timestamp = int(t.secs * 1000 + t.nsecs / 1000000) rv.stamp_secs = t.secs rv.stamp_nsecs = t.nsecs mark_list = list() for mark in msg.markers: path_pb = Path() path_pb.id = mark.id point_list = [] for point in mark.points: point_pb = Point() point_pb.x = point.x point_pb.y = point.y point_pb.z = point.z point_list.append(point_pb) path_pb.path_point.extend(point_list) mark_list.append(path_pb) rv.routing_path_info.extend(mark_list) routes.append(rv) elif topic == '/holo/ControlCommand': cf = CommandFrame() cf.timestamp = int(t.secs * 1000 + t.nsecs / 1000000) cf.stamp_secs = t.secs cf.stamp_nsecs = t.nsecs cf.acceleration = msg.acceleration cf.front_wheel_angle = msg.front_wheel_angle cf.gear = msg.gear controls.append(cf) elif topic == '/planning/trajectory': tj = Trajectory() tj.timestamp = int(t.secs * 1000 + t.nsecs / 1000000) tj.stamp_secs = t.secs tj.stamp_nsecs = t.nsecs p_list = [] for point in msg.trajectory_points: p = TrajectoryPoint() p.x = point.path_point.point.x p.y = point.path_point.point.y p.z = point.path_point.point.z p.theta = point.path_point.theta p.kappa = point.path_point.kappa p.v = point.v p.a = point.a p.relative_time = point.relative_time p_list.append(p) tj.trajectory_points.extend(p_list) plans.append(tj) elif topic == '/prediction/prediction_obstacles': tr_pb = PerceptionObstacle() tr_pb.timestamp = int(msg.header.stamp.secs * 1000 + msg.header.stamp.nsecs / 1000000) tr_pb.stamp_secs = msg.header.stamp.secs tr_pb.stamp_nsecs = msg.header.stamp.nsecs obj_list = list() for obj in msg.prediction_obstacle: ob_pb = Obstacle() ob_pb.obstacle_timestamp = int(obj.timestamp * 1000) ob_pb.id = obj.perception_obstacle.id ob_pb.x = obj.perception_obstacle.position.x ob_pb.y = obj.perception_obstacle.position.y ob_pb.z = obj.perception_obstacle.position.z traj_pbs = [] for traj in obj.trajectory: traj_pb = PredictionTrajectory() points_pbs = [] for trajectory_point in traj.trajectory_points: point_pb = PathPoint() point_pb.x = trajectory_point.path_point.point.x point_pb.y = trajectory_point.path_point.point.y point_pb.z = trajectory_point.path_point.point.z point_pb.theta = trajectory_point.path_point.theta point_pb.kappa = trajectory_point.path_point.kappa point_pb.lane_id = trajectory_point.path_point.lane_id point_pb.v = trajectory_point.v point_pb.a = trajectory_point.a point_pb.relative_time = trajectory_point.relative_time points_pbs.append(point_pb) traj_pb.path_point.extend(points_pbs) traj_pbs.append(traj_pb) ob_pb.prediction_trajectory.extend(traj_pbs) obj_list.append(ob_pb) tr_pb.obstacle_info.extend(obj_list) preds.append(tr_pb) elif topic == '/inspvax': pb_loc_gnss = GnssPoint() pb_loc_gnss.stamp_secs = msg.header.stamp.secs # 1 pb_loc_gnss.stamp_nsecs = msg.header.stamp.nsecs # 2 pb_loc_gnss.timestamp = get_ts(msg.header.stamp.secs, msg.header.stamp.nsecs) pb_loc_gnss.latitude = msg.latitude # 3 pb_loc_gnss.longitude = msg.longitude # 4 pb_loc_gnss.elevation = msg.altitude gnss.append(pb_loc_gnss) elif topic == '/holo/VehicleInfoMagotan': veh_pb = VehicleFrame() veh_pb.stamp_secs = msg.timestamp.secs # 1 veh_pb.stamp_nsecs = msg.timestamp.nsecs # 2 veh_pb.timestamp = get_ts(veh_pb.stamp_secs, veh_pb.stamp_nsecs) veh_pb.gear_value = msg.gear # 4 veh_pb.vehicle_speed = msg.vehicle_speed * 3.6 # 5 veh_pb.steering_angle = msg.steering_angle # 6 veh_pb.longitude_acc = msg.longitude_acc veh_pb.lateral_acc = msg.lateral_acc veh_pb.turn_left_light = msg.turn_left_light veh_pb.turn_right_light = msg.turn_right_light veh_pb.brake = msg.brake_torque veh_pb.autonomy_status = 0 vehs.append(veh_pb) elif topic == '/localization/localization_info': lo_pb = LocalizationInfoFrame() lo_pb.timestamp = get_ts(msg.header.stamp.secs, msg.header.stamp.nsecs) lo_pb.stamp_secs = msg.header.stamp.secs lo_pb.stamp_nsecs = msg.header.stamp.nsecs lo_pb.pose_position_x = msg.pose.position.x lo_pb.pose_position_y = msg.pose.position.y lo_pb.pose_position_z = msg.pose.position.z lo_pb.pose_orientation_x = msg.pose.orientation.x lo_pb.pose_orientation_y = msg.pose.orientation.y lo_pb.pose_orientation_z = msg.pose.orientation.z lo_pb.pose_orientation_w = msg.pose.orientation.w lo_pb.pose_orientation_yaw = \ yaw_from_quaternions(msg.pose.orientation.w, msg.pose.orientation.x, msg.pose.orientation.y, msg.pose.orientation.z) lo_pb.velocity_linear = get_modulo(msg.pose.linear_velocity.x, msg.pose.linear_velocity.y, msg.pose.linear_velocity.z) lo_pb.velocity_angular = get_modulo(msg.pose.angular_velocity.x, msg.pose.angular_velocity.y, msg.pose.angular_velocity.z) lo_pb.acceleration_linear = get_modulo(msg.pose.linear_acceleration_vrf.x, msg.pose.linear_acceleration_vrf.y, msg.pose.linear_acceleration_vrf.z) lo_pb.acceleration_angular = get_modulo(msg.pose.angular_velocity_vrf.x, msg.pose.angular_velocity_vrf.y, msg.pose.angular_velocity_vrf.z) locs.append(lo_pb) ego_pose = Pose(msg.pose.orientation.w, msg.pose.orientation.x, msg.pose.orientation.y, msg.pose.orientation.z, msg.pose.position.x, msg.pose.position.y, msg.pose.position.z) elif topic == '/perception/perception_obstacles': if ego_pose is None or pose_has_nan(ego_pose): continue tr_pb = TrackedObjectFrame() tr_pb.timestamp = get_ts(msg.header.stamp.secs, msg.header.stamp.nsecs) tr_pb.stamp_secs = msg.header.stamp.secs tr_pb.stamp_nsecs = msg.header.stamp.nsecs obj_list = list() for object in msg.perception_obstacle: ob_pb = Object() ob_pb.id = object.id ob_pb.label = get_label(object.type, object.sub_type) ob_pb.pose_position_x = object.position.x ob_pb.pose_position_y = object.position.y ob_pb.pose_position_z = object.position.z ob_pb.pose_orientation_x = 0 ob_pb.pose_orientation_y = 0 ob_pb.pose_orientation_z = math.sin(object.theta / 2) ob_pb.pose_orientation_w = math.cos(object.theta / 2) ob_pb.pose_orientation_yaw = object.theta ob_pb.dimensions_x = object.length ob_pb.dimensions_y = object.width ob_pb.dimensions_z = object.height ob_pb.speed_vector_linear_x = object.velocity.x ob_pb.speed_vector_linear_y = object.velocity.y ob_pb.speed_vector_linear_z = object.velocity.z world_obj = np.transpose(np.array([[object.position.x, object.position.y, object.position.z, 1]])) world_ego_t = get_t(ego_pose.q0, ego_pose.q1, ego_pose.q2, ego_pose.q3, ego_pose.x, ego_pose.y, ego_pose.z) try: world_ego_invt = np.linalg.pinv(world_ego_t) except Exception as err: print('pinv failed:', world_ego_t) raise err vehicle_obj = world_ego_invt * world_obj ob_pb.relative_position_x = vehicle_obj[0] ob_pb.relative_position_y = vehicle_obj[1] ob_pb.relative_position_z = vehicle_obj[2] obj_list.append(ob_pb) tr_pb.objects.extend(obj_list) objs.append(tr_pb) print(f"lidar_num : {lidar_num}") print(f"image_num : {image_num}") print(f"radar_num : {radar_num}") folders = [] if len(routes) > 0: os.makedirs(os.path.join(output_dir, 'routing_routing_response_viz')) folders.append({'folder': 'routing_routing_response_viz', 'sensor_type': 'routing_path'}) route_out = RoutingFrames() route_out.routing_frame.extend(routes) with open(os.path.join(output_dir, 'routing_routing_response_viz', 'route.pb'), "wb") as c: c.write(route_out.SerializeToString()) if len(controls) > 0: os.makedirs(os.path.join(output_dir, 'holo_ControlCommand')) folders.append({'folder': 'holo_ControlCommand', 'sensor_type': 'control'}) ctl_cmd_pb_out = ControlCommand() ctl_cmd_pb_out.command_frame.extend(controls) with open(os.path.join(output_dir, 'holo_ControlCommand', 'control.pb'), "wb") as c: c.write(ctl_cmd_pb_out.SerializeToString()) if len(plans) > 0: os.makedirs(os.path.join(output_dir, 'planning_trajectory')) folders.append({'folder': 'planning_trajectory', 'sensor_type': 'planning_trajectory'}) plan_traj_pb_out = PlanTrajectory() plan_traj_pb_out.trajectory_info.extend(plans) with open(os.path.join(output_dir, 'planning_trajectory', 'planning.pb'), "wb") as p: p.write(plan_traj_pb_out.SerializeToString()) if len(preds) > 0: os.makedirs(os.path.join(output_dir, 'prediction_prediction_obstacles')) folders.append({'folder': 'prediction_prediction_obstacles', 'sensor_type': 'predicted_objects'}) pred_obstacles_pb_out = PredictionObstacles() pred_obstacles_pb_out.perception_obstacle.extend(preds) with open(os.path.join(output_dir, 'prediction_prediction_obstacles', 'predicted.pb'), "wb") as p: p.write(pred_obstacles_pb_out.SerializeToString()) if len(gnss) > 0: os.makedirs(os.path.join(output_dir, 'inspvax')) folders.append({'folder': 'inspvax', 'sensor_type': 'gnss'}) gn_pb_out = GnssPoints() gn_pb_out.gnss_points.extend(gnss) with open(os.path.join(output_dir, 'inspvax', 'gnss.pb'), "wb") as g: g.write(gn_pb_out.SerializeToString()) if len(vehs) > 0: os.makedirs(os.path.join(output_dir, 'holo_VehicleInfoMagotan')) folders.append({'folder': 'holo_VehicleInfoMagotan', 'sensor_type': 'vehicle'}) veh_pb_out = VehicleInfo() veh_pb_out.vehicle_info.extend(vehs) with open(os.path.join(output_dir, 'holo_VehicleInfoMagotan', 'vehicle.pb'), "wb") as v: v.write(veh_pb_out.SerializeToString()) if len(locs) > 0: os.makedirs(os.path.join(output_dir, 'localization_localization_info')) folders.append({'folder': 'localization_localization_info', 'sensor_type': 'ego_tf'}) lo_pb_out = LocalizationInfo() lo_pb_out.localization_info.extend(locs) with open(os.path.join(output_dir, 'localization_localization_info', 'ego_tf.pb'), "wb") as lo: lo.write(lo_pb_out.SerializeToString()) if len(objs) > 0: os.makedirs(os.path.join(output_dir, 'perception_perception_obstacles')) folders.append({'folder': 'perception_perception_obstacles', 'sensor_type': 'object_array_vision'}) tr_pb_out = TrackedObject() tr_pb_out.tracked_object.extend(objs) with open(os.path.join(output_dir, 'perception_perception_obstacles', 'object_array_vision.pb'), "wb") as tr: tr.write(tr_pb_out.SerializeToString()) if has_camera_74: folders.append({'folder': 'mdc_camera_instance_74', 'sensor_type': 'camera'}) if has_camera_73: folders.append({'folder': 'mdc_camera_instance_73', 'sensor_type': 'camera'}) if has_camera_72: folders.append({'folder': 'mdc_camera_instance_72', 'sensor_type': 'camera'}) if has_camera_71: folders.append({'folder': 'mdc_camera_instance_71', 'sensor_type': 'camera'}) if has_lidar_A4: if args.calibration_id: folders.append({'folder': 'innoPtClound_A4', 'sensor_type': 'lidar', 'calibration_item_id': args.calibration_id}) else: folders.append({'folder': 'innoPtClound_A4', 'sensor_type': 'lidar'}) if has_lidar_B2: if args.calibration_id: folders.append({'folder': 'innoPtClound_B2', 'sensor_type': 'lidar', 'calibration_item_id': args.calibration_id}) else: folders.append({'folder': 'innoPtClound_B2', 'sensor_type': 'lidar'}) if has_radar_0: folders.append({'folder': 'radar_track_array_0', 'sensor_type': 'radar'}) if has_radar_3: folders.append({'folder': 'radar_track_array_3', 'sensor_type': 'radar'}) collect_yaml = {'folders': folders} with open(os.path.join(output_dir, "opendata_to_platform.yaml"), 'w', encoding='utf-8') as collect_file: yaml.safe_dump(collect_yaml, collect_file) with open(os.path.join(os.getenv('output_dir'), '_SUCCESS'), 'w') as f: f.write("") os.system('chmod -R a+r ${output_dir}/*') if __name__ == '__main__': parser = ArgumentParser() parser.add_argument('-i', '--input', help="input bag path", default=os.getenv('rosbag_path')) parser.add_argument('-o', '--output', default=os.getenv('output_dir'), help="result output directory, default to ./bags/") parser.add_argument('-ci', '--calibration_id', type=int) params = parser.parse_args() main(params)

import glob import json import os from datetime import datetime from docker_outlier import get_changed_firmwares from extract_config import floorconfig_db from config import LOG_DATA_PATH, PP_WORK_DIR now = datetime.now() os.makedirs(os.path.join(PP_WORK_DIR, "outlier_logs"), exist_ok=True, mode=0o775) class DetectFalseFinishes: FALSE_FINISH_THRESHOLD = 0.8 def __init__(self, firmware): self.firmware = firmware self.firmwarefolder = os.path.join(LOG_DATA_PATH, firmware) self.analysis_folder = os.path.join(LOG_DATA_PATH, firmware, "filtered_logs", "analyzed") self.image_folder = os.path.join(LOG_DATA_PATH, firmware, "filtered_logs", "images") self.json_logs = glob.glob(f"{self.analysis_folder}/*.json") self.reference_table = floorconfig_db() self.find_false_finishes_for_all_floors() def check_if_run_is_a_false_finish(self, run, floor): relevant_keys = [ "autonomous_dock", "total_cleaning_time", "Area_cleand", "docking_failed", "SerialNumber", "Software-Version", "Logfile", "tarball_location", "bump_count", "bump_count_heavy", "tilt_count", "img_path" ] run_data = {key: run[0][key] for key in relevant_keys} if run[0]["autonomous_dock"] is True: reference_area = self.reference_table[self.reference_table.floorplan.isin([floor])]["area"].values[0] run_data["false_finish_reference_area"] = float(reference_area) run_data["false_finish_reference_threshold"] = self.FALSE_FINISH_THRESHOLD logname = run[0]["Logfile"].split(os.sep)[-1] firmware = run[0]["Software-Version"].split("_")[0] complete_name_of_tarball = run[0]["tarball_location"].split(os.sep)[-1] complete_name_of_tarball = complete_name_of_tarball.split(".")[0] date = comp

import os import shutil from sklearn.model_selection import train_test_split import pandas as pd # 定义源目录路径 source_folder = 'C:/Users\Lenovo\Desktop\crcw不同端12k在0负载下\风扇端' subfolders = ["内圈故障", "球故障"] # 创建目标文件夹结构 target_folders = { 'train': '训练集', 'val': '验证集', 'test': '测试集' } os.makedirs('划分后的数据', exist_ok=True) for key, folder in target_folders.items(): for subfolder in subfolders: path = os.path.join('划分后的数据', folder, subfolder) os.makedirs(path, exist_ok=True) # 遍历每个子文件夹并读取CSV文件 all_data = [] for subfolder in subfolders: folder_path = os.path.join(source_folder, subfolder) files = [f for f in os.listdir(folder_path) if f.endswith('.csv')] # 将所有数据加载到列表中 for file in files: data = pd.read_csv(os.path.join(folder_path, file)) all_data.append((data, subfolder)) # 划分数据集 train_val, test = train_test_split(all_data, test_size=0.2, random_state=42) train, val = train_test_split(train_val, test_size=0.25, random_state=42) # 即最终比例为6:2:2 # 移动文件至对应的目标文件夹 def move_files(data_list, target_dir): for data, label in data_list: filename = f"{label}_{hash(str(data.iloc[0].values))}.csv" # 确保唯一性 save_path = os.path.join(target_folders[target_dir], label, filename) data.to_csv(save_path, index=False) move_files(train, 'train') move_files(val, 'val') move_files(test, 'test') print("数据集已成功划分为训练集、验证集和测试集!")可以分析一下这段代码吗

# by CSDN 迪菲赫尔曼 import os import random import shutil def copy_files(src_dir, dst_dir, filenames, extension): os.makedirs(dst_dir, exist_ok=True) missing_files = 0 for filename in filenames: src_path = os.path.join(src_dir, filename + extension) dst_path = os.path.join(dst_dir, filename + extension) # Check if the file exists before copying if os.path.exists(src_path): shutil.copy(src_path, dst_path) else: print(f"Warning: File not found for {filename}") missing_files += 1 return missing_files def split_and_copy_dataset(image_dir, label_dir, output_dir, train_ratio=0.7, valid_ratio=0.15, test_ratio=0.15): # 获取所有图像文件的文件名(不包括文件扩展名) image_filenames = [os.path.splitext(f)[0] for f in os.listdir(image_dir)] # 随机打乱文件名列表 random.shuffle(image_filenames) # 计算训练集、验证集和测试集的数量 total_count = len(image_filenames) train_count = int(total_count * train_ratio) valid_count = int(total_count * valid_ratio) test_count = total_count - train_count - valid_count # 定义输出文件夹路径 train_image_dir = os.path.join(output_dir, 'train', 'images') train_label_dir = os.path.join(output_dir, 'train', 'labels') valid_image_dir = os.path.join(output_dir, 'valid', 'images') valid_label_dir = os.path.join(output_dir, 'valid', 'labels') test_image_dir = os.path.join(output_dir, 'test', 'images') test_label_dir = os.path.join(output_dir, 'test', 'labels') # 复制图像和标签文件到对应的文件夹 train_missing_files = copy_files(image_dir, train_image_dir, image_filenames[:train_count], '.jpg') train_missing_files += copy_files(label_dir, train_label_dir, image_filenames[:train_count], '.txt') valid_missing_files = copy_files(image_dir, valid_image_dir, image_filenames[train_count:train_count + valid_count], '.jpg') valid_missing_files += copy_files(label_dir, valid_label_dir, image_filenames[train_count:train_count + valid_count], '.txt')

最新推荐

recommend-type

零点GZDSP 4.80A-PRO电脑DSP调音软件下载

零点GZDSP 4.80A-PRO电脑DSP调音软件下载
recommend-type

云计算-软件开发云PPT课件(1).pptx

云计算-软件开发云PPT课件(1).pptx
recommend-type

C++实现的DecompressLibrary库解压缩GZ文件

根据提供的文件信息,我们可以深入探讨C++语言中关于解压缩库(Decompress Library)的使用,特别是针对.gz文件格式的解压过程。这里的“lib”通常指的是库(Library),是软件开发中用于提供特定功能的代码集合。在本例中,我们关注的库是用于处理.gz文件压缩包的解压库。 首先,我们要明确一个概念:.gz文件是一种基于GNU zip压缩算法的压缩文件格式,广泛用于Unix、Linux等操作系统上,对文件进行压缩以节省存储空间或网络传输时间。要解压.gz文件,开发者需要使用到支持gzip格式的解压缩库。 在C++中,处理.gz文件通常依赖于第三方库,如zlib或者Boost.IoStreams。codeproject.com是一个提供编程资源和示例代码的网站,程序员可以在该网站上找到现成的C++解压lib代码,来实现.gz文件的解压功能。 解压库(Decompress Library)提供的主要功能是读取.gz文件,执行解压缩算法,并将解压缩后的数据写入到指定的输出位置。在使用这些库时,我们通常需要链接相应的库文件,这样编译器在编译程序时能够找到并使用这些库中定义好的函数和类。 下面是使用C++解压.gz文件时,可能涉及的关键知识点: 1. Zlib库 - zlib是一个用于数据压缩的软件库,提供了许多用于压缩和解压缩数据的函数。 - zlib库支持.gz文件格式,并且在多数Linux发行版中都预装了zlib库。 - 在C++中使用zlib库,需要包含zlib.h头文件,同时链接z库文件。 2. Boost.IoStreams - Boost是一个提供大量可复用C++库的组织,其中的Boost.IoStreams库提供了对.gz文件的压缩和解压缩支持。 - Boost库的使用需要下载Boost源码包,配置好编译环境,并在编译时链接相应的Boost库。 3. C++ I/O操作 - 解压.gz文件需要使用C++的I/O流操作,比如使用ifstream读取.gz文件,使用ofstream输出解压后的文件。 - 对于流操作,我们常用的是std::ifstream和std::ofstream类。 4. 错误处理 - 解压缩过程中可能会遇到各种问题,如文件损坏、磁盘空间不足等,因此进行适当的错误处理是必不可少的。 - 正确地捕获异常,并提供清晰的错误信息,对于调试和用户反馈都非常重要。 5. 代码示例 - 从codeproject找到的C++解压lib很可能包含一个或多个源代码文件,这些文件会包含解压.gz文件所需的函数或类。 - 示例代码可能会展示如何初始化库、如何打开.gz文件、如何读取并处理压缩数据,以及如何释放资源等。 6. 库文件的链接 - 编译使用解压库的程序时,需要指定链接到的库文件,这在不同的编译器和操作系统中可能略有不同。 - 通常,在编译命令中加入-l参数,比如使用g++的话可能是`g++ -o DecompressLibrary DecompressLibrary.cpp -lz`,其中`-lz`表示链接zlib库。 7. 平台兼容性 - 在不同平台上使用解压库可能需要考虑平台兼容性问题。 - Windows系统可能需要额外的配置和库文件,因为zlib或其他库可能不是默认预装的。 根据以上知识点,我们可以得出,在C++中解压.gz文件主要涉及到对zlib或类似库的使用,以及熟悉C++的I/O操作。正确使用这些库,能够有效地对压缩文件进行解压,并处理可能出现的错误情况。如果从codeproject获取到的C++解压lib确实是针对.gz文件格式的,那么它很可能已经封装好了大部分的操作细节,让开发者能够以更简单的方式实现解压功能。
recommend-type

【数据融合技术】:甘肃土壤类型空间分析中的专业性应用

# 摘要 数据融合技术作为一种集成多源数据信息的方法,在土壤类型空间分析中发挥着关键作用。本文介绍了数据融合技术的基本概念及其理论基础,阐述了数据预处理、同步整合及冲突解决等关键技术,并详细描述了甘肃土壤类型数据准备的流程,包括数据采集、质量评估、空间化处理及融合实践准备。通过具体案例分析,展示了数据融合在土壤类型空间分布分析、土壤质量评估及土壤保护规划中的应用。同时,文
recommend-type

VM ware如何查看软件版本信息

<think>首先,用户的问题是:“我想查找VMware软件版本信息的查看方法请问如何在VMware中查看软件版本信息”。用户参考了之前的引用,但那些引用主要是关于查看VMware版本的一般方法,现在用户具体想知道在VMware软件中如何查看版本信息。回顾之前的对话:-用户第一次问的是VSCode的版本查看方法。-现在用户问的是VMware的版本查看方法。-系统级指令要求使用中文回答,正确格式化数学表达式(如果需要),但这里可能不需要数学表达式。-指令还要求生成相关问题,并在回答中引用段落时添加引用标识。用户提供的引用[1]到[5]是关于VMware版本的查看方法、下载等,但用户特别强调“参考
recommend-type

数据库课程设计报告:常用数据库综述

数据库是现代信息管理的基础,其技术广泛应用于各个领域。在高等教育中,数据库课程设计是一个重要环节,它不仅是学习理论知识的实践,也是培养学生综合运用数据库技术解决问题能力的平台。本知识点将围绕“经典数据库课程设计报告”展开,详细阐述数据库的基本概念、课程设计的目的和内容,以及在设计报告中常用的数据库技术。 ### 1. 数据库基本概念 #### 1.1 数据库定义 数据库(Database)是存储在计算机存储设备中的数据集合,这些数据集合是经过组织的、可共享的,并且可以被多个应用程序或用户共享访问。数据库管理系统(DBMS)提供了数据的定义、创建、维护和控制功能。 #### 1.2 数据库类型 数据库按照数据模型可以分为关系型数据库(如MySQL、Oracle)、层次型数据库、网状型数据库、面向对象型数据库等。其中,关系型数据库因其简单性和强大的操作能力而广泛使用。 #### 1.3 数据库特性 数据库具备安全性、完整性、一致性和可靠性等重要特性。安全性指的是防止数据被未授权访问和破坏。完整性指的是数据和数据库的结构必须符合既定规则。一致性保证了事务的执行使数据库从一个一致性状态转换到另一个一致性状态。可靠性则保证了系统发生故障时数据不会丢失。 ### 2. 课程设计目的 #### 2.1 理论与实践结合 数据库课程设计旨在将学生在课堂上学习的数据库理论知识与实际操作相结合,通过完成具体的数据库设计任务,加深对数据库知识的理解。 #### 2.2 培养实践能力 通过课程设计,学生能够提升分析问题、设计解决方案以及使用数据库技术实现这些方案的能力。这包括需求分析、概念设计、逻辑设计、物理设计、数据库实现、测试和维护等整个数据库开发周期。 ### 3. 课程设计内容 #### 3.1 需求分析 在设计报告的开始,需要对项目的目标和需求进行深入分析。这涉及到确定数据存储需求、数据处理需求、数据安全和隐私保护要求等。 #### 3.2 概念设计 概念设计阶段要制定出数据库的E-R模型(实体-关系模型),明确实体之间的关系。E-R模型的目的是确定数据库结构并形成数据库的全局视图。 #### 3.3 逻辑设计 基于概念设计,逻辑设计阶段将E-R模型转换成特定数据库系统的逻辑结构,通常是关系型数据库的表结构。在此阶段,设计者需要确定各个表的属性、数据类型、主键、外键以及索引等。 #### 3.4 物理设计 在物理设计阶段,针对特定的数据库系统,设计者需确定数据的存储方式、索引的具体实现方法、存储过程、触发器等数据库对象的创建。 #### 3.5 数据库实现 根据物理设计,实际创建数据库、表、视图、索引、触发器和存储过程等。同时,还需要编写用于数据录入、查询、更新和删除的SQL语句。 #### 3.6 测试与维护 设计完成之后,需要对数据库进行测试,确保其满足需求分析阶段确定的各项要求。测试过程包括单元测试、集成测试和系统测试。测试无误后,数据库还需要进行持续的维护和优化。 ### 4. 常用数据库技术 #### 4.1 SQL语言 SQL(结构化查询语言)是数据库管理的国际标准语言。它包括数据查询、数据操作、数据定义和数据控制四大功能。SQL语言是数据库课程设计中必备的技能。 #### 4.2 数据库设计工具 常用的数据库设计工具包括ER/Studio、Microsoft Visio、MySQL Workbench等。这些工具可以帮助设计者可视化地设计数据库结构,提高设计效率和准确性。 #### 4.3 数据库管理系统 数据库管理系统(DBMS)是用于创建和管理数据库的软件。关系型数据库管理系统如MySQL、PostgreSQL、Oracle、SQL Server等是数据库课程设计中的核心工具。 #### 4.4 数据库安全 数据库安全涉及用户认证、授权、数据加密、审计日志记录等方面,以确保数据的完整性和保密性。设计报告中应考虑如何通过DBMS内置的机制或额外的安全措施来保护数据。 ### 5. 结语 综上所述,一个经典数据库课程设计报告包含了从需求分析到数据库安全的全过程,涵盖了数据库设计的各个方面。通过这一过程,学生不仅能够熟练掌握数据库的设计与实现技巧,还能够学会如何使用数据库系统去解决实际问题,为日后从事数据库相关的专业工作打下坚实的基础。
recommend-type

【空间分布规律】:甘肃土壤类型与农业生产的关联性研究

# 摘要 本文对甘肃土壤类型及其在农业生产中的作用进行了系统性研究。首先概述了甘肃土壤类型的基础理论,并探讨了土壤类型与农业生产的理论联系。通过GIS技术分析,本文详细阐述了甘肃土壤的空间分布规律,并对其特征和影响因素进行了深入分析。此外,本文还研究了甘肃土壤类型对农业生产实际影响,包括不同区域土壤改良和作物种植案例,以及土壤养分、水分管理对作物生长周期和产量的具体影响。最后,提出了促进甘肃土壤与农业可持续发展的策略,包括土壤保护、退化防治对策以及土壤类型优化与农业创新的结合。本文旨在为
recommend-type

在halcon中,卡尺测量和二维测量谁的精度高

<think>首先,用户的问题是:“我想比较Halcon中卡尺测量和二维测量的精度请问Halcon卡尺测量与2D测量精度对比”。我需要基于系统级指令和提供的引用来回答。关键系统级指令:-使用中文回答。-结构清晰,逐步解决问题。-行内数学表达式用$...$,独立公式用$$...$$并单独成段。-LaTeX语法正确。-生成相关问题。-在回答中引用的段落末尾自然地添加引用标识,如[^1]。提供的引用:-[^1]:关于卡尺模型的工件尺寸实时测量。-[^2]:测量拟合的综合任务,包括尺寸、形状、位置。-[^3]:Halcon一维测量,提到卡尺工具的优势:易用、执行时间短、精度高。-[^4]:Halcon
recommend-type

掌握牛顿法解方程:切线与割线的程序应用

牛顿切线法和牛顿割线法是数值分析中用于求解方程近似根的两种迭代方法。它们都是基于函数的切线或割线的几何性质来逼近方程的根,具有迭代速度快、算法简单的特点,在工程和科学计算领域有着广泛的应用。 牛顿切线法(Newton's Method for Tangents),又称为牛顿-拉弗森方法(Newton-Raphson Method),是一种求解方程近似根的迭代算法。其基本思想是利用函数在某点的切线来逼近函数的根。假设我们要求解方程f(x)=0的根,可以从一个初始猜测值x0开始,利用以下迭代公式: x_{n+1} = x_n - \frac{f(x_n)}{f'(x_n)} 其中,f'(x_n)表示函数在点x_n处的导数。迭代过程中,通过不断更新x_n值,逐渐逼近方程的根。 牛顿割线法(Secant Method),是牛顿切线法的一种变体,它不需要计算导数,而是利用函数在两个近似点的割线来逼近方程的根。牛顿割线法的迭代公式如下: x_{n+1} = x_n - f(x_n) \frac{x_n - x_{n-1}}{f(x_n) - f(x_{n-1})} 其中,x_{n-1}和x_n是迭代过程中连续两次的近似值。牛顿割线法相比牛顿切线法,其优点在于不需要计算函数的导数,但通常收敛速度会比牛顿切线法慢一些。 在实际应用中,这两种方法都需要注意迭代的起始点选择,否则可能会导致迭代过程不收敛。同时,这两种方法都是局部收敛方法,即它们只能保证在初始点附近有足够的近似根时才收敛。 关于例题和程序,牛顿切线法和牛顿割线法都可以通过编程实现。通常在编程实现时,需要输入函数的表达式、初始猜测值、迭代次数限制以及误差容忍度等参数。程序会根据这些输入,通过循环迭代计算,直到满足误差容忍度或达到迭代次数限制为止。 在编程实现过程中,需要注意以下几点: 1. 初始猜测值的选择对迭代的收敛性有较大影响,需要根据具体问题来合理选择。 2. 当迭代过程中遇到函数值或导数值过大、过小,或者分母趋近于零时,需要进行适当的数值处理,以避免数值不稳定或除以零的错误。 3. 对于不同的方程和函数,可能需要选择不同的迭代终止条件,如设定一个误差值或迭代次数上限。 牛顿法(包含牛顿切线法和牛顿割线法)是一类非常强大的数值解法,尤其适用于求解非线性方程,其基本原理和程序实现的知识点在理工科的许多领域都有着广泛的应用,是数值分析领域的一个基石。 请注意,本知识点仅涵盖标题和描述中提到的内容,压缩包子文件列表中的信息并未提供,因此无法提供相关内容的知识点。
recommend-type

【制图技术】:甘肃高质量土壤分布TIF图件的成图策略

# 摘要 本文针对甘肃土壤分布数据的TIF图件制作进行了系统研究。首先概述了甘肃土壤的分布情况,接着介绍了TIF图件的基础知识,包括其格式特点、空间数据表达以及质量控制方法。随后,文中构建了成图策略的理论框架,分析了土壤分布图的信息需求与数据处理流程,并探讨了成图原则与标准。在实践操作部分,详细阐述了制图软