From da3471c21d3f36e3f208bf7f344fadd77ab00c80 Mon Sep 17 00:00:00 2001 From: Christoph Souris Date: Tue, 2 Jul 2024 13:31:55 +0200 Subject: [PATCH] Feature/simple app user access (#55) * Add function to get current user. * Add function to get current user. * Fixed typo. * Work in progress. * Added documentation for class Availability. * Added support for current user API. * Fixed limiting. * Finalized current user support. * Linting fixes. * Removed obsolete imports. * Code cleanup. * Testing code tweaks; Adding support for TFA Settings at user level. * Added function to logout all users. --------- Co-authored-by: Christoph Souris --- .gitignore | 1 + c8y_api/_base_api.py | 1 + c8y_api/model/__init__.py | 1 + c8y_api/model/_base.py | 2 +- c8y_api/model/_parser.py | 6 +- c8y_api/model/administration.py | 392 +++++++++++++++++++++++++++----- c8y_api/model/managedobjects.py | 85 ++++--- integration_tests/test_users.py | 138 ++++++++++- requirements.txt | 1 + tests/model/current_user.json | 25 ++ tests/model/test_user.py | 59 ++++- tests/utils.py | 2 +- util/__init__.py | 3 - 13 files changed, 598 insertions(+), 118 deletions(-) create mode 100644 tests/model/current_user.json diff --git a/.gitignore b/.gitignore index 1141658..83b34f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .idea* .vscode* +.pytest* __pycache__ dist* build* diff --git a/c8y_api/_base_api.py b/c8y_api/_base_api.py index f6a6293..ed6c128 100644 --- a/c8y_api/_base_api.py +++ b/c8y_api/_base_api.py @@ -28,6 +28,7 @@ class CumulocityRestApi: ACCEPT_MANAGED_OBJECT = 'application/vnd.com.nsn.cumulocity.managedobject+json' ACCEPT_USER = 'application/vnd.com.nsn.cumulocity.user+json' + ACCEPT_CURRENT_USER = 'application/vnd.com.nsn.cumulocity.currentuser+json' ACCEPT_GLOBAL_ROLE = 'application/vnd.com.nsn.cumulocity.group+json' CONTENT_AUDIT_RECORD = 'application/vnd.com.nsn.cumulocity.auditrecord+json' CONTENT_MANAGED_OBJECT = 'application/vnd.com.nsn.cumulocity.managedobject+json' diff --git a/c8y_api/model/__init__.py b/c8y_api/model/__init__.py index 9d9217b..54ed546 100644 --- a/c8y_api/model/__init__.py +++ b/c8y_api/model/__init__.py @@ -54,6 +54,7 @@ 'Fragment', 'NamedObject', 'User', + 'CurrentUser', 'GlobalRole', 'Permission', 'ReadPermission', diff --git a/c8y_api/model/_base.py b/c8y_api/model/_base.py index ac3dcdd..6af2b98 100644 --- a/c8y_api/model/_base.py +++ b/c8y_api/model/_base.py @@ -559,9 +559,9 @@ def _iterate(self, base_query: str, page_number: int | None, limit: int, parse_f break for result in results: result.c8y = self.c8y # inject c8y connection into instance - yield result if limit and num_results >= limit: break + yield result num_results = num_results + 1 # when a specific page was specified we don't read more pages if page_number: diff --git a/c8y_api/model/_parser.py b/c8y_api/model/_parser.py index 85a99f7..5c190b2 100644 --- a/c8y_api/model/_parser.py +++ b/c8y_api/model/_parser.py @@ -18,8 +18,10 @@ class SimpleObjectParser(object): a simple field mapping dictionary. """ - def __init__(self, mapping): - self._obj_to_json = {**mapping, 'id': 'id'} + def __init__(self, mapping: dict = None, **kwargs): + if mapping is None: + mapping = {} + self._obj_to_json = {**mapping, 'id': 'id', **kwargs} self._json_to_object = {v: k for k, v in self._obj_to_json.items()} def from_json(self, obj_json, new_obj, skip=None): diff --git a/c8y_api/model/administration.py b/c8y_api/model/administration.py index 8611b33..5b14d76 100644 --- a/c8y_api/model/administration.py +++ b/c8y_api/model/administration.py @@ -374,15 +374,68 @@ def build_inventoryrole_assignment(object_id: int | str, *role_ids: int | str) - return {'managedObject': int(object_id), 'roles': [{'id': int(rid)} for rid in role_ids]} -class User(SimpleObject): - """Represents a User object within Cumulocity. +class TfaSettings: + """TFA settings representation within Cumulocity.""" - Notes: - - Only a limited set of properties are actually updatable. Others must - be set explicitly using the corresponding API (for example: global roles, permissions, - owner, etc.) - """ + _parser = SimpleObjectParser( + enabled='tfaEnabled', + enforced='tfaEnforced', + strategy='strategy', + last_request_time='lastTfaRequestTime') + + def __init__(self, enabled: bool = None, enforced: bool = None, strategy: str = None, last_request_time: str | datetime = None): + """Create a TfaSettings instance. + + Args: + enabled (bool): Whether TFA is enabled + enforced (bool): Whether TFA is enforced + strategy (str): TFA strategy, e.g. "SMS" or "TOTP" + last_request_time (str|datetime): The last time TFA was requested + """ + self.enabled = enabled + self.enforced = enforced + self.strategy = strategy + self.last_request_time = _DateUtil.ensure_timestring(last_request_time) + + @property + def last_request_datetime(self) -> datetime: + """Convert the last requests time to a Python datetime object. + + Returns: + Standard Python datetime object + """ + return _DateUtil.to_datetime(self.last_request_time) + + @classmethod + def from_json(cls, object_json: dict) -> TfaSettings: + """Create an object instance from Cumulocity JSON format. + + Caveat: this function is primarily for internal use and does not + return a full representation of the JSON. It is used for object + creation and update within Cumulocity. + + Args: + object_json (dict): The JSON to parse. + + Returns: + A TfaSettings instance. + """ + return cls._parser.from_json(object_json, TfaSettings()) + + def to_json(self) -> dict: + """Create a representation of this object in Cumulocity JSON format. + + Caveat: this function is primarily for internal use and does not + return a full representation of the object. It is used for object + creation and update within Cumulocity. + + Returns: + A JSON (nested dict) object. + """ + return self._parser.to_json(self) + +class _BaseUser(SimpleObject): _parser = SimpleObjectParser({ 'username': 'userName', 'password_strength': 'passwordStrength', @@ -399,33 +452,12 @@ class User(SimpleObject): '_password_reset_mail': 'sendPasswordResetEmail', '_last_password_change': 'lastPasswordChange'}) _resource = 'INVALID' # needs to be dynamically generated. see _build_resource_path - _accept = CumulocityRestApi.ACCEPT_USER - _custom_properties_parser = ComplexObjectParser({}, []) def __init__(self, c8y: CumulocityRestApi = None, username: str = None, email: str = None, - enabled: bool = True, display_name: str = None, password:str = None, + enabled: bool = True, display_name: str = None, password: str = None, first_name: str = None, last_name: str = None, phone: str = None, tfa_enabled: bool = None, require_password_reset: bool = None): - """ - Create a new User instance. - Args: - c8y (CumulocityRestApi): Cumulocity connection reference; needs - to be set for direct manipulation (create, delete). - username (str): The user's username. - email (str): The user's email address. - enabled (bool): Whether the user is enabled. - display_name (str): The user's display name - password (str): The initial password for the user. If omitted, - a newly created user will be sent a password reset link - (for human users). - first_name (str): The user's first name. - last_name (str): The user's last name. - phone (str): The user's phone number. - tfa_enabled (bool): Whether 2nd factor login is enabled. - require_password_reset (bool): Whether the password must be - reset by the user after the next login. - """ super().__init__(c8y) self.username = username self.password_strength = None @@ -442,10 +474,6 @@ def __init__(self, c8y: CumulocityRestApi = None, username: str = None, email: s self._u_require_password_reset = require_password_reset self._password_reset_mail = not self._u_password self._last_password_change = None - self.global_role_ids = set() - self.permission_ids = set() - self.application_ids = set() - # self.custom_properties = WithUpdatableFragments() display_name = SimpleObject.UpdatableProperty('_u_display_name') email = SimpleObject.UpdatableProperty('_u_email') @@ -468,12 +496,61 @@ def last_password_change_datetime(self) -> datetime: # hint: could be cached, but it is rarely accessed multiple times return _DateUtil.to_datetime(self._last_password_change) + +class User(_BaseUser): + """Represents a User object within Cumulocity. + + Notes: + - Only a limited set of properties are actually updatable. Others must + be set explicitly using the corresponding API (for example: global + roles, permissions, owner, etc.) + """ + _resource = 'INVALID' # needs to be dynamically generated. see _build_resource_path + _accept = CumulocityRestApi.ACCEPT_USER + _custom_properties_parser = ComplexObjectParser({}, []) + + def __init__(self, c8y=None, username=None, email=None, enabled=True, display_name=None, + password=None, first_name=None, last_name=None, phone=None, + tfa_enabled=None, require_password_reset=None): + """ + Create a new User instance. + + Args: + c8y (CumulocityRestApi): Cumulocity connection reference; needs + to be set for direct manipulation (create, delete). + username (str): The user's username. + email (str): The user's email address. + enabled (bool): Whether the user is enabled. + display_name (str): The user's display name + password (str): The initial password for the user. If omitted, + a newly created user will be sent a password reset link + (for human users). + first_name (str): The user's first name. + last_name (str): The user's last name. + phone (str): The user's phone number. + tfa_enabled (bool): Whether 2nd factor login is enabled. + require_password_reset (bool): Whether the password must be + reset by the user after the next login. + """ + super().__init__(c8y, + username=username, email=email, enabled=enabled, + display_name=display_name, password=password, + first_name=first_name, last_name=last_name, + phone=phone, tfa_enabled=tfa_enabled, + require_password_reset=require_password_reset) + self.global_role_ids = set() + self.permission_ids = set() + self.application_ids = set() + # self.effective_permission_ids = set() + # self.custom_properties = WithUpdatableFragments() + + @classmethod def from_json(cls, json: dict) -> User: user = cls._from_json(json, User()) - if json['groups'] and json['groups']['references']: + if 'groups' in json and 'references' in json['groups']: user.global_role_ids = {str(ref['group']['id']) for ref in json['groups']['references']} - if json['roles'] and json['roles']['references']: + if 'roles' in json and 'references' in json['roles']: user.permission_ids = {ref['role']['id'] for ref in json['roles']['references']} if 'applications' in json: user.application_ids = {x['id'] for x in json['applications']} @@ -511,19 +588,6 @@ def delete(self): """Delete the User within the database.""" self._delete() - def update_password(self, new_password: str): - """Update the password. - - This operation is executed immediately. No additional call to - the ``update`` function required. - - Args: - new_password (str): The new password to set - """ - self._assert_c8y() - self._assert_username() - Users(self.c8y).set_password(self.username, new_password) - def set_owner(self, user_id: str): """Set the owner for this user. @@ -657,6 +721,189 @@ def _assert_username(self): raise ValueError("Username must be provided.") +class CurrentUser(_BaseUser): + """Represents a "current" User object within Cumulocity. + + See also https://cumulocity.com/api/core/#tag/Current-User + """ + + class TotpActivity: + """User's TOTP activity information.""" + def __init__(self, is_active: bool = None): + self.is_active = is_active + + @classmethod + def from_json(cls, object_json: dict) -> CurrentUser.TotpActivity: + """Create an object instance from Cumulocity JSON format. + + Caveat: this function is primarily for internal use and does not + return a full representation of the JSON. It is used for object + creation and update within Cumulocity. + + Args: + object_json (dict): The JSON to parse. + + Returns: + A TotpActivity instance. + """ + obj = CurrentUser.TotpActivity() + obj.is_active = object_json['isActive'] + return obj + + def to_json(self) -> dict: + """Create a representation of this object in Cumulocity JSON format. + + Caveat: this function is primarily for internal use and does not + return a full representation of the object. It is used for object + creation and update within Cumulocity. + + Returns: + A JSON (nested dict) object. + """ + return {'isActive': self.is_active} + + _resource = '/user/currentUser' + _accept = CumulocityRestApi.ACCEPT_CURRENT_USER + + def __init__(self, c8y:CumulocityRestApi = None): + super().__init__(c8y) + self.effective_permission_ids = {} + + @classmethod + def from_json(cls, json: dict) -> CurrentUser: + user:CurrentUser = cls._from_json(json, CurrentUser()) + if 'effectiveRoles' in json: + user.effective_permission_ids = {ref['id'] for ref in json['effectiveRoles']} + return user + + # no need to override the standard to_json method + + def update(self) -> CurrentUser: + """Update the current user within the database. + + Returns: + A fresh CurrentUser object representing what the updated + state within the database (including the ID). + """ + self._assert_c8y() + result_json = self.c8y.put(self._resource, self.to_diff_json(), accept=self._accept) + user = self.from_json(result_json) + user.c8y = self.c8y + return user + + def update_password(self, current_password: str, new_password: str): + """Update the current user's password: + + Args: + current_password(str): the current password + new_password (str): the new password to set + """ + self._assert_c8y() + Users(self.c8y).set_current_password(current_password, new_password) + + def get_tfa_settings(self) -> TfaSettings: + """Read the TFA settings for the current user. + + Returns: + A TfaSettings instance. + """ + self._assert_c8y() + return Users(self.c8y).get_tfa_settings(self.username) + + def _read_totp_activity(self) -> dict: + self._assert_c8y() + return self.c8y.get(f'{self._resource}/totpSecret/activity') + + def _write_totp_activity(self, activity_json: dict): + self._assert_c8y() + self.c8y.post(f'{self._resource}/totpSecret/activity', activity_json) + + def get_totp_activity(self) -> TotpActivity: + """Read the TOTP activity details. + + Returns: + A TotpActivity instance. + """ + return CurrentUser.TotpActivity.from_json(self._read_totp_activity()) + + def set_totp_activity(self, activity: TotpActivity): + """Update the TFA feature activity details. + + Args: + activity(TotpActivity): The TFA activity details. + """ + self._write_totp_activity(activity.to_json()) + + def get_totp_enabled(self) -> bool: + """Check whether the TOTP feature is enabled for the current user. + + Returns: + True if the feature is enabled, False otherwise. + """ + try: + return self._read_totp_activity()['isActive'] + except KeyError: + return False + + def _set_totp_enabled(self, enabled: bool): + """Enable/disable the TOTP feature for the current user. + + Args: + enabled (bool): Whether to enable the feature. + """ + self._write_totp_activity({'isActive': enabled}) + + def enable_totp(self): + """Enable the TOTP feature for the current user.""" + self._set_totp_enabled(True) + + def disable_totp(self): + """Enable the TOTP feature for the current user.""" + self._set_totp_enabled(False) + + def generate_totp_secret(self) -> (str, str): + """Generate a new TOTP secret for the current user. + + Returns: + A (str, str) tuple of the raw secret token and the secret URL. + """ + self._assert_c8y() + result_json = self.c8y.post(f'{self._resource}/totpSecret', {}) + return result_json['rawSecret'], result_json['secretQrUrl'] + + def verify_totp(self, code: str): + """Verify a TFA/TOTP token. + + Args: + code (str): A TOTP token + + Raises: + ValueError if the token is invalid/could not be verified. + """ + self._assert_c8y() + self.c8y.post(f'{self._resource}/totpSecret/verify', {'code': code}) + + def is_valid_totp(self, code: str) -> bool: + """Verify a TFA/TOTP token. + + Args: + code (str): A TOTP token + + Returns: + True if the token was valid, False otherwise, + """ + try: + self.verify_totp(code) + return True + except ValueError: + return False + + def revoke_totp_secret(self): + """Revoke the currently set TFA/TOTP secret for the current user.""" + self._assert_c8y() + Users(self.c8y).revoke_totp_secret(self.username) + + class InventoryRoles(CumulocityResource): """Provides access to the InventoryRole API. @@ -783,7 +1030,7 @@ def __init__(self, c8y): super().__init__(c8y, 'user/' + c8y.tenant_id + '/users') self.__groups = GlobalRoles(c8y) - def get(self, username): + def get(self, username: str): """Retrieve a specific user. Args: @@ -796,6 +1043,16 @@ def get(self, username): user.c8y = self.c8y # inject c8y connection into instance return user + def get_current(self) -> CurrentUser: + """Retrieve current user. + + Returns: + CurrentUser instance + """ + user = CurrentUser.from_json(self.c8y.get('/user/currentUser')) + user.c8y = self.c8y + return user + def select(self, username: str = None, groups: str | int | GlobalRole | List[str] | List[int] | List[GlobalRole] = None, @@ -870,14 +1127,24 @@ def create(self, *users): """ super()._create(lambda u: u.to_full_json(), *users) - def set_password(self, username: str, new_password: str): - """Set the password of a user. + def logout_all(self): + """Terminate all user's sessions.""" + self.c8y.post(f'/user/logout/{self.c8y.tenant_id}/allUsers', json={}) + + def set_current_password(self, current_password: str, new_password: str): + """Set the password of the current user. + + Note: This automatically updates the connection with the new auth information. Args: - username (str): Username of a Cumulocity user + current_password (str): The current password new_password (str): The new password to set """ - self.c8y.put(self.resource + '/' + username, {'password': new_password}) + request_json = { + 'currentUserPassword': current_password, + 'newPassword': new_password} + self.c8y.put('/user/currentUser/password', request_json) + self.c8y.auth.password = new_password def set_owner(self, user_id: str, owner_id: str | None): """Set the owner of a given user. @@ -906,6 +1173,25 @@ def set_delegate(self, user_id: str, delegate_id: str | None): else: self.c8y.delete(self.build_object_path(user_id) + '/delegatedby') + def get_tfa_settings(self, user_id: str) -> TfaSettings: + """Read the TFA settings of a given user. + + Args: + user_id (str): The user to query the settings for + + Returns: + A TfaSettings object + """ + return TfaSettings.from_json(self.c8y.get(self.build_object_path(user_id) + '/tfa')) + + def revoke_totp_secret(self, user_id: str): + """Revoke the currently set TFA/TOTP secret for a user. + + Args: + user_id (str): The user to set an owner for + """ + self.c8y.delete(self.build_object_path(user_id) + '/totpSecret/revoke') + class GlobalRoles(CumulocityResource): """Provides access to the Global Role API. diff --git a/c8y_api/model/managedobjects.py b/c8y_api/model/managedobjects.py index 84c5206..759a77c 100644 --- a/c8y_api/model/managedobjects.py +++ b/c8y_api/model/managedobjects.py @@ -15,38 +15,36 @@ class NamedObject(object): This class is used to model Cumulocity references. """ - def __init__(self, id: str = None, name: str = None): # noqa + def __init__(self, id=None, name=None): # noqa """ Create a new instance. - Args: - id (str): Database ID of the object - name (str): Name of the object + :param id: Database ID of the object + :param name: Name of the object + :returns: New NamedObject instance """ self.id = id self.name = name @classmethod - def from_json(cls, object_json: dict) -> NamedObject: + def from_json(cls, object_json): """ Build a new instance from JSON. The JSON is assumed to be in the format as it is used by the Cumulocity REST API. - Args: - object_json (dict): JSON object (nested dictionary) + + :param object_json: JSON object (nested dictionary) representing a named object within Cumulocity - Returns: - NamedObject instance + :returns: NamedObject instance """ return NamedObject(id=object_json['id'], name=object_json.get('name', '')) - def to_json(self) -> dict: + def to_json(self): """ Convert the instance to JSON. The JSON format produced by this function is what is used by the Cumulocity REST API. - Returns: - JSON object (nested dictionary) + :returns: JSON object (nested dictionary) """ return {'id': self.id, 'name': self.name} @@ -58,7 +56,7 @@ class Fragment(object): within their data model. For example, every measurement contains such a fragment, holding - the actual data points:: + the actual data points: "pt_current": { "CURR": { @@ -67,29 +65,31 @@ class Fragment(object): } } - A fragment has a name (`pt_current` in above example) and can virtually + A fragment has a name (*pt_current* in above example) and can virtually define any substructure. """ def __init__(self, name: str, **kwargs): """ Create a new fragment. - Args: + Params name (str): Name of the fragment - **kwargs: Named elements of the fragment. Each element + kwargs: Named elements of the fragment. Each element can either be a simple value of a complex substructure modelled as nested dictionary. + Returns: + New Fragment instance """ self.name = name self.items = kwargs - def __getattr__(self, name: str) -> str|int|float|bool|dict: + def __getattr__(self, name: str): """ Get a specific element of the fragment. Args: name (str): Name of the element Returns: - Value of the element. Can be a simple value or a + Value of the element. May be a simple value or a complex substructure defined as nested dictionary. """ item = self.items[name] @@ -106,16 +106,13 @@ def has(self, name: str) -> bool: """ return name in self.items - def add_element(self, name: str, element: Any|dict) -> Fragment: + def add_element(self, name, element): """ Add an element. - Args: - name (str): Name of the element. - element (Any|dict): Value of the element, either a simple - value or a complex substructure defined as nested dictionary. - - Returns: - `self` reference + :param name: Name of the element + :param element: Value of the element, either a simple value or + a complex substructure defined as nested dictionary. + :returns: self """ self.items[name] = element return self @@ -154,13 +151,17 @@ def interval_minutes(self) -> int: @classmethod def from_json(cls, object_json: dict) -> Availability: - """Parse from Cumulocity JSON. + """Create an object instance from Cumulocity JSON format. + + Caveat: this function is primarily for internal use and does not + return a full representation of the JSON. It is used for object + creation and update within Cumulocity. Args: - object_json (dict): Cumulocity JSON representation + object_json (dict): The JSON to parse. Returns: - A new Availability instance. + A Availability instance. """ obj = Availability() obj.device_id = object_json['deviceId'] @@ -171,6 +172,7 @@ def from_json(cls, object_json: dict) -> Availability: obj.last_message = object_json['lastMessage'] return obj + class ManagedObjectUtil: """Utility functions to work with the Inventory API.""" @@ -241,13 +243,13 @@ class ManagedObject(ComplexObject): 'deviceParents', 'assetParents', 'additionParents']) class Resource: - """Inventory sub-resources.""" + """Standard resource names.""" AVAILABILITY = 'availability' SUPPORTED_MEASUREMENTS = 'supportedMeasurements' SUPPORTED_SERIES = 'supportedSeries' class Fragment: - """Standard fragments.""" + """Standard fragment names.""" SUPPORTED_MEASUREMENTS = 'c8y_SupportedMeasurements' SUPPORTED_SERIES = 'c8y_SupportedSeries' @@ -374,7 +376,7 @@ def create(self) -> ManagedObject: object within the database. This instance can be used to get at the ID of the new managed object. - See also function `Inventory.create` which doesn't parse the result. + See also function Inventory.create which doesn't parse the result. """ return self._create() @@ -385,7 +387,7 @@ def update(self) -> ManagedObject: A fresh ManagedObject instance representing the updated object within the database. - See also function `Inventory.update` which doesn't parse the result. + See also function Inventory.update which doesn't parse the result. """ return self._update() @@ -397,10 +399,10 @@ def apply_to(self, other_id: str | int) -> ManagedObject: Args: other_id (str|int): Database ID of the event to update. Returns: - A fresh `ManagedObject` instance representing the updated + A fresh ManagedObject instance representing the updated object within the database. - See also function `Inventory.apply_to` which doesn't parse the result. + See also function Inventory.apply_to which doesn't parse the result. """ self._assert_c8y() # put diff json to another object (by ID) @@ -620,7 +622,7 @@ class DeviceGroup(ManagedObject): def __init__(self, c8y=None, root: bool = False, name: str = None, owner: str = None, **kwargs): """ Build a new DeviceGroup object. - The type of a device group will always be either `c8y_DeviceGroup` + A type of a device group will always be either `c8y_DeviceGroup` or `c8y_DeviceSubGroup` (depending on it's level). This is handled by the API. @@ -680,8 +682,7 @@ def create_child(self, name: str, owner: str = None, **kwargs) -> DeviceGroup: child_json = DeviceGroup(name=name, owner=owner if owner else self.owner, **kwargs).to_json() response_json = self.c8y.post(self._build_object_path() + '/childAssets', json=child_json, - accept=CumulocityRestApi.ACCEPT_MANAGED_OBJECT, - content_type=CumulocityRestApi.CONTENT_MANAGED_OBJECT) + accept=CumulocityRestApi.ACCEPT_MANAGED_OBJECT) result = self.from_json(response_json) result.c8y = self.c8y return result @@ -692,12 +693,11 @@ def create(self) -> DeviceGroup: This operation will create the group and all added child groups within the database. - Returns: - A fresh DeviceGroup instance representing the created + :returns: A fresh DeviceGroup instance representing the created object within the database. This instance can be used to get at the ID of the new object. - See also function `DeviceGroupInventory.create` which doesn't parse + See also function DeviceGroupInventory.create which doesn't parse the result. """ return super()._create() @@ -707,8 +707,7 @@ def update(self) -> DeviceGroup: Note: Removing child groups is currently not supported. - Returns: - A fresh DeviceGroup instance representing the updated + :returns: A fresh DeviceGroup instance representing the updated object within the database. """ return super()._update() diff --git a/integration_tests/test_users.py b/integration_tests/test_users.py index 1547a6c..42375ce 100644 --- a/integration_tests/test_users.py +++ b/integration_tests/test_users.py @@ -6,7 +6,13 @@ # pylint: disable=redefined-outer-name +import secrets +import string +import time +from typing import Union + import pytest +import pyotp from c8y_api import CumulocityApi from c8y_api.model import User @@ -14,6 +20,11 @@ from util.testing_util import RandomNameGenerator +def generate_password(): + """Generate a strong password meeting Cumulocity requirements.""" + alphabet = string.ascii_letters + string.digits + '_-.#&$' + return 'Aa0.' + ''.join(secrets.choice(alphabet) for _ in range(12)) + def test_CRUD(live_c8y: CumulocityApi): # noqa (case) """Verify that basic CRUD functionality works.""" @@ -45,6 +56,75 @@ def test_CRUD(live_c8y: CumulocityApi): # noqa (case) assert user.username in str(e) +def test_get_current(live_c8y: CumulocityApi): + """Verify that the current user can be read.""" + current1 = live_c8y.users.get(live_c8y.username) + current2 = live_c8y.users.get_current() + + assert current1.username == current2.username + assert current1.id == current2.id + + assert all(i in current2.effective_permission_ids for i in current1.permission_ids) + + +def test_current_update(live_c8y: CumulocityApi, user_c8y: CumulocityApi): + """Verify that updating the current user works as expected.""" + current_user = user_c8y.users.get_current() + + current_user.first_name = "New" + current_user = current_user.update() + assert current_user.first_name == "New" + + +def test_current_totp(live_c8y: CumulocityApi, user_c8y: CumulocityApi): + """Verify that the TOTP settings can be updated for the current user.""" + current_user = user_c8y.users.get_current() + + # a new user without TFA won't have the TOTP activity set up + with pytest.raises(KeyError): + current_user.get_totp_activity() + + # the auxiliary function should intercept the KeyError + assert not current_user.get_totp_enabled() + + # generating a secret won't enable TOTP + secret, _ = current_user.generate_totp_secret() + assert not current_user.get_totp_activity().is_active + + # explicitly enabling the feature using different methods + current_user.enable_totp() + assert current_user.get_totp_enabled() + assert current_user.get_totp_activity().is_active + + # generate and verify TOTP codes + totp = pyotp.TOTP(secret) + code = totp.now() + current_user.verify_totp(code) + + # wait for the code to become invalid + while code == totp.now(): + time.sleep(1) + # Cumulocity has a tolerance for the last code + time.sleep(30) + + assert not current_user.is_valid_totp(code) + with pytest.raises(ValueError) as ex: + current_user.verify_totp(code) + assert '403' in str(ex) + assert 'Invalid verification code' in str(ex) + + # Simply disabling the TOTP feature is no longer supported (v10.20) + with pytest.raises(ValueError) as ex: + current_user.disable_totp() + assert '403' in str(ex) + assert 'Cannot deactivate TOTP setup!' in str(ex) + + # Revoking does automatically disable the feature + current_user.revoke_totp_secret() + assert not current_user.get_totp_enabled() + assert not current_user.get_totp_activity().is_active + + @pytest.fixture(scope='function') def user_factory(live_c8y: CumulocityApi): """Provides a user factory function which removes the created users after @@ -52,11 +132,15 @@ def user_factory(live_c8y: CumulocityApi): created_users = [] - def factory_fun(): + def factory_fun(with_password=False) -> Union[User, tuple[User, str]]: username = RandomNameGenerator.random_name(2) email = f'{username}@software.ag' - user = User(c8y=live_c8y, username=username, email=email).create() + password = generate_password() + print(f"User: {email}, Password: {password}") + user = User(c8y=live_c8y, username=username, password=password, email=email).create() created_users.append(user) + if with_password: + return user, password return user yield factory_fun @@ -65,18 +149,38 @@ def factory_fun(): u.delete() -def test_set_password(live_c8y: CumulocityApi, user_factory): - """Verify that the password of a user can be set and removed.""" +@pytest.fixture(scope='function') +def user_c8y(live_c8y: CumulocityApi, user_factory): + """Provides a Cumulocity connection for a new user.""" + new_user, password = user_factory(with_password=True) + + return CumulocityApi(base_url=live_c8y.base_url, tenant_id=live_c8y.tenant_id, + username=new_user.username, password=password) + + +def test_current_set_password(live_c8y: CumulocityApi, user_c8y): + """Verify that the password of a user can not be set.""" - user = user_factory() + user = user_c8y.users.get_current() + # password strength requirements are tested before updating with pytest.raises(ValueError) as ve: - user.update_password('pw') + user.update_password(user_c8y.auth.password, 'pw') assert 'least' in str(ve) - # this is not a real password, obviously. - # but it should meet the password requirements - user.update_password('ja89NAk,2k23jhL_Paasd0') + # store last password change datetime + before_datetime = user.last_password_change_datetime + + # updating for the current user should be ok + new_password = generate_password() + user.update_password(user_c8y.auth.password, new_password) + + # password timestamp should have been updated + user = user_c8y.users.get_current() + assert user.last_password_change_datetime != before_datetime + + # follow-up requests should still work + assert len(user_c8y.inventory.get_all(limit=10)) == 10 def test_set_owner(live_c8y: CumulocityApi, user_factory): @@ -115,3 +219,19 @@ def test_set_delegate(live_c8y: CumulocityApi, user_factory): db_user1 = live_c8y.users.get(user1.username) # -> owner property must be unset assert not db_user1.delegated_by + + +def test_get_tfa_settings(live_c8y, user_c8y): + """Verify that the TFA settings can be retrieved as expected.""" + + # all users have TFA settings + tfa_settings = live_c8y.users.get_tfa_settings(user_c8y.username) + assert tfa_settings + assert not tfa_settings.enabled + + # to enable TFA, we first generate a secret, then enable + # this is only possible for the current user + current_user = user_c8y.users.get_current() + current_user.generate_totp_secret() + current_user.enable_totp() + assert current_user.get_tfa_settings().enabled diff --git a/requirements.txt b/requirements.txt index 8999db5..24ad9bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,4 @@ inputimeout Flask websockets pandas +PyOTP diff --git a/tests/model/current_user.json b/tests/model/current_user.json new file mode 100644 index 0000000..2d7d63b --- /dev/null +++ b/tests/model/current_user.json @@ -0,0 +1,25 @@ +{ + "shouldResetPassword": false, + "userName": "user.name@cumulocity.com", + "self": "https://t12345.cumulocity.com/user/currentUser", + "effectiveRoles": [ + { + "name": "ROLE_IDENTITY_ADMIN", + "self": "https://t12345.cumulocity.com/user/roles/ROLE_IDENTITY_ADMIN", + "id": "ROLE_IDENTITY_ADMIN" + }, + { + "name": "ROLE_REMOTE_ACCESS_ADMIN", + "self": "https://t12345.cumulocity.com/user/roles/ROLE_REMOTE_ACCESS_ADMIN", + "id": "ROLE_REMOTE_ACCESS_ADMIN" + }, + { + "name": "ROLE_RETENTION_RULE_ADMIN", + "self": "https://t12345.cumulocity.com/user/roles/ROLE_RETENTION_RULE_ADMIN", + "id": "ROLE_RETENTION_RULE_ADMIN" + } + ], + "id": "user.name@cumulocity.com", + "lastPasswordChange": "2021-11-17T08:09:53.257Z", + "email": "user.name@cumulocity.com" +} \ No newline at end of file diff --git a/tests/model/test_user.py b/tests/model/test_user.py index ce78a84..cfb93ed 100644 --- a/tests/model/test_user.py +++ b/tests/model/test_user.py @@ -3,7 +3,7 @@ # and/or its subsidiaries and/or its affiliates and/or their licensors. # Use, reproduction, transfer, publication or disclosure is prohibited except # as specifically provided for in your License Agreement with Software AG. - +import datetime # pylint: disable=redefined-outer-name import json @@ -11,7 +11,7 @@ import pytest -from c8y_api.model import User +from c8y_api.model import User, CurrentUser, TfaSettings @pytest.fixture(scope='function') @@ -55,6 +55,55 @@ def test_parsing(): assert user.permission_ids == permission_ids +def test_current_parsing(): + """Verify that parsing a "current" User from JSON works.""" + + # 1) read a sample user from file + path = os.path.dirname(__file__) + '/current_user.json' + with open(path, encoding='utf-8', mode='rt') as f: + user_json = json.load(f) + + user = CurrentUser.from_json(user_json) + + # 2) verify that all parsed fields match the file counterpart + # including fields from abstract base class + assert user.id == user_json['id'] + assert user.username == user_json['userName'] + assert user.email == user_json['email'] + + # 3) Current user specific sets are being parsed + assert all(r['id'] in user.effective_permission_ids for r in user_json['effectiveRoles']) + + +def test_tfa_settings_parsing(): + """Verify that TFA settings can be parsed from JSON as expected.""" + data = {"tfaEnabled": True, + "tfaEnforced": True, + "strategy": "TOTP", + "lastTfaRequestTime": "2022-08-01T20:00:00.123Z"} + + tfa_settings = TfaSettings.from_json(data) + assert tfa_settings.enabled == data['tfaEnabled'] + assert tfa_settings.enforced == data['tfaEnforced'] + assert tfa_settings.strategy == data['strategy'] + assert tfa_settings.last_request_time == data['lastTfaRequestTime'] + + +def test_tfa_settings_formatting(): + """Verify that TFA settings can be formatted to JSON as expected.""" + tfa_settings = TfaSettings( + enabled=True, + enforced=True, + strategy='SMS', + last_request_time=datetime.datetime.utcnow() + ) + data = tfa_settings.to_json() + data['tfaEnabled'] = tfa_settings.enabled + data['tfaEnforced'] = tfa_settings.enforced + data['strategy'] = tfa_settings.strategy + data['lastTfaRequestTime'] = tfa_settings.last_request_time + + def test_formatting(sample_user: User): """Verify that user formatting works.""" user_json = sample_user.to_json() @@ -72,6 +121,7 @@ def test_updating(sample_user: User): sample_user.password_strength = 'x' sample_user.global_role_ids = {'x'} sample_user.permission_ids = {'x'} + sample_user.effective_permission_ids = {'x'} # -> no changes are recorded, diff is empty assert not sample_user.get_updates() @@ -79,15 +129,12 @@ def test_updating(sample_user: User): # 2) other fields can be updated sample_user.email = 'x' - sample_user.enabled = not sample_user.enabled - sample_user.first_name = 'x' - sample_user.last_name = 'x' sample_user.display_name = 'x' sample_user.tfa_enabled = not sample_user.tfa_enabled sample_user.require_password_reset = not sample_user.require_password_reset # -> we expect an according number of recorded changes - assert len(sample_user.get_updates()) == 7 + assert len(sample_user.get_updates()) == 4 # -> all changes should be reflected in the diff diff_json = sample_user.to_diff_json() diff --git a/tests/utils.py b/tests/utils.py index 2abe0a3..7b62c9f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -15,7 +15,7 @@ from c8y_api.model._base import CumulocityObject -from util.testing_util import RandomNameGenerator +from testing_util import RandomNameGenerator def get_ids(objs: List[CumulocityObject]) -> Set[str]: diff --git a/util/__init__.py b/util/__init__.py index 0871503..9f6c5db 100644 --- a/util/__init__.py +++ b/util/__init__.py @@ -3,6 +3,3 @@ # and/or its subsidiaries and/or its affiliates and/or their licensors. # Use, reproduction, transfer, publication or disclosure is prohibited except # as specifically provided for in your License Agreement with Software AG. - -from util.testing_util import * -from util.microservice_util import *