diff --git a/jupiterone/client.py b/jupiterone/client.py index 5977e46..bb4039f 100644 --- a/jupiterone/client.py +++ b/jupiterone/client.py @@ -1,4 +1,5 @@ """ Python SDK for JupiterOne GraphQL API """ + # pylint: disable=W0212,no-name-in-module # see https://github.com/PyCQA/pylint/issues/409 @@ -28,7 +29,7 @@ DELETE_ENTITY, UPDATE_ENTITY, CREATE_RELATIONSHIP, - UPDATE_RELATIONSHIP, + UPDATE_RELATIONSHIPV2, DELETE_RELATIONSHIP, CURSOR_QUERY_V1, CREATE_INSTANCE, @@ -64,6 +65,7 @@ def retry_on_429(exc): """Used to trigger retry on rate limit""" return isinstance(exc, JupiterOneApiRetryError) + class JupiterOneClient: """Python client class for the JupiterOne GraphQL API""" @@ -79,7 +81,13 @@ class JupiterOneClient: "retry_on_exception": retry_on_429, } - def __init__(self, account: str = None, token: str = None, url: str = DEFAULT_URL, sync_url: str = SYNC_API_URL): + def __init__( + self, + account: str = None, + token: str = None, + url: str = DEFAULT_URL, + sync_url: str = SYNC_API_URL, + ): self.account = account self.token = token self.graphql_url = url @@ -87,7 +95,7 @@ def __init__(self, account: str = None, token: str = None, url: str = DEFAULT_UR self.headers = { "Authorization": "Bearer {}".format(self.token), "JupiterOne-Account": self.account, - "Content-Type": "application/json" + "Content-Type": "application/json", } @property @@ -129,11 +137,9 @@ def _execute_query(self, query: str, variables: Dict = None) -> Dict: # initiate requests session and implement retry logic of 5 request retries with 1 second between s = requests.Session() retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504]) - s.mount('https://', HTTPAdapter(max_retries=retries)) + s.mount("https://", HTTPAdapter(max_retries=retries)) - response = s.post( - self.graphql_url, headers=self.headers, json=data, timeout=60 - ) + response = s.post(self.graphql_url, headers=self.headers, json=data, timeout=60) # It is still unclear if all responses will have a status # code of 200 or if 429 will eventually be used to @@ -267,7 +273,7 @@ def _execute_syncapi_request(self, endpoint: str, payload: Dict = None) -> Dict: # initiate requests session and implement retry logic of 5 request retries with 1 second between s = requests.Session() retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504]) - s.mount('https://', HTTPAdapter(max_retries=retries)) + s.mount("https://", HTTPAdapter(max_retries=retries)) response = s.post( self.sync_url + endpoint, headers=self.headers, json=payload, timeout=60 @@ -429,16 +435,16 @@ def update_relationship(self, **kwargs) -> Dict: now_dt = datetime.now() variables = { - "relationshipId": kwargs.pop("relationship_id"), - "timestamp": datetime.strptime(str(now_dt), "%Y-%m-%d %H:%M:%S.%f").timestamp() + "relationship": {"_id": kwargs.pop("relationship_id")}, + "timestamp": int(datetime.now().timestamp() * 1000), } properties = kwargs.pop("properties", None) if properties: - variables["properties"] = properties + variables["relationship"].update(properties) - response = self._execute_query(query=UPDATE_RELATIONSHIP, variables=variables) - return response["data"]["updateRelationship"] + response = self._execute_query(query=UPDATE_RELATIONSHIPV2, variables=variables) + return response["data"]["updateRelationshipV2"] def delete_relationship(self, relationship_id: str = None): """Deletes a relationship between two entities. @@ -451,10 +457,12 @@ def delete_relationship(self, relationship_id: str = None): response = self._execute_query(DELETE_RELATIONSHIP, variables=variables) return response["data"]["deleteRelationship"] - def create_integration_instance(self, - instance_name: str = None, - instance_description: str = None, - integration_definition_id: str = "8013680b-311a-4c2e-b53b-c8735fd97a5c"): + def create_integration_instance( + self, + instance_name: str = None, + instance_description: str = None, + integration_definition_id: str = "8013680b-311a-4c2e-b53b-c8735fd97a5c", + ): """Creates a new Custom Integration Instance. args: @@ -464,73 +472,64 @@ def create_integration_instance(self, if no parameter is passed, then the Custom Integration definition ID will be used. """ variables = { - "instance": { - "name": instance_name, - "description": instance_description, - "integrationDefinitionId": integration_definition_id, - "pollingInterval": "DISABLED", - "config": { - "@tag": { - "Production": False, - "AccountName": True - } - }, - "pollingIntervalCronExpression": {}, - "ingestionSourcesOverrides": [] - } + "instance": { + "name": instance_name, + "description": instance_description, + "integrationDefinitionId": integration_definition_id, + "pollingInterval": "DISABLED", + "config": {"@tag": {"Production": False, "AccountName": True}}, + "pollingIntervalCronExpression": {}, + "ingestionSourcesOverrides": [], + } } response = self._execute_query(CREATE_INSTANCE, variables=variables) - return response['data']['createIntegrationInstance'] + return response["data"]["createIntegrationInstance"] def fetch_all_entity_properties(self): - """Fetch list of aggregated property keys from all entities in the graph. - - """ + """Fetch list of aggregated property keys from all entities in the graph.""" response = self._execute_query(query=ALL_PROPERTIES) return_list = [] - for i in response['data']['getAllAssetProperties']: + for i in response["data"]["getAllAssetProperties"]: - if i.startswith(('parameter.', 'tag.')) == False: + if i.startswith(("parameter.", "tag.")) == False: return_list.append(i) return return_list def fetch_all_entity_tags(self): - """Fetch list of aggregated property keys from all entities in the graph. - - """ + """Fetch list of aggregated property keys from all entities in the graph.""" response = self._execute_query(query=ALL_PROPERTIES) return_list = [] - for i in response['data']['getAllAssetProperties']: + for i in response["data"]["getAllAssetProperties"]: - if i.startswith(('tag.')) == True: + if i.startswith(("tag.")) == True: return_list.append(i) return return_list def fetch_entity_raw_data(self, entity_id: str = None): - """Fetch the contents of raw data for a given entity in a J1 Account. - - """ - variables = { - "entityId": entity_id, - "source": "integration-managed" - } + """Fetch the contents of raw data for a given entity in a J1 Account.""" + variables = {"entityId": entity_id, "source": "integration-managed"} response = self._execute_query(query=GET_ENTITY_RAW_DATA, variables=variables) return response - - def start_sync_job(self, instance_id: str = None, sync_mode: str = None, source: str = None,): + + def start_sync_job( + self, + instance_id: str = None, + sync_mode: str = None, + source: str = None, + ): """Start a synchronization job. args: @@ -541,16 +540,18 @@ def start_sync_job(self, instance_id: str = None, sync_mode: str = None, source: endpoint = "/persister/synchronization/jobs" data = { - "source": source, - "integrationInstanceId": instance_id, - "syncMode": sync_mode - } + "source": source, + "integrationInstanceId": instance_id, + "syncMode": sync_mode, + } response = self._execute_syncapi_request(endpoint=endpoint, payload=data) return response - def upload_entities_batch_json(self, instance_job_id: str = None, entities_list: list = None): + def upload_entities_batch_json( + self, instance_job_id: str = None, entities_list: list = None + ): """Upload batch of entities. args: @@ -559,15 +560,15 @@ def upload_entities_batch_json(self, instance_job_id: str = None, entities_list: """ endpoint = f"/persister/synchronization/jobs/{instance_job_id}/entities" - data = { - "entities": entities_list - } + data = {"entities": entities_list} response = self._execute_syncapi_request(endpoint=endpoint, payload=data) return response - def upload_relationships_batch_json(self, instance_job_id: str = None, relationships_list: list = None): + def upload_relationships_batch_json( + self, instance_job_id: str = None, relationships_list: list = None + ): """Upload batch of relationships. args: @@ -576,15 +577,15 @@ def upload_relationships_batch_json(self, instance_job_id: str = None, relations """ endpoint = f"/persister/synchronization/jobs/{instance_job_id}/relationships" - data = { - "relationships": relationships_list - } + data = {"relationships": relationships_list} response = self._execute_syncapi_request(endpoint=endpoint, payload=data) return response - def upload_combined_batch_json(self, instance_job_id: str = None, combined_payload: Dict = None): + def upload_combined_batch_json( + self, instance_job_id: str = None, combined_payload: Dict = None + ): """Upload batch of entities and relationships together. args: @@ -593,11 +594,15 @@ def upload_combined_batch_json(self, instance_job_id: str = None, combined_paylo """ endpoint = f"/persister/synchronization/jobs/{instance_job_id}/upload" - response = self._execute_syncapi_request(endpoint=endpoint, payload=combined_payload) + response = self._execute_syncapi_request( + endpoint=endpoint, payload=combined_payload + ) return response - def bulk_delete_entities(self, instance_job_id: str = None, entities_list: list = None): + def bulk_delete_entities( + self, instance_job_id: str = None, entities_list: list = None + ): """Send a request to bulk delete existing entities. args: @@ -606,9 +611,7 @@ def bulk_delete_entities(self, instance_job_id: str = None, entities_list: list """ endpoint = f"/persister/synchronization/jobs/{instance_job_id}/upload" - data = { - "deleteEntities": entities_list - } + data = {"deleteEntities": entities_list} response = self._execute_syncapi_request(endpoint=endpoint, payload=data) @@ -634,16 +637,15 @@ def fetch_integration_jobs(self, instance_id: str = None): args: instance_id (str): The "integrationInstanceId" of the integration to fetch jobs from. """ - variables = { - "integrationInstanceId": instance_id, - "size": 100 - } + variables = {"integrationInstanceId": instance_id, "size": 100} response = self._execute_query(INTEGRATION_JOB_VALUES, variables=variables) - return response['data']['integrationJobs'] + return response["data"]["integrationJobs"] - def fetch_integration_job_events(self, instance_id: str = None, instance_job_id: str = None): + def fetch_integration_job_events( + self, instance_id: str = None, instance_job_id: str = None + ): """Fetch events within an integration job run. args: @@ -651,101 +653,96 @@ def fetch_integration_job_events(self, instance_id: str = None, instance_job_id: instance_job_id (str): The integration Job ID of the integration to fetch job events from. """ variables = { - "integrationInstanceId": instance_id, - "jobId": instance_job_id, - "size": 1000 + "integrationInstanceId": instance_id, + "jobId": instance_job_id, + "size": 1000, } - response = self._execute_query(INTEGRATION_INSTANCE_EVENT_VALUES, variables=variables) + response = self._execute_query( + INTEGRATION_INSTANCE_EVENT_VALUES, variables=variables + ) - return response['data']['integrationEvents'] + return response["data"]["integrationEvents"] def get_integration_definition_details(self, integration_type: str = None): - """Fetch the Integration Definition Details for a given integration type. - - """ - variables = { - "integrationType": integration_type, - "includeConfig": True - } + """Fetch the Integration Definition Details for a given integration type.""" + variables = {"integrationType": integration_type, "includeConfig": True} response = self._execute_query(FIND_INTEGRATION_DEFINITION, variables=variables) return response def fetch_integration_instances(self, definition_id: str = None): - """Fetch all configured Instances for a given integration type. - - """ - variables = { - "definitionId": definition_id, - "limit": 100 - } + """Fetch all configured Instances for a given integration type.""" + variables = {"definitionId": definition_id, "limit": 100} response = self._execute_query(INTEGRATION_INSTANCES, variables=variables) return response def get_integration_instance_details(self, instance_id: str = None): - """Fetch configuration details for a single configured Integration Instance. - - """ - variables = { - "integrationInstanceId": instance_id - } + """Fetch configuration details for a single configured Integration Instance.""" + variables = {"integrationInstanceId": instance_id} response = self._execute_query(INTEGRATION_INSTANCE, variables=variables) return response - def update_integration_instance_config_value(self, - instance_id: str = None, - config_key: str = None, - config_value: str = None): - """Update a single config k:v pair existing on a configured Integration Instance. - - """ + def update_integration_instance_config_value( + self, instance_id: str = None, config_key: str = None, config_value: str = None + ): + """Update a single config k:v pair existing on a configured Integration Instance.""" - # fetch existing instnace configuration + # fetch existing instnace configuration instance_config = self.get_integration_instance_details(instance_id=instance_id) - config_dict = instance_config['data']['integrationInstance']['config'] + config_dict = instance_config["data"]["integrationInstance"]["config"] if str(config_dict.get(config_key, "Not Found")) != "Not Found": # update config key value with new provided value config_dict[config_key] = config_value - instance_config['data']['integrationInstance']['config'] = config_dict + instance_config["data"]["integrationInstance"]["config"] = config_dict # remove externalId to not include in update payload - del instance_config['data']['integrationInstance']['config']['externalId'] + del instance_config["data"]["integrationInstance"]["config"]["externalId"] # prepare variables GraphQL payload for updating config - instance_details = instance_config['data']['integrationInstance'] + instance_details = instance_config["data"]["integrationInstance"] variables = { - "id": instance_details['id'], + "id": instance_details["id"], "update": { - "pollingInterval": instance_details['pollingInterval'], - "config": instance_details['config'], - "description": instance_details['description'], - "name": instance_details['name'], - "collectorPoolId": instance_details['collectorPoolId'], - "pollingIntervalCronExpression": instance_details['pollingIntervalCronExpression'], - "ingestionSourcesOverrides": instance_details['ingestionSourcesOverrides'] - } + "pollingInterval": instance_details["pollingInterval"], + "config": instance_details["config"], + "description": instance_details["description"], + "name": instance_details["name"], + "collectorPoolId": instance_details["collectorPoolId"], + "pollingIntervalCronExpression": instance_details[ + "pollingIntervalCronExpression" + ], + "ingestionSourcesOverrides": instance_details[ + "ingestionSourcesOverrides" + ], + }, } # remove problem fields from previous response - del variables['update']['pollingIntervalCronExpression']['__typename'] + del variables["update"]["pollingIntervalCronExpression"]["__typename"] - for ingestion_source in instance_details['ingestionSourcesOverrides']: - ingestion_source.pop("__typename", None) # Removes key if it exists, ignores if not + for ingestion_source in instance_details["ingestionSourcesOverrides"]: + ingestion_source.pop( + "__typename", None + ) # Removes key if it exists, ignores if not - response = self._execute_query(UPDATE_INTEGRATION_INSTANCE, variables=variables) + response = self._execute_query( + UPDATE_INTEGRATION_INSTANCE, variables=variables + ) return response else: return "Provided 'config_key' not found in existing Integration Instance config" - def create_smartclass(self, smartclass_name: str = None, smartclass_description: str = None): + def create_smartclass( + self, smartclass_name: str = None, smartclass_description: str = None + ): """Creates a new Smart Class within Assets. args: @@ -753,17 +750,19 @@ def create_smartclass(self, smartclass_name: str = None, smartclass_description: smartclass_description (str): The "Description" for Smart Class to be created. """ variables = { - "input": { - "tagName": smartclass_name, - "description": smartclass_description - } + "input": {"tagName": smartclass_name, "description": smartclass_description} } response = self._execute_query(CREATE_SMARTCLASS, variables=variables) - return response['data']['createSmartClass'] + return response["data"]["createSmartClass"] - def create_smartclass_query(self, smartclass_id: str = None, query: str = None, query_description: str = None): + def create_smartclass_query( + self, + smartclass_id: str = None, + query: str = None, + query_description: str = None, + ): """Creates a new J1QL Query within a defined Smart Class. args: @@ -772,16 +771,16 @@ def create_smartclass_query(self, smartclass_id: str = None, query: str = None, query_description (str): The description of the query being created. """ variables = { - "input": { - "smartClassId": smartclass_id, - "query": query, - "description": query_description - } + "input": { + "smartClassId": smartclass_id, + "query": query, + "description": query_description, + } } response = self._execute_query(CREATE_SMARTCLASS_QUERY, variables=variables) - return response['data']['createSmartClassQuery'] + return response["data"]["createSmartClassQuery"] def evaluate_smartclass(self, smartclass_id: str = None): """Execute an on-demand Evaluation of a defined Smartclass. @@ -789,13 +788,11 @@ def evaluate_smartclass(self, smartclass_id: str = None): args: smartclass_id (str): The unique ID of the Smart Class to trigger the evaluation for. """ - variables = { - "smartClassId": smartclass_id - } + variables = {"smartClassId": smartclass_id} response = self._execute_query(EVALUATE_SMARTCLASS, variables=variables) - return response['data']['evaluateSmartClassRule'] + return response["data"]["evaluateSmartClassRule"] def get_smartclass_details(self, smartclass_id: str = None): """Fetch config details from defined Smart Class. @@ -803,13 +800,11 @@ def get_smartclass_details(self, smartclass_id: str = None): args: smartclass_id (str): The unique ID of the Smart Class to fetch details from. """ - variables = { - "id": smartclass_id - } + variables = {"id": smartclass_id} response = self._execute_query(GET_SMARTCLASS_DETAILS, variables=variables) - return response['data']['smartClass'] + return response["data"]["smartClass"] def generate_j1ql(self, natural_language_prompt: str = None): """Generate J1QL query syntax from natural language user input. @@ -817,246 +812,213 @@ def generate_j1ql(self, natural_language_prompt: str = None): args: natural_language_prompt (str): The naturalLanguageQuery prompt input to generate J1QL from. """ - variables = { - "input": { - "naturalLanguageQuery": natural_language_prompt - } - } + variables = {"input": {"naturalLanguageQuery": natural_language_prompt}} response = self._execute_query(J1QL_FROM_NATURAL_LANGUAGE, variables=variables) - return response['data']['j1qlFromNaturalLanguage'] + return response["data"]["j1qlFromNaturalLanguage"] def list_alert_rules(self): - """List all defined Alert Rules configured in J1 account - - """ + """List all defined Alert Rules configured in J1 account""" results = [] - data = { - "query": LIST_RULE_INSTANCES, - "flags": { - "variableResultSize": True - } - } + data = {"query": LIST_RULE_INSTANCES, "flags": {"variableResultSize": True}} - r = requests.post(url=self.graphql_url, headers=self.headers, json=data, verify=True).json() - results.extend(r['data']['listRuleInstances']['questionInstances']) + r = requests.post( + url=self.graphql_url, headers=self.headers, json=data, verify=True + ).json() + results.extend(r["data"]["listRuleInstances"]["questionInstances"]) - while r['data']['listRuleInstances']['pageInfo']['hasNextPage'] == True: + while r["data"]["listRuleInstances"]["pageInfo"]["hasNextPage"] == True: - cursor = r['data']['listRuleInstances']['pageInfo']['endCursor'] + cursor = r["data"]["listRuleInstances"]["pageInfo"]["endCursor"] # cursor query until last page fetched data = { "query": LIST_RULE_INSTANCES, - "variables": { - "cursor": cursor - }, - "flags":{ - "variableResultSize": True - } + "variables": {"cursor": cursor}, + "flags": {"variableResultSize": True}, } - r = requests.post(url=self.graphql_url, headers=self.headers, json=data, verify=True).json() - results.extend(r['data']['listRuleInstances']['questionInstances']) + r = requests.post( + url=self.graphql_url, headers=self.headers, json=data, verify=True + ).json() + results.extend(r["data"]["listRuleInstances"]["questionInstances"]) return results def get_alert_rule_details(self, rule_id: str = None): - """Get details of a single defined Alert Rule configured in J1 account - - """ + """Get details of a single defined Alert Rule configured in J1 account""" results = [] - data = { - "query": LIST_RULE_INSTANCES, - "flags": { - "variableResultSize": True - } - } + data = {"query": LIST_RULE_INSTANCES, "flags": {"variableResultSize": True}} - r = requests.post(url=self.graphql_url, headers=self.headers, json=data, verify=True).json() - results.extend(r['data']['listRuleInstances']['questionInstances']) + r = requests.post( + url=self.graphql_url, headers=self.headers, json=data, verify=True + ).json() + results.extend(r["data"]["listRuleInstances"]["questionInstances"]) - while r['data']['listRuleInstances']['pageInfo']['hasNextPage'] == True: + while r["data"]["listRuleInstances"]["pageInfo"]["hasNextPage"] == True: - cursor = r['data']['listRuleInstances']['pageInfo']['endCursor'] + cursor = r["data"]["listRuleInstances"]["pageInfo"]["endCursor"] # cursor query until last page fetched data = { "query": LIST_RULE_INSTANCES, - "variables": { - "cursor": cursor - }, - "flags":{ - "variableResultSize": True - } + "variables": {"cursor": cursor}, + "flags": {"variableResultSize": True}, } - r = requests.post(url=self.graphql_url, headers=self.headers, json=data, verify=True).json() - results.extend(r['data']['listRuleInstances']['questionInstances']) - + r = requests.post( + url=self.graphql_url, headers=self.headers, json=data, verify=True + ).json() + results.extend(r["data"]["listRuleInstances"]["questionInstances"]) + # pick result out of list of results by 'id' key - item = next((item for item in results if item['id'] == rule_id), None) + item = next((item for item in results if item["id"] == rule_id), None) if item: return item else: - return 'Alert Rule not found for provided ID in configured J1 Account' - - def create_alert_rule(self, - name: str = None, - description: str = None, - tags: List[str] = None, - polling_interval: str = None, - severity: str = None, - j1ql: str = None, - action_configs: Dict = None): - """Create Alert Rule Configuration in J1 account + return "Alert Rule not found for provided ID in configured J1 Account" - """ + def create_alert_rule( + self, + name: str = None, + description: str = None, + tags: List[str] = None, + polling_interval: str = None, + severity: str = None, + j1ql: str = None, + action_configs: Dict = None, + ): + """Create Alert Rule Configuration in J1 account""" variables = { - "instance": { - "name": name, - "description": description, - "notifyOnFailure": True, - "triggerActionsOnNewEntitiesOnly": True, - "ignorePreviousResults": False, - "operations": [ - { - "when": { - "type": "FILTER", - "condition": [ - "AND", - [ - "queries.query0.total", - ">", - 0 + "instance": { + "name": name, + "description": description, + "notifyOnFailure": True, + "triggerActionsOnNewEntitiesOnly": True, + "ignorePreviousResults": False, + "operations": [ + { + "when": { + "type": "FILTER", + "condition": ["AND", ["queries.query0.total", ">", 0]], + }, + "actions": [ + { + "type": "SET_PROPERTY", + "targetProperty": "alertLevel", + "targetValue": severity, + }, + {"type": "CREATE_ALERT"}, + ], + } + ], + "outputs": ["alertLevel"], + "pollingInterval": polling_interval, + "question": { + "queries": [ + { + "query": j1ql, + "name": "query0", + "version": "v1", + "includeDeleted": False, + } ] - ] }, - "actions": [ - { - "type": "SET_PROPERTY", - "targetProperty": "alertLevel", - "targetValue": severity - }, - { - "type": "CREATE_ALERT" - } - ] - } - ], - "outputs": [ - "alertLevel" - ], - "pollingInterval": polling_interval, - "question": { - "queries": [ - { - "query": j1ql, - "name": "query0", - "version": "v1", - "includeDeleted": False - } - ] - }, - "specVersion": 1, - "tags": tags, - "templates": {} - } + "specVersion": 1, + "tags": tags, + "templates": {}, + } } if action_configs: - variables['instance']['operations'][0]['actions'].append(action_configs) + variables["instance"]["operations"][0]["actions"].append(action_configs) print(variables) response = self._execute_query(CREATE_RULE_INSTANCE, variables=variables) - return response['data']['createInlineQuestionRuleInstance'] + return response["data"]["createInlineQuestionRuleInstance"] def delete_alert_rule(self, rule_id: str = None): - """Delete a single Alert Rule configured in J1 account - - """ - variables = { - "id": rule_id - } + """Delete a single Alert Rule configured in J1 account""" + variables = {"id": rule_id} response = self._execute_query(DELETE_RULE_INSTANCE, variables=variables) - return response['data']['deleteRuleInstance'] - - def update_alert_rule(self, - rule_id: str = None, - name: str = None, - description: str = None, - j1ql: str = None, - polling_interval: str = None, - severity: str = None, - tags: List[str] = None, - tag_op: str = None, - action_configs: List[dict] = None, - action_configs_op: str = None): - """Update Alert Rule Configuration in J1 account + return response["data"]["deleteRuleInstance"] - """ + def update_alert_rule( + self, + rule_id: str = None, + name: str = None, + description: str = None, + j1ql: str = None, + polling_interval: str = None, + severity: str = None, + tags: List[str] = None, + tag_op: str = None, + action_configs: List[dict] = None, + action_configs_op: str = None, + ): + """Update Alert Rule Configuration in J1 account""" # fetch existing alert rule alert_rule_config = self.get_alert_rule_details(rule_id) # increment rule config version - rule_version = alert_rule_config['version'] + 1 + rule_version = alert_rule_config["version"] + 1 # fetch current operations config - operations = alert_rule_config['operations'] - del operations[0]['__typename'] + operations = alert_rule_config["operations"] + del operations[0]["__typename"] # update name if provided if name is not None: alert_name = name else: - alert_name = alert_rule_config['name'] + alert_name = alert_rule_config["name"] # update description if provided if description is not None: alert_description = description else: - alert_description = alert_rule_config['description'] + alert_description = alert_rule_config["description"] # update J1QL query if provided if j1ql is not None: - question_config = alert_rule_config['question'] + question_config = alert_rule_config["question"] # remove problematic fields - del question_config['__typename'] - del question_config['queries'][0]['__typename'] + del question_config["__typename"] + del question_config["queries"][0]["__typename"] # update query string if provided - question_config['queries'][0]['query'] = j1ql + question_config["queries"][0]["query"] = j1ql else: - question_config = alert_rule_config['question'] + question_config = alert_rule_config["question"] # remove problematic fields - del question_config['__typename'] - del question_config['queries'][0]['__typename'] + del question_config["__typename"] + del question_config["queries"][0]["__typename"] # update polling_interval if provided if polling_interval is not None: interval_config = polling_interval else: - interval_config = alert_rule_config['pollingInterval'] + interval_config = alert_rule_config["pollingInterval"] # update tags list if provided if tags is not None: if tag_op == "OVERWRITE": tags_config = tags elif tag_op == "APPEND": - tags_config = alert_rule_config['tags'] + tags + tags_config = alert_rule_config["tags"] + tags else: - tags_config = alert_rule_config['tags'] + tags_config = alert_rule_config["tags"] else: - tags_config = alert_rule_config['tags'] + tags_config = alert_rule_config["tags"] # update action_configs list if provided if action_configs is not None: @@ -1065,201 +1027,162 @@ def update_alert_rule(self, # maintain first item and build new list from input alert_action_configs = [] - base_action = alert_rule_config['operations'][0]['actions'][0] + base_action = alert_rule_config["operations"][0]["actions"][0] alert_action_configs.append(base_action) alert_action_configs.extend(action_configs) # update actions field inside operations payload - operations[0]['actions'] = alert_action_configs + operations[0]["actions"] = alert_action_configs elif action_configs_op == "APPEND": # update actions field inside operations payload - operations[0]['actions'].extend(action_configs) + operations[0]["actions"].extend(action_configs) # update alert severity if provided if severity is not None: - operations[0]['actions'][0]['targetValue'] = severity + operations[0]["actions"][0]["targetValue"] = severity variables = { - "instance": { - "id": rule_id, - "version": rule_version, - "specVersion": alert_rule_config['specVersion'], - "name": alert_name, - "description": alert_description, - "question": question_config, - "operations": operations, - "pollingInterval": interval_config, - "tags": tags_config - } + "instance": { + "id": rule_id, + "version": rule_version, + "specVersion": alert_rule_config["specVersion"], + "name": alert_name, + "description": alert_description, + "question": question_config, + "operations": operations, + "pollingInterval": interval_config, + "tags": tags_config, + } } response = self._execute_query(UPDATE_RULE_INSTANCE, variables=variables) - return response['data']['updateInlineQuestionRuleInstance'] + return response["data"]["updateInlineQuestionRuleInstance"] def evaluate_alert_rule(self, rule_id: str = None): - """Run an Evaluation for a defined Alert Rule configured in J1 account - - """ - variables = { - "id": rule_id - } + """Run an Evaluation for a defined Alert Rule configured in J1 account""" + variables = {"id": rule_id} response = self._execute_query(EVALUATE_RULE_INSTANCE, variables=variables) return response def list_alert_rule_evaluation_results(self, rule_id: str = None): - """Fetch a list of Evaluation Results for an Alert Rule configured in J1 account - - """ + """Fetch a list of Evaluation Results for an Alert Rule configured in J1 account""" variables = { "collectionType": "RULE_EVALUATION", "collectionOwnerId": rule_id, "beginTimestamp": 0, "endTimestamp": round(time.time() * 1000), - "limit": 40 + "limit": 40, } response = self._execute_query(LIST_COLLECTION_RESULTS, variables=variables) return response def fetch_evaluation_result_download_url(self, raw_data_key: str = None): - """Fetch evaluation result Download URL for Alert Rule configured in J1 account - - """ - variables = { - "rawDataKey": raw_data_key - } + """Fetch evaluation result Download URL for Alert Rule configured in J1 account""" + variables = {"rawDataKey": raw_data_key} response = self._execute_query(GET_RAW_DATA_DOWNLOAD_URL, variables=variables) return response def fetch_downloaded_evaluation_results(self, download_url: str = None): - """Return full Alert Rule J1QL results from Download URL - - """ + """Return full Alert Rule J1QL results from Download URL""" # initiate requests session and implement retry logic of 5 request retries with 1 second between s = requests.Session() retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504]) - s.mount('https://', HTTPAdapter(max_retries=retries)) - + s.mount("https://", HTTPAdapter(max_retries=retries)) + try: - response = s.get( - download_url, timeout=60 - ) - + response = s.get(download_url, timeout=60) + return response.json() - + except Exception as e: return e def list_questions(self): - """List all defined Questions configured in J1 account Questions Library - - """ + """List all defined Questions configured in J1 account Questions Library""" results = [] - data = { - "query": QUESTIONS, - "flags": { - "variableResultSize": True - } - } + data = {"query": QUESTIONS, "flags": {"variableResultSize": True}} - r = requests.post(url=self.graphql_url, headers=self.headers, json=data, verify=True).json() - results.extend(r['data']['questions']['questions']) + r = requests.post( + url=self.graphql_url, headers=self.headers, json=data, verify=True + ).json() + results.extend(r["data"]["questions"]["questions"]) - while r['data']['questions']['pageInfo']['hasNextPage'] == True: + while r["data"]["questions"]["pageInfo"]["hasNextPage"] == True: - cursor = r['data']['questions']['pageInfo']['endCursor'] + cursor = r["data"]["questions"]["pageInfo"]["endCursor"] # cursor query until last page fetched data = { "query": QUESTIONS, - "variables": { - "cursor": cursor - }, - "flags":{ - "variableResultSize": True - } + "variables": {"cursor": cursor}, + "flags": {"variableResultSize": True}, } - r = requests.post(url=self.graphql_url, headers=self.headers, json=data, verify=True).json() - results.extend(r['data']['questions']['questions']) + r = requests.post( + url=self.graphql_url, headers=self.headers, json=data, verify=True + ).json() + results.extend(r["data"]["questions"]["questions"]) return results def get_compliance_framework_item_details(self, item_id: str = None): - """Fetch Details of a Compliance Framework Requirement configured in J1 account - - """ - variables = { - "input": { - "id": item_id - } - } + """Fetch Details of a Compliance Framework Requirement configured in J1 account""" + variables = {"input": {"id": item_id}} response = self._execute_query(COMPLIANCE_FRAMEWORK_ITEM, variables=variables) return response def get_parameter_details(self, name: str = None): - """Fetch Details of a configured Parameter in J1 account - - """ - variables = { - "name": name - } + """Fetch Details of a configured Parameter in J1 account""" + variables = {"name": name} response = self._execute_query(PARAMETER, variables=variables) return response def list_account_parameters(self): - """Fetch List of all configured Account Parameters in J1 account - - """ + """Fetch List of all configured Account Parameters in J1 account""" results = [] - data = { - "query": PARAMETER_LIST, - "flags": { - "variableResultSize": True - } - } + data = {"query": PARAMETER_LIST, "flags": {"variableResultSize": True}} - r = requests.post(url=self.graphql_url, headers=self.headers, json=data, verify=True).json() - results.extend(r['data']['parameterList']['items']) + r = requests.post( + url=self.graphql_url, headers=self.headers, json=data, verify=True + ).json() + results.extend(r["data"]["parameterList"]["items"]) - while r['data']['parameterList']['pageInfo']['hasNextPage'] == True: - cursor = r['data']['parameterList']['pageInfo']['endCursor'] + while r["data"]["parameterList"]["pageInfo"]["hasNextPage"] == True: + cursor = r["data"]["parameterList"]["pageInfo"]["endCursor"] # cursor query until last page fetched data = { "query": PARAMETER_LIST, - "variables": { - "cursor": cursor - }, - "flags": { - "variableResultSize": True - } + "variables": {"cursor": cursor}, + "flags": {"variableResultSize": True}, } - r = requests.post(url=self.graphql_url, headers=self.headers, json=data, verify=True).json() - results.extend(r['data']['parameterList']['items']) + r = requests.post( + url=self.graphql_url, headers=self.headers, json=data, verify=True + ).json() + results.extend(r["data"]["parameterList"]["items"]) return results - def create_update_parameter(self, name: str = None, value: Union[str, int, bool, list] = None, secret: bool = False): - """Create or Update Account Parameter in J1 account - - """ - variables = { - "name": name, - "value": value, - "secret": secret - } + def create_update_parameter( + self, + name: str = None, + value: Union[str, int, bool, list] = None, + secret: bool = False, + ): + """Create or Update Account Parameter in J1 account""" + variables = {"name": name, "value": value, "secret": secret} response = self._execute_query(UPSERT_PARAMETER, variables=variables) - return response \ No newline at end of file + return response diff --git a/jupiterone/constants.py b/jupiterone/constants.py index 2232c4a..26ac77a 100644 --- a/jupiterone/constants.py +++ b/jupiterone/constants.py @@ -87,33 +87,18 @@ } } """ -UPDATE_RELATIONSHIP = """ - mutation UpdateRelationship ( - $relationshipId: String! - $timestamp: Long - $properties: JSON - ) { - updateRelationship ( - relationshipId: $relationshipId, - timestamp: $timestamp, - properties: $properties - ) { - relationship { - _id - ... - } - edge { - id - toVertexId - fromVertexId - relationship { - _id - ... - } - properties - } - } - } +UPDATE_RELATIONSHIPV2 = """ +mutation UpdateRelationshipV2 ( + $relationship: JSON! + $timestamp: Long +) { + updateRelationshipV2 ( + relationship: $relationship, + timestamp: $timestamp, + ) { + relationship + } +} """ DELETE_RELATIONSHIP = """ mutation DeleteRelationship($relationshipId: String! $timestamp: Long) {