Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for pysnmp version6 #3473

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 98 additions & 29 deletions netmiko/snmp_autodetect.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@

from typing import Optional, Dict, List
from typing.re import Pattern
import asyncio
import re
import socket

try:
from pysnmp.entity.rfc3413.oneliner import cmdgen

SNMP_MODE = "legacy"
except ImportError:
from pysnmp.hlapi.asyncio import cmdgen

SNMP_MODE = "v6_async"
except ImportError:
raise ImportError("pysnmp not installed; please install it: 'pip install pysnmp'")

Expand Down Expand Up @@ -304,12 +311,14 @@ def __init__(
self.snmp_target, timeout=1.5, retries=2
)

def _get_snmpv3(self, oid: str) -> str:
async def _run_query(self, creds: object, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv3 for the specified OID.
Asynchronous getCmd query to the device.

Parameters
----------
creds : UsmUserData or CommunityData object
The authentication credentials.
oid : str
The SNMP OID that you want to get.

Expand All @@ -318,26 +327,82 @@ def _get_snmpv3(self, oid: str) -> str:
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
errorIndication, errorStatus, errorIndex, varBinds = await cmdgen.getCmd(
cmdgen.SnmpEngine(),
creds,
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
cmdgen.ContextData(),
cmdgen.ObjectType(cmdgen.ObjectIdentity(oid)),
)

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
if not errorIndication and varBinds[0][1]:
return str(varBinds[0][1])
return ""

def _get_snmpv3_asyncwr(self, oid: str) -> str:
"""
This is an asynchronous wrapper to call code in newer versions of the pysnmp library
(V6 and later).
"""
return asyncio.run(
self._run_query(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
oid,
)
)

def _get_snmpv3(self, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv3 for the specified OID.

Parameters
----------
oid : str
The SNMP OID that you want to get.

Returns
-------
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
if SNMP_MODE == "legacy":
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""
elif SNMP_MODE == "v6_async":
return self._get_snmpv3_asyncwr(oid=oid)
else:
raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'")

def _get_snmpv2c_asyncwr(self, oid: str) -> str:
"""
This is an asynchronous wrapper to call code in newer versions of the pysnmp library
(V6 and later).
"""
return asyncio.run(self._run_query(cmdgen.CommunityData(self.community), oid))

def _get_snmpv2c(self, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv2 for the specified OID.
Expand All @@ -352,19 +417,23 @@ def _get_snmpv2c(self, oid: str) -> str:
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.CommunityData(self.community),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)
if SNMP_MODE == "legacy":
cmd_gen = cmdgen.CommandGenerator()
(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.CommunityData(self.community),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)
if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""
elif SNMP_MODE == "v6_async":
return self._get_snmpv2c_asyncwr(oid=oid)
else:
raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'")

def _get_snmp(self, oid: str) -> str:
"""Wrapper for generic SNMP call."""
Expand Down
Loading