Skip to content

Commit

Permalink
Merge pull request #2 from davama/dev
Browse files Browse the repository at this point in the history
Update to version 0.0.48
  • Loading branch information
skarpe-github authored Feb 9, 2024
2 parents 8233037 + f246e37 commit effa665
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 52 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
<li>get_config</li>
<li>get_facts</li>
<li>get_vlans</li>
<li>is_alive</li>
<li>get_interfaces</li>
<li>get_lldp_neighbors</li>
<li>get_lldp_neighbors_detail</li>
<li>get_equipment_ont_status_xpon</li>
<li>get_equipment_ont_interfaces</li>
<li>get_equipment_ont_status_pon</li>
Expand All @@ -28,7 +32,6 @@
<li>get_equipment_temperature</li>
<li>cli</li>
<li>get_ntp_servers</li>
<li>get_interfaces</li>
<li>send_single_command</li>
</ul>
<p>Each function will always return the output as dict data-type </p>
<p>Each function will always return the output as dict data-type </p>
2 changes: 1 addition & 1 deletion napalm_nokia_olt/napalm_nokia_olt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
from napalm_nokia_olt.nokia_olt import NokiaOltDriver
__all__ = ("NokiaOltDriver",)

__version__ = "0.0.47"
__version__ = "0.0.48"
209 changes: 162 additions & 47 deletions napalm_nokia_olt/napalm_nokia_olt/nokia_olt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import print_function
from __future__ import unicode_literals
import socket
import re
from netmiko import ConnectHandler
from napalm.base.base import NetworkDriver
import xml.etree.ElementTree as ET
Expand Down Expand Up @@ -66,18 +67,10 @@ def __init__(

def _send_command(self, command, xml_format=False):
"""Send command to device"""
# if true:
# append xml to the end of the command
if xml_format:
command = command + " " + "xml"
output = self.device.send_command(command, expect_string=r"#$")
return output
# elif "info" in command:
# output = self.device.send_command(command, expect_string=r"#$")
# return output
# else:
# output = self.device.send_command(command, expect_string=r"(#|$)")
# return output
command += " xml"
output = self.device.send_command(command, expect_string=r"#$")
return output

def open(self):
"""Open an SSH tunnel connection to the device."""
Expand All @@ -92,8 +85,6 @@ def open(self):
**self.netmiko_optional_args,
)
self._prep_session()
# ensure in enable mode
# self.device.enable()

def close(self):
"""Close the connection to the device."""
Expand Down Expand Up @@ -229,7 +220,7 @@ def get_facts(self):
os_command = "show software-mngt version ansi"
uptime_command = "show core1-uptime"
sn_command = "show equipment shelf 1/1 detail"
port_command = "show interface port"
port_command = "show equipment ont interface"

hostname_output = self._send_command(hostname_command, xml_format=True)
os_output = self._send_command(os_command, xml_format=True)
Expand Down Expand Up @@ -274,6 +265,9 @@ def get_facts(self):
if "serial-no" in dummy_data:
serial_number = dummy_data["serial-no"]
facts["serial_number"] = serial_number
if "variant" in dummy_data:
variant = dummy_data["variant"]
facts["model"] += f" ({variant})"

# get uptime
for line in uptime_output.splitlines():
Expand All @@ -290,12 +284,9 @@ def get_facts(self):
# get interface_list
for elem in port_xml_tree.findall(".//instance"):
dummy_data = self._convert_xml_elem_to_dict(elem=elem)
if "port" in dummy_data:
port_raw = dummy_data["port"]
if "vlan" in port_raw:
continue
port_list.append(port_raw)
port_list.sort()
if "ont-idx" in dummy_data:
port_list.append(dummy_data["ont-idx"])
port_list.sort(key=lambda p: list(map(int, p.split("/"))))
facts["interface_list"] = port_list
return facts

Expand All @@ -317,7 +308,7 @@ def get_vlans(self):
# create default dict and get vlan_id and name
for elem in output_xml_tree.findall(".//instance"):
dummy_data = self._convert_xml_elem_to_dict(elem=elem)
primary_key = dummy_data["id"]
primary_key = int(dummy_data["id"])
if primary_key not in vlans:
vlans[primary_key] = {}
vlans[primary_key]["name"] = dummy_data["name"]
Expand All @@ -326,11 +317,9 @@ def get_vlans(self):
# get tagged/untagged ports
for elem in tag_xml_tree.findall(".//instance"):
dummy_data = self._convert_xml_elem_to_dict(elem=elem)
vlan_id = dummy_data["vlan-id"]
vlan_id = int(dummy_data["vlan-id"])
port_raw = dummy_data["vlan-port"]
port = ":".join(
port_raw.replace("vlan-port", "uni").split(":")[0:2]
)
port = port_raw.split(":")[1]
if (
"single-tagged" in dummy_data["transmit-mode"]
or "untagged" in dummy_data["transmit-mode"]
Expand Down Expand Up @@ -582,13 +571,13 @@ def get_equipment_diagnostics_sfp(self):
return f"No available ** {command} ** data from the {self.hostname}"

def get_vlan_name(self):
"""Returns VLANs info"""
command = "show vlan name"
data = self._send_command(command, xml_format=True)
if data:
return self.convert_xml_to_dict(data)
else:
return f"No available ** {command} ** data from the {self.hostname}"
"""Returns VLANs info"""
command = "show vlan name"
data = self._send_command(command, xml_format=True)
if data:
return self.convert_xml_to_dict(data)
else:
return f"No available ** {command} ** data from the {self.hostname}"

def get_vlan_bridge_port_fdb(self):
"""Returns VLANs info details"""
Expand Down Expand Up @@ -641,22 +630,148 @@ def get_ntp_servers(self):
return dict(ntp_servers)

def get_interfaces(self):
"""Returns a list of all interfaces in a dict"""
port_command = "show interface port"
port_output = self._send_command(port_command, xml_format=True)
port_xml_tree = ET.fromstring(port_output)
"""
Returns a dictionary of dictionaries for all ONT interfaces
Example Output:
{
'1/1/1/1/1': {'is_enabled': True,
'is_up': True,
'description': 'test description',
'mac_address': '0000.0000.0000',
'last_flapped': -1.0,
'mtu': 0,
'speed': 1000}
}
interfaces = {}
port_list = []
mac_address is not implemented
last_flapped is not implemented
mtu is not implemented
# get a list of interfaces
for elem in port_xml_tree.findall(".//instance"):
"""
pon_command = "show equipment ont status pon"
xpon_command = "show equipment ont status x-pon"
pon_output = self._send_command(pon_command, xml_format=True)
xpon_output = self._send_command(xpon_command, xml_format=True)
pon_xml_tree = ET.fromstring(pon_output)
xpon_xml_tree = ET.fromstring(xpon_output)

interface_dict = {}

# parse 1GE ports
for elem in pon_xml_tree.findall(".//instance"):
dummy_data = self._convert_xml_elem_to_dict(elem=elem)
if "port" in dummy_data:
port_raw = dummy_data["port"]
if "vlan" in port_raw:
if "ont" in dummy_data:
is_enabled = bool("up" in dummy_data.get("admin-status", ""))
is_up = bool("up" in dummy_data.get("oper-status", ""))
interface_dict[dummy_data["ont"]] = {
"is_enabled": is_enabled,
"is_up": is_up,
"description": dummy_data.get("desc1", ""),
"mac_address": "0000.0000.0000",
"last_flapped": -1.0,
"mtu": 0,
"speed": 1000,
}

# parse 10GE ports (X-PON)
for elem in xpon_xml_tree.findall(".//instance"):
dummy_data = self._convert_xml_elem_to_dict(elem=elem)
if "ont" in dummy_data:
is_enabled = bool("up" in dummy_data.get("admin-status", ""))
is_up = bool("up" in dummy_data.get("oper-status", ""))
interface_dict[dummy_data["ont"]] = {
"is_enabled": is_enabled,
"is_up": is_up,
"description": dummy_data.get("desc1", ""),
"mac_address": "0000.0000.0000",
"last_flapped": -1.0,
"mtu": 0,
"speed": 10000,
}

return interface_dict

def get_lldp_neighbors(self):
"""Returns a dictionary with LLDP neighbors"""
port_command = "show port"
port_data = self._send_command(port_command, xml_format=False)
ports = []
lldp = {}
port_regex = r"^(?!=|-|\s|Port|Id.).+$"
remote_host_regex = r"^System Name[\s:]+(.+)$"
remote_port_regex = r"""^Port Id[\s:]+([0-9a-zA-Z:]+[\n][\s"\/a-zA-Z0-9]+$)"""
all_ports = re.findall(port_regex, port_data, re.MULTILINE)
for line in all_ports:
line = line.split()
if len(line) > 0:
if not line[-1] == "vport":
ports.append(line[0])
for port in ports:
lldp_command = f"show port {port} ethernet lldp remote-info"
lldp_data = self._send_command(lldp_command, xml_format=False)
remote_host = re.search(remote_host_regex, lldp_data, re.MULTILINE)
if remote_host:
lldp[port] = [{"hostname": remote_host.group(1), "port": ""}]
try:
remote_port_data = re.search(remote_port_regex, lldp_data, re.MULTILINE)
remote_port = remote_port_data.group(1).split()[1]
lldp[port][0]["port"] = remote_port.replace('"','')
except IndexError:
continue
return lldp

def get_lldp_neighbors_detail(self):
"""Returns a detailed view of the LLDP neighbors as a dictionary"""
port_command = "show port"
port_data = self._send_command(port_command, xml_format=False)
ports = []
lldp = {}
port_regex = r"^(?!=|-|\s|Port|Id.).+$"
remote_host_regex = r"^System Name[\s:]+(.+)$"
remote_port_regex = r"""^Port Id[\s:]+([0-9a-zA-Z:]+[\n][\s"\/a-zA-Z0-9]+$)"""
remote_chassis_regex = r"^Chassis Id[\s]{3,}[:\s](.+)$"
remote_port_descr_regex = r"^Port Description[:\s]+(.+)$"
remote_sys_descr_regex = r"(?<=^System Description)\s*:\s(.*)(?=\n\n\n)"
remote_sys_cap_regex = r"^Supported Caps[\s:]+(.+)$"
remote_sys_en_cap_regex = r"^Enabled Caps[\s:]+(.+)$"
all_ports = re.findall(port_regex, port_data, re.MULTILINE)
for line in all_ports:
line = line.split()
if len(line) > 0:
if not line[-1] == "vport":
ports.append(line[0])
for port in ports:
lldp_command = f"show port {port} ethernet lldp remote-info"
lldp_data = self._send_command(lldp_command, xml_format=False)
remote_host = re.findall(remote_host_regex, lldp_data, re.MULTILINE)
if len(remote_host) > 0:
lldp[port] = [
{"parent_interface": port,
"remote_chassis_id": "",
"remote_system_name": remote_host[0],
"remote_port": "",
"remote_port_description": "",
"remote_system_description": "",
"remote_system_capab": "",
"remote_system_enable_capab": "",
}
]
remote_chassis_id_data = re.search(remote_chassis_regex, lldp_data, re.MULTILINE)
remote_port_descr_data = re.search(remote_port_descr_regex, lldp_data, re.MULTILINE)
remote_sys_descr_data = re.search(remote_sys_descr_regex, lldp_data, flags = re.S | re.M)
remote_sys_cap_data = re.search(remote_sys_cap_regex, lldp_data, re.MULTILINE)
remote_sys_en_cap_data = re.search(remote_sys_en_cap_regex, lldp_data, re.MULTILINE)
lldp[port][0]["remote_chassis_id"] = remote_chassis_id_data.group(1)
lldp[port][0]["remote_port_description"] = remote_port_descr_data.group(1)
lldp[port][0]["remote_system_description"] = remote_sys_descr_data.group(1)
lldp[port][0]["remote_system_description"] = ' '.join(lldp[port][0]["remote_system_description"].split())
lldp[port][0]["remote_system_capab"] = remote_sys_cap_data.group(1)
lldp[port][0]["remote_system_enable_capab"] = remote_sys_en_cap_data.group(1)
try:
remote_port_data = re.search(remote_port_regex, lldp_data, re.MULTILINE)
remote_port = remote_port_data.group(1).split()[1]
lldp[port][0]["remote_port"] = remote_port.replace('"','')
except IndexError:
continue
port_list.append(port_raw)
port_list.sort()
interfaces["interface_list"] = port_list
return interfaces
return lldp
4 changes: 2 additions & 2 deletions napalm_nokia_olt/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setuptools.setup(
name="napalm_nokia_olt",
version="0.0.47",
version="0.0.48",
author="Dave Macias",
author_email = "davama@gmail.com",
description=("Network Automation and Programmability Abstraction "
Expand All @@ -26,5 +26,5 @@
"License :: OSI Approved :: BSD License",
],
include_package_data=True,
install_requires=('napalm>=3','xmltodict', 'sshFRIEND'),
install_requires=('napalm>=3','xmltodict'),
)

0 comments on commit effa665

Please sign in to comment.