Skip to content

Commit

Permalink
Fix bug with urllib3 and certificate authentication, update tests wit…
Browse files Browse the repository at this point in the history
…h parsed response
  • Loading branch information
remip2 committed Jun 12, 2023
1 parent df815bd commit 2b578d9
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 62 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ build
stormshield.sns.sslclient.egg-info
__pycache__
*.pyc
.tox
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ The library and `snscli` tool support HTTP and SOCKS proxies, use `--proxy schem

Warning: some tests require a remote SNS appliance.

`$ PASSWORD=password APPLIANCE=10.0.0.254 python3 setup.py test`
`$ PASSWORD=password APPLIANCE=10.0.0.254 tox`

To run one test:

`tox -- tests/test_format_ini`


To run `snscli` from the source folder without install:
Expand Down
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@
'pyreadline; platform_system == "Windows"'
],
include_package_data=True,
tests_require=["nose"],
test_suite='nose.collector',
classifiers=[
tests_require=["pytest"],
classifiers=[
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"License :: OSI Approved :: Apache Software License",
Expand Down
4 changes: 4 additions & 0 deletions stormshield/sns/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

from stormshield.sns.sslclient import SSLClient, ServerError, TOTPNeededError
from stormshield.sns.sslclient.__version__ import __version__ as libversion
from urllib3 import __version__ as urllib3version
from requests import __version__ as requestsversion

# define missing exception for python2
try:
Expand Down Expand Up @@ -179,6 +181,8 @@ def logcommand(self, message, *args, **kwargs):

if version:
logging.info("snscli - stormshield.sns.sslclient version {}".format(libversion))
logging.info(" urllib3 {}".format(urllib3version))
logging.info(" requests {}".format(requestsversion))
sys.exit(0)

if script is not None:
Expand Down
30 changes: 20 additions & 10 deletions stormshield/sns/sslclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self, host=None):
self.host = host
super().__init__()

def init_poolmanager(self, connections, maxsize, block=False):
def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):

if URLLIB3V2:
context = ssl.create_default_context()
Expand All @@ -71,12 +71,14 @@ def init_poolmanager(self, connections, maxsize, block=False):
maxsize=maxsize,
block=block,
assert_hostname=self.host,
ssl_context=context)
ssl_context=context,
**pool_kwargs)
else:
self.poolmanager = PoolManager(num_pools=connections,
maxsize=maxsize,
block=block,
assert_hostname=False)
assert_hostname=False,
**pool_kwargs)

def proxy_manager_for(self, proxy, **proxy_kwargs):
if proxy in self.proxy_manager:
Expand All @@ -97,7 +99,7 @@ def proxy_manager_for(self, proxy, **proxy_kwargs):
maxsize=self._pool_maxsize,
block=self._pool_block,
assert_hostname=self.host,
ssl_context=context
ssl_context=context,
**proxy_kwargs
)
else:
Expand Down Expand Up @@ -126,7 +128,7 @@ def proxy_manager_for(self, proxy, **proxy_kwargs):
maxsize=self._pool_maxsize,
block=self._pool_block,
assert_hostname=self.host,
ssl_context=context
ssl_context=context,
**proxy_kwargs)
else:
manager = self.proxy_manager[proxy] = proxy_from_url(
Expand All @@ -148,13 +150,23 @@ def __init__(self, common_name, host, pool_connections=DEFAULT_POOLSIZE,
pool_block=DEFAULT_POOLBLOCK):
self.__common_name = common_name
self.__host = host

self.__is_stormshield_cert = True
if re.search(r"\.", self.__common_name):
self.__is_stormshield_cert = False

super(DNSResolverHTTPSAdapter, self).__init__(pool_connections=pool_connections,
pool_maxsize=pool_maxsize,
max_retries=max_retries,
pool_block=pool_block)

def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
pool_kwargs['assert_hostname'] = self.__common_name
if URLLIB3V2 and self.__is_stormshield_cert:
context = ssl.create_default_context()
context.hostname_checks_common_name = True # use CN field for factory Stormshield certificates
context.check_hostname = False # check is done with assert_hostname
pool_kwargs["ssl_context"] = context
super(DNSResolverHTTPSAdapter, self).init_poolmanager(connections,
maxsize,
block=block,
Expand Down Expand Up @@ -420,11 +432,7 @@ def __init__(self, user='admin', password=None, totp=None, host=None, ip=None, p
except ipaddress.AddressValueError:
urlip = self.ip
self.baseurl = 'https://' + urlip + ':' + str(self.port)

if URLLIB3V2:
self.session.mount(self.baseurl, HostNameAdapter(self.host))
else:
self.session.mount(self.baseurl.lower(), DNSResolverHTTPSAdapter(self.host, self.ip))
self.session.mount(self.baseurl.lower(), DNSResolverHTTPSAdapter(self.host, self.ip))

if self.usercert is not None:
self.session.cert = self.usercert
Expand Down Expand Up @@ -454,11 +462,13 @@ def connect(self):
# 1. Authentication and get cookie
if self.usercert is not None:
# user cert authentication
self.logger.log(logging.DEBUG, "Authentication with SSL certificate")
request = self.session.get(
self.baseurl + '/auth/admin.html?sslcert=1&app={}'.format(self.app),
headers=self.headers, **self.conn_options)
else:
# password authentication
self.logger.log(logging.DEBUG, "Authentication with user/password")
data = { 'uid':base64.b64encode(self.user.encode('utf-8')),
'pswd':base64.b64encode(self.password.encode('utf-8')),
'app':self.app }
Expand Down
2 changes: 1 addition & 1 deletion stormshield/sns/sslclient/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# major: breaking API change
# minor: new functionality
# patch: bugfix
__version__ = "1.0.7"
__version__ = "1.1.0"
94 changes: 47 additions & 47 deletions tests/test_format_ini.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,81 +38,81 @@ def test_raw(self):

def test_section(self):
""" section format """

expected = """101 code=00a01000 msg="Begin" format="section"
[Global]
State=0
RiskHalfLife=21600
RiskTTL=86400
[Alarm]
Minor=2
Major=10
[Sandboxing]
Suspicious=2
Malicious=50
Failed=0
[Antivirus]
Infected=100
Unknown=2
Failed=0
100 code=00a00100 msg="Ok\""""

expected = {
"Global": {
"State": "0",
"RiskHalfLife": "21600",
"RiskTTL": "86400"
},
"Alarm": {
"Minor": "2",
"Major": "10"
},
"Sandboxing" : {
"Suspicious": "2",
"Malicious": "50",
"Failed": "0"
},
"Antivirus": {
"Infected": "100",
"Unknown": "2",
"Failed": "0"
}
}

response = self.client.send_command('CONFIG HOSTREP SHOW')

self.assertEqual(response.output, expected)
self.assertEqual(response.data, expected)
self.assertEqual(response.ret, 100)

def test_section_line(self):
""" section_line format """

expected="""101 code=00a01000 msg="Begin" format="section_line"
[Result]
id=pvm_detailed type=pvm name="Detailed Vulnerability Mail"
id=pvm_summary type=pvm name="Summary Vulnerability Mail"
id=app_cert_req type=cert_req name="Accept the certificate request"
id=rej_cert_req type=cert_req name="Reject the certificate request"
id=app_user_req type=user_req name="Accept the user request"
id=rej_user_req type=user_req name="Reject the user request"
id=sponsor_req type=sponsoring name="Sponsoring request"
id=smtp_test_msg type=smtp_conf name="Test SMTP configuration"
100 code=00a00100 msg="Ok\""""
expected = {
'Result': [
{'id': 'pvm_detailed', 'type': 'pvm', 'name': 'Detailed Vulnerability Mail'},
{'id': 'pvm_summary', 'type': 'pvm', 'name': 'Summary Vulnerability Mail'},
{'id': 'app_cert_req', 'type': 'cert_req', 'name': 'Accept the certificate request'},
{'id': 'rej_cert_req', 'type': 'cert_req', 'name': 'Reject the certificate request'},
{'id': 'app_user_req', 'type': 'user_req', 'name': 'Accept the user request'},
{'id': 'rej_user_req', 'type': 'user_req', 'name': 'Reject the user request'},
{'id': 'sponsor_req', 'type': 'sponsoring', 'name': 'Sponsoring request'},
{'id': 'smtp_test_msg', 'type': 'smtp_conf', 'name': 'Test SMTP configuration'}
]
}

response = self.client.send_command('CONFIG COMMUNICATION EMAIL TEMPLATE LIST')

self.assertEqual(response.output, expected)
self.assertEqual(response.data, expected)
self.assertEqual(response.ret, 100)

def test_list(self):
""" list format """

expected = """101 code=00a01000 msg="Begin" format="list"
[Result]
network_internals
labo_network
100 code=00a00100 msg="Ok\""""
expected = {'Result': ['network_internals', 'labo_network']}

response = self.client.send_command('CONFIG WEBADMIN ACCESS SHOW')

self.assertEqual(response.output, expected)
self.assertEqual(response.data, expected)
self.assertEqual(response.ret, 100)

def test_xml(self):
""" xml text output """

expected = """101 code=00a01000 msg="Begin" format="xml"
<data format="xml"><filters total_lines="5">
<separator collapse="0" color="c0c0c0" comment="Remote Management: Go to System - Configuration to setup the web administration application access" first_ruleid="1" nb_elements="2" position="1" />
<filter action="pass" comment="Admin from everywhere" index="1" position="2" status="active" type="local_filter_slot"><noconnlog disk="0" ipfix="0" syslog="0" /><from><target type="any" value="any" /></from><to><port type="single" value="firewall_srv" /><port type="single" value="https" /><target type="group" value="firewall_all" /></to></filter>
<filter action="pass" comment="Allow Ping from everywhere" icmp_code="0" icmp_type="8" index="2" ipproto="icmp" position="3" proto="none" status="active" type="local_filter_slot"><noconnlog disk="0" ipfix="0" syslog="0" /><from><target type="any" value="any" /></from><to><target type="group" value="firewall_all" /></to></filter>
<separator collapse="0" color="c0c0c0" comment="Default policy" first_ruleid="3" nb_elements="1" position="4" />
<filter action="block" comment="Block all" index="3" position="5" status="active" type="local_filter_slot"><noconnlog disk="0" ipfix="0" syslog="0" /><from><target type="any" value="any" /></from><to><target type="any" value="any" /></to></filter>
expected = """<?xml version="1.0"?>
<nws code="100" msg="OK"><serverd ret="101" code="00a01000" msg="Begin"><data format="xml"><filters total_lines="5">
<separator nb_elements="2" first_ruleid="1" position="1" color="c0c0c0" comment="Remote Management: Go to System - Configuration to setup the web administration application access" collapse="0" />
<filter position="2" index="1" type="local_filter_slot" action="pass" status="active" comment="Admin from everywhere"><noconnlog disk="0" syslog="0" ipfix="0" /><from><target value="any" type="any" /></from><to><port value="firewall_srv" type="single" /><port value="https" type="single" /><target value="firewall_all" type="group" /></to></filter>
<filter position="3" index="2" type="local_filter_slot" action="pass" status="active" ipproto="icmp" proto="none" icmp_type="8" icmp_code="0" comment="Allow Ping from everywhere"><noconnlog disk="0" syslog="0" ipfix="0" /><from><target value="any" type="any" /></from><to><target value="firewall_all" type="group" /></to></filter>
<separator nb_elements="1" first_ruleid="3" position="4" color="c0c0c0" comment="Default policy" collapse="0" />
<filter position="5" index="3" type="local_filter_slot" action="block" status="active" comment="Block all"><noconnlog disk="0" syslog="0" ipfix="0" /><from><target value="any" type="any" /></from><to><target value="any" type="any" /></to></filter>
</filters>
</data>
100 code=00a00100 msg="Ok\""""
</data></serverd><serverd ret="100" code="00a00100" msg="Ok"></serverd></nws>"""

response = self.client.send_command('CONFIG FILTER EXPLICIT index=1 type=filter output=xml')

self.assertEqual(response.output, expected)
self.assertEqual(response.xml, expected)
self.assertEqual(response.ret, 100)

if __name__ == '__main__':
Expand Down
12 changes: 12 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[tox]
env_list = py3-urllib{1,2}

[testenv]
description = run unit tests
deps =
pytest
urllib1: urllib3>=1.25.0,<2.0.0
urllib2: urllib3>=2.0.0
commands =
pytest {posargs:tests}
passenv = *

0 comments on commit 2b578d9

Please sign in to comment.