diff --git a/CHANGELOG.md b/CHANGELOG.md index 757be4319..9583123a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [unreleased] ### Added +* Introducing Gmpv9 [PR 157](https://github.com/greenbone/python-gvm/pull/157) +* Added new `create_audit` method, to create a task with the `usage_type` `audit` [PR 157](https://github.com/greenbone/python-gvm/pull/157) +* Added new `create_policy` method, to create a config with the `usage_type` `policy` [PR 157](https://github.com/greenbone/python-gvm/pull/157) +* Added the new methods `create_tls_certificate`, `modify_tls_certificate` and `clone_tls_certificate` to create, modify and copy TLS certificates [PR 157](https://github.com/greenbone/python-gvm/pull/157) +* Added the new method `get_tls_certificates`, to request TLS certificates from the server [PR 157](https://github.com/greenbone/python-gvm/pull/157) ### Changed +* Added type `TLS_CERTIFICATE` to `EntityType` and `FilterType` [PR 157](https://github.com/greenbone/python-gvm/pull/157) ### Fixed diff --git a/gvm/protocols/gmpv9/__init__.py b/gvm/protocols/gmpv9/__init__.py new file mode 100644 index 000000000..841f62aa3 --- /dev/null +++ b/gvm/protocols/gmpv9/__init__.py @@ -0,0 +1,470 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 - 2019 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# pylint: disable=arguments-differ, redefined-builtin, too-many-lines + +""" +Module for communication with gvmd in `Greenbone Management Protocol version 9`_ + +.. _Greenbone Management Protocol version 9: + https://docs.greenbone.net/API/GMP/gmp-9.0.html +""" +import collections +import numbers + +from enum import Enum +from typing import Any, List, Optional + +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.utils import deprecation +from gvm.xml import XmlCommand + +from gvm.protocols.gmpv8 import Gmp as Gmpv8, _to_bool, _add_filter +from gvm.protocols.gmpv7 import _is_list_like, _to_comma_list + +from . import types +from .types import HostsOrdering, EntityType, FilterType +from .types import get_entity_type_from_string, get_filter_type_from_string +from .types import _UsageType as UsageType + +PROTOCOL_VERSION = (9,) + + +class Gmp(Gmpv8): + + types = types + + @staticmethod + def get_protocol_version() -> tuple: + """Determine the Greenbone Management Protocol version. + + Returns: + tuple: Implemented version of the Greenbone Management Protocol + """ + return PROTOCOL_VERSION + + def create_audit( + self, + name: str, + audit_id: str, + target_id: str, + scanner_id: str, + *, + alterable: Optional[bool] = None, + hosts_ordering: Optional[HostsOrdering] = None, + schedule_id: Optional[str] = None, + alert_ids: Optional[List[str]] = None, + comment: Optional[str] = None, + schedule_periods: Optional[int] = None, + observers: Optional[List[str]] = None, + preferences: Optional[dict] = None + ) -> Any: + """Create a new audit task + + Arguments: + name: Name of the new audit + audit_id: UUID of scan config to use by the audit + target_id: UUID of target to be scanned + scanner_id: UUID of scanner to use for scanning the target + comment: Comment for the audit + alterable: Whether the task should be alterable + alert_ids: List of UUIDs for alerts to be applied to the audit + hosts_ordering: The order hosts are scanned in + schedule_id: UUID of a schedule when the audit should be run. + schedule_periods: A limit to the number of times the audit will be + scheduled, or 0 for no limit + observers: List of names or ids of users which should be allowed to + observe this audit + preferences: Name/Value pairs of scanner preferences. + + Returns: + The response. See :py:meth:`send_command` for details. + """ + + self.__create_task( + name=name, + config_id=audit_id, + target_id=target_id, + scanner_id=scanner_id, + usage_type=UsageType.AUDIT, # pylint: disable=W0212 + function=self.create_audit.__name__, + alterable=alterable, + hosts_ordering=hosts_ordering, + schedule_id=schedule_id, + alert_ids=alert_ids, + comment=comment, + schedule_periods=schedule_periods, + observers=observers, + preferences=preferences, + ) + + def create_config(self, config_id: str, name: str) -> Any: + """Create a new scan config + Arguments: + config_id: UUID of the existing scan config + name: Name of the new scan config + + Returns: + The response. See :py:meth:`send_command` for details. + """ + self.__create_config( + config_id=config_id, + name=name, + usage_type=UsageType.SCAN, # pylint: disable=W0212 + function=self.create_config.__name__, + ) + + def create_policy(self, policy_id: str, name: str) -> Any: + """Create a new policy config + Arguments: + policy_id: UUID of the existing policy config + name: Name of the new scan config + + Returns: + The response. See :py:meth:`send_command` for details. + """ + self.__create_config( + config_id=policy_id, + name=name, + usage_type=UsageType.POLICY, # pylint: disable=W0212 + function=self.create_policy.__name__, + ) + + def create_task( + self, + name: str, + config_id: str, + target_id: str, + scanner_id: str, + *, + alterable: Optional[bool] = None, + hosts_ordering: Optional[HostsOrdering] = None, + schedule_id: Optional[str] = None, + alert_ids: Optional[List[str]] = None, + comment: Optional[str] = None, + schedule_periods: Optional[int] = None, + observers: Optional[List[str]] = None, + preferences: Optional[dict] = None + ) -> Any: + """Create a new scan task + + Arguments: + name: Name of the task + config_id: UUID of scan config to use by the task + target_id: UUID of target to be scanned + scanner_id: UUID of scanner to use for scanning the target + comment: Comment for the task + alterable: Whether the task should be alterable + alert_ids: List of UUIDs for alerts to be applied to the task + hosts_ordering: The order hosts are scanned in + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: A limit to the number of times the task will be + scheduled, or 0 for no limit + observers: List of names or ids of users which should be allowed to + observe this task + preferences: Name/Value pairs of scanner preferences. + + Returns: + The response. See :py:meth:`send_command` for details. + """ + self.__create_task( + name=name, + config_id=config_id, + target_id=target_id, + scanner_id=scanner_id, + usage_type=UsageType.SCAN, # pylint: disable=W0212 + function=self.create_task.__name__, + alterable=alterable, + hosts_ordering=hosts_ordering, + schedule_id=schedule_id, + alert_ids=alert_ids, + comment=comment, + schedule_periods=schedule_periods, + observers=observers, + preferences=preferences, + ) + + def create_tls_certificate( + self, + name: str, + certificate: str, + *, + comment: Optional[str] = None, + copy: Optional[str] = None, + trust: Optional[bool] = None + ) -> Any: + """Create a new TLS certificate + + Arguments: + comment: Comment for the TLS certificate. + name: Name of the TLS certificate, defaulting to the MD5 fingerprint + copy: The UUID of an existing TLS certificate + trust: Whether the certificate is trusted. + certificate: The Base64 encoded certificate data (x.509 DER or PEM). + + Returns: + The response. See :py:meth:`send_command`for details. + """ + if not name: + raise RequiredArgument( + argument='name', function=self.create_tls_certificate.__name__ + ) + if not certificate: + raise RequiredArgument( + argument="certificate", + function=self.create_tls_certificate.__name__, + ) + + cmd = XmlCommand("create_tls_certificate") + + if comment: + cmd.add_element("comment", comment) + + if copy: + cmd.add_element("copy", copy) + + cmd.add_element("name", name) + cmd.add_element("certificate", certificate) + + if trust: + cmd.add_element("trust", _to_bool(trust)) + + return self._send_xml_command(cmd) + + def get_tls_certificates( + self, + *, + filter: Optional[str] = None, + filter_id: Optional[str] = None, + include_certificate_data: Optional[bool] = None + ) -> Any: + """Request a list of TLS certificates + + Arguments: + filter: Filter term to use for the query + filter_id: UUID of an existing filter to use for the query + + Returns: + The response. See :py:meth:`send_command` for details. + """ + + cmd = XmlCommand("get_tls_certificates") + + _add_filter(cmd, filter, filter_id) + + if include_certificate_data is not None: + cmd.set_attribute( + "include_certificate_data", _to_bool(include_certificate_data) + ) + + return self._send_xml_command(cmd) + + def modify_tls_certificate( + self, + tls_certificate_id: str, + *, + name: Optional[str] = None, + comment: Optional[str] = None, + trust: Optional[bool] = None + ) -> Any: + """Modifies an existing TLS certificate. + + Arguments: + tls_certificate_id: UUID of the TLS certificate to be modified. + name: Name of the TLS certificate, defaulting to the MD5 fingerprint + comment: Comment for the TLS certificate. + trust: Whether the certificate is trusted. + + Returns: + The response. See :py:meth:`send_command` for details. + """ + if not tls_certificate_id: + raise RequiredArgument( + argument="tls_certificate_id", + function=self.modify_tls_certificate.__name__, + ) + + cmd = XmlCommand("modify_tls_certificate") + cmd.set_attribute("tls_certificate_id", str(tls_certificate_id)) + + if comment: + cmd.add_element("comment", comment) + + if name: + cmd.add_element("name", name) + + if trust: + cmd.add_element("trust", _to_bool(trust)) + + return self._send_xml_command(cmd) + + def clone_tls_certificate(self, tls_certificate_id: str) -> Any: + """Modifies an existing TLS certificate. + + Arguments: + tls_certificate_id: The UUID of an existing TLS certificate + + Returns: + The response. See :py:meth:`send_command` for details. + """ + if not tls_certificate_id: + raise RequiredArgument( + argument="tls_certificate_id", + function=self.modify_tls_certificate.__name__, + ) + + cmd = XmlCommand("create_tls_certificate") + + cmd.add_element("copy", tls_certificate_id) + + return self._send_xml_command(cmd) + + def __create_task( + self, + name: str, + config_id: str, + target_id: str, + scanner_id: str, + usage_type: types._UsageType, # pylint: disable=W0212 + function: str, + *, + alterable: Optional[bool] = None, + hosts_ordering: Optional[HostsOrdering] = None, + schedule_id: Optional[str] = None, + alert_ids: Optional[List[str]] = None, + comment: Optional[str] = None, + schedule_periods: Optional[int] = None, + observers: Optional[List[str]] = None, + preferences: Optional[dict] = None + ) -> Any: + if not name: + raise RequiredArgument( + "{} requires a name argument".format(function) + ) + + if not config_id: + raise RequiredArgument( + "{} requires a config_id argument".format(function) + ) + + if not target_id: + raise RequiredArgument( + "{} requires a target_id argument".format(function) + ) + + if not scanner_id: + raise RequiredArgument( + "{} requires a scanner_id argument".format(function) + ) + + # don't allow to create a container task with create_task + if target_id == '0': + raise InvalidArgument( + 'Invalid argument {} for target_id'.format(target_id) + ) + + cmd = XmlCommand("create_task") + cmd.add_element("name", name) + cmd.add_element("usage_type", usage_type.value) + cmd.add_element("config", attrs={"id": config_id}) + cmd.add_element("target", attrs={"id": target_id}) + cmd.add_element("scanner", attrs={"id": scanner_id}) + + if comment: + cmd.add_element("comment", comment) + + if not alterable is None: + cmd.add_element("alterable", _to_bool(alterable)) + + if hosts_ordering: + if not isinstance(hosts_ordering, HostsOrdering): + raise InvalidArgument( + function="create_task", argument="hosts_ordering" + ) + cmd.add_element("hosts_ordering", hosts_ordering.value) + + if alert_ids: + if isinstance(alert_ids, str): + deprecation( + "Please pass a list as alert_ids parameter to {}. " + "Passing a string is deprecated and will be removed in " + "future.".format(function) + ) + + # if a single id is given as a string wrap it into a list + alert_ids = [alert_ids] + if _is_list_like(alert_ids): + # parse all given alert id's + for alert in alert_ids: + cmd.add_element("alert", attrs={"id": str(alert)}) + + if schedule_id: + cmd.add_element("schedule", attrs={"id": schedule_id}) + + if schedule_periods is not None: + if ( + not isinstance(schedule_periods, numbers.Integral) + or schedule_periods < 0 + ): + raise InvalidArgument( + "schedule_periods must be an integer greater or equal " + "than 0" + ) + cmd.add_element("schedule_periods", str(schedule_periods)) + + if observers is not None: + if not _is_list_like(observers): + raise InvalidArgument("obeservers argument must be a list") + + # gvmd splits by comma and space + # gvmd tries to lookup each value as user name and afterwards as + # user id. So both user name and user id are possible + cmd.add_element("observers", _to_comma_list(observers)) + + if preferences is not None: + if not isinstance(preferences, collections.abc.Mapping): + raise InvalidArgument('preferences argument must be a dict') + + _xmlprefs = cmd.add_element("preferences") + for pref_name, pref_value in preferences.items(): + _xmlpref = _xmlprefs.add_element("preference") + _xmlpref.add_element("scanner_name", pref_name) + _xmlpref.add_element("value", str(pref_value)) + + return self._send_xml_command(cmd) + + def __create_config( + self, + config_id: str, + name: str, + usage_type: types._UsageType, # pylint: disable=W0212 + function: str, + ) -> Any: + if not name: + raise RequiredArgument("{} requires name argument".format(function)) + + if not config_id: + raise RequiredArgument( + "{} requires config_id argument".format(function) + ) + + cmd = XmlCommand("create_config") + cmd.add_element("copy", config_id) + cmd.add_element("name", name) + cmd.add_element("usage_type", usage_type.value) + return self._send_xml_command(cmd) diff --git a/gvm/protocols/gmpv9/types.py b/gvm/protocols/gmpv9/types.py new file mode 100644 index 000000000..de7e3c538 --- /dev/null +++ b/gvm/protocols/gmpv9/types.py @@ -0,0 +1,267 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2019 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from enum import Enum + +from typing import Optional + +from gvm.errors import InvalidArgument + +from gvm.protocols.gmpv8.types import ( + AlertCondition, + AlertEvent, + AlertMethod, + AliveTest, + AssetType, + CredentialFormat, + CredentialType, + FeedType, + HostsOrdering, + InfoType, + PermissionSubjectType, + PortRangeType, + ScannerType, + SeverityLevel, + SnmpAuthAlgorithm, + SnmpPrivacyAlgorithm, + TicketStatus, + TimeUnit, + get_alert_condition_from_string, + get_alert_event_from_string, + get_alert_method_from_string, + get_alive_test_from_string, + get_asset_type_from_string, + get_credential_format_from_string, + get_credential_type_from_string, + get_feed_type_from_string, + get_hosts_ordering_from_string, + get_info_type_from_string, + get_permission_subject_type_from_string, + get_port_range_type_from_string, + get_severity_level_from_string, + get_scanner_type_from_string, + get_snmp_auth_algorithm_from_string, + get_snmp_privacy_algorithm_from_string, + get_ticket_status_from_string, +) + + +__all__ = [ + "AlertCondition", + "AlertEvent", + "AlertMethod", + "AliveTest", + "AssetType", + "CredentialFormat", + "CredentialType", + "EntityType", + "FeedType", + "FilterType", + "HostsOrdering", + "InfoType", + "PermissionSubjectType", + "PortRangeType", + "ScannerType", + "SeverityLevel", + "SnmpAuthAlgorithm", + "SnmpPrivacyAlgorithm", + "TicketStatus", + "TimeUnit", + "get_alert_condition_from_string", + "get_alert_event_from_string", + "get_alert_method_from_string", + "get_alive_test_from_string", + "get_asset_type_from_string", + "get_credential_format_from_string", + "get_credential_type_from_string", + "get_entity_type_from_string", + "get_feed_type_from_string", + "get_filter_type_from_string", + "get_hosts_ordering_from_string", + "get_info_type_from_string", + "get_permission_subject_type_from_string", + "get_port_range_type_from_string", + "get_scanner_type_from_string", + "get_severity_level_from_string", + "get_snmp_auth_algorithm_from_string", + "get_snmp_privacy_algorithm_from_string", + "get_ticket_status_from_string", +] + + +class EntityType(Enum): + """ Enum for entity types """ + + AGENT = "note" + ALERT = "alert" + ASSET = "asset" + CERT_BUND_ADV = "cert_bund_adv" + CPE = "cpe" + CREDENTIAL = "credential" + CVE = "cve" + DFN_CERT_ADV = "dfn_cert_adv" + FILTER = "filter" + GROUP = "group" + HOST = "host" + INFO = "info" + NOTE = "note" + NVT = "nvt" + OPERATING_SYSTEM = "os" + OVALDEF = "ovaldef" + OVERRIDE = "override" + PERMISSION = "permission" + PORT_LIST = "port_list" + REPORT = "report" + REPORT_FORMAT = "report_format" + RESULT = "result" + ROLE = "role" + SCAN_CONFIG = "config" + SCANNER = "scanner" + SCHEDULE = "schedule" + TAG = "tag" + TARGET = "target" + TASK = "task" + TICKET = "ticket" + TLS_CERTIFICATE = "tls_certificate" + USER = "user" + VULNERABILITY = "vuln" + + +def get_entity_type_from_string( + entity_type: Optional[str] +) -> Optional[EntityType]: + """ Convert a entity type string to an actual EntityType instance + + Arguments: + entity_type: Entity type string to convert to a EntityType + """ + if not entity_type: + return None + + if entity_type == 'vuln': + return EntityType.VULNERABILITY + + if entity_type == 'os': + return EntityType.OPERATING_SYSTEM + + if entity_type == 'config': + return EntityType.SCAN_CONFIG + + if entity_type == 'tls_certificate': + return EntityType.TLS_CERTIFICATE + + try: + return EntityType[entity_type.upper()] + except KeyError: + raise InvalidArgument( + argument='entity_type', + function=get_entity_type_from_string.__name__, + ) + + +class FilterType(Enum): + """ Enum for filter types """ + + AGENT = "agent" + ALERT = "alert" + ASSET = "asset" + SCAN_CONFIG = "config" + CREDENTIAL = "credential" + FILTER = "filter" + GROUP = "group" + HOST = "host" + NOTE = "note" + OPERATING_SYSTEM = "os" + OVERRIDE = "override" + PERMISSION = "permission" + PORT_LIST = "port_list" + REPORT = "report" + REPORT_FORMAT = "report_format" + RESULT = "result" + ROLE = "role" + SCHEDULE = "schedule" + ALL_SECINFO = "secinfo" + TAG = "tag" + TARGET = "target" + TASK = "task" + TICKET = "ticket" + TLS_CERTIFICATE = "tls_certificate" + USER = "user" + VULNERABILITY = "vuln" + + +def get_filter_type_from_string( + filter_type: Optional[str] +) -> Optional[FilterType]: + """ Convert a filter type string to an actual FilterType instance + + Arguments: + filter_type (str): Filter type string to convert to a FilterType + """ + if not filter_type: + return None + + if filter_type == 'vuln': + return FilterType.VULNERABILITY + + if filter_type == 'os': + return FilterType.OPERATING_SYSTEM + + if filter_type == 'config': + return FilterType.SCAN_CONFIG + + if filter_type == 'secinfo': + return FilterType.ALL_SECINFO + + if filter_type == 'tls_certificate': + return FilterType.TLS_CERTIFICATE + + try: + return FilterType[filter_type.upper()] + except KeyError: + raise InvalidArgument( + argument='filter_type', + function=get_filter_type_from_string.__name__, + ) + + +class _UsageType(Enum): + """ Enum for usage types """ + + AUDIT = "audit" + POLICY = "policy" + SCAN = "scan" + + +def __get_usage_type_from_string( + usage_type: Optional[str] +) -> Optional[_UsageType]: + """ Convert a usage type string to an actual _UsageType instance + + Arguments: + entity_type: Usage type string to convert to a _UsageType + """ + if not usage_type: + return None + + try: + return _UsageType[usage_type.upper()] + except KeyError: + raise InvalidArgument( + argument='usage_type', + function=__get_usage_type_from_string.__name__, + ) diff --git a/tests/protocols/gmpv8/test_create_tag.py b/tests/protocols/gmpv8/test_create_tag.py index a4a476a7c..ea87f4631 100644 --- a/tests/protocols/gmpv8/test_create_tag.py +++ b/tests/protocols/gmpv8/test_create_tag.py @@ -23,6 +23,7 @@ from . import Gmpv8TestCase + class GmpCreateTagTestCase(Gmpv8TestCase): def test_create_tag_missing_name(self): with self.assertRaises(RequiredArgument): @@ -112,7 +113,9 @@ def test_create_tag_missing_resource_type(self): def test_create_tag_with_resource_filter(self): self.gmp.create_tag( - name='foo', resource_filter='name=foo', resource_type=EntityType.TASK + name='foo', + resource_filter='name=foo', + resource_type=EntityType.TASK, ) self.connection.send.has_been_called_with( @@ -140,7 +143,9 @@ def test_create_tag_with_resource_ids(self): ) self.gmp.create_tag( - name='foo', resource_ids=['foo', 'bar'], resource_type=EntityType.TASK + name='foo', + resource_ids=['foo', 'bar'], + resource_type=EntityType.TASK, ) self.connection.send.has_been_called_with( @@ -175,7 +180,10 @@ def test_create_tag_with_comment(self): def test_create_tag_with_value(self): self.gmp.create_tag( - name='foo', resource_ids=['foo'], resource_type=EntityType.TASK, value='bar' + name='foo', + resource_ids=['foo'], + resource_type=EntityType.TASK, + value='bar', ) self.connection.send.has_been_called_with( @@ -191,7 +199,10 @@ def test_create_tag_with_value(self): def test_create_tag_with_active(self): self.gmp.create_tag( - name='foo', resource_ids=['foo'], resource_type=EntityType.TASK, active=True + name='foo', + resource_ids=['foo'], + resource_type=EntityType.TASK, + active=True, ) self.connection.send.has_been_called_with( @@ -206,7 +217,10 @@ def test_create_tag_with_active(self): ) self.gmp.create_tag( - name='foo', resource_ids=['foo'], resource_type=EntityType.TASK, active=False + name='foo', + resource_ids=['foo'], + resource_type=EntityType.TASK, + active=False, ) self.connection.send.has_been_called_with( diff --git a/tests/protocols/gmpv8/test_modify_tag.py b/tests/protocols/gmpv8/test_modify_tag.py index c03986ba2..0343015fd 100644 --- a/tests/protocols/gmpv8/test_modify_tag.py +++ b/tests/protocols/gmpv8/test_modify_tag.py @@ -73,7 +73,9 @@ def test_modify_tag_with_active(self): def test_modify_tag_with_resource_filter_and_type(self): self.gmp.modify_tag( - tag_id='t1', resource_filter='name=foo', resource_type=EntityType.TASK + tag_id='t1', + resource_filter='name=foo', + resource_type=EntityType.TASK, ) self.connection.send.has_been_called_with( diff --git a/tests/protocols/gmpv9/__init__.py b/tests/protocols/gmpv9/__init__.py new file mode 100644 index 000000000..5673dccbb --- /dev/null +++ b/tests/protocols/gmpv9/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2019 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from gvm.protocols.gmpv9 import Gmp + +from .. import GmpTestCase + + +class Gmpv9TestCase(GmpTestCase): + + gmp_class = Gmp diff --git a/tests/protocols/gmpv9/test_clone_tls_certificate.py b/tests/protocols/gmpv9/test_clone_tls_certificate.py new file mode 100644 index 000000000..6f427ca71 --- /dev/null +++ b/tests/protocols/gmpv9/test_clone_tls_certificate.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from gvm.errors import RequiredArgument + +from . import Gmpv9TestCase + + +class GmpCloneTLSCertificateTestCase(Gmpv9TestCase): + def test_clone(self): + self.gmp.clone_tls_certificate('a1') + + self.connection.send.has_been_called_with( + '' + 'a1' + '' + ) + + def test_missing_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.clone_tls_certificate('') + + with self.assertRaises(RequiredArgument): + self.gmp.clone_tls_certificate(None) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/protocols/gmpv9/test_create_audit.py b/tests/protocols/gmpv9/test_create_audit.py new file mode 100644 index 000000000..9c7a66b4b --- /dev/null +++ b/tests/protocols/gmpv9/test_create_audit.py @@ -0,0 +1,441 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest +import warnings + +from collections import OrderedDict + +from gvm.errors import RequiredArgument, InvalidArgument + +from gvm.protocols.gmpv9 import HostsOrdering + +from . import Gmpv9TestCase + + +class GmpCreateAuditCommandTestCase(Gmpv9TestCase): + def test_create_task(self): + self.gmp.create_audit( + name='foo', audit_id='c1', target_id='t1', scanner_id='s1' + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '' + ) + + def test_create_audit_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name=None, audit_id='c1', target_id='t1', scanner_id='s1' + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name='', audit_id='c1', target_id='t1', scanner_id='s1' + ) + + def test_create_audit_missing_audit_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name='foo', audit_id=None, target_id='t1', scanner_id='s1' + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name='foo', audit_id='', target_id='t1', scanner_id='s1' + ) + + def test_create_audit_missing_target_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name='foo', audit_id='c1', target_id=None, scanner_id='s1' + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name='foo', audit_id='c1', target_id='', scanner_id='s1' + ) + + def test_create_audit_missing_scanner_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name='foo', audit_id='c1', target_id='t1', scanner_id=None + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name='foo', audit_id='c1', target_id='t1', scanner_id='' + ) + + def test_create_audit_with_comment(self): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + comment='bar', + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + 'bar' + '' + ) + + def test_create_audit_single_alert(self): + # pylint: disable=invalid-name + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + alert_ids='a1', # will be removed in future + ) + + self.assertEqual(len(w), 1) + self.assertTrue(issubclass(w[0].category, DeprecationWarning)) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '' + '' + ) + + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + alert_ids=['a1'], + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '' + '' + ) + + def test_create_audit_multiple_alerts(self): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + alert_ids=['a1', 'a2', 'a3'], + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '' + '' + '' + '' + ) + + def test_create_audit_with_alterable(self): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + alterable=True, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '1' + '' + ) + + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + alterable=False, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '0' + '' + ) + + def test_create_audit_with_hosts_ordering(self): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + hosts_ordering=HostsOrdering.REVERSE, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + 'reverse' + '' + ) + + def test_create_audit_invalid_hosts_ordering(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + hosts_ordering='foo', + ) + + def test_create_audit_with_schedule(self): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '' + '' + ) + + def test_create_audit_with_schedule_and_schedule_periods(self): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + schedule_periods=0, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '' + '0' + '' + ) + + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + schedule_periods=5, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '' + '5' + '' + ) + + def test_create_audit_with_schedule_and_invalid_schedule_periods(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + schedule_periods='foo', + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + schedule_periods=-1, + ) + + def test_create_audit_with_observers(self): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + observers=['u1', 'u2'], + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + 'u1,u2' + '' + ) + + def test_create_audit_invalid_observers(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + observers='', + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + observers='foo', + ) + + def test_create_audit_with_preferences(self): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + preferences=OrderedDict([('foo', 'bar'), ('lorem', 'ipsum')]), + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'audit' + '' + '' + '' + '' + '' + 'foo' + 'bar' + '' + '' + 'lorem' + 'ipsum' + '' + '' + '' + ) + + def test_create_audit_invalid_preferences(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + preferences='', + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='t1', + scanner_id='s1', + preferences=['foo', 'bar'], + ) + + def test_create_audit_don_t_allow_container_task(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id='0', + scanner_id='s1', + observers='', + ) + + # target_id=0 is considered as False + with self.assertRaises(RequiredArgument): + self.gmp.create_audit( + name='foo', + audit_id='c1', + target_id=0, + scanner_id='s1', + observers='', + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/protocols/gmpv9/test_create_config.py b/tests/protocols/gmpv9/test_create_config.py new file mode 100644 index 000000000..0cc1a75df --- /dev/null +++ b/tests/protocols/gmpv9/test_create_config.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from gvm.errors import RequiredArgument + +from . import Gmpv9TestCase + + +class GmpCreateConfigTestCase(Gmpv9TestCase): + def test_create_config(self): + self.gmp.create_config('a1', 'foo') + + self.connection.send.has_been_called_with( + '' + 'a1' + 'foo' + 'scan' + '' + ) + + def test_missing_config_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_config(config_id='', name='foo') + + with self.assertRaises(RequiredArgument): + self.gmp.create_config(config_id=None, name='foo') + + def test_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_config(config_id='c1', name=None) + + with self.assertRaises(RequiredArgument): + self.gmp.create_config(config_id='c1', name='') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/protocols/gmpv9/test_create_policy.py b/tests/protocols/gmpv9/test_create_policy.py new file mode 100644 index 000000000..72c09388e --- /dev/null +++ b/tests/protocols/gmpv9/test_create_policy.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from gvm.errors import RequiredArgument + +from . import Gmpv9TestCase + + +class GmpCreatePolicyTestCase(Gmpv9TestCase): + def test_create_policy(self): + self.gmp.create_policy('a1', 'foo') + + self.connection.send.has_been_called_with( + '' + 'a1' + 'foo' + 'policy' + '' + ) + + def test_missing_policy_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_policy(policy_id='', name='foo') + + with self.assertRaises(RequiredArgument): + self.gmp.create_policy(policy_id=None, name='foo') + + def test_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_policy(policy_id='c1', name=None) + + with self.assertRaises(RequiredArgument): + self.gmp.create_policy(policy_id='c1', name='') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/protocols/gmpv9/test_create_task.py b/tests/protocols/gmpv9/test_create_task.py new file mode 100644 index 000000000..81f3d97a4 --- /dev/null +++ b/tests/protocols/gmpv9/test_create_task.py @@ -0,0 +1,441 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest +import warnings + +from collections import OrderedDict + +from gvm.errors import RequiredArgument, InvalidArgument + +from gvm.protocols.gmpv9 import HostsOrdering + +from . import Gmpv9TestCase + + +class GmpCreateTaskCommandTestCase(Gmpv9TestCase): + def test_create_task(self): + self.gmp.create_task( + name='foo', config_id='c1', target_id='t1', scanner_id='s1' + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '' + ) + + def test_create_task_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name=None, config_id='c1', target_id='t1', scanner_id='s1' + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name='', config_id='c1', target_id='t1', scanner_id='s1' + ) + + def test_create_task_missing_config_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name='foo', config_id=None, target_id='t1', scanner_id='s1' + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name='foo', config_id='', target_id='t1', scanner_id='s1' + ) + + def test_create_task_missing_target_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name='foo', config_id='c1', target_id=None, scanner_id='s1' + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name='foo', config_id='c1', target_id='', scanner_id='s1' + ) + + def test_create_task_missing_scanner_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name='foo', config_id='c1', target_id='t1', scanner_id=None + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name='foo', config_id='c1', target_id='t1', scanner_id='' + ) + + def test_create_task_with_comment(self): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + comment='bar', + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + 'bar' + '' + ) + + def test_create_task_single_alert(self): + # pylint: disable=invalid-name + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + alert_ids='a1', # will be removed in future + ) + + self.assertEqual(len(w), 1) + self.assertTrue(issubclass(w[0].category, DeprecationWarning)) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '' + '' + ) + + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + alert_ids=['a1'], + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '' + '' + ) + + def test_create_task_multiple_alerts(self): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + alert_ids=['a1', 'a2', 'a3'], + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '' + '' + '' + '' + ) + + def test_create_task_with_alterable(self): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + alterable=True, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '1' + '' + ) + + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + alterable=False, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '0' + '' + ) + + def test_create_task_with_hosts_ordering(self): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + hosts_ordering=HostsOrdering.REVERSE, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + 'reverse' + '' + ) + + def test_create_task_invalid_hosts_ordering(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + hosts_ordering='foo', + ) + + def test_create_task_with_schedule(self): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '' + '' + ) + + def test_create_task_with_schedule_and_schedule_periods(self): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + schedule_periods=0, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '' + '0' + '' + ) + + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + schedule_periods=5, + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '' + '5' + '' + ) + + def test_create_task_with_schedule_and_invalid_schedule_periods(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + schedule_periods='foo', + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + schedule_id='s1', + schedule_periods=-1, + ) + + def test_create_task_with_observers(self): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + observers=['u1', 'u2'], + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + 'u1,u2' + '' + ) + + def test_create_task_invalid_observers(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + observers='', + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + observers='foo', + ) + + def test_create_task_with_preferences(self): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + preferences=OrderedDict([('foo', 'bar'), ('lorem', 'ipsum')]), + ) + + self.connection.send.has_been_called_with( + '' + 'foo' + 'scan' + '' + '' + '' + '' + '' + 'foo' + 'bar' + '' + '' + 'lorem' + 'ipsum' + '' + '' + '' + ) + + def test_create_task_invalid_preferences(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + preferences='', + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='t1', + scanner_id='s1', + preferences=['foo', 'bar'], + ) + + def test_create_task_don_t_allow_container_task(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id='0', + scanner_id='s1', + observers='', + ) + + # target_id=0 is considered as False + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name='foo', + config_id='c1', + target_id=0, + scanner_id='s1', + observers='', + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/protocols/gmpv9/test_create_tls_certificate.py b/tests/protocols/gmpv9/test_create_tls_certificate.py new file mode 100644 index 000000000..815cc2bb2 --- /dev/null +++ b/tests/protocols/gmpv9/test_create_tls_certificate.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from gvm.errors import RequiredArgument + +from . import Gmpv9TestCase + + +class GmpCreateTLSCertificateTestCase(Gmpv9TestCase): + def test_create_tls_certificate(self): + self.gmp.create_tls_certificate('foo', 'c1', copy='a1', comment='bar') + + self.connection.send.has_been_called_with( + '' + 'bar' + 'a1' + 'foo' + 'c1' + '' + ) + + def test_missing_certificate(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_tls_certificate(name='foo', certificate='') + + with self.assertRaises(RequiredArgument): + self.gmp.create_tls_certificate(name='foo', certificate=None) + + def test_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_tls_certificate(name=None, certificate='c1') + + with self.assertRaises(RequiredArgument): + self.gmp.create_tls_certificate(name='', certificate='c1') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/protocols/gmpv9/test_entity_type.py b/tests/protocols/gmpv9/test_entity_type.py new file mode 100644 index 000000000..dc313ef5e --- /dev/null +++ b/tests/protocols/gmpv9/test_entity_type.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2019 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from gvm.errors import InvalidArgument +from gvm.protocols.gmpv9 import EntityType, get_entity_type_from_string + + +class GetEntityTypeFromStringTestCase(unittest.TestCase): + def test_invalid(self): + with self.assertRaises(InvalidArgument): + get_entity_type_from_string('foo') + + def test_none_or_empty(self): + ct = get_entity_type_from_string(None) + self.assertIsNone(ct) + ct = get_entity_type_from_string('') + self.assertIsNone(ct) + + def test_agent(self): + ct = get_entity_type_from_string('agent') + self.assertEqual(ct, EntityType.AGENT) + + def test_alert(self): + ct = get_entity_type_from_string('alert') + self.assertEqual(ct, EntityType.ALERT) + + def test_asset(self): + ct = get_entity_type_from_string('asset') + self.assertEqual(ct, EntityType.ASSET) + + def test_cert_bund_adv(self): + ct = get_entity_type_from_string('cert_bund_adv') + self.assertEqual(ct, EntityType.CERT_BUND_ADV) + + def test_cpe(self): + ct = get_entity_type_from_string('cpe') + self.assertEqual(ct, EntityType.CPE) + + def test_credential(self): + ct = get_entity_type_from_string('credential') + self.assertEqual(ct, EntityType.CREDENTIAL) + + def test_dfn_cert_adv(self): + ct = get_entity_type_from_string('dfn_cert_adv') + self.assertEqual(ct, EntityType.DFN_CERT_ADV) + + def test_filter(self): + ct = get_entity_type_from_string('filter') + self.assertEqual(ct, EntityType.FILTER) + + def test_group(self): + ct = get_entity_type_from_string('group') + self.assertEqual(ct, EntityType.GROUP) + + def test_host(self): + ct = get_entity_type_from_string('host') + self.assertEqual(ct, EntityType.HOST) + + def test_info(self): + ct = get_entity_type_from_string('info') + self.assertEqual(ct, EntityType.INFO) + + def test_note(self): + ct = get_entity_type_from_string('note') + self.assertEqual(ct, EntityType.NOTE) + + def test_nvt(self): + ct = get_entity_type_from_string('nvt') + self.assertEqual(ct, EntityType.NVT) + + def test_operating_system(self): + ct = get_entity_type_from_string('os') + self.assertEqual(ct, EntityType.OPERATING_SYSTEM) + + ct = get_entity_type_from_string('operating_system') + self.assertEqual(ct, EntityType.OPERATING_SYSTEM) + + def test_ovaldef(self): + ct = get_entity_type_from_string('ovaldef') + self.assertEqual(ct, EntityType.OVALDEF) + + def test_override(self): + ct = get_entity_type_from_string('override') + self.assertEqual(ct, EntityType.OVERRIDE) + + def test_permission(self): + ct = get_entity_type_from_string('permission') + self.assertEqual(ct, EntityType.PERMISSION) + + def test_port_list(self): + ct = get_entity_type_from_string('port_list') + self.assertEqual(ct, EntityType.PORT_LIST) + + def test_report(self): + ct = get_entity_type_from_string('report') + self.assertEqual(ct, EntityType.REPORT) + + def test_report_format(self): + ct = get_entity_type_from_string('report_format') + self.assertEqual(ct, EntityType.REPORT_FORMAT) + + def test_result(self): + ct = get_entity_type_from_string('result') + self.assertEqual(ct, EntityType.RESULT) + + def test_role(self): + ct = get_entity_type_from_string('role') + self.assertEqual(ct, EntityType.ROLE) + + def test_scan_config(self): + ct = get_entity_type_from_string('config') + self.assertEqual(ct, EntityType.SCAN_CONFIG) + + ct = get_entity_type_from_string('scan_config') + self.assertEqual(ct, EntityType.SCAN_CONFIG) + + def test_scanner(self): + ct = get_entity_type_from_string('scanner') + self.assertEqual(ct, EntityType.SCANNER) + + def test_schedule(self): + ct = get_entity_type_from_string('schedule') + self.assertEqual(ct, EntityType.SCHEDULE) + + def test_tag(self): + ct = get_entity_type_from_string('tag') + self.assertEqual(ct, EntityType.TAG) + + def test_target(self): + ct = get_entity_type_from_string('target') + self.assertEqual(ct, EntityType.TARGET) + + def test_task(self): + ct = get_entity_type_from_string('task') + self.assertEqual(ct, EntityType.TASK) + + def test_ticket(self): + ct = get_entity_type_from_string('ticket') + self.assertEqual(ct, EntityType.TICKET) + + def test_tls_certificate(self): + ft = get_entity_type_from_string('tls_certificate') + self.assertEqual(ft, EntityType.TLS_CERTIFICATE) + + def test_user(self): + ct = get_entity_type_from_string('user') + self.assertEqual(ct, EntityType.USER) + + def test_vulnerability(self): + ct = get_entity_type_from_string('vuln') + self.assertEqual(ct, EntityType.VULNERABILITY) + + ct = get_entity_type_from_string('vulnerability') + self.assertEqual(ct, EntityType.VULNERABILITY) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/protocols/gmpv9/test_filter_type.py b/tests/protocols/gmpv9/test_filter_type.py new file mode 100644 index 000000000..db4bf2d48 --- /dev/null +++ b/tests/protocols/gmpv9/test_filter_type.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2019 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from gvm.errors import InvalidArgument +from gvm.protocols.gmpv9 import FilterType, get_filter_type_from_string + + +class GetFilterTypeFomStringTestCase(unittest.TestCase): + def test_filter_type_agent(self): + ft = get_filter_type_from_string('agent') + self.assertEqual(ft, FilterType.AGENT) + + def test_filter_type_alert(self): + ft = get_filter_type_from_string('alert') + self.assertEqual(ft, FilterType.ALERT) + + def test_filter_type_asset(self): + ft = get_filter_type_from_string('asset') + self.assertEqual(ft, FilterType.ASSET) + + def test_filter_type_credential(self): + ft = get_filter_type_from_string('credential') + self.assertEqual(ft, FilterType.CREDENTIAL) + + def test_filter_type_filter(self): + ft = get_filter_type_from_string('filter') + self.assertEqual(ft, FilterType.FILTER) + + def test_filter_type_group(self): + ft = get_filter_type_from_string('group') + self.assertEqual(ft, FilterType.GROUP) + + def test_filter_type_host(self): + ft = get_filter_type_from_string('host') + self.assertEqual(ft, FilterType.HOST) + + def test_filter_type_note(self): + ft = get_filter_type_from_string('note') + self.assertEqual(ft, FilterType.NOTE) + + def test_filter_type_override(self): + ft = get_filter_type_from_string('override') + self.assertEqual(ft, FilterType.OVERRIDE) + + def test_filter_type_permission(self): + ft = get_filter_type_from_string('permission') + self.assertEqual(ft, FilterType.PERMISSION) + + def test_filter_type_port_list(self): + ft = get_filter_type_from_string('port_list') + self.assertEqual(ft, FilterType.PORT_LIST) + + def test_filter_type_report(self): + ft = get_filter_type_from_string('report') + self.assertEqual(ft, FilterType.REPORT) + + def test_filter_type_report_format(self): + ft = get_filter_type_from_string('report_format') + self.assertEqual(ft, FilterType.REPORT_FORMAT) + + def test_filter_type_result(self): + ft = get_filter_type_from_string('result') + self.assertEqual(ft, FilterType.RESULT) + + def test_filter_type_role(self): + ft = get_filter_type_from_string('role') + self.assertEqual(ft, FilterType.ROLE) + + def test_filter_type_schedule(self): + ft = get_filter_type_from_string('schedule') + self.assertEqual(ft, FilterType.SCHEDULE) + + def test_filter_type_secinfo(self): + ft = get_filter_type_from_string('secinfo') + self.assertEqual(ft, FilterType.ALL_SECINFO) + + def test_filter_type_all_secinfo(self): + ft = get_filter_type_from_string('all_secinfo') + self.assertEqual(ft, FilterType.ALL_SECINFO) + + def test_filter_type_tag(self): + ft = get_filter_type_from_string('tag') + self.assertEqual(ft, FilterType.TAG) + + def test_filter_type_task(self): + ft = get_filter_type_from_string('task') + self.assertEqual(ft, FilterType.TASK) + + def test_filter_type_target(self): + ft = get_filter_type_from_string('target') + self.assertEqual(ft, FilterType.TARGET) + + def test_filter_type_ticket(self): + ft = get_filter_type_from_string('ticket') + self.assertEqual(ft, FilterType.TICKET) + + def test_filter_type_tls_certificate(self): + ft = get_filter_type_from_string('tls_certificate') + self.assertEqual(ft, FilterType.TLS_CERTIFICATE) + + def test_filter_type_operating_system(self): + ft = get_filter_type_from_string('operating_system') + self.assertEqual(ft, FilterType.OPERATING_SYSTEM) + + def test_filter_type_user(self): + ft = get_filter_type_from_string('user') + self.assertEqual(ft, FilterType.USER) + + def test_filter_type_vuln(self): + ft = get_filter_type_from_string('vuln') + self.assertEqual(ft, FilterType.VULNERABILITY) + + def test_filter_type_vulnerability(self): + ft = get_filter_type_from_string('vulnerability') + self.assertEqual(ft, FilterType.VULNERABILITY) + + def test_filter_type_config(self): + ft = get_filter_type_from_string('config') + self.assertEqual(ft, FilterType.SCAN_CONFIG) + + def test_filter_type_scan_config(self): + ft = get_filter_type_from_string('scan_config') + self.assertEqual(ft, FilterType.SCAN_CONFIG) + + def test_filter_type_os(self): + ft = get_filter_type_from_string('os') + self.assertEqual(ft, FilterType.OPERATING_SYSTEM) + + def test_invalid_filter_type(self): + with self.assertRaises(InvalidArgument): + get_filter_type_from_string('foo') + + def test_non_or_empty_filter_type(self): + ft = get_filter_type_from_string(None) + self.assertIsNone(ft) + + ft = get_filter_type_from_string('') + self.assertIsNone(ft) diff --git a/tests/protocols/gmpv9/test_get_tls_certificates.py b/tests/protocols/gmpv9/test_get_tls_certificates.py new file mode 100644 index 000000000..9c33a47a9 --- /dev/null +++ b/tests/protocols/gmpv9/test_get_tls_certificates.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from . import Gmpv9TestCase + + +class GmpGetTLSCertificatesTestCase(Gmpv9TestCase): + def test_get_tls_certificates(self): + self.gmp.get_tls_certificates() + + self.connection.send.has_been_called_with('') + + def test_get_tls_certificates_with_filter(self): + self.gmp.get_tls_certificates(filter='name=foo') + + self.connection.send.has_been_called_with( + '' + ) + + def test_get_tls_certificates_with_include_certificate_data(self): + self.gmp.get_tls_certificates(include_certificate_data='1') + + self.connection.send.has_been_called_with( + '' + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/protocols/gmpv9/test_modify_tls_certificate.py b/tests/protocols/gmpv9/test_modify_tls_certificate.py new file mode 100644 index 000000000..4ef188063 --- /dev/null +++ b/tests/protocols/gmpv9/test_modify_tls_certificate.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Greenbone Networks GmbH +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from gvm.errors import RequiredArgument + +from . import Gmpv9TestCase + + +class GmpModifyTLSCertificateTestCase(Gmpv9TestCase): + def test_modify_tls_certificate(self): + self.gmp.modify_tls_certificate('c1') + + self.connection.send.has_been_called_with( + '' + ) + + def test_modify_tls_certificate_with_name(self): + self.gmp.modify_tls_certificate('c1', name='foo') + + self.connection.send.has_been_called_with( + '' + 'foo' + '' + ) + + def test_missing_tls_certificate_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.modify_tls_certificate(name='foo', tls_certificate_id='') + + with self.assertRaises(RequiredArgument): + self.gmp.modify_tls_certificate(name='foo', tls_certificate_id=None) + + +if __name__ == '__main__': + unittest.main()