From 615e5312a2fa89e9f4736832338ce7a9256e7004 Mon Sep 17 00:00:00 2001 From: Travis Van Duyn Date: Wed, 5 May 2021 12:22:40 -0700 Subject: [PATCH] [show][config] Add new snmp commands (#1347) Added new SNMP show and config commands using ConfigDB as well as unittests. show commands: show runningconfiguration snmp show runningconfiguration snmp contact [--json] show runningconfiguration snmp location [--json] show runningconfiguration snmp community [--json] show runningconfiguration snmp user [--json] config commands: sudo config snmp community add/del/replace sudo config snmp contact add/del/modify sudo config snmp location add/del/modify sudo config snmp user add/del --- config/main.py | 540 +++++++++++++++++++ show/main.py | 179 ++++++- tests/config_snmp_test.py | 872 +++++++++++++++++++++++++++++++ tests/mock_tables/config_db.json | 166 ++++++ tests/show_snmp_test.py | 467 +++++++++++++++++ 5 files changed, 2210 insertions(+), 14 deletions(-) create mode 100644 tests/config_snmp_test.py create mode 100644 tests/show_snmp_test.py diff --git a/config/main.py b/config/main.py index e9bab3172d..953af72e79 100644 --- a/config/main.py +++ b/config/main.py @@ -2191,6 +2191,546 @@ def delete_snmptrap_server(ctx, ver): cmd="systemctl restart snmp" os.system (cmd) + + +# +# 'snmp' group ('config snmp ...') +# +@config.group(cls=clicommon.AbbreviationGroup, name='snmp') +@clicommon.pass_db +def snmp(db): + """SNMP configuration tasks""" + + +@snmp.group(cls=clicommon.AbbreviationGroup) +@clicommon.pass_db +def community(db): + pass + + +def is_valid_community_type(commstr_type): + commstr_types = ['RO', 'RW'] + if commstr_type not in commstr_types: + click.echo("Invalid community type. Must be either RO or RW") + return False + return True + + +def is_valid_user_type(user_type): + convert_user_type = {'noauthnopriv': 'noAuthNoPriv', 'authnopriv': 'AuthNoPriv', 'priv': 'Priv'} + if user_type not in convert_user_type: + message = ("Invalid user type. Must be one of these one of these three " + "'noauthnopriv' or 'authnopriv' or 'priv'") + click.echo(message) + return False, message + return True, convert_user_type[user_type] + + +def is_valid_auth_type(user_auth_type): + user_auth_types = ['MD5', 'SHA', 'HMAC-SHA-2'] + if user_auth_type not in user_auth_types: + click.echo("Invalid user authentication type. Must be one of these 'MD5', 'SHA', or 'HMAC-SHA-2'") + return False + return True + + +def is_valid_encrypt_type(encrypt_type): + encrypt_types = ['DES', 'AES'] + if encrypt_type not in encrypt_types: + click.echo("Invalid user encryption type. Must be one of these two 'DES' or 'AES'") + return False + return True + + +def snmp_community_secret_check(snmp_secret): + excluded_special_symbols = ['@', ":"] + if len(snmp_secret) > 32: + click.echo("SNMP community string length should be not be greater than 32") + click.echo("SNMP community string should not have any of these special " + "symbols {}".format(excluded_special_symbols)) + click.echo("FAILED: SNMP community string length should be not be greater than 32") + return False + if any(char in excluded_special_symbols for char in snmp_secret): + click.echo("SNMP community string length should be not be greater than 32") + click.echo("SNMP community string should not have any of these special " + "symbols {}".format(excluded_special_symbols)) + click.echo("FAILED: SNMP community string should not have any of these " + "special symbols {}".format(excluded_special_symbols)) + return False + return True + + +def snmp_username_check(snmp_username): + excluded_special_symbols = ['@', ":"] + if len(snmp_username) > 32: + click.echo("SNMP user {} length should be not be greater than 32 characters".format(snmp_username)) + click.echo("SNMP community string should not have any of these special " + "symbols {}".format(excluded_special_symbols)) + click.echo("FAILED: SNMP user {} length should not be greater than 32 characters".format(snmp_username)) + return False + if any(char in excluded_special_symbols for char in snmp_username): + click.echo("SNMP user {} length should be not be greater than 32 characters".format(snmp_username)) + click.echo("SNMP community string should not have any of these special " + "symbols {}".format(excluded_special_symbols)) + click.echo("FAILED: SNMP user {} should not have any of these special " + "symbols {}".format(snmp_username, excluded_special_symbols)) + return False + return True + + +def snmp_user_secret_check(snmp_secret): + excluded_special_symbols = ['@', ":"] + if len(snmp_secret) < 8: + click.echo("SNMP user password length should be at least 8 characters") + click.echo("SNMP user password length should be not be greater than 64") + click.echo("SNMP user password should not have any of these special " + "symbols {}".format(excluded_special_symbols)) + click.echo("FAILED: SNMP user password length should be at least 8 characters") + return False + if len(snmp_secret) > 64: + click.echo("SNMP user password length should be at least 8 characters") + click.echo("SNMP user password length should be not be greater than 64") + click.echo("SNMP user password should not have any of these special " + "symbols {}".format(excluded_special_symbols)) + click.echo("FAILED: SNMP user password length should be not be greater than 64") + return False + if any(char in excluded_special_symbols for char in snmp_secret): + click.echo("SNMP user password length should be at least 8 characters") + click.echo("SNMP user password length should be not be greater than 64") + click.echo("SNMP user password should not have any of these special " + "symbols {}".format(excluded_special_symbols)) + click.echo("FAILED: SNMP user password should not have any of these special " + "symbols {}".format(excluded_special_symbols)) + return False + return True + + +@community.command('add') +@click.argument('community', metavar='', required=True) +@click.argument('string_type', metavar='', required=True) +@clicommon.pass_db +def add_community(db, community, string_type): + """ Add snmp community string""" + string_type = string_type.upper() + if not is_valid_community_type(string_type): + sys.exit(1) + if not snmp_community_secret_check(community): + sys.exit(2) + snmp_communities = db.cfgdb.get_table("SNMP_COMMUNITY") + if community in snmp_communities: + click.echo("SNMP community {} is already configured".format(community)) + sys.exit(3) + db.cfgdb.set_entry('SNMP_COMMUNITY', community, {'TYPE': string_type}) + click.echo("SNMP community {} added to configuration".format(community)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + + +@community.command('del') +@click.argument('community', metavar='', required=True) +@clicommon.pass_db +def del_community(db, community): + """ Delete snmp community string""" + snmp_communities = db.cfgdb.get_table("SNMP_COMMUNITY") + if community not in snmp_communities: + click.echo("SNMP community {} is not configured".format(community)) + sys.exit(1) + else: + db.cfgdb.set_entry('SNMP_COMMUNITY', community, None) + click.echo("SNMP community {} removed from configuration".format(community)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + + +@community.command('replace') +@click.argument('current_community', metavar='', required=True) +@click.argument('new_community', metavar='', required=True) +@clicommon.pass_db +def replace_community(db, current_community, new_community): + """ Replace snmp community string""" + snmp_communities = db.cfgdb.get_table("SNMP_COMMUNITY") + if not current_community in snmp_communities: + click.echo("Current SNMP community {} is not configured".format(current_community)) + sys.exit(1) + if not snmp_community_secret_check(new_community): + sys.exit(2) + elif new_community in snmp_communities: + click.echo("New SNMP community {} to replace current SNMP community {} already " + "configured".format(new_community, current_community)) + sys.exit(3) + else: + string_type = snmp_communities[current_community]['TYPE'] + db.cfgdb.set_entry('SNMP_COMMUNITY', new_community, {'TYPE': string_type}) + click.echo("SNMP community {} added to configuration".format(new_community)) + db.cfgdb.set_entry('SNMP_COMMUNITY', current_community, None) + click.echo('SNMP community {} replace community {}'.format(new_community, current_community)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + + +@snmp.group(cls=clicommon.AbbreviationGroup) +@clicommon.pass_db +def contact(db): + pass + + +def is_valid_email(email): + return bool(re.search(r"^[\w\.\+\-]+\@[\w]+\.[a-z]{2,3}$", email)) + + +@contact.command('add') +@click.argument('contact', metavar='', required=True) +@click.argument('contact_email', metavar='', required=True) +@clicommon.pass_db +def add_contact(db, contact, contact_email): + """ Add snmp contact name and email """ + snmp = db.cfgdb.get_table("SNMP") + try: + if snmp['CONTACT']: + click.echo("Contact already exists. Use sudo config snmp contact modify instead") + sys.exit(1) + else: + db.cfgdb.set_entry('SNMP', 'CONTACT', {contact: contact_email}) + click.echo("Contact name {} and contact email {} have been added to " + "configuration".format(contact, contact_email)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + except KeyError: + if "CONTACT" not in snmp.keys(): + if not is_valid_email(contact_email): + click.echo("Contact email {} is not valid".format(contact_email)) + sys.exit(2) + db.cfgdb.set_entry('SNMP', 'CONTACT', {contact: contact_email}) + click.echo("Contact name {} and contact email {} have been added to " + "configuration".format(contact, contact_email)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + + +@contact.command('del') +@click.argument('contact', metavar='', required=True) +@clicommon.pass_db +def del_contact(db, contact): + """ Delete snmp contact name and email """ + snmp = db.cfgdb.get_table("SNMP") + try: + if not contact in (list(snmp['CONTACT'].keys()))[0]: + click.echo("SNMP contact {} is not configured".format(contact)) + sys.exit(1) + else: + db.cfgdb.set_entry('SNMP', 'CONTACT', None) + click.echo("SNMP contact {} removed from configuration".format(contact)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + except KeyError: + if "CONTACT" not in snmp.keys(): + click.echo("Contact name {} is not configured".format(contact)) + sys.exit(2) + + +@contact.command('modify') +@click.argument('contact', metavar='', required=True) +@click.argument('contact_email', metavar='', required=True) +@clicommon.pass_db +def modify_contact(db, contact, contact_email): + """ Modify snmp contact""" + snmp = db.cfgdb.get_table("SNMP") + try: + current_snmp_contact_name = (list(snmp['CONTACT'].keys()))[0] + if current_snmp_contact_name == contact: + current_snmp_contact_email = snmp['CONTACT'][contact] + else: + current_snmp_contact_email = '' + if contact == current_snmp_contact_name and contact_email == current_snmp_contact_email: + click.echo("SNMP contact {} {} already exists".format(contact, contact_email)) + sys.exit(1) + elif contact == current_snmp_contact_name and contact_email != current_snmp_contact_email: + if not is_valid_email(contact_email): + click.echo("Contact email {} is not valid".format(contact_email)) + sys.exit(2) + db.cfgdb.mod_entry('SNMP', 'CONTACT', {contact: contact_email}) + click.echo("SNMP contact {} email updated to {}".format(contact, contact_email)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + else: + if not is_valid_email(contact_email): + click.echo("Contact email {} is not valid".format(contact_email)) + sys.exit(2) + db.cfgdb.set_entry('SNMP', 'CONTACT', None) + db.cfgdb.set_entry('SNMP', 'CONTACT', {contact: contact_email}) + click.echo("SNMP contact {} and contact email {} updated".format(contact, contact_email)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + except KeyError: + if "CONTACT" not in snmp.keys(): + click.echo("Contact name {} is not configured".format(contact)) + sys.exit(3) + + +@snmp.group(cls=clicommon.AbbreviationGroup) +@clicommon.pass_db +def location(db): + pass + + +@location.command('add') +@click.argument('location', metavar='', required=True, nargs=-1) +@clicommon.pass_db +def add_location(db, location): + """ Add snmp location""" + if isinstance(location, tuple): + location = " ".join(location) + elif isinstance(location, list): + location = " ".join(location) + snmp = db.cfgdb.get_table("SNMP") + try: + if snmp['LOCATION']: + click.echo("Location already exists") + sys.exit(1) + except KeyError: + if "LOCATION" not in snmp.keys(): + db.cfgdb.set_entry('SNMP', 'LOCATION', {'Location': location}) + click.echo("SNMP Location {} has been added to configuration".format(location)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + + +@location.command('del') +@click.argument('location', metavar='', required=True, nargs=-1) +@clicommon.pass_db +def delete_location(db, location): + """ Delete snmp location""" + if isinstance(location, tuple): + location = " ".join(location) + elif isinstance(location, list): + location = " ".join(location) + snmp = db.cfgdb.get_table("SNMP") + try: + if location == snmp['LOCATION']['Location']: + db.cfgdb.set_entry('SNMP', 'LOCATION', None) + click.echo("SNMP Location {} removed from configuration".format(location)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + else: + click.echo("SNMP Location {} does not exist. The location is {}".format(location, snmp['LOCATION']['Location'])) + sys.exit(1) + except KeyError: + if "LOCATION" not in snmp.keys(): + click.echo("SNMP Location {} is not configured".format(location)) + sys.exit(2) + + +@location.command('modify') +@click.argument('location', metavar='', required=True, nargs=-1) +@clicommon.pass_db +def modify_location(db, location): + """ Modify snmp location""" + if isinstance(location, tuple): + location = " ".join(location) + elif isinstance(location, list): + location = " ".join(location) + snmp = db.cfgdb.get_table("SNMP") + try: + snmp_location = snmp['LOCATION']['Location'] + if location in snmp_location: + click.echo("SNMP location {} already exists".format(location)) + sys.exit(1) + else: + db.cfgdb.mod_entry('SNMP', 'LOCATION', {'Location': location}) + click.echo("SNMP location {} modified in configuration".format(location)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + except KeyError: + click.echo("Cannot modify SNMP Location. You must use 'config snmp location add command '") + sys.exit(2) + + +from enum import IntEnum +class SnmpUserError(IntEnum): + NameCheckFailure = 1 + TypeNoAuthNoPrivOrAuthNoPrivOrPrivCheckFailure = 2 + RoRwCheckFailure = 3 + NoAuthNoPrivHasAuthType = 4 + AuthTypeMd5OrShaOrHmacsha2IsMissing = 5 + AuthTypeMd5OrShaOrHmacsha2Failure = 6 + AuthPasswordMissing = 7 + AuthPasswordFailsComplexityRequirements = 8 + EncryptPasswordNotAllowedWithAuthNoPriv = 9 + EncryptTypeDesOrAesIsMissing = 10 + EncryptTypeFailsComplexityRequirements = 11 + EncryptPasswordMissingFailure = 12 + EncryptPasswordFailsComplexityRequirements = 13 + UserAlreadyConfigured = 14 + + +@snmp.group(cls=clicommon.AbbreviationGroup) +@clicommon.pass_db +def user(db): + pass + + +@user.command('add') +@click.argument('user', metavar='', required=True) +@click.argument('user_type', metavar='', required=True) +@click.argument('user_permission_type', metavar='', required=True) +@click.argument('user_auth_type', metavar='', required=False) +@click.argument('user_auth_password', metavar='', required=False) +@click.argument('user_encrypt_type', metavar='', required=False) +@click.argument('user_encrypt_password', metavar='', required=False) +@clicommon.pass_db +def add_user(db, user, user_type, user_permission_type, user_auth_type, user_auth_password, user_encrypt_type, + user_encrypt_password): + """ Add snmp user""" + if not snmp_username_check(user): + sys.exit(SnmpUserError.NameCheckFailure) + user_type = user_type.lower() + user_type_info = is_valid_user_type(user_type) + if not user_type_info[0]: + sys.exit(SnmpUserError.TypeNoAuthNoPrivOrAuthNoPrivOrPrivCheckFailure) + user_type = user_type_info[1] + user_permission_type = user_permission_type.upper() + if not is_valid_community_type(user_permission_type): + sys.exit(SnmpUserError.RoRwCheckFailure) + if user_type == "noAuthNoPriv": + if user_auth_type: + click.echo("User auth type not used with 'noAuthNoPriv'. Please use 'AuthNoPriv' or 'Priv' instead") + sys.exit(SnmpUserError.NoAuthNoPrivHasAuthType) + else: + if not user_auth_type: + click.echo("User auth type is missing. Must be MD5, SHA, or HMAC-SHA-2") + sys.exit(SnmpUserError.AuthTypeMd5OrShaOrHmacsha2IsMissing) + if user_auth_type: + user_auth_type = user_auth_type.upper() + if not is_valid_auth_type(user_auth_type): + sys.exit(SnmpUserError.AuthTypeMd5OrShaOrHmacsha2Failure) + elif not user_auth_password: + click.echo("User auth password is missing") + sys.exit(SnmpUserError.AuthPasswordMissing) + elif user_auth_password: + if not snmp_user_secret_check(user_auth_password): + sys.exit(SnmpUserError.AuthPasswordFailsComplexityRequirements) + if user_type == "AuthNoPriv": + if user_encrypt_type: + click.echo("User encrypt type not used with 'AuthNoPriv'. Please use 'Priv' instead") + sys.exit(SnmpUserError.EncryptPasswordNotAllowedWithAuthNoPriv) + elif user_type == "Priv": + if not user_encrypt_type: + click.echo("User encrypt type is missing. Must be DES or AES") + sys.exit(SnmpUserError.EncryptTypeDesOrAesIsMissing) + if user_encrypt_type: + user_encrypt_type = user_encrypt_type.upper() + if not is_valid_encrypt_type(user_encrypt_type): + sys.exit(SnmpUserError.EncryptTypeFailsComplexityRequirements) + elif not user_encrypt_password: + click.echo("User encrypt password is missing") + sys.exit(SnmpUserError.EncryptPasswordMissingFailure) + elif user_encrypt_password: + if not snmp_user_secret_check(user_encrypt_password): + sys.exit(SnmpUserError.EncryptPasswordFailsComplexityRequirements) + snmp_users = db.cfgdb.get_table("SNMP_USER") + if user in snmp_users.keys(): + click.echo("SNMP user {} is already configured".format(user)) + sys.exit(SnmpUserError.UserAlreadyConfigured) + else: + if not user_auth_type: + user_auth_type = '' + if not user_auth_password: + user_auth_password = '' + if not user_encrypt_type: + user_encrypt_type = '' + if not user_encrypt_password: + user_encrypt_password = '' + db.cfgdb.set_entry('SNMP_USER', user, {'SNMP_USER_TYPE': user_type, + 'SNMP_USER_PERMISSION': user_permission_type, + 'SNMP_USER_AUTH_TYPE': user_auth_type, + 'SNMP_USER_AUTH_PASSWORD': user_auth_password, + 'SNMP_USER_ENCRYPTION_TYPE': user_encrypt_type, + 'SNMP_USER_ENCRYPTION_PASSWORD': user_encrypt_password}) + click.echo("SNMP user {} added to configuration".format(user)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + + +@user.command('del') +@click.argument('user', metavar='', required=True) +@clicommon.pass_db +def del_user(db, user): + """ Del snmp user""" + snmp_users = db.cfgdb.get_table("SNMP_USER") + if user not in snmp_users: + click.echo("SNMP user {} is not configured".format(user)) + sys.exit(1) + else: + db.cfgdb.set_entry('SNMP_USER', user, None) + click.echo("SNMP user {} removed from configuration".format(user)) + try: + click.echo("Restarting SNMP service...") + clicommon.run_command("systemctl reset-failed snmp.service", display_cmd=False) + clicommon.run_command("systemctl restart snmp.service", display_cmd=False) + except SystemExit as e: + click.echo("Restart service snmp failed with error {}".format(e)) + raise click.Abort() + # # 'bgp' group ('config bgp ...') # diff --git a/show/main.py b/show/main.py index d0ca14650a..1cea9e6534 100755 --- a/show/main.py +++ b/show/main.py @@ -377,6 +377,7 @@ def snmptrap (ctx): body.append([ver, traptable[row]['DestIp'], traptable[row]['DestPort'], traptable[row]['vrf'], traptable[row]['Community']]) click.echo(tabulate(body, header)) + # # 'subinterfaces' group ("show subinterfaces ...") # @@ -1109,20 +1110,6 @@ def interfaces(interfacename, verbose): run_command(cmd, display_cmd=verbose) -# 'snmp' subcommand ("show runningconfiguration snmp") -@runningconfiguration.command() -@click.argument('server', required=False) -@click.option('--verbose', is_flag=True, help="Enable verbose output") -def snmp(server, verbose): - """Show SNMP information""" - cmd = "sudo docker exec snmp cat /etc/snmp/snmpd.conf" - - if server is not None: - cmd += " | grep -i agentAddress" - - run_command(cmd, display_cmd=verbose) - - # 'ntp' subcommand ("show runningconfiguration ntp") @runningconfiguration.command() @click.option('--verbose', is_flag=True, help="Enable verbose output") @@ -1140,6 +1127,170 @@ def ntp(verbose): print(tabulate(ntp_dict, headers=list(ntp_dict.keys()), tablefmt="simple", stralign='left', missingval="")) + +# 'snmp' subcommand ("show runningconfiguration snmp") +@runningconfiguration.group("snmp", invoke_without_command=True) +@clicommon.pass_db +@click.pass_context +def snmp(ctx, db): + """Show SNMP running configuration""" + if ctx.invoked_subcommand is None: + show_run_snmp(db.cfgdb) + + +# ("show runningconfiguration snmp community") +@snmp.command('community') +@click.option('--json', 'json_output', required=False, is_flag=True, type=click.BOOL, + help="Display the output in JSON format") +@clicommon.pass_db +def community(db, json_output): + """show SNMP running configuration community""" + snmp_comm_header = ["Community String", "Community Type"] + snmp_comm_body = [] + snmp_comm_keys = db.cfgdb.get_table('SNMP_COMMUNITY') + snmp_comm_strings = snmp_comm_keys.keys() + if json_output: + click.echo(snmp_comm_keys) + else: + for line in snmp_comm_strings: + comm_string = line + comm_string_type = snmp_comm_keys[line]['TYPE'] + snmp_comm_body.append([comm_string, comm_string_type]) + click.echo(tabulate(natsorted(snmp_comm_body), snmp_comm_header)) + + +# ("show runningconfiguration snmp contact") +@snmp.command('contact') +@click.option('--json', 'json_output', required=False, is_flag=True, type=click.BOOL, + help="Display the output in JSON format") +@clicommon.pass_db +def contact(db, json_output): + """show SNMP running configuration contact""" + snmp = db.cfgdb.get_table('SNMP') + snmp_header = ["Contact", "Contact Email"] + snmp_body = [] + if json_output: + try: + if snmp['CONTACT']: + click.echo(snmp['CONTACT']) + except KeyError: + snmp['CONTACT'] = {} + click.echo(snmp['CONTACT']) + else: + try: + if snmp['CONTACT']: + snmp_contact = list(snmp['CONTACT'].keys()) + snmp_contact_email = [snmp['CONTACT'][snmp_contact[0]]] + snmp_body.append([snmp_contact[0], snmp_contact_email[0]]) + except KeyError: + snmp['CONTACT'] = '' + click.echo(tabulate(snmp_body, snmp_header)) + + +# ("show runningconfiguration snmp location") +@snmp.command('location') +@click.option('--json', 'json_output', required=False, is_flag=True, type=click.BOOL, + help="Display the output in JSON format") +@clicommon.pass_db +def location(db, json_output): + """show SNMP running configuration location""" + snmp = db.cfgdb.get_table('SNMP') + snmp_header = ["Location"] + snmp_body = [] + if json_output: + try: + if snmp['LOCATION']: + click.echo(snmp['LOCATION']) + except KeyError: + snmp['LOCATION'] = {} + click.echo(snmp['LOCATION']) + else: + try: + if snmp['LOCATION']: + snmp_location = [snmp['LOCATION']['Location']] + snmp_body.append(snmp_location) + except KeyError: + snmp['LOCATION'] = '' + click.echo(tabulate(snmp_body, snmp_header)) + + +# ("show runningconfiguration snmp user") +@snmp.command('user') +@click.option('--json', 'json_output', required=False, is_flag=True, type=click.BOOL, + help="Display the output in JSON format") +@clicommon.pass_db +def users(db, json_output): + """show SNMP running configuration user""" + snmp_users = db.cfgdb.get_table('SNMP_USER') + snmp_user_header = ['User', "Permission Type", "Type", "Auth Type", "Auth Password", "Encryption Type", + "Encryption Password"] + snmp_user_body = [] + if json_output: + click.echo(snmp_users) + else: + for snmp_user, snmp_user_value in snmp_users.items(): + snmp_user_permissions_type = snmp_users[snmp_user].get('SNMP_USER_PERMISSION', 'Null') + snmp_user_auth_type = snmp_users[snmp_user].get('SNMP_USER_AUTH_TYPE', 'Null') + snmp_user_auth_password = snmp_users[snmp_user].get('SNMP_USER_AUTH_PASSWORD', 'Null') + snmp_user_encryption_type = snmp_users[snmp_user].get('SNMP_USER_ENCRYPTION_TYPE', 'Null') + snmp_user_encryption_password = snmp_users[snmp_user].get('SNMP_USER_ENCRYPTION_PASSWORD', 'Null') + snmp_user_type = snmp_users[snmp_user].get('SNMP_USER_TYPE', 'Null') + snmp_user_body.append([snmp_user, snmp_user_permissions_type, snmp_user_type, snmp_user_auth_type, + snmp_user_auth_password, snmp_user_encryption_type, snmp_user_encryption_password]) + click.echo(tabulate(natsorted(snmp_user_body), snmp_user_header)) + + +# ("show runningconfiguration snmp") +@clicommon.pass_db +def show_run_snmp(db, ctx): + snmp_contact_location_table = db.cfgdb.get_table('SNMP') + snmp_comm_table = db.cfgdb.get_table('SNMP_COMMUNITY') + snmp_users = db.cfgdb.get_table('SNMP_USER') + snmp_location_header = ["Location"] + snmp_location_body = [] + snmp_contact_header = ["SNMP_CONTACT", "SNMP_CONTACT_EMAIL"] + snmp_contact_body = [] + snmp_comm_header = ["Community String", "Community Type"] + snmp_comm_body = [] + snmp_user_header = ['User', "Permission Type", "Type", "Auth Type", "Auth Password", "Encryption Type", + "Encryption Password"] + snmp_user_body = [] + try: + if snmp_contact_location_table['LOCATION']: + snmp_location = [snmp_contact_location_table['LOCATION']['Location']] + snmp_location_body.append(snmp_location) + except KeyError: + snmp_contact_location_table['LOCATION'] = '' + click.echo(tabulate(snmp_location_body, snmp_location_header)) + click.echo("\n") + try: + if snmp_contact_location_table['CONTACT']: + snmp_contact = list(snmp_contact_location_table['CONTACT'].keys()) + snmp_contact_email = [snmp_contact_location_table['CONTACT'][snmp_contact[0]]] + snmp_contact_body.append([snmp_contact[0], snmp_contact_email[0]]) + except KeyError: + snmp_contact_location_table['CONTACT'] = '' + click.echo(tabulate(snmp_contact_body, snmp_contact_header)) + click.echo("\n") + snmp_comm_strings = snmp_comm_table.keys() + for line in snmp_comm_strings: + comm_string = line + comm_string_type = snmp_comm_table[line]['TYPE'] + snmp_comm_body.append([comm_string, comm_string_type]) + click.echo(tabulate(natsorted(snmp_comm_body), snmp_comm_header)) + click.echo("\n") + for snmp_user, snmp_user_value in snmp_users.items(): + snmp_user_permissions_type = snmp_users[snmp_user].get('SNMP_USER_PERMISSION', 'Null') + snmp_user_auth_type = snmp_users[snmp_user].get('SNMP_USER_AUTH_TYPE', 'Null') + snmp_user_auth_password = snmp_users[snmp_user].get('SNMP_USER_AUTH_PASSWORD', 'Null') + snmp_user_encryption_type = snmp_users[snmp_user].get('SNMP_USER_ENCRYPTION_TYPE', 'Null') + snmp_user_encryption_password = snmp_users[snmp_user].get('SNMP_USER_ENCRYPTION_PASSWORD', 'Null') + snmp_user_type = snmp_users[snmp_user].get('SNMP_USER_TYPE', 'Null') + snmp_user_body.append([snmp_user, snmp_user_permissions_type, snmp_user_type, snmp_user_auth_type, + snmp_user_auth_password, snmp_user_encryption_type, snmp_user_encryption_password]) + click.echo(tabulate(natsorted(snmp_user_body), snmp_user_header)) + + # 'syslog' subcommand ("show runningconfiguration syslog") @runningconfiguration.command() @click.option('--verbose', is_flag=True, help="Enable verbose output") diff --git a/tests/config_snmp_test.py b/tests/config_snmp_test.py new file mode 100644 index 0000000000..1be2704e47 --- /dev/null +++ b/tests/config_snmp_test.py @@ -0,0 +1,872 @@ +import sys +import os +import click +from click.testing import CliRunner + +import show.main as show +import clear.main as clear +import config.main as config + +import pytest + +from unittest import mock +from unittest.mock import patch +from utilities_common.db import Db + +tabular_data_show_run_snmp_contact_expected = """\ +Contact Contact Email\n--------- --------------------\ntestuser testuser@contoso.com +""" + +json_data_show_run_snmp_contact_expected = """\ +{'testuser': 'testuser@contoso.com'} +""" + +config_snmp_contact_add_del_new_contact ="""\ +Contact name testuser and contact email testuser@contoso.com have been added to configuration +Restarting SNMP service... +""" + +config_snmp_location_add_new_location ="""\ +SNMP Location public has been added to configuration +Restarting SNMP service... +""" + + +expected_snmp_community_add_new_community_ro_output = {"TYPE": "RO"} +expected_snmp_community_add_new_community_rw_output = {"TYPE": "RW"} +expected_snmp_community_replace_existing_community_with_new_community_output = {'TYPE': 'RW'} + +expected_snmp_user_priv_ro_md5_des_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'MD5', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'DES', + 'SNMP_USER_PERMISSION': 'RO', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_ro_md5_aes_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'MD5', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'AES', + 'SNMP_USER_PERMISSION': 'RO', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_ro_sha_des_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'SHA', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'DES', + 'SNMP_USER_PERMISSION': 'RO', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_ro_sha_aes_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'SHA', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'AES', + 'SNMP_USER_PERMISSION': 'RO', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_ro_hmac_sha_2_des_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'DES', + 'SNMP_USER_PERMISSION': 'RO', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_ro_hmac_sha_2_aes_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'AES', + 'SNMP_USER_PERMISSION': 'RO', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_rw_md5_des_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'MD5', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'DES', + 'SNMP_USER_PERMISSION': 'RW', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_rw_md5_aes_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'MD5', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'AES', + 'SNMP_USER_PERMISSION': 'RW', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_rw_sha_des_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'SHA', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'DES', + 'SNMP_USER_PERMISSION': 'RW', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_rw_sha_aes_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'SHA', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'AES', + 'SNMP_USER_PERMISSION': 'RW', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_rw_hmac_sha_2_des_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'DES', + 'SNMP_USER_PERMISSION': 'RW', + 'SNMP_USER_TYPE': 'Priv'} +expected_snmp_user_priv_rw_hmac_sha_2_aes_config_db_output = {'SNMP_USER_AUTH_PASSWORD': 'user_auth_pass', + 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', + 'SNMP_USER_ENCRYPTION_PASSWORD': 'user_encrypt_pass', + 'SNMP_USER_ENCRYPTION_TYPE': 'AES', + 'SNMP_USER_PERMISSION': 'RW', + 'SNMP_USER_TYPE': 'Priv'} + +class TestSNMPConfigCommands(object): + @classmethod + def setup_class(cls): + print("SETUP") + os.environ["UTILITIES_UNIT_TESTING"] = "1" + + # Add snmp community tests + def test_config_snmp_community_add_new_community_ro(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["add"], + ["Everest", "ro"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP community Everest added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_COMMUNITY", "Everest") == expected_snmp_community_add_new_community_ro_output + + def test_config_snmp_community_add_new_community_rw(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["add"], + ["Shasta", "rw"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP community Shasta added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_COMMUNITY", "Shasta") == expected_snmp_community_add_new_community_rw_output + + def test_config_snmp_community_add_new_community_with_invalid_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["add"], ["Everest", "RT"]) + print(result.exit_code) + assert result.exit_code == 1 + assert 'Invalid community type. Must be either RO or RW' in result.output + + def test_config_snmp_community_add_invalid_community_over_32_characters(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["add"], + ["over_32_character_community_string", "ro"]) + print(result.exit_code) + assert result.exit_code == 2 + assert 'FAILED: SNMP community string length should be not be greater than 32' in result.output + + def test_config_snmp_community_add_invalid_community_with_excluded_special_characters(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["add"], + ["Test@snmp", "ro"]) + print(result.exit_code) + assert result.exit_code == 2 + assert 'FAILED: SNMP community string should not have any of these special symbols' in result.output + + def test_config_snmp_community_add_existing_community(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["add"], ["Rainer", "rw"]) + print(result.exit_code) + assert result.exit_code == 3 + assert 'SNMP community Rainer is already configured' in result.output + + # Del snmp community tests + def test_config_snmp_community_del_existing_community(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["del"], + ["Rainer"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP community Rainer removed from configuration' in result.output + assert db.cfgdb.get_entry("SNMP_COMMUNITY", "Everest") == {} + + def test_config_snmp_community_del_non_existing_community(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["del"], ["Everest"]) + print(result.exit_code) + assert result.exit_code == 1 + assert 'SNMP community Everest is not configured' in result.output + + # Replace snmp community tests + def test_config_snmp_community_replace_existing_community_with_new_community(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["replace"], + ["Rainer", "Everest"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP community Everest added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_COMMUNITY", "Everest") == \ + expected_snmp_community_replace_existing_community_with_new_community_output + + def test_config_snmp_community_replace_existing_community_non_existing_community(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["replace"], + ["Denali", "Everest"]) + print(result.exit_code) + assert result.exit_code == 1 + assert 'Current SNMP community Denali is not configured' in result.output + + def test_config_snmp_community_replace_new_community_already_exists(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["replace"], + ["Rainer", "msft"]) + print(result.exit_code) + assert result.exit_code == 3 + assert 'New SNMP community msft to replace current SNMP community Rainer already configured' in result.output + + def test_config_snmp_community_replace_with_invalid_new_community_bad_symbol(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["replace"], + ["Rainer", "msft@"]) + print(result.exit_code) + assert result.exit_code == 2 + assert 'FAILED: SNMP community string should not have any of these special symbols' in result.output + + def test_config_snmp_community_replace_with_invalid_new_community_over_32_chars(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["community"].commands["replace"], + ["Rainer", "over_32_characters_community_string"]) + print(result.exit_code) + assert result.exit_code == 2 + assert 'FAILED: SNMP community string length should be not be greater than 32' in result.output + + + # Del snmp contact when CONTACT not setup in REDIS + def test_config_snmp_contact_del_without_contact_redis(self): + db = Db() + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["del"], ["blah"], obj=db) + print(result.exit_code) + assert result.exit_code == 2 + assert 'Contact name blah is not configured' in result.output + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {} + + def test_config_snmp_contact_modify_without_contact_redis(self): + db = Db() + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["modify"], + ["blah", "blah@contoso.com"], obj=db) + print(result.exit_code) + assert result.exit_code == 3 + assert 'Contact name blah is not configured' in result.output + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {} + + def test_config_snmp_contact_add_del_new_contact(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["del"], + ["testuser"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert 'SNMP contact testuser removed from configuration' in result.output + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {} + + # Add snmp contact tests + def test_config_snmp_contact_add_with_existing_contact(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["blah", "blah@contoso.com"], obj=db) + print(result.exit_code) + assert result.exit_code == 1 + assert 'Contact already exists. Use sudo config snmp contact modify instead' in result.output + + def test_config_snmp_contact_add_invalid_email(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testusercontoso.com"], obj=db) + print(result.exit_code) + assert result.exit_code == 2 + assert "Contact email testusercontoso.com is not valid" in result.output + + + # Delete snmp contact tests + def test_config_snmp_contact_del_new_contact_when_contact_exists(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["del"], ["blah"], obj=db) + print(result.exit_code) + assert result.exit_code == 1 + assert 'SNMP contact blah is not configured' in result.output + + def test_config_snmp_contact_del_with_existing_contact(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["del"], + ["testuser"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP contact testuser removed from configuration' in result.output + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {} + + # Modify snmp contact tests + def test_config_snmp_contact_modify_email_with_existing_contact(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["modify"], + ["testuser", "testuser@test.com"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP contact testuser email updated to testuser@test.com' in result.output + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@test.com"} + + def test_config_snmp_contact_modify_contact_and_email_with_existing_entry(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["modify"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + assert result.exit_code == 1 + assert 'SNMP contact testuser testuser@contoso.com already exists' in result.output + + def test_config_snmp_contact_modify_existing_contact_with_invalid_email(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["modify"], + ["testuser", "testuser@contosocom"], obj=db) + print(result.exit_code) + assert result.exit_code == 2 + assert 'Contact email testuser@contosocom is not valid' in result.output + + + def test_config_snmp_contact_modify_new_contact_with_invalid_email(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["modify"], + ["blah", "blah@contoso@com"], obj=db) + print(result.exit_code) + assert result.exit_code == 2 + assert 'Contact email blah@contoso@com is not valid' in result.output + + # Add snmp location tests + def test_config_snmp_location_add_exiting_location_with_same_location_already_existing(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + assert result.exit_code == 1 + assert 'Location already exists' in result.output + + def test_config_snmp_location_add_new_location_with_location_already_existing(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["Mile High"], obj=db) + print(result.exit_code) + assert result.exit_code == 1 + assert 'Location already exists' in result.output + + # Del snmp location tests + def test_config_snmp_location_del_with_existing_location(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["del"], + ["public"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP Location public removed from configuration' in result.output + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {} + + def test_config_snmp_location_del_new_location_with_location_already_existing(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["del"], + ["Mile High"], obj=db) + print(result.exit_code) + assert result.exit_code == 1 + assert 'SNMP Location Mile High does not exist. The location is public' in result.output + + # Modify snmp location tests + def test_config_snmp_location_modify_with_same_location(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["modify"], + ["public"], obj=db) + print(result.exit_code) + assert result.exit_code == 1 + assert 'SNMP location public already exists' in result.output + + def test_config_snmp_location_modify_without_redis(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["modify"], + ["Rainer"],obj=db) + print(result.exit_code) + assert result.exit_code == 2 + assert "Cannot modify SNMP Location. You must use 'config snmp location add " \ + "command '" in result.output + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {} + + def test_config_snmp_location_modify_without_existing_location(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["modify"], + ["Rainer"],obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert "SNMP location Rainer modified in configuration" in result.output + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "Rainer"} + + # Add snmp user tests + def test_config_snmp_user_add_invalid_user_name_over_32_characters(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["over_32_characters_community_user", "noAUthNoPRiv", "ro"]) + print(result.exit_code) + assert result.exit_code == 1 + assert 'FAILED: SNMP user over_32_characters_community_user length should not be greater than 32 characters' \ + in result.output + + def test_config_snmp_user_add_excluded_special_characters_in_username(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["Test@user", "noAUthNoPRiv", "ro"]) + print(result.exit_code) + assert result.exit_code == 1 + assert 'FAILED: SNMP user Test@user should not have any of these special symbols' in result.output + + def test_config_snmp_user_add_existing_user(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_1", "noAUthNoPRiv", "ro"]) + print(result.exit_code) + assert result.exit_code == 14 + assert 'SNMP user test_nopriv_RO_1 is already configured' in result.output + + def test_config_snmp_user_add_invalid_user_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "nopriv", "ro"]) + print(result.exit_code) + print(result) + print(result.output) + assert result.exit_code == 2 + assert "Invalid user type. Must be one of these one of these three 'noauthnopriv' or 'authnopriv' or 'priv'" in result.output + + def test_config_snmp_user_add_invalid_permission_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "noauthnopriv", "ab"]) + print(result.exit_code) + assert result.exit_code == 3 + assert "Invalid community type. Must be either RO or RW" in result.output + + def test_config_snmp_user_add_user_type_noauthnopriv_with_unnecessary_auth_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "noauthnopriv", "ro", "sha"]) + print(result.exit_code) + assert result.exit_code == 4 + assert "User auth type not used with 'noAuthNoPriv'. Please use 'AuthNoPriv' or 'Priv' instead" in result.output + + def test_config_snmp_user_add_user_type_authnopriv_missing_auth_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "authnopriv", "ro"]) + print(result.exit_code) + assert result.exit_code == 5 + assert "User auth type is missing. Must be MD5, SHA, or HMAC-SHA-2" in result.output + + def test_config_snmp_user_add_user_type_authnopriv_missing_auth_password(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "authnopriv", "ro", "sha"]) + print(result.exit_code) + assert result.exit_code == 7 + assert "User auth password is missing" in result.output + + def test_config_snmp_user_add_user_type_authnopriv_with_unnecessary_encrypt_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "authnopriv", "ro", "sha", "testauthpass", "DES"]) + print(result.exit_code) + assert result.exit_code == 9 + assert "User encrypt type not used with 'AuthNoPriv'. Please use 'Priv' instead" in result.output + + def test_config_snmp_user_add_user_type_priv_missing_auth_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "priv", "ro"]) + print(result.exit_code) + assert result.exit_code == 5 + assert "User auth type is missing. Must be MD5, SHA, or HMAC-SHA-2" in result.output + + def test_config_snmp_user_add_user_type_priv_missing_auth_password(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "priv", "ro", "md5"]) + print(result.exit_code) + assert result.exit_code == 7 + assert "User auth password is missing" in result.output + + def test_config_snmp_user_add_user_type_priv_missing_encrypt_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "priv", "ro", "md5", "testauthpass"]) + print(result.exit_code) + assert result.exit_code == 10 + assert "User encrypt type is missing. Must be DES or AES" in result.output + + def test_config_snmp_user_add_user_type_priv_invalid_encrypt_password_over_64_characters(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "priv", "ro", "md5", "testauthpass", "DES", + "superlongencryptionpasswordtotestbeingoverthesixtyfourcharacterlimit"]) + print(result.exit_code) + assert result.exit_code == 13 + assert "FAILED: SNMP user password length should be not be greater than 64" in result.output + + def test_config_snmp_user_add_user_type_priv_invalid_encrypt_password_excluded_special_characters(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "priv", "ro", "md5", "testauthpass", "DES", "testencrypt@pass"]) + print(result.exit_code) + assert result.exit_code == 13 + assert "FAILED: SNMP user password should not have any of these special symbols" in result.output + + def test_config_snmp_user_add_user_type_priv_invalid_encrypt_password_not_long_enough(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "priv", "ro", "md5", "testauthpass", "DES", "test1"]) + print(result.exit_code) + assert result.exit_code == 13 + assert "FAILED: SNMP user password length should be at least 8 characters" in result.output + + def test_config_snmp_user_add_invalid_auth_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "authnopriv", "ro", "DM5", "user_auth_pass"]) + print(result.exit_code) + assert result.exit_code == 6 + assert "Invalid user authentication type. Must be one of these 'MD5', 'SHA', or 'HMAC-SHA-2'" in result.output + + def test_config_snmp_user_add_missing_auth_password(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "authnopriv", "ro", "SHA", ""]) + print(result.exit_code) + assert result.exit_code == 7 + assert 'User auth password is missing' in result.output + + def test_config_snmp_user_add_invalid_encrypt_type(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "priv", "ro", "SHA", "user_auth_pass", "EAS", "user_encrypt_pass"]) + print(result.exit_code) + assert result.exit_code == 11 + assert "Invalid user encryption type. Must be one of these two 'DES' or 'AES'" in result.output + + def test_config_snmp_user_add_missing_encrypt_password(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_3", "priv", "ro", "SHA", "user_auth_pass", "AES"]) + print(result.exit_code) + assert result.exit_code == 12 + assert 'User encrypt password is missing' in result.output + + def test_config_snmp_user_add_user_already_existing(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_nopriv_RO_1", "noauthnopriv", "ro"]) + print(result.exit_code) + assert result.exit_code == 14 + assert 'SNMP user test_nopriv_RO_1 is already configured' in result.output + + def test_config_snmp_user_add_valid_user_priv_ro_md5_des(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RO_7", "priv", "ro", "MD5", "user_auth_pass", "DES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_7 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RO_7") == expected_snmp_user_priv_ro_md5_des_config_db_output + + def test_config_snmp_user_add_valid_user_priv_ro_md5_aes(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RO_8", "priv", "ro", "MD5", "user_auth_pass", "AES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_8 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RO_8") == expected_snmp_user_priv_ro_md5_aes_config_db_output + + def test_config_snmp_user_add_valid_user_priv_ro_sha_des(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RO_9", "priv", "ro", "SHA", "user_auth_pass", "DES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_9 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RO_9") == expected_snmp_user_priv_ro_sha_des_config_db_output + + def test_config_snmp_user_add_valid_user_priv_ro_sha_aes(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RO_10", "priv", "ro", "SHA", "user_auth_pass", "AES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_10 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RO_10") == expected_snmp_user_priv_ro_sha_aes_config_db_output + + def test_config_snmp_user_add_valid_user_priv_ro_hmac_sha_2_des(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RO_11", "priv", "ro", "HMAC-SHA-2", "user_auth_pass", "DES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_11 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RO_11") == \ + expected_snmp_user_priv_ro_hmac_sha_2_des_config_db_output + + def test_config_snmp_user_add_valid_user_priv_ro_hmac_sha_2_aes(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RO_12", "priv", "ro", "HMAC-SHA-2", "user_auth_pass", "AES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_12 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RO_12") == \ + expected_snmp_user_priv_ro_hmac_sha_2_aes_config_db_output + + def test_config_snmp_user_add_valid_user_priv_rw_md5_des(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RW_7", "priv", "rw", "MD5", "user_auth_pass", "DES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_7 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RW_7") == expected_snmp_user_priv_rw_md5_des_config_db_output + + def test_config_snmp_user_add_valid_user_priv_rw_md5_aes(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RW_8", "priv", "rw", "MD5", "user_auth_pass", "AES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_8 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RW_8") == expected_snmp_user_priv_rw_md5_aes_config_db_output + + def test_config_snmp_user_add_valid_user_priv_rw_sha_des(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RW_9", "priv", "rw", "SHA", "user_auth_pass", "DES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_9 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RW_9") == expected_snmp_user_priv_rw_sha_des_config_db_output + + def test_config_snmp_user_add_valid_user_priv_rw_sha_aes(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RW_10", "priv", "rw", "SHA", "user_auth_pass", "AES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_10 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RW_10") == expected_snmp_user_priv_rw_sha_aes_config_db_output + + def test_config_snmp_user_add_valid_user_priv_rw_hmac_sha_2_des(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RW_11", "priv", "rw", "HMAC-SHA-2", "user_auth_pass", "DES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_11 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RW_11") == \ + expected_snmp_user_priv_rw_hmac_sha_2_des_config_db_output + + def test_config_snmp_user_add_valid_user_priv_rw_hmac_sha_2_aes(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["add"], + ["test_priv_RW_12", "priv", "rw", "HMAC-SHA-2", "user_auth_pass", "AES", "user_encrypt_pass"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_12 added to configuration' in result.output + assert db.cfgdb.get_entry("SNMP_USER", "test_priv_RW_12") == \ + expected_snmp_user_priv_rw_hmac_sha_2_aes_config_db_output + + # Del snmp user tests + def test_config_snmp_user_del_valid_user(self): + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_nopriv_RO_1"]) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_nopriv_RO_1 removed from configuration' in result.output + + def test_config_snmp_user_del_invalid_user(self): + runner = CliRunner() + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_nopriv_RO_2"]) + print(result.exit_code) + assert result.exit_code == 1 + assert 'SNMP user test_nopriv_RO_2 is not configured' in result.output + + @pytest.mark.parametrize("invalid_email", ['test@contoso', 'test.contoso.com', 'testcontoso@com', + '123_%contoso.com', 'mytest@contoso.comm']) + def test_is_valid_email(self, invalid_email): + output = config.is_valid_email(invalid_email) + assert output == False + + @classmethod + def teardown_class(cls): + print("TEARDOWN") + os.environ["UTILITIES_UNIT_TESTING"] = "0" + diff --git a/tests/mock_tables/config_db.json b/tests/mock_tables/config_db.json index f8ceebffbf..6c554f8f98 100644 --- a/tests/mock_tables/config_db.json +++ b/tests/mock_tables/config_db.json @@ -695,6 +695,172 @@ "peer_switch": "sonic-switch", "type": "ToRRouter" }, + "SNMP_COMMUNITY|msft": { + "TYPE": "RO" + }, + "SNMP_COMMUNITY|Rainer": { + "TYPE": "RW" + }, + "SNMP_USER|test_authpriv_RO_2": { + "SNMP_USER_TYPE": "AuthNoPriv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "SHA", + "SNMP_USER_AUTH_PASSWORD": "test_authpriv_RO_2_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "", + "SNMP_USER_ENCRYPTION_PASSWORD": "" + }, + "SNMP_USER|test_authpriv_RO_3": { + "SNMP_USER_TYPE": "AuthNoPriv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "HMAC-SHA-2", + "SNMP_USER_AUTH_PASSWORD": "test_authpriv_RO_3_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "", + "SNMP_USER_ENCRYPTION_PASSWORD": "" + }, + "SNMP_USER|test_priv_RW_4": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "SHA", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RW_4_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "AES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RW_4_encrpytpass" + }, + "SNMP_USER|test_priv_RW_3": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "SHA", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RW_3_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "DES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RW_3_encrpytpass" + }, + "SNMP_USER|test_priv_RO_2": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "MD5", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RO_2_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "AES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RO_2_encrpytpass" + }, + "SNMP_USER|test_nopriv_RO_1": { + "SNMP_USER_TYPE": "noAuthNoPriv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "", + "SNMP_USER_AUTH_PASSWORD": "", + "SNMP_USER_ENCRYPTION_TYPE": "", + "SNMP_USER_ENCRYPTION_PASSWORD": "" + }, + "SNMP_USER|test_priv_RW_1": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "MD5", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RO_1_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "DES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RW_1_encrpytpass" + }, + "SNMP_USER|test_authpriv_RW_1": { + "SNMP_USER_TYPE": "AuthNoPriv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "MD5", + "SNMP_USER_AUTH_PASSWORD": "test_authpriv_RW_1_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "", + "SNMP_USER_ENCRYPTION_PASSWORD": "" + }, + "SNMP_USER|test_priv_RO_6": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "HMAC-SHA-2", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RO_6_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "AES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RO_6_encrpytpass" + }, + "SNMP_USER|test_priv_RO_1": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "MD5", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RO_1_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "DES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RO_1_encrpytpass" + }, + "SNMP_USER|test_priv_RO_5": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "HMAC-SHA-2", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RO_5_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "DES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RO_5_encrpytpass" + }, + "SNMP_USER|test_nopriv_RW_1": { + "SNMP_USER_TYPE": "noAuthNoPriv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "", + "SNMP_USER_AUTH_PASSWORD": "", + "SNMP_USER_ENCRYPTION_TYPE": "", + "SNMP_USER_ENCRYPTION_PASSWORD": "" + }, + "SNMP_USER|test_priv_RO_3": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "SHA", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RO_3_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "DES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RO_3_encrpytpass" + }, + "SNMP_USER|test_priv_RW_2": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "MD5", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RO_2_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "AES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RW_2_encrpytpass" + }, + "SNMP_USER|test_authpriv_RW_3": { + "SNMP_USER_TYPE": "AuthNoPriv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "HMAC-SHA-2", + "SNMP_USER_AUTH_PASSWORD": "test_authpriv_RW_3_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "", + "SNMP_USER_ENCRYPTION_PASSWORD": "" + }, + "SNMP_USER|test_priv_RW_5": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "HMAC-SHA-2", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RW_5_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "DES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RW_5_encrpytpass" + }, + "SNMP_USER|test_priv_RW_6": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "HMAC-SHA-2", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RW_6_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "AES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RW_6_encrpytpass" + }, + "SNMP_USER|test_authpriv_RW_2": { + "SNMP_USER_TYPE": "AuthNoPriv", + "SNMP_USER_PERMISSION": "RW", + "SNMP_USER_AUTH_TYPE": "SHA", + "SNMP_USER_AUTH_PASSWORD": "test_authpriv_RW_2_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "", + "SNMP_USER_ENCRYPTION_PASSWORD": "" + }, + "SNMP_USER|test_priv_RO_4": { + "SNMP_USER_TYPE": "Priv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "SHA", + "SNMP_USER_AUTH_PASSWORD": "test_priv_RO_4_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "AES", + "SNMP_USER_ENCRYPTION_PASSWORD": "test_priv_RO_4_encrpytpass" + }, + "SNMP_USER|test_authpriv_RO_1": { + "SNMP_USER_TYPE": "AuthNoPriv", + "SNMP_USER_PERMISSION": "RO", + "SNMP_USER_AUTH_TYPE": "MD5", + "SNMP_USER_AUTH_PASSWORD": "test_authpriv_RO_1_authpass", + "SNMP_USER_ENCRYPTION_TYPE": "", + "SNMP_USER_ENCRYPTION_PASSWORD": "" + }, "DEVICE_NEIGHBOR|Ethernet0": { "name": "Servers", "port": "eth0" diff --git a/tests/show_snmp_test.py b/tests/show_snmp_test.py new file mode 100644 index 0000000000..753e20c418 --- /dev/null +++ b/tests/show_snmp_test.py @@ -0,0 +1,467 @@ +import sys +import os +import click +from click.testing import CliRunner +import pytest +import swsssdk +import traceback + +test_path = os.path.dirname(os.path.abspath(__file__)) +modules_path = os.path.dirname(test_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, test_path) +sys.path.insert(0, modules_path) + +import show.main as show +import clear.main as clear +import config.main as config + +import mock_tables.dbconnector + +from unittest import mock +from unittest.mock import patch +from utilities_common.db import Db + +config_snmp_location_add_new_location ="""\ +SNMP Location public has been added to configuration +Restarting SNMP service... +""" + +config_snmp_contact_add_del_new_contact ="""\ +Contact name testuser and contact email testuser@contoso.com have been added to configuration +Restarting SNMP service... +""" + +tabular_data_show_run_snmp_contact_expected = """\ +Contact Contact Email\n--------- --------------------\ntestuser testuser@contoso.com +""" + +json_data_show_run_snmp_contact_expected = """\ +{'testuser': 'testuser@contoso.com'} +""" + +tabular_data_show_run_snmp_community_expected = """\ +Community String Community Type +------------------ ---------------- +Rainer RW +msft RO +""" + +json_data_show_run_snmp_community_expected = """\ +{'msft': {'TYPE': 'RO'}, 'Rainer': {'TYPE': 'RW'}} +""" + +tabular_data_show_run_snmp_location_expected = """\ +Location +---------- +public +""" + +json_data_show_run_snmp_location_expected = """\ +{'Location': 'public'} +""" + + +tabular_data_show_run_snmp_user_expected = """\ +User Permission Type Type Auth Type Auth Password Encryption Type Encryption Password +------------------ ----------------- ------------ ----------- --------------------------- ----------------- -------------------------- +test_authpriv_RO_1 RO AuthNoPriv MD5 test_authpriv_RO_1_authpass +test_authpriv_RO_2 RO AuthNoPriv SHA test_authpriv_RO_2_authpass +test_authpriv_RO_3 RO AuthNoPriv HMAC-SHA-2 test_authpriv_RO_3_authpass +test_authpriv_RW_1 RW AuthNoPriv MD5 test_authpriv_RW_1_authpass +test_authpriv_RW_2 RW AuthNoPriv SHA test_authpriv_RW_2_authpass +test_authpriv_RW_3 RW AuthNoPriv HMAC-SHA-2 test_authpriv_RW_3_authpass +test_nopriv_RO_1 RO noAuthNoPriv +test_nopriv_RW_1 RW noAuthNoPriv +test_priv_RO_1 RO Priv MD5 test_priv_RO_1_authpass DES test_priv_RO_1_encrpytpass +test_priv_RO_2 RO Priv MD5 test_priv_RO_2_authpass AES test_priv_RO_2_encrpytpass +test_priv_RO_3 RO Priv SHA test_priv_RO_3_authpass DES test_priv_RO_3_encrpytpass +test_priv_RO_4 RO Priv SHA test_priv_RO_4_authpass AES test_priv_RO_4_encrpytpass +test_priv_RO_5 RO Priv HMAC-SHA-2 test_priv_RO_5_authpass DES test_priv_RO_5_encrpytpass +test_priv_RO_6 RO Priv HMAC-SHA-2 test_priv_RO_6_authpass AES test_priv_RO_6_encrpytpass +test_priv_RW_1 RW Priv MD5 test_priv_RO_1_authpass DES test_priv_RW_1_encrpytpass +test_priv_RW_2 RW Priv MD5 test_priv_RO_2_authpass AES test_priv_RW_2_encrpytpass +test_priv_RW_3 RW Priv SHA test_priv_RW_3_authpass DES test_priv_RW_3_encrpytpass +test_priv_RW_4 RW Priv SHA test_priv_RW_4_authpass AES test_priv_RW_4_encrpytpass +test_priv_RW_5 RW Priv HMAC-SHA-2 test_priv_RW_5_authpass DES test_priv_RW_5_encrpytpass +test_priv_RW_6 RW Priv HMAC-SHA-2 test_priv_RW_6_authpass AES test_priv_RW_6_encrpytpass +""" + + + + +json_data_show_run_snmp_user_expected = """{'test_authpriv_RO_2': {'SNMP_USER_TYPE': 'AuthNoPriv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'SHA', 'SNMP_USER_AUTH_PASSWORD': 'test_authpriv_RO_2_authpass', 'SNMP_USER_ENCRYPTION_TYPE': '', 'SNMP_USER_ENCRYPTION_PASSWORD': ''}, 'test_authpriv_RO_3': {'SNMP_USER_TYPE': 'AuthNoPriv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', 'SNMP_USER_AUTH_PASSWORD': 'test_authpriv_RO_3_authpass', 'SNMP_USER_ENCRYPTION_TYPE': '', 'SNMP_USER_ENCRYPTION_PASSWORD': ''}, 'test_priv_RW_4': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'SHA', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RW_4_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'AES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RW_4_encrpytpass'}, 'test_priv_RW_3': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'SHA', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RW_3_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'DES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RW_3_encrpytpass'}, 'test_priv_RO_2': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'MD5', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RO_2_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'AES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RO_2_encrpytpass'}, 'test_nopriv_RO_1': {'SNMP_USER_TYPE': 'noAuthNoPriv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': '', 'SNMP_USER_AUTH_PASSWORD': '', 'SNMP_USER_ENCRYPTION_TYPE': '', 'SNMP_USER_ENCRYPTION_PASSWORD': ''}, 'test_priv_RW_1': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'MD5', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RO_1_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'DES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RW_1_encrpytpass'}, 'test_authpriv_RW_1': {'SNMP_USER_TYPE': 'AuthNoPriv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'MD5', 'SNMP_USER_AUTH_PASSWORD': 'test_authpriv_RW_1_authpass', 'SNMP_USER_ENCRYPTION_TYPE': '', 'SNMP_USER_ENCRYPTION_PASSWORD': ''}, 'test_priv_RO_6': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RO_6_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'AES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RO_6_encrpytpass'}, 'test_priv_RO_1': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'MD5', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RO_1_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'DES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RO_1_encrpytpass'}, 'test_priv_RO_5': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RO_5_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'DES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RO_5_encrpytpass'}, 'test_nopriv_RW_1': {'SNMP_USER_TYPE': 'noAuthNoPriv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': '', 'SNMP_USER_AUTH_PASSWORD': '', 'SNMP_USER_ENCRYPTION_TYPE': '', 'SNMP_USER_ENCRYPTION_PASSWORD': ''}, 'test_priv_RO_3': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'SHA', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RO_3_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'DES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RO_3_encrpytpass'}, 'test_priv_RW_2': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'MD5', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RO_2_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'AES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RW_2_encrpytpass'}, 'test_authpriv_RW_3': {'SNMP_USER_TYPE': 'AuthNoPriv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', 'SNMP_USER_AUTH_PASSWORD': 'test_authpriv_RW_3_authpass', 'SNMP_USER_ENCRYPTION_TYPE': '', 'SNMP_USER_ENCRYPTION_PASSWORD': ''}, 'test_priv_RW_5': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RW_5_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'DES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RW_5_encrpytpass'}, 'test_priv_RW_6': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'HMAC-SHA-2', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RW_6_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'AES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RW_6_encrpytpass'}, 'test_authpriv_RW_2': {'SNMP_USER_TYPE': 'AuthNoPriv', 'SNMP_USER_PERMISSION': 'RW', 'SNMP_USER_AUTH_TYPE': 'SHA', 'SNMP_USER_AUTH_PASSWORD': 'test_authpriv_RW_2_authpass', 'SNMP_USER_ENCRYPTION_TYPE': '', 'SNMP_USER_ENCRYPTION_PASSWORD': ''}, 'test_priv_RO_4': {'SNMP_USER_TYPE': 'Priv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'SHA', 'SNMP_USER_AUTH_PASSWORD': 'test_priv_RO_4_authpass', 'SNMP_USER_ENCRYPTION_TYPE': 'AES', 'SNMP_USER_ENCRYPTION_PASSWORD': 'test_priv_RO_4_encrpytpass'}, 'test_authpriv_RO_1': {'SNMP_USER_TYPE': 'AuthNoPriv', 'SNMP_USER_PERMISSION': 'RO', 'SNMP_USER_AUTH_TYPE': 'MD5', 'SNMP_USER_AUTH_PASSWORD': 'test_authpriv_RO_1_authpass', 'SNMP_USER_ENCRYPTION_TYPE': '', 'SNMP_USER_ENCRYPTION_PASSWORD': ''}} +""" + +tabular_data_show_run_snmp_expected = """\ +Location +---------- +public + + +SNMP_CONTACT SNMP_CONTACT_EMAIL +-------------- -------------------- +testuser testuser@contoso.com + + +Community String Community Type +------------------ ---------------- +Rainer RW +msft RO + + +User Permission Type Type Auth Type Auth Password Encryption Type Encryption Password +------------------ ----------------- ------------ ----------- --------------------------- ----------------- -------------------------- +test_authpriv_RO_1 RO AuthNoPriv MD5 test_authpriv_RO_1_authpass +test_authpriv_RO_2 RO AuthNoPriv SHA test_authpriv_RO_2_authpass +test_authpriv_RO_3 RO AuthNoPriv HMAC-SHA-2 test_authpriv_RO_3_authpass +test_authpriv_RW_1 RW AuthNoPriv MD5 test_authpriv_RW_1_authpass +test_authpriv_RW_2 RW AuthNoPriv SHA test_authpriv_RW_2_authpass +test_authpriv_RW_3 RW AuthNoPriv HMAC-SHA-2 test_authpriv_RW_3_authpass +test_nopriv_RO_1 RO noAuthNoPriv +test_nopriv_RW_1 RW noAuthNoPriv +test_priv_RO_1 RO Priv MD5 test_priv_RO_1_authpass DES test_priv_RO_1_encrpytpass +test_priv_RO_2 RO Priv MD5 test_priv_RO_2_authpass AES test_priv_RO_2_encrpytpass +test_priv_RO_3 RO Priv SHA test_priv_RO_3_authpass DES test_priv_RO_3_encrpytpass +test_priv_RO_4 RO Priv SHA test_priv_RO_4_authpass AES test_priv_RO_4_encrpytpass +test_priv_RO_5 RO Priv HMAC-SHA-2 test_priv_RO_5_authpass DES test_priv_RO_5_encrpytpass +test_priv_RO_6 RO Priv HMAC-SHA-2 test_priv_RO_6_authpass AES test_priv_RO_6_encrpytpass +test_priv_RW_1 RW Priv MD5 test_priv_RO_1_authpass DES test_priv_RW_1_encrpytpass +test_priv_RW_2 RW Priv MD5 test_priv_RO_2_authpass AES test_priv_RW_2_encrpytpass +test_priv_RW_3 RW Priv SHA test_priv_RW_3_authpass DES test_priv_RW_3_encrpytpass +test_priv_RW_4 RW Priv SHA test_priv_RW_4_authpass AES test_priv_RW_4_encrpytpass +test_priv_RW_5 RW Priv HMAC-SHA-2 test_priv_RW_5_authpass DES test_priv_RW_5_encrpytpass +test_priv_RW_6 RW Priv HMAC-SHA-2 test_priv_RW_6_authpass AES test_priv_RW_6_encrpytpass +""" + + +class TestSNMPShowCommands(object): + @classmethod + def setup_class(cls): + print("SETUP") + os.environ["PATH"] += os.pathsep + scripts_path + os.environ["UTILITIES_UNIT_TESTING"] = "1" + + # mock the redis for unit test purposes # + try: + if os.environ["UTILITIES_UNIT_TESTING"] == "1": + modules_path = os.path.join(os.path.dirname(__file__), "..") + test_path = os.path.join(modules_path, "sonic-utilities-tests") + sys.path.insert(0, modules_path) + sys.path.insert(0, test_path) + import mock_tables.dbconnector + except KeyError: + pass + + def test_show_run_snmp_location_tabular(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["location"], + [], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == tabular_data_show_run_snmp_location_expected + + def test_show_run_snmp_location_json(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["location"], + ["--json"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == json_data_show_run_snmp_location_expected + + def test_show_run_snmp_location_json_bad_key(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["location"], ["--json"]) + print(result.exit_code) + print(result.output) + traceback.print_tb(result.exc_info[2]) + assert result.exit_code == 0 + assert "{}" in result.output + + + def test_show_run_snmp_location_bad_key(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["location"], []) + print(result.exit_code) + print(result.output) + traceback.print_tb(result.exc_info[2]) + assert result.exit_code == 0 + assert "" in result.output + + def test_show_run_snmp_contact_tabular(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["contact"], + [], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == tabular_data_show_run_snmp_contact_expected + + def test_show_run_snmp_contact_json(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["contact"], + ["--json"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == json_data_show_run_snmp_contact_expected + + def test_show_run_snmp_contact_json_bad_key(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["contact"], ["--json"]) + print(result.exit_code) + print(result.output) + traceback.print_tb(result.exc_info[2]) + assert result.exit_code == 0 + assert '{}' in result.output + + def test_show_run_snmp_contact_tabular_bad_key(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["contact"]) + print(result.exit_code) + print(result.output) + traceback.print_tb(result.exc_info[2]) + assert result.exit_code == 0 + assert '' in result.output + + + def test_show_run_snmp_community_tabular(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["community"], []) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == tabular_data_show_run_snmp_community_expected + + def test_show_run_snmp_community_json(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["community"], + ["--json"]) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == json_data_show_run_snmp_community_expected + + def test_show_run_snmp_user_tabular(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["user"], []) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == tabular_data_show_run_snmp_user_expected + + def test_show_run_snmp_user_json(self): + runner = CliRunner() + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["user"], ["--json"]) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == json_data_show_run_snmp_user_expected + + def test_show_run_snmp_user_json_bad_key(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_authpriv_RO_1"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_authpriv_RO_1 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_authpriv_RO_2"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_authpriv_RO_2 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_authpriv_RO_3"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_authpriv_RO_3 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_authpriv_RW_1"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_authpriv_RW_1 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_authpriv_RW_2"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_authpriv_RW_2 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_authpriv_RW_3"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_authpriv_RW_3 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_nopriv_RO_1"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_nopriv_RO_1 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_nopriv_RW_1"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_nopriv_RW_1 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RO_1"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_1 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RO_2"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_2 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RO_3"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_3 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RO_4"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_4 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RO_5"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_5 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RO_6"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RO_6 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RW_1"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_1 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RW_2"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_2 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RW_3"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_3 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RW_4"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_4 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RW_5"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_5 removed from configuration' in result.output + + result = runner.invoke(config.config.commands["snmp"].commands["user"].commands["del"], + ["test_priv_RW_6"], obj=db) + print(result.exit_code) + assert result.exit_code == 0 + assert 'SNMP user test_priv_RW_6 removed from configuration' in result.output + + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"].commands["user"], ["--json"], obj=db) + print(result.exit_code) + print(result.output) + traceback.print_tb(result.exc_info[2]) + assert result.exit_code == 0 + assert "{}" in result.output + + + def test_show_run_snmp_tabular(self): + db = Db() + runner = CliRunner() + with mock.patch('utilities_common.cli.run_command') as mock_run_command: + result = runner.invoke(config.config.commands["snmp"].commands["contact"].commands["add"], + ["testuser", "testuser@contoso.com"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_contact_add_del_new_contact + assert db.cfgdb.get_entry("SNMP", "CONTACT") == {"testuser": "testuser@contoso.com"} + + result = runner.invoke(config.config.commands["snmp"].commands["location"].commands["add"], + ["public"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == config_snmp_location_add_new_location + assert db.cfgdb.get_entry("SNMP", "LOCATION") == {"Location": "public"} + + result = runner.invoke(show.cli.commands["runningconfiguration"].commands["snmp"], [], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code == 0 + assert result.output == tabular_data_show_run_snmp_expected + + + @classmethod + def teardown_class(cls): + print("TEARDOWN") + os.environ["PATH"] = os.pathsep.join(os.environ["PATH"].split(os.pathsep)[:-1]) + os.environ["UTILITIES_UNIT_TESTING"] = "0" +