From 2b578d96b03edb869d222e34d804dd8d757449b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Pauchet?= Date: Mon, 12 Jun 2023 17:26:44 +0200 Subject: [PATCH] Fix bug with urllib3 and certificate authentication, update tests with parsed response --- .gitignore | 1 + README.md | 6 +- setup.py | 5 +- stormshield/sns/cli.py | 4 + stormshield/sns/sslclient/__init__.py | 30 +++++--- stormshield/sns/sslclient/__version__.py | 2 +- tests/test_format_ini.py | 94 ++++++++++++------------ tox.ini | 12 +++ 8 files changed, 92 insertions(+), 62 deletions(-) create mode 100644 tox.ini diff --git a/.gitignore b/.gitignore index 18be071..d160e55 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ build stormshield.sns.sslclient.egg-info __pycache__ *.pyc +.tox diff --git a/README.md b/README.md index 708399e..29f8fc5 100644 --- a/README.md +++ b/README.md @@ -130,7 +130,11 @@ The library and `snscli` tool support HTTP and SOCKS proxies, use `--proxy schem Warning: some tests require a remote SNS appliance. -`$ PASSWORD=password APPLIANCE=10.0.0.254 python3 setup.py test` +`$ PASSWORD=password APPLIANCE=10.0.0.254 tox` + +To run one test: + +`tox -- tests/test_format_ini` To run `snscli` from the source folder without install: diff --git a/setup.py b/setup.py index e9fbcde..83b7cd3 100755 --- a/setup.py +++ b/setup.py @@ -34,9 +34,8 @@ 'pyreadline; platform_system == "Windows"' ], include_package_data=True, - tests_require=["nose"], - test_suite='nose.collector', - classifiers=[ + tests_require=["pytest"], + classifiers=[ "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.7", "License :: OSI Approved :: Apache Software License", diff --git a/stormshield/sns/cli.py b/stormshield/sns/cli.py index 8af739a..1fe0947 100644 --- a/stormshield/sns/cli.py +++ b/stormshield/sns/cli.py @@ -21,6 +21,8 @@ from stormshield.sns.sslclient import SSLClient, ServerError, TOTPNeededError from stormshield.sns.sslclient.__version__ import __version__ as libversion +from urllib3 import __version__ as urllib3version +from requests import __version__ as requestsversion # define missing exception for python2 try: @@ -179,6 +181,8 @@ def logcommand(self, message, *args, **kwargs): if version: logging.info("snscli - stormshield.sns.sslclient version {}".format(libversion)) + logging.info(" urllib3 {}".format(urllib3version)) + logging.info(" requests {}".format(requestsversion)) sys.exit(0) if script is not None: diff --git a/stormshield/sns/sslclient/__init__.py b/stormshield/sns/sslclient/__init__.py index 2298446..e66c747 100644 --- a/stormshield/sns/sslclient/__init__.py +++ b/stormshield/sns/sslclient/__init__.py @@ -60,7 +60,7 @@ def __init__(self, host=None): self.host = host super().__init__() - def init_poolmanager(self, connections, maxsize, block=False): + def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs): if URLLIB3V2: context = ssl.create_default_context() @@ -71,12 +71,14 @@ def init_poolmanager(self, connections, maxsize, block=False): maxsize=maxsize, block=block, assert_hostname=self.host, - ssl_context=context) + ssl_context=context, + **pool_kwargs) else: self.poolmanager = PoolManager(num_pools=connections, maxsize=maxsize, block=block, - assert_hostname=False) + assert_hostname=False, + **pool_kwargs) def proxy_manager_for(self, proxy, **proxy_kwargs): if proxy in self.proxy_manager: @@ -97,7 +99,7 @@ def proxy_manager_for(self, proxy, **proxy_kwargs): maxsize=self._pool_maxsize, block=self._pool_block, assert_hostname=self.host, - ssl_context=context + ssl_context=context, **proxy_kwargs ) else: @@ -126,7 +128,7 @@ def proxy_manager_for(self, proxy, **proxy_kwargs): maxsize=self._pool_maxsize, block=self._pool_block, assert_hostname=self.host, - ssl_context=context + ssl_context=context, **proxy_kwargs) else: manager = self.proxy_manager[proxy] = proxy_from_url( @@ -148,6 +150,11 @@ def __init__(self, common_name, host, pool_connections=DEFAULT_POOLSIZE, pool_block=DEFAULT_POOLBLOCK): self.__common_name = common_name self.__host = host + + self.__is_stormshield_cert = True + if re.search(r"\.", self.__common_name): + self.__is_stormshield_cert = False + super(DNSResolverHTTPSAdapter, self).__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize, max_retries=max_retries, @@ -155,6 +162,11 @@ def __init__(self, common_name, host, pool_connections=DEFAULT_POOLSIZE, def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs): pool_kwargs['assert_hostname'] = self.__common_name + if URLLIB3V2 and self.__is_stormshield_cert: + context = ssl.create_default_context() + context.hostname_checks_common_name = True # use CN field for factory Stormshield certificates + context.check_hostname = False # check is done with assert_hostname + pool_kwargs["ssl_context"] = context super(DNSResolverHTTPSAdapter, self).init_poolmanager(connections, maxsize, block=block, @@ -420,11 +432,7 @@ def __init__(self, user='admin', password=None, totp=None, host=None, ip=None, p except ipaddress.AddressValueError: urlip = self.ip self.baseurl = 'https://' + urlip + ':' + str(self.port) - - if URLLIB3V2: - self.session.mount(self.baseurl, HostNameAdapter(self.host)) - else: - self.session.mount(self.baseurl.lower(), DNSResolverHTTPSAdapter(self.host, self.ip)) + self.session.mount(self.baseurl.lower(), DNSResolverHTTPSAdapter(self.host, self.ip)) if self.usercert is not None: self.session.cert = self.usercert @@ -454,11 +462,13 @@ def connect(self): # 1. Authentication and get cookie if self.usercert is not None: # user cert authentication + self.logger.log(logging.DEBUG, "Authentication with SSL certificate") request = self.session.get( self.baseurl + '/auth/admin.html?sslcert=1&app={}'.format(self.app), headers=self.headers, **self.conn_options) else: # password authentication + self.logger.log(logging.DEBUG, "Authentication with user/password") data = { 'uid':base64.b64encode(self.user.encode('utf-8')), 'pswd':base64.b64encode(self.password.encode('utf-8')), 'app':self.app } diff --git a/stormshield/sns/sslclient/__version__.py b/stormshield/sns/sslclient/__version__.py index 16997da..0ea46ad 100644 --- a/stormshield/sns/sslclient/__version__.py +++ b/stormshield/sns/sslclient/__version__.py @@ -2,4 +2,4 @@ # major: breaking API change # minor: new functionality # patch: bugfix -__version__ = "1.0.7" +__version__ = "1.1.0" diff --git a/tests/test_format_ini.py b/tests/test_format_ini.py index dc95ff3..2a9e13e 100644 --- a/tests/test_format_ini.py +++ b/tests/test_format_ini.py @@ -38,81 +38,81 @@ def test_raw(self): def test_section(self): """ section format """ - - expected = """101 code=00a01000 msg="Begin" format="section" -[Global] -State=0 -RiskHalfLife=21600 -RiskTTL=86400 -[Alarm] -Minor=2 -Major=10 -[Sandboxing] -Suspicious=2 -Malicious=50 -Failed=0 -[Antivirus] -Infected=100 -Unknown=2 -Failed=0 -100 code=00a00100 msg="Ok\"""" + + expected = { + "Global": { + "State": "0", + "RiskHalfLife": "21600", + "RiskTTL": "86400" + }, + "Alarm": { + "Minor": "2", + "Major": "10" + }, + "Sandboxing" : { + "Suspicious": "2", + "Malicious": "50", + "Failed": "0" + }, + "Antivirus": { + "Infected": "100", + "Unknown": "2", + "Failed": "0" + } + } response = self.client.send_command('CONFIG HOSTREP SHOW') - self.assertEqual(response.output, expected) + self.assertEqual(response.data, expected) self.assertEqual(response.ret, 100) def test_section_line(self): """ section_line format """ - expected="""101 code=00a01000 msg="Begin" format="section_line" -[Result] -id=pvm_detailed type=pvm name="Detailed Vulnerability Mail" -id=pvm_summary type=pvm name="Summary Vulnerability Mail" -id=app_cert_req type=cert_req name="Accept the certificate request" -id=rej_cert_req type=cert_req name="Reject the certificate request" -id=app_user_req type=user_req name="Accept the user request" -id=rej_user_req type=user_req name="Reject the user request" -id=sponsor_req type=sponsoring name="Sponsoring request" -id=smtp_test_msg type=smtp_conf name="Test SMTP configuration" -100 code=00a00100 msg="Ok\"""" + expected = { + 'Result': [ + {'id': 'pvm_detailed', 'type': 'pvm', 'name': 'Detailed Vulnerability Mail'}, + {'id': 'pvm_summary', 'type': 'pvm', 'name': 'Summary Vulnerability Mail'}, + {'id': 'app_cert_req', 'type': 'cert_req', 'name': 'Accept the certificate request'}, + {'id': 'rej_cert_req', 'type': 'cert_req', 'name': 'Reject the certificate request'}, + {'id': 'app_user_req', 'type': 'user_req', 'name': 'Accept the user request'}, + {'id': 'rej_user_req', 'type': 'user_req', 'name': 'Reject the user request'}, + {'id': 'sponsor_req', 'type': 'sponsoring', 'name': 'Sponsoring request'}, + {'id': 'smtp_test_msg', 'type': 'smtp_conf', 'name': 'Test SMTP configuration'} + ] + } response = self.client.send_command('CONFIG COMMUNICATION EMAIL TEMPLATE LIST') - self.assertEqual(response.output, expected) + self.assertEqual(response.data, expected) self.assertEqual(response.ret, 100) def test_list(self): """ list format """ - expected = """101 code=00a01000 msg="Begin" format="list" -[Result] -network_internals -labo_network -100 code=00a00100 msg="Ok\"""" + expected = {'Result': ['network_internals', 'labo_network']} response = self.client.send_command('CONFIG WEBADMIN ACCESS SHOW') - self.assertEqual(response.output, expected) + self.assertEqual(response.data, expected) self.assertEqual(response.ret, 100) def test_xml(self): """ xml text output """ - expected = """101 code=00a01000 msg="Begin" format="xml" - - - - - - + expected = """ + + + + + + - -100 code=00a00100 msg="Ok\"""" +""" response = self.client.send_command('CONFIG FILTER EXPLICIT index=1 type=filter output=xml') - self.assertEqual(response.output, expected) + self.assertEqual(response.xml, expected) self.assertEqual(response.ret, 100) if __name__ == '__main__': diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..83ed72a --- /dev/null +++ b/tox.ini @@ -0,0 +1,12 @@ +[tox] +env_list = py3-urllib{1,2} + +[testenv] +description = run unit tests +deps = + pytest + urllib1: urllib3>=1.25.0,<2.0.0 + urllib2: urllib3>=2.0.0 +commands = + pytest {posargs:tests} +passenv = * \ No newline at end of file