diff --git a/osfv_cli/osfv_cli/osfv_cli.py b/osfv_cli/osfv_cli/osfv_cli.py index 2bd0014..68b5910 100755 --- a/osfv_cli/osfv_cli/osfv_cli.py +++ b/osfv_cli/osfv_cli/osfv_cli.py @@ -5,12 +5,12 @@ import pexpect import requests - from rte import RTE from snipeit_api import SnipeIT from sonoff_api import SonoffDevice from zabbix import Zabbix + # Check out an asset def check_out_asset(snipeit_api, asset_id): success, data = snipeit_api.check_out_asset(asset_id) @@ -112,6 +112,7 @@ def print_asset_details(asset): print() + # extracts an asset to zabbix assets def get_zabbix_compatible_assets_from_asset(asset): result = {} @@ -125,13 +126,14 @@ def get_zabbix_compatible_assets_from_asset(asset): result[key] = field_value return result + # Print asset details formatted as an input for Zabbix import script def print_asset_details_for_zabbix(asset): - assets = get_zabbix_compatible_assets_from_asset(asset) for key in assets.keys(): print(f"{key}: {assets[key]}") + def relay_toggle(rte, args): state_str = rte.relay_get() if state_str == "low": @@ -282,19 +284,21 @@ def sonoff_tgl(sonoff, args): except requests.exceptions.RequestException as e: print(f"Failed to toggle Sonoff relay state. Error: {e}") + def ask_to_proceed(message="Do you want to proceed (y/n): "): print("") while True: choice = input(message).lower() - if choice in ['y', 'n']: - return choice == 'y' + if choice in ["y", "n"]: + return choice == "y" else: print("Invalid input. Please enter 'y' or 'n'.") + def update_zabbix_assets(snipeit_api): zabbix = Zabbix() all_assets = snipeit_api.get_all_assets() - + current_zabbix_assets = zabbix.get_all_hosts() # snipeit assets but converted to zabbix-form assets snipeit_assets = {} @@ -305,28 +309,63 @@ def update_zabbix_assets(snipeit_api): for asset in all_assets: snipeit_assets.update(get_zabbix_compatible_assets_from_asset(asset)) - keys_not_present_in_zabbix = set(snipeit_assets.keys()) - set(current_zabbix_assets.keys()) + snipeit_assets_keys = list(snipeit_assets.keys()) + + duplicates = False + # check for duplicates + for i in range(snipeit_assets.__len__()): + for j in range(i + 1, snipeit_assets.__len__()): + if ( + snipeit_assets[snipeit_assets_keys[i]] + == snipeit_assets[snipeit_assets_keys[j]] + ): + print( + f"{snipeit_assets_keys[i]} has the same IP as {snipeit_assets_keys[j]}!" + ) + duplicates = True + if snipeit_assets_keys[i] == snipeit_assets_keys[j]: + print( + f"There are at least 2 assets with name {snipeit_assets_keys[i]} present!" + ) + duplicates = True + + if duplicates: + print( + "\nSnipeIT configuration errors have been detected! Fix them and then continue." + ) + return + + keys_not_present_in_zabbix = set(snipeit_assets.keys()) - set( + current_zabbix_assets.keys() + ) - keys_not_present_in_snipeit = set(current_zabbix_assets.keys()) - set(snipeit_assets.keys()) + keys_not_present_in_snipeit = set(current_zabbix_assets.keys()) - set( + snipeit_assets.keys() + ) - if keys_not_present_in_zabbix.__len__() > 0 or keys_not_present_in_snipeit.__len__() > 0: + if ( + keys_not_present_in_zabbix.__len__() > 0 + or keys_not_present_in_snipeit.__len__() > 0 + ): update_available = True common_keys = set(snipeit_assets.keys()) & set(current_zabbix_assets.keys()) - + if keys_not_present_in_zabbix.__len__() > 0: print("Assets not present in Zabbix (these will be added):") - print('\n'.join(keys_not_present_in_zabbix)) - + print("\n".join(keys_not_present_in_zabbix)) + if keys_not_present_in_snipeit.__len__() > 0: print("\nAssets present in Zabbix but not in SnipeIT (these will be removed):") - print('\n'.join(keys_not_present_in_snipeit)) + print("\n".join(keys_not_present_in_snipeit)) print("") keys_for_ip_change = [] for key in common_keys: if snipeit_assets[key] != current_zabbix_assets[key]: - print(f"{key} has wrong IP! (Zabbix one will be updated from {current_zabbix_assets[key]} to {snipeit_assets[key]})") + print( + f"{key} has wrong IP! (Zabbix one will be updated from {current_zabbix_assets[key]} to {snipeit_assets[key]})" + ) keys_for_ip_change.append(key) if keys_for_ip_change.__len__() > 0: @@ -339,21 +378,21 @@ def update_zabbix_assets(snipeit_api): if not ask_to_proceed("Do you want to appply above changes? (y/n): "): print("Changes were not applied") return - + # removing zabbix hosts for key in keys_not_present_in_snipeit: print(f"Removing {key}({current_zabbix_assets[key]})...") result = zabbix.remove_host_by_name(key) if "error" in result: print("Failed to remove the host!") - + # updating zabbix ips for key in keys_for_ip_change: print(f"Updating {key} IP to {snipeit_assets[key]}...") - result = zabbix.update_host_ip(key, snipeit_assets[key]) + result = zabbix.update_host_ip(key, snipeit_assets[key]) if "error" in result: print("Failed to change host's IP!") - + # adding zabbix hosts for key in keys_not_present_in_zabbix: print(f"Adding {key}({snipeit_assets[key]})...") @@ -362,6 +401,7 @@ def update_zabbix_assets(snipeit_api): except ValueError: print("Failed to add the host!") + # Main function def main(): parser = argparse.ArgumentParser(description="Open Source Firmware Validation CLI") @@ -413,8 +453,9 @@ def main(): ) update_zabbix_assets_parser = snipeit_subparsers.add_parser( - "update_zabbix", help="Syncs Zabbix assets with SnipeIT ones", - ) + "update_zabbix", + help="Syncs Zabbix assets with SnipeIT ones", + ) check_out_parser = snipeit_subparsers.add_parser( "check_out", help="Check out an asset by providing the Asset ID or RTE IP" diff --git a/osfv_cli/osfv_cli/rte.py b/osfv_cli/osfv_cli/rte.py index 0e2cb79..e5bf541 100644 --- a/osfv_cli/osfv_cli/rte.py +++ b/osfv_cli/osfv_cli/rte.py @@ -3,7 +3,6 @@ import paramiko import yaml - from rtectrl_api import rtectrl from sonoff_api import SonoffDevice diff --git a/osfv_cli/osfv_cli/zabbix.py b/osfv_cli/osfv_cli/zabbix.py index 09b62cb..0dbfddd 100644 --- a/osfv_cli/osfv_cli/zabbix.py +++ b/osfv_cli/osfv_cli/zabbix.py @@ -1,180 +1,178 @@ -import yaml -import requests import json -class Zabbix: +import requests +import yaml + +class Zabbix: CONFIG_FILENAME = "zabbix_config.yaml" - + def __init__(self): - - with open(Zabbix.CONFIG_FILENAME, 'r') as config_file: + with open(Zabbix.CONFIG_FILENAME, "r") as config_file: config = yaml.safe_load(config_file) - self.api_url = config['api_url'] - self.api_username = config['username'] - self.api_password = config['password'] + self.api_url = config["api_url"] + self.api_username = config["username"] + self.api_password = config["password"] self.auth_token = self.authenticate() - + return - + raise Exception("Failed to load " + Zabbix.CONFIG_FILENAME) def get_headers(self): - return {'Content-Type': 'application/json'} + return {"Content-Type": "application/json"} # Function to authenticate and retrieve the authentication token def authenticate(self): payload = { - 'jsonrpc': '2.0', - 'method': 'user.login', - 'params': { - 'user': self.api_username, - 'password': self.api_password - }, - 'id': 1, - 'auth': None + "jsonrpc": "2.0", + "method": "user.login", + "params": {"user": self.api_username, "password": self.api_password}, + "id": 1, + "auth": None, } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) result = response.json() - if 'result' in result: - return result['result'] - elif 'error' in result: - error_message = result['error']['message'] + if "result" in result: + return result["result"] + elif "error" in result: + error_message = result["error"]["message"] raise ValueError(f"Zabbix API authentication failed: {error_message}") else: raise ValueError("Invalid response from Zabbix API authentication") def get_all_hosts_json(self): payload = { - 'jsonrpc': '2.0', - 'method': 'host.get', - 'params': { -# "output": "extend", - "selectInterfaces": ['ip'], - "output": ['hostid', 'host'] - }, - 'id': 1, - 'auth': self.auth_token + "jsonrpc": "2.0", + "method": "host.get", + "params": {"selectInterfaces": ["ip"], "output": ["hostid", "host"]}, + "id": 1, + "auth": self.auth_token, } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) return response.json() def get_all_hosts(self): return self.format_hosts(self.get_all_hosts_json()) - - # converts hosts json to dictionary + + # converts hosts json to dictionary def format_hosts(self, hosts): result = {} - for host in hosts['result']: - result[host['host']] = host['interfaces'][0]['ip'] + for host in hosts["result"]: + result[host["host"]] = host["interfaces"][0]["ip"] return result # Function to add a new host with ICMP template def add_host(self, host_name, ip_address): payload = { - 'jsonrpc': '2.0', - 'method': 'host.get', - 'params': { - 'output': ['hostid'], - 'filter': { - 'host': [host_name] - } - }, - 'auth': self.auth_token, - 'id': 1 + "jsonrpc": "2.0", + "method": "host.get", + "params": {"output": ["hostid"], "filter": {"host": [host_name]}}, + "auth": self.auth_token, + "id": 1, } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) result = response.json() - if 'result' in result and len(result['result']) > 0: - print(f"A host with name '{host_name}' already exists. Skipping host creation.") - return result['result'][0]['hostid'] - - payload['params']['filter'] = { - 'ip': [ip_address] - } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + if "result" in result and len(result["result"]) > 0: + print( + f"A host with name '{host_name}' already exists. Skipping host creation." + ) + return result["result"][0]["hostid"] + + payload["params"]["filter"] = {"ip": [ip_address]} + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) result = response.json() - if 'result' in result and len(result['result']) > 0: - print(f"A host with IP address '{ip_address}' already exists. Skipping host creation.") - return result['result'][0]['hostid'] + if "result" in result and len(result["result"]) > 0: + print( + f"A host with IP address '{ip_address}' already exists. Skipping host creation." + ) + return result["result"][0]["hostid"] # Host doesn't exist, proceed with host creation payload = { - 'jsonrpc': '2.0', - 'method': 'host.create', - 'params': { - 'host': host_name, - 'interfaces': [{ - 'type': 1, - 'main': 1, - 'useip': 1, - 'ip': ip_address, - 'dns': '', - 'port': '10050' - }], - 'groups': [{ - 'groupid': '1' - }], - 'templates': [{ - 'templateid': '10186' # ICMP template ID - }] + "jsonrpc": "2.0", + "method": "host.create", + "params": { + "host": host_name, + "interfaces": [ + { + "type": 1, + "main": 1, + "useip": 1, + "ip": ip_address, + "dns": "", + "port": "10050", + } + ], + "groups": [{"groupid": "1"}], + "templates": [{"templateid": "10186"}], # ICMP template ID }, - 'auth': self.auth_token, - 'id': 1 + "auth": self.auth_token, + "id": 1, } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) result = response.json() - if 'result' in result and 'hostids' in result['result']: - return result['result']['hostids'][0] - elif 'error' in result: - error_message = result['error']['message'] + if "result" in result and "hostids" in result["result"]: + return result["result"]["hostids"][0] + elif "error" in result: + error_message = result["error"]["message"] raise ValueError(f"Failed to add host: {error_message}") else: raise ValueError("Invalid response from Zabbix API host creation") - def get_host_id_by_name(self, host_name): payload = { - 'jsonrpc': '2.0', - 'method': 'host.get', - 'params': { - 'output': ['hostid'], - 'filter': { - 'host': [host_name] - } - }, - 'auth': self.auth_token, - 'id': 1 + "jsonrpc": "2.0", + "method": "host.get", + "params": {"output": ["hostid"], "filter": {"host": [host_name]}}, + "auth": self.auth_token, + "id": 1, } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) data = response.json() # Extract the hostid - return data['result'][0]['hostid'] if data.get('result') else None + return data["result"][0]["hostid"] if data.get("result") else None def get_host_interface_id(self, host_name): payload = { - 'jsonrpc': '2.0', - 'method': 'host.get', - 'params': { - 'output': ['hostid'], - 'filter': { - 'host': [host_name] - }, - 'selectInterfaces': ['interfaceid'] + "jsonrpc": "2.0", + "method": "host.get", + "params": { + "output": ["hostid"], + "filter": {"host": [host_name]}, + "selectInterfaces": ["interfaceid"], }, - 'auth': self.auth_token, - 'id': 1 + "auth": self.auth_token, + "id": 1, } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) data = response.json() - return data['result'][0]['interfaces'][0]['interfaceid'] if data.get('result') else None + return ( + data["result"][0]["interfaces"][0]["interfaceid"] + if data.get("result") + else None + ) def update_host_ip(self, host_name, new_ip): interface_id = self.get_host_interface_id(host_name) @@ -182,17 +180,16 @@ def update_host_ip(self, host_name, new_ip): return {"error": "Could not find the host or its interface."} payload = { - 'jsonrpc': '2.0', - 'method': 'hostinterface.update', - 'params': { - 'interfaceid': interface_id, - 'ip': new_ip - }, - 'auth': self.auth_token, - 'id': 2 + "jsonrpc": "2.0", + "method": "hostinterface.update", + "params": {"interfaceid": interface_id, "ip": new_ip}, + "auth": self.auth_token, + "id": 2, } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) return response.json() def remove_host_by_name(self, host_name): @@ -201,16 +198,17 @@ def remove_host_by_name(self, host_name): return {"error": "Could not find the host."} payload = { - 'jsonrpc': '2.0', - 'method': 'host.delete', - 'params': [host_id], - 'auth': self.auth_token, - 'id': 2 + "jsonrpc": "2.0", + "method": "host.delete", + "params": [host_id], + "auth": self.auth_token, + "id": 2, } - response = requests.post(self.api_url, headers=self.get_headers(), data=json.dumps(payload)) + response = requests.post( + self.api_url, headers=self.get_headers(), data=json.dumps(payload) + ) return response.json() + z = Zabbix() -#print(json.dumps(z.get_all_hosts(), indent=4)) -#print(z.remove_host_by_name("BrightSign_RTE_IP")) diff --git a/osfv_cli/osfv_cli/zabbix_config.yaml b/osfv_cli/osfv_cli/zabbix_config.yaml index 10284ef..bb74a64 100644 --- a/osfv_cli/osfv_cli/zabbix_config.yaml +++ b/osfv_cli/osfv_cli/zabbix_config.yaml @@ -1,3 +1,4 @@ +--- api_url: 'http://192.168.4.232/zabbix/api_jsonrpc.php' username: 'Admin' password: 'zabbix'