活动介绍

定义函数,接受一个开始日期和总天数作为参数,计算这段时间内所以工作日的总时长(1-5),一天8小时

时间: 2025-06-13 07:23:32 浏览: 21
<think>好的,我现在需要解决一个问题:定义一个函数,接受开始日期和总天数作为参数,计算这段时间内所有工作日的总时长,工作日是周一到周五,每天8小时。我得仔细想清楚怎么一步步实现这个功能。 首先,我需要明确输入是什么。输入是开始日期和总天数。例如,假设开始日期是2023-10-01,总天数是10天。那么我需要计算从这天开始的10天内,有多少个工作日,然后乘以8小时得到总时长。 接下来,我得考虑如何处理日期。可能需要使用Python的datetime模块来处理日期计算,包括日期的加减和判断星期几。比如,datetime模块中的weekday()方法返回0(周一)到6(周日),或者isoweekday()返回1(周一)到7(周日),这样就能判断某天是否是工作日。 然后,我需要遍历从开始日期开始的每一天,直到总天数为止。例如,开始日期加上n天,n从0到总天数-1。对于每个日期,检查是否是周一到周五中的一天,如果是,就算作一个工作日。 不过,总天数是指自然日还是工作日呢?题目里说的是“总天数”,应该是指自然日,比如开始日期之后的连续N天,包括周末。例如,如果总天数是7天,那么可能包含5个工作日和2个周末。所以需要计算在开始日期后的自然日总天数里,有多少个工作日。 接下来,函数的大致步骤可能是这样的: 1. 将输入的起始日期字符串转换为datetime对象,比如用户可能传入"2023-10-01"这样的字符串,需要先处理成日期对象。 2. 初始化一个计数器,记录工作日的数量。 3. 循环从0到总天数-1的每一天: a. 计算当前日期是开始日期加上第i天。 b. 判断该日期是否是工作日(周一到周五)。 c. 如果是,计数器加1。 4. 最后,总时长是计数器乘以8小时。 需要注意的问题包括: - 日期加法是否正确,比如是否包括开始日期当天作为第一天。例如,如果总天数是1,那么就是开始日期那一天。所以循环次数应该是总天数次,从第0天到总天数-1天。 - 日期格式的处理是否正确,比如用户输入的字符串可能需要转换为datetime对象。 - 是否考虑节假日?题目中没有提到节假日,只按周一到周五计算,所以不需要考虑,只需排除周末。 现在,具体的Python代码结构可能如下: 使用datetime模块,可能需要导入datetime和timedelta。 函数定义: def calculate_working_hours(start_date_str, total_days): from datetime import datetime, timedelta # 转换开始日期 start_date = datetime.strptime(start_date_str, "%Y-%m-%d") working_days = 0 for day in range(total_days): current_date = start_date + timedelta(days=day) if current_date.weekday() < 5: # 0是周一,4是周五,5和6是周末 working_days += 1 total_hours = working_days * 8 return total_hours 这里需要注意,weekday()方法返回的是0(周一)到4(周五),所以判断是否小于5即可。或者用isoweekday(),返回1-5为工作日,对应的判断条件就是<=5吗?不,isoweekday()的周一到周五是1到5,周六是6,周日是7。所以如果用isoweekday()的话,判断是否<=5。但原代码中使用的是weekday(),其中周一到周日对应0到4,5和6是周六和周日。所以原代码的判断是正确的。 测试一下例子:比如开始日期是周一,总天数7天(一周),那么工作日是5天,总时长40小时。如果开始日期是周六,总天数2天,那么这两天都是周六和周日,总时长0。 可能的错误情况:比如总天数为0或者负数,这时候应该处理,但题目可能假设参数是合理的正整数。 所以,这样的函数应该可以正确计算工作日的总时长。</think>要定义一个函数,计算从指定开始日期起若干自然日中的工作日总时长(周一至周五,每天8小时),可按照以下步骤实现: --- ### **步骤分析** 1. **输入处理** 将开始日期字符串(如`"2023-10-01"`)转换为日期对象,方便后续计算。 2. **遍历每一天** 从开始日期依次遍历之后的每一天,直到覆盖全部自然日数。 3. **判断工作日** 检查当前日期是否为周一至周五。Python中可通过: - `weekday()` 方法:返回`0`(周一)到`4`(周五),`5`(周六)和`6`(周日)。 - `isoweekday()` 方法:返回`1`(周一)到`5`(周五),`6`(周六)和`7`(周日)。 4. **统计工作日并计算总时长** 每遇到一个工作日,计数器加1,最终乘以8小时。 --- ### **代码实现** ```python from datetime import datetime, timedelta def calculate_working_hours(start_date_str, total_days): # 将字符串转换为日期对象 start_date = datetime.strptime(start_date_str, "%Y-%m-%d") working_days = 0 # 遍历每一天 for day_offset in range(total_days): current_date = start_date + timedelta(days=day_offset) # 判断是否为工作日(周一至周五) if current_date.weekday() < 5: working_days += 1 # 计算总时长 total_hours = working_days * 8 return total_hours ``` --- ### **使用示例** ```python # 示例:从2023-10-02(周一)开始,计算7天的工作时长 print(calculate_working_hours("2023-10-02", 7)) # 输出:5天 × 8小时 = 40 # 示例:从2023-10-07(周六)开始,计算3天的工作时长 print(calculate_working_hours("2023-10-07", 3)) # 输出:周一(第3天) → 8小时 ``` --- ### **关键点说明** 1. **日期转换** 使用`datetime.strptime()`解析日期字符串,需确保输入格式匹配(如`"%Y-%m-%d"`)。 2. **遍历逻辑** `range(total_days)`生成从`0`到`total_days-1`的偏移量,逐个计算日期。 3. **工作日判断** `weekday() < 5`确保只统计周一至周五。 4. **忽略节假日** 题目未要求处理法定节假日,因此仅排除周末。 --- ### **扩展性** 若需支持节假日,可额外传入一个节假日列表,并在遍历时检查当前日期是否在列表中。例如: ```python def calculate_working_hours_v2(start_date_str, total_days, holidays=[]): # ...(同上) for day_offset in range(total_days): current_date = start_date + timedelta(days=day_offset) if current_date.weekday() < 5 and current_date not in holidays: working_days += 1 # ... ```
阅读全文

相关推荐

import pandas as pd import logging from geopy.distance import geodesic import time from contextlib import contextmanager import numpy as np from sqlalchemy import create_engine, text from tenacity import retry, stop_after_attempt, wait_exponential from datetime import datetime # -------------------------- # 通用配置 # -------------------------- DB_CONFIG = { "conn_str": "mysql+pymysql://root:[email protected]:3306/test?charset=utf8", "pool_size": 10, "pool_recycle": 300 } member_list_path = r"D:\OneDrive\ERIC\维护\平台数据\代维信息\2025-07人员信息快照_20250708.xlsx" member_list_columns = ['登陆账号', '所属组织', '所属地市', '在职状态'] TIME_RANGE = { "start": '2025-07-01 00:00:00', "end": '2025-07-08 23:59:59' } # 定义日期格式转换器 def format_time_range(time_range): # 解析开始时间和结束时间 start_date = datetime.strptime(time_range["start"], '%Y-%m-%d %H:%M:%S') end_date = datetime.strptime(time_range["end"], '%Y-%m-%d %H:%M:%S') # 格式化为YYYYMMDD形式并拼接 return f"{start_date.strftime('%Y%m%d')}~{end_date.strftime('%Y%m%d')}" # 获取格式化后的时间范围 source_time = format_time_range(TIME_RANGE) FILE_PATHS = { "cran_info": r'D:\OneDrive\ERIC\维护\test\CRAN机房信息.csv', "output": fr'D:\OneDrive\ERIC\维护\工单\现场执行工单\现场执行工单_{source_time}.xlsx' } TABLE_CONFIG = { '工单_保障': {'columns': ["工单当前状态", "工单号", "工单分类", "维护分类"]}, '工单_巡检': {'columns': [ "工单当前状态", "工单号", "资源ID", "资源名称", "资源经度", "资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '工单_拆站': {'columns': [ "工单当前状态", "工单号", "资源cid AS 资源ID", "资源名称", "资源经度", "资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '工单_验收': {'columns': [ "工单当前状态", "工单号", "资源cid AS 资源ID", "资源名称", "资源经度", "资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '工单_发电': {'columns': [ "工单当前状态", "工单号", "站点ID AS 资源ID", "站点名称 AS 资源名称", "站点经度 AS 资源经度", "站点纬度 AS 资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '工单_通用': {'columns': [ "工单当前状态", "工单号", "资源名称", "资源经度", "资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '故障工单': {'columns': [ "工单状态 AS 工单当前状态", "工单编号 AS 工单号", "STATION_NAME AS 资源名称", "LONGITUDE AS 资源经度", "LATITUDE AS 资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类", "故障处理时长_小时", "是否延期有效", "是否及时处理", "高频故障", "是否超长退服", "网元分类" ], 'where_column': '工单编号'} } DIMENSIONS = [ {'name': '执行人', 'keys': ['执行人']}, {'name': '所属组织', 'keys': ['所属组织']}, {'name': '地市', 'keys': ['地市']}, {'name': '代维简称', 'keys': ['代维简称']}, {'name': '地市代维', 'keys': ['地市', '代维简称']} ] # -------------------------- # 工具函数 # -------------------------- @contextmanager def time_monitor(step_name): """耗时监控上下文管理器""" start_time = time.time() try: yield finally: print(f"{step_name} 耗时: {time.time() - start_time:.4f}秒") def setup_logging(): """日志配置""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("data_processor_v2.log"), logging.StreamHandler() ] ) return logging.getLogger(__name__) logger = setup_logging() @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) def safe_read_sql(sql: text, engine, params: dict = None) -> pd.DataFrame: """带重试机制的SQL读取""" logger.debug(f"执行SQL: {sql} \n参数: {params}") return pd.read_sql(sql, engine, params=params) def init_db_engine(): """初始化数据库引擎""" return create_engine( DB_CONFIG["conn_str"], pool_size=DB_CONFIG["pool_size"], pool_recycle=DB_CONFIG["pool_recycle"] ) # -------------------------- # 数据校验函数 # -------------------------- def is_valid_coordinates(lat, lon): """经纬度有效性校验""" if pd.isna(lat) or pd.isna(lon): return False if not (-90 <= lat <= 90 and -180 <= lon <= 180): return False if abs(lat) < 1e-6 and abs(lon) < 1e-6: return False return True def calculate_distance(coord1, coord2): """带校验的距离计算""" try: lat1, lon1 = coord1 lat2, lon2 = coord2 except (TypeError, ValueError) as e: logger.debug(f"坐标解析失败: {str(e)} | coord1={coord1} coord2={coord2}") return 99999.0 if not (is_valid_coordinates(lat1, lon1) and is_valid_coordinates(lat2, lon2)): return 99999.0 return geodesic(coord1, coord2).meters # -------------------------- # 数据加载模块 # -------------------------- def load_site_orders(engine): """加载现场执行工单数据""" sql = text(""" SELECT 地市, 代维简称, 专业, 工单类型, 工单编号, 执行人, 所属组织, 出发时间, 出发经度, 出发纬度, 签到时间, 签到经度, 签到纬度, 完成时间, 完成经度, 完成纬度, 路途时长, 实际工作时长 FROM 现场执行工单 WHERE 出发时间 BETWEEN :start_time AND :end_time """) params = { 'start_time': TIME_RANGE["start"], 'end_time': TIME_RANGE["end"] } df = safe_read_sql(sql, engine, params) df = df[~df['所属组织'].str.contains('高铁')] df['代维简称'] = df['代维简称'].replace('中移铁通', '铁通') valid_companies = ['中贝', '中通服', '中邮建', '唐人', '宜通', '怡创', '浙通服', '润建', '虹信', '超讯', '铁通', '长实'] df = df[df['代维简称'].isin(valid_companies)] df['地市代维'] = df['地市'].str.split("-").str[0] + '-' + df['代维简称'] logger.info(f"加载现场工单记录数: {len(df)}") return df def batch_load_order_data(engine, order_numbers, batch_size=500): """批量加载工单相关数据""" dfs = [] for table, config in TABLE_CONFIG.items(): try: columns = ", ".join(config['columns']) where_col = config.get('where_column', '工单号') # 修复:正确构造SQL查询字符串,确保引号闭合 sql_template = text(f"SELECT {columns} FROM {table} WHERE {where_col} IN :order_nums") table_dfs = [] for i in range(0, len(order_numbers), batch_size): batch = order_numbers[i:i + batch_size] df = safe_read_sql(sql_template, engine, params={'order_nums': batch}) table_dfs.append(df) if table_dfs: table_df = pd.concat(table_dfs, ignore_index=True) dfs.append(table_df) logger.info(f"表 {table} 加载完成,记录数: {len(table_df)}") except Exception as e: logger.error(f"加载表 {table} 失败: {str(e)}", exc_info=True) return dfs # -------------------------- # 数据处理模块 # -------------------------- def merge_order_data(site_df, order_dfs): """合并工单数据(增强版)""" if not order_dfs: logger.warning("没有需要合并的工单数据") return pd.DataFrame() final_df = pd.concat(order_dfs, axis=0, ignore_index=True) final_df = final_df.drop_duplicates(subset=['工单号'], keep='first') merged_df = pd.merge( left=site_df, right=final_df, left_on='工单编号', right_on='工单号', how='left', ) merged_df.drop(columns=['工单号'], inplace=True) merged_df['路途时长(分钟)'] = (merged_df['路途时长'] * 60).round(2) merged_df['实际工作时长(分钟)'] = (merged_df['实际工作时长'] * 60).round(2) # 过滤有效状态 condition = ( merged_df['工单当前状态'].notna() & (merged_df['工单当前状态'] != '') & ~merged_df['工单当前状态'].str.contains('已驳回|已撤销|已关闭', na=False) ) return merged_df[condition] def get_custom_natural_week(date): """ 计算“自然周”编号。本定义中,一周从上周三开始,到本周二结束。 """ if pd.isnull(date): return np.nan weekday = date.weekday() days_since_last_thursday = (weekday + 5) % 7 date = date - pd.Timedelta(days=days_since_last_thursday) return date.isocalendar()[1] # -------------------------- # 核心处理模块 # -------------------------- def generate_time_features(df): """增强时间特征生成""" df['出发时间'] = pd.to_datetime(df['出发时间'], errors='coerce') df['自然年'] = df['出发时间'].dt.year df['自然月'] = df['出发时间'].dt.to_period('M').astype(str) df['自然周'] = df['出发时间'].apply(get_custom_natural_week) df['出发日期'] = df['出发时间'].dt.date return df def process_coordinates(df): """坐标处理""" coord_columns = [ "资源纬度", "资源经度", "签到纬度", "签到经度", "LATITUDE", "LONGITUDE", "完成纬度", "完成经度" ] for col in coord_columns: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') else: logger.warning(f"列 {col} 不存在") fault_columns = ['是否超长退服', '高频故障', '是否延期有效', '是否及时处理'] for col in fault_columns: if col not in df.columns: df[col] = '否' return df def calculate_distances(df): """距离计算""" df["签到距离"] = df.apply( lambda row: calculate_distance( (row["签到纬度"], row["签到经度"]), (row["资源纬度"], row["资源经度"]) ), axis=1 ) return df def merge_cran_data(main_df, cran_path): """合并CRAN机房数据并取最近RU距离""" cran_df = pd.read_csv(cran_path) for col in ["ru纬度", "ru经度"]: cran_df[col] = pd.to_numeric(cran_df[col], errors="coerce").fillna(0) merged = pd.merge( main_df[["工单编号", "STATION_NAME", "签到经度", "签到纬度"]], cran_df, left_on="STATION_NAME", right_on="station_name", how="left" ) merged["签到距离_CRAN"] = merged.apply( lambda row: calculate_distance( (row["签到纬度"], row["签到经度"]), (row["ru纬度"], row["ru经度"]) ), axis=1 ) min_distances = merged.groupby("工单编号", as_index=False)["签到距离_CRAN"].min() return pd.merge( main_df, min_distances, on="工单编号", how="left" ).fillna({"签到距离_CRAN": 99999.0}) # -------------------------- # 统计计算模块 # -------------------------- def calculate_workload(df): """工作量评定""" df['工作量评定'] = '有效' df['原因'] = '' no_completion = df['完成时间'].isna() df.loc[no_completion, ['工作量评定', '原因']] = ['无效', '无完成时间'] short_work = df['实际工作时长(分钟)'] < 5 df.loc[short_work, ['工作量评定', '原因']] = ['无效', '工作时长过短'] if all(col in df.columns for col in ['签到距离', '签到距离_CRAN', '维护分类']): invalid_dist = ( ~df['维护分类'].isin(['发电', '保障']) & (df['签到距离'] > 300) & (df['签到距离_CRAN'] > 300) ) df.loc[invalid_dist, ['工作量评定', '原因']] = ['无效', '签到距离过大'] return df def _calculate_base_stats(df, group_keys): """通用基础指标计算""" categories = df['维护分类'].dropna().unique() agg_dict = { # 总工单数(已去重) "总工单数": ('工单编号', 'nunique'), # 有效工单数(去重) "有效工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['工作量评定'] == "有效"].nunique()), # 故障工单数(去重):统计工单分类为"故障工单"的工单编号唯一数 "故障工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['工单分类'] == "故障工单"].nunique()), # 故障及时数(去重):统计同时满足工单分类为故障工单且是否及时处理为"是"的工单编号唯一数 "故障及时数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][ (df['工单分类'] == "故障工单") & (df['是否及时处理'] == "是") ].nunique()), # 故障超时数(去重):统计同时满足工单分类为故障工单且是否及时处理为"否"的工单编号唯一数 "故障超时数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][ (df['工单分类'] == "故障工单") & (df['是否及时处理'] == "否") ].nunique()), # 超长退服工单数(去重):统计是否超长退服为"是"的工单编号唯一数 "超长退服工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['是否超长退服'] == "是"].nunique()), # 超频故障工单数(去重):统计高频故障为"是"的工单编号唯一数 "超频故障工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['高频故障'] == "是"].nunique()), # 延期工单数(去重):统计是否延期有效为"是"的工单编号唯一数 "延期工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['是否延期有效'] == "是"].nunique()), # 其他非去重指标保持不变 "故障处理时长": ('故障处理时长_小时', lambda x: x[df['工单分类'] == "故障工单"].sum()), "路途总时长": ('路途时长(分钟)', 'sum'), "工作总时长": ('实际工作时长(分钟)', 'sum'), "出勤人数": ('执行人', 'nunique'), "出勤人天": ('工单编号', lambda x: df.loc[x.index][['执行人', '出发日期']].drop_duplicates().shape[0]), } for cat in categories: agg_dict[f"{cat}工单数"] = ( '维护分类', lambda x, current_cat=cat: x.eq(current_cat).sum() ) stats = df.groupby(group_keys).agg(**agg_dict).reset_index() return stats def _add_derived_metrics(stats, time_level): """通用衍生指标计算""" stats['出勤总时长'] = stats['路途总时长'] + stats['工作总时长'] hour_cols = ['出勤总时长', '路途总时长', '工作总时长'] for col in hour_cols: stats[f'{col}(h)'] = (stats[col] / 60).round(2) stats['平均工单工作时长(h)'] = ( stats['工作总时长(h)'] / stats['总工单数'] ).replace([np.inf, -np.inf], 0).fillna(0).round(2) stats['平均工单路途时长(h)'] = ( stats['路途总时长(h)'] / stats['总工单数'] ).replace([np.inf, -np.inf], 0).fillna(0).round(2) if time_level == 'weekly': stats['人均能效_周'] = (stats['有效工单数'] / stats['出勤人天']).round(2) elif time_level == 'monthly': stats['人均能效_月'] = (stats['有效工单数'] / stats['出勤人天']).round(2) stats['故障处理及时率'] = (stats['故障及时数'] / stats['故障工单数']).replace(np.inf, 0) stats['工作时长占比'] = (stats['工作总时长'] / stats['出勤总时长']).replace(np.inf, 0) stats['人均出勤天数'] = (stats['出勤人天'] / stats['出勤人数']).replace(np.inf, 0).round(2) stats['日均出勤人数'] = (stats['出勤人天'] / stats['出勤天数']).replace(np.inf, 0).round(2) return stats def calculate_attendance_metrics(df, denominator): """计算出勤相关指标""" df = df.copy() df['出勤总时长(h)'] = (df['路途时长(分钟)'] + df['实际工作时长(分钟)']) / 60 df['工作总时长(h)'] = df['实际工作时长(分钟)'] / 60 # 按执行人和出发日期分组,计算每天的数据 daily_stats = df.groupby(['执行人', '出发日期']).agg( station_count=('STATION_NAME', 'nunique'), total_attendance_hours=('出勤总时长(h)', 'sum'), total_work_hours=('工作总时长(h)', 'sum') ).reset_index() # 计算每天是否达标(条件1) daily_stats['达标'] = ( (daily_stats['station_count'] >= 4) & (daily_stats['total_attendance_hours'] >= 7) & (daily_stats['total_work_hours'] >= 2) ) # 按执行人聚合计算总指标 executor_stats = daily_stats.groupby('执行人').agg( meet_days=('达标', 'sum'), total_stations=('station_count', 'sum'), total_attendance_hours=('total_attendance_hours', 'sum'), total_work_hours=('total_work_hours', 'sum') ).reset_index() # 计算衍生指标 executor_stats['平均每天到站址'] = executor_stats['total_stations'] / denominator executor_stats['日均出勤出工时长'] = executor_stats['total_attendance_hours'] / denominator executor_stats['日均有效出勤工时'] = executor_stats['total_work_hours'] / denominator # 添加条件2达标标识 executor_stats['条件2达标'] = ( (executor_stats['平均每天到站址'] >= 4) & (executor_stats['日均出勤出工时长'] >= 7) & (executor_stats['日均有效出勤工时'] >= 2) ) return executor_stats.rename(columns={'meet_days': '出勤达标天数'}) def generate_stats(df, dimension, time_level): """通用统计生成函数""" time_key = { 'daily': '出发日期', 'weekly': '自然周', 'monthly': '自然月' }[time_level] group_keys = dimension['keys'] + [time_key] # 获取维度的主键(用于合并的键名) dimension_key = dimension['keys'][0] # 取维度的第一个键作为合并主键 stats = _calculate_base_stats(df, group_keys) stats['出勤天数'] = df.groupby(group_keys)['出发日期'].nunique().values # 计算月度分母(MIN(TIME_RANGE["end"]的日期部分, 22)) denominator = None if time_level == 'daily': stats['出发日期'] = pd.to_datetime(stats['出发日期']).dt.strftime('%Y-%m-%d') else: stats['出勤天数'] = df.groupby(group_keys)['出发日期'].nunique().values if time_level == 'monthly': # 移除原工作日天数计算,替换为新分母 end_date = datetime.strptime(TIME_RANGE["end"], '%Y-%m-%d %H:%M:%S') end_day = end_date.day denominator = min(end_day, 22) stats = _add_derived_metrics(stats, time_level) # 当维度为执行人且时间粒度为月度时,合并出勤指标(使用正确的键名) if dimension['name'] == '执行人' and time_level == 'monthly' and denominator is not None: attendance_metrics = calculate_attendance_metrics(df, denominator) # 使用维度的实际键名(如'执行人')作为合并键,而非固定的'维度' stats = pd.merge( stats, attendance_metrics[[ # 保留原始'执行人'列作为合并键 '执行人', '出勤达标天数', '平均每天到站址', '日均出勤出工时长', '日均有效出勤工时' ]], on=dimension_key, # 动态使用维度的键名(如'执行人')进行合并 how='left' ) return stats # -------------------------- # 代维统计增强模块 # -------------------------- def enhance_maintainer_stats(filtered_data, member_list_df_filter): time_range_data = filtered_data.copy() # 初始化维度框架 GROUP_KEYS = ['所属组织', '代维简称', '地市', '地市代维'] dim_dfs = [] for key in GROUP_KEYS: df = member_list_df_filter.groupby(key).agg( 应出勤人数=('登陆账号', 'count') ).reset_index() dim_dfs.append(df) if not time_range_data.empty: # ===================== 原有统计逻辑 ===================== # 执行人级别统计 executor_stats = time_range_data.groupby(['执行人', '所属组织', '代维简称', '地市', '地市代维']).agg( 总工单数=('工单编号', 'nunique'), 有效工单数=('工作量评定', lambda s: s.eq('有效').sum()), 总路途时长=('路途时长(分钟)', 'sum'), 总工作时长=('实际工作时长(分钟)', 'sum'), 总出勤天数=('出发日期', 'nunique'), 故障工单数=('工单分类', lambda s: s.eq('故障工单').sum()), 超长退服工单数=('是否超长退服', lambda s: s.eq('是').sum()), 高频故障工单数=('高频故障', lambda s: s.eq('是').sum()), 延期工单数=('是否延期有效', lambda s: s.eq('是').sum()), 故障及时数=('是否及时处理', lambda s: s.eq('是').sum()) ).reset_index() # 计算执行人级别指标 metrics = [ ('平均路途时长(小时)', (executor_stats['总路途时长'] / 60) / executor_stats['总工单数'].replace(0, np.nan)), ('工作时长占比', executor_stats['总工作时长'] / (executor_stats['总路途时长'] + executor_stats['总工作时长']).replace(0, np.nan)), ('人均每日工单数', executor_stats['总工单数'] / executor_stats['总出勤天数'].replace(0, np.nan)), ('人均每日有效工单数', executor_stats['有效工单数'] / executor_stats['总出勤天数'].replace(0, np.nan)), ('人均每日出勤时长(小时)', (executor_stats['总路途时长'] + executor_stats['总工作时长']) / 60 / executor_stats['总出勤天数'].replace( 0, np.nan)), ('人均每日工作时长(小时)', executor_stats['总工作时长'] / 60 / executor_stats['总出勤天数'].replace(0, np.nan)), ('每工单路途时长(小时)', executor_stats['总路途时长'] / 60 / executor_stats['总工单数'].replace(0, np.nan)), ('每工单工作时长(小时)', executor_stats['总工作时长'] / 60 / executor_stats['总工单数'].replace(0, np.nan)), ('超长退服工单占比', executor_stats['超长退服工单数'] / executor_stats['故障工单数'].replace(0, np.nan)), ('高频故障工单占比', executor_stats['高频故障工单数'] / executor_stats['故障工单数'].replace(0, np.nan)), ('延期工单占比', executor_stats['延期工单数'] / executor_stats['故障工单数'].replace(0, np.nan)), ] for col, formula in metrics: executor_stats[col] = formula executor_stats = executor_stats.fillna(0).round(4) # 维度聚合统计 def calculate_dimension_stats(grouped_df, current_key): return grouped_df.groupby(current_key).agg( 总工单数=('总工单数', 'sum'), 有效工单数=('有效工单数', 'sum'), 工单平均路途超2小时人数=('平均路途时长(小时)', lambda x: (x > 2).sum()), 平均工作时长占比低人数=('工作时长占比', lambda x: (x < 0.1).sum()), 人均每日工单数=('人均每日工单数', 'mean'), 人均每日有效工单数=('人均每日有效工单数', 'mean'), 人均每日出勤时长=('人均每日出勤时长(小时)', 'mean'), 人均每日工作时长=('人均每日工作时长(小时)', 'mean'), 每工单路途时长=('每工单路途时长(小时)', 'mean'), 每工单工作时长=('每工单工作时长(小时)', 'mean'), 超长退服工单占比=('超长退服工单占比', 'mean'), 高频故障工单占比=('高频故障工单占比', 'mean'), 延期工单占比=('延期工单占比', 'mean'), ).reset_index().round(4) # 合并维度统计 updated_dims = [] for i, key in enumerate(GROUP_KEYS): dim_stats = calculate_dimension_stats(executor_stats, key) dim_stats['有效工单占比'] = (dim_stats['有效工单数'] / dim_stats['总工单数']).replace(np.inf, 0).fillna( 0).round(4) merged = pd.merge( dim_dfs[i], dim_stats, on=key, how='left' ).fillna(0) updated_dims.append(merged) dim_dfs = updated_dims # ===================== 新增需求处理 ===================== # 提取问题工单 sign_in_issue = time_range_data[time_range_data['原因'] == '签到距离过大'] short_duration_issue = time_range_data[time_range_data['原因'] == '工作时长过短'] top50_travel_orders = time_range_data.nlargest(50, '路途时长(分钟)')['工单编号'].unique() top50_travel_data = time_range_data[time_range_data['工单编号'].isin(top50_travel_orders)] # 处理各维度指标 for i, key in enumerate(GROUP_KEYS): # 合并签到距离过大条目数 sign_in_counts = sign_in_issue.groupby(key)['工单编号'].nunique().reset_index(name='签到距离过大条目数') dim_dfs[i] = pd.merge(dim_dfs[i], sign_in_counts, on=key, how='left') # 合并工作时长过短条目数 short_duration_counts = short_duration_issue.groupby(key)['工单编号'].nunique().reset_index( name='工作时长过短条目数') dim_dfs[i] = pd.merge(dim_dfs[i], short_duration_counts, on=key, how='left') # 合并路途时长TOP50条目数 top50_counts = top50_travel_data.groupby(key)['工单编号'].nunique().reset_index(name='路途时长TOP50条目数') dim_dfs[i] = pd.merge(dim_dfs[i], top50_counts, on=key, how='left') dim_dfs[i] = dim_dfs[i].fillna(0) # ===================== 出勤相关统计 ===================== # 实际出勤人数 def merge_attendance(source_df, target_df, grp_key): att = source_df.groupby(grp_key)['执行人'].nunique().reset_index(name='实际出勤人数') return pd.merge(target_df, att, on=grp_key, how='left').fillna(0) for i, key in enumerate(GROUP_KEYS): dim_dfs[i] = merge_attendance(time_range_data, dim_dfs[i], key) dim_dfs[i]['出勤率'] = (dim_dfs[i]['实际出勤人数'] / dim_dfs[i]['应出勤人数']).replace(np.inf, 0).fillna( 0).round(4) # 出勤天数统计 for i, key in enumerate(GROUP_KEYS): att_cols = ['执行人', key, '出发日期'] att_days = time_range_data[att_cols].drop_duplicates(subset=['执行人', '出发日期']) # 计算每个执行人的出勤天数 att_days_per_executor = att_days.groupby([key, '执行人']).size().reset_index(name='出勤天数') # 总出勤人天(所有人的出勤天数总和) total_attendance_person_days = att_days_per_executor.groupby(key)['出勤天数'].sum().reset_index( name='总出勤人天') dim_dfs[i] = pd.merge(dim_dfs[i], total_attendance_person_days, on=key, how='left').fillna(0) # 出勤天数(不同出发日期的数量) attendance_days = att_days.groupby(key)['出发日期'].nunique().reset_index(name='出勤天数_实际') dim_dfs[i] = pd.merge(dim_dfs[i], attendance_days, on=key, how='left').fillna(0) # 计算人均出勤天数和日均出勤人数 dim_dfs[i]['人均出勤天数'] = (dim_dfs[i]['总出勤人天'] / dim_dfs[i]['实际出勤人数']).replace(np.inf, 0).fillna( 0).round(2) dim_dfs[i]['日均出勤人数'] = (dim_dfs[i]['总出勤人天'] / dim_dfs[i]['出勤天数_实际']).replace(np.inf, 0).fillna( 0).round(2) # 出勤不足3天人数统计 under3 = att_days_per_executor[att_days_per_executor['出勤天数'] < 3].groupby(key)[ '执行人'].count().reset_index(name='出勤不足3天人数') dim_dfs[i] = pd.merge(dim_dfs[i], under3, on=key, how='left').fillna(0) dim_dfs[i]['出勤不足3天人员占比'] = (dim_dfs[i]['出勤不足3天人数'] / dim_dfs[i]['实际出勤人数']).replace( [np.inf, -np.inf], 0).fillna(0).round(4) # ===================== 故障处理及时率计算 ===================== for i, key in enumerate(GROUP_KEYS): fault_data = time_range_data[time_range_data['工单分类'] == '故障工单'] # 初始化要计算的占比列 ratio_columns = ['故障处理及时率', '超长退服工单占比', '高频故障工单占比', '延期工单占比'] if not fault_data.empty: agg_dict = { '是否及时处理': lambda x: x.eq('是').any(), '是否超长退服': lambda x: x.eq('是').any(), '高频故障': lambda x: x.eq('是').any(), '是否延期有效': lambda x: x.eq('是').any(), '所属组织': 'first', '代维简称': 'first', '地市': 'first', '地市代维': 'first' } if key == '地市代维': agg_dict['地市'] = 'first' agg_dict['代维简称'] = 'first' unique_faults = fault_data.groupby('工单编号').agg(agg_dict).reset_index() # 按维度key分组统计 fault_stats_current = unique_faults.groupby(key).agg( 总故障工单数=('工单编号', 'nunique'), 总故障及时数=('是否及时处理', 'sum'), 超长退服工单数=('是否超长退服', 'sum'), 高频故障工单数=('高频故障', 'sum'), 延期工单数=('是否延期有效', 'sum') ).reset_index() # 计算各项占比 fault_stats_current['故障处理及时率'] = ( fault_stats_current['总故障及时数'] / fault_stats_current['总故障工单数'] ).replace([np.inf, -np.inf], 0).fillna(0) fault_stats_current['超长退服工单占比'] = ( fault_stats_current['超长退服工单数'] / fault_stats_current['总故障工单数'] ).replace([np.inf, -np.inf], 0).fillna(0) fault_stats_current['高频故障工单占比'] = ( fault_stats_current['高频故障工单数'] / fault_stats_current['总故障工单数'] ).replace([np.inf, -np.inf], 0).fillna(0) fault_stats_current['延期工单占比'] = ( fault_stats_current['延期工单数'] / fault_stats_current['总故障工单数'] ).replace([np.inf, -np.inf], 0).fillna(0) else: # 创建空数据框并初始化所有占比列为0 fault_stats_current = pd.DataFrame(columns=[key] + ratio_columns) for col in ratio_columns: fault_stats_current[col] = 0.0 # 输出统计结果 logger.info(f"维度[{key}]故障统计结果:") logger.info(f" 总故障工单数: {fault_stats_current.get('总故障工单数', 0).sum()}") logger.info(f" 超长退服工单数: {fault_stats_current.get('超长退服工单数', 0).sum()}") logger.info(f" 高频故障工单数: {fault_stats_current.get('高频故障工单数', 0).sum()}") logger.info(f" 延期工单数: {fault_stats_current.get('延期工单数', 0).sum()}") # 使用map函数手动添加占比数据 for col in ratio_columns: mapping = dict(zip(fault_stats_current[key], fault_stats_current[col])) dim_dfs[i][col] = dim_dfs[i][key].map(mapping).fillna(0) # 输出合并后占比 logger.info(f"维度[{key}]合并后占比:") for col in ratio_columns: mean_value = dim_dfs[i][col].mean() logger.info(f" {col}: {mean_value}") # ===================== 调整字段顺序 ===================== base_columns = [ '应出勤人数', '实际出勤人数', '出勤率', '出勤不足3天人数', '出勤不足3天人员占比', '总工单数', '有效工单数', '有效工单占比', '签到距离过大条目数', '工作时长过短条目数', '路途时长TOP50条目数', '人均出勤天数', '日均出勤人数', '工单平均路途超2小时人数', '平均工作时长占比低人数', '人均每日工单数', '人均每日有效工单数', '人均每日出勤时长', '人均每日工作时长', '每工单路途时长', '每工单工作时长', '超长退服工单占比', '高频故障工单占比', '延期工单占比', '故障处理及时率' ] # 确保所有列都存在 for i, key in enumerate(GROUP_KEYS): # 只保留数据框中实际存在的列 available_columns = [col for col in base_columns if col in dim_dfs[i].columns] ordered_columns = [key] + available_columns dim_dfs[i] = dim_dfs[i][ordered_columns] # ===================== 总计行处理 ===================== for i in range(len(dim_dfs)): df = dim_dfs[i] group_key = GROUP_KEYS[i] # 创建总计行字典 total_row = {group_key: '总计'} # 处理需要sum的字段 sum_cols = [ '应出勤人数', '实际出勤人数', '出勤不足3天人数', '总工单数', '有效工单数', '签到距离过大条目数', '工作时长过短条目数', '路途时长TOP50条目数', '工单平均路途超2小时人数', '平均工作时长占比低人数' ] # 检查并添加出勤相关列 has_total_attendance = '总出勤人天' in df.columns has_attendance_days = '出勤天数_实际' in df.columns if has_total_attendance: sum_cols.append('总出勤人天') if has_attendance_days: sum_cols.append('出勤天数_实际') # 计算总计行的和 for col in sum_cols: if col in df.columns: total_row[col] = df[df[group_key] != '总计'][col].sum() # 计算平均值字段 mean_cols = [ '人均每日工单数', '人均每日有效工单数', '人均每日出勤时长', '人均每日工作时长', '人均出勤天数', '日均出勤人数', '每工单路途时长', '每工单工作时长', '超长退服工单占比', '高频故障工单占比', '延期工单占比', '故障处理及时率' ] for col in mean_cols: if col in df.columns: # 使用加权平均计算总计行的平均值 if '实际出勤人数' in df.columns: total_row[col] = (df[df[group_key] != '总计'][col] * df[df[group_key] != '总计'][ '实际出勤人数']).sum() / df[df[group_key] != '总计']['实际出勤人数'].sum() else: total_row[col] = df[df[group_key] != '总计'][col].mean() # 重新计算总计行中的百分比和比率字段 # 出勤率 total_actual = total_row.get('实际出勤人数', 0) total_expected = total_row.get('应出勤人数', 0) total_row['出勤率'] = total_actual / total_expected if total_expected > 0 else 0 # 出勤不足3天人员占比 total_under3 = total_row.get('出勤不足3天人数', 0) total_row['出勤不足3天人员占比'] = total_under3 / total_actual if total_actual > 0 else 0 # 有效工单占比 total_valid = total_row.get('有效工单数', 0) total_orders = total_row.get('总工单数', 0) total_row['有效工单占比'] = total_valid / total_orders if total_orders > 0 else 0 # 添加总计行到数据框 total_df = pd.DataFrame([total_row]) dim_dfs[i] = pd.concat([df, total_df], ignore_index=True) return tuple(dim_dfs) # -------------------------- # 输出模块 # -------------------------- def save_to_excel(dataframes, path): """增强版Excel输出(带百分比格式)""" with pd.ExcelWriter(path, engine='xlsxwriter') as writer: workbook = writer.book # 定义格式(新增垂直居中设置) header_format = workbook.add_format({ 'bold': True, 'text_wrap': True, # 自动换行 'border': 1, 'bg_color': '#D9EAD3', # 浅绿色背景 'align': 'center', # 水平居中 'valign': 'vcenter' # 垂直居中(新增) }) cell_format = workbook.add_format({ 'border': 1, 'align': 'center', # 内容水平居中 'valign': 'vcenter' # 内容垂直居中(新增) }) percent_format = workbook.add_format({ 'num_format': '0.00%', # 百分比格式 'border': 1, 'align': 'center', # 内容水平居中 'valign': 'vcenter' # 内容垂直居中(新增) }) for name, df in dataframes.items(): # 写入数据时添加标题格式 df.to_excel( writer, sheet_name=name, index=False, header=False, # 禁用自动标题 startrow=1 # 从第二行开始写数据 ) worksheet = writer.sheets[name] # 动态识别百分比列 percent_columns = [ col for col in df.columns if col.endswith(('率', '占比')) ] # 设置列格式(包含标题和数据) for col_num, col_name in enumerate(df.columns): col_width = max(12, len(col_name) * 1.5) # 动态列宽 # 设置标题 worksheet.write(0, col_num, col_name, header_format) # 设置列格式 if col_name in percent_columns: fmt = percent_format else: fmt = cell_format # 应用格式到整列 worksheet.set_column( first_col=col_num, last_col=col_num, width=col_width, cell_format=fmt ) # 冻结首行 worksheet.freeze_panes(1, 0) # 自动筛选(仅对数据量小于10000行的sheet启用) if len(df) < 10000: worksheet.autofilter(0, 0, len(df), len(df.columns) - 1) # -------------------------- # 主流程 # -------------------------- def main(): engine = init_db_engine() try: with time_monitor("数据加载"): site_orders = load_site_orders(engine) order_numbers = site_orders['工单编号'].unique().tolist() order_dfs = batch_load_order_data(engine, order_numbers) with time_monitor("数据合并与处理"): merged_data = merge_order_data(site_orders, order_dfs) if merged_data.empty: logger.warning("无有效数据") return processed_data = ( merged_data .pipe(process_coordinates) .pipe(calculate_distances) .pipe(merge_cran_data, cran_path=FILE_PATHS["cran_info"]) .pipe(generate_time_features) .pipe(calculate_workload) ) output_dfs = {'原始数据': processed_data} with time_monitor("统计计算"): for dim in DIMENSIONS: for time_level in ['daily', 'weekly', 'monthly']: stats_df = generate_stats( processed_data, dimension=dim, time_level=time_level ) level_name = { 'daily': '日统计', 'weekly': '周统计', 'monthly': '月统计' }[time_level] sheet_name = f"{level_name}_{dim['name']}" output_dfs[sheet_name] = stats_df with time_monitor("处理代维组织数据"): member_list_df = pd.read_excel(member_list_path, usecols=member_list_columns, engine='calamine') pattern = r'(?=.*无线2023)(?=.*维护组)(?!.*OLT)' member_list_df = member_list_df[member_list_df['所属组织'].str.contains(pattern, regex=True, na=False)] member_list_df['代维简称'] = member_list_df['所属组织'].str.split("-").str[0] member_list_df['代维简称'] = member_list_df['代维简称'].replace('中移铁通', '铁通') member_list_df = member_list_df.rename(columns={'所属地市': '地市'}) member_list_df['地市代维'] = member_list_df['地市'].str.split("-").str[0] + '-' + member_list_df['代维简称'] member_list_df = member_list_df.drop_duplicates(subset=['登陆账号']) member_list_df['时间戳'] = source_time member_list_df_filter = member_list_df[member_list_df['在职状态'] == '在职'] filtered_data = processed_data[processed_data['所属组织'].isin(member_list_df_filter['所属组织'])] output_dfs['原始数据_副本'] = filtered_data.copy() stats_group, stats_company, stats_city, stats_area = enhance_maintainer_stats( filtered_data, member_list_df_filter ) output_dfs.update({ '代维组维度': stats_group, '代维公司维度': stats_company, '地市维度': stats_city, '地市代维维度': stats_area }) with time_monitor("生成Top50统计"): # 从原始数据_副本中提取Top50 top50_travel = filtered_data.nlargest(50, '路途时长(分钟)') # 选择需要的列 top50_travel = top50_travel[['工单编号', '执行人', '所属组织', '地市', '代维简称', '路途时长(分钟)']] output_dfs['路途时长Top50'] = top50_travel # 在指定的结果表第一列添加时间戳 target_sheets = ['代维组维度', '代维公司维度', '地市维度', '地市代维维度', '路途时长Top50'] for sheet in target_sheets: df = output_dfs[sheet] df.insert(0, '时间戳', source_time) with time_monitor("保存结果"): save_to_excel(output_dfs, FILE_PATHS["output"]) logger.info(f"结果已保存至:{FILE_PATHS['output']}") except Exception as e: logger.error(f"处理失败:{str(e)}", exc_info=True) raise if __name__ == "__main__": main()

任务中心 当前任务列表,单击可以进入任务中心 公司公告 显示公告列表,没有阅读的公告,加大加粗,显示颜色,点开后,失去未读特征 本日签到和签退 显示本日登录时间和本日最后在系统的时间 本日任务进度 饼状图 在建项目总进度 多行日期折现图模式 提醒便签 个人随时记录的提醒功能 汇总模式:单独一个首页,数据为上述功能的汇总和统计 文件与共享 文件区 可以上传和下载文件,记录文件名称,时间,关联项目,关联类别等信息。可以搜索。文件根据对应扩展名不同,显示不同图标,自己上传的文件可以修改类别,名称,描述等新奇。 文件单击可以下载。 文件分为共享区和私有区,分为左右结构,单击图标,有动画过渡,可以显示共享或者私有区域。 项目管理 项目立项 记录项目的名称,建立时间,完结时间,状态,基本描述,备注等信息 项目分析 为项目配置,大的模块,小的模块,功能点和对应的描述信息。可以导出本文档对应的格式docx(不包括图片)。 报价模式 可以配置需求工时和单价,可以配置公式自动生成,也可以手动填写。 项目分配和计划 根据项目和人员进行分配和排期,每个单元格可以单选和多选分配给人员任务。可以导出对应xlsx格式 分配的任务进入任务中心 项目进度 展示和下载两种文档模式的进度 任务中心 任务清单 任务分为周计划和指派任务 所有人可以申请任务,默认的责任人为申请人。 任务包括名称,内容,开始时间,结束时间,任务关联项目,任务类型,申请人,附件等信息 可以设置任务进入初审状态并提交,该任务将由管理员指派人员进入任务测试阶段。 任务可以由执行人设置百分比进度。 测试人完成测试任务后,提交状态为终审。管理员可以将状态设置完成或者驳回。 驳回(定义为缺陷)后,任务人将再次完善任务。提交审核流程。 每一个流程均记录在任务的详细流程中,每个流程,均需要添加备注 根据流程和状态生成流程图。 任务申请后,不可修改,默认为待审核状态,管理员可以修改并分配给对应人员。 管理员额外可以查看到完成任务的实际时间和设置工时(数字类型) 当前任务 按照人员列举,当前所有人员任务列表,鼠标点击显示完整任务信息 任务统计 按照人员项目时间段筛选 统计人员完成任务信息以及工时统计和实际耗时统计 显示并可导出 工时报表 需求报表 缺陷报表 系统配置 管理员管理 可以设定管理员的账号密码以及角色信息 部门管理 可以设定公司的部门信息 角色管理 设定不同的角色名称,用来区分权限功能 角色权限配置 为不同的角色配置不同权限 系统字典类型数据配置 用来配置系统中所有使用的字典数据类型参数,例如事件的处理状态,该功能内置。 出勤管理 可以设定上班下班和加班下班时间点 可以进行搜索,包括普通模式和汇总模式 普通模式:显示各个帐号每天的在线时长明细,如果该天出现多次不同IP,记录不同ip,并标记异常。 汇总模式:显示各个帐号统计的在线时长和出勤天数 单击名字可以按照月度查看出勤情况,并可以在日历上显示和标注,事假和病假调休等事件。 通讯录 记录公司人员相关电话,微信号,应急联系人名字和电话,以及其他相关通讯信息例如物业,水电。 公告系统 新闻类型,可以发布和审核公告信息,显示已经阅读列表 公司制度 新闻类型,可以发布和修改信息,制度带有编号 操作规范 新闻类型,可以发布和修改信息,带有编号 物资系统 物资包括虚拟和真实,每一个物资对应唯一编号,例如一次引进4把椅子,4把椅子各自有自己的编号。 物资的状态可以扩展,包括不限制于,购买,在用,损坏,支出。 物资的购买,需要可以按照时间段统计,总金额,和各状态分类统计 物资的在用和支出,物资可以分配给员工,项目,公司来实现在用状态。 系统参数 系统基本参数配置,配置系统内常用参数,应为可能发生变化的参数点。 系统日志 记录人员和系统操作的历史 使用net6以上,EF+webApi+vue+ele前后端怎么写

def calculate_workday(): server = '100.20.00.10' database = 'D' username = '000' password = '@000' port = '1433' # 连接数据库并执行查询 conn = pymssql.connect(server, user=username, password=password, database=database, port=port) query ="SELECT * FROM DLCH.dbo.CH_MainDt WHERE ((YWNRId >= 1 AND YWNRId <= 41) OR (YWNRId >= 85 AND YWNRId <= 97)) ORDER BY CreateTime" query2 = "SELECT * FROM DLCH.dbo.CH_FlowInstance WHERE LXTypeId = 1 ORDER BY SADate " # 获取查询结果,将结果写入df csv_data = pd.read_sql_query(query, conn) delay_data = pd.read_sql_query(query2, conn) conn.close() # 保证需要参与计算工期的列为数值 columns_to_convert = ['paraInt1', 'paraInt2', 'paraInt3', 'paraFloat1', 'paraFloat2', 'paraFloat5'] for column in columns_to_convert: csv_data[column] = pd.to_numeric(csv_data[column], errors='coerce') # 判断工期和工作量并将结果存入新的列中 def cal_workday(csv_data): csv_data['工期'] = '-999' csv_data['工作量'] = '' csv_data['工作量'] = csv_data['工作量'].astype(str) # 转化成字符串形式 # 处理 YWNRId=7 mask_7 = (csv_data['YWNRId'] == 7) & (csv_data['paraFloat1'].notna()) csv_data.loc[mask_7, '工期'] = csv_data.loc[mask_7, 'paraFloat1'].apply( lambda x: 7 if x <= 50000 else min(7 + 2 * math.ceil((x - 50000) / 50000), 25) ) # YWNRId=85 csv_data.loc[csv_data['YWNRId'] == 85, '工期'] = 3 # YWNRId=11, csv_data.loc[csv_data['YWNRId'] == 11, '工期'] = 10 csv_data = cal_workday(csv_data) csv_data.to_csv('计算结果.csv', index=False, encoding='utf_8_sig') # 执行主函数 calculate_workday()query可以通过字段CHId和query2 连接起来,query通过def calculate_workday()计算了工期的时长,query2的行数多余query。query2的字段SADate为案件登记时间,GongQiHuanBan为缓办案件的天数,StatusName为案件办理状态字段,bjdate办结的日期。计算案件是否超时及超时天数,当案件办理状态=办结或者案件办理状态=发件时,办结的日期-(收件登记时间+工期+节假日(工期天数为工作日)+缓办案件时间)=剩余天数(工作日)大于等于0,未超时;<0,超时;反之,为其他状态时,查询的日期-(收件登记时间+工期(工期天数为工作日)+节假日+缓办案件时间)=剩余天数(工作日)大于等于0,未超时;<0,超时,新增两列,一列为超时判断结果,一列为超时天数。

我有一张excel表格,其中一子表‘blottter’,里面记录了债券的名字、债券的交易日期、债券的交易金额(买入债券金额为正,卖出债券金额为负,单位为元),现在我想找出某一个债券在指定时间段内的持仓金额和持仓天数,该债券可能有多于2笔的交易记录。可以怎么处理?我提供一个示例,债券A在25年7月1日买入1000万,在25年7月3日卖出500万,在25年7月8日卖出500万,那么在25年7月1日至25年7月31日的统计区间内,债券A持仓了1000万2天,持仓了500万5天。需要注意的是,有的债券可能在统计区间之前就已经持仓,比如债券B在25年6月28日买入200万,至25年7月31日仍未卖出,那么在25年7月1日至25年7月31日的统计区间内,债券B持仓200万30天。我在另一个子表表‘统计’中存有需要统计的债券名字以及统计区间,根据‘统计’表中的名字和统计区间,统计债券在区间内的持仓情况。提供一个excel的VBA解决方案。结果的数据我希望分成多列输出,比如债券A持仓1000万2天,持仓500万5天,那么输出为4列,分别为10000000,2,5000000,5。blotter表中的数据列顺序是,A列=交易日期,B列=债券名称,C列=交易金额(单位:元)。‘统计’表的格式为,A1、A2为开始日期和结束日期,B列为债券名,C列开始存储结果,所有债券统一调用A1、A2作为开始日期和结束日期。‘blotter’表和‘统计’表为同一个excel表中的不同子表,结果仍然输出在‘统计’表中

我有一张excel表格,其中一子表‘blotter source’,里面记录了A列=债券的交易日期、B列=债券的名字、C列=债券的交易金额(买入债券金额为正,卖出债券金额为负,单位为元),在另一个子表‘统计’中存有我需要统计的统计时间区间和债券名字,‘统计’表的格式为,A1、A2分别记录统计区间开始日期和结束日期,B列=债券的名字,B列所有债券统一调用A1、A2作为开始日期和结束日期。根据‘统计’表中的债券名字和统计区间,以及‘blotter source’表中的债券交易记录,统计债券在区间内的持仓情况,即找出‘统计’表中债券在统计时间段内的持仓金额和持仓天数,注意每只债券可能有多于2笔的交易记录。我提供一个示例,债券A在25年7月1日买入1000万,在25年7月3日卖出500万,在25年7月8日卖出500万,那么在25年7月1日至25年7月31日的统计区间内,债券A持仓了1000万2天,持仓了500万5天。需要注意的是,有的债券可能在统计区间之前就已经持仓,比如债券B在25年6月28日买入200万,至25年7月31日仍未卖出,那么在25年7月1日至25年7月31日的统计区间内,债券B持仓200万30天。提供一个excel的VBA解决方案。结果的数据我希望分成多列在‘统计’表中对应的债券名称后输出,比如债券A持仓1000万2天,持仓500万5天,那么输出为4列,分别为10000000,2,5000000,5。

最新推荐

recommend-type

the homework of ROS summer school

the homework of ROS summer school
recommend-type

Python打造的Slaee管理系统升级版发布

由于提供的文件信息中,文件名《基于python的slaee管理系统 (15).zip》与描述《基于python的slaee管理系统 (15).zip》相同,并且给出的压缩包文件名称列表中只有一个文件《基于python的slaee管理系统 (14).zip》,该信息表明我们正在讨论两个不同版本的Python系统管理软件的压缩包。以下知识点将根据这些信息详细展开: 知识点一:Python编程语言基础 Python是一种高级编程语言,以其简洁的语法和强大的库支持而闻名。它是解释型语言,具有动态类型系统和垃圾回收功能,适用于多种编程范式,包括面向对象、命令式、函数式和过程式编程。Python广泛应用于系统管理、网络服务器、开发脚本、科学计算、数据挖掘和人工智能等领域。 知识点二:系统管理相关知识 系统管理指的是对计算机系统进行配置、监控和维护的过程,包括硬件资源、软件资源和数据资源的管理。在Python中,系统管理通常涉及操作系统级别的任务,如进程管理、文件系统管理、网络配置、系统日志监控等。Python的系统管理库(例如psutil、fabric、paramiko等)提供了丰富的API来简化这些任务。 知识点三:项目版本控制 从文件名《基于python的slaee管理系统 (14).zip》和《基于python的slaee管理系统 (15).zip》可以看出,这是一个项目在不同版本之间的迭代。版本控制是一种记录一个或多个文件随时间变化的方式,它允许用户可以回到特定版本。在软件开发中,版本控制非常重要,它有助于团队协作、代码合并、分支管理和错误跟踪。常见的版本控制系统包括Git、Subversion (SVN)、Mercurial等。 知识点四:打包与部署 提到“压缩包子文件”,这通常意味着文件已经被压缩打包成一个ZIP文件。在软件开发中,打包是为了便于文件传输、存档保存和分发。在Python项目中,打包也是部署过程的一部分。一个Python项目通常需要包含源代码、依赖关系、配置文件和安装脚本等。打包成ZIP文件后,可以通过各种方式部署到服务器上运行,如使用Fabric或Ansible等自动化部署工具。 知识点五:项目命名及版本命名规则 文件命名中的“基于python的slaee管理系统”表明这是一个与Python语言相关的系统管理项目。而数字“15”和“14”则代表着项目的版本号,这表明项目在持续发展,不同的数字代表了项目在不同时期的迭代。版本号的命名规则通常遵循语义化版本控制(SemVer),这种版本控制系统以 MAJOR.MINOR.PATCH 的形式表示版本号。 知识点六:文件压缩与解压缩技术 ZIP是一种常用的文件压缩格式,它通过减少文件大小来提高存储效率和传输速度。ZIP文件通常是无损压缩,意味着文件在压缩和解压缩的过程中不会丢失信息。Python标准库提供了zipfile模块,允许用户在Python程序中创建ZIP文件、读取ZIP文件内容、提取ZIP文件中的文件等操作。用户可以使用各种图形界面工具(如WinRAR、7-Zip)或命令行工具来处理ZIP文件。 总结以上内容,从文件信息中我们可以得知该内容涉及了Python编程语言、系统管理知识、版本控制、打包与部署、命名规则以及文件压缩技术等多方面的知识点。这些知识点对于理解和应用Python进行系统管理软件开发以及软件的版本迭代、打包与部署均具有重要意义。
recommend-type

深入解析PCB走线传输延时:关键因素与实用公式

# 1. PCB走线传输延时基础 在电子工程领域,尤其是在印刷电路板(PCB)设计中,走线传输延时是一个不可忽视的重要概念。传输延时主要指的是电信号通过PCB上的导电路径从源头传送到目的地所耗费的时间。本章将介绍PCB走线传输延时的基础知识点,为后续章节的深入探讨打下坚实的基础。 首先,了解什么是传输延时是至关重要的。简单来说,它反
recommend-type

gpio很弱是什么意思

<think>嗯,用户的问题是关于GPIO信号弱的概念及其影响。用户似乎对硬件或嵌入式系统有一定了解,但需要更深入的解释。从用户提到的"信号弱"来看,ta可能遇到了实际电路设计中的驱动能力问题,或者在学习GPIO原理时遇到了术语困惑。 用户引用了四篇资料,其中提到GPIO的模拟输入输出模式、施密特触发器的作用、上拉下拉电阻的配置,以及信号线串联电阻的作用。这些内容都与GPIO的驱动能力和信号质量相关。特别是引用[4]中提到的"信号线串联小电阻"和"低频电路不考虑反射",暗示用户可能正在处理实际电路中的信号完整性问题。 用户真正想知道的可能是:为什么我的GPIO输出无法正确驱动某个设备?或者
recommend-type

Python打造的Slaee管理系统升级版发布

标题中的“基于python的slaee管理系统”表明这是一个使用Python编程语言开发的系统。Python是一种广泛使用的高级编程语言,以其易读性和简洁的语法而闻名。SLAEE管理系统可能是指一个特定类型的管理软件,但由于没有给出缩写的完整解释,我们可以假设SLAEE可能是某机构或系统名称的缩写。 从标题和描述来看,存在一处笔误:“基于python的slaee管理系统 (19).zip”和“基于python的slaee管理系统 (18).zip”所指的似乎是同一软件系统,只是版本号不同。根据文件名称列表中的两个文件名,可以推断系统至少有两个版本,一个是版本18,一个是版本19。通常情况下,版本号的增加表示软件进行了更新或改进。 接下来,根据这些信息,我们可以阐述一些相关的知识点: 1. Python编程基础:Python是一种解释型、面向对象、高级编程语言。Python支持多种编程范式,包括过程式、面向对象和函数式编程。Python由于其简洁和易于学习的特性,被广泛应用于网络开发、数据分析、人工智能、机器学习和科学计算等领域。 2. 文件压缩与打包:文件压缩是将文件的大小减小以节省存储空间或网络传输时间的技术。常见的文件压缩格式包括ZIP、RAR、7Z等。文件打包通常指的是将多个文件或文件夹压缩成一个单独的文件。这在数据备份、软件分发和档案管理中非常常见。 3. 版本控制:在软件开发中,“版本”通常指软件的特定状态,版本号则用来标识这些状态。版本控制是一种记录文件、目录或集合随着时间变化的方式,以便将来可以检索特定版本。对于软件项目来说,版本控制是至关重要的,它不仅允许开发者追踪和管理代码的变化,而且还能帮助团队协作,解决冲突,并回滚到旧版本。 4. 软件管理系统的开发:一个软件管理系统可能是针对特定业务领域而设计的,它可能包括用户界面、数据库管理、业务逻辑处理、报告生成和其他许多功能。软件管理系统的开发通常涉及需求分析、系统设计、编程、测试和维护等多个阶段。 5. Python在软件开发中的应用:Python因为具有丰富的库和框架,被广泛用于开发各种类型的软件。例如,Django和Flask是用于Web开发的流行Python框架;而对于数据分析和数据科学任务,Pandas、NumPy和Matplotlib等库提供了强大的数据处理和可视化工具;对于机器学习和人工智能,TensorFlow、PyTorch等库使得复杂算法的实现变得更为简单。 6. 系统更新与维护:随着软件的使用和发展,需求可能会变化,新的问题可能会出现,所以软件系统需要定期进行更新和维护。软件更新可能包括修复已知问题、改进现有功能、增加新功能或优化性能。开发者需要评估是否需要为修复安全漏洞或提高系统性能而更新系统,以及更新之后对用户体验的影响。 由于文件名中只提到了“基于python的slaee管理系统”,没有提供该系统具体功能的详细描述,我们无法提供更加具体的技术知识点。如果需要分析系统的工作原理或具体的技术实现细节,还需要更多的信息。
recommend-type

【Keil-ARM编程艺术】:如何编写可维护且高效的代码

# 摘要 本文旨在为读者提供ARM微控制器编程和Keil开发环境的全面指南。第一章概览了ARM微控制
recommend-type

应用层协议概述

<think>首先,用户的问题是:“我想了解应用层协议的基本概念和常见类型 请问应用层协议简介 常见应用层协议概述”。这翻译成:“I want to understand the basic concepts of application layer protocols and common types. Please introduce application layer protocols and overview common application layer protocols.” 根据系统级指令: - 所有行内数学表达式必须使用$...$格式。 - 独立公式使用$$...$$格式
recommend-type

Delphi 12 TeeChartVCLFMX控件包下载及功能介绍

标题中提到的"Delphi 12 控件之TeeChartVCLFMX-2024.40.rar"指的是Delphi 12版本中使用的TeeChartVCLFMX图表控件的特定版本(2024.40版本)。Delphi是由Embarcadero Technologies开发的一款流行的集成开发环境(IDE),专门用于使用Object Pascal和C++语言开发软件应用程序。该标题强调了Delphi 12环境下TeeChartVCLFMX控件的使用,这表明Delphi的图形用户界面(GUI)组件库中包含了一个专门用于创建复杂图表和图形的组件。 从描述中仅能得到的关于文件的名称是"TeeChartVCLFMX-2024.40.rar",这意味着文件是一个压缩包,具体包含了一个TeeChartVCLFMX的图表控件,版本号为2024.40。它可能包含了在Delphi 12版本中使用该图表控件所需的所有文件,包括库文件、二进制文件、文档等。 标签"delphi 控件"简单而直接地指出了该文件属于Delphi编程环境中的一个控件类别,表明了目标用户是Delphi开发者,他们通常使用这些控件来丰富他们的应用程序界面或增强应用程序的功能。 文件名称列表提供了关于TeeChartVCLFMX压缩包内包含的具体文件及其用途的详细信息: 1. TeeChartVCLFMX-2024.40.exe:这个文件很可能是一个安装程序或可执行文件,用于安装或运行TeeChartVCLFMX图表控件。 2. Keygen.exe:这个文件名表明它可能是一个密钥生成器(Key Generator),用于生成软件的注册码或激活码,使得控件可以脱离试用限制或进行合法授权。 3. Delphi29Binaries-2024.40-windows.pak:这个文件名暗示它包含了特定于Windows平台的Delphi 29(可能指的是Delphi 12的内部版本号)的二进制文件。pak文件是压缩包的一种格式,可能包含了运行TeeChartVCLFMX图表控件所需的库文件、DLLs、组件文件等。 4. TeeChartVCLFMX-2024.40 - D12.pdf:这是一个PDF格式的文件,很可能是用户手册或帮助文档,提供了对TeeChartVCLFMX图表控件版本2024.40在Delphi 12中的使用说明,安装指南,功能介绍或示例代码等。 综合以上信息,可以推断TeeChartVCLFMX-2024.40压缩包是为Delphi 12的开发人员提供了一个专业的图表解决方案,使得用户能够将图表功能集成到他们用Delphi开发的应用程序中。TeeChartVCLFMX可能包含各种图表类型(如条形图、折线图、饼图等),以及丰富的定制选项,如颜色、样式、图例、数据绑定、交互式功能等。开发者可以利用TeeChartVCLFMX提供的图表控件在应用程序中创建强大的数据可视化功能,这对数据分析、科学计算、商业智能、财务报告等领域特别有用。 开发者需要使用Delphi 12 IDE配合提供的组件安装程序或执行文件安装和配置TeeChartVCLFMX控件,通过阅读PDF文档来了解如何使用控件,并在需要时使用Keygen.exe激活控件。Delphi29Binaries-2024.40-windows.pak文件可能包含了控件运行所需的核心二进制组件。通过合理使用这些资源,开发者可以将TeeChartVCLFMX控件集成到自己的项目中,快速构建功能丰富的图表和数据可视化组件。
recommend-type

【Keil-ARM性能优化全解】:代码监控与优化工具的实用技巧

# 摘要 本文对基于Keil-ARM平台的性能优化进行了系统性探讨,涵盖了性能监控工具的介绍、代码优化策略、系统资源管理以及综合案例分析。首先,介绍了Keil-ARM的性能监控工具及其在性能分析中的重要性,包括工具的配置和性能数据解读方法。其次,文中讨论了代码层面的优化,包括高效循环和条件语句的编写,函数内联与尾调用优化,以及编译器优化选项的应用。然后,深入分析了ARM系统资源分配、
recommend-type

电子邮箱协议

<think>首先,用户的问题是:“我想了解电子邮箱相关的协议 请问电子邮件协议有哪些”。这直接询问电子邮件协议的种类。 从系统级指令中,我需要: - 使用中文回答。 - 结构清晰,帮助用户逐步解决问题。 - 保证回答真实可靠。 - 在行内数学表达式使用 $...$,但这里可能没有数学内容。 - 独立公式使用 $$...$$,但可能不需要。 - 生成相关问题。 - 在引用的段落末尾添加引用标识如 [^1]。 参考提供的引用: - 引用[1]: 讨论了SMTP、POP3和IMAP。 - 引用[2]: 提到电子邮件系统采用客户机/服务器模式,涉及接收和发送邮件。 - 引用[3]: 详细描述了P