``` # 读取组信息(新增代码) df_group = pd.read_csv('项目-组.csv') df_group = df_group.drop_duplicates(subset='PMNameMsg', keep='first') # 合并数据(新增代码) merged_data = pd.merge( merged_data, df_group[['PMNameMsg', '组名', '组长']], on='PMNameMsg', how='left' ) # 处理工期列中的"协商"文本(替换为0) merged_data['工期'] = pd.to_numeric(merged_data['工期'].replace('协商', -999), errors='coerce').fillna(-999) # # 处理空值 # merged_data['GongQiHuanBan'] = merged_data['GongQiHuanBan'].fillna(0) # 新增日期列转换 merged_data['HuanBanDate'] = pd.to_datetime(merged_data['HuanBanDate'], errors='coerce') merged_data['HuanBanReDate'] = pd.to_datetime(merged_data['HuanBanReDate'], errors='coerce') # 日期格式转换 merged_data['SADate'] = pd.to_datetime(merged_data['SADate']) merged_data['UpdateTime'] = pd.to_datetime(merged_data['UpdateTime']) holidays = [ # 元旦(不调休) '2025-01-01', # 周三 # 春节(调休2天) '2025-01-28', '2025-01-29', '2025-01-30', '2025-01-31', '2025-02-01', '2025-02-02', '2025-02-03', '2025-02-04', # 1.28(除夕)-2.4 # 清明节(不调休) '2025-04-04', '2025-04-05', '2025-04-06', # 周五-周日 # 劳动节(调休1天) '2025-05-01', '2025-05-02', '2025-05-03', '2025-05-04', '2025-05-05', # 周四-周一 # 端午节(不调休) '2025-05-31', '2025-06-01', '2025-06-02', # 周六-周一 # 中秋节+国庆节(调休2天) '2025-10-01', '2025-10-02', '2025-10-03', '2025-10-04', '2025-10-05', '2025-10-06', '2025-10-07', '2025-10-08' # 周三-下周三 ] holiday_dates = pd.to_datetime(holidays) # 新增调休工作日列表(转换为日期格式) workdays_adjustment = [ '2025-01-26', # 周日补春节 '2025-02-08', # 周六补春节 '2025-04-27', # 周日补劳动节 '2025-09-28', # 周日补国庆 '2025-10-11' # 周六补国庆 ] adjustment_dates = pd.to_datetime(workdays_adjustment) # 转换日期格式 def calculate_workdays(start_date, end_date): # 处理未结案情况 end_date = end_date if pd.notnull(end_date) else pd.Timestamp.now() # 生成日期序列 date_range = pd.date_range(start=start_date.normalize(), end=end_date.normalize(), freq='D') # 修改过滤条件(增加调休判断) workdays = [date for date in date_range if ( (date.weekday() < 5) or # 常规工作日 (date in adjustment_dates) # 调休补班日 ) and (date not in holiday_dates) # 排除法定假日 ] return len(workdays) def compute_huanban_days(row): start = row['HuanBanDate'] if pd.isnull(start): return 0 end = row['HuanBanReDate'] if pd.notnull(row['HuanBanReDate']) else pd.Timestamp.now() if start > end: return 0 return calculate_workdays(start, end) merged_data['GongQiHuanBan'] = merged_data.apply(compute_huanban_days, axis=1) # 计算总工期 merged_data['总工期'] = merged_data['工期'] + merged_data['GongQiHuanBan'] # 计算实际工作日 merged_data['实际工作日'] = merged_data.apply( lambda x: calculate_workdays(x['SADate'], x['UpdateTime']), axis=1 ) # 判断超时 merged_data['是否超时'] = '否' merged_data['超时天数'] = 0 mask = merged_data['实际工作日'] > merged_data['总工期'] merged_data.loc[mask, '是否超时'] = '是' merged_data.loc[mask, '超时天数'] = merged_data['实际工作日'] - merged_data['总工期']```StatusName为案件状态,当状态=办结或者=发件时,案件为已完成;等于其他情况,案件为正在办件。组名用来分组。是否超时列,存储了是和否。以程序查询时间开始,往前获取5个工作日,统计SADate在这个时间范围内的案件个数,为本周案件总数以组统计;并根据案件状态,分别统计本周已完成和正在处理的案件数量;根据是否超时,为是的,分组本周统计各组超时案件数量
时间: 2025-07-08 09:22:29 浏览: 18
<think>嗯,用户给了一段Python代码,主要是处理数据框,然后需要根据某些条件统计案件数量。首先,我得先理解用户的需求。他们想要根据StatusName来判断案件是否完成,然后按组名分组,统计本周的案件情况。具体来说,需要统计本周总案件数、已完成和正在处理的案件数,以及各组的超时案件数。
首先,我需要理清楚用户的时间范围。用户提到“以程序查询时间开始,往前获取5个工作日”,也就是最近五个工作日。这里需要注意的是,工作日可能排除节假日和周末,但用户之前的代码里已经有了处理工作日的函数calculate_workdays,可能需要用同样的逻辑来确定时间范围。
不过,用户可能希望直接获取当前时间,然后往前推五个工作日,而不是自然日。比如,如果今天是周三,那么五个工作日就是上周的周四到本周的周三,但如果有节假日的话会更复杂。不过用户可能希望简化处理,直接用pandas的日期功能来找五个工作日前的日期。
接下来,用户需要筛选出SADate在这个时间范围内的案件。也就是说,案件的SADate应该大于等于五个工作日前的起始日期,并且小于等于当前日期。这一步可能需要先确定起始日期。
然后,根据StatusName的值,案件分为已完成(办结或发件)和正在处理(其他状态)。同时,根据是否超时列为“是”来统计超时案件的数量,按组统计。
现在,我需要考虑如何实现这些步骤。首先,如何计算五个工作日前的时间?用户的代码中有一个calculate_workdays函数,但该函数是计算两个日期之间的工作日数。如果要反过来,找到从今天往前五个工作日的起始日期,可能需要另一个方法。或者,用户可能希望使用自然日,但考虑到工作日,可能需要逐个检查日期,直到累积五个工作日为止。不过这可能比较复杂。
或者,用户可能希望使用最近的五个自然日,但排除非工作日。不过根据用户之前的代码,他们可能有holiday_dates和adjustment_dates来处理节假日和调休,所以在计算五个工作日前的日期时需要考虑这些因素。
不过,可能用户希望简化处理,直接使用pandas的bdate_range或者自定义函数来找五个工作日前的日期。例如,从当前时间开始,往前数五天,跳过非工作日。这里可能需要一个循环,逐个日期检查是否为工作日,直到累积五个工作日。
假设当前时间是pd.Timestamp.now(),也就是代码中的end_date默认值。那么,需要找到起始日期start_date,使得从start_date到当前日期的工作日数为5天。或者,可能需要找到五个工作日前的日期,即起始日期是第五个工作日前的日期。
比如,假设今天是2023-10-12(星期四),五个工作日前的起始日期可能是2023-10-05(星期四),如果中间没有节假日的话。但如果中间有节假日,比如国庆节,那就要调整。
不过用户的holidays和workdays_adjustment是针对2025年的,可能用户的数据是2025年的,所以需要根据这些节假日来计算。
这可能会比较复杂。或许用户希望直接使用自然日,然后使用calculate_workdays函数来计算每个案件的SADate是否在最近五个工作日内。不过这可能不太容易。
或者,用户需要的是从当前时间往前推五个工作日,得到一个起始日期,然后筛选出SADate在该起始日期到当前日期之间的案件。例如,假设今天是2025-10-10,五个工作日前的起始日期可能是2025-10-03(假设中间没有节假日),那么SADate在这之间的案件都属于本周案件。
但是,如果中间有节假日或者调休,就需要调整。比如,如果中间有国庆长假,那么五个工作日前可能需要追溯到更早的日期。
因此,正确的做法可能是生成从当前日期往前推的日期列表,直到累积五个工作日为止。例如,从当前日期开始,逐个往前检查每个日期是否是工作日(根据用户的定义),如果是,则计数,直到达到五个。
例如,定义一个函数来找到五个工作日前的日期:
def get_five_workdays_ago():
current_date = pd.Timestamp.now().normalize()
count = 0
days_ago = 0
while count < 5:
date_to_check = current_date - pd.Timedelta(days=days_ago)
# 检查是否为工作日
if (date_to_check.weekday() <5 or date_to_check in adjustment_dates) and (date_to_check not in holiday_dates):
count +=1
days_ago +=1
# 返回第五个工作日的日期
return current_date - pd.Timedelta(days=days_ago -1)
不过这可能存在效率问题,但考虑到只需要处理最多几周的数据,应该没问题。
得到起始日期后,筛选出SADate在起始日期到当前日期之间的案件。
然后,按组名分组,统计总数、已完成、处理中和超时数量。
接下来,处理数据:
1. 计算五个工作日的起始日期。
2. 筛选merged_data中SADate在该范围内的数据,作为本周案件。
3. 按组名分组统计总数。
4. 根据案件状态,统计已完成(StatusName为办结或发件)和处理中(其他)。
5. 统计是否超时为是的数量。
需要注意的是,用户提到的“本周”可能是指最近五个工作日,而不是自然周的周一至周五。因此需要明确时间范围是基于工作日的。
另外,代码中的holiday_dates和adjustment_dates已经被转换为pd.Timestamp,所以在比较日期时需要注意类型一致。
现在,将这些步骤转化为代码:
首先,获取当前时间:
current_time = pd.Timestamp.now()
然后,找到五个工作日前的起始日期:
start_date = current_time
workdays_count = 0
while workdays_count <5:
start_date -= pd.Timedelta(days=1)
if calculate_workdays(start_date, current_time) ==5:
break
不过这可能不太直接,或者需要另一种方法。
或者,逐个日期往前检查,直到找到五个工作日:
def find_start_date(current_date, holidays, workdays_adjustment, n_workdays=5):
count =0
start_date = current_date
while count <n_workdays:
start_date -= pd.Timedelta(days=1)
# 检查是否为工作日
is_workday = (start_date.weekday() <5 or start_date in adjustment_dates) and (start_date not in holiday_dates)
if is_workday:
count +=1
return start_date
不过这可能有问题,因为可能需要循环更多次。或者,另一种方法是生成从current_date往前推一定天数,然后筛选出工作日,取前五个,然后取最晚的那个。
比如:
date_range = pd.date_range(end=current_time, periods=30, freq='D') # 假设最多30天内有5个工作日
workdays = [d for d in date_range if (d.weekday() <5 or d in adjustment_dates) and (d not in holiday_dates)]
# 取最近的五个工作日
if len(workdays) >=5:
start_date = workdays[-5]
else:
# 处理不足的情况,可能返回最早的日期
start_date = workdays[0] if len(workdays) >0 else current_time - pd.Timedelta(days=5)
然后,筛选SADate在start_date到current_time之间的数据。
不过这种方法可能更可靠。
但需要将adjustment_dates和holiday_dates转换为日期列表,可能需要先转换为日期索引或者集合。
不过用户代码中已经有holiday_dates和adjustment_dates作为pd.Timestamp的列表,所以可以直接比较。
因此,生成日期范围,然后筛选出工作日,取最后五个中的第一个作为起始日期。
这可能更高效。
例如:
current_date = pd.Timestamp.now().normalize() # 当前日期,去除时间部分
dates = []
day_count =0
date_candidate = current_date
while len(dates) <5:
# 检查date_candidate是否为工作日
is_workday = (date_candidate.weekday() <5 or date_candidate in adjustment_dates) and (date_candidate not in holiday_dates)
if is_workday:
dates.append(date_candidate)
date_candidate -= pd.Timedelta(days=1)
# 这样得到的dates是最近的五个工作日,按逆序排列,dates[0]是当前日期(如果是工作日),dates[1]是前一个工作日等。
# 但可能需要重新排列顺序,取最早的日期作为起始日期
if len(dates) >=5:
start_date = dates[-5] # 第五个工作日前的日期
else:
# 处理不足五个的情况,可能取全部
start_date = dates[-1] if dates else current_date - pd.Timedelta(days=5)
然后,筛选SADate >= start_date且 <= current_date的数据:
本周案件 = merged_data[(merged_data['SADate'] >= start_date) & (merged_data['SADate'] <= current_date)]
不过可能需要注意时区问题,但假设所有日期都是无时区的。
接下来,按组统计:
grouped = 本周案件.groupby('组名')
总案件数 = grouped.size().rename('总案件数')
已完成案件数 = grouped.apply(lambda x: x[x['StatusName'].isin(['办结', '发件'])].shape[0]).rename('已完成数')
正在处理数 = grouped.apply(lambda x: x[~x['StatusName'].isin(['办结', '发件'])].shape[0]).rename('处理中数')
超时案件数 = grouped.apply(lambda x: x[x['是否超时'] == '是'].shape[0]).rename('超时数')
最后,合并这些统计结果:
result = pd.concat([总案件数, 已完成案件数, 正在处理数, 超时案件数], axis=1).fillna(0)
然后,可能需要将结果转换为所需的格式,比如重置索引,或者处理没有数据的组。
不过,用户可能需要将这些统计结果合并到一个DataFrame中,按组名显示。
此外,需要注意,如果某些组在本周没有案件,可能需要填充0,但groupby默认会忽略这些组,除非先创建所有可能的组名。
或者,可以预先获取所有组名,确保每个组都有记录。
例如:
所有组 = df_group['组名'].unique()
然后,在result中reindex,确保所有组都存在:
result = result.reindex(所有组).fillna(0)
不过这可能根据数据情况而定。
综上所述,代码的大致步骤为:
1. 计算五个工作日的起始日期。
2. 筛选出SADate在该范围内的案件。
3. 按组统计各项指标。
4. 合并结果并处理可能的缺失组。
现在,将这些步骤写成代码,并检查可能的错误点。比如,在计算起始日期时,是否准确排除了节假日和考虑了调休;在筛选SADate时是否正确处理了时间范围;分组统计时是否正确应用了条件。
此外,用户提供的代码中已经处理了merged_data的日期列,如SADate转换为datetime,所以可以直接比较日期。
可能的优化点:将计算起始日期的逻辑封装成函数,确保复用性。
另外,需要注意,当往前推的时候,可能遇到无限循环,比如遇到很多节假日,所以需要设置一个最大天数,比如最多检查365天,防止无限循环。
例如,在查找五个工作日时:
def get_start_date(current_date, holidays, adjustment_dates, n_workdays=5, max_days=30):
dates = []
date_candidate = current_date
for _ in range(max_days):
is_workday = (date_candidate.weekday() <5 or date_candidate in adjustment_dates) and (date_candidate not in holidays)
if is_workday:
dates.append(date_candidate)
if len(dates) == n_workdays:
break
date_candidate -= pd.Timedelta(days=1)
if len(dates) <n_workdays:
# 不足五个,取所有能找到的
pass
# 起始日期是第五个工作日前的日期,即dates[-1]
if len(dates) >=n_workdays:
start_date = dates[-1]
else:
# 如果没有足够的,可能返回最早的工作日或当前日期 - max_days
start_date = dates[-1] if dates else current_date - pd.Timedelta(days=max_days)
return start_date
然后,调用这个函数获取start_date。
之后,筛选数据:
本周案件 = merged_data[(merged_data['SADate'] >= start_date) & (merged_data['SADate'] <= current_date)]
需要注意的是,SADate可能包含时间部分,所以需要标准化为日期,或者比较时使用normalize(),但merged_data['SADate']已经被转换为pd.to_datetime,可能已经去除了时间部分,或者需要处理。
在代码中,用户的merged_data['SADate'] = pd.to_datetime(merged_data['SADate']),所以假设是日期类型,可能包含时间,但如果是日期,则比较时需要注意。比如,如果SADate是2025-10-10 14:30:00,而start_date是2025-10-05 00:00:00,那么是否包括当天的所有时间?此时,应该使用.dt.normalize()来比较日期部分。
所以,正确的筛选应该是:
本周案件 = merged_data[
(merged_data['SADate'].dt.normalize() >= start_date) &
(merged_data['SADate'].dt.normalize() <= current_date.normalize())
]
或者,确保start_date和current_date是日期类型,没有时间部分。
然后,进行分组统计。
最后,整合结果,可能需要将结果按组名排序,或者按照df_group中的组顺序来展示。
总结代码步骤:
1. 获取当前日期。
2. 计算五个工作日前的起始日期。
3. 筛选出SADate在该范围内的数据。
4. 按组名分组统计总案件数、已完成、处理中和超时数。
5. 处理可能的缺失组,填充0。
6. 输出结果。
现在,将这些步骤转化为代码,并确保正确性。</think>让我们分步骤实现这个统计需求:
```python
# 第一步:获取当前时间并确定时间范围
current_date = pd.Timestamp.now().normalize()
# 定义获取n个工作日前日期的函数
def get_workday_range(base_date, n_workdays, holidays, workdays_adjustment):
dates = []
date_candidate = base_date
max_days_check = 60 # 最多向前检查60天防止无限循环
while len(dates) < n_workdays and max_days_check > 0:
# 判断是否为工作日(考虑调休和节假日)
is_workday = (
(date_candidate.weekday() < 5) or
(date_candidate in workdays_adjustment)
) and (
date_candidate not in holidays
)
if is_workday:
dates.append(date_candidate)
date_candidate -= pd.Timedelta(days=1)
max_days_check -= 1
if len(dates) >= n_workdays:
return dates[-n_workdays] # 返回第n个工作日
else:
return date_candidate # 返回能找到的最早日期
# 第二步:计算5个工作日前日期
start_date = get_workday_range(
base_date=current_date,
n_workdays=5,
holidays=holiday_dates,
workdays_adjustment=adjustment_dates
)
# 第三步:筛选本周案件
mask_this_week = (
(merged_data['SADate'].dt.normalize() >= start_date) &
(merged_data['SADate'].dt.normalize() <= current_date)
)
df_this_week = merged_data[mask_this_week].copy()
# 第四步:定义状态判断列
df_this_week['状态分类'] = df_this_week['StatusName'].apply(
lambda x: '已完成' if x in ['办结', '发件'] else '处理中'
)
# 第五步:分组统计
result = df_this_week.groupby('组名').agg(
总案件数=('SADate', 'count'),
已完成=('状态分类', lambda x: (x == '已完成').sum()),
处理中=('状态分类', lambda x: (x == '处理中').sum()),
超时案件数=('是否超时', lambda x: (x == '是').sum())
).reset_index()
# 处理可能缺失的组(确保所有组都显示)
all_groups = df_group['组名'].unique()
result = result.set_index('组名').reindex(all_groups, fill_value=0).reset_index()
# 展示结果(示例)
print("各组本周工作情况统计:")
print(result.to_string(index=False))
```
执行效果示例:
```
各组本周工作情况统计:
组名 总案件数 已完成 处理中 超时案件数
A组 12 8 4 2
B组 7 5 2 1
C组 0 0 0 0 # 表示该组本周无案件
```
代码关键点说明:
1. **工作日计算逻辑**
- 使用与原始代码相同的节假日定义(`holiday_dates`)和调休定义(`adjustment_dates`)
- 向前查找时遵循相同的工作日判断规则:$\text{工作日} = (\text{周一至周五} \lor \text{调休日}) \land \lnot \text{节假日}$
2. **时间范围筛选**
- 使用矢量化的日期比较操作,比循环遍历效率更高
- `dt.normalize()`用于保证只比较日期部分
3. **统计实现技巧**
- 使用`groupby+agg`组合实现高效统计
- 通过`reindex`确保显示所有组别(即使无案件)
4. **特殊值处理**
- 处理了可能存在的空组情况(填充0值)
- 使用`copy()`避免SettingWithCopyWarning警告
> 注意:实际使用时需确保`holiday_dates`和`adjustment_dates`与业务年份一致,此处代码延用原数据中的2025年节假日定义。
阅读全文
相关推荐



















