活动介绍
file-type

Python实现Cisco交换机端口IP跟踪与MAC定位

下载需积分: 50 | 5KB | 更新于2025-08-11 | 193 浏览量 | 2 下载量 举报 收藏
download 立即下载
从给定文件信息中,我们可以提取以下知识点进行详细解释: 1. Python脚本的作用: 该Python脚本主要的功能是追踪网络中特定IP地址或者CIDR网络范围内的IP地址,直至其在思科网络设备中最终连接的交换机端口。这对于网络问题诊断、网络管理、IP地址分配追踪以及安全监控都具有重要意义。 2. 脚本的输入参数和操作模式: 脚本支持两种操作模式:交互式和参数输入。使用交互式模式时,用户可以通过命令提示符或终端界面按提示输入必要的参数,如需要扫描的网络范围、核心交换机信息、用户名等;而使用参数模式时,则直接通过命令行参数指定这些输入值。 3. 可以获取的信息: 脚本运行后,能够输出包括目标IP地址、MAC地址、边缘交换机名称、端口名称、端口描述、接口模式(访问或中继)、端口上允许的VLAN以及边缘端口上当前学习的MAC地址数量在内的信息。这些信息对于网络的日常管理和故障排查都极为重要。 4. 输出格式的选择: 该脚本默认将追踪结果输出至控制台,但提供了输出到CSV文件的选项,以便用户可以根据个人喜好或特定需求来选择输出格式,使信息的整理和进一步分析更为方便。 5. 脚本运行环境: 根据描述,该脚本仅设计为在思科的IOS和NX-OS设备上运行。这意味着该脚本使用了与这些设备兼容的命令集和协议。 6. 技术栈与标签解析: - "Python":指明了脚本是用Python语言编写的,Python作为一种高级编程语言,广泛应用于网络编程、脚本自动化等领域。 - "cisco":体现了脚本是面向思科设备的特定应用,需要利用思科设备特有的管理接口和命令集。 - "networking":指明了脚本的功能范畴,即网络管理,用于追踪和诊断网络问题。 - "trace":是脚本的具体作用,即进行IP追踪,找到网络中的特定IP地址最后定位到的交换机端口。 - "ip":脚本的核心功能之一是对IP地址的追踪。 - "netmiko":是一个Python库,用于与网络设备进行SSH连接和命令交互,该脚本可能使用了netmiko库来实现与思科设备的通信。 7. 文件名称列表中的信息: 给定的文件压缩包名称为“cisco-ip-trace-master”,这意味着包含该Python脚本的压缩包是一个主版本,用户可以通过解压该压缩包获取脚本文件及其他可能包含的文档、依赖库等资源。 总结以上知识点,可以看出这是一个专门针对思科网络设备的网络管理工具,利用Python语言编写,提供了强大的IP追踪功能,并且能够根据实际工作需要灵活地输出结果。该脚本可以显著提高网络管理人员的工作效率,同时也有助于网络的稳定性和安全性管理。

相关推荐

filetype

我有一个代码如下,大致功能通过MAC地址是登录到OA核心交换机一层一层找对应的接入交换机,代码可能不全,你先理解下,我等会儿发新需求给你import logging import re import time from netmiko import ConnectHandler # ----------------------- 日志配置 ----------------------- logging.basicConfig( filename='mac_manager.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', encoding='utf-8' ) # ----------------------- 设备配置 ----------------------- DEVICES = { 'oa_core_switch': { 'device_type': 'hp_comware', 'host': '172.17.201.1', 'username': 'user', 'password': 'UESR', 'timeout': 15 }, 'aggregation_switch': { 'device_type': 'hp_comware', 'username': 'user', 'password': 'UESR', 'timeout': 15 }, 'access_switch': { 'device_type': 'hp_comware', 'username': 'python', 'password': 'USER', 'timeout': 10 } } # ----------------------- 增强的LLDP解析 ----------------------- def parse_lldp_neighbor(output): """解析LLDP输出获取管理IP""" management_ip = None for line in output.splitlines(): if 'Management address' in line and ':' in line: ip_match = re.search(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', line) if ip_match: management_ip = ip_match.group() break return management_ip # ----------------------- 设备连接管理 ----------------------- def connect_with_retry(device_info, retries=3, delay=5): """带重试机制的设备连接""" for attempt in range(retries): try: conn = ConnectHandler(**device_info) logging.info(f"成功连接 {device_info['host']}") return conn except Exception as e: logging.error(f"连接失败({attempt + 1}/{retries}): {str(e)}") time.sleep(delay) raise ConnectionError(f"无法连接 {device_info['host']}") # ----------------------- 增强的端口处理 ----------------------- def convert_port_format(physical_port): """转换端口格式 XGE -> Ten-GigabitEthernet""" return physical_port.replace("XGE", "Ten-GigabitEthernet") def handle_special_aggregation(ip, mac, vlan): """处理特殊汇聚交换机""" try: # 创建特殊设备配置(根据实际情况调整凭证) special_device = { 'device_type': 'hp_comware', 'host': ip, 'username': 'user', # 特殊设备的用户名 'password': 'abc#1234567', # 特殊设备的密码 'timeout': 20 } conn = connect_with_retry(special_device) # 查找聚合端口 agg_port, phys_ports = get_aggregation_info(conn, mac) if not agg_port: return f"{ip} 未找到聚合端口" # 遍历物理端口查找接入交换机 for port in phys_ports: access_ip = get_lldp_neighbor(conn, port) if not access_ip: continue # 配置接入交换机 result = configure_access_switch(access_ip, mac, vlan) if "成功" in result: return result return f"{ip} 未找到有效接入链路" except Exception as e: logging.error(f"特殊汇聚处理失败: {str(e)}") return f"特殊设备处理失败: {str(e)}" finally: if 'conn' in locals(): conn.disconnect() def trace_oa_connection(mac, vlan): """OA区域全链路追踪(新增函数)""" try: oa_conn = connect_with_retry(DEVICES['oa_core_switch']) # 核心逻辑与trace_fab_connection类似,需补充 agg_port, phys_ports = get_aggregation_info(oa_conn, mac) if not agg_port: return "OA核心交换机未找到聚合端口" # 处理特殊IP范围(保持原有逻辑) SPECIAL_IPS = {"172.17.201.2", "172.17.201.3", "172.17.201.4", "172.17.201.22"} for port in phys_ports: neighbor_ip = get_lldp_neighbor(oa_conn, port) if not neighbor_ip: continue if neighbor_ip in SPECIAL_IPS: return handle_special_aggregation(neighbor_ip, mac, vlan) else: return configure_access_switch(neighbor_ip, mac, vlan) return "未找到有效链路" except Exception as e: logging.error(f"OA区域处理失败: {str(e)}") return f"处理失败: {str(e)}" finally: if oa_conn: oa_conn.disconnect() # ----------------------- 通用工具函数 ----------------------- def get_aggregation_info(conn, mac): """获取聚合端口信息(核心/汇聚通用)""" try: cli_mac = f"{mac[:4]}-{mac[4:8]}-{mac[8:12]}" output = conn.send_command(f"display mac-address {cli_mac}") logging.info(f"MAC地址查询结果:\n{output}") # 查找聚合端口 agg_port = None for line in output.splitlines(): if 'BAGG' in line: parts = line.split() if len(parts) > 1: agg_port = parts[-2] # 假设聚合口在倒数第二列 break if not agg_port: logging.warning("未找到聚合端口") return None, None # 获取物理端口 output = conn.send_command(f"display link-aggregation verbose {agg_port}") logging.info(f"链路聚合信息:\n{output}") phys_ports = [] for line in output.splitlines(): if any(x in line for x in ['XGE', 'GE']): port = line.split()[0].replace('(R)', '').strip() if port not in phys_ports: phys_ports.append(port) return agg_port, phys_ports except Exception as e: logging.error(f"获取聚合信息失败: {str(e)}") return None, None def get_lldp_neighbor(conn, port): """获取LLDP邻居信息(核心/汇聚通用)""" try: converted_port = convert_port_format(port) output = conn.send_command( f"display lldp neighbor-information interface {converted_port} verbose", read_timeout=60 ) logging.info(f"LLDP邻居信息:\n{output}") return parse_lldp_neighbor(output) except Exception as e: logging.error(f"LLDP查询失败: {str(e)}") return None def handle_oamac(raw_mac, vlan): """处理OA区域有线MAC""" try: mac = re.sub(r'[^0-9a-fA-F]', '', raw_mac).lower() if len(mac) != 12: raise ValueError("无效的MAC地址格式") return trace_oa_connection(mac, vlan) except Exception as e: return f"参数错误: {str(e)}"

filetype

你参考我给你的逻辑找MAC的代码来做,实际业务情况也是这样写的,同意的交换机同样的环境import logging import re import time from netmiko import ConnectHandler # ----------------------- 日志配置 ----------------------- logging.basicConfig( filename='mac_manager.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', encoding='utf-8' ) # ----------------------- 设备配置 ----------------------- DEVICES = { 'oa_core_switch': { 'device_type': 'hp_comware', 'host': '175.25.56.22', 'username': 'user', 'password': 'UESR', 'timeout': 15 }, 'aggregation_switch': { 'device_type': 'hp_comware', 'username': 'user', 'password': 'UESR', 'timeout': 15 }, 'access_switch': { 'device_type': 'hp_comware', 'username': 'python', 'password': 'USER', 'timeout': 10 } } # ----------------------- 增强的LLDP解析 ----------------------- def parse_lldp_neighbor(output): """解析LLDP输出获取管理IP""" management_ip = None for line in output.splitlines(): if 'Management address' in line and ':' in line: ip_match = re.search(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', line) if ip_match: management_ip = ip_match.group() break return management_ip # ----------------------- 设备连接管理 ----------------------- def connect_with_retry(device_info, retries=3, delay=5): """带重试机制的设备连接""" for attempt in range(retries): try: conn = ConnectHandler(**device_info) logging.info(f"成功连接 {device_info['host']}") return conn except Exception as e: logging.error(f"连接失败({attempt + 1}/{retries}): {str(e)}") time.sleep(delay) raise ConnectionError(f"无法连接 {device_info['host']}") # ----------------------- 增强的端口处理 ----------------------- def convert_port_format(physical_port): """转换端口格式 XGE -> Ten-GigabitEthernet""" return physical_port.replace("XGE", "Ten-GigabitEthernet") def handle_special_aggregation(ip, mac, vlan): """处理特殊汇聚交换机""" try: # 创建特殊设备配置(根据实际情况调整凭证) special_device = { 'device_type': 'hp_comware', 'host': ip, 'username': 'user', # 特殊设备的用户名 'password': 'abc#1234567', # 特殊设备的密码 'timeout': 20 } conn = connect_with_retry(special_device) # 查找聚合端口 agg_port, phys_ports = get_aggregation_info(conn, mac) if not agg_port: return f"{ip} 未找到聚合端口" # 遍历物理端口查找接入交换机 for port in phys_ports: access_ip = get_lldp_neighbor(conn, port) if not access_ip: continue # 配置接入交换机 result = configure_access_switch(access_ip, mac, vlan) if "成功" in result: return result return f"{ip} 未找到有效接入链路" except Exception as e: logging.error(f"特殊汇聚处理失败: {str(e)}") return f"特殊设备处理失败: {str(e)}" finally: if 'conn' in locals(): conn.disconnect() def trace_oa_connection(mac, vlan): """OA区域全链路追踪(新增函数)""" try: oa_conn = connect_with_retry(DEVICES['oa_core_switch']) # 核心逻辑与trace_fab_connection类似,需补充 agg_port, phys_ports = get_aggregation_info(oa_conn, mac) if not agg_port: return "OA核心交换机未找到聚合端口" # 处理特殊IP范围(保持原有逻辑) SPECIAL_IPS = {"172.17.201.2", "172.17.201.3", "172.17.201.4", "172.17.201.22"} for port in phys_ports: neighbor_ip = get_lldp_neighbor(oa_conn, port) if not neighbor_ip: continue if neighbor_ip in SPECIAL_IPS: return handle_special_aggregation(neighbor_ip, mac, vlan) else: return configure_access_switch(neighbor_ip, mac, vlan) return "未找到有效链路" except Exception as e: logging.error(f"OA区域处理失败: {str(e)}") return f"处理失败: {str(e)}" finally: if oa_conn: oa_conn.disconnect() # ----------------------- 通用工具函数 ----------------------- def get_aggregation_info(conn, mac): """获取聚合端口信息(核心/汇聚通用)""" try: cli_mac = f"{mac[:4]}-{mac[4:8]}-{mac[8:12]}" output = conn.send_command(f"display mac-address {cli_mac}") logging.info(f"MAC地址查询结果:\n{output}") # 查找聚合端口 agg_port = None for line in output.splitlines(): if 'BAGG' in line: parts = line.split() if len(parts) > 1: agg_port = parts[-2] # 假设聚合口在倒数第二列 break if not agg_port: logging.warning("未找到聚合端口") return None, None # 获取物理端口 output = conn.send_command(f"display link-aggregation verbose {agg_port}") logging.info(f"链路聚合信息:\n{output}") phys_ports = [] for line in output.splitlines(): if any(x in line for x in ['XGE', 'GE']): port = line.split()[0].replace('(R)', '').strip() if port not in phys_ports: phys_ports.append(port) return agg_port, phys_ports except Exception as e: logging.error(f"获取聚合信息失败: {str(e)}") return None, None def get_lldp_neighbor(conn, port): """获取LLDP邻居信息(核心/汇聚通用)""" try: converted_port = convert_port_format(port) output = conn.send_command( f"display lldp neighbor-information interface {converted_port} verbose", read_timeout=60 ) logging.info(f"LLDP邻居信息:\n{output}") return parse_lldp_neighbor(output) except Exception as e: logging.error(f"LLDP查询失败: {str(e)}") return None def handle_oamac(raw_mac, vlan): """处理OA区域有线MAC""" try: mac = re.sub(r'[^0-9a-fA-F]', '', raw_mac).lower() if len(mac) != 12: raise ValueError("无效的MAC地址格式") return trace_oa_connection(mac, vlan) except Exception as e: return f"参数错误: {str(e)}"

哈奇明
  • 粉丝: 44
上传资源 快速赚钱