Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix image upload v2 #91

Merged
merged 4 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nb-dt-import.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def main():
settings.handle.verbose_log(
f'Script took {(datetime.now() - startTime)} to run')
settings.handle.log(f'{netbox.counter["added"]} devices created')
settings.handle.log(f'{netbox.counter["images"]} images uploaded')
settings.handle.log(
f'{netbox.counter["updated"]} interfaces/ports updated')
settings.handle.log(
Expand Down
137 changes: 92 additions & 45 deletions netbox_api.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from collections import Counter
import pynetbox
import requests
import os
import glob
# from pynetbox import RequestError as APIRequestError

class NetBox:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)

def __init__(self, settings):
self.counter = Counter(
added=0,
updated=0,
manufacturer=0,
module_added=0,
module_port_added=0,
images=0,
)
self.url = settings.NETBOX_URL
self.token = settings.NETBOX_TOKEN
Expand All @@ -25,7 +28,7 @@ def __init__(self, settings):
self.verify_compatibility()
self.existing_manufacturers = self.get_manufacturers()
self.device_types = DeviceTypes(self.netbox, self.handle, self.counter)

def connect_api(self):
try:
self.netbox = pynetbox.api(self.url, token=self.token)
Expand All @@ -35,13 +38,13 @@ def connect_api(self):
self.netbox.http_session.verify = False
except Exception as e:
self.handle.exception("Exception", 'NetBox API Error', e)

def get_api(self):
return self.netbox

def get_counter(self):
return self.counter

def verify_compatibility(self):
# nb.version should be the version in the form '3.2'
version_split = [int(x) for x in self.netbox.version.split('.')]
Expand All @@ -50,10 +53,10 @@ def verify_compatibility(self):
# Might want to check for the module-types entry as well?
if version_split[0] > 3 or (version_split[0] == 3 and version_split[1] >= 2):
self.modules = True

def get_manufacturers(self):
return {str(item): item for item in self.netbox.dcim.manufacturers.all()}

def create_manufacturers(self, vendors):
to_create = []
self.existing_manufacturers = self.get_manufacturers()
Expand All @@ -64,7 +67,7 @@ def create_manufacturers(self, vendors):
except KeyError:
to_create.append(vendor)
self.handle.verbose_log(f"Manufacturer queued for addition: {vendor['name']}")

if to_create:
try:
created_manufacturers = self.netbox.dcim.manufacturers.create(to_create)
Expand All @@ -75,9 +78,28 @@ def create_manufacturers(self, vendors):
except pynetbox.RequestError as request_error:
self.handle.log("Error creating manufacturers")
self.handle.verbose_log(f"Error during manufacturer creation. - {request_error.error}")

def create_device_types(self, device_types_to_add):
for device_type in device_types_to_add:

# Remove file base path
src_file = device_type["src"]
del device_type["src"]

# Pre-process front/rear_image flag, remove it if present
saved_images = {}
image_base = os.path.dirname(src_file).replace("device-types","elevation-images")
for i in ["front_image","rear_image"]:
if i in device_type:
if device_type[i]:
image_glob = f"{image_base}/{device_type['slug']}.{i.split('_')[0]}.*"
images = glob.glob(image_glob, recursive=False)
if images:
saved_images[i] = images[0]
else:
self.handle.log(f"Error locating image file using '{image_glob}'")
del device_type[i]

try:
dt = self.device_types.existing_device_types[device_type["model"]]
self.handle.verbose_log(f'Device Type Exists: {dt.manufacturer.name} - '
Expand Down Expand Up @@ -114,6 +136,10 @@ def create_device_types(self, device_types_to_add):
if self.modules and 'module-bays' in device_type:
self.device_types.create_module_bays(device_type['module-bays'], dt.id)

# Finally, update images if any
if saved_images:
self.device_types.upload_images(self.url, self.token, saved_images, dt.id)

def create_module_types(self, module_types):
all_module_types = {}
for curr_nb_mt in self.netbox.dcim.module_types.all():
Expand Down Expand Up @@ -152,46 +178,46 @@ def create_module_types(self, module_types):
self.device_types.create_module_rear_ports(curr_mt["rear-ports"], module_type_res.id)
if "front-ports" in curr_mt:
self.device_types.create_module_front_ports(curr_mt["front-ports"], module_type_res.id)

class DeviceTypes:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)

def __init__(self, netbox, handle, counter):
self.netbox = netbox
self.handle = handle
self.counter = counter
self.existing_device_types = self.get_device_types()

def get_device_types(self):
return {str(item): item for item in self.netbox.dcim.device_types.all()}

def get_power_ports(self, device_type):
return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(devicetype_id=device_type)}

def get_rear_ports(self, device_type):
return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(devicetype_id=device_type)}

def get_module_power_ports(self, module_type):
return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(moduletype_id=module_type)}

def get_module_rear_ports(self, module_type):
return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(moduletype_id=module_type)}

def get_device_type_ports_to_create(self, dcim_ports, device_type, existing_ports):
to_create = [port for port in dcim_ports if port['name'] not in existing_ports]
for port in to_create:
port['device_type'] = device_type

return to_create

def get_module_type_ports_to_create(self, module_ports, module_type, existing_ports):
to_create = [port for port in module_ports if port['name'] not in existing_ports]
for port in to_create:
port['module_type'] = module_type

return to_create

def create_interfaces(self, interfaces, device_type):
existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(
devicetype_id=device_type)}
Expand All @@ -206,11 +232,11 @@ def create_interfaces(self, interfaces, device_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Interface")

def create_power_ports(self, power_ports, device_type):
existing_power_ports = self.get_power_ports(device_type)
to_create = self.get_device_type_ports_to_create(power_ports, device_type, existing_power_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -219,11 +245,11 @@ def create_power_ports(self, power_ports, device_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Power Port")

def create_console_ports(self, console_ports, device_type):
existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(console_ports, device_type, existing_console_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -245,7 +271,7 @@ def create_power_outlets(self, power_outlets, device_type):
outlet['power_port'] = power_port.id
except KeyError:
pass

try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
Expand Down Expand Up @@ -283,7 +309,7 @@ def create_rear_ports(self, rear_ports, device_type):
def create_front_ports(self, front_ports, device_type):
existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(front_ports, device_type, existing_front_ports)

if to_create:
all_rearports = self.get_rear_ports(device_type)
for port in to_create:
Expand All @@ -293,7 +319,7 @@ def create_front_ports(self, front_ports, device_type):
except KeyError:
self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - '
+ f'{port["type"]} - {device_type}')

try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
Expand All @@ -305,7 +331,7 @@ def create_front_ports(self, front_ports, device_type):
def create_device_bays(self, device_bays, device_type):
existing_device_bays = {str(item): item for item in self.netbox.dcim.device_bay_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(device_bays, device_type, existing_device_bays)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -314,11 +340,11 @@ def create_device_bays(self, device_bays, device_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Device Bay")

def create_module_bays(self, module_bays, device_type):
existing_module_bays = {str(item): item for item in self.netbox.dcim.module_bay_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(module_bays, device_type, existing_module_bays)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -331,7 +357,7 @@ def create_module_bays(self, module_bays, device_type):
def create_module_interfaces(self, module_interfaces, module_type):
existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(module_interfaces, module_type, existing_interfaces)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -340,11 +366,11 @@ def create_module_interfaces(self, module_interfaces, module_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Interface")

def create_module_power_ports(self, power_ports, module_type):
existing_power_ports = self.get_module_power_ports(module_type)
to_create = self.get_module_type_ports_to_create(power_ports, module_type, existing_power_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -353,11 +379,11 @@ def create_module_power_ports(self, power_ports, module_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Power Port")

def create_module_console_ports(self, console_ports, module_type):
existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(console_ports, module_type, existing_console_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -366,11 +392,11 @@ def create_module_console_ports(self, console_ports, module_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Console Port")

def create_module_power_outlets(self, power_outlets, module_type):
existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(power_outlets, module_type, existing_power_outlets)

if to_create:
existing_power_ports = self.get_module_power_ports(module_type)
for outlet in to_create:
Expand All @@ -379,19 +405,19 @@ def create_module_power_outlets(self, power_outlets, module_type):
outlet['power_port'] = power_port.id
except KeyError:
pass

try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.power_outlet_templates.create(to_create), "Module Power Outlet")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Power Outlet")

def create_module_console_server_ports(self, console_server_ports, module_type):
existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(console_server_ports, module_type, existing_console_server_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -400,11 +426,11 @@ def create_module_console_server_ports(self, console_server_ports, module_type):
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Console Server Port")

def create_module_rear_ports(self, rear_ports, module_type):
existing_rear_ports = self.get_module_rear_ports(module_type)
to_create = self.get_module_type_ports_to_create(rear_ports, module_type, existing_rear_ports)

if to_create:
try:
self.counter.update({'updated':
Expand All @@ -417,7 +443,7 @@ def create_module_rear_ports(self, rear_ports, module_type):
def create_module_front_ports(self, front_ports, module_type):
existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(front_ports, module_type, existing_front_ports)

if to_create:
existing_rear_ports = self.get_module_rear_ports(module_type)
for port in to_create:
Expand All @@ -427,11 +453,32 @@ def create_module_front_ports(self, front_ports, module_type):
except KeyError:
self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - '
+ f'{port["type"]} - {module_type}')

try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.front_port_templates.create(to_create), "Module Front Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Front Port")
self.handle.log(f"Error '{excep.error}' creating Module Front Port")

def upload_images(self,baseurl,token,images,device_type):
'''Upload front_image and/or rear_image for the given device type

Args:
baseurl: URL for Netbox instance
token: Token to access Netbox instance
images: map of front_image and/or rear_image filename
device_type: id for the device-type to update

Returns:
None
'''
url = f"{baseurl}/api/dcim/device-types/{device_type}/"
headers = { "Authorization": f"Token {token}" }

files = { i: (os.path.basename(f), open(f,"rb") ) for i,f in images.items() }
response = requests.patch(url, headers=headers, files=files)

self.handle.log( f'Images {images} updated at {url}: {response}' )
self.counter["images"] += len(images)
3 changes: 3 additions & 0 deletions repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ def parse_files(self, files: list, slugs: list = None):
data['manufacturer'] = {
'name': manufacturer, 'slug': self.slug_format(manufacturer)}

# Save file location to resolve any relative paths for images
data['src'] = file

if slugs and True not in [True if s.casefold() in data['slug'].casefold() else False for s in slugs]:
self.handle.verbose_log(f"Skipping {data['model']}")
continue
Expand Down