From 22d64f6e27f5298d60aba46924e2f2ff9e7fbfe6 Mon Sep 17 00:00:00 2001 From: llaszuk-r7 <99184394+llaszuk-r7@users.noreply.github.com> Date: Wed, 27 Sep 2023 12:04:30 +0200 Subject: [PATCH] plgn-380 salesforce add deduping (#1993) --- plugins/salesforce/help.md | 2 +- .../tasks/monitor_users/task.py | 36 ++++++++++++++++++- .../komand_salesforce/util/event.py | 27 ++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 plugins/salesforce/komand_salesforce/util/event.py diff --git a/plugins/salesforce/help.md b/plugins/salesforce/help.md index 8d0c4502cb..45fcf28bd0 100644 --- a/plugins/salesforce/help.md +++ b/plugins/salesforce/help.md @@ -531,7 +531,7 @@ _This plugin does not contain any troubleshooting information._ # Version History -* 2.0.3 - Implemented token auto-refresh on expiration for continuous sessions +* 2.0.3 - Implemented token auto-refresh on expiration for continuous sessions | Task Monitor Users: add flag `remove_duplicates` for duplicated events * 2.0.2 - Task Monitor Users: query improvement | Handle exception related with grant type * 2.0.1 - Add extra logs register * 2.0.0 - Code refactor | Update plugin to be cloud enabled | Add new task Monitor Users diff --git a/plugins/salesforce/komand_salesforce/tasks/monitor_users/task.py b/plugins/salesforce/komand_salesforce/tasks/monitor_users/task.py index 85a0eb045c..e139539d49 100755 --- a/plugins/salesforce/komand_salesforce/tasks/monitor_users/task.py +++ b/plugins/salesforce/komand_salesforce/tasks/monitor_users/task.py @@ -7,6 +7,8 @@ from komand_salesforce.util.exceptions import ApiException from komand_salesforce.util.helpers import clean, convert_to_camel_case +from ...util.event import UserEvent + class MonitorUsers(insightconnect_plugin_runtime.Task): USER_LOGIN_QUERY = "SELECT LoginTime, UserId, LoginType, LoginUrl, SourceIp, Status, Application, Browser FROM LoginHistory WHERE LoginTime >= {start_timestamp} AND LoginTime < {end_timestamp}" @@ -19,6 +21,7 @@ class MonitorUsers(insightconnect_plugin_runtime.Task): NEXT_USER_COLLECTION_TIMESTAMP = "next_user_collection_timestamp" NEXT_USER_LOGIN_COLLECTION_TIMESTAMP = "next_user_login_collection_timestamp" LAST_USER_LOGIN_COLLECTION_TIMESTAMP = "last_user_login_collection_timestamp" + REMOVE_DUPLICATES = "remove_duplicates" def __init__(self): super(self.__class__, self).__init__( @@ -38,6 +41,7 @@ def run(self, params={}, state={}): # noqa: C901 get_users = False get_user_login_history = False + remove_duplicates = state.pop(self.REMOVE_DUPLICATES, True) # true as a default users_next_page_id = state.get(self.USERS_NEXT_PAGE_ID) state.pop(self.USERS_NEXT_PAGE_ID, None) user_login_next_page_id = state.get(self.USER_LOGIN_NEXT_PAGE_ID) @@ -111,6 +115,7 @@ def run(self, params={}, state={}): # noqa: C901 self.UPDATED_USERS_QUERY.format(user_ids=concatenated_ids), None ).get("records", []) + self.logger.info(f"{len(updated_users)} updated users added to output") records.extend(self.add_data_type_field(updated_users, "User Update")) if get_users: @@ -119,6 +124,8 @@ def run(self, params={}, state={}): # noqa: C901 if users_next_page_id: state[self.USERS_NEXT_PAGE_ID] = users_next_page_id has_more_pages = True + + self.logger.info(f"{len(response.get('records'))} users added to output") records.extend(self.add_data_type_field(response.get("records", []), "User")) if get_user_login_history: @@ -133,13 +140,38 @@ def run(self, params={}, state={}): # noqa: C901 if user_login_next_page_id: state[self.USER_LOGIN_NEXT_PAGE_ID] = user_login_next_page_id has_more_pages = True + + self.logger.info(f"{len(response.get('records'))} users login added to output") records.extend(self.add_data_type_field(response.get("records", []), "User Login")) + + if remove_duplicates is True: + records = self.remove_duplicates(records) + + records = [record.__dict__ for record in records] + return convert_to_camel_case(clean(records)), state, has_more_pages, 200, None except ApiException as error: return [], state, False, error.status_code, error except Exception as error: return [], state, False, 500, PluginException(preset=PluginException.Preset.UNKNOWN, data=error) + def remove_duplicates(self, records: list) -> list: + """ + Remove duplicate entries from the provided list of records. + + Args: + records (list): A list containing the records to be de-duplicated. + + Returns: + list: A list containing only the unique records from the input list. + """ + unique_records = list(dict.fromkeys(records)) + if len(records) != len(unique_records): + self.logger.info( + f"Removed {len(records) - len(unique_records)} duplicate from a total of {len(records)} duplicate records." + ) + return unique_records + @staticmethod def get_current_time() -> datetime: return datetime.now(timezone.utc) @@ -154,6 +186,8 @@ def convert_to_datetime(timestamp: str) -> datetime: @staticmethod def add_data_type_field(records: list, field_value: str) -> list: + event_records = [] for record in records: record["dataType"] = field_value - return records + event_records.append(UserEvent(**record)) + return event_records diff --git a/plugins/salesforce/komand_salesforce/util/event.py b/plugins/salesforce/komand_salesforce/util/event.py new file mode 100644 index 0000000000..80f366a5b1 --- /dev/null +++ b/plugins/salesforce/komand_salesforce/util/event.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass, fields + +from typing import Optional + + +@dataclass(frozen=True, eq=True) +class UserEvent: + attributes: dict + dataType: str + Id: Optional[str] = None + FirstName: Optional[str] = None + LastName: Optional[str] = None + Email: Optional[str] = None + Alias: Optional[str] = None + IsActive: Optional[bool] = None + LoginTime: Optional[str] = None + UserId: Optional[str] = None + LoginType: Optional[str] = None + LoginUrl: Optional[str] = None + SourceIp: Optional[str] = None + Status: Optional[str] = None + Application: Optional[str] = None + Browser: Optional[str] = None + + def __hash__(self): + exclude_fields = ["attributes"] + return hash(tuple(getattr(self, field.name) for field in fields(self) if field.name not in exclude_fields))