import time class Node: def __init__(self, name, need_time, privilege, state): self.name = name self.need_time = need_time self.privilege = privilege self.state = state class LNode: def __init__(self, data=None, link=None): self.data = data self.link = link def delay(i): while i > 0: x = 0 while x < 10000: y = 0 while y < 40000: y += 1 x += 1 i -= 1 def len_queue(hpt, x): q = hpt r = hpt p = LNode(x) if hpt is None: hpt = p else: while q is not None and p.data.privilege < q.data.privilege: r = q q = q.link if q is None: r.link = p elif r == q: p.link = hpt hpt = p else: r.link = p p.link = q return hpt def lde_queue(hpt, cp): p = hpt cp[0] = hpt.data hpt = hpt.link del p print(f"the elected process's name: {cp[0].name}\n") return hpt def output(hpt): p = hpt print("Name\tNeed_time\tPrivilege\tState") while p is not None: print(f"{p.data.name}\t{p.data.need_time}\t\t{p.data.privilege}\t\t{p.data.state}") p = p.link delay(4) def main(): hpt = None print("The period time is 4s") print("Please input:") print("If need_time = 0, input over") print("Name\tNeed_time\tPrivilege") while True: name, need_time, privilege = input().split() need_time = int(need_time) privilege = int(privilege) if need_time == 0: break state = 'R' temp = Node(name, need_time, privilege, state) hpt = len_queue(hpt, temp) cp = [None] while hpt is not None: output(hpt) hpt = lde_queue(hpt, cp) cp[0].need_time -= 1 cp[0].privilege -= 1 if cp[0].need_time != 0: hpt = len_queue(hpt, cp[0]) if __name__ == "__main__": main()
时间: 2024-02-14 22:18:57 浏览: 100
这段代码是一个简单的操作系统进程调度模拟程序,采用了优先级抢占式调度算法。程序首先读入进程的名称、需要运行时间和优先级,然后将进程按照优先级加入到一个链式队列中。程序每4秒钟输出一次当前队列中的进程信息,并选择队首进程运行,同时将该进程的优先级和需要运行时间分别减1,如果该进程的需要运行时间不为0,则重新将其加入队列中。程序的运行会一直持续,直到所有的进程都运行完毕。
相关问题
from user import User class Admin(User): def __init__(self, username, password, email, privilege): super().__init__(username, password, email) self.privilege = privilege class Admin(User): def __init__(self, username, password, email, privilege): super().__init__(username, password, email) self.privilege = privilege def login(self): print("Admin login success")
这段代码存在重复定义类Admin的问题。应该把第一个定义类Admin的代码删除,修改为如下代码:
```python
from user import User
class Admin(User):
def __init__(self, username, password, email, privilege):
super().__init__(username, password, email)
self.privilege = privilege
def login(self):
print("Admin login success")
```
这样就可以定义一个继承自User类的Admin类,并添加新的属性privilege和重写login方法了。
``` import paramiko, re, time from datetime import datetime from config02 import userlist from threading import Thread import os # 新增导入 class SW(): def __init__(self): self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh.connect(hostname="192.168.100.1", port=22, username="python", password="Huawei@123", allow_agent=False, look_for_keys=False) self.vty = self.ssh.invoke_shell() print("ssh 连接成功") self.send_com("screen-length 0 temporary") def send_com(self, com): self.vty.send(com + "\n") time.sleep(2) return self.vty.recv(65535).decode("utf-8") def jhjzt(self): while True: p_list = [] for i in userlist.keys(): pat = self.send_com(userlist[i]["com"]) res = re.compile(userlist[i]["re"]).findall(pat) if i == "fan": res = res if res else "all fan are faulty" p_list.append(f"{i}:{res}") # 创建日期目录逻辑 current_date = datetime.now().strftime("%Y-%m-%d") # 生成日期字符串 target_dir = os.path.join(r"E:\PYTHON", current_date) # 拼接目标路径 os.makedirs(target_dir, exist_ok=True) # 自动创建目录(存在则不创建) file_path = os.path.join(target_dir, "device_status.txt") # 新文件路径 with open(file_path, 'w') as f: f.write(pat) print("\n".join(p_list)) time.sleep(60 * 5) def xiazai(self): while True: current_date = datetime.now().strftime("%Y-%m-%d") target_dir = os.path.join(r"E:\PYTHON", current_date) os.makedirs(target_dir, exist_ok=True) file_name = f"{datetime.now().strftime('%H-%M-%S')}_X_T1_AGG1.zip" # 按时间命名文件 file_path = os.path.join(target_dir, file_name) with paramiko.Transport(("192.168.100.1", 22)) as t: t.connect(username="python", password="Huawei@123") sftp = paramiko.SFTPClient.from_transport(t) sftp.get("/vrpcfg.zip", file_path) # 保存到日期目录 print("下载成功") time.sleep(60 * 60 * 24) def binfa(self): Thread(target=self.jhjzt).start() Thread(target=self.xiazai).start() if __name__ == '__main__': SW().binfa()```Exception in thread Thread-4: Traceback (most recent call last): File "E:\Users\zzChen\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner self.run() File "E:\Users\zzChen\AppData\Local\Programs\Python\Python38\lib\threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "E:/PYTHON/tets011/test03.py", line 63, in xiazai sftp.get("/vrpcfg.zip", file_path) # 保存到日期目录 File "E:\PYTHON\tets011\venv\lib\site-packages\paramiko\sftp_client.py", line 840, in get size = self.getfo( File "E:\PYTHON\tets011\venv\lib\site-packages\paramiko\sftp_client.py", line 795, in getfo file_size = self.stat(remotepath).st_size File "E:\PYTHON\tets011\venv\lib\site-packages\paramiko\sftp_client.py", line 493, in stat t, msg = self._request(CMD_STAT, path) File "E:\PYTHON\tets011\venv\lib\site-packages\paramiko\sftp_client.py", line 857, in _request return self._read_response(num) File "E:\PYTHON\tets011\venv\lib\site-packages\paramiko\sftp_client.py", line 909, in _read_response self._convert_status(msg) File "E:\PYTHON\tets011\venv\lib\site-packages\paramiko\sftp_client.py", line 938, in _convert_status raise IOError(errno.ENOENT, text) FileNotFoundError: [Errno 2] No such file Process finished with exit code -1
<think>好的,我现在需要帮助用户解决他们遇到的Python脚本错误。用户提供的代码在运行时报了一个FileNotFoundError,具体是在使用Paramiko的SFTPClient下载文件时找不到远程文件"/vrpcfg.zip"。错误发生在xiazai方法的sftp.get调用处。我需要仔细分析可能的原因,并给出解决方案。
首先,我要理解代码的结构。SW类初始化时通过SSH连接到交换机,然后启动两个线程:jhjzt和xiazai。jhjzt负责定期收集设备状态并保存到文件,而xiazai负责每天下载一次文件。问题出在xiazai方法的sftp.get这一行,提示远程文件不存在。
可能的原因有几个:
1. **远程文件路径错误**:用户可能错误地指定了远程文件路径。例如,正确的路径可能不是根目录下的vrpcfg.zip,而是其他位置,比如某个子目录中,或者文件名不同。
2. **权限问题**:虽然用户使用SSH连接成功,但可能SFTP用户没有权限访问该文件。需要确认用户"python"是否有权限读取该文件。
3. **文件不存在**:目标设备上可能没有vrpcfg.zip这个文件,或者文件名不一致。比如,可能是vrpcfg.cfg或者其他命名方式。
4. **连接问题**:虽然SSH连接成功,但SFTP传输时可能出现问题,例如连接超时或中断,但根据错误信息,更可能是路径问题。
接下来,我需要逐一验证这些可能性。首先检查远程路径是否正确。用户代码中使用的是"/vrpcfg.zip",这在Linux系统中是根目录下的文件。但实际设备(如华为交换机)可能将配置文件存放在其他路径,比如"/vrpcfg.cfg"或者"/config/vrpcfg.zip"。需要确认设备上的实际路径。
其次,权限问题。用户使用的账号是否有权限通过SFTP访问该文件?有些设备可能限制SFTP访问,或者需要特定用户权限才能下载配置文件。
另外,用户是否在正确的时间触发下载?例如,设备可能只在特定时间生成该文件,比如配置保存后才会存在。但根据用户代码中的间隔是24小时,可能假设文件一直存在。
解决方案方面,建议用户首先手动登录设备,确认文件路径是否存在。例如,通过SSH执行ls命令查看文件是否存在。如果路径错误,修正代码中的远程路径即可。如果权限不足,需要调整设备上的用户权限。此外,可以添加异常处理,比如捕获FileNotFoundError并给出更清晰的提示,或者检查文件是否存在再下载。
另外,用户代码中的xiazai方法使用了一个独立的Transport,而不是复用已有的SSH连接。虽然Paramiko的SSHClient和SFTPClient可以共享,但用户这里可能没问题,不过建议检查是否需要使用同一个Transport或SSHClient来维持连接。
此外,用户可能在测试时没有生成该文件,导致下载失败。需要确认设备上是否确实存在该文件,或者是否需要先执行保存配置的命令,例如在SSH连接后发送save命令生成配置文件。
总结,可能的解决步骤是:
1. 确认远程文件路径是否正确。
2. 检查用户权限是否足够。
3. 添加异常处理和日志,帮助调试。
4. 确保文件在设备上存在,必要时先执行保存配置的命令。
最后,用户可能需要修改sftp.get的远程路径,或者在下载前执行命令生成文件,并处理可能的异常情况。</think>让我们逐步分析并解决代码中出现的`FileNotFoundError`问题:
### 错误定位
错误发生在`xiazai`方法的`sftp.get("/vrpcfg.zip", file_path)`语句,报错信息明确提示**远程服务器上不存在`/vrpcfg.zip`文件**。以下是关键分析步骤:
---
### 第一步:验证远程文件路径
华为/华三设备配置文件默认路径通常为:
$$ \text{/vrpcfg.zip} \quad \text{或} \quad \text{/config/vrpcfg.zip} $$
**需通过手动验证**:
```python
# 在现有代码的SSH连接后添加验证命令
def __init__(self):
...
print(self.send_com("dir /vrpcfg.zip")) # 检查文件是否存在
```
---
### 第二步:配置文件生成机制
* **文件可能未自动生成**,需手动触发保存:
```python
# 在下载前添加保存配置命令
def xiazai(self):
while True:
self.send_com("save") # 发送保存配置命令
time.sleep(5) # 等待保存完成
...
```
---
### 第三步:增强异常处理
添加更健壮的异常捕获机制:
```python
def xiazai(self):
while True:
try:
# 原有下载代码
except IOError as e:
print(f"文件不存在或权限不足: {e}")
except Exception as e:
print(f"未知错误: {e}")
time.sleep(60 * 60 * 24)
```
---
### 第四步:验证SFTP权限
华为设备需开启SFTP服务并配置权限:
```
[设备配置]
sftp server enable
aaa
local-user python service-type ssh
local-user python privilege level 3
```
---
### 完整修复方案
```python
def xiazai(self):
while True:
try:
# 1. 保存配置
self.send_com("save")
time.sleep(5)
# 2. 验证文件存在
output = self.send_com("dir /vrpcfg.zip")
if "File not found" in output:
raise FileNotFoundError("远程设备未生成配置文件")
# 3. 执行下载
with paramiko.Transport(("192.168.100.1", 22)) as t:
t.connect(username="python", password="Huawei@123")
sftp = paramiko.SFTPClient.from_transport(t)
sftp.get("/vrpcfg.zip", file_path)
except FileNotFoundError as e:
print(f"配置错误: {e}")
except paramiko.SSHException as e:
print(f"连接故障: {e}")
except Exception as e:
print(f"意外错误: {e}")
finally:
time.sleep(60 * 60 * 24)
```
---
### 补充建议
1. **路径验证**:通过`send_com("dir")`列出设备根目录文件
2. **日志记录**:将错误信息写入日志文件便于后续分析
3. **配置文件**:将`/vrpcfg.zip`路径改为可配置项
$$ \text{核心问题} \Rightarrow \text{远程文件路径准确性 + 配置文件生成机制} $$
阅读全文
相关推荐

















