Skip to content
This repository has been archived by the owner on Feb 14, 2024. It is now read-only.

Commit

Permalink
Optimize intelx code
Browse files Browse the repository at this point in the history
  • Loading branch information
Anna committed Jul 7, 2023
1 parent 1475518 commit 169a45f
Showing 1 changed file with 22 additions and 36 deletions.
58 changes: 22 additions & 36 deletions src/pe_source/intelx_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ def find_credential_leaks(self, domain_list, start_date, end_date):
"""Find leaks for a domain between two dates."""
all_results_list = []
for domain in domain_list:
LOGGER.info("Finding credentials leaked associated with %s", domain)
if not domain:
continue
response = self.query_identity_api(domain, start_date, end_date)
if not response:
continue
Expand All @@ -160,45 +161,30 @@ def find_credential_leaks(self, domain_list, start_date, end_date):
results = self.get_search_results(search_id)
if not results:
break
if results["status"] == 0:
current_results = results["records"]
if current_results:
# Add the root_domain to each result object
LOGGER.info(
"IntelX returned %s more credentials for %s",
len(current_results),
domain,
)
result = [
dict(item, **{"root_domain": domain})
for item in current_results
]
all_results_list = all_results_list + result
status = results["status"]
current_results = results.get("records", [])
if current_results:
# Add the root_domain to each result object
LOGGER.info(
f"Intelx returned {len(current_results)} more credentials for {domain}"
)
result = [
{**item, "root_domain": domain}
for item in current_results
]
all_results_list.extend(result)
if status == 0:
time.sleep(3)
# If still waiting on new results wait
elif results["status"] == 1:
LOGGER.info("IntelX still searching for more credentials")
time.sleep(7)
# if status is two collect the last remaining values and exit loop
elif results["status"] == 2:
current_results = results["records"]
if current_results:
# Add the root_domain to each result object
LOGGER.info(
"IntelX returned %s more credentials for %s",
len(current_results),
domain,
)
result = [
dict(item, **{"root_domain": domain})
for item in current_results
]
all_results_list = all_results_list + result
elif status == 1:
LOGGER.info("Intelx still searching for more credentials")
time.sleep(5)
elif status == 2:
break
elif results["status"] == 3:
elif status == 3:
LOGGER.error("Search id not found")
break
LOGGER.info("Identified %s credential leak combos.", len(all_results_list))

LOGGER.info(f"Identified {len(all_results_list)} credential leak combos.")
return all_results_list

def process_leaks_results(self, leaks_json, org_uid):
Expand Down

0 comments on commit 169a45f

Please sign in to comment.