From 97b9f6641d7439089c4ad95c3785be186e0af245 Mon Sep 17 00:00:00 2001
From: aloftus23
Date: Fri, 23 Dec 2022 16:03:10 -0500
Subject: [PATCH] fix cybersixgill mentions api call

Fix the mentions api call to collect large numbers in different segments
---
 src/pe_source/data/sixgill/api.py    |  4 +-
 src/pe_source/data/sixgill/source.py | 96 ++++++++++++++++++++--------
 2 files changed, 70 insertions(+), 30 deletions(-)

diff --git a/src/pe_source/data/sixgill/api.py b/src/pe_source/data/sixgill/api.py
index 8b2cc0f4..b46319a4 100644
--- a/src/pe_source/data/sixgill/api.py
+++ b/src/pe_source/data/sixgill/api.py
@@ -36,10 +36,9 @@ def org_assets(org_id):
     return resp
 
 
-def intel_post(query, frm, scroll, result_size):
+def intel_post(auth, query, frm, scroll, result_size):
     """Get intel items - advanced variation."""
     url = "https://api.cybersixgill.com/intel/intel_items"
-    auth = cybersix_token()
     headers = {
         "Content-Type": "application/json",
         "Cache-Control": "no-cache",
@@ -60,7 +59,6 @@ def intel_post(query, frm, scroll, result_size):
     resp = requests.post(url, headers=headers, json=payload).json()
     return resp
 
-
 def alerts_list(organization_id, fetch_size, offset):
     """Get actionable alerts by ID using organization_id with optional filters."""
     url = "https://api.cybersixgill.com/alerts/actionable-alert"
diff --git a/src/pe_source/data/sixgill/source.py b/src/pe_source/data/sixgill/source.py
index 8f4200e9..db3b539e 100644
--- a/src/pe_source/data/sixgill/source.py
+++ b/src/pe_source/data/sixgill/source.py
@@ -1,7 +1,8 @@
 """Scripts for importing Sixgill data into PE Postgres database."""
 # Standard Python Libraries
-
+import logging
+import time
 
 # Third-Party Libraries
 import pandas as pd
 
@@ -9,6 +10,7 @@
 
 # cisagov Libraries
 from pe_reports import app
+from pe_source.data.pe_db.config import cybersix_token
 
 from .api import (
     alerts_count,
@@ -40,6 +42,9 @@ def root_domains(org_id):
 
 def mentions(date, aliases):
     """Pull dark web mentions data for an organization."""
+    token = cybersix_token()
+
+    # Build the query using the org's aliases
     mentions = ""
     for mention in aliases:
         mentions += '"' + mention + '"' + ","
@@ -47,42 +52,79 @@ def mentions(date, aliases):
     query = "site:forum_* AND date:" + date + " AND " + "(" + str(mentions) + ")"
     LOGGER.info("Query:")
     LOGGER.info(query)
+
+    # Get the total number of mentions
     count = 1
     while count < 7:
         try:
-            LOGGER.info("Intel post try #%s", count)
-            resp = intel_post(query, frm=0, scroll=False, result_size=1)
+            LOGGER.info("Total mentions try #%s", count)
+            resp = intel_post(token, query, frm=0, scroll=False, result_size=1)
             break
         except Exception:
-            LOGGER.info("Error. Trying intel_post again...")
+            LOGGER.info("Error. Trying to get mentions count again...")
             count += 1
             continue
-    count_total = resp["total_intel_items"]
-    LOGGER.info("Total Mentions: %s", count_total)
+    total_mentions = resp["total_intel_items"]
+    LOGGER.info("Total Mentions: %s", total_mentions)
 
+    # Fetch mentions in segments
+    # Recommended segment is 50. The maximum is 400.
     i = 0
+    segment_size = 100
+    smaller_segment_count = 1
     all_mentions = []
-    if count_total < 10000:
-        while i < count_total:
-            # Recommended "from" and "result_size" is 50. The maximum is 400.
-            resp = intel_post(query, frm=i, scroll=False, result_size=200)
-            i += 200
-            LOGGER.info("Getting %s of %s....", i, count_total)
-            intel_items = resp["intel_items"]
-            df_mentions = pd.DataFrame.from_dict(intel_items)
-            all_mentions.append(df_mentions)
-        df_all_mentions = pd.concat(all_mentions).reset_index(drop=True)
-    else:
-        while i < count_total:
-            # Recommended "from" and "result_size" is 50. The maximum is 400.
-            resp = intel_post(query, frm=i, scroll=True, result_size=400)
-            i += 400
-            LOGGER.info("Getting %s of %s....", i, count_total)
-            intel_items = resp["intel_items"]
-            df_mentions = pd.DataFrame.from_dict(intel_items)
-            all_mentions.append(df_mentions)
-        df_all_mentions = pd.concat(all_mentions).reset_index(drop=True)
-
+    while i < total_mentions:
+        # Try to get a mentions segment 3 times
+        try_count = 1
+        while try_count < 4:
+            try:
+                # If segment size was decreased, only use for 10 iterations
+                if smaller_segment_count == 10:
+                    LOGGER.info("Switching back to a segment size of 100.")
+                    segment_size = 100
+                    smaller_segment_count = 1
+                if segment_size <= 10:
+                    smaller_segment_count += 1
+                # API post
+                resp = intel_post(
+                    token, query, frm=i, scroll=False, result_size=segment_size
+                )
+                i += segment_size
+                LOGGER.info(
+                    "Got %s-%s of %s...",
+                    i - segment_size,
+                    i,
+                    total_mentions,
+                )
+                intel_items = resp["intel_items"]
+                df_mentions = pd.DataFrame.from_dict(intel_items)
+                all_mentions.append(df_mentions)
+                df_all_mentions = pd.concat(all_mentions).reset_index(drop=True)
+                break
+            except Exception:
+                # Sleep for 2 seconds
+                time.sleep(2)
+                # If the API post failed 3 times
+                if try_count == 3:
+                    # If a segment was already decreased to 1, skip the mention
+                    if segment_size == 1:
+                        LOGGER.critical("Failed 3 times fetching 1 post. Skipping it.")
+                        i += segment_size
+                        break
+                    # Decrease the segment to 10, then if still failing, to 1
+                    if segment_size == 10:
+                        segment_size = 1
+                        smaller_segment_count = 1
+                    else:
+                        segment_size = 10
+                    LOGGER.error(
+                        "Failed 3 times. Switching to a segment size of %s",
+                        segment_size,
+                    )
+                    try_count = 1
+                    continue
+                LOGGER.error("Try %s/3 failed.", try_count)
+                try_count += 1
     return df_all_mentions
 
 