活动介绍

import pandas as pd # 读取CSV文件 df = pd.read_csv('merged.csv') # 指定日期列的列名 date_column = 'Actual_Arrival_Date' # 将日期列转换为日期类型 df[date_column] = pd.to_datetime(df[date_column]) # 指定日期范围 start_date = pd.to_datetime('2021-01-01') end_date = pd.to_datetime('2021-07-01') # 根据日期范围筛选数据 filtered_df = df[(df[date_column] >= start_date) & (df[date_column] <= end_date)] # 统计两列数据的对应关系 count = filtered_df.groupby(['Place_of_Receipt', 'Port_of_Unlading']).size().reset_index(name='count') # 按照统计数量降序排序 sorted_count = count.sort_values(by='count', ascending=False) #打印前30 top_30 = sorted_count.head(30) print(top_30) 是什么意思

时间: 2024-04-15 13:25:16 浏览: 335
这段代码是用来读取一个名为 'merged.csv' 的CSV文件,并对其中的数据进行处理和统计。 首先,使用 `pd.read_csv('merged.csv')` 读取CSV文件并将数据存储到一个DataFrame对象 `df` 中。 然后,通过指定日期列的列名,使用 `pd.to_datetime()` 将该列的数据转换为日期类型。 接下来,指定了一个日期范围,即从 '2021-01-01' 到 '2021-07-01'。 然后,使用筛选条件 `df[(df[date_column] >= start_date) & (df[date_column] <= end_date)]` 对数据进行筛选,只保留在日期范围内的记录,并将结果存储到 `filtered_df` 中。 接着,使用 `groupby` 函数对 'Place_of_Receipt' 和 'Port_of_Unlading' 两列进行分组,并使用 `size()` 函数统计每个组的数量。 之后,使用 `reset_index()` 将结果重置索引,并将统计结果的列名设置为 'count',存储到 `count` 中。 最后,使用 `sort_values()` 对统计结果按照 'count' 列进行降序排序,并将排序后的结果存储到 `sorted_count` 中。 最终,通过 `head(30)` 只打印排序后的前30行数据,并使用 `print()` 函数将结果打印出来。 这段代码的目的是统计在指定日期范围内,根据 'Place_of_Receipt' 和 'Port_of_Unlading' 两列的对应关系,并按照统计数量降序排序。最后,将排序结果中的前30行打印出来。 希望能够解答你的疑问!如果还有其他问题,请随时提问。
阅读全文

相关推荐

import pandas as pd import numpy as np # 数据合并 train_df1 = pd.read_csv('station00.csv') train_df2 = pd.read_csv('station01.csv') train_df3 = pd.read_csv('station02.csv') combined_df = pd.concat([train_df1, train_df2, train_df3], ignore_index=True) combined_df.to_csv('combined_data.csv', index=False) # 读取数据并设置时间索引 df = pd.read_csv( "combined_data.csv", parse_dates=["date_time"], index_col="date_time" # 关键步骤:指定时间索引 ) # 标记白天时段(6:00至18:00) day_mask = (df.index.hour >= 6) & (df.index.hour < 18) # 替换白天合理零值为 NaN(白天辐照度和功率为0视为异常) df.loc[day_mask, ["nwp_globalirrad", "nwp_directirrad", "power"]] = ( df.loc[day_mask, ["nwp_globalirrad", "nwp_directirrad", "power"]] .replace(0.0, np.nan) ) # 删除关键字段缺失的行 df.dropna(subset=["lmd_totalirrad", "power"], inplace=True) # 确保 power 列为浮点型 df["power"] = df["power"].astype(float) # 时间插值(此时索引已为 DatetimeIndex) df["power"] = df["power"].interpolate(method="time") # 标记夜间时段 night_mask = (df.index.hour < 6) | (df.index.hour >= 18) # 替换夜间合理零值为 NaN df.loc[night_mask, ["nwp_globalirrad", "nwp_directirrad", "power"]] = ( df.loc[night_mask, ["nwp_globalirrad", "nwp_directirrad", "power"]] .replace(0.0, np.nan) ) # 删除关键字段缺失的行 df.dropna(subset=["lmd_totalirrad", "power"], inplace=True) # 确保 power 列为浮点型 df["power"] = df["power"].astype(float) # 时间插值(此时索引已为 DatetimeIndex) df["power"] = df["power"].interpolate(method="time") # 保存处理后的数据 df.to_csv("processed_data.csv") #验证输出 # 检查索引类型和插值结果 print(df.index) print(df["power"].isna().sum())

import pandas as pd import logging from geopy.distance import geodesic import time from contextlib import contextmanager import numpy as np from sqlalchemy import create_engine, text from tenacity import retry, stop_after_attempt, wait_exponential from datetime import datetime # -------------------------- # 通用配置 # -------------------------- DB_CONFIG = { "conn_str": "mysql+pymysql://root:[email protected]:3306/test?charset=utf8", "pool_size": 10, "pool_recycle": 300 } member_list_path = r"D:\OneDrive\ERIC\维护\平台数据\代维信息\2025-07人员信息快照_20250708.xlsx" member_list_columns = ['登陆账号', '所属组织', '所属地市', '在职状态'] TIME_RANGE = { "start": '2025-07-01 00:00:00', "end": '2025-07-08 23:59:59' } # 定义日期格式转换器 def format_time_range(time_range): # 解析开始时间和结束时间 start_date = datetime.strptime(time_range["start"], '%Y-%m-%d %H:%M:%S') end_date = datetime.strptime(time_range["end"], '%Y-%m-%d %H:%M:%S') # 格式化为YYYYMMDD形式并拼接 return f"{start_date.strftime('%Y%m%d')}~{end_date.strftime('%Y%m%d')}" # 获取格式化后的时间范围 source_time = format_time_range(TIME_RANGE) FILE_PATHS = { "cran_info": r'D:\OneDrive\ERIC\维护\test\CRAN机房信息.csv', "output": fr'D:\OneDrive\ERIC\维护\工单\现场执行工单\现场执行工单_{source_time}.xlsx' } TABLE_CONFIG = { '工单_保障': {'columns': ["工单当前状态", "工单号", "工单分类", "维护分类"]}, '工单_巡检': {'columns': [ "工单当前状态", "工单号", "资源ID", "资源名称", "资源经度", "资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '工单_拆站': {'columns': [ "工单当前状态", "工单号", "资源cid AS 资源ID", "资源名称", "资源经度", "资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '工单_验收': {'columns': [ "工单当前状态", "工单号", "资源cid AS 资源ID", "资源名称", "资源经度", "资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '工单_发电': {'columns': [ "工单当前状态", "工单号", "站点ID AS 资源ID", "站点名称 AS 资源名称", "站点经度 AS 资源经度", "站点纬度 AS 资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '工单_通用': {'columns': [ "工单当前状态", "工单号", "资源名称", "资源经度", "资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类" ]}, '故障工单': {'columns': [ "工单状态 AS 工单当前状态", "工单编号 AS 工单号", "STATION_NAME AS 资源名称", "LONGITUDE AS 资源经度", "LATITUDE AS 资源纬度", "STATION_NAME", "STATION_ID", "STATION_LEVEL", "LONGITUDE", "LATITUDE", "工单分类", "维护分类", "故障处理时长_小时", "是否延期有效", "是否及时处理", "高频故障", "是否超长退服", "网元分类" ], 'where_column': '工单编号'} } DIMENSIONS = [ {'name': '执行人', 'keys': ['执行人']}, {'name': '所属组织', 'keys': ['所属组织']}, {'name': '地市', 'keys': ['地市']}, {'name': '代维简称', 'keys': ['代维简称']}, {'name': '地市代维', 'keys': ['地市', '代维简称']} ] # -------------------------- # 工具函数 # -------------------------- @contextmanager def time_monitor(step_name): """耗时监控上下文管理器""" start_time = time.time() try: yield finally: print(f"{step_name} 耗时: {time.time() - start_time:.4f}秒") def setup_logging(): """日志配置""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("data_processor_v2.log"), logging.StreamHandler() ] ) return logging.getLogger(__name__) logger = setup_logging() @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) def safe_read_sql(sql: text, engine, params: dict = None) -> pd.DataFrame: """带重试机制的SQL读取""" logger.debug(f"执行SQL: {sql} \n参数: {params}") return pd.read_sql(sql, engine, params=params) def init_db_engine(): """初始化数据库引擎""" return create_engine( DB_CONFIG["conn_str"], pool_size=DB_CONFIG["pool_size"], pool_recycle=DB_CONFIG["pool_recycle"] ) # -------------------------- # 数据校验函数 # -------------------------- def is_valid_coordinates(lat, lon): """经纬度有效性校验""" if pd.isna(lat) or pd.isna(lon): return False if not (-90 <= lat <= 90 and -180 <= lon <= 180): return False if abs(lat) < 1e-6 and abs(lon) < 1e-6: return False return True def calculate_distance(coord1, coord2): """带校验的距离计算""" try: lat1, lon1 = coord1 lat2, lon2 = coord2 except (TypeError, ValueError) as e: logger.debug(f"坐标解析失败: {str(e)} | coord1={coord1} coord2={coord2}") return 99999.0 if not (is_valid_coordinates(lat1, lon1) and is_valid_coordinates(lat2, lon2)): return 99999.0 return geodesic(coord1, coord2).meters # -------------------------- # 数据加载模块 # -------------------------- def load_site_orders(engine): """加载现场执行工单数据""" sql = text(""" SELECT 地市, 代维简称, 专业, 工单类型, 工单编号, 执行人, 所属组织, 出发时间, 出发经度, 出发纬度, 签到时间, 签到经度, 签到纬度, 完成时间, 完成经度, 完成纬度, 路途时长, 实际工作时长 FROM 现场执行工单 WHERE 出发时间 BETWEEN :start_time AND :end_time """) params = { 'start_time': TIME_RANGE["start"], 'end_time': TIME_RANGE["end"] } df = safe_read_sql(sql, engine, params) df = df[~df['所属组织'].str.contains('高铁')] df['代维简称'] = df['代维简称'].replace('中移铁通', '铁通') valid_companies = ['中贝', '中通服', '中邮建', '唐人', '宜通', '怡创', '浙通服', '润建', '虹信', '超讯', '铁通', '长实'] df = df[df['代维简称'].isin(valid_companies)] df['地市代维'] = df['地市'].str.split("-").str[0] + '-' + df['代维简称'] logger.info(f"加载现场工单记录数: {len(df)}") return df def batch_load_order_data(engine, order_numbers, batch_size=500): """批量加载工单相关数据""" dfs = [] for table, config in TABLE_CONFIG.items(): try: columns = ", ".join(config['columns']) where_col = config.get('where_column', '工单号') # 修复:正确构造SQL查询字符串,确保引号闭合 sql_template = text(f"SELECT {columns} FROM {table} WHERE {where_col} IN :order_nums") table_dfs = [] for i in range(0, len(order_numbers), batch_size): batch = order_numbers[i:i + batch_size] df = safe_read_sql(sql_template, engine, params={'order_nums': batch}) table_dfs.append(df) if table_dfs: table_df = pd.concat(table_dfs, ignore_index=True) dfs.append(table_df) logger.info(f"表 {table} 加载完成,记录数: {len(table_df)}") except Exception as e: logger.error(f"加载表 {table} 失败: {str(e)}", exc_info=True) return dfs # -------------------------- # 数据处理模块 # -------------------------- def merge_order_data(site_df, order_dfs): """合并工单数据(增强版)""" if not order_dfs: logger.warning("没有需要合并的工单数据") return pd.DataFrame() final_df = pd.concat(order_dfs, axis=0, ignore_index=True) final_df = final_df.drop_duplicates(subset=['工单号'], keep='first') merged_df = pd.merge( left=site_df, right=final_df, left_on='工单编号', right_on='工单号', how='left', ) merged_df.drop(columns=['工单号'], inplace=True) merged_df['路途时长(分钟)'] = (merged_df['路途时长'] * 60).round(2) merged_df['实际工作时长(分钟)'] = (merged_df['实际工作时长'] * 60).round(2) # 过滤有效状态 condition = ( merged_df['工单当前状态'].notna() & (merged_df['工单当前状态'] != '') & ~merged_df['工单当前状态'].str.contains('已驳回|已撤销|已关闭', na=False) ) return merged_df[condition] def get_custom_natural_week(date): """ 计算“自然周”编号。本定义中,一周从上周三开始,到本周二结束。 """ if pd.isnull(date): return np.nan weekday = date.weekday() days_since_last_thursday = (weekday + 5) % 7 date = date - pd.Timedelta(days=days_since_last_thursday) return date.isocalendar()[1] # -------------------------- # 核心处理模块 # -------------------------- def generate_time_features(df): """增强时间特征生成""" df['出发时间'] = pd.to_datetime(df['出发时间'], errors='coerce') df['自然年'] = df['出发时间'].dt.year df['自然月'] = df['出发时间'].dt.to_period('M').astype(str) df['自然周'] = df['出发时间'].apply(get_custom_natural_week) df['出发日期'] = df['出发时间'].dt.date return df def process_coordinates(df): """坐标处理""" coord_columns = [ "资源纬度", "资源经度", "签到纬度", "签到经度", "LATITUDE", "LONGITUDE", "完成纬度", "完成经度" ] for col in coord_columns: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') else: logger.warning(f"列 {col} 不存在") fault_columns = ['是否超长退服', '高频故障', '是否延期有效', '是否及时处理'] for col in fault_columns: if col not in df.columns: df[col] = '否' return df def calculate_distances(df): """距离计算""" df["签到距离"] = df.apply( lambda row: calculate_distance( (row["签到纬度"], row["签到经度"]), (row["资源纬度"], row["资源经度"]) ), axis=1 ) return df def merge_cran_data(main_df, cran_path): """合并CRAN机房数据并取最近RU距离""" cran_df = pd.read_csv(cran_path) for col in ["ru纬度", "ru经度"]: cran_df[col] = pd.to_numeric(cran_df[col], errors="coerce").fillna(0) merged = pd.merge( main_df[["工单编号", "STATION_NAME", "签到经度", "签到纬度"]], cran_df, left_on="STATION_NAME", right_on="station_name", how="left" ) merged["签到距离_CRAN"] = merged.apply( lambda row: calculate_distance( (row["签到纬度"], row["签到经度"]), (row["ru纬度"], row["ru经度"]) ), axis=1 ) min_distances = merged.groupby("工单编号", as_index=False)["签到距离_CRAN"].min() return pd.merge( main_df, min_distances, on="工单编号", how="left" ).fillna({"签到距离_CRAN": 99999.0}) # -------------------------- # 统计计算模块 # -------------------------- def calculate_workload(df): """工作量评定""" df['工作量评定'] = '有效' df['原因'] = '' no_completion = df['完成时间'].isna() df.loc[no_completion, ['工作量评定', '原因']] = ['无效', '无完成时间'] short_work = df['实际工作时长(分钟)'] < 5 df.loc[short_work, ['工作量评定', '原因']] = ['无效', '工作时长过短'] if all(col in df.columns for col in ['签到距离', '签到距离_CRAN', '维护分类']): invalid_dist = ( ~df['维护分类'].isin(['发电', '保障']) & (df['签到距离'] > 300) & (df['签到距离_CRAN'] > 300) ) df.loc[invalid_dist, ['工作量评定', '原因']] = ['无效', '签到距离过大'] return df def _calculate_base_stats(df, group_keys): """通用基础指标计算""" categories = df['维护分类'].dropna().unique() agg_dict = { # 总工单数(已去重) "总工单数": ('工单编号', 'nunique'), # 有效工单数(去重) "有效工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['工作量评定'] == "有效"].nunique()), # 故障工单数(去重):统计工单分类为"故障工单"的工单编号唯一数 "故障工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['工单分类'] == "故障工单"].nunique()), # 故障及时数(去重):统计同时满足工单分类为故障工单且是否及时处理为"是"的工单编号唯一数 "故障及时数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][ (df['工单分类'] == "故障工单") & (df['是否及时处理'] == "是") ].nunique()), # 故障超时数(去重):统计同时满足工单分类为故障工单且是否及时处理为"否"的工单编号唯一数 "故障超时数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][ (df['工单分类'] == "故障工单") & (df['是否及时处理'] == "否") ].nunique()), # 超长退服工单数(去重):统计是否超长退服为"是"的工单编号唯一数 "超长退服工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['是否超长退服'] == "是"].nunique()), # 超频故障工单数(去重):统计高频故障为"是"的工单编号唯一数 "超频故障工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['高频故障'] == "是"].nunique()), # 延期工单数(去重):统计是否延期有效为"是"的工单编号唯一数 "延期工单数": ('工单编号', lambda x: df.loc[x.index, '工单编号'][df['是否延期有效'] == "是"].nunique()), # 其他非去重指标保持不变 "故障处理时长": ('故障处理时长_小时', lambda x: x[df['工单分类'] == "故障工单"].sum()), "路途总时长": ('路途时长(分钟)', 'sum'), "工作总时长": ('实际工作时长(分钟)', 'sum'), "出勤人数": ('执行人', 'nunique'), "出勤人天": ('工单编号', lambda x: df.loc[x.index][['执行人', '出发日期']].drop_duplicates().shape[0]), } for cat in categories: agg_dict[f"{cat}工单数"] = ( '维护分类', lambda x, current_cat=cat: x.eq(current_cat).sum() ) stats = df.groupby(group_keys).agg(**agg_dict).reset_index() return stats def _add_derived_metrics(stats, time_level): """通用衍生指标计算""" stats['出勤总时长'] = stats['路途总时长'] + stats['工作总时长'] hour_cols = ['出勤总时长', '路途总时长', '工作总时长'] for col in hour_cols: stats[f'{col}(h)'] = (stats[col] / 60).round(2) stats['平均工单工作时长(h)'] = ( stats['工作总时长(h)'] / stats['总工单数'] ).replace([np.inf, -np.inf], 0).fillna(0).round(2) stats['平均工单路途时长(h)'] = ( stats['路途总时长(h)'] / stats['总工单数'] ).replace([np.inf, -np.inf], 0).fillna(0).round(2) if time_level == 'weekly': stats['人均能效_周'] = (stats['有效工单数'] / stats['出勤人天']).round(2) elif time_level == 'monthly': stats['人均能效_月'] = (stats['有效工单数'] / stats['出勤人天']).round(2) stats['故障处理及时率'] = (stats['故障及时数'] / stats['故障工单数']).replace(np.inf, 0) stats['工作时长占比'] = (stats['工作总时长'] / stats['出勤总时长']).replace(np.inf, 0) stats['人均出勤天数'] = (stats['出勤人天'] / stats['出勤人数']).replace(np.inf, 0).round(2) stats['日均出勤人数'] = (stats['出勤人天'] / stats['出勤天数']).replace(np.inf, 0).round(2) return stats def calculate_attendance_metrics(df, denominator): """计算出勤相关指标""" df = df.copy() df['出勤总时长(h)'] = (df['路途时长(分钟)'] + df['实际工作时长(分钟)']) / 60 df['工作总时长(h)'] = df['实际工作时长(分钟)'] / 60 # 按执行人和出发日期分组,计算每天的数据 daily_stats = df.groupby(['执行人', '出发日期']).agg( station_count=('STATION_NAME', 'nunique'), total_attendance_hours=('出勤总时长(h)', 'sum'), total_work_hours=('工作总时长(h)', 'sum') ).reset_index() # 计算每天是否达标(条件1) daily_stats['达标'] = ( (daily_stats['station_count'] >= 4) & (daily_stats['total_attendance_hours'] >= 7) & (daily_stats['total_work_hours'] >= 2) ) # 按执行人聚合计算总指标 executor_stats = daily_stats.groupby('执行人').agg( meet_days=('达标', 'sum'), total_stations=('station_count', 'sum'), total_attendance_hours=('total_attendance_hours', 'sum'), total_work_hours=('total_work_hours', 'sum') ).reset_index() # 计算衍生指标 executor_stats['平均每天到站址'] = executor_stats['total_stations'] / denominator executor_stats['日均出勤出工时长'] = executor_stats['total_attendance_hours'] / denominator executor_stats['日均有效出勤工时'] = executor_stats['total_work_hours'] / denominator # 添加条件2达标标识 executor_stats['条件2达标'] = ( (executor_stats['平均每天到站址'] >= 4) & (executor_stats['日均出勤出工时长'] >= 7) & (executor_stats['日均有效出勤工时'] >= 2) ) return executor_stats.rename(columns={'meet_days': '出勤达标天数'}) def generate_stats(df, dimension, time_level): """通用统计生成函数""" time_key = { 'daily': '出发日期', 'weekly': '自然周', 'monthly': '自然月' }[time_level] group_keys = dimension['keys'] + [time_key] # 获取维度的主键(用于合并的键名) dimension_key = dimension['keys'][0] # 取维度的第一个键作为合并主键 stats = _calculate_base_stats(df, group_keys) stats['出勤天数'] = df.groupby(group_keys)['出发日期'].nunique().values # 计算月度分母(MIN(TIME_RANGE["end"]的日期部分, 22)) denominator = None if time_level == 'daily': stats['出发日期'] = pd.to_datetime(stats['出发日期']).dt.strftime('%Y-%m-%d') else: stats['出勤天数'] = df.groupby(group_keys)['出发日期'].nunique().values if time_level == 'monthly': # 移除原工作日天数计算,替换为新分母 end_date = datetime.strptime(TIME_RANGE["end"], '%Y-%m-%d %H:%M:%S') end_day = end_date.day denominator = min(end_day, 22) stats = _add_derived_metrics(stats, time_level) # 当维度为执行人且时间粒度为月度时,合并出勤指标(使用正确的键名) if dimension['name'] == '执行人' and time_level == 'monthly' and denominator is not None: attendance_metrics = calculate_attendance_metrics(df, denominator) # 使用维度的实际键名(如'执行人')作为合并键,而非固定的'维度' stats = pd.merge( stats, attendance_metrics[[ # 保留原始'执行人'列作为合并键 '执行人', '出勤达标天数', '平均每天到站址', '日均出勤出工时长', '日均有效出勤工时' ]], on=dimension_key, # 动态使用维度的键名(如'执行人')进行合并 how='left' ) return stats # -------------------------- # 代维统计增强模块 # -------------------------- def enhance_maintainer_stats(filtered_data, member_list_df_filter): time_range_data = filtered_data.copy() # 初始化维度框架 GROUP_KEYS = ['所属组织', '代维简称', '地市', '地市代维'] dim_dfs = [] for key in GROUP_KEYS: df = member_list_df_filter.groupby(key).agg( 应出勤人数=('登陆账号', 'count') ).reset_index() dim_dfs.append(df) if not time_range_data.empty: # ===================== 原有统计逻辑 ===================== # 执行人级别统计 executor_stats = time_range_data.groupby(['执行人', '所属组织', '代维简称', '地市', '地市代维']).agg( 总工单数=('工单编号', 'nunique'), 有效工单数=('工作量评定', lambda s: s.eq('有效').sum()), 总路途时长=('路途时长(分钟)', 'sum'), 总工作时长=('实际工作时长(分钟)', 'sum'), 总出勤天数=('出发日期', 'nunique'), 故障工单数=('工单分类', lambda s: s.eq('故障工单').sum()), 超长退服工单数=('是否超长退服', lambda s: s.eq('是').sum()), 高频故障工单数=('高频故障', lambda s: s.eq('是').sum()), 延期工单数=('是否延期有效', lambda s: s.eq('是').sum()), 故障及时数=('是否及时处理', lambda s: s.eq('是').sum()) ).reset_index() # 计算执行人级别指标 metrics = [ ('平均路途时长(小时)', (executor_stats['总路途时长'] / 60) / executor_stats['总工单数'].replace(0, np.nan)), ('工作时长占比', executor_stats['总工作时长'] / (executor_stats['总路途时长'] + executor_stats['总工作时长']).replace(0, np.nan)), ('人均每日工单数', executor_stats['总工单数'] / executor_stats['总出勤天数'].replace(0, np.nan)), ('人均每日有效工单数', executor_stats['有效工单数'] / executor_stats['总出勤天数'].replace(0, np.nan)), ('人均每日出勤时长(小时)', (executor_stats['总路途时长'] + executor_stats['总工作时长']) / 60 / executor_stats['总出勤天数'].replace( 0, np.nan)), ('人均每日工作时长(小时)', executor_stats['总工作时长'] / 60 / executor_stats['总出勤天数'].replace(0, np.nan)), ('每工单路途时长(小时)', executor_stats['总路途时长'] / 60 / executor_stats['总工单数'].replace(0, np.nan)), ('每工单工作时长(小时)', executor_stats['总工作时长'] / 60 / executor_stats['总工单数'].replace(0, np.nan)), ('超长退服工单占比', executor_stats['超长退服工单数'] / executor_stats['故障工单数'].replace(0, np.nan)), ('高频故障工单占比', executor_stats['高频故障工单数'] / executor_stats['故障工单数'].replace(0, np.nan)), ('延期工单占比', executor_stats['延期工单数'] / executor_stats['故障工单数'].replace(0, np.nan)), ] for col, formula in metrics: executor_stats[col] = formula executor_stats = executor_stats.fillna(0).round(4) # 维度聚合统计 def calculate_dimension_stats(grouped_df, current_key): return grouped_df.groupby(current_key).agg( 总工单数=('总工单数', 'sum'), 有效工单数=('有效工单数', 'sum'), 工单平均路途超2小时人数=('平均路途时长(小时)', lambda x: (x > 2).sum()), 平均工作时长占比低人数=('工作时长占比', lambda x: (x < 0.1).sum()), 人均每日工单数=('人均每日工单数', 'mean'), 人均每日有效工单数=('人均每日有效工单数', 'mean'), 人均每日出勤时长=('人均每日出勤时长(小时)', 'mean'), 人均每日工作时长=('人均每日工作时长(小时)', 'mean'), 每工单路途时长=('每工单路途时长(小时)', 'mean'), 每工单工作时长=('每工单工作时长(小时)', 'mean'), 超长退服工单占比=('超长退服工单占比', 'mean'), 高频故障工单占比=('高频故障工单占比', 'mean'), 延期工单占比=('延期工单占比', 'mean'), ).reset_index().round(4) # 合并维度统计 updated_dims = [] for i, key in enumerate(GROUP_KEYS): dim_stats = calculate_dimension_stats(executor_stats, key) dim_stats['有效工单占比'] = (dim_stats['有效工单数'] / dim_stats['总工单数']).replace(np.inf, 0).fillna( 0).round(4) merged = pd.merge( dim_dfs[i], dim_stats, on=key, how='left' ).fillna(0) updated_dims.append(merged) dim_dfs = updated_dims # ===================== 新增需求处理 ===================== # 提取问题工单 sign_in_issue = time_range_data[time_range_data['原因'] == '签到距离过大'] short_duration_issue = time_range_data[time_range_data['原因'] == '工作时长过短'] top50_travel_orders = time_range_data.nlargest(50, '路途时长(分钟)')['工单编号'].unique() top50_travel_data = time_range_data[time_range_data['工单编号'].isin(top50_travel_orders)] # 处理各维度指标 for i, key in enumerate(GROUP_KEYS): # 合并签到距离过大条目数 sign_in_counts = sign_in_issue.groupby(key)['工单编号'].nunique().reset_index(name='签到距离过大条目数') dim_dfs[i] = pd.merge(dim_dfs[i], sign_in_counts, on=key, how='left') # 合并工作时长过短条目数 short_duration_counts = short_duration_issue.groupby(key)['工单编号'].nunique().reset_index( name='工作时长过短条目数') dim_dfs[i] = pd.merge(dim_dfs[i], short_duration_counts, on=key, how='left') # 合并路途时长TOP50条目数 top50_counts = top50_travel_data.groupby(key)['工单编号'].nunique().reset_index(name='路途时长TOP50条目数') dim_dfs[i] = pd.merge(dim_dfs[i], top50_counts, on=key, how='left') dim_dfs[i] = dim_dfs[i].fillna(0) # ===================== 出勤相关统计 ===================== # 实际出勤人数 def merge_attendance(source_df, target_df, grp_key): att = source_df.groupby(grp_key)['执行人'].nunique().reset_index(name='实际出勤人数') return pd.merge(target_df, att, on=grp_key, how='left').fillna(0) for i, key in enumerate(GROUP_KEYS): dim_dfs[i] = merge_attendance(time_range_data, dim_dfs[i], key) dim_dfs[i]['出勤率'] = (dim_dfs[i]['实际出勤人数'] / dim_dfs[i]['应出勤人数']).replace(np.inf, 0).fillna( 0).round(4) # 出勤天数统计 for i, key in enumerate(GROUP_KEYS): att_cols = ['执行人', key, '出发日期'] att_days = time_range_data[att_cols].drop_duplicates(subset=['执行人', '出发日期']) # 计算每个执行人的出勤天数 att_days_per_executor = att_days.groupby([key, '执行人']).size().reset_index(name='出勤天数') # 总出勤人天(所有人的出勤天数总和) total_attendance_person_days = att_days_per_executor.groupby(key)['出勤天数'].sum().reset_index( name='总出勤人天') dim_dfs[i] = pd.merge(dim_dfs[i], total_attendance_person_days, on=key, how='left').fillna(0) # 出勤天数(不同出发日期的数量) attendance_days = att_days.groupby(key)['出发日期'].nunique().reset_index(name='出勤天数_实际') dim_dfs[i] = pd.merge(dim_dfs[i], attendance_days, on=key, how='left').fillna(0) # 计算人均出勤天数和日均出勤人数 dim_dfs[i]['人均出勤天数'] = (dim_dfs[i]['总出勤人天'] / dim_dfs[i]['实际出勤人数']).replace(np.inf, 0).fillna( 0).round(2) dim_dfs[i]['日均出勤人数'] = (dim_dfs[i]['总出勤人天'] / dim_dfs[i]['出勤天数_实际']).replace(np.inf, 0).fillna( 0).round(2) # 出勤不足3天人数统计 under3 = att_days_per_executor[att_days_per_executor['出勤天数'] < 3].groupby(key)[ '执行人'].count().reset_index(name='出勤不足3天人数') dim_dfs[i] = pd.merge(dim_dfs[i], under3, on=key, how='left').fillna(0) dim_dfs[i]['出勤不足3天人员占比'] = (dim_dfs[i]['出勤不足3天人数'] / dim_dfs[i]['实际出勤人数']).replace( [np.inf, -np.inf], 0).fillna(0).round(4) # ===================== 故障处理及时率计算 ===================== for i, key in enumerate(GROUP_KEYS): fault_data = time_range_data[time_range_data['工单分类'] == '故障工单'] # 初始化要计算的占比列 ratio_columns = ['故障处理及时率', '超长退服工单占比', '高频故障工单占比', '延期工单占比'] if not fault_data.empty: agg_dict = { '是否及时处理': lambda x: x.eq('是').any(), '是否超长退服': lambda x: x.eq('是').any(), '高频故障': lambda x: x.eq('是').any(), '是否延期有效': lambda x: x.eq('是').any(), '所属组织': 'first', '代维简称': 'first', '地市': 'first', '地市代维': 'first' } if key == '地市代维': agg_dict['地市'] = 'first' agg_dict['代维简称'] = 'first' unique_faults = fault_data.groupby('工单编号').agg(agg_dict).reset_index() # 按维度key分组统计 fault_stats_current = unique_faults.groupby(key).agg( 总故障工单数=('工单编号', 'nunique'), 总故障及时数=('是否及时处理', 'sum'), 超长退服工单数=('是否超长退服', 'sum'), 高频故障工单数=('高频故障', 'sum'), 延期工单数=('是否延期有效', 'sum') ).reset_index() # 计算各项占比 fault_stats_current['故障处理及时率'] = ( fault_stats_current['总故障及时数'] / fault_stats_current['总故障工单数'] ).replace([np.inf, -np.inf], 0).fillna(0) fault_stats_current['超长退服工单占比'] = ( fault_stats_current['超长退服工单数'] / fault_stats_current['总故障工单数'] ).replace([np.inf, -np.inf], 0).fillna(0) fault_stats_current['高频故障工单占比'] = ( fault_stats_current['高频故障工单数'] / fault_stats_current['总故障工单数'] ).replace([np.inf, -np.inf], 0).fillna(0) fault_stats_current['延期工单占比'] = ( fault_stats_current['延期工单数'] / fault_stats_current['总故障工单数'] ).replace([np.inf, -np.inf], 0).fillna(0) else: # 创建空数据框并初始化所有占比列为0 fault_stats_current = pd.DataFrame(columns=[key] + ratio_columns) for col in ratio_columns: fault_stats_current[col] = 0.0 # 输出统计结果 logger.info(f"维度[{key}]故障统计结果:") logger.info(f" 总故障工单数: {fault_stats_current.get('总故障工单数', 0).sum()}") logger.info(f" 超长退服工单数: {fault_stats_current.get('超长退服工单数', 0).sum()}") logger.info(f" 高频故障工单数: {fault_stats_current.get('高频故障工单数', 0).sum()}") logger.info(f" 延期工单数: {fault_stats_current.get('延期工单数', 0).sum()}") # 使用map函数手动添加占比数据 for col in ratio_columns: mapping = dict(zip(fault_stats_current[key], fault_stats_current[col])) dim_dfs[i][col] = dim_dfs[i][key].map(mapping).fillna(0) # 输出合并后占比 logger.info(f"维度[{key}]合并后占比:") for col in ratio_columns: mean_value = dim_dfs[i][col].mean() logger.info(f" {col}: {mean_value}") # ===================== 调整字段顺序 ===================== base_columns = [ '应出勤人数', '实际出勤人数', '出勤率', '出勤不足3天人数', '出勤不足3天人员占比', '总工单数', '有效工单数', '有效工单占比', '签到距离过大条目数', '工作时长过短条目数', '路途时长TOP50条目数', '人均出勤天数', '日均出勤人数', '工单平均路途超2小时人数', '平均工作时长占比低人数', '人均每日工单数', '人均每日有效工单数', '人均每日出勤时长', '人均每日工作时长', '每工单路途时长', '每工单工作时长', '超长退服工单占比', '高频故障工单占比', '延期工单占比', '故障处理及时率' ] # 确保所有列都存在 for i, key in enumerate(GROUP_KEYS): # 只保留数据框中实际存在的列 available_columns = [col for col in base_columns if col in dim_dfs[i].columns] ordered_columns = [key] + available_columns dim_dfs[i] = dim_dfs[i][ordered_columns] # ===================== 总计行处理 ===================== for i in range(len(dim_dfs)): df = dim_dfs[i] group_key = GROUP_KEYS[i] # 创建总计行字典 total_row = {group_key: '总计'} # 处理需要sum的字段 sum_cols = [ '应出勤人数', '实际出勤人数', '出勤不足3天人数', '总工单数', '有效工单数', '签到距离过大条目数', '工作时长过短条目数', '路途时长TOP50条目数', '工单平均路途超2小时人数', '平均工作时长占比低人数' ] # 检查并添加出勤相关列 has_total_attendance = '总出勤人天' in df.columns has_attendance_days = '出勤天数_实际' in df.columns if has_total_attendance: sum_cols.append('总出勤人天') if has_attendance_days: sum_cols.append('出勤天数_实际') # 计算总计行的和 for col in sum_cols: if col in df.columns: total_row[col] = df[df[group_key] != '总计'][col].sum() # 计算平均值字段 mean_cols = [ '人均每日工单数', '人均每日有效工单数', '人均每日出勤时长', '人均每日工作时长', '人均出勤天数', '日均出勤人数', '每工单路途时长', '每工单工作时长', '超长退服工单占比', '高频故障工单占比', '延期工单占比', '故障处理及时率' ] for col in mean_cols: if col in df.columns: # 使用加权平均计算总计行的平均值 if '实际出勤人数' in df.columns: total_row[col] = (df[df[group_key] != '总计'][col] * df[df[group_key] != '总计'][ '实际出勤人数']).sum() / df[df[group_key] != '总计']['实际出勤人数'].sum() else: total_row[col] = df[df[group_key] != '总计'][col].mean() # 重新计算总计行中的百分比和比率字段 # 出勤率 total_actual = total_row.get('实际出勤人数', 0) total_expected = total_row.get('应出勤人数', 0) total_row['出勤率'] = total_actual / total_expected if total_expected > 0 else 0 # 出勤不足3天人员占比 total_under3 = total_row.get('出勤不足3天人数', 0) total_row['出勤不足3天人员占比'] = total_under3 / total_actual if total_actual > 0 else 0 # 有效工单占比 total_valid = total_row.get('有效工单数', 0) total_orders = total_row.get('总工单数', 0) total_row['有效工单占比'] = total_valid / total_orders if total_orders > 0 else 0 # 添加总计行到数据框 total_df = pd.DataFrame([total_row]) dim_dfs[i] = pd.concat([df, total_df], ignore_index=True) return tuple(dim_dfs) # -------------------------- # 输出模块 # -------------------------- def save_to_excel(dataframes, path): """增强版Excel输出(带百分比格式)""" with pd.ExcelWriter(path, engine='xlsxwriter') as writer: workbook = writer.book # 定义格式(新增垂直居中设置) header_format = workbook.add_format({ 'bold': True, 'text_wrap': True, # 自动换行 'border': 1, 'bg_color': '#D9EAD3', # 浅绿色背景 'align': 'center', # 水平居中 'valign': 'vcenter' # 垂直居中(新增) }) cell_format = workbook.add_format({ 'border': 1, 'align': 'center', # 内容水平居中 'valign': 'vcenter' # 内容垂直居中(新增) }) percent_format = workbook.add_format({ 'num_format': '0.00%', # 百分比格式 'border': 1, 'align': 'center', # 内容水平居中 'valign': 'vcenter' # 内容垂直居中(新增) }) for name, df in dataframes.items(): # 写入数据时添加标题格式 df.to_excel( writer, sheet_name=name, index=False, header=False, # 禁用自动标题 startrow=1 # 从第二行开始写数据 ) worksheet = writer.sheets[name] # 动态识别百分比列 percent_columns = [ col for col in df.columns if col.endswith(('率', '占比')) ] # 设置列格式(包含标题和数据) for col_num, col_name in enumerate(df.columns): col_width = max(12, len(col_name) * 1.5) # 动态列宽 # 设置标题 worksheet.write(0, col_num, col_name, header_format) # 设置列格式 if col_name in percent_columns: fmt = percent_format else: fmt = cell_format # 应用格式到整列 worksheet.set_column( first_col=col_num, last_col=col_num, width=col_width, cell_format=fmt ) # 冻结首行 worksheet.freeze_panes(1, 0) # 自动筛选(仅对数据量小于10000行的sheet启用) if len(df) < 10000: worksheet.autofilter(0, 0, len(df), len(df.columns) - 1) # -------------------------- # 主流程 # -------------------------- def main(): engine = init_db_engine() try: with time_monitor("数据加载"): site_orders = load_site_orders(engine) order_numbers = site_orders['工单编号'].unique().tolist() order_dfs = batch_load_order_data(engine, order_numbers) with time_monitor("数据合并与处理"): merged_data = merge_order_data(site_orders, order_dfs) if merged_data.empty: logger.warning("无有效数据") return processed_data = ( merged_data .pipe(process_coordinates) .pipe(calculate_distances) .pipe(merge_cran_data, cran_path=FILE_PATHS["cran_info"]) .pipe(generate_time_features) .pipe(calculate_workload) ) output_dfs = {'原始数据': processed_data} with time_monitor("统计计算"): for dim in DIMENSIONS: for time_level in ['daily', 'weekly', 'monthly']: stats_df = generate_stats( processed_data, dimension=dim, time_level=time_level ) level_name = { 'daily': '日统计', 'weekly': '周统计', 'monthly': '月统计' }[time_level] sheet_name = f"{level_name}_{dim['name']}" output_dfs[sheet_name] = stats_df with time_monitor("处理代维组织数据"): member_list_df = pd.read_excel(member_list_path, usecols=member_list_columns, engine='calamine') pattern = r'(?=.*无线2023)(?=.*维护组)(?!.*OLT)' member_list_df = member_list_df[member_list_df['所属组织'].str.contains(pattern, regex=True, na=False)] member_list_df['代维简称'] = member_list_df['所属组织'].str.split("-").str[0] member_list_df['代维简称'] = member_list_df['代维简称'].replace('中移铁通', '铁通') member_list_df = member_list_df.rename(columns={'所属地市': '地市'}) member_list_df['地市代维'] = member_list_df['地市'].str.split("-").str[0] + '-' + member_list_df['代维简称'] member_list_df = member_list_df.drop_duplicates(subset=['登陆账号']) member_list_df['时间戳'] = source_time member_list_df_filter = member_list_df[member_list_df['在职状态'] == '在职'] filtered_data = processed_data[processed_data['所属组织'].isin(member_list_df_filter['所属组织'])] output_dfs['原始数据_副本'] = filtered_data.copy() stats_group, stats_company, stats_city, stats_area = enhance_maintainer_stats( filtered_data, member_list_df_filter ) output_dfs.update({ '代维组维度': stats_group, '代维公司维度': stats_company, '地市维度': stats_city, '地市代维维度': stats_area }) with time_monitor("生成Top50统计"): # 从原始数据_副本中提取Top50 top50_travel = filtered_data.nlargest(50, '路途时长(分钟)') # 选择需要的列 top50_travel = top50_travel[['工单编号', '执行人', '所属组织', '地市', '代维简称', '路途时长(分钟)']] output_dfs['路途时长Top50'] = top50_travel # 在指定的结果表第一列添加时间戳 target_sheets = ['代维组维度', '代维公司维度', '地市维度', '地市代维维度', '路途时长Top50'] for sheet in target_sheets: df = output_dfs[sheet] df.insert(0, '时间戳', source_time) with time_monitor("保存结果"): save_to_excel(output_dfs, FILE_PATHS["output"]) logger.info(f"结果已保存至:{FILE_PATHS['output']}") except Exception as e: logger.error(f"处理失败:{str(e)}", exc_info=True) raise if __name__ == "__main__": main()

最新推荐

recommend-type

Comsol声子晶体能带计算:六角与三角晶格原胞选取及布里渊区高对称点选择 - 声子晶体 v1.0

内容概要:本文详细探讨了利用Comsol进行声子晶体能带计算过程中,六角晶格和三角晶格原胞选取的不同方法及其对简约布里渊区高对称点选择的影响。文中不仅介绍了两种晶格类型的基矢量定义方式,还强调了正确设置周期性边界条件(特别是相位补偿)的重要性,以避免计算误差如鬼带现象。同时,提供了具体的MATLAB代码片段用于演示关键步骤,并分享了一些实践经验,例如如何通过观察能带图中的狄拉克锥特征来验证路径设置的准确性。 适合人群:从事材料科学、物理学研究的专业人士,尤其是那些正在使用或计划使用Comsol软件进行声子晶体模拟的研究人员。 使用场景及目标:帮助研究人员更好地理解和掌握在Comsol环境中针对不同类型晶格进行精确的声子晶体能带计算的方法和技术要点,从而提高仿真精度并减少常见错误的发生。 其他说明:文章中提到的实际案例展示了因晶格类型混淆而导致的问题,提醒使用者注意细节差异,确保模型构建无误。此外,文中提供的代码片段可以直接应用于相关项目中作为参考模板。
recommend-type

springboot213大学生心理健康管理系统的设计与实现.zip

springboot213大学生心理健康管理系统的设计与实现
recommend-type

三轴自动锁螺丝机PLC配方编程:吸钉式锁螺丝智能调整与注释详解 变址寄存器 高效版

一种基于三菱FX系列PLC的三轴自动锁螺丝机的配方编程方法。该系统采用吸钉式锁螺丝方式,通过PLC进行智能管理和调整。主要内容包括:利用D寄存器阵列和变址寄存器Z来存储和管理不同配方的数据,如坐标和螺丝数量;通过触摸屏和示教器简化调试流程,使工人能够快速设置和保存参数;并通过RS指令将数据保存到触摸屏内置存储中。此外,还展示了具体的PLC程序片段,解释了如何通过简单的寄存器操作实现复杂的配方管理和自动化操作。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是熟悉PLC编程和机械设备调试的专业人士。 使用场景及目标:适用于需要提高生产效率和简化调试流程的制造业企业。主要目标是帮助技术人员掌握如何使用PLC进行配方管理,优化自动锁螺丝机的操作流程,减少人工干预,提升设备的智能化水平。 其他说明:文中提供的具体PLC程序代码和详细的注释有助于读者更好地理解和应用相关技术。同时,通过实例演示了如何利用PLC寄存器寻址特性和变址寄存器简化程序逻辑,为类似项目提供有价值的参考。
recommend-type

基于QT与STM32的Modbus-TCP四遥功能实现及源码解析

基于Qt开发的Modbus-TCP远程控制系统,用于实现四遥(遥测、遥控、遥信、遥调)功能。系统由上位机和下位机组成,上位机使用Qt进行图形界面开发,下位机采用STM32和W5500以太网模块,所有Modbus功能均自行实现,未使用第三方库。文中具体展示了各个功能的实现细节,包括ADC数据采集、LED控制、按键状态读取以及参数调节等功能的具体代码实现。 适合人群:具有一定嵌入式开发经验的研发人员,尤其是熟悉Qt和STM32的开发者。 使用场景及目标:适用于工业自动化、智能家居等领域,旨在帮助开发者理解和实现基于Modbus-TCP协议的远程控制系统,掌握四遥功能的具体实现方法。 其他说明:文中提供了详细的代码片段和技术难点解析,有助于读者深入理解系统的实现过程。同时,针对常见的开发问题给出了具体的解决方案,如浮点数转换、字节序处理等。
recommend-type

ERP系统客户与供应商信息视图创建:Oracle数据库中客户和供应商数据整合查询设计

内容概要:本文档定义了一个名为 `xxx_SCustSuplier_info` 的视图,用于整合和展示客户(Customer)和供应商(Supplier)的相关信息。视图通过连接多个表来获取组织单位、客户账户、站点使用、位置、财务代码组合等数据。对于客户部分,视图选择了与账单相关的记录,并提取了账单客户ID、账单站点ID、客户名称、账户名称、站点代码、状态、付款条款等信息;对于供应商部分,视图选择了有效的供应商及其站点信息,包括供应商ID、供应商名称、供应商编号、状态、付款条款、财务代码组合等。视图还通过外连接确保即使某些字段为空也能显示相关信息。 适合人群:熟悉Oracle ERP系统,尤其是应付账款(AP)和应收账款(AR)模块的数据库管理员或开发人员;需要查询和管理客户及供应商信息的业务分析师。 使用场景及目标:① 数据库管理员可以通过此视图快速查询客户和供应商的基本信息,包括账单信息、财务代码组合等;② 开发人员可以利用此视图进行报表开发或数据迁移;③ 业务分析师可以使用此视图进行数据分析,如信用评估、付款周期分析等。 阅读建议:由于该视图涉及多个表的复杂连接,建议读者先熟悉各个表的结构和关系,特别是 `hz_parties`、`hz_cust_accounts`、`ap_suppliers` 等核心表。此外,注意视图中使用的外连接(如 `gl_code_combinations_kfv` 表的连接),这可能会影响查询结果的完整性。
recommend-type

Web前端开发:CSS与HTML设计模式深入解析

《Pro CSS and HTML Design Patterns》是一本专注于Web前端设计模式的书籍,特别针对CSS(层叠样式表)和HTML(超文本标记语言)的高级应用进行了深入探讨。这本书籍属于Pro系列,旨在为专业Web开发人员提供实用的设计模式和实践指南,帮助他们构建高效、美观且可维护的网站和应用程序。 在介绍这本书的知识点之前,我们首先需要了解CSS和HTML的基础知识,以及它们在Web开发中的重要性。 HTML是用于创建网页和Web应用程序的标准标记语言。它允许开发者通过一系列的标签来定义网页的结构和内容,如段落、标题、链接、图片等。HTML5作为最新版本,不仅增强了网页的表现力,还引入了更多新的特性,例如视频和音频的内置支持、绘图API、离线存储等。 CSS是用于描述HTML文档的表现(即布局、颜色、字体等样式)的样式表语言。它能够让开发者将内容的表现从结构中分离出来,使得网页设计更加模块化和易于维护。随着Web技术的发展,CSS也经历了多个版本的更新,引入了如Flexbox、Grid布局、过渡、动画以及Sass和Less等预处理器技术。 现在让我们来详细探讨《Pro CSS and HTML Design Patterns》中可能包含的知识点: 1. CSS基础和选择器: 书中可能会涵盖CSS基本概念,如盒模型、边距、填充、边框、背景和定位等。同时还会介绍CSS选择器的高级用法,例如属性选择器、伪类选择器、伪元素选择器以及选择器的组合使用。 2. CSS布局技术: 布局是网页设计中的核心部分。本书可能会详细讲解各种CSS布局技术,包括传统的浮动(Floats)布局、定位(Positioning)布局,以及最新的布局模式如Flexbox和CSS Grid。此外,也会介绍响应式设计的媒体查询、视口(Viewport)单位等。 3. 高级CSS技巧: 这些技巧可能包括动画和过渡效果,以及如何优化性能和兼容性。例如,CSS3动画、关键帧动画、转换(Transforms)、滤镜(Filters)和混合模式(Blend Modes)。 4. HTML5特性: 书中可能会深入探讨HTML5的新标签和语义化元素,如`<article>`、`<section>`、`<nav>`等,以及如何使用它们来构建更加标准化和语义化的页面结构。还会涉及到Web表单的新特性,比如表单验证、新的输入类型等。 5. 可访问性(Accessibility): Web可访问性越来越受到重视。本书可能会介绍如何通过HTML和CSS来提升网站的无障碍访问性,比如使用ARIA标签(Accessible Rich Internet Applications)来增强屏幕阅读器的使用体验。 6. 前端性能优化: 性能优化是任何Web项目成功的关键。本书可能会涵盖如何通过优化CSS和HTML来提升网站的加载速度和运行效率。内容可能包括代码压缩、合并、避免重绘和回流、使用Web字体的最佳实践等。 7. JavaScript与CSS/HTML的交互: 在现代Web开发中,JavaScript与CSS及HTML的交云并用是不可或缺的。书中可能会讲解如何通过JavaScript动态地修改样式、操作DOM元素以及使用事件监听和响应用户交互。 8. Web框架和预处理器: 这本书可能会提到流行的Web开发框架和预处理器,比如Bootstrap、Foundation、Sass和Less等,它们是如何简化和加速开发流程的。 9. 测试和维护: 书中也可能包含关于如何测试网页以及如何持续优化和维护CSS和HTML代码的章节。例如,使用断言测试、自动化测试、性能分析工具等。 最后,鉴于文件名称列表中的“压缩包子文件”的表述,这可能是对“压缩包”文件的一种误译或误用,此处“压缩包”应该指的是包含该书籍PDF文件的压缩文件格式,如ZIP或RAR。而“Pro CSS and HTML Design Patterns.pdf”指的就是该书籍的PDF格式电子版文件。 以上所述,构成了《Pro CSS and HTML Design Patterns》一书可能包含的核心知识点。通过学习这些内容,Web前端开发者可以掌握更为高效和优雅的设计模式,从而在日常工作中更高效地解决实际问题。
recommend-type

Zotero 7数据同步:Attanger插件安装&设置,打造文献管理利器

/i.s3.glbimg.com/v1/AUTH_08fbf48bc0524877943fe86e43087e7a/internal_photos/bs/2018/L/j/ipKA8TRPyt4ptaG1DGww/t-m-01.png) # 1. Zotero 7数据同步简介 在数字时代,管理研究数据变得日益重要,而Zot
recommend-type

卷积神经网络的基础理论200字

<think>我们正在回答用户关于卷积神经网络(CNN)基础理论的询问。根据提供的引用资料,我们可以概述CNN的基本结构和工作原理。引用[1]提到CNN是一种前馈神经网络,信息从输入单向流动到输出。引用[2]指出CNN在图像处理等领域应用广泛,通过层次结构和参数调整实现模式识别。引用[3]说明CNN包含卷积层,使用卷积核提取局部特征,减少参数量,提高效率。引用[4]则提到了训练过程,包括前向传播、误差计算和反向传播(梯度下降)。因此,我们将从以下几个方面概述CNN:1.CNN的基本结构(卷积层、池化层、全连接层)2.卷积操作的工作原理(局部感受野、权重共享)3.训练过程(前向传播、损失函数、反
recommend-type

轻便实用的Java库类查询工具介绍

标题 "java2库类查询" 和描述表明,所提及的工具是一个专门用于查询Java库类的应用程序。此软件旨在帮助开发者快速地查找和引用Java的标准开发工具包(SDK)中包含的所有应用程序编程接口(API)类。通过这样的工具,开发者可以节省大量在官方文档或搜索引擎上寻找类定义和使用方法的时间。它被描述为轻巧且方便,这表明其占用的系统资源相对较少,同时提供直观的用户界面,使得查询过程简洁高效。 从描述中可以得出几个关键知识点: 1. Java SDK:Java的软件开发工具包(SDK)是Java平台的一部分,提供了一套用于开发Java应用软件的软件包和库。这些软件包通常被称为API,为开发者提供了编程界面,使他们能够使用Java语言编写各种类型的应用程序。 2. 库类查询:这个功能对于开发者来说非常关键,因为它提供了一个快速查找特定库类及其相关方法、属性和使用示例的途径。良好的库类查询工具可以帮助开发者提高工作效率,减少因查找文档而中断编程思路的时间。 3. 轻巧性:软件的轻巧性通常意味着它对计算机资源的要求较低。这样的特性对于资源受限的系统尤为重要,比如老旧的计算机、嵌入式设备或是当开发者希望最小化其开发环境占用空间时。 4. 方便性:软件的方便性通常关联于其用户界面设计,一个直观、易用的界面可以让用户快速上手,并减少在使用过程中遇到的障碍。 5. 包含所有API:一个优秀的Java库类查询软件应当能够覆盖Java所有标准API,这包括Java.lang、Java.util、Java.io等核心包,以及Java SE平台的所有其他标准扩展包。 从标签 "java 库 查询 类" 可知,这个软件紧密关联于Java编程语言的核心功能——库类的管理和查询。这些标签可以关联到以下知识点: - Java:一种广泛用于企业级应用、移动应用(如Android应用)、网站后端、大型系统和许多其他平台的编程语言。 - 库:在Java中,库是一组预打包的类和接口,它们可以被应用程序重复使用。Java提供了庞大的标准库,以支持各种常见的任务和功能。 - 查询:查询指的是利用软件工具搜索、定位和检索信息的过程。对于Java库类查询工具来说,这意味着可以通过类名、方法签名或其他标识符来查找特定的API条目。 最后,压缩包文件列表包含了两个文件:“java.dit”和“Java.exe”。其中“Java.exe”很可能是程序的可执行文件,而“java.dit”可能是一个数据文件,用于存储Java类的索引或数据。由于文件名后缀通常与文件类型相关联,但“dit”并不是一个常见的文件扩展名。这可能是一个特定于软件的自定义格式,或是一个打字错误。 总结来说,"java2库类查询" 是一个针对Java开发者的实用工具,它提供了一个轻量级、易用的平台来查询和定位Java标准库中的所有类和API。此工具对优化开发流程,减少查找Java类文档的时间大有裨益,尤其适合需要频繁查阅Java API的开发者使用。
recommend-type

【Zotero 7终极指南】:新手必备!Attanger插件全攻略与数据同步神技

# 1. Zotero 7与Attanger插件的介绍 在当今的学术研究和知识管理领域,高效的文献管理工具至关重要。Zotero 7作为一个流行的参考文献管理软件,因其强大的功能和用户友好的界面而受到专业人士的青睐。而Attanger插件则为Zotero 7带来了更多定制化和高级功能,极大地增强