Skip to content

Commit

Permalink
Add support for TOTP (#6)
Browse files Browse the repository at this point in the history
* Add support for TOTP
* Add informative message when bruteforce protection is running.

Co-authored-by: remi.pauchet <remi.pauchet@stormshield.eu>
  • Loading branch information
jfhren-stormshield and remip2 authored Apr 28, 2022
1 parent 577dc14 commit d115e9d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 29 deletions.
60 changes: 37 additions & 23 deletions stormshield/sns/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pygments.formatters import TerminalFormatter
from colorlog import LevelFormatter

from stormshield.sns.sslclient import SSLClient, ServerError
from stormshield.sns.sslclient import SSLClient, ServerError, TOTPNeededError
from stormshield.sns.sslclient.__version__ import __version__ as libversion

# define missing exception for python2
Expand Down Expand Up @@ -87,9 +87,10 @@ def main():
group.add_argument("-t", "--timeout", help="Connection timeout in seconds", default=-1, type=int)

group = parser.add_argument_group("Authentication parameters")
group.add_argument("-u", "--user", help="User name", default="admin")
group.add_argument("-p", "--password", help="Password", default=None)
group.add_argument("-U", "--usercert", help="User certificate file", default=None)
group.add_argument("-u", "--user", help="User name", default="admin")
group.add_argument("-p", "--password", help="Password", default=None)
group.add_argument("-t", "--totp", help="Time-based one time password", default=None)
group.add_argument("-U", "--usercert", help="User certificate file", default=None)

group = parser.add_argument_group("SSL parameters")
group.add_argument("-C", "--cabundle", help="CA bundle file", default=None)
Expand Down Expand Up @@ -119,6 +120,7 @@ def main():
usercert = args.usercert
cabundle = args.cabundle
password = args.password
totp = args.totp
port = args.port
proxy = args.proxy
timeout = args.timeout
Expand Down Expand Up @@ -200,27 +202,39 @@ def logcommand(self, message, *args, **kwargs):
if timeout == -1:
timeout = None

try:
client = SSLClient(
host=host, ip=ip, port=port, user=user, password=password,
sslverifypeer=sslverifypeer, sslverifyhost=sslverifyhost,
credentials=credentials, proxy=proxy, timeout=timeout,
usercert=usercert, cabundle=cabundle, autoconnect=False)
except Exception as exception:
logging.error(str(exception))
sys.exit(1)
# first try without totp, if needed ask for totp
for i in range(0, 2):
try:
client = SSLClient(
host=host, ip=ip, port=port, user=user, password=password, totp=totp,
sslverifypeer=sslverifypeer, sslverifyhost=sslverifyhost,
credentials=credentials, proxy=proxy, timeout=timeout,
usercert=usercert, cabundle=cabundle, autoconnect=False)
except Exception as exception:
logging.error(str(exception))
sys.exit(1)

try:
client.connect()
except Exception as exception:
search = re.search(r'doesn\'t match \'(.*)\'', str(exception))
if search:
logging.error(("Appliance name can't be verified, to force connection "
"use \"--host %s --ip %s\" or \"--no-sslverifyhost\" "
"options"), search.group(1), host)
try:
client.connect()
except TOTPNeededError as exception:
if i == 0 and totp is None:
logging.warning("Second factor authentication is required.")
totp = getpass.getpass("Totp:")
continue
else:
logging.error(str(exception))
sys.exit(1)
except Exception as exception:
search = re.search(r'doesn\'t match \'(.*)\'', str(exception))
if search:
logging.error(("Appliance name can't be verified, to force connection "
"use \"--host %s --ip %s\" or \"--no-sslverifyhost\" "
"options"), search.group(1), host)
else:
logging.error(str(exception))
sys.exit(1)
else:
logging.error(str(exception))
sys.exit(1)
break

# disconnect gracefuly at exit
atexit.register(client.disconnect)
Expand Down
31 changes: 25 additions & 6 deletions stormshield/sns/sslclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ class MissingAuth(ValueError):
class MissingCABundle(ValueError):
""" The certificate authority bundle is missing """

class TOTPNeededError(Exception):
""" Time-base one time password needed """

class AuthenticationError(Exception):
""" authentication failed """

Expand Down Expand Up @@ -261,18 +264,21 @@ class SSLClient:
SERVERD_WAIT_UPLOAD = "00a00300"
AUTH_SUCCESS = "AUTH_SUCCESS"
AUTH_FAILED = "AUTH_FAILED"
NEED_TOTP_AUTH = "NEED_TOTP_AUTH"
ERR_BRUTEFORCE = "ERR_BRUTEFORCE"

fileregexp = re.compile(r'(.*)\s*(\<|\>)\s*(.*)\s*')

CHUNK_SIZE = 10240 # bytes

def __init__(self, user='admin', password=None, host=None, ip=None, port=443, cabundle=None,
def __init__(self, user='admin', password=None, totp=None, host=None, ip=None, port=443, cabundle=None,
sslverifypeer=True, sslverifyhost=True, credentials=None,
usercert=None, autoconnect=True, proxy=None, timeout=None):
""":class:`SSLclient <SSLClient>` constructor.
:param user: Optional user name.
:param password: Optional password.
:param totp: Optional time-based one time password.
:param host: hostname to connect or certificate common name (appliance serial).
:param ip: Optional ip address to connect.
:param port: Optional port number.
Expand All @@ -288,6 +294,7 @@ def __init__(self, user='admin', password=None, host=None, ip=None, port=443, ca

self.user = user
self.password = password
self.totp = totp
self.host = host
self.ip = ip
self.port = port
Expand All @@ -310,6 +317,8 @@ def __init__(self, user='admin', password=None, host=None, ip=None, port=443, ca
raise MissingHost("Host parameter must be provided")
if password is None and usercert is None:
raise MissingAuth("Password parameter must be provided")
if password is None and totp is not None:
raise MissingAuth("Password parameter must be provided when totp parameter is provided")
if usercert is not None and not os.path.isfile(usercert):
raise MissingAuth("User certificate not found")
if cabundle is None:
Expand Down Expand Up @@ -384,12 +393,16 @@ def connect(self):
headers=self.headers, **self.conn_options)
else:
# password authentication
data = { 'uid':base64.b64encode(self.user.encode('utf-8')),
'pswd':base64.b64encode(self.password.encode('utf-8')),
'app':self.app }

if self.totp is not None:
data['totp']=base64.b64encode(self.totp.encode('utf-8'))

request = self.session.post(
self.baseurl + '/auth/admin.html',
data={
'uid':base64.b64encode(self.user.encode('utf-8')),
'pswd':base64.b64encode(self.password.encode('utf-8')),
'app':self.app},
data,
headers=self.headers,
**self.conn_options)

Expand All @@ -401,7 +414,13 @@ def connect(self):
except (ElementTree.ParseError, KeyError):
raise ServerError("Can't decode authentication result")

if msg != self.AUTH_SUCCESS:
if msg == self.ERR_BRUTEFORCE:
nws_node = ElementTree.fromstring(request.content)
delay = nws_node.attrib['delay']
raise AuthenticationError("Brut force detected, try again after " + delay + " seconds.")
if msg == self.NEED_TOTP_AUTH:
raise TOTPNeededError("TOTP is needed")
if msg != self.AUTH_SUCCESS:
raise AuthenticationError("Authentication failed")

# 2. Serverd session
Expand Down

0 comments on commit d115e9d

Please sign in to comment.