花海

花海

SNMP获取路由器交换机信息

2025-03-31
# #!/usr/bin/env python3
# # 使用 PySNMP 4.4.12 版本
# # 可以通过 pip install pysnmp==4.4.12 安装此版本
# from pysnmp.hlapi import *
# import sys
# import time
#
#
# def get_router_info(router_ip, community_string, port=161):
#     """
#     获取路由器的基本信息,包括系统描述、联系人、名称、位置、运行时间和接口信息
#
#     参数:
#         router_ip (str): 路由器的IP地址
#         community_string (str): SNMP团体字符串(通常是'public'或'private')
#         port (int): SNMP端口,默认为161
#
#     返回:
#         dict: 包含路由器信息的字典
#     """
#     # 定义要查询的OID
#     oids = {
#         'sysDescr': '1.3.6.1.2.1.1.1.0',  # 系统描述
#         'sysContact': '1.3.6.1.2.1.1.4.0',  # 系统联系人
#         'sysName': '1.3.6.1.2.1.1.5.0',  # 系统名称
#         'sysLocation': '1.3.6.1.2.1.1.6.0',  # 系统位置
#         'sysUpTime': '1.3.6.1.2.1.1.3.0',  # 系统运行时间
#     }
#
#     # 存储结果的字典
#     router_info = {}
#
#     # 获取基本系统信息
#     for name, oid in oids.items():
#         errorIndication, errorStatus, errorIndex, varBinds = next(
#             getCmd(SnmpEngine(),
#                    CommunityData(community_string),
#                    UdpTransportTarget((router_ip, port)),
#                    ContextData(),
#                    ObjectType(ObjectIdentity(oid)))
#         )
#
#         if errorIndication:
#             print(f"错误: {errorIndication}")
#             sys.exit(1)
#         elif errorStatus:
#             print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
#             sys.exit(1)
#         else:
#             for varBind in varBinds:
#                 router_info[name] = str(varBind[1])
#
#     # 获取接口信息
#     router_info['interfaces'] = get_interfaces(router_ip, community_string, port)
#
#     return router_info
#
#
# def get_interfaces(router_ip, community_string, port=161):
#     """
#     获取路由器的所有接口信息
#
#     参数:
#         router_ip (str): 路由器的IP地址
#         community_string (str): SNMP团体字符串
#         port (int): SNMP端口,默认为161
#
#     返回:
#         list: 包含接口信息的列表
#     """
#     interfaces = []
#
#     # 获取接口索引列表
#     if_indices = []
#     for (errorIndication,
#          errorStatus,
#          errorIndex,
#          varBinds) in nextCmd(SnmpEngine(),
#                               CommunityData(community_string),
#                               UdpTransportTarget((router_ip, port)),
#                               ContextData(),
#                               ObjectType(ObjectIdentity('1.3.6.1.2.1.2.2.1.1')),  # ifIndex
#                               lexicographicMode=False):
#
#         if errorIndication:
#             print(f"获取接口索引时出错: {errorIndication}")
#             return interfaces
#         elif errorStatus:
#             print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
#             return interfaces
#         else:
#             for varBind in varBinds:
#                 if_indices.append(str(varBind[1]))
#
#     # 遍历每个接口,获取详细信息
#     for ifIndex in if_indices:
#         # 为每个接口获取详细信息
#         interface = {'ifIndex': ifIndex}
#
#         # 定义要查询的接口OID
#         interface_oids = {
#             'ifDescr': f'1.3.6.1.2.1.2.2.1.2.{ifIndex}',  # 接口描述
#             'ifType': f'1.3.6.1.2.1.2.2.1.3.{ifIndex}',  # 接口类型
#             'ifMtu': f'1.3.6.1.2.1.2.2.1.4.{ifIndex}',  # 最大传输单元
#             'ifSpeed': f'1.3.6.1.2.1.2.2.1.5.{ifIndex}',  # 接口速度
#             'ifPhysAddress': f'1.3.6.1.2.1.2.2.1.6.{ifIndex}',  # MAC地址
#             'ifAdminStatus': f'1.3.6.1.2.1.2.2.1.7.{ifIndex}',  # 管理状态
#             'ifOperStatus': f'1.3.6.1.2.1.2.2.1.8.{ifIndex}',  # 操作状态
#             'ifInOctets': f'1.3.6.1.2.1.2.2.1.10.{ifIndex}',  # 入站字节数
#             'ifOutOctets': f'1.3.6.1.2.1.2.2.1.16.{ifIndex}',  # 出站字节数
#             'ifInErrors': f'1.3.6.1.2.1.2.2.1.14.{ifIndex}',  # 入站错误数
#             'ifOutErrors': f'1.3.6.1.2.1.2.2.1.20.{ifIndex}',  # 出站错误数
#         }
#
#         # 获取每个接口属性
#         for name, oid in interface_oids.items():
#             errorIndication, errorStatus, errorIndex, varBinds = next(
#                 getCmd(SnmpEngine(),
#                        CommunityData(community_string),
#                        UdpTransportTarget((router_ip, port)),
#                        ContextData(),
#                        ObjectType(ObjectIdentity(oid)))
#             )
#
#             if errorIndication:
#                 print(f"获取接口 {ifIndex} 的 {name} 时出错: {errorIndication}")
#             elif errorStatus:
#                 print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
#             else:
#                 for varBind in varBinds:
#                     value = str(varBind[1])
#                     # 处理MAC地址格式
#                     if name == 'ifPhysAddress' and len(value) > 0 and value != "":
#                         try:
#                             # PySNMP 4.4.12 返回的物理地址格式可能是OctetString
#                             if hasattr(varBind[1], 'asNumbers'):
#                                 hex_string = ':'.join(f'{x:02x}' for x in varBind[1].asNumbers())
#                                 interface[name] = hex_string
#                             else:
#                                 hex_string = ':'.join(f'{ord(x):02x}' for x in value)
#                                 interface[name] = hex_string
#                         except:
#                             interface[name] = value
#                     else:
#                         interface[name] = value
#
#         # 添加到接口列表
#         interfaces.append(interface)
#
#     return interfaces
#
#
# def display_router_info(router_info):
#     """
#     显示路由器信息
#
#     参数:
#         router_info (dict): 包含路由器信息的字典
#     """
#     print("\n路由器基本信息:")
#     print("-" * 40)
#     print(f"系统描述: {router_info.get('sysDescr', 'N/A')}")
#     print(f"系统名称: {router_info.get('sysName', 'N/A')}")
#     print(f"系统位置: {router_info.get('sysLocation', 'N/A')}")
#     print(f"系统联系人: {router_info.get('sysContact', 'N/A')}")
#
#     # 转换运行时间从timeticks到可读格式 (PySNMP 4.4.12可能会返回更易读的格式)
#     uptime_str = router_info.get('sysUpTime', '0')
#     try:
#         # 尝试提取数字部分
#         if '(' in uptime_str:
#             uptime_ticks = int(uptime_str.split('(')[1].split(')')[0])
#         else:
#             uptime_ticks = int(uptime_str)
#
#         uptime_seconds = uptime_ticks / 100  # 转换为秒
#         days, remainder = divmod(uptime_seconds, 86400)
#         hours, remainder = divmod(remainder, 3600)
#         minutes, seconds = divmod(remainder, 60)
#         print(f"系统运行时间: {int(days)}天 {int(hours)}小时 {int(minutes)}分钟 {int(seconds)}秒")
#     except:
#         # 如果解析失败,直接显示原始值
#         print(f"系统运行时间: {uptime_str}")
#
#     print("\n接口信息:")
#     print("-" * 80)
#     print(f"{'索引':<6} {'描述':<25} {'类型':<10} {'状态':<10} {'速度(bps)':<15} {'MAC地址':<20}")
#     print("-" * 80)
#
#     for interface in router_info.get('interfaces', []):
#         # 接口状态转换 (1=up, 2=down)
#         admin_status = "启用" if interface.get('ifAdminStatus') == "1" else "禁用"
#         oper_status = "运行" if interface.get('ifOperStatus') == "1" else "关闭"
#         status = f"{admin_status}/{oper_status}"
#
#         print(f"{interface.get('ifIndex', 'N/A'):<6} "
#               f"{interface.get('ifDescr', 'N/A'):<25} "
#               f"{interface.get('ifType', 'N/A'):<10} "
#               f"{status:<10} "
#               f"{interface.get('ifSpeed', 'N/A'):<15} "
#               f"{interface.get('ifPhysAddress', 'N/A'):<20}")
#
#
# def monitor_interface_traffic(router_ip, community_string, interface_index, duration=60, interval=5, port=161):
#     """
#     监控指定接口的流量
#
#     参数:
#         router_ip (str): 路由器的IP地址
#         community_string (str): SNMP团体字符串
#         interface_index (str): 接口索引
#         duration (int): 监控总时长(秒)
#         interval (int): 监控间隔(秒)
#         port (int): SNMP端口,默认为161
#     """
#     # 定义接口流量OID
#     in_octets_oid = f'1.3.6.1.2.1.2.2.1.10.{interface_index}'  # ifInOctets
#     out_octets_oid = f'1.3.6.1.2.1.2.2.1.16.{interface_index}'  # ifOutOctets
#
#     print(f"\n开始监控接口 {interface_index} 的流量,每 {interval} 秒采样一次,总共 {duration} 秒")
#     print("-" * 80)
#     print(f"{'时间':<20} {'入站 (Mbps)':<15} {'出站 (Mbps)':<15} {'总流量 (Mbps)':<15}")
#     print("-" * 80)
#
#     # 初始化上一次的计数
#     last_in_octets = 0
#     last_out_octets = 0
#     start_time = time.time()
#
#     # 第一次获取数据
#     errorIndication, errorStatus, errorIndex, varBinds = next(
#         getCmd(SnmpEngine(),
#                CommunityData(community_string),
#                UdpTransportTarget((router_ip, port)),
#                ContextData(),
#                ObjectType(ObjectIdentity(in_octets_oid)),
#                ObjectType(ObjectIdentity(out_octets_oid)))
#     )
#
#     if errorIndication:
#         print(f"错误: {errorIndication}")
#         return
#     elif errorStatus:
#         print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
#         return
#
#     for varBind in varBinds:
#         oid = str(varBind[0])
#         if in_octets_oid in oid:
#             last_in_octets = int(varBind[1])
#         elif out_octets_oid in oid:
#             last_out_octets = int(varBind[1])
#
#     elapsed_time = 0
#     while elapsed_time < duration:
#         time.sleep(interval)
#
#         # 获取当前数据
#         errorIndication, errorStatus, errorIndex, varBinds = next(
#             getCmd(SnmpEngine(),
#                    CommunityData(community_string),
#                    UdpTransportTarget((router_ip, port)),
#                    ContextData(),
#                    ObjectType(ObjectIdentity(in_octets_oid)),
#                    ObjectType(ObjectIdentity(out_octets_oid)))
#         )
#
#         if errorIndication:
#             print(f"错误: {errorIndication}")
#             continue
#         elif errorStatus:
#             print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
#             continue
#
#         current_in_octets = 0
#         current_out_octets = 0
#
#         for varBind in varBinds:
#             oid = str(varBind[0])
#             if in_octets_oid in oid:
#                 current_in_octets = int(varBind[1])
#             elif out_octets_oid in oid:
#                 current_out_octets = int(varBind[1])
#
#         # 处理计数器溢出情况
#         if current_in_octets < last_in_octets:
#             # 32位计数器的最大值是 2^32 - 1
#             delta_in = (2 ** 32 - last_in_octets) + current_in_octets
#         else:
#             delta_in = current_in_octets - last_in_octets
#
#         if current_out_octets < last_out_octets:
#             delta_out = (2 ** 32 - last_out_octets) + current_out_octets
#         else:
#             delta_out = current_out_octets - last_out_octets
#
#         # 计算速率 (Mbps)
#         in_rate = delta_in * 8 / (interval * 1000000)
#         out_rate = delta_out * 8 / (interval * 1000000)
#         total_rate = in_rate + out_rate
#
#         # 更新上一次的计数
#         last_in_octets = current_in_octets
#         last_out_octets = current_out_octets
#
#         # 输出结果
#         current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
#         print(f"{current_time:<20} {in_rate:.2f}           {out_rate:.2f}           {total_rate:.2f}")
#
#         elapsed_time = time.time() - start_time
#
#
# def main():
#     """主函数"""
#     # if len(sys.argv) < 3:
#     print("用法: python3 router_snmp.py <路由器IP> <团体字符串> [端口]")
#     print("例如: python3 router_snmp.py 192.168.1.1 public 161")
#         # sys.exit(1)
#
#     # router_ip = sys.argv[1]
#     # community_string = sys.argv[2]
#     # port = int(sys.argv[3]) if len(sys.argv) > 3 else 161
#
#     router_ip = '192.168.1.2'
#     community_string = 'public123'
#     port = 161
#
#     try:
#         # 获取路由器信息
#         print(f"正在获取路由器 {router_ip} 的信息...")
#         router_info = get_router_info(router_ip, community_string, port)
#
#         # 显示路由器信息
#         display_router_info(router_info)
#
#         # 可选:监控特定接口的流量
#         if router_info.get('interfaces'):
#             while True:
#                 try:
#                     choice = input("\n是否要监控接口流量?(y/n): ").lower()
#                     if choice == 'y':
#                         interface_indices = [interface['ifIndex'] for interface in router_info['interfaces']]
#                         print("\n可用接口索引:", ', '.join(interface_indices))
#
#                         if_index = input("输入要监控的接口索引 (例如 1): ")
#                         if if_index not in interface_indices:
#                             print(f"接口索引 {if_index} 不存在")
#                             continue
#
#                         duration = int(input("输入监控时长(秒)[默认 60]: ") or 60)
#                         interval = int(input("输入采样间隔(秒)[默认 5]: ") or 5)
#
#                         monitor_interface_traffic(router_ip, community_string, if_index, duration, interval, port)
#                     else:
#                         break
#                 except ValueError as e:
#                     print(f"输入错误: {e}")
#                 except KeyboardInterrupt:
#                     print("\n监控被用户中断")
#                     break
#
#     except KeyboardInterrupt:
#         print("\n程序被用户中断")
#     except Exception as e:
#         print(f"发生错误: {e}")
#
#
# if __name__ == "__main__":
#     main()


from pysnmp.hlapi import *
import time
import logging
from rich.console import Console
from rich.table import Table
from tqdm import tqdm

# Logging setup: records at INFO level and above go to snmp_monitor.log,
# each line formatted as "timestamp - level - message".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="snmp_monitor.log",
)

# Shared rich console used by the display and monitoring functions below.
console = Console()


def get_snmp_data(target, community, oid, timeout=1, retries=3, port=161):
    """Read a single OID from an SNMP agent with one GET request.

    The request is built with ``CommunityData(community, mpModel=0)``.
    Every failure (transport error, error status in the response, or any
    exception raised while building or sending the request) is written to
    the log and reported to the caller as ``None``; nothing is raised.

    Args:
        target: Host name or IP address of the SNMP agent.
        community: SNMP community string.
        oid: Dotted-decimal OID to read, e.g. ``'1.3.6.1.2.1.1.5.0'``.
        timeout: Timeout value handed to ``UdpTransportTarget``.
        retries: Retry count handed to ``UdpTransportTarget``.
        port: UDP port of the agent. Defaults to 161, the value that was
            previously hard-coded.

    Returns:
        The ``prettyPrint()`` rendering of the returned value, or ``None``
        when the value could not be retrieved.
    """
    try:
        iterator = getCmd(SnmpEngine(),
                          CommunityData(community, mpModel=0),
                          UdpTransportTarget((target, port), timeout=timeout, retries=retries),
                          ContextData(),
                          ObjectType(ObjectIdentity(oid)))

        errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
        if errorIndication:
            # Request-level failure reported by pysnmp (no usable response).
            logging.error("SNMP 错误: %s", errorIndication)
            return None
        if errorStatus:
            # The agent answered, but with an error status in the PDU.
            logging.error("SNMP 响应错误: %s", errorStatus.prettyPrint())
            return None
        # A single OID was requested, so the value is in the first var-bind.
        return varBinds[0][1].prettyPrint()
    except Exception as e:
        # Best-effort contract: callers only ever see a value or None.
        logging.error("SNMP 获取数据失败: %s", e)
        return None


def get_router_info(router_ip, community):
    """Collect the device's sysName, sysDescr and sysUpTime values.

    Each value is fetched with its own ``get_snmp_data`` call, in the order
    listed below. A value that could not be read is stored as ``None``.

    Args:
        router_ip: Host name or IP address of the device.
        community: SNMP community string.

    Returns:
        A dict with the keys ``"sysName"``, ``"sysDescr"`` and
        ``"sysUpTime"``.
    """
    system_oids = (
        ("sysName", '1.3.6.1.2.1.1.5.0'),
        ("sysDescr", '1.3.6.1.2.1.1.1.0'),
        ("sysUpTime", '1.3.6.1.2.1.1.3.0'),
    )
    return {
        label: get_snmp_data(router_ip, community, numeric_oid)
        for label, numeric_oid in system_oids
    }


def get_interfaces(router_ip, community, max_index=31):
    """Probe interface indexes 1..max_index and return those that answer.

    For every candidate index the ifDescr column is read first. Only when
    it yields a non-empty value are the remaining columns (ifType, ifSpeed,
    ifAdminStatus, ifOperStatus) fetched, so an index with no interface
    costs one failed request instead of five. A failed request blocks until
    the timeout and retries of ``get_snmp_data`` are exhausted.

    Interfaces whose index is greater than ``max_index`` are not discovered.

    Args:
        router_ip: Host name or IP address of the device.
        community: SNMP community string.
        max_index: Highest interface index to probe (inclusive). The default
            of 31 matches the range that was previously hard-coded.

    Returns:
        A list of dicts, one per interface that returned an ifDescr, with
        the keys ``ifIndex`` (int), ``ifDescr``, ``ifType``, ``ifSpeed``,
        ``ifAdminStatus`` and ``ifOperStatus``. Any of the latter four may
        be ``None`` when its request failed.
    """
    if_entry = '1.3.6.1.2.1.2.2.1'
    # Columns fetched only after ifDescr (column 2) has answered.
    detail_columns = (
        ("ifType", 3),
        ("ifSpeed", 5),
        ("ifAdminStatus", 7),
        ("ifOperStatus", 8),
    )

    interfaces = []
    for i in range(1, max_index + 1):
        descr = get_snmp_data(router_ip, community, f'{if_entry}.2.{i}')
        if not descr:
            # Nothing at this index: skip the four remaining requests.
            continue

        interface = {"ifIndex": i, "ifDescr": descr}
        for name, column in detail_columns:
            interface[name] = get_snmp_data(router_ip, community, f'{if_entry}.{column}.{i}')
        interfaces.append(interface)
    return interfaces


def _display_text(value):
    """Return *value* as display text, substituting 'N/A' for ``None``."""
    return "N/A" if value is None else str(value)


def display_router_info(router_info, interfaces):
    """Print the system summary and an interface table to the console.

    Values that are ``None`` (the result of a failed SNMP request) or that
    are missing from the dicts are shown as ``N/A``. The previous
    ``.get(key, 'N/A')`` defaults never applied, because the keys are always
    present and hold ``None`` on failure.

    Args:
        router_info: Dict with the keys ``sysName``, ``sysDescr`` and
            ``sysUpTime``.
        interfaces: Iterable of interface dicts with the keys ``ifIndex``,
            ``ifDescr``, ``ifType``, ``ifSpeed``, ``ifAdminStatus`` and
            ``ifOperStatus``.
    """
    console.print(f"[bold green]设备名称:[/bold green] {_display_text(router_info.get('sysName'))}")
    console.print(f"[bold blue]设备描述:[/bold blue] {_display_text(router_info.get('sysDescr'))}")
    console.print(f"[bold yellow]设备运行时间:[/bold yellow] {_display_text(router_info.get('sysUpTime'))}\n")

    table = Table(title="接口信息")
    table.add_column("索引", justify="center")
    table.add_column("描述", justify="left")
    table.add_column("类型", justify="center")
    table.add_column("速度 (bps)", justify="right")
    table.add_column("状态", justify="center")

    for interface in interfaces:
        # Only the exact string '1' counts as enabled/running; every other
        # value, including None from a failed request, is shown as
        # disabled/down.
        admin_state = '启用' if interface.get('ifAdminStatus') == '1' else '禁用'
        oper_state = '运行' if interface.get('ifOperStatus') == '1' else '关闭'
        table.add_row(
            str(interface['ifIndex']),
            _display_text(interface.get('ifDescr')),
            _display_text(interface.get('ifType')),
            _display_text(interface.get('ifSpeed')),
            f"{admin_state}/{oper_state}",
        )
    console.print(table)


def monitor_interface(router_ip, community, interface_index, duration=30, interval=5):
    """Sample an interface's in/out octet values for a fixed period.

    Every ``interval`` seconds the two octet values of ``interface_index``
    are read with ``get_snmp_data`` and printed as returned (raw values, not
    a rate; ``None`` when a request failed). A tqdm bar shows progress
    towards ``duration``.

    The final sleep is shortened so the run does not continue past
    ``duration``, and the progress bar is never advanced beyond its total.
    Elapsed time is measured with ``time.monotonic()`` so wall-clock
    adjustments do not affect the run length.

    Args:
        router_ip: Host name or IP address of the device.
        community: SNMP community string.
        interface_index: Index of the interface to sample.
        duration: Total monitoring time in seconds.
        interval: Pause between samples in seconds.
    """
    console.print(f"[bold cyan]监控接口 {interface_index} 流量 {duration} 秒,每 {interval} 秒采样[/bold cyan]")

    # The OIDs do not change between samples, so build them once.
    in_oid = f'1.3.6.1.2.1.2.2.1.10.{interface_index}'
    out_oid = f'1.3.6.1.2.1.2.2.1.16.{interface_index}'

    with tqdm(total=duration, desc=f"接口 {interface_index} 流量监控", unit="s") as pbar:
        start_time = time.monotonic()
        reported = 0  # progress already pushed to the bar
        while time.monotonic() - start_time < duration:
            in_octets = get_snmp_data(router_ip, community, in_oid)
            out_octets = get_snmp_data(router_ip, community, out_oid)
            console.print(f"流入: {in_octets} bytes, 流出: {out_octets} bytes")

            # Sleep for one interval, but never past the end of the run.
            remaining = duration - (time.monotonic() - start_time)
            time.sleep(min(interval, max(remaining, 0)))

            # Advance by one nominal interval, capped at the bar's total.
            step = min(interval, duration - reported)
            pbar.update(step)
            reported += step

        # Slow requests can end the loop early in bar terms; finish the bar.
        if reported < duration:
            pbar.update(duration - reported)


def main():
    """Query one hard-coded device, print its details, then sample traffic.

    The system summary and the interface table are fetched and displayed
    first; afterwards interface 32 is monitored for 30 seconds with a
    5-second sampling interval.
    """
    device = "192.168.1.2"
    community = "public123"

    system_details = get_router_info(device, community)
    discovered = get_interfaces(device, community)
    display_router_info(system_details, discovered)

    # NOTE(review): get_interfaces probes indexes 1..31 only, so index 32 is
    # never part of the table printed above -- confirm this is the intended
    # interface.
    monitor_interface(device, community, interface_index=32, duration=30, interval=5)


if __name__ == "__main__":
    main()

  • 0