SNMP获取路由器交换机信息
编辑
1
2025-03-31
# #!/usr/bin/env python3
# # 使用 PySNMP 4.4.12 版本
# # 可以通过 pip install pysnmp==4.4.12 安装此版本
# from pysnmp.hlapi import *
# import sys
# import time
#
#
# def get_router_info(router_ip, community_string, port=161):
# """
# 获取路由器的基本信息,包括系统描述、联系人、名称、位置、运行时间和接口信息
#
# 参数:
# router_ip (str): 路由器的IP地址
# community_string (str): SNMP团体字符串(通常是'public'或'private')
# port (int): SNMP端口,默认为161
#
# 返回:
# dict: 包含路由器信息的字典
# """
# # 定义要查询的OID
# oids = {
# 'sysDescr': '1.3.6.1.2.1.1.1.0', # 系统描述
# 'sysContact': '1.3.6.1.2.1.1.4.0', # 系统联系人
# 'sysName': '1.3.6.1.2.1.1.5.0', # 系统名称
# 'sysLocation': '1.3.6.1.2.1.1.6.0', # 系统位置
# 'sysUpTime': '1.3.6.1.2.1.1.3.0', # 系统运行时间
# }
#
# # 存储结果的字典
# router_info = {}
#
# # 获取基本系统信息
# for name, oid in oids.items():
# errorIndication, errorStatus, errorIndex, varBinds = next(
# getCmd(SnmpEngine(),
# CommunityData(community_string),
# UdpTransportTarget((router_ip, port)),
# ContextData(),
# ObjectType(ObjectIdentity(oid)))
# )
#
# if errorIndication:
# print(f"错误: {errorIndication}")
# sys.exit(1)
# elif errorStatus:
# print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
# sys.exit(1)
# else:
# for varBind in varBinds:
# router_info[name] = str(varBind[1])
#
# # 获取接口信息
# router_info['interfaces'] = get_interfaces(router_ip, community_string, port)
#
# return router_info
#
#
# def get_interfaces(router_ip, community_string, port=161):
# """
# 获取路由器的所有接口信息
#
# 参数:
# router_ip (str): 路由器的IP地址
# community_string (str): SNMP团体字符串
# port (int): SNMP端口,默认为161
#
# 返回:
# list: 包含接口信息的列表
# """
# interfaces = []
#
# # 获取接口索引列表
# if_indices = []
# for (errorIndication,
# errorStatus,
# errorIndex,
# varBinds) in nextCmd(SnmpEngine(),
# CommunityData(community_string),
# UdpTransportTarget((router_ip, port)),
# ContextData(),
# ObjectType(ObjectIdentity('1.3.6.1.2.1.2.2.1.1')), # ifIndex
# lexicographicMode=False):
#
# if errorIndication:
# print(f"获取接口索引时出错: {errorIndication}")
# return interfaces
# elif errorStatus:
# print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
# return interfaces
# else:
# for varBind in varBinds:
# if_indices.append(str(varBind[1]))
#
# # 遍历每个接口,获取详细信息
# for ifIndex in if_indices:
# # 为每个接口获取详细信息
# interface = {'ifIndex': ifIndex}
#
# # 定义要查询的接口OID
# interface_oids = {
# 'ifDescr': f'1.3.6.1.2.1.2.2.1.2.{ifIndex}', # 接口描述
# 'ifType': f'1.3.6.1.2.1.2.2.1.3.{ifIndex}', # 接口类型
# 'ifMtu': f'1.3.6.1.2.1.2.2.1.4.{ifIndex}', # 最大传输单元
# 'ifSpeed': f'1.3.6.1.2.1.2.2.1.5.{ifIndex}', # 接口速度
# 'ifPhysAddress': f'1.3.6.1.2.1.2.2.1.6.{ifIndex}', # MAC地址
# 'ifAdminStatus': f'1.3.6.1.2.1.2.2.1.7.{ifIndex}', # 管理状态
# 'ifOperStatus': f'1.3.6.1.2.1.2.2.1.8.{ifIndex}', # 操作状态
# 'ifInOctets': f'1.3.6.1.2.1.2.2.1.10.{ifIndex}', # 入站字节数
# 'ifOutOctets': f'1.3.6.1.2.1.2.2.1.16.{ifIndex}', # 出站字节数
# 'ifInErrors': f'1.3.6.1.2.1.2.2.1.14.{ifIndex}', # 入站错误数
# 'ifOutErrors': f'1.3.6.1.2.1.2.2.1.20.{ifIndex}', # 出站错误数
# }
#
# # 获取每个接口属性
# for name, oid in interface_oids.items():
# errorIndication, errorStatus, errorIndex, varBinds = next(
# getCmd(SnmpEngine(),
# CommunityData(community_string),
# UdpTransportTarget((router_ip, port)),
# ContextData(),
# ObjectType(ObjectIdentity(oid)))
# )
#
# if errorIndication:
# print(f"获取接口 {ifIndex} 的 {name} 时出错: {errorIndication}")
# elif errorStatus:
# print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
# else:
# for varBind in varBinds:
# value = str(varBind[1])
# # 处理MAC地址格式
# if name == 'ifPhysAddress' and len(value) > 0 and value != "":
# try:
# # PySNMP 4.4.12 返回的物理地址格式可能是OctetString
# if hasattr(varBind[1], 'asNumbers'):
# hex_string = ':'.join(f'{x:02x}' for x in varBind[1].asNumbers())
# interface[name] = hex_string
# else:
# hex_string = ':'.join(f'{ord(x):02x}' for x in value)
# interface[name] = hex_string
# except:
# interface[name] = value
# else:
# interface[name] = value
#
# # 添加到接口列表
# interfaces.append(interface)
#
# return interfaces
#
#
# def display_router_info(router_info):
# """
# 显示路由器信息
#
# 参数:
# router_info (dict): 包含路由器信息的字典
# """
# print("\n路由器基本信息:")
# print("-" * 40)
# print(f"系统描述: {router_info.get('sysDescr', 'N/A')}")
# print(f"系统名称: {router_info.get('sysName', 'N/A')}")
# print(f"系统位置: {router_info.get('sysLocation', 'N/A')}")
# print(f"系统联系人: {router_info.get('sysContact', 'N/A')}")
#
# # 转换运行时间从timeticks到可读格式 (PySNMP 4.4.12可能会返回更易读的格式)
# uptime_str = router_info.get('sysUpTime', '0')
# try:
# # 尝试提取数字部分
# if '(' in uptime_str:
# uptime_ticks = int(uptime_str.split('(')[1].split(')')[0])
# else:
# uptime_ticks = int(uptime_str)
#
# uptime_seconds = uptime_ticks / 100 # 转换为秒
# days, remainder = divmod(uptime_seconds, 86400)
# hours, remainder = divmod(remainder, 3600)
# minutes, seconds = divmod(remainder, 60)
# print(f"系统运行时间: {int(days)}天 {int(hours)}小时 {int(minutes)}分钟 {int(seconds)}秒")
# except:
# # 如果解析失败,直接显示原始值
# print(f"系统运行时间: {uptime_str}")
#
# print("\n接口信息:")
# print("-" * 80)
# print(f"{'索引':<6} {'描述':<25} {'类型':<10} {'状态':<10} {'速度(bps)':<15} {'MAC地址':<20}")
# print("-" * 80)
#
# for interface in router_info.get('interfaces', []):
# # 接口状态转换 (1=up, 2=down)
# admin_status = "启用" if interface.get('ifAdminStatus') == "1" else "禁用"
# oper_status = "运行" if interface.get('ifOperStatus') == "1" else "关闭"
# status = f"{admin_status}/{oper_status}"
#
# print(f"{interface.get('ifIndex', 'N/A'):<6} "
# f"{interface.get('ifDescr', 'N/A'):<25} "
# f"{interface.get('ifType', 'N/A'):<10} "
# f"{status:<10} "
# f"{interface.get('ifSpeed', 'N/A'):<15} "
# f"{interface.get('ifPhysAddress', 'N/A'):<20}")
#
#
# def monitor_interface_traffic(router_ip, community_string, interface_index, duration=60, interval=5, port=161):
# """
# 监控指定接口的流量
#
# 参数:
# router_ip (str): 路由器的IP地址
# community_string (str): SNMP团体字符串
# interface_index (str): 接口索引
# duration (int): 监控总时长(秒)
# interval (int): 监控间隔(秒)
# port (int): SNMP端口,默认为161
# """
# # 定义接口流量OID
# in_octets_oid = f'1.3.6.1.2.1.2.2.1.10.{interface_index}' # ifInOctets
# out_octets_oid = f'1.3.6.1.2.1.2.2.1.16.{interface_index}' # ifOutOctets
#
# print(f"\n开始监控接口 {interface_index} 的流量,每 {interval} 秒采样一次,总共 {duration} 秒")
# print("-" * 80)
# print(f"{'时间':<20} {'入站 (Mbps)':<15} {'出站 (Mbps)':<15} {'总流量 (Mbps)':<15}")
# print("-" * 80)
#
# # 初始化上一次的计数
# last_in_octets = 0
# last_out_octets = 0
# start_time = time.time()
#
# # 第一次获取数据
# errorIndication, errorStatus, errorIndex, varBinds = next(
# getCmd(SnmpEngine(),
# CommunityData(community_string),
# UdpTransportTarget((router_ip, port)),
# ContextData(),
# ObjectType(ObjectIdentity(in_octets_oid)),
# ObjectType(ObjectIdentity(out_octets_oid)))
# )
#
# if errorIndication:
# print(f"错误: {errorIndication}")
# return
# elif errorStatus:
# print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
# return
#
# for varBind in varBinds:
# oid = str(varBind[0])
# if in_octets_oid in oid:
# last_in_octets = int(varBind[1])
# elif out_octets_oid in oid:
# last_out_octets = int(varBind[1])
#
# elapsed_time = 0
# while elapsed_time < duration:
# time.sleep(interval)
#
# # 获取当前数据
# errorIndication, errorStatus, errorIndex, varBinds = next(
# getCmd(SnmpEngine(),
# CommunityData(community_string),
# UdpTransportTarget((router_ip, port)),
# ContextData(),
# ObjectType(ObjectIdentity(in_octets_oid)),
# ObjectType(ObjectIdentity(out_octets_oid)))
# )
#
# if errorIndication:
# print(f"错误: {errorIndication}")
# continue
# elif errorStatus:
# print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
# continue
#
# current_in_octets = 0
# current_out_octets = 0
#
# for varBind in varBinds:
# oid = str(varBind[0])
# if in_octets_oid in oid:
# current_in_octets = int(varBind[1])
# elif out_octets_oid in oid:
# current_out_octets = int(varBind[1])
#
# # 处理计数器溢出情况
# if current_in_octets < last_in_octets:
# # 32位计数器的最大值是 2^32 - 1
# delta_in = (2 ** 32 - last_in_octets) + current_in_octets
# else:
# delta_in = current_in_octets - last_in_octets
#
# if current_out_octets < last_out_octets:
# delta_out = (2 ** 32 - last_out_octets) + current_out_octets
# else:
# delta_out = current_out_octets - last_out_octets
#
# # 计算速率 (Mbps)
# in_rate = delta_in * 8 / (interval * 1000000)
# out_rate = delta_out * 8 / (interval * 1000000)
# total_rate = in_rate + out_rate
#
# # 更新上一次的计数
# last_in_octets = current_in_octets
# last_out_octets = current_out_octets
#
# # 输出结果
# current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# print(f"{current_time:<20} {in_rate:.2f} {out_rate:.2f} {total_rate:.2f}")
#
# elapsed_time = time.time() - start_time
#
#
# def main():
# """主函数"""
# # if len(sys.argv) < 3:
# print("用法: python3 router_snmp.py <路由器IP> <团体字符串> [端口]")
# print("例如: python3 router_snmp.py 192.168.1.1 public 161")
# # sys.exit(1)
#
# # router_ip = sys.argv[1]
# # community_string = sys.argv[2]
# # port = int(sys.argv[3]) if len(sys.argv) > 3 else 161
#
# router_ip = '192.168.1.2'
# community_string = 'public123'
# port = 161
#
# try:
# # 获取路由器信息
# print(f"正在获取路由器 {router_ip} 的信息...")
# router_info = get_router_info(router_ip, community_string, port)
#
# # 显示路由器信息
# display_router_info(router_info)
#
# # 可选:监控特定接口的流量
# if router_info.get('interfaces'):
# while True:
# try:
# choice = input("\n是否要监控接口流量?(y/n): ").lower()
# if choice == 'y':
# interface_indices = [interface['ifIndex'] for interface in router_info['interfaces']]
# print("\n可用接口索引:", ', '.join(interface_indices))
#
# if_index = input("输入要监控的接口索引 (例如 1): ")
# if if_index not in interface_indices:
# print(f"接口索引 {if_index} 不存在")
# continue
#
# duration = int(input("输入监控时长(秒)[默认 60]: ") or 60)
# interval = int(input("输入采样间隔(秒)[默认 5]: ") or 5)
#
# monitor_interface_traffic(router_ip, community_string, if_index, duration, interval, port)
# else:
# break
# except ValueError as e:
# print(f"输入错误: {e}")
# except KeyboardInterrupt:
# print("\n监控被用户中断")
# break
#
# except KeyboardInterrupt:
# print("\n程序被用户中断")
# except Exception as e:
# print(f"发生错误: {e}")
#
#
# if __name__ == "__main__":
# main()
from pysnmp.hlapi import *
import time
import logging
from rich.console import Console
from rich.table import Table
from tqdm import tqdm
# Configure logging: records at INFO level and above are written to
# snmp_monitor.log, each prefixed with a timestamp and the level name.
logging.basicConfig(filename="snmp_monitor.log", level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s")
# Shared rich console used by the display and monitoring functions below.
console = Console()
def get_snmp_data(target, community, oid, timeout=1, retries=3, port=161):
    """Read a single OID from an SNMP agent with one GET request.

    Args:
        target: Host name or IP address of the SNMP agent.
        community: Community string; sent with ``mpModel=0`` (SNMPv1).
        oid: Dotted-decimal OID of the object instance to read.
        timeout: Forwarded to ``UdpTransportTarget`` as ``timeout``.
        retries: Forwarded to ``UdpTransportTarget`` as ``retries``.
        port: UDP port of the agent. Defaults to 161, the value that was
            previously hard-coded, so existing callers are unaffected.

    Returns:
        The ``prettyPrint()`` text of the returned value, or ``None`` when
        the request fails for any reason. Failures are logged, not raised.
    """
    try:
        iterator = getCmd(
            SnmpEngine(),
            CommunityData(community, mpModel=0),
            UdpTransportTarget((target, port), timeout=timeout, retries=retries),
            ContextData(),
            ObjectType(ObjectIdentity(oid)),
        )
        errorIndication, errorStatus, _error_index, varBinds = next(iterator)
        if errorIndication:
            # Transport/engine level failure (e.g. no response from the agent).
            logging.error("SNMP 错误: %s", errorIndication)
            return None
        if errorStatus:
            # The agent answered, but with an SNMP error status.
            logging.error("SNMP 响应错误: %s", errorStatus.prettyPrint())
            return None
        return varBinds[0][1].prettyPrint()
    except Exception as e:
        # Deliberate best-effort boundary: callers treat None as "no data",
        # so any unexpected failure is logged instead of propagated.
        logging.error("SNMP 获取数据失败: %s", e)
        return None
def get_router_info(router_ip, community):
    """Collect basic system scalars from the device.

    Queries are issued in the order sysName, sysDescr, sysUpTime through
    ``get_snmp_data``.

    Args:
        router_ip: Address of the device to query.
        community: SNMP community string.

    Returns:
        A dict with keys ``"sysName"``, ``"sysDescr"`` and ``"sysUpTime"``;
        each value is whatever ``get_snmp_data`` returned for that OID.
    """
    system_oids = (
        ("sysName", '1.3.6.1.2.1.1.5.0'),
        ("sysDescr", '1.3.6.1.2.1.1.1.0'),
        ("sysUpTime", '1.3.6.1.2.1.1.3.0'),
    )
    return {
        field: get_snmp_data(router_ip, community, oid)
        for field, oid in system_oids
    }
def get_interfaces(router_ip, community, max_index=31):
    """Probe interface table rows 1..max_index and return the ones that exist.

    A row is kept only when its ifDescr query returns a non-empty value.
    The description is fetched first so that the remaining four columns
    are not requested for indices that will be discarded anyway.

    Args:
        router_ip: Address of the device to query.
        community: SNMP community string.
        max_index: Highest interface index to probe (inclusive). Defaults
            to 31, the range the original loop covered.

    Returns:
        A list of dicts with keys ``ifIndex`` (int), ``ifDescr``, ``ifType``,
        ``ifSpeed``, ``ifAdminStatus`` and ``ifOperStatus``; the last four
        may be ``None`` when their individual query failed.
    """
    # NOTE(review): this assumes interface indices are small and start at 1;
    # devices with sparse or large indices would need a table walk - confirm.
    table_prefix = '1.3.6.1.2.1.2.2.1'
    extra_columns = (
        ("ifType", 3),
        ("ifSpeed", 5),
        ("ifAdminStatus", 7),
        ("ifOperStatus", 8),
    )

    interfaces = []
    for i in range(1, max_index + 1):
        descr = get_snmp_data(router_ip, community, f'{table_prefix}.2.{i}')
        if not descr:
            # No description means the row is skipped, so skip its other queries.
            continue
        interface = {"ifIndex": i, "ifDescr": descr}
        for name, column in extra_columns:
            interface[name] = get_snmp_data(
                router_ip, community, f'{table_prefix}.{column}.{i}'
            )
        interfaces.append(interface)
    return interfaces
def display_router_info(router_info, interfaces):
    """Print the device summary and an interface table to the console.

    Values that are missing or ``None`` (a failed SNMP query) are shown as
    ``N/A`` instead of the literal text ``None`` or an empty table cell.

    Args:
        router_info: Dict with ``sysName``, ``sysDescr`` and ``sysUpTime``.
        interfaces: List of interface dicts carrying ``ifIndex``,
            ``ifDescr``, ``ifType``, ``ifSpeed``, ``ifAdminStatus`` and
            ``ifOperStatus``.
    """
    def _text(value):
        # dict.get(key, default) does not help here: the keys exist but hold
        # None after a failed query, so the fallback is applied to the value.
        return 'N/A' if value is None else str(value)

    console.print(f"[bold green]设备名称:[/bold green] {_text(router_info.get('sysName'))}")
    console.print(f"[bold blue]设备描述:[/bold blue] {_text(router_info.get('sysDescr'))}")
    console.print(f"[bold yellow]设备运行时间:[/bold yellow] {_text(router_info.get('sysUpTime'))}\n")

    table = Table(title="接口信息")
    table.add_column("索引", justify="center")
    table.add_column("描述", justify="left")
    table.add_column("类型", justify="center")
    table.add_column("速度 (bps)", justify="right")
    table.add_column("状态", justify="center")

    for interface in interfaces:
        # Status values are compared as text against '1'.
        admin_state = '启用' if interface.get('ifAdminStatus') == '1' else '禁用'
        oper_state = '运行' if interface.get('ifOperStatus') == '1' else '关闭'
        table.add_row(
            _text(interface.get('ifIndex')),
            _text(interface.get('ifDescr')),
            _text(interface.get('ifType')),
            _text(interface.get('ifSpeed')),
            f"{admin_state}/{oper_state}",
        )
    console.print(table)
def monitor_interface(router_ip, community, interface_index, duration=30, interval=5):
    """Periodically print the raw in/out octet counters of one interface.

    Each sample reads columns 10 (inbound octets) and 16 (outbound octets)
    of the interface table for ``interface_index`` and prints both values.
    The values are cumulative counters, not rates.

    Unlike a fixed ``sleep(interval)``, the wait after each sample is
    clamped to the time left, so the run does not extend past ``duration``
    by up to a whole interval, and the progress bar never exceeds its total.

    Args:
        router_ip: Address of the device to query.
        community: SNMP community string.
        interface_index: Index of the interface to sample.
        duration: Total monitoring time in seconds.
        interval: Pause between samples in seconds.
    """
    console.print(f"[bold cyan]监控接口 {interface_index} 流量 {duration} 秒,每 {interval} 秒采样[/bold cyan]")
    in_oid = f'1.3.6.1.2.1.2.2.1.10.{interface_index}'
    out_oid = f'1.3.6.1.2.1.2.2.1.16.{interface_index}'

    with tqdm(total=duration, desc=f"接口 {interface_index} 流量监控", unit="s") as pbar:
        start_time = time.time()
        shown = 0  # seconds already reported to the progress bar
        while time.time() - start_time < duration:
            in_octets = get_snmp_data(router_ip, community, in_oid)
            out_octets = get_snmp_data(router_ip, community, out_oid)
            # A failed query yields None; show N/A rather than "None bytes".
            in_octets = 'N/A' if in_octets is None else in_octets
            out_octets = 'N/A' if out_octets is None else out_octets
            console.print(f"流入: {in_octets} bytes, 流出: {out_octets} bytes")

            # The queries themselves take time, so recompute what is left
            # and never sleep beyond the end of the monitoring window.
            remaining = duration - (time.time() - start_time)
            if remaining > 0:
                time.sleep(min(interval, remaining))

            step = min(interval, duration - shown)
            if step > 0:
                pbar.update(step)
                shown += step
        if shown < duration:
            # Query latency can end the loop early in bar terms; finish the bar.
            pbar.update(duration - shown)
def main():
    """Query the hard-coded device, print its summary, then sample one interface."""
    router_ip = "192.168.1.2"
    community_string = "public123"

    router_info = get_router_info(router_ip, community_string)
    interfaces = get_interfaces(router_ip, community_string)
    display_router_info(router_info, interfaces)

    # NOTE(review): get_interfaces() as called here probes only indices 1-31,
    # so interface 32 is never part of the table printed above - confirm that
    # 32 is the intended interface to monitor.
    monitor_interface(router_ip, community_string, interface_index=32, duration=30, interval=5)


if __name__ == "__main__":
    main()
- 0
-
分享