Skip to content

Commit

Permalink
Configuration adds sso info from session file #TASK-7102
Browse files Browse the repository at this point in the history
  • Loading branch information
dapregi committed Dec 20, 2024
1 parent 6336ac1 commit a6e50d7
Showing 1 changed file with 89 additions and 34 deletions.
123 changes: 89 additions & 34 deletions opencga-client/src/main/python/pyopencga/opencga_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,101 +5,156 @@


class ClientConfiguration(object):
"""
Configuration class shared between OpenCGA python clients
"""
"""PyOpenCGA configuration"""

def __init__(self, config_input):
"""
:param config_input: a dict, or a yaml/json file containing an OpenCGA valid client configuration.
:param dict or str config_input: a dict, or a yaml/json file containing an OpenCGA valid client configuration.
"""

# Default config params
self._configuration_input = config_input
self._retry_config = None

if isinstance(config_input, dict):
self._config = config_input
else:
self._config = self._get_dictionary_from_file(config_input)

self._validate_configuration(self._config)
self.get_sso_login_info()

self._validate_configuration()

@staticmethod
def _get_dictionary_from_file(config_fpath):
config_fhand = open(config_fpath, 'r')

config_dict = None
if config_fpath.endswith('.yml') or config_fpath.endswith('.yaml'):
config_dict = yaml.safe_load(config_fhand)
if config_fpath.endswith('.json'):
elif config_fpath.endswith('.json'):
config_dict = json.loads(config_fhand.read())
else:
raise NotImplementedError('Input must be a dictionary, a yaml file (.yml or .yaml) or a json file (.json)')

config_fhand.close()

return config_dict

def _validate_configuration(self, config):
if config is None:
raise ValueError('Missing configuration dictionary')
def get_sso_login_info(self):
if (('sso_login' in self._config and self._config['sso_login']) or
('cookies' in self._config and self._config['cookies'])):
python_session_fhand = open(os.path.expanduser("~/.opencga/session.json"), 'r')
session_info = json.loads(python_session_fhand.read())
self._config['sso_login'] = True
self._config['cookies'] = session_info['cookies']
self._config['token'] = session_info['token']

if 'rest' not in config:
raise ValueError('Missing "rest" field from configuration dictionary. Please, pass a valid OpenCGA configuration dictionary')
def _validate_configuration(self):
if self._config is None:
raise ValueError('Missing configuration dictionary.')

if 'host' not in config['rest'] or not config['rest']['host']:
raise ValueError('Missing or empty "host" in OpenCGA configuration')
if 'rest' not in self._config:
raise ValueError('Missing configuration field "rest".')

self._validate_host(config['rest']['host'])
if 'host' not in self._config['rest'] or not self._config['rest']['host']:
raise ValueError('Missing configuration field "host".')

self._validate_host(self._config['rest']['host'])

@staticmethod
def _validate_host(host):
if not (host.startswith('http://') or host.startswith('https://')):
host = 'http://' + host
try:
r = requests.head(host, timeout=2)
if r.status_code == 302:
return
except requests.ConnectionError:
raise Exception('Unreachable host ' + host)
raise Exception('Unreachable host "{}"'.format(host))

@property
def configuration(self):
return self._config

@property
def host(self):
return self._config['rest']['host']

@host.setter
def host(self, new_host):
if not (new_host.startswith('http://') or new_host.startswith('https://')):
new_host = 'http://' + new_host
self._config['rest']['host'] = new_host
def host(self, host):
self._config['rest']['host'] = host
self._validate_host(host)

@property
def version(self):
return self._config['version'] if 'version' in self._config else 'v2'

@version.setter
def version(self, version):
self._config['version'] = version

@property
def sso_login(self):
if 'sso_login' in self._config and self._config['sso_login']:
return self._config['sso_login']
else:
return False

@sso_login.setter
def sso_login(self, sso_login):
if isinstance(str, sso_login):
self._config['sso_login'] = sso_login.lower() == 'true'
if isinstance(bool, sso_login):
self._config['sso_login'] = sso_login

@property
def cookies(self):
if 'cookies' in self._config and self._config['cookies']:
python_session_fhand = open(os.path.expanduser("~/.opencga/python_session.json"), 'r')
session_info = json.loads(python_session_fhand.read())
return session_info['cookies']
return self._config['cookies']
else:
return None

@cookies.setter
def cookies(self, cookies):
self._config['cookies'] = cookies

@property
def configuration(self):
return self._config
def token(self):
if 'token' in self._config and self._config['token']:
return self._config['token']
else:
return None

@token.setter
def token(self, token):
self._config['token'] = token

@property
def max_attempts(self):
"""Returns configured max_attempts or 1 if not configured"""
return 1
if 'max_attempts' in self._config and self._config['max_attempts']:
return self._config['max_attempts']
else:
return 1

@max_attempts.setter
def max_attempts(self, max_attempts):
self._config['max_attempts'] = max_attempts

@property
def min_retry_secs(self):
"""Returns configured min_retry_seconds or 0 if not configured"""
return 0
if 'min_retry_secs' in self._config and self._config['min_retry_secs']:
return self._config['min_retry_secs']
else:
return 0

@min_retry_secs.setter
def min_retry_secs(self, min_retry_secs):
self._config['min_retry_secs'] = min_retry_secs

@property
def max_retry_secs(self):
"""Returns configured max_retry_seconds or 0 if not configured"""
return 0
if 'max_retry_secs' in self._config and self._config['max_retry_secs']:
return self._config['max_retry_secs']
else:
return 0

@max_retry_secs.setter
def max_retry_secs(self, max_retry_secs):
self._config['max_retry_secs'] = max_retry_secs

0 comments on commit a6e50d7

Please sign in to comment.